Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Refine the error message when schema mismatch in ExchangeReceiver #9744

Merged
merged 2 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions dbms/src/Flash/Coprocessor/CodecUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/CodecUtils.h>
#include <Flash/Coprocessor/DAGUtils.h>

namespace DB
{
namespace ErrorCodes
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes
} // namespace DB::ErrorCodes

namespace CodecUtils
namespace DB::CodecUtils
{
void checkColumnSize(const String & identifier, size_t expected, size_t actual)
{
if unlikely (expected != actual)
if (unlikely(expected != actual))
throw Exception(
fmt::format("{} schema size mismatch, expected {}, actual {}.", identifier, expected, actual),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR,
"{} schema size mismatch, expected {}, actual {}.",
identifier,
expected,
actual);
}

void checkDataTypeName(const String & identifier, size_t column_index, const String & expected, const String & actual)
{
if unlikely (expected != actual)
if (unlikely(expected != actual))
throw Exception(
fmt::format(
"{} schema mismatch at column {}, expected {}, actual {}",
identifier,
column_index,
expected,
actual),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR,
"{} schema mismatch at column {}, expected {}, actual {}",
identifier,
column_index,
expected,
actual);
}

} // namespace CodecUtils
} // namespace DB
} // namespace DB::CodecUtils
69 changes: 36 additions & 33 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <type_traits>

namespace DB
{
Expand Down Expand Up @@ -816,7 +815,17 @@ ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::toExchangeReceiveResult
recv_msg->getReqInfo(),
recv_msg->getErrorPtr()->msg());

return toDecodeResult(stream_id, block_queue, header, recv_msg, decoder_ptr);
try
{
return toDecodeResult(stream_id, block_queue, header, recv_msg, decoder_ptr);
}
catch (DB::Exception & e)
{
// Add the MPPTask identifier and exector_id to the error message, make it easier to
// identify the specific stage within a complex query where the error occurs
e.addMessage(fmt::format("{}", exc_log->identifier()));
e.rethrow();
}
}
case ReceiveStatus::eof:
return handleUnnormalChannel(block_queue, decoder_ptr);
Expand Down Expand Up @@ -882,42 +891,36 @@ ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::toDecodeResult(
{
assert(recv_msg != nullptr);
const auto * resp_ptr = recv_msg->getRespPtr(stream_id);
if (resp_ptr
!= nullptr) /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries.
{
auto select_resp = std::make_shared<tipb::SelectResponse>();
if (unlikely(!select_resp->ParseFromString(*resp_ptr)))
{
return ExchangeReceiverResult::newError(recv_msg->getSourceIndex(), recv_msg->getReqInfo(), "decode error");
}
else
{
auto result
= ExchangeReceiverResult::newOk(select_resp, recv_msg->getSourceIndex(), recv_msg->getReqInfo());
/// If mocking TiFlash as TiDB, we should decode chunks from select_resp.
if (unlikely(!result.resp->chunks().empty()))
{
assert(recv_msg->getChunks(stream_id).empty());
// Fine grained shuffle should only be enabled when sending data to TiFlash node.
// So all data should be encoded into MPPDataPacket.chunks.
RUNTIME_CHECK_MSG(
!enable_fine_grained_shuffle_flag,
"Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled");
result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema);
}
else if (!recv_msg->getChunks(stream_id).empty())
{
result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr);
}
return result;
}
}
else /// the non-last packets
if (resp_ptr == nullptr)
{
/// the non-last packets
auto result = ExchangeReceiverResult::newOk(nullptr, recv_msg->getSourceIndex(), recv_msg->getReqInfo());
result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr);
return result;
}

/// the data of the last packet is serialized from tipb::SelectResponse including execution summaries.
auto select_resp = std::make_shared<tipb::SelectResponse>();
if (unlikely(!select_resp->ParseFromString(*resp_ptr)))
return ExchangeReceiverResult::newError(recv_msg->getSourceIndex(), recv_msg->getReqInfo(), "decode error");

auto result = ExchangeReceiverResult::newOk(select_resp, recv_msg->getSourceIndex(), recv_msg->getReqInfo());
/// If mocking TiFlash as TiDB, we should decode chunks from select_resp.
if (unlikely(!result.resp->chunks().empty()))
{
assert(recv_msg->getChunks(stream_id).empty());
// Fine grained shuffle should only be enabled when sending data to TiFlash node.
// So all data should be encoded into MPPDataPacket.chunks.
RUNTIME_CHECK_MSG(
!enable_fine_grained_shuffle_flag,
"Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled");
result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema);
}
else if (!recv_msg->getChunks(stream_id).empty())
{
result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr);
}
return result;
}

template <typename RPCContext>
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include <Flash/Mpp/AsyncRequestHandler.h>
#include <Flash/Mpp/GRPCReceiverContext.h>

#include <future>
#include <memory>
#include <mutex>
#include <thread>

namespace DB
{
Expand All @@ -36,8 +34,8 @@ struct ExchangeReceiverResult
size_t call_index;
String req_info;
bool meet_error;
String error_msg;
bool eof;
String error_msg;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define error_msg after eof so that meet_error and eof can share the same padding and minimal the struct size of ExchangeReceiverResult

DecodeDetail decode_detail;

ExchangeReceiverResult()
Expand Down Expand Up @@ -74,8 +72,8 @@ struct ExchangeReceiverResult
, call_index(call_index_)
, req_info(req_info_)
, meet_error(meet_error_)
, error_msg(error_msg_)
, eof(eof_)
, error_msg(error_msg_)
{}
};

Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1021,8 +1021,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}

LOG_INFO(log, "Using api_version={}", storage_config.api_version);

// Set whether to use safe point v2.
PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false);

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, const Logge

lazily_init_store = get_bool_config_or_default("lazily_init_store", lazily_init_store);

LOG_INFO(log, "format_version {} lazily_init_store {}", format_version, lazily_init_store);
LOG_INFO(
log,
"format_version={} lazily_init_store={} api_version={}",
format_version,
lazily_init_store,
api_version);
}

Strings TiFlashStorageConfig::getAllNormalPaths() const
Expand Down
132 changes: 85 additions & 47 deletions dbms/src/Storages/DeltaMerge/RestoreDMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RestoreDMFile.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/Page/PageStorage.h>
Expand All @@ -29,38 +31,61 @@ DMFilePtr restoreDMFileFromRemoteDataSource(
UInt64 file_page_id,
UInt64 meta_version)
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto wn_ps = dm_context.global_context.getWriteNodePageStorage();
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Data, dm_context.physical_table_id),
file_page_id);
auto full_external_id = wn_ps->getNormalPageId(full_page_id);
auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id);
auto remote_data_location = wn_ps->getCheckpointLocation(full_page_id);
const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id));
auto file_oid = lock_key_view.asDataFile().getDMFileOID();
auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id);
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
// gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here
path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk());
DMFilePtr dmfile;
try
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto wn_ps = dm_context.global_context.getWriteNodePageStorage();
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(
dm_context.keyspace_id,
StorageType::Data,
dm_context.physical_table_id),
file_page_id);
auto full_external_id = wn_ps->getNormalPageId(full_page_id);
auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id);
auto remote_data_location = wn_ps->getCheckpointLocation(full_page_id);
const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id));
auto file_oid = lock_key_view.asDataFile().getDMFileOID();
auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id);
dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
// gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here
path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk());
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version));
e.rethrow();
}
Comment on lines +55 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the Exception come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when trying to use an old version tiflash binary but load new data type

[2024/12/26 23:27:43.778 +08:00] [ERROR] [Exception.cpp:91] ["Storage inited fail, keyspace=4294967295 table_id=127: Code: 50, e.displayText() = DB::Exception: Unknown data type family: StringV2: file_page_id=14494 meta_version=0: while restoreSegment, segment_id=7997 ident=keyspace=4294967295 table_id=127: 
  0x55a0533503d3    StackTrace::StackTrace() [tiflash+35009491]
                    dbms/src/Common/StackTrace.cpp:23
  0x55a05334daa6    DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, int) [tiflash+34998950]
                    dbms/src/Common/Exception.h:46
  0x55a058b076db    DB::DataTypeFactory::get(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, std::__1::shared_ptr<DB::IAST> const&) const [tiflash+126985947]
                    dbms/src/DataTypes/DataTypeFactory.cpp:119
  0x55a058b0718b    DB::DataTypeFactory::get(std::__1::shared_ptr<DB::IAST> const&) const [tiflash+126984587]
                    dbms/src/DataTypes/DataTypeFactory.cpp:87
  0x55a058b0740a    DB::DataTypeFactory::getOrSet(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&) [tiflash+126985226]
                    dbms/src/DataTypes/DataTypeFactory.cpp:56
  0x55a0588d4228    DB::DM::ColumnStat::mergeFromProto(dtpb::ColumnStat const&) [tiflash+124678696]
                    dbms/src/Storages/DeltaMerge/File/ColumnStat.h:81
  0x55a0588d0f90    DB::DM::DMFileMetaV2::parseExtendColumnStat(std::__1::basic_string_view<char, std::__1::char_traits<char>>) [tiflash+124665744]
                    dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp:127
  0x55a0588d04b3    DB::DM::DMFileMetaV2::parse(std::__1::basic_string_view<char, std::__1::char_traits<char>>) [tiflash+124662963]
                    dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp:86
  0x55a0588d2771    DB::DM::DMFileMetaV2::read(std::__1::shared_ptr<DB::FileProvider> const&, DB::DM::DMFileMeta::ReadMode const&) [tiflash+124671857]
                    dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp:292
  0x55a0588af57e    DB::DM::DMFile::restore(std::__1::shared_ptr<DB::FileProvider> const&, unsigned long, unsigned long, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, DB::DM::DMFileMeta::ReadMode const&, unsigned long, unsigned int) [tiflash+124527998]
                    dbms/src/Storages/DeltaMerge/File/DMFile.cpp:0
  0x55a058862b25    DB::DM::restoreDMFileFromLocal(DB::DM::DMContext const&, unsigned long, unsigned long) [tiflash+124214053]
                    dbms/src/Storages/DeltaMerge/RestoreDMFile.cpp:72
  0x55a058859601    DB::DM::StableValueSpace::restore(DB::DM::DMContext&, DB::ReadBuffer&, unsigned long) [tiflash+124175873]
                    dbms/src/Storages/DeltaMerge/StableValueSpace.cpp:197
  0x55a058859456    DB::DM::StableValueSpace::restore(DB::DM::DMContext&, unsigned long) [tiflash+124175446]
                    dbms/src/Storages/DeltaMerge/StableValueSpace.cpp:182
  0x55a05879d494    DB::DM::Segment::restoreSegment(std::__1::shared_ptr<DB::Logger> const&, DB::DM::DMContext&, unsigned long) [tiflash+123405460]
                    dbms/src/Storages/DeltaMerge/Segment.cpp:429
  0x55a058732c07    std::__1::__function::__func<DB::DM::DeltaMergeStore::DeltaMergeStore(DB::Context&, bool, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, unsigned int, long, long, bool, std::__1::vector<DB::DM::ColumnDefine, std::__1::allocator<DB::DM::ColumnDefine>> const&, DB::DM::ColumnDefine const&, bool, unsigned long, std::__1::shared_ptr<std::__1::vector<DB::DM::LocalIndexInfo, std::__1::allocator<DB::DM::LocalIndexInfo>>>, DB::DM::DeltaMergeStore::Settings const&, DB::ThreadPoolImpl<DB::ThreadFromGlobalPoolImpl<false>>*)::$_1, std::__1::allocator<DB::DM::DeltaMergeStore::DeltaMergeStore(DB::Context&, bool, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, unsigned int, long, long, bool, std::__1::vector<DB::DM::ColumnDefine, std::__1::allocator<DB::DM::ColumnDefine>> const&, DB::DM::ColumnDefine const&, bool, unsigned long, std::__1::shared_ptr<std::__1::vector<DB::DM::LocalIndexInfo, std::__1::allocator<DB::DM::LocalIndexInfo>>>, DB::DM::DeltaMergeStore::Settings const&, DB::ThreadPoolImpl<DB::ThreadFromGlobalPoolImpl<false>>*)::$_1>, void ()>::operator()() [tiflash+122969095]
                    dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp:295
  0x55a0534088f5    std::__1::packaged_task<void ()>::operator()() [tiflash+35764469]
                    /DATA/disk1/ra_common/tiflash-env-17/sysroot/bin/../include/c++/v1/future:1891
  0x55a053406e05    DB::ThreadPoolImpl<DB::ThreadFromGlobalPoolImpl<false>>::worker(std::__1::__list_iterator<DB::ThreadFromGlobalPoolImpl<false>, void*>) [tiflash+35757573]
                    /DATA/disk1/ra_common/tiflash-env-17/sysroot/bin/../include/c++/v1/__functional/function.h:517
  0x55a053409723    std::__1::__function::__func<DB::ThreadFromGlobalPoolImpl<false>::ThreadFromGlobalPoolImpl<void DB::ThreadPoolImpl<DB::ThreadFromGlobalPoolImpl<false>>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::'lambda0'()>(void&&)::'lambda'(), std::__1::allocator<DB::ThreadFromGlobalPoolImpl<false>::ThreadFromGlobalPoolImpl<void DB::ThreadPoolImpl<DB::ThreadFromGlobalPoolImpl<false>>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::'lambda0'()>(void&&)::'lambda'()>, void ()>::operator()() [tiflash+35768099]
                    dbms/src/Common/UniThreadPool.cpp:167
  0x55a0534055f5    DB::ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) [tiflash+35751413]
                    /DATA/disk1/ra_common/tiflash-env-17/sysroot/bin/../include/c++/v1/__functional/function.h:517
  0x55a053407cf5    void* std::__1::__thread_proxy[abi:ue170006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, void DB::ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, long, std::__1::optional<unsigned long>, bool)::'lambda0'()>>(void*) [tiflash+35761397]
                    dbms/src/Common/UniThreadPool.cpp:167
  0x7fc92f140802    start_thread [libc.so.6+653314]
  0x7fc92f0e0450    clone3 [libc.so.6+259152]

assert(dmfile != nullptr);
return dmfile;
}

DMFilePtr restoreDMFileFromLocal(const DMContext & dm_context, UInt64 file_page_id, UInt64 meta_version)
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(file_page_id);
auto file_parent_path = path_delegate.getDTFilePath(file_id);
auto dmfile = DMFile::restore(
dm_context.global_context.getFileProvider(),
file_id,
file_page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
meta_version,
dm_context.keyspace_id);
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
DMFilePtr dmfile;
try
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(file_page_id);
auto file_parent_path = path_delegate.getDTFilePath(file_id);
dmfile = DMFile::restore(
dm_context.global_context.getFileProvider(),
file_id,
file_page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
meta_version,
dm_context.keyspace_id);
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version));
e.rethrow();
}
assert(dmfile != nullptr);
return dmfile;
}

Expand All @@ -72,26 +97,39 @@ DMFilePtr restoreDMFileFromCheckpoint(
UInt64 file_page_id,
UInt64 meta_version)
{
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Data, dm_context.physical_table_id),
file_page_id);
auto remote_data_location = temp_ps->getCheckpointLocation(full_page_id);
auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile();
auto file_oid = data_key_view.getDMFileOID();
auto data_key = data_key_view.toFullKey();
auto delegator = dm_context.path_pool->getStableDiskDelegator();
auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
PS::V3::CheckpointLocation loc{
.data_file_id = std::make_shared<String>(data_key),
.offset_in_file = 0,
.size_in_file = 0,
};
wbs.data.putRemoteExternal(new_local_page_id, loc);
auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id);
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
wbs.writeLogAndData();
// new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here
delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk());
DMFilePtr dmfile;
try
{
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(
dm_context.keyspace_id,
StorageType::Data,
dm_context.physical_table_id),
file_page_id);
auto remote_data_location = temp_ps->getCheckpointLocation(full_page_id);
auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile();
auto file_oid = data_key_view.getDMFileOID();
auto data_key = data_key_view.toFullKey();
auto delegator = dm_context.path_pool->getStableDiskDelegator();
auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
PS::V3::CheckpointLocation loc{
.data_file_id = std::make_shared<String>(data_key),
.offset_in_file = 0,
.size_in_file = 0,
};
wbs.data.putRemoteExternal(new_local_page_id, loc);
auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id);
dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
wbs.writeLogAndData();
// new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here
delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk());
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version));
e.rethrow();
}
assert(dmfile != nullptr);
return dmfile;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ SegmentPtr Segment::restoreSegment( //
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id));
e.addMessage(fmt::format("while restoreSegment, segment_id={} ident={}", segment_id, parent_log->identifier()));
e.rethrow();
}
RUNTIME_CHECK_MSG(false, "unreachable");
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/FormatVersion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const StorageFormatVersion & toStorageFormat(UInt64 setting)
case 103:
return STORAGE_FORMAT_V103;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal setting value: {}", setting);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal format_version value: {}", setting);
}
}
} // namespace
Expand Down