Skip to content

Add native async streaming overrides for core operators#70

Merged
eywalker merged 16 commits intodevfrom
claude/native-async-operators-4Ed8f
Mar 5, 2026
Merged

Add native async streaming overrides for core operators#70
eywalker merged 16 commits intodevfrom
claude/native-async-operators-4Ed8f

Conversation

@eywalker
Copy link
Contributor

@eywalker eywalker commented Mar 4, 2026

Summary

Implement native async streaming execution for core operators, replacing the default barrier-mode async_execute with operator-specific strategies that process rows incrementally without materializing entire inputs into memory.

Key Changes

New Streaming Implementations

  • Column Selection/Dropping (SelectTagColumns, SelectPacketColumns, DropTagColumns, DropPacketColumns): Per-row column filtering with zero buffering — each (tag, packet) pair is processed independently and forwarded immediately.

  • Column Mapping (MapTags, MapPackets): Per-row column renaming with lazy computation of rename maps on first row, then applied to subsequent rows.

  • Batch: Accumulate-and-emit strategy — groups rows into batches of specified size and emits full batches immediately, with optional partial batch dropping.

  • SemiJoin: Build-probe pattern — collects right input into a hash index keyed by common tag columns, then streams left input through the index, emitting matches without waiting for left to complete.

  • Join: Symmetric hash join for binary inputs — both sides are read concurrently via a merged queue; each arriving row is indexed and immediately probed against the opposite side's buffer. Single-input passthrough and N-ary (N>2) inputs fall back to barrier mode with concurrent collection.

Testing

  • Added comprehensive test suite (tests/test_channels/test_native_async_operators.py, 1252 lines) covering:
    • Per-operator streaming behavior validation
    • Sync/async equivalence for all operators
    • Empty input handling
    • Multi-stage pipeline integration
    • Edge cases (exact batch multiples, disjoint schemas, large inputs)

Documentation

  • Updated orcapod-design.md with operator async strategy table and detailed descriptions
  • Updated DESIGN_ISSUES.md to mark O1 as "in progress" with implementation notes

Implementation Details

  • All streaming overrides use async for to consume input channels without buffering entire datasets
  • Operators that require state (SemiJoin, Join) use asyncio.Queue for concurrent multi-channel consumption
  • Fallback to static_process via _materialize_to_stream helper when streaming semantics don't apply (N-ary joins, empty inputs in SemiJoin)
  • System tags and source metadata are preserved through transformations
  • All implementations properly close output channels in finally blocks to ensure cleanup

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY

Copilot AI review requested due to automatic review settings March 4, 2026 15:56
@codecov-commenter
Copy link

codecov-commenter commented Mar 4, 2026

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 96.38009% with 16 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/orcapod/pipeline/orchestrator.py 84.84% 5 Missing ⚠️
src/orcapod/core/operators/join.py 96.49% 4 Missing ⚠️
src/orcapod/core/operators/column_selection.py 95.23% 3 Missing ⚠️
src/orcapod/core/function_pod.py 96.55% 2 Missing ⚠️
src/orcapod/core/operators/semijoin.py 95.55% 2 Missing ⚠️

📢 Thoughts on this report? Let us know!

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces native (non–barrier-mode) async_execute implementations for several core operators so that channel-based pipelines can process (Tag, Packet) rows incrementally, reducing latency and avoiding full-input materialization where possible.

Changes:

  • Added streaming/ incremental async_execute overrides for column selection/dropping, tag/packet mapping, batching, semijoin, and join.
  • Added a large async operator equivalence/integration test suite focused on these new native async paths.
  • Updated design documentation to describe the new async execution strategies and current status.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/test_channels/test_native_async_operators.py Adds comprehensive async operator tests and pipeline integration scenarios.
src/orcapod/core/operators/column_selection.py Adds per-row streaming async_execute for select/drop tag/packet column operators.
src/orcapod/core/operators/mappers.py Adds per-row streaming async_execute for MapTags / MapPackets.
src/orcapod/core/operators/batch.py Adds accumulate-and-emit streaming async_execute for Batch when batch_size > 0.
src/orcapod/core/operators/semijoin.py Adds build-probe async execution: collect right, stream left with hash lookup.
src/orcapod/core/operators/join.py Adds async execution with single-input passthrough, symmetric hash join for 2 inputs, barrier fallback for N>2.
orcapod-design.md Documents operator async strategy table and detailed strategy descriptions.
DESIGN_ISSUES.md Updates O1 status and notes on which operators are streaming vs barrier.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +348 to +353
# Merge system tags with side suffix to avoid collisions
merged_sys: dict = {}
for k, v in left_tag.system_tags().items():
merged_sys[k] = v
for k, v in right_tag.system_tags().items():
merged_sys[k] = v
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_merge_row_pair merges left/right Tag.system_tags() by plain dict update, which can overwrite colliding system-tag keys and does not apply Join's canonical system-tag renaming scheme (see _predict_system_tag_schema() / arrow_data_utils.append_to_system_tags() in static_process). This makes async_execute results diverge from the documented/system-tagged schema and can lose provenance data. Consider falling back to barrier/static_process whenever either side has any system tags, or implement the same renaming logic in async (which likely requires passing upstream ordering/identity metadata into async_execute).

Suggested change
# Merge system tags with side suffix to avoid collisions
merged_sys: dict = {}
for k, v in left_tag.system_tags().items():
merged_sys[k] = v
for k, v in right_tag.system_tags().items():
merged_sys[k] = v
# NOTE: Async join does not implement the canonical system-tag
# renaming scheme used in the static/barrier path. To avoid
# silently dropping or corrupting provenance metadata, we do not
# permit system-tagged rows here.
left_sys = left_tag.system_tags()
right_sys = right_tag.system_tags()
if left_sys or right_sys:
raise InputValidationError(
"Join.async_execute does not support system tags; "
"use the barrier/static Join path for system-tagged inputs."
)
merged_sys: dict = {}

Copilot uses AI. Check for mistakes.
Comment on lines +231 to +234
# Merge both channels into one tagged queue
_SENTINEL = object()
queue: asyncio.Queue = asyncio.Queue()

Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The merged asyncio.Queue in _symmetric_hash_join is created unbounded (no maxsize). That breaks backpressure: if inputs produce faster than the join loop can consume/send downstream, the queue can grow without bound and increase memory usage. Consider giving the queue a bounded maxsize (and letting _drain await queue.put) or using a different multiplexing approach (e.g., asyncio.wait on receive tasks) that preserves backpressure.

Copilot uses AI. Check for mistakes.
Comment on lines +219 to +227
Both sides are read concurrently via a merged queue. Each
arriving row is added to its side's index and immediately probed
against the opposite side. Matched rows are emitted to
``output`` as soon as found, so downstream consumers can begin
work before either input is fully consumed.

For correct system-tag column naming the implementation falls
back to ``static_process`` (which requires materialised streams)
when the total result set is ready.
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _symmetric_hash_join docstring claims it “falls back to static_process … when the total result set is ready” for correct system-tag column naming, but the implementation never performs that fallback (it streams merged rows directly). Either implement the fallback or update the docstring/comments so behavior and limitations are explicit.

Suggested change
Both sides are read concurrently via a merged queue. Each
arriving row is added to its side's index and immediately probed
against the opposite side. Matched rows are emitted to
``output`` as soon as found, so downstream consumers can begin
work before either input is fully consumed.
For correct system-tag column naming the implementation falls
back to ``static_process`` (which requires materialised streams)
when the total result set is ready.
Both sides are read concurrently via a merged queue. Each
arriving row is added to its side's index and immediately probed
against the opposite side. Matched rows are emitted to
``output`` as soon as they are found, so downstream consumers
can begin work before either input is fully consumed.
Unlike :meth:`static_process`, this streaming implementation does
not perform a second pass over materialised streams and does not
normalise system-tag column naming in any special way; joined
rows are forwarded with whatever tag-related columns are present
in the incoming packets.

Copilot uses AI. Check for mistakes.
Comment on lines +152 to +156
right_stream = self._materialize_to_stream(right_rows) if right_rows else None
if right_stream is None:
# No right data at all — need to check schemas.
# Empty right with common keys → empty result.
# Since we can't build a right stream with 0 rows,
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the if not right_rows: branch, right_stream = self._materialize_to_stream(right_rows) if right_rows else None will always evaluate to None (right_rows is known empty here), making the later static_process(left_stream, right_stream) path unreachable. Consider removing this dead code and clarifying the intended semantics for empty-right inputs in channel execution (and updating the docstring accordingly).

Copilot uses AI. Check for mistakes.
DESIGN_ISSUES.md Outdated
Comment on lines +273 to +275
- `Join` — symmetric hash join (kept barrier: complex system-tag name-extending logic)
- `MergeJoin` — same approach (kept barrier: complex column-merging logic)
- `SemiJoin` — build right, stream left through hash lookup ✅
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DESIGN_ISSUES.md still states that Join is “kept barrier” due to system-tag logic, but this PR adds a native async_execute + symmetric hash join implementation in src/orcapod/core/operators/join.py. Please update this status text (and the “Remaining” list below) to reflect the current implementation and/or its limitations (e.g., barrier fallback when system tags are present).

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +231 to +241
# Merge both channels into one tagged queue
_SENTINEL = object()
queue: asyncio.Queue = asyncio.Queue()

async def _drain(
ch: ReadableChannel[tuple[TagProtocol, PacketProtocol]],
side: int,
) -> None:
async for item in ch:
await queue.put((side, item))
await queue.put((side, _SENTINEL))
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The symmetric hash join uses an unbounded asyncio.Queue(). This can bypass channel backpressure: a fast producer can drain bounded input channels into an unbounded queue, growing memory without bound. Use a bounded queue (asyncio.Queue(maxsize=...)) sized to the configured channel buffers (or another conservative bound) so upstream backpressure is preserved.

Copilot uses AI. Check for mistakes.
Comment on lines +228 to +233
"""
from orcapod.core.datagrams import Packet, Tag

# Merge both channels into one tagged queue
_SENTINEL = object()
queue: asyncio.Queue = asyncio.Queue()
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_symmetric_hash_join imports Packet/Tag but doesn't use them (the actual construction happens in _merge_row_pair). Removing the unused import avoids lint failures.

Copilot uses AI. Check for mistakes.
Comment on lines +146 to +157
# Without data we can't determine common keys from row
# objects alone, so fall back to barrier mode.
left_rows = await left_ch.collect()
if not left_rows:
return
left_stream = self._materialize_to_stream(left_rows)
right_stream = self._materialize_to_stream(right_rows) if right_rows else None
if right_stream is None:
# No right data at all — need to check schemas.
# Empty right with common keys → empty result.
# Since we can't build a right stream with 0 rows,
# just pass left through (safe: no filter rows = no filter).
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SemiJoin.binary_static_process returns an empty result when there are common keys and the right side is empty, but async_execute currently passes the left stream through when right_rows is empty (because it can't infer schema from data). This creates a concrete sync/async semantic mismatch. Since OperatorNode calls validate_inputs(*streams) at construction time, consider computing/storing the common-key set during validation (from stream schemas) and using that here: if stored common_keys is non-empty, return empty when the right channel is empty; if it’s empty, pass left through.

Suggested change
# Without data we can't determine common keys from row
# objects alone, so fall back to barrier mode.
left_rows = await left_ch.collect()
if not left_rows:
return
left_stream = self._materialize_to_stream(left_rows)
right_stream = self._materialize_to_stream(right_rows) if right_rows else None
if right_stream is None:
# No right data at all — need to check schemas.
# Empty right with common keys → empty result.
# Since we can't build a right stream with 0 rows,
# just pass left through (safe: no filter rows = no filter).
#
# If validation has already computed a common-key set based on
# stream schemas, we can use that here to preserve sync/async
# semantic parity: a non-empty common-key set implies the
# semi-join must yield an empty result when the right side is
# empty.
validated_common_keys = getattr(self, "_validated_common_keys", None)
if validated_common_keys:
# Known common keys and no right rows → empty result.
return
# Without a known common-key set we can't safely decide based
# on schemas here, so fall back to barrier mode and pass the
# left stream through (preserving existing behavior).
left_rows = await left_ch.collect()
if not left_rows:
return
left_stream = self._materialize_to_stream(left_rows)
right_stream = self._materialize_to_stream(right_rows) if right_rows else None
if right_stream is None:
# No right data at all, and no known common-key set from
# validation — treat this as "no filter" and pass left
# through.

Copilot uses AI. Check for mistakes.
Comment on lines +8 to +13
from orcapod.channels import ReadableChannel, WritableChannel

from orcapod import contexts
from orcapod.channels import Channel, ReadableChannel, WritableChannel
from orcapod.config import Config
from orcapod.core.static_output_pod import StaticOutputPod
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PersistentOperatorNode imports ReadableChannel/WritableChannel twice (both directly and via the later Channel, ReadableChannel, WritableChannel import). This is redundant and will trip linters; keep a single channels import statement.

Copilot uses AI. Check for mistakes.

async def forward() -> None:
async for item in intermediate.reader:
collected.append(item)
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In PersistentOperatorNode.async_execute, forward() always appends to collected, even when cache_mode is OFF. For large streams this defeats the streaming/memory benefits of async execution; only accumulate results when cache_mode == LOG.

Suggested change
collected.append(item)
if self._cache_mode == CacheMode.LOG:
collected.append(item)

Copilot uses AI. Check for mistakes.
Comment on lines +202 to +207
# N > 2: concurrent collection + static_process
all_rows = await asyncio.gather(*(ch.collect() for ch in inputs))
streams = [self._materialize_to_stream(rows) for rows in all_rows]
result = self.static_process(*streams)
for tag, packet in result.iter_packets():
await output.send((tag, packet))
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Join.async_execute (N>2 inputs) materializes every collected input via _materialize_to_stream(rows), but _materialize_to_stream([]) raises ValueError on empty input. If any input channel is empty, the join result should be empty rather than raising; add an explicit empty-input short-circuit (or create an empty stream with the correct schema) before materializing.

Copilot uses AI. Check for mistakes.
logger.info("Checking for cache...")
output_packet = self.get_cached_output_for_packet(packet)
if output_packet is not None:
logger.info(f"Cache hit for {packet}!")
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proposed implementation of CachedPacketFunction.async_call logs the entire packet object on cache hits (logger.info(f"Cache hit for {packet}!")), which will serialize and write all packet fields to application logs. Because Packet.__repr__ expands to the full data dict, any sensitive values carried in packets (PII, secrets, tokens) will be exposed to log files or centralized logging systems. To avoid this, the async and sync cache paths should log only non-sensitive identifiers (e.g., content hash, record ID, or a redacted summary) or make detailed packet logging opt-in under a debug-level flag instead of default INFO-level logging.

Suggested change
logger.info(f"Cache hit for {packet}!")
logger.info("Cache hit for packet")

Copilot uses AI. Check for mistakes.
claude added 16 commits March 5, 2026 01:29
…rators

Replace barrier-based synchronous async_execute with true streaming
implementations:

- SelectTagColumns, SelectPacketColumns, DropTagColumns, DropPacketColumns:
  per-row streaming with lazy column computation on first row
- MapTags, MapPackets: per-row streaming with lazy rename map on first row
- Batch: accumulate-and-emit for batch_size>0, barrier for batch_size=0
- SemiJoin: build-probe pattern (collect right, stream left through hash)
- Join (2 inputs): symmetric hash join via merged asyncio.Queue with
  concurrent reads from both channels
- Join (N>2 inputs): concurrent collection + static_process
- PolarsFilter, MergeJoin: kept barrier mode (require full materialization)

Includes 67 comprehensive tests mirroring sync operator test patterns,
covering functional correctness, data preservation, empty input handling,
sync/async equivalence, edge cases, and multi-stage pipeline integration.

All 283 tests pass (67 new + 36 existing async + 180 sync operators).

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
… comparison

Expand the Asynchronous Execution section with detailed descriptions of
each operator async strategy (per-row streaming, accumulate-and-emit,
build-probe, symmetric hash join, barrier mode), algorithm rationale,
and a comparison table against Kafka Streams and Apache Flink.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
The streaming async_execute overrides for Select/Drop Tag/Packet columns
were missing the strict-mode validation that the sync paths perform via
validate_unary_input. This caused two regression test failures:

1. SelectPacketColumns with nonexistent column + strict=True no longer
   raised InputValidationError — fixed by checking on first row.
2. Empty input no longer raised ValueError (from _materialize_to_stream)
   — this is actually better behavior (graceful handling), so the test
   is updated to expect clean completion instead of an error.

Also added strict validation to SelectTagColumns, DropTagColumns, and
DropPacketColumns streaming paths for consistency.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Detailed implementation plan for adding async_execute to FunctionNode,
PersistentFunctionNode, OperatorNode, and PersistentOperatorNode, plus
CachedPacketFunction.async_call with cache support.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…t unification

Rewrites the async Node plan to establish process_packet and
async_process_packet as the single per-packet entry point across
FunctionPod, FunctionNode, and PersistentFunctionNode. Details
all call chains, the CachedPacketFunction async_call gap,
OperatorNode delegation pattern, and PersistentOperatorNode
TaskGroup-based forwarding with post-hoc DB storage.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…cket, remove _execute_concurrent

Updates the plan to fully route _iter_packets_concurrent through
async_process_packet (with sync process_packet fallback in event loop).
Removes _execute_concurrent module-level helper. FunctionNode's concurrent
path uses self.async_process_packet so PersistentFunctionNode's cache +
pipeline record logic kicks in via polymorphism.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
- Add _FunctionPodBase.async_process_packet as async counterpart of
  process_packet
- Fix FunctionPod.async_execute to route through async_process_packet
  instead of calling packet_function.async_call directly
- Rewrite FunctionPodStream._iter_packets_concurrent and
  FunctionNode._iter_packets_concurrent to route through
  process_packet/async_process_packet (with event-loop fallback)
- Remove _execute_concurrent module-level helper
- Add FunctionNode.process_packet + async_process_packet (delegate to pod)
- Add FunctionNode.async_execute (sequential streaming)
- Add CachedPacketFunction.async_call with cache check + recording
- Add PersistentFunctionNode.async_process_packet (cache + pipeline records)
- Add PersistentFunctionNode.async_execute (two-phase: replay cached,
  then compute missing)
- Add OperatorNode.async_execute (pass-through delegation)
- Extract PersistentOperatorNode._store_output_stream from
  _compute_and_store for reuse by async path
- Add PersistentOperatorNode.async_execute (TaskGroup-based forwarding
  with post-hoc DB storage for LOG mode, DB replay for REPLAY mode)
- Add 27 tests covering protocol conformance, CachedPacketFunction
  async_call, all Node async_execute variants, process_packet routing,
  and end-to-end async pipelines
- Log semantic hasher union type bug (H1) in DESIGN_ISSUES.md

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Add TestAsyncPipelineThenSyncRetrieval with three concrete examples:
- PersistentFunctionNode: async execute, then sync get_all_records
- PersistentOperatorNode (LOG): async execute, then sync retrieval + REPLAY
- Multi-stage pipeline: Source → FunctionNode → OperatorNode, both
  persistent, with sync retrieval from each stage's DB

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…egration test

Integration test demonstrating the recommended workflow:
1. Define functions with @function_pod decorator
2. Build pipeline with Pipeline context manager (auto-compiles)
3. Execute asynchronously via TaskGroup + async_execute channels
4. Retrieve results synchronously from pipeline database

Covers: compile verification, sync baseline, async streaming, async→sync
DB retrieval, and sync/async equivalence.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…cess_packet

Dev's FunctionNode.async_execute was calling _packet_function.call()
directly, bypassing async_process_packet. This broke the design
invariant that all per-packet processing routes through
process_packet (sync) / async_process_packet (async), which is the
extension point that subclasses like PersistentFunctionNode override
for DB-backed caching and pipeline record storage.

Now FunctionNode.async_execute delegates to self.async_process_packet,
consistent with FunctionPod.async_execute and
PersistentFunctionNode.async_execute.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…mple

Single narrative script: define @function_pod → build Pipeline →
run AsyncPipelineOrchestrator → verify streamed output. Also tests
Pipeline.run(ExecutorType.ASYNC_CHANNELS), run_async() from an
existing event loop, sync DB retrieval, and sync/async equivalence.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
The orchestrator now walks Pipeline._node_graph (persistent nodes)
instead of GraphTracker._node_lut (non-persistent nodes).  This means
async execution writes results to the pipeline databases via the
persistent nodes themselves (PersistentFunctionNode, PersistentOperatorNode).

After orchestrator.run(pipeline), callers retrieve data the same way
as after sync execution: pipeline.<label>.get_all_records().

Changes:
- AsyncPipelineOrchestrator: accept Pipeline, walk _node_graph,
  return None instead of StreamProtocol
- PersistentSourceNode: add async_execute (materialize to cache DB
  then push cached rows to output channel)
- Update all orchestrator tests to use Pipeline and verify via
  get_all_records()
- Rewrite integration test for the new DB-retrieval pattern

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Join:
- Replace streaming symmetric hash join with barrier-mode delegation
  to static_process for correct system-tag name-extending logic
- Add empty-input guard to prevent ValueError from _materialize_to_stream([])
- Remove _merge_row_pair and _symmetric_hash_join (unused after rewrite)

SemiJoin:
- Fix empty-right semantic mismatch: store common keys during validation
  and use them in async_execute to match sync behavior (empty right +
  common keys → empty result)
- Remove dead code in empty-right branch

operator_node.py:
- Remove duplicate ReadableChannel/WritableChannel imports
- Only collect results in PersistentOperatorNode.async_execute forward()
  when cache_mode is LOG, preserving streaming benefits when OFF

DESIGN_ISSUES.md:
- Update Join status from "kept barrier" to reflect new async override
  with static_process delegation

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
… tags

Re-implements the symmetric hash join for 2-input async execution
with proper system-tag name-extending logic:

- Store per-input system-tag suffixes (pipeline_hash:canonical_position)
  during validate_nonzero_inputs, making them available to async_execute
  without needing stream objects
- _merge_row_pair now renames system-tag keys by appending the correct
  suffix via BLOCK_SEPARATOR, matching static_process behavior exactly
- Use bounded asyncio.Queue(maxsize=64) to preserve backpressure
- Keep barrier fallback (collect + static_process) for N>2 inputs with
  empty-input guard

Verified: sync and async produce identical system-tag column names.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
Operator-level tests (TestJoinSystemTagEquivalence):
- Two-way join: system-tag column names match sync exactly
- Two-way join: system-tag values match sync exactly
- Two-way join: suffixes contain pipeline_hash and canonical position
- Commutativity: Join(A,B) and Join(B,A) produce identical system tags
- Three-way join (N>2 barrier): system tags match sync

Operator-level tests (TestSemiJoinSystemTagEquivalence):
- SemiJoin preserves left-side system tags identically in both paths

Pipeline-level tests (TestSyncAsyncSystemTagEquivalence):
- End-to-end pipeline: system-tag columns match between sync and async
- System-tag column names follow name-extending convention (:N suffix)
- All system-tag column values match row-by-row

Also adds run_binary_validated helper that calls validate_inputs
before async_execute, matching the OperatorNode contract.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
…ing state

Replace the pattern of storing _async_system_tag_suffixes on the Join
operator during validate_nonzero_inputs with a cleaner approach: pass
input_pipeline_hashes directly to async_execute as a keyword argument.

OperatorNode/PersistentOperatorNode compute hashes from their input
streams and pass them through. Join._compute_system_tag_suffixes derives
canonical suffixes from the hashes at execution time.

This eliminates hidden mutable state on the operator and makes the data
flow explicit: channels + their corresponding pipeline hashes travel
together through the same call.

https://claude.ai/code/session_01TmKbk8PSQGLoMkNi9DETtY
@eywalker eywalker force-pushed the claude/native-async-operators-4Ed8f branch from 8449417 to 5850c1e Compare March 5, 2026 01:31
@eywalker eywalker merged commit 9ee9383 into dev Mar 5, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants