Skip to content

Commit

Permalink
refactor(mito): mv mito2 request (#2086)
Browse files Browse the repository at this point in the history
* refactor: mv request mod to crate level

* refactor: mv SkippedFields
  • Loading branch information
evenyag authored Aug 3, 2023
1 parent 90b2200 commit fdd4929
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 55 deletions.
4 changes: 2 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
pub use crate::worker::request::CreateRequest;
use crate::worker::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
pub use crate::request::CreateRequest;
use crate::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
use crate::worker::WorkerGroup;

/// Region engine implementation for timeseries data.
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use store_api::storage::RegionId;

use super::*;
use crate::error::Error;
use crate::request::RegionOptions;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::worker::request::RegionOptions;

#[tokio::test]
async fn test_engine_new_stop() {
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub mod read;
#[allow(dead_code)]
mod region;
#[allow(dead_code)]
pub mod request;
#[allow(dead_code)]
pub mod sst;
#[allow(dead_code)]
mod worker;
Expand Down
90 changes: 44 additions & 46 deletions src/mito2/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,53 +119,7 @@ impl RegionMetadata {
pub fn to_json(&self) -> Result<String> {
serde_json::to_string(&self).context(SerdeJsonSnafu)
}
}

/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.
schema: SchemaRef,
/// Id of the time index column.
time_index: ColumnId,
/// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
id_to_index: HashMap<ColumnId, usize>,
}

impl SkippedFields {
/// Constructs skipped fields from `column_metadatas`.
fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
let column_schemas = column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect();
let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
let time_index = column_metadatas
.iter()
.find_map(|col| {
if col.semantic_type == SemanticType::Timestamp {
Some(col.column_id)
} else {
None
}
})
.context(InvalidMetaSnafu {
reason: "time index not found",
})?;
let id_to_index = column_metadatas
.iter()
.enumerate()
.map(|(idx, col)| (col.column_id, idx))
.collect();

Ok(SkippedFields {
schema,
time_index,
id_to_index,
})
}
}

impl RegionMetadata {
/// Find column by id.
pub(crate) fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
self.id_to_index
Expand Down Expand Up @@ -366,6 +320,50 @@ pub enum SemanticType {
Timestamp,
}

/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.
schema: SchemaRef,
/// Id of the time index column.
time_index: ColumnId,
/// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
id_to_index: HashMap<ColumnId, usize>,
}

impl SkippedFields {
/// Constructs skipped fields from `column_metadatas`.
fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
let column_schemas = column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect();
let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
let time_index = column_metadatas
.iter()
.find_map(|col| {
if col.semantic_type == SemanticType::Timestamp {
Some(col.column_id)
} else {
None
}
})
.context(InvalidMetaSnafu {
reason: "time index not found",
})?;
let id_to_index = column_metadatas
.iter()
.enumerate()
.map(|(idx, col)| (col.column_id, idx))
.collect();

Ok(SkippedFields {
schema,
time_index,
id_to_index,
})
}
}

#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::options::RegionManifestOptions;
use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType};
use crate::worker::request::{CreateRequest, RegionOptions};
use crate::request::{CreateRequest, RegionOptions};
use crate::worker::WorkerGroup;

/// Env to test mito engine.
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
mod handle_close;
mod handle_create;
mod handle_open;
pub(crate) mod request;

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
Expand All @@ -38,7 +37,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::region::{RegionMap, RegionMapRef};
use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest};
use crate::request::{RegionRequest, RequestBody, WorkerRequest};

/// Identifier for a worker.
pub(crate) type WorkerId = u32;
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use common_telemetry::info;

use crate::error::Result;
use crate::worker::request::CloseRequest;
use crate::request::CloseRequest;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use snafu::ensure;
use crate::error::{RegionExistsSnafu, Result};
use crate::metadata::{RegionMetadataBuilder, INIT_REGION_VERSION};
use crate::region::opener::RegionOpener;
use crate::worker::request::CreateRequest;
use crate::request::CreateRequest;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_telemetry::info;

use crate::error::Result;
use crate::region::opener::RegionOpener;
use crate::worker::request::OpenRequest;
use crate::request::OpenRequest;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
Expand Down

0 comments on commit fdd4929

Please sign in to comment.