diff --git a/Cargo.lock b/Cargo.lock index 6cce288492c..d2a6e4213a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1134,6 +1134,7 @@ name = "engine_traits" version = "0.0.1" dependencies = [ "error_code", + "fail", "file_system", "kvproto", "log_wrappers", @@ -2097,7 +2098,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#ab6bb38882c491c8ee9da13112cb1d9a35695180" +source = "git+https://github.com/pingcap/kvproto.git#2ac2a7984b2d01b96ed56fd8474f4bf80fa33c51" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/Cargo.toml b/Cargo.toml index a499ca1ec30..1155dbc773c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -237,7 +237,6 @@ test_util = { path = "components/test_util", default-features = false } tokio = { version = "1.5", features = ["macros", "rt-multi-thread", "time"] } zipf = "6.1.0" - [patch.crates-io] # TODO: remove this when new raft-rs is published. raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false } @@ -256,6 +255,12 @@ rusoto_sts = { git = "https://github.com/tikv/rusoto", branch = "gh1482-s3-addr- [target.'cfg(target_os = "linux")'.dependencies] procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b73b26b3c08b6e8afc3c665a56e" } +# When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to +# kvproto at the same time. +# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`. +# [patch.'https://github.com/pingcap/kvproto'] +# kvproto = {git = "https://github.com/your_github_id/kvproto", branch="your_branch"} + [workspace] # See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md # Without resolver = 2, using `cargo build --features x` to build `cmd` diff --git a/components/engine_traits/Cargo.toml b/components/engine_traits/Cargo.toml index 038883cbd56..fb3fb3f038d 100644 --- a/components/engine_traits/Cargo.toml +++ b/components/engine_traits/Cargo.toml @@ -23,6 +23,7 @@ prost-codec = [ "txn_types/prost-codec", "file_system/prost-codec", ] +failpoints = ["fail/failpoints"] [dependencies] error_code = { path = "../error_code", default-features = false } @@ -38,6 +39,7 @@ slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debu slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" } kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false } raft = { version = "0.6.0-alpha", default-features = false } +fail = "0.4" [dev-dependencies] toml = "0.5" diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index 155dad3b5a0..f806d7e8b27 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -259,6 +259,8 @@ extern crate tikv_alloc; #[macro_use] extern crate serde_derive; extern crate slog_global; +#[macro_use(fail_point)] +extern crate fail; // These modules contain traits that need to be implemented by engines, either // they are required by KvEngine or are an associated type of KvEngine. 
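Before the `util.rs` hunk below, a note on how the pieces fit together: the `fail` dependency and `#[macro_use(fail_point)]` import added above exist so that the new TTL helpers in `engine_traits::util` can pin "now" in tests. A minimal illustrative test (not part of the diff), assuming the crate is built with `--features failpoints` so the `fail_point!` site is active:

```rust
use engine_traits::util::{ttl_current_ts, ttl_to_expire_ts};

#[test]
fn ttl_helpers_with_failpoint() {
    // Pin the "ttl_current_ts" failpoint so the helpers become deterministic.
    fail::cfg("ttl_current_ts", "return(100)").unwrap();
    assert_eq!(ttl_current_ts(), 100);
    // A TTL of zero means "never expires" and is encoded as expire_ts == 0.
    assert_eq!(ttl_to_expire_ts(0), 0);
    // Otherwise expire_ts = now + ttl, saturating instead of overflowing.
    assert_eq!(ttl_to_expire_ts(5), 105);
    fail::remove("ttl_current_ts");
}
```

`append_expire_ts` then tacks this value onto the end of a raw value as eight extra bytes, which is why the raw-writer test further down asserts 8 extra bytes per surviving key.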
diff --git a/components/engine_traits/src/util.rs b/components/engine_traits/src/util.rs index a2d05474d18..ecb79844719 100644 --- a/components/engine_traits/src/util.rs +++ b/components/engine_traits/src/util.rs @@ -3,6 +3,7 @@ use super::{Error, Result}; use tikv_util::codec; use tikv_util::codec::number::{self, NumberEncoder}; +use tikv_util::time::UnixSecs; /// Check if key in range [`start_key`, `end_key`). #[allow(dead_code)] @@ -24,6 +25,18 @@ pub fn check_key_in_range( } } +pub fn ttl_current_ts() -> u64 { + fail_point!("ttl_current_ts", |r| r.map_or(2, |e| e.parse().unwrap())); + UnixSecs::now().into_inner() +} + +pub fn ttl_to_expire_ts(ttl: u64) -> u64 { + if ttl == 0 { + return 0; + } + ttl.saturating_add(ttl_current_ts()) +} + pub fn append_expire_ts(value: &mut Vec<u8>, expire_ts: u64) { value.encode_u64(expire_ts).unwrap(); } diff --git a/components/error_code/src/sst_importer.rs b/components/error_code/src/sst_importer.rs index 1a353c58cdb..dffdeaed86e 100644 --- a/components/error_code/src/sst_importer.rs +++ b/components/error_code/src/sst_importer.rs @@ -17,5 +17,6 @@ define_error_codes!( CANNOT_READ_EXTERNAL_STORAGE => ("CannotReadExternalStorage", "", ""), WRONG_KEY_PREFIX => ("WrongKeyPrefix", "", ""), BAD_FORMAT => ("BadFormat", "", ""), - FILE_CONFLICT => ("FileConflict", "", "") + FILE_CONFLICT => ("FileConflict", "", ""), + TTL_NOT_ENABLED => ("TTLNotEnabled", "", "") ); diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 03c4369f91b..d862c569a26 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -4086,7 +4086,7 @@ mod tests { pub fn create_tmp_importer(path: &str) -> (TempDir, Arc<SSTImporter>) { let dir = Builder::new().prefix(path).tempdir().unwrap(); let importer = - Arc::new(SSTImporter::new(&ImportConfig::default(), dir.path(), None).unwrap()); + Arc::new(SSTImporter::new(&ImportConfig::default(), dir.path(), None, false).unwrap()); (dir, importer) } diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 2136bdc8367..69ed1db3815 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -730,6 +730,7 @@ impl TiKVServer { &self.config.import, import_path, self.encryption_key_manager.clone(), + self.config.storage.enable_ttl, ) .unwrap(), ); diff --git a/components/sst_importer/src/errors.rs b/components/sst_importer/src/errors.rs index f2c48f7830d..54d4482ec4b 100644 --- a/components/sst_importer/src/errors.rs +++ b/components/sst_importer/src/errors.rs @@ -107,6 +107,9 @@ pub enum Error { #[error("ingest file conflict")] FileConflict, + + #[error("ttl is not enabled")] + TTLNotEnabled, } impl From for Error { @@ -148,6 +151,7 @@ impl ErrorCodeExt for Error { Error::Encryption(e) => e.error_code(), Error::CodecError(e) => e.error_code(), Error::FileConflict => error_code::sst_importer::FILE_CONFLICT, + Error::TTLNotEnabled => error_code::sst_importer::TTL_NOT_ENABLED, } } } diff --git a/components/sst_importer/src/import_file.rs b/components/sst_importer/src/import_file.rs new file mode 100644 index 00000000000..d00d26089bc --- /dev/null +++ b/components/sst_importer/src/import_file.rs @@ -0,0 +1,399 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+ +use std::collections::HashMap; +use std::fmt; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use encryption::{DataKeyManager, EncrypterWriter}; +use engine_rocks::{ + encryption::get_env as get_encrypted_env, file_system::get_env as get_inspected_env, + RocksSstReader, +}; +use engine_traits::{EncryptionKeyManager, KvEngine, SSTMetaInfo, SstReader}; +use file_system::{get_io_rate_limiter, sync_dir, File, OpenOptions}; +use kvproto::import_sstpb::*; +use tikv_util::time::Instant; +use uuid::{Builder as UuidBuilder, Uuid}; + +use crate::metrics::*; +use crate::{Error, Result}; + +// `SyncableWrite` extends io::Write with sync +trait SyncableWrite: io::Write + Send { + // sync all metadata to storage + fn sync(&self) -> io::Result<()>; +} + +impl SyncableWrite for File { + fn sync(&self) -> io::Result<()> { + self.sync_all() + } +} + +impl SyncableWrite for EncrypterWriter<File> { + fn sync(&self) -> io::Result<()> { + self.sync_all() + } +} + +#[derive(Clone)] +pub struct ImportPath { + // The path of the file that has been uploaded. + pub save: PathBuf, + // The path of the file that is being uploaded. + pub temp: PathBuf, + // The path of the file that is going to be ingested. + pub clone: PathBuf, +} + +impl ImportPath { + // move file from temp to save. + pub fn save(mut self, key_manager: Option<&DataKeyManager>) -> Result<()> { + file_system::rename(&self.temp, &self.save)?; + if let Some(key_manager) = key_manager { + let temp_str = self + .temp + .to_str() + .ok_or_else(|| Error::InvalidSSTPath(self.temp.clone()))?; + let save_str = self + .save + .to_str() + .ok_or_else(|| Error::InvalidSSTPath(self.save.clone()))?; + key_manager.link_file(temp_str, save_str)?; + key_manager.delete_file(temp_str)?; + } + // sync the directory after rename + self.save.pop(); + sync_dir(&self.save)?; + Ok(()) + } +} + +impl fmt::Debug for ImportPath { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ImportPath") + .field("save", &self.save) + .field("temp", &self.temp) + .field("clone", &self.clone) + .finish() + } +} + +/// ImportFile is used to handle the writing and verification of SST files. +pub struct ImportFile { + meta: SstMeta, + path: ImportPath, + file: Option<Box<dyn SyncableWrite>>, + digest: crc32fast::Hasher, + key_manager: Option<Arc<DataKeyManager>>, +} + +impl ImportFile { + pub fn create( + meta: SstMeta, + path: ImportPath, + key_manager: Option<Arc<DataKeyManager>>, + ) -> Result<ImportFile> { + let file: Box<dyn SyncableWrite> = if let Some(ref manager) = key_manager { + // The key manager will truncate an existing file, so we must check for existence manually. + if path.temp.exists() { + return Err(Error::Io(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("file already exists, {}", path.temp.to_str().unwrap()), + ))); + } + Box::new(manager.create_file(&path.temp)?)
+ } else { + Box::new( + OpenOptions::new() + .write(true) + .create_new(true) + .open(&path.temp)?, + ) + }; + Ok(ImportFile { + meta, + path, + file: Some(file), + digest: crc32fast::Hasher::new(), + key_manager, + }) + } + + pub fn append(&mut self, data: &[u8]) -> Result<()> { + self.file.as_mut().unwrap().write_all(data)?; + self.digest.update(data); + Ok(()) + } + + pub fn finish(&mut self) -> Result<()> { + self.validate()?; + // sync is a wrapping for File::sync_all + self.file.take().unwrap().sync()?; + if self.path.save.exists() { + return Err(Error::FileExists( + self.path.save.clone(), + "finalize SST write cache", + )); + } + file_system::rename(&self.path.temp, &self.path.save)?; + if let Some(ref manager) = self.key_manager { + let tmp_str = self.path.temp.to_str().unwrap(); + let save_str = self.path.save.to_str().unwrap(); + manager.link_file(tmp_str, save_str)?; + manager.delete_file(self.path.temp.to_str().unwrap())?; + } + Ok(()) + } + + fn cleanup(&mut self) -> Result<()> { + self.file.take(); + if self.path.temp.exists() { + if let Some(ref manager) = self.key_manager { + manager.delete_file(self.path.temp.to_str().unwrap())?; + } + file_system::remove_file(&self.path.temp)?; + } + Ok(()) + } + + fn validate(&self) -> Result<()> { + let crc32 = self.digest.clone().finalize(); + let expect = self.meta.get_crc32(); + if crc32 != expect { + let reason = format!("crc32 {}, expect {}", crc32, expect); + return Err(Error::FileCorrupted(self.path.temp.clone(), reason)); + } + Ok(()) + } +} + +impl fmt::Debug for ImportFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ImportFile") + .field("meta", &self.meta) + .field("path", &self.path) + .finish() + } +} + +impl Drop for ImportFile { + fn drop(&mut self) { + if let Err(e) = self.cleanup() { + warn!("cleanup failed"; "file" => ?self, "err" => %e); + } + } +} + +/// ImportDir is responsible for operating SST files and related path +/// calculations. +/// +/// The file being written is stored in `$root/.temp/$file_name`. After writing +/// is completed, the file is moved to `$root/$file_name`. The file generated +/// from the ingestion process will be placed in `$root/.clone/$file_name`. 
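`ImportPath::save` above renames the finished file and then syncs its parent directory so the rename itself survives a crash; with encryption on it also re-links the key in the `DataKeyManager`. A standalone sketch of the rename-then-sync pattern using plain `std::fs` (Unix semantics; TiKV goes through its `file_system` wrappers instead):

```rust
use std::fs::{rename, File};
use std::io;
use std::path::Path;

// Illustrative only: the core durability pattern behind ImportPath::save.
fn durable_rename(temp: &Path, save: &Path) -> io::Result<()> {
    rename(temp, save)?;
    // An atomic rename only becomes durable once the parent directory
    // entry is flushed, so fsync the directory itself.
    let dir = save.parent().expect("save path has a parent");
    File::open(dir)?.sync_all()?;
    Ok(())
}
```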
+pub struct ImportDir { + root_dir: PathBuf, + temp_dir: PathBuf, + clone_dir: PathBuf, +} + +impl ImportDir { + const TEMP_DIR: &'static str = ".temp"; + const CLONE_DIR: &'static str = ".clone"; + + pub fn new>(root: P) -> Result { + let root_dir = root.as_ref().to_owned(); + let temp_dir = root_dir.join(Self::TEMP_DIR); + let clone_dir = root_dir.join(Self::CLONE_DIR); + if temp_dir.exists() { + file_system::remove_dir_all(&temp_dir)?; + } + if clone_dir.exists() { + file_system::remove_dir_all(&clone_dir)?; + } + file_system::create_dir_all(&temp_dir)?; + file_system::create_dir_all(&clone_dir)?; + Ok(ImportDir { + root_dir, + temp_dir, + clone_dir, + }) + } + + pub fn join(&self, meta: &SstMeta) -> Result { + let file_name = sst_meta_to_path(meta)?; + let save_path = self.root_dir.join(&file_name); + let temp_path = self.temp_dir.join(&file_name); + let clone_path = self.clone_dir.join(&file_name); + Ok(ImportPath { + save: save_path, + temp: temp_path, + clone: clone_path, + }) + } + + pub fn create( + &self, + meta: &SstMeta, + key_manager: Option>, + ) -> Result { + let path = self.join(meta)?; + if path.save.exists() { + return Err(Error::FileExists(path.save, "create SST upload cache")); + } + ImportFile::create(meta.clone(), path, key_manager) + } + + pub fn delete_file(&self, path: &Path, key_manager: Option<&DataKeyManager>) -> Result<()> { + if path.exists() { + file_system::remove_file(&path)?; + if let Some(manager) = key_manager { + manager.delete_file(path.to_str().unwrap())?; + } + } + + Ok(()) + } + + pub fn delete(&self, meta: &SstMeta, manager: Option<&DataKeyManager>) -> Result { + let path = self.join(meta)?; + self.delete_file(&path.save, manager)?; + self.delete_file(&path.temp, manager)?; + self.delete_file(&path.clone, manager)?; + Ok(path) + } + + pub fn exist(&self, meta: &SstMeta) -> Result { + let path = self.join(meta)?; + Ok(path.save.exists()) + } + + pub fn validate( + &self, + meta: &SstMeta, + key_manager: Option>, + ) -> Result { + let path = self.join(meta)?; + let path_str = path.save.to_str().unwrap(); + let env = get_encrypted_env(key_manager, None /*base_env*/)?; + let env = get_inspected_env(Some(env), get_io_rate_limiter())?; + let sst_reader = RocksSstReader::open_with_env(&path_str, Some(env))?; + sst_reader.verify_checksum()?; + // TODO: check the length and crc32 of ingested file. + let meta_info = sst_reader.sst_meta_info(meta.to_owned()); + Ok(meta_info) + } + + pub fn ingest( + &self, + metas: &[SstMeta], + engine: &E, + key_manager: Option>, + ) -> Result<()> { + let start = Instant::now(); + + let mut paths = HashMap::new(); + let mut ingest_bytes = 0; + for meta in metas { + let path = self.join(meta)?; + let cf = meta.get_cf_name(); + super::prepare_sst_for_ingestion(&path.save, &path.clone, key_manager.as_deref())?; + ingest_bytes += meta.get_length(); + paths.entry(cf).or_insert_with(Vec::new).push(path); + } + + for (cf, cf_paths) in paths { + let files: Vec<&str> = cf_paths.iter().map(|p| p.clone.to_str().unwrap()).collect(); + engine.ingest_external_file_cf(cf, &files)?; + } + INPORTER_INGEST_COUNT.observe(metas.len() as _); + IMPORTER_INGEST_BYTES.observe(ingest_bytes as _); + IMPORTER_INGEST_DURATION + .with_label_values(&["ingest"]) + .observe(start.saturating_elapsed().as_secs_f64()); + Ok(()) + } + + pub fn list_ssts(&self) -> Result> { + let mut ssts = Vec::new(); + for e in file_system::read_dir(&self.root_dir)? 
{ + let e = e?; + if !e.file_type()?.is_file() { + continue; + } + let path = e.path(); + match path_to_sst_meta(&path) { + Ok(sst) => ssts.push(sst), + Err(e) => error!(%e; "path_to_sst_meta failed"; "path" => %path.to_str().unwrap(),), + } + } + Ok(ssts) + } +} + +const SST_SUFFIX: &str = ".sst"; + +pub fn sst_meta_to_path(meta: &SstMeta) -> Result { + Ok(PathBuf::from(format!( + "{}_{}_{}_{}_{}{}", + UuidBuilder::from_slice(meta.get_uuid())?.build(), + meta.get_region_id(), + meta.get_region_epoch().get_conf_ver(), + meta.get_region_epoch().get_version(), + meta.get_cf_name(), + SST_SUFFIX, + ))) +} + +pub fn path_to_sst_meta>(path: P) -> Result { + let path = path.as_ref(); + let file_name = match path.file_name().and_then(|n| n.to_str()) { + Some(name) => name, + None => return Err(Error::InvalidSSTPath(path.to_owned())), + }; + + // A valid file name should be in the format: + // "{uuid}_{region_id}_{region_epoch.conf_ver}_{region_epoch.version}_{cf}.sst" + if !file_name.ends_with(SST_SUFFIX) { + return Err(Error::InvalidSSTPath(path.to_owned())); + } + let elems: Vec<_> = file_name.trim_end_matches(SST_SUFFIX).split('_').collect(); + if elems.len() != 5 { + return Err(Error::InvalidSSTPath(path.to_owned())); + } + + let mut meta = SstMeta::default(); + let uuid = Uuid::parse_str(elems[0])?; + meta.set_uuid(uuid.as_bytes().to_vec()); + meta.set_region_id(elems[1].parse()?); + meta.mut_region_epoch().set_conf_ver(elems[2].parse()?); + meta.mut_region_epoch().set_version(elems[3].parse()?); + meta.set_cf_name(elems[4].to_owned()); + Ok(meta) +} + +#[cfg(test)] +mod test { + use super::*; + use engine_traits::CF_DEFAULT; + + #[test] + fn test_sst_meta_to_path() { + let mut meta = SstMeta::default(); + let uuid = Uuid::new_v4(); + meta.set_uuid(uuid.as_bytes().to_vec()); + meta.set_region_id(1); + meta.set_cf_name(CF_DEFAULT.to_owned()); + meta.mut_region_epoch().set_conf_ver(2); + meta.mut_region_epoch().set_version(3); + + let path = sst_meta_to_path(&meta).unwrap(); + let expected_path = format!("{}_1_2_3_default.sst", uuid); + assert_eq!(path.to_str().unwrap(), &expected_path); + + let new_meta = path_to_sst_meta(path).unwrap(); + assert_eq!(meta, new_meta); + } +} diff --git a/components/sst_importer/src/lib.rs b/components/sst_importer/src/lib.rs index 31fd5c9109f..b4949a38432 100644 --- a/components/sst_importer/src/lib.rs +++ b/components/sst_importer/src/lib.rs @@ -14,13 +14,17 @@ extern crate tikv_alloc; mod config; mod errors; -pub mod metrics; +mod import_file; +mod sst_writer; mod util; #[macro_use] pub mod import_mode; +pub mod metrics; pub mod sst_importer; pub use self::config::Config; pub use self::errors::{error_inc, Error, Result}; -pub use self::sst_importer::{sst_meta_to_path, SSTImporter, SSTWriter}; +pub use self::import_file::sst_meta_to_path; +pub use self::sst_importer::SSTImporter; +pub use self::sst_writer::{RawSSTWriter, TxnSSTWriter}; pub use self::util::prepare_sst_for_ingestion; diff --git a/components/sst_importer/src/metrics.rs b/components/sst_importer/src/metrics.rs index 49b27b65726..7c3238cfc53 100644 --- a/components/sst_importer/src/metrics.rs +++ b/components/sst_importer/src/metrics.rs @@ -24,13 +24,26 @@ lazy_static! 
{ exponential_buckets(0.01, 2.0, 20).unwrap() ) .unwrap(); - pub static ref IMPORT_WRITE_CHUNK_DURATION: Histogram = register_histogram!( - "tikv_import_write_chunk_duration", - "Bucketed histogram of import write chunk duration", + pub static ref IMPORT_LOCAL_WRITE_CHUNK_DURATION_VEC: HistogramVec = register_histogram_vec!( + "tikv_import_local_write_chunk_duration", + "Bucketed histogram of local backend write chunk duration", + &["type"], // Start from 10ms. exponential_buckets(0.01, 2.0, 20).unwrap() ) .unwrap(); + pub static ref IMPORT_LOCAL_WRITE_BYTES_VEC: IntCounterVec = register_int_counter_vec!( + "tikv_import_local_write_bytes", + "Number of bytes written from local backend", + &["type"] + ) + .unwrap(); + pub static ref IMPORT_LOCAL_WRITE_KEYS_VEC: IntCounterVec = register_int_counter_vec!( + "tikv_import_local_write_keys", + "Number of keys written from local backend", + &["type"] + ) + .unwrap(); pub static ref IMPORTER_DOWNLOAD_DURATION: HistogramVec = register_histogram_vec!( "tikv_import_download_duration", "Bucketed histogram of importer download duration", diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index d3d4503b2f7..4147e49e6a4 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -1,9 +1,6 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. use std::borrow::Cow; -use std::collections::HashMap; -use std::fmt; -use std::io::{self, Write}; use std::ops::Bound; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -13,11 +10,9 @@ use kvproto::backup::StorageBackend; #[cfg(feature = "prost-codec")] use kvproto::import_sstpb::pair::Op as PairOp; #[cfg(not(feature = "prost-codec"))] -use kvproto::import_sstpb::PairOp; use kvproto::import_sstpb::*; -use uuid::{Builder as UuidBuilder, Uuid}; -use encryption::{DataKeyManager, EncrypterWriter}; +use encryption::DataKeyManager; use engine_rocks::{ encryption::get_env as get_encrypted_env, file_system::get_env as get_inspected_env, RocksSstReader, @@ -26,20 +21,23 @@ use engine_traits::{ name_to_cf, EncryptionKeyManager, Iterator, KvEngine, SSTMetaInfo, SeekKey, SstExt, SstReader, SstWriter, SstWriterBuilder, CF_DEFAULT, CF_WRITE, }; -use file_system::{get_io_rate_limiter, sync_dir, File, OpenOptions}; +use file_system::{get_io_rate_limiter, OpenOptions}; use tikv_util::time::{Instant, Limiter}; -use txn_types::{is_short_value, Key, TimeStamp, Write as KvWrite, WriteRef, WriteType}; +use txn_types::{Key, TimeStamp, WriteRef}; -use super::Config; -use super::{Error, Result}; +use crate::import_file::{ImportDir, ImportFile}; use crate::import_mode::{ImportModeSwitcher, RocksDBMetricsFn}; use crate::metrics::*; +use crate::sst_writer::{RawSSTWriter, TxnSSTWriter}; +use crate::Config; +use crate::{Error, Result}; /// SSTImporter manages SST files that are waiting for ingesting. 
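The restructured `SSTImporter` just below now carries an `enable_ttl` flag and exposes `new_txn_writer`/`new_raw_writer` in place of the old `new_writer`. A hedged construction sketch mirroring how `components/server/src/server.rs` threads `config.storage.enable_ttl` through (the directory argument and function name are illustrative):

```rust
use std::sync::Arc;
use sst_importer::{Config, Result, SSTImporter};

fn build_importer(import_dir: &str, enable_ttl: bool) -> Result<Arc<SSTImporter>> {
    let importer = SSTImporter::new(
        &Config::default(),
        import_dir,
        None, // no DataKeyManager, i.e. encryption at rest disabled in this sketch
        enable_ttl,
    )?;
    Ok(Arc::new(importer))
}
```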
pub struct SSTImporter { dir: ImportDir, key_manager: Option>, switcher: ImportModeSwitcher, + enable_ttl: bool, } impl SSTImporter { @@ -47,12 +45,14 @@ impl SSTImporter { cfg: &Config, root: P, key_manager: Option>, + enable_ttl: bool, ) -> Result { let switcher = ImportModeSwitcher::new(cfg); Ok(SSTImporter { dir: ImportDir::new(root)?, key_manager, switcher, + enable_ttl, }) } @@ -411,7 +411,7 @@ impl SSTImporter { self.dir.list_ssts() } - pub fn new_writer(&self, db: &E, meta: SstMeta) -> Result> { + pub fn new_txn_writer(&self, db: &E, meta: SstMeta) -> Result> { let mut default_meta = meta.clone(); default_meta.set_cf_name(CF_DEFAULT.to_owned()); let default_path = self.dir.join(&default_meta)?; @@ -430,7 +430,7 @@ impl SSTImporter { .build(write_path.temp.to_str().unwrap()) .unwrap(); - Ok(SSTWriter::new( + Ok(TxnSSTWriter::new( default, write, default_path, @@ -440,445 +440,27 @@ impl SSTImporter { self.key_manager.clone(), )) } -} - -pub struct SSTWriter { - default: E::SstWriter, - default_entries: u64, - default_path: ImportPath, - default_meta: SstMeta, - write: E::SstWriter, - write_entries: u64, - write_path: ImportPath, - write_meta: SstMeta, - key_manager: Option>, -} -impl SSTWriter { - pub fn new( - default: E::SstWriter, - write: E::SstWriter, - default_path: ImportPath, - write_path: ImportPath, - default_meta: SstMeta, - write_meta: SstMeta, - key_manager: Option>, - ) -> Self { - SSTWriter { + pub fn new_raw_writer( + &self, + db: &E, + mut meta: SstMeta, + ) -> Result> { + meta.set_cf_name(CF_DEFAULT.to_owned()); + let default_path = self.dir.join(&meta)?; + let default = E::SstWriterBuilder::new() + .set_db(&db) + .set_cf(CF_DEFAULT) + .build(default_path.temp.to_str().unwrap()) + .unwrap(); + Ok(RawSSTWriter::new( default, default_path, - default_entries: 0, - default_meta, - write, - write_path, - write_entries: 0, - write_meta, - key_manager, - } - } - - pub fn write(&mut self, batch: WriteBatch) -> Result<()> { - let commit_ts = TimeStamp::new(batch.get_commit_ts()); - for m in batch.get_pairs().iter() { - let k = Key::from_raw(m.get_key()).append_ts(commit_ts); - self.put(k.as_encoded(), m.get_value(), m.get_op())?; - } - Ok(()) - } - - fn put(&mut self, key: &[u8], value: &[u8], op: PairOp) -> Result<()> { - let k = keys::data_key(key); - let (_, commit_ts) = Key::split_on_ts_for(key)?; - let w = match (op, is_short_value(value)) { - (PairOp::Delete, _) => KvWrite::new(WriteType::Delete, commit_ts, None), - (PairOp::Put, true) => KvWrite::new(WriteType::Put, commit_ts, Some(value.to_vec())), - (PairOp::Put, false) => { - self.default.put(&k, value)?; - self.default_entries += 1; - KvWrite::new(WriteType::Put, commit_ts, None) - } - }; - self.write.put(&k, &w.as_ref().to_bytes())?; - self.write_entries += 1; - Ok(()) - } - - pub fn finish(self) -> Result> { - let default_meta = self.default_meta.clone(); - let write_meta = self.write_meta.clone(); - let mut metas = Vec::with_capacity(2); - let (default_entries, write_entries) = (self.default_entries, self.write_entries); - let (p1, p2) = (self.default_path.clone(), self.write_path.clone()); - let (w1, w2, key_manager) = (self.default, self.write, self.key_manager); - if default_entries > 0 { - w1.finish()?; - Self::save(p1, key_manager.as_deref())?; - metas.push(default_meta); - } - if write_entries > 0 { - w2.finish()?; - Self::save(p2, key_manager.as_deref())?; - metas.push(write_meta); - } - info!("finish write to sst"; - "default entries" => default_entries, - "write entries" => write_entries, - ); - 
Ok(metas) - } - - // move file from temp to save. - fn save(mut import_path: ImportPath, key_manager: Option<&DataKeyManager>) -> Result<()> { - file_system::rename(&import_path.temp, &import_path.save)?; - if let Some(key_manager) = key_manager { - let temp_str = import_path - .temp - .to_str() - .ok_or_else(|| Error::InvalidSSTPath(import_path.temp.clone()))?; - let save_str = import_path - .save - .to_str() - .ok_or_else(|| Error::InvalidSSTPath(import_path.save.clone()))?; - key_manager.link_file(temp_str, save_str)?; - key_manager.delete_file(temp_str)?; - } - // sync the directory after rename - import_path.save.pop(); - sync_dir(&import_path.save)?; - Ok(()) - } -} - -/// ImportDir is responsible for operating SST files and related path -/// calculations. -/// -/// The file being written is stored in `$root/.temp/$file_name`. After writing -/// is completed, the file is moved to `$root/$file_name`. The file generated -/// from the ingestion process will be placed in `$root/.clone/$file_name`. -/// -/// TODO: Add size and rate limit. -pub struct ImportDir { - root_dir: PathBuf, - temp_dir: PathBuf, - clone_dir: PathBuf, -} - -impl ImportDir { - const TEMP_DIR: &'static str = ".temp"; - const CLONE_DIR: &'static str = ".clone"; - - fn new>(root: P) -> Result { - let root_dir = root.as_ref().to_owned(); - let temp_dir = root_dir.join(Self::TEMP_DIR); - let clone_dir = root_dir.join(Self::CLONE_DIR); - if temp_dir.exists() { - file_system::remove_dir_all(&temp_dir)?; - } - if clone_dir.exists() { - file_system::remove_dir_all(&clone_dir)?; - } - file_system::create_dir_all(&temp_dir)?; - file_system::create_dir_all(&clone_dir)?; - Ok(ImportDir { - root_dir, - temp_dir, - clone_dir, - }) - } - - fn join(&self, meta: &SstMeta) -> Result { - let file_name = sst_meta_to_path(meta)?; - let save_path = self.root_dir.join(&file_name); - let temp_path = self.temp_dir.join(&file_name); - let clone_path = self.clone_dir.join(&file_name); - Ok(ImportPath { - save: save_path, - temp: temp_path, - clone: clone_path, - }) - } - - fn create( - &self, - meta: &SstMeta, - key_manager: Option>, - ) -> Result { - let path = self.join(meta)?; - if path.save.exists() { - return Err(Error::FileExists(path.save, "create SST upload cache")); - } - ImportFile::create(meta.clone(), path, key_manager) - } - - fn delete_file(&self, path: &Path, key_manager: Option<&DataKeyManager>) -> Result<()> { - if path.exists() { - file_system::remove_file(&path)?; - if let Some(manager) = key_manager { - manager.delete_file(path.to_str().unwrap())?; - } - } - - Ok(()) - } - - fn delete(&self, meta: &SstMeta, manager: Option<&DataKeyManager>) -> Result { - let path = self.join(meta)?; - self.delete_file(&path.save, manager)?; - self.delete_file(&path.temp, manager)?; - self.delete_file(&path.clone, manager)?; - Ok(path) - } - - fn exist(&self, meta: &SstMeta) -> Result { - let path = self.join(meta)?; - Ok(path.save.exists()) - } - - fn validate( - &self, - meta: &SstMeta, - key_manager: Option>, - ) -> Result { - let path = self.join(meta)?; - let path_str = path.save.to_str().unwrap(); - let env = get_encrypted_env(key_manager, None /*base_env*/)?; - let env = get_inspected_env(Some(env), get_io_rate_limiter())?; - let sst_reader = RocksSstReader::open_with_env(&path_str, Some(env))?; - sst_reader.verify_checksum()?; - // TODO: check the length and crc32 of ingested file. 
- let meta_info = sst_reader.sst_meta_info(meta.to_owned()); - Ok(meta_info) - } - - fn ingest( - &self, - metas: &[SstMeta], - engine: &E, - key_manager: Option>, - ) -> Result<()> { - let start = Instant::now(); - - let mut paths = HashMap::new(); - let mut ingest_bytes = 0; - for meta in metas { - let path = self.join(meta)?; - let cf = meta.get_cf_name(); - super::prepare_sst_for_ingestion(&path.save, &path.clone, key_manager.as_deref())?; - ingest_bytes += meta.get_length(); - paths.entry(cf).or_insert_with(Vec::new).push(path); - } - - for (cf, cf_paths) in paths { - let files: Vec<&str> = cf_paths.iter().map(|p| p.clone.to_str().unwrap()).collect(); - engine.ingest_external_file_cf(cf, &files)?; - } - INPORTER_INGEST_COUNT.observe(metas.len() as _); - IMPORTER_INGEST_BYTES.observe(ingest_bytes as _); - IMPORTER_INGEST_DURATION - .with_label_values(&["ingest"]) - .observe(start.saturating_elapsed().as_secs_f64()); - Ok(()) - } - - fn list_ssts(&self) -> Result> { - let mut ssts = Vec::new(); - for e in file_system::read_dir(&self.root_dir)? { - let e = e?; - if !e.file_type()?.is_file() { - continue; - } - let path = e.path(); - match path_to_sst_meta(&path) { - Ok(sst) => ssts.push(sst), - Err(e) => error!(%e; "path_to_sst_meta failed"; "path" => %path.to_str().unwrap(),), - } - } - Ok(ssts) - } -} - -#[derive(Clone)] -pub struct ImportPath { - // The path of the file that has been uploaded. - save: PathBuf, - // The path of the file that is being uploaded. - temp: PathBuf, - // The path of the file that is going to be ingested. - clone: PathBuf, -} - -impl fmt::Debug for ImportPath { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ImportPath") - .field("save", &self.save) - .field("temp", &self.temp) - .field("clone", &self.clone) - .finish() - } -} -// `SyncableWrite` extends io::Write with sync -trait SyncableWrite: io::Write + Send { - // sync all metadata to storage - fn sync(&self) -> io::Result<()>; -} - -impl SyncableWrite for File { - fn sync(&self) -> io::Result<()> { - self.sync_all() - } -} - -impl SyncableWrite for EncrypterWriter { - fn sync(&self) -> io::Result<()> { - self.sync_all() - } -} - -/// ImportFile is used to handle the writing and verification of SST files. -pub struct ImportFile { - meta: SstMeta, - path: ImportPath, - file: Option>, - digest: crc32fast::Hasher, - key_manager: Option>, -} - -impl ImportFile { - fn create( - meta: SstMeta, - path: ImportPath, - key_manager: Option>, - ) -> Result { - let file: Box = if let Some(ref manager) = key_manager { - // key manager will truncate existed file, so we should check exist manually. - if path.temp.exists() { - return Err(Error::Io(io::Error::new( - io::ErrorKind::AlreadyExists, - format!("file already exists, {}", path.temp.to_str().unwrap()), - ))); - } - Box::new(manager.create_file(&path.temp)?) 
- } else { - Box::new( - OpenOptions::new() - .write(true) - .create_new(true) - .open(&path.temp)?, - ) - }; - Ok(ImportFile { meta, - path, - file: Some(file), - digest: crc32fast::Hasher::new(), - key_manager, - }) - } - - pub fn append(&mut self, data: &[u8]) -> Result<()> { - self.file.as_mut().unwrap().write_all(data)?; - self.digest.update(data); - Ok(()) - } - - pub fn finish(&mut self) -> Result<()> { - self.validate()?; - // sync is a wrapping for File::sync_all - self.file.take().unwrap().sync()?; - if self.path.save.exists() { - return Err(Error::FileExists( - self.path.save.clone(), - "finalize SST write cache", - )); - } - file_system::rename(&self.path.temp, &self.path.save)?; - if let Some(ref manager) = self.key_manager { - let tmp_str = self.path.temp.to_str().unwrap(); - let save_str = self.path.save.to_str().unwrap(); - manager.link_file(tmp_str, save_str)?; - manager.delete_file(self.path.temp.to_str().unwrap())?; - } - Ok(()) - } - - fn cleanup(&mut self) -> Result<()> { - self.file.take(); - if self.path.temp.exists() { - if let Some(ref manager) = self.key_manager { - manager.delete_file(self.path.temp.to_str().unwrap())?; - } - file_system::remove_file(&self.path.temp)?; - } - Ok(()) - } - - fn validate(&self) -> Result<()> { - let crc32 = self.digest.clone().finalize(); - let expect = self.meta.get_crc32(); - if crc32 != expect { - let reason = format!("crc32 {}, expect {}", crc32, expect); - return Err(Error::FileCorrupted(self.path.temp.clone(), reason)); - } - Ok(()) - } -} - -impl Drop for ImportFile { - fn drop(&mut self) { - if let Err(e) = self.cleanup() { - warn!("cleanup failed"; "file" => ?self, "err" => %e); - } - } -} - -impl fmt::Debug for ImportFile { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ImportFile") - .field("meta", &self.meta) - .field("path", &self.path) - .finish() - } -} - -const SST_SUFFIX: &str = ".sst"; - -pub fn sst_meta_to_path(meta: &SstMeta) -> Result { - Ok(PathBuf::from(format!( - "{}_{}_{}_{}_{}{}", - UuidBuilder::from_slice(meta.get_uuid())?.build(), - meta.get_region_id(), - meta.get_region_epoch().get_conf_ver(), - meta.get_region_epoch().get_version(), - meta.get_cf_name(), - SST_SUFFIX, - ))) -} - -fn path_to_sst_meta>(path: P) -> Result { - let path = path.as_ref(); - let file_name = match path.file_name().and_then(|n| n.to_str()) { - Some(name) => name, - None => return Err(Error::InvalidSSTPath(path.to_owned())), - }; - - // A valid file name should be in the format: - // "{uuid}_{region_id}_{region_epoch.conf_ver}_{region_epoch.version}_{cf}.sst" - if !file_name.ends_with(SST_SUFFIX) { - return Err(Error::InvalidSSTPath(path.to_owned())); - } - let elems: Vec<_> = file_name.trim_end_matches(SST_SUFFIX).split('_').collect(); - if elems.len() != 5 { - return Err(Error::InvalidSSTPath(path.to_owned())); + self.key_manager.clone(), + self.enable_ttl, + )) } - - let mut meta = SstMeta::default(); - let uuid = Uuid::parse_str(elems[0])?; - meta.set_uuid(uuid.as_bytes().to_vec()); - meta.set_region_id(elems[1].parse()?); - meta.mut_region_epoch().set_conf_ver(elems[2].parse()?); - meta.mut_region_epoch().set_version(elems[3].parse()?); - meta.set_cf_name(elems[4].to_owned()); - Ok(meta) } fn key_to_bound(key: &[u8]) -> Bound<&[u8]> { @@ -915,26 +497,25 @@ fn is_after_end_bound>(value: &[u8], bound: &Bound) -> bool { #[cfg(test)] mod tests { - use super::*; - use test_sst_importer::*; - use std::f64::INFINITY; - use tikv_util::stream::block_on_external_io; + use std::io; use engine_traits::{ - 
collect, EncryptionMethod, Iterable, Iterator, SeekKey, CF_DEFAULT, DATA_CFS, - }; - use engine_traits::{Error as TraitError, TablePropertiesExt}; - use engine_traits::{ - ExternalSstFileInfo, TableProperties, TablePropertiesCollection, UserCollectedProperties, + collect, EncryptionMethod, Error as TraitError, ExternalSstFileInfo, Iterable, Iterator, + SeekKey, SstReader, SstWriter, TableProperties, TablePropertiesCollection, + TablePropertiesExt, UserCollectedProperties, CF_DEFAULT, DATA_CFS, }; + use file_system::File; use tempfile::Builder; - use test_sst_importer::{ - new_sst_reader, new_sst_writer, new_test_engine, RocksSstWriter, PROP_TEST_MARKER_CF_NAME, - }; + use test_sst_importer::*; + use test_util::new_test_key_manager; + use tikv_util::stream::block_on_external_io; use txn_types::{Value, WriteType}; + use uuid::Uuid; - use test_util::new_test_key_manager; + use super::*; + use crate::import_file::ImportPath; + use crate::*; fn do_test_import_dir(key_manager: Option>) { let temp_dir = Builder::new().prefix("test_import_dir").tempdir().unwrap(); @@ -1096,24 +677,6 @@ mod tests { do_test_import_file(Some(key_manager)); } - #[test] - fn test_sst_meta_to_path() { - let mut meta = SstMeta::default(); - let uuid = Uuid::new_v4(); - meta.set_uuid(uuid.as_bytes().to_vec()); - meta.set_region_id(1); - meta.set_cf_name(CF_DEFAULT.to_owned()); - meta.mut_region_epoch().set_conf_ver(2); - meta.mut_region_epoch().set_version(3); - - let path = sst_meta_to_path(&meta).unwrap(); - let expected_path = format!("{}_1_2_3_default.sst", uuid); - assert_eq!(path.to_str().unwrap(), &expected_path); - - let new_meta = path_to_sst_meta(path).unwrap(); - assert_eq!(meta, new_meta); - } - fn create_external_sst_file_with_write_fn( write_fn: F, ) -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> @@ -1327,7 +890,7 @@ mod tests { // performs the download. let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let range = importer @@ -1376,7 +939,8 @@ mod tests { let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); let (temp_dir, key_manager) = new_key_manager_for_test(); - let importer = SSTImporter::new(&cfg, &importer_dir, Some(key_manager.clone())).unwrap(); + let importer = + SSTImporter::new(&cfg, &importer_dir, Some(key_manager.clone()), false).unwrap(); let db_path = temp_dir.path().join("db"); let env = get_encrypted_env(Some(key_manager), None /*base_env*/).unwrap(); @@ -1428,7 +992,7 @@ mod tests { // performs the download. let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let range = importer @@ -1472,7 +1036,7 @@ mod tests { // performs the download. let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); // creates a sample SST file. let (_ext_sst_dir, backend, meta) = create_sample_external_sst_file_txn_default().unwrap(); @@ -1515,7 +1079,7 @@ mod tests { // performs the download. 
let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); // creates a sample SST file. let (_ext_sst_dir, backend, meta) = create_sample_external_sst_file_txn_write().unwrap(); @@ -1580,7 +1144,7 @@ mod tests { // performs the download. let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let range = importer @@ -1651,7 +1215,7 @@ mod tests { let (_ext_sst_dir, backend, mut meta) = create_sample_external_sst_file().unwrap(); let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); // note: the range doesn't contain the DATA_PREFIX 'z'. meta.mut_range().set_start(b"t123_r02".to_vec()); @@ -1696,7 +1260,7 @@ mod tests { let (_ext_sst_dir, backend, mut meta) = create_sample_external_sst_file().unwrap(); let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); meta.mut_range().set_start(b"t5_r02".to_vec()); meta.mut_range().set_end(b"t5_r12".to_vec()); @@ -1742,7 +1306,7 @@ mod tests { meta.set_uuid(vec![0u8; 16]); let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let backend = external_storage_export::make_local_backend(ext_sst_dir.path()); @@ -1766,7 +1330,7 @@ mod tests { let (_ext_sst_dir, backend, mut meta) = create_sample_external_sst_file().unwrap(); let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); meta.mut_range().set_start(vec![b'x']); meta.mut_range().set_end(vec![b'y']); @@ -1791,7 +1355,7 @@ mod tests { let (_ext_sst_dir, backend, meta) = create_sample_external_sst_file().unwrap(); let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let result = importer.download::( @@ -1812,51 +1376,6 @@ mod tests { } } - #[test] - fn test_write_sst() { - let mut meta = SstMeta::default(); - meta.set_uuid(Uuid::new_v4().as_bytes().to_vec()); - - let importer_dir = tempfile::tempdir().unwrap(); - let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); - let db_path = importer_dir.path().join("db"); - let db = new_test_engine(db_path.to_str().unwrap(), DATA_CFS); - - let mut w = importer.new_writer::(&db, meta).unwrap(); - let mut batch = WriteBatch::default(); - let mut pairs = vec![]; - - // put short 
value kv in wirte cf - let mut pair = Pair::default(); - pair.set_key(b"k1".to_vec()); - pair.set_value(b"short_value".to_vec()); - pairs.push(pair); - - // put big value kv in default cf - let big_value = vec![42; 256]; - let mut pair = Pair::default(); - pair.set_key(b"k2".to_vec()); - pair.set_value(big_value); - pairs.push(pair); - - // put delete type key in write cf - let mut pair = Pair::default(); - pair.set_key(b"k3".to_vec()); - pair.set_op(PairOp::Delete); - pairs.push(pair); - - // generate two cf metas - batch.set_commit_ts(10); - batch.set_pairs(pairs.into()); - w.write(batch).unwrap(); - assert_eq!(w.write_entries, 3); - assert_eq!(w.default_entries, 1); - - let metas = w.finish().unwrap(); - assert_eq!(metas.len(), 2); - } - #[test] fn test_download_rawkv_sst() { // creates a sample SST file. @@ -1866,7 +1385,7 @@ mod tests { // performs the download. let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let range = importer @@ -1918,7 +1437,7 @@ mod tests { // performs the download. let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let range = importer @@ -1966,7 +1485,7 @@ mod tests { // performs the download. let importer_dir = tempfile::tempdir().unwrap(); let cfg = Config::default(); - let importer = SSTImporter::new(&cfg, &importer_dir, None).unwrap(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); let db = create_sst_test_engine().unwrap(); let range = importer diff --git a/components/sst_importer/src/sst_writer.rs b/components/sst_importer/src/sst_writer.rs new file mode 100644 index 00000000000..99d01e5e297 --- /dev/null +++ b/components/sst_importer/src/sst_writer.rs @@ -0,0 +1,349 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. 
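The new `sst_writer.rs` below splits the old `SSTWriter` into `TxnSSTWriter` (txn KV, default plus write CFs) and `RawSSTWriter` (raw KV, default CF only). Before the file body, a hedged caller-side sketch of the raw path, using only types this PR exports (the writer itself would come from `SSTImporter::new_raw_writer`):

```rust
use engine_traits::KvEngine;
use kvproto::import_sstpb::{Pair, PairOp, RawWriteBatch, SstMeta};
use sst_importer::{RawSSTWriter, Result};

// Illustrative only: one TTL'd put plus one delete, then flush to disk.
fn write_raw_batch<E: KvEngine>(mut w: RawSSTWriter<E>) -> Result<Vec<SstMeta>> {
    let mut put = Pair::default();
    put.set_key(b"k1".to_vec());
    put.set_value(b"v1".to_vec());

    let mut del = Pair::default();
    del.set_key(b"k2".to_vec());
    del.set_op(PairOp::Delete);

    let mut batch = RawWriteBatch::default();
    batch.set_ttl(60); // rejected with Error::TTLNotEnabled unless enable_ttl is set
    batch.set_pairs(vec![put, del].into());

    w.write(batch)?;
    w.finish() // at most one SstMeta, since raw data only touches the default CF
}
```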
+ +use engine_traits::util::{append_expire_ts, ttl_to_expire_ts}; +use kvproto::import_sstpb::*; +use std::sync::Arc; + +use encryption::DataKeyManager; +use engine_traits::{KvEngine, SstWriter}; +use tikv_util::time::Instant; +use txn_types::{is_short_value, Key, TimeStamp, Write as KvWrite, WriteType}; + +use crate::import_file::ImportPath; +use crate::metrics::*; +use crate::Result; + +pub struct TxnSSTWriter { + default: E::SstWriter, + default_entries: u64, + default_bytes: u64, + default_path: ImportPath, + default_meta: SstMeta, + write: E::SstWriter, + write_entries: u64, + write_bytes: u64, + write_path: ImportPath, + write_meta: SstMeta, + key_manager: Option>, +} + +impl TxnSSTWriter { + pub fn new( + default: E::SstWriter, + write: E::SstWriter, + default_path: ImportPath, + write_path: ImportPath, + default_meta: SstMeta, + write_meta: SstMeta, + key_manager: Option>, + ) -> Self { + TxnSSTWriter { + default, + default_path, + default_entries: 0, + default_bytes: 0, + default_meta, + write, + write_path, + write_entries: 0, + write_bytes: 0, + write_meta, + key_manager, + } + } + + pub fn write(&mut self, batch: WriteBatch) -> Result<()> { + let start = Instant::now_coarse(); + + let commit_ts = TimeStamp::new(batch.get_commit_ts()); + for m in batch.get_pairs().iter() { + let k = Key::from_raw(m.get_key()).append_ts(commit_ts); + self.put(k.as_encoded(), m.get_value(), m.get_op())?; + } + + IMPORT_LOCAL_WRITE_CHUNK_DURATION_VEC + .with_label_values(&["txn"]) + .observe(start.saturating_elapsed().as_secs_f64()); + Ok(()) + } + + fn put(&mut self, key: &[u8], value: &[u8], op: PairOp) -> Result<()> { + let k = keys::data_key(key); + let (_, commit_ts) = Key::split_on_ts_for(key)?; + let w = match (op, is_short_value(value)) { + (PairOp::Delete, _) => KvWrite::new(WriteType::Delete, commit_ts, None), + (PairOp::Put, true) => KvWrite::new(WriteType::Put, commit_ts, Some(value.to_vec())), + (PairOp::Put, false) => { + self.default.put(&k, value)?; + self.default_entries += 1; + self.default_bytes += (k.len() + value.len()) as u64; + KvWrite::new(WriteType::Put, commit_ts, None) + } + }; + let write = w.as_ref().to_bytes(); + self.write.put(&k, &write)?; + self.write_entries += 1; + self.write_bytes += (k.len() + write.len()) as u64; + Ok(()) + } + + pub fn finish(self) -> Result> { + let default_meta = self.default_meta.clone(); + let write_meta = self.write_meta.clone(); + let mut metas = Vec::with_capacity(2); + let (default_entries, write_entries) = (self.default_entries, self.write_entries); + let (default_bytes, write_bytes) = (self.default_bytes, self.write_bytes); + let (p1, p2) = (self.default_path.clone(), self.write_path.clone()); + let (w1, w2, key_manager) = (self.default, self.write, self.key_manager); + + if default_entries > 0 { + w1.finish()?; + p1.save(key_manager.as_deref())?; + metas.push(default_meta); + } + if write_entries > 0 { + w2.finish()?; + p2.save(key_manager.as_deref())?; + metas.push(write_meta); + } + + info!("finish write to sst"; + "default entries" => default_entries, + "default bytes" => default_bytes, + "write entries" => write_entries, + "write bytes" => write_bytes, + ); + IMPORT_LOCAL_WRITE_KEYS_VEC + .with_label_values(&["txn_default_cf"]) + .inc_by(default_entries); + IMPORT_LOCAL_WRITE_BYTES_VEC + .with_label_values(&["txn_default_cf"]) + .inc_by(default_bytes); + IMPORT_LOCAL_WRITE_KEYS_VEC + .with_label_values(&["txn_write_cf"]) + .inc_by(write_entries); + IMPORT_LOCAL_WRITE_BYTES_VEC + .with_label_values(&["txn_write_cf"]) + 
.inc_by(write_bytes); + + Ok(metas) + } +} + +pub struct RawSSTWriter<E: KvEngine> { + default: E::SstWriter, + default_entries: u64, + default_deletes: u64, + default_bytes: u64, + default_path: ImportPath, + default_meta: SstMeta, + key_manager: Option<Arc<DataKeyManager>>, + enable_ttl: bool, +} + +impl<E: KvEngine> RawSSTWriter<E> { + pub fn new( + default: E::SstWriter, + default_path: ImportPath, + default_meta: SstMeta, + key_manager: Option<Arc<DataKeyManager>>, + enable_ttl: bool, + ) -> Self { + RawSSTWriter { + default, + default_path, + default_entries: 0, + default_bytes: 0, + default_deletes: 0, + default_meta, + key_manager, + enable_ttl, + } + } + + fn put(&mut self, key: &[u8], value: &[u8], op: PairOp) -> Result<()> { + let k = keys::data_key(key); + match op { + PairOp::Delete => { + self.default.delete(&k)?; + self.default_deletes += 1; + self.default_bytes += k.len() as u64; + } + PairOp::Put => { + self.default.put(&k, value)?; + self.default_entries += 1; + self.default_bytes += (k.len() + value.len()) as u64; + } + } + Ok(()) + } + + pub fn write(&mut self, mut batch: RawWriteBatch) -> Result<()> { + let start = Instant::now_coarse(); + + if self.enable_ttl { + let ttl = batch.get_ttl(); + let expire_ts = ttl_to_expire_ts(ttl); + for mut m in batch.take_pairs().into_iter() { + if m.get_op() == PairOp::Put { + append_expire_ts(m.mut_value(), expire_ts); + } + self.put(m.get_key(), m.get_value(), m.get_op())?; + } + } else { + if batch.get_ttl() != 0 { + return Err(crate::Error::TTLNotEnabled); + } + for m in batch.take_pairs().into_iter() { + self.put(m.get_key(), m.get_value(), m.get_op())?; + } + } + + IMPORT_LOCAL_WRITE_CHUNK_DURATION_VEC + .with_label_values(&["raw"]) + .observe(start.saturating_elapsed().as_secs_f64()); + Ok(()) + } + + pub fn finish(self) -> Result<Vec<SstMeta>> { + if self.default_entries == 0 { + return Ok(vec![]); + } + + self.default.finish()?; + self.default_path.save(self.key_manager.as_deref())?; + + info!( + "finish raw write to sst"; + "default entries" => self.default_entries, + "default deletes" => self.default_deletes, + "default bytes" => self.default_bytes + ); + IMPORT_LOCAL_WRITE_KEYS_VEC + .with_label_values(&["raw_default_cf"]) + .inc_by(self.default_entries); + IMPORT_LOCAL_WRITE_KEYS_VEC + .with_label_values(&["raw_default_cf_delete"]) + .inc_by(self.default_deletes); + IMPORT_LOCAL_WRITE_BYTES_VEC + .with_label_values(&["raw_default_cf"]) + .inc_by(self.default_bytes); + + Ok(vec![self.default_meta]) + } +} + +#[cfg(test)] +mod tests { + use engine_traits::DATA_CFS; + use test_sst_importer::*; + use uuid::Uuid; + + use super::*; + use crate::{Config, SSTImporter}; + + #[test] + fn test_write_txn_sst() { + let mut meta = SstMeta::default(); + meta.set_uuid(Uuid::new_v4().as_bytes().to_vec()); + + let importer_dir = tempfile::tempdir().unwrap(); + let cfg = Config::default(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); + let db_path = importer_dir.path().join("db"); + let db = new_test_engine(db_path.to_str().unwrap(), DATA_CFS); + + let mut w = importer.new_txn_writer::(&db, meta).unwrap(); + let mut batch = WriteBatch::default(); + let mut pairs = vec![]; + + // put short value kv in write cf + let mut pair = Pair::default();
+ pair.set_key(b"k3".to_vec()); + pair.set_op(PairOp::Delete); + pairs.push(pair); + + // generate two cf metas + batch.set_commit_ts(10); + batch.set_pairs(pairs.into()); + w.write(batch).unwrap(); + assert_eq!(w.write_entries, 3); + assert_eq!(w.default_entries, 1); + + let metas = w.finish().unwrap(); + assert_eq!(metas.len(), 2); + } + + #[test] + fn test_raw_write_sst() { + let mut meta = SstMeta::default(); + meta.set_uuid(Uuid::new_v4().as_bytes().to_vec()); + + let importer_dir = tempfile::tempdir().unwrap(); + let cfg = Config::default(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, true).unwrap(); + let db_path = importer_dir.path().join("db"); + let db = new_test_engine(db_path.to_str().unwrap(), DATA_CFS); + + let mut w = importer.new_raw_writer::(&db, meta).unwrap(); + let mut batch = RawWriteBatch::default(); + let mut pairs = vec![]; + + // put value + let mut pair = Pair::default(); + pair.set_key(b"k1".to_vec()); + pair.set_value(b"short_value".to_vec()); + pairs.push(pair); + + // delete value + let mut pair = Pair::default(); + pair.set_key(b"k2".to_vec()); + pair.set_op(PairOp::Delete); + pairs.push(pair); + + // generate meta + batch.set_ttl(10); + batch.set_pairs(pairs.into()); + w.write(batch).unwrap(); + assert_eq!(w.default_entries, 1); + assert_eq!(w.default_deletes, 1); + // ttl takes 8 more bytes + assert_eq!( + w.default_bytes as usize, + b"zk1".len() + b"short_value".len() + 8 + b"zk2".len() + ); + + let metas = w.finish().unwrap(); + assert_eq!(metas.len(), 1); + } + + #[test] + fn test_raw_write_ttl_not_enabled() { + let mut meta = SstMeta::default(); + meta.set_uuid(Uuid::new_v4().as_bytes().to_vec()); + + let importer_dir = tempfile::tempdir().unwrap(); + let cfg = Config::default(); + let importer = SSTImporter::new(&cfg, &importer_dir, None, false).unwrap(); + let db_path = importer_dir.path().join("db"); + let db = new_test_engine(db_path.to_str().unwrap(), DATA_CFS); + + let mut w = importer.new_raw_writer::(&db, meta).unwrap(); + let mut batch = RawWriteBatch::default(); + batch.set_ttl(10); + assert!(w.write(batch).is_err()); + } +} diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index 51c5368430a..42a21c33fa0 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -250,7 +250,7 @@ impl Simulator for NodeCluster { let importer = { let dir = Path::new(engines.kv.path()).join("import-sst"); - Arc::new(SSTImporter::new(&cfg.import, dir, None).unwrap()) + Arc::new(SSTImporter::new(&cfg.import, dir, None, false).unwrap()) }; let local_reader = LocalReader::new(engines.kv.clone(), store_meta.clone(), router.clone()); diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index e8ddfdda40e..8e043eec56b 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -320,7 +320,7 @@ impl Simulator for ServerCluster { // Create import service. 
let importer = { let dir = Path::new(engines.kv.path()).join("import-sst"); - Arc::new(SSTImporter::new(&cfg.import, dir, key_manager.clone()).unwrap()) + Arc::new(SSTImporter::new(&cfg.import, dir, key_manager.clone(), false).unwrap()) }; let import_service = ImportSSTService::new( cfg.import.clone(), diff --git a/src/import/mod.rs b/src/import/mod.rs index 18b9144031b..2e258137cc2 100644 --- a/src/import/mod.rs +++ b/src/import/mod.rs @@ -18,7 +18,7 @@ mod sst_service; pub use self::sst_service::ImportSSTService; pub use sst_importer::Config; pub use sst_importer::{Error, Result}; -pub use sst_importer::{SSTImporter, SSTWriter}; +pub use sst_importer::{SSTImporter, TxnSSTWriter}; use grpcio::{RpcStatus, RpcStatusCode}; use std::fmt::Debug; diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index 8b47ce796fb..b2a941d0f2b 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -21,6 +21,8 @@ use kvproto::{errorpb, kvrpcpb::Context}; #[cfg(feature = "prost-codec")] use kvproto::import_sstpb::write_request::*; #[cfg(feature = "protobuf-codec")] +use kvproto::import_sstpb::RawWriteRequest_oneof_chunk as RawChunk; +#[cfg(feature = "protobuf-codec")] use kvproto::import_sstpb::WriteRequest_oneof_chunk as Chunk; use kvproto::import_sstpb::*; @@ -100,6 +102,7 @@ where let p = sst_meta_to_path(meta)?; Ok(slots.insert(p)) } + fn release_lock(task_slots: &Arc>>, meta: &SstMeta) -> Result { let mut slots = task_slots.lock().unwrap(); let p = sst_meta_to_path(meta)?; @@ -137,6 +140,7 @@ where term: header.get_current_term(), }) } + fn check_write_stall(&self) -> Option { if self.importer.get_mode() == SwitchMode::Normal && self @@ -224,6 +228,68 @@ where } } +#[macro_export] +macro_rules! impl_write { + ($fn:ident, $req_ty:ident, $resp_ty:ident, $chunk_ty:ident, $writer_fn:ident) => { + fn $fn( + &mut self, + _ctx: RpcContext<'_>, + stream: RequestStream<$req_ty>, + sink: ClientStreamingSink<$resp_ty>, + ) { + let import = self.importer.clone(); + let engine = self.engine.clone(); + let (rx, buf_driver) = + create_stream_with_buffer(stream, self.cfg.stream_channel_window); + let mut rx = rx.map_err(Error::from); + + let timer = Instant::now_coarse(); + let label = stringify!($fn); + let handle_task = async move { + let res = async move { + let first_req = rx.try_next().await?; + let meta = match first_req { + Some(r) => match r.chunk { + Some($chunk_ty::Meta(m)) => m, + _ => return Err(Error::InvalidChunk), + }, + _ => return Err(Error::InvalidChunk), + }; + + let writer = match import.$writer_fn(&engine, meta) { + Ok(w) => w, + Err(e) => { + error!("build writer failed {:?}", e); + return Err(Error::InvalidChunk); + } + }; + let writer = rx + .try_fold(writer, |mut writer, req| async move { + let batch = match req.chunk { + Some($chunk_ty::Batch(b)) => b, + _ => return Err(Error::InvalidChunk), + }; + writer.write(batch)?; + Ok(writer) + }) + .await?; + + writer.finish().map(|metas| { + let mut resp = $resp_ty::default(); + resp.set_metas(metas.into()); + resp + }) + } + .await; + crate::send_rpc_response!(res, sink, label, timer); + }; + + self.threads.spawn_ok(buf_driver); + self.threads.spawn_ok(handle_task); + } + }; +} + impl ImportSst for ImportSSTService where E: KvEngine, @@ -522,64 +588,6 @@ where ctx.spawn(ctx_task); } - fn write( - &mut self, - _ctx: RpcContext<'_>, - stream: RequestStream, - sink: ClientStreamingSink, - ) { - let label = "write"; - let timer = Instant::now_coarse(); - let import = self.importer.clone(); - let engine = self.engine.clone(); 
diff --git a/src/server/metrics.rs b/src/server/metrics.rs
index e4f588c0b24..208e57d97c7 100644
--- a/src/server/metrics.rs
+++ b/src/server/metrics.rs
@@ -42,6 +42,7 @@ make_auto_flush_static_metric! {
         raw_batch_delete,
         raw_get_key_ttl,
         raw_compare_and_swap,
+        raw_checksum,
         unsafe_destroy_range,
         physical_scan_lock,
         register_lock_observer,
diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs
index 2b72f6add10..fadb5e8b514 100644
--- a/src/server/service/kv.rs
+++ b/src/server/service/kv.rs
@@ -291,6 +291,13 @@ impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager> Tikv for
         RawCasResponse
     );
 
+    handle_request!(
+        raw_checksum,
+        future_raw_checksum,
+        RawChecksumRequest,
+        RawChecksumResponse
+    );
+
     fn kv_import(&mut self, _: RpcContext<'_>, _: ImportRequest, _: UnarySink<ImportResponse>) {
         unimplemented!();
     }
@@ -707,6 +714,7 @@ impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager> Tikv for
         }
     }
 
+    #[allow(clippy::collapsible_else_if)]
    fn split_region(
        &mut self,
        ctx: RpcContext<'_>,
@@ -718,13 +726,21 @@
         let region_id = req.get_context().get_region_id();
         let (cb, f) = paired_future_callback();
 
-        let mut split_keys = if !req.get_split_key().is_empty() {
-            vec![Key::from_raw(req.get_split_key()).into_encoded()]
+        let mut split_keys = if req.is_raw_kv {
+            if !req.get_split_key().is_empty() {
+                vec![req.get_split_key().to_vec()]
+            } else {
+                req.take_split_keys().to_vec()
+            }
         } else {
-            req.take_split_keys()
-                .into_iter()
-                .map(|x| Key::from_raw(&x).into_encoded())
-                .collect()
+            if !req.get_split_key().is_empty() {
+                vec![Key::from_raw(req.get_split_key()).into_encoded()]
+            } else {
+                req.take_split_keys()
+                    .into_iter()
+                    .map(|x| Key::from_raw(&x).into_encoded())
+                    .collect()
+            }
         };
         split_keys.sort();
         let req = CasualMessage::SplitRegion {
@@ -1724,6 +1740,34 @@ fn future_raw_compare_and_swap(
     }
 }
 
+fn future_raw_checksum<E: Engine, L: LockManager>(
+    storage: &Storage<E, L>,
+    mut req: RawChecksumRequest,
+) -> impl Future<Output = ServerResult<RawChecksumResponse>> {
+    let f = storage.raw_checksum(
+        req.take_context(),
+        req.get_algorithm(),
+        req.take_ranges().into(),
+    );
+    async move {
+        let v = f.await;
+        let mut resp = RawChecksumResponse::default();
+        if let Some(err) = extract_region_error(&v) {
+            resp.set_region_error(err);
+        } else {
+            match v {
+                Ok((checksum, kvs, bytes)) => {
+                    resp.set_checksum(checksum);
+                    resp.set_total_kvs(kvs);
+                    resp.set_total_bytes(bytes);
+                }
+                Err(e) => resp.set_error(format!("{}", e)),
+            }
+        }
+        Ok(resp)
+    }
+}
+
 fn future_copr<E: Engine>(
     copr: &Endpoint<E>,
     peer: Option<String>,
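Note: in the `split_region` change above, raw-mode keys are forwarded untouched, while txn-mode keys still go through the memcomparable encoding. A minimal sketch of the difference, using the same `txn_types::Key` API the hunk uses; the sample key is invented:

```rust
use txn_types::Key;

fn main() {
    let user_key = b"split-at-me".to_vec();

    // is_raw_kv == true: the split key is used as-is.
    let raw_split_key = user_key.clone();

    // is_raw_kv == false: the key is memcomparable-encoded first, which pads
    // the bytes into 9-byte groups, so the two forms are not interchangeable.
    let txn_split_key = Key::from_raw(&user_key).into_encoded();

    assert_ne!(raw_split_key, txn_split_key);
}
```

That incompatibility is exactly why the handler must branch on `is_raw_kv` instead of encoding unconditionally.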
diff --git a/src/server/ttl/mod.rs b/src/server/ttl/mod.rs
index 32e9f1897f0..0dafaecf700 100644
--- a/src/server/ttl/mod.rs
+++ b/src/server/ttl/mod.rs
@@ -3,5 +3,5 @@
 mod ttl_checker;
 mod ttl_compaction_filter;
 
-pub use ttl_checker::{TTLChecker, Task as TTLCheckerTask};
+pub use ttl_checker::{check_ttl_and_compact_files, TTLChecker, Task as TTLCheckerTask};
 pub use ttl_compaction_filter::TTLCompactionFilterFactory;
diff --git a/src/server/ttl/ttl_checker.rs b/src/server/ttl/ttl_checker.rs
index 1c44d619004..8043794e628 100644
--- a/src/server/ttl/ttl_checker.rs
+++ b/src/server/ttl/ttl_checker.rs
@@ -141,7 +141,7 @@ impl<E: KvEngine> RunnableWithTimer for TTLChecker<E>
     }
 }
 
-fn check_ttl_and_compact_files<E: KvEngine>(
+pub fn check_ttl_and_compact_files<E: KvEngine>(
     engine: &E,
     start_key: &[u8],
     end_key: &[u8],
@@ -211,69 +211,3 @@ fn check_ttl_and_compact_files<E: KvEngine>(
         "time_takes" => ?timer.saturating_elapsed(),
     );
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use crate::config::DbConfig;
-    use crate::storage::kv::TestEngineBuilder;
-    use crate::storage::raw::ttl::TEST_CURRENT_TS;
-    use engine_traits::util::append_expire_ts;
-    use engine_traits::{MiscExt, Peekable, SyncMutable, CF_DEFAULT};
-
-    #[test]
-    fn test_ttl_checker() {
-        let mut cfg = DbConfig::default();
-        cfg.defaultcf.disable_auto_compactions = true;
-        let dir = tempfile::TempDir::new().unwrap();
-        let builder = TestEngineBuilder::new().path(dir.path()).ttl(true);
-        let engine = builder.build_with_cfg(&cfg).unwrap();
-
-        let kvdb = engine.get_rocksdb();
-        let key1 = b"zkey1";
-        let mut value1 = vec![0; 10];
-        append_expire_ts(&mut value1, 10);
-        kvdb.put_cf(CF_DEFAULT, key1, &value1).unwrap();
-        kvdb.flush_cf(CF_DEFAULT, true).unwrap();
-        let key2 = b"zkey2";
-        let mut value2 = vec![0; 10];
-        append_expire_ts(&mut value2, TEST_CURRENT_TS + 20);
-        kvdb.put_cf(CF_DEFAULT, key2, &value2).unwrap();
-        let key3 = b"zkey3";
-        let mut value3 = vec![0; 10];
-        append_expire_ts(&mut value3, 20);
-        kvdb.put_cf(CF_DEFAULT, key3, &value3).unwrap();
-        kvdb.flush_cf(CF_DEFAULT, true).unwrap();
-        let key4 = b"zkey4";
-        let mut value4 = vec![0; 10];
-        append_expire_ts(&mut value4, 0);
-        kvdb.put_cf(CF_DEFAULT, key4, &value4).unwrap();
-        kvdb.flush_cf(CF_DEFAULT, true).unwrap();
-        let key5 = b"zkey5";
-        let mut value5 = vec![0; 10];
-        append_expire_ts(&mut value5, 10);
-        kvdb.put_cf(CF_DEFAULT, key5, &value5).unwrap();
-        kvdb.flush_cf(CF_DEFAULT, true).unwrap();
-
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key5).unwrap().is_some());
-
-        let _ = check_ttl_and_compact_files(&kvdb, b"zkey1", b"zkey25", false);
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_none());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_none());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key5).unwrap().is_some());
-
-        let _ = check_ttl_and_compact_files(&kvdb, b"zkey2", b"zkey6", false);
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_none());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_none());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key5).unwrap().is_none());
-    }
-}
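Note: the test module deleted above is not lost; it reappears as `tests/failpoints/cases/test_ttl.rs` later in this diff, which is why `check_ttl_and_compact_files` becomes `pub`. The move lets the tests pin the TTL clock through the `fail` crate rather than the test-only `TEST_CURRENT_TS` constant. A minimal, self-contained sketch of that pattern, assuming the fail crate's `failpoints` feature is enabled; the function name and fallback value here are invented:

```rust
fn mocked_now() -> u64 {
    // With fail::cfg("ttl_current_ts", "return(N)") active, this early-returns N.
    fail::fail_point!("ttl_current_ts", |v| v
        .and_then(|s| s.parse().ok())
        .unwrap_or(0));
    // Otherwise fall through to "real" time (stubbed to 1 for this sketch).
    1
}

fn main() {
    fail::cfg("ttl_current_ts", "return(100)").unwrap();
    assert_eq!(mocked_now(), 100);
    fail::remove("ttl_current_ts");
    assert_eq!(mocked_now(), 1);
}
```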
diff --git a/src/server/ttl/ttl_compaction_filter.rs b/src/server/ttl/ttl_compaction_filter.rs
index 843cd9d068f..d920a1588eb 100644
--- a/src/server/ttl/ttl_compaction_filter.rs
+++ b/src/server/ttl/ttl_compaction_filter.rs
@@ -3,13 +3,12 @@
 use std::ffi::CString;
 
 use crate::server::metrics::TTL_CHECKER_ACTIONS_COUNTER_VEC;
-use crate::storage::raw::ttl::current_ts;
 use engine_rocks::raw::{
     new_compaction_filter_raw, CompactionFilter, CompactionFilterContext,
     CompactionFilterDecision, CompactionFilterFactory, CompactionFilterValueType,
     DBCompactionFilter,
 };
 use engine_rocks::{RocksTtlProperties, RocksUserCollectedPropertiesNoRc};
-use engine_traits::util::get_expire_ts;
+use engine_traits::util::{get_expire_ts, ttl_current_ts};
 
 pub struct TTLCompactionFilterFactory;
 
@@ -18,7 +17,7 @@ impl CompactionFilterFactory for TTLCompactionFilterFactory {
         &self,
         context: &CompactionFilterContext,
     ) -> *mut DBCompactionFilter {
-        let current = current_ts();
+        let current = ttl_current_ts();
 
         let mut min_expire_ts = u64::MAX;
         for i in 0..context.file_numbers().len() {
@@ -81,58 +80,3 @@ impl CompactionFilter for TTLCompactionFilter {
         }
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use engine_traits::util::append_expire_ts;
-
-    use crate::config::DbConfig;
-    use crate::storage::kv::TestEngineBuilder;
-    use crate::storage::raw::ttl::TEST_CURRENT_TS;
-    use engine_rocks::raw::CompactOptions;
-    use engine_rocks::util::get_cf_handle;
-    use engine_traits::{MiscExt, Peekable, SyncMutable, CF_DEFAULT};
-
-    #[test]
-    fn test_ttl_compaction_filter() {
-        let mut cfg = DbConfig::default();
-        cfg.writecf.disable_auto_compactions = true;
-        let dir = tempfile::TempDir::new().unwrap();
-        let builder = TestEngineBuilder::new().path(dir.path()).ttl(true);
-        let engine = builder.build_with_cfg(&cfg).unwrap();
-        let kvdb = engine.get_rocksdb();
-
-        let key1 = b"zkey1";
-        let mut value1 = vec![0; 10];
-        append_expire_ts(&mut value1, 10);
-        kvdb.put_cf(CF_DEFAULT, key1, &value1).unwrap();
-        kvdb.flush_cf(CF_DEFAULT, true).unwrap();
-
-        let db = kvdb.as_inner();
-        let handle = get_cf_handle(db, CF_DEFAULT).unwrap();
-        db.compact_range_cf_opt(handle, &CompactOptions::new(), None, None);
-
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_none());
-
-        let key2 = b"zkey2";
-        let mut value2 = vec![0; 10];
-        append_expire_ts(&mut value2, TEST_CURRENT_TS + 20);
-        kvdb.put_cf(CF_DEFAULT, key2, &value2).unwrap();
-        let key3 = b"zkey3";
-        let mut value3 = vec![0; 10];
-        append_expire_ts(&mut value3, 20);
-        kvdb.put_cf(CF_DEFAULT, key3, &value3).unwrap();
-        kvdb.flush_cf(CF_DEFAULT, true).unwrap();
-
-        let key4 = b"zkey4";
-        let mut value4 = vec![0; 10];
-        append_expire_ts(&mut value4, 0);
-        kvdb.put_cf(CF_DEFAULT, key4, &value4).unwrap();
-        kvdb.flush_cf(CF_DEFAULT, true).unwrap();
-
-        db.compact_range_cf_opt(handle, &CompactOptions::new(), None, None);
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_none());
-        assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
-    }
-}
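Note: with the clock swapped to `ttl_current_ts`, the filter's per-key decision reduces to comparing the value's trailing `expire_ts` against "now". A self-contained sketch of that predicate, assuming the value layout used throughout this diff (an 8-byte big-endian `expire_ts` appended to the value, with 0 meaning no TTL); the `<=` boundary is this sketch's choice and is not verified against the real filter:

```rust
use std::convert::TryInto;

fn get_expire_ts(value_with_ts: &[u8]) -> Option<u64> {
    let len = value_with_ts.len();
    if len < 8 {
        return None; // malformed: no room for the timestamp suffix
    }
    Some(u64::from_be_bytes(value_with_ts[len - 8..].try_into().unwrap()))
}

fn is_expired(value_with_ts: &[u8], current_ts: u64) -> bool {
    match get_expire_ts(value_with_ts) {
        Some(0) | None => false, // 0 = never expires; malformed values are kept
        Some(expire_ts) => expire_ts <= current_ts,
    }
}

fn main() {
    let mut v = b"payload".to_vec();
    v.extend_from_slice(&10u64.to_be_bytes()); // expires at t = 10
    assert!(is_expired(&v, 100));
    assert!(!is_expired(&v, 5));

    let mut forever = b"payload".to_vec();
    forever.extend_from_slice(&0u64.to_be_bytes());
    assert!(!is_expired(&forever, u64::MAX));
}
```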
diff --git a/src/storage/metrics.rs b/src/storage/metrics.rs
index 8d5c66fbb5a..561e90315cc 100644
--- a/src/storage/metrics.rs
+++ b/src/storage/metrics.rs
@@ -141,6 +141,7 @@ make_auto_flush_static_metric! {
         raw_get_key_ttl,
         raw_compare_and_swap,
         raw_atomic_store,
+        raw_checksum,
     }
 
     pub label_enum CommandStageKind {
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 6feeb22933f..8a69adbb245 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -80,7 +80,8 @@ use concurrency_manager::ConcurrencyManager;
 use engine_traits::{CfName, CF_DEFAULT, DATA_CFS};
 use futures::prelude::*;
 use kvproto::kvrpcpb::{
-    CommandPri, Context, GetRequest, IsolationLevel, KeyRange, LockInfo, RawGetRequest,
+    ChecksumAlgorithm, CommandPri, Context, GetRequest, IsolationLevel, KeyRange, LockInfo,
+    RawGetRequest,
 };
 use kvproto::pdpb::QueryKind;
 use raftstore::store::util::build_key_range;
@@ -1718,6 +1719,61 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
         let cmd = RawAtomicStore::new(cf, muations, None, ctx);
         self.sched_txn_command(cmd, callback)
     }
+
+    pub fn raw_checksum(
+        &self,
+        ctx: Context,
+        algorithm: ChecksumAlgorithm,
+        ranges: Vec<KeyRange>,
+    ) -> impl Future<Output = Result<(u64, u64, u64)>> {
+        const CMD: CommandKind = CommandKind::raw_checksum;
+        let priority = ctx.get_priority();
+        let priority_tag = get_priority_tag(priority);
+        let enable_ttl = self.enable_ttl;
+
+        let res = self.read_pool.spawn_handle(
+            async move {
+                KV_COMMAND_COUNTER_VEC_STATIC.get(CMD).inc();
+                SCHED_COMMANDS_PRI_COUNTER_VEC_STATIC
+                    .get(priority_tag)
+                    .inc();
+
+                if algorithm != ChecksumAlgorithm::Crc64Xor {
+                    return Err(box_err!("unknown checksum algorithm {:?}", algorithm));
+                }
+
+                let command_duration = tikv_util::time::Instant::now_coarse();
+                let snap_ctx = SnapContext {
+                    pb_ctx: &ctx,
+                    ..Default::default()
+                };
+                let snapshot =
+                    Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await?;
+                let begin_instant = tikv_util::time::Instant::now_coarse();
+                let ret = if enable_ttl {
+                    let snap = TTLSnapshot::from(snapshot);
+                    raw::raw_checksum_ranges(snap, ranges).await
+                } else {
+                    raw::raw_checksum_ranges(snapshot, ranges).await
+                };
+                SCHED_PROCESSING_READ_HISTOGRAM_STATIC
+                    .get(CMD)
+                    .observe(begin_instant.saturating_elapsed().as_secs_f64());
+                SCHED_HISTOGRAM_VEC_STATIC
+                    .get(CMD)
+                    .observe(command_duration.saturating_elapsed().as_secs_f64());
+
+                ret
+            },
+            priority,
+            thread_rng().next_u64(),
+        );
+
+        async move {
+            res.map_err(|_| Error::from(ErrorInner::SchedTooBusy))
+                .await?
+        }
+    }
 }
 
 fn get_priority_tag(priority: CommandPri) -> CommandPriority {
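Note: the `enable_ttl` branch above exists because, with TTL on, values carry a trailing `expire_ts` on disk; my reading (not verified here) is that `TTLSnapshot` strips that suffix during iteration so the checksum covers logical values. A sketch with `crc64fast`, the digest crate this diff uses, showing why the physical bytes would otherwise produce a different checksum:

```rust
fn digest(pairs: &[(&[u8], &[u8])]) -> u64 {
    // Same call pattern as raw_checksum_ranges: feed key, then value.
    let mut d = crc64fast::Digest::new();
    for (k, v) in pairs {
        d.write(k);
        d.write(v);
    }
    d.sum64()
}

fn main() {
    let logical: &[u8] = b"value";
    let mut physical = logical.to_vec();
    physical.extend_from_slice(&100u64.to_be_bytes()); // appended expire_ts

    let a = digest(&[(b"key", logical)]);
    let b = digest(&[(b"key", &physical)]);
    assert_ne!(a, b);
}
```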
diff --git a/src/storage/raw/mod.rs b/src/storage/raw/mod.rs
index 82ca5183809..bc763375f00 100644
--- a/src/storage/raw/mod.rs
+++ b/src/storage/raw/mod.rs
@@ -3,5 +3,5 @@
 mod store;
 pub mod ttl;
 
-pub use store::RawStore;
+pub use store::{raw_checksum_ranges, RawStore};
 pub use ttl::TTLSnapshot;
diff --git a/src/storage/raw/store.rs b/src/storage/raw/store.rs
index bb22de10b5b..0c37b4c8430 100644
--- a/src/storage/raw/store.rs
+++ b/src/storage/raw/store.rs
@@ -2,15 +2,15 @@
 
 use super::ttl::TTLSnapshot;
 
-use crate::storage::kv::{Cursor, ScanMode, Snapshot};
+use crate::storage::kv::{Cursor, Iterator, ScanMode, Snapshot};
 use crate::storage::Statistics;
 use crate::storage::{Error, Result};
 use engine_traits::{CfName, IterOptions, DATA_KEY_PREFIX_LEN};
-use txn_types::{Key, KvPair};
-
+use kvproto::kvrpcpb::KeyRange;
 use std::time::Duration;
 use tikv_util::time::Instant;
+use txn_types::{Key, KvPair};
 use yatp::task::future::reschedule;
 
 const MAX_TIME_SLICE: Duration = Duration::from_millis(2);
@@ -238,3 +238,38 @@ impl<'a, S: Snapshot> RawStoreInner<S> {
         Ok(pairs)
     }
 }
+
+pub async fn raw_checksum_ranges<S: Snapshot>(
+    snapshot: S,
+    ranges: Vec<KeyRange>,
+) -> Result<(u64, u64, u64)> {
+    let mut total_bytes = 0;
+    let mut total_kvs = 0;
+    let mut digest = crc64fast::Digest::new();
+    let mut row_count = 0;
+    let mut time_slice_start = Instant::now();
+    for r in ranges {
+        let mut opts = IterOptions::new(None, None, false);
+        opts.set_upper_bound(r.get_end_key(), DATA_KEY_PREFIX_LEN);
+        let mut iter = snapshot.iter(opts)?;
+        iter.seek(&Key::from_encoded(r.get_start_key().to_vec()))?;
+        while iter.valid()? {
+            row_count += 1;
+            if row_count >= MAX_BATCH_SIZE {
+                if time_slice_start.saturating_elapsed() > MAX_TIME_SLICE {
+                    reschedule().await;
+                    time_slice_start = Instant::now();
+                }
+                row_count = 0;
+            }
+            let k = iter.key();
+            let v = iter.value();
+            digest.write(k);
+            digest.write(v);
+            total_kvs += 1;
+            total_bytes += k.len() + v.len();
+            iter.next()?;
+        }
+    }
+    Ok((digest.sum64(), total_kvs, total_bytes as u64))
+}
diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs
index 75a52757518..0c69d41f503 100644
--- a/tests/failpoints/cases/mod.rs
+++ b/tests/failpoints/cases/mod.rs
@@ -25,3 +25,4 @@ mod test_stale_read;
 mod test_storage;
 mod test_transaction;
 mod test_transfer_leader;
+mod test_ttl;
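Note: `raw_checksum_ranges` above runs on a yatp read pool, so it yields with `reschedule().await` whenever it has processed a full batch and its time slice is spent. A self-contained sketch of that time-slicing pattern follows; the hand-rolled `YieldNow` stands in for yatp's `reschedule`, and the constants mirror `MAX_TIME_SLICE`/`MAX_BATCH_SIZE` in role only (`MAX_BATCH_SIZE`'s real value is defined elsewhere in store.rs):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

const MAX_TIME_SLICE: Duration = Duration::from_millis(2);
const MAX_BATCH_SIZE: usize = 1024; // illustrative value

/// A future that returns Pending exactly once, handing the thread back to
/// the executor -- the role reschedule() plays on a yatp pool.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async fn sum_cooperatively(items: impl Iterator<Item = u64>) -> u64 {
    let mut row_count = 0;
    let mut slice_start = Instant::now();
    let mut sum: u64 = 0;
    for item in items {
        row_count += 1;
        // Same shape as the loop above: check the clock once per batch, and
        // reset the counter whether or not we actually yielded.
        if row_count >= MAX_BATCH_SIZE {
            if slice_start.elapsed() > MAX_TIME_SLICE {
                YieldNow(false).await;
                slice_start = Instant::now();
            }
            row_count = 0;
        }
        sum = sum.wrapping_add(item);
    }
    sum
}

fn main() {
    let total = futures::executor::block_on(sum_cooperatively(0..1_000_000u64));
    let expected: u64 = (0..1_000_000u64).sum();
    assert_eq!(total, expected);
}
```

Checking elapsed time only once per batch keeps the clock reads off the per-key hot path.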
diff --git a/tests/failpoints/cases/test_ttl.rs b/tests/failpoints/cases/test_ttl.rs
new file mode 100644
index 00000000000..0546e3ee318
--- /dev/null
+++ b/tests/failpoints/cases/test_ttl.rs
@@ -0,0 +1,109 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+
+use engine_rocks::raw::CompactOptions;
+use engine_rocks::util::get_cf_handle;
+use engine_traits::util::append_expire_ts;
+use engine_traits::{MiscExt, Peekable, SyncMutable, CF_DEFAULT};
+use tikv::config::DbConfig;
+use tikv::server::ttl::check_ttl_and_compact_files;
+use tikv::storage::kv::TestEngineBuilder;
+
+#[test]
+fn test_ttl_checker() {
+    fail::cfg("ttl_current_ts", "return(100)").unwrap();
+    let mut cfg = DbConfig::default();
+    cfg.defaultcf.disable_auto_compactions = true;
+    let dir = tempfile::TempDir::new().unwrap();
+    let builder = TestEngineBuilder::new().path(dir.path()).ttl(true);
+    let engine = builder.build_with_cfg(&cfg).unwrap();
+
+    let kvdb = engine.get_rocksdb();
+    let key1 = b"zkey1";
+    let mut value1 = vec![0; 10];
+    append_expire_ts(&mut value1, 10);
+    kvdb.put_cf(CF_DEFAULT, key1, &value1).unwrap();
+    kvdb.flush_cf(CF_DEFAULT, true).unwrap();
+    let key2 = b"zkey2";
+    let mut value2 = vec![0; 10];
+    append_expire_ts(&mut value2, 120);
+    kvdb.put_cf(CF_DEFAULT, key2, &value2).unwrap();
+    let key3 = b"zkey3";
+    let mut value3 = vec![0; 10];
+    append_expire_ts(&mut value3, 20);
+    kvdb.put_cf(CF_DEFAULT, key3, &value3).unwrap();
+    kvdb.flush_cf(CF_DEFAULT, true).unwrap();
+    let key4 = b"zkey4";
+    let mut value4 = vec![0; 10];
+    append_expire_ts(&mut value4, 0);
+    kvdb.put_cf(CF_DEFAULT, key4, &value4).unwrap();
+    kvdb.flush_cf(CF_DEFAULT, true).unwrap();
+    let key5 = b"zkey5";
+    let mut value5 = vec![0; 10];
+    append_expire_ts(&mut value5, 10);
+    kvdb.put_cf(CF_DEFAULT, key5, &value5).unwrap();
+    kvdb.flush_cf(CF_DEFAULT, true).unwrap();
+
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key5).unwrap().is_some());
+
+    let _ = check_ttl_and_compact_files(&kvdb, b"zkey1", b"zkey25", false);
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_none());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_none());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key5).unwrap().is_some());
+
+    let _ = check_ttl_and_compact_files(&kvdb, b"zkey2", b"zkey6", false);
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_none());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_none());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key5).unwrap().is_none());
+}
+
+#[test]
+fn test_ttl_compaction_filter() {
+    fail::cfg("ttl_current_ts", "return(100)").unwrap();
+    let mut cfg = DbConfig::default();
+    cfg.writecf.disable_auto_compactions = true;
+    let dir = tempfile::TempDir::new().unwrap();
+    let builder = TestEngineBuilder::new().path(dir.path()).ttl(true);
+    let engine = builder.build_with_cfg(&cfg).unwrap();
+    let kvdb = engine.get_rocksdb();
+
+    let key1 = b"zkey1";
+    let mut value1 = vec![0; 10];
+    append_expire_ts(&mut value1, 10);
+    kvdb.put_cf(CF_DEFAULT, key1, &value1).unwrap();
+    kvdb.flush_cf(CF_DEFAULT, true).unwrap();
+
+    let db = kvdb.as_inner();
+    let handle = get_cf_handle(db, CF_DEFAULT).unwrap();
+    db.compact_range_cf_opt(handle, &CompactOptions::new(), None, None);
+
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key1).unwrap().is_none());
+
+    let key2 = b"zkey2";
+    let mut value2 = vec![0; 10];
+    append_expire_ts(&mut value2, 120);
+    kvdb.put_cf(CF_DEFAULT, key2, &value2).unwrap();
+    let key3 = b"zkey3";
+    let mut value3 = vec![0; 10];
+    append_expire_ts(&mut value3, 20);
+    kvdb.put_cf(CF_DEFAULT, key3, &value3).unwrap();
+    kvdb.flush_cf(CF_DEFAULT, true).unwrap();
+
+    let key4 = b"zkey4";
+    let mut value4 = vec![0; 10];
+    append_expire_ts(&mut value4, 0);
+    kvdb.put_cf(CF_DEFAULT, key4, &value4).unwrap();
+    kvdb.flush_cf(CF_DEFAULT, true).unwrap();
+
+    db.compact_range_cf_opt(handle, &CompactOptions::new(), None, None);
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key2).unwrap().is_some());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key3).unwrap().is_none());
+    assert!(kvdb.get_value_cf(CF_DEFAULT, key4).unwrap().is_some());
+}
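Note: both relocated tests pin "now" to 100 through the failpoint, so the fixtures decode as in the worked check below (the expiry rule is this sketch's assumption, `expire_ts != 0 && expire_ts <= now`). In `test_ttl_checker`, `zkey5` is just as expired as `zkey1` and `zkey3`, but it only disappears once the second call's key range covers its SST file:

```rust
fn expired(expire_ts: u64, now: u64) -> bool {
    expire_ts != 0 && expire_ts <= now
}

fn main() {
    let now = 100; // fail::cfg("ttl_current_ts", "return(100)")
    // (key, expire_ts) as written by append_expire_ts in test_ttl_checker:
    let fixtures = [
        ("zkey1", 10),  // expired
        ("zkey2", 120), // still live at now = 100
        ("zkey3", 20),  // expired
        ("zkey4", 0),   // 0 = no TTL, kept forever
        ("zkey5", 10),  // expired, but only compacted by the second range
    ];
    for (key, ts) in fixtures {
        println!("{}: {}", key, if expired(ts, now) { "removable" } else { "kept" });
    }
}
```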
diff --git a/tests/integrations/config/dynamic/raftstore.rs b/tests/integrations/config/dynamic/raftstore.rs
index 0e93b21934f..75e2295f525 100644
--- a/tests/integrations/config/dynamic/raftstore.rs
+++ b/tests/integrations/config/dynamic/raftstore.rs
@@ -76,7 +76,7 @@ fn start_raftstore(
             .as_path()
             .display()
             .to_string();
-        Arc::new(SSTImporter::new(&cfg.import, &p, None).unwrap())
+        Arc::new(SSTImporter::new(&cfg.import, &p, None, false).unwrap())
     };
     let snap_mgr = {
         let p = dir
diff --git a/tests/integrations/raftstore/test_bootstrap.rs b/tests/integrations/raftstore/test_bootstrap.rs
index 1259b4f221c..b065f65b603 100644
--- a/tests/integrations/raftstore/test_bootstrap.rs
+++ b/tests/integrations/raftstore/test_bootstrap.rs
@@ -97,7 +97,7 @@ fn test_node_bootstrap_with_prepared_data() {
 
     let importer = {
         let dir = tmp_path.path().join("import-sst");
-        Arc::new(SSTImporter::new(&cfg.import, dir, None).unwrap())
+        Arc::new(SSTImporter::new(&cfg.import, dir, None, false).unwrap())
     };
 
     let (split_check_scheduler, _) = dummy_scheduler();
diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs
index 120a3ca8e3b..59cadacccb1 100644
--- a/tests/integrations/server/kv_service.rs
+++ b/tests/integrations/server/kv_service.rs
@@ -948,7 +948,7 @@ fn test_double_run_node() {
     let coprocessor_host = CoprocessorHost::new(router, raftstore::coprocessor::Config::default());
     let importer = {
         let dir = Path::new(engines.kv.path()).join("import-sst");
-        Arc::new(SSTImporter::new(&ImportConfig::default(), dir, None).unwrap())
+        Arc::new(SSTImporter::new(&ImportConfig::default(), dir, None, false).unwrap())
     };
     let (split_check_scheduler, _) = dummy_scheduler();