AsyncObserver
An asynchronous, thread-safe event hub for broadcasting events of type TEvent to multiple subscribers. Handlers and predicates are ValueTask-based, allowing you to await I/O without blocking.
Use it when your observers perform async work (I/O, timers, pipelines) and you want a clean, fluent API matching the synchronous Observer<TEvent>.
What it is
- Typed: AsyncObserver<TEvent> delivers strongly-typed events.
- Async-first: Predicate and Handler return ValueTask.
- Cancellation: PublishAsync accepts a CancellationToken.
- Lock-free, copy-on-write subscriptions: publish iterates a snapshot without locks.
- Predicate filters: decide per-subscriber whether to run the handler.
- Immutable & thread-safe after Build().
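To make the copy-on-write bullet concrete, here is a minimal sketch of how a snapshot-based subscription list can work. The names CowHub<T> and SketchHandler<T> are hypothetical and this is not PatternKit's source; it only illustrates the general technique of swapping an immutable array with a compare-and-swap so that publishing never takes a lock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical delegate and class names; this is not PatternKit's source.
public delegate ValueTask SketchHandler<T>(in T evt);

public sealed class CowHub<T>
{
    // Treated as immutable: writers replace the whole array, readers never see it change.
    private SketchHandler<T>[] _handlers = Array.Empty<SketchHandler<T>>();

    public IDisposable Subscribe(SketchHandler<T> handler)
    {
        while (true)
        {
            var current = Volatile.Read(ref _handlers);
            var next = new SketchHandler<T>[current.Length + 1];
            Array.Copy(current, next, current.Length);
            next[current.Length] = handler;
            // Retry if another thread swapped the array in the meantime.
            if (Interlocked.CompareExchange(ref _handlers, next, current) == current)
                return new Unsubscriber(this, handler);
        }
    }

    public async ValueTask PublishAsync(T evt, CancellationToken ct = default)
    {
        var snapshot = Volatile.Read(ref _handlers); // single read, no lock
        foreach (var handler in snapshot)
        {
            ct.ThrowIfCancellationRequested();
            await handler(in evt).ConfigureAwait(false);
        }
    }

    private void Remove(SketchHandler<T> handler)
    {
        while (true)
        {
            var current = Volatile.Read(ref _handlers);
            var index = Array.IndexOf(current, handler);
            if (index < 0) return; // already removed
            var next = new SketchHandler<T>[current.Length - 1];
            Array.Copy(current, 0, next, 0, index);
            Array.Copy(current, index + 1, next, index, current.Length - index - 1);
            if (Interlocked.CompareExchange(ref _handlers, next, current) == current) return;
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly CowHub<T> _hub;
        private readonly SketchHandler<T> _handler;
        public Unsubscriber(CowHub<T> hub, SketchHandler<T> handler) { _hub = hub; _handler = handler; }
        public void Dispose() => _hub.Remove(_handler);
    }
}
```

In this sketch a publish that is already running keeps iterating the array it read at the start, so a concurrent subscribe or dispose only becomes visible to the next publish.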
TL;DR example
using System;
using System.Threading.Tasks;
using PatternKit.Behavioral.Observer;

var hub = AsyncObserver<string>.Create()
    .OnError(static (Exception ex, in string msg) => { Console.Error.WriteLine(ex.Message); return ValueTask.CompletedTask; })
    .ThrowAggregate() // default
    .Build();

// Async lambdas cannot declare 'in' parameters, so each handler forwards to an async local function.
hub.Subscribe(static (in string msg) => PrintAllAsync(msg));
hub.Subscribe(
    predicate: static (in string msg) => new ValueTask<bool>(msg.StartsWith("warn:", StringComparison.Ordinal)),
    handler: static (in string msg) => PrintWarnAsync(msg));

await hub.PublishAsync("hello");
await hub.PublishAsync("warn: disk");

static async ValueTask PrintAllAsync(string msg) { await Task.Delay(1); Console.WriteLine($"ALL:{msg}"); }
static async ValueTask PrintWarnAsync(string msg) { await Task.Yield(); Console.WriteLine($"WARN:{msg}"); }
API shape
var hub = AsyncObserver<TEvent>.Create()
.OnError(static (Exception ex, in TEvent evt) => /* log */ ValueTask.CompletedTask) // optional
.ThrowAggregate() // default
// .ThrowFirstError()
// .SwallowErrors()
.Build();
IDisposable s1 = hub.Subscribe(static (in TEvent evt) => /* ValueTask handler */);
IDisposable s2 = hub.Subscribe(static (in TEvent evt) => /* ValueTask<bool> filter */, static (in TEvent evt) => /* ValueTask handler */);
await hub.PublishAsync(evt, cancellationToken);
- PublishAsync(TEvent evt, CancellationToken ct = default) drives the async flow; it takes evt by value rather than as an in parameter, because async methods cannot have in, ref, or out parameters.
- Delegates still use in TEvent, so large structs are passed by reference instead of being copied.
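The same restriction applies to handlers you write yourself: a method or lambda marked async cannot declare an in parameter. One common way around it, sketched below with a hypothetical Reading event type and ReadingHandlers class, is a non-async entry point that accepts the event by in reference and forwards it to an async core method.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical event type used only for this illustration.
public readonly struct Reading
{
    public Reading(double value, long ticks) { Value = value; Ticks = ticks; }
    public double Value { get; }
    public long Ticks { get; }
}

public static class ReadingHandlers
{
    // Non-async entry point: it may take 'in', then copies the event once
    // when handing it to the async method's state machine.
    public static ValueTask Handle(in Reading r) => HandleCoreAsync(r);

    private static async ValueTask HandleCoreAsync(Reading r)
    {
        await Task.Yield();
        Console.WriteLine($"{r.Ticks}: {r.Value}");
    }
}
```

A handler with this shape can then be passed wherever a delegate with an in TEvent parameter and a ValueTask return type is expected.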
Error handling policies
Same as Observer<TEvent>:
- ThrowAggregate (default): run all matching handlers; collect exceptions and throw a single AggregateException at the end. Error sink is awaited for each failure.
- ThrowFirstError: throw immediately on the first failing handler; remaining handlers do not run.
- SwallowErrors: never throw from PublishAsync; failures go only to the error sink if configured.
Error sink forms:
- Async: .OnError((ex, in evt) => ValueTask)
- Sync adapter: .OnError((ex, in evt) => void) which is adapted to ValueTask.
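The three policies can be pictured as one loop with different reactions to a failure. The sketch below uses hypothetical names (ErrorPolicy, PolicySketch.RunAsync) and plain Func delegates rather than PatternKit's types. It also assumes the error sink is invoked before ThrowFirstError rethrows, which the text above does not state.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical names throughout; a sketch of the policy semantics, not PatternKit's code.
public enum ErrorPolicy { ThrowAggregate, ThrowFirstError, SwallowErrors }

public static class PolicySketch
{
    public static async ValueTask RunAsync(
        IReadOnlyList<Func<ValueTask>> handlers,
        ErrorPolicy policy,
        Func<Exception, ValueTask>? errorSink = null)
    {
        List<Exception>? errors = null;
        foreach (var handler in handlers)
        {
            try
            {
                await handler().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Assumption: the sink sees the failure under every policy.
                if (errorSink is not null) await errorSink(ex).ConfigureAwait(false);
                if (policy == ErrorPolicy.ThrowFirstError) throw; // later handlers are skipped
                if (policy == ErrorPolicy.ThrowAggregate) (errors ??= new List<Exception>()).Add(ex);
                // SwallowErrors: keep going and never throw.
            }
        }
        if (errors is not null) throw new AggregateException(errors);
    }
}
```

With ThrowAggregate and SwallowErrors every handler gets a chance to run; only ThrowFirstError cuts the loop short.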
Interop with synchronous Observer
You can reuse synchronous delegates via adapter overloads:
var asyncHub = AsyncObserver<int>.Create().Build();
asyncHub.Subscribe(static (in int x) => x > 0, static (in int x) => Console.WriteLine($"+:{x}"));
asyncHub.Subscribe(static (in int x) => Console.WriteLine(x));
These overloads wrap sync delegates in ValueTask delegates with zero allocations on the fast path.
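As a rough picture of what such an adapter can look like, the sketch below wraps synchronous delegates in ValueTask-returning ones. The delegate and method names are hypothetical stand-ins for the shapes described above, not PatternKit's declarations.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical delegate names mirroring the shapes described above.
public delegate void SyncHandler<T>(in T evt);
public delegate bool SyncPredicate<T>(in T evt);
public delegate ValueTask AsyncHandler<T>(in T evt);
public delegate ValueTask<bool> AsyncPredicate<T>(in T evt);

public static class SyncAdapters
{
    public static AsyncHandler<T> WrapHandler<T>(SyncHandler<T> handler) =>
        (in T evt) =>
        {
            handler(in evt);
            return default; // an already-completed ValueTask; no Task object is created
        };

    public static AsyncPredicate<T> WrapPredicate<T>(SyncPredicate<T> predicate) =>
        (in T evt) => new ValueTask<bool>(predicate(in evt)); // result is stored inline in the struct
}
```

In this sketch the wrapping lambda captures the original delegate, so one closure is allocated when the adapter is created; each later invocation completes synchronously without allocating a Task.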
Notes
- Ordering: handlers run in registration order.
- Reentrancy: subscribing or unsubscribing during PublishAsync affects only subsequent publishes; the publish in flight keeps iterating its snapshot.
- Cancellation: PublishAsync checks the token between subscribers; a cancellation stops the loop with OperationCanceledException.
- Performance: copy-on-write subscriptions keep publish contention-free; avoid heavy per-event allocations in handlers.
See also
- Observer for the synchronous variant
- ActionChain and Mediator for other orchestration styles