Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_ME
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 6;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH;

static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1;

Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,9 @@ Use multiple threads for azure multipart upload.
)", 0) \
DECLARE(Bool, s3_throw_on_zero_files_match, false, R"(
Throw an error, when ListObjects request cannot match any files
)", 0) \
DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"(
Allow copying base storage credentials to secondary object storages with a different endpoint. Default: 0 (credentials only copied when endpoint matches base).
)", 0) \
DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"(
Throw an error if matched zero files according to glob expansion rules.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{
{"iceberg_partition_timezone", "", "", "New setting."},
// {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
});
addSettingsChanges(settings_changes_history, "26.1",
{
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
LOG_DEBUG(log, "Has no credentials");
}
}
else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end())
else if (!lightweight && table_metadata.requiresCredentials()
&& std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()
&& table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
Expand Down
6 changes: 2 additions & 4 deletions src/Interpreters/ClusterFunctionReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
data_lake_metadata = object->data_lake_metadata.value();

#if USE_AVRO
if (std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
{
iceberg_info = dynamic_cast<IcebergDataObjectInfo &>(*object).info;
}
if (auto iceberg_object = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
iceberg_info = iceberg_object->info;
#endif

const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
Expand Down
70 changes: 46 additions & 24 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ Plan getPlan(
const DataLakeStorageSettings & data_lake_settings,
const PersistentTableComponents & persistent_table_components,
ObjectStoragePtr object_storage,
SecondaryStorages & secondary_storages,
const String & write_format,
ContextPtr context,
CompressionMethod compression_method)
Expand Down Expand Up @@ -155,27 +156,30 @@ Plan getPlan(
std::unordered_map<String, std::shared_ptr<ManifestFilePlan>> manifest_files;
for (const auto & snapshot : snapshots_info)
{
auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log);
auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_absolute_path, log, secondary_storages);
for (const auto & manifest_file : manifest_list)
{
plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path);
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path))
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = snapshot.snapshot_id;
plan.manifest_list_to_manifest_files[snapshot.manifest_list_absolute_path].push_back(manifest_file.manifest_file_absolute_path);
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_absolute_path))
{
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_absolute_path] = snapshot.snapshot_id;
}
auto manifest_file_content = getManifestFile(
object_storage,
persistent_table_components,
context,
log,
manifest_file.manifest_file_path,
manifest_file.manifest_file_absolute_path,
manifest_file.added_sequence_number,
manifest_file.added_snapshot_id);
manifest_file.added_snapshot_id,
secondary_storages);

if (!manifest_files.contains(manifest_file.manifest_file_path))
if (!manifest_files.contains(manifest_file.manifest_file_absolute_path))
{
manifest_files[manifest_file.manifest_file_path] = std::make_shared<ManifestFilePlan>(current_schema);
manifest_files[manifest_file.manifest_file_path]->path = manifest_file.manifest_file_path;
manifest_files[manifest_file.manifest_file_absolute_path] = std::make_shared<ManifestFilePlan>(current_schema);
manifest_files[manifest_file.manifest_file_absolute_path]->path = manifest_file.manifest_file_absolute_path;
}
manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path);
manifest_files[manifest_file.manifest_file_absolute_path]->manifest_lists_path.push_back(snapshot.manifest_list_absolute_path);
auto data_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::DATA);
auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::POSITION_DELETE);
for (const auto & pos_delete_file : positional_delete_files)
Expand All @@ -187,19 +191,23 @@ Plan getPlan(
if (plan.partitions.size() <= partition_index)
plan.partitions.push_back({});

IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file, 0);
auto [resolved_storage, resolved_key] = resolveObjectStorageForPath(
persistent_table_components.table_location, data_file->file_path, object_storage, secondary_storages, context);

IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(*data_file, 0, resolved_storage, resolved_key);
std::shared_ptr<DataFilePlan> data_file_ptr;
if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path))
std::string path_identifier = resolved_storage->getDescription() + ":" + resolved_storage->getObjectsNamespace() + "|" + resolved_key;
if (!plan.path_to_data_file.contains(path_identifier))
{
data_file_ptr = std::make_shared<DataFilePlan>(DataFilePlan{
.data_object_info = data_object_info,
.manifest_list = manifest_files[manifest_file.manifest_file_path],
.manifest_list = manifest_files[manifest_file.manifest_file_absolute_path],
.patched_path = plan.generator.generateDataFileName()});
plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr;
plan.path_to_data_file[path_identifier] = data_file_ptr;
}
else
{
data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path];
data_file_ptr = plan.path_to_data_file[path_identifier];
}
plan.partitions[partition_index].push_back(data_file_ptr);
plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back());
Expand Down Expand Up @@ -232,7 +240,9 @@ static void writeDataFiles(
const std::optional<FormatSettings> & format_settings,
ContextPtr context,
const String & write_format,
CompressionMethod write_compression_method)
CompressionMethod write_compression_method,
const String & table_location,
SecondaryStorages & secondary_storages)
{
for (auto & [_, data_file] : initial_plan.path_to_data_file)
{
Expand All @@ -243,10 +253,15 @@ static void writeDataFiles(
format_settings,
// todo make compaction using same FormatParserSharedResources
std::make_shared<FormatParserSharedResources>(context->getSettingsRef(), 1),
context);
context,
table_location,
secondary_storages);

RelativePathWithMetadata relative_path(data_file->data_object_info->getPath());
auto read_buffer = createReadBuffer(relative_path, object_storage, context, getLogger("IcebergCompaction"));
ObjectStoragePtr storage_to_use = data_file->data_object_info->getResolvedStorage();
if (!storage_to_use)
storage_to_use = object_storage;
RelativePathWithMetadata object_info(data_file->data_object_info->getPath());
auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction"));

const Settings & settings = context->getSettingsRef();
auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(
Expand Down Expand Up @@ -400,6 +415,9 @@ void writeMetadataFiles(
{
manifest_entry->patched_path = plan.generator.generateManifestEntryName();
manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path.path_in_metadata;

std::vector<String> manifest_data_filenames(data_filenames.begin(), data_filenames.end());

auto buffer_manifest_entry = object_storage->writeObject(
StoredObject(manifest_entry->patched_path.path_in_storage),
WriteMode::Rewrite,
Expand All @@ -417,7 +435,7 @@ void writeMetadataFiles(
partition_columns,
plan.partition_encoder.getPartitionValue(grouped_by_manifest_files_partitions[manifest_entry]),
ChunkPartitioner(fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes(),
std::vector(data_filenames.begin(), data_filenames.end()),
manifest_data_filenames,
manifest_entry->statistics,
sample_block_,
snapshot,
Expand All @@ -438,17 +456,17 @@ void writeMetadataFiles(
if (plan.history[i].added_files == 0)
continue;

manifest_list_renamings[plan.history[i].manifest_list_path] = new_snapshots[i].metadata_path;
manifest_list_renamings[plan.history[i].manifest_list_absolute_path] = new_snapshots[i].metadata_path;
}

for (size_t i = 0; i < plan.history.size(); ++i)
{
if (plan.history[i].added_files == 0)
continue;

auto initial_manifest_list_name = plan.history[i].manifest_list_path;
auto initial_manifest_list_name = plan.history[i].manifest_list_absolute_path;
auto initial_manifest_entries = plan.manifest_list_to_manifest_files[initial_manifest_list_name];
auto renamed_manifest_list = manifest_list_renamings[initial_manifest_list_name];
auto renamed_manifest_list = manifest_list_renamings[plan.history[i].manifest_list_absolute_path];
std::vector<String> renamed_manifest_entries;
Int32 total_manifest_file_sizes = 0;
for (const auto & initial_manifest_entry : initial_manifest_entries)
Expand Down Expand Up @@ -516,6 +534,7 @@ void compactIcebergTable(
IcebergHistory snapshots_info,
const PersistentTableComponents & persistent_table_components,
ObjectStoragePtr object_storage_,
SecondaryStorages & secondary_storages_,
const DataLakeStorageSettings & data_lake_settings,
const std::optional<FormatSettings> & format_settings_,
SharedHeader sample_block_,
Expand All @@ -527,6 +546,7 @@ void compactIcebergTable(
data_lake_settings,
persistent_table_components,
object_storage_,
secondary_storages_,
write_format,
context_,
persistent_table_components.metadata_compression_method);
Expand All @@ -540,7 +560,9 @@ void compactIcebergTable(
format_settings_,
context_,
write_format,
persistent_table_components.metadata_compression_method);
persistent_table_components.metadata_compression_method,
persistent_table_components.table_location,
secondary_storages_);
writeMetadataFiles(plan, object_storage_, context_, sample_block_, write_format, persistent_table_components.table_path);
clearOldFiles(object_storage_, old_files);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Storages/ObjectStorage/DataLakes/Iceberg/PersistentTableComponents.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/Utils.h>


namespace DB::Iceberg
Expand All @@ -15,6 +16,7 @@ void compactIcebergTable(
IcebergHistory snapshots_info,
const PersistentTableComponents & persistent_table_components,
DB::ObjectStoragePtr object_storage_,
SecondaryStorages & secondary_storages_,
const DataLakeStorageSettings & data_lake_settings,
const std::optional<DB::FormatSettings> & format_settings_,
DB::SharedHeader sample_block_,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Poco/String.h>
#include "config.h"

#include <Core/Settings.h>
Expand All @@ -13,6 +14,7 @@

#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/ProtocolDefines.h>

namespace DB::ErrorCodes
{
Expand All @@ -33,16 +35,22 @@ extern const SettingsBool use_roaring_bitmap_iceberg_positional_deletes;

#if USE_AVRO

IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntryPtr data_manifest_file_entry_, Int32 schema_id_relevant_to_iterator_)
: ObjectInfo(RelativePathWithMetadata(data_manifest_file_entry_->file_path))
IcebergDataObjectInfo::IcebergDataObjectInfo(
const Iceberg::ManifestFileEntry & data_manifest_file_entry_,
Int32 schema_id_relevant_to_iterator_,
ObjectStoragePtr resolved_storage_,
const String & resolved_key_)
: ObjectInfo(RelativePathWithMetadata(resolved_key_.empty() ? data_manifest_file_entry_.file_path : resolved_key_))
, info{
data_manifest_file_entry_->file_path_key,
data_manifest_file_entry_->schema_id,
schema_id_relevant_to_iterator_,
data_manifest_file_entry_->added_sequence_number,
data_manifest_file_entry_->file_format,
/* position_deletes_objects */ {},
/* equality_deletes_objects */ {}}
data_manifest_file_entry_.file_path_from_metadata,
resolved_storage_ ? data_manifest_file_entry_.file_path : data_manifest_file_entry_.file_path_from_metadata, // absolute path
data_manifest_file_entry_.schema_id,
schema_id_relevant_to_iterator_,
data_manifest_file_entry_.added_sequence_number,
data_manifest_file_entry_.file_format,
/* position_deletes_objects */ {},
/* equality_deletes_objects */ {}}
, resolved_storage(std::move(resolved_storage_))
{
}

Expand All @@ -56,13 +64,15 @@ std::shared_ptr<ISimpleTransform> IcebergDataObjectInfo::getPositionDeleteTransf
const SharedHeader & header,
const std::optional<FormatSettings> & format_settings,
FormatParserSharedResourcesPtr parser_shared_resources,
ContextPtr context_)
ContextPtr context_,
const String & table_location,
SecondaryStorages & secondary_storages)
{
IcebergDataObjectInfoPtr self = shared_from_this();
if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value)
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, parser_shared_resources, context_);
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, parser_shared_resources, context_, table_location, secondary_storages);
else
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, parser_shared_resources, context_);
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, parser_shared_resources, context_, table_location, secondary_storages);
}

void IcebergDataObjectInfo::addPositionDeleteObject(Iceberg::ManifestFileEntryPtr position_delete_object)
Expand Down Expand Up @@ -91,7 +101,11 @@ void IcebergDataObjectInfo::addEqualityDeleteObject(const Iceberg::ManifestFileE
void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuffer & out, size_t protocol_version) const
{
checkVersion(protocol_version);
writeStringBinary(data_object_file_path_key, out);
writeStringBinary(data_object_file_path_from_metadata, out);
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH)
{
writeStringBinary(data_object_file_absolute_path, out);
}
writeVarInt(underlying_format_read_schema_id, out);
writeVarInt(schema_id_relevant_to_iterator, out);
writeVarInt(sequence_number, out);
Expand Down Expand Up @@ -140,7 +154,11 @@ void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuf
void IcebergObjectSerializableInfo::deserializeForClusterFunctionProtocol(ReadBuffer & in, size_t protocol_version)
{
checkVersion(protocol_version);
readStringBinary(data_object_file_path_key, in);
readStringBinary(data_object_file_path_from_metadata, in);
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH)
{
readStringBinary(data_object_file_absolute_path, in);
}
readVarInt(underlying_format_read_schema_id, in);
readVarInt(schema_id_relevant_to_iterator, in);
readVarInt(sequence_number, in);
Expand Down
Loading
Loading