diff --git a/docs/en/engines/database-engines/datalake.md b/docs/en/engines/database-engines/datalake.md index b7e3f2c05a52..9c20511172b3 100644 --- a/docs/en/engines/database-engines/datalake.md +++ b/docs/en/engines/database-engines/datalake.md @@ -46,21 +46,22 @@ catalog_type, The following settings are supported: -| Setting | Description | -|-------------------------|-----------------------------------------------------------------------------------------| -| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) | -| `warehouse` | The warehouse/database name to use in the catalog. | -| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) | -| `auth_header` | Custom HTTP header for authentication with the catalog service | -| `auth_scope` | OAuth2 scope for authentication (if using OAuth) | -| `storage_endpoint` | Endpoint URL for the underlying storage | -| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | +| Setting | Description | +|-------------------------|-----------------------------------------------------------------------------------------------| +| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) | +| `warehouse` | The warehouse/database name to use in the catalog. 
| +| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) | +| `auth_header` | Custom HTTP header for authentication with the catalog service | +| `auth_scope` | OAuth2 scope for authentication (if using OAuth) | +| `storage_endpoint` | Endpoint URL for the underlying storage | +| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | | `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) | -| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | -| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) | -| `region` | AWS region for the service (e.g., `us-east-1`) | -| `dlf_access_key_id` | Access key ID for DLF access | -| `dlf_access_key_secret` | Access key Secret for DLF access | +| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | +| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) | +| `region` | AWS region for the service (e.g., `us-east-1`) | +| `dlf_access_key_id` | Access key ID for DLF access | +| `dlf_access_key_secret` | Access key Secret for DLF access | +| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` | ## Examples {#examples} @@ -83,4 +84,30 @@ SETTINGS onelake_client_secret = client_secret; SHOW TABLES IN databse_name; SELECT count() from database_name.table_name; -``` \ No newline at end of file +``` + +## Namespace filter {#namespace} + +By default, ClickHouse reads tables from all namespaces available in the catalog. You can limit this behavior using the `namespaces` database setting. The value should be a comma‑separated list of namespaces that are allowed to be read. + +Supported catalog types are `rest`, `glue` and `unity`. 
+ +For example, if the catalog contains three namespaces - `dev`, `stage`, and `prod` - and you want to read data only from dev and stage, set: +``` +namespaces='dev,stage' +``` + +### Nested namespaces {#namespace-nested} + +The Iceberg (`rest`) catalog supports nested namespaces. The `namespaces` filter accepts the following patterns: + +- `namespace` - includes tables from the specified namespace, but not from its nested namespaces. +- `namespace.nested` - includes tables from the nested namespace, but not from the parent. +- `namespace.*` - includes tables from all nested namespaces, but not from the parent. + +If you need to include both a namespace and its nested namespaces, specify both explicitly. For example: +``` +namespaces='namespace,namespace.*' +``` + +The default value is '*', which means all namespaces are included. diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 8b32f9be80b7..0f1f44dfa7f8 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -645,6 +645,7 @@ M(763, SESSION_REFUSED) \ M(764, DEDUPLICATION_IS_NOT_POSSIBLE) \ M(765, UNKNOWN_MASKING_POLICY) \ + M(766, CATALOG_NAMESPACE_DISABLED) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index d617751a3eb6..577e9a3764d9 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -63,6 +63,7 @@ namespace DatabaseDataLakeSetting extern const DatabaseDataLakeSettingsString onelake_client_secret; extern const DatabaseDataLakeSettingsString dlf_access_key_id; extern const DatabaseDataLakeSettingsString dlf_access_key_secret; + extern const DatabaseDataLakeSettingsString namespaces; } namespace Setting @@ -142,6 +143,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const .aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value, .aws_secret_access_key = 
settings[DatabaseDataLakeSetting::aws_secret_access_key].value, .region = settings[DatabaseDataLakeSetting::region].value, + .namespaces = settings[DatabaseDataLakeSetting::namespaces].value }; switch (settings[DatabaseDataLakeSetting::catalog_type].value) @@ -156,6 +158,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const settings[DatabaseDataLakeSetting::auth_header], settings[DatabaseDataLakeSetting::oauth_server_uri].value, settings[DatabaseDataLakeSetting::oauth_server_use_request_body].value, + settings[DatabaseDataLakeSetting::namespaces].value, Context::getGlobalContextInstance()); break; } @@ -179,6 +182,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const settings[DatabaseDataLakeSetting::warehouse].value, url, settings[DatabaseDataLakeSetting::catalog_credential].value, + settings[DatabaseDataLakeSetting::namespaces].value, Context::getGlobalContextInstance()); break; } @@ -274,24 +278,24 @@ std::shared_ptr DatabaseDataLake::getConfigur #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif #if USE_AZURE_BLOB_STORAGE case DB::DatabaseDataLakeStorageType::Azure: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif #if USE_HDFS case DB::DatabaseDataLakeStorageType::HDFS: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif case DB::DatabaseDataLakeStorageType::Local: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } /// Fake storage in case when catalog store not only /// primary-type tables (DeltaLake or Iceberg), but for @@ -303,7 +307,7 @@ std::shared_ptr DatabaseDataLake::getConfigur /// 
dependencies and the most lightweight case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS default: @@ -320,7 +324,7 @@ std::shared_ptr DatabaseDataLake::getConfigur #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif #if USE_AZURE_BLOB_STORAGE @@ -331,7 +335,7 @@ std::shared_ptr DatabaseDataLake::getConfigur #endif case DB::DatabaseDataLakeStorageType::Local: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } /// Fake storage in case when catalog store not only /// primary-type tables (DeltaLake or Iceberg), but for @@ -343,7 +347,7 @@ std::shared_ptr DatabaseDataLake::getConfigur /// dependencies and the most lightweight case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } default: throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -358,12 +362,12 @@ std::shared_ptr DatabaseDataLake::getConfigur #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } default: throw Exception(ErrorCodes::BAD_ARGUMENTS, diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index d953e11b9d39..c5e70cc9b4df 
100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -34,6 +34,7 @@ namespace ErrorCodes DECLARE(String, onelake_client_secret, "", "Client secret from azure", 0) \ DECLARE(String, dlf_access_key_id, "", "Access id of DLF token for Paimon REST Catalog", 0) \ DECLARE(String, dlf_access_key_secret, "", "Access secret of DLF token for Paimon REST Catalog", 0) \ + DECLARE(String, namespaces, "*", "Comma-separated list of allowed namespaces", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp index 0bb26afaafa1..b17c3b9b50df 100644 --- a/src/Databases/DataLake/GlueCatalog.cpp +++ b/src/Databases/DataLake/GlueCatalog.cpp @@ -56,6 +56,7 @@ namespace DB::ErrorCodes { extern const int BAD_ARGUMENTS; extern const int DATALAKE_DATABASE_ERROR; + extern const int CATALOG_NAMESPACE_DISABLED; } namespace DB::Setting @@ -80,14 +81,6 @@ namespace DB::StorageObjectStorageSetting extern const StorageObjectStorageSettingsString iceberg_metadata_file_path; } -namespace DB::DatabaseDataLakeSetting -{ - extern const DatabaseDataLakeSettingsString storage_endpoint; - extern const DatabaseDataLakeSettingsString aws_access_key_id; - extern const DatabaseDataLakeSettingsString aws_secret_access_key; - extern const DatabaseDataLakeSettingsString region; -} - namespace CurrentMetrics { extern const Metric MarkCacheBytes; @@ -175,6 +168,7 @@ GlueCatalog::GlueCatalog( glue_client = std::make_unique(credentials_provider, endpoint_provider, client_configuration); } + boost::split(allowed_namespaces, settings.namespaces, boost::is_any_of(", "), boost::token_compress_on); } GlueCatalog::~GlueCatalog() = default; @@ -200,8 +194,9 @@ DataLake::ICatalog::Namespaces GlueCatalog::getDatabases(const std::string & pre for (const auto & db : dbs) { const auto & db_name = db.GetName(); - if 
(!db_name.starts_with(prefix)) + if (!isNamespaceAllowed(db_name) || !db_name.starts_with(prefix)) continue; + result.push_back(db_name); if (limit != 0 && result.size() >= limit) break; @@ -281,6 +276,9 @@ DB::Names GlueCatalog::getTables() const bool GlueCatalog::existsTable(const std::string & database_name, const std::string & table_name) const { + if (!isNamespaceAllowed(database_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", database_name); + Aws::Glue::Model::GetTableRequest request; request.SetDatabaseName(database_name); request.SetName(table_name); @@ -294,6 +292,9 @@ bool GlueCatalog::tryGetTableMetadata( const std::string & table_name, TableMetadata & result) const { + if (!isNamespaceAllowed(database_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", database_name); + Aws::Glue::Model::GetTableRequest request; request.SetDatabaseName(database_name); request.SetName(table_name); @@ -512,7 +513,7 @@ GlueCatalog::ObjectStorageWithPath GlueCatalog::createObjectStorageForEarlyTable auto storage_settings = std::make_shared(); storage_settings->loadFromSettingsChanges(settings.allChanged()); - auto configuration = std::make_shared(storage_settings); + auto configuration = std::make_shared(storage_settings, settings.namespaces); DB::StorageObjectStorageConfiguration::initialize(*configuration, args, getContext(), false); auto object_storage = configuration->createObjectStorage(getContext(), true); @@ -580,6 +581,11 @@ void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) cons void GlueCatalog::createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*metadata_content*/) const { + if (!isNamespaceAllowed(namespace_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to 
create table {}, namespace {} is filtered by `namespaces` database parameter", + table_name, namespace_name); + createNamespaceIfNotExists(namespace_name); Aws::Glue::Model::CreateTableRequest request; @@ -652,6 +658,11 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const { + if (!isNamespaceAllowed(namespace_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", + table_name, namespace_name); + Aws::Glue::Model::DeleteTableRequest request; request.SetDatabaseName(namespace_name); request.SetName(table_name); @@ -665,6 +676,11 @@ void GlueCatalog::dropTable(const String & namespace_name, const String & table_ response.GetError().GetMessage()); } +bool GlueCatalog::isNamespaceAllowed(const std::string & namespace_) const +{ + return allowed_namespaces.contains("*") || allowed_namespaces.contains(namespace_); +} + } #endif diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h index 7392cdfb4afd..d6980d20b349 100644 --- a/src/Databases/DataLake/GlueCatalog.h +++ b/src/Databases/DataLake/GlueCatalog.h @@ -73,6 +73,9 @@ class GlueCatalog final : public ICatalog, private DB::WithContext std::string region; CatalogSettings settings; DB::ASTPtr table_engine_definition; + std::unordered_set allowed_namespaces; + + bool isNamespaceAllowed(const std::string & namespace_) const; DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const; DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const; diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h index 5b4e481f01e7..ff5eab9b2210 100644 --- a/src/Databases/DataLake/ICatalog.h +++ b/src/Databases/DataLake/ICatalog.h @@ -120,6 +120,7 @@ struct CatalogSettings String 
aws_access_key_id; String aws_secret_access_key; String region; + String namespaces; DB::SettingsChanges allChanged() const; }; diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index 6229e84eca84..8d1a08a01416 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -41,6 +41,7 @@ namespace DB::ErrorCodes extern const int DATALAKE_DATABASE_ERROR; extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; + extern const int CATALOG_NAMESPACE_DISABLED; } namespace DataLake @@ -129,6 +130,7 @@ RestCatalog::RestCatalog( const std::string & auth_header_, const std::string & oauth_server_uri_, bool oauth_server_use_request_body_, + const std::string & namespaces_, DB::ContextPtr context_) : ICatalog(warehouse_) , DB::WithContext(context_) @@ -137,6 +139,7 @@ RestCatalog::RestCatalog( , auth_scope(auth_scope_) , oauth_server_uri(oauth_server_uri_) , oauth_server_use_request_body(oauth_server_use_request_body_) + , allowed_namespaces(namespaces_) { if (!catalog_credential_.empty()) { @@ -375,6 +378,8 @@ bool RestCatalog::empty() const bool found_table = false; auto stop_condition = [&](const std::string & namespace_name) -> bool { + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + return false; const auto tables = getTables(namespace_name, /* limit */1); found_table = !tables.empty(); return found_table; @@ -399,6 +404,8 @@ DB::Names RestCatalog::getTables() const runner.enqueueAndKeepTrack( [=, &tables, &mutex, this] { + if (!allowed_namespaces.isNamespaceAllowed(current_namespace, /*nested*/ false)) + return; auto tables_in_namespace = getTables(current_namespace); std::lock_guard lock(mutex); std::move(tables_in_namespace.begin(), tables_in_namespace.end(), std::back_inserter(tables)); @@ -436,9 +443,21 @@ void RestCatalog::getNamespacesRecursive( break; if (func) - func(current_namespace); + { + if 
(allowed_namespaces.isNamespaceAllowed(current_namespace, /*nested*/ false)) + func(current_namespace); + else + { + LOG_DEBUG(log, "Tables in namespace {} are filtered", current_namespace); + } + } - getNamespacesRecursive(current_namespace, result, stop_condition, func); + if (allowed_namespaces.isNamespaceAllowed(current_namespace, /*nested*/ true)) + getNamespacesRecursive(current_namespace, result, stop_condition, func); + else + { + LOG_DEBUG(log, "Nested namespaces in namespace {} are filtered", current_namespace); + } } } @@ -541,6 +560,10 @@ RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const DB::Names RestCatalog::getTables(const std::string & base_namespace, size_t limit) const { + if (!allowed_namespaces.isNamespaceAllowed(base_namespace, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Namespace {} is filtered by `namespaces` database parameter", base_namespace); + auto encoded_namespace = encodeNamespaceForURI(base_namespace); const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encoded_namespace / "tables"; @@ -610,6 +633,8 @@ bool RestCatalog::tryGetTableMetadata( } catch (const DB::Exception & ex) { + if (ex.code() == DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED) + throw; LOG_DEBUG(log, "tryGetTableMetadata response: {}", ex.what()); return false; } @@ -631,6 +656,10 @@ bool RestCatalog::getTableMetadataImpl( { LOG_DEBUG(log, "Checking table {} in namespace {}", table_name, namespace_name); + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Namespace {} is filtered by `namespaces` database parameter", namespace_name); + DB::HTTPHeaderEntries headers; if (result.requiresCredentials()) { @@ -840,6 +869,10 @@ void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, cons void RestCatalog::createTable(const String & namespace_name, const String & 
table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr metadata_content) const { + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to create table {}, namespace {} is filtered by `namespaces` database parameter", table_name, namespace_name); + createNamespaceIfNotExists(namespace_name, metadata_content->getValue("location")); const std::string endpoint = fmt::format("{}/namespaces/{}/tables", base_url, namespace_name); @@ -944,6 +977,11 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t void RestCatalog::dropTable(const String & namespace_name, const String & table_name) const { + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", + table_name, namespace_name); + const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}?purgeRequested=False", base_url, namespace_name, table_name); Poco::JSON::Object::Ptr request_body = nullptr; @@ -957,6 +995,69 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_ } } +/// "alpha,alpha.a1,bravo,bravo.*,charlie,delta.d1,echo.*" +/// allows tables from +/// - "alpha" namespace +/// - "alpha.a1" namespace +/// - "bravo" namespace +/// - any nested namespaces of "bravo" +/// - "charlie" namespace, but not from nested of "charlie" +/// - "delta.d1" namespace, but not from "delta" +/// - any nested namespaces of "echo", but not "echo" itself +/// "bravo.*.b2" makes no sense for now, asterisk allows all nested +RestCatalog::AllowedNamespaces::AllowedNamespaces(const std::string & namespaces_) +{ + std::vector list_of_namespaces; + boost::split(list_of_namespaces, namespaces_, boost::is_any_of(", "), boost::token_compress_on); + for (const auto & ns : 
list_of_namespaces) + { + std::vector list_of_nested_namespaces; + boost::split(list_of_nested_namespaces, ns, boost::is_any_of(".")); + + size_t len = list_of_nested_namespaces.size(); + if (!len) + continue; + + AllowedNamespaces * current = &(nested_namespaces[list_of_nested_namespaces[0]]); + for (size_t i = 1; i <= len; ++i) + { + if (i == len) + current->allow_tables = true; + else + { + current = &(current->nested_namespaces[list_of_nested_namespaces[i]]); + if (list_of_nested_namespaces[i] == "*") + { + current->allow_tables = true; + break; + } + } + } + } +} + +bool RestCatalog::AllowedNamespaces::isNamespaceAllowed(const std::string & namespace_, bool nested) const +{ + // Trivial case, check here to avoid split namespace on nested + if (nested_namespaces.contains("*")) + return true; + + std::vector list_of_nested_namespaces; + boost::split(list_of_nested_namespaces, namespace_, boost::is_any_of(".")); + + const AllowedNamespaces * current = this; + for (const auto & nns : list_of_nested_namespaces) + { + if (current->nested_namespaces.contains("*")) + return true; + auto it = current->nested_namespaces.find(nns); + if (it == current->nested_namespaces.end()) + return false; + current = &(it->second); + } + + return nested ? 
!current->nested_namespaces.empty() : current->allow_tables; +} } diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index f9ce53729ce4..65fc11bf11eb 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -29,6 +29,7 @@ class RestCatalog final : public ICatalog, private DB::WithContext const std::string & auth_header_, const std::string & oauth_server_uri_, bool oauth_server_use_request_body_, + const std::string & namespaces_, DB::ContextPtr context_); explicit RestCatalog( @@ -115,6 +116,26 @@ class RestCatalog final : public ICatalog, private DB::WithContext bool oauth_server_use_request_body; mutable std::optional access_token; +public: + class AllowedNamespaces + { + public: + AllowedNamespaces() {} + explicit AllowedNamespaces(const std::string & namespaces_); + + /// Check if nested namespaces (nested=true) or tables (nested=false) are allowed in namespace + bool isNamespaceAllowed(const std::string & namespace_, bool nested) const; + + private: + /// List of allowed nested namespaces + std::unordered_map nested_namespaces; + /// Tables from current level are allowed + bool allow_tables = false; + }; + +private: + AllowedNamespaces allowed_namespaces; + Poco::Net::HTTPBasicCredentials credentials{}; DB::ReadWriteBufferFromHTTPPtr createReadBuffer( diff --git a/src/Databases/DataLake/UnityCatalog.cpp b/src/Databases/DataLake/UnityCatalog.cpp index 1f59c16242fd..8b183aab2fd7 100644 --- a/src/Databases/DataLake/UnityCatalog.cpp +++ b/src/Databases/DataLake/UnityCatalog.cpp @@ -17,6 +17,7 @@ namespace DB::ErrorCodes { extern const int DATALAKE_DATABASE_ERROR; extern const int LOGICAL_ERROR; + extern const int CATALOG_NAMESPACE_DISABLED; } namespace @@ -161,6 +162,9 @@ bool UnityCatalog::tryGetTableMetadata( const std::string & table_name, TableMetadata & result) const { + if (!isNamespaceAllowed(schema_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace 
{} is filtered by `namespaces` database parameter", schema_name); + auto full_table_name = warehouse + "." + schema_name + "." + table_name; Poco::Dynamic::Var json; std::string json_str; @@ -280,6 +284,9 @@ bool UnityCatalog::tryGetTableMetadata( bool UnityCatalog::existsTable(const std::string & schema_name, const std::string & table_name) const { + if (!isNamespaceAllowed(schema_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", schema_name); + String json_str; Poco::Dynamic::Var json; try @@ -389,7 +396,7 @@ DataLake::ICatalog::Namespaces UnityCatalog::getSchemas(const std::string & base chassert(schema_info->get("catalog_name").extract() == warehouse); UnityCatalogFullSchemaName schema_name = parseFullSchemaName(schema_info->get("full_name").extract()); - if (schema_name.schema_name.starts_with(base_prefix)) + if (isNamespaceAllowed(schema_name.schema_name) && schema_name.schema_name.starts_with(base_prefix)) schemas.push_back(schema_name.schema_name); if (limit && schemas.size() > limit) @@ -431,6 +438,7 @@ UnityCatalog::UnityCatalog( const std::string & catalog_, const std::string & base_url_, const std::string & catalog_credential_, + const std::string & namespaces_, DB::ContextPtr context_) : ICatalog(catalog_) , DB::WithContext(context_) @@ -438,6 +446,12 @@ UnityCatalog::UnityCatalog( , log(getLogger("UnityCatalog(" + catalog_ + ")")) , auth_header("Authorization", "Bearer " + catalog_credential_) { + boost::split(allowed_namespaces, namespaces_, boost::is_any_of(", "), boost::token_compress_on); +} + +bool UnityCatalog::isNamespaceAllowed(const std::string & namespace_) const +{ + return allowed_namespaces.contains("*") || allowed_namespaces.contains(namespace_); } } diff --git a/src/Databases/DataLake/UnityCatalog.h b/src/Databases/DataLake/UnityCatalog.h index 2e6262d6e5d7..cd42dad71610 100644 --- a/src/Databases/DataLake/UnityCatalog.h +++ 
b/src/Databases/DataLake/UnityCatalog.h @@ -21,6 +21,7 @@ class UnityCatalog final : public ICatalog, private DB::WithContext const std::string & catalog_, const std::string & base_url_, const std::string & catalog_credential_, + const std::string & namespaces_, DB::ContextPtr context_); ~UnityCatalog() override = default; @@ -59,6 +60,10 @@ class UnityCatalog final : public ICatalog, private DB::WithContext Poco::Net::HTTPBasicCredentials credentials{}; + std::unordered_set allowed_namespaces; + + bool isNamespaceAllowed(const std::string & namespace_) const; + DataLake::ICatalog::Namespaces getSchemas(const std::string & base_prefix, size_t limit = 0) const; DB::Names getTablesForSchema(const std::string & schema, size_t limit = 0) const; diff --git a/src/Databases/DataLake/tests/gtest_rest_catalog_allowed_namespaces.cpp b/src/Databases/DataLake/tests/gtest_rest_catalog_allowed_namespaces.cpp new file mode 100644 index 000000000000..7a1981511c3f --- /dev/null +++ b/src/Databases/DataLake/tests/gtest_rest_catalog_allowed_namespaces.cpp @@ -0,0 +1,69 @@ +#include +#include + + +TEST(TestRestCatalogAllowedNamespaces, TestAllAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("*"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestAllBlocked) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces(""); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestTableInNamespaceAllowed) +{ + 
DataLake::RestCatalog::AllowedNamespaces namespaces("foo"); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestSpecificNestedNamespaceAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("foo.bar"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("bar", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.biz", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestNestedNamespacesAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("foo.*"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, 
TestTablesAndNestedNamespacesAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("foo,foo.*"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); +} diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 0dc4e6b7653d..6f1668d5b926 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -69,7 +69,11 @@ template { public: - explicit DataLakeConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} + explicit DataLakeConfiguration( + DataLakeStorageSettingsPtr settings_, + std::optional catalog_namespaces_ = std::nullopt) + : settings(settings_) + , catalog_namespaces(catalog_namespaces_.value_or("*")) {} bool isDataLakeConfiguration() const override { return true; } @@ -317,6 +321,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl .aws_access_key_id = (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].value, .aws_secret_access_key = (*settings)[DataLakeStorageSetting::storage_aws_secret_access_key].value, .region = (*settings)[DataLakeStorageSetting::storage_region].value, + .namespaces = catalog_namespaces, }; return std::make_shared( @@ -338,6 +343,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl (*settings)[DataLakeStorageSetting::storage_auth_header], (*settings)[DataLakeStorageSetting::storage_oauth_server_uri].value, (*settings)[DataLakeStorageSetting::storage_oauth_server_use_request_body].value, + 
catalog_namespaces, context); } @@ -371,6 +377,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl LoggerPtr log = getLogger("DataLakeConfiguration"); const DataLakeStorageSettingsPtr settings; ObjectStoragePtr ready_object_storage; + std::string catalog_namespaces; void assertInitialized() const { diff --git a/tests/integration/test_database_delta/test.py b/tests/integration/test_database_delta/test.py index 0adf736c51b7..f2e6e66377ba 100644 --- a/tests/integration/test_database_delta/test.py +++ b/tests/integration/test_database_delta/test.py @@ -24,6 +24,9 @@ from helpers.test_tools import TSV +CATALOG_NAME = "unity_catalog_test_db" + + def start_unity_catalog(node): node.exec_in_container( [ @@ -661,3 +664,42 @@ def get_table_versions(): }, ).strip() ) + + +def test_namespace_filter(started_cluster): + node = started_cluster.instances["node1"] + + # Use the same table name in all namespaces + table_name = f"table_{uuid.uuid4()}".replace("-", "_") + namespace_prefix = f"namespace_{uuid.uuid4()}_".replace("-", "_") + + + def create_namespace(suffix): + namespace = f"{namespace_prefix}{suffix}" + execute_spark_query( + node, f"CREATE SCHEMA {namespace}" + ) + execute_spark_query( + node, f"CREATE TABLE {namespace}.{table_name} (col1 int, col2 double) using Delta location '/var/lib/clickhouse/user_files/tmp/{namespace}/{table_name}'" + ) + + create_namespace("alpha"); + create_namespace("bravo"); + + node.query( + f""" + drop database if exists {CATALOG_NAME}; + create database {CATALOG_NAME} + engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') + settings warehouse = 'unity', catalog_type='unity', vended_credentials=false, namespaces = '{namespace_prefix}alpha' + """, + settings={"allow_database_unity_catalog": "1"}, + ) + + assert node.query(f"SELECT name FROM system.tables WHERE database='{CATALOG_NAME}' ORDER BY name", settings={"show_data_lake_catalogs_in_system_tables": 1}) == TSV( + [ + 
[f"{namespace_prefix}alpha.{table_name}"], + ]) + + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py index e476bcf19efc..47c987ae9d97 100644 --- a/tests/integration/test_database_glue/test.py +++ b/tests/integration/test_database_glue/test.py @@ -16,6 +16,7 @@ from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from helpers.config_cluster import minio_access_key, minio_secret_key +from helpers.test_tools import TSV import decimal from pyiceberg.types import ( DoubleType, @@ -715,3 +716,41 @@ def test_table_without_metadata_location(started_cluster): assert "Iceberg" in create_table_result, f"Expected Iceberg engine in: {create_table_result}" node.query(f"DROP DATABASE IF EXISTS {db_name} SYNC") + + +def test_namespace_filter(started_cluster): + node = started_cluster.instances["node1"] + + # Use the same table name in all namespaces + table_name = f"table_{uuid.uuid4()}" + table2_name = f"table2_{uuid.uuid4()}" + namespace_prefix = f"namespace_{uuid.uuid4()}_" + + catalog = load_catalog_impl(started_cluster) + + def create_namespace(suffix): + namespace = f"{namespace_prefix}{suffix}" + catalog.create_namespace(namespace) + create_table(catalog, namespace, table_name, DEFAULT_SCHEMA, PartitionSpec(), DEFAULT_SORT_ORDER) + + create_namespace("alpha"); + create_namespace("bravo"); + + create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME, + additional_settings={ + "namespaces": f"{namespace_prefix}alpha" + }) + + assert node.query(f"SELECT name FROM system.tables WHERE database='{CATALOG_NAME}' ORDER BY name", settings={"show_data_lake_catalogs_in_system_tables": 
1}) == TSV( + [ + [f"{namespace_prefix}alpha.{table_name}"], + ]) + + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") + + node.query(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-glue/{namespace_prefix}alpha/a1/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}bravo.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-glue/{namespace_prefix}bravo/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + + node.query(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") + assert "is filtered by `namespaces` database parameter." 
in node.query_and_get_error(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 327a8f299bdb..1e0ec2a9d309 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -704,3 +704,69 @@ def test_gcs(started_cluster): """ ) assert "Google cloud storage converts to S3" in str(err.value) + + +def test_namespace_filter(started_cluster): + node = started_cluster.instances["node1"] + + # Use the same table name in all namespaces + table_name = f"table_{uuid.uuid4()}" + table2_name = f"table2_{uuid.uuid4()}" + namespace_prefix = f"namespace_{uuid.uuid4()}_" + + catalog = load_catalog_impl(started_cluster) + + def create_namespace(suffix): + namespace = f"{namespace_prefix}{suffix}" + catalog.create_namespace(namespace) + create_table(catalog, namespace, table_name, DEFAULT_SCHEMA, PartitionSpec(), DEFAULT_SORT_ORDER) + + create_namespace("alpha"); + create_namespace("alpha.a1"); + create_namespace("alpha.a2"); + create_namespace("bravo"); + create_namespace("bravo.b1"); + create_namespace("charlie"); + create_namespace("charlie.c1"); + create_namespace("delta"); + create_namespace("delta.d1"); + create_namespace("delta.d2"); + create_namespace("echo"); + create_namespace("echo.e1"); + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, + additional_settings={ + "namespaces": f"{namespace_prefix}alpha,{namespace_prefix}alpha.a1,{namespace_prefix}bravo,{namespace_prefix}bravo.*,{namespace_prefix}charlie,{namespace_prefix}delta.d1,{namespace_prefix}echo.*" + }) + + assert node.query(f"SELECT name FROM system.tables WHERE database='{CATALOG_NAME}' ORDER BY name", settings={"show_data_lake_catalogs_in_system_tables": 1}) == TSV( + [ + [f"{namespace_prefix}alpha.a1.{table_name}"], + [f"{namespace_prefix}alpha.{table_name}"], + 
[f"{namespace_prefix}bravo.b1.{table_name}"], + [f"{namespace_prefix}bravo.{table_name}"], + [f"{namespace_prefix}charlie.{table_name}"], + [f"{namespace_prefix}delta.d1.{table_name}"], + [f"{namespace_prefix}echo.e1.{table_name}"], + ]) + + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n" + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.a1.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table_name}`") + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") == "0\n" + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.b1.{table_name}`") == "0\n" + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}charlie.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}charlie.c1.{table_name}`") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}delta.{table_name}`") + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}delta.d1.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}delta.d2.{table_name}`") + assert "is filtered by `namespaces` database parameter." 
in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}echo.{table_name}`") + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}echo.e1.{table_name}`") == "0\n" + + node.query(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{namespace_prefix}alpha/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + node.query(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a1.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{namespace_prefix}alpha/a1/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{namespace_prefix}alpha/a2/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + + node.query(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") + node.query(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a1.{table_name}`") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table_name}`")