Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement SHOW CREATE FLOW #4040

Merged
merged 14 commits into from
Jun 7, 2024
2 changes: 1 addition & 1 deletion src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod flow_info;
pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
Expand Down
20 changes: 20 additions & 0 deletions src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ impl FlowInfoValue {
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
}

pub fn flow_name(&self) -> &String {
&self.flow_name
}

pub fn sink_table_name(&self) -> &TableName {
&self.sink_table_name
}

pub fn raw_sql(&self) -> &String {
&self.raw_sql
}

pub fn expire_after(&self) -> Option<i64> {
self.expire_after
}

pub fn comment(&self) -> &String {
&self.comment
}
}

pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ pub fn check_permission(
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
Statement::ShowCreateFlow(stmt) => {
validate_param(&stmt.flow_name, query_ctx)?;
}
Statement::CreateExternalTable(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Expand Down
38 changes: 38 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,44 @@ impl StatementExecutor {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
Statement::ShowCreateFlow(show) => {
let obj_name = &show.flow_name;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
[catalog, table] => (catalog.value.clone(), table.value.clone()),
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
),
}
.fail()
}
};

let flow_name_val = self
.flow_metadata_manager
.flow_name_manager()
.get(&catalog_name, &flow_name)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;

let flow_val = self
.flow_metadata_manager
.flow_info_manager()
.get(flow_name_val.flow_id())
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::FlowNotFoundSnafu {
flow_name: &flow_name,
})?;

self.show_create_flow(obj_name.clone(), flow_val, query_ctx)
.await
}
Statement::SetVariables(set_var) => {
let var_name = set_var.variable.to_string().to_uppercase();
match var_name.as_str() {
Expand Down
13 changes: 13 additions & 0 deletions src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
Expand All @@ -23,6 +24,7 @@ use sql::statements::create::Partitions;
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use sqlparser::ast::ObjectName;
use table::metadata::TableType;
use table::table_name::TableName;
use table::TableRef;
Expand Down Expand Up @@ -105,6 +107,17 @@ impl StatementExecutor {
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn show_create_flow(
&self,
flow_name: ObjectName,
flow_val: FlowInfoValue,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_create_flow(flow_name, flow_val, query_ctx)
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
Expand Down
53 changes: 52 additions & 1 deletion src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
Expand All @@ -49,10 +50,13 @@ use regex::Regex;
use session::context::QueryContextRef;
pub use show_create_table::create_table_stmt;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateFlow, Partitions};
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use sqlparser::ast::ObjectName;
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use table::TableRef;

Expand Down Expand Up @@ -134,6 +138,13 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
]))
});

static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
]))
});

fn null() -> Expr {
lit(ScalarValue::Null)
}
Expand Down Expand Up @@ -606,6 +617,46 @@ pub fn show_create_table(
Ok(Output::new_with_record_batches(records))
}

pub fn show_create_flow(
flow_name: ObjectName,
flow_val: FlowInfoValue,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut parser_ctx =
ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;

let query = parser_ctx.parser_query().context(error::SqlSnafu)?;

let comment = if flow_val.comment().is_empty() {
None
} else {
Some(flow_val.comment().clone())
};

let stmt = CreateFlow {
flow_name,
sink_table_name: ObjectName(vec![Ident {
value: flow_val.sink_table_name().table_name.clone(),
quote_style: None,
}]),
or_replace: true,
if_not_exists: true,
expire_after: flow_val.expire_after(),
comment,
query,
};

let sql = format!("{}", stmt);
let columns = vec![
Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;

Ok(Output::new_with_record_batches(records))
}

pub fn describe_table(table: TableRef) -> Result<Output> {
let table_info = table.table_info();
let columns_schemas = table_info.meta.schema.column_schemas();
Expand Down
4 changes: 4 additions & 0 deletions src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ pub enum Error {
#[snafu(display("Invalid table name: {}", name))]
InvalidTableName { name: String },

#[snafu(display("Invalid flow name: {}", name))]
InvalidFlowName { name: String },

#[snafu(display("Invalid default constraint, column: {}", column))]
InvalidDefault {
column: String,
Expand Down Expand Up @@ -274,6 +277,7 @@ impl ErrorExt for Error {
| InvalidDatabaseOption { .. }
| ColumnTypeMismatch { .. }
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }
Expand Down
23 changes: 17 additions & 6 deletions src/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use snafu::ResultExt;
use sqlparser::ast::Ident;
use sqlparser::ast::{Ident, Query};
use sqlparser::dialect::Dialect;
use sqlparser::keywords::Keyword;
use sqlparser::parser::{Parser, ParserError, ParserOptions};
Expand All @@ -38,6 +38,21 @@ pub struct ParserContext<'a> {
}

impl<'a> ParserContext<'a> {
/// Construct a new ParserContext.
pub fn new(dialect: &'a dyn Dialect, sql: &'a str) -> Result<ParserContext<'a>> {
let parser = Parser::new(dialect)
.with_options(ParserOptions::new().with_trailing_commas(true))
.try_with_sql(sql)
.context(SyntaxSnafu)?;

Ok(ParserContext { parser, sql })
}

/// Parses parser context to Query.
pub fn parser_query(&mut self) -> Result<Box<Query>> {
Ok(Box::new(self.parser.parse_query().context(SyntaxSnafu)?))
}

/// Parses SQL with given dialect
pub fn create_with_dialect(
evenyag marked this conversation as resolved.
Show resolved Hide resolved
sql: &'a str,
Expand All @@ -46,11 +61,7 @@ impl<'a> ParserContext<'a> {
) -> Result<Vec<Statement>> {
let mut stmts: Vec<Statement> = Vec::new();

let parser = Parser::new(dialect)
.with_options(ParserOptions::new().with_trailing_commas(true))
.try_with_sql(sql)
.context(SyntaxSnafu)?;
let mut parser_ctx = ParserContext { sql, parser };
let mut parser_ctx = ParserContext::new(dialect, sql)?;

let mut expecting_statement_delimiter = false;
loop {
Expand Down
28 changes: 25 additions & 3 deletions src/sql/src/parsers/show_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ use snafu::{ensure, ResultExt};
use sqlparser::keywords::Keyword;
use sqlparser::tokenizer::Token;

use crate::error::{self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result};
use crate::error::{
self, InvalidDatabaseNameSnafu, InvalidFlowNameSnafu, InvalidTableNameSnafu, Result,
};
use crate::parser::ParserContext;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus, ShowTables,
ShowVariables,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowStatus,
ShowTables, ShowVariables,
};
use crate::statements::statement::Statement;

Expand Down Expand Up @@ -62,6 +64,8 @@ impl<'a> ParserContext<'a> {
} else if self.consume_token("CREATE") {
if self.consume_token("TABLE") {
self.parse_show_create_table()
} else if self.consume_token("FLOW") {
self.parse_show_create_flow()
} else {
self.unsupported(self.peek_token_as_string())
}
Expand Down Expand Up @@ -109,6 +113,24 @@ impl<'a> ParserContext<'a> {
Ok(Statement::ShowCreateTable(ShowCreateTable { table_name }))
}

fn parse_show_create_flow(&mut self) -> Result<Statement> {
let raw_flow_name = self
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "a flow name",
actual: self.peek_token_as_string(),
})?;
let flow_name = Self::canonicalize_object_name(raw_flow_name);
ensure!(
!flow_name.0.is_empty(),
InvalidFlowNameSnafu {
name: flow_name.to_string(),
}
);
Ok(Statement::ShowCreateFlow(ShowCreateFlow { flow_name }))
}

fn parse_show_table_name(&mut self) -> Result<String> {
self.parser.next_token();
let table_name = self
Expand Down
43 changes: 38 additions & 5 deletions src/sql/src/statements/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,17 @@ impl Display for CreateFlow {
if self.or_replace {
write!(f, "OR REPLACE ")?;
}
write!(f, "TASK ")?;
write!(f, "FLOW ")?;
if self.if_not_exists {
write!(f, "IF NOT EXISTS ")?;
}
write!(f, "{} ", &self.flow_name)?;
write!(f, "OUTPUT AS {} ", &self.sink_table_name)?;
writeln!(f, "{}", &self.flow_name)?;
writeln!(f, "SINK TO {}", &self.sink_table_name)?;
if let Some(expire_after) = &self.expire_after {
write!(f, "EXPIRE AFTER {} ", expire_after)?;
writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
}
if let Some(comment) = &self.comment {
write!(f, "COMMENT '{}' ", comment)?;
writeln!(f, "COMMENT '{}'", comment)?;
}
write!(f, "AS {}", &self.query)
}
Expand Down Expand Up @@ -604,4 +604,37 @@ WITH(
}
}
}

#[test]
fn test_display_create_flow() {
let sql = r"CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input where number > 10;";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());

match &result[0] {
Statement::CreateFlow(c) => {
let new_sql = format!("\n{}", c);
assert_eq!(
r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input WHERE number > 10"#,
&new_sql
);

let new_result = ParserContext::create_with_dialect(
&new_sql,
&GreptimeDbDialect {},
ParseOptions::default(),
)
.unwrap();
assert_eq!(result, new_result);
}
_ => unreachable!(),
}
}
}
Loading