diff --git a/docs/en/engines/table-engines/special/hybrid.md b/docs/en/engines/table-engines/special/hybrid.md new file mode 100644 index 000000000000..12df6cd859b8 --- /dev/null +++ b/docs/en/engines/table-engines/special/hybrid.md @@ -0,0 +1,120 @@ +--- +description: 'Hybrid unions multiple data sources behind per-segment predicates so queries behave like a single table while data is migrated or tiered.' +slug: /engines/table-engines/special/hybrid +title: 'Hybrid Table Engine' +sidebar_label: 'Hybrid' +sidebar_position: 11 +--- + +# Hybrid table engine + +`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate. +The engine rewrites incoming queries so that each segment receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`, +global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats. + +It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source. + +Typical use cases include: + +- Zero-downtime migrations where "old" and "new" replicas temporarily overlap. +- Tiered storage, for example fresh data on a local cluster and historical data in S3. +- Gradual roll-outs where only a subset of rows should be served from a new backend. + +By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source. + +## Enable the engine + +The Hybrid engine is experimental. 
Enable it per session (or in the user profile) before creating tables: + +```sql +SET allow_experimental_hybrid_table = 1; +``` + +### Automatic Type Alignment + +Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental `hybrid_table_auto_cast_columns = 1` **(enabled by default and requires `allow_experimental_analyzer = 1`)**, the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. You can opt out by setting the flag to `0` if it causes issues. + +Segment schemas are cached when you create or attach a Hybrid table. If you alter a segment later (for example change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast feature may miss the change and queries can still fail with header/type errors. + +## Engine definition + +```sql +CREATE TABLE [IF NOT EXISTS] [db.]table_name +( + column1 type1, + column2 type2, + ... +) +ENGINE = Hybrid(table_function_1, predicate_1 [, table_function_2, predicate_2 ...]) +``` + +You must pass at least two arguments – the first table function and its predicate. Additional sources are appended as `table_function, predicate` pairs. The first table function is also used for `INSERT` statements. + +### Arguments and behaviour + +- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage. +- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical. 
+- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way. +- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources. +- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas. + +## Example: local cluster plus S3 historical tier + +The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files. + +```sql +-- Local MergeTree table that keeps current data +CREATE OR REPLACE TABLE btc_blocks_local +( + `hash` FixedString(64), + `version` Int64, + `mediantime` DateTime64(9), + `nonce` Int64, + `bits` FixedString(8), + `difficulty` Float64, + `chainwork` FixedString(64), + `size` Int64, + `weight` Int64, + `coinbase_param` String, + `number` Int64, + `transaction_count` Int64, + `merkle_root` FixedString(64), + `stripped_size` Int64, + `timestamp` DateTime64(9), + `date` Date +) +ENGINE = MergeTree +ORDER BY (timestamp) +PARTITION BY toYYYYMM(date); + +-- Hybrid table that unions the local shard with historical data in S3 +CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'btc_blocks_local'), date >= '2025-09-01', + s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01' +) AS btc_blocks_local; + +-- Writes target the first (remote) segment +INSERT INTO btc_blocks +SELECT * +FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN) +WHERE date BETWEEN 
'2025-09-01' AND '2025-09-30'; + +-- Reads seamlessly combine both predicates +SELECT * FROM btc_blocks WHERE date = '2025-08-01'; -- data from s3 +SELECT * FROM btc_blocks WHERE date = '2025-09-05'; -- data from MergeTree (TODO: still analyzes s3) +SELECT * FROM btc_blocks WHERE date IN ('2025-08-31','2025-09-01') -- data from both sources, single copy always + + +-- Run analytic queries as usual +SELECT + date, + count(), + uniqExact(CAST(hash, 'Nullable(String)')) AS hashes, + sum(CAST(number, 'Nullable(Int64)')) AS blocks_seen +FROM btc_blocks +WHERE date BETWEEN '2025-08-01' AND '2025-09-30' +GROUP BY date +ORDER BY date; +``` + +Because the predicates are applied inside every segment, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above. diff --git a/src/Analyzer/Passes/HybridCastsPass.cpp b/src/Analyzer/Passes/HybridCastsPass.cpp new file mode 100644 index 000000000000..f40e7664df50 --- /dev/null +++ b/src/Analyzer/Passes/HybridCastsPass.cpp @@ -0,0 +1,150 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace DB +{ + +namespace Setting +{ + extern const SettingsBool hybrid_table_auto_cast_columns; +} + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +namespace +{ + +/// Collect Hybrid table expressions that require casts to normalize headers across segments. +/// +/// Hybrid is currently exposed only as an engine (TableNode). If it ever gets a table function +/// wrapper, this visitor must also look at TableFunctionNode and unwrap to the underlying +/// StorageDistributed so cached casts can be picked up there as well. 
+class HybridCastTablesCollector : public InDepthQueryTreeVisitor +{ +public: + explicit HybridCastTablesCollector(std::unordered_map & cast_map_) + : cast_map(cast_map_) + {} + + static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &) { return true; } + + void visitImpl(QueryTreeNodePtr & node) + { + const auto * table = node->as(); + if (!table) + return; + + const auto * storage = table->getStorage().get(); + if (const auto * distributed = typeid_cast(storage)) + { + ColumnsDescription to_cast = distributed->getColumnsToCast(); + if (!to_cast.empty()) + cast_map.emplace(node.get(), std::move(to_cast)); // repeated table_expression can overwrite + } + } + +private: + std::unordered_map & cast_map; +}; + +// Visitor replaces all usages of the column with CAST(column, type) in the query tree. +class HybridCastVisitor : public InDepthQueryTreeVisitor +{ +public: + HybridCastVisitor( + const std::unordered_map & cast_map_, + ContextPtr context_) + : cast_map(cast_map_) + , context(std::move(context_)) + {} + + bool shouldTraverseTopToBottom() const { return false; } + + static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & child) + { + /// Traverse all child nodes so casts also apply inside subqueries and UNION branches. 
+ (void)child; + return true; + } + + void visitImpl(QueryTreeNodePtr & node) + { + auto * column_node = node->as(); + if (!column_node) + return; + + auto column_source = column_node->getColumnSourceOrNull(); + if (!column_source) + return; + + auto it = cast_map.find(column_source.get()); + if (it == cast_map.end()) + return; + + const auto & column_name = column_node->getColumnName(); + auto expected_column_opt = it->second.tryGetPhysical(column_name); + if (!expected_column_opt) + return; + + auto column_clone = std::static_pointer_cast(column_node->clone()); + + auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context); + const auto & alias = node->getAlias(); + if (!alias.empty()) + cast_node->setAlias(alias); + else + cast_node->setAlias(expected_column_opt->name); + + node = cast_node; + } + +private: + const std::unordered_map & cast_map; + ContextPtr context; +}; + + +} // namespace + +void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context) +{ + const auto & settings = context->getSettingsRef(); + if (!settings[Setting::hybrid_table_auto_cast_columns]) + return; + + auto * query = query_tree_node->as(); + if (!query) + return; + + std::unordered_map cast_map; + HybridCastTablesCollector collector(cast_map); + collector.visit(query_tree_node); + if (cast_map.empty()) + return; + + HybridCastVisitor visitor(cast_map, context); + visitor.visit(query_tree_node); +} + +} diff --git a/src/Analyzer/Passes/HybridCastsPass.h b/src/Analyzer/Passes/HybridCastsPass.h new file mode 100644 index 000000000000..6b3159d6e925 --- /dev/null +++ b/src/Analyzer/Passes/HybridCastsPass.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema +/// +/// It normalizes headers coming from different segments when table structure in some segments +/// differs from the Hybrid table definition. 
For example column X is UInt32 in the Hybrid table, +/// but Int64 in an additional segment. +/// +/// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible +/// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example: +/// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported" +/// (CANNOT_CONVERT_TYPE). +/// +/// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines +/// from different segments would return different headers (with or without CAST), leading to errors +/// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]" +/// (THERE_IS_NO_COLUMN). +class HybridCastsPass : public IQueryTreePass +{ +public: + String getName() override { return "HybridCastsPass"; } + String getDescription() override { return "Inject casts for Hybrid columns to match schema types"; } + void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override; +}; + +} diff --git a/src/Analyzer/QueryTreePassManager.cpp b/src/Analyzer/QueryTreePassManager.cpp index 46e7b63b0423..8e25989cb8eb 100644 --- a/src/Analyzer/QueryTreePassManager.cpp +++ b/src/Analyzer/QueryTreePassManager.cpp @@ -54,6 +54,7 @@ #include #include #include +#include #include namespace DB @@ -325,6 +326,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze) manager.addPass(std::make_unique()); manager.addPass(std::make_unique()); + + manager.addPass(std::make_unique()); } } diff --git a/src/Analyzer/Utils.cpp b/src/Analyzer/Utils.cpp index 08d7d186aa2f..50886d3774c2 100644 --- a/src/Analyzer/Utils.cpp +++ b/src/Analyzer/Utils.cpp @@ -49,6 +49,8 @@ #include +#include + #include namespace DB { @@ -975,6 +977,174 @@ void resolveAggregateFunctionNodeByName(FunctionNode & function_node, const Stri 
function_node.resolveAsAggregateFunction(std::move(aggregate_function)); } +namespace +{ + +/// `materialized_only`: +/// false = strip any marker. +/// true = strip only markers that arrived in the query already materialized (arg2 is String). +/// Markers injected in the current rewrite keep arg2 as ColumnNode until finalization. +bool stripAliasMarker(QueryTreeNodePtr & node, bool materialized_only) +{ + auto * function_node = node->as(); + if (!function_node || function_node->getFunctionName() != "__aliasMarker") + return false; + + auto & arguments = function_node->getArguments().getNodes(); + if (arguments.size() != 2 || !arguments[0] || !arguments[1]) + return false; + + if (materialized_only) + { + const auto * marker_id_node = arguments[1]->as(); + if (!marker_id_node || !isString(marker_id_node->getResultType())) + return false; + } + + auto replacement = arguments[0]; + if (!replacement->hasAlias() && function_node->hasAlias()) + replacement->setAlias(function_node->getAlias()); + + node = std::move(replacement); + return true; +} + +String buildDeterministicFallbackAliasMarkerId(const ColumnNode & marker_column_node, const QueryTreeNodePtr & marker_expression_node) +{ + IQueryTreeNode::CompareOptions compare_options + { + .compare_aliases = false, + .compare_types = false, + .ignore_cte = true, + }; + + String alias_id = marker_column_node.getColumnName(); + + if (const auto & marker_source = marker_column_node.getColumnSourceOrNull()) + { + /// Keep fallback ids deterministic and source-specific when table aliases are not available yet. + alias_id += "__src_" + getHexUIntLowercase(marker_source->getTreeHash(compare_options)); + } + + /// Add expression hash to avoid collapsing different marker payloads with the same column name. 
+ alias_id += "__expr_" + getHexUIntLowercase(marker_expression_node->getTreeHash(compare_options)); + + return alias_id; +} + +void stripAliasMarkersFromPayloadSubtree(QueryTreeNodePtr & node) +{ + while (stripAliasMarker(node, false)) + {} + + for (auto & child : node->getChildren()) + { + if (child) + stripAliasMarkersFromPayloadSubtree(child); + } +} + +/// Finalize __aliasMarker nodes right before distributed SQL boundaries. +/// This pass strips nested markers from arg0 payload and materializes arg2 to String constant. +class FinalizeAliasMarkersForDistributedSerializationVisitor : public InDepthQueryTreeVisitor +{ +public: + explicit FinalizeAliasMarkersForDistributedSerializationVisitor(ContextPtr context_) + : context(std::move(context_)) + {} + + bool shouldTraverseTopToBottom() const + { + return false; + } + + static bool needChildVisit(const QueryTreeNodePtr & parent_node, const QueryTreeNodePtr &) + { + auto * function_node = parent_node->as(); + if (!function_node || function_node->getFunctionName() != "__aliasMarker") + return true; + + /// __aliasMarker subtrees are processed explicitly in visitImpl: + /// arg0 is recursively cleaned from nested wrappers and arg2 is materialized in place. + return false; + } + + void visitImpl(QueryTreeNodePtr & node) + { + auto * function_node = node->as(); + if (!function_node || function_node->getFunctionName() != "__aliasMarker") + return; + + auto & arguments = function_node->getArguments().getNodes(); + if (arguments.size() != 2 || !arguments[0] || !arguments[1]) + return; + + /// Remove nested marker wrappers in payload subtree; keep only this node as the boundary marker. + stripAliasMarkersFromPayloadSubtree(arguments[0]); + + String alias_id; + if (const auto * marker_column_node = arguments[1]->as()) + { + if (const auto & marker_source = marker_column_node->getColumnSourceOrNull(); + marker_source && marker_source->hasAlias()) + { + alias_id = marker_source->getAlias() + "." 
+ marker_column_node->getColumnName(); + } + else + { + /// In some distributed subquery execution paths marker ids are materialized + /// before alias uniquification assigns source aliases. + alias_id = buildDeterministicFallbackAliasMarkerId(*marker_column_node, arguments[0]); + } + } + else if (const auto * marker_id_node = arguments[1]->as(); + marker_id_node && isString(marker_id_node->getResultType())) + { + alias_id = marker_id_node->getValue().safeGet(); + } + + if (alias_id.empty()) + return; + + arguments[1] = std::make_shared(std::move(alias_id), std::make_shared()); + resolveOrdinaryFunctionNodeByName(*function_node, "__aliasMarker", context); + } + +private: + ContextPtr context; +}; + +/// Strip incoming __aliasMarker wrappers between distributed hops. +/// This keeps marker lifecycle hop-local and avoids forwarding stale previous-hop ids. +class StripMaterializedAliasMarkersVisitor : public InDepthQueryTreeVisitor +{ +public: + bool shouldTraverseTopToBottom() const + { + return false; + } + + void visitImpl(QueryTreeNodePtr & node) + { + while (stripAliasMarker(node, true)) + {} + } +}; + +} + +void finalizeAliasMarkersForDistributedSerialization(QueryTreeNodePtr & node, const ContextPtr & context) +{ + FinalizeAliasMarkersForDistributedSerializationVisitor visitor(context); + visitor.visit(node); +} + +void stripMaterializedAliasMarkers(QueryTreeNodePtr & node) +{ + StripMaterializedAliasMarkersVisitor visitor; + visitor.visit(node); +} + std::pair getExpressionSource(const QueryTreeNodePtr & node) { if (const auto * column = node->as()) diff --git a/src/Analyzer/Utils.h b/src/Analyzer/Utils.h index 9a19af2b4e0d..0ace391dc488 100644 --- a/src/Analyzer/Utils.h +++ b/src/Analyzer/Utils.h @@ -157,6 +157,15 @@ void resolveOrdinaryFunctionNodeByName(FunctionNode & function_node, const Strin /// Arguments and parameters are taken from the node. 
void resolveAggregateFunctionNodeByName(FunctionNode & function_node, const String & function_name); +/// Finalize __aliasMarker nodes before distributed SQL boundaries: +/// 1) collapse nested wrappers to keep only current-hop marker id; +/// 2) materialize marker id (arg2) to String ConstantNode. +void finalizeAliasMarkersForDistributedSerialization(QueryTreeNodePtr & node, const ContextPtr & context); + +/// Remove incoming/materialized __aliasMarker wrappers (arg2 is String ConstantNode), +/// preserving wrapped expressions. +void stripMaterializedAliasMarkers(QueryTreeNodePtr & node); + /// Returns single source of expression node. /// First element of pair is source node, can be nullptr if there are no sources or multiple sources. /// Second element of pair is true if there is at most one source, false if there are multiple sources. diff --git a/src/Analyzer/createUniqueAliasesIfNecessary.cpp b/src/Analyzer/createUniqueAliasesIfNecessary.cpp index 8e8192e29020..3acdf70d69e2 100644 --- a/src/Analyzer/createUniqueAliasesIfNecessary.cpp +++ b/src/Analyzer/createUniqueAliasesIfNecessary.cpp @@ -226,6 +226,7 @@ void createUniqueAliasesIfNecessary(QueryTreeNodePtr & node, const ContextPtr & * It's required to create a valid AST for distributed query. */ CreateUniqueArrayJoinAliasesVisitor(context).visit(node); + } } diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index e72653747d54..0d32bb970af2 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -2280,6 +2280,11 @@ Show internal aliases (such as __table1) in EXPLAIN PLAN instead of those specif \ DECLARE(UInt64, query_plan_max_step_description_length, 500, R"( Maximum length of step description in EXPLAIN PLAN. +)", 0) \ + \ + DECLARE(Bool, enable_alias_marker, true, R"( +Enable __aliasMarker injection for ALIAS column expressions when using the analyzer. +This stabilizes action node names across planner/analyzer stages without changing query semantics. 
)", 0) \ \ DECLARE(UInt64, preferred_block_size_bytes, 1000000, R"( @@ -7338,6 +7343,12 @@ Allows creation of tables with the [TimeSeries](../../engines/table-engines/inte - 0 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is disabled. - 1 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is enabled. )", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_hybrid_table, false, R"( +Allows creation of tables with the [Hybrid](../../engines/table-engines/special/hybrid.md) table engine. +)", EXPERIMENTAL) \ + DECLARE(Bool, hybrid_table_auto_cast_columns, true, R"( +Automatically cast columns to the schema defined in Hybrid tables when remote segments expose different physical types. Works only with analyzer. Enabled by default, does nothing if (experimental) Hybrid tables are disabled; disable it if it causes issues. Segment schemas are cached when the Hybrid table is created or attached; if a segment schema changes later, detach/attach or recreate the Hybrid table so the cached headers stay in sync. +)", 0) \ DECLARE(Bool, allow_experimental_codecs, false, R"( If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing). )", EXPERIMENTAL) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index e261e53d1382..d53741bd224f 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -44,6 +44,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() // }); addSettingsChanges(settings_changes_history, "26.1", { + // altinity: antalya-specific features + {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. 
Default enabled."}, + {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, + {"enable_alias_marker", true, true, "New setting."}, + // altinity: antalya-specific features {"parallel_replicas_filter_pushdown", false, false, "New setting"}, {"use_statistics", true, true, "Enable this optimization by default."}, {"ignore_on_cluster_for_replicated_database", false, false, "Add a new setting to ignore ON CLUSTER clause for DDL queries with a replicated database."}, diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp index b659d3517615..2ef37686b3d2 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ b/src/Databases/enableAllExperimentalSettings.cpp @@ -56,6 +56,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("allow_experimental_ytsaurus_table_engine", 1); context->setSetting("allow_experimental_ytsaurus_dictionary_source", 1); context->setSetting("allow_experimental_time_series_aggregate_functions", 1); + context->setSetting("allow_experimental_hybrid_table", 1); context->setSetting("allow_experimental_lightweight_update", 1); context->setSetting("allow_experimental_insert_into_iceberg", 1); context->setSetting("allow_experimental_iceberg_compaction", 1); diff --git a/src/Functions/identity.cpp b/src/Functions/identity.cpp index 2cfde87b88db..473c26a2396c 100644 --- a/src/Functions/identity.cpp +++ b/src/Functions/identity.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB { @@ -33,4 +34,21 @@ REGISTER_FUNCTION(ActionName) factory.registerFunction(); } +REGISTER_FUNCTION(AliasMarker) +{ + factory.registerFunction(FunctionDocumentation{ + .description = R"( +Internal function. Not for direct use. 
+)", + .syntax = {"__aliasMarker(expr, alias_name)"}, + .arguments = { + {"expr", "Expression to mark.", {"Any"}}, + {"alias_name", "Alias name attached to the expression.", {"String"}}, + }, + .returned_value = {"Returns expr unchanged.", {"Any"}}, + .introduced_in = {25, 8}, + .category = FunctionDocumentation::Category::Other, + }); +} + } diff --git a/src/Functions/identity.h b/src/Functions/identity.h index c16086648e7c..195040a5eec6 100644 --- a/src/Functions/identity.h +++ b/src/Functions/identity.h @@ -97,4 +97,56 @@ class FunctionActionName : public FunctionIdentityBase } }; +struct AliasMarkerName +{ + static constexpr auto name = "__aliasMarker"; +}; + +/** + * __aliasMarker is a transport-time alias preservation hint for distributed SQL paths. + * + * Why it exists: + * - When a distributed query is planned and mergeable-state flows are used, the final aliasing step + * is intentionally skipped. + * - That is desired, but it also prevents preserving/injecting initiator-side expression names + * (for example, names coming from ALIAS columns or certain CAST expressions). + * - This becomes especially problematic when shard schemas differ slightly. + * - Some injected alias columns must preserve a specific output name; otherwise remote headers may diverge + * from initiator expectations (header mismatch, wrong column association, and similar inconsistencies). + * + * Lifecycle/invariants: + * 1) Injected only around rewritten alias expressions that require stable output identity. + * 2) Materialized before SQL serialization: the marker id is converted to a String alias identifier. + * 3) Consumed by analyzer/planner on receiver to enforce alias naming in actions. + * 4) Removed/stripped before forwarding to the next hop, then (if needed) re-injected for that hop only. + * + * This is a temporary bridge while distributed plan transport still relies on SQL text in these paths. 
+ * As query plan serialization fully replaces that boundary, this marker path should become unnecessary. + */ +class FunctionAliasMarker : public IFunction +{ +public: + static constexpr auto name = AliasMarkerName::name; + static FunctionPtr create(ContextPtr) { return std::make_shared(); } + + String getName() const override { return name; } + size_t getNumberOfArguments() const override { return 2; } + ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; } + bool isSuitableForConstantFolding() const override { return false; } + bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } + + DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override + { + if (arguments.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker expects 2 arguments"); + + return arguments.front(); + } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override + { + return arguments.front().column; + } +}; + } diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index 278558f23049..3e4e13247d12 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -705,6 +705,9 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data & if (node.name == "lambda") throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION, "Unexpected lambda expression"); + if (node.name == "__aliasMarker") + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker is internal and supported only with the analyzer"); + /// Function arrayJoin. 
if (node.name == "arrayJoin") { diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 55ae930c7f01..128d0af00418 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -59,7 +59,8 @@ ASTPtr rewriteSelectQuery( const ASTPtr & query, const std::string & remote_database, const std::string & remote_table, - ASTPtr table_function_ptr) + ASTPtr table_function_ptr, + ASTPtr additional_filter) { auto modified_query_ast = query->clone(); @@ -72,8 +73,33 @@ ASTPtr rewriteSelectQuery( if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) { + // Apply additional filter if provided + if (additional_filter) + { + if (select_query.where()) + { + /// WHERE AND + select_query.setExpression( + ASTSelectQuery::Expression::WHERE, + makeASTFunction("and", select_query.where(), additional_filter->clone())); + } + else + { + /// No WHERE – simply set it + select_query.setExpression( + ASTSelectQuery::Expression::WHERE, additional_filter->clone()); + } + } + if (table_function_ptr) - select_query.addTableFunction(table_function_ptr); + { + select_query.addTableFunction(table_function_ptr->clone()); + + // Reset semantic table information for all column identifiers to prevent + // RestoreQualifiedNamesVisitor from adding wrong table names + ResetSemanticTableVisitor::Data data; + ResetSemanticTableVisitor(data).visit(modified_query_ast); + } else select_query.replaceDatabaseAndTable(remote_database, remote_table); @@ -85,6 +111,7 @@ ASTPtr rewriteSelectQuery( data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as(), 0)); data.remote_table.database = remote_database; data.remote_table.table = remote_table; + RestoreQualifiedNamesVisitor(data).visit(modified_query_ast); } } diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index 
0b6246c31f94..413688504e92 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -41,7 +41,8 @@ ASTPtr rewriteSelectQuery( const ASTPtr & query, const std::string & remote_database, const std::string & remote_table, - ASTPtr table_function_ptr = nullptr); + ASTPtr table_function_ptr = nullptr, + ASTPtr additional_filter = nullptr); using ColumnsDescriptionByShardNum = std::unordered_map; using AdditionalShardFilterGenerator = std::function; diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index cc3a1e2955b5..d77e5fd1030f 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -332,7 +333,8 @@ void executeQuery( const std::string & sharding_key_column_name, const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator, - bool is_remote_function) + bool is_remote_function, + std::span additional_query_infos) { const Settings & settings = context->getSettingsRef(); @@ -360,6 +362,7 @@ void executeQuery( new_context->increaseDistributedDepth(); const size_t shards = cluster->getShardCount(); + const bool has_additional_query_infos = !additional_query_infos.empty(); if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { @@ -468,6 +471,28 @@ void executeQuery( plans.emplace_back(std::move(plan)); } + if (has_additional_query_infos) + { + if (!header) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Header is not initialized for local hybrid plan creation"); + + const Block & header_block = *header; + for (const auto & additional_query_info : additional_query_infos) + { + auto additional_plan = createLocalPlan( + additional_query_info.query, + header_block, + context, + processed_stage, + 0, /// shard_num is not applicable for local hybrid plans + 
1, /// shard_count is not applicable for local hybrid plans + false, + ""); + + plans.emplace_back(std::move(additional_plan)); + } + } + if (plans.empty()) return; @@ -483,6 +508,8 @@ void executeQuery( input_headers.emplace_back(plan->getCurrentHeader()); auto union_step = std::make_unique(std::move(input_headers)); + if (has_additional_query_infos) + union_step->setStepDescription("Hybrid"); query_plan.unitePlans(std::move(union_step), std::move(plans)); } diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index 3ec53b5ddb78..d7fcd6a75af2 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -5,6 +5,7 @@ #include #include +#include namespace DB { @@ -88,7 +89,8 @@ void executeQuery( const std::string & sharding_key_column_name, const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator, - bool is_remote_function); + bool is_remote_function, + std::span additional_query_infos = {}); std::optional executeInsertSelectWithParallelReplicas( const ASTInsertQuery & query_ast, diff --git a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp index 11e9f47ccc7b..397498638072 100644 --- a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp +++ b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp @@ -437,4 +437,15 @@ void RestoreQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &, D } } +void ResetSemanticTableMatcher::visit(ASTPtr & ast, Data & data) +{ + if (auto * t = ast->as()) + visit(*t, ast, data); +} + +void ResetSemanticTableMatcher::visit(ASTIdentifier & identifier, ASTPtr &, Data &) +{ + identifier.resetSemanticTable(); +} + } diff --git a/src/Interpreters/TranslateQualifiedNamesVisitor.h b/src/Interpreters/TranslateQualifiedNamesVisitor.h index 00c85d08873f..becff4845755 100644 --- a/src/Interpreters/TranslateQualifiedNamesVisitor.h +++ 
b/src/Interpreters/TranslateQualifiedNamesVisitor.h @@ -80,4 +80,33 @@ struct RestoreQualifiedNamesMatcher using RestoreQualifiedNamesVisitor = InDepthNodeVisitor; + +/// Reset semantic->table for all column identifiers in the AST. +/// +/// PROBLEM DESCRIPTION: +/// When an AST is passed through multiple query rewrites (e.g., in Hybrid -> remote), +/// the semantic->table information attached to ASTIdentifier nodes can become stale and +/// cause incorrect column qualification. This happens because: +/// +/// 1. During initial parsing, semantic->table is populated with the original table name +/// 2. When the query is rewritten (e.g., FROM clause changed from table to remote() function inside Hybrid), +/// the AST structure is modified but semantic->table information is preserved +/// 3. Subsequent visitors like RestoreQualifiedNamesVisitor called in remote() function over the same AST +/// may use this stale semantic->table information to incorrectly qualify column names with the original table name +/// +/// SOLUTION: +/// This visitor clears semantic->table for all column identifiers, ensuring that subsequent +/// visitors work with clean semantic information and don't apply stale table qualifications. 
+struct ResetSemanticTableMatcher +{ + // No data needed for this visitor + struct Data {}; + + static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; } + static void visit(ASTPtr & ast, Data & data); + static void visit(ASTIdentifier & identifier, ASTPtr &, Data & data); +}; + +using ResetSemanticTableVisitor = InDepthNodeVisitor; + } diff --git a/src/Parsers/ASTIdentifier.cpp b/src/Parsers/ASTIdentifier.cpp index d8171415de4a..07dcee853c31 100644 --- a/src/Parsers/ASTIdentifier.cpp +++ b/src/Parsers/ASTIdentifier.cpp @@ -167,6 +167,18 @@ void ASTIdentifier::restoreTable() } } +void ASTIdentifier::resetSemanticTable() +{ + // Only reset semantic table for column identifiers (not table identifiers) + if (semantic && !semantic->special) + { + semantic->table.clear(); + semantic->can_be_alias = true; + semantic->membership = std::nullopt; + } +} + + boost::intrusive_ptr ASTIdentifier::createTable() const { if (name_parts.size() == 1) return make_intrusive(name_parts[0]); diff --git a/src/Parsers/ASTIdentifier.h b/src/Parsers/ASTIdentifier.h index 02825fda69b6..f4e35f989101 100644 --- a/src/Parsers/ASTIdentifier.h +++ b/src/Parsers/ASTIdentifier.h @@ -52,6 +52,8 @@ class ASTIdentifier : public ASTWithAlias void updateTreeHashImpl(SipHash & hash_state, bool ignore_alias) const override; void restoreTable(); // TODO(ilezhankin): get rid of this + void resetSemanticTable(); // Reset semantic to empty string (see ResetSemanticTableVisitor) + boost::intrusive_ptr createTable() const; // returns |nullptr| if identifier is not table. 
String full_name; diff --git a/src/Planner/PlannerActionsVisitor.cpp b/src/Planner/PlannerActionsVisitor.cpp index bc5251980c1c..57ceb2d4b03d 100644 --- a/src/Planner/PlannerActionsVisitor.cpp +++ b/src/Planner/PlannerActionsVisitor.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -30,6 +31,7 @@ #include #include +#include #include #include @@ -87,6 +89,17 @@ String calculateActionNodeNameWithCastIfNeeded(const ConstantNode & constant_nod return buffer.str(); } +String tryExtractAliasMarkerIdFromSecondArgument(const QueryTreeNodePtr & argument) +{ + if (const auto * second_argument_constant = argument->as(); + second_argument_constant && isString(second_argument_constant->getResultType())) + { + return second_argument_constant->getValue().safeGet(); + } + + return {}; +} + class ActionNodeNameHelper { public: @@ -179,7 +192,23 @@ class ActionNodeNameHelper case QueryTreeNodeType::FUNCTION: { const auto & function_node = node->as(); - if (function_node.getFunctionName() == "__actionName") + if (function_node.getFunctionName() == "__aliasMarker") + { + /// Perform sanity check, because user may call this function with unexpected arguments + const auto & function_argument_nodes = function_node.getArguments().getNodes(); + if (function_argument_nodes.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker expects 2 arguments"); + + result = tryExtractAliasMarkerIdFromSecondArgument(function_argument_nodes.at(1)); + if (result.empty()) + result = calculateActionNodeName(function_argument_nodes.at(0)); + + /// Empty node name is not allowed and leads to logical errors + if (result.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker is internal and should not be used directly"); + break; + } + else if (function_node.getFunctionName() == "__actionName") { /// Perform sanity check, because user may call this function with unexpected arguments const auto & function_argument_nodes = 
function_node.getArguments().getNodes(); @@ -364,7 +393,7 @@ class ActionNodeNameHelper } default: { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {}", node->formatASTForErrorMessage()); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {} (node_type: {})", node->formatASTForErrorMessage(), static_cast(node_type)); } } @@ -617,6 +646,18 @@ class ActionsScopeNode return node; } + const ActionsDAG::Node * addAliasIfNecessary(const std::string & node_name, const ActionsDAG::Node * child) + { + auto it = node_name_to_node.find(node_name); + if (it != node_name_to_node.end()) + return it->second; + + const auto * node = &actions_dag.addAlias(*child, node_name); + node_name_to_node[node->result_name] = node; + + return node; + } + private: std::unordered_map node_name_to_node; ActionsDAG & actions_dag; @@ -1082,6 +1123,34 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi { const auto & function_node = node->as(); + if (function_node.getFunctionName() == "__aliasMarker") + { + const auto & function_arguments = function_node.getArguments().getNodes(); + if (function_arguments.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker expects 2 arguments"); + + auto [child_name, levels] = visitImpl(function_arguments.at(0)); + auto alias_id = tryExtractAliasMarkerIdFromSecondArgument(function_arguments.at(1)); + if (alias_id.empty()) + alias_id = child_name; + + if (alias_id == child_name) + return {child_name, levels}; + + size_t level = levels.max(); + const auto * child_node = actions_stack[level].getNodeOrThrow(child_name); + actions_stack[level].addAliasIfNecessary(alias_id, child_node); + + size_t actions_stack_size = actions_stack.size(); + for (size_t i = level + 1; i < actions_stack_size; ++i) + { + auto & actions_stack_node = actions_stack[i]; + actions_stack_node.addInputColumnIfNecessary(alias_id, function_node.getResultType()); + } + + return 
{alias_id, levels}; + } + if (function_node.getFunctionName() == "indexHint") return visitIndexHintFunction(node); if (function_node.getFunctionName() == "exists") diff --git a/src/Planner/Utils.cpp b/src/Planner/Utils.cpp index 3da16547e3cc..dafec6a09f51 100644 --- a/src/Planner/Utils.cpp +++ b/src/Planner/Utils.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 9327d63c4327..d9d9e18aae60 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -87,6 +88,7 @@ #include #include +#include #include #include @@ -102,6 +104,7 @@ #include #include +#include #include #include @@ -115,9 +118,9 @@ #include #include #include - #include #include +#include namespace fs = std::filesystem; @@ -148,6 +151,29 @@ namespace CurrentMetrics namespace DB { +namespace +{ +void replaceCurrentDatabaseFunction(ASTPtr & ast, const ContextPtr & context) +{ + if (!ast) + return; + + if (auto * func = ast->as()) + { + if (func->name == "currentDatabase") + { + ast = evaluateConstantExpressionForDatabaseName(ast, context); + return; + } + } + + for (auto & child : ast->children) + replaceCurrentDatabaseFunction(child, context); +} + + +} + namespace Setting { extern const SettingsBool allow_experimental_analyzer; @@ -177,7 +203,10 @@ namespace Setting extern const SettingsBool prefer_localhost_replica; extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas; extern const SettingsBool prefer_global_in_and_join; + extern const SettingsBool serialize_query_plan; extern const SettingsBool enable_global_with_statement; + extern const SettingsBool allow_experimental_hybrid_table; + extern const SettingsBool enable_alias_marker; } namespace DistributedSetting @@ -198,6 +227,8 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; 
extern const int STORAGE_REQUIRES_PARAMETER; extern const int BAD_ARGUMENTS; + extern const int UNKNOWN_DATABASE; + extern const int UNKNOWN_TABLE; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int INCORRECT_NUMBER_OF_COLUMNS; extern const int INFINITE_LOOP; @@ -209,6 +240,7 @@ namespace ErrorCodes extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; extern const int ARGUMENT_OUT_OF_BOUND; extern const int TOO_LARGE_DISTRIBUTED_DEPTH; + extern const int SUPPORT_IS_DISABLED; } namespace ActionLocks @@ -522,6 +554,10 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( if (to_stage == QueryProcessingStage::WithMergeableState) return QueryProcessingStage::WithMergeableState; + // TODO: check logic + if (!segments.empty()) + nodes += segments.size(); + /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. if (nodes == 1) @@ -564,6 +600,9 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression( const QueryTreeNodePtr & expr, const SelectQueryInfo & query_info) const { + if (!segments.empty()) + return false; + ColumnsWithTypeAndName empty_input_columns; ColumnNodePtrWithHashSet empty_correlated_columns_set; // When comparing sharding key expressions, we need to ignore table qualifiers in column names @@ -604,6 +643,7 @@ bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression( return allOutputsDependsOnlyOnAllowedNodes(sharding_key_dag, irreducibe_nodes, matches); } +// TODO: support additional segments std::optional StorageDistributed::getOptimizedQueryProcessingStageAnalyzer(const SelectQueryInfo & query_info, const Settings & settings) const { bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key] @@ -662,6 +702,7 @@ std::optional StorageDistributed::getOptimizedQueryP 
return QueryProcessingStage::Complete; } +// TODO: support additional segments std::optional StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const { bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key] @@ -753,9 +794,11 @@ StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataP namespace { +/// Rebuild alias ColumnNode references into expression nodes and optionally +/// wrap them with __aliasMarker for distributed SQL transport. class ReplaseAliasColumnsVisitor : public InDepthQueryTreeVisitor { - static QueryTreeNodePtr getColumnNodeAliasExpression(const QueryTreeNodePtr & node) + QueryTreeNodePtr getColumnNodeAliasExpression(const QueryTreeNodePtr & node) const { const auto * column_node = node->as(); if (!column_node || !column_node->hasExpression()) @@ -768,16 +811,164 @@ class ReplaseAliasColumnsVisitor : public InDepthQueryTreeVisitorgetExpression(); - column_expression->setAlias(column_node->getColumnName()); - return column_expression; + const String original_expression_alias = column_expression->hasAlias() ? column_expression->getAlias() : String{}; + + const auto & settings = context->getSettingsRef(); + /// With serialized query plans we transfer actions directly and do not need SQL-only alias markers. 
+ const bool use_alias_marker = settings[Setting::enable_alias_marker] && !settings[Setting::serialize_query_plan]; + if (!use_alias_marker) + { + return column_expression; + } + + if (auto * function_node = column_expression->as(); + function_node && function_node->getFunctionName() == "__aliasMarker") + { + auto & arguments = function_node->getArguments().getNodes(); + if (!arguments.empty() && arguments[0]) + column_expression = arguments[0]; + } + + QueryTreeNodes arguments; + arguments.reserve(2); + /// Preserve the original column reference in arg2 so normal analyzer passes + /// (alias/source uniquification) can still transform it consistently. + /// Before query is sent to shard this ColumnNode is materialized to String ConstantNode. + arguments.emplace_back(std::move(column_expression)); + arguments.emplace_back(std::make_shared(column_node->getColumn(), column_source)); + + auto alias_marker_node = std::make_shared("__aliasMarker"); + alias_marker_node->getArguments().getNodes() = std::move(arguments); + if (!original_expression_alias.empty()) + { + alias_marker_node->getArguments().getNodes()[0]->removeAlias(); + alias_marker_node->setAlias(original_expression_alias); + } + resolveOrdinaryFunctionNodeByName(*alias_marker_node, "__aliasMarker", context); + + return alias_marker_node; } public: + explicit ReplaseAliasColumnsVisitor(ContextPtr context_) : context(std::move(context_)) {} + void visitImpl(QueryTreeNodePtr & node) { if (auto column_expression = getColumnNodeAliasExpression(node)) node = column_expression; } + + static bool needChildVisit(const QueryTreeNodePtr & parent_node, const QueryTreeNodePtr & child_node) + { + auto * function_node = parent_node->as(); + if (!function_node || function_node->getFunctionName() != "__aliasMarker") + return true; + + const auto & arguments = function_node->getArguments().getNodes(); + if (arguments.size() < 2) + return true; + + /// Do not recurse into __aliasMarker arg2. 
+ /// It is an internal column-reference payload used only for later id materialization, + /// and visiting it here can re-expand aliases or create recursive rewrites. + return child_node.get() != arguments[1].get(); + } + +private: + ContextPtr context; +}; + +using ColumnNameToColumnNodeMap = std::unordered_map; + +ColumnNameToColumnNodeMap buildColumnNodesForTableExpression(const QueryTreeNodePtr & table_expression_node, const ContextPtr & context) +{ + const TableNode * table_node = table_expression_node->as(); + const TableFunctionNode * table_function_node = table_expression_node->as(); + if (!table_node && !table_function_node) + return {}; + + // Rebuild per-column nodes (including ALIAS expressions) for the replacement table expression. + const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot(); + auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withVirtuals(); + if (storage_snapshot->storage.supportsSubcolumns()) + get_column_options.withSubcolumns(); + + auto column_names_and_types = storage_snapshot->getColumns(get_column_options); + const auto & columns_description = storage_snapshot->metadata->getColumns(); + + ColumnNameToColumnNodeMap column_name_to_node; + column_name_to_node.reserve(column_names_and_types.size()); + + for (const auto & column_name_and_type : column_names_and_types) + { + const auto & column_default = columns_description.getDefault(column_name_and_type.name); + if (column_default && column_default->kind == ColumnDefaultKind::Alias) + { + auto alias_expression = buildQueryTree(column_default->expression, context); + QueryAnalysisPass(table_expression_node).run(alias_expression, context); + if (!alias_expression->getResultType()->equals(*column_name_and_type.type)) + alias_expression = buildCastFunction(alias_expression, column_name_and_type.type, context, true); + + auto column_node = std::make_shared(column_name_and_type, 
std::move(alias_expression), table_expression_node); + column_name_to_node.emplace(column_name_and_type.name, std::move(column_node)); + } + else + { + auto column_node = std::make_shared(column_name_and_type, table_expression_node); + column_name_to_node.emplace(column_name_and_type.name, std::move(column_node)); + } + } + + return column_name_to_node; +} + +class ReplaceColumnNodesForTableExpressionVisitor : public InDepthQueryTreeVisitor +{ +public: + ReplaceColumnNodesForTableExpressionVisitor( + const QueryTreeNodePtr & from_, + const QueryTreeNodePtr & to_, + const ColumnNameToColumnNodeMap & column_name_to_node_) + : from(from_), to(to_), column_name_to_node(column_name_to_node_) + {} + + void visitImpl(QueryTreeNodePtr & node) + { + auto * column_node = node->as(); + if (!column_node) + return; + + auto column_source = column_node->getColumnSourceOrNull(); + if (!column_source) + return; + + if (column_source.get() != from.get()) + return; + + auto it = column_name_to_node.find(column_node->getColumnName()); + if (it != column_name_to_node.end()) + { + auto replacement = it->second->clone(); + replacement->setAlias(column_node->getAlias()); + node = std::move(replacement); + } + else + { + // Preserve the column name but rebind its source to the replacement table expression. 
+ column_node->setColumnSource(to); + } + } + + static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child_node) + { + auto child_node_type = child_node->getNodeType(); + return !(child_node_type == QueryTreeNodeType::QUERY || child_node_type == QueryTreeNodeType::UNION); + } + +private: + QueryTreeNodePtr from; + QueryTreeNodePtr to; + const ColumnNameToColumnNodeMap & column_name_to_node; }; class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext @@ -861,7 +1052,8 @@ bool rewriteJoinToGlobalJoinIfNeeded(QueryTreeNodePtr join_tree) QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, const StorageSnapshotPtr & distributed_storage_snapshot, const StorageID & remote_storage_id, - const ASTPtr & remote_table_function) + const ASTPtr & remote_table_function, + const ASTPtr & additional_filter = nullptr) { auto & planner_context = query_info.planner_context; const auto & query_context = planner_context->getQueryContext(); @@ -928,8 +1120,55 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, replacement_table_expression->setAlias(query_info.table_expression->getAlias()); - auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression)); - ReplaseAliasColumnsVisitor replace_alias_columns_visitor; + QueryTreeNodePtr filter; + if (additional_filter) + { + const auto & context = query_info.planner_context->getQueryContext(); + + filter = buildQueryTree(additional_filter->clone(), query_context); + // Resolve now; alias expressions are normalized later for the merged query. 
+ QueryAnalysisPass(replacement_table_expression).run(filter, context); + } + + auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, replacement_table_expression); + + // Apply additional filter if provided + if (filter) + { + auto & query = query_tree_to_modify->as(); + query.getWhere() = query.hasWhere() + ? mergeConditionNodes({query.getWhere(), filter}, query_context) + : std::move(filter); + } + + if (additional_filter) + { + auto replacement_columns = buildColumnNodesForTableExpression(replacement_table_expression, query_context); + + /** + * When Hybrid injects a segment predicate, the query tree may end up mixing + * two different column interpretations for the same name: + * - SELECT list columns are resolved against the Hybrid schema (physical columns). + * - WHERE predicate columns are resolved against the segment schema (ALIAS columns). + * + * If we later expand alias columns only in one place, the analyzer can see + * two different expressions with the same alias (e.g. `computed` as a column + * vs `value * 2 AS computed`), which triggers MULTIPLE_EXPRESSIONS_FOR_ALIAS. + * + * To prevent this, we rebuild ColumnNodes from the replacement table expression + * (including fully-resolved ALIAS expressions) and rewrite the whole query tree + * so all references to the replaced table share the same column source and + * the same alias semantics. This keeps SELECT and WHERE consistent before + * ReplaseAliasColumnsVisitor performs final alias expansion. 
+ */ + ReplaceColumnNodesForTableExpressionVisitor replace_query_columns_visitor( + replacement_table_expression, + replacement_table_expression, + replacement_columns); + replace_query_columns_visitor.visit(query_tree_to_modify); + } + + ReplaseAliasColumnsVisitor replace_alias_columns_visitor(query_context); replace_alias_columns_visitor.visit(query_tree_to_modify); const auto & settings = query_context->getSettingsRef(); @@ -946,7 +1185,10 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, rewriteJoinToGlobalJoinIfNeeded(query_node.getJoinTree()); } - return buildQueryTreeForShard(query_info.planner_context, query_tree_to_modify, /*allow_global_join_for_right_table*/ false); + auto shard_query_tree = buildQueryTreeForShard(query_info.planner_context, query_tree_to_modify, /*allow_global_join_for_right_table*/ false); + finalizeAliasMarkersForDistributedSerialization(shard_query_tree, query_context); + return shard_query_tree; + } } @@ -965,30 +1207,90 @@ void StorageDistributed::read( SelectQueryInfo modified_query_info = query_info; + std::vector additional_query_infos; + const auto & settings = local_context->getSettingsRef(); + auto metadata_ptr = getInMemoryMetadataPtr(); + + auto describe_segment_target = [&](const HybridSegment & segment) -> String + { + if (segment.storage_id) + return segment.storage_id->getNameForLogs(); + if (segment.table_function_ast) + return segment.table_function_ast->formatForLogging(); + chassert(false, "Hybrid segment is missing both storage_id and table_function_ast"); + return String{""}; + }; + + auto describe_base_target = [&]() -> String + { + if (remote_table_function_ptr) + return remote_table_function_ptr->formatForLogging(); + if (!remote_database.empty()) + return remote_database + "." 
+ remote_table; + return remote_table; + }; + + String base_target = describe_base_target(); + + const bool log_hybrid_query_rewrites = (!segments.empty() || base_segment_predicate); + + auto log_rewritten_query = [&](const String & target, const ASTPtr & ast) + { + if (!log_hybrid_query_rewrites || !ast) + return; + + LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging()); + }; if (settings[Setting::allow_experimental_analyzer]) { - StorageID remote_storage_id = StorageID{remote_database, remote_table}; + StorageID remote_storage_id = StorageID::createEmpty(); + if (!remote_table_function_ptr) + remote_storage_id = StorageID{remote_database, remote_table}; auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info, query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, - remote_table_function_ptr); + remote_table_function_ptr, + base_segment_predicate); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. 
*/ for (auto & column : block) column.column = column.column->convertToFullColumnIfConst(); + header = std::make_shared(std::move(block)); modified_query_info.query = queryNodeToDistributedSelectQuery(query_tree_distributed); modified_query_info.query_tree = std::move(query_tree_distributed); + log_rewritten_query(base_target, modified_query_info.query); + + if (!segments.empty()) + { + for (const auto & segment : segments) + { + // Create a modified query info with the segment predicate + SelectQueryInfo additional_query_info = query_info; + + auto additional_query_tree = buildQueryTreeDistributed(additional_query_info, + query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, + segment.storage_id ? *segment.storage_id : StorageID::createEmpty(), + segment.storage_id ? nullptr : segment.table_function_ast, + segment.predicate_ast); + + additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); + additional_query_info.query_tree = std::move(additional_query_tree); + log_rewritten_query(describe_segment_target(segment), additional_query_info.query); + + additional_query_infos.push_back(std::move(additional_query_info)); + } + } - /// Return directly (with correct header) if no shard to query. 
- if (modified_query_info.getCluster()->getShardsInfo().empty()) + // For empty shards - avoid early return if we have additional segments + if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) return; } else @@ -997,9 +1299,39 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, - remote_database, remote_table, remote_table_function_ptr); + remote_database, remote_table, remote_table_function_ptr, + base_segment_predicate); + log_rewritten_query(base_target, modified_query_info.query); - if (modified_query_info.getCluster()->getShardsInfo().empty()) + if (!segments.empty()) + { + for (const auto & segment : segments) + { + SelectQueryInfo additional_query_info = query_info; + + if (segment.storage_id) + { + additional_query_info.query = ClusterProxy::rewriteSelectQuery( + local_context, additional_query_info.query, + segment.storage_id->database_name, segment.storage_id->table_name, + nullptr, + segment.predicate_ast); + } + else + { + additional_query_info.query = ClusterProxy::rewriteSelectQuery( + local_context, additional_query_info.query, + "", "", segment.table_function_ast, + segment.predicate_ast); + } + + log_rewritten_query(describe_segment_target(segment), additional_query_info.query); + additional_query_infos.push_back(std::move(additional_query_info)); + } + } + + // For empty shards - avoid early return if we have additional segments + if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) { Pipe pipe(std::make_shared(header)); auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1010,34 +1342,38 @@ void StorageDistributed::read( } } - ClusterProxy::SelectStreamFactory select_stream_factory = - ClusterProxy::SelectStreamFactory( + if (!modified_query_info.getCluster()->getShardsInfo().empty() || !additional_query_infos.empty()) + { + ClusterProxy::SelectStreamFactory select_stream_factory = + 
ClusterProxy::SelectStreamFactory( + header, + storage_snapshot, + processed_stage); + + auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey( + *modified_query_info.getCluster(), local_context, metadata_ptr->columns); + + ClusterProxy::executeQuery( + query_plan, header, - storage_snapshot, - processed_stage); - - auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey( - *modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr()->columns); - - ClusterProxy::executeQuery( - query_plan, - header, - processed_stage, - remote_storage, - remote_table_function_ptr, - select_stream_factory, - log, - local_context, - modified_query_info, - sharding_key_expr, - sharding_key_column_name, - *distributed_settings, - shard_filter_generator, - is_remote_function); - - /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. - if (!query_plan.isInitialized()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized"); + processed_stage, + remote_storage, + remote_table_function_ptr, + select_stream_factory, + log, + local_context, + modified_query_info, + sharding_key_expr, + sharding_key_column_name, + *distributed_settings, + shard_filter_generator, + is_remote_function, + additional_query_infos); + + /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. + if (!query_plan.isInitialized()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized"); + } } @@ -2024,6 +2360,37 @@ void StorageDistributed::delayInsertOrThrowIfNeeded() const } } +void StorageDistributed::setHybridLayout(std::vector segments_) +{ + segments = std::move(segments_); + log = getLogger("Hybrid (" + getStorageID().table_name + ")"); + + auto virtuals = createVirtuals(); + // or _segment_index? 
+ virtuals.addEphemeral("_table_index", std::make_shared(), "Index of the table function in Hybrid (0 for main table, 1+ for additional segments)"); + setVirtuals(virtuals); +} + +void StorageDistributed::setCachedColumnsToCast(ColumnsDescription columns) +{ + cached_columns_to_cast = std::move(columns); + if (!cached_columns_to_cast.empty() && log) + { + Names columns_with_types; + const auto cached_columns = cached_columns_to_cast.getAllPhysical(); + columns_with_types.reserve(cached_columns.size()); + for (const auto & col : cached_columns) + columns_with_types.emplace_back(col.name + " " + col.type->getName()); + LOG_DEBUG(log, "Hybrid auto-cast will apply to: [{}]", fmt::join(columns_with_types, ", ")); + } +} + +ColumnsDescription StorageDistributed::getColumnsToCast() const +{ + return cached_columns_to_cast; +} + + void registerStorageDistributed(StorageFactory & factory) { factory.registerStorage("Distributed", [](const StorageFactory::Arguments & args) @@ -2128,6 +2495,283 @@ void registerStorageDistributed(StorageFactory & factory) }); } +void registerStorageHybrid(StorageFactory & factory) +{ + factory.registerStorage("Hybrid", [](const StorageFactory::Arguments & args) -> StoragePtr + { + ASTs & engine_args = args.engine_args; + + if (engine_args.size() < 2) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage Hybrid requires at least 2 arguments, got {}", engine_args.size()); + + const ContextPtr & global_context = args.getContext(); + ContextPtr local_context = args.getLocalContext(); + if (!local_context) + local_context = global_context; + + if (args.mode <= LoadingStrictnessLevel::CREATE + && !local_context->getSettingsRef()[Setting::allow_experimental_hybrid_table]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "Experimental Hybrid table engine is not enabled (the setting 'allow_experimental_hybrid_table')"); + } + + // Validate first argument - must be a table function + ASTPtr first_arg = engine_args[0]; + if 
(const auto * func = first_arg->as()) + { + // Check if it's a valid table function name + if (!TableFunctionFactory::instance().isTableFunctionName(func->name)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "First argument must be a table function, got: {}", func->name); + + // Check if it's one of the supported remote table functions + if (func->name != "remote" && func->name != "remoteSecure" && + func->name != "cluster" && func->name != "clusterAllReplicas") + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "First argument must be one of: remote, remoteSecure, cluster, clusterAllReplicas, got: {}", func->name); + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "First argument must be a table function, got: {}", first_arg->getID()); + } + + // Now handle the first table function (which must be a TableFunctionRemote) + auto table_function = TableFunctionFactory::instance().get(first_arg, local_context); + if (!table_function) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid table function in Hybrid engine"); + + // Capture the physical columns reported by the first segment (table function) + ColumnsDescription first_segment_columns = table_function->getActualTableStructure(local_context, true); + + // For schema inference, prefer user-provided columns, otherwise use the physical ones + ColumnsDescription columns_to_use = args.columns; + if (columns_to_use.empty()) + columns_to_use = first_segment_columns; + + const auto physical_columns = columns_to_use.getAllPhysical(); + + NameSet columns_to_cast_names; + auto validate_segment_schema = [&](const ColumnsDescription & segment_columns, const String & segment_name) + { + for (const auto & column : physical_columns) + { + // all columns defined as physical in hybrid should exists in segments (but can be aliases there) + auto found = segment_columns.tryGetColumn(GetColumnsOptions(GetColumnsOptions::AllPhysicalAndAliases), column.name); + if (!found) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + 
"Hybrid segment {} is missing column '{}' required by Hybrid schema",
+ segment_name, column.name);
+ }
+
+ // if the type of the column in the segment differs - we need to add it to the list of columns which require casts
+ if (!found->type->equals(*column.type))
+ columns_to_cast_names.emplace(column.name);
+ }
+ };
+
+ validate_segment_schema(first_segment_columns, engine_args[0]->formatForLogging());
+
+ // Execute the table function to get the underlying storage
+ StoragePtr storage = table_function->execute(
+ first_arg,
+ local_context,
+ args.table_id.table_name,
+ columns_to_use,
+ false, // use_global_context = false
+ false); // is_insert_query = false
+
+ // table function execution wraps the actual storage in a StorageTableFunctionProxy, to initialize it lazily in queries
+ // here we need to get the nested storage
+ if (auto proxy = std::dynamic_pointer_cast(storage))
+ {
+ storage = proxy->getNested();
+ }
+
+ // Cast to StorageDistributed to access its methods
+ auto distributed_storage = std::dynamic_pointer_cast(storage);
+ if (!distributed_storage)
+ {
+ // Debug: Print the actual type we got
+ std::string actual_type = storage ? 
storage->getName() : "nullptr"; + throw Exception(ErrorCodes::LOGICAL_ERROR, + "TableFunctionRemote did not return a StorageDistributed or StorageProxy, got: {}", actual_type); + } + + auto validate_predicate = [&](ASTPtr & predicate, size_t argument_index) + { + try + { + auto syntax_result = TreeRewriter(local_context).analyze(predicate, physical_columns); + ExpressionAnalyzer(predicate, syntax_result, local_context).getActions(true); + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{} must be a valid SQL expression: {}", argument_index, e.message()); + } + }; + + ASTPtr second_arg = engine_args[1]; + validate_predicate(second_arg, 1); + distributed_storage->setBaseSegmentPredicate(second_arg); + + // Parse additional table function pairs (if any) + std::vector segment_definitions; + for (size_t i = 2; i < engine_args.size(); i += 2) + { + if (i + 1 >= engine_args.size()) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function pairs must have both table function and predicate, got odd number of arguments"); + + ASTPtr table_function_ast = engine_args[i]; + ASTPtr predicate_ast = engine_args[i + 1]; + + validate_predicate(predicate_ast, i + 1); + + // Validate table function or table identifier + if (const auto * func = table_function_ast->as()) + { + // It's a table function - validate it + if (!TableFunctionFactory::instance().isTableFunctionName(func->name)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: additional table function must be a valid table function, got: {}", i, func->name); + } + + // Normalize arguments (evaluate `currentDatabase()`, expand named collections, etc.). + // TableFunctionFactory::get mutates the AST in-place inside TableFunctionRemote::parseArguments. 
+ ASTPtr normalized_table_function_ast = table_function_ast->clone(); + auto additional_table_function = TableFunctionFactory::instance().get(normalized_table_function_ast, local_context); + ColumnsDescription segment_columns = additional_table_function->getActualTableStructure(local_context, true); + replaceCurrentDatabaseFunction(normalized_table_function_ast, local_context); + + validate_segment_schema(segment_columns, normalized_table_function_ast->formatForLogging()); + + // It's a table function - store the AST and cached schema for later execution + segment_definitions.emplace_back(normalized_table_function_ast, predicate_ast); + } + else if (const auto * ast_identifier = table_function_ast->as()) + { + // It's an identifier - try to convert it to a table identifier + auto table_identifier = ast_identifier->createTable(); + if (!table_identifier) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: identifier '{}' cannot be converted to table identifier", i, ast_identifier->name()); + } + + StoragePtr validated_table; + try + { + // Parse table identifier to get StorageID + StorageID storage_id(table_identifier); + + // Fill database for unqualified identifiers using current database (or the target table database). + if (storage_id.database_name.empty()) + { + String default_database = local_context->getCurrentDatabase(); + if (default_database.empty()) + default_database = args.table_id.database_name; + + if (default_database.empty()) + { + throw Exception(ErrorCodes::UNKNOWN_DATABASE, + "Argument #{}: table identifier '{}' does not specify database and no default database is selected", + i, ast_identifier->name()); + } + + storage_id.database_name = default_database; + + // Update AST so the table definition stores a fully qualified name. 
+ auto qualified_identifier = make_intrusive(storage_id.database_name, storage_id.table_name); + qualified_identifier->alias = ast_identifier->alias; + qualified_identifier->prefer_alias_to_column_name = ast_identifier->prefer_alias_to_column_name; + table_function_ast = qualified_identifier; + engine_args[i] = table_function_ast; + } + + // Sanity check: verify the table exists + try + { + auto database = DatabaseCatalog::instance().getDatabase(storage_id.database_name, local_context); + if (!database) + { + throw Exception(ErrorCodes::UNKNOWN_DATABASE, + "Database '{}' does not exist", storage_id.database_name); + } + + auto table = database->tryGetTable(storage_id.table_name, local_context); + if (!table) + { + throw Exception(ErrorCodes::UNKNOWN_TABLE, + "Table '{}.{}' does not exist", storage_id.database_name, storage_id.table_name); + } + validated_table = table; + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: table '{}' validation failed: {}", i, ast_identifier->name(), e.message()); + } + + ColumnsDescription segment_columns; + + if (validated_table) + segment_columns = validated_table->getInMemoryMetadataPtr()->getColumns(); + + validate_segment_schema(segment_columns, storage_id.getNameForLogs()); + + segment_definitions.emplace_back(table_function_ast, predicate_ast, storage_id); + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: invalid table identifier '{}': {}", i, ast_identifier->name(), e.message()); + } + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: additional argument must be either a table function or table identifier, got: {}", i, table_function_ast->getID()); + } + + } + + // Fix the database and table names - this is the same pattern used in InterpreterCreateQuery + // The TableFunctionRemote creates a StorageDistributed with "_table_function" database, + // but we need to rename it to the correct database and 
table names + distributed_storage->renameInMemory({args.table_id.database_name, args.table_id.table_name, args.table_id.uuid}); + + // Store segment definitions for later use + distributed_storage->setHybridLayout(std::move(segment_definitions)); + if (!columns_to_cast_names.empty()) + { + NamesAndTypesList cast_cols; + + // 'physical' columns of Hybrid will be read from segments, and may need CASTS + for (const auto & col : physical_columns) + { + if (columns_to_cast_names.contains(col.name)) + cast_cols.emplace_back(col.name, col.type); + } + distributed_storage->setCachedColumnsToCast(ColumnsDescription(cast_cols)); + } + + return distributed_storage; + }, + { + .supports_settings = false, + .supports_parallel_insert = true, + .supports_schema_inference = true, + .source_access_type = AccessTypeObjects::Source::REMOTE, + }); +} + bool StorageDistributed::initializeDiskOnConfigChange(const std::set & new_added_disks) { if (!storage_policy || !data_volume) diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 68dca4b92703..d3906a6dd8d9 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -50,6 +50,27 @@ class StorageDistributed final : public IStorage, WithContext friend class StorageSystemDistributionQueue; public: + /// Structure to hold table function AST, predicate, optional StorageID, and cached physical columns for the segment. + /// Cached columns let us detect schema mismatches and enable features like hybrid_table_auto_cast_columns without + /// re-fetching remote headers on every query. 
+ struct HybridSegment + { + ASTPtr table_function_ast; + ASTPtr predicate_ast; + std::optional storage_id; // For table identifiers instead of table functions + + HybridSegment(ASTPtr table_function_ast_, ASTPtr predicate_ast_) + : table_function_ast(std::move(table_function_ast_)) + , predicate_ast(std::move(predicate_ast_)) + {} + + HybridSegment(ASTPtr table_function_ast_, ASTPtr predicate_ast_, StorageID storage_id_) + : table_function_ast(std::move(table_function_ast_)) + , predicate_ast(std::move(predicate_ast_)) + , storage_id(std::move(storage_id_)) + {} + }; + StorageDistributed( const StorageID & id_, const ColumnsDescription & columns_, @@ -70,7 +91,12 @@ class StorageDistributed final : public IStorage, WithContext ~StorageDistributed() override; - std::string getName() const override { return "Distributed"; } + std::string getName() const override + { + return (segments.empty() && !base_segment_predicate) + ? "Distributed" + : "Hybrid"; + } bool supportsSampling() const override { return true; } bool supportsFinal() const override { return true; } @@ -138,6 +164,20 @@ class StorageDistributed final : public IStorage, WithContext size_t getShardCount() const; + /// Set optional predicate applied to the base segment + void setBaseSegmentPredicate(ASTPtr predicate) { base_segment_predicate = std::move(predicate); } + + /// Set segment definitions for Hybrid engine along with cached schema info + void setHybridLayout(std::vector segments_); + void setCachedColumnsToCast(ColumnsDescription columns); + + /// Getter methods for ClusterProxy::executeQuery + StorageID getRemoteStorageID() const { return remote_storage; } + ColumnsDescription getColumnsToCast() const; + ExpressionActionsPtr getShardingKeyExpression() const { return sharding_key_expr; } + const DistributedSettings * getDistributedSettings() const { return distributed_settings.get(); } + bool isRemoteFunction() const { return is_remote_function; } + bool initializeDiskOnConfigChange(const 
std::set & new_added_disks) override;
 
 private:
@@ -273,6 +313,21 @@ class StorageDistributed final : public IStorage, WithContext
 
 pcg64 rng;
 
 bool is_remote_function;
+
+ /// Additional filter expression for Hybrid engine
+ ASTPtr base_segment_predicate;
+
+ /// Additional segments for Hybrid engine
+ std::vector segments;
+
+ /// Hybrid builds the list of columns which need to be cast once during CREATE/ATTACH
+ /// those are columns whose type differs from the expected at least on one segment.
+ /// it is used by HybridCastsPass and hybrid_table_auto_cast_columns feature
+ /// without cache that would require reading the headers of the segments before every query
+ /// which may trigger extra DESCRIBE call in case of remote queries.
+ /// Subsequent segment DDL changes are not auto-detected;
+ /// reattach/recreate the Hybrid table to refresh.
+ ColumnsDescription cached_columns_to_cast;
 };
 
 }
diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp
index 939dcfdfaa1a..f2f04f833d5e 100644
--- a/src/Storages/buildQueryTreeForShard.cpp
+++ b/src/Storages/buildQueryTreeForShard.cpp
@@ -403,7 +403,10 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
 ContextMutablePtr & mutable_context,
 size_t subquery_depth)
 {
- const auto subquery_hash = subquery_node->getTreeHash();
+ auto subquery_node_to_execute = subquery_node->clone();
+ finalizeAliasMarkersForDistributedSerialization(subquery_node_to_execute, mutable_context);
+
+ const auto subquery_hash = subquery_node_to_execute->getTreeHash();
 const auto temporary_table_name = fmt::format("_data_{}", toString(subquery_hash));
 
 const auto & external_tables = mutable_context->getExternalTables();
@@ -419,7 +422,7 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
 auto context_copy = Context::createCopy(mutable_context);
 updateContextForSubqueryExecution(context_copy);
 
- InterpreterSelectQueryAnalyzer interpreter(subquery_node, 
context_copy, subquery_options); + InterpreterSelectQueryAnalyzer interpreter(subquery_node_to_execute, context_copy, subquery_options); auto & query_plan = interpreter.getQueryPlan(); auto sample_block_with_unique_names = *query_plan.getCurrentHeader(); @@ -506,6 +509,10 @@ QueryTreeNodePtr getSubqueryFromTableExpression( QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table) { + /// Incoming materialized markers are hop-local metadata. + /// Strip them before this node prepares/executes subqueries for the next hop. + stripMaterializedAliasMarkers(query_tree_to_modify); + CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor; collect_column_source_to_columns_visitor.visit(query_tree_to_modify); diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 165d21b2b0ea..e9af6b095129 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -13,6 +13,7 @@ void registerStorageNull(StorageFactory & factory); void registerStorageMerge(StorageFactory & factory); void registerStorageBuffer(StorageFactory & factory); void registerStorageDistributed(StorageFactory & factory); +void registerStorageHybrid(StorageFactory & factory); void registerStorageMemory(StorageFactory & factory); void registerStorageFile(StorageFactory & factory); void registerStorageURL(StorageFactory & factory); @@ -122,6 +123,7 @@ void registerStorages() registerStorageMerge(factory); registerStorageBuffer(factory); registerStorageDistributed(factory); + registerStorageHybrid(factory); registerStorageMemory(factory); registerStorageFile(factory); registerStorageURL(factory); diff --git a/tests/queries/0_stateless/03643_hybrid.reference b/tests/queries/0_stateless/03643_hybrid.reference new file mode 100644 index 000000000000..d5ebcfa6d834 --- /dev/null +++ b/tests/queries/0_stateless/03643_hybrid.reference @@ -0,0 
+1,225 @@ +Hybrid creation requires allow_experimental_hybrid_table +Check Hybrid engine is registered +Hybrid +Ensure no leftovers before validation checks +Expect error when Hybrid has no arguments +Expect error when Hybrid has a single literal argument +Expect error when Hybrid arguments are literals only +Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from file) +Expect error when first argument is not a table function (scalar expression) +Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from url) +Expect error when predicate references a missing column +Missing column + schema inference +Create Hybrid table with remote() and constant predicate (explicit column list) +CREATE TABLE default.test_tiered_distributed\n(\n `dummy` UInt8\n)\nENGINE = Hybrid(remote(\'localhost:9000\'), 1) +dummy UInt8 +0 +Row 1: +────── +database: default +name: test_tiered_distributed +engine: Hybrid +create_table_query: CREATE TABLE default.test_tiered_distributed (`dummy` UInt8) ENGINE = Hybrid(remote('localhost:9000'), 1) +engine_full: Hybrid(remote('localhost:9000'), 1) +Create Hybrid table with remote table function and predicate (inference) +CREATE TABLE default.test_tiered_distributed_numbers_range\n(\n `number` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'system.numbers\'), number < 5) +0 +1 +2 +3 +4 +Create Hybrid table with two remote segments as table +CREATE TABLE default.test_tiered_distributed_numbers_dual\n(\n `number` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'system.numbers\'), number < 5, remote(\'localhost:9000\', system.numbers), (number >= 10) AND (number <= 15))\nCOMMENT \'Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.\' +0 +1 +2 +3 +4 +10 +11 +12 +13 +14 +15 +0 +1 +2 +3 +4 +10 +11 +12 +13 +14 +15 +Create Hybrid table combining remote function and local table +0 +1 +2 +3 +4 +10 +11 
+12 +13 +14 +15 +Verify Hybrid skips segment with always false predicate on the first segment +10 +11 +12 +13 +14 +15 +Verify Hybrid skips segment with always false predicate on the second segment +0 +1 +2 +Hybrid raises when a segment is missing a column used by the base schema +Prepare local MergeTree table for multi-segment tests +Populate local table with sample data +Create Hybrid table with three segment pairs +Count rows across all segments +6 +Count rows from segments with id > 4 +1 +Count rows where value > 200 +3 +Count rows named Alice +1 +Select rows ordered by value descending (id > 2) +4 David 300.2 +5 Eve 250.1 +3 Charlie 150.7 +Limit results ordered by id +0 Invalid 2022-01-01 10:00:00 0.5 +1 Alice 2022-01-01 10:00:00 100.5 +2 Bob 2022-01-02 11:00:00 200.3 +Explain plan for filter on value +Union (Hybrid) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) +Union (Hybrid) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) +Union (Hybrid) + ReadFromRemote (Read from remote replica) + Expression ((Projection + Before ORDER BY)) + Expression (WHERE) + ReadFromMergeTree (default.test_tiered_local_data) + Expression ((Projection + Before ORDER BY)) + Expression (WHERE) + ReadFromMergeTree (default.test_tiered_local_data) +Union (Hybrid) + ReadFromRemote (Read from remote replica) + Expression ((Project names + Projection)) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test_tiered_local_data) + Expression ((Project names + Projection)) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test_tiered_local_data) +Aggregate values across name when filtering by event_time +David 1 300.2 +Eve 1 250.1 +Bob 1 200.3 +Charlie 1 150.7 +Verify additional_table_filters works consistently (legacy analyser) +2 Bob 
200.3 +Verify additional_table_filters works consistently (new analyser) +2 Bob 200.3 +Clean up Hybrid table with three segment pairs +Clean up local helper table +Drop predicate filtering fixtures if they exist +Create local tables representing before/after watermark partitions +Create second local table with different value type +Insert rows before watermark into both tables +Insert rows after watermark into both tables +Create Hybrid table with analyzer disabled during reads +Insert row via Hybrid table (should go to first segment) +Verify that inserted row landed in first table +17 John 2025-09-25 400 +Verify that second table did not receive the inserted row +0 +Read predicate-filtered data with analyzer disabled and no localhost preference +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Read predicate-filtered data with analyzer enabled and no localhost preference +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Read predicate-filtered data with analyzer disabled and prefer localhost replica +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Read predicate-filtered data with analyzer enabled and prefer localhost replica +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 0) +Row 1: +────── +type: QueryFinish +is_initial_query2: 1 +tbl: ['_table_function.remote','db.test_tiered_watermark'] +qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, 
hybrid_table_auto_cast_columns = 0, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark1', max_threads=1 FORMAT Null; +log_comment: test_tiered_watermark1 + +Row 2: +────── +type: QueryFinish +is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_after'] +qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_after` AS `__table1` WHERE `__table1`.`date` >= '2025-09-01' ORDER BY `__table1`.`id` DESC +log_comment: test_tiered_watermark1 + +Row 3: +────── +type: QueryFinish +is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_before'] +qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_before` AS `__table1` WHERE `__table1`.`date` < '2025-09-01' ORDER BY `__table1`.`id` DESC +log_comment: test_tiered_watermark1 +Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 1) +Row 1: +────── +type: QueryFinish +is_initial_query2: 1 +tbl: ['_table_function.remote','db.test_tiered_watermark'] +qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark2', max_threads=1 FORMAT Null; +log_comment: test_tiered_watermark2 + +Row 2: +────── +type: QueryFinish +is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_after'] +qry: SELECT _CAST(`__table1`.`id`, 'UInt32') AS `id`, _CAST(`__table1`.`name`, 'String') AS `name`, `__table1`.`date` AS `date`, _CAST(`__table1`.`value`, 'UInt32') AS `value` FROM `db`.`test_tiered_watermark_after` AS `__table1` WHERE `__table1`.`date` >= '2025-09-01' ORDER BY _CAST(`__table1`.`id`, 'UInt32') AS `id` DESC +log_comment: test_tiered_watermark2 + +Row 3: +────── +type: QueryFinish 
+is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_before'] +qry: SELECT _CAST(`__table1`.`id`, 'UInt32') AS `id`, _CAST(`__table1`.`name`, 'String') AS `name`, `__table1`.`date` AS `date`, _CAST(`__table1`.`value`, 'UInt32') AS `value` FROM `db`.`test_tiered_watermark_before` AS `__table1` WHERE `__table1`.`date` < '2025-09-01' ORDER BY _CAST(`__table1`.`id`, 'UInt32') DESC +log_comment: test_tiered_watermark2 +Clean up predicate filtering tables diff --git a/tests/queries/0_stateless/03643_hybrid.sql b/tests/queries/0_stateless/03643_hybrid.sql new file mode 100644 index 000000000000..851045aafcfa --- /dev/null +++ b/tests/queries/0_stateless/03643_hybrid.sql @@ -0,0 +1,407 @@ +SELECT 'Hybrid creation requires allow_experimental_hybrid_table'; +SET allow_experimental_hybrid_table = 0; +CREATE TABLE test_hybrid_requires_setting (`dummy` UInt8) ENGINE = Hybrid(remote('localhost:9000'), 1); -- { serverError SUPPORT_IS_DISABLED } +DROP TABLE IF EXISTS test_hybrid_requires_setting SYNC; + +SET allow_experimental_hybrid_table = 1; + +SELECT 'Check Hybrid engine is registered'; +SELECT name FROM system.table_engines WHERE name = 'Hybrid'; + +SELECT 'Ensure no leftovers before validation checks'; +DROP TABLE IF EXISTS test_tiered_distributed SYNC; +DROP TABLE IF EXISTS test_tiered_distributed_bad_args SYNC; +DROP TABLE IF EXISTS test_tiered_distributed_invalid_first_arg SYNC; + +SELECT 'Expect error when Hybrid has no arguments'; +CREATE TABLE test_tiered_distributed_bad_args (`id` UInt32,`name` String) ENGINE = Hybrid(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH } + +SELECT 'Expect error when Hybrid has a single literal argument'; +CREATE TABLE test_tiered_distributed_bad_args (`id` UInt32,`name` String) ENGINE = Hybrid(1); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH } + +SELECT 'Expect error when Hybrid arguments are literals only'; +CREATE TABLE test_tiered_distributed_bad_args (`id` UInt32,`name` String) ENGINE = Hybrid(1, 1); -- { serverError 
BAD_ARGUMENTS } + +SELECT 'Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from file)'; +CREATE TABLE test_tiered_distributed_invalid_first_arg (`id` UInt32, `name` String) ENGINE = Hybrid(file('foo.x'), 1); -- { serverError BAD_ARGUMENTS } + +SELECT 'Expect error when first argument is not a table function (scalar expression)'; +CREATE TABLE test_tiered_distributed_invalid_first_arg (`id` UInt32, `name` String) ENGINE = Hybrid(sin(3), 1); -- { serverError BAD_ARGUMENTS } + +SELECT 'Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from url)'; +CREATE TABLE test_tiered_distributed_invalid_first_arg (`id` UInt32, `name` String) ENGINE = Hybrid(url('http://google.com', 'RawBLOB'), 1); -- { serverError BAD_ARGUMENTS } + +SELECT 'Expect error when predicate references a missing column'; +CREATE TABLE test_tiered_distributed_bad_args(`number` UInt64) ENGINE = Hybrid(remote('localhost:9000', system.numbers), number2 < 5); -- { serverError BAD_ARGUMENTS } + +SELECT 'Missing column + schema inference'; +CREATE TABLE test_tiered_distributed_bad_args ENGINE = Hybrid(remote('localhost:9000', system.numbers), number2 < 5); -- { serverError BAD_ARGUMENTS } + +DROP TABLE IF EXISTS test_tiered_distributed_bad_args SYNC; + +SELECT 'Create Hybrid table with remote() and constant predicate (explicit column list)'; +DROP TABLE IF EXISTS test_tiered_distributed SYNC; +CREATE TABLE test_tiered_distributed(`dummy` UInt8) ENGINE = Hybrid(remote('localhost:9000'), 1); +SHOW CREATE TABLE test_tiered_distributed; +DESCRIBE TABLE test_tiered_distributed; +SELECT * FROM test_tiered_distributed; +SELECT database, name, engine, create_table_query, engine_full FROM system.tables WHERE table = 'test_tiered_distributed' FORMAT Vertical; +DROP TABLE IF EXISTS test_tiered_distributed SYNC; + +SELECT 'Create Hybrid table with remote table function and predicate (inference)'; +DROP TABLE IF 
EXISTS test_tiered_distributed_numbers_range SYNC; +CREATE TABLE test_tiered_distributed_numbers_range ENGINE = Hybrid(remote('localhost:9000', system.numbers), number < 5); +SHOW CREATE TABLE test_tiered_distributed_numbers_range; +SELECT * FROM test_tiered_distributed_numbers_range ORDER BY number; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_range SYNC; + +SELECT 'Create Hybrid table with two remote segments as table'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_dual SYNC; +CREATE TABLE test_tiered_distributed_numbers_dual ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number < 5, + remote('localhost:9000', system.numbers), number BETWEEN 10 AND 15 +) AS system.numbers; + +SHOW CREATE TABLE test_tiered_distributed_numbers_dual; +SELECT * FROM test_tiered_distributed_numbers_dual ORDER BY number SETTINGS enable_analyzer = 0; +SELECT * FROM test_tiered_distributed_numbers_dual ORDER BY number SETTINGS enable_analyzer = 1; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_dual SYNC; + +SELECT 'Create Hybrid table combining remote function and local table'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_mixed SYNC; +CREATE TABLE test_tiered_distributed_numbers_mixed +( + `number` UInt64 +) ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number < 5, + system.numbers, number BETWEEN 10 AND 15 +); +SELECT * FROM test_tiered_distributed_numbers_mixed ORDER BY number; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_mixed SYNC; + +SELECT 'Verify Hybrid skips segment with always false predicate on the first segment'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_first SYNC; +CREATE TABLE test_tiered_distributed_numbers_skip_first +( + `number` UInt64 +) ENGINE = Hybrid( + remote('localhost:9000', system.numbers), 0, + system.numbers, number BETWEEN 10 AND 15 +); +SELECT * FROM test_tiered_distributed_numbers_skip_first ORDER BY number; +DROP TABLE IF EXISTS 
test_tiered_distributed_numbers_skip_first SYNC; + +SELECT 'Verify Hybrid skips segment with always false predicate on the second segment'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_second SYNC; +CREATE TABLE test_tiered_distributed_numbers_skip_second +( + `number` UInt64 +) ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number < 3, + system.numbers, 0 +); +SELECT * FROM test_tiered_distributed_numbers_skip_second ORDER BY number; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_second SYNC; + +SELECT 'Hybrid raises when a segment is missing a column used by the base schema'; +DROP TABLE IF EXISTS test_hybrid_segment_full SYNC; +DROP TABLE IF EXISTS test_hybrid_segment_partial SYNC; +DROP TABLE IF EXISTS test_hybrid_missing_column SYNC; + +CREATE TABLE test_hybrid_segment_full +( + `id` UInt32, + `value` UInt32 +) +ENGINE = MergeTree() +ORDER BY id; + +CREATE TABLE test_hybrid_segment_partial +( + `id` UInt32 +) +ENGINE = MergeTree() +ORDER BY id; + +INSERT INTO test_hybrid_segment_full VALUES (1, 10), (2, 20); +INSERT INTO test_hybrid_segment_partial VALUES (3), (4); + +CREATE TABLE test_hybrid_missing_column ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'test_hybrid_segment_full'), id < 3, + remote('localhost:9000', currentDatabase(), 'test_hybrid_segment_partial'), id >= 3 +); -- { serverError BAD_ARGUMENTS } + +DROP TABLE IF EXISTS test_hybrid_missing_column SYNC; +DROP TABLE IF EXISTS test_hybrid_segment_partial SYNC; +DROP TABLE IF EXISTS test_hybrid_segment_full SYNC; + +----------------------------- + +SELECT 'Prepare local MergeTree table for multi-segment tests'; +DROP TABLE IF EXISTS test_tiered_local_data SYNC; +CREATE TABLE test_tiered_local_data +( + `id` UInt32, + `name` String, + `event_time` DateTime, + `value` Float64 +) ENGINE = MergeTree() +ORDER BY id; + +SELECT 'Populate local table with sample data'; +INSERT INTO test_tiered_local_data VALUES + (0, 'Invalid', '2022-01-01 10:00:00', 
0.5), + (1, 'Alice', '2022-01-01 10:00:00', 100.5), + (2, 'Bob', '2022-01-02 11:00:00', 200.3), + (3, 'Charlie', '2022-01-03 12:00:00', 150.7), + (4, 'David', '2022-01-04 13:00:00', 300.2), + (5, 'Eve', '2022-01-05 14:00:00', 250.1); + +SELECT 'Create Hybrid table with three segment pairs'; +DROP TABLE IF EXISTS test_tiered_multi_segment SYNC; + +CREATE TABLE test_tiered_multi_segment +( + `id` UInt32, + `name` String, + `event_time` DateTime, + `value` Float64 +) +ENGINE = Hybrid( + remote('127.0.0.2:9000', currentDatabase(), 'test_tiered_local_data'), + id <= 2, + cluster('test_shard_localhost', currentDatabase(), 'test_tiered_local_data'), + id = 3, + remoteSecure('127.0.0.1:9440', currentDatabase(), 'test_tiered_local_data'), + id > 3 +); + +SELECT 'Count rows across all segments'; +SELECT count() FROM test_tiered_multi_segment; +SELECT 'Count rows from segments with id > 4'; +SELECT count() FROM test_tiered_multi_segment WHERE id > 4; +SELECT 'Count rows where value > 200'; +SELECT count() FROM test_tiered_multi_segment WHERE value > 200; +SELECT 'Count rows named Alice'; +SELECT count() AS alice_rows FROM test_tiered_multi_segment WHERE name = 'Alice'; + +SELECT 'Select rows ordered by value descending (id > 2)'; +SELECT id, name, value FROM test_tiered_multi_segment WHERE id > 2 ORDER BY value DESC; +SELECT 'Limit results ordered by id'; +SELECT * FROM test_tiered_multi_segment ORDER BY id LIMIT 3; +SELECT 'Explain plan for filter on value'; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=0; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=1; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=0; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=1; + +SELECT 'Aggregate values across 
name when filtering by event_time'; +SELECT + name, + count() AS count, + avg(value) AS avg_value +FROM test_tiered_multi_segment +WHERE event_time >= '2022-01-02' +GROUP BY name +ORDER BY avg_value DESC; + +SELECT 'Verify additional_table_filters works consistently (legacy analyser)'; +SELECT id, name, value +FROM test_tiered_multi_segment +WHERE id < 3 +ORDER BY id +SETTINGS additional_table_filters = {'test_tiered_multi_segment' : 'id > 1'}, allow_experimental_analyzer = 0; + +SELECT 'Verify additional_table_filters works consistently (new analyser)'; +SELECT id, name, value +FROM test_tiered_multi_segment +WHERE id < 3 +ORDER BY id +SETTINGS additional_table_filters = {'test_tiered_multi_segment' : 'id > 1'}, allow_experimental_analyzer = 1; + + +SELECT 'Clean up Hybrid table with three segment pairs'; +DROP TABLE IF EXISTS test_tiered_multi_segment SYNC; +SELECT 'Clean up local helper table'; +DROP TABLE IF EXISTS test_tiered_local_data SYNC; + +--------------------------------- + +-- Test Hybrid engine predicate filtering functionality + +SELECT 'Drop predicate filtering fixtures if they exist'; +DROP TABLE IF EXISTS test_tiered_watermark_after SYNC; +DROP TABLE IF EXISTS test_tiered_watermark_before SYNC; +DROP TABLE IF EXISTS test_tiered_watermark SYNC; + +SELECT 'Create local tables representing before/after watermark partitions'; +CREATE TABLE test_tiered_watermark_after +( + `id` UInt32, + `name` String, + `date` Date, + `value` UInt64 +) +ENGINE = MergeTree() +ORDER BY id; + +SELECT 'Create second local table with different value type'; +CREATE TABLE test_tiered_watermark_before +( + `id` Int32, + `name` Nullable(String), + `date` Date, + `value` Decimal128(0) +) +ENGINE = MergeTree() +ORDER BY id; + +SELECT 'Insert rows before watermark into both tables'; +INSERT INTO test_tiered_watermark_after VALUES + (11, 'Alice', '2025-08-15', 100), + (12, 'Bob', '2025-08-20', 200), + (13, 'Charlie', '2025-08-25', 300); +INSERT INTO test_tiered_watermark_before 
VALUES + (21, 'Alice', '2025-08-15', 100), + (22, 'Bob', '2025-08-20', 200), + (23, 'Charlie', '2025-08-25', 300); + +SELECT 'Insert rows after watermark into both tables'; +INSERT INTO test_tiered_watermark_after VALUES + (14, 'David', '2025-09-05', 400), + (15, 'Eve', '2025-09-10', 500), + (16, 'Frank', '2025-09-15', 600); +INSERT INTO test_tiered_watermark_before VALUES + (24, 'David', '2025-09-05', 400), + (25, 'Eve', '2025-09-10', 500), + (26, 'Frank', '2025-09-15', 600); + + +SELECT 'Create Hybrid table with analyzer disabled during reads'; +CREATE TABLE test_tiered_watermark +( + `id` UInt32, + `name` String, + `date` Date, + `value` UInt32 +) +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_after'), + date >= '2025-09-01', + remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_before'), + date < '2025-09-01' +); + +SELECT 'Insert row via Hybrid table (should go to first segment)'; +INSERT INTO test_tiered_watermark SETTINGS distributed_foreground_insert = 1 +VALUES (17, 'John', '2025-09-25', 400); + +SELECT 'Verify that inserted row landed in first table'; +SELECT * FROM test_tiered_watermark_after WHERE id = 17 ORDER BY id; +SELECT 'Verify that second table did not receive the inserted row'; +SELECT count() FROM test_tiered_watermark_before WHERE id = 17; + + +SELECT 'Read predicate-filtered data with analyzer disabled and no localhost preference'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 0, prefer_localhost_replica = 0; +SELECT 'Read predicate-filtered data with analyzer enabled and no localhost preference'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0; +SELECT 'Read predicate-filtered data with analyzer disabled and prefer localhost replica'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 0, prefer_localhost_replica = 1; +SELECT 'Read predicate-filtered data with analyzer 
enabled and prefer localhost replica'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 1; + +-- other combinations of settings work, but give a bit different content in the query_log +-- See the problem around is_initial_query described in https://github.com/Altinity/ClickHouse/issues/1077 +SELECT 'Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 0)'; + +SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 0, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark1', max_threads=1 FORMAT Null; +SYSTEM FLUSH LOGS; +SELECT + type, + query_id = initial_query_id AS is_initial_query2, + arraySort(arrayMap(x -> replaceAll(x, currentDatabase(), 'db'), tables)) as tbl, + replaceAll(query, currentDatabase(), 'db') as qry, + log_comment +FROM system.query_log +WHERE + event_time > now() - 300 AND type = 'QueryFinish' AND + initial_query_id IN ( + SELECT initial_query_id + FROM system.query_log + WHERE + event_time > now() - 300 + and log_comment = 'test_tiered_watermark1' + and current_database = currentDatabase() + and query_id = initial_query_id ) +ORDER BY tbl, event_time_microseconds +FORMAT Vertical; + +SELECT 'Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 1)'; + +SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark2', max_threads=1 FORMAT Null; +SYSTEM FLUSH LOGS; +SELECT + type, + query_id = initial_query_id AS is_initial_query2, + arraySort(arrayMap(x -> replaceAll(x, currentDatabase(), 'db'), tables)) as tbl, + replaceAll(query, currentDatabase(), 'db') as qry, + log_comment +FROM system.query_log +WHERE + event_time > now() - 300 AND type = 'QueryFinish' 
AND + initial_query_id IN ( + SELECT initial_query_id + FROM system.query_log + WHERE + event_time > now() - 300 + and log_comment = 'test_tiered_watermark2' + and current_database = currentDatabase() + and query_id = initial_query_id ) +ORDER BY tbl, event_time_microseconds +FORMAT Vertical; + + +SELECT 'Clean up predicate filtering tables'; +DROP TABLE IF EXISTS test_tiered_watermark SYNC; +DROP TABLE IF EXISTS test_tiered_watermark_after SYNC; +DROP TABLE IF EXISTS test_tiered_watermark_before SYNC; + +-- TODO: - addressed by 03644_hybrid_auto_cast.sql +-- Code: 70. DB::Exception: Received from localhost:9000. DB::Exception: Conversion from AggregateFunction(sum, Decimal(38, 0)) to AggregateFunction(sum, UInt32) is not supported: while converting source column `sum(__table1.value)` to destination column `sum(__table1.value)`. (CANNOT_CONVERT_TYPE) +-- SELECT sum(value) FROM test_tiered_watermark; + +-- TODO: +-- Code: 47. DB::Exception: Received from localhost:9000. DB::Exception: Received from 127.0.0.2:9000. DB::Exception: Identifier '__table1._database' cannot be resolved from table with name __table1. In scope SELECT __table1._database AS _database, __table1._table AS row_count FROM default.test_tiered_watermark_after AS __table1 WHERE __table1.date >= '2025-09-01'. Maybe you meant: ['__table1._table']. (UNKNOWN_IDENTIFIER) +-- SELECT _database, _table, count() AS row_count FROM test_tiered_watermark GROUP BY _database, _table ORDER BY _database, _table; + +-- Other things which may need attention: +-- complex combinations? (overview / over Merge) +-- prefer_localhost_replica +-- threads versus local subquery pipeline part +-- ALTER support + +-- TODO +-- SELECT _table_index, count() AS row_count FROM test_debug_tiered GROUP BY _table_index ORDER BY _table_index; + +-- TODO +-- 1. 
Integration tests (similar to tests/queries/0_stateless) +-- - Base SELECT with date split: part in Distributed, part in S3 -> results should match a manual UNION ALL (with correct ORDER BY/aggregation). +-- - GROUP BY / ORDER BY / LIMIT: confirm the stage is selected correctly, finalization happens at the top, rows_before_limit_at_least is correct (createLocalPlan already keeps LIMIT). +-- - JOIN: with a small table on the initiator; check GLOBAL JOIN scenarios. Ensure remote segments behave the same as remote shard subqueries created through createLocalPlan. +-- - optimize_skip_unused_shards: with analyzer ensure segment conditions are respected (where FILTER DAG is available). +-- - Constants: hostName()/now() in SELECT across several segments -> ensure no discrepancies. +-- - EXPLAIN PLAN/PIPELINE: show child plans for segments and remote plans. +-- - Subqueries in logs. +-- - Different column sets/types: supertype in snapshot, converting actions on read. +-- - Object columns: same as Distributed — use ColumnsDescriptionByShardNum for segments if needed (optional for local segments; already implemented for Distributed). + +-- Condition with dictGet('a1_watermarks_dict', ...) 
+ +-- access rights check + + +-- TODO: +-- test for distributed_aggregation_memory_efficient & enable_memory_bound_merging_of_aggregation_results +-- to avoid UNKNOWN_AGGREGATED_DATA_VARIANT when mixing different aggregation variants +-- from remote shards (with memory_bound) and local segments (without memory_bound) diff --git a/tests/queries/0_stateless/03644_hybrid_auto_cast.reference b/tests/queries/0_stateless/03644_hybrid_auto_cast.reference new file mode 100644 index 000000000000..869ac32216b6 --- /dev/null +++ b/tests/queries/0_stateless/03644_hybrid_auto_cast.reference @@ -0,0 +1,13 @@ +hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 (headers mismatch) +hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 (headers mismatch) +1 +hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 manual cast +600 +1 +hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 manual cast +600 +1 +hybrid_table_auto_cast_columns = 1, enable_analyzer = 1 +600 +1 +hybrid_table_auto_cast_columns = 1, enable_analyzer = 0 (analizer required) diff --git a/tests/queries/0_stateless/03644_hybrid_auto_cast.sql b/tests/queries/0_stateless/03644_hybrid_auto_cast.sql new file mode 100644 index 000000000000..7248dd9a55ef --- /dev/null +++ b/tests/queries/0_stateless/03644_hybrid_auto_cast.sql @@ -0,0 +1,85 @@ +SET allow_experimental_hybrid_table = 1, + prefer_localhost_replica = 0; + +DROP TABLE IF EXISTS test_tiered_watermark_after; +DROP TABLE IF EXISTS test_tiered_watermark_before; +DROP TABLE IF EXISTS test_tiered_watermark; + +CREATE TABLE test_tiered_watermark_after +( + `id` UInt32, + `name` String, + `date` Date, + `value` UInt64, + `categories` Array(UInt32) +) +ENGINE = MergeTree() +ORDER BY id; + +CREATE TABLE test_tiered_watermark_before +( + `id` Int32, + `name` Nullable(String), + `date` Date, + `value` Decimal128(0), + `categories` Array(Int64) +) +ENGINE = MergeTree() +ORDER BY id; + +INSERT INTO test_tiered_watermark_after VALUES + (11, 'Alice', '2025-08-15', 
100, [100, 10]), + (12, 'Bob', '2025-08-20', 200, [200, 20]), + (13, 'Charlie', '2025-08-25', 300, [300, 30]), + (14, 'David', '2025-09-05', 400, [400, 40]), + (15, 'Eve', '2025-09-10', 500, [500, 50]), + (16, 'Frank', '2025-09-15', 600, [600, 60]); + +INSERT INTO test_tiered_watermark_before VALUES + (21, 'Alice', '2025-08-15', 100, [100, 10]), + (22, 'Bob', '2025-08-20', 200, [200, 20]), + (23, 'Charlie', '2025-08-25', 300, [300, 30]), + (24, 'David', '2025-09-05', 400, [400, 40]), + (25, 'Eve', '2025-09-10', 500, [500, 50]), + (26, 'Frank', '2025-09-15', 600, [600, 60]); + +CREATE TABLE test_tiered_watermark +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_after'), + date >= '2025-09-01', + remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_before'), + date < '2025-09-01' +); + +-- the problem +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 (headers mismatch)'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 1; +SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE } +SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- { serverError THERE_IS_NO_COLUMN } + +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 (headers mismatch)'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 0; +SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE } +SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- works w/o analyzer + +-- workaround - explicit cast +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 manual cast'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 1; +SELECT max(value::UInt32) FROM test_tiered_watermark; +SELECT sum(if(arrayExists(x -> (x IN (10)), categories::Array(UInt32)), 1, 0)) AS x FROM test_tiered_watermark; + +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 
manual cast'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 0; +SELECT max(value::UInt32) FROM test_tiered_watermark; +SELECT sum(if(arrayExists(x -> (x IN (10)), categories::Array(UInt32)), 1, 0)) AS x FROM test_tiered_watermark; + +-- feature to add casts automatically +SELECT 'hybrid_table_auto_cast_columns = 1, enable_analyzer = 1'; +SET hybrid_table_auto_cast_columns = 1, enable_analyzer = 1; +SELECT max(value) FROM test_tiered_watermark; +SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; + +SELECT 'hybrid_table_auto_cast_columns = 1, enable_analyzer = 0 (analizer required)'; +SET hybrid_table_auto_cast_columns = 1, enable_analyzer = 0; +SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE } + diff --git a/tests/queries/0_stateless/03644_hybrid_unqualified_table.reference b/tests/queries/0_stateless/03644_hybrid_unqualified_table.reference new file mode 100644 index 000000000000..98bae7afae05 --- /dev/null +++ b/tests/queries/0_stateless/03644_hybrid_unqualified_table.reference @@ -0,0 +1,3 @@ +Hybrid allows unqualified local tables by default +3 +1 diff --git a/tests/queries/0_stateless/03644_hybrid_unqualified_table.sql b/tests/queries/0_stateless/03644_hybrid_unqualified_table.sql new file mode 100644 index 000000000000..672beea1afc3 --- /dev/null +++ b/tests/queries/0_stateless/03644_hybrid_unqualified_table.sql @@ -0,0 +1,33 @@ +SET allow_experimental_hybrid_table = 1; + +SELECT 'Hybrid allows unqualified local tables by default'; + +DROP TABLE IF EXISTS test_hybrid_unqualified_segment SYNC; +DROP TABLE IF EXISTS test_hybrid_unqualified SYNC; + +CREATE TABLE test_hybrid_unqualified_segment +( + `number` UInt64 +) +ENGINE = MergeTree() +ORDER BY tuple(); + +INSERT INTO test_hybrid_unqualified_segment VALUES (10), (20); + +CREATE TABLE test_hybrid_unqualified +( + `number` UInt64 +) +ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number = 0, + 
test_hybrid_unqualified_segment, number >= 10 +); + +SELECT count() FROM test_hybrid_unqualified; + +SELECT positionCaseInsensitive(engine_full, concat(currentDatabase(), '.test_hybrid_unqualified_segment')) > 0 +FROM system.tables +WHERE database = currentDatabase() AND name = 'test_hybrid_unqualified'; + +DROP TABLE IF EXISTS test_hybrid_unqualified SYNC; +DROP TABLE IF EXISTS test_hybrid_unqualified_segment SYNC; diff --git a/tests/queries/0_stateless/03645_hybrid_alias_columns.reference b/tests/queries/0_stateless/03645_hybrid_alias_columns.reference new file mode 100644 index 000000000000..22bfe3702822 --- /dev/null +++ b/tests/queries/0_stateless/03645_hybrid_alias_columns.reference @@ -0,0 +1,11 @@ +test1 +1 ['foo1','bar1_before'] foo1 +2 ['foo2','bar2_after'] foo2 +Hybrid segment predicates with alias columns +3 30 60 33 +Insert into Hybrid with EPHEMERAL column +2 0A0B0C0D +Select from Hybrid with EPHEMERAL column +1 5A90B714 +2 0A0B0C0D +10 01020304 diff --git a/tests/queries/0_stateless/03645_hybrid_alias_columns.sql b/tests/queries/0_stateless/03645_hybrid_alias_columns.sql new file mode 100644 index 000000000000..5f1eec5866b0 --- /dev/null +++ b/tests/queries/0_stateless/03645_hybrid_alias_columns.sql @@ -0,0 +1,157 @@ +SET allow_experimental_hybrid_table = 1, + prefer_localhost_replica = 0; + +DROP TABLE IF EXISTS test_hybrid_alias_cast; +DROP TABLE IF EXISTS test_hybrid_alias_after; +DROP TABLE IF EXISTS test_hybrid_alias_before; + +CREATE TABLE test_hybrid_alias_after +( + a UInt32, + arr Array(String), + arr_1 ALIAS arr[1] +) +ENGINE = MergeTree() +ORDER BY (a, arr[1]) +SETTINGS index_granularity = 1; + +CREATE TABLE test_hybrid_alias_before +( + a UInt32, + arr Array(String), + arr_1 MATERIALIZED arr[1] +) +ENGINE = MergeTree() +ORDER BY (a, arr_1) +SETTINGS index_granularity = 1; + +INSERT INTO test_hybrid_alias_after VALUES (1, ['foo1', 'bar1_after']), (2, ['foo2', 'bar2_after']); +INSERT INTO test_hybrid_alias_before VALUES (1, ['foo1', 
'bar1_before']), (2, ['foo2', 'bar2_before']); + +CREATE TABLE test_hybrid_alias_cast +( + a UInt32, + arr Array(String), + arr_1 String +) +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_alias_after'), + a >= 2, + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_alias_before'), + a < 2 +); + +SELECT 'test1'; +SELECT * FROM test_hybrid_alias_cast WHERE arr_1 like 'foo%' ORDER BY a; + +DROP TABLE test_hybrid_alias_cast; +DROP TABLE test_hybrid_alias_after; +DROP TABLE test_hybrid_alias_before; + +DROP TABLE IF EXISTS test_hybrid_alias_predicate; +DROP TABLE IF EXISTS test_hybrid_alias_predicate_left; +DROP TABLE IF EXISTS test_hybrid_alias_predicate_right; + +CREATE TABLE test_hybrid_alias_predicate_left +( + id Int32, + value Int32, + date_col Date, + computed ALIAS value * 2, + sum_alias ALIAS id + value +) +ENGINE = MergeTree() +ORDER BY (date_col, id) +PARTITION BY toYYYYMM(date_col); + +CREATE TABLE test_hybrid_alias_predicate_right +( + id Int32, + value Int32, + date_col Date, + computed ALIAS value * 2, + sum_alias ALIAS id + value +) +ENGINE = MergeTree() +ORDER BY (date_col, id) +PARTITION BY toYYYYMM(date_col); + +INSERT INTO test_hybrid_alias_predicate_left (id, value, date_col) VALUES + (1, 10, '2025-01-15'), + (2, 20, '2025-01-16'), + (3, 30, '2025-01-17'); + +INSERT INTO test_hybrid_alias_predicate_right (id, value, date_col) VALUES + (4, 40, '2025-01-10'), + (5, 50, '2025-01-11'), + (6, 60, '2025-01-12'); + +CREATE TABLE test_hybrid_alias_predicate +( + id Int32, + value Int32, + date_col Date, + computed Int64, + sum_alias Int64 +) +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_alias_predicate_left'), + computed >= 60, + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_alias_predicate_right'), + computed < 60 +); + +SELECT 'Hybrid segment predicates with alias columns'; +SELECT id, value, computed, sum_alias FROM test_hybrid_alias_predicate; + +DROP TABLE 
test_hybrid_alias_predicate; +DROP TABLE test_hybrid_alias_predicate_left; +DROP TABLE test_hybrid_alias_predicate_right; + +DROP TABLE IF EXISTS test_hybrid_ephem; +DROP TABLE IF EXISTS test_hybrid_ephem_after; +DROP TABLE IF EXISTS test_hybrid_ephem_before; + +CREATE TABLE test_hybrid_ephem_after +( + id UInt64, + unhexed String EPHEMERAL, + hexed FixedString(4) DEFAULT unhex(unhexed) +) +ENGINE = MergeTree() +ORDER BY id; + +CREATE TABLE test_hybrid_ephem_before +( + id UInt64, + hexed FixedString(4) +) +ENGINE = MergeTree() +ORDER BY id; + +INSERT INTO test_hybrid_ephem_after (id, unhexed) VALUES (1, '5a90b714'); +INSERT INTO test_hybrid_ephem_before (id, hexed) VALUES (10, unhex('01020304')); + +CREATE TABLE test_hybrid_ephem +( + id UInt64, + unhexed String EPHEMERAL, + hexed FixedString(4) DEFAULT unhex(unhexed) +) +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_ephem_after'), + id < 10, + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_ephem_before'), + id >= 10 +); + +SELECT 'Insert into Hybrid with EPHEMERAL column'; +INSERT INTO test_hybrid_ephem (id, unhexed) VALUES (2, '0a0b0c0d'); +SELECT id, hex(hexed) FROM test_hybrid_ephem_after WHERE id = 2; + +SELECT 'Select from Hybrid with EPHEMERAL column'; +SELECT id, hex(hexed) FROM test_hybrid_ephem ORDER BY id; + +DROP TABLE test_hybrid_ephem; +DROP TABLE test_hybrid_ephem_after; +DROP TABLE test_hybrid_ephem_before; diff --git a/tests/queries/0_stateless/03648_alias_marker_with_mergeable_state.reference b/tests/queries/0_stateless/03648_alias_marker_with_mergeable_state.reference new file mode 100644 index 000000000000..5f061a829b23 --- /dev/null +++ b/tests/queries/0_stateless/03648_alias_marker_with_mergeable_state.reference @@ -0,0 +1,17 @@ +---- stage: with_mergeable_state (analyzer=1, setting=enable_alias_marker=1) ---- +Header: sum(foo) AggregateFunction(sum, Int64) +---- stage: with_mergeable_state (analyzer=0) ---- +Expected error: Function __aliasMarker is 
internal and supported only with the analyzer +---- explicit __aliasMarker in user query (analyzer=1) ---- +Explicit __aliasMarker call is allowed +---- stage: complete (analyzer=1) ---- +Header: x Int64 +---- stage: fetch_columns (analyzer=1) ---- +Header: __table1.number UInt64 +---- stage: with_mergeable_state (analyzer=1) ---- +Header: sum(foo) AggregateFunction(sum, Int64) +---- stage: with_mergeable_state_after_aggregation (analyzer=1) ---- +Header: sum(foo) Int64 +---- stage: with_mergeable_state_after_aggregation_and_limit (analyzer=1) ---- +Header: intDiv(__table1.number, 10_UInt8) UInt64 + sum(foo) Int64 diff --git a/tests/queries/0_stateless/03648_alias_marker_with_mergeable_state.sh b/tests/queries/0_stateless/03648_alias_marker_with_mergeable_state.sh new file mode 100755 index 000000000000..fb0580e796f0 --- /dev/null +++ b/tests/queries/0_stateless/03648_alias_marker_with_mergeable_state.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +echo "---- stage: with_mergeable_state (analyzer=1, setting=enable_alias_marker=1) ----" +$CLICKHOUSE_CLIENT --enable_analyzer=1 --query_kind secondary_query --stage with_mergeable_state --multiquery 2>&1 <<'EOF' | sed -n '/^Header:/,/^ [^ ]/p' | sed '$d' +SET enable_alias_marker=1; +EXPLAIN header=1 +SELECT sum(__aliasMarker(number*2-3,'foo')) AS x +FROM numbers(10); +EOF + +echo "---- stage: with_mergeable_state (analyzer=0) ----" +alias_marker_error_output=$($CLICKHOUSE_CLIENT --enable_analyzer=0 --stage with_mergeable_state --query \ + "EXPLAIN header=1 SELECT sum(__aliasMarker(number*2-3,'foo')) AS x FROM numbers(10)" 2>&1) +if grep -q "Function __aliasMarker is internal and supported only with the analyzer" <<<"${alias_marker_error_output}" +then + echo "Expected error: Function __aliasMarker is internal and supported only with the analyzer" +else + echo "${alias_marker_error_output}" +fi + +echo "---- explicit __aliasMarker in user query (analyzer=1) ----" +if $CLICKHOUSE_CLIENT --enable_analyzer=1 --query \ + "SELECT __aliasMarker(number*2-3,'foo') FROM numbers(1)" >/dev/null 2>&1 +then + echo "Explicit __aliasMarker call is allowed" +else + echo "Unexpected error for explicit __aliasMarker call" +fi + +echo "---- stage: complete (analyzer=1) ----" +$CLICKHOUSE_CLIENT --enable_analyzer=1 --query_kind secondary_query --stage complete --query \ + "EXPLAIN header=1 SELECT sum(__aliasMarker(number*2-3,'foo')) AS x FROM numbers(10)" \ + 2>&1 | sed -n '/^Header:/,/^ [^ ]/p' | sed '$d' + +echo "---- stage: fetch_columns (analyzer=1) ----" +$CLICKHOUSE_CLIENT --enable_analyzer=1 --query_kind secondary_query --stage fetch_columns --query \ + "EXPLAIN header=1 SELECT sum(__aliasMarker(number*2-3,'foo')) AS x FROM numbers(10)" \ + 2>&1 | sed -n '/^Header:/,/^ [^ ]/p' | sed '$d' + +echo "---- stage: with_mergeable_state (analyzer=1) ----" +$CLICKHOUSE_CLIENT --enable_analyzer=1 --query_kind secondary_query --stage with_mergeable_state 
--query \ + "EXPLAIN header=1 SELECT sum(__aliasMarker(number*2-3,'foo')) AS x FROM numbers(10)" \ + 2>&1 | sed -n '/^Header:/,/^ [^ ]/p' | sed '$d' + +echo "---- stage: with_mergeable_state_after_aggregation (analyzer=1) ----" +$CLICKHOUSE_CLIENT --enable_analyzer=1 --query_kind secondary_query --stage with_mergeable_state_after_aggregation --query \ + "EXPLAIN header=1 SELECT sum(__aliasMarker(number*2-3,'foo')) AS x FROM numbers(10)" \ + 2>&1 | sed -n '/^Header:/,/^ [^ ]/p' | sed '$d' + +echo "---- stage: with_mergeable_state_after_aggregation_and_limit (analyzer=1) ----" +$CLICKHOUSE_CLIENT --enable_analyzer=1 --query_kind secondary_query --stage with_mergeable_state_after_aggregation_and_limit --query \ + "EXPLAIN header=1 SELECT sum(__aliasMarker(number*2-3,'foo')) AS x FROM numbers(10) GROUP BY intDiv(number,10) AS y ORDER BY y LIMIT 10" \ + 2>&1 | sed -n '/^Header:/,/^ [^ ]/p' | sed '$d' diff --git a/tests/queries/0_stateless/03842_hybrid_alias_issue_1424.reference b/tests/queries/0_stateless/03842_hybrid_alias_issue_1424.reference new file mode 100644 index 000000000000..6f78da4c4f59 --- /dev/null +++ b/tests/queries/0_stateless/03842_hybrid_alias_issue_1424.reference @@ -0,0 +1,42 @@ +max in subquery +4294967294 +sum in subquery +-4921211434 +cte min with predicate +679772422 +cte with limit +-2147483648 -4294967296 +-1762862292 -574613778 +-1329695183 -1573638336 +-221724287 679772422 +0 0 +550067609 -3048000734 +1084637461 3417479706 +1169291374 -3082049462 +1899628504 -740161250 +2147483647 4294967294 +cte without limit +-2147483648 -4294967296 +-1762862292 -574613778 +-1329695183 -1573638336 +-221724287 679772422 +0 0 +550067609 -3048000734 +1084637461 3417479706 +1169291374 -3082049462 +1899628504 -740161250 +2147483647 4294967294 +group by in subquery +10 10 +intersect with order by +-221724287 679772422 +1084637461 3417479706 +2147483647 4294967294 +intersect without order by +-221724287 679772422 +1084637461 3417479706 +2147483647 4294967294 
+constant alias in subquery +9 7 32 +constant alias predicate +2 diff --git a/tests/queries/0_stateless/03842_hybrid_alias_issue_1424.sql b/tests/queries/0_stateless/03842_hybrid_alias_issue_1424.sql new file mode 100644 index 000000000000..3d2f527eb534 --- /dev/null +++ b/tests/queries/0_stateless/03842_hybrid_alias_issue_1424.sql @@ -0,0 +1,202 @@ +SET allow_experimental_hybrid_table = 1, enable_analyzer = 1; + +DROP TABLE IF EXISTS test_hybrid_issue_1424; +DROP TABLE IF EXISTS test_hybrid_issue_1424_left; +DROP TABLE IF EXISTS test_hybrid_issue_1424_right; +DROP TABLE IF EXISTS test_hybrid_issue_1424_const; +DROP TABLE IF EXISTS test_hybrid_issue_1424_const_left; +DROP TABLE IF EXISTS test_hybrid_issue_1424_const_right; + +CREATE TABLE test_hybrid_issue_1424_left +( + id Int32, + value Int32, + date_col Date, + computed ALIAS value * 2 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(date_col) +ORDER BY (date_col, id); + +INSERT INTO test_hybrid_issue_1424_left VALUES + (toInt32(2147483647), toInt32(2147483647), toDate('2149-06-06')), + (toInt32(-2147483648), toInt32(-2147483648), toDate('1970-01-01')), + (toInt32(0), toInt32(0), '1970-01-01'), + (toInt32(1084637461), toInt32(1708739853), toDate(1335613783)), + (toInt32(-221724287), toInt32(339886211), toDate(1294089763)), + (toInt32(-1762862292), toInt32(-287306889), toDate(1375707465)), + (toInt32(1169291374), toInt32(-1541024731), toDate(1082126480)), + (toInt32(-1329695183), toInt32(-786819168), toDate(1226000164)), + (toInt32(1899628504), toInt32(-370080625), toDate(1179050966)), + (toInt32(550067609), toInt32(-1524000367), toDate(1410654931)); + +CREATE TABLE test_hybrid_issue_1424_right +( + id Int32, + value Int32, + date_col Date, + computed ALIAS value * 2 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(date_col) +ORDER BY (date_col, id); + +INSERT INTO test_hybrid_issue_1424_right VALUES + (toInt32(2147483647), toInt32(2147483647), toDate('2149-06-06')), + (toInt32(-2147483648), toInt32(-2147483648), 
toDate('1970-01-01')), + (toInt32(0), toInt32(0), '1970-01-01'), + (toInt32(1084637461), toInt32(1708739853), toDate(1335613783)), + (toInt32(-221724287), toInt32(339886211), toDate(1294089763)), + (toInt32(-1762862292), toInt32(-287306889), toDate(1375707465)), + (toInt32(1169291374), toInt32(-1541024731), toDate(1082126480)), + (toInt32(-1329695183), toInt32(-786819168), toDate(1226000164)), + (toInt32(1899628504), toInt32(-370080625), toDate(1179050966)), + (toInt32(550067609), toInt32(-1524000367), toDate(1410654931)); + +CREATE TABLE test_hybrid_issue_1424 +( + id Int32, + value Int32, + date_col Date, + computed Int64 +) +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_issue_1424_left'), date_col >= '2025-01-15', + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_issue_1424_right'), date_col < '2025-01-15' +); + +SELECT 'max in subquery'; +SELECT max_computed FROM (SELECT max(computed) AS max_computed FROM test_hybrid_issue_1424); + +SELECT 'sum in subquery'; +SELECT sum_computed FROM (SELECT sum(computed) AS sum_computed FROM test_hybrid_issue_1424); + +SELECT 'cte min with predicate'; +WITH cte AS +( + SELECT min(computed) AS min_computed + FROM test_hybrid_issue_1424 + WHERE computed > 50 +) +SELECT * FROM cte; + +SELECT 'cte with limit'; +WITH ranked AS +( + SELECT id, computed + FROM test_hybrid_issue_1424 + LIMIT 10 +) +SELECT * +FROM ranked +ORDER BY id ASC; + +SELECT 'cte without limit'; +WITH ranked AS +( + SELECT id, computed + FROM test_hybrid_issue_1424 +) +SELECT * +FROM ranked +ORDER BY id ASC; + +SELECT 'group by in subquery'; +WITH monthly AS +( + SELECT count() AS cnt + FROM test_hybrid_issue_1424 + GROUP BY computed +) +SELECT sum(cnt), count() FROM monthly; + +SELECT 'intersect with order by'; +SELECT * +FROM +( + SELECT id, computed + FROM test_hybrid_issue_1424 + WHERE computed > 100 + INTERSECT + SELECT id, computed + FROM test_hybrid_issue_1424 + WHERE value > 50 +) +ORDER BY id; + +SELECT 'intersect 
without order by'; +SELECT * +FROM +( + SELECT id, computed + FROM test_hybrid_issue_1424 + WHERE computed > 100 + INTERSECT + SELECT id, computed + FROM test_hybrid_issue_1424 + WHERE value > 50 +) +ORDER BY id; + +CREATE TABLE test_hybrid_issue_1424_const_left +( + id Int32, + value Int32, + date_col Date, + computed ALIAS toInt64(7) +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(date_col) +ORDER BY (date_col, id); + +INSERT INTO test_hybrid_issue_1424_const_left VALUES + (1, 1, toDate('2025-01-15')), + (2, 2, toDate('2025-02-01')); + +CREATE TABLE test_hybrid_issue_1424_const_right +( + id Int32, + value Int32, + date_col Date, + computed ALIAS toInt64(9) +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(date_col) +ORDER BY (date_col, id); + +INSERT INTO test_hybrid_issue_1424_const_right VALUES + (3, 3, toDate('2024-12-31')), + (4, 4, toDate('2020-01-01')); + +CREATE TABLE test_hybrid_issue_1424_const +( + id Int32, + value Int32, + date_col Date, + computed Int64 +) +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_issue_1424_const_left'), date_col >= '2025-01-15', + remote('127.0.0.1:9000', currentDatabase(), 'test_hybrid_issue_1424_const_right'), date_col < '2025-01-15' +); + +SELECT 'constant alias in subquery'; +SELECT max_computed, min_computed, sum_computed +FROM +( + SELECT + max(computed) AS max_computed, + min(computed) AS min_computed, + sum(computed) AS sum_computed + FROM test_hybrid_issue_1424_const +); + +SELECT 'constant alias predicate'; +SELECT count() FROM test_hybrid_issue_1424_const WHERE computed = 9; + +DROP TABLE test_hybrid_issue_1424; +DROP TABLE test_hybrid_issue_1424_left; +DROP TABLE test_hybrid_issue_1424_right; +DROP TABLE test_hybrid_issue_1424_const; +DROP TABLE test_hybrid_issue_1424_const_left; +DROP TABLE test_hybrid_issue_1424_const_right; diff --git a/tests/queries/0_stateless/03843_distributed_alias_same_expression.reference b/tests/queries/0_stateless/03843_distributed_alias_same_expression.reference 
new file mode 100644
index 000000000000..18bd6916995a
--- /dev/null
+++ b/tests/queries/0_stateless/03843_distributed_alias_same_expression.reference
@@ -0,0 +1,7 @@
+first
+1999-03-29 01:15:33.000
+second
+1999-03-29 01:15:33.000
+third
+1999-03-29 01:15:33.000
+fourth
diff --git a/tests/queries/0_stateless/03843_distributed_alias_same_expression.sql b/tests/queries/0_stateless/03843_distributed_alias_same_expression.sql
new file mode 100644
index 000000000000..1767e932b6b7
--- /dev/null
+++ b/tests/queries/0_stateless/03843_distributed_alias_same_expression.sql
@@ -0,0 +1,44 @@
+-- Regression coverage for distributed ORDER BY + ALIAS columns with identical expressions.
+-- Related issue: https://github.com/ClickHouse/ClickHouse/issues/79916
+
+DROP TABLE IF EXISTS test_alias_same_expr_remote;
+
+CREATE TABLE test_alias_same_expr_remote
+(
+    dt DateTime64(3),
+    String_7 String,
+    alias_String_7_0 String ALIAS String_7,
+    alias_String_7_1 String ALIAS String_7
+)
+ENGINE = MergeTree()
+ORDER BY dt;
+
+INSERT INTO test_alias_same_expr_remote VALUES ('1999-03-29T01:15:33', '');
+
+SELECT 'first';
+SELECT dt, alias_String_7_0, alias_String_7_1
+FROM remote('127.0.0.{1,2}', currentDatabase(), test_alias_same_expr_remote)
+LIMIT 1;
+
+SELECT 'second';
+SELECT dt, alias_String_7_0, alias_String_7_1
+FROM remote('127.0.0.{1,2}', currentDatabase(), test_alias_same_expr_remote)
+ORDER BY dt
+LIMIT 1
+SETTINGS enable_analyzer = 0;
+
+SELECT 'third';
+SELECT dt, alias_String_7_0, alias_String_7_1
+FROM remote('127.0.0.{1,2}', currentDatabase(), test_alias_same_expr_remote)
+ORDER BY dt
+LIMIT 1
+SETTINGS enable_analyzer = 1;
+
+SELECT 'fourth';
+SELECT dt, alias_String_7_0, alias_String_7_1
+FROM remote('127.0.0.{1,2}', currentDatabase(), test_alias_same_expr_remote)
+ORDER BY dt
+LIMIT 1
+SETTINGS enable_analyzer = 1, enable_alias_marker = 0; -- { serverError NUMBER_OF_COLUMNS_DOESNT_MATCH }
+
+DROP TABLE test_alias_same_expr_remote;
diff --git a/tests/queries/0_stateless/03844_distributed_nested_alias_marker.reference b/tests/queries/0_stateless/03844_distributed_nested_alias_marker.reference
new file mode 100644
index 000000000000..7b05cb1e81a0
--- /dev/null
+++ b/tests/queries/0_stateless/03844_distributed_nested_alias_marker.reference
@@ -0,0 +1,4 @@
+analyzer
+x x
+legacy
+x x
diff --git a/tests/queries/0_stateless/03844_distributed_nested_alias_marker.sql b/tests/queries/0_stateless/03844_distributed_nested_alias_marker.sql
new file mode 100644
index 000000000000..b725acf38949
--- /dev/null
+++ b/tests/queries/0_stateless/03844_distributed_nested_alias_marker.sql
@@ -0,0 +1,34 @@
+DROP TABLE IF EXISTS test_nested_alias_dist;
+DROP TABLE IF EXISTS test_nested_alias_local;
+
+CREATE TABLE test_nested_alias_local
+(
+    dt DateTime64(3),
+    base String,
+    a String ALIAS base,
+    b String ALIAS a
+)
+ENGINE = MergeTree()
+ORDER BY dt;
+
+INSERT INTO test_nested_alias_local VALUES ('1999-03-29T01:15:33', 'x');
+
+CREATE TABLE test_nested_alias_dist AS test_nested_alias_local
+ENGINE = Distributed('test_shard_localhost', currentDatabase(), test_nested_alias_local, rand());
+
+SELECT 'analyzer';
+SELECT a, b
+FROM test_nested_alias_dist
+ORDER BY dt
+LIMIT 1
+SETTINGS enable_analyzer = 1;
+
+SELECT 'legacy';
+SELECT a, b
+FROM test_nested_alias_dist
+ORDER BY dt
+LIMIT 1
+SETTINGS enable_analyzer = 0;
+
+DROP TABLE test_nested_alias_dist;
+DROP TABLE test_nested_alias_local;
diff --git a/tests/queries/0_stateless/03845_distributed_global_in_join_alias_chain.reference b/tests/queries/0_stateless/03845_distributed_global_in_join_alias_chain.reference
new file mode 100644
index 000000000000..325078d71cc1
--- /dev/null
+++ b/tests/queries/0_stateless/03845_distributed_global_in_join_alias_chain.reference
@@ -0,0 +1,8 @@
+rewrite_in
+1
+1
+rewrite_join
+1
+1
+1
+1
diff --git a/tests/queries/0_stateless/03845_distributed_global_in_join_alias_chain.sql b/tests/queries/0_stateless/03845_distributed_global_in_join_alias_chain.sql
new file mode 100644
index 000000000000..9bd95d72fd20
--- /dev/null
+++ b/tests/queries/0_stateless/03845_distributed_global_in_join_alias_chain.sql
@@ -0,0 +1,34 @@
+DROP TABLE IF EXISTS test_global_alias_chain_dist;
+DROP TABLE IF EXISTS test_global_alias_chain_local;
+
+CREATE TABLE test_global_alias_chain_local
+(
+    id UInt64,
+    base UInt64,
+    a UInt64 ALIAS base,
+    b UInt64 ALIAS a
+)
+ENGINE = MergeTree()
+ORDER BY id;
+
+INSERT INTO test_global_alias_chain_local VALUES (1, 1);
+
+CREATE TABLE test_global_alias_chain_dist AS test_global_alias_chain_local
+ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), test_global_alias_chain_local, rand());
+
+SELECT 'rewrite_in';
+SELECT id
+FROM test_global_alias_chain_dist
+WHERE id IN (SELECT b FROM test_global_alias_chain_dist)
+ORDER BY id
+SETTINGS enable_analyzer = 1, distributed_product_mode = 'global';
+
+SELECT 'rewrite_join';
+SELECT l.id
+FROM test_global_alias_chain_dist AS l
+INNER JOIN (SELECT b FROM test_global_alias_chain_dist) AS r ON l.id = r.b
+ORDER BY l.id
+SETTINGS enable_analyzer = 1, distributed_product_mode = 'global';
+
+DROP TABLE test_global_alias_chain_dist;
+DROP TABLE test_global_alias_chain_local;
diff --git a/tests/queries/0_stateless/03846_distributed_global_in_alias_marker_collision.reference b/tests/queries/0_stateless/03846_distributed_global_in_alias_marker_collision.reference
new file mode 100644
index 000000000000..9a3a29a69ce8
--- /dev/null
+++ b/tests/queries/0_stateless/03846_distributed_global_in_alias_marker_collision.reference
@@ -0,0 +1,2 @@
+global_in_collision_check
+1
diff --git a/tests/queries/0_stateless/03846_distributed_global_in_alias_marker_collision.sql b/tests/queries/0_stateless/03846_distributed_global_in_alias_marker_collision.sql
new file mode 100644
index 000000000000..d47e6a304ba1
--- /dev/null
+++ b/tests/queries/0_stateless/03846_distributed_global_in_alias_marker_collision.sql
@@ -0,0 +1,56 @@
+DROP TABLE IF EXISTS test_marker_collision_dist;
+DROP TABLE IF EXISTS test_marker_collision_main;
+DROP TABLE IF EXISTS test_marker_collision_left;
+DROP TABLE IF EXISTS test_marker_collision_right;
+
+CREATE TABLE test_marker_collision_main
+(
+    id UInt64
+)
+ENGINE = MergeTree()
+ORDER BY id;
+
+INSERT INTO test_marker_collision_main VALUES (1);
+
+CREATE TABLE test_marker_collision_left
+(
+    id UInt64,
+    x UInt64,
+    b UInt64 ALIAS x
+)
+ENGINE = MergeTree()
+ORDER BY id;
+
+CREATE TABLE test_marker_collision_right
+(
+    id UInt64,
+    y UInt64,
+    b UInt64 ALIAS y
+)
+ENGINE = MergeTree()
+ORDER BY id;
+
+INSERT INTO test_marker_collision_left VALUES (1, 1);
+INSERT INTO test_marker_collision_right VALUES (1, 20);
+
+CREATE TABLE test_marker_collision_dist AS test_marker_collision_main
+ENGINE = Distributed('test_shard_localhost', currentDatabase(), test_marker_collision_main, rand());
+
+SELECT 'global_in_collision_check';
+SELECT id
+FROM test_marker_collision_dist
+WHERE id GLOBAL IN
+(
+    SELECT test_marker_collision_left.id
+    FROM test_marker_collision_left
+    INNER JOIN test_marker_collision_right
+        ON test_marker_collision_left.id = test_marker_collision_right.id
+    WHERE test_marker_collision_left.b + test_marker_collision_right.b = 21
+)
+ORDER BY id
+SETTINGS enable_analyzer = 1, enable_alias_marker = 1;
+
+DROP TABLE test_marker_collision_dist;
+DROP TABLE test_marker_collision_main;
+DROP TABLE test_marker_collision_left;
+DROP TABLE test_marker_collision_right;