Observer Pattern Guide
Comprehensive guide to using the Observer pattern in PatternKit.
Overview
The Observer pattern defines a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically. This implementation provides a thread-safe event hub with predicate filtering.
flowchart LR
P[Publisher] -->|Publish| H[Observer Hub]
H -->|Notify| S1[Subscriber 1]
H -->|Notify| S2[Subscriber 2]
H -->|Skip| S3[Subscriber 3 - filtered]
Getting Started
Installation
using PatternKit.Behavioral.Observer;
Basic Usage
// Create the hub
var hub = Observer<string>.Create()
.OnError((ex, in msg) => Console.Error.WriteLine($"Handler failed: {ex.Message}"))
.ThrowAggregate()
.Build();
// Subscribe
var subscription = hub.Subscribe((in string msg) => Console.WriteLine(msg));
// Publish
hub.Publish("Hello, World!");
// Unsubscribe
subscription.Dispose();
Core Concepts
Event Hub
The central coordinator that manages subscriptions and delivers events:
var hub = Observer<OrderEvent>.Create()
.OnError((ex, in e) => Log.Error(ex, "Handler failed for {OrderId}", e.OrderId))
.SwallowErrors() // Don't let one handler break others
.Build();
Subscriptions
Subscribe to receive events:
// Subscribe to all events
var allSub = hub.Subscribe((in OrderEvent e) => ProcessOrder(e));
// Subscribe with filter
var urgentSub = hub.Subscribe(
(in OrderEvent e) => e.Priority == Priority.Urgent,
(in OrderEvent e) => HandleUrgent(e));
Publishing
Broadcast events to all matching subscribers:
hub.Publish(new OrderEvent { OrderId = 123, Priority = Priority.Normal });
Error Handling Policies
ThrowAggregate (Default)
Run all handlers, collect exceptions, throw AggregateException:
var hub = Observer<Event>.Create()
.OnError((ex, in e) => Log.Warn(ex.Message)) // Called for each
.ThrowAggregate() // Throws AggregateException at end
.Build();
ThrowFirstError
Stop on first exception, propagate immediately:
var hub = Observer<Event>.Create()
.ThrowFirstError() // Stops on first failure
.Build();
SwallowErrors
Never throw, errors go to sink only:
var hub = Observer<Event>.Create()
.OnError((ex, in e) => Log.Error(ex.Message)) // Always called
.SwallowErrors() // Never throws
.Build();
Common Patterns
Event Bus
Loosely-coupled component communication:
public abstract record AppEvent(DateTime Timestamp);
public record UserLoggedIn(DateTime Timestamp, string UserId) : AppEvent(Timestamp);
public record SettingsChanged(DateTime Timestamp, string Key) : AppEvent(Timestamp);
public class EventBus
{
private readonly Observer<AppEvent> _hub;
public EventBus()
{
_hub = Observer<AppEvent>.Create()
.SwallowErrors()
.Build();
}
public IDisposable Subscribe<T>(Action<T> handler) where T : AppEvent
{
return _hub.Subscribe(
(in AppEvent e) => e is T,
(in AppEvent e) => handler((T)e));
}
public void Publish(AppEvent @event) => _hub.Publish(@event);
}
// Usage
var bus = new EventBus();
bus.Subscribe<UserLoggedIn>(e => Console.WriteLine($"Welcome, {e.UserId}!"));
bus.Subscribe<SettingsChanged>(e => RefreshUI());
bus.Publish(new UserLoggedIn(DateTime.UtcNow, "alice"));
Domain Events
Broadcast domain changes:
public class Order
{
private readonly Observer<OrderEvent> _events;
public Order(Observer<OrderEvent> events)
{
_events = events;
}
public void Ship(string trackingNumber)
{
Status = OrderStatus.Shipped;
TrackingNumber = trackingNumber;
_events.Publish(new OrderShipped(Id, trackingNumber, DateTime.UtcNow));
}
}
// Handlers registered elsewhere
hub.Subscribe((in OrderShipped e) => emailService.SendShippingNotification(e));
hub.Subscribe((in OrderShipped e) => analyticsService.TrackShipment(e));
Price Feed
Real-time data distribution:
public class StockFeed
{
private readonly Observer<PriceUpdate> _hub;
public StockFeed()
{
_hub = Observer<PriceUpdate>.Create()
.OnError((ex, in p) => Log.Error(ex, "Price handler failed for {Symbol}", p.Symbol))
.SwallowErrors()
.Build();
}
public IDisposable SubscribeSymbol(string symbol, Action<PriceUpdate> handler) =>
_hub.Subscribe(
(in PriceUpdate p) => p.Symbol == symbol,
(in PriceUpdate p) => handler(p));
public IDisposable SubscribeMovements(decimal threshold, Action<PriceUpdate> handler) =>
_hub.Subscribe(
(in PriceUpdate p) => Math.Abs(p.Change) >= threshold,
(in PriceUpdate p) => handler(p));
public void PublishUpdate(PriceUpdate update) => _hub.Publish(update);
}
Progress Reporting
Multiple progress displays:
public class ProgressReporter
{
private readonly Observer<ProgressUpdate> _hub;
public ProgressReporter()
{
_hub = Observer<ProgressUpdate>.Create()
.SwallowErrors()
.Build();
}
public IDisposable Subscribe(Action<ProgressUpdate> handler) =>
_hub.Subscribe((in ProgressUpdate p) => handler(p));
public void Report(int current, int total, string message = "") =>
_hub.Publish(new ProgressUpdate(current, total, message));
}
// Usage
var reporter = new ProgressReporter();
// UI progress bar
reporter.Subscribe(p => progressBar.Value = (int)(p.Current * 100.0 / p.Total));
// Console output
reporter.Subscribe(p => Console.Write($"\r[{p.Current}/{p.Total}] {p.Message}"));
// Log every 10%
int lastPercent = 0;
reporter.Subscribe(p =>
{
int percent = p.Current * 100 / p.Total;
if (percent / 10 > lastPercent / 10)
{
Log.Info("Progress: {Percent}%", percent);
lastPercent = percent;
}
});
Extending the Pattern
Async Handlers
Wrap async operations:
public static IDisposable SubscribeAsync<T>(
this Observer<T> hub,
Func<T, Task> asyncHandler)
{
return hub.Subscribe((in T e) =>
{
// Fire and forget with error handling
_ = Task.Run(async () =>
{
try
{
await asyncHandler(e);
}
catch (Exception ex)
{
Log.Error(ex, "Async handler failed");
}
});
});
}
Buffered Publishing
Batch notifications:
public class BufferedObserver<T>
{
private readonly Observer<IReadOnlyList<T>> _hub;
private readonly List<T> _buffer = new();
private readonly int _batchSize;
private readonly object _lock = new();
public BufferedObserver(int batchSize)
{
_batchSize = batchSize;
_hub = Observer<IReadOnlyList<T>>.Create().Build();
}
public void Add(T item)
{
IReadOnlyList<T>? batch = null;
lock (_lock)
{
_buffer.Add(item);
if (_buffer.Count >= _batchSize)
{
batch = _buffer.ToList();
_buffer.Clear();
}
}
if (batch != null)
_hub.Publish(batch);
}
public void Flush()
{
IReadOnlyList<T>? batch = null;
lock (_lock)
{
if (_buffer.Count > 0)
{
batch = _buffer.ToList();
_buffer.Clear();
}
}
if (batch != null)
_hub.Publish(batch);
}
}
Typed Event Bus
Strong typing with multiple event types:
public class TypedEventBus
{
private readonly ConcurrentDictionary<Type, object> _hubs = new();
public IDisposable Subscribe<T>(Action<T> handler)
{
var hub = GetHub<T>();
return hub.Subscribe((in T e) => handler(e));
}
public void Publish<T>(T @event)
{
var hub = GetHub<T>();
hub.Publish(@event);
}
private Observer<T> GetHub<T>()
{
return (Observer<T>)_hubs.GetOrAdd(typeof(T), _ =>
Observer<T>.Create().SwallowErrors().Build());
}
}
Best Practices
Subscription Management
- Dispose subscriptions: Prevent memory leaks
- Use using statements: For scoped subscriptions
- Track subscriptions: Store IDisposable references
public class Component : IDisposable
{
private readonly List<IDisposable> _subscriptions = new();
public Component(Observer<Event> hub)
{
_subscriptions.Add(hub.Subscribe(HandleEvent));
}
public void Dispose()
{
foreach (var sub in _subscriptions)
sub.Dispose();
}
}
Error Handling
- Use SwallowErrors for resilience: One handler shouldn't break others
- Always provide OnError: Log failures
- Keep handlers simple: Complex logic should be in separate methods
Thread Safety
- Hub is thread-safe: Publish and Subscribe from any thread
- Handlers may run concurrently: If publishing from multiple threads
- Dispose is atomic: Safe to call from any thread
Troubleshooting
Handlers not called
- Check subscription wasn't disposed
- Verify filter predicate matches
- Ensure Publish is called after Subscribe
Memory leak
- Store and dispose subscription references
- Use weak references for long-lived subscribers
- Dispose subscriptions when component is destroyed
Handler exceptions
- Check error policy (ThrowAggregate vs SwallowErrors)
- Verify OnError handler is registered
- Look for exceptions in OnError sink