Streaming Execution
Process arbitrarily large datasets with constant memory. Same ARO syntax, streaming pipeline under the hood.
Syntax Transparency
Streaming is an internal optimization, not a new feature. You write the exact same ARO code. The runtime automatically detects pipeline patterns and executes them as memory-efficient streams.
Overview
When ARO processes a large file, the naive approach loads everything into memory at once. A 10 GB CSV file would consume 20 GB or more of RAM, crashing most systems. With streaming execution, the same code runs in roughly 1 MB of memory by processing data one row at a time.
| Step | Without Streaming | With Streaming |
|---|---|---|
| Read file | 10 GB (full load) | 64 KB (chunked) |
| Parse rows | 15-20 GB (all at once) | ~1 KB per row |
| Filter | 15-20 GB (new array) | 0 (pass/reject inline) |
| Peak memory | ~20 GB | ~1 MB |
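The constant-memory read in the table can be pictured with a short sketch. This is an illustrative Python analogue of chunked reading, not ARO's actual implementation; the 64 KB chunk size mirrors the table above, and `read_chunks` is a hypothetical helper name.

```python
import io

def read_chunks(f, chunk_size=64 * 1024):
    """Yield fixed-size chunks so peak memory stays near chunk_size, not file size."""
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            return
        yield chunk

# Simulate a file source; a real run would open a multi-gigabyte file instead.
source = io.StringIO("row\n" * 100_000)
total = sum(len(c) for c in read_chunks(source))
print(total)  # 400000 characters processed, ~64 KB resident at a time
```

Each chunk is released as soon as the next one is read, which is why peak memory tracks the chunk size rather than the input size.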
Syntax Transparency
There are no new keywords. The code you already write is the streaming code:
```
(* This is automatically streamed - no syntax change needed *)
Read the <data> from the <file: "huge.csv">.
Filter the <filtered> from <data> where <status> = "active".
Log <filtered> to the <console>.
```
Under the hood, ARO sees this as a pipeline of three stages. Each row flows from Read through Filter to Log without the entire dataset ever being held in memory.
```
┌──────────┐    ┌──────────┐    ┌──────────┐
│   Read   │───>│  Filter  │───>│   Log    │
│ (Stream) │    │ (Stream) │    │ (Drain)  │
└──────────┘    └──────────┘    └──────────┘
     │               │               │
     │ Row 1         │ Pass?         │ Print
     │──────────────>│──────────────>│────────> console
     │ Row 2         │ Reject        │
     │──────────────>│───────X       │
     │ Row 3         │ Pass?         │ Print
     │──────────────>│──────────────>│────────> console
```

Memory: O(1) per row, not O(n) for entire dataset
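The three stages map naturally onto generator pipelines. The following Python sketch is an analogy for how rows flow through one at a time; the stage names (`read_rows`, `keep_active`, `log_values`) are hypothetical, not ARO internals.

```python
def read_rows(lines):
    # Stage 1 (Read): yield one parsed row at a time.
    for line in lines:
        status, value = line.strip().split(",")
        yield {"status": status, "value": int(value)}

def keep_active(rows):
    # Stage 2 (Filter): pass/reject inline, no intermediate array.
    for row in rows:
        if row["status"] == "active":
            yield row

def log_values(rows):
    # Stage 3 (Log, a drain): consume the stream.
    out = []
    for row in rows:
        out.append(row["value"])
    return out

lines = ["active,1", "inactive,2", "active,3"]
print(log_values(keep_active(read_rows(lines))))  # [1, 3]
```

Because each stage is a generator, "Row 2" is rejected and garbage-collected before "Row 3" is even read, matching the diagram above.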
Action Classification
Actions fall into three categories that determine how they participate in a streaming pipeline:
| Category | Actions | Behavior |
|---|---|---|
| Transformations | Filter, Map, Transform, Parse, Validate | Process one element at a time and pass it downstream |
| Drains | Log, Return, Store, Send, Reduce | Consume the stream and produce a final result |
| Barriers | Sort, GroupBy, Distinct, Reverse | Need the full dataset; use external algorithms with disk spillover |
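The three categories can be sketched as plain Python functions; the names are illustrative, and the barrier here materializes in memory where a real engine would spill to disk.

```python
def transform(stream):
    # Transformation: one element in, one element out, streamed.
    for x in stream:
        yield x * 2

def drain_sum(stream):
    # Drain: consumes the whole stream, produces a single result.
    total = 0
    for x in stream:
        total += x
    return total

def barrier_sort(stream):
    # Barrier: cannot emit anything until the full dataset is seen.
    data = list(stream)  # materializes (a real engine would spill to disk)
    data.sort()
    return iter(data)

result = drain_sum(barrier_sort(transform(iter([3, 1, 2]))))
print(result)  # 12
```

Note how the barrier is the only stage that allocates O(n) memory; transformations and drains stay constant-size.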
Explicit Control
Streaming is the default. If you need the legacy eager behavior for small files that will be accessed multiple times, use the `eager` qualifier:
```
(* Default: streaming - constant memory *)
Read the <data> from the <file: "huge.csv">.

(* Explicit: load everything into memory *)
<Read: eager> the <data> from the <file: "small.csv">.
```
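The difference between streaming and eager reads is the difference between a generator and a materialized list: a stream can be consumed once, while an eager load can be revisited. A minimal Python sketch of that trade-off, with `lazy_read` as a hypothetical name:

```python
def lazy_read(lines):
    # Streaming: values are produced on demand, one pass only.
    for line in lines:
        yield line.upper()

data = ["a", "b"]

stream = lazy_read(data)
first = list(stream)       # consumes the stream
second = list(stream)      # already exhausted

eager = list(lazy_read(data))  # eager: materialized, reusable any number of times
print(first, second, eager)  # ['A', 'B'] [] ['A', 'B']
```

This is why `eager` makes sense for small files accessed multiple times: the materialized copy trades memory for reusability.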
Stream Tee
A stream can only be consumed once. When the same variable feeds into multiple downstream operations of different types, ARO automatically creates a Stream Tee that splits the stream for parallel consumption:
```
Filter the <active> from <orders> where <status> = "active".
Reduce the <total> from <active> with sum(<amount>).  (* consumer 1 *)
Log <active> to the <console>.                        (* consumer 2 *)
```
The Stream Tee uses a bounded ring buffer so both consumers see every element. The fastest consumer drives the stream; slower consumers read from the buffer. If a consumer falls behind, the buffer spills to disk automatically.
```
                        ┌─────────────────┐
active-stream ────────> │   Stream Tee    │
                        └─────────────────┘
                           │           │
                           v           v
                      Consumer 1   Consumer 2
                       (Reduce)      (Log)
```
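Python's `itertools.tee` is the closest standard-library analogue to a Stream Tee: it splits one iterator so multiple consumers see every element. One important difference: `tee` buffers lagging consumers in memory without a bound, whereas the Stream Tee described above uses a bounded ring buffer with disk spillover.

```python
from itertools import tee

rows = iter([10, 20, 30])
for_reduce, for_log = tee(rows, 2)  # both consumers see every element

total = sum(for_reduce)   # consumer 1: Reduce
logged = list(for_log)    # consumer 2: Log
print(total, logged)      # 60 [10, 20, 30]
```

Because consumer 1 runs to completion before consumer 2 starts, `tee` must buffer all three elements here; a bounded ring buffer would instead interleave the consumers or spill to disk.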
Pipeline Optimization
Aggregation Fusion
When multiple Reduce operations consume the same stream, ARO's semantic analyzer fuses them into a single pass. Instead of iterating the data three times, the runtime maintains all accumulators simultaneously:
```
(* These three reduces are fused into one pass *)
Reduce the <total> from <active-orders> with sum(<amount>).
Reduce the <count> from <active-orders> with count().
Reduce the <avg> from <active-orders> with avg(<amount>).
```
The semantic analyzer detects that all three operations share the same source (`active-orders`) and rewrites them into a single fused reduce. Memory usage is O(1) regardless of dataset size -- just the accumulators.
| Approach | Memory | Passes Over Data |
|---|---|---|
| Naive (3 separate reduces) | O(n) materialized | 3 |
| Stream Tee | O(buffer) bounded | 1 (shared) |
| Aggregation Fusion | O(1) accumulators | 1 |
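A fused reduce amounts to updating every accumulator inside one loop. This Python sketch shows the single-pass shape; the function name and the tuple return are illustrative, not ARO's internal API.

```python
def fused_reduce(stream):
    # One pass over the data; all accumulators updated together: O(1) memory.
    total = 0
    count = 0
    maximum = float("-inf")
    for amount in stream:
        total += amount
        count += 1
        maximum = max(maximum, amount)
    avg = total / count if count else 0.0
    return total, count, avg, maximum

amounts = iter([1200, 800, 950, 2000])
print(fused_reduce(amounts))  # (4950, 4, 1237.5, 2000)
```

The stream is consumed exactly once, so this works even on sources that cannot be rewound, such as sockets.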
Unified Data Sources
Every data source in ARO produces the same stream abstraction. The same filter, reduce, and transform operations work identically whether the data comes from a file, an HTTP response, a socket, or a repository:
| Source | ARO Syntax | Chunk Strategy |
|---|---|---|
| CSV File | `Read from <file: "x.csv">` | Line by line |
| JSON File | `Read from <file: "x.json">` | Array elements |
| JSONL File | `Read from <file: "x.jsonl">` | Line by line (native) |
| HTTP Response | `Request from <url>` | Chunked transfer |
| TCP Socket | `Extract from <packet: data>` | Packet by packet |
| Repository | `Retrieve from <repo>` | Cursor-based |
| Directory | `List from <directory>` | Entry by entry |
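In practice, a unified source abstraction means every source exposes the same iterator protocol, so downstream stages never know where elements came from. A Python sketch of that idea, with two hypothetical adapters feeding one shared filter:

```python
import io
import json

def stream_csv(f):
    # Adapter: CSV source, chunked line by line.
    for line in f:
        yield line.rstrip("\n").split(",")

def stream_jsonl(f):
    # Adapter: JSONL source, one JSON object per line.
    for line in f:
        yield json.loads(line)

def only_active(rows, key):
    # The same filter works on either source: it only sees an iterator.
    return (r for r in rows if r[key] == "active")

csv_rows = stream_csv(io.StringIO("active,1\ninactive,2\n"))
jsonl_rows = stream_jsonl(io.StringIO('{"status": "active"}\n{"status": "done"}\n'))

print(len(list(only_active(csv_rows, 0))))           # 1
print(len(list(only_active(jsonl_rows, "status"))))  # 1
```

Only the adapters differ per source; the filter, reduce, and transform stages are written once.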
Example: Multi-Stage Pipeline
This example processes a list of transactions through multiple filter stages, then runs aggregation on the result. In streaming mode, each row flows through all stages immediately -- non-matching rows are discarded without allocating memory:
```
(Application-Start: Streaming Pipeline Demo) {
    Create the <transactions> with [
        {id: 1, year: "2024", amount: 1200, status: "completed", category: "electronics"},
        {id: 2, year: "2024", amount: 450, status: "pending", category: "electronics"},
        {id: 3, year: "2024", amount: 800, status: "completed", category: "furniture"},
        {id: 4, year: "2024", amount: 950, status: "completed", category: "electronics"},
        {id: 5, year: "2024", amount: 2000, status: "completed", category: "electronics"}
    ].

    (* Multi-stage filter pipeline *)
    Filter the <high-value> from <transactions>
        where <amount> > 500.
    Filter the <completed> from <high-value>
        where <status> = "completed".
    Filter the <electronics> from <completed>
        where <category> = "electronics".

    (* Aggregation fusion: all reduces run in a single pass *)
    Reduce the <total-amount> from <electronics> with sum(<amount>).
    Reduce the <count> from <electronics> with count().
    Reduce the <avg-amount> from <electronics> with avg(<amount>).
    Reduce the <max-amount> from <electronics> with max(<amount>).

    Log <total-amount> to the <console>.
    Log <count> to the <console>.
    Log <avg-amount> to the <console>.
    Log <max-amount> to the <console>.

    Return an <OK: status> for the <streaming-demo>.
}
```
Example: JSONL Streaming
JSONL (JSON Lines) is the ideal format for streaming because each line is an independently parseable JSON object. ARO reads JSONL files line by line without loading the entire file:
```
(Application-Start: Streaming Demo) {
    (* Read JSONL file - streamed line by line in production *)
    Read the <log-data> from the <file: "./events.jsonl">.

    (* Filter errors - streaming: pass/reject each element *)
    Filter the <errors> from <log-data> where <level> = "ERROR".

    (* Multi-stage filter pipeline *)
    Filter the <api-events> from <log-data> where <service> = "api".
    Filter the <api-errors> from <api-events> where <level> = "ERROR".

    (* Reduce with O(1) memory accumulators *)
    Reduce the <error-count> from <errors> with count().
    Reduce the <total-time> from <api-events> with sum(<time>).
    Reduce the <avg-time> from <api-events> with avg(<time>).

    Log <error-count> to the <console>.
    Log <avg-time> to the <console>.

    Return an <OK: status> for the <demo>.
}
```
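A hand-rolled Python equivalent of this pipeline shows why JSONL streams so well: each line parses independently, and the reduces are just O(1) accumulators. The field names (`service`, `level`, `time`) follow the example above; the in-memory `StringIO` stands in for the real file.

```python
import io
import json

def stream_events(f):
    # Each JSONL line is an independent JSON object: parse and discard per line.
    for line in f:
        line = line.strip()
        if line:
            yield json.loads(line)

events = io.StringIO(
    '{"service": "api", "level": "ERROR", "time": 120}\n'
    '{"service": "api", "level": "INFO",  "time": 80}\n'
    '{"service": "web", "level": "ERROR", "time": 50}\n'
)

# O(1) accumulators, one pass over the stream.
error_count = 0
api_time_total = 0
api_count = 0
for e in stream_events(events):
    if e["level"] == "ERROR":
        error_count += 1
    if e["service"] == "api":
        api_time_total += e["time"]
        api_count += 1

print(error_count, api_time_total / api_count)  # 2 100.0
```

Swapping `io.StringIO` for `open("./events.jsonl")` gives the same constant-memory behavior on a real file of any size.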
Learn More
See the full streaming specification in ARO-0051: Streaming Execution Engine.