Enterprise Message Routing
PatternKit messaging routing primitives model common Enterprise Integration Patterns for in-process workflows. They are small fluent builders over delegates, designed to compose with Message<TPayload> and MessageContext.
These APIs do not provide broker delivery, persistence, or cross-process guarantees. Use them behind your transport, queue consumer, API handler, background worker, or generated dispatcher when the message is already in your process.
Content-Based Router
ContentRouter<TPayload, TResult> selects the first matching route.
using PatternKit.Messaging;
using PatternKit.Messaging.Routing;
var router = ContentRouter<Order, string>.Create()
.When((m, _) => m.Payload.Total > 100m).Then((_, _) => "priority")
.Default((_, _) => "standard")
.Build();
var route = router.Route(Message<Order>.Create(new Order("order-1", 150m)));
Use AsyncContentRouter<TPayload, TResult> when predicates or handlers need async work.
Recipient List
RecipientList<TPayload> sends a message to every matching recipient in registration order and returns the names that received it.
var delivered = new List<string>();
var recipients = RecipientList<Order>.Create()
.To("audit", (_, _) => delivered.Add("audit"))
.When("billing", (m, _) => m.Payload.Total > 0m)
.Then((_, _) => delivered.Add("billing"))
.Build();
var result = recipients.Dispatch(Message<Order>.Create(new Order("order-1", 150m)));
Use AsyncRecipientList<TPayload> for async recipient handlers.
Use [GenerateRecipientList] when the recipient map is part of application structure and should be compiled into a strongly typed factory:
[GenerateRecipientList(typeof(Order))]
public static partial class OrderRecipients
{
private static bool IsPriority(Message<Order> message, MessageContext context)
=> message.Payload.Priority == "priority";
[RecipientListRecipient("priority-audit", 10, nameof(IsPriority))]
private static void PriorityAudit(Message<Order> message, MessageContext context)
{
// deliver to audit sink
}
}
The generated factory returns the same RecipientList<TPayload> runtime type as the fluent API.
Splitter
Splitter<TPayload, TItem> turns one message into item messages. Child messages preserve the parent headers. When the parent has a message id and no causation id, child messages set CausationId to the parent MessageId.
var splitter = Splitter<Order, LineItem>.Create()
.Use((m, _) => m.Payload.Lines)
.Build();
var lineMessages = splitter.Split(orderMessage);
Use [GenerateSplitter] when the split projection is stable and should be compiled into a named factory:
[GenerateSplitter(typeof(Order), typeof(LineItem))]
public static partial class OrderLineSplitter
{
[SplitterProjection]
private static IEnumerable<LineItem> ProjectLines(Message<Order> message, MessageContext context)
=> message.Payload.Lines;
}
Aggregator
Aggregator<TKey, TItem, TResult> groups messages in memory until a completion policy is satisfied, then projects the completed group into a result and removes it from the open groups.
var aggregator = Aggregator<string, LineItem, decimal>.Create()
.KeyBy((m, _) => m.Headers.CorrelationId ?? "missing")
.CompleteWhen((_, messages, _) => messages.Count == 2)
.Project((_, messages, _) => messages.Sum(m => m.Payload.Amount))
.Build();
var result = aggregator.Add(lineMessage);
if (result.Completed)
{
Console.WriteLine(result.Result);
}
Duplicate message ids are ignored by default. Use DuplicateMessagePolicy.Replace or DuplicateMessagePolicy.Include when a workflow needs different behavior.
Use [GenerateAggregator] when the correlation, completion, and projection contract is part of the application topology:
[GenerateAggregator(typeof(string), typeof(LineItem), typeof(decimal))]
public static partial class OrderLineAggregator
{
[AggregatorCorrelation]
private static string Correlate(Message<LineItem> message, MessageContext context)
=> message.Headers.CorrelationId ?? message.Payload.Sku;
[AggregatorCompletion]
private static bool Complete(string key, IReadOnlyList<Message<LineItem>> messages, MessageContext context)
=> messages.Count == 2;
[AggregatorProjection]
private static decimal Project(string key, IReadOnlyList<Message<LineItem>> messages, MessageContext context)
=> messages.Sum(message => message.Payload.Amount);
}
Choosing Boundaries
Use these primitives for:
- deterministic in-process routing
- composing handlers behind a queue consumer or HTTP endpoint
- testable routing rules without broker dependencies
- dynamic flows that still need clear, explicit code
Use external infrastructure for:
- durable queues
- retry after process restart
- competing consumers
- exactly-once or at-least-once delivery contracts
- transactional outbox persistence
API
- ContentRouter<TPayload, TResult>
- AsyncContentRouter<TPayload, TResult>
- RecipientList<TPayload>
- AsyncRecipientList<TPayload>
- GenerateRecipientListAttribute
- RecipientListRecipientAttribute
- Splitter<TPayload, TItem>
- GenerateSplitterAttribute
- SplitterProjectionAttribute
- Aggregator<TKey, TItem, TResult>
- AggregationResult<TKey, TResult>
- DuplicateMessagePolicy
- GenerateAggregatorAttribute
- AggregatorCorrelationAttribute
- AggregatorCompletionAttribute
- AggregatorProjectionAttribute
Example Source
src/PatternKit.Examples/Messaging/MessageRoutingExample.cstest/PatternKit.Examples.Tests/Messaging/MessageRoutingExampleTests.cs