17 changes: 17 additions & 0 deletions .claude/hooks/session-start.sh
@@ -0,0 +1,17 @@
#!/bin/bash
set -euo pipefail

# Only run in remote (Claude Code on the web) environments
if [ "${CLAUDE_CODE_REMOTE:-}" != "true" ]; then
    exit 0
fi

# Install system dependencies required by pygraphviz
if ! dpkg -s libgraphviz-dev >/dev/null 2>&1; then
    sudo apt-get update -qq
    sudo apt-get install -y -qq graphviz libgraphviz-dev >/dev/null 2>&1
fi

# Install Python dependencies using uv
cd "$CLAUDE_PROJECT_DIR"
uv sync --group dev
14 changes: 14 additions & 0 deletions .claude/settings.json
@@ -0,0 +1,14 @@
{
  "hooks": {
    "SessionStart": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "$CLAUDE_PROJECT_DIR/.claude/hooks/session-start.sh"
          }
        ]
      }
    ]
  }
}
39 changes: 39 additions & 0 deletions .github/workflows/docs.yml
@@ -0,0 +1,39 @@
name: Deploy docs

on:
  push:
    branches: [main]
  workflow_dispatch:

permissions:
  contents: read
  pages: write
  id-token: write

concurrency:
  group: "pages"
  cancel-in-progress: false

jobs:
  deploy:
    runs-on: ubuntu-latest
    environment:
      name: github-pages
      url: ${{ steps.deployment.outputs.page_url }}
    steps:
      - uses: actions/checkout@v4

      - uses: astral-sh/setup-uv@v4

      - name: Install docs dependencies
        run: uv sync --group docs
Comment on lines +26 to +29

Copilot AI — Mar 4, 2026

The docs deployment workflow installs the full project (not just MkDocs), so it likely needs the same CI prerequisites as run-tests.yml (notably graphviz/libgraphviz-dev for pygraphviz, and an explicit Python version via actions/setup-python). Also consider using `uv sync --locked --group docs` to ensure the build fails if uv.lock is out of date, matching the reproducibility guarantees used in the test workflow.

Suggested change:
-      - uses: astral-sh/setup-uv@v4
-      - name: Install docs dependencies
-        run: uv sync --group docs
+      - uses: actions/setup-python@v5
+        with:
+          python-version: "3.11"
+      - uses: astral-sh/setup-uv@v4
+      - name: Install system dependencies
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y graphviz libgraphviz-dev
+      - name: Install docs dependencies
+        run: uv sync --locked --group docs
      - name: Build documentation
        run: uv run mkdocs build

      - uses: actions/upload-pages-artifact@v3
        with:
          path: site/

      - id: deployment
        uses: actions/deploy-pages@v4
3 changes: 3 additions & 0 deletions .gitignore
@@ -221,3 +221,6 @@ dj_*_conf.json
# pixi environments
.pixi/*
!.pixi/config.toml

# Nested repo clone (pre-existing artifact)
orcapod-python/
160 changes: 132 additions & 28 deletions .zed/rules
@@ -1,3 +1,8 @@
## Naming convention

Always write "orcapod" with a lowercase p — never "OrcaPod" or "Orcapod". This applies
everywhere: documentation, docstrings, commit messages, and code comments.

## Running commands

Always run Python commands via `uv run`, e.g.:
@@ -7,6 +12,14 @@ Always run Python commands via `uv run`, e.g.:

Never use `python`, `pytest`, or `python3` directly.

## Branch hygiene

Periodically check the target branch (typically dev) for updates and incorporate them into
your working branch. Before pushing, fetch and rebase onto the latest target branch to avoid
divergence and merge conflicts. If cherry-picking is needed due to unrelated commit history,
prefer cherry-picking your commits onto a fresh branch from the target rather than resolving
massive rebase conflicts.

## Updating agent instructions

When adding or changing any instruction, update BOTH:
@@ -57,37 +70,49 @@ Examples:
## Project layout

src/orcapod/
types.py — Schema, ColumnConfig, ContentHash
types.py — Schema, ColumnConfig, ContentHash, PipelineConfig,
NodeConfig, ExecutorType, CacheMode
system_constants.py — Column prefixes and separators
errors.py — InputValidationError, DuplicateTagError, FieldNotResolvableError
config.py — Config dataclass
channels.py — Async channel primitives (Channel, BroadcastChannel,
ReadableChannel, WritableChannel, ChannelClosed)
contexts/ — DataContext (semantic_hasher, arrow_hasher, type_converter)
protocols/
hashing_protocols.py — PipelineElementProtocol, ContentIdentifiableProtocol
database_protocols.py — ArrowDatabaseProtocol
pipeline_protocols.py — Pipeline-level protocols
semantic_types_protocols.py — TypeConverterProtocol
core_protocols/ — StreamProtocol, PodProtocol, SourceProtocol,
PacketFunctionProtocol, DatagramProtocol, TagProtocol,
PacketProtocol, TrackerProtocol
PacketProtocol, TrackerProtocol, AsyncExecutableProtocol,
PacketFunctionExecutorProtocol, OperatorPodProtocol,
LabelableProtocol, TemporalProtocol, TraceableProtocol
core/
base.py — ContentIdentifiableBase, PipelineElementBase, TraceableBase
static_output_pod.py — StaticOutputPod (operator base), DynamicPodStream
function_pod.py — FunctionPod, FunctionPodStream, FunctionNode
base.py — LabelableMixin, DataContextMixin, TraceableBase
function_pod.py — FunctionPod, FunctionPodStream, @function_pod decorator
packet_function.py — PacketFunctionBase, PythonPacketFunction, CachedPacketFunction
operator_node.py — OperatorNode (DB-backed operator execution)
tracker.py — Invocation tracking
tracker.py — BasicTrackerManager, GraphTracker
datagrams/
datagram.py — Datagram (unified dict/Arrow backing, lazy conversion)
tag_packet.py — Tag (+ system tags), Packet (+ source info)
sources/
base.py — RootSource (abstract, no upstream)
arrow_table_source.py — Core source — all other sources delegate to it
derived_source.py — DerivedSource (backed by FunctionNode/OperatorNode DB)
persistent_source.py — PersistentSource (DB-backed caching wrapper)
derived_source.py — DerivedSource (backed by node DB records)
csv_source.py, dict_source.py, list_source.py,
data_frame_source.py, delta_table_source.py — Delegating wrappers
source_registry.py — SourceRegistry for provenance resolution
streams/
base.py — StreamBase (abstract)
arrow_table_stream.py — ArrowTableStream (concrete, immutable)
nodes/
function_node.py — FunctionNode, PersistentFunctionNode
operator_node.py — OperatorNode, PersistentOperatorNode
source_node.py — SourceNode (leaf stream in graph)
operators/
static_output_pod.py — StaticOutputOperatorPod, DynamicPodStream
base.py — UnaryOperator, BinaryOperator, NonZeroInputOperator
join.py — Join (N-ary inner join, commutative)
merge_join.py — MergeJoin (binary, colliding cols → sorted list[T])
@@ -96,27 +121,61 @@ src/orcapod/
column_selection.py — Select/Drop Tag/Packet columns
mappers.py — MapTags, MapPackets (rename columns)
filters.py — PolarsFilter
executors/
base.py — PacketFunctionExecutorBase (ABC)
local.py — LocalExecutor (default in-process)
ray.py — RayExecutor (dispatch to Ray cluster)
pipeline/
graph.py — Pipeline (extends GraphTracker, compiles to persistent nodes)
nodes.py — PersistentSourceNode (DB-backed leaf wrapper)
orchestrator.py — AsyncPipelineOrchestrator (channel-based concurrent execution)
hashing/
semantic_hashing/ — BaseSemanticHasher, type handlers
semantic_types/ — Type conversion (Python ↔ Arrow)
databases/ — ArrowDatabaseProtocol implementations (Delta Lake, in-memory)
file_hashers.py — BasicFileHasher, CachedFileHasher
arrow_hashers.py — Arrow-specific hashing
arrow_serialization.py — Arrow serialization utilities
arrow_utils.py — Arrow manipulation for hashing
defaults.py — Factory functions for default hashers
hash_utils.py — hash_file(), get_function_components()
string_cachers.py — String caching strategies
versioned_hashers.py — Versioned hasher support
visitors.py — Visitor pattern for hashing
semantic_hashing/ — BaseSemanticHasher, type handlers, TypeHandlerRegistry
semantic_types/ — Type conversion (Python ↔ Arrow), UniversalTypeConverter,
SemanticTypeRegistry, type inference
databases/
delta_lake_databases.py — DeltaTableDatabase
in_memory_databases.py — InMemoryArrowDatabase
noop_database.py — NoOpArrowDatabase
file_utils.py — File utilities for database operations
execution_engines/
ray_execution_engine.py — RayEngine (execution on Ray clusters)
utils/
arrow_data_utils.py — System tag manipulation, source info, column helpers
arrow_utils.py — Arrow table utilities
schema_utils.py — Schema extraction, union, intersection, compatibility
lazy_module.py — LazyModule for deferred heavy imports
function_info.py — Function introspection utilities
git_utils.py — Git metadata extraction
name.py — Name utilities
object_spec.py — Object specification/serialization
polars_data_utils.py — Polars-specific utilities

tests/
test_core/
datagrams/ — Lazy conversion, dict/Arrow round-trip
sources/ — Source construction, protocol conformance, DerivedSource
streams/ — ArrowTableStream behavior
function_pod/ — FunctionPod, FunctionNode, pipeline hash integration
sources/ — Source construction, protocol conformance, DerivedSource,
PersistentSource
streams/ — ArrowTableStream behavior, convenience methods
function_pod/ — FunctionPod, FunctionNode, pipeline hash integration,
@function_pod decorator
operators/ — All operators, OperatorNode, MergeJoin
packet_function/ — PacketFunction, CachedPacketFunction
test_hashing/ — Semantic hasher, hash stability
packet_function/ — PacketFunction, CachedPacketFunction, executor
test_channels/ — Async channels, async_execute for operators/nodes/pods,
native async operators, pipeline integration
test_pipeline/ — Pipeline compilation, AsyncPipelineOrchestrator
test_hashing/ — Semantic hasher, hash stability, file hashers, string cachers
test_databases/ — Delta Lake, in-memory, no-op databases
test_semantic_types/ — Type converter tests
test_semantic_types/ — Type converter, semantic registry, struct converters

---

Expand All @@ -126,8 +185,12 @@ See orcapod-design.md at the project root for the full design specification.

### Core data flow

Pull-based (synchronous):
RootSource → ArrowTableStream → [Operator / FunctionPod] → ArrowTableStream → ...

Push-based (async pipeline):
Pipeline.compile() → AsyncPipelineOrchestrator.run() → channels → persistent nodes → DB

Every stream is an immutable sequence of (Tag, Packet) pairs backed by a PyArrow Table.
Tag columns are join keys and metadata; packet columns are the data payload.

@@ -143,22 +206,53 @@ Key methods: output_schema(), keys(), iter_packets(), as_table().

Source (core/sources/) — produces a stream from external data. ArrowTableSource is the core
implementation; CSV/Delta/DataFrame/Dict/List sources all delegate to it internally. Each
source adds source-info columns and a system tag column. DerivedSource wraps a
FunctionNode/OperatorNode's DB records as a new source.
source adds source-info columns and a system tag column. DerivedSource wraps a node's DB
records as a new source. PersistentSource wraps any RootSource with DB-backed caching
(deduped by per-row content hash).
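The per-row dedup mechanic can be sketched in a few lines. This is a minimal illustration, not the real orcapod API: `TinyPersistentSource`, `content_hash`, and the dict-backed store are all hypothetical stand-ins for the Arrow database and semantic hasher.

```python
import hashlib
import json


def content_hash(row: dict) -> str:
    # Stable digest of a row's contents (stand-in for orcapod's hasher).
    canonical = json.dumps(row, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


class TinyPersistentSource:
    """Sketch of DB-backed caching: new rows are deduped by content hash,
    and reads return the union of all cached rows across runs."""

    def __init__(self, upstream_rows: list[dict]):
        self._upstream = upstream_rows
        self._db: dict[str, dict] = {}  # stand-in for the Arrow database

    def run(self) -> None:
        for row in self._upstream:
            # setdefault skips rows whose content hash is already cached.
            self._db.setdefault(content_hash(row), row)

    def read(self) -> list[dict]:
        # Union of everything ever cached, across all runs.
        return list(self._db.values())
```

Because the cache key is the row's content rather than its position, re-running against overlapping upstream data only appends genuinely new rows.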

Function Pod (core/function_pod.py) — wraps a PacketFunction that transforms individual
packets. Never inspects tags. Two execution models:
- FunctionPod → FunctionPodStream: lazy, in-memory
- FunctionNode: DB-backed, two-phase (yield cached results first, then compute missing)
packets. Never inspects tags. Supports async functions via PythonPacketFunction. The
@function_pod decorator creates FunctionPod instances directly from Python functions.
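The decorator shape described above can be sketched as follows. `MiniFunctionPod` is a hypothetical toy, not the real FunctionPod; the point is only that the decorator returns a pod object wrapping the function, and that the pod applies it per packet without ever seeing tags.

```python
import asyncio
import inspect


class MiniFunctionPod:
    """Toy pod wrapping a per-packet function; never inspects tags."""

    def __init__(self, fn):
        self.fn = fn
        self.is_async = inspect.iscoroutinefunction(fn)

    def process(self, packets: list[dict]) -> list[dict]:
        # Apply the wrapped function to each packet, transparently
        # supporting both sync and async packet functions.
        if self.is_async:
            return [asyncio.run(self.fn(p)) for p in packets]
        return [self.fn(p) for p in packets]


def function_pod(fn):
    # Sketch of the decorator: the decorated name becomes a pod, not a function.
    return MiniFunctionPod(fn)


@function_pod
def double(packet: dict) -> dict:
    return {k: v * 2 for k, v in packet.items()}
```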

Node (core/nodes/) — graph-aware wrappers that participate in the computation DAG:
- SourceNode — leaf stream in the graph (wraps a StreamProtocol)
- FunctionNode / PersistentFunctionNode — packet function invocations (persistent variant
is DB-backed with two-phase execution: yield cached, then compute missing)
- OperatorNode / PersistentOperatorNode — operator invocations (persistent variant
is DB-backed with deduplication)
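The two-phase execution the persistent variants share can be sketched as a generator. A dict stands in for the shared DB table; `compute` and `key` are hypothetical parameters, not the real node interface.

```python
def two_phase_execute(inputs, db: dict, compute, key):
    """Sketch of persistent-node execution: Phase 1 yields every record
    already in the DB table; Phase 2 computes only missing inputs."""
    # Phase 1: yield all cached records first.
    yield from db.values()
    # Phase 2: compute, cache, and yield only inputs whose key is absent.
    for inp in inputs:
        k = key(inp)
        if k not in db:
            db[k] = compute(inp)
            yield db[k]
```

Note that Phase 1 yields everything in the table, mirroring the detail below that Phase 1 returns ALL records in the shared pipeline_path table, not just records for the current inputs.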

Operator (core/operators/) — structural pod transforming streams without synthesizing new
packet values. All subclass StaticOutputPod:
packet values. All subclass StaticOutputOperatorPod. Each operator also implements
AsyncExecutableProtocol for push-based channel execution:
- UnaryOperator — 1 input (Batch, Select/Drop columns, Map, Filter)
- BinaryOperator — 2 inputs (MergeJoin, SemiJoin)
- NonZeroInputOperator — 1+ inputs (Join)

OperatorNode (core/operator_node.py) — DB-backed operator execution, analogous to
FunctionNode.
Executor (core/executors/) — pluggable execution backends for packet functions:
- LocalExecutor — default in-process execution
- RayExecutor — dispatches to a Ray cluster
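The executor boundary can be sketched as a small ABC. The names below are illustrative only; a thread pool stands in for a remote backend, since the real RayExecutor dispatches to a Ray cluster.

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class PacketFunctionExecutor(ABC):
    """Sketch of the pluggable boundary: pods hand packets to an executor
    and never care where the function actually runs."""

    @abstractmethod
    def execute(self, fn, packets: list) -> list: ...


class LocalExecutor(PacketFunctionExecutor):
    """Default: run the packet function in-process."""

    def execute(self, fn, packets):
        return [fn(p) for p in packets]


class ThreadedExecutor(PacketFunctionExecutor):
    """Stand-in for an out-of-process backend."""

    def execute(self, fn, packets):
        with ThreadPoolExecutor() as pool:
            return list(pool.map(fn, packets))
```

Swapping backends changes only which `PacketFunctionExecutor` a pod is configured with, not the pod code itself.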

Channel (channels.py) — async primitives for push-based pipeline execution:
- Channel[T] — bounded async channel with backpressure and close signaling
- BroadcastChannel[T] — fan-out channel for multiple consumers
- ReadableChannel[T] / WritableChannel[T] — consumer/producer protocols
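The core channel semantics (a bounded buffer for backpressure plus a close signal that lets consumers drain and stop) can be sketched on top of asyncio.Queue. `MiniChannel` is a toy, not orcapod's Channel.

```python
import asyncio


class ChannelClosed(Exception):
    pass


class MiniChannel:
    """Sketch of a bounded channel: maxsize gives backpressure; close()
    lets consumers drain remaining items and then stop."""

    _DONE = object()  # sentinel marking the closed end of the stream

    def __init__(self, maxsize: int = 8):
        self._q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def send(self, item):
        await self._q.put(item)  # blocks when the buffer is full

    async def close(self):
        await self._q.put(self._DONE)

    async def receive(self):
        item = await self._q.get()
        if item is self._DONE:
            raise ChannelClosed
        return item

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.receive()
        except ChannelClosed:
            raise StopAsyncIteration
```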

Pipeline (pipeline/) — persistent, async-capable pipeline infrastructure:
- Pipeline — extends GraphTracker; records invocations during a with block, then compile()
replaces every node with its persistent variant (leaf streams → PersistentSourceNode,
function nodes → PersistentFunctionNode, operator nodes → PersistentOperatorNode)
- AsyncPipelineOrchestrator — executes a compiled pipeline using channels; walks the
persistent node graph in topological order, creates bounded channels between nodes,
launches all nodes concurrently via asyncio.TaskGroup

### Async execution model

All pipeline nodes implement AsyncExecutableProtocol:
async def async_execute(inputs, output) → None

The orchestrator wires channels between nodes and launches tasks without knowing node types.
PipelineConfig controls buffer sizes (channel_buffer_size) and concurrency limits
(default_max_concurrency). Per-node overrides are set via NodeConfig.
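The orchestration idea can be sketched for a linear pipeline. Everything here is hypothetical: plain asyncio.Queue stands in for channels, `None` for the close signal, and asyncio.gather for the TaskGroup that the real orchestrator uses; fan-out via BroadcastChannel is omitted.

```python
import asyncio


class AddOneNode:
    """Toy node implementing the async_execute(inputs, output) shape."""

    async def async_execute(self, inputs, output):
        if not inputs:
            # Leaf node: emit a small fixed stream, then signal close.
            for i in range(3):
                await output.put(i)
            await output.put(None)
            return
        (inp,) = inputs
        while (item := await inp.get()) is not None:
            await output.put(item + 1)
        await output.put(None)


async def run_pipeline(nodes):
    """Sketch: wire a bounded queue between consecutive nodes in
    topological order, then launch every node concurrently without
    the orchestrator knowing any node's concrete type."""
    channels = [asyncio.Queue(maxsize=4) for _ in nodes]
    tasks = []
    for i, node in enumerate(nodes):
        inputs = [channels[i - 1]] if i > 0 else []
        tasks.append(node.async_execute(inputs, channels[i]))
    await asyncio.gather(*tasks)
    # Drain the final channel for the caller.
    out, final = [], channels[-1]
    while (item := await final.get()) is not None:
        out.append(item)
    return out
```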

### Strict operator / function pod boundary

@@ -199,18 +293,28 @@ Prefixes are computed from SystemConstant in system_constants.py.
- LazyModule("pyarrow") — deferred import for heavy deps. Used in
if TYPE_CHECKING: / else: blocks.
- Argument symmetry — operators return frozenset (commutative) or tuple (ordered).
- StaticOutputPod.process() → DynamicPodStream — wraps static_process() with staleness
detection and automatic recomputation.
- StaticOutputOperatorPod.process() → DynamicPodStream — wraps static_process() with
staleness detection and automatic recomputation.
- Source delegation — CSVSource, DictSource, etc. create an internal ArrowTableSource.
- Pipeline context manager — records non-persistent nodes during with block, then compile()
promotes them to persistent variants with DB backing.
- AsyncExecutableProtocol — unified interface for all pipeline nodes. The orchestrator
wires channels and launches tasks without knowing node types.
- GraphTracker — tracks operator/function pod invocations in a NetworkX DAG; Pipeline
extends it to add compilation and persistence.

### Important implementation details

- ArrowTableSource raises ValueError if any tag_columns are not in the table.
- ArrowTableStream requires at least one packet column; raises ValueError otherwise.
- FunctionNode Phase 1 returns ALL records in the shared pipeline_path DB table.
- PersistentFunctionNode Phase 1 returns ALL records in the shared pipeline_path DB table.
Phase 2 skips inputs whose hash is already in the DB.
- Empty data → ArrowTableSource raises ValueError("Table is empty").
- DerivedSource before run() → raises ValueError (no computed records).
- Join requires non-overlapping packet columns; raises InputValidationError on collision.
- MergeJoin requires colliding columns to have identical types; merges into sorted list[T].
- Operators predict output schema (including system tag names) without computation.
- CachedFileHasher uses mtime+size cache busting to detect file changes without re-hashing.
- PersistentSource cache is always on; returns the union of all cached data across runs.
- AsyncPipelineOrchestrator uses BroadcastChannel for fan-out (one node feeding multiple
downstream consumers).
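The mtime+size cache-busting mentioned above can be sketched as follows. `MiniCachedFileHasher` is a hypothetical stand-in for CachedFileHasher: it re-hashes a file only when its (mtime_ns, size) pair changes.

```python
import os


class MiniCachedFileHasher:
    """Sketch: skip the expensive content hash when a file's
    (mtime_ns, size) stamp is unchanged since the last call."""

    def __init__(self, hash_file):
        self._hash_file = hash_file  # expensive content-hash function
        self._cache = {}             # path -> ((mtime_ns, size), digest)

    def hash(self, path: str) -> str:
        st = os.stat(path)
        stamp = (st.st_mtime_ns, st.st_size)
        cached = self._cache.get(path)
        if cached and cached[0] == stamp:
            return cached[1]  # file unchanged: reuse cached digest
        digest = self._hash_file(path)
        self._cache[path] = (stamp, digest)
        return digest
```

The trade-off is the usual one: a file rewritten with identical mtime and size would be missed, but in exchange unchanged files cost one `stat` instead of a full read.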