Skip to content

Commit

Permalink
fix: etcd range pagenation in table metadata migration tool (#2035)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield authored Jul 25, 2023
1 parent 0b4ac98 commit 48996b0
Showing 1 changed file with 47 additions and 10 deletions.
57 changes: 47 additions & 10 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue};
use common_meta::key::TableMetaKey;
use common_meta::rpc::store::{BatchPutRequest, PutRequest, RangeRequest};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest, RangeResponse,
};
use common_meta::util::get_prefix_end_key;
use common_telemetry::info;
use etcd_client::Client;
use meta_srv::service::store::etcd::EtcdStore;
Expand Down Expand Up @@ -61,24 +64,58 @@ struct MigrateTableMetadata {
#[async_trait]
impl Tool for MigrateTableMetadata {
async fn do_work(&self) -> Result<()> {
let req = RangeRequest::new().with_prefix(b"__tg".to_vec());
let resp = self.etcd_store.range(req).await.unwrap();
for kv in resp.kvs {
let key = String::from_utf8_lossy(kv.key());
let value = TableGlobalValue::from_bytes(kv.value())
.unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));
let mut key = b"__tg".to_vec();
let range_end = get_prefix_end_key(&key);

self.create_table_name_key(&value).await;
let mut processed_keys = 0;
loop {
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));

self.create_datanode_table_keys(&value).await;
let req = RangeRequest::new()
.with_range(key, range_end.clone())
.with_limit(1000);
let resp = self.etcd_store.range(req).await.unwrap();
for kv in resp.kvs.iter() {
let key = String::from_utf8_lossy(kv.key());
let value = TableGlobalValue::from_bytes(kv.value())
.unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));

self.split_table_global_value(&key, value).await;
self.create_table_name_key(&value).await;

self.create_datanode_table_keys(&value).await;

self.split_table_global_value(&key, value).await;
}

self.delete_migrated_keys(&resp).await;

processed_keys += resp.kvs.len();

if resp.more {
key = get_prefix_end_key(resp.kvs.last().unwrap().key());
} else {
break;
}
}
info!("Total migrated TableGlobalKeys: {processed_keys}");
Ok(())
}
}

impl MigrateTableMetadata {
async fn delete_migrated_keys(&self, resp: &RangeResponse) {
info!("Deleting {} TableGlobalKeys", resp.kvs.len());
let req = BatchDeleteRequest {
keys: resp.kvs.iter().map(|kv| kv.key().to_vec()).collect(),
prev_kv: false,
};
if self.dryrun {
info!("Dryrun: do nothing");
} else {
self.etcd_store.batch_delete(req).await.unwrap();
}
}

async fn split_table_global_value(&self, key: &str, value: TableGlobalValue) {
let table_id = value.table_id();
let region_distribution: RegionDistribution = value.regions_id_map.into_iter().collect();
Expand Down

0 comments on commit 48996b0

Please sign in to comment.