Commit 95b3ce9: in work

waralexrom committed Aug 31, 2023
1 parent 6072e21 commit 95b3ce9
Showing 10 changed files with 116 additions and 101 deletions.
73 changes: 48 additions & 25 deletions rust/cubestore/cubestore/src/cluster/ingestion/job_processor.rs
@@ -1,18 +1,15 @@
#[cfg(not(target_os = "windows"))]
use crate::cluster::ingestion::worker::{IngestionWorkerMessage, IngestionWorkerProcessor};
#[cfg(not(target_os = "windows"))]
use crate::cluster::worker_pool::{worker_main, MessageProcessor, WorkerPool};
use crate::config::injection::DIService;
use crate::config::{Config, ConfigObj};
use crate::import::ImportService;
use crate::metastore::job::{Job, JobStatus, JobType};
use crate::metastore::job::{Job, JobType};
use crate::metastore::table::Table;
use crate::metastore::{MetaStore, RowKey, TableId};
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::store::compaction::CompactionService;
use crate::CubeError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct JobProcessResult {
@@ -37,31 +34,52 @@ impl Default for JobProcessResult {
}
}

#[cfg(not(target_os = "windows"))]
pub struct JobProcessor {
process_pool:
Arc<WorkerPool<IngestionWorkerMessage, JobProcessResult, IngestionWorkerProcessor>>,
#[async_trait]
pub trait JobProcessor: DIService + Send + Sync {
async fn wait_processing_loops(&self);
async fn stop_processing_loops(&self) -> Result<(), CubeError>;
async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError>;
}

#[cfg(not(target_os = "windows"))]
impl JobProcessor {
pub fn new(pool_size: usize, timeout: Duration) -> Arc<Self> {
pub struct JobProcessorImpl {
processor: Arc<JobIsolatedProcessor>,
}

impl JobProcessorImpl {
pub fn new(
config_obj: Arc<dyn ConfigObj>,
meta_store: Arc<dyn MetaStore>,
compaction_service: Arc<dyn CompactionService>,
import_service: Arc<dyn ImportService>,
) -> Arc<Self> {
Arc::new(Self {
process_pool: Arc::new(WorkerPool::new(pool_size, timeout)),
processor: JobIsolatedProcessor::new(
config_obj,
meta_store,
compaction_service,
import_service,
),
})
}
pub async fn wait_processing_loops(&self) {
self.process_pool.wait_processing_loops().await
}
pub async fn stop_processing_loops(&self) -> Result<(), CubeError> {
self.process_pool.stop_workers().await
}

#[async_trait]
impl JobProcessor for JobProcessorImpl {
async fn wait_processing_loops(&self) {}

async fn stop_processing_loops(&self) -> Result<(), CubeError> {
Ok(())
}
pub async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
println!("^^^^^^^ ");
self.process_pool.process(IngestionWorkerMessage::Job(job)).await

async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
self.processor
.process_separate_job(&job)
.await
}
}

crate::di_service!(JobProcessorImpl, [JobProcessor]);

//TODO
#[cfg(target_os = "windows")]
pub struct JobProcessor {
@@ -83,7 +101,9 @@ impl JobProcessor {
self.process_pool.stop_workers().await
}
pub async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
self.process_pool.process(IngestionWorkerMessage::Job(job)).await
self.process_pool
.process(IngestionWorkerMessage::Job(job))
.await
}
}

@@ -125,9 +145,12 @@ impl JobIsolatedProcessor {
let compaction_service = self.compaction_service.clone();
let partition_id = *partition_id;
let data_loaded_size = DataLoadedSize::new();
compaction_service
println!("!!!! CCCCCCCC");
let r = compaction_service
.compact(partition_id, data_loaded_size.clone())
.await?;
.await;
println!("!!!! FFFFFF");
r?;
Ok(JobProcessResult::new(data_loaded_size.get()))
} else {
Self::fail_job_row_key(job)
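The core of this change replaces the concrete, worker-pool-backed JobProcessor struct with a JobProcessor trait wired through di_service!, so callers depend on Arc<dyn JobProcessor> rather than a concrete type. Below is a minimal sketch of that shape, using simplified stand-in types (not the real cubestore Job, JobProcessResult, or CubeError) and assuming the async-trait and tokio crates:

// Sketch only: stand-in types mirroring the new trait-based design.
use async_trait::async_trait;
use std::sync::Arc;

struct Job;

#[derive(Debug, Default)]
struct JobProcessResult;

#[derive(Debug)]
struct CubeError;

#[async_trait]
trait JobProcessor: Send + Sync {
    async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError>;
}

// Analogous to JobProcessorImpl: runs the job in-process instead of
// shipping it to a worker pool.
struct InProcessProcessor;

#[async_trait]
impl JobProcessor for InProcessProcessor {
    async fn process_job(&self, _job: Job) -> Result<JobProcessResult, CubeError> {
        Ok(JobProcessResult::default())
    }
}

#[tokio::main]
async fn main() -> Result<(), CubeError> {
    // Callers hold a trait object, so the concrete processor can be
    // swapped without touching the call sites.
    let processor: Arc<dyn JobProcessor> = Arc::new(InProcessProcessor);
    let _result = processor.process_job(Job).await?;
    Ok(())
}

Because JobRunner stores the same trait object (see the job_runner.rs diff below), a pool-backed or in-process implementation can be substituted without changing the runner.
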
13 changes: 5 additions & 8 deletions rust/cubestore/cubestore/src/cluster/ingestion/job_runner.rs
@@ -1,7 +1,6 @@
use crate::config::{Config, ConfigObj};
use crate::config::ConfigObj;
use crate::metastore::{
deactivate_table_on_corrupt_data, Chunk, IdRow, MetaStore, MetaStoreEvent, Partition, RowKey,
TableId,
IdRow, MetaStore, RowKey, TableId,
};
use crate::metastore::job::{Job, JobStatus, JobType};
use crate::import::ImportService;
@@ -19,8 +18,8 @@ use futures_timer::Delay;
use std::time::Duration;
use crate::CubeError;
use std::sync::Arc;
use log::{debug, error, info, warn};
use tokio::sync::{oneshot, watch, Notify, RwLock};
use log::{debug, error, info};
use tokio::sync::{oneshot, Notify};
use core::mem;
use datafusion::cube_ext;
use crate::cluster::ingestion::job_processor::JobProcessor;
@@ -36,7 +35,7 @@ pub struct JobRunner {
pub notify: Arc<Notify>,
pub stop_token: CancellationToken,
pub is_long_term: bool,
pub job_processor: Arc<JobProcessor>,
pub job_processor: Arc<dyn JobProcessor>,
}

impl JobRunner {
@@ -199,7 +198,6 @@ impl JobRunner {
}
JobType::PartitionCompaction => {
if let RowKey::Table(TableId::Partitions, partition_id) = job.row_reference() {
let compaction_service = self.compaction_service.clone();
let partition_id = *partition_id;
let process_rate_limiter = self.process_rate_limiter.clone();
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
@@ -220,7 +218,6 @@ impl JobRunner {
trace_obj,
};

let data_loaded_size = DataLoadedSize::new();
let res = job_processor.process_job(job_to_move).await;
if let Ok(job_res) = res {
process_rate_limiter
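A practical payoff of the new Arc<dyn JobProcessor> field is that the processor can be stubbed in tests. A self-contained sketch under the same assumptions as above (stand-in types, async-trait, tokio with the macros feature); CountingProcessor is hypothetical, not part of the codebase:

// Sketch: a counting test double behind the same trait.
use async_trait::async_trait;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Job;

#[derive(Default)]
struct JobProcessResult;

struct CubeError;

#[async_trait]
trait JobProcessor: Send + Sync {
    async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError>;
}

#[derive(Default)]
struct CountingProcessor {
    calls: AtomicUsize, // how many jobs were handed to the processor
}

#[async_trait]
impl JobProcessor for CountingProcessor {
    async fn process_job(&self, _job: Job) -> Result<JobProcessResult, CubeError> {
        self.calls.fetch_add(1, Ordering::SeqCst);
        Ok(JobProcessResult::default())
    }
}

#[tokio::test]
async fn processor_is_called_once_per_job() {
    let stub = Arc::new(CountingProcessor::default());
    // Unsized coercion: Arc<CountingProcessor> -> Arc<dyn JobProcessor>.
    let processor: Arc<dyn JobProcessor> = stub.clone();
    processor.process_job(Job).await.ok();
    assert_eq!(stub.calls.load(Ordering::SeqCst), 1);
}
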
1 change: 0 additions & 1 deletion rust/cubestore/cubestore/src/cluster/ingestion/mod.rs
@@ -1,3 +1,2 @@
pub mod worker;
pub mod job_processor;
pub mod job_runner;
46 changes: 0 additions & 46 deletions rust/cubestore/cubestore/src/cluster/ingestion/worker.rs

This file was deleted.

23 changes: 11 additions & 12 deletions rust/cubestore/cubestore/src/cluster/mod.rs
@@ -6,7 +6,7 @@ pub mod worker_pool;

pub mod rate_limiter;

mod ingestion;
pub mod ingestion;

#[cfg(not(target_os = "windows"))]
use crate::cluster::worker_pool::{worker_main, MessageProcessor, WorkerPool};
@@ -16,13 +16,11 @@ use crate::cluster::message::NetworkMessage;
use crate::cluster::rate_limiter::{ProcessRateLimiter, TaskType, TraceIndex};
use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConnection};
use crate::config::injection::{DIService, Injector};
use crate::config::{is_router, WorkerServices};
use crate::config::is_router;
#[allow(unused_imports)]
use crate::config::{Config, ConfigObj};
use crate::import::ImportService;
use crate::metastore::chunks::chunk_file_name;
use crate::metastore::job::{Job, JobStatus, JobType};
use crate::metastore::table::Table;
use crate::metastore::{
deactivate_table_on_corrupt_data, Chunk, IdRow, MetaStore, MetaStoreEvent, Partition, RowKey,
TableId,
@@ -33,18 +31,14 @@ use crate::metastore::{
};
use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchStream};
use crate::queryplanner::serialized_plan::SerializedPlan;
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::remotefs::RemoteFs;
use crate::store::compaction::CompactionService;
use crate::store::ChunkDataStore;
use crate::telemetry::tracing::TracingHelper;
use crate::util::aborting_join_handle::AbortingJoinHandle;
use crate::CubeError;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use core::mem;
use datafusion::cube_ext;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use flatbuffers::bitflags::_core::pin::Pin;
@@ -67,7 +61,6 @@ use std::time::SystemTime;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::{oneshot, watch, Notify, RwLock};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
@@ -289,6 +282,12 @@ impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream
}
}
}

fn process_titile() -> String {
std::env::var("CUBESTORE_SELECT_WORKER_TITLE")
.ok()
.unwrap_or("--sel-worker".to_string())
}
}

#[cfg(not(target_os = "windows"))]
@@ -822,20 +821,20 @@ impl ClusterImpl {
let arc = Arc::new(WorkerPool::new(
self.config_obj.select_worker_pool_size(),
Duration::from_secs(self.config_obj.query_timeout()),
"sel",
vec![("_CUBESTORE_SUBPROCESS_TYPE".to_string(), "SELECT_WORKER".to_string())]
));
*pool = Some(arc.clone());
futures.push(cube_ext::spawn(
async move { arc.wait_processing_loops().await },
));
}
#[cfg(not(target_os = "windows"))]
let job_processor = {
let job_processor = JobProcessor::new(self.config_obj.job_runners_count(), Duration::from_secs(600)); //TODO - timeout
let job_processor = self.injector.upgrade().unwrap().get_service_typed::<dyn JobProcessor>().await; //TODO - timeout
let job_processor_to_move = job_processor.clone();
futures.push(cube_ext::spawn(
async move { job_processor_to_move.wait_processing_loops().await },
));
println!("!!!!! ---- !!!!!!");
job_processor

};
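The new process_titile helper (name as committed) resolves the select-worker process title from CUBESTORE_SELECT_WORKER_TITLE with a hard-coded fallback. A pure-std sketch of that lookup; select_worker_title is a hypothetical stand-in name used only for illustration:

// Sketch of the env-driven title fallback added in process_titile().
fn select_worker_title() -> String {
    std::env::var("CUBESTORE_SELECT_WORKER_TITLE")
        .ok()
        // unwrap_or_else defers building the default String until the
        // override is absent; the committed code uses unwrap_or.
        .unwrap_or_else(|| "--sel-worker".to_string())
}

fn main() {
    // Prints the override if CUBESTORE_SELECT_WORKER_TITLE is set in the
    // environment, otherwise "--sel-worker".
    println!("{}", select_worker_title());
}

The only idiomatic difference from the committed code: unwrap_or allocates the default String unconditionally, whereas unwrap_or_else builds it only when the variable is unset.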