[fix](cloud) Support clean tablet file cache when tablet drop (#46390)
deardeng authored Jan 13, 2025
1 parent d9eb14a commit be2932a
Showing 8 changed files with 424 additions and 106 deletions.
43 changes: 39 additions & 4 deletions be/src/agent/task_worker_pool.cpp
@@ -1659,11 +1659,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r
.tag("tablet_id", drop_tablet_req.tablet_id);
return;
});
// 1. clean the tablet's file cache: find the tablet in the tablet manager,
// collect its rowsets (cache key, tablet id, rowset id) and recycle their cached data
MonotonicStopWatch watch;
watch.start();
auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
std::ostringstream rowset_ids_stream;
bool found = false;
for (auto& weak_tablet : weak_tablets) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
continue;
}
found = true;
auto clean_rowsets = tablet->get_snapshot_rowset(true);
// Collect the first 10 rowset IDs as a comma-separated string, for logging only
int count = 0;
for (const auto& rowset : clean_rowsets) {
if (count >= 10) break;
if (count > 0) {
rowset_ids_stream << ",";
}
rowset_ids_stream << rowset->rowset_id().to_string();
count++;
}

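// evict these rowsets' data from the local file cache (skipped for rowsets still referenced by queries)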
CloudTablet::recycle_cached_data(std::move(clean_rowsets));
break;
}

if (!found) {
LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
<< ", cost " << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

// 2. erase the tablet from the tablet manager (LRU)
engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
<< " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
<< static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

6 changes: 5 additions & 1 deletion be/src/cloud/cloud_tablet.cpp
@@ -386,6 +386,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,

uint64_t CloudTablet::delete_expired_stale_rowsets() {
std::vector<RowsetSharedPtr> expired_rowsets;
// ATTN: trick. stale_rowsets temporarily holds an extra reference to each rowset shared_ptr taken from _stale_rs_version_map, so that inside recycle_cached_data the baseline reference count is 2 (anything higher means a query is still using the rowset).
std::vector<RowsetSharedPtr> stale_rowsets;
int64_t expired_stale_sweep_endtime =
::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
std::vector<std::string> version_to_delete;
@@ -409,6 +411,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
stale_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
@@ -456,7 +459,8 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
if (rs.use_count() >= 1) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
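As an aside (not part of this patch), here is a minimal standalone C++ sketch of the shared_ptr reference-count reasoning behind the use_count() > 2 check and the stale_rowsets trick above: with exactly two owning references the count is 2, and any additional reference, such as a query still reading the rowset, pushes it above 2, so the cached data is left untouched.

#include <cassert>
#include <memory>
#include <vector>

int main() {
    // rs stands in for a RowsetSharedPtr; owners plays the role of the second
    // owning container (expired_rowsets/stale_rowsets, or the rowsets argument
    // plus _rs_version_map).
    auto rs = std::make_shared<int>(42);
    std::vector<std::shared_ptr<int>> owners{rs};
    assert(rs.use_count() == 2); // baseline: nothing else is using the rowset

    {
        auto reader = rs;           // a query still holding the rowset
        assert(rs.use_count() > 2); // recycling would be skipped here
    }
    assert(rs.use_count() == 2);    // reader released it; safe to recycle the cache
    return 0;
}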
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.h
@@ -210,12 +210,12 @@ class CloudTablet final : public BaseTablet {

void build_tablet_report_info(TTabletInfo* tablet_info);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

Status sync_if_not_running();

CloudStorageEngine& _engine;
Suite.groovy
@@ -49,6 +49,7 @@ import org.apache.doris.regression.action.HttpCliAction
import org.apache.doris.regression.util.DataUtils
import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.Hdfs
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.SuiteUtils
import org.apache.doris.regression.util.DebugPoint
import org.apache.doris.regression.RunMode
@@ -2831,4 +2832,94 @@ class Suite implements GroovyInterceptable {
assertEquals(re_fe, re_be)
assertEquals(re_fe, re_no_fold)
}

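// build a map of BackendId -> Host from SHOW BACKENDS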
def backendIdToHost = { ->
def spb = sql_return_maparray """SHOW BACKENDS"""
def beIdToHost = [:]
spb.each {
beIdToHost[it.BackendId] = it.Host
}
beIdToHost
}

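// build a map of tablet_id -> BE host by querying each BE's /tablets_json endpoint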
def getTabletAndBeHostFromBe = { bes ->
def ret = [:]
bes.each { be ->
// {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
def tablets = data.tablets.collect { it.tablet_id as String }
tablets.each{
ret[it] = data.host
}
}
ret
}

def getTabletAndBeHostFromFe = { table ->
def result = sql_return_maparray """SHOW TABLETS FROM $table"""
def bes = backendIdToHost.call()
// tablet : [backendId, host]
def ret = [:]
result.each {
ret[it.TabletId] = [it.BackendId, bes[it.BackendId]]
}
ret
}

// get rowset_id segment_id from ms
// curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2'
def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func ->
httpTest {
endpoint msHttpPort
op "get"
uri "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}"
check check_func
}
}

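// for a given tablet version: fetch the rowset meta from the meta-service, derive its segment
// file names, then ask the BE for each segment's file cache hash (the cache directory name on disk)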
def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version ->
def hashValues = []
def segmentFiles = []
getSegmentFilesFromMs(msHttpPort, tabletId, version) {
respCode, body ->
def json = parseJson(body)
logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json)
// {"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"0200000000000004694889e84c76391cfd52ec7db0a483ba","resource_id":"1","newest_write_timestamp":"1736153402","segments_key_bounds":[{"min_key":"AoAAAAAAAAAC","max_key":"AoAAAAAAAAAC"}],"txn_expiration":"1736167802","segments_overlap_pb":"NONOVERLAPPING","compaction_level":"0","segments_file_size":["895"],"index_id":"27697","schema_version":0,"enable_segments_file_size":true,"has_variant_type_in_schema":false,"enable_inverted_index_file_info":false}
def segmentNum = json.num_segments as int
def rowsetId = json.rowset_id_v2 as String
segmentFiles = (0..<segmentNum).collect { i -> "${rowsetId}_${i}.dat" }
}

segmentFiles.each {
// curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat'
def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true)
// {"hash":"2b79c649a1766dad371054ee168f0574"}
logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data)
hashValues << data.hash
}
hashValues
}

// get table's tablet file cache
def getTabletFileCacheDirFromBe = { msHttpPort, table, version ->
// beHost HashFile
def beHostToHashFile = [:]

def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table)
getTabletsAndHostFromFe.each {
def beHost = it.Value[1]
def tabletId = it.Key
def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version)
hashRet.each {
def hashFile = it
if (beHostToHashFile.containsKey(beHost)) {
beHostToHashFile[beHost].add(hashFile)
} else {
beHostToHashFile[beHost] = [hashFile]
}
}
}
beHostToHashFile
}

}
Http.groovy
@@ -53,7 +53,7 @@ class Http {
conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for root
def code = conn.responseCode
def text = conn.content.text
logger.info("http post url=${url}, isJson=${isJson}, response code=${code}, text=${text}")
logger.info("http get url=${url}, isJson=${isJson}, response code=${code}, text=${text}")
Assert.assertEquals(200, code)
if (isJson) {
def json = new JsonSlurper()
test_clean_stale_rs_file_cache.groovy
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.Http

suite('test_clean_stale_rs_file_cache', 'docker') {
if (!isCloudMode()) {
return;
}
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'cloud_tablet_rebalancer_interval_second=1',
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1'
]
options.beConfigs += [
'report_tablet_interval_seconds=1',
'cumulative_compaction_min_deltas=5',
'tablet_rowset_stale_sweep_by_size=false',
'tablet_rowset_stale_sweep_time_sec=60',
'vacuum_stale_rowsets_interval_s=10'
]
options.setFeNum(1)
options.setBeNum(1)
options.cloudMode = true

def table = "test_clean_stale_rs_file_cache"

docker(options) {
def ms = cluster.getAllMetaservices().get(0)
def msHttpPort = ms.host + ":" + ms.httpPort
sql """CREATE TABLE $table (
`k1` int(11) NULL,
`k2` int(11) NULL,
`v1` varchar(2048)
)
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
"""
// version 2
sql """
insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3')
"""
def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
// version 3
sql """
insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3')
"""
def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
// version 4
sql """
insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3')
"""
// version 5
sql """
insert into $table values (1000, 1, 'v1'), (2000, 2, 'v2'), (3000, 3, 'v3')
"""
// version 6
sql """
insert into $table values (10000, 1, 'v1'), (20000, 2, 'v2'), (30000, 3, 'v3')
"""

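// merge the version-2 and version-3 cache hashes per BE host so both rowsets' cache dirs are checked after compaction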
def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles ->
[(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles]
}
for (int i = 0; i < 5; i++) {
sql """
select count(*) from $table
"""
}
def beforeGetFromFe = getTabletAndBeHostFromFe(table)
logger.info("fe tablets {}, cache dir {}", beforeGetFromFe , mergedCacheDir)
// wait for cumulative compaction to finish and for vacuum_stale_rowsets to reclaim the stale
// rowsets (tablet_rowset_stale_sweep_time_sec=60, vacuum_stale_rowsets_interval_s=10)
sleep(80 * 1000)

// check cache file has been deleted
beforeGetFromFe.each {
def tabletId = it.Key
def backendId = it.Value[0]
def backendHost = it.Value[1]
def be = cluster.getBeByBackendId(backendId.toLong())
def dataPath = new File("${be.path}/storage/file_cache")
def subDirs = []

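// recursively collect the names of all subdirectories under the BE's file_cache directory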
def collectDirs
collectDirs = { File dir ->
if (dir.exists()) {
dir.eachDir { subDir ->
subDirs << subDir.name
collectDirs(subDir)
}
}
}

collectDirs(dataPath)
logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs)
def cacheDir = mergedCacheDir[backendHost]

// assert that none of the expected cache hash dirs for the old rowsets remain under file_cache
cacheDir.each { hashFile ->
assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) },
"Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " +
"Matching subdir found in: ${subDirs}")
}
}

}
}