Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed request to tables with Object Storage Engines #615

Open
wants to merge 20 commits into
base: antalya
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"least_greatest_legacy_null_behavior", true, false, "New setting"},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"input_format_parquet_use_metadata_cache", false, false, "New setting"},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, one more thing. I believe you added this because of a CI/CD failure. @Enmk has merged a PR that fixes this. Perhaps you want to update your branch?

}
},
{"24.11",
Expand Down
47 changes: 34 additions & 13 deletions src/Databases/Iceberg/DatabaseIceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>

#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
Expand All @@ -37,10 +38,12 @@ namespace DatabaseIcebergSetting
extern const DatabaseIcebergSettingsString storage_endpoint;
extern const DatabaseIcebergSettingsString oauth_server_uri;
extern const DatabaseIcebergSettingsBool vended_credentials;
extern const DatabaseIcebergSettingsString object_storage_cluster;
}
/// Settings that are looked up through the context's settings object
/// (e.g. `context_->getSettingsRef()[Setting::object_storage_cluster]`).
/// They are only declared `extern` here; the definitions are expected to live in another translation unit.
namespace Setting
{
extern const SettingsBool allow_experimental_database_iceberg;
extern const SettingsString object_storage_cluster;
}

namespace ErrorCodes
Expand Down Expand Up @@ -235,19 +238,37 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
/// no table structure in table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings));

return std::make_shared<StorageObjectStorage>(
configuration,
configuration->createObjectStorage(context_, /* is_readonly */ false),
context_,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* comment */"",
getFormatSettings(context_),
LoadingStrictnessLevel::CREATE,
/* distributed_processing */false,
/* partition_by */nullptr,
/* lazy_init */true);
auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;
if (cluster_name.empty())
cluster_name = context_->getSettingsRef()[Setting::object_storage_cluster].value;

if (cluster_name.empty())
{
return std::make_shared<StorageObjectStorage>(
configuration,
configuration->createObjectStorage(context_, /* is_readonly */ false),
context_,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* comment */"",
getFormatSettings(context_),
LoadingStrictnessLevel::CREATE,
/* distributed_processing */false,
/* partition_by */nullptr,
/* lazy_init */true);
}
else
{
return std::make_shared<StorageObjectStorageCluster>(
cluster_name,
configuration,
configuration->createObjectStorage(context_, /* is_readonly */ false),
StorageID(getDatabaseName(), name),
columns,
ConstraintsDescription{},
context_);
}
}

DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
Expand Down
1 change: 1 addition & 0 deletions src/Databases/Iceberg/DatabaseIcebergSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace ErrorCodes
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
Expand Down
2 changes: 1 addition & 1 deletion src/Parsers/FunctionSecretArgumentsFinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class FunctionSecretArgumentsFinder
}
else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") ||
(function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") ||
(function->name() == "gcs"))
(function->name() == "icebergS3") || (function->name() == "gcs"))
{
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ false);
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class IStorageCluster : public IStorage
/// Cluster storages opt out of the optimization-to-subcolumns feature: always returns false.
bool supportsOptimizationToSubcolumns() const override { return false; }
/// Always reports support for the trivial count optimization; both arguments are ignored.
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }

/// Returns a reference to the cluster name stored in the `cluster_name` member.
/// The reference is to a member, so it must not be used after this storage object is destroyed.
const String & getClusterName() const { return cluster_name; }

protected:
/// Extension hook with an empty default implementation; derived storages may override it.
/// NOTE(review): presumably invoked before a read starts (per its name) — the call site is not visible here.
virtual void updateBeforeRead(const ContextPtr &) {}
/// Extension hook with an empty default implementation; derived storages may override it to modify `query` in place.
/// NOTE(review): presumably called before the query is sent to other nodes (per its name) — the call site is not visible here.
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
Expand Down
33 changes: 32 additions & 1 deletion src/Storages/ObjectStorage/Azure/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));

blobs_paths = {blob_path};
if (account_name && account_key)
{
if (saved_params.empty())
{
saved_params.push_back(*account_name);
saved_params.push_back(*account_key);
}
}
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
}

Expand All @@ -173,7 +181,6 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,

std::unordered_map<std::string_view, size_t> engine_args_to_idx;


String connection_url = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string/storage_account_url");
String container_name = checkAndGetLiteralArgument<String>(engine_args[1], "container");
blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");
Expand Down Expand Up @@ -279,6 +286,14 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}

blobs_paths = {blob_path};
if (account_name && account_key)
{
if (saved_params.empty())
{
saved_params.push_back(*account_name);
saved_params.push_back(*account_key);
}
}
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
}

Expand Down Expand Up @@ -444,6 +459,22 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}

/// Appends to `args` the positional arguments that describe this Azure configuration, in this order:
///   1. the storage account URL (string literal),
///   2. the container name,
///   3. the blob path (string literal),
///   4. every entry of `saved_params` (string literals) — `fromAST` / `fromNamedCollection` store the
///      account name and account key there when both are present.
/// Throws LOGICAL_ERROR if `args` is not empty on entry.
void StorageAzureConfiguration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
{ /// Just check
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just remove this comment

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, if you implement this function in the way I suggested in #615 (comment), perhaps this whole if statement is no longer necessary?

throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
}

args.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.storage_account_url));
/// NOTE(review): the container name is the only argument emitted as an ASTIdentifier rather than an
/// ASTLiteral, while `fromAST` reads the container with checkAndGetLiteralArgument — confirm this is intentional.
args.push_back(std::make_shared<ASTIdentifier>(connection_params.endpoint.container_name));
args.push_back(std::make_shared<ASTLiteral>(blob_path));
/// Credentials captured while parsing; the vector is empty when no account name/key pair was given.
for (const auto & arg : saved_params)
{
args.push_back(std::make_shared<ASTLiteral>(arg));
}
}

}

#endif
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/Azure/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,16 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

void getTableFunctionArguments(ASTs & args) const override;

protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;

std::string blob_path;
std::vector<String> blobs_paths;
AzureBlobStorage::ConnectionParams connection_params;
std::vector<std::string> saved_params;
};

}
Expand Down
10 changes: 10 additions & 0 deletions src/Storages/ObjectStorage/HDFS/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}

/// Appends to `args` the single positional argument that describes this HDFS configuration:
/// a string literal built by concatenating `url` and `path`.
/// The caller must pass an empty list; a non-empty `args` is reported as a LOGICAL_ERROR.
void StorageHDFSConfiguration::getTableFunctionArguments(ASTs & args) const
{
    const bool has_unexpected_arguments = !args.empty();
    if (has_unexpected_arguments)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");

    auto location_literal = std::make_shared<ASTLiteral>(url + path);
    args.push_back(location_literal);
}

}

#endif
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/HDFS/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

void getTableFunctionArguments(ASTs & args) const override;

private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
Expand Down
31 changes: 28 additions & 3 deletions src/Storages/ObjectStorage/S3/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_

if (engine_args_to_idx.contains("format"))
{
format = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
auto format_ = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
/// Set format to configuration only if it's not 'auto',
/// because we can have default format set in configuration.
if (format != "auto")
format = format;
if (format_ != "auto")
format = format_;
}

if (engine_args_to_idx.contains("structure"))
Expand Down Expand Up @@ -585,6 +585,31 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
}

/// Appends to `args` the positional arguments that describe this S3 configuration:
///   - the URI string (`url.uri_str`);
///   - then either the literal "NOSIGN" (when the `no_sign_request` auth setting is set),
///     or the access key id and secret access key, followed by the session token (if non-empty),
///     the format (if it is not "auto") and the compression method (if non-empty).
/// Throws LOGICAL_ERROR if `args` is not empty on entry.
void StorageS3Configuration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
{ /// Just check
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
}

args.push_back(std::make_shared<ASTLiteral>(url.uri_str));
if (auth_settings[S3AuthSetting::no_sign_request])
{
args.push_back(std::make_shared<ASTLiteral>("NOSIGN"));
}
else
{
args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::access_key_id].value));
args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
if (!auth_settings[S3AuthSetting::session_token].value.empty())
args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
/// NOTE(review): format and compression method are appended only on this signed-request path (never
/// after "NOSIGN"), and the compression method can be appended without a preceding format when the
/// format is "auto" — confirm this matches the positional order the argument parser expects.
if (format != "auto")
args.push_back(std::make_shared<ASTLiteral>(format));
if (!compression_method.empty())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok not to add some other arguments like structure and headers as the docs suggest? https://clickhouse.com/docs/en/sql-reference/table-functions/s3

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this fields are already added in addStructureAndFormatToArgsIfNeeded methods.
https://github.com/ClickHouse/ClickHouse/blob/master/src/Storages/ObjectStorage/S3/Configuration.cpp#L399
These field required for table functions too, when access parameters need to add only for table engine case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: perhaps add a comment saying this is already added somewhere else?

Copy link
Author

@ianton-ru ianton-ru Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I rename method to addPathAndAccessKeysToArgs, to consistency with addStructureAndFormatToArgsIfNeeded

args.push_back(std::make_shared<ASTLiteral>(compression_method));
}
}

}

#endif
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/S3/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

void getTableFunctionArguments(ASTs & args) const override;

private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ String StorageObjectStorage::getPathSample(ContextPtr context)
if (context->getSettingsRef()[Setting::use_hive_partitioning])
local_distributed_processing = false;

if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing)
return configuration->getPath();

auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
query_settings,
Expand All @@ -72,9 +75,6 @@ String StorageObjectStorage::getPathSample(ContextPtr context)
{} // file_progress_callback
);

if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing)
return configuration->getPath();

if (auto file = file_iterator->next(0))
return file->getPath();
return "";
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ class StorageObjectStorage::Configuration

virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);

/// Fills `args` with table-function arguments equivalent to this configuration.
/// This default implementation does not support the operation: it always throws
/// NOT_IMPLEMENTED, naming the engine returned by getEngineName().
/// Engine-specific configurations override it.
virtual void getTableFunctionArguments(ASTs & /* args */) const
{
    throw Exception(
        ErrorCodes::NOT_IMPLEMENTED,
        "Method getTableFunctionArguments is not supported by storage {}",
        getEngineName());
}

protected:
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
Expand Down
Loading
Loading