Add native async streaming overrides for core operators#70
Conversation
|
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
This PR introduces native (non–barrier-mode) async_execute implementations for several core operators so that channel-based pipelines can process (Tag, Packet) rows incrementally, reducing latency and avoiding full-input materialization where possible.
Changes:
- Added streaming/ incremental
async_executeoverrides for column selection/dropping, tag/packet mapping, batching, semijoin, and join. - Added a large async operator equivalence/integration test suite focused on these new native async paths.
- Updated design documentation to describe the new async execution strategies and current status.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
tests/test_channels/test_native_async_operators.py |
Adds comprehensive async operator tests and pipeline integration scenarios. |
src/orcapod/core/operators/column_selection.py |
Adds per-row streaming async_execute for select/drop tag/packet column operators. |
src/orcapod/core/operators/mappers.py |
Adds per-row streaming async_execute for MapTags / MapPackets. |
src/orcapod/core/operators/batch.py |
Adds accumulate-and-emit streaming async_execute for Batch when batch_size > 0. |
src/orcapod/core/operators/semijoin.py |
Adds build-probe async execution: collect right, stream left with hash lookup. |
src/orcapod/core/operators/join.py |
Adds async execution with single-input passthrough, symmetric hash join for 2 inputs, barrier fallback for N>2. |
orcapod-design.md |
Documents operator async strategy table and detailed strategy descriptions. |
DESIGN_ISSUES.md |
Updates O1 status and notes on which operators are streaming vs barrier. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/orcapod/core/operators/join.py
Outdated
| # Merge system tags with side suffix to avoid collisions | ||
| merged_sys: dict = {} | ||
| for k, v in left_tag.system_tags().items(): | ||
| merged_sys[k] = v | ||
| for k, v in right_tag.system_tags().items(): | ||
| merged_sys[k] = v |
There was a problem hiding this comment.
_merge_row_pair merges left/right Tag.system_tags() by plain dict update, which can overwrite colliding system-tag keys and does not apply Join's canonical system-tag renaming scheme (see _predict_system_tag_schema() / arrow_data_utils.append_to_system_tags() in static_process). This makes async_execute results diverge from the documented/system-tagged schema and can lose provenance data. Consider falling back to barrier/static_process whenever either side has any system tags, or implement the same renaming logic in async (which likely requires passing upstream ordering/identity metadata into async_execute).
| # Merge system tags with side suffix to avoid collisions | |
| merged_sys: dict = {} | |
| for k, v in left_tag.system_tags().items(): | |
| merged_sys[k] = v | |
| for k, v in right_tag.system_tags().items(): | |
| merged_sys[k] = v | |
| # NOTE: Async join does not implement the canonical system-tag | |
| # renaming scheme used in the static/barrier path. To avoid | |
| # silently dropping or corrupting provenance metadata, we do not | |
| # permit system-tagged rows here. | |
| left_sys = left_tag.system_tags() | |
| right_sys = right_tag.system_tags() | |
| if left_sys or right_sys: | |
| raise InputValidationError( | |
| "Join.async_execute does not support system tags; " | |
| "use the barrier/static Join path for system-tagged inputs." | |
| ) | |
| merged_sys: dict = {} |
src/orcapod/core/operators/join.py
Outdated
| # Merge both channels into one tagged queue | ||
| _SENTINEL = object() | ||
| queue: asyncio.Queue = asyncio.Queue() | ||
|
|
There was a problem hiding this comment.
The merged asyncio.Queue in _symmetric_hash_join is created unbounded (no maxsize). That breaks backpressure: if inputs produce faster than the join loop can consume/send downstream, the queue can grow without bound and increase memory usage. Consider giving the queue a bounded maxsize (and letting _drain await queue.put) or using a different multiplexing approach (e.g., asyncio.wait on receive tasks) that preserves backpressure.
src/orcapod/core/operators/join.py
Outdated
| Both sides are read concurrently via a merged queue. Each | ||
| arriving row is added to its side's index and immediately probed | ||
| against the opposite side. Matched rows are emitted to | ||
| ``output`` as soon as found, so downstream consumers can begin | ||
| work before either input is fully consumed. | ||
|
|
||
| For correct system-tag column naming the implementation falls | ||
| back to ``static_process`` (which requires materialised streams) | ||
| when the total result set is ready. |
There was a problem hiding this comment.
The _symmetric_hash_join docstring claims it “falls back to static_process … when the total result set is ready” for correct system-tag column naming, but the implementation never performs that fallback (it streams merged rows directly). Either implement the fallback or update the docstring/comments so behavior and limitations are explicit.
| Both sides are read concurrently via a merged queue. Each | |
| arriving row is added to its side's index and immediately probed | |
| against the opposite side. Matched rows are emitted to | |
| ``output`` as soon as found, so downstream consumers can begin | |
| work before either input is fully consumed. | |
| For correct system-tag column naming the implementation falls | |
| back to ``static_process`` (which requires materialised streams) | |
| when the total result set is ready. | |
| Both sides are read concurrently via a merged queue. Each | |
| arriving row is added to its side's index and immediately probed | |
| against the opposite side. Matched rows are emitted to | |
| ``output`` as soon as they are found, so downstream consumers | |
| can begin work before either input is fully consumed. | |
| Unlike :meth:`static_process`, this streaming implementation does | |
| not perform a second pass over materialised streams and does not | |
| normalise system-tag column naming in any special way; joined | |
| rows are forwarded with whatever tag-related columns are present | |
| in the incoming packets. |
| right_stream = self._materialize_to_stream(right_rows) if right_rows else None | ||
| if right_stream is None: | ||
| # No right data at all — need to check schemas. | ||
| # Empty right with common keys → empty result. | ||
| # Since we can't build a right stream with 0 rows, |
There was a problem hiding this comment.
In the if not right_rows: branch, right_stream = self._materialize_to_stream(right_rows) if right_rows else None will always evaluate to None (right_rows is known empty here), making the later static_process(left_stream, right_stream) path unreachable. Consider removing this dead code and clarifying the intended semantics for empty-right inputs in channel execution (and updating the docstring accordingly).
DESIGN_ISSUES.md
Outdated
| - `Join` — symmetric hash join (kept barrier: complex system-tag name-extending logic) | ||
| - `MergeJoin` — same approach (kept barrier: complex column-merging logic) | ||
| - `SemiJoin` — build right, stream left through hash lookup ✅ |
There was a problem hiding this comment.
DESIGN_ISSUES.md still states that Join is “kept barrier” due to system-tag logic, but this PR adds a native async_execute + symmetric hash join implementation in src/orcapod/core/operators/join.py. Please update this status text (and the “Remaining” list below) to reflect the current implementation and/or its limitations (e.g., barrier fallback when system tags are present).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/orcapod/core/operators/join.py
Outdated
| # Merge both channels into one tagged queue | ||
| _SENTINEL = object() | ||
| queue: asyncio.Queue = asyncio.Queue() | ||
|
|
||
| async def _drain( | ||
| ch: ReadableChannel[tuple[TagProtocol, PacketProtocol]], | ||
| side: int, | ||
| ) -> None: | ||
| async for item in ch: | ||
| await queue.put((side, item)) | ||
| await queue.put((side, _SENTINEL)) |
There was a problem hiding this comment.
The symmetric hash join uses an unbounded asyncio.Queue(). This can bypass channel backpressure: a fast producer can drain bounded input channels into an unbounded queue, growing memory without bound. Use a bounded queue (asyncio.Queue(maxsize=...)) sized to the configured channel buffers (or another conservative bound) so upstream backpressure is preserved.
src/orcapod/core/operators/join.py
Outdated
| """ | ||
| from orcapod.core.datagrams import Packet, Tag | ||
|
|
||
| # Merge both channels into one tagged queue | ||
| _SENTINEL = object() | ||
| queue: asyncio.Queue = asyncio.Queue() |
There was a problem hiding this comment.
_symmetric_hash_join imports Packet/Tag but doesn't use them (the actual construction happens in _merge_row_pair). Removing the unused import avoids lint failures.
| # Without data we can't determine common keys from row | ||
| # objects alone, so fall back to barrier mode. | ||
| left_rows = await left_ch.collect() | ||
| if not left_rows: | ||
| return | ||
| left_stream = self._materialize_to_stream(left_rows) | ||
| right_stream = self._materialize_to_stream(right_rows) if right_rows else None | ||
| if right_stream is None: | ||
| # No right data at all — need to check schemas. | ||
| # Empty right with common keys → empty result. | ||
| # Since we can't build a right stream with 0 rows, | ||
| # just pass left through (safe: no filter rows = no filter). |
There was a problem hiding this comment.
SemiJoin.binary_static_process returns an empty result when there are common keys and the right side is empty, but async_execute currently passes the left stream through when right_rows is empty (because it can't infer schema from data). This creates a concrete sync/async semantic mismatch. Since OperatorNode calls validate_inputs(*streams) at construction time, consider computing/storing the common-key set during validation (from stream schemas) and using that here: if stored common_keys is non-empty, return empty when the right channel is empty; if it’s empty, pass left through.
| # Without data we can't determine common keys from row | |
| # objects alone, so fall back to barrier mode. | |
| left_rows = await left_ch.collect() | |
| if not left_rows: | |
| return | |
| left_stream = self._materialize_to_stream(left_rows) | |
| right_stream = self._materialize_to_stream(right_rows) if right_rows else None | |
| if right_stream is None: | |
| # No right data at all — need to check schemas. | |
| # Empty right with common keys → empty result. | |
| # Since we can't build a right stream with 0 rows, | |
| # just pass left through (safe: no filter rows = no filter). | |
| # | |
| # If validation has already computed a common-key set based on | |
| # stream schemas, we can use that here to preserve sync/async | |
| # semantic parity: a non-empty common-key set implies the | |
| # semi-join must yield an empty result when the right side is | |
| # empty. | |
| validated_common_keys = getattr(self, "_validated_common_keys", None) | |
| if validated_common_keys: | |
| # Known common keys and no right rows → empty result. | |
| return | |
| # Without a known common-key set we can't safely decide based | |
| # on schemas here, so fall back to barrier mode and pass the | |
| # left stream through (preserving existing behavior). | |
| left_rows = await left_ch.collect() | |
| if not left_rows: | |
| return | |
| left_stream = self._materialize_to_stream(left_rows) | |
| right_stream = self._materialize_to_stream(right_rows) if right_rows else None | |
| if right_stream is None: | |
| # No right data at all, and no known common-key set from | |
| # validation — treat this as "no filter" and pass left | |
| # through. |
src/orcapod/core/operator_node.py
Outdated
| from orcapod.channels import ReadableChannel, WritableChannel | ||
|
|
||
| from orcapod import contexts | ||
| from orcapod.channels import Channel, ReadableChannel, WritableChannel | ||
| from orcapod.config import Config | ||
| from orcapod.core.static_output_pod import StaticOutputPod |
There was a problem hiding this comment.
PersistentOperatorNode imports ReadableChannel/WritableChannel twice (both directly and via the later Channel, ReadableChannel, WritableChannel import). This is redundant and will trip linters; keep a single channels import statement.
src/orcapod/core/operator_node.py
Outdated
|
|
||
| async def forward() -> None: | ||
| async for item in intermediate.reader: | ||
| collected.append(item) |
There was a problem hiding this comment.
In PersistentOperatorNode.async_execute, forward() always appends to collected, even when cache_mode is OFF. For large streams this defeats the streaming/memory benefits of async execution; only accumulate results when cache_mode == LOG.
| collected.append(item) | |
| if self._cache_mode == CacheMode.LOG: | |
| collected.append(item) |
| # N > 2: concurrent collection + static_process | ||
| all_rows = await asyncio.gather(*(ch.collect() for ch in inputs)) | ||
| streams = [self._materialize_to_stream(rows) for rows in all_rows] | ||
| result = self.static_process(*streams) | ||
| for tag, packet in result.iter_packets(): | ||
| await output.send((tag, packet)) |
There was a problem hiding this comment.
Join.async_execute (N>2 inputs) materializes every collected input via _materialize_to_stream(rows), but _materialize_to_stream([]) raises ValueError on empty input. If any input channel is empty, the join result should be empty rather than raising; add an explicit empty-input short-circuit (or create an empty stream with the correct schema) before materializing.
| logger.info("Checking for cache...") | ||
| output_packet = self.get_cached_output_for_packet(packet) | ||
| if output_packet is not None: | ||
| logger.info(f"Cache hit for {packet}!") |
There was a problem hiding this comment.
The proposed implementation of CachedPacketFunction.async_call logs the entire packet object on cache hits (logger.info(f"Cache hit for {packet}!")), which will serialize and write all packet fields to application logs. Because Packet.__repr__ expands to the full data dict, any sensitive values carried in packets (PII, secrets, tokens) will be exposed to log files or centralized logging systems. To avoid this, the async and sync cache paths should log only non-sensitive identifiers (e.g., content hash, record ID, or a redacted summary) or make detailed packet logging opt-in under a debug-level flag instead of default INFO-level logging.
| logger.info(f"Cache hit for {packet}!") | |
| logger.info("Cache hit for packet") |
…rators Replace barrier-based synchronous async_execute with true streaming implementations: - SelectTagColumns, SelectPacketColumns, DropTagColumns, DropPacketColumns: per-row streaming with lazy column computation on first row - MapTags, MapPackets: per-row streaming with lazy rename map on first row - Batch: accumulate-and-emit for batch_size>0, barrier for batch_size=0 - SemiJoin: build-probe pattern (collect right, stream left through hash) - Join (2 inputs): symmetric hash join via merged asyncio.Queue with concurrent reads from both channels - Join (N>2 inputs): concurrent collection + static_process - PolarsFilter, MergeJoin: kept barrier mode (require full materialization) Includes 67 comprehensive tests mirroring sync operator test patterns, covering functional correctness, data preservation, empty input handling, sync/async equivalence, edge cases, and multi-stage pipeline integration. All 283 tests pass (67 new + 36 existing async + 180 sync operators). https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
… comparison Expand the Asynchronous Execution section with detailed descriptions of each operator async strategy (per-row streaming, accumulate-and-emit, build-probe, symmetric hash join, barrier mode), algorithm rationale, and a comparison table against Kafka Streams and Apache Flink. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
The streaming async_execute overrides for Select/Drop Tag/Packet columns were missing the strict-mode validation that the sync paths perform via validate_unary_input. This caused two regression test failures: 1. SelectPacketColumns with nonexistent column + strict=True no longer raised InputValidationError — fixed by checking on first row. 2. Empty input no longer raised ValueError (from _materialize_to_stream) — this is actually better behavior (graceful handling), so the test is updated to expect clean completion instead of an error. Also added strict validation to SelectTagColumns, DropTagColumns, and DropPacketColumns streaming paths for consistency. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Detailed implementation plan for adding async_execute to FunctionNode, PersistentFunctionNode, OperatorNode, and PersistentOperatorNode, plus CachedPacketFunction.async_call with cache support. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…t unification Rewrites the async Node plan to establish process_packet and async_process_packet as the single per-packet entry point across FunctionPod, FunctionNode, and PersistentFunctionNode. Details all call chains, the CachedPacketFunction async_call gap, OperatorNode delegation pattern, and PersistentOperatorNode TaskGroup-based forwarding with post-hoc DB storage. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…cket, remove _execute_concurrent Updates the plan to fully route _iter_packets_concurrent through async_process_packet (with sync process_packet fallback in event loop). Removes _execute_concurrent module-level helper. FunctionNode's concurrent path uses self.async_process_packet so PersistentFunctionNode's cache + pipeline record logic kicks in via polymorphism. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
- Add _FunctionPodBase.async_process_packet as async counterpart of process_packet - Fix FunctionPod.async_execute to route through async_process_packet instead of calling packet_function.async_call directly - Rewrite FunctionPodStream._iter_packets_concurrent and FunctionNode._iter_packets_concurrent to route through process_packet/async_process_packet (with event-loop fallback) - Remove _execute_concurrent module-level helper - Add FunctionNode.process_packet + async_process_packet (delegate to pod) - Add FunctionNode.async_execute (sequential streaming) - Add CachedPacketFunction.async_call with cache check + recording - Add PersistentFunctionNode.async_process_packet (cache + pipeline records) - Add PersistentFunctionNode.async_execute (two-phase: replay cached, then compute missing) - Add OperatorNode.async_execute (pass-through delegation) - Extract PersistentOperatorNode._store_output_stream from _compute_and_store for reuse by async path - Add PersistentOperatorNode.async_execute (TaskGroup-based forwarding with post-hoc DB storage for LOG mode, DB replay for REPLAY mode) - Add 27 tests covering protocol conformance, CachedPacketFunction async_call, all Node async_execute variants, process_packet routing, and end-to-end async pipelines - Log semantic hasher union type bug (H1) in DESIGN_ISSUES.md https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Add TestAsyncPipelineThenSyncRetrieval with three concrete examples: - PersistentFunctionNode: async execute, then sync get_all_records - PersistentOperatorNode (LOG): async execute, then sync retrieval + REPLAY - Multi-stage pipeline: Source → FunctionNode → OperatorNode, both persistent, with sync retrieval from each stage's DB https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…egration test Integration test demonstrating the recommended workflow: 1. Define functions with @function_pod decorator 2. Build pipeline with Pipeline context manager (auto-compiles) 3. Execute asynchronously via TaskGroup + async_execute channels 4. Retrieve results synchronously from pipeline database Covers: compile verification, sync baseline, async streaming, async→sync DB retrieval, and sync/async equivalence. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…cess_packet Dev's FunctionNode.async_execute was calling _packet_function.call() directly, bypassing async_process_packet. This broke the design invariant that all per-packet processing routes through process_packet (sync) / async_process_packet (async), which is the extension point that subclasses like PersistentFunctionNode override for DB-backed caching and pipeline record storage. Now FunctionNode.async_execute delegates to self.async_process_packet, consistent with FunctionPod.async_execute and PersistentFunctionNode.async_execute. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…mple Single narrative script: define @function_pod → build Pipeline → run AsyncPipelineOrchestrator → verify streamed output. Also tests Pipeline.run(ExecutorType.ASYNC_CHANNELS), run_async() from an existing event loop, sync DB retrieval, and sync/async equivalence. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
The orchestrator now walks Pipeline._node_graph (persistent nodes) instead of GraphTracker._node_lut (non-persistent nodes). This means async execution writes results to the pipeline databases via the persistent nodes themselves (PersistentFunctionNode, PersistentOperatorNode). After orchestrator.run(pipeline), callers retrieve data the same way as after sync execution: pipeline.<label>.get_all_records(). Changes: - AsyncPipelineOrchestrator: accept Pipeline, walk _node_graph, return None instead of StreamProtocol - PersistentSourceNode: add async_execute (materialize to cache DB then push cached rows to output channel) - Update all orchestrator tests to use Pipeline and verify via get_all_records() - Rewrite integration test for the new DB-retrieval pattern https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Join: - Replace streaming symmetric hash join with barrier-mode delegation to static_process for correct system-tag name-extending logic - Add empty-input guard to prevent ValueError from _materialize_to_stream([]) - Remove _merge_row_pair and _symmetric_hash_join (unused after rewrite) SemiJoin: - Fix empty-right semantic mismatch: store common keys during validation and use them in async_execute to match sync behavior (empty right + common keys → empty result) - Remove dead code in empty-right branch operator_node.py: - Remove duplicate ReadableChannel/WritableChannel imports - Only collect results in PersistentOperatorNode.async_execute forward() when cache_mode is LOG, preserving streaming benefits when OFF DESIGN_ISSUES.md: - Update Join status from "kept barrier" to reflect new async override with static_process delegation https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
… tags Re-implements the symmetric hash join for 2-input async execution with proper system-tag name-extending logic: - Store per-input system-tag suffixes (pipeline_hash:canonical_position) during validate_nonzero_inputs, making them available to async_execute without needing stream objects - _merge_row_pair now renames system-tag keys by appending the correct suffix via BLOCK_SEPARATOR, matching static_process behavior exactly - Use bounded asyncio.Queue(maxsize=64) to preserve backpressure - Keep barrier fallback (collect + static_process) for N>2 inputs with empty-input guard Verified: sync and async produce identical system-tag column names. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Operator-level tests (TestJoinSystemTagEquivalence): - Two-way join: system-tag column names match sync exactly - Two-way join: system-tag values match sync exactly - Two-way join: suffixes contain pipeline_hash and canonical position - Commutativity: Join(A,B) and Join(B,A) produce identical system tags - Three-way join (N>2 barrier): system tags match sync Operator-level tests (TestSemiJoinSystemTagEquivalence): - SemiJoin preserves left-side system tags identically in both paths Pipeline-level tests (TestSyncAsyncSystemTagEquivalence): - End-to-end pipeline: system-tag columns match between sync and async - System-tag column names follow name-extending convention (:N suffix) - All system-tag column values match row-by-row Also adds run_binary_validated helper that calls validate_inputs before async_execute, matching the OperatorNode contract. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…ing state Replace the pattern of storing _async_system_tag_suffixes on the Join operator during validate_nonzero_inputs with a cleaner approach: pass input_pipeline_hashes directly to async_execute as a keyword argument. OperatorNode/PersistentOperatorNode compute hashes from their input streams and pass them through. Join._compute_system_tag_suffixes derives canonical suffixes from the hashes at execution time. This eliminates hidden mutable state on the operator and makes the data flow explicit: channels + their corresponding pipeline hashes travel together through the same call. https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
8449417 to
5850c1e
Compare
Summary
Implement native async streaming execution for core operators, replacing the default barrier-mode
async_executewith operator-specific strategies that process rows incrementally without materializing entire inputs into memory.Key Changes
New Streaming Implementations
Column Selection/Dropping (
SelectTagColumns,SelectPacketColumns,DropTagColumns,DropPacketColumns): Per-row column filtering with zero buffering — each(tag, packet)pair is processed independently and forwarded immediately.Column Mapping (
MapTags,MapPackets): Per-row column renaming with lazy computation of rename maps on first row, then applied to subsequent rows.Batch: Accumulate-and-emit strategy — groups rows into batches of specified size and emits full batches immediately, with optional partial batch dropping.
SemiJoin: Build-probe pattern — collects right input into a hash index keyed by common tag columns, then streams left input through the index, emitting matches without waiting for left to complete.
Join: Symmetric hash join for binary inputs — both sides are read concurrently via a merged queue; each arriving row is indexed and immediately probed against the opposite side's buffer. Single-input passthrough and N-ary (N>2) inputs fall back to barrier mode with concurrent collection.
Testing
tests/test_channels/test_native_async_operators.py, 1252 lines) covering:Documentation
orcapod-design.mdwith operator async strategy table and detailed descriptionsDESIGN_ISSUES.mdto mark O1 as "in progress" with implementation notesImplementation Details
async forto consume input channels without buffering entire datasetsasyncio.Queuefor concurrent multi-channel consumptionstatic_processvia_materialize_to_streamhelper when streaming semantics don't apply (N-ary joins, empty inputs in SemiJoin)finallyblocks to ensure cleanuphttps://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY