-
Notifications
You must be signed in to change notification settings - Fork 16
Improvements to partition export #1402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: antalya-26.1
Are you sure you want to change the base?
Changes from all commits
5c09119
68ae542
4d98ec9
0fdc716
f2ae2cc
a759957
122fdc3
16ae536
6d74239
51fc82f
645c359
0d7768b
060ce68
a49476d
4601ae1
6fad4e2
f1a8d15
ee5a230
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| #include <Storages/MergeTree/ExportPartFromPartitionExportTask.h> | ||
| #include <Common/ProfileEvents.h> | ||
|
|
||
| namespace ProfileEvents | ||
| { | ||
| extern const Event ExportPartitionZooKeeperRequests; | ||
| extern const Event ExportPartitionZooKeeperGetChildren; | ||
| extern const Event ExportPartitionZooKeeperCreate; | ||
| } | ||
| namespace DB | ||
| { | ||
|
|
||
| ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask( | ||
| StorageReplicatedMergeTree & storage_, | ||
| const std::string & key_, | ||
| const MergeTreePartExportManifest & manifest_) | ||
| : storage(storage_), | ||
| key(key_), | ||
| manifest(manifest_) | ||
| { | ||
| export_part_task = std::make_shared<ExportPartTask>(storage, manifest); | ||
| } | ||
|
|
||
| bool ExportPartFromPartitionExportTask::executeStep() | ||
| { | ||
| const auto zk = storage.getZooKeeper(); | ||
| const auto part_name = manifest.data_part->name; | ||
|
|
||
| LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name); | ||
|
|
||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); | ||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); | ||
| if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) | ||
| { | ||
| LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name); | ||
| export_part_task->executeStep(); | ||
| return false; | ||
| } | ||
|
|
||
| std::lock_guard inner_lock(storage.export_manifests_mutex); | ||
| storage.export_manifests.erase(manifest); | ||
|
|
||
| LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name); | ||
| return false; | ||
|
Comment on lines
+43
to
+44
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Clean up the pre-inserted Useful? React with 👍 / 👎. |
||
| } | ||
|
|
||
| void ExportPartFromPartitionExportTask::cancel() noexcept | ||
| { | ||
| export_part_task->cancel(); | ||
| } | ||
|
|
||
| void ExportPartFromPartitionExportTask::onCompleted() | ||
| { | ||
| export_part_task->onCompleted(); | ||
| } | ||
|
|
||
| StorageID ExportPartFromPartitionExportTask::getStorageID() const | ||
| { | ||
| return export_part_task->getStorageID(); | ||
| } | ||
|
|
||
| Priority ExportPartFromPartitionExportTask::getPriority() const | ||
| { | ||
| return export_part_task->getPriority(); | ||
| } | ||
|
|
||
| String ExportPartFromPartitionExportTask::getQueryId() const | ||
| { | ||
| return export_part_task->getQueryId(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| #pragma once | ||
|
|
||
| #include <Storages/MergeTree/IExecutableTask.h> | ||
| #include <Storages/MergeTree/MergeTreePartExportManifest.h> | ||
| #include <Storages/StorageReplicatedMergeTree.h> | ||
| #include <Storages/MergeTree/ExportPartTask.h> | ||
|
|
||
| namespace DB | ||
| { | ||
|
|
||
| /* | ||
| Decorator around the ExportPartTask to lock the part inside the task | ||
| */ | ||
| class ExportPartFromPartitionExportTask : public IExecutableTask | ||
| { | ||
| public: | ||
| explicit ExportPartFromPartitionExportTask( | ||
| StorageReplicatedMergeTree & storage_, | ||
| const std::string & key_, | ||
| const MergeTreePartExportManifest & manifest_); | ||
| bool executeStep() override; | ||
| void onCompleted() override; | ||
| StorageID getStorageID() const override; | ||
| Priority getPriority() const override; | ||
| String getQueryId() const override; | ||
|
|
||
| void cancel() noexcept override; | ||
|
|
||
| private: | ||
| StorageReplicatedMergeTree & storage; | ||
| std::string key; | ||
| MergeTreePartExportManifest manifest; | ||
| std::shared_ptr<ExportPartTask> export_part_task; | ||
| }; | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make
lock_inside_the_taskoptional when deserializing metadata. Existingmetadata.jsonznodes created before this change do not contain that key, sojson->getValue<bool>("lock_inside_the_task")throws during polling/system-table reads and can prevent processing of in-flight exports after upgrade until those znodes are manually cleaned up.Useful? React with 👍 / 👎.