From e070ba3c32c9108dc499c250ddfc583271e22693 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 22 May 2024 00:02:25 +0800 Subject: [PATCH] feat: respect time range when building parquet reader (#3947) * feat: convert timestamp range filters to predicates * chore: rebase main * fix: remove prediactes once they have been added to timestamp filters to avoid duplicate filtering * fix: some comments * fix: resolve conflicts --- src/common/recordbatch/src/filter.rs | 24 +- src/mito2/src/engine/prune_test.rs | 19 +- src/mito2/src/error.rs | 9 + src/mito2/src/read/scan_region.rs | 13 +- src/mito2/src/sst/parquet/reader.rs | 80 +++- src/query/src/tests/time_range_filter_test.rs | 10 +- src/table/src/predicate.rs | 420 +++++++++--------- 7 files changed, 340 insertions(+), 235 deletions(-) diff --git a/src/common/recordbatch/src/filter.rs b/src/common/recordbatch/src/filter.rs index 7a5e361138c5..195abb118135 100644 --- a/src/common/recordbatch/src/filter.rs +++ b/src/common/recordbatch/src/filter.rs @@ -14,7 +14,7 @@ //! Util record batch stream wrapper that can perform precise filter. -use datafusion::logical_expr::{Expr, Operator}; +use datafusion::logical_expr::{Expr, Literal, Operator}; use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar}; use datafusion_common::arrow::buffer::BooleanBuffer; use datafusion_common::arrow::compute::kernels::cmp; @@ -43,6 +43,28 @@ pub struct SimpleFilterEvaluator { } impl SimpleFilterEvaluator { + pub fn new(column_name: String, lit: T, op: Operator) -> Option { + match op { + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq => {} + _ => return None, + } + + let Expr::Literal(val) = lit.lit() else { + return None; + }; + + Some(Self { + column_name, + literal: val.to_scalar().ok()?, + op, + }) + } + pub fn try_new(predicate: &Expr) -> Option { match predicate { Expr::BinaryExpr(binary) => { diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index b297af136a95..687172417d14 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -26,7 +26,7 @@ use crate::test_util::{ build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, }; -async fn check_prune_row_groups(expr: Expr, expected: &str) { +async fn check_prune_row_groups(exprs: Vec, expected: &str) { let mut env = TestEnv::new(); let engine = env.create_engine(MitoConfig::default()).await; @@ -55,7 +55,7 @@ async fn check_prune_row_groups(expr: Expr, expected: &str) { .scan_to_stream( region_id, ScanRequest { - filters: vec![expr], + filters: exprs, ..Default::default() }, ) @@ -70,7 +70,9 @@ async fn test_read_parquet_stats() { common_telemetry::init_default_ut_logging(); check_prune_row_groups( - datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))), + vec![ + datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))) + ], "\ +-------+---------+---------------------+ | tag_0 | field_0 | ts | @@ -94,7 +96,7 @@ async fn test_read_parquet_stats() { async fn test_prune_tag() { // prune result: only row group 1&2 check_prune_row_groups( - datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))), + vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))], "\ +-------+---------+---------------------+ | tag_0 | field_0 | ts | @@ -114,9 +116,10 @@ async fn test_prune_tag_and_field() { common_telemetry::init_default_ut_logging(); // prune result: only row group 1 check_prune_row_groups( - col("tag_0") - .gt(lit(ScalarValue::Utf8(Some("4".to_string())))) - .and(col("field_0").lt(lit(8.0))), + vec![ + col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))), + col("field_0").lt(lit(8.0)), + ], "\ +-------+---------+---------------------+ | tag_0 | field_0 | ts | @@ -124,8 +127,6 @@ async fn test_prune_tag_and_field() { | 5 | 5.0 | 1970-01-01T00:00:05 | | 6 | 6.0 | 1970-01-01T00:00:06 | | 7 | 7.0 | 1970-01-01T00:00:07 | -| 8 | 8.0 | 1970-01-01T00:00:08 | -| 9 | 9.0 | 1970-01-01T00:00:09 | +-------+---------+---------------------+", ) .await; diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f81052623c43..cebb9a97a2b3 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_runtime::JoinError; +use common_time::Timestamp; use datatypes::arrow::error::ArrowError; use datatypes::prelude::ConcreteDataType; use object_store::ErrorKind; @@ -693,6 +694,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to build time range filters for value: {:?}", timestamp))] + BuildTimeRangeFilter { + timestamp: Timestamp, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -802,6 +810,7 @@ impl ErrorExt for Error { EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal, ChecksumMismatch { .. } => StatusCode::Unexpected, RegionStopped { .. } => StatusCode::RegionNotReady, + BuildTimeRangeFilter { .. } => StatusCode::Unexpected, } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index f619744dc0c9..08bc5bc30d32 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -22,7 +22,7 @@ use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner}; use store_api::storage::ScanRequest; -use table::predicate::{Predicate, TimeRangePredicateBuilder}; +use table::predicate::{build_time_range_predicate, Predicate}; use tokio::sync::{mpsc, Semaphore}; use tokio_stream::wrappers::ReceiverStream; @@ -235,7 +235,7 @@ impl ScanRegion { } /// Creates a scan input. - fn scan_input(self, filter_deleted: bool) -> Result { + fn scan_input(mut self, filter_deleted: bool) -> Result { let time_range = self.build_time_range_predicate(); let ssts = &self.version.ssts; @@ -300,7 +300,7 @@ impl ScanRegion { } /// Build time range predicate from filters. - fn build_time_range_predicate(&self) -> TimestampRange { + fn build_time_range_predicate(&mut self) -> TimestampRange { let time_index = self.version.metadata.time_index_column(); let unit = time_index .column_schema @@ -308,8 +308,11 @@ impl ScanRegion { .as_timestamp() .expect("Time index must have timestamp-compatible type") .unit(); - TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters) - .build() + build_time_range_predicate( + &time_index.column_schema.name, + unit, + &mut self.request.filters, + ) } /// Use the latest schema to build the index applier. diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b56d776e6722..fb41923677ba 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -23,7 +23,10 @@ use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; -use datafusion_expr::Expr; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; +use datafusion_common::ScalarValue; +use datafusion_expr::{Expr, Operator}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -38,6 +41,7 @@ use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::cache::CacheManagerRef; +use crate::error; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result, }; @@ -225,7 +229,7 @@ impl ParquetReaderBuilder { metrics.build_cost = start.elapsed(); - let filters = if let Some(predicate) = &self.predicate { + let mut filters = if let Some(predicate) = &self.predicate { predicate .exprs() .iter() @@ -240,6 +244,11 @@ impl ParquetReaderBuilder { } else { vec![] }; + + if let Some(time_range) = &self.time_range { + filters.extend(time_range_to_predicate(*time_range, ®ion_meta)?); + } + let codec = McmpRowCodec::new( read_format .metadata() @@ -449,6 +458,59 @@ impl ParquetReaderBuilder { } } +/// Transforms time range into [SimpleFilterEvaluator]. +fn time_range_to_predicate( + time_range: TimestampRange, + metadata: &RegionMetadataRef, +) -> Result> { + let ts_col = metadata.time_index_column(); + let ts_col_id = ts_col.column_id; + + let ts_to_filter = |op: Operator, timestamp: &Timestamp| { + let value = match timestamp.unit() { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None), + TimeUnit::Millisecond => { + ScalarValue::TimestampMillisecond(Some(timestamp.value()), None) + } + TimeUnit::Microsecond => { + ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None) + } + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None), + }; + let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op) + .context(error::BuildTimeRangeFilterSnafu { + timestamp: *timestamp, + })?; + Ok(SimpleFilterContext::new( + evaluator, + ts_col_id, + SemanticType::Timestamp, + ts_col.column_schema.data_type.clone(), + )) + }; + + let predicates = match (time_range.start(), time_range.end()) { + (Some(start), Some(end)) => { + vec![ + ts_to_filter(Operator::GtEq, start)?, + ts_to_filter(Operator::Lt, end)?, + ] + } + + (Some(start), None) => { + vec![ts_to_filter(Operator::GtEq, start)?] + } + + (None, Some(end)) => { + vec![ts_to_filter(Operator::Lt, end)?] + } + (None, None) => { + vec![] + } + }; + Ok(predicates) +} + /// Parquet reader metrics. #[derive(Debug, Default)] struct Metrics { @@ -570,6 +632,20 @@ pub(crate) struct SimpleFilterContext { } impl SimpleFilterContext { + fn new( + filter: SimpleFilterEvaluator, + column_id: ColumnId, + semantic_type: SemanticType, + data_type: ConcreteDataType, + ) -> Self { + Self { + filter, + column_id, + semantic_type, + data_type, + } + } + /// Creates a context for the `expr`. /// /// Returns None if the column to filter doesn't exist in the SST metadata or the diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 87b69c6588a9..8c52d327c892 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -29,7 +29,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; use store_api::data_source::{DataSource, DataSourceRef}; use store_api::storage::ScanRequest; use table::metadata::FilterPushDownType; -use table::predicate::TimeRangePredicateBuilder; +use table::predicate::build_time_range_predicate; use table::test_util::MemTable; use table::{Table, TableRef}; @@ -114,14 +114,14 @@ struct TimeRangeTester { impl TimeRangeTester { async fn check(&self, sql: &str, expect: TimestampRange) { let _ = exec_selection(self.engine.clone(), sql).await; - let filters = self.get_filters(); + let mut filters = self.take_filters(); - let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build(); + let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters); assert_eq!(expect, range); } - fn get_filters(&self) -> Vec { - self.filter.write().unwrap().drain(..).collect() + fn take_filters(&self) -> Vec { + std::mem::take(&mut self.filter.write().unwrap()) } } diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 079022925ff7..a9365004cbc8 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -128,250 +128,244 @@ impl Predicate { } } -// tests for `TimeRangePredicateBuilder` locates in src/query/tests/time_range_filter_test.rs +// tests for `build_time_range_predicate` locates in src/query/tests/time_range_filter_test.rs // since it requires query engine to convert sql to filters. -/// `TimeRangePredicateBuilder` extracts time range from logical exprs to facilitate fast +/// `build_time_range_predicate` extracts time range from logical exprs to facilitate fast /// time range pruning. -pub struct TimeRangePredicateBuilder<'a> { +pub fn build_time_range_predicate<'a>( ts_col_name: &'a str, ts_col_unit: TimeUnit, - filters: &'a [Expr], -} - -impl<'a> TimeRangePredicateBuilder<'a> { - pub fn new(ts_col_name: &'a str, ts_col_unit: TimeUnit, filters: &'a [Expr]) -> Self { - Self { - ts_col_name, - ts_col_unit, - filters, - } - } - - pub fn build(&self) -> TimestampRange { - let mut res = TimestampRange::min_to_max(); - for expr in self.filters { - let range = self - .extract_time_range_from_expr(expr) - .unwrap_or_else(TimestampRange::min_to_max); + filters: &'a mut Vec, +) -> TimestampRange { + let mut res = TimestampRange::min_to_max(); + let mut filters_remain = vec![]; + for expr in std::mem::take(filters) { + if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, &expr) { res = res.and(&range); + } else { + filters_remain.push(expr); } - res } + *filters = filters_remain; + res +} - /// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses. - /// Return None if no time range can be found in expr. - fn extract_time_range_from_expr(&self, expr: &Expr) -> Option { - match expr { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - self.extract_from_binary_expr(left, op, right) - } - Expr::Between(Between { - expr, - negated, - low, - high, - }) => self.extract_from_between_expr(expr, negated, low, high), - Expr::InList(InList { - expr, - list, - negated, - }) => self.extract_from_in_list_expr(expr, *negated, list), - _ => None, +/// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses. +/// Return None if no time range can be found in expr. +fn extract_time_range_from_expr( + ts_col_name: &str, + ts_col_unit: TimeUnit, + expr: &Expr, +) -> Option { + match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + extract_from_binary_expr(ts_col_name, ts_col_unit, left, op, right) } + Expr::Between(Between { + expr, + negated, + low, + high, + }) => extract_from_between_expr(ts_col_name, ts_col_unit, expr, negated, low, high), + Expr::InList(InList { + expr, + list, + negated, + }) => extract_from_in_list_expr(ts_col_name, expr, *negated, list), + _ => None, } +} - fn extract_from_binary_expr( - &self, - left: &Expr, - op: &Operator, - right: &Expr, - ) -> Option { - match op { - Operator::Eq => self - .get_timestamp_filter(left, right) - .and_then(|(ts, _)| ts.convert_to(self.ts_col_unit)) - .map(TimestampRange::single), - Operator::Lt => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] < ts_col - let ts_val = ts.convert_to(self.ts_col_unit)?.value(); - Some(TimestampRange::from_start(Timestamp::new( - ts_val + 1, - self.ts_col_unit, - ))) - } else { - // ts_col < [lit] - ts.convert_to_ceil(self.ts_col_unit) - .map(|ts| TimestampRange::until_end(ts, false)) - } - } - Operator::LtEq => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] <= ts_col - ts.convert_to_ceil(self.ts_col_unit) - .map(TimestampRange::from_start) - } else { - // ts_col <= [lit] - ts.convert_to(self.ts_col_unit) - .map(|ts| TimestampRange::until_end(ts, true)) - } - } - Operator::Gt => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] > ts_col - ts.convert_to_ceil(self.ts_col_unit) - .map(|t| TimestampRange::until_end(t, false)) - } else { - // ts_col > [lit] - let ts_val = ts.convert_to(self.ts_col_unit)?.value(); - Some(TimestampRange::from_start(Timestamp::new( - ts_val + 1, - self.ts_col_unit, - ))) - } +fn extract_from_binary_expr( + ts_col_name: &str, + ts_col_unit: TimeUnit, + left: &Expr, + op: &Operator, + right: &Expr, +) -> Option { + match op { + Operator::Eq => get_timestamp_filter(ts_col_name, left, right) + .and_then(|(ts, _)| ts.convert_to(ts_col_unit)) + .map(TimestampRange::single), + Operator::Lt => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] < ts_col + let ts_val = ts.convert_to(ts_col_unit)?.value(); + Some(TimestampRange::from_start(Timestamp::new( + ts_val + 1, + ts_col_unit, + ))) + } else { + // ts_col < [lit] + ts.convert_to_ceil(ts_col_unit) + .map(|ts| TimestampRange::until_end(ts, false)) } - Operator::GtEq => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] >= ts_col - ts.convert_to(self.ts_col_unit) - .map(|t| TimestampRange::until_end(t, true)) - } else { - // ts_col >= [lit] - ts.convert_to_ceil(self.ts_col_unit) - .map(TimestampRange::from_start) - } + } + Operator::LtEq => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] <= ts_col + ts.convert_to_ceil(ts_col_unit) + .map(TimestampRange::from_start) + } else { + // ts_col <= [lit] + ts.convert_to(ts_col_unit) + .map(|ts| TimestampRange::until_end(ts, true)) } - Operator::And => { - // instead of return none when failed to extract time range from left/right, we unwrap the none into - // `TimestampRange::min_to_max`. - let left = self - .extract_time_range_from_expr(left) - .unwrap_or_else(TimestampRange::min_to_max); - let right = self - .extract_time_range_from_expr(right) - .unwrap_or_else(TimestampRange::min_to_max); - Some(left.and(&right)) + } + Operator::Gt => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] > ts_col + ts.convert_to_ceil(ts_col_unit) + .map(|t| TimestampRange::until_end(t, false)) + } else { + // ts_col > [lit] + let ts_val = ts.convert_to(ts_col_unit)?.value(); + Some(TimestampRange::from_start(Timestamp::new( + ts_val + 1, + ts_col_unit, + ))) } - Operator::Or => { - let left = self.extract_time_range_from_expr(left)?; - let right = self.extract_time_range_from_expr(right)?; - Some(left.or(&right)) + } + Operator::GtEq => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] >= ts_col + ts.convert_to(ts_col_unit) + .map(|t| TimestampRange::until_end(t, true)) + } else { + // ts_col >= [lit] + ts.convert_to_ceil(ts_col_unit) + .map(TimestampRange::from_start) } - Operator::NotEq - | Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::Modulo - | Operator::IsDistinctFrom - | Operator::IsNotDistinctFrom - | Operator::RegexMatch - | Operator::RegexIMatch - | Operator::RegexNotMatch - | Operator::RegexNotIMatch - | Operator::BitwiseAnd - | Operator::BitwiseOr - | Operator::BitwiseXor - | Operator::BitwiseShiftRight - | Operator::BitwiseShiftLeft - | Operator::StringConcat - | Operator::ArrowAt - | Operator::AtArrow - | Operator::LikeMatch - | Operator::ILikeMatch - | Operator::NotLikeMatch - | Operator::NotILikeMatch => None, } + Operator::And => { + // instead of return none when failed to extract time range from left/right, we unwrap the none into + // `TimestampRange::min_to_max`. + let left = extract_time_range_from_expr(ts_col_name, ts_col_unit, left) + .unwrap_or_else(TimestampRange::min_to_max); + let right = extract_time_range_from_expr(ts_col_name, ts_col_unit, right) + .unwrap_or_else(TimestampRange::min_to_max); + Some(left.and(&right)) + } + Operator::Or => { + let left = extract_time_range_from_expr(ts_col_name, ts_col_unit, left)?; + let right = extract_time_range_from_expr(ts_col_name, ts_col_unit, right)?; + Some(left.or(&right)) + } + Operator::NotEq + | Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + | Operator::IsDistinctFrom + | Operator::IsNotDistinctFrom + | Operator::RegexMatch + | Operator::RegexIMatch + | Operator::RegexNotMatch + | Operator::RegexNotIMatch + | Operator::BitwiseAnd + | Operator::BitwiseOr + | Operator::BitwiseXor + | Operator::BitwiseShiftRight + | Operator::BitwiseShiftLeft + | Operator::StringConcat + | Operator::ArrowAt + | Operator::AtArrow + | Operator::LikeMatch + | Operator::ILikeMatch + | Operator::NotLikeMatch + | Operator::NotILikeMatch => None, } +} - fn get_timestamp_filter(&self, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> { - let (col, lit, reverse) = match (left, right) { - (Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false), - (Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true), - _ => { - return None; - } - }; - if col.name != self.ts_col_name { +fn get_timestamp_filter(ts_col_name: &str, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> { + let (col, lit, reverse) = match (left, right) { + (Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false), + (Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true), + _ => { return None; } - - return_none_if_utf8!(lit); - scalar_value_to_timestamp(lit, None).map(|t| (t, reverse)) + }; + if col.name != ts_col_name { + return None; } - fn extract_from_between_expr( - &self, - expr: &Expr, - negated: &bool, - low: &Expr, - high: &Expr, - ) -> Option { - let Expr::Column(col) = expr else { - return None; - }; - if col.name != self.ts_col_name { - return None; - } + return_none_if_utf8!(lit); + scalar_value_to_timestamp(lit, None).map(|t| (t, reverse)) +} - if *negated { - return None; - } +fn extract_from_between_expr( + ts_col_name: &str, + ts_col_unit: TimeUnit, + expr: &Expr, + negated: &bool, + low: &Expr, + high: &Expr, +) -> Option { + let Expr::Column(col) = expr else { + return None; + }; + if col.name != ts_col_name { + return None; + } - match (low, high) { - (Expr::Literal(low), Expr::Literal(high)) => { - return_none_if_utf8!(low); - return_none_if_utf8!(high); + if *negated { + return None; + } - let low_opt = scalar_value_to_timestamp(low, None) - .and_then(|ts| ts.convert_to(self.ts_col_unit)); - let high_opt = scalar_value_to_timestamp(high, None) - .and_then(|ts| ts.convert_to_ceil(self.ts_col_unit)); - Some(TimestampRange::new_inclusive(low_opt, high_opt)) - } - _ => None, + match (low, high) { + (Expr::Literal(low), Expr::Literal(high)) => { + return_none_if_utf8!(low); + return_none_if_utf8!(high); + + let low_opt = + scalar_value_to_timestamp(low, None).and_then(|ts| ts.convert_to(ts_col_unit)); + let high_opt = scalar_value_to_timestamp(high, None) + .and_then(|ts| ts.convert_to_ceil(ts_col_unit)); + Some(TimestampRange::new_inclusive(low_opt, high_opt)) } + _ => None, } +} - /// Extract time range filter from `IN (...)` expr. - fn extract_from_in_list_expr( - &self, - expr: &Expr, - negated: bool, - list: &[Expr], - ) -> Option { - if negated { - return None; - } - let Expr::Column(col) = expr else { - return None; - }; - if col.name != self.ts_col_name { - return None; - } +/// Extract time range filter from `IN (...)` expr. +fn extract_from_in_list_expr( + ts_col_name: &str, + expr: &Expr, + negated: bool, + list: &[Expr], +) -> Option { + if negated { + return None; + } + let Expr::Column(col) = expr else { + return None; + }; + if col.name != ts_col_name { + return None; + } - if list.is_empty() { - return Some(TimestampRange::empty()); - } - let mut init_range = TimestampRange::empty(); - for expr in list { - if let Expr::Literal(scalar) = expr { - return_none_if_utf8!(scalar); - if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) { - init_range = init_range.or(&TimestampRange::single(timestamp)) - } else { - // TODO(hl): maybe we should raise an error here since cannot parse - // timestamp value from in list expr - return None; - } + if list.is_empty() { + return Some(TimestampRange::empty()); + } + let mut init_range = TimestampRange::empty(); + for expr in list { + if let Expr::Literal(scalar) = expr { + return_none_if_utf8!(scalar); + if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) { + init_range = init_range.or(&TimestampRange::single(timestamp)) + } else { + // TODO(hl): maybe we should raise an error here since cannot parse + // timestamp value from in list expr + return None; } } - Some(init_range) } + Some(init_range) } #[cfg(test)] @@ -395,7 +389,7 @@ mod tests { fn check_build_predicate(expr: Expr, expect: TimestampRange) { assert_eq!( expect, - TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &[expr]).build() + build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![expr]) ); }