Table of Contents

Reactive Extensions

The Extensions.Reactive package adds streaming step support via IAsyncEnumerable<T>.

Installation

dotnet add package WorkflowFramework.Extensions.Reactive

IAsyncStep<T>

A step that produces a stream of results instead of a single output:

using WorkflowFramework.Extensions.Reactive;

public class PagedFetchStep : IAsyncStep<Order>
{
    public string Name => "FetchOrders";

    public async IAsyncEnumerable<Order> ExecuteStreamingAsync(IWorkflowContext context)
    {
        int page = 0;
        while (true)
        {
            var batch = await api.GetOrdersAsync(page++);
            if (batch.Count == 0) yield break;
            foreach (var order in batch)
                yield return order;
        }
    }
}

AsyncStepAdapter<T>

Wraps an IAsyncStep<T> so it can be used as a standard IStep in any workflow. Results are collected into a list and stored in context:

var step = new AsyncStepAdapter<Order>(new PagedFetchStep());

var workflow = new WorkflowBuilder()
    .Step(step)
    .Build();

await workflow.RunAsync(context);
var orders = (List<Order>)context.Properties["FetchOrders.Results"];

Extension Methods

// Collect all results
List<Order> orders = await asyncStep.CollectAsync(context);

// Process each result as it arrives
await asyncStep.ForEachAsync(context, async order =>
{
    await ProcessOrderAsync(order);
});