
Commit

update
waralexrom committed Sep 4, 2023
1 parent 2e53335 commit f8a1321
Showing 6 changed files with 249 additions and 220 deletions.
67 changes: 28 additions & 39 deletions rust/cubestore/cubestore/src/cluster/ingestion/job_processor.rs
@@ -1,11 +1,12 @@
use crate::config::injection::DIService;
use crate::config::{Config, ConfigObj};
use crate::config::Config;
use crate::import::ImportService;
use crate::metastore::job::{Job, JobType};
use crate::metastore::table::Table;
use crate::metastore::{MetaStore, RowKey, TableId};
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::store::compaction::CompactionService;
use crate::store::ChunkDataStore;
use crate::CubeError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -47,15 +48,15 @@ pub struct JobProcessorImpl {

impl JobProcessorImpl {
pub fn new(
config_obj: Arc<dyn ConfigObj>,
meta_store: Arc<dyn MetaStore>,
chunk_store: Arc<dyn ChunkDataStore>,
compaction_service: Arc<dyn CompactionService>,
import_service: Arc<dyn ImportService>,
) -> Arc<Self> {
Arc::new(Self {
processor: JobIsolatedProcessor::new(
config_obj,
meta_store,
chunk_store,
compaction_service,
import_service,
),
@@ -72,66 +73,37 @@ impl JobProcessor for JobProcessorImpl {
}

async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
self.processor
.process_separate_job(&job)
.await
self.processor.process_separate_job(&job).await
}
}

crate::di_service!(JobProcessorImpl, [JobProcessor]);

//TODO
#[cfg(target_os = "windows")]
pub struct JobProcessor {
process_pool:
Arc<WorkerPool<IngestionWorkerMessage, JobProcessResult, IngestionWorkerProcessor>>,
}

#[cfg(target_os = "windows")]
impl JobProcessor {
pub fn new(pool_size: usize, timeout: Duration) -> Arc<Self> {
Arc::new(Self {
process_pool: Arc::new(WorkerPool::new(pool_size, timeout)),
})
}
pub async fn wait_processing_loops(&self) {
self.process_pool.wait_processing_loops().await
}
pub async fn stop_processing_loops(&self) -> Result<(), CubeError> {
self.process_pool.stop_workers().await
}
pub async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
self.process_pool
.process(IngestionWorkerMessage::Job(job))
.await
}
}

pub struct JobIsolatedProcessor {
config_obj: Arc<dyn ConfigObj>,
meta_store: Arc<dyn MetaStore>,
chunk_store: Arc<dyn ChunkDataStore>,
compaction_service: Arc<dyn CompactionService>,
import_service: Arc<dyn ImportService>,
}

impl JobIsolatedProcessor {
pub fn new(
config_obj: Arc<dyn ConfigObj>,
meta_store: Arc<dyn MetaStore>,
chunk_store: Arc<dyn ChunkDataStore>,
compaction_service: Arc<dyn CompactionService>,
import_service: Arc<dyn ImportService>,
) -> Arc<Self> {
Arc::new(Self {
config_obj,
meta_store,
chunk_store,
compaction_service,
import_service,
})
}

pub async fn new_from_config(config: &Config) -> Arc<Self> {
Self::new(
config.config_obj(),
config.injector().get_service_typed().await,
config.injector().get_service_typed().await,
config.injector().get_service_typed().await,
config.injector().get_service_typed().await,
@@ -145,11 +117,9 @@ impl JobIsolatedProcessor {
let compaction_service = self.compaction_service.clone();
let partition_id = *partition_id;
let data_loaded_size = DataLoadedSize::new();
println!("!!!! CCCCCCCC");
let r = compaction_service
.compact(partition_id, data_loaded_size.clone())
.await;
println!("!!!! FFFFFF");
r?;
Ok(JobProcessResult::new(data_loaded_size.get()))
} else {
@@ -215,6 +185,25 @@ impl JobIsolatedProcessor {
Self::fail_job_row_key(job)
}
}
JobType::RepartitionChunk => {
if let RowKey::Table(TableId::Chunks, chunk_id) = job.row_reference() {
let chunk_id = *chunk_id;
let chunk = self.meta_store.get_chunk(chunk_id).await?;
if chunk.get_row().in_memory() {
return Err(CubeError::internal(
"In-memory chunk cannot be repartitioned in separate process"
.to_string(),
));
}
let data_loaded_size = DataLoadedSize::new();
self.chunk_store
.repartition_chunk(chunk_id, data_loaded_size.clone())
.await?;
Ok(JobProcessResult::new(data_loaded_size.get()))
} else {
Self::fail_job_row_key(job)
}
}
_ => Err(CubeError::internal(format!(
"Job {:?} cannot be processed in separate process",
job.job_type()
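Taken together, the job_processor.rs changes route every worker-side job through a single `JobProcessor` facade that forwards to `JobIsolatedProcessor` and reports how much data the job loaded. Below is a minimal, synchronous sketch of that shape only; the real code is async (`async_trait`), is wired through CubeStore's DI container, and calls injected services such as `MetaStore`, `ChunkDataStore`, `CompactionService`, and `ImportService`. The types and the fixed `data_loaded_size` value here are illustrative stand-ins, not the repository's API.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, Copy)]
enum JobType {
    PartitionCompaction,
    RepartitionChunk,
    TableImportCSV,
}

#[derive(Debug)]
struct Job {
    job_type: JobType,
}

#[derive(Debug)]
struct JobProcessResult {
    data_loaded_size: usize,
}

type CubeError = String;

// Facade the cluster/runner code talks to.
trait JobProcessor {
    fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError>;
}

// Does the actual work; in CubeStore this logic can also run in a separate worker process.
struct JobIsolatedProcessor;

impl JobIsolatedProcessor {
    fn process_separate_job(&self, job: &Job) -> Result<JobProcessResult, CubeError> {
        match job.job_type {
            // Each arm would call the matching service (compaction, chunk store, import)
            // and record how much data it read via something like DataLoadedSize.
            JobType::PartitionCompaction
            | JobType::RepartitionChunk
            | JobType::TableImportCSV => Ok(JobProcessResult {
                data_loaded_size: 42, // stand-in for DataLoadedSize::get()
            }),
        }
    }
}

struct JobProcessorImpl {
    processor: Arc<JobIsolatedProcessor>,
}

impl JobProcessor for JobProcessorImpl {
    fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
        // The impl simply forwards to the isolated processor.
        self.processor.process_separate_job(&job)
    }
}

fn main() {
    let processor = JobProcessorImpl {
        processor: Arc::new(JobIsolatedProcessor),
    };
    let res = processor.process_job(Job {
        job_type: JobType::RepartitionChunk,
    });
    println!("{:?}", res);
}
```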
183 changes: 96 additions & 87 deletions rust/cubestore/cubestore/src/cluster/ingestion/job_runner.rs
@@ -1,28 +1,26 @@
use crate::cluster::ingestion::job_processor::JobProcessor;
use crate::cluster::rate_limiter::{ProcessRateLimiter, TaskType, TraceIndex};
use crate::config::ConfigObj;
use crate::metastore::{
IdRow, MetaStore, RowKey, TableId,
};
use crate::metastore::job::{Job, JobStatus, JobType};
use crate::import::ImportService;
use crate::store::ChunkDataStore;
use crate::store::compaction::CompactionService;
use tokio_util::sync::CancellationToken;
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::metastore::job::{Job, JobStatus, JobType};
use crate::metastore::table::Table;
use tokio::time::timeout;
use tokio::task::JoinHandle;
use crate::metastore::{IdRow, MetaStore, RowKey, TableId};
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::store::compaction::CompactionService;
use crate::store::ChunkDataStore;
use crate::util::aborting_join_handle::AbortingJoinHandle;
use std::time::SystemTime;
use crate::cluster::rate_limiter::{ProcessRateLimiter, TaskType, TraceIndex};
use futures_timer::Delay;
use std::time::Duration;
use crate::CubeError;
use std::sync::Arc;
use log::{debug, error, info};
use tokio::sync::{oneshot, Notify};
use core::mem;
use datafusion::cube_ext;
use crate::cluster::ingestion::job_processor::JobProcessor;
use futures_timer::Delay;
use log::{debug, error, info};
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use tokio::sync::{oneshot, Notify};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

pub struct JobRunner {
pub config_obj: Arc<dyn ConfigObj>,
@@ -218,17 +216,19 @@ impl JobRunner {
trace_obj,
};

let res = job_processor.process_job(job_to_move).await;
if let Ok(job_res) = res {
process_rate_limiter
.commit_task_usage(
TaskType::Job,
job_res.data_loaded_size() as i64,
trace_index,
)
.await;
match job_processor.process_job(job_to_move).await {
Ok(job_res) => {
process_rate_limiter
.commit_task_usage(
TaskType::Job,
job_res.data_loaded_size() as i64,
trace_index,
)
.await;
Ok(())
}
Err(e) => Err(e),
}
Ok(())
}))
} else {
Self::fail_job_row_key(job)
@@ -264,40 +264,33 @@ impl JobRunner {
}
}
JobType::MultiPartitionSplit => {
if let RowKey::Table(TableId::MultiPartitions, id) = job.row_reference() {
let compaction_service = self.compaction_service.clone();
let id = *id;
if let RowKey::Table(TableId::MultiPartitions, _) = job.row_reference() {
let job_to_move = job.clone();
let job_processor = self.job_processor.clone();
Ok(cube_ext::spawn(async move {
compaction_service.split_multi_partition(id).await
job_processor.process_job(job_to_move).await.map(|_| ())
}))
} else {
Self::fail_job_row_key(job)
}
}
JobType::FinishMultiSplit => {
if let RowKey::Table(TableId::MultiPartitions, multi_part_id) = job.row_reference()
{
let meta_store = self.meta_store.clone();
let compaction_service = self.compaction_service.clone();
let multi_part_id = *multi_part_id;
if let RowKey::Table(TableId::MultiPartitions, _) = job.row_reference() {
let job_to_move = job.clone();
let job_processor = self.job_processor.clone();
Ok(cube_ext::spawn(async move {
for p in meta_store.find_unsplit_partitions(multi_part_id).await? {
compaction_service
.finish_multi_split(multi_part_id, p)
.await?
}
Ok(())
job_processor.process_job(job_to_move).await.map(|_| ())
}))
} else {
Self::fail_job_row_key(job)
}
}
JobType::TableImport => {
if let RowKey::Table(TableId::Tables, table_id) = job.row_reference() {
let import_service = self.import_service.clone();
let table_id = *table_id;
if let RowKey::Table(TableId::Tables, _) = job.row_reference() {
let job_to_move = job.clone();
let job_processor = self.job_processor.clone();
Ok(cube_ext::spawn(async move {
import_service.import_table(table_id).await
job_processor.process_job(job_to_move).await.map(|_| ())
}))
} else {
Self::fail_job_row_key(job)
@@ -311,6 +304,8 @@ impl JobRunner {
let process_rate_limiter = self.process_rate_limiter.clone();
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
let metastore = self.meta_store.clone();
let job_to_move = job.clone();
let job_processor = self.job_processor.clone();
Ok(cube_ext::spawn(async move {
let is_streaming = Table::is_stream_location(&location);
let data_loaded_size = if is_streaming {
@@ -322,26 +317,31 @@
process_rate_limiter
.wait_for_allow(TaskType::Job, timeout)
.await?; //TODO config, may be the same as the orphaned timeout
match job_processor.process_job(job_to_move).await {
Ok(job_res) => {
let trace_obj =
metastore.get_trace_obj_by_table_id(table_id).await?;
let trace_index = TraceIndex {
table_id: Some(table_id),
trace_obj,
};
process_rate_limiter
.commit_task_usage(
TaskType::Job,
job_res.data_loaded_size() as i64,
trace_index,
)
.await;
Ok(())
}
Err(e) => Err(e),
}
} else {
import_service
.clone()
.import_table_part(table_id, &location, data_loaded_size.clone())
.await
}
let res = import_service
.clone()
.import_table_part(table_id, &location, data_loaded_size.clone())
.await;
if let Some(data_loaded) = &data_loaded_size {
let trace_obj = metastore.get_trace_obj_by_table_id(table_id).await?;
let trace_index = TraceIndex {
table_id: Some(table_id),
trace_obj,
};
process_rate_limiter
.commit_task_usage(
TaskType::Job,
data_loaded.get() as i64,
trace_index,
)
.await;
}
res
}))
} else {
Self::fail_job_row_key(job)
@@ -354,32 +354,41 @@ impl JobRunner {
let process_rate_limiter = self.process_rate_limiter.clone();
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
let metastore = self.meta_store.clone();
let job_to_move = job.clone();
let job_processor = self.job_processor.clone();
Ok(cube_ext::spawn(async move {
process_rate_limiter
.wait_for_allow(TaskType::Job, timeout)
.await?; //TODO config, may be the same as the orphaned timeout
let chunk = metastore.get_chunk(chunk_id).await?;
let (_, _, table, _) = metastore
.get_partition_for_compaction(chunk.get_row().get_partition_id())
.await?;
let table_id = table.get_id();
let trace_obj = metastore.get_trace_obj_by_table_id(table_id).await?;
let trace_index = TraceIndex {
table_id: Some(table_id),
trace_obj,
};
let data_loaded_size = DataLoadedSize::new();
let res = chunk_store
.repartition_chunk(chunk_id, data_loaded_size.clone())
.await;
process_rate_limiter
.commit_task_usage(
TaskType::Job,
data_loaded_size.get() as i64,
trace_index,
)
.await;
res
if !chunk.get_row().in_memory() {
let (_, _, table, _) = metastore
.get_partition_for_compaction(chunk.get_row().get_partition_id())
.await?;
let table_id = table.get_id();
let trace_obj = metastore.get_trace_obj_by_table_id(table_id).await?;
let trace_index = TraceIndex {
table_id: Some(table_id),
trace_obj,
};
match job_processor.process_job(job_to_move).await {
Ok(job_res) => {
process_rate_limiter
.commit_task_usage(
TaskType::Job,
job_res.data_loaded_size() as i64,
trace_index,
)
.await;
Ok(())
}
Err(e) => Err(e),
}
} else {
chunk_store
.repartition_chunk(chunk_id, DataLoadedSize::new())
.await
}
}))
} else {
Self::fail_job_row_key(job)
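On the runner side (job_runner.rs), the recurring pattern after this change is: spawn a task, wait for the process rate limiter, hand the job to `JobProcessor::process_job`, and on success commit the reported `data_loaded_size` back to the limiter. Below is a dependency-free sketch of that flow using plain threads; `MockProcessor`, `MockRateLimiter`, and `run_job` are illustrative names, not CubeStore APIs, and the real code uses `cube_ext::spawn` with async services.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug)]
struct JobProcessResult {
    data_loaded_size: usize,
}

type CubeError = String;

trait JobProcessor: Send + Sync {
    fn process_job(&self, job_id: u64) -> Result<JobProcessResult, CubeError>;
}

struct MockProcessor;

impl JobProcessor for MockProcessor {
    fn process_job(&self, _job_id: u64) -> Result<JobProcessResult, CubeError> {
        // Pretend the job read 1 KiB of data.
        Ok(JobProcessResult { data_loaded_size: 1024 })
    }
}

#[derive(Default)]
struct MockRateLimiter {
    committed: Mutex<i64>,
}

impl MockRateLimiter {
    fn wait_for_allow(&self) { /* the real limiter may block or time out here */ }
    fn commit_task_usage(&self, bytes: i64) {
        *self.committed.lock().unwrap() += bytes;
    }
}

fn run_job(
    processor: Arc<dyn JobProcessor>,
    limiter: Arc<MockRateLimiter>,
    job_id: u64,
) -> thread::JoinHandle<Result<(), CubeError>> {
    // cube_ext::spawn in the real code; a plain thread keeps the sketch self-contained.
    thread::spawn(move || {
        limiter.wait_for_allow();
        match processor.process_job(job_id) {
            Ok(res) => {
                // On success, account for the data the job loaded.
                limiter.commit_task_usage(res.data_loaded_size as i64);
                Ok(())
            }
            Err(e) => Err(e),
        }
    })
}

fn main() {
    let limiter = Arc::new(MockRateLimiter::default());
    let handle = run_job(Arc::new(MockProcessor), limiter.clone(), 7);
    handle.join().unwrap().unwrap();
    println!("committed bytes: {}", *limiter.committed.lock().unwrap());
}
```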
(The remaining 4 changed files are not shown.)
