Merged

16 commits
01c8e39
feat(operators): implement native streaming async_execute for all ope…
claude Mar 4, 2026
ef73fd2
docs(design): add async operator strategy discussion with Kafka/Flink…
claude Mar 4, 2026
ff2be93
fix(operators): add strict validation to streaming async_execute paths
claude Mar 4, 2026
5f713a6
docs: add async_execute plan for Node classes
claude Mar 4, 2026
cd47875
docs(plan): comprehensive plan for process_packet/async_process_packe…
claude Mar 4, 2026
c6271ad
docs(plan): address concurrent paths — route through async_process_pa…
claude Mar 4, 2026
f7b05b8
feat: add async_execute to Node classes, unify process_packet interface
claude Mar 4, 2026
af9b7b7
test(channels): add async pipeline → sync DB retrieval examples
claude Mar 4, 2026
c618de3
test(pipeline): add Pipeline + @function_pod + async orchestrator int…
claude Mar 4, 2026
7ce6884
fix(function_pod): route FunctionNode.async_execute through async_pro…
claude Mar 5, 2026
6170726
test(pipeline): rewrite async integration test as single cohesive exa…
claude Mar 5, 2026
d158de5
refactor(orchestrator): run persistent nodes, return None
claude Mar 5, 2026
cf5871a
fix(operators): address Copilot review comments on async execution
claude Mar 5, 2026
48e3528
feat(join): restore streaming symmetric hash join with correct system…
claude Mar 5, 2026
b027522
test(system-tags): add sync vs async system-tag equivalence tests
claude Mar 5, 2026
5850c1e
refactor(join): pass pipeline hashes to async_execute instead of stor…
claude Mar 5, 2026
54 changes: 42 additions & 12 deletions DESIGN_ISSUES.md
@@ -416,10 +416,10 @@ message text. A shared parameterized validation helper would eliminate the dupli
## `src/orcapod/core/operators/` — Async execution

### O1 — Operators use barrier-mode `async_execute` only; streaming/incremental overrides needed
**Status:** in progress
**Severity:** medium

All operators originally used the default barrier-mode `async_execute` inherited from
`StaticOutputPod`: collect all input rows into memory, materialize to `ArrowTableStream`(s),
run the existing sync `static_process`, then emit results. This works correctly but negates the
latency and memory benefits of the push-based channel model.
@@ -428,20 +428,27 @@ Three categories of improvement are planned:

1. **Streaming overrides (row-by-row, zero buffering)** — for operators that process rows
independently:
- ~~`PolarsFilter` — evaluate predicate per row, emit or drop immediately~~ (kept barrier:
Polars expressions require DataFrame context for evaluation)
- `MapTags` / `MapPackets` — rename columns per row, emit immediately ✅
- `SelectTagColumns` / `SelectPacketColumns` — project columns per row, emit immediately ✅
- `DropTagColumns` / `DropPacketColumns` — drop columns per row, emit immediately ✅

2. **Incremental overrides (stateful, eager emit)** — for multi-input operators that can
produce partial results before all inputs are consumed:
- `Join` — symmetric hash join for 2 inputs (streaming, with correct
system-tag name-extending via `input_pipeline_hashes` passed directly
to `async_execute`); barrier fallback for N>2 inputs via `static_process`. ✅
- `MergeJoin` — kept barrier: complex column-merging logic
- `SemiJoin` — build right, stream left through hash lookup ✅

3. **Streaming accumulation:**
- `Batch` — emit full batches as they accumulate (`batch_size > 0`); barrier fallback
when `batch_size == 0` (batch everything) ✅

**Remaining:** `PolarsFilter` and `MergeJoin` stay barrier-mode for now. Both could receive
incremental overrides in the future, but they would require careful handling of Polars
expression evaluation and system-tag evolution, respectively.

---

@@ -527,6 +534,29 @@ await AddResult(grade_pf).async_execute([input_ch], output_ch)

---

## `src/orcapod/hashing/semantic_hashing/`

### H1 — Semantic hasher does not support PEP 604 union types (`int | None`)
**Status:** open
**Severity:** medium

The `BaseSemanticHasher` raises `BeartypeDoorNonpepException` when hashing a
`PythonPacketFunction` whose return type uses PEP 604 syntax (`int | None`).
The hasher's `_handle_unknown` path receives `types.UnionType` (the Python 3.10+ type for
`X | Y` expressions) and has no registered handler for it.

`typing.Optional[int]` also fails (different error path through beartype).

This means packet functions cannot use union return types — a common pattern for functions
that may filter packets by returning `None`.

**Workaround:** Use non-union return types; raise an exception or return a sentinel value
instead of `None`.

**Fix needed:** Register a `TypeHandlerProtocol` for `types.UnionType` (and
`typing.Union`/`typing.Optional`) in the semantic hasher's type handler registry.

---

### G2 — Pod Group abstraction for other composite pod patterns
**Status:** open
**Severity:** low
52 changes: 47 additions & 5 deletions orcapod-design.md
@@ -468,15 +468,57 @@ async def async_execute(

Nodes consume `(Tag, Packet)` pairs from input channels and produce them to an output channel. This enables push-based, streaming execution where data flows through the pipeline as soon as it's available, with backpressure propagated via bounded channel buffers.

**FunctionPod async strategy:** Streaming mode — each input `(tag, packet)` is processed independently with semaphore-controlled concurrency. Uses `asyncio.TaskGroup` for structured concurrency.

#### Operator Async Strategies

Each operator overrides `async_execute` with the most efficient streaming pattern its semantics permit. The default fallback (inherited from `StaticOutputPod`) is barrier mode: collect all inputs via `asyncio.gather`, materialize to `ArrowTableStream`, call `static_process`, and emit results. Operators override this default when a more incremental strategy is possible.

| Strategy | Description | Operators |
|---|---|---|
| **Per-row streaming** | Transform each `(Tag, Packet)` independently as it arrives; zero buffering beyond the current row | SelectTagColumns, SelectPacketColumns, DropTagColumns, DropPacketColumns, MapTags, MapPackets |
| **Accumulate-and-emit** | Buffer rows up to `batch_size`, emit full batches immediately, flush partial at end | Batch (`batch_size > 0`) |
| **Build-probe** | Collect one side fully (build), then stream the other through a hash lookup (probe) | SemiJoin |
| **Symmetric hash join** | Read both sides concurrently, buffer + index both, emit matches as they're found | Join (2 inputs) |
| **Barrier mode** | Collect all inputs, run `static_process`, emit results | PolarsFilter, MergeJoin, Batch (`batch_size = 0`), Join (N > 2 inputs) |

#### Per-Row Streaming (Unary Column/Map Operators)

For operators that transform each row independently (column selection, column dropping, column renaming), the async path iterates `async for tag, packet in inputs[0]` and applies the transformation per row. Column metadata (which columns to drop, the rename map, etc.) is computed lazily on the first row and cached for subsequent rows. This avoids materializing the entire input into an Arrow table, enabling true pipeline-level streaming where upstream producers and downstream consumers run concurrently.

#### Accumulate-and-Emit (Batch)

When `batch_size > 0`, Batch accumulates rows into a buffer and emits a batched result stream each time the buffer reaches `batch_size`. Any partial batch at the end is emitted unless `drop_partial_batch` is set. When `batch_size = 0` (meaning "batch everything into one group"), the operator must see all input before producing output, so it falls back to barrier mode.

#### Build-Probe (SemiJoin)

SemiJoin is non-commutative: the left side is filtered by the right side. The async implementation collects the right (build) side fully, constructs a hash set of its key tuples, then streams the left (probe) side through the lookup — emitting each left row whose keys appear in the right set. This is the same pattern as Kafka's KStream-KTable join: the table side is materialized, the stream side drives output.

#### Symmetric Hash Join

The 2-input Join uses a symmetric hash join — the same algorithm used by Apache Kafka for KStream-KStream joins and by Apache Flink for regular streaming joins. Both input channels are drained concurrently into a shared `asyncio.Queue`. For each arriving row:

1. Buffer the row on its side and index it by the shared key columns.
2. Probe the opposite side's index for matching keys.
3. Emit all matches immediately.

When the first rows from both sides have arrived, the shared key columns are determined (intersection of tag column names). Any rows that arrived before shared keys were known are re-indexed and cross-matched in a one-time reconciliation step.

**Comparison with industry stream processors:**

| Aspect | Kafka Streams (KStream-KStream) | Apache Flink (Regular Join) | OrcaPod |
|---|---|---|---|
| Algorithm | Symmetric windowed hash join | Symmetric hash join with state TTL | Symmetric hash join |
| Windowing | Required (sliding window bounds state) | Optional (TTL evicts old state) | Not needed (finite streams) |
| State backend | RocksDB state stores for fault tolerance | RocksDB / heap state with checkpointing | In-memory buffers |
| State cleanup | Window expiry evicts old records | TTL or watermark eviction | Natural termination — inputs are finite |
| N-way joins | Chained pairwise joins | Chained pairwise joins | 2-way: symmetric hash; N > 2: barrier + Arrow join |

The symmetric hash join is optimal for our use case: it emits results with minimum latency (as soon as a match exists on both sides) and requires no windowing complexity since OrcaPod streams are finite. For N > 2 inputs, the operator falls back to barrier mode with Arrow-level join execution, which is efficient for bounded data and avoids the complexity of chaining pairwise streaming joins.

**Why not build-probe for Join?** Since Join is commutative and input sizes are unknown upfront, there is no principled way to choose which side to build vs. probe. Symmetric hash join avoids this asymmetry. SemiJoin, being non-commutative, has a natural build (right) and probe (left) side.

**Why barrier for PolarsFilter and MergeJoin?** PolarsFilter requires a Polars DataFrame context for predicate evaluation, which needs full materialization. MergeJoin's column-merging semantics (colliding columns become sorted `list[T]`) require seeing all rows to produce correctly typed output columns.

### Sync / Async Equivalence
