Skip to content

Commit

Permalink
support cooldown data do linked shcema change
Browse files Browse the repository at this point in the history
  • Loading branch information
duanxujian committed Oct 29, 2024
1 parent 141b452 commit 05c7d5a
Show file tree
Hide file tree
Showing 16 changed files with 153 additions and 43 deletions.
4 changes: 4 additions & 0 deletions be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,4 +546,8 @@ std::string BrokerFileSystem::error_msg(const std::string& err) const {
return fmt::format("({}:{}), {}", _broker_addr.hostname, _broker_addr.port, err);
}

Status BrokerFileSystem::copy_path_impl(const Path& src, const Path& dest) {
return Status::NotSupported("BrokerFileSystem not support this method!");
}

} // namespace doris::io
1 change: 1 addition & 0 deletions be/src/io/fs/broker_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class BrokerFileSystem final : public RemoteFileSystem {
const std::string& checksum) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;
Status direct_download_impl(const Path& remote_file, std::string* content) override;
Status copy_path_impl(const Path& src, const Path& dest) override;

private:
BrokerFileSystem(const TNetworkAddress& broker_addr,
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,11 @@ Status FileSystem::rename_dir(const Path& orig_name, const Path& new_name) {
FILESYSTEM_M(rename_dir_impl(orig_path, new_path));
}

Status FileSystem::copy_path(const Path& src, const Path& dest) {
auto src_path = absolute_path(src);
auto dest_path = absolute_path(dest);
FILESYSTEM_M(copy_path_impl(src_path, dest_path));
}

} // namespace io
} // namespace doris
4 changes: 4 additions & 0 deletions be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {
Status list(const Path& dir, bool only_file, std::vector<FileInfo>* files, bool* exists);
Status rename(const Path& orig_name, const Path& new_name);
Status rename_dir(const Path& orig_name, const Path& new_name);
// Copy src path to dest path. If `src` is a directory, this method will call recursively for each directory entry.
Status copy_path(const Path& src, const Path& dest);

std::shared_ptr<FileSystem> getSPtr() { return shared_from_this(); }

Expand Down Expand Up @@ -166,6 +168,8 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {
/// rename dir from orig_name to new_name
virtual Status rename_dir_impl(const Path& orig_name, const Path& new_name) = 0;

virtual Status copy_path_impl(const Path& src, const Path& dest) = 0;

virtual Path absolute_path(const Path& path) const {
if (path.is_absolute()) {
return path;
Expand Down
13 changes: 13 additions & 0 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,19 @@ Status HdfsFileSystem::direct_download_impl(const Path& remote_file, std::string
return Status::OK();
}

Status HdfsFileSystem::copy_path_impl(const Path& src, const Path& dest) {
Path src_path = convert_path(src, _fs_name);
Path dest_path = convert_path(dest, _fs_name);
CHECK_HDFS_HANDLE(_fs_handle);
int ret = hdfsCopy(_fs_handle->hdfs_fs, src_path.c_str(), _fs_handle->hdfs_fs, dest_path.c_str());
if (ret != 0) {
return Status::IOError("fail to copy path from {} to {}: {}", src_path.native(),
dest_path.native(), hdfs_error());
}
LOG(INFO) << "succeed to copy path from " << src_path.native() << " to " << dest_path.native();
return Status::OK();
}

HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
return _fs_handle;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class HdfsFileSystem final : public RemoteFileSystem {
const std::string& checksum) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;
Status direct_download_impl(const Path& remote_file, std::string* content) override;
Status copy_path_impl(const Path& src, const Path& dest) override;

private:
Status delete_internal(const Path& path, int is_recursive);
Expand Down
6 changes: 0 additions & 6 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,6 @@ Status LocalFileSystem::get_space_info_impl(const Path& path, size_t* capacity,
return Status::OK();
}

Status LocalFileSystem::copy_path(const Path& src, const Path& dest) {
auto src_path = absolute_path(src);
auto dest_path = absolute_path(dest);
FILESYSTEM_M(copy_path_impl(src_path, dest_path));
}

Status LocalFileSystem::copy_path_impl(const Path& src, const Path& dest) {
std::error_code ec;
std::filesystem::copy(src, dest, std::filesystem::copy_options::recursive, ec);
Expand Down
5 changes: 2 additions & 3 deletions be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ class LocalFileSystem final : public FileSystem {
Status delete_and_create_directory(const Path& dir);
// return disk available space where the given path is.
Status get_space_info(const Path& path, size_t* capacity, size_t* available);
// Copy src path to dest path. If `src` is a directory, this method will call recursively for each directory entry.
Status copy_path(const Path& src, const Path& dest);
// return true if parent path contain sub path
static bool contain_path(const Path& parent, const Path& sub);
// delete dir or file
Expand Down Expand Up @@ -100,14 +98,15 @@ class LocalFileSystem final : public FileSystem {
bool* exists) override;
Status rename_impl(const Path& orig_name, const Path& new_name) override;
Status rename_dir_impl(const Path& orig_name, const Path& new_name) override;
Status copy_path_impl(const Path& src, const Path& dest) override;

Status link_file_impl(const Path& src, const Path& dest);
Status md5sum_impl(const Path& file, std::string* md5sum);
Status iterate_directory_impl(const std::string& dir,
const std::function<bool(const FileInfo&)>& cb);
Status mtime_impl(const Path& file, time_t* m_time);
Status delete_and_create_directory_impl(const Path& dir);
Status get_space_info_impl(const Path& path, size_t* capacity, size_t* available);
Status copy_path_impl(const Path& src, const Path& dest);
Status delete_directory_or_file_impl(const Path& path);
Status permission_impl(const Path& file, std::filesystem::perms prms);

Expand Down
5 changes: 5 additions & 0 deletions be/src/io/fs/s3_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,13 +520,18 @@ Status S3FileSystem::copy(const Path& src, const Path& dst) {
.WithBucket(_s3_conf.bucket);
Aws::S3::Model::CopyObjectOutcome response = _client->CopyObject(request);
if (response.IsSuccess()) {
LOG(INFO) << "succeed to copy from " << src.native() << " to " << dst.native();
return Status::OK();
} else {
return Status::IOError("failed to copy from {} to {}: {}", src.native(), dst.native(),
error_msg(src_key, response));
}
}

Status S3FileSystem::copy_path_impl(const Path& src, const Path& dst) {
return copy(src, dst);
}

Status S3FileSystem::copy_dir(const Path& src, const Path& dst) {
std::vector<FileInfo> files;
bool exists = false;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class S3FileSystem final : public RemoteFileSystem {
}
}

Status copy_path_impl(const Path& src, const Path& dest) override;

private:
S3FileSystem(S3Conf&& s3_conf, std::string&& id, RuntimeProfile* profile);

Expand Down
13 changes: 6 additions & 7 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,20 +321,19 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id,
}

Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) {
if (!is_local() && num_segments() > 0) [[unlikely]] {
DCHECK(false) << rowset_id();
return Status::NotSupported("cannot copy remote files, rowset_id={}",
rowset_id().to_string());
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed");
}
bool exists = false;
for (int i = 0; i < num_segments(); ++i) {
auto dst_path = segment_file_path(dir, new_rowset_id, i);
RETURN_IF_ERROR(io::global_local_filesystem()->exists(dst_path, &exists));
RETURN_IF_ERROR(fs->exists(dst_path, &exists));
if (exists) {
return Status::Error<FILE_ALREADY_EXIST>("file already exist: {}", dst_path);
}
auto src_path = segment_file_path(i);
RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(src_path, dst_path));
RETURN_IF_ERROR(fs->copy_path(src_path, dst_path));
for (auto& column : _schema->columns()) {
// if (column.has_inverted_index()) {
const TabletIndex* index_meta = _schema->get_inverted_index(column.unique_id());
Expand All @@ -345,7 +344,7 @@ Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_row
std::string inverted_index_dst_file_path =
InvertedIndexDescriptor::get_index_file_name(dst_path,
index_meta->index_id());
RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(
RETURN_IF_ERROR(fs->copy_path(
inverted_index_src_file_path, inverted_index_dst_file_path));
LOG(INFO) << "success to copy file. from=" << inverted_index_src_file_path << ", "
<< "to=" << inverted_index_dst_file_path;
Expand Down
17 changes: 16 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/integral_types.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_options.h"
Expand All @@ -49,6 +50,7 @@
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
#include "segcompaction.h"
Expand Down Expand Up @@ -448,7 +450,20 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,

Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
RETURN_IF_ERROR(rowset->link_files_to(_context.rowset_dir, _context.rowset_id));
if (rowset->is_local()) {
RETURN_IF_ERROR(rowset->link_files_to(_context.rowset_dir, _context.rowset_id));
} else {
// use old rowset id for remote rowset
_rowset_meta->set_rowset_id(rowset->rowset_meta()->rowset_id());
_context.rowset_id = rowset->rowset_meta()->rowset_id();
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(rowset->rowset_meta()->tablet_id());
if (tablet->cooldown_conf().first == tablet->replica_id()) {
RETURN_IF_ERROR(rowset->copy_files_to(_context.rowset_dir, _context.rowset_id));
} else {
LOG(INFO) << "tablet is not cooldown replica, skip copy remote file. tablet: " << _context.tablet_id;
}
}
_num_rows_written += rowset->num_rows();
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
Expand Down
13 changes: 5 additions & 8 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,11 @@ class RowsetMeta {

// This method may return nullptr.
const io::FileSystemSPtr& fs() {
if (!_fs) {
if (is_local()) {
_fs = io::global_local_filesystem();
} else {
_fs = get_filesystem(resource_id());
LOG_IF(WARNING, !_fs) << "Cannot get file system: " << resource_id();
}
// remote resource properties may be modified by 'ALTER RESOURCE'
if (is_local()) {
_fs = io::global_local_filesystem();
} else {
_fs = get_filesystem(resource_id());
}
return _fs;
}
Expand All @@ -104,7 +102,6 @@ class RowsetMeta {
if (fs && fs->type() != io::FileSystemType::LOCAL) {
_rowset_meta_pb.set_resource_id(fs->id());
}
_fs = std::move(fs);
}

const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); }
Expand Down
41 changes: 24 additions & 17 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,12 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
DCHECK(ret == 1);
}
}
if (!rowset_reader->rowset()->is_local()) {
// use base_tablet's cooldown_meta_id to ensure that the cooldown_meta_id
// is the same for all replicas of the new tablet
std::lock_guard wlock(new_tablet->get_header_lock());
new_tablet->tablet_meta()->set_cooldown_meta_id(base_tablet->tablet_meta()->cooldown_meta_id());
}
return Status::OK();
}
}
Expand Down Expand Up @@ -925,6 +931,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
} while (false);
}

bool is_linked_sc = false;
do {
if (!res) {
break;
Expand Down Expand Up @@ -970,7 +977,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
int64_t real_alter_version = 0;
res = _convert_historical_rowsets(sc_params, &real_alter_version);
res = _convert_historical_rowsets(sc_params, &real_alter_version, &is_linked_sc);
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
Expand Down Expand Up @@ -1002,7 +1009,12 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
if (res) {
// _validate_alter_result should be outside the above while loop.
// to avoid requiring the header lock twice.
res = _validate_alter_result(new_tablet, request);
if (is_linked_sc && base_tablet->cooldown_conf().first != base_tablet->replica_id()) {
// only cooldown replica will do data copy for linked sc, skip validate for follower replica.
LOG(INFO) << "follower cooldown replica linked sc, skip _validate_alter_result";
} else {
res = _validate_alter_result(new_tablet, request);
}
}

// if failed convert history data, then just remove the new tablet
Expand Down Expand Up @@ -1039,7 +1051,7 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
// The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is
// converted from a base tablet, only used for the mow table now.
Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version) {
int64_t* real_alter_version, bool* is_linked_sc) {
LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
<< " base_tablet=" << sc_params.base_tablet->full_name()
<< ", new_tablet=" << sc_params.new_tablet->full_name();
Expand All @@ -1066,6 +1078,10 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< ", base_tablet=" << sc_params.base_tablet->full_name()
<< ", new_tablet=" << sc_params.new_tablet->full_name();

if (!sc_directly && !sc_sorting) {
*is_linked_sc = true;
}

auto process_alter_exit = [&]() -> Status {
{
// save tablet meta here because rowset meta is not saved during add rowset
Expand Down Expand Up @@ -1117,8 +1133,12 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
context.tablet_schema = new_tablet->tablet_schema();
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
context.fs = io::global_local_filesystem();
context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
if (!sc_sorting && !sc_directly) {
context.fs = rs_reader->rowset()->rowset_meta()->fs();
} else {
context.fs = io::global_local_filesystem();
}
Status status = new_tablet->create_rowset_writer(context, &rowset_writer);
if (!status.ok()) {
res = Status::Error<ROWSET_BUILDER_INIT>("create_rowset_writer failed, reason={}",
Expand Down Expand Up @@ -1318,19 +1338,6 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
}
}
}

// if rs_reader has remote files, link schema change is not supported,
// use directly schema change instead.
if (!(*sc_directly) && !(*sc_sorting)) {
// check has remote rowset
for (auto& rs_reader : sc_params.ref_rowset_readers) {
if (!rs_reader->rowset()->is_local()) {
*sc_directly = true;
break;
}
}
}

return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class SchemaChangeHandler {
const TAlterTabletReqV2& request);

static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version);
int64_t* real_alter_version, bool* is_linked_sc);

static Status _parse_request(const SchemaChangeParams& sc_params, BlockChanger* changer,
bool* sc_sorting, bool* sc_directly);
Expand Down
Loading

0 comments on commit 05c7d5a

Please sign in to comment.