Define the directory layout, metadata.json schema, and per-entity Parquet file format for dumping and restoring subgraph data.
Add `OidValue::Int4Range(Bound<i32>, Bound<i32>)` variant to properly deserialize PostgreSQL int4range columns (OID 3904). Update `select_cols` in dsl.rs to bind block_range as `Range<Integer>` instead of using the Bytes placeholder, resolving the existing TODO.
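The deserialization can be sketched with `std::ops::Bound` alone. This is a hypothetical stand-alone parser for the textual form of an int4range (e.g. `[0,2147483647)`), not the actual graph-node decoder, which works through diesel's type machinery; the `parse_int4range` name and the text-protocol assumption are mine:

```rust
use std::ops::Bound;

// Sketch: parse the textual form of a Postgres int4range, e.g. "[0,100)",
// into the pair of bounds an OidValue::Int4Range(Bound<i32>, Bound<i32>)
// variant would carry. Empty ranges and the binary protocol are not handled.
fn parse_int4range(s: &str) -> Option<(Bound<i32>, Bound<i32>)> {
    if s.len() < 2 {
        return None;
    }
    let bytes = s.as_bytes();
    let lower_inclusive = match *bytes.first()? {
        b'[' => true,
        b'(' => false,
        _ => return None,
    };
    let upper_inclusive = match *bytes.last()? {
        b']' => true,
        b')' => false,
        _ => return None,
    };
    let inner = &s[1..s.len() - 1];
    let (lo, hi) = inner.split_once(',')?;
    let lower = if lo.is_empty() {
        Bound::Unbounded
    } else if lower_inclusive {
        Bound::Included(lo.parse::<i32>().ok()?)
    } else {
        Bound::Excluded(lo.parse::<i32>().ok()?)
    };
    let upper = if hi.is_empty() {
        Bound::Unbounded
    } else if upper_inclusive {
        Bound::Included(hi.parse::<i32>().ok()?)
    } else {
        Bound::Excluded(hi.parse::<i32>().ok()?)
    };
    Some((lower, upper))
}
```

An open upper bound, as block_range has for live entity versions, comes back as `Bound::Unbounded`.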
Add block$ (immutable), block_range (mutable), and causality_region to the column lookup so callers that need these columns by name can find them. Also add CAUSALITY_REGION_COL pseudo-column static and tighten with_nsp visibility to pub(crate).
Create the foundation for Parquet-based dump/restore: a schema mapping module that converts relational Table definitions to Arrow Schema objects.
- arrow_schema() maps Table -> Arrow Schema with system columns (vid, block tracking, causality_region) followed by data columns
- data_sources_arrow_schema() provides a fixed schema for the data_sources$ table
- All ColumnType variants are mapped (TSVector skipped, Enum -> Utf8)
- List columns are wrapped in the Arrow List type based on Column.is_list()
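The shape of that mapping can be sketched without pulling in arrow-rs. The enums below are simplified stand-ins for graph-node's ColumnType and Arrow's DataType (the BigDecimal-as-Utf8 choice in particular is an illustrative assumption, not confirmed by the commit):

```rust
// Simplified stand-ins for graph-node's ColumnType and Arrow type names.
#[derive(Clone, Copy)]
enum ColumnType {
    Boolean,
    Int,
    Int8,
    BigDecimal,
    Bytes,
    String,
    Enum,
    TSVector,
    Timestamp,
}

// Returns None for columns that are skipped entirely (TSVector),
// and wraps the base type in List<...> for list-valued columns.
fn arrow_type(col: ColumnType, is_list: bool) -> Option<String> {
    let base = match col {
        ColumnType::Boolean => "Boolean",
        ColumnType::Int => "Int32",
        ColumnType::Int8 => "Int64",
        ColumnType::BigDecimal => "Utf8", // assumption: decimals carried as text
        ColumnType::Bytes => "Binary",
        ColumnType::String | ColumnType::Enum => "Utf8", // Enum -> Utf8
        ColumnType::TSVector => return None,             // TSVector is skipped
        ColumnType::Timestamp => "Timestamp",
    };
    Some(if is_list {
        format!("List<{}>", base)
    } else {
        base.to_string()
    })
}
```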
Implement rows_to_record_batch() which converts Vec<DynamicRow<OidValue>> into Arrow RecordBatch objects. Uses a type-erased ColumnBuilder enum that dispatches on Arrow DataType to create the appropriate array builder. Supports all OidValue scalar variants (Bool, Int, Int8, Bytes, String, BigDecimal, Timestamp) and all array variants (BoolArray, Ints, Int8Array, BytesArray, StringArray, BigDecimalArray, TimestampArray). Block range columns arrive as separate Int32 values from the dump query, keeping the converter as a clean 1:1 mapping.
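The type-erased builder pattern looks roughly like this. The real code wraps arrow-rs array builders and dispatches on Arrow DataType; here the builders are plain `Vec<Option<T>>` and only three variants are shown, purely to illustrate the dispatch:

```rust
// One enum variant per builder type; the variant is chosen once from the
// column's data type, then each row's value is pushed through `push`.
enum ColumnBuilder {
    Bool(Vec<Option<bool>>),
    Int(Vec<Option<i32>>),
    Str(Vec<Option<String>>),
}

// Simplified stand-in for OidValue's scalar variants.
enum Value {
    Bool(bool),
    Int(i32),
    Str(String),
    Null,
}

impl ColumnBuilder {
    fn push(&mut self, v: Value) {
        match (self, v) {
            (ColumnBuilder::Bool(b), Value::Bool(x)) => b.push(Some(x)),
            (ColumnBuilder::Int(b), Value::Int(x)) => b.push(Some(x)),
            (ColumnBuilder::Str(b), Value::Str(x)) => b.push(Some(x)),
            (ColumnBuilder::Bool(b), Value::Null) => b.push(None),
            (ColumnBuilder::Int(b), Value::Null) => b.push(None),
            (ColumnBuilder::Str(b), Value::Null) => b.push(None),
            _ => panic!("value does not match builder type"),
        }
    }

    fn len(&self) -> usize {
        match self {
            ColumnBuilder::Bool(b) => b.len(),
            ColumnBuilder::Int(b) => b.len(),
            ColumnBuilder::Str(b) => b.len(),
        }
    }
}
```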
Add ParquetChunkWriter that wraps Arrow's ArrowWriter to stream RecordBatches into a ZSTD-compressed Parquet file while tracking row count and vid range. Returns ChunkInfo metadata on finish for inclusion in metadata.json.
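A minimal sketch of the bookkeeping side, with the actual ArrowWriter/ZSTD plumbing elided; the `ChunkTracker` name and the exact `ChunkInfo` fields are illustrative, not the real struct definitions:

```rust
// Track row count and vid range across streamed batches, returning
// ChunkInfo on finish (None if no rows were ever written).
#[derive(Debug, PartialEq)]
struct ChunkInfo {
    rows: u64,
    min_vid: i64,
    max_vid: i64,
}

struct ChunkTracker {
    rows: u64,
    min_vid: Option<i64>,
    max_vid: Option<i64>,
}

impl ChunkTracker {
    fn new() -> Self {
        ChunkTracker { rows: 0, min_vid: None, max_vid: None }
    }

    // Called once per RecordBatch with the vids it contains.
    fn observe(&mut self, vids: &[i64]) {
        self.rows += vids.len() as u64;
        for &v in vids {
            self.min_vid = Some(self.min_vid.map_or(v, |m| m.min(v)));
            self.max_vid = Some(self.max_vid.map_or(v, |m| m.max(v)));
        }
    }

    fn finish(self) -> Option<ChunkInfo> {
        Some(ChunkInfo {
            rows: self.rows,
            min_vid: self.min_vid?,
            max_vid: self.max_vid?,
        })
    }
}
```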
Wire dump queries and Parquet writing into Layout::dump(). This is the core of the dump feature, covering both entity tables and the special data_sources$ table. For entity tables: build a DynamicSelectClause that selects vid, block columns (split into lower/upper for mutable tables), causality_region, and data columns. Use VidBatcher for adaptive batching, convert rows to Arrow RecordBatch, and write via ParquetChunkWriter. For data_sources$: use a concrete QueryableByName struct with raw SQL (fixed schema, no DynamicSelectClause needed). Check table existence via catalog::table_exists before attempting dump. Metadata includes version, network, block pointers, entity count, graft info, health, indexes, and per-table chunk tracking. Written atomically via tmp+rename so its presence signals a complete dump. Add entity_count() helper in detail.rs. Wire dump() through DeploymentStore and SubgraphStore.
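The tmp+rename trick for metadata.json can be sketched with `std::fs` alone (the function name and temp-file name here are illustrative): write to a temporary file in the same directory, fsync, then rename over the final name. On POSIX filesystems the rename is atomic, so readers never see a partially written metadata.json.

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

// Atomic metadata write: a complete metadata.json is the signal that the
// dump finished, so it must appear all at once or not at all.
fn write_metadata_atomically(dir: &Path, json: &str) -> std::io::Result<()> {
    let tmp = dir.join("metadata.json.tmp");
    let final_path = dir.join("metadata.json");
    let mut f = fs::File::create(&tmp)?;
    f.write_all(json.as_bytes())?;
    f.sync_all()?; // flush to disk before the rename makes it visible
    fs::rename(&tmp, &final_path)
}
```

The temp file must live in the same directory as the target: `rename` is only atomic within a single filesystem.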
Wire the dump subcommand into graphman CLI. Takes a deployment search argument and an output directory, resolves the deployment, and delegates to SubgraphStore::dump().
Add `parquet/reader.rs` with `read_batches()` to read Parquet files back into Arrow RecordBatches. This is the foundation for the restore pipeline (step 1 of the restore plan).
Add `RestoreRow`, `DataSourceRestoreRow` structs and conversion functions `record_batch_to_restore_rows()` and `record_batch_to_data_source_rows()` that convert Arrow RecordBatches back into typed data suitable for database insertion. This is step 2 of the restore plan.
Add a new constructor to InsertQuery that accepts RestoreRow data from Parquet files, bypassing the WriteChunk/EntityWrite pipeline. This is the insertion foundation for the restore path. Also add From<i32> impl for CausalityRegion to allow constructing values from deserialized Parquet data.
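The `From<i32>` impl is small enough to sketch in full, assuming CausalityRegion keeps its newtype-over-i32 shape (the derive list here is illustrative):

```rust
// Newtype over i32; From<i32> lets values deserialized from Parquet
// convert directly, e.g. CausalityRegion::from(batch_value).
#[derive(Debug, PartialEq)]
struct CausalityRegion(i32);

impl From<i32> for CausalityRegion {
    fn from(n: i32) -> Self {
        CausalityRegion(n)
    }
}
```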
Add a method to read and validate metadata.json from a dump directory. Make fields of Metadata, Manifest, BlockPtr, Health, and Error structs pub(crate) so they are accessible from the restore module.
Add import_data() to the restore module that reads Parquet chunks and inserts entity data into PostgreSQL tables. Supports resumability by checking max(vid) in each table and skipping already-imported rows. Entity tables use InsertQuery::for_restore() for efficient batch inserts. The data_sources$ table uses raw SQL with bind parameters since it has a fixed schema outside the Layout.
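The resumability check reduces to a vid filter. A sketch, with `RestoreRow` stripped down to its vid and the function name invented for illustration:

```rust
// Skip rows already present: anything with vid <= max(vid) in the target
// table was imported by a previous (interrupted) restore run.
struct RestoreRow {
    vid: i64,
}

fn rows_to_import(rows: Vec<RestoreRow>, max_existing_vid: Option<i64>) -> Vec<RestoreRow> {
    match max_existing_vid {
        None => rows, // empty table: import everything
        Some(max) => rows.into_iter().filter(|r| r.vid > max).collect(),
    }
}
```

This works because vids are written in ascending order, so max(vid) is a high-water mark for how far the previous run got.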
Wire up the restore pipeline through SubgraphStore::restore() and DeploymentStore::restore(). Uses plan_restore() to determine whether to create or replace the deployment site, validates the target shard exists, resolves the subgraph name for deployment rule matching, and assigns the restored deployment to a node. Changes: - DeploymentStore::restore() coordinates schema creation, data import, and finalization - Inner::restore() handles conflict resolution, site allocation, and node assignment via deployment rules - Expose create_site() and find_active_site() on primary::Connection - Make create_site() accept an `active` parameter
Add `graphman restore` CLI with options:
--directory  Path to dump directory
--shard      Target shard (default: primary)
--name       Subgraph name for deployment rule matching
--replace    Drop and recreate if exists in target shard
--add        Create copy in a different shard
--force      Restore regardless of current state
Wrap the dump's data-reading operations in a REPEATABLE READ READ ONLY transaction to get a consistent MVCC snapshot. This prevents head block vs entity data mismatches, cross-table inconsistency, and missing or phantom rows caused by concurrent indexing or pruning. Add a TransactionBuilder for PermittedConnection since diesel-async's TransactionBuilder requires TransactionManager = AnsiTransactionManager, which pool-wrapped connection types don't satisfy.
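A minimal sketch of such a builder, reduced to assembling the BEGIN statement it must issue; the real one wraps a PermittedConnection and executes through diesel-async, but the SQL that establishes the consistent snapshot is just this:

```rust
// Builder for the BEGIN statement; Postgres accepts transaction modes
// space-separated after BEGIN TRANSACTION.
struct TxBuilder {
    repeatable_read: bool,
    read_only: bool,
}

impl TxBuilder {
    fn new() -> Self {
        TxBuilder { repeatable_read: false, read_only: false }
    }

    fn repeatable_read(mut self) -> Self {
        self.repeatable_read = true;
        self
    }

    fn read_only(mut self) -> Self {
        self.read_only = true;
        self
    }

    fn begin_sql(&self) -> String {
        let mut sql = String::from("BEGIN TRANSACTION");
        if self.repeatable_read {
            sql.push_str(" ISOLATION LEVEL REPEATABLE READ");
        }
        if self.read_only {
            sql.push_str(" READ ONLY");
        }
        sql
    }
}
```

REPEATABLE READ pins every query in the transaction to one MVCC snapshot, and READ ONLY lets Postgres reject accidental writes through the dump connection.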
This PR implements `graphman dump` and `graphman restore` commands that do what they say on the tin. Dumps are consistent, but they are taken in a single transaction; that may have very bad effects on the overall system if dumps take a very long time. Restore is a little nicer in that it splits the data import into multiple jobs.
There are lots of ways in which this could be improved, but I feel this is a useful starting point, at the very least for development and test systems. The help text for the dump and restore commands has scary warnings about not using them in production - and they shouldn't be, even though I think at least for smaller subgraphs they might work ok.