Streaming Execution

Process arbitrarily large datasets with constant memory. Same ARO syntax, streaming pipeline under the hood.

Syntax Transparency

Streaming is an internal optimization, not a new feature. You write the exact same ARO code. The runtime automatically detects pipeline patterns and executes them as memory-efficient streams.

Overview

When ARO processes a large file, the naive approach loads everything into memory at once. Because parsed rows typically occupy 1.5-2x the raw bytes, a 10 GB CSV file would consume 20 GB or more of RAM, crashing most systems. With streaming execution, the same code runs in roughly 1 MB of memory by processing data one row at a time.

| Step | Without Streaming | With Streaming |
|-------------|------------------------|-------------------------|
| Read file | 10 GB (full load) | 64 KB (chunked) |
| Parse rows | 15-20 GB (all at once) | ~1 KB per row |
| Filter | 15-20 GB (new array) | 0 (pass/reject inline) |
| Peak memory | ~20 GB | ~1 MB |
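The 64 KB chunked read in the table can be sketched in Python as a stand-in for the runtime's reader (the helper name and chunk size here are illustrative, not ARO's actual implementation):

```python
import io

def read_lines_chunked(fp, chunk_size=64 * 1024):
    """Read fixed-size chunks and carve complete lines out of them.

    Peak memory is one chunk plus one partial line, regardless of
    total file size.
    """
    pending = ""
    while True:
        chunk = fp.read(chunk_size)
        if not chunk:
            break
        pending += chunk
        # Everything before the last newline is a complete line.
        *complete, pending = pending.split("\n")
        yield from complete
    if pending:
        yield pending  # final line without a trailing newline

# Tiny chunk size to show lines spanning chunk boundaries still work.
fp = io.StringIO("row1\nrow2\nrow3")
rows = list(read_lines_chunked(fp, chunk_size=4))
```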

Syntax Transparency

There are no new keywords. The code you already write is the streaming code:

(* This is automatically streamed - no syntax change needed *)
Read the <data> from the <file: "huge.csv">.
Filter the <filtered> from <data> where <status> = "active".
Log <filtered> to the <console>.

Under the hood, ARO sees this as a pipeline of three stages. Each row flows from Read through Filter to Log without the entire dataset ever being held in memory.

  ┌──────────┐    ┌──────────┐    ┌──────────┐
  │  Read    │───>│  Filter  │───>│   Log    │
  │ (Stream) │    │ (Stream) │    │ (Drain)  │
  └──────────┘    └──────────┘    └──────────┘
       │               │               │
       │    Row 1      │    Pass?      │    Print
       │──────────────>│──────────────>│────────> console
       │    Row 2      │    Reject     │
       │──────────────>│───────X       │
       │    Row 3      │    Pass?      │    Print
       │──────────────>│──────────────>│────────> console

  Memory: O(1) per row, not O(n) for entire dataset
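The three-stage pipeline above can be sketched with Python generators (a conceptual stand-in for the runtime, not ARO's actual implementation): each stage pulls one row at a time from the stage before it, so only one row is alive at any moment.

```python
def read_rows(lines):
    """Stage 1: yield parsed rows one at a time (here: 'name,status' CSV)."""
    for line in lines:
        name, status = line.strip().split(",")
        yield {"name": name, "status": status}

def filter_rows(rows, predicate):
    """Stage 2: pass or reject each row inline -- no intermediate array."""
    for row in rows:
        if predicate(row):
            yield row

def log_rows(rows):
    """Stage 3 (drain): consume the stream row by row."""
    printed = []
    for row in rows:
        printed.append(row)  # stand-in for printing to the console
    return printed

lines = ["alice,active", "bob,inactive", "carol,active"]
pipeline = filter_rows(read_rows(lines), lambda r: r["status"] == "active")
result = log_rows(pipeline)
```

Nothing runs until the drain starts pulling; the filter never builds a new array, it just forwards or drops each row.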

Action Classification

Actions fall into three categories that determine how they participate in a streaming pipeline:

| Category | Actions | Behavior |
|-----------------|------------------------------------------|----------|
| Transformations | Filter, Map, Transform, Parse, Validate | Process one element at a time and pass it downstream |
| Drains | Log, Return, Store, Send, Reduce | Consume the stream and produce a final result |
| Barriers | Sort, GroupBy, Distinct, Reverse | Need the full dataset; use external algorithms with disk spillover |
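The difference between a transformation and a barrier can be sketched in Python (an illustrative stand-in, not ARO's implementation): a transformation forwards each element as it arrives, while a barrier like Sort must see every element before it can emit the first one.

```python
def filter_stream(rows, predicate):
    """Transformation: O(1) memory per element, forwards immediately."""
    for row in rows:
        if predicate(row):
            yield row

def sort_barrier(rows):
    """Barrier: must materialize the whole stream before emitting.

    (For large inputs ARO would use an external sort with disk
    spillover instead of an in-memory list.)
    """
    return iter(sorted(rows))

out = list(sort_barrier(filter_stream(iter([3, 1, 2]), lambda x: x > 1)))
```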

Explicit Control

Streaming is the default. For small files that will be read multiple times, you can force eager evaluation (loading everything into memory up front) with the eager qualifier:

(* Default: streaming - constant memory *)
Read the <data> from the <file: "huge.csv">.

(* Explicit: load everything into memory *)
<Read: eager> the <data> from the <file: "small.csv">.
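The trade-off the eager qualifier makes can be sketched in Python (illustrative helper names, not ARO's API): a streaming read yields rows lazily and can only be consumed once, while an eager read materializes a list that can be re-iterated cheaply.

```python
def read_streaming(lines):
    """Streaming read: a generator, constant memory, single-use."""
    return (line.strip() for line in lines)

def read_eager(lines):
    """Eager read: whole dataset in memory, re-readable."""
    return [line.strip() for line in lines]

stream = read_streaming(iter(["a\n", "b\n"]))
eager = read_eager(iter(["a\n", "b\n"]))

first_pass = list(stream)
second_pass = list(stream)  # empty: a stream can only be consumed once
```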

Stream Tee

A stream can only be consumed once. When the same variable feeds into multiple downstream operations of different types, ARO automatically creates a Stream Tee that splits the stream for parallel consumption:

Filter the <active> from <orders> where <status> = "active".
Reduce the <total> from <active> with sum(<amount>).  (* consumer 1 *)
Log <active> to the <console>.                       (* consumer 2 *)

The Stream Tee uses a bounded ring buffer so both consumers see every element. The fastest consumer drives the stream; slower consumers read from the buffer. If a consumer falls behind, the buffer spills to disk automatically.

                          ┌─────────────────┐
  active-stream ────────> │   Stream Tee    │
                          └─────────────────┘
                               │         │
                               v         v
                          Consumer 1  Consumer 2
                          (Reduce)    (Log)
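Python's standard-library itertools.tee is the closest off-the-shelf analogue of the Stream Tee: it splits one iterator into several, buffering elements the slower consumer has not read yet. (ARO's version differs in that its buffer is bounded and spills to disk; tee's buffer grows without limit.)

```python
from itertools import tee

# One source stream, two independent consumers.
orders = iter([120, 45, 300, 80])
for_sum, for_log = tee(orders, 2)

total = sum(for_sum)    # consumer 1: drives the stream (like Reduce)
logged = list(for_log)  # consumer 2: reads from the tee's buffer (like Log)
```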

Pipeline Optimization

Aggregation Fusion

When multiple Reduce operations consume the same stream, ARO's semantic analyzer fuses them into a single pass. Instead of iterating the data three times, the runtime maintains all accumulators simultaneously:

(* These three reduces are fused into one pass *)
Reduce the <total> from <active-orders> with sum(<amount>).
Reduce the <count> from <active-orders> with count().
Reduce the <avg> from <active-orders> with avg(<amount>).

The semantic analyzer detects that all three operations share the same source (active-orders) and rewrites them into a single fused reduce. Memory usage is O(1) regardless of dataset size -- just the accumulators.

| Approach | Memory | Passes Over Data |
|----------------------------|---------------------|------------------|
| Naive (3 separate reduces) | O(n) materialized | 3 |
| Stream Tee | O(buffer) bounded | 1 (shared) |
| Aggregation Fusion | O(1) accumulators | 1 |
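What a fused reduce amounts to can be sketched in Python (a conceptual stand-in, not ARO's generated code): one loop over the stream, with every accumulator updated together.

```python
def fused_reduce(amounts):
    """Single pass maintaining sum, count, avg, and max accumulators."""
    total = 0
    count = 0
    biggest = None
    for amount in amounts:  # the only pass over the data
        total += amount
        count += 1
        biggest = amount if biggest is None else max(biggest, amount)
    avg = total / count if count else 0.0
    return total, count, avg, biggest

total, count, avg, biggest = fused_reduce(iter([1200, 800, 950, 2000]))
```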

Unified Data Sources

Every data source in ARO produces the same stream abstraction. The same filter, reduce, and transform operations work identically whether the data comes from a file, an HTTP response, a socket, or a repository:

| Source | ARO Syntax | Chunk Strategy |
|---------------|--------------------------------|-----------------------|
| CSV File | `Read from <file: "x.csv">` | Line by line |
| JSON File | `Read from <file: "x.json">` | Array elements |
| JSONL File | `Read from <file: "x.jsonl">` | Line by line (native) |
| HTTP Response | `Request from <url>` | Chunked transfer |
| TCP Socket | `Extract from <packet: data>` | Packet by packet |
| Repository | `Retrieve from <repo>` | Cursor-based |
| Directory | `List from <directory>` | Entry by entry |

Example: Multi-Stage Pipeline

This example processes a list of transactions through multiple filter stages, then runs aggregation on the result. In streaming mode, each row flows through all stages immediately -- non-matching rows are discarded without allocating memory:

(Application-Start: Streaming Pipeline Demo) {
    Create the <transactions> with [
        {id: 1, year: "2024", amount: 1200, status: "completed", category: "electronics"},
        {id: 2, year: "2024", amount: 450, status: "pending", category: "electronics"},
        {id: 3, year: "2024", amount: 800, status: "completed", category: "furniture"},
        {id: 4, year: "2024", amount: 950, status: "completed", category: "electronics"},
        {id: 5, year: "2024", amount: 2000, status: "completed", category: "electronics"}
    ].

    (* Multi-stage filter pipeline *)
    Filter the <high-value> from <transactions>
        where <amount> > 500.

    Filter the <completed> from <high-value>
        where <status> = "completed".

    Filter the <electronics> from <completed>
        where <category> = "electronics".

    (* Aggregation fusion: all reduces run in a single pass *)
    Reduce the <total-amount> from <electronics> with sum(<amount>).
    Reduce the <count> from <electronics> with count().
    Reduce the <avg-amount> from <electronics> with avg(<amount>).
    Reduce the <max-amount> from <electronics> with max(<amount>).

    Log <total-amount> to the <console>.
    Log <count> to the <console>.
    Log <avg-amount> to the <console>.
    Log <max-amount> to the <console>.

    Return an <OK: status> for the <streaming-demo>.
}

Example: JSONL Streaming

JSONL (JSON Lines) is the ideal format for streaming because each line is an independently parseable JSON object. ARO reads JSONL files line by line without loading the entire file:

(Application-Start: Streaming Demo) {
    (* Read JSONL file - streamed line by line in production *)
    Read the <log-data> from the <file: "./events.jsonl">.

    (* Filter errors - streaming: pass/reject each element *)
    Filter the <errors> from <log-data> where <level> = "ERROR".

    (* Multi-stage filter pipeline *)
    Filter the <api-events> from <log-data> where <service> = "api".
    Filter the <api-errors> from <api-events> where <level> = "ERROR".

    (* Reduce with O(1) memory accumulators *)
    Reduce the <error-count> from <errors> with count().
    Reduce the <total-time> from <api-events> with sum(<time>).
    Reduce the <avg-time> from <api-events> with avg(<time>).

    Log <error-count> to the <console>.
    Log <avg-time> to the <console>.

    Return an <OK: status> for the <demo>.
}
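The line-at-a-time JSONL strategy is easy to see in Python (an illustrative sketch, not ARO's reader): each line is an independent JSON document, so the reader can parse and filter one line at a time without ever holding the whole file in memory.

```python
import io
import json

def read_jsonl(fp):
    """Yield one parsed JSON object per non-empty line."""
    for line in fp:
        line = line.strip()
        if line:
            yield json.loads(line)

# In-memory stand-in for a (potentially huge) events.jsonl file.
data = io.StringIO(
    '{"level": "ERROR", "service": "api", "time": 30}\n'
    '{"level": "INFO", "service": "api", "time": 10}\n'
    '{"level": "ERROR", "service": "web", "time": 5}\n'
)
errors = [event for event in read_jsonl(data) if event["level"] == "ERROR"]
```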

Learn More

See the full streaming specification in ARO-0051: Streaming Execution Engine.