120 changes: 120 additions & 0 deletions docs/en/engines/table-engines/special/hybrid.md
@@ -0,0 +1,120 @@
---
description: 'Hybrid unions multiple data sources behind per-segment predicates so queries behave like a single table while data is migrated or tiered.'
slug: /engines/table-engines/special/hybrid
title: 'Hybrid Table Engine'
sidebar_label: 'Hybrid'
sidebar_position: 11
---

# Hybrid table engine

`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
The engine rewrites incoming queries so that each segment receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.

It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.

Typical use cases include:

- Zero-downtime migrations where "old" and "new" replicas temporarily overlap.
- Tiered storage, for example fresh data on a local cluster and historical data in S3.
- Gradual roll-outs where only a subset of rows should be served from a new backend.

By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
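A date watermark is the simplest way to get such a split. The following is a minimal sketch (cluster, database, and table names are illustrative):

```sql
-- Each row matches exactly one predicate: rows before the watermark
-- are read from the old cluster, the rest from the new one.
CREATE TABLE events ENGINE = Hybrid(
    remote('old-cluster:9000', 'default', 'events_local'), date <  '2025-09-01',
    remote('new-cluster:9000', 'default', 'events_local'), date >= '2025-09-01'
) AS events_local;
```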

## Enable the engine

The Hybrid engine is experimental. Enable it per session (or in the user profile) before creating tables:

```sql
SET allow_experimental_hybrid_table = 1;
```

### Automatic type alignment

Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental setting `hybrid_table_auto_cast_columns = 1` (enabled by default; it requires `allow_experimental_analyzer = 1`), the engine inserts the necessary `CAST` operations into each rewritten query so that every shard returns the schema defined by the Hybrid table. You can opt out by setting the flag to `0` if it causes issues.

Segment schemas are cached when you create or attach a Hybrid table. If you alter a segment later (for example change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast feature may miss the change and queries can still fail with header/type errors.
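For example, after changing a column type in one segment, a detach/attach cycle refreshes the cached headers (a sketch; the table name is illustrative):

```sql
-- A segment's schema changed, e.g. a column was widened on the remote side.
-- Re-attach the Hybrid table so it re-reads the segment headers.
DETACH TABLE btc_blocks;
ATTACH TABLE btc_blocks;
```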

## Engine definition

```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
column1 type1,
column2 type2,
...
)
ENGINE = Hybrid(table_function_1, predicate_1 [, table_function_2, predicate_2 ...])
```

You must pass at least two arguments: the first table function and its predicate. Additional sources are appended as `table_function, predicate` pairs. The first table function is also the target of `INSERT` statements.

### Arguments and behaviour

- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument, however, must be a table function (such as `remote` or `cluster`), because it instantiates the underlying `Distributed` storage.
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment is missing a column defined in the Hybrid schema. If the physical types differ, you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
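For instance, to land the same rows in two segments during a migration, write to each underlying source explicitly rather than through the Hybrid table (a sketch; database and table names are illustrative):

```sql
-- The Hybrid table forwards INSERTs to its first segment only,
-- so dual writes must target the underlying tables directly.
INSERT INTO old_db.events_local SELECT * FROM staging.events WHERE date >= '2025-09-01';
INSERT INTO new_db.events_local SELECT * FROM staging.events WHERE date >= '2025-09-01';
```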

## Example: local cluster plus S3 historical tier

The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.

```sql
-- Local MergeTree table that keeps current data
CREATE OR REPLACE TABLE btc_blocks_local
(
`hash` FixedString(64),
`version` Int64,
`mediantime` DateTime64(9),
`nonce` Int64,
`bits` FixedString(8),
`difficulty` Float64,
`chainwork` FixedString(64),
`size` Int64,
`weight` Int64,
`coinbase_param` String,
`number` Int64,
`transaction_count` Int64,
`merkle_root` FixedString(64),
`stripped_size` Int64,
`timestamp` DateTime64(9),
`date` Date
)
ENGINE = MergeTree
ORDER BY (timestamp)
PARTITION BY toYYYYMM(date);

-- Hybrid table that unions the local shard with historical data in S3
CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
remote('localhost:9000', currentDatabase(), 'btc_blocks_local'), date >= '2025-09-01',
s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
) AS btc_blocks_local;

-- Writes target the first (remote) segment
INSERT INTO btc_blocks
SELECT *
FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
WHERE date BETWEEN '2025-09-01' AND '2025-09-30';

-- Reads seamlessly combine both predicates
SELECT * FROM btc_blocks WHERE date = '2025-08-01'; -- data from s3
SELECT * FROM btc_blocks WHERE date = '2025-09-05'; -- data from MergeTree (TODO: still analyzes s3)
SELECT * FROM btc_blocks WHERE date IN ('2025-08-31', '2025-09-01'); -- data from both sources, each row read exactly once

-- Run analytic queries as usual
SELECT
date,
count(),
uniqExact(CAST(hash, 'Nullable(String)')) AS hashes,
sum(CAST(number, 'Nullable(Int64)')) AS blocks_seen
FROM btc_blocks
WHERE date BETWEEN '2025-08-01' AND '2025-09-30'
GROUP BY date
ORDER BY date;
```

Because the predicates are applied inside every segment, clauses such as `ORDER BY`, `GROUP BY`, `LIMIT`, and `JOIN`, as well as `EXPLAIN`, behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
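To inspect how a query is rewritten per segment, `EXPLAIN` works the same way it does for a plain `Distributed` table (the exact output shape depends on your server version and settings):

```sql
-- Shows the plan built over the rewritten per-segment queries.
EXPLAIN SELECT count() FROM btc_blocks WHERE date >= '2025-09-01';
```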
150 changes: 150 additions & 0 deletions src/Analyzer/Passes/HybridCastsPass.cpp
@@ -0,0 +1,150 @@
#include <Analyzer/Passes/HybridCastsPass.h>

#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryTreePassManager.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Resolve/IdentifierResolver.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>

#include <Storages/IStorage.h>
#include <Storages/StorageDistributed.h>

#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <Common/Exception.h>

namespace DB
{

namespace Setting
{
extern const SettingsBool hybrid_table_auto_cast_columns;
}

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

namespace
{

/// Collect Hybrid table expressions that require casts to normalize headers across segments.
///
/// Hybrid is currently exposed only as an engine (TableNode). If it ever gets a table function
/// wrapper, this visitor must also look at TableFunctionNode and unwrap to the underlying
/// StorageDistributed so cached casts can be picked up there as well.
class HybridCastTablesCollector : public InDepthQueryTreeVisitor<HybridCastTablesCollector>
{
public:
explicit HybridCastTablesCollector(std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_)
: cast_map(cast_map_)
{}

static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &) { return true; }

void visitImpl(QueryTreeNodePtr & node)
{
const auto * table = node->as<TableNode>();
if (!table)
return;

const auto * storage = table->getStorage().get();
if (const auto * distributed = typeid_cast<const StorageDistributed *>(storage))
{
ColumnsDescription to_cast = distributed->getColumnsToCast();
if (!to_cast.empty())
cast_map.emplace(node.get(), std::move(to_cast)); /// emplace does not overwrite, so the first occurrence of a repeated table expression wins
}
}

private:
std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
};

/// Replaces every usage of a mismatched column with CAST(column, type) in the query tree.
class HybridCastVisitor : public InDepthQueryTreeVisitor<HybridCastVisitor>
{
public:
HybridCastVisitor(
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_,
ContextPtr context_)
: cast_map(cast_map_)
, context(std::move(context_))
{}

bool shouldTraverseTopToBottom() const { return false; }

static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &)
{
/// Traverse all child nodes so casts also apply inside subqueries and UNION branches.
return true;
}

void visitImpl(QueryTreeNodePtr & node)
{
auto * column_node = node->as<ColumnNode>();
if (!column_node)
return;

auto column_source = column_node->getColumnSourceOrNull();
if (!column_source)
return;

auto it = cast_map.find(column_source.get());
if (it == cast_map.end())
return;

const auto & column_name = column_node->getColumnName();
auto expected_column_opt = it->second.tryGetPhysical(column_name);
if (!expected_column_opt)
return;

auto column_clone = std::static_pointer_cast<ColumnNode>(column_node->clone());

auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context);
const auto & alias = node->getAlias();
if (!alias.empty())
cast_node->setAlias(alias);
else
cast_node->setAlias(expected_column_opt->name);

node = cast_node;
}

private:
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
ContextPtr context;
};


} // namespace

void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
const auto & settings = context->getSettingsRef();
if (!settings[Setting::hybrid_table_auto_cast_columns])
return;

auto * query = query_tree_node->as<QueryNode>();
if (!query)
return;

std::unordered_map<const IQueryTreeNode *, ColumnsDescription> cast_map;
HybridCastTablesCollector collector(cast_map);
collector.visit(query_tree_node);
if (cast_map.empty())
return;

HybridCastVisitor visitor(cast_map, context);
visitor.visit(query_tree_node);
}

}
32 changes: 32 additions & 0 deletions src/Analyzer/Passes/HybridCastsPass.h
@@ -0,0 +1,32 @@
#pragma once

#include <Analyzer/IQueryTreePass.h>
#include <Interpreters/Context_fwd.h>

namespace DB
{

/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema
///
/// It normalizes headers coming from different segments when table structure in some segments
/// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table,
/// but Int64 in an additional segment.
///
/// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible
/// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example:
/// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported"
/// (CANNOT_CONVERT_TYPE).
///
/// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines
/// from different segments would return different headers (with or without CAST), leading to errors
/// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]"
/// (THERE_IS_NO_COLUMN).
class HybridCastsPass : public IQueryTreePass
{
public:
String getName() override { return "HybridCastsPass"; }
String getDescription() override { return "Inject casts for Hybrid columns to match schema types"; }
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

}
3 changes: 3 additions & 0 deletions src/Analyzer/QueryTreePassManager.cpp
@@ -54,6 +54,7 @@
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/HybridCastsPass.h>
#include <Analyzer/Utils.h>

namespace DB
@@ -325,6 +326,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
manager.addPass(std::make_unique<InverseDictionaryLookupPass>());

manager.addPass(std::make_unique<DisableParallelReplicasPass>());

manager.addPass(std::make_unique<HybridCastsPass>());
}

}