Class InMemoryEventBus
In-memory ring-buffer event bus that also serves as the audit log.
All events published are retained up to capacity entries and
can be retrieved via GetEvents() for the Dashboard Logs page.
public sealed class InMemoryEventBus : IEventBus, IDisposable
- Inheritance
-
InMemoryEventBus
- Implements
- Inherited Members
Remarks
Design:
- GetEvents(CancellationToken) returns a non-blocking snapshot of the history ring buffer.
- PublishAsync(GatewayEvent, CancellationToken) applies subscription filters before dispatching.
- StreamAsync(string?, CancellationToken) streams live events from an unbounded channel; filter uses prefix matching.
Constructors
InMemoryEventBus(int)
public InMemoryEventBus(int capacity = 10000)
Parameters
capacityint
Methods
Create(int)
Creates a new IEventBus backed by an in-memory ring buffer.
Use as the factory in DI: services.AddSingleton<IEventBus>(InMemoryEventBus.Create());
public static IEventBus Create(int capacity = 10000)
Parameters
capacityint
Returns
Dispose()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
public void Dispose()
GetEvents(CancellationToken)
Returns a non-blocking snapshot of all events currently in the ring buffer, oldest first.
Returns an empty array immediately if ct is already cancelled.
Intended for audit / history queries from the Dashboard Logs page.
public Task<GatewayEvent[]> GetEvents(CancellationToken ct = default)
Parameters
Returns
- Task<GatewayEvent[]>
PublishAsync(GatewayEvent, CancellationToken)
Publishes an event to all subscribers.
public Task PublishAsync(GatewayEvent evt, CancellationToken ct = default)
Parameters
evtGatewayEventctCancellationToken
Returns
StreamAsync(string?, CancellationToken)
Streams live events to the caller. The eventTypeFilter is treated
as a prefix: a filter of "agent" matches "agent.started", "agent.completed", etc.
Pass null to receive all events.
public IAsyncEnumerable<GatewayEvent> StreamAsync(string? eventTypeFilter, CancellationToken ct = default)
Parameters
eventTypeFilterstringctCancellationToken
Returns
Subscribe(string?, Func<GatewayEvent, Task>)
Subscribes to events matching the given filter.
public IDisposable Subscribe(string? eventTypeFilter, Func<GatewayEvent, Task> handler)
Parameters
eventTypeFilterstringhandlerFunc<GatewayEvent, Task>