diff --git a/rust/cubestore/cubestore/src/cluster/mod.rs b/rust/cubestore/cubestore/src/cluster/mod.rs index 64abaf2df21bb..167487944ade9 100644 --- a/rust/cubestore/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/cubestore/src/cluster/mod.rs @@ -15,7 +15,8 @@ pub mod ingestion; use crate::cluster::worker_pool::{worker_main, WorkerPool}; #[cfg(not(target_os = "windows"))] use crate::cluster::worker_services::{ - DefaultServicesServerProcessor, DefaultWorkerServicesDef, WorkerProcessing, + DefaultServicesServerProcessor, DefaultWorkerServicesDef, ServicesServerProcessor, + WorkerProcessing, }; use crate::ack_error; @@ -864,9 +865,10 @@ impl ClusterImpl { || self.config_obj.worker_bind_address().is_some()) && self.config_obj.select_worker_pool_size() > 0 { + let injector = self.injector.upgrade().unwrap(); let mut pool = self.select_process_pool.write().await; let arc = Arc::new(WorkerPool::new( - DefaultServicesServerProcessor::new(), + ServicesServerProcessor::init(injector).await, self.config_obj.select_worker_pool_size(), Duration::from_secs(self.config_obj.query_timeout()), "sel", diff --git a/rust/cubestore/cubestore/src/cluster/worker_pool.rs b/rust/cubestore/cubestore/src/cluster/worker_pool.rs index a8ea7b420e3f2..53e39945e45f3 100644 --- a/rust/cubestore/cubestore/src/cluster/worker_pool.rs +++ b/rust/cubestore/cubestore/src/cluster/worker_pool.rs @@ -445,6 +445,7 @@ mod tests { use tokio::runtime::Builder; use crate::cluster::worker_pool::{worker_main, WorkerPool}; + use crate::config::injection::Injector; use crate::config::Config; use crate::queryplanner::serialized_plan::SerializedLogicalPlan; use crate::util::respawn; @@ -637,6 +638,11 @@ mod tests { impl ServicesServerProcessor for TestServicesServerProcessor { type Request = i64; type Response = bool; + + async fn init(_injector: Arc) -> Arc { + Arc::new(Self {}) + } + async fn process(&self, request: i64) -> bool { request % 2 == 0 } diff --git a/rust/cubestore/cubestore/src/cluster/worker_services.rs b/rust/cubestore/cubestore/src/cluster/worker_services.rs index 5a6b40c5e1f75..a6e959ef3e127 100644 --- a/rust/cubestore/cubestore/src/cluster/worker_services.rs +++ b/rust/cubestore/cubestore/src/cluster/worker_services.rs @@ -1,3 +1,4 @@ +use crate::config::injection::Injector; use crate::util::cancellation_token_guard::CancellationGuard; use crate::CubeError; use async_trait::async_trait; @@ -39,6 +40,7 @@ pub trait WorkerProcessing { pub trait ServicesServerProcessor { type Request: Debug + Serialize + DeserializeOwned + Sync + Send + 'static; type Response: Debug + Serialize + DeserializeOwned + Sync + Send + 'static; + async fn init(injector: Arc) -> Arc; async fn process(&self, request: Self::Request) -> Self::Response; } @@ -70,6 +72,9 @@ impl DefaultServicesServerProcessor { impl ServicesServerProcessor for DefaultServicesServerProcessor { type Request = (); type Response = (); + async fn init(_injector: Arc) -> Arc { + Arc::new(Self {}) + } async fn process(&self, _request: ()) -> () { () } diff --git a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs index 0dd8d34cbe962..90d0a7a97a757 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs @@ -121,7 +121,10 @@ impl BaseRocksStoreFs { return async move { let local = remote_fs.local_file(f.clone()).await?; // TODO persist file size - Ok::<_, CubeError>((f.clone(), remote_fs.upload_file(local, f.clone()).await?)) + Ok::<_, CubeError>(( + f.clone(), + remote_fs.upload_file(local, f.clone()).await?, + )) }; }) .collect::>(), diff --git a/rust/cubestore/cubestore/src/remotefs/gcs.rs b/rust/cubestore/cubestore/src/remotefs/gcs.rs index c631e061de97c..aea7ac9a7285b 100644 --- a/rust/cubestore/cubestore/src/remotefs/gcs.rs +++ b/rust/cubestore/cubestore/src/remotefs/gcs.rs @@ -1,6 +1,6 @@ use crate::app_metrics; use crate::di_service; -use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs, CommonRemoteFsUtils}; +use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs}; use crate::util::lock::acquire_lock; use crate::CubeError; use async_trait::async_trait; @@ -119,7 +119,6 @@ di_service!(GCSRemoteFs, [RemoteFs]); #[async_trait] impl RemoteFs for GCSRemoteFs { - async fn temp_upload_path(&self, remote_path: String) -> Result { CommonRemoteFsUtils::temp_upload_path(self, remote_path).await } @@ -270,7 +269,10 @@ impl RemoteFs for GCSRemoteFs { .collect::>()) } - async fn list_with_metadata(&self, remote_prefix: String) -> Result, CubeError> { + async fn list_with_metadata( + &self, + remote_prefix: String, + ) -> Result, CubeError> { let prefix = self.gcs_path(&remote_prefix); let list = Object::list_prefix(self.bucket.as_str(), prefix.as_str()).await?; let leading_slash = Regex::new(format!("^{}", self.gcs_path("")).as_str()).unwrap(); diff --git a/rust/cubestore/cubestore/src/remotefs/minio.rs b/rust/cubestore/cubestore/src/remotefs/minio.rs index 5f008a322589a..670295c223288 100644 --- a/rust/cubestore/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/cubestore/src/remotefs/minio.rs @@ -1,5 +1,5 @@ use crate::di_service; -use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs, CommonRemoteFsUtils}; +use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs}; use crate::util::lock::acquire_lock; use crate::CubeError; use async_trait::async_trait; @@ -144,7 +144,6 @@ di_service!(MINIORemoteFs, [RemoteFs]); #[async_trait] impl RemoteFs for MINIORemoteFs { - async fn temp_upload_path(&self, remote_path: String) -> Result { CommonRemoteFsUtils::temp_upload_path(self, remote_path).await } @@ -284,7 +283,10 @@ impl RemoteFs for MINIORemoteFs { .collect::>()) } - async fn list_with_metadata(&self, remote_prefix: String) -> Result, CubeError> { + async fn list_with_metadata( + &self, + remote_prefix: String, + ) -> Result, CubeError> { let path = self.s3_path(&remote_prefix); let bucket = self.bucket.read().unwrap().clone(); let list = cube_ext::spawn_blocking(move || bucket.list_blocking(path, None)).await??; diff --git a/rust/cubestore/cubestore/src/remotefs/mod.rs b/rust/cubestore/cubestore/src/remotefs/mod.rs index 7eb2c1064dd0e..4d55ba665dfca 100644 --- a/rust/cubestore/cubestore/src/remotefs/mod.rs +++ b/rust/cubestore/cubestore/src/remotefs/mod.rs @@ -14,13 +14,13 @@ use datafusion::cube_ext; use futures::future::BoxFuture; use futures::FutureExt; use log::debug; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::{NamedTempFile, PathPersistError}; use tokio::fs; use tokio::sync::{Mutex, RwLock}; -use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RemoteFile { @@ -54,7 +54,7 @@ pub trait RemoteFs: DIService + Send + Sync + Debug { &self, remote_path: String, expected_size: u64, - ) -> Result<(), CubeError>; + ) -> Result<(), CubeError>; /// In addition to uploading this file to the remote filesystem, this function moves the file /// from `temp_upload_path` to `self.local_path(remote_path)` on the local file system. @@ -74,7 +74,8 @@ pub trait RemoteFs: DIService + Send + Sync + Debug { async fn list(&self, remote_prefix: String) -> Result, CubeError>; - async fn list_with_metadata(&self, remote_prefix: String) -> Result, CubeError>; + async fn list_with_metadata(&self, remote_prefix: String) + -> Result, CubeError>; async fn local_path(&self) -> Result; @@ -87,9 +88,14 @@ impl CommonRemoteFsUtils { /// /// Use this path to prepare files for upload. Writing into `local_path()` directly can result /// in files being deleted by the background cleanup process, see `QueueRemoteFs::cleanup_loop`. - pub async fn temp_upload_path(remote_fs: &dyn RemoteFs, remote_path: String) -> Result { + pub async fn temp_upload_path( + remote_fs: &dyn RemoteFs, + remote_path: String, + ) -> Result { // Putting files into a subdirectory prevents cleanups from removing them. - remote_fs.local_file(format!("uploads/{}", remote_path)).await + remote_fs + .local_file(format!("uploads/{}", remote_path)) + .await } /// Convention is to use this directory for creating files to be uploaded later. @@ -183,7 +189,6 @@ di_service!(RemoteFsRpcClient, [RemoteFs]); #[async_trait] impl RemoteFs for LocalDirRemoteFs { - async fn temp_upload_path(&self, remote_path: String) -> Result { CommonRemoteFsUtils::temp_upload_path(self, remote_path).await } @@ -333,7 +338,10 @@ impl RemoteFs for LocalDirRemoteFs { .collect::>()) } - async fn list_with_metadata(&self, remote_prefix: String) -> Result, CubeError> { + async fn list_with_metadata( + &self, + remote_prefix: String, + ) -> Result, CubeError> { let remote_dir = self.remote_dir.read().await.as_ref().cloned(); let result = Self::list_recursive( remote_dir.clone().unwrap_or(self.dir.clone()), @@ -494,9 +502,16 @@ mod tests { name_maker: NameMaker, download_test: bool, ) { - assert_eq!(remote_fs.local_path().await.unwrap(), local_dir.to_str().unwrap()); + assert_eq!( + remote_fs.local_path().await.unwrap(), + local_dir.to_str().unwrap() + ); - let local_file = remote_fs.local_file("test.tst".to_string()).await.ok().unwrap(); + let local_file = remote_fs + .local_file("test.tst".to_string()) + .await + .ok() + .unwrap(); assert_eq!(local_file, local_dir.join("test.tst").to_str().unwrap()); let local_file_path = Path::new("test_dir") @@ -569,14 +584,20 @@ mod tests { for filename in root_files.iter().chain(subdir_files.iter()) { assert!(!local_dir.join(filename).is_file()); - remote_fs.download_file(filename.clone(), None).await.unwrap(); + remote_fs + .download_file(filename.clone(), None) + .await + .unwrap(); assert!(local_dir.join(filename).is_file()); } } for filename in root_files.iter().chain(subdir_files.iter()) { assert!(local_dir.join(&filename).is_file()); - assert_eq!(&remote_fs.list(filename.clone()).await.unwrap()[0], filename); + assert_eq!( + &remote_fs.list(filename.clone()).await.unwrap()[0], + filename + ); remote_fs.delete_file(filename.clone()).await.unwrap(); diff --git a/rust/cubestore/cubestore/src/remotefs/queue.rs b/rust/cubestore/cubestore/src/remotefs/queue.rs index ed667569edc36..6dd94cad30684 100644 --- a/rust/cubestore/cubestore/src/remotefs/queue.rs +++ b/rust/cubestore/cubestore/src/remotefs/queue.rs @@ -1,6 +1,6 @@ use crate::config::ConfigObj; use crate::di_service; -use crate::remotefs::{RemoteFile, RemoteFs, CommonRemoteFsUtils}; +use crate::remotefs::{CommonRemoteFsUtils, RemoteFile, RemoteFs}; use crate::util::lock::acquire_lock; use crate::CubeError; use async_trait::async_trait; @@ -33,9 +33,9 @@ pub struct QueueRemoteFs { impl Debug for QueueRemoteFs { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("QueueRemoteFs").finish() - //TODO FIX IT - /* .field("remote_fs", &self.remote_fs) - .finish() */ + //TODO FIX IT + /* .field("remote_fs", &self.remote_fs) + .finish() */ } } @@ -187,7 +187,6 @@ impl QueueRemoteFs { #[async_trait] impl RemoteFs for QueueRemoteFs { - async fn temp_upload_path(&self, remote_path: String) -> Result { CommonRemoteFsUtils::temp_upload_path(self, remote_path).await } @@ -325,7 +324,10 @@ impl RemoteFs for QueueRemoteFs { self.remote_fs.list(remote_prefix).await } - async fn list_with_metadata(&self, remote_prefix: String) -> Result, CubeError> { + async fn list_with_metadata( + &self, + remote_prefix: String, + ) -> Result, CubeError> { self.remote_fs.list_with_metadata(remote_prefix).await } @@ -388,7 +390,6 @@ mod test { #[async_trait] impl RemoteFs for MockFs { - async fn temp_upload_path(&self, remote_path: String) -> Result { CommonRemoteFsUtils::temp_upload_path(self, remote_path).await } @@ -513,7 +514,10 @@ mod test { let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone())); let res = queue_fs - .upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string()) + .upload_file( + path.to_str().unwrap().to_string(), + "temp-upload/foo.csv".to_string(), + ) .await; queue_fs.stop_processing_loops().unwrap(); r.await.unwrap().unwrap(); @@ -536,7 +540,10 @@ mod test { let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone())); let res = queue_fs - .upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string()) + .upload_file( + path.to_str().unwrap().to_string(), + "temp-upload/foo.csv".to_string(), + ) .await; queue_fs.stop_processing_loops().unwrap(); r.await.unwrap().unwrap(); @@ -559,7 +566,10 @@ mod test { let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone())); let res = queue_fs - .upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string()) + .upload_file( + path.to_str().unwrap().to_string(), + "temp-upload/foo.csv".to_string(), + ) .await; queue_fs.stop_processing_loops().unwrap(); r.await.unwrap().unwrap(); @@ -577,7 +587,9 @@ mod test { config.injector().get_service("original_remote_fs").await, ); let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone())); - let res = queue_fs.download_file("temp-upload/foo.csv".to_string(), None).await; + let res = queue_fs + .download_file("temp-upload/foo.csv".to_string(), None) + .await; match res { Ok(_) => assert!(false), Err(e) => assert!(e.is_corrupt_data()), @@ -599,13 +611,24 @@ mod test { ); let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone())); queue_fs - .upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string()) + .upload_file( + path.to_str().unwrap().to_string(), + "temp-upload/foo.csv".to_string(), + ) .await .unwrap(); - std::fs::remove_file(queue_fs.local_file("temp-upload/foo.csv".to_string()).await.unwrap()).unwrap(); + std::fs::remove_file( + queue_fs + .local_file("temp-upload/foo.csv".to_string()) + .await + .unwrap(), + ) + .unwrap(); - let res = queue_fs.download_file("temp-upload/foo.csv".to_string(), Some(1)).await; + let res = queue_fs + .download_file("temp-upload/foo.csv".to_string(), Some(1)) + .await; match res { Ok(_) => assert!(false), @@ -632,12 +655,23 @@ mod test { let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone())); queue_fs - .upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string()) + .upload_file( + path.to_str().unwrap().to_string(), + "temp-upload/foo.csv".to_string(), + ) .await .unwrap(); - std::fs::remove_file(queue_fs.local_file("temp-upload/foo.csv".to_string()).await.unwrap()).unwrap(); + std::fs::remove_file( + queue_fs + .local_file("temp-upload/foo.csv".to_string()) + .await + .unwrap(), + ) + .unwrap(); - let res = queue_fs.download_file("temp-upload/foo.csv".to_string(), None).await; + let res = queue_fs + .download_file("temp-upload/foo.csv".to_string(), None) + .await; match res { Ok(_) => assert!(false), diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs index 142116ec93bd1..10285cdf9ed8f 100644 --- a/rust/cubestore/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/cubestore/src/remotefs/s3.rs @@ -1,6 +1,6 @@ use crate::app_metrics; use crate::di_service; -use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs, CommonRemoteFsUtils}; +use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs}; use crate::util::lock::acquire_lock; use crate::CubeError; use async_trait::async_trait; @@ -133,7 +133,6 @@ di_service!(S3RemoteFs, [RemoteFs]); #[async_trait] impl RemoteFs for S3RemoteFs { - async fn temp_upload_path(&self, remote_path: String) -> Result { CommonRemoteFsUtils::temp_upload_path(self, remote_path).await } @@ -290,7 +289,10 @@ impl RemoteFs for S3RemoteFs { .collect::>()) } - async fn list_with_metadata(&self, remote_prefix: String) -> Result, CubeError> { + async fn list_with_metadata( + &self, + remote_prefix: String, + ) -> Result, CubeError> { let path = self.s3_path(&remote_prefix); let bucket = self.bucket.read().unwrap().clone(); let list = cube_ext::spawn_blocking(move || bucket.list_blocking(path, None)).await??; diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 44c3a4772fe7b..42b2f43bd4254 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -2194,7 +2194,6 @@ mod tests { #[async_trait::async_trait] impl RemoteFs for FailingRemoteFs { - async fn temp_upload_path(&self, remote_path: String) -> Result { CommonRemoteFsUtils::temp_upload_path(self, remote_path).await } @@ -3925,7 +3924,10 @@ mod tests { .injector .get_service_typed::() .await - .upload_file(path_2.to_str().unwrap().to_string(), "temp-uploads/orders.csv.gz".to_string()) + .upload_file( + path_2.to_str().unwrap().to_string(), + "temp-uploads/orders.csv.gz".to_string(), + ) .await .unwrap(); diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 9087ac99573bf..88a28922e07d1 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -315,7 +315,9 @@ impl WALDataStore for WALStore { ))); } let remote_path = WALStore::wal_remote_path(wal_id); - self.remote_fs.download_file(remote_path.clone(), None).await?; + self.remote_fs + .download_file(remote_path.clone(), None) + .await?; let local_file = self.remote_fs.local_file(remote_path.clone()).await?; Ok( cube_ext::spawn_blocking(move || -> Result { @@ -735,7 +737,10 @@ impl ChunkStore { let file_size = chunk.get_row().file_size(); let chunk_id = chunk.get_id(); let remote_path = ChunkStore::chunk_file_name(chunk); - let result = self.remote_fs.download_file(remote_path.clone(), file_size).await; + let result = self + .remote_fs + .download_file(remote_path.clone(), file_size) + .await; deactivate_table_on_corrupt_data( self.meta_store.clone(), @@ -1345,7 +1350,9 @@ impl ChunkStore { let fs = self.remote_fs.clone(); Ok(cube_ext::spawn(async move { - let file_size = fs.upload_file(local_file.to_string(), remote_path.clone()).await?; + let file_size = fs + .upload_file(local_file.to_string(), remote_path.clone()) + .await?; Ok((chunk, Some(file_size))) })) }