diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 45e6463a4225..a4c3d5f11df4 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -38,7 +38,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_ME static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 6; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH; static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 81b27527d74d..dd73feb83059 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -579,6 +579,9 @@ Use multiple threads for azure multipart upload. )", 0) \ DECLARE(Bool, s3_throw_on_zero_files_match, false, R"( Throw an error, when ListObjects request cannot match any files +)", 0) \ + DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"( +Allow copying base storage credentials to secondary object storages with a different endpoint. Default: 0 (credentials only copied when endpoint matches base). )", 0) \ DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"( Throw an error if matched zero files according to glob expansion rules. 
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index e0a1b386b374..37cb0dfd9b83 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -43,6 +43,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() { {"iceberg_partition_timezone", "", "", "New setting."}, // {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, + {"s3_propagate_credentials_to_other_storages", false, false, "New setting"}, }); addSettingsChanges(settings_changes_history, "26.1", { diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index f8162c998932..855e2cb7d948 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -502,7 +502,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con LOG_DEBUG(log, "Has no credentials"); } } - else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()) + else if (!lightweight && table_metadata.requiresCredentials() + && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end() + && table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local) { throw Exception( ErrorCodes::BAD_ARGUMENTS, diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index 6eda3481a78f..592175167914 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -35,10 +35,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o data_lake_metadata = object->data_lake_metadata.value(); #if USE_AVRO - if (std::dynamic_pointer_cast(object)) - { - iceberg_info = dynamic_cast(*object).info; - } 
+ if (auto iceberg_object = std::dynamic_pointer_cast(object)) + iceberg_info = iceberg_object->info; #endif const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp index 6aeaf91961f5..0813580e762a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp @@ -113,6 +113,7 @@ Plan getPlan( const DataLakeStorageSettings & data_lake_settings, const PersistentTableComponents & persistent_table_components, ObjectStoragePtr object_storage, + SecondaryStorages & secondary_storages, const String & write_format, ContextPtr context, CompressionMethod compression_method) @@ -155,27 +156,30 @@ Plan getPlan( std::unordered_map> manifest_files; for (const auto & snapshot : snapshots_info) { - auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log); + auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_absolute_path, log, secondary_storages); for (const auto & manifest_file : manifest_list) { - plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path); - if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path)) - plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = snapshot.snapshot_id; + plan.manifest_list_to_manifest_files[snapshot.manifest_list_absolute_path].push_back(manifest_file.manifest_file_absolute_path); + if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_absolute_path)) + { + plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_absolute_path] = snapshot.snapshot_id; + } auto manifest_file_content = getManifestFile( 
object_storage, persistent_table_components, context, log, - manifest_file.manifest_file_path, + manifest_file.manifest_file_absolute_path, manifest_file.added_sequence_number, - manifest_file.added_snapshot_id); + manifest_file.added_snapshot_id, + secondary_storages); - if (!manifest_files.contains(manifest_file.manifest_file_path)) + if (!manifest_files.contains(manifest_file.manifest_file_absolute_path)) { - manifest_files[manifest_file.manifest_file_path] = std::make_shared(current_schema); - manifest_files[manifest_file.manifest_file_path]->path = manifest_file.manifest_file_path; + manifest_files[manifest_file.manifest_file_absolute_path] = std::make_shared(current_schema); + manifest_files[manifest_file.manifest_file_absolute_path]->path = manifest_file.manifest_file_absolute_path; } - manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path); + manifest_files[manifest_file.manifest_file_absolute_path]->manifest_lists_path.push_back(snapshot.manifest_list_absolute_path); auto data_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::DATA); auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::POSITION_DELETE); for (const auto & pos_delete_file : positional_delete_files) @@ -187,19 +191,23 @@ Plan getPlan( if (plan.partitions.size() <= partition_index) plan.partitions.push_back({}); - IcebergDataObjectInfoPtr data_object_info = std::make_shared(data_file, 0); + auto [resolved_storage, resolved_key] = resolveObjectStorageForPath( + persistent_table_components.table_location, data_file->file_path, object_storage, secondary_storages, context); + + IcebergDataObjectInfoPtr data_object_info = std::make_shared(*data_file, 0, resolved_storage, resolved_key); std::shared_ptr data_file_ptr; - if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path)) + std::string path_identifier = resolved_storage->getDescription() + ":" + 
resolved_storage->getObjectsNamespace() + "|" + resolved_key; + if (!plan.path_to_data_file.contains(path_identifier)) { data_file_ptr = std::make_shared(DataFilePlan{ .data_object_info = data_object_info, - .manifest_list = manifest_files[manifest_file.manifest_file_path], + .manifest_list = manifest_files[manifest_file.manifest_file_absolute_path], .patched_path = plan.generator.generateDataFileName()}); - plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr; + plan.path_to_data_file[path_identifier] = data_file_ptr; } else { - data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path]; + data_file_ptr = plan.path_to_data_file[path_identifier]; } plan.partitions[partition_index].push_back(data_file_ptr); plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back()); @@ -232,7 +240,9 @@ static void writeDataFiles( const std::optional & format_settings, ContextPtr context, const String & write_format, - CompressionMethod write_compression_method) + CompressionMethod write_compression_method, + const String & table_location, + SecondaryStorages & secondary_storages) { for (auto & [_, data_file] : initial_plan.path_to_data_file) { @@ -243,10 +253,15 @@ static void writeDataFiles( format_settings, // todo make compaction using same FormatParserSharedResources std::make_shared(context->getSettingsRef(), 1), - context); + context, + table_location, + secondary_storages); - RelativePathWithMetadata relative_path(data_file->data_object_info->getPath()); - auto read_buffer = createReadBuffer(relative_path, object_storage, context, getLogger("IcebergCompaction")); + ObjectStoragePtr storage_to_use = data_file->data_object_info->getResolvedStorage(); + if (!storage_to_use) + storage_to_use = object_storage; + RelativePathWithMetadata object_info(data_file->data_object_info->getPath()); + auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction")); const 
Settings & settings = context->getSettingsRef(); auto parser_shared_resources = std::make_shared( @@ -400,6 +415,9 @@ void writeMetadataFiles( { manifest_entry->patched_path = plan.generator.generateManifestEntryName(); manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path.path_in_metadata; + + std::vector manifest_data_filenames(data_filenames.begin(), data_filenames.end()); + auto buffer_manifest_entry = object_storage->writeObject( StoredObject(manifest_entry->patched_path.path_in_storage), WriteMode::Rewrite, @@ -417,7 +435,7 @@ void writeMetadataFiles( partition_columns, plan.partition_encoder.getPartitionValue(grouped_by_manifest_files_partitions[manifest_entry]), ChunkPartitioner(fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes(), - std::vector(data_filenames.begin(), data_filenames.end()), + manifest_data_filenames, manifest_entry->statistics, sample_block_, snapshot, @@ -438,7 +456,7 @@ void writeMetadataFiles( if (plan.history[i].added_files == 0) continue; - manifest_list_renamings[plan.history[i].manifest_list_path] = new_snapshots[i].metadata_path; + manifest_list_renamings[plan.history[i].manifest_list_absolute_path] = new_snapshots[i].metadata_path; } for (size_t i = 0; i < plan.history.size(); ++i) @@ -446,9 +464,9 @@ void writeMetadataFiles( if (plan.history[i].added_files == 0) continue; - auto initial_manifest_list_name = plan.history[i].manifest_list_path; + auto initial_manifest_list_name = plan.history[i].manifest_list_absolute_path; auto initial_manifest_entries = plan.manifest_list_to_manifest_files[initial_manifest_list_name]; - auto renamed_manifest_list = manifest_list_renamings[initial_manifest_list_name]; + auto renamed_manifest_list = manifest_list_renamings[plan.history[i].manifest_list_absolute_path]; std::vector renamed_manifest_entries; Int32 total_manifest_file_sizes = 0; for (const auto & initial_manifest_entry : initial_manifest_entries) @@ -516,6 +534,7 @@ void 
compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, ObjectStoragePtr object_storage_, + SecondaryStorages & secondary_storages_, const DataLakeStorageSettings & data_lake_settings, const std::optional & format_settings_, SharedHeader sample_block_, @@ -527,6 +546,7 @@ void compactIcebergTable( data_lake_settings, persistent_table_components, object_storage_, + secondary_storages_, write_format, context_, persistent_table_components.metadata_compression_method); @@ -540,7 +560,9 @@ void compactIcebergTable( format_settings_, context_, write_format, - persistent_table_components.metadata_compression_method); + persistent_table_components.metadata_compression_method, + persistent_table_components.table_location, + secondary_storages_); writeMetadataFiles(plan, object_storage_, context_, sample_block_, write_format, persistent_table_components.table_path); clearOldFiles(object_storage_, old_files); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h index 0916002f99f3..ad4b28de1343 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB::Iceberg @@ -15,6 +16,7 @@ void compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, DB::ObjectStoragePtr object_storage_, + SecondaryStorages & secondary_storages_, const DataLakeStorageSettings & data_lake_settings, const std::optional & format_settings_, DB::SharedHeader sample_block_, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp index b5112a5a290c..86e812b4d361 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp +++ 
b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp @@ -1,3 +1,4 @@ +#include #include "config.h" #include @@ -13,6 +14,7 @@ #include #include +#include namespace DB::ErrorCodes { @@ -33,16 +35,22 @@ extern const SettingsBool use_roaring_bitmap_iceberg_positional_deletes; #if USE_AVRO -IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntryPtr data_manifest_file_entry_, Int32 schema_id_relevant_to_iterator_) - : ObjectInfo(RelativePathWithMetadata(data_manifest_file_entry_->file_path)) +IcebergDataObjectInfo::IcebergDataObjectInfo( + const Iceberg::ManifestFileEntry & data_manifest_file_entry_, + Int32 schema_id_relevant_to_iterator_, + ObjectStoragePtr resolved_storage_, + const String & resolved_key_) + : ObjectInfo(RelativePathWithMetadata(resolved_key_.empty() ? data_manifest_file_entry_.file_path : resolved_key_)) , info{ - data_manifest_file_entry_->file_path_key, - data_manifest_file_entry_->schema_id, - schema_id_relevant_to_iterator_, - data_manifest_file_entry_->added_sequence_number, - data_manifest_file_entry_->file_format, - /* position_deletes_objects */ {}, - /* equality_deletes_objects */ {}} + data_manifest_file_entry_.file_path_from_metadata, + resolved_storage_ ? 
data_manifest_file_entry_.file_path : data_manifest_file_entry_.file_path_from_metadata, // absolute path + data_manifest_file_entry_.schema_id, + schema_id_relevant_to_iterator_, + data_manifest_file_entry_.added_sequence_number, + data_manifest_file_entry_.file_format, + /* position_deletes_objects */ {}, + /* equality_deletes_objects */ {}} + , resolved_storage(std::move(resolved_storage_)) { } @@ -56,13 +64,15 @@ std::shared_ptr IcebergDataObjectInfo::getPositionDeleteTransf const SharedHeader & header, const std::optional & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, - ContextPtr context_) + ContextPtr context_, + const String & table_location, + SecondaryStorages & secondary_storages) { IcebergDataObjectInfoPtr self = shared_from_this(); if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value) - return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); + return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_, table_location, secondary_storages); else - return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); + return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_, table_location, secondary_storages); } void IcebergDataObjectInfo::addPositionDeleteObject(Iceberg::ManifestFileEntryPtr position_delete_object) @@ -91,7 +101,11 @@ void IcebergDataObjectInfo::addEqualityDeleteObject(const Iceberg::ManifestFileE void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuffer & out, size_t protocol_version) const { checkVersion(protocol_version); - writeStringBinary(data_object_file_path_key, out); + writeStringBinary(data_object_file_path_from_metadata, out); + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + 
writeStringBinary(data_object_file_absolute_path, out); + } writeVarInt(underlying_format_read_schema_id, out); writeVarInt(schema_id_relevant_to_iterator, out); writeVarInt(sequence_number, out); @@ -140,7 +154,11 @@ void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuf void IcebergObjectSerializableInfo::deserializeForClusterFunctionProtocol(ReadBuffer & in, size_t protocol_version) { checkVersion(protocol_version); - readStringBinary(data_object_file_path_key, in); + readStringBinary(data_object_file_path_from_metadata, in); + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + readStringBinary(data_object_file_absolute_path, in); + } readVarInt(underlying_format_read_schema_id, in); readVarInt(schema_id_relevant_to_iterator, in); readVarInt(sequence_number, in); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h index 259c77c0176f..eddfafa8155a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h @@ -15,7 +15,8 @@ namespace DB::Iceberg { struct IcebergObjectSerializableInfo { - String data_object_file_path_key; + String data_object_file_path_from_metadata; + String data_object_file_absolute_path; Int32 underlying_format_read_schema_id; Int32 schema_id_relevant_to_iterator; Int64 sequence_number; @@ -35,6 +36,7 @@ struct IcebergObjectSerializableInfo #if USE_AVRO #include +#include #include @@ -47,7 +49,13 @@ struct IcebergDataObjectInfo : public ObjectInfo, std::enable_shared_from_this & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, - ContextPtr context_); + ContextPtr context_, + const String & table_location, + SecondaryStorages & secondary_storages); std::optional getFileFormat() const override { return info.file_format; } void 
addPositionDeleteObject(Iceberg::ManifestFileEntryPtr position_delete_object); void addEqualityDeleteObject(const Iceberg::ManifestFileEntryPtr & equality_delete_object); + + std::optional getAbsolutePath() const + { + if (info.data_object_file_absolute_path.empty()) + return std::nullopt; + return info.data_object_file_absolute_path; + } + + ObjectStoragePtr getResolvedStorage() const { return resolved_storage; } + + void setResolvedStorage(ObjectStoragePtr storage) { resolved_storage = std::move(storage); } + Iceberg::IcebergObjectSerializableInfo info; + +private: + /// For files located in a different storage than the table's main storage + ObjectStoragePtr resolved_storage; }; using IcebergDataObjectInfoPtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index 671d529fdf71..00ee51eb251c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include @@ -169,9 +170,10 @@ std::optional SingleThreadIcebergKeysIterator::next() persistent_components, local_context, log, - data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_path, + data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path, data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number, - data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id); + data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id, + *secondary_storages); internal_data_index = 0; files = files_generator(current_manifest_file_content); } @@ -208,7 +210,9 @@ std::optional SingleThreadIcebergKeysIterator::next() switch (pruning_status) { case PruningReturnStatus::NOT_PRUNED: + { return manifest_file_entry; + } case PruningReturnStatus::MIN_MAX_INDEX_PRUNED: { 
++min_max_index_pruned_files; break; @@ -246,7 +250,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( const ActionsDAG * filter_dag_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : object_storage(object_storage_) , filter_dag(filter_dag_ ? std::make_shared(filter_dag_->clone()) : nullptr) , local_context(local_context_) @@ -267,6 +272,7 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( , persistent_components(persistent_components_) , files_generator(files_generator_) , log(getLogger("IcebergIterator")) + , secondary_storages(secondary_storages_) , manifest_file_content_type(manifest_file_content_type_) { } @@ -278,10 +284,12 @@ IcebergIterator::IcebergIterator( IDataLakeMetadata::FileProgressCallback callback_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : logger(getLogger("IcebergIterator")) , filter_dag(filter_dag_ ? 
std::make_unique(filter_dag_->clone()) : nullptr) , object_storage(std::move(object_storage_)) + , local_context(local_context_) , table_state_snapshot(table_snapshot_) , persistent_components(persistent_components_) , data_files_iterator( @@ -293,7 +301,8 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , deletes_iterator( object_storage, local_context_, @@ -308,10 +317,12 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , blocking_queue(100) , producer_task(std::nullopt) , callback(std::move(callback_)) + , secondary_storages(secondary_storages_) { auto delete_file = deletes_iterator.next(); while (delete_file.has_value()) @@ -369,12 +380,16 @@ ObjectInfoPtr IcebergIterator::next(size_t) Iceberg::ManifestFileEntryPtr manifest_file_entry; if (blocking_queue.pop(manifest_file_entry)) { - IcebergDataObjectInfoPtr object_info - = std::make_shared(manifest_file_entry, table_state_snapshot->schema_id); + auto [storage_to_use, resolved_key] = resolveObjectStorageForPath( + persistent_components.table_location, manifest_file_entry->file_path, object_storage, *secondary_storages, local_context); + + IcebergDataObjectInfoPtr object_info = std::make_shared( + *manifest_file_entry, table_state_snapshot->schema_id, storage_to_use, resolved_key); + for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, /* is_equality_delete */ false, logger)) { - const auto & data_file_path = object_info->info.data_object_file_path_key; + const auto & data_file_path = object_info->info.data_object_file_path_from_metadata; const auto & lower = position_delete->lower_reference_data_file_path; const auto & upper = position_delete->upper_reference_data_file_path; bool can_contain_data_file_deletes @@ -414,7 +429,7 @@ ObjectInfoPtr 
IcebergIterator::next(size_t) logger, "Finally got {} position delete elements for data file {}", object_info->info.position_deletes_objects.size(), - object_info->info.data_object_file_path_key); + object_info->info.data_object_file_path_from_metadata); } for (const auto & equality_delete : @@ -429,7 +444,7 @@ ObjectInfoPtr IcebergIterator::next(size_t) logger, "Finally got {} equality delete elements for data file {}", object_info->info.equality_deletes_objects.size(), - object_info->info.data_object_file_path_key); + object_info->info.data_object_file_path_from_metadata); } ProfileEvents::increment(ProfileEvents::IcebergMetadataReturnedObjectInfos); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 0cd7a2ed298e..e6441fbe5362 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -26,6 +26,7 @@ #include #include #include +#include namespace DB { @@ -45,7 +46,8 @@ class SingleThreadIcebergKeysIterator const ActionsDAG * filter_dag_, TableStateSnapshotPtr table_snapshot_, IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components); + PersistentTableComponents persistent_components, + std::shared_ptr secondary_storages_); std::optional next(); @@ -63,6 +65,7 @@ class SingleThreadIcebergKeysIterator LoggerPtr log; std::vector files; + std::shared_ptr secondary_storages; // By Iceberg design it is difficult to avoid storing position deletes in memory. 
size_t manifest_file_index = 0; @@ -89,7 +92,8 @@ class IcebergIterator : public IObjectIterator IDataLakeMetadata::FileProgressCallback callback_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - Iceberg::PersistentTableComponents persistent_components); + Iceberg::PersistentTableComponents persistent_components, + std::shared_ptr secondary_storages_); ObjectInfoPtr next(size_t) override; @@ -100,6 +104,7 @@ class IcebergIterator : public IObjectIterator LoggerPtr logger; std::unique_ptr filter_dag; ObjectStoragePtr object_storage; + ContextPtr local_context; const Iceberg::TableStateSnapshotPtr table_state_snapshot; Iceberg::PersistentTableComponents persistent_components; Iceberg::SingleThreadIcebergKeysIterator data_files_iterator; @@ -111,6 +116,7 @@ class IcebergIterator : public IObjectIterator std::vector equality_deletes_files; std::exception_ptr exception; std::mutex exception_mutex; + std::shared_ptr secondary_storages; // Sometimes data or manifests can be located on another storage }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 9a9ad6ef5302..01590babff4f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -201,6 +201,7 @@ IcebergMetadata::IcebergMetadata( IcebergMetadataFilesCachePtr cache_ptr) : log(getLogger("IcebergMetadata")) , object_storage(std::move(object_storage_)) + , secondary_storages(std::make_shared()) , persistent_components(initializePersistentTableComponents(configuration_, cache_ptr, context_)) , data_lake_settings(configuration_->getDataLakeSettings()) , write_format(configuration_->format) @@ -320,9 +321,9 @@ IcebergDataSnapshotPtr IcebergMetadata::createIcebergDataSnapshotFromSnapshotJSO object_storage, persistent_components, local_context, - getProperFilePathFromMetadataInfo( - 
manifest_list_file_path, persistent_components.table_path, persistent_components.table_location), - log), + makeAbsolutePath(persistent_components.table_location, manifest_list_file_path), + log, + *secondary_storages), snapshot_id, schema_id, total_rows, @@ -351,6 +352,7 @@ bool IcebergMetadata::optimize( snapshots_info, persistent_components, object_storage, + *secondary_storages, data_lake_settings, format_settings, sample_block, @@ -705,7 +707,10 @@ IcebergMetadata::IcebergHistory IcebergMetadata::getHistory(ContextPtr local_con const auto snapshot = snapshots->getObject(static_cast(i)); history_record.snapshot_id = snapshot->getValue(f_metadata_snapshot_id); - history_record.manifest_list_path = snapshot->getValue(f_manifest_list); + + auto manifest_list_path = snapshot->getValue(f_manifest_list); + history_record.manifest_list_absolute_path = makeAbsolutePath(persistent_components.table_location, manifest_list_path); + const auto summary = snapshot->getObject(f_summary); if (summary->has(f_added_data_files)) history_record.added_files = summary->getValue(f_added_data_files); @@ -775,9 +780,10 @@ bool IcebergMetadata::isDataSortedBySortingKey(StorageMetadataPtr storage_metada persistent_components, context, log, - manifest_list_entry.manifest_file_path, + manifest_list_entry.manifest_file_absolute_path, manifest_list_entry.added_sequence_number, - manifest_list_entry.added_snapshot_id); + manifest_list_entry.added_snapshot_id, + *secondary_storages); if (!manifest_file_ptr->areAllDataFilesSortedBySortOrderID(sorting_key.sort_order_id.value())) return false; @@ -812,9 +818,10 @@ std::optional IcebergMetadata::totalRows(ContextPtr local_context) const persistent_components, local_context, log, - manifest_list_entry.manifest_file_path, + manifest_list_entry.manifest_file_absolute_path, manifest_list_entry.added_sequence_number, - manifest_list_entry.added_snapshot_id); + manifest_list_entry.added_snapshot_id, + *secondary_storages); auto data_count = 
manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::DATA); auto position_deletes_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::POSITION_DELETE); if (!data_count.has_value() || !position_deletes_count.has_value()) @@ -847,9 +854,10 @@ std::optional IcebergMetadata::totalBytes(ContextPtr local_context) cons persistent_components, local_context, log, - manifest_list_entry.manifest_file_path, + manifest_list_entry.manifest_file_absolute_path, manifest_list_entry.added_sequence_number, - manifest_list_entry.added_snapshot_id); + manifest_list_entry.added_snapshot_id, + *secondary_storages); auto count = manifest_file_ptr->getBytesCountInAllDataFilesExcludingDeleted(); if (!count.has_value()) return {}; @@ -918,7 +926,8 @@ ObjectIterator IcebergMetadata::iterate( callback, iceberg_table_state, getRelevantDataSnapshotFromTableStateSnapshot(*iceberg_table_state, local_context), - persistent_components); + persistent_components, + secondary_storages); } NamesAndTypesList IcebergMetadata::getTableSchema(ContextPtr local_context) const @@ -962,7 +971,7 @@ void IcebergMetadata::addDeleteTransformers( { builder.addSimpleTransform( [&](const SharedHeader & header) - { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, parser_shared_resources, local_context); }); + { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, parser_shared_resources, local_context, persistent_components.table_location, *secondary_storages); }); } const auto & delete_files = iceberg_object_info->info.equality_deletes_objects; LOG_DEBUG(log, "Constructing filter transform for equality delete, there are {} delete files", delete_files.size()); @@ -972,9 +981,13 @@ void IcebergMetadata::addDeleteTransformers( { /// get header of delete file Block delete_file_header; - RelativePathWithMetadata delete_file_object(delete_file.file_path); + + auto 
[delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath( + persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context); + + RelativePathWithMetadata delete_file_object(resolved_delete_key); { - auto schema_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(delete_file.file_format, *schema_read_buffer, local_context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -997,7 +1010,7 @@ void IcebergMetadata::addDeleteTransformers( } /// Then we read the content of the delete file. auto mutable_columns_for_set = block_for_set.cloneEmptyColumns(); - std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); CompressionMethod compression_method = chooseCompressionMethod(delete_file.file_path, "auto"); auto delete_format = FormatFactory::instance().getInput( delete_file.file_format, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index defe1b4acc7a..ef915e0968cf 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -28,6 +28,7 @@ #include #include #include +#include namespace DB { @@ -131,6 +132,12 @@ class IcebergMetadata : public IDataLakeMetadata void drop(ContextPtr context) override; + ObjectIterator createIcebergKeysIterator( + Strings && data_files_, + ObjectStoragePtr, + IDataLakeMetadata::FileProgressCallback callback_, + ContextPtr local_context); + std::optional partitionKey(ContextPtr) 
const override; std::optional sortingKey(ContextPtr) const override; @@ -155,6 +162,7 @@ class IcebergMetadata : public IDataLakeMetadata LoggerPtr log; const ObjectStoragePtr object_storage; + mutable std::shared_ptr secondary_storages; // Sometimes data or manifests can be located on another storage DB::Iceberg::PersistentTableComponents persistent_components; const DataLakeStorageSettings & data_lake_settings; const String write_format; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h index 0bc228c40c07..2eb02960a044 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h @@ -31,7 +31,7 @@ namespace DB /// And we can get `ManifestFileContent` from cache by ManifestFileEntry. struct ManifestFileCacheKey { - String manifest_file_path; + String manifest_file_absolute_path; Int64 added_sequence_number; Int64 added_snapshot_id; Iceberg::ManifestFileContentType content_type; @@ -73,7 +73,7 @@ struct IcebergMetadataFilesCacheCell : private boost::noncopyable size_t total_size = 0; for (const auto & entry: manifest_file_cache_keys) { - total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_path.capacity(); + total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_absolute_path.capacity(); } return total_size; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index 885b0c011907..4efd8e268c39 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -253,7 +254,6 @@ ManifestFileContent::ManifestFileContent( content_type = FileContentType(manifest_file_deserializer.getValueFromRowByName(i, 
c_data_file_content, TypeIndex::Int32).safeGet()); const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet()); - if (status == ManifestEntryStatus::DELETED) continue; @@ -296,9 +296,8 @@ ManifestFileContent::ManifestFileContent( } const auto schema_id = schema_id_opt.has_value() ? schema_id_opt.value() : manifest_schema_id; - const auto file_path_key - = manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(); - const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), common_path, table_location); + const auto file_path_from_metadata = manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(); + const auto file_path = makeAbsolutePath(table_location, file_path_from_metadata); /// NOTE: This is weird, because in manifest file partition looks like this: /// { @@ -442,23 +441,22 @@ ManifestFileContent::ManifestFileContent( switch (content_type) { case FileContentType::DATA: - this->data_files_without_deleted.emplace_back( - std::make_shared( - file_path_key, - file_path, - i, - status, - added_sequence_number, - snapshot_id, - schema_id, - partition_key_value, - common_partition_specification, - columns_infos, - file_format, - /*lower_reference_data_file_path_ = */ std::nullopt, - /*upper_reference_data_file_path_ = */ std::nullopt, - /*equality_ids*/ std::nullopt, - sort_order_id)); + this->data_files_without_deleted.emplace_back(std::make_shared( + file_path_from_metadata, + file_path, + i, + status, + added_sequence_number, + snapshot_id, + schema_id, + partition_key_value, + common_partition_specification, + columns_infos, + file_format, + /*lower_reference_data_file_path_ = */ std::nullopt, + /*upper_reference_data_file_path_ = */ std::nullopt, + /*equality_ids*/ std::nullopt, + sort_order_id)); 
break; case FileContentType::POSITION_DELETE: { @@ -490,7 +488,7 @@ ManifestFileContent::ManifestFileContent( } this->position_deletes_files_without_deleted.emplace_back( std::make_shared( - file_path_key, + file_path_from_metadata, file_path, i, status, @@ -522,7 +520,7 @@ ManifestFileContent::ManifestFileContent( "Couldn't find field {} in equality delete file entry", c_data_file_equality_ids); this->equality_deletes_files.emplace_back( std::make_shared( - file_path_key, + file_path_from_metadata, file_path, i, status, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index 43ebcc594f4b..a0aa549b5eda 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -58,7 +59,7 @@ using PartitionSpecification = std::vector; struct ManifestFileEntry : public boost::noncopyable { // It's the original string in the Iceberg metadata - String file_path_key; + String file_path_from_metadata; // It's a processed file path to be used by Object Storage String file_path; Int64 row_number; @@ -84,7 +85,7 @@ struct ManifestFileEntry : public boost::noncopyable String dumpDeletesMatchingInfo() const; ManifestFileEntry( - const String& file_path_key_, + const String& file_path_from_metadata_, const String& file_path_, Int64 row_number_, ManifestEntryStatus status_, @@ -99,7 +100,7 @@ struct ManifestFileEntry : public boost::noncopyable std::optional upper_reference_data_file_path_, std::optional> equality_ids_, std::optional sort_order_id_) - : file_path_key(file_path_key_) + : file_path_from_metadata(file_path_from_metadata_) , file_path(file_path_) , row_number(row_number_) , status(status_) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index 73bf48fa44a0..deeb05a49102 
100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -218,13 +218,16 @@ static std::optional writeDataFiles( Field cur_value; col_data_filename.column->get(i, cur_value); - String path_without_namespace; - if (cur_value.safeGet().starts_with(blob_storage_namespace_name)) - path_without_namespace = cur_value.safeGet().substr(blob_storage_namespace_name.size()); + String original_path = cur_value.safeGet(); + String file_path_key = original_path; - if (!path_without_namespace.starts_with('/')) - path_without_namespace = "/" + path_without_namespace; - col_data_filename_without_namespaces->insert(path_without_namespace); + if (original_path.starts_with(blob_storage_namespace_name)) + file_path_key = original_path.substr(blob_storage_namespace_name.size()); + + if (!file_path_key.empty() && !file_path_key.starts_with('/')) + file_path_key = "/" + file_path_key; + + col_data_filename_without_namespaces->insert(file_path_key); } col_data_filename.column = std::move(col_data_filename_without_namespaces); Columns chunk_pos_delete; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp index 8d7820f9ea71..de095af8ca59 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp @@ -60,7 +60,7 @@ Poco::JSON::Array::Ptr IcebergPositionDeleteTransform::getSchemaFields() void IcebergPositionDeleteTransform::initializeDeleteSources() { /// Create filter on the data object to get interested rows - auto iceberg_data_path = iceberg_object_info->info.data_object_file_path_key; + auto iceberg_data_path = iceberg_object_info->info.data_object_file_path_from_metadata; ASTPtr where_ast = makeASTFunction( "equals", make_intrusive(IcebergPositionDeleteTransform::data_file_path_column_name), @@ 
-68,11 +68,12 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() for (const auto & position_deletes_object : iceberg_object_info->info.position_deletes_objects) { + /// Resolve the position delete file path to get the correct storage and key + auto [delete_storage_to_use, resolved_key] = resolveObjectStorageForPath( + table_location, position_deletes_object.file_path, object_storage, secondary_storages, context); - auto object_path = position_deletes_object.file_path; - auto object_metadata = object_storage->getObjectMetadata(object_path, /*with_tags=*/ false); - auto object_info = RelativePathWithMetadata{object_path, object_metadata}; - + auto object_metadata = delete_storage_to_use->getObjectMetadata(resolved_key, /*with_tags=*/ false); + RelativePathWithMetadata object_info(resolved_key, object_metadata); String format = position_deletes_object.file_format; if (boost::to_lower_copy(format) != "parquet") @@ -80,7 +81,7 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() Block initial_header; { - std::unique_ptr read_buf_schema = createReadBuffer(object_info, object_storage, context, log); + std::unique_ptr read_buf_schema = createReadBuffer(object_info, delete_storage_to_use, context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(format, *read_buf_schema, context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -91,9 +92,9 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() initial_header = Block(initial_header_data); } - CompressionMethod compression_method = chooseCompressionMethod(object_path, "auto"); + CompressionMethod compression_method = chooseCompressionMethod(resolved_key, "auto"); - delete_read_buffers.push_back(createReadBuffer(object_info, object_storage, context, log)); + delete_read_buffers.push_back(createReadBuffer(object_info, delete_storage_to_use, context, log)); auto syntax_result = TreeRewriter(context).analyze(where_ast, 
initial_header.getNamesAndTypesList()); ExpressionAnalyzer analyzer(where_ast, syntax_result, context); @@ -155,7 +156,7 @@ void IcebergBitmapPositionDeleteTransform::initialize() { // Add filename matching check auto filename_in_delete_record = filename_column->getDataAt(i); - auto current_data_file_path = iceberg_object_info->info.data_object_file_path_key; + auto current_data_file_path = iceberg_object_info->info.data_object_file_path_from_metadata; // Only add to delete bitmap when the filename in delete record matches current data file path if (filename_in_delete_record == current_data_file_path || filename_in_delete_record == "/" + current_data_file_path) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h index 0062952ee7c6..424e0e381bca 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h @@ -28,7 +28,9 @@ class IcebergPositionDeleteTransform : public ISimpleTransform ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) + ContextPtr context_, + const String & table_location_, + SecondaryStorages & secondary_storages_) : ISimpleTransform(header_, header_, false) , header(header_) , iceberg_object_info(iceberg_object_info_) @@ -36,6 +38,8 @@ class IcebergPositionDeleteTransform : public ISimpleTransform , format_settings(format_settings_) , context(context_) , parser_shared_resources(parser_shared_resources_) + , table_location(table_location_) + , secondary_storages(secondary_storages_) { initializeDeleteSources(); } @@ -56,6 +60,9 @@ class IcebergPositionDeleteTransform : public ISimpleTransform ContextPtr context; FormatParserSharedResourcesPtr parser_shared_resources; + const String table_location; + SecondaryStorages & secondary_storages; + /// 
We need to keep the read buffers alive since the delete_sources depends on them. std::vector> delete_read_buffers; std::vector> delete_sources; @@ -72,8 +79,10 @@ class IcebergBitmapPositionDeleteTransform : public IcebergPositionDeleteTransfo ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_) + ContextPtr context_, + const String & table_location_, + SecondaryStorages & secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_, table_location_, secondary_storages_) { initialize(); } @@ -98,8 +107,10 @@ class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTran ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_) + ContextPtr context_, + const String & table_location_, + SecondaryStorages & secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_, table_location_, secondary_storages_) { initialize(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h index 47fc360ad13d..13f10182e612 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h @@ -36,7 +36,7 @@ struct IcebergHistoryRecord DB::DateTime64 made_current_at; Int64 parent_id; bool is_current_ancestor; - String manifest_list_path; + String manifest_list_absolute_path; Int32 
added_files = 0; Int32 added_records = 0; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp index f7d7ca2734b6..e03b4850f373 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp @@ -70,9 +70,10 @@ Iceberg::ManifestFilePtr getManifestFile( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, LoggerPtr log, - const String & filename, + const String & absolute_path, Int64 inherited_sequence_number, - Int64 inherited_snapshot_id) + Int64 inherited_snapshot_id, + SecondaryStorages & secondary_storages) { auto log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value; @@ -81,19 +82,22 @@ Iceberg::ManifestFilePtr getManifestFile( auto create_fn = [&, use_iceberg_metadata_cache]() { - RelativePathWithMetadata manifest_object_info(filename); + auto [storage_to_use, resolved_key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, absolute_path, object_storage, secondary_storages, local_context); + + RelativePathWithMetadata manifest_object_info(resolved_key_in_storage); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (use_iceberg_metadata_cache) read_settings.enable_filesystem_cache = false; - auto buffer = createReadBuffer(manifest_object_info, object_storage, local_context, log, read_settings); - Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(local_context)); + auto buffer = createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings); + Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), resolved_key_in_storage, 
getFormatSettings(local_context)); return std::make_shared( manifest_file_deserializer, - filename, + resolved_key_in_storage, persistent_table_components.format_version, persistent_table_components.table_path, *persistent_table_components.schema_processor, @@ -101,13 +105,13 @@ Iceberg::ManifestFilePtr getManifestFile( inherited_snapshot_id, persistent_table_components.table_location, local_context, - filename); + absolute_path); }; if (use_iceberg_metadata_cache && persistent_table_components.table_uuid.has_value()) { auto manifest_file = persistent_table_components.metadata_cache->getOrSetManifestFile( - IcebergMetadataFilesCache::getKey(persistent_table_components.table_uuid.value(), filename), create_fn); + IcebergMetadataFilesCache::getKey(persistent_table_components.table_uuid.value(), absolute_path), create_fn); return manifest_file; } return create_fn(); @@ -117,8 +121,9 @@ ManifestFileCacheKeys getManifestList( ObjectStoragePtr object_storage, const PersistentTableComponents & persistent_table_components, ContextPtr local_context, - const String & filename, - LoggerPtr log) + const String & absolute_path, + LoggerPtr log, + SecondaryStorages & secondary_storages) { IcebergMetadataLogLevel log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value; @@ -127,24 +132,27 @@ ManifestFileCacheKeys getManifestList( auto create_fn = [&, use_iceberg_metadata_cache]() { - RelativePathWithMetadata object_info(filename); + auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, absolute_path, object_storage, secondary_storages, local_context); + + RelativePathWithMetadata object_info(key_in_storage); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (use_iceberg_metadata_cache) read_settings.enable_filesystem_cache = false; - auto manifest_list_buf = createReadBuffer(object_info, object_storage, local_context, 
log, read_settings); - AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(local_context)); + auto manifest_list_buf = createReadBuffer(object_info, storage_to_use, local_context, log, read_settings); + AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), key_in_storage, getFormatSettings(local_context)); ManifestFileCacheKeys manifest_file_cache_keys; - insertRowToLogTable( + insertRowToLogTable( local_context, manifest_list_deserializer.getMetadataContent(), DB::IcebergMetadataLogLevel::ManifestListMetadata, persistent_table_components.table_path, - filename, + key_in_storage, std::nullopt, std::nullopt); @@ -152,8 +160,8 @@ ManifestFileCacheKeys getManifestList( { const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet(); - const auto manifest_file_name = getProperFilePathFromMetadataInfo( - file_path, persistent_table_components.table_path, persistent_table_components.table_location); + + const auto manifest_absolute_path = makeAbsolutePath(persistent_table_components.table_location, file_path); Int64 added_sequence_number = 0; auto added_snapshot_id = manifest_list_deserializer.getValueFromRowByName(i, f_added_snapshot_id); if (added_snapshot_id.isNull()) @@ -172,14 +180,14 @@ ManifestFileCacheKeys getManifestList( manifest_list_deserializer.getValueFromRowByName(i, f_content, TypeIndex::Int32).safeGet()); } manifest_file_cache_keys.emplace_back( - manifest_file_name, added_sequence_number, added_snapshot_id.safeGet(), content_type); + manifest_absolute_path, added_sequence_number, added_snapshot_id.safeGet(), content_type); insertRowToLogTable( local_context, manifest_list_deserializer.getContent(i), DB::IcebergMetadataLogLevel::ManifestListEntry, persistent_table_components.table_path, - filename, + key_in_storage, i, std::nullopt); } @@ -192,7 +200,7 @@ ManifestFileCacheKeys getManifestList( 
ManifestFileCacheKeys manifest_file_cache_keys; if (use_iceberg_metadata_cache && persistent_table_components.table_uuid.has_value()) manifest_file_cache_keys = persistent_table_components.metadata_cache->getOrSetManifestFileCacheKeys( - IcebergMetadataFilesCache::getKey(persistent_table_components.table_uuid.value(), filename), create_fn); + IcebergMetadataFilesCache::getKey(persistent_table_components.table_uuid.value(), absolute_path), create_fn); else manifest_file_cache_keys = create_fn(); return manifest_file_cache_keys; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h index 41ae27c5a052..fc1c52d61870 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h @@ -19,6 +19,7 @@ #include #include +#include namespace DB::Iceberg { @@ -28,17 +29,19 @@ Iceberg::ManifestFilePtr getManifestFile( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, LoggerPtr log, - const String & filename, + const String & absolute_path, Int64 inherited_sequence_number, - Int64 inherited_snapshot_id); + Int64 inherited_snapshot_id, + SecondaryStorages & secondary_storages); ManifestFileCacheKeys getManifestList( ObjectStoragePtr object_storage, const PersistentTableComponents & persistent_table_components, ContextPtr local_context, - const String & filename, - LoggerPtr log); + const String & absolute_path, + LoggerPtr log, + SecondaryStorages & secondary_storages); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 681db549db38..46b6e74d3253 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -36,12 +36,13 @@ #include #include #include +#include #if USE_AVRO #include 
-#include #include +#include #include #include #include @@ -92,7 +93,6 @@ static constexpr auto MAX_TRANSACTION_RETRIES = 100; namespace DB::Iceberg { - using namespace DB; static CompressionMethod getCompressionMethodFromMetadataFile(const String & path) { @@ -301,78 +301,6 @@ std::optional parseTransformAndArgument(const String & tra return std::nullopt; } -// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files. -// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro -// Common path should end with "" or "/". -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location) -{ - auto trim_backward_slash = [](std::string_view str) -> std::string_view - { - if (str.ends_with('/')) - { - return str.substr(0, str.size() - 1); - } - return str; - }; - auto trim_forward_slash = [](std::string_view str) -> std::string_view - { - if (str.starts_with('/')) - { - return str.substr(1); - } - return str; - }; - common_path = trim_backward_slash(common_path); - table_location = trim_backward_slash(table_location); - - if (data_path.starts_with(table_location) && table_location.ends_with(common_path)) - { - return std::filesystem::path{common_path} / trim_forward_slash(data_path.substr(table_location.size())); - } - - - auto pos = data_path.find(common_path); - /// Valid situation when data and metadata files are stored in different directories. 
- if (pos == std::string::npos) - { - /// connection://bucket - auto prefix = table_location.substr(0, table_location.size() - common_path.size()); - return std::string{data_path.substr(prefix.size())}; - } - - size_t good_pos = std::string::npos; - while (pos != std::string::npos) - { - auto potential_position = pos + common_path.size(); - if ((std::string_view(data_path.data() + potential_position, 6) == "/data/") - || (std::string_view(data_path.data() + potential_position, 10) == "/metadata/")) - { - good_pos = pos; - break; - } - size_t new_pos = data_path.find(common_path, pos + 1); - if (new_pos == std::string::npos) - { - break; - } - pos = new_pos; - } - - - if (good_pos != std::string::npos) - { - return std::string{data_path.substr(good_pos)}; - } - else if (pos != std::string::npos) - { - return std::string{data_path.substr(pos)}; - } - else - { - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path); - } -} - enum class MostRecentMetadataFileSelectionWay { BY_LAST_UPDATED_MS_FIELD, @@ -1448,3 +1376,29 @@ void sortBlockByKeyDescription(Block & block, const KeyDescription & sort_descri } #endif + +namespace DB +{ + +ObjectStoragePtr getResolvedStorageFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info, const ObjectStoragePtr & default_storage) +{ +#if USE_AVRO + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + { + if (auto resolved = iceberg_info->getResolvedStorage()) + return resolved; + } +#endif + return default_storage; +} + +std::optional getAbsolutePathFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info) +{ +#if USE_AVRO + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + return iceberg_info->getAbsolutePath(); +#endif + return std::nullopt; +} + +} diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index ba7e5619bf60..3c69fe05d523 100644 --- 
a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -1,5 +1,7 @@ #pragma once +#include +#include #include #include #include @@ -12,9 +14,20 @@ #include #include +#include + +namespace DB +{ +struct ObjectInfo; +using ObjectInfoPtr = std::shared_ptr; + +/// These functions are always available; they return fallback values when USE_AVRO is not defined +ObjectStoragePtr getResolvedStorageFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info, const ObjectStoragePtr & default_storage); +std::optional getAbsolutePathFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info); +} + #if USE_AVRO -#include #include #include #include @@ -49,8 +62,6 @@ bool writeMetadataFileAndVersionHint( bool try_write_version_hint ); -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location); - struct TransformAndArgument { String transform_name; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 60748c6a1faa..a2e5fdd18c18 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -36,6 +36,8 @@ #include #include #include +#include +#include #if ENABLE_DISTRIBUTED_CACHE #include #include @@ -83,6 +85,7 @@ namespace ErrorCodes extern const int FILE_DOESNT_EXIST; } + StorageObjectStorageSource::StorageObjectStorageSource( String name_, ObjectStoragePtr object_storage_, @@ -165,6 +168,7 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( local_context->getSettingsRef()[Setting::max_threads], /*is_archive_=*/is_archive && !expect_whole_archive, object_storage, + configuration->getPathForRead().path, local_context); if (is_archive && expect_whole_archive) @@ -406,11 +410,15 @@ Chunk StorageObjectStorageSource::generate() path); } + /// For _path column, use 
absolute path if available (e.g., file:///home/...) + /// Otherwise, fall back to key in storage (relative path) + std::string path_for_virtual_column = getAbsolutePathFromObjectInfo(object_info).value_or(path); + VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk( chunk, read_from_format_info.requested_virtual_columns, { - .path = path, + .path = path_for_virtual_column, .size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_metadata->size_bytes, .filename = &filename, .last_modified = object_metadata->last_modified, @@ -576,16 +584,18 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade bool with_tags = read_from_format_info.requested_virtual_columns.contains("_tags"); const auto & path = object_info->isArchive() ? object_info->getPathToArchive() : object_info->getPath(); + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(object_info, object_storage); + if (query_settings.ignore_non_existent_file) { - auto metadata = object_storage->tryGetObjectMetadata(path, with_tags); + auto metadata = storage_to_use->tryGetObjectMetadata(path, with_tags); if (!metadata) return {}; object_info->setObjectMetadata(metadata.value()); } else - object_info->setObjectMetadata(object_storage->getObjectMetadata(path, with_tags)); + object_info->setObjectMetadata(storage_to_use->getObjectMetadata(path, with_tags)); } } while (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0); @@ -642,7 +652,11 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade else { compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->compression_method); - read_buf = createReadBuffer(object_info->relative_path_with_metadata, object_storage, context_, log); + read_buf = createReadBuffer( + object_info->relative_path_with_metadata, + getResolvedStorageFromObjectInfo(object_info, object_storage), + context_, + log); } Block 
initial_header = read_from_format_info.format_header; @@ -1203,11 +1217,13 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( size_t max_threads_count, bool is_archive_, ObjectStoragePtr object_storage_, + const std::string & table_location_, ContextPtr context_) : WithContext(context_) , callback(callback_) , is_archive(is_archive_) , object_storage(object_storage_) + , table_location(table_location_) { ThreadPool pool( CurrentMetrics::StorageObjectStorageThreads, @@ -1233,7 +1249,25 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( { auto object = object_future.get(); if (object) + { +#if USE_AVRO + /// For Iceberg objects, resolve the storage from the absolute path + if (auto iceberg_info = std::dynamic_pointer_cast(object)) + { + if (auto abs_path = iceberg_info->getAbsolutePath()) + { + auto [storage_to_use, key] = resolveObjectStorageForPath( + table_location, *abs_path, object_storage, secondary_storages, getContext()); + if (!key.empty()) + { + iceberg_info->setResolvedStorage(storage_to_use); + iceberg_info->relative_path_with_metadata.relative_path = key; + } + } + } +#endif buffer.push_back(object); + } } } @@ -1244,10 +1278,28 @@ ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator::next(size_t) ObjectInfoPtr object_info; if (current_index >= buffer.size()) { - auto task = callback(); - if (!task || task->isEmpty()) + auto raw = callback(); + if (!raw || raw->isEmpty()) return nullptr; - object_info = task->getObjectInfo(); + + object_info = raw->getObjectInfo(); + +#if USE_AVRO + /// For Iceberg objects, resolve the storage from the absolute path + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + { + if (auto abs_path = iceberg_info->getAbsolutePath()) + { + auto [storage_to_use, key] = resolveObjectStorageForPath( + table_location, *abs_path, object_storage, secondary_storages, getContext()); + if (!key.empty() && storage_to_use != object_storage) + { + 
iceberg_info->setResolvedStorage(storage_to_use); + iceberg_info->relative_path_with_metadata.relative_path = key; + } + } + } +#endif } else { @@ -1342,7 +1394,10 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o /* path_to_archive */ object_info->getPath(), /* archive_read_function */ [=, this]() - { return createReadBuffer(object_info->relative_path_with_metadata, object_storage, getContext(), log); }, + { + auto storage = getResolvedStorageFromObjectInfo(object_info, object_storage); + return createReadBuffer(object_info->relative_path_with_metadata, storage, getContext(), log); + }, /* archive_size */ size); } @@ -1364,7 +1419,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor } if (!archive_object->getObjectMetadata()) - archive_object->setObjectMetadata(object_storage->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + { + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(archive_object, object_storage); + archive_object->setObjectMetadata(storage_to_use->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + } archive_reader = createArchiveReader(archive_object); file_enumerator = archive_reader->firstFile(); @@ -1390,7 +1448,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor return {}; if (!archive_object->getObjectMetadata()) - archive_object->setObjectMetadata(object_storage->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + { + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(archive_object, object_storage); + archive_object->setObjectMetadata(storage_to_use->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + } archive_reader = createArchiveReader(archive_object); if (!archive_reader->fileExists(path_in_archive)) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h 
index 77fa731de958..1ac8bf926e28 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -156,6 +157,7 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri size_t max_threads_count, bool is_archive_, ObjectStoragePtr object_storage_, + const std::string & table_location_, ContextPtr context_); ObjectInfoPtr next(size_t) override; @@ -170,6 +172,10 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri std::atomic_size_t index = 0; bool is_archive; ObjectStoragePtr object_storage; + std::string table_location; +#if USE_AVRO + SecondaryStorages secondary_storages; /// For Iceberg: cache of storages for external file locations +#endif /// path_to_archive -> archive reader. std::unordered_map> archive_readers; std::mutex archive_readers_mutex; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 5a54601b171c..943543ca26ae 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -81,7 +83,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); + auto file_identifier = send_over_whole_archive ? 
next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getIdentifier()); auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -135,7 +137,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } else { - file_identifier = object_info->getIdentifier(); + file_identifier = getAbsolutePathFromObjectInfo(object_info).value_or(object_info->getIdentifier()); } size_t file_replica_idx = getReplicaForFile(file_identifier); @@ -177,7 +179,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second; unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); + auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getPath()); LOG_TRACE( log, "Iterator exhausted. 
Assigning unprocessed file {} to replica {}", diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index f1cb422e2cd2..b5330ac180b2 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -1,19 +1,36 @@ #include #include +#include #include #include #include #include +#include #include #include #include #include +#include #include #include #include #include #include #include +#include + +#include +#include +#include +#if USE_AWS_S3 +#include +#endif +#if USE_AZURE_BLOB_STORAGE +#include +#endif +#if USE_HDFS +#include +#endif namespace DB { @@ -25,6 +42,195 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } +namespace +{ + +#if USE_AVRO +std::string normalizeScheme(const std::string & scheme) +{ + auto scheme_lowercase = Poco::toLower(scheme); + + if (scheme_lowercase == "s3a" || scheme_lowercase == "s3n" || scheme_lowercase == "gs" || scheme_lowercase == "gcs" || scheme_lowercase == "oss") + scheme_lowercase = "s3"; + else if (scheme_lowercase == "wasb" || scheme_lowercase == "wasbs" || scheme_lowercase == "abfss") + scheme_lowercase = "abfs"; + + return scheme_lowercase; +} + +std::string factoryTypeForScheme(const std::string & normalized_scheme) +{ + if (normalized_scheme == "s3") return "s3"; + if (normalized_scheme == "abfs") return "azure"; + if (normalized_scheme == "hdfs") return "hdfs"; + if (normalized_scheme == "file") return "local"; + return ""; +} + +#if USE_AWS_S3 +/// For s3:// URIs (generic), bucket needs to match. +/// For explicit http(s):// URIs, both bucket and endpoint must match. 
+bool s3URIMatches(const S3::URI & target_uri, const std::string & base_bucket, const std::string & base_endpoint, const std::string & target_scheme_normalized) +{ + bool bucket_matches = (target_uri.bucket == base_bucket); + bool endpoint_matches = (target_uri.endpoint == base_endpoint); + bool is_generic_s3_uri = (target_scheme_normalized == "s3"); + return bucket_matches && (endpoint_matches || is_generic_s3_uri); +} + +bool sameEndpointAuthority(const std::string & a, const std::string & b) +{ + SchemeAuthorityKey pa(a); + SchemeAuthorityKey pb(b); + if (pa.authority.empty() || pb.authority.empty()) + return false; + return pa.authority == pb.authority; +} +#endif +std::pair getOrCreateStorageAndKey( + const std::string & cache_key, + const std::string & key_to_use, + const std::string & storage_type, + SecondaryStorages & secondary_storages, + const ContextPtr & context, + std::function configure_fn) +{ + std::lock_guard lock(secondary_storages.mutex); + if (auto it = secondary_storages.storages.find(cache_key); it != secondary_storages.storages.end()) + return {it->second, key_to_use}; + + Poco::AutoPtr cfg(new Poco::Util::MapConfiguration); + const std::string config_prefix = "object_storages." + cache_key; + + cfg->setString(config_prefix + ".object_storage_type", storage_type); + + configure_fn(*cfg, config_prefix); + + /// Create under lock to avoid duplicate creation and wasted work + ObjectStoragePtr storage = ObjectStorageFactory::instance().create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true); + + secondary_storages.storages.emplace(cache_key, storage); + return {storage, key_to_use}; +} +#endif + +bool isAbsolutePath(const std::string & path) +{ + if (!path.empty() && (path.front() == '/' || path.find("://") != std::string_view::npos)) + return true; + + return false; +} + +/// Normalize a path string by removing redundant components and leading slashes. 
/// Normalization is purely lexical (std::filesystem::path::lexically_normal); the
/// filesystem is never consulted. Every leading '/' is then dropped, so the result
/// is an empty string when `path` is empty or consists only of slashes.
std::string normalizePathString(const std::string & path)
{
    std::string normalized_result = std::filesystem::path(path).lexically_normal().string();

    /// Strip all leading slashes in one pass. `find_first_not_of` returns npos when the
    /// string is empty or all slashes, in which case `erase` clears the whole string.
    normalized_result.erase(0, normalized_result.find_first_not_of('/'));

    return normalized_result;
}

/// Convert a path (relative to table location or absolute path) to a key that will be looked up in the object storage.
///
/// - If `table_location` is empty, the path is treated as already relative to storage root.
/// - If `path` is an absolute path, its key component (without scheme/authority) is returned.
/// - If `table_location` parses to a URI whose key part is empty, `path` is returned unchanged (exception will be thrown when looking up non-existing object in storage)
///
/// - Otherwise, `path` is treated as relative to `table_location`'s key:
///   leading '/' stripped, concatenated to table_location key, and the result is normalized.
+std::string convertPathToKeyInStorage(const std::string & table_location, const std::string & path) +{ + if (table_location.empty()) + { + if (!path.empty() && path.front() == '/') + return path.substr(1); + return path; + } + + if (isAbsolutePath(path)) + return SchemeAuthorityKey(path).key; // Absolute path, return the key part + + SchemeAuthorityKey base{table_location}; + if (base.key.empty()) + return path; // Table location has an empty key part (e.g. `s3://bucket/`), return the path as is + + std::string base_key_trimmed = base.key; + while (!base_key_trimmed.empty() && base_key_trimmed.front() == '/') + base_key_trimmed = base_key_trimmed.substr(1); + while (!base_key_trimmed.empty() && base_key_trimmed.back() == '/') + base_key_trimmed.pop_back(); + + std::string rel_path = path; + while (!rel_path.empty() && rel_path.front() == '/') + rel_path = rel_path.substr(1); + + if (!base_key_trimmed.empty() && (rel_path == base_key_trimmed || rel_path.starts_with(base_key_trimmed + "/"))) + return normalizePathString(rel_path); // Path already includes table location + + std::string result = base.key; + if (!result.empty() && result.back() != '/') + result += '/'; + result += rel_path; + + return normalizePathString(result); +} + +} + +SchemeAuthorityKey::SchemeAuthorityKey(const std::string & uri) +{ + if (uri.empty()) + return; + + if (auto scheme_sep = uri.find("://"); scheme_sep != std::string_view::npos) + { + scheme = Poco::toLower(uri.substr(0, scheme_sep)); + auto rest = uri.substr(scheme_sep + 3); // skip :// + + // authority is up to next '/' + auto slash = rest.find('/'); + if (slash == std::string_view::npos) + { + /// Bad URI: missing path component after authority. + /// Exception will be thrown when looking up non-existing object in the storage, so we can just return here. + authority = std::string(rest); + key = "/"; + return; + } + authority = std::string(rest.substr(0, slash)); + /// For file:// URIs, the path is absolute, so we need to keep the leading '/' + /// e.g.
file:///home/user/data -> scheme="file", authority="", key="/home/user/data" + if (scheme == "file") + key = std::string(rest.substr(slash)); + else + key = std::string(rest.substr(++slash)); + return; + } + + /// Check for scheme:/path (common for file: https://datatracker.ietf.org/doc/html/rfc8089#appendix-B) + if (auto colon = uri.find(':'); colon != std::string_view::npos && colon > 0) + { + auto after_colon = uri.substr(colon + 1); + + if (!after_colon.empty() && after_colon[0] == '/') + { + scheme = Poco::toLower(uri.substr(0, colon)); + authority = ""; // No authority + key = std::string(after_colon); + return; + } + } + + // Relative path (paths starting with '/' without a scheme are now handled by the caller) + key = std::string(uri); +} + std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, @@ -255,5 +461,327 @@ extern const SettingsUInt64 max_download_buffer_size; extern const SettingsBool use_cache_for_count_from_files; extern const SettingsString filesystem_cache_name; extern const SettingsUInt64 filesystem_cache_boundary_alignment; +extern const SettingsBool s3_propagate_credentials_to_other_storages; +} + +std::string makeAbsolutePath(const std::string & table_location, const std::string & path) +{ + if (isAbsolutePath(path)) + return path; + + auto table_location_decomposed = SchemeAuthorityKey(table_location); + + std::string normalized_key = convertPathToKeyInStorage(table_location, path); + + if (!table_location_decomposed.scheme.empty()) + return table_location_decomposed.scheme + "://" + table_location_decomposed.authority + "/" + normalized_key; + + return normalized_key; } + +#if USE_AVRO +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context) +{ + if (!isAbsolutePath(path)) + return 
{base_storage, convertPathToKeyInStorage(table_location, path)}; // Relative path definitely goes to base storage + + SchemeAuthorityKey table_location_decomposed{table_location}; + SchemeAuthorityKey target_decomposed{path}; + + if (target_decomposed.scheme.empty() && target_decomposed.key.starts_with('/')) + return {base_storage, convertPathToKeyInStorage(table_location, target_decomposed.key)}; + + const std::string base_scheme_normalized = normalizeScheme(table_location_decomposed.scheme); + const std::string target_scheme_normalized = normalizeScheme(target_decomposed.scheme); + + // For S3 URIs, use S3::URI to properly handle all kinds of URIs, e.g. https://s3.amazonaws.com/bucket/... == s3://bucket/... + #if USE_AWS_S3 + if (target_scheme_normalized == "s3" || target_scheme_normalized == "https" || target_scheme_normalized == "http") + { + std::string normalized_path = path; + if (target_decomposed.scheme == "s3a" || target_decomposed.scheme == "s3n") + { + normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key; + } + else if (target_decomposed.scheme == "gcs") + { + normalized_path = "gs://" + target_decomposed.authority + "/" + target_decomposed.key; + } + S3::URI s3_uri(normalized_path); + + std::string key_to_use = s3_uri.key; + + bool use_base_storage = false; + if (base_storage->getType() == ObjectStorageType::S3) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + const std::string base_bucket = s3_storage->getObjectsNamespace(); + const std::string base_endpoint = s3_storage->getDescription(); + + if (s3URIMatches(s3_uri, base_bucket, base_endpoint, target_scheme_normalized)) + use_base_storage = true; + } + } + + if (!use_base_storage && (base_scheme_normalized == "s3" || base_scheme_normalized == "https" || base_scheme_normalized == "http")) + { + std::string normalized_table_location = table_location; + if (table_location_decomposed.scheme == "s3a" || table_location_decomposed.scheme == "s3n") 
+ { + normalized_table_location = "s3://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; + } + else if (table_location_decomposed.scheme == "gcs") + { + normalized_table_location = "gs://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; + } + S3::URI base_s3_uri(normalized_table_location); + + if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized)) + use_base_storage = true; + } + + if (use_base_storage) + return {base_storage, key_to_use}; + + const std::string storage_cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint); + + return getOrCreateStorageAndKey( + storage_cache_key, + key_to_use, + "s3", + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + bool endpoint_explicit = (target_decomposed.scheme == "http" || target_decomposed.scheme == "https"); + + std::string endpoint_to_use; + + if (endpoint_explicit) + { + endpoint_to_use = s3_uri.endpoint.empty() + ? 
("https://" + s3_uri.bucket + ".s3.amazonaws.com") + : s3_uri.endpoint; + } + else + { + std::string base_endpoint; + if (base_storage->getType() == ObjectStorageType::S3) + base_endpoint = base_storage->getDescription(); + + if (!base_endpoint.empty()) + { + if (base_endpoint.find(".s3.") != std::string::npos && base_endpoint.find(".amazonaws.com") != std::string::npos) + { + /// AWS-style: https://oldbucket.s3.us-east-1.amazonaws.com -> https://newbucket.s3.us-east-1.amazonaws.com + size_t s3_pos = base_endpoint.find(".s3."); + size_t scheme_end = base_endpoint.find("://"); + if (scheme_end != std::string::npos) + { + std::string scheme = base_endpoint.substr(0, scheme_end + 3); + std::string suffix = base_endpoint.substr(s3_pos); + + /// Trim path after endpoint + size_t slash_pos = suffix.find('/', 1); + if (slash_pos != std::string::npos) + suffix = suffix.substr(0, slash_pos); + endpoint_to_use = scheme + s3_uri.bucket + suffix; + } + } + else + { + /// Path-style (e.g. minio): http://host:port/oldbucket -> http://host:port/newbucket + size_t scheme_end = base_endpoint.find("://"); + if (scheme_end != std::string::npos) + { + size_t path_start = base_endpoint.find('/', scheme_end + 3); + if (path_start != std::string::npos) + base_endpoint = base_endpoint.substr(0, path_start); + } + if (!base_endpoint.empty() && base_endpoint.back() == '/') + base_endpoint.pop_back(); + endpoint_to_use = base_endpoint + "/" + s3_uri.bucket; + } + } + + /// Fallback: base storage is not S3 + if (endpoint_to_use.empty()) + { + endpoint_to_use = s3_uri.endpoint.empty() + ? 
("https://" + s3_uri.bucket + ".s3.amazonaws.com") + : s3_uri.endpoint; + } + } + + cfg.setString(config_prefix + ".endpoint", endpoint_to_use); + + /// Copy credentials from base storage when the endpoint authority is the same + /// or s3_propagate_credentials_to_other_storages is 1 + if (base_storage->getType() == ObjectStorageType::S3 + && (context->getSettingsRef()[Setting::s3_propagate_credentials_to_other_storages] + || sameEndpointAuthority(base_storage->getDescription(), endpoint_to_use))) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + if (auto s3_client = s3_storage->tryGetS3StorageClient()) + { + const auto credentials = s3_client->getCredentials(); + const String & access_key_id = credentials.GetAWSAccessKeyId(); + const String & secret_access_key = credentials.GetAWSSecretKey(); + const String & session_token = credentials.GetSessionToken(); + const String & region = s3_client->getRegion(); + + if (!access_key_id.empty()) + cfg.setString(config_prefix + ".access_key_id", access_key_id); + if (!secret_access_key.empty()) + cfg.setString(config_prefix + ".secret_access_key", secret_access_key); + if (!session_token.empty()) + cfg.setString(config_prefix + ".session_token", session_token); + if (!region.empty()) + cfg.setString(config_prefix + ".region", region); + } + } + } + }); + } + #endif + + #if USE_HDFS + if (target_scheme_normalized == "hdfs") + { + bool use_base_storage = false; + + // Check if base_storage matches (only if it's HDFS) + if (base_storage->getType() == ObjectStorageType::HDFS) + { + if (auto hdfs_storage = std::dynamic_pointer_cast(base_storage)) + { + const std::string base_url = hdfs_storage->getDescription(); + // Extract endpoint from base URL (hdfs://namenode:port/path -> hdfs://namenode:port) + std::string base_endpoint; + if (auto pos = base_url.find('/', base_url.find("//") + 2); pos != std::string::npos) + base_endpoint = base_url.substr(0, pos); + else + base_endpoint = base_url; + + // For HDFS,
compare endpoints (namenode addresses) + std::string target_endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + + if (base_endpoint == target_endpoint) + use_base_storage = true; + + // Also check if table_location matches + if (!use_base_storage && base_scheme_normalized == "hdfs") + { + if (table_location_decomposed.authority == target_decomposed.authority) + use_base_storage = true; + } + } + } + + if (use_base_storage) + return {base_storage, target_decomposed.key}; + } + #endif + + /// Fallback for schemes not handled above (e.g., abfs, file) + if (base_scheme_normalized == target_scheme_normalized && table_location_decomposed.authority == target_decomposed.authority) + return {base_storage, target_decomposed.key}; + + const std::string cache_key = target_scheme_normalized + "://" + target_decomposed.authority; + + const std::string type_for_factory = factoryTypeForScheme(target_scheme_normalized); + if (type_for_factory.empty()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported storage scheme '{}' in path '{}'", target_scheme_normalized, path); + + /// Handle storage types that need new storage creation + return getOrCreateStorageAndKey( + cache_key, + target_decomposed.key, + type_for_factory, + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + if (target_scheme_normalized == "file") + { + std::filesystem::path fs_path(target_decomposed.key); + std::filesystem::path parent = fs_path.parent_path(); + std::string dir_path = parent.string(); + + if (dir_path.empty() || dir_path == "/") + dir_path = "/"; + else if (dir_path.back() != '/') + dir_path += '/'; + + cfg.setString(config_prefix + ".path", dir_path); + } + else if (target_scheme_normalized == "abfs") + { + std::string container_name; + std::string account_name; + const auto & authority = target_decomposed.authority; + + auto at_pos = authority.find('@'); + if (at_pos != std::string::npos) + { + 
container_name = authority.substr(0, at_pos); + account_name = authority.substr(at_pos + 1); + /// Remove .dfs.core.windows.net suffix if present + auto suffix_pos = account_name.find('.'); + if (suffix_pos != std::string::npos) + account_name = account_name.substr(0, suffix_pos); + } + else + container_name = authority; + + cfg.setString(config_prefix + ".container_name", container_name); + if (!account_name.empty()) + cfg.setString(config_prefix + ".account_name", account_name); + +#if USE_AZURE_BLOB_STORAGE + /// Copy credentials from base Azure storage if available + if (base_storage->getType() == ObjectStorageType::Azure) + { + if (auto azure_storage = std::dynamic_pointer_cast(base_storage)) + { + const auto & conn_params = azure_storage->getConnectionParameters(); + const auto & auth_method = azure_storage->getAzureBlobStorageAuthMethod(); + + if (std::holds_alternative(auth_method)) + { + cfg.setString(config_prefix + ".connection_string", + std::get(auth_method).toUnderType()); + } + else + { + const auto & endpoint = conn_params.endpoint; + if (!endpoint.storage_account_url.empty()) + cfg.setString(config_prefix + ".storage_account_url", endpoint.storage_account_url); + if (account_name.empty() && !endpoint.account_name.empty()) + cfg.setString(config_prefix + ".account_name", endpoint.account_name); + } + } + } +#endif + } + else if (target_scheme_normalized == "hdfs") + { + // HDFS endpoint must end with '/' + auto endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + if (!endpoint.empty() && endpoint.back() != '/') + endpoint.push_back('/'); + cfg.setString(config_prefix + ".endpoint", endpoint); + } + }); +} + +#endif + } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 3045c8ec74f4..2ba50591b5d2 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -2,11 +2,38 @@ #include #include +#include +#include +#include + namespace DB { class IObjectStorage; 
+#if USE_AVRO +/// Thread-safe wrapper for secondary object storages map +/// (now only used for Iceberg) +struct SecondaryStorages +{ + mutable std::mutex mutex; + std::map storages; +}; +#endif + +// A URI split into components +// s3://bucket/a/b -> scheme="s3", authority="bucket", key="a/b" +// file:///var/x -> scheme="file", authority="", key="/var/x" +// /abs/p -> scheme="", authority="", key="/abs/p" +struct SchemeAuthorityKey +{ + explicit SchemeAuthorityKey(const std::string & uri); + + std::string scheme; + std::string authority; + std::string key; +}; + std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, @@ -63,5 +90,18 @@ struct ParseFromDiskResult ParseFromDiskResult parseFromDisk(ASTs args, bool with_structure, ContextPtr context, const fs::path & prefix); +std::string makeAbsolutePath(const std::string & table_location, const std::string & path); + +#if USE_AVRO +/// Resolve object storage and key for reading from that storage +/// If path is relative -- it must be read from base_storage +/// Otherwise, look for a suitable storage in secondary_storages +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context); +#endif } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 7e052ec9cabc..9be122792f47 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -812,10 +812,10 @@ std::function IStorageURLBase::getReadPOSTDataCallback( namespace { - class ReadBufferIterator : public IReadBufferIterator, WithContext + class StorageURLReadBufferIterator : public IReadBufferIterator, WithContext { public: - ReadBufferIterator( + StorageURLReadBufferIterator( const std::vector & urls_to_check_, std::optional format_, const CompressionMethod &
compression_method_, @@ -1045,7 +1045,7 @@ std::pair IStorageURLBase::getTableStructureAndForma else urls_to_check = {uri}; - ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); + StorageURLReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); if (format) return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format}; return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context); diff --git a/tests/integration/test_storage_iceberg_multistorage/__init__.py b/tests/integration/test_storage_iceberg_multistorage/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml new file mode 100644 index 000000000000..54c08b27abe8 --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml @@ -0,0 +1,20 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml new file mode 100644 index 000000000000..516e4ba63a3a --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml @@ -0,0 +1,15 @@ + + + + http://minio1:9001/root/ + minio + ClickHouse_Minio_P@ssw0rd + + + devstoreaccount1 + Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + + + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml new file mode 100644 index 000000000000..a63e91f41fbc --- 
/dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml @@ -0,0 +1,6 @@ + + + system + query_log
+
+
diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml b/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml new file mode 100644 index 000000000000..4b6ba057ecb1 --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml @@ -0,0 +1,9 @@ + + + + + default + 1 + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/test.py b/tests/integration/test_storage_iceberg_multistorage/test.py new file mode 100644 index 000000000000..5b477055b2ab --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/test.py @@ -0,0 +1,426 @@ +import pytest +import pyspark +import os +import shutil +import tempfile +import json +import avro.datafile +import avro.io + +from helpers.cluster import ClickHouseCluster +from helpers.s3_tools import ( + LocalUploader, + S3Uploader, + AzureUploader, + LocalDownloader, + S3Downloader, + prepare_s3_bucket, +) +from helpers.iceberg_utils import ( + get_uuid_str, + default_upload_directory, + default_download_directory, +) + +def get_spark(): + builder = ( + pyspark.sql.SparkSession.builder.appName("test_storage_iceberg_multistorage") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.iceberg.spark.SparkSessionCatalog", + ) + .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", "/var/lib/clickhouse/user_files/iceberg_data") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + ) + .master("local") + ) + return builder.getOrCreate() + + +@pytest.fixture(scope="package") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__, with_spark=True) + cluster.add_instance( + "node1", + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", + "configs/config.d/named_collections.xml", + ], 
+ user_configs=["configs/users.d/users.xml"], + with_minio=True, + with_azurite=True, + stay_alive=True, + ) + + cluster.start() + + prepare_s3_bucket(cluster) + + cluster.spark_session = get_spark() + + cluster.default_s3_uploader = S3Uploader(cluster.minio_client, cluster.minio_bucket) + cluster.default_s3_downloader = S3Downloader(cluster.minio_client, cluster.minio_bucket) + + cluster.azure_container_name = "mycontainer" + cluster.blob_service_client.create_container(cluster.azure_container_name) + cluster.default_azure_uploader = AzureUploader(cluster.blob_service_client, cluster.azure_container_name) + + cluster.default_local_uploader = LocalUploader(cluster.instances["node1"]) + cluster.default_local_downloader = LocalDownloader(cluster.instances["node1"]) + + # Create extra S3 buckets for test_four_different_locations + for i in range(1, 4): + bucket_name = f"{cluster.minio_bucket}-storage{i}" + if not cluster.minio_client.bucket_exists(bucket_name): + cluster.minio_client.make_bucket(bucket_name) + + yield cluster + + finally: + cluster.shutdown() + + +def modify_avro_file(avro_path: str, field_path: list, modifier_func) -> None: + """ + Modify a field in an AVRO file, preserving the rest of it as is. + + field_path: list of keys to navigate to the field + modifier_func: function that takes old value and returns new value + """ + with open(avro_path, 'rb') as f: + reader = avro.datafile.DataFileReader(f, avro.io.DatumReader()) + schema = reader.datum_reader.writers_schema + # Preserve all file metadata (partition-spec, format-version, etc.) 
+ metadata = dict(reader.meta) + records = list(reader) + reader.close() + + for record in records: + obj = record + for key in field_path[:-1]: + if obj is None or key not in obj: + break + obj = obj[key] + else: + if obj and field_path[-1] in obj: + obj[field_path[-1]] = modifier_func(obj[field_path[-1]]) + + with open(avro_path, 'wb') as f: + writer = avro.datafile.DataFileWriter(f, avro.io.DatumWriter(), schema) + for key, value in metadata.items(): + if not key.startswith('avro.'): + writer.set_meta(key, value) + for record in records: + writer.append(record) + writer.close() + + +def get_absolute_path(storage_type: str, cluster, relative_path: str) -> str: + """Convert relative path to absolute path for given storage type.""" + relative_path = relative_path.lstrip("/") + + if storage_type == "s3": + return f"s3a://{cluster.minio_bucket}/{relative_path}" + elif storage_type.startswith("s3:"): # s3:bucket_name format + bucket = storage_type.split(":")[1] + return f"s3a://{bucket}/{relative_path}" + elif storage_type == "azure": + return f"abfs://{cluster.azure_container_name}@{cluster.azurite_account}/{relative_path}" + elif storage_type.startswith("azure:"): # azure:container_name format + container = storage_type.split(":")[1] + return f"abfs://{container}@{cluster.azurite_account}/{relative_path}" + elif storage_type == "local": + return f"file:///{relative_path}" + else: + raise ValueError(f"Unknown storage type: {storage_type}") + + +def get_uploader(storage_type: str, cluster): + if storage_type == "s3": + return cluster.default_s3_uploader + elif storage_type.startswith("s3:"): + bucket = storage_type.split(":")[1] + return S3Uploader(cluster.minio_client, bucket) + elif storage_type == "azure": + return cluster.default_azure_uploader + elif storage_type.startswith("azure:"): + container = storage_type.split(":")[1] + return AzureUploader(cluster.blob_service_client, container) + elif storage_type == "local": + return cluster.default_local_uploader + 
else: + raise ValueError(f"Unknown storage type: {storage_type}") + + +def get_table_function(metadata_storage: str): + if metadata_storage == "s3" or metadata_storage.startswith("s3:"): + return "icebergS3" + elif metadata_storage == "azure" or metadata_storage.startswith("azure:"): + return "icebergAzure" + elif metadata_storage == "local": + return "icebergLocal" + else: + raise ValueError(f"Unknown storage type: {metadata_storage}") + + +def get_query_args(metadata_storage: str, cluster, table_path: str): + """Get query arguments for the iceberg table function.""" + minio_url = f"http://{cluster.minio_host}:{cluster.minio_port}" + if metadata_storage == "s3": + return f"s3, filename='{table_path}/', format=Parquet, url='{minio_url}/{cluster.minio_bucket}/'" + elif metadata_storage.startswith("s3:"): + bucket = metadata_storage.split(":")[1] + return f"s3, filename='{table_path}/', format=Parquet, url='{minio_url}/{bucket}/'" + elif metadata_storage == "azure": + return f"azure, container='{cluster.azure_container_name}', storage_account_url='{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', blob_path='{table_path}/', format=Parquet" + elif metadata_storage.startswith("azure:"): + container = metadata_storage.split(":")[1] + return f"azure, container='{container}', storage_account_url='{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', blob_path='{table_path}/', format=Parquet" + elif metadata_storage == "local": + return f"local, path='/{table_path}', format=Parquet" + else: + raise ValueError(f"Unknown storage type: {metadata_storage}") + + +def find_files(directory: str, suffix: str) -> list: + """Find files ending with given suffix.""" + result = [] + for root, _, files in os.walk(directory): + for f in files: + if f.endswith(suffix): + result.append(os.path.join(root, f)) + return result + + +def path_modifier(old_path: str, new_storage: str, cluster, base_path: str): + """Create a new absolute path for a different storage location.""" + # 
Extract just the filename/relative portion + if "://" in old_path: + # Parse out the path part after protocol://bucket/ + parts = old_path.split("/") + # Find where the actual path starts (after bucket) + for i, part in enumerate(parts): + if base_path.split("/")[0] in part or "var" in part: + relative = "/".join(parts[i:]) + break + else: + relative = parts[-1] + else: + relative = old_path.lstrip("/") + + return get_absolute_path(new_storage, cluster, relative) + + +# ============================================================================= +# Tests +# ============================================================================= + +STORAGE_TYPES = ["s3", "azure", "local"] + +def _get_type_family(t): + if t.startswith("s3"): + return "s3" + elif t.startswith("azure"): + return "azure" + return t + +def _generate_valid_combinations(): + """ + Generate valid storage combinations. + Rule: all components must be same type family as metadata, OR local. + Local doesn't need credentials, so S3+local and Azure+local work. + But S3+Azure doesn't work (credentials aren't interchangeable). + """ + combinations = [] + for metadata in STORAGE_TYPES: + main_family = _get_type_family(metadata) + for manifest_list in STORAGE_TYPES: + if _get_type_family(manifest_list) not in (main_family, "local"): + continue + for manifest in STORAGE_TYPES: + if _get_type_family(manifest) not in (main_family, "local"): + continue + for data in STORAGE_TYPES: + if _get_type_family(data) not in (main_family, "local"): + continue + combinations.append((metadata, manifest_list, manifest, data)) + return combinations + +VALID_COMBINATIONS = _generate_valid_combinations() + +@pytest.mark.parametrize("metadata_storage,manifest_list_storage,manifest_storage,data_storage", VALID_COMBINATIONS) +def test_multi_storage_combinations(started_cluster, metadata_storage, manifest_list_storage, manifest_storage, data_storage): + """ + Test Iceberg table with all components in different storage locations. 
+ """ + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = f"test_combo_{get_uuid_str()}" + + spark.sql(f"CREATE TABLE {TABLE_NAME} (id INT, value STRING) USING iceberg OPTIONS('format-version'='2')") + spark.sql(f"INSERT INTO {TABLE_NAME} VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')") + + # Upload to default S3 first + default_upload_directory(started_cluster, "s3", f"/iceberg_data/default/{TABLE_NAME}/", f"/iceberg_data/default/{TABLE_NAME}/") + + # Download all files + temp_dir = tempfile.mkdtemp() + host_path = os.path.join(temp_dir, TABLE_NAME) + os.makedirs(host_path, exist_ok=True) + + default_download_directory(started_cluster, "s3", f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", host_path) + + base_path = f"var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}" + metadata_dir = os.path.join(host_path, "metadata") + data_dir = os.path.join(host_path, "data") + + # Step 1: Modify manifest files to point to data_storage + manifest_files = [f for f in find_files(metadata_dir, ".avro") if not os.path.basename(f).startswith("snap-")] + for mf in manifest_files: + modify_avro_file(mf, ["data_file", "file_path"], + lambda p: path_modifier(p, data_storage, started_cluster, base_path)) + + # Step 2: Modify manifest-list files to point to manifest_storage + manifest_list_files = [f for f in find_files(metadata_dir, ".avro") if os.path.basename(f).startswith("snap-")] + for ml in manifest_list_files: + modify_avro_file(ml, ["manifest_path"], + lambda p: path_modifier(p, manifest_storage, started_cluster, base_path)) + + # Step 3: Modify metadata.json to point to manifest_list_storage + for mj in find_files(metadata_dir, ".metadata.json"): + with open(mj, 'r') as f: + data = json.load(f) + + data["location"] = get_absolute_path(metadata_storage, started_cluster, base_path) + + # Update snapshot manifest-list paths + if "snapshots" in data: + for snap in data["snapshots"]: + if 
"manifest-list" in snap: + snap["manifest-list"] = path_modifier(snap["manifest-list"], manifest_list_storage, started_cluster, base_path) + + with open(mj, 'w') as f: + json.dump(data, f, indent=2) + + # Step 4: Upload to respective storages + # Metadata files (*.metadata.json, version-hint.text) + meta_uploader = get_uploader(metadata_storage, started_cluster) + for f in find_files(metadata_dir, ".metadata.json") + find_files(metadata_dir, "version-hint.text"): + rel = os.path.relpath(f, host_path) + meta_uploader.upload_file(f, f"{base_path}/{rel}") + + # Manifest-list files + ml_uploader = get_uploader(manifest_list_storage, started_cluster) + for f in manifest_list_files: + rel = os.path.relpath(f, host_path) + ml_uploader.upload_file(f, f"{base_path}/{rel}") + + # Manifest files + m_uploader = get_uploader(manifest_storage, started_cluster) + for f in manifest_files: + rel = os.path.relpath(f, host_path) + m_uploader.upload_file(f, f"{base_path}/{rel}") + + # Data files + d_uploader = get_uploader(data_storage, started_cluster) + if os.path.exists(data_dir): + for f in find_files(data_dir, ".parquet"): + rel = os.path.relpath(f, host_path) + d_uploader.upload_file(f, f"{base_path}/{rel}") + + shutil.rmtree(temp_dir) + + func = get_table_function(metadata_storage) + args = get_query_args(metadata_storage, started_cluster, base_path) + + assert instance.query(f"SELECT * FROM {func}({args}) ORDER BY id") == "1\talpha\n2\tbeta\n3\tgamma\n" + + +# S3 is the primary use case for cross-bucket access. +# Azure cross-container: not supported (account_key not extractable from credential object). 
+def test_four_different_s3_buckets(started_cluster): + """S3: each component in a different bucket (metadata, manifest-list, manifest, data).""" + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = f"test_four_buckets_{get_uuid_str()}" + buckets = [ + started_cluster.minio_bucket, + f"{started_cluster.minio_bucket}-storage1", + f"{started_cluster.minio_bucket}-storage2", + f"{started_cluster.minio_bucket}-storage3", + ] + + metadata_storage = f"s3:{buckets[0]}" + manifest_list_storage = f"s3:{buckets[1]}" + manifest_storage = f"s3:{buckets[2]}" + data_storage = f"s3:{buckets[3]}" + + uploaders = {f"s3:{b}": S3Uploader(started_cluster.minio_client, b) for b in buckets} + + spark.sql(f"CREATE TABLE {TABLE_NAME} (id INT, name STRING, score INT) USING iceberg OPTIONS('format-version'='2')") + spark.sql(f"INSERT INTO {TABLE_NAME} VALUES (1, 'Alice', 100), (2, 'Bob', 85), (3, 'Carol', 92)") + + default_upload_directory(started_cluster, "s3", f"/iceberg_data/default/{TABLE_NAME}/", f"/iceberg_data/default/{TABLE_NAME}/") + + temp_dir = tempfile.mkdtemp() + host_path = os.path.join(temp_dir, TABLE_NAME) + os.makedirs(host_path, exist_ok=True) + + default_download_directory(started_cluster, "s3", f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", host_path) + + base_path = f"var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}" + metadata_dir = os.path.join(host_path, "metadata") + data_dir = os.path.join(host_path, "data") + + manifest_files = [f for f in find_files(metadata_dir, ".avro") if not os.path.basename(f).startswith("snap-")] + for mf in manifest_files: + modify_avro_file(mf, ["data_file", "file_path"], + lambda p: path_modifier(p, data_storage, started_cluster, base_path)) + + manifest_list_files = [f for f in find_files(metadata_dir, ".avro") if os.path.basename(f).startswith("snap-")] + for ml in manifest_list_files: + modify_avro_file(ml, ["manifest_path"], + lambda p: 
path_modifier(p, manifest_storage, started_cluster, base_path)) + + for mj in find_files(metadata_dir, ".metadata.json"): + with open(mj, 'r') as f: + data = json.load(f) + data["location"] = get_absolute_path(metadata_storage, started_cluster, base_path) + if "snapshots" in data: + for snap in data["snapshots"]: + if "manifest-list" in snap: + snap["manifest-list"] = path_modifier(snap["manifest-list"], manifest_list_storage, started_cluster, base_path) + with open(mj, 'w') as f: + json.dump(data, f, indent=2) + + for f in find_files(metadata_dir, ".metadata.json") + find_files(metadata_dir, "version-hint.text"): + rel = os.path.relpath(f, host_path) + uploaders[metadata_storage].upload_file(f, f"{base_path}/{rel}") + + for f in manifest_list_files: + rel = os.path.relpath(f, host_path) + uploaders[manifest_list_storage].upload_file(f, f"{base_path}/{rel}") + + for f in manifest_files: + rel = os.path.relpath(f, host_path) + uploaders[manifest_storage].upload_file(f, f"{base_path}/{rel}") + + if os.path.exists(data_dir): + for f in find_files(data_dir, ".parquet"): + rel = os.path.relpath(f, host_path) + uploaders[data_storage].upload_file(f, f"{base_path}/{rel}") + + shutil.rmtree(temp_dir) + + minio_url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}" + result = instance.query(f"SELECT * FROM icebergS3(s3, filename='{base_path}/', format=Parquet, url='{minio_url}/{buckets[0]}/') ORDER BY id") + + assert result == "1\tAlice\t100\n2\tBob\t85\n3\tCarol\t92\n" \ No newline at end of file diff --git a/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py b/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py index 5cb1c02a0c07..9a60da2b2301 100644 --- a/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py +++ b/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py @@ -55,7 +55,7 @@ def 
execute_spark_query(query: str): execute_spark_query( f""" - INSERT INTO {TABLE_NAME} VALUES (ARRAY(named_struct('name', 'Singapore', 'zip', 12345), named_struct('name', 'Moscow', 'zip', 54321)), ARRAY(1,2)); + INSERT INTO {TABLE_NAME} VALUES (ARRAY(named_struct('city', 'Singapore', 'zip', 12345), named_struct('city', 'Moscow', 'zip', 54321)), ARRAY(1,2)); """ ) diff --git a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json index 8d367d20f041..a983881af8f0 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "d4b695ca-ceeb-4537-8a2a-eee90dc6e313", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test", + "location" : "s3a://test/field_ids_complex_test", "last-sequence-number" : 1, "last-updated-ms" : 1757661733693, "last-column-id" : 9, @@ -96,7 +96,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", + "manifest-list" : "s3a://test/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json index 2d149abb44e7..d6c9079228ac 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : 
"149ecc15-7afc-4311-86b3-3a4c8d4ec08e", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test", + "location" : "s3a://test/field_ids_struct_test", "last-sequence-number" : 1, "last-updated-ms" : 1753959190403, "last-column-id" : 6, @@ -84,7 +84,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", + "manifest-list" : "s3a://test/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json index 32225eb618ad..1ddc3492cc82 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "8f1f9ae2-18bb-421e-b640-ec2f85e67bce", - "location" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test", + "location" : "s3a://test/field_ids_table_test", "last-sequence-number" : 1, "last-updated-ms" : 1752481476160, "last-column-id" : 1, @@ -56,7 +56,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", + "manifest-list" : "s3a://test/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", "schema-id" : 0 } ], "statistics" : [ ],