diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp index 58f4d37baac3..880a27d61d9b 100644 --- a/src/Databases/Iceberg/DatabaseIceberg.cpp +++ b/src/Databases/Iceberg/DatabaseIceberg.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -37,6 +38,7 @@ namespace DatabaseIcebergSetting extern const DatabaseIcebergSettingsString storage_endpoint; extern const DatabaseIcebergSettingsString oauth_server_uri; extern const DatabaseIcebergSettingsBool vended_credentials; + extern const DatabaseIcebergSettingsString object_storage_cluster; } namespace Setting { @@ -235,7 +237,10 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_ /// no table structure in table definition AST. StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings)); - return std::make_shared( + auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value; + + return std::make_shared( + cluster_name, configuration, configuration->createObjectStorage(context_, /* is_readonly */ false), context_, @@ -243,11 +248,9 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_ /* columns */columns, /* constraints */ConstraintsDescription{}, /* comment */"", - getFormatSettings(context_), - LoadingStrictnessLevel::CREATE, - /* distributed_processing */false, - /* partition_by */nullptr, - /* lazy_init */true); + /* format_settings */ getFormatSettings(context_), + /* mode */ LoadingStrictnessLevel::CREATE, + /* partition_by */nullptr); } DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator( diff --git a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp index 37b4909106ba..4847309a6283 100644 --- a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp +++ b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp @@ -23,6 +23,7 @@ namespace ErrorCodes DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \ DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: '", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h index 9277093c6085..20bb7ab8b354 100644 --- a/src/Parsers/FunctionSecretArgumentsFinder.h +++ b/src/Parsers/FunctionSecretArgumentsFinder.h @@ -102,7 +102,7 @@ class FunctionSecretArgumentsFinder } else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") || (function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") || - (function->name() == "gcs")) + (function->name() == "icebergS3") || (function->name() == "gcs")) { /// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...) findS3FunctionSecretArguments(/* is_cluster_function= */ false); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 490546d19195..ec67b9733bee 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -216,12 +216,12 @@ class IStorage : public std::enable_shared_from_this, public TypePromo /// Update storage metadata. Used in ALTER or initialization of Storage. 
/// Metadata object is multiversion, so this method can be called without /// any locks. - void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) + virtual void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata.set(std::make_unique(metadata_)); } - void setVirtuals(VirtualColumnsDescription virtuals_) + virtual void setVirtuals(VirtualColumnsDescription virtuals_) { virtuals.set(std::make_unique(std::move(virtuals_))); } diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 28b5a84166a2..6b5e9f0e49ba 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -36,6 +36,11 @@ namespace Setting extern const SettingsBool skip_unavailable_shards; } +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + IStorageCluster::IStorageCluster( const String & cluster_name_, const StorageID & table_id_, @@ -73,13 +78,21 @@ void IStorageCluster::read( SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, - size_t /*max_block_size*/, - size_t /*num_streams*/) + size_t max_block_size, + size_t num_streams) { + auto cluster_name_from_settings = getClusterName(context); + + if (cluster_name_from_settings.empty()) + { + readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); + return; + } + storage_snapshot->check(column_names); updateBeforeRead(context); - auto cluster = getCluster(context); + auto cluster = getClusterImpl(context, cluster_name_from_settings); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) @@ -126,6 +139,20 @@ void IStorageCluster::read( query_plan.addStep(std::move(reading)); } +SinkToStoragePtr IStorageCluster::write( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) +{ + auto cluster_name_from_settings = getClusterName(context); + + if (cluster_name_from_settings.empty()) + return writeFallBackToPure(query, metadata_snapshot, context, async_insert); + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName()); +} + void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { createExtension(nullptr); @@ -196,9 +223,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings) return new_context; } -ClusterPtr IStorageCluster::getCluster(ContextPtr context) const +ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_) { - return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); + return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef()); } } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 4d7a047e0c3e..2992c3bc2497 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -29,10 +29,16 @@ class IStorageCluster : public IStorage SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, - size_t /*max_block_size*/, - size_t /*num_streams*/) override; + size_t max_block_size, + size_t num_streams) override; - ClusterPtr getCluster(ContextPtr context) const; + SinkToStoragePtr write( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) 
override; + + ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); } /// Query is needed for pruning by virtual columns (_file, _path) virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; @@ -43,11 +49,38 @@ class IStorageCluster : public IStorage bool supportsOptimizationToSubcolumns() const override { return false; } bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; } + const String & getOriginalClusterName() const { return cluster_name; } + virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); } + protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + virtual void readFallBackToPure( + QueryPlan & /* query_plan */, + const Names & /* column_names */, + const StorageSnapshotPtr & /* storage_snapshot */, + SelectQueryInfo & /* query_info */, + ContextPtr /* context */, + QueryProcessingStage::Enum /* processed_stage */, + size_t /* max_block_size */, + size_t /* num_streams */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName()); + } + + virtual SinkToStoragePtr writeFallBackToPure( + const ASTPtr & /*query*/, + const StorageMetadataPtr & /*metadata_snapshot*/, + ContextPtr /*context*/, + bool /*async_insert*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName()); + } + private: + static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_); + LoggerPtr log; String cluster_name; }; diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index faa1554dbe8d..0e273158bbf5 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -132,8 +132,6 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll String connection_url; String container_name; - std::optional account_name; - std::optional account_key; if (collection.has("connection_string")) connection_url = collection.get("connection_string"); @@ -173,14 +171,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, std::unordered_map engine_args_to_idx; - String connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url"); String container_name = checkAndGetLiteralArgument(engine_args[1], "container"); blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath"); - std::optional account_name; - std::optional account_key; - auto is_format_arg = [] (const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(Poco::toLower(s)); @@ -444,6 +438,22 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded( } } +ASTPtr StorageAzureConfiguration::createArgsWithAccessData() const +{ + auto arguments = std::make_shared(); + + arguments->children.push_back(std::make_shared(connection_params.endpoint.storage_account_url)); + arguments->children.push_back(std::make_shared(connection_params.endpoint.container_name)); + arguments->children.push_back(std::make_shared(blob_path)); + if (account_name && account_key) + { + 
arguments->children.push_back(std::make_shared(*account_name)); + arguments->children.push_back(std::make_shared(*account_key)); + } + + return arguments; +} + } #endif diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h index 72124465c462..c915696f2448 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.h +++ b/src/Storages/ObjectStorage/Azure/Configuration.h @@ -79,6 +79,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + ASTPtr createArgsWithAccessData() const override; + protected: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; @@ -86,6 +88,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration std::string blob_path; std::vector blobs_paths; AzureBlobStorage::ConnectionParams connection_params; + std::optional account_name; + std::optional account_key; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index ede70567da4c..5d3adcdfa72a 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -62,6 +62,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns()); } } + + updated = true; } std::optional tryGetTableStructureFromMetadata() const override @@ -114,6 +116,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl private: DataLakeMetadataPtr current_metadata; + bool updated = false; + ReadFromFormatInfo prepareReadingFromFormat( ObjectStoragePtr object_storage, const Strings & requested_columns, diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index 3c4897b5062e..2a81a7ff78c9 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -235,6 +235,13 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded( } } +ASTPtr StorageHDFSConfiguration::createArgsWithAccessData() const +{ + auto arguments = std::make_shared(); + arguments->children.push_back(std::make_shared(url + path)); + return arguments; +} + } #endif diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h index db8ab7f9e4db..f38382e173ed 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.h +++ b/src/Storages/ObjectStorage/HDFS/Configuration.h @@ -65,6 +65,8 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + ASTPtr createArgsWithAccessData() const override; + private: void fromNamedCollection(const NamedCollection &, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override; diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 629628c762fa..d29e4bc130ac 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -363,11 +363,11 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_ if (engine_args_to_idx.contains("format")) { - format = 
checkAndGetLiteralArgument(args[engine_args_to_idx["format"]], "format"); + auto format_ = checkAndGetLiteralArgument(args[engine_args_to_idx["format"]], "format"); /// Set format to configuration only of it's not 'auto', /// because we can have default format set in configuration. - if (format != "auto") - format = format; + if (format_ != "auto") + format = format_; } if (engine_args_to_idx.contains("structure")) @@ -585,6 +585,30 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded( } } +ASTPtr StorageS3Configuration::createArgsWithAccessData() const +{ + auto arguments = std::make_shared(); + + arguments->children.push_back(std::make_shared(url.uri_str)); + if (auth_settings[S3AuthSetting::no_sign_request]) + { + arguments->children.push_back(std::make_shared("NOSIGN")); + } + else + { + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::access_key_id].value)); + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::secret_access_key].value)); + if (!auth_settings[S3AuthSetting::session_token].value.empty()) + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::session_token].value)); + if (format != "auto") + arguments->children.push_back(std::make_shared(format)); + if (!compression_method.empty()) + arguments->children.push_back(std::make_shared(compression_method)); + } + + return arguments; +} + } #endif diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index c4614a281899..746431987601 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -94,6 +94,8 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + ASTPtr createArgsWithAccessData() const override; + private: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 1913f658ccf3..1ca4383a82fc 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -60,6 +60,9 @@ String StorageObjectStorage::getPathSample(ContextPtr context) if (context->getSettingsRef()[Setting::use_hive_partitioning]) local_distributed_processing = false; + if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing) + return configuration->getPath(); + auto file_iterator = StorageObjectStorageSource::createFileIterator( configuration, query_settings, @@ -72,9 +75,6 @@ String StorageObjectStorage::getPathSample(ContextPtr context) {} // file_progress_callback ); - if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing) - return configuration->getPath(); - if (auto file = file_iterator->next(0)) return file->getPath(); return ""; @@ -165,6 +165,13 @@ void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage { IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()}; object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options); + updated = true; +} + +void StorageObjectStorage::Configuration::updateIfRequired(ObjectStoragePtr object_storage_ptr, ContextPtr local_context) +{ + if (!updated) + 
update(object_storage_ptr, local_context); } bool StorageObjectStorage::hasExternalDynamicMetadata() const diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 1349e24320fd..0212b80c2014 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -246,7 +246,13 @@ class StorageObjectStorage::Configuration String structure = "auto"; virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); + void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); + /// Create arguments for table function with path and access parameters + virtual ASTPtr createArgsWithAccessData() const + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method createArgsWithAccessData is not supported by storage {}", getEngineName()); + } protected: virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0; @@ -256,6 +262,7 @@ class StorageObjectStorage::Configuration void assertInitialized() const; bool initialized = false; + bool updated = false; DataLakePartitionColumns partition_columns; bool allow_dynamic_metadata_for_data_lakes; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 54478dab1de8..b3e4af67d6a8 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -4,32 +4,45 @@ #include #include #include +#include +#include +#include +#include #include #include +#include +#include #include #include #include #include - namespace DB { namespace Setting { extern const SettingsBool use_hive_partitioning; + extern const SettingsString object_storage_cluster; } namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int UNKNOWN_FUNCTION; + extern const int NOT_IMPLEMENTED; } + String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context) { auto query_settings = configuration->getQuerySettings(context); /// We don't want to throw an exception if there are no files with specified path. 
query_settings.throw_on_zero_files_match = false; + + if (!configuration->isArchive() && !configuration->isPathWithGlobs()) + return configuration->getPath(); + auto file_iterator = StorageObjectStorageSource::createFileIterator( configuration, query_settings, @@ -44,6 +57,7 @@ String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metada if (auto file = file_iterator->next(0)) return file->getPath(); + return ""; } @@ -51,10 +65,15 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( const String & cluster_name_, ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, + ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_) + const String & comment_, + std::optional format_settings_, + LoadingStrictnessLevel mode_, + ASTPtr partition_by_ +) : IStorageCluster( cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name))) , configuration{configuration_} @@ -75,6 +94,19 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); setInMemoryMetadata(metadata); + + pure_storage = std::make_shared( + configuration, + object_storage, + context_, + getStorageID(), + getInMemoryMetadata().getColumns(), + getInMemoryMetadata().getConstraints(), + comment_, + format_settings_, + mode_, + /* distributed_processing */false, + partition_by_); } std::string StorageObjectStorageCluster::getName() const @@ -82,12 +114,112 @@ std::string StorageObjectStorageCluster::getName() const return configuration->getEngineName(); } +void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context) +{ + // Change table engine on table function for distributed request + // CREATE TABLE t (...) ENGINE=IcebergS3(...) + // SELECT * FROM t + // change on + // SELECT * FROM icebergS3(...) 
+ // to execute on cluster nodes + + auto * select_query = query->as(); + if (!select_query || !select_query->tables()) + return; + + auto * tables = select_query->tables()->as(); + + if (tables->children.empty()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected SELECT query from table with engine {}, got '{}'", + configuration->getEngineName(), queryToString(query)); + + auto * table_expression = tables->children[0]->as()->table_expression->as(); + + if (!table_expression) + return; + + if (!table_expression->database_and_table_name) + return; + + auto & table_identifier_typed = table_expression->database_and_table_name->as(); + + auto table_alias = table_identifier_typed.tryGetAlias(); + + static std::unordered_map engine_to_function = { + {"S3", "s3"}, + {"Azure", "azureBlobStorage"}, + {"HDFS", "hdfs"}, + {"Iceberg", "icebergS3"}, + {"IcebergS3", "icebergS3"}, + {"IcebergAzure", "icebergAzure"}, + {"IcebergHDFS", "icebergHDFS"}, + {"DeltaLake", "deltaLake"}, + {"Hudi", "hudi"} + }; + + auto p = engine_to_function.find(configuration->getEngineName()); + if (p == engine_to_function.end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't find table function for engine {}", + configuration->getEngineName() + ); + } + + std::string table_function_name = p->second; + + auto function_ast = std::make_shared(); + function_ast->name = table_function_name; + + auto cluster_name = getClusterName(context); + + if (cluster_name.empty()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't be here without cluster name, no cluster name in query {}", + queryToString(query)); + } + + function_ast->arguments = configuration->createArgsWithAccessData(); + function_ast->children.push_back(function_ast->arguments); + function_ast->setAlias(table_alias); + + ASTPtr function_ast_ptr(function_ast); + + table_expression->database_and_table_name = nullptr; + table_expression->table_function = function_ast_ptr; + table_expression->children[0] = function_ast_ptr; + + auto settings = select_query->settings(); + if (settings) + { + auto & settings_ast = settings->as(); + settings_ast.changes.insertSetting("object_storage_cluster", cluster_name); + } + else + { + auto settings_ast_ptr = std::make_shared(); + settings_ast_ptr->is_standalone = false; + settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name); + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr)); + } + + cluster_name_in_settings = true; +} + void StorageObjectStorageCluster::updateQueryToSendIfNeeded( ASTPtr & query, const DB::StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { + updateQueryForDistributedEngineIfNeeded(query, context); + ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); + if (!expression_list) { throw Exception( @@ -136,4 +268,67 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } +void StorageObjectStorageCluster::readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) +{ + pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); +} + +SinkToStoragePtr 
StorageObjectStorageCluster::writeFallBackToPure( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) +{ + return pure_storage->write(query, metadata_snapshot, context, async_insert); +} + +String StorageObjectStorageCluster::getClusterName(ContextPtr context) const +{ + /// StorageObjectStorageCluster is created for both cluster and non-cluster variants. + /// The user can specify the cluster name in the table definition or in the `object_storage_cluster` setting + /// of individual queries. When it is specified in both places, the query setting takes priority. + /// When it is empty, the non-cluster implementation is used. + auto cluster_name_from_settings = context->getSettingsRef()[Setting::object_storage_cluster].value; + if (cluster_name_from_settings.empty()) + cluster_name_from_settings = getOriginalClusterName(); + return cluster_name_from_settings; +} + +QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage( + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const +{ + /// When falling back to the pure storage, the whole query is processed locally (the storage only fetches columns). + if (getClusterName(context).empty()) + return QueryProcessingStage::Enum::FetchColumns; + + /// Distributed storage. + return IStorageCluster::getQueryProcessingStage(context, to_stage, storage_snapshot, query_info); +} + +void StorageObjectStorageCluster::truncate( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr local_context, + TableExclusiveLockHolder & lock_holder) +{ + /// Delegate to the pure (non-cluster) storage when no cluster name is set. + if (getClusterName(local_context).empty()) + return pure_storage->truncate(query, metadata_snapshot, local_context, lock_holder); + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName()); +} + +void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const +{ + configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 32a942d4a857..bd01fb4d86ad 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -17,10 +17,15 @@ class StorageObjectStorageCluster : public IStorageCluster const String & cluster_name_, ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, + ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_); + const String & comment_, + std::optional<FormatSettings> format_settings_, + LoadingStrictnessLevel mode_, + ASTPtr partition_by_ = nullptr + ); std::string getName() const override; @@ -31,16 +36,73 @@ class StorageObjectStorageCluster : public IStorageCluster void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; } + String getClusterName(ContextPtr context) const override; + + void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) override + { + pure_storage->setInMemoryMetadata(metadata_); + IStorageCluster::setInMemoryMetadata(metadata_); + } + + void setVirtuals(VirtualColumnsDescription virtuals_) override + { + pure_storage->setVirtuals(virtuals_); + IStorageCluster::setVirtuals(virtuals_); + } + +
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; + + void truncate( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr local_context, + TableExclusiveLockHolder &) override; + + void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; + private: void updateQueryToSendIfNeeded( ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; + void readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) override; + + SinkToStoragePtr writeFallBackToPure( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) override; + + /* + In case the table was created with the `object_storage_cluster` setting, + modify the query AST so that it uses the table function implementation instead, + by mapping the engine name to the table function name and setting `object_storage_cluster`. + For a table like + CREATE TABLE table ENGINE=S3(...) SETTINGS object_storage_cluster='cluster' + this converts the query + SELECT * FROM table + into + SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster' + to execute it as a distributed request over the cluster 'cluster'. + */ + void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context); + const String engine_name; const StorageObjectStorage::ConfigurationPtr configuration; const ObjectStoragePtr object_storage; bool cluster_name_in_settings; + + /// Non-clustered storage used as a fallback when no cluster is specified + std::shared_ptr<StorageObjectStorage> pure_storage; }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp index 383f54342036..174162fd6465 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp @@ -17,7 +17,13 @@ namespace DB allow_dynamic_metadata_for_data_lakes, \ false, \ "If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query.", \ - 0) + 0) \ + DECLARE( \ + String, \ + object_storage_cluster, \ + "", \ + "Cluster for distributed requests", \ + 0) \ #define LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index b01566d00ca3..b5f0c386dba0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -132,6 +132,8 @@ std::shared_ptr StorageObjectStorageSourc const bool is_archive = configuration->isArchive(); + configuration->updateIfRequired(object_storage, local_context); + std::unique_ptr iterator; if (configuration->isPathWithGlobs()) { diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index c56038f441a5..4c7c8abebe11 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include 
"Common/logger_useful.h" @@ -19,13 +20,18 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace StorageObjectStorageSetting +{ + extern const StorageObjectStorageSettingsString object_storage_cluster; +} + namespace { // LocalObjectStorage is only supported for Iceberg Datalake operations where Avro format is required. For regular file access, use FileStorage instead. #if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS || USE_AVRO -std::shared_ptr +StoragePtr createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObjectStorage::ConfigurationPtr configuration, ContextPtr context) { auto & engine_args = args.engine_args; @@ -37,6 +43,8 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject queue_settings->loadFromQuery(*args.storage_def); + auto cluster_name = (*queue_settings)[StorageObjectStorageSetting::object_storage_cluster].value; + StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false, std::move(queue_settings)); // Use format settings from global server context + settings from @@ -61,7 +69,8 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject if (args.storage_def->partition_by) partition_by = args.storage_def->partition_by->clone(); - return std::make_shared( + return std::make_shared( + cluster_name, configuration, configuration->createObjectStorage(context, /* is_readonly */ false), args.getContext(), @@ -71,7 +80,6 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject args.comment, format_settings, args.mode, - /* distributed_processing */ false, partition_by); } diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index 54d5c101826f..b563039fa325 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -46,7 +46,7 @@ StoragePtr TableFunctionObjectStorageCluster::execute /* format_settings */ std::nullopt, /// No format_settings /* mode */ LoadingStrictnessLevel::CREATE, /* distributed_processing */ true, - /*partition_by_=*/nullptr); + /* partition_by */ nullptr); } else { @@ -54,10 +54,14 @@ StoragePtr TableFunctionObjectStorageCluster::execute ITableFunctionCluster::cluster_name, configuration, object_storage, + context, StorageID(Base::getDatabaseName(), table_name), columns, ConstraintsDescription{}, - context); + /* comment */ String{}, + /* format_settings */ std::nullopt, /// No format_settings + /* mode */ LoadingStrictnessLevel::CREATE, + /* partition_by */ nullptr); } storage->startup(); diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index c20338f96622..63e546429151 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -721,6 +721,73 @@ def test_remote_no_hedged(started_cluster): assert TSV(pure_s3) == TSV(s3_distributed) +def test_distributed_s3_table_engine(started_cluster): + node = started_cluster.instances["s0_0_0"] + + resp_def = node.query( + """ + SELECT * from s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + """ + ) + + node.query("DROP TABLE IF EXISTS single_node"); + node.query( + """ + CREATE TABLE single_node + (name String, value UInt32, polygon 
Array(Array(Tuple(Float64, Float64)))) + ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') + """ + ) + query_id_engine_single_node = str(uuid.uuid4()) + resp_engine_single_node = node.query( + """ + SELECT * FROM single_node ORDER BY (name, value, polygon) + """, + query_id = query_id_engine_single_node + ) + assert resp_def == resp_engine_single_node + + node.query("DROP TABLE IF EXISTS distributed"); + node.query( + """ + CREATE TABLE distributed + (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) + ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') + SETTINGS object_storage_cluster='cluster_simple' + """ + ) + query_id_engine_distributed = str(uuid.uuid4()) + resp_engine_distributed = node.query( + """ + SELECT * FROM distributed ORDER BY (name, value, polygon) + """, + query_id = query_id_engine_distributed + ) + assert resp_def == resp_engine_distributed + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + + hosts_engine_single_node = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}' + """ + ) + assert int(hosts_engine_single_node) == 1 + hosts_engine_distributed = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}' + """ + ) + assert int(hosts_engine_distributed) == 3 + + def test_hive_partitioning(started_cluster): node = started_cluster.instances["s0_0_0"] for i in range(1, 5): diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py index 54e1ced79577..2036de1becd6 100644 --- a/tests/integration/test_storage_azure_blob_storage/test_cluster.py +++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py @@ -104,8 +104,57 @@ def test_select_all(cluster): query_id=query_id_distributed_alt_syntax, ) print(distributed_azure_alt_syntax) + azure_query( + node, + f""" + DROP TABLE IF EXISTS azure_engine_table_single_node; + CREATE TABLE azure_engine_table_single_node + (key UInt64, data String) + ENGINE=AzureBlobStorage( + '{storage_account_url}', + 'cont', + 'test_cluster_select_all.csv', + 'devstoreaccount1', + 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', + 'CSV', + 'auto' + ) + """, + ) + query_id_engine_single_node = str(uuid.uuid4()) + azure_engine_single_node = azure_query( + node, + "SELECT * FROM azure_engine_table_single_node", + query_id=query_id_engine_single_node, + ) + azure_query( + node, + f""" + DROP TABLE IF EXISTS azure_engine_table_distributed; + CREATE TABLE azure_engine_table_distributed + (key UInt64, data String) + ENGINE=AzureBlobStorage( + '{storage_account_url}', + 'cont', + 'test_cluster_select_all.csv', + 'devstoreaccount1', + 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', + 'CSV', + 'auto' + ) + SETTINGS object_storage_cluster='simple_cluster' + """, + ) + query_id_engine_distributed = str(uuid.uuid4()) + azure_engine_distributed = azure_query( + node, + "SELECT * FROM azure_engine_table_distributed", + query_id=query_id_engine_distributed, + ) assert TSV(pure_azure) == TSV(distributed_azure) assert TSV(pure_azure) == TSV(distributed_azure_alt_syntax) + assert TSV(pure_azure) == 
TSV(azure_engine_single_node) + assert TSV(pure_azure) == TSV(azure_engine_distributed) for _, node_ in cluster.instances.items(): node_.query("SYSTEM FLUSH LOGS") nodes_pure = node.query( @@ -135,6 +184,24 @@ def test_select_all(cluster): """, ) assert int(nodes_distributed_alt_syntax) == 3 + nodes_engine_single_node = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('simple_cluster', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_engine_single_node}' + """, + ) + assert int(nodes_engine_single_node) == 1 + nodes_engine_distributed = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('simple_cluster', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_engine_distributed}' + """, + ) + assert int(nodes_engine_distributed) == 3 def test_count(cluster): diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index edab338aa351..d6c9deb35cc8 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -208,13 +208,17 @@ def get_creation_expression( table_function=False, allow_dynamic_metadata_for_data_lakes=False, run_on_cluster=False, + object_storage_cluster=False, **kwargs, ): - allow_dynamic_metadata_for_datalakes_suffix = ( - " SETTINGS allow_dynamic_metadata_for_data_lakes = 1" - if allow_dynamic_metadata_for_data_lakes - else "" - ) + settings_suffix = "" + if allow_dynamic_metadata_for_data_lakes or object_storage_cluster: + settings = [] + if allow_dynamic_metadata_for_data_lakes: + settings.append("allow_dynamic_metadata_for_data_lakes = 1") + if object_storage_cluster: + settings.append(f"object_storage_cluster = '{object_storage_cluster}'") + settings_suffix = " SETTINGS " + ", ".join(settings) if storage_type == "s3": if "bucket" in kwargs: @@ -234,7 +238,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')""" - + allow_dynamic_metadata_for_datalakes_suffix + + settings_suffix ) elif storage_type == "azure": @@ -254,7 +258,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})""" - + allow_dynamic_metadata_for_datalakes_suffix + + settings_suffix ) elif storage_type == "hdfs": @@ -290,7 +294,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})""" - + allow_dynamic_metadata_for_datalakes_suffix + + settings_suffix ) else: @@ -326,10 +330,18 @@ def create_iceberg_table( table_name, cluster, format="Parquet", + object_storage_cluster=False, **kwargs, ): node.query( - get_creation_expression(storage_type, table_name, cluster, format, **kwargs) + get_creation_expression( + storage_type, + table_name, + cluster, + format, + object_storage_cluster=object_storage_cluster, + **kwargs, + ) ) @@ -575,6 +587,28 @@ def test_types(started_cluster, format_version, storage_type): ) +def count_secondary_subqueries(started_cluster, query_id, expected, comment): + for node_name, replica in started_cluster.instances.items(): + 
cluster_secondary_queries = ( + replica.query( + f""" + SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log + WHERE + type = 'QueryFinish' + AND NOT is_initial_query + AND initial_query_id='{query_id}' + """ + ) + .strip() + .split("\n") + ) + + logging.info( + f"[{node_name}] cluster_secondary_queries {comment}: {cluster_secondary_queries}" + ) + assert len(cluster_secondary_queries) == expected + + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"]) def test_cluster_table_function(started_cluster, format_version, storage_type): @@ -662,66 +696,77 @@ def add_df(mode): .split() ) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, object_storage_cluster='cluster_simple') + query_id_cluster_table_engine = str(uuid.uuid4()) + select_cluster_table_engine = ( + instance.query( + f""" + SELECT * FROM {TABLE_NAME} + """, + query_id=query_id_cluster_table_engine, + ) + .strip() + .split() + ) + + select_remote_cluster = ( + instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") + .strip() + .split() + ) + + instance.query(f"DROP TABLE IF EXISTS `{TABLE_NAME}` SYNC") + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) + query_id_pure_table_engine = str(uuid.uuid4()) + select_pure_table_engine = ( + instance.query( + f""" + SELECT * FROM {TABLE_NAME} + """, + query_id=query_id_pure_table_engine, + ) + .strip() + .split() + ) + query_id_pure_table_engine_cluster = str(uuid.uuid4()) + select_pure_table_engine_cluster = ( + instance.query( + f""" + SELECT * FROM {TABLE_NAME} + SETTINGS object_storage_cluster='cluster_simple' + """, + query_id=query_id_pure_table_engine_cluster, + ) + .strip() + .split() + ) + # Simple size check assert len(select_regular) == 600 assert len(select_cluster) == 600 assert len(select_cluster_alt_syntax) == 600 + assert len(select_cluster_table_engine) == 600 + assert len(select_remote_cluster) == 600 + assert len(select_pure_table_engine) == 600 + assert len(select_pure_table_engine_cluster) == 600 # Actual check assert select_cluster == select_regular assert select_cluster_alt_syntax == select_regular + assert select_cluster_table_engine == select_regular + assert select_remote_cluster == select_regular + assert select_pure_table_engine == select_regular + assert select_pure_table_engine_cluster == select_regular # Check query_log for replica in started_cluster.instances.values(): replica.query("SYSTEM FLUSH LOGS") - for node_name, replica in started_cluster.instances.items(): - cluster_secondary_queries = ( - replica.query( - f""" - SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log - WHERE - type = 'QueryStart' - AND NOT is_initial_query - AND initial_query_id='{query_id_cluster}' - """ - ) - .strip() - .split("\n") - ) - - logging.info( - f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" - ) - assert len(cluster_secondary_queries) == 1 - - for node_name, replica in started_cluster.instances.items(): - cluster_secondary_queries = ( - replica.query( - f""" - SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log - WHERE - type = 'QueryStart' - AND NOT is_initial_query - AND initial_query_id='{query_id_cluster_alt_syntax}' - """ - ) - .strip() - .split("\n") - ) - - logging.info( - f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" - ) - assert len(cluster_secondary_queries) == 1 - - 
select_remote_cluster = ( - instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") - .strip() - .split() - ) - assert len(select_remote_cluster) == 600 - assert select_remote_cluster == select_regular + count_secondary_subqueries(started_cluster, query_id_cluster, 1, "table function") + count_secondary_subqueries(started_cluster, query_id_cluster_alt_syntax, 1, "table function alt syntax") + count_secondary_subqueries(started_cluster, query_id_cluster_table_engine, 1, "cluster table engine") + count_secondary_subqueries(started_cluster, query_id_pure_table_engine, 0, "table engine") + count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster, 1, "table engine with cluster setting") @pytest.mark.parametrize("format_version", ["1", "2"])