Skip to content

Commit

Permalink
feat: impl table procedure in standalone mode
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilExileSu committed Aug 4, 2023
1 parent cf7e8c9 commit a1db375
Show file tree
Hide file tree
Showing 14 changed files with 618 additions and 22 deletions.
52 changes: 49 additions & 3 deletions src/datanode/src/sql/truncate_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_procedure::{watcher, ProcedureWithId};
use common_query::Output;
use common_telemetry::logging::info;
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::TruncateTableRequest;
use table_procedure::TruncateTableProcedure;

use crate::error::Result;
use crate::error::{self, Result};
use crate::sql::SqlHandler;

impl SqlHandler {
Expand All @@ -28,8 +32,29 @@ impl SqlHandler {
table: &table_name,
};

let _table = self.get_table(&table_ref).await?;
// TODO(DevilExileSu): implement truncate table-procedure.
let table = self.get_table(&table_ref).await?;
let engine_procedure = self.engine_procedure(table)?;

let procedure =
TruncateTableProcedure::new(req, self.catalog_manager.clone(), engine_procedure);

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;

info!(
"Truncate table {} by procedure {}",
table_name, procedure_id
);

let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu { procedure_id })?;

watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu { procedure_id })?;
Ok(Output::AffectedRows(0))
}
}
Expand All @@ -39,6 +64,7 @@ mod tests {
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::QueryRequest;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::query_engine::SqlStatementExecutor;
Expand All @@ -50,6 +76,7 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn test_truncate_table_by_procedure() {
common_telemetry::init_default_ut_logging();
let instance = MockInstance::new("truncate_table_by_procedure").await;

// Create table first.
Expand Down Expand Up @@ -90,5 +117,24 @@ mod tests {
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));

// Verify table is empty.
let query = Request::Query(QueryRequest {
query: Some(Query::Sql("SELECT * FROM demo".to_string())),
});

let output = instance
.inner()
.do_query(query, QueryContext::arc())
.await
.unwrap();
if let Output::Stream(stream) = output {
let output = RecordBatches::try_collect(stream)
.await
.unwrap()
.pretty_print()
.unwrap();
assert_eq!("++\n++", output)
}
}
}
11 changes: 11 additions & 0 deletions src/file-table-engine/src/engine/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ impl TableEngineProcedure for ImmutableFileTableEngine {
let procedure = Box::new(DropImmutableFileTable::new(request, self.clone()));
Ok(procedure)
}

fn truncate_table_procedure(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> TableResult<BoxedProcedure> {
table_error::UnsupportedSnafu {
operation: "TRUNCATE TABLE",
}
.fail()
}
}

#[cfg(test)]
Expand Down
15 changes: 15 additions & 0 deletions src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use table::requests::{
};
use table::{error as table_error, Result as TableResult, Table, TableRef};

use self::procedure::TruncateMitoTable;
use crate::config::EngineConfig;
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable, TableCreator};
use crate::error::{
Expand Down Expand Up @@ -238,6 +239,19 @@ impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
);
Ok(procedure)
}

fn truncate_table_procedure(
&self,
_ctx: &EngineContext,
request: TruncateTableRequest,
) -> TableResult<BoxedProcedure> {
let procedure = Box::new(
TruncateMitoTable::new(request, self.inner.clone())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?,
);
Ok(procedure)
}
}

pub(crate) struct MitoEngineInner<S: StorageEngine> {
Expand Down Expand Up @@ -724,6 +738,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

Ok(true)
} else {
Ok(false)
Expand Down
5 changes: 4 additions & 1 deletion src/mito/src/engine/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod alter;
mod create;
mod drop;
mod truncate;

use std::sync::Arc;

Expand All @@ -23,6 +24,7 @@ use common_procedure::ProcedureManager;
pub(crate) use create::{CreateMitoTable, TableCreator};
pub(crate) use drop::DropMitoTable;
use store_api::storage::StorageEngine;
pub(crate) use truncate::TruncateMitoTable;

use crate::engine::MitoEngineInner;

Expand All @@ -37,7 +39,8 @@ pub(crate) fn register_procedure_loaders<S: StorageEngine>(
// The procedure names are expected to be unique, so we just panic on error.
CreateMitoTable::register_loader(engine_inner.clone(), procedure_manager);
AlterMitoTable::register_loader(engine_inner.clone(), procedure_manager);
DropMitoTable::register_loader(engine_inner, procedure_manager);
DropMitoTable::register_loader(engine_inner.clone(), procedure_manager);
TruncateMitoTable::register_loader(engine_inner, procedure_manager)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/mito/src/engine/procedure/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ mod tests {
let TestEnv {
table_engine,
dir: _dir,
} = procedure_test_util::setup_test_engine("add_column").await;
} = procedure_test_util::setup_test_engine("drop_table").await;
let schema = Arc::new(test_util::schema_for_test());
let request = test_util::new_create_request(schema.clone());

Expand Down
199 changes: 199 additions & 0 deletions src/mito/src/engine/procedure/truncate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::StorageEngine;
use table::engine::TableReference;
use table::requests::TruncateTableRequest;
use table::Table;

use crate::engine::MitoEngineInner;
use crate::table::MitoTable;

/// Procedure to truncate a [MitoTable].
pub(crate) struct TruncateMitoTable<S: StorageEngine> {
data: TruncateTableData,
engine_inner: Arc<MitoEngineInner<S>>,
table: Option<Arc<MitoTable<S::Region>>>,
}

#[async_trait]
impl<S: StorageEngine> Procedure for TruncateMitoTable<S> {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
match self.data.state {
TruncateTableState::Prepare => self.on_prepare(),
TruncateTableState::EngineTruncateTable => self.on_engine_truncate_table().await,
}
}

fn dump(&self) -> Result<String> {
let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?;
Ok(json)
}

fn lock_key(&self) -> LockKey {
let table_ref = self.data.table_ref();
let Some(table) = &self.table else { return LockKey::default() };
let info = table.table_info();
let keys = info
.meta
.region_numbers
.iter()
.map(|number| format!("{table_ref}/region-{number}"));
LockKey::new(keys)
}
}

impl<S: StorageEngine> TruncateMitoTable<S> {
const TYPE_NAME: &str = "mito::TruncateMitoTable";

/// Returns a new [TruncateMitoTable].
pub(crate) fn new(
request: TruncateTableRequest,
engine_inner: Arc<MitoEngineInner<S>>,
) -> Result<Self> {
let data = TruncateTableData {
state: TruncateTableState::Prepare,
request,
};
let table = engine_inner.get_mito_table(data.request.table_id);

Ok(TruncateMitoTable {
data,
engine_inner,
table,
})
}

/// Register the loader of this procedure to the `procedure_manager`.
///
/// # Panics
/// Panics on error.
pub(crate) fn register_loader(
engine_inner: Arc<MitoEngineInner<S>>,
procedure_manager: &dyn ProcedureManager,
) {
procedure_manager
.register_loader(
Self::TYPE_NAME,
Box::new(move |data| {
Self::from_json(data, engine_inner.clone()).map(|p| Box::new(p) as _)
}),
)
.unwrap()
}

/// Recover the procedure from json.
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
let data: TruncateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let table = engine_inner.get_mito_table(data.request.table_id);

Ok(TruncateMitoTable {
data,
engine_inner,
table,
})
}

/// Prepare table info.
fn on_prepare(&mut self) -> Result<Status> {
self.data.state = TruncateTableState::EngineTruncateTable;

Ok(Status::executing(true))
}

async fn on_engine_truncate_table(&mut self) -> Result<Status> {
let engine = &self.engine_inner;
engine
.truncate_table(self.data.request.clone())
.await
.map_err(Error::from_error_ext)?;
Ok(Status::Done)
}
}

/// Represents each step while truncating table in the mito engine.
#[derive(Debug, Serialize, Deserialize)]
enum TruncateTableState {
/// Prepare to truncate the table.
Prepare,
/// Engine truncate the table.
EngineTruncateTable,
}

/// Serializable data of [TruncateMitoTable].
#[derive(Debug, Serialize, Deserialize)]
struct TruncateTableData {
state: TruncateTableState,
request: TruncateTableRequest,
}

impl TruncateTableData {
fn table_ref(&self) -> TableReference {
self.request.table_ref()
}
}

#[cfg(test)]
mod tests {
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};

use super::*;
use crate::engine::procedure::procedure_test_util::{self, TestEnv};
use crate::table::test_util;

#[tokio::test]
async fn test_procedure_truncate_table() {
common_telemetry::init_default_ut_logging();

let TestEnv {
table_engine,
dir: _dir,
} = procedure_test_util::setup_test_engine("truncate_table").await;
let schema = Arc::new(test_util::schema_for_test());
let request = test_util::new_create_request(schema.clone());

let engine_ctx = EngineContext::default();
// Create table first.
let mut procedure = table_engine
.create_table_procedure(&engine_ctx, request.clone())
.unwrap();
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

let table_id = request.id;

let request = test_util::new_truncate_request();

// Truncate the table.
let mut procedure = table_engine
.truncate_table_procedure(&engine_ctx, request.clone())
.unwrap();
procedure_test_util::execute_procedure_until_done(&mut procedure).await;

assert!(table_engine
.get_table(&engine_ctx, table_id)
.unwrap()
.is_some());
}
}
Loading

0 comments on commit a1db375

Please sign in to comment.