Class InMemoryEventBus

Namespace
JD.AI.Core.Events
Assembly
JD.AI.Core.dll

In-memory ring-buffer event bus that also serves as the audit log. The bus retains at most capacity published events, which can be retrieved via GetEvents() for the Dashboard Logs page.

public sealed class InMemoryEventBus : IEventBus, IDisposable
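
The retention behaviour in the summary (keep at most capacity events, return them oldest first) can be illustrated with a minimal ring-buffer sketch. This is not the InMemoryEventBus source: RingBuffer<T>, Add, and Snapshot are hypothetical names, and the lock is an assumed way to keep a snapshot consistent with concurrent writes.

```csharp
using System;

// Usage: with capacity 3, the fourth and fifth items evict the two oldest.
var buffer = new RingBuffer<string>(capacity: 3);
foreach (var name in new[] { "a", "b", "c", "d", "e" })
    buffer.Add(name);

Console.WriteLine(string.Join(",", buffer.Snapshot())); // prints "c,d,e"

// Hypothetical fixed-capacity buffer; illustrative only, not the library's code.
public sealed class RingBuffer<T>
{
    private readonly T[] _items;
    private readonly object _gate = new();
    private int _next;   // index the next item will be written to
    private int _count;  // number of valid items, never more than capacity

    public RingBuffer(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _items = new T[capacity];
    }

    // Appends an item, overwriting the oldest one once the buffer is full.
    public void Add(T item)
    {
        lock (_gate)
        {
            _items[_next] = item;
            _next = (_next + 1) % _items.Length;
            if (_count < _items.Length) _count++;
        }
    }

    // Copies the current contents into a new array, oldest first.
    public T[] Snapshot()
    {
        lock (_gate)
        {
            var result = new T[_count];
            // When full, the oldest item sits at _next; otherwise at index 0.
            int start = _count == _items.Length ? _next : 0;
            for (int i = 0; i < _count; i++)
                result[i] = _items[(start + i) % _items.Length];
            return result;
        }
    }
}
```

Under this design a slow reader never blocks publishers for long, at the cost of silently losing the oldest entries once the buffer wraps.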
Inheritance
object
InMemoryEventBus
Implements
IEventBus
IDisposable

Remarks

Design:

Constructors

InMemoryEventBus(int)

public InMemoryEventBus(int capacity = 10000)

Parameters

capacity int
Maximum number of events retained in the ring buffer. Defaults to 10000.

Methods

Create(int)

Creates a new IEventBus backed by an in-memory ring buffer. Intended for DI registration, for example: services.AddSingleton<IEventBus>(InMemoryEventBus.Create());

public static IEventBus Create(int capacity = 10000)

Parameters

capacity int
Maximum number of events retained in the ring buffer. Defaults to 10000.

Returns

IEventBus

Dispose()

Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.

public void Dispose()

GetEvents(CancellationToken)

Returns a non-blocking snapshot of all events currently in the ring buffer, oldest first. Returns an empty array immediately if ct is already cancelled. Intended for audit / history queries from the Dashboard Logs page.

public Task<GatewayEvent[]> GetEvents(CancellationToken ct = default)

Parameters

ct CancellationToken

Returns

Task<GatewayEvent[]>
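
The cancellation behaviour described for GetEvents (an empty array when the token is already cancelled, otherwise a copy of the current contents) might look roughly like the sketch below. SnapshotSketch and GetSnapshot are hypothetical names, and a plain array stands in for the bus's internal buffer; the real method may be implemented differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var source = new[] { 1, 2, 3 };

var live = await SnapshotSketch.GetSnapshot(source, cts.Token);
Console.WriteLine(live.Length); // prints 3

cts.Cancel();
var cancelled = await SnapshotSketch.GetSnapshot(source, cts.Token);
Console.WriteLine(cancelled.Length); // prints 0

// Hypothetical helper; illustrative only, not the library's code.
public static class SnapshotSketch
{
    public static Task<T[]> GetSnapshot<T>(T[] buffer, CancellationToken ct = default)
    {
        // An already-cancelled token yields an empty result instead of throwing.
        if (ct.IsCancellationRequested)
            return Task.FromResult(Array.Empty<T>());

        // Return a copy so later writes to the buffer do not affect the caller.
        return Task.FromResult((T[])buffer.Clone());
    }
}
```

Because the result is an already-completed task, awaiting it does not wait on any pending work, which matches the "non-blocking" wording above.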

PublishAsync(GatewayEvent, CancellationToken)

Publishes an event to all subscribers.

public Task PublishAsync(GatewayEvent evt, CancellationToken ct = default)

Parameters

evt GatewayEvent
ct CancellationToken

Returns

Task

StreamAsync(string?, CancellationToken)

Streams live events to the caller. The eventTypeFilter is treated as a prefix: a filter of "agent" matches "agent.started", "agent.completed", etc. Pass null to receive all events.

public IAsyncEnumerable<GatewayEvent> StreamAsync(string? eventTypeFilter, CancellationToken ct = default)

Parameters

eventTypeFilter string
ct CancellationToken

Returns

IAsyncEnumerable<GatewayEvent>
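
The prefix rule for eventTypeFilter can be sketched as a single predicate. EventFilter.Matches is a hypothetical helper, "session.created" is an invented event type, and the ordinal (case-sensitive) comparison is an assumption; the source does not say which comparison the bus uses.

```csharp
using System;

Console.WriteLine(EventFilter.Matches("agent", "agent.started"));   // prints True
Console.WriteLine(EventFilter.Matches("agent", "session.created")); // prints False
Console.WriteLine(EventFilter.Matches(null, "session.created"));    // prints True

// Hypothetical helper; illustrative only, not the library's code.
public static class EventFilter
{
    // A null filter matches everything; otherwise the event type must start with the filter.
    public static bool Matches(string? filter, string eventType) =>
        filter is null || eventType.StartsWith(filter, StringComparison.Ordinal);
}
```

If the match is a plain prefix comparison as assumed here, a filter of "agent" would also match a type such as "agents.list"; passing "agent." with the trailing dot restricts the match to that namespace.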

Subscribe(string?, Func<GatewayEvent, Task>)

Subscribes to events matching the given filter.

public IDisposable Subscribe(string? eventTypeFilter, Func<GatewayEvent, Task> handler)

Parameters

eventTypeFilter string
handler Func<GatewayEvent, Task>

Returns

IDisposable