Iterator / Flow Pattern Guide
Comprehensive guide to using the Iterator/Flow pattern in PatternKit.
Overview
Flow provides a functional pipeline for transforming sequences with lazy evaluation. It extends the traditional Iterator pattern with LINQ-like operators plus sharing, forking, and branching capabilities.
flowchart LR
subgraph Pipeline
S[Source] --> M[Map]
M --> F[Filter]
F --> FM[FlatMap]
end
subgraph SharedPipeline
Share --> Fork1[Fork]
Share --> Fork2[Fork]
Share --> Branch
Branch --> True
Branch --> False
end
Getting Started
Installation
Import the namespace (PatternKit must already be referenced by your project):
using PatternKit.Behavioral.Iterator;
Basic Usage
var result = Flow<int>.From(Enumerable.Range(1, 10))
.Map(x => x * 2) // Transform each element
.Filter(x => x > 10) // Keep matching elements
.FlatMap(x => new[] { x, x + 1 }) // Expand each element
.Tee(Console.WriteLine) // Side effect (debugging)
.ToList(); // Materialize
Core Concepts
Lazy Evaluation
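All Flow operators are lazy. Building a pipeline does no work, and nothing runs until a terminal operation such as ToList or Fold enumerates it:
// Nothing is enumerated yet (assuming GetExpensiveData is itself lazy, e.g. an iterator method)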
var flow = Flow<int>.From(GetExpensiveData())
.Map(Process)
.Filter(Validate);
// Now it executes
var list = flow.ToList();
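Plain LINQ to Objects defers work in the same way, so it can show this behaviour without the library. The sketch below does not use PatternKit; it assumes only that Flow defers work like LINQ does. It counts how many times the selector runs before and after the terminal ToList call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

int evaluated = 0; // how many times the selector has run

// Building the pipeline only records the steps; no element is touched yet.
IEnumerable<int> query = Enumerable.Range(1, 5)
    .Select(x => { evaluated++; return x * 10; })
    .Where(x => x > 20);

int beforeTerminal = evaluated;   // still 0

// ToList is the terminal operation: the pipeline runs now, once per element.
List<int> list = query.ToList();
int afterTerminal = evaluated;    // 5

Console.WriteLine($"{beforeTerminal} {afterTerminal} [{string.Join(", ", list)}]");
// prints "0 5 [30, 40, 50]"
```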
Basic Operators
Map
Transform each element:
Flow<int>.From(new[] { 1, 2, 3 })
.Map(x => x * 2) // 2, 4, 6
Filter
Keep elements matching predicate:
Flow<int>.From(new[] { 1, 2, 3, 4, 5 })
.Filter(x => x % 2 == 0) // 2, 4
FlatMap
One-to-many transformation:
Flow<string>.From(new[] { "hello", "world" })
.FlatMap(s => s.ToCharArray()) // h, e, l, l, o, w, o, r, l, d
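The guide describes Flow's operators as LINQ-like, so comparing them with their LINQ counterparts can help. The sketch below assumes that Map, Filter, and FlatMap behave like Select, Where, and SelectMany. It reproduces the three examples above with plain LINQ.

```csharp
using System;
using System.Linq;

// Map corresponds to Select: one output per input.
int[] mapped = new[] { 1, 2, 3 }.Select(x => x * 2).ToArray();              // 2, 4, 6

// Filter corresponds to Where: keep the elements that satisfy the predicate.
int[] filtered = new[] { 1, 2, 3, 4, 5 }.Where(x => x % 2 == 0).ToArray();   // 2, 4

// FlatMap corresponds to SelectMany: each input expands to zero or more outputs.
char[] chars = new[] { "hello", "world" }.SelectMany(s => s.ToCharArray()).ToArray();

Console.WriteLine(new string(chars)); // prints "helloworld"
```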
Tee
Side effect without transformation:
Flow<int>.From(data)
.Tee(x => Console.WriteLine($"Processing: {x}"))
.Map(Process) // Tee passes every element through unchanged
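A Tee-style operator is a small iterator. It runs an action on each element and then yields that element unchanged. The sketch below is a hypothetical stand-alone Tee over IEnumerable<T>, not PatternKit's implementation. Like the rest of the pipeline, the action runs only when the pipeline is enumerated.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical Tee: perform a side effect per element, then pass the element on as-is.
IEnumerable<T> Tee<T>(IEnumerable<T> source, Action<T> action)
{
    foreach (T item in source)
    {
        action(item);       // side effect first
        yield return item;  // then the same element, unchanged
    }
}

var seen = new List<int>();
IEnumerable<int> pipeline = Tee(new[] { 1, 2, 3 }, x => seen.Add(x)).Select(x => x * 10);

int seenBeforeTerminal = seen.Count;   // 0: Tee has not run yet
List<int> output = pipeline.ToList();  // 10, 20, 30; seen is now 1, 2, 3
```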
Terminal Operations
ToList / ToArray
Materialize the flow:
var list = flow.ToList();
var array = flow.ToArray();
Fold
Reduce to a single value:
var sum = Flow<int>.From(new[] { 1, 2, 3, 4, 5 })
.Fold(0, (acc, x) => acc + x); // 15
var max = Flow<int>.From(data)
.Fold(int.MinValue, Math.Max); // int.MinValue if data is empty
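Fold is a left fold. It starts from the seed, combines the running accumulator with each element in order, and returns the seed unchanged for an empty sequence. The stand-alone Fold over IEnumerable<T> below is a hypothetical sketch, not PatternKit's code. LINQ's seeded Aggregate has the same shape.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical fold: thread an accumulator through the sequence, left to right.
TAcc Fold<T, TAcc>(IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> step)
{
    TAcc acc = seed;
    foreach (T item in source)
        acc = step(acc, item);
    return acc;
}

int sum = Fold(new[] { 1, 2, 3, 4, 5 }, 0, (acc, x) => acc + x);                      // 15
int max = Fold(new[] { 3, 9, 4 }, int.MinValue, (acc, x) => Math.Max(acc, x));         // 9
int emptyMax = Fold(Array.Empty<int>(), int.MinValue, (acc, x) => Math.Max(acc, x));   // int.MinValue

// LINQ's seeded Aggregate computes the same thing.
int sumViaAggregate = new[] { 1, 2, 3, 4, 5 }.Aggregate(0, (acc, x) => acc + x);       // 15
```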
FirstOption
Get first element safely:
var first = Flow<int>.From(data)
.Filter(x => x > 100)
.FirstOption(); // Some(value) or None
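This guide does not show PatternKit's option type. The sketch below therefore uses a hypothetical (bool HasValue, T Value) tuple in its place, to show what "safely" means here. The function stops at the first element, and it reports "no value" instead of throwing the way Enumerable.First() does on an empty sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical option stand-in: HasValue tells whether Value is meaningful.
(bool HasValue, T Value) FirstOption<T>(IEnumerable<T> source)
{
    foreach (T item in source)
        return (true, item);      // stop at the first element; nothing further is pulled
    return (false, default(T)!);  // empty: no exception
}

var found = FirstOption(new[] { 5, 150, 300 }.Where(x => x > 100));
var missing = FirstOption(new[] { 5, 50 }.Where(x => x > 100));

Console.WriteLine(found.HasValue ? $"Some({found.Value})" : "None");     // Some(150)
Console.WriteLine(missing.HasValue ? $"Some({missing.Value})" : "None"); // None
```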
Sharing and Forking
The Problem with Re-enumeration
Without sharing, each terminal operation re-executes the source:
var flow = Flow<int>.From(ExpensiveSource());
var list1 = flow.ToList(); // Executes source
var list2 = flow.ToList(); // Executes source AGAIN!
Share()
Buffer the source for replay:
var shared = Flow<int>.From(ExpensiveSource()).Share();
var list1 = shared.Fork().ToList(); // Executes source, buffers
var list2 = shared.Fork().ToList(); // Reads from buffer
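The replay idea fits in a few lines: one shared enumerator, one shared buffer, and readers that replay the buffer before pulling new elements from the source. This is a hypothetical stand-alone Share over IEnumerable<T>, not PatternKit's SharedFlow. It is not thread-safe and it never disposes the source enumerator. A counter records how many elements the source produces, with and without sharing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

int pulls = 0; // total elements the source has produced

IEnumerable<int> ExpensiveSource()
{
    for (int i = 1; i <= 5; i++)
    {
        pulls++;          // stands in for an expensive fetch
        yield return i;
    }
}

// Hypothetical Share: returns a factory of readers ("forks") over one buffer.
Func<IEnumerable<T>> Share<T>(IEnumerable<T> source)
{
    var buffer = new List<T>();
    var enumerator = source.GetEnumerator();
    var exhausted = false;

    IEnumerable<T> Fork()
    {
        for (int i = 0; ; i++)
        {
            if (i < buffer.Count) { yield return buffer[i]; continue; }   // replay
            if (exhausted || !enumerator.MoveNext()) { exhausted = true; yield break; }
            buffer.Add(enumerator.Current);                                // pull once, keep it
            yield return buffer[i];
        }
    }

    return Fork;
}

// Without sharing: two terminal operations, two full passes over the source.
var plain = ExpensiveSource();
plain.ToList();
plain.ToList();
int pullsWithoutShare = pulls;   // 10

// With sharing: the second fork is served from the buffer.
pulls = 0;
var fork = Share(ExpensiveSource());
var list1 = fork().ToList();
var list2 = fork().ToList();
int pullsWithShare = pulls;      // 5
```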
Fork()
Create independent readers:
var shared = Flow<int>.From(data).Share();
var doubled = shared.Fork().Map(x => x * 2);
var filtered = shared.Fork().Filter(x => x > 10);
var squared = shared.Fork().Map(x => x * x);
// Each fork reads the shared buffer independently; the source is enumerated only once
Branch()
Partition a shared flow by a predicate. Matching elements go to the first flow, all others to the second:
var shared = Flow<int>.From(Enumerable.Range(1, 10)).Share();
var (evens, odds) = shared.Branch(x => x % 2 == 0);
var evenSum = evens.Fold(0, (a, x) => a + x); // 2+4+6+8+10 = 30
var oddSum = odds.Fold(0, (a, x) => a + x); // 1+3+5+7+9 = 25
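Branching amounts to two complementary filters over the same buffered elements, so every element lands in exactly one branch. The sketch below is a hypothetical stand-alone Branch, not PatternKit's. It buffers eagerly with ToList to keep the code short, and each view re-evaluates the predicate.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical Branch: buffer once, then expose "matching" and "rest" views of the buffer.
(IEnumerable<T> Matching, IEnumerable<T> Rest) Branch<T>(IEnumerable<T> source, Func<T, bool> predicate)
{
    List<T> buffer = source.ToList();   // the source is enumerated exactly once here
    return (buffer.Where(predicate), buffer.Where(x => !predicate(x)));
}

var (evens, odds) = Branch(Enumerable.Range(1, 10), x => x % 2 == 0);
int evenSum = evens.Aggregate(0, (a, x) => a + x);   // 2+4+6+8+10 = 30
int oddSum = odds.Aggregate(0, (a, x) => a + x);     // 1+3+5+7+9 = 25
```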
Common Patterns
Data Transformation Pipeline
public class DataPipeline
{
public List<ProcessedRecord> Process(IEnumerable<RawRecord> records)
{
return Flow<RawRecord>.From(records)
.Filter(r => r.IsValid)
.Map(r => Normalize(r))
.Filter(r => !r.IsDuplicate)
.Map(r => Enrich(r))
.Map(r => Transform(r))
.ToList();
}
}
Log Processing
public class LogAnalyzer
{
public AnalysisResult Analyze(IEnumerable<LogEntry> logs)
{
var shared = Flow<LogEntry>.From(logs).Share();
var (errors, nonErrors) = shared.Branch(l => l.Level == "ERROR");
var (warnings, info) = nonErrors.Branch(l => l.Level == "WARN");
return new AnalysisResult
{
ErrorCount = errors.Fold(0, (c, _) => c + 1),
WarningCount = warnings.Fold(0, (c, _) => c + 1),
InfoCount = info.Fold(0, (c, _) => c + 1),
TopErrors = errors
.Map(e => e.Message)
.ToList()
.GroupBy(m => m)
.OrderByDescending(g => g.Count())
.Take(10)
.Select(g => g.Key) // keep the message text of the 10 most frequent errors
.ToList()
};
}
}
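ErrorCount, WarningCount and InfoCount above each make a separate pass over their branch. When only the counts are needed, a single fold with a tuple accumulator gets all three in one pass. The sketch below uses LINQ's seeded Aggregate on a hypothetical array of level strings in place of LogEntry objects. As with the info branch above, anything that is neither ERROR nor WARN goes into the last bucket.

```csharp
using System;
using System.Linq;

string[] levels = { "INFO", "ERROR", "WARN", "INFO", "ERROR", "DEBUG" };

// One pass, one accumulator holding all three counts.
var (errors, warnings, other) = levels.Aggregate(
    (Errors: 0, Warnings: 0, Other: 0),
    (acc, level) => level switch
    {
        "ERROR" => (acc.Errors + 1, acc.Warnings, acc.Other),
        "WARN"  => (acc.Errors, acc.Warnings + 1, acc.Other),
        _       => (acc.Errors, acc.Warnings, acc.Other + 1),
    });

Console.WriteLine($"errors={errors} warnings={warnings} other={other}");
// prints "errors=2 warnings=1 other=3"
```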
Multi-Format Export
using System.IO;
using System.Text.Json;
using PatternKit.Behavioral.Iterator;
public class Exporter
{
public void Export(IEnumerable<Record> records)
{
var shared = Flow<Record>.From(records).Share();
// JSON export
var json = shared.Fork()
.Map(r => JsonSerializer.Serialize(r))
.ToList();
File.WriteAllText("export.json", $"[{string.Join(",", json)}]");
// CSV export
var csv = shared.Fork()
.Map(r => $"{r.Id},{r.Name},{r.Value}") // no quoting or escaping: assumes fields contain no commas, quotes, or newlines
.ToList();
File.WriteAllLines("export.csv", csv);
// Summary
var summary = shared.Fork()
.Fold(new Summary(), (s, r) =>
{
s.Count++;
s.Total += r.Value;
return s;
});
Console.WriteLine($"Exported {summary.Count} records, total: {summary.Total}");
}
}
Related Types
ReplayableSequence
Multi-cursor random access:
var seq = ReplayableSequence<int>.From(Enumerable.Range(1, 100));
var cursor1 = seq.CreateCursor();
var cursor2 = seq.CreateCursor();
// Cursors move independently
cursor1.MoveNext(); cursor1.MoveNext(); // at position 2
cursor2.MoveNext(); // at position 1
// Look ahead without moving
var next = cursor1.Peek(5); // look 5 elements ahead without moving cursor1
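Over a buffered sequence, independent cursors reduce to independent indices into one shared buffer. The sketch below is hypothetical and is not ReplayableSequence's implementation. Each cursor is an int index into a List<int>, and the peek helper returns the single element a given offset ahead without changing the index. Whether the library's Peek(5) returns one element or several is not specified in this guide.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One shared buffer; each hypothetical cursor is just an index into it.
// -1 means "before the first element", like a fresh IEnumerator.
List<int> buffer = Enumerable.Range(1, 100).ToList();
int cursor1 = -1;
int cursor2 = -1;

cursor1 += 2;   // two moves: cursor1 is on buffer[1], value 2
cursor2 += 1;   // one move:  cursor2 is on buffer[0], value 1

// Hypothetical peek: the element `offset` positions past `position`, if it exists.
// It only reads the buffer; the caller's cursor index is unchanged.
(bool Found, int Value) Peek(int position, int offset)
{
    int target = position + offset;
    return target >= 0 && target < buffer.Count ? (true, buffer[target]) : (false, 0);
}

var ahead = Peek(cursor1, 5);   // (true, 7)
var pastEnd = Peek(99, 5);      // (false, 0): index 104 is past the end
```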
WindowSequence
Sliding windows:
// Sliding window
var windows = WindowSequence<int>.Create(
Enumerable.Range(1, 5),
windowSize: 3);
// [1,2,3], [2,3,4], [3,4,5]
// Batching (non-overlapping)
var batches = WindowSequence<int>.Create(
Enumerable.Range(1, 10),
windowSize: 3,
stride: 3);
// [1,2,3], [4,5,6], [7,8,9], [10]
// Moving average
var data = new[] { 1.0, 2.0, 3.0, 4.0, 5.0 };
var movingAvg = WindowSequence<double>.Create(data, 3)
.Select(w => w.Average())
.ToList();
// 2.0, 3.0, 4.0
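Both window shapes above come from one rule: emit a window of `size` elements, then start the next window `stride` elements later. The sketch below is a hypothetical generator, not WindowSequence's implementation. The includePartial flag is an assumption of this sketch and is meant for the batching case. It emits the trailing short window, like the [10] in the batching example above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical window generator: full windows of `size`, a new one every `stride` elements.
IEnumerable<T[]> Windows<T>(IEnumerable<T> source, int size, int stride = 1, bool includePartial = false)
{
    if (size <= 0 || stride <= 0)
        throw new ArgumentOutOfRangeException(nameof(size), "size and stride must be positive");

    var window = new List<T>(size);
    int toSkip = 0; // elements to drop between windows when stride > size
    foreach (T item in source)
    {
        if (toSkip > 0) { toSkip--; continue; }
        window.Add(item);
        if (window.Count == size)
        {
            yield return window.ToArray();
            if (stride >= size) { window.Clear(); toSkip = stride - size; }
            else window.RemoveRange(0, stride); // keep the overlap for the next window
        }
    }
    if (includePartial && window.Count > 0)
        yield return window.ToArray();
}

var sliding = Windows(Enumerable.Range(1, 5), 3).ToList();                       // [1,2,3] [2,3,4] [3,4,5]
var batches = Windows(Enumerable.Range(1, 10), 3, stride: 3, includePartial: true).ToList(); // ... [10]
var movingAvg = Windows(new[] { 1.0, 2.0, 3.0, 4.0, 5.0 }, 3).Select(w => w.Average()).ToList(); // 2, 3, 4
```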
Extending the Pattern
Custom Operators
public static class FlowExtensions
{
public static Flow<T> Take<T>(this Flow<T> flow, int count)
{
return Flow<T>.From(TakeIterator(flow, count));
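static IEnumerable<T> TakeIterator(Flow<T> f, int n)
{
if (n <= 0) yield break; // nothing to take, so never touch the source
int taken = 0;
foreach (var item in f)
{
yield return item;
if (++taken >= n) yield break; // stop without pulling one extra element from the source
}
}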
}
public static Flow<T> Skip<T>(this Flow<T> flow, int count)
{
return Flow<T>.From(SkipIterator(flow, count));
static IEnumerable<T> SkipIterator(Flow<T> f, int n)
{
int skipped = 0;
foreach (var item in f)
{
if (skipped < n) { skipped++; continue; } // counter stops at n, so it cannot overflow on long streams
yield return item;
}
}
}
public static Flow<T> DistinctBy<T, TKey>(this Flow<T> flow, Func<T, TKey> keySelector)
{
return Flow<T>.From(DistinctIterator(flow, keySelector));
static IEnumerable<T> DistinctIterator(Flow<T> f, Func<T, TKey> selector)
{
var seen = new HashSet<TKey>();
foreach (var item in f)
{
if (seen.Add(selector(item)))
yield return item;
}
}
}
}
Best Practices
When to Share
- Multiple consumers: Share once, then Fork for each terminal operation
- Expensive sources: Database queries, network calls
- Side effects in source: Prevent duplicate side effects
Memory Considerations
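- SharedFlow buffers: every element read so far stays in memory until the shared flow and all its forks are unreachable and garbage-collected
- Large datasets: prefer a single streaming pass without Share when the data may not fit in memory
- Clear references: drop references to shared flows when done (and dispose them if they implement IDisposable) so the buffer can be collected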
Performance Tips
- Filter early: Reduce elements before expensive operations
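- Avoid unnecessary sharing: a single consumer doesn't need Share, which only adds buffering
- Use Fold for aggregation: it computes the result in one pass without materializing an intermediate list, unlike ToList followed by LINQ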
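You can measure the "Filter early" tip by counting calls to an expensive step. In the sketch below, Expensive is a hypothetical stand-in for costly work, and plain LINQ stands in for Flow. Because x * x is even exactly when x is even, both pipelines return the same elements, but filtering first halves the number of expensive calls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

int expensiveCalls = 0;

// Hypothetical stand-in for costly per-element work (a lookup, a parse, an I/O call).
int Expensive(int x) { expensiveCalls++; return x * x; }

int[] data = Enumerable.Range(1, 1000).ToArray();

// Map first, filter second: Expensive runs for all 1000 elements.
List<int> late = data.Select(Expensive).Where(sq => sq % 2 == 0).ToList();
int callsLate = expensiveCalls;    // 1000

// Filter first: Expensive runs only for the 500 even elements that survive.
expensiveCalls = 0;
List<int> early = data.Where(x => x % 2 == 0).Select(Expensive).ToList();
int callsEarly = expensiveCalls;   // 500
```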
Troubleshooting
Elements processed multiple times
Did you forget to Share() before forking?
// Wrong
var flow = Flow<int>.From(source);
flow.Map(x => x * 2).ToList();
flow.Filter(x => x > 10).ToList(); // Re-enumerates source!
// Right
var shared = Flow<int>.From(source).Share();
shared.Fork().Map(x => x * 2).ToList();
shared.Fork().Filter(x => x > 10).ToList();
Out of memory
SharedFlow buffers everything:
// May run out of memory for large sources
var shared = Flow<byte>.From(ReadHugeFile()).Share();
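// Better: stream without sharing, handling each element as it arrives
foreach (var item in Flow<byte>.From(ReadHugeFile()).Map(Process))
{
Handle(item); // hypothetical per-element sink; nothing is accumulated in memory
}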