Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
waralexrom committed Sep 18, 2023
1 parent 5421221 commit 3441ac9
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 45 deletions.
6 changes: 4 additions & 2 deletions rust/cubestore/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub mod ingestion;
use crate::cluster::worker_pool::{worker_main, WorkerPool};
#[cfg(not(target_os = "windows"))]
use crate::cluster::worker_services::{
DefaultServicesServerProcessor, DefaultWorkerServicesDef, WorkerProcessing,
DefaultServicesServerProcessor, DefaultWorkerServicesDef, ServicesServerProcessor,
WorkerProcessing,
};

use crate::ack_error;
Expand Down Expand Up @@ -864,9 +865,10 @@ impl ClusterImpl {
|| self.config_obj.worker_bind_address().is_some())
&& self.config_obj.select_worker_pool_size() > 0
{
let injector = self.injector.upgrade().unwrap();
let mut pool = self.select_process_pool.write().await;
let arc = Arc::new(WorkerPool::new(
DefaultServicesServerProcessor::new(),
ServicesServerProcessor::init(injector).await,
self.config_obj.select_worker_pool_size(),
Duration::from_secs(self.config_obj.query_timeout()),
"sel",
Expand Down
6 changes: 6 additions & 0 deletions rust/cubestore/cubestore/src/cluster/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ mod tests {
use tokio::runtime::Builder;

use crate::cluster::worker_pool::{worker_main, WorkerPool};
use crate::config::injection::Injector;
use crate::config::Config;
use crate::queryplanner::serialized_plan::SerializedLogicalPlan;
use crate::util::respawn;
Expand Down Expand Up @@ -637,6 +638,11 @@ mod tests {
impl ServicesServerProcessor for TestServicesServerProcessor {
type Request = i64;
type Response = bool;

async fn init(_injector: Arc<Injector>) -> Arc<Self> {
Arc::new(Self {})
}

async fn process(&self, request: i64) -> bool {
request % 2 == 0
}
Expand Down
5 changes: 5 additions & 0 deletions rust/cubestore/cubestore/src/cluster/worker_services.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config::injection::Injector;
use crate::util::cancellation_token_guard::CancellationGuard;
use crate::CubeError;
use async_trait::async_trait;
Expand Down Expand Up @@ -39,6 +40,7 @@ pub trait WorkerProcessing {
pub trait ServicesServerProcessor {
type Request: Debug + Serialize + DeserializeOwned + Sync + Send + 'static;
type Response: Debug + Serialize + DeserializeOwned + Sync + Send + 'static;
async fn init(injector: Arc<Injector>) -> Arc<Self>;
async fn process(&self, request: Self::Request) -> Self::Response;
}

Expand Down Expand Up @@ -70,6 +72,9 @@ impl DefaultServicesServerProcessor {
impl ServicesServerProcessor for DefaultServicesServerProcessor {
type Request = ();
type Response = ();
async fn init(_injector: Arc<Injector>) -> Arc<Self> {
Arc::new(Self {})
}
async fn process(&self, _request: ()) -> () {
()
}
Expand Down
5 changes: 4 additions & 1 deletion rust/cubestore/cubestore/src/metastore/rocks_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ impl BaseRocksStoreFs {
return async move {
let local = remote_fs.local_file(f.clone()).await?;
// TODO persist file size
Ok::<_, CubeError>((f.clone(), remote_fs.upload_file(local, f.clone()).await?))
Ok::<_, CubeError>((
f.clone(),
remote_fs.upload_file(local, f.clone()).await?,
))
};
})
.collect::<Vec<_>>(),
Expand Down
8 changes: 5 additions & 3 deletions rust/cubestore/cubestore/src/remotefs/gcs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::app_metrics;
use crate::di_service;
use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs, CommonRemoteFsUtils};
use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs};
use crate::util::lock::acquire_lock;
use crate::CubeError;
use async_trait::async_trait;
Expand Down Expand Up @@ -119,7 +119,6 @@ di_service!(GCSRemoteFs, [RemoteFs]);

#[async_trait]
impl RemoteFs for GCSRemoteFs {

async fn temp_upload_path(&self, remote_path: String) -> Result<String, CubeError> {
CommonRemoteFsUtils::temp_upload_path(self, remote_path).await
}
Expand Down Expand Up @@ -270,7 +269,10 @@ impl RemoteFs for GCSRemoteFs {
.collect::<Vec<_>>())
}

async fn list_with_metadata(&self, remote_prefix: String) -> Result<Vec<RemoteFile>, CubeError> {
async fn list_with_metadata(
&self,
remote_prefix: String,
) -> Result<Vec<RemoteFile>, CubeError> {
let prefix = self.gcs_path(&remote_prefix);
let list = Object::list_prefix(self.bucket.as_str(), prefix.as_str()).await?;
let leading_slash = Regex::new(format!("^{}", self.gcs_path("")).as_str()).unwrap();
Expand Down
8 changes: 5 additions & 3 deletions rust/cubestore/cubestore/src/remotefs/minio.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::di_service;
use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs, CommonRemoteFsUtils};
use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs};
use crate::util::lock::acquire_lock;
use crate::CubeError;
use async_trait::async_trait;
Expand Down Expand Up @@ -144,7 +144,6 @@ di_service!(MINIORemoteFs, [RemoteFs]);

#[async_trait]
impl RemoteFs for MINIORemoteFs {

async fn temp_upload_path(&self, remote_path: String) -> Result<String, CubeError> {
CommonRemoteFsUtils::temp_upload_path(self, remote_path).await
}
Expand Down Expand Up @@ -284,7 +283,10 @@ impl RemoteFs for MINIORemoteFs {
.collect::<Vec<_>>())
}

async fn list_with_metadata(&self, remote_prefix: String) -> Result<Vec<RemoteFile>, CubeError> {
async fn list_with_metadata(
&self,
remote_prefix: String,
) -> Result<Vec<RemoteFile>, CubeError> {
let path = self.s3_path(&remote_prefix);
let bucket = self.bucket.read().unwrap().clone();
let list = cube_ext::spawn_blocking(move || bucket.list_blocking(path, None)).await??;
Expand Down
43 changes: 32 additions & 11 deletions rust/cubestore/cubestore/src/remotefs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use datafusion::cube_ext;
use futures::future::BoxFuture;
use futures::FutureExt;
use log::debug;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::{NamedTempFile, PathPersistError};
use tokio::fs;
use tokio::sync::{Mutex, RwLock};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteFile {
Expand Down Expand Up @@ -54,7 +54,7 @@ pub trait RemoteFs: DIService + Send + Sync + Debug {
&self,
remote_path: String,
expected_size: u64,
) -> Result<(), CubeError>;
) -> Result<(), CubeError>;

/// In addition to uploading this file to the remote filesystem, this function moves the file
/// from `temp_upload_path` to `self.local_path(remote_path)` on the local file system.
Expand All @@ -74,7 +74,8 @@ pub trait RemoteFs: DIService + Send + Sync + Debug {

async fn list(&self, remote_prefix: String) -> Result<Vec<String>, CubeError>;

async fn list_with_metadata(&self, remote_prefix: String) -> Result<Vec<RemoteFile>, CubeError>;
async fn list_with_metadata(&self, remote_prefix: String)
-> Result<Vec<RemoteFile>, CubeError>;

async fn local_path(&self) -> Result<String, CubeError>;

Expand All @@ -87,9 +88,14 @@ impl CommonRemoteFsUtils {
///
/// Use this path to prepare files for upload. Writing into `local_path()` directly can result
/// in files being deleted by the background cleanup process, see `QueueRemoteFs::cleanup_loop`.
pub async fn temp_upload_path(remote_fs: &dyn RemoteFs, remote_path: String) -> Result<String, CubeError> {
pub async fn temp_upload_path(
remote_fs: &dyn RemoteFs,
remote_path: String,
) -> Result<String, CubeError> {
// Putting files into a subdirectory prevents cleanups from removing them.
remote_fs.local_file(format!("uploads/{}", remote_path)).await
remote_fs
.local_file(format!("uploads/{}", remote_path))
.await
}

/// Convention is to use this directory for creating files to be uploaded later.
Expand Down Expand Up @@ -183,7 +189,6 @@ di_service!(RemoteFsRpcClient, [RemoteFs]);

#[async_trait]
impl RemoteFs for LocalDirRemoteFs {

async fn temp_upload_path(&self, remote_path: String) -> Result<String, CubeError> {
CommonRemoteFsUtils::temp_upload_path(self, remote_path).await
}
Expand Down Expand Up @@ -333,7 +338,10 @@ impl RemoteFs for LocalDirRemoteFs {
.collect::<Vec<_>>())
}

async fn list_with_metadata(&self, remote_prefix: String) -> Result<Vec<RemoteFile>, CubeError> {
async fn list_with_metadata(
&self,
remote_prefix: String,
) -> Result<Vec<RemoteFile>, CubeError> {
let remote_dir = self.remote_dir.read().await.as_ref().cloned();
let result = Self::list_recursive(
remote_dir.clone().unwrap_or(self.dir.clone()),
Expand Down Expand Up @@ -494,9 +502,16 @@ mod tests {
name_maker: NameMaker,
download_test: bool,
) {
assert_eq!(remote_fs.local_path().await.unwrap(), local_dir.to_str().unwrap());
assert_eq!(
remote_fs.local_path().await.unwrap(),
local_dir.to_str().unwrap()
);

let local_file = remote_fs.local_file("test.tst".to_string()).await.ok().unwrap();
let local_file = remote_fs
.local_file("test.tst".to_string())
.await
.ok()
.unwrap();
assert_eq!(local_file, local_dir.join("test.tst").to_str().unwrap());

let local_file_path = Path::new("test_dir")
Expand Down Expand Up @@ -569,14 +584,20 @@ mod tests {

for filename in root_files.iter().chain(subdir_files.iter()) {
assert!(!local_dir.join(filename).is_file());
remote_fs.download_file(filename.clone(), None).await.unwrap();
remote_fs
.download_file(filename.clone(), None)
.await
.unwrap();
assert!(local_dir.join(filename).is_file());
}
}

for filename in root_files.iter().chain(subdir_files.iter()) {
assert!(local_dir.join(&filename).is_file());
assert_eq!(&remote_fs.list(filename.clone()).await.unwrap()[0], filename);
assert_eq!(
&remote_fs.list(filename.clone()).await.unwrap()[0],
filename
);

remote_fs.delete_file(filename.clone()).await.unwrap();

Expand Down
68 changes: 51 additions & 17 deletions rust/cubestore/cubestore/src/remotefs/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::ConfigObj;
use crate::di_service;
use crate::remotefs::{RemoteFile, RemoteFs, CommonRemoteFsUtils};
use crate::remotefs::{CommonRemoteFsUtils, RemoteFile, RemoteFs};
use crate::util::lock::acquire_lock;
use crate::CubeError;
use async_trait::async_trait;
Expand Down Expand Up @@ -33,9 +33,9 @@ pub struct QueueRemoteFs {
impl Debug for QueueRemoteFs {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("QueueRemoteFs").finish()
//TODO FIX IT
/* .field("remote_fs", &self.remote_fs)
.finish() */
//TODO FIX IT
/* .field("remote_fs", &self.remote_fs)
.finish() */
}
}

Expand Down Expand Up @@ -187,7 +187,6 @@ impl QueueRemoteFs {

#[async_trait]
impl RemoteFs for QueueRemoteFs {

async fn temp_upload_path(&self, remote_path: String) -> Result<String, CubeError> {
CommonRemoteFsUtils::temp_upload_path(self, remote_path).await
}
Expand Down Expand Up @@ -325,7 +324,10 @@ impl RemoteFs for QueueRemoteFs {
self.remote_fs.list(remote_prefix).await
}

async fn list_with_metadata(&self, remote_prefix: String) -> Result<Vec<RemoteFile>, CubeError> {
async fn list_with_metadata(
&self,
remote_prefix: String,
) -> Result<Vec<RemoteFile>, CubeError> {
self.remote_fs.list_with_metadata(remote_prefix).await
}

Expand Down Expand Up @@ -388,7 +390,6 @@ mod test {

#[async_trait]
impl RemoteFs for MockFs {

async fn temp_upload_path(&self, remote_path: String) -> Result<String, CubeError> {
CommonRemoteFsUtils::temp_upload_path(self, remote_path).await
}
Expand Down Expand Up @@ -513,7 +514,10 @@ mod test {

let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone()));
let res = queue_fs
.upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string())
.upload_file(
path.to_str().unwrap().to_string(),
"temp-upload/foo.csv".to_string(),
)
.await;
queue_fs.stop_processing_loops().unwrap();
r.await.unwrap().unwrap();
Expand All @@ -536,7 +540,10 @@ mod test {

let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone()));
let res = queue_fs
.upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string())
.upload_file(
path.to_str().unwrap().to_string(),
"temp-upload/foo.csv".to_string(),
)
.await;
queue_fs.stop_processing_loops().unwrap();
r.await.unwrap().unwrap();
Expand All @@ -559,7 +566,10 @@ mod test {

let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone()));
let res = queue_fs
.upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string())
.upload_file(
path.to_str().unwrap().to_string(),
"temp-upload/foo.csv".to_string(),
)
.await;
queue_fs.stop_processing_loops().unwrap();
r.await.unwrap().unwrap();
Expand All @@ -577,7 +587,9 @@ mod test {
config.injector().get_service("original_remote_fs").await,
);
let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone()));
let res = queue_fs.download_file("temp-upload/foo.csv".to_string(), None).await;
let res = queue_fs
.download_file("temp-upload/foo.csv".to_string(), None)
.await;
match res {
Ok(_) => assert!(false),
Err(e) => assert!(e.is_corrupt_data()),
Expand All @@ -599,13 +611,24 @@ mod test {
);
let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone()));
queue_fs
.upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string())
.upload_file(
path.to_str().unwrap().to_string(),
"temp-upload/foo.csv".to_string(),
)
.await
.unwrap();

std::fs::remove_file(queue_fs.local_file("temp-upload/foo.csv".to_string()).await.unwrap()).unwrap();
std::fs::remove_file(
queue_fs
.local_file("temp-upload/foo.csv".to_string())
.await
.unwrap(),
)
.unwrap();

let res = queue_fs.download_file("temp-upload/foo.csv".to_string(), Some(1)).await;
let res = queue_fs
.download_file("temp-upload/foo.csv".to_string(), Some(1))
.await;

match res {
Ok(_) => assert!(false),
Expand All @@ -632,12 +655,23 @@ mod test {
let r = tokio::spawn(QueueRemoteFs::wait_processing_loops(queue_fs.clone()));

queue_fs
.upload_file(path.to_str().unwrap().to_string(), "temp-upload/foo.csv".to_string())
.upload_file(
path.to_str().unwrap().to_string(),
"temp-upload/foo.csv".to_string(),
)
.await
.unwrap();
std::fs::remove_file(queue_fs.local_file("temp-upload/foo.csv".to_string()).await.unwrap()).unwrap();
std::fs::remove_file(
queue_fs
.local_file("temp-upload/foo.csv".to_string())
.await
.unwrap(),
)
.unwrap();

let res = queue_fs.download_file("temp-upload/foo.csv".to_string(), None).await;
let res = queue_fs
.download_file("temp-upload/foo.csv".to_string(), None)
.await;

match res {
Ok(_) => assert!(false),
Expand Down
Loading

0 comments on commit 3441ac9

Please sign in to comment.