Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,18 @@
M(ZooKeeperBytesSent, "Number of bytes send over network while communicating with ZooKeeper.", ValueType::Bytes) \
M(ZooKeeperBytesReceived, "Number of bytes received over network while communicating with ZooKeeper.", ValueType::Bytes) \
\
M(ExportPartitionZooKeeperRequests, "Total number of ZooKeeper requests made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGet, "Number of 'get' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetChildren, "Number of 'getChildren' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetChildrenWatch, "Number of 'getChildrenWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetWatch, "Number of 'getWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperCreate, "Number of 'create' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperSet, "Number of 'set' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperRemove, "Number of 'remove' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperRemoveRecursive, "Number of 'removeRecursive' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperMulti, "Number of 'multi' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperExists, "Number of 'exists' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
\
M(DistributedConnectionTries, "Total count of distributed connection attempts.", ValueType::Number) \
M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).", ValueType::Number) \
M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.", ValueType::Number) \
Expand Down
8 changes: 8 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7368,6 +7368,14 @@ Throw an error if there are pending mutations when exporting a merge tree part.
)", 0) \
DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"(
Throw an error if there are pending patch parts when exporting a merge tree part.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list.
On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, true, R"(
Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled.
)", 0) \
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
Time zone by which partitioning of Iceberg tables was performed.
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// {"output_format_parquet_write_checksums", false, true, "New setting."},
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
{"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
{"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
// {"cluster_table_function_split_granularity", "file", "file", "New setting."},
// {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t max_bytes_per_file;
size_t max_rows_per_file;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
bool lock_inside_the_task; /// todo temporary

std::string toJsonString() const
{
Expand All @@ -141,6 +142,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("lock_inside_the_task", lock_inside_the_task);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -173,6 +175,7 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");

if (json->has("file_already_exists_policy"))
{
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
Expand All @@ -184,6 +187,8 @@ struct ExportReplicatedMergeTreePartitionManifest
/// what to do if it's not a valid value?
}

manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep manifest parsing backward-compatible

Make lock_inside_the_task optional when deserializing metadata. Existing metadata.json znodes created before this change do not contain that key, so json->getValue<bool>("lock_inside_the_task") throws during polling/system-table reads and can prevent processing of in-flight exports after upgrade until those znodes are manually cleaned up.

Useful? React with 👍 / 👎.


return manifest;
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
/// This is used to prevent the parts from being deleted before finishing the export operation
/// It does not mean this replica will export all the parts
/// There is also a chance this replica does not contain a given part and it is totally ok.
std::vector<DataPartPtr> part_references;
mutable std::vector<DataPartPtr> part_references;

std::string getCompositeKey() const
{
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/BackgroundJobsAssignee.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, b
return schedule_res;
}

std::size_t BackgroundJobsAssignee::getAvailableMoveExecutors() const
{
return getContext()->getMovesExecutor()->getAvailableSlots();
}

String BackgroundJobsAssignee::toString(Type type)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/BackgroundJobsAssignee.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class BackgroundJobsAssignee : public WithContext
bool scheduleMoveTask(ExecutableTaskPtr move_task);
bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);

std::size_t getAvailableMoveExecutors() const;

/// Just call finish
~BackgroundJobsAssignee();

Expand Down
71 changes: 71 additions & 0 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#include <Storages/MergeTree/ExportPartFromPartitionExportTask.h>
#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
extern const Event ExportPartitionZooKeeperRequests;
extern const Event ExportPartitionZooKeeperGetChildren;
extern const Event ExportPartitionZooKeeperCreate;
}
namespace DB
{

ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask(
StorageReplicatedMergeTree & storage_,
const std::string & key_,
const MergeTreePartExportManifest & manifest_)
: storage(storage_),
key(key_),
manifest(manifest_)
{
export_part_task = std::make_shared<ExportPartTask>(storage, manifest);
}

bool ExportPartFromPartitionExportTask::executeStep()
{
const auto zk = storage.getZooKeeper();
const auto part_name = manifest.data_part->name;

LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name);

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
{
LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
export_part_task->executeStep();
return false;
}

std::lock_guard inner_lock(storage.export_manifests_mutex);
storage.export_manifests.erase(manifest);

LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name);
return false;
Comment on lines +43 to +44

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Remove export manifest when lock acquisition fails

Clean up the pre-inserted export_manifests entry on the lock-failure path. In lock_inside_the_task mode, the scheduler inserts the manifest before execution, but if tryCreate(.../locks/<part>) fails here, the task exits without erasing that manifest, leaving the part permanently marked as already-exporting on this replica and blocking retries if the current lock owner dies before finishing.

Useful? React with 👍 / 👎.

}

void ExportPartFromPartitionExportTask::cancel() noexcept
{
export_part_task->cancel();
}

void ExportPartFromPartitionExportTask::onCompleted()
{
export_part_task->onCompleted();
}

StorageID ExportPartFromPartitionExportTask::getStorageID() const
{
return export_part_task->getStorageID();
}

Priority ExportPartFromPartitionExportTask::getPriority() const
{
return export_part_task->getPriority();
}

String ExportPartFromPartitionExportTask::getQueryId() const
{
return export_part_task->getQueryId();
}
}
36 changes: 36 additions & 0 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ExportPartTask.h>

namespace DB
{

/*
Decorator around the ExportPartTask to lock the part inside the task
*/
class ExportPartFromPartitionExportTask : public IExecutableTask
{
public:
explicit ExportPartFromPartitionExportTask(
StorageReplicatedMergeTree & storage_,
const std::string & key_,
const MergeTreePartExportManifest & manifest_);
bool executeStep() override;
void onCompleted() override;
StorageID getStorageID() const override;
Priority getPriority() const override;
String getQueryId() const override;

void cancel() noexcept override;

private:
StorageReplicatedMergeTree & storage;
std::string key;
MergeTreePartExportManifest manifest;
std::shared_ptr<ExportPartTask> export_part_task;
};

}
11 changes: 11 additions & 0 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo
{
}

const MergeTreePartExportManifest & ExportPartTask::getManifest() const
{
return manifest;
}

bool ExportPartTask::executeStep()
{
auto local_context = Context::createCopy(storage.getContext());
Expand Down Expand Up @@ -223,6 +228,11 @@ bool ExportPartTask::executeStep()

exec.setCancelCallback(is_cancelled_callback, 100);

if (isCancelled())
{
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Export part was cancelled");
}

exec.execute();

if (isCancelled())
Expand Down Expand Up @@ -318,6 +328,7 @@ bool ExportPartTask::executeStep()

void ExportPartTask::cancel() noexcept
{
LOG_INFO(getLogger("ExportPartTask"), "Export part {} task cancel() method called", manifest.data_part->name);
cancel_requested.store(true);
pipeline.cancel();
}
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/ExportPartTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ExportPartTask : public IExecutableTask
StorageID getStorageID() const override;
Priority getPriority() const override;
String getQueryId() const override;
const MergeTreePartExportManifest & getManifest() const;

void cancel() noexcept override;

Expand Down
Loading
Loading