From 8b60c27c2e96008e78c30234d764cd7fcb609aaf Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 31 Oct 2024 14:15:45 +0800 Subject: [PATCH] feat: enhance windowed-sort optimizer rule (#4910) * add RegionScanner::metadata Signed-off-by: Ruihang Xia * skip PartSort when there is no tag column Signed-off-by: Ruihang Xia * add more sqlness test Signed-off-by: Ruihang Xia * handle desc Signed-off-by: Ruihang Xia * fix: should keep part sort on DESC Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/file-engine/src/engine.rs | 3 +- src/mito2/src/read/seq_scan.rs | 5 + src/mito2/src/read/unordered_scan.rs | 5 + src/query/src/optimizer/parallelize_scan.rs | 29 +++- src/query/src/optimizer/windowed_sort.rs | 29 +++- src/query/src/part_sort.rs | 1 + src/query/src/window_sort.rs | 1 + src/store-api/src/region_engine.rs | 17 +- src/table/src/table/scan.rs | 59 +++++-- .../common/order/windowed_sort.result | 150 +++++++++++++++++- .../standalone/common/order/windowed_sort.sql | 47 ++++++ 11 files changed, 320 insertions(+), 26 deletions(-) diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index e6313f4322cc..a29a3add23d6 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -91,8 +91,9 @@ impl RegionEngine for FileRegionEngine { request: ScanRequest, ) -> Result { let stream = self.handle_query(region_id, request).await?; + let metadata = self.get_metadata(region_id).await?; // We don't support enabling append mode for file engine. - let scanner = Box::new(SinglePartitionScanner::new(stream, false)); + let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata)); Ok(scanner) } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 1a789c8d5f21..9b7a71a36c51 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -27,6 +27,7 @@ use common_telemetry::tracing; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; use store_api::storage::TimeSeriesRowSelector; use tokio::sync::Semaphore; @@ -321,6 +322,10 @@ impl RegionScanner for SeqScan { let predicate = self.stream_ctx.input.predicate(); predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false) } + + fn metadata(&self) -> RegionMetadataRef { + self.stream_ctx.input.mapper.metadata().clone() + } } impl DisplayAs for SeqScan { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 2401504f7915..707b7d4ba65c 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -26,6 +26,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; use crate::error::{PartitionOutOfRangeSnafu, Result}; @@ -229,6 +230,10 @@ impl RegionScanner for UnorderedScan { let predicate = self.stream_ctx.input.predicate(); predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false) } + + fn metadata(&self) -> RegionMetadataRef { + self.stream_ctx.input.mapper.metadata().clone() + } } impl DisplayAs for UnorderedScan { diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index 19f5db39d333..02cd04df87b6 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_telemetry::debug; use datafusion::config::ConfigOptions; use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::ExecutionPlan; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{DataFusionError, Result}; @@ -48,9 +49,16 @@ impl ParallelizeScan { plan: Arc, config: &ConfigOptions, ) -> Result> { + let mut first_order_expr = None; + let result = plan .transform_down(|plan| { - if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { + if let Some(sort_exec) = plan.as_any().downcast_ref::() { + // save the first order expr + first_order_expr = sort_exec.expr().first().cloned(); + } else if let Some(region_scan_exec) = + plan.as_any().downcast_ref::() + { if region_scan_exec.is_partition_set() { return Ok(Transformed::no(plan)); } @@ -66,10 +74,21 @@ impl ParallelizeScan { "Assign {total_range_num} ranges to {expected_partition_num} partitions" ); - // sort the ranges in each partition - // TODO(ruihang): smart sort! - for ranges in partition_ranges.iter_mut() { - ranges.sort_by(|a, b| a.start.cmp(&b.start)); + // Sort the ranges in each partition based on the order expr + // + // This optimistically assumes that the first order expr is on the time index column + // to skip the validation of the order expr. As it's not harmful if this condition + // is not met. + if let Some(order_expr) = &first_order_expr + && order_expr.options.descending + { + for ranges in partition_ranges.iter_mut() { + ranges.sort_by(|a, b| b.end.cmp(&a.end)); + } + } else { + for ranges in partition_ranges.iter_mut() { + ranges.sort_by(|a, b| a.start.cmp(&b.start)); + } } // update the partition ranges diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 62d4495cf335..63150fc1f896 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -77,7 +77,6 @@ impl WindowedSortPhysicalRule { }; if let Some(first_sort_expr) = sort_exec.expr().first() - && !first_sort_expr.options.descending && let Some(column_expr) = first_sort_expr .expr .as_any() @@ -87,18 +86,28 @@ impl WindowedSortPhysicalRule { } else { return Ok(Transformed::no(plan)); } - let first_sort_expr = sort_exec.expr().first().unwrap().clone(); - let part_sort_exec = Arc::new(PartSortExec::new( - first_sort_expr.clone(), - scanner_info.partition_ranges.clone(), - sort_exec.input().clone(), - )); + + // PartSortExec is unnecessary if: + // - there is no tag column, and + // - the sort is ascending on the time index column + let new_input = if scanner_info.tag_columns.is_empty() + && !first_sort_expr.options.descending + { + sort_exec.input().clone() + } else { + Arc::new(PartSortExec::new( + first_sort_expr.clone(), + scanner_info.partition_ranges.clone(), + sort_exec.input().clone(), + )) + }; + let windowed_sort_exec = WindowedSortExec::try_new( first_sort_expr, sort_exec.fetch(), scanner_info.partition_ranges, - part_sort_exec, + new_input, )?; return Ok(Transformed { @@ -119,11 +128,13 @@ impl WindowedSortPhysicalRule { struct ScannerInfo { partition_ranges: Vec>, time_index: String, + tag_columns: Vec, } fn fetch_partition_range(input: Arc) -> DataFusionResult> { let mut partition_ranges = None; let mut time_index = None; + let mut tag_columns = None; input.transform_up(|plan| { // Unappliable case, reset the state. @@ -139,6 +150,7 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() { partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges()); time_index = region_scan_exec.time_index(); + tag_columns = Some(region_scan_exec.tag_columns()); // set distinguish_partition_ranges to true, this is an incorrect workaround region_scan_exec.with_distinguish_partition_range(true); @@ -151,6 +163,7 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult, + #[allow(dead_code)] // this is used under #[debug_assertions] partition: usize, cur_part_idx: usize, metrics: BaselineMetrics, diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index 305585b2679d..38b64e29aaa0 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -270,6 +270,7 @@ pub struct WindowedSortStream { /// working ranges promise once input stream get a value out of current range, future values will never be in this range all_avail_working_range: Vec<(TimeRange, BTreeSet)>, /// The input partition ranges + #[allow(dead_code)] // this is used under #[debug_assertions] ranges: Vec, /// Execution metrics metrics: BaselineMetrics, diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 0832385c930b..8dd706395d1d 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -265,6 +265,9 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Returns the schema of the record batches. fn schema(&self) -> SchemaRef; + /// Returns the metadata of the region. + fn metadata(&self) -> RegionMetadataRef; + /// Prepares the scanner with the given partition ranges. /// /// This method is for the planner to adjust the scanner's behavior based on the partition ranges. @@ -414,11 +417,16 @@ pub struct SinglePartitionScanner { stream: Mutex>, schema: SchemaRef, properties: ScannerProperties, + metadata: RegionMetadataRef, } impl SinglePartitionScanner { - /// Creates a new [SinglePartitionScanner] with the given stream. - pub fn new(stream: SendableRecordBatchStream, append_mode: bool) -> Self { + /// Creates a new [SinglePartitionScanner] with the given stream and metadata. + pub fn new( + stream: SendableRecordBatchStream, + append_mode: bool, + metadata: RegionMetadataRef, + ) -> Self { let schema = stream.schema(); Self { stream: Mutex::new(Some(stream)), @@ -426,6 +434,7 @@ impl SinglePartitionScanner { properties: ScannerProperties::default() .with_parallelism(1) .with_append_mode(append_mode), + metadata, } } } @@ -468,6 +477,10 @@ impl RegionScanner for SinglePartitionScanner { fn has_predicate(&self) -> bool { false } + + fn metadata(&self) -> RegionMetadataRef { + self.metadata.clone() + } } impl DisplayAs for SinglePartitionScanner { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 9207c1e86d6e..cc94a054de84 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -154,6 +154,16 @@ impl RegionScanExec { .timestamp_column() .map(|x| x.name.clone()) } + + pub fn tag_columns(&self) -> Vec { + self.scanner + .lock() + .unwrap() + .metadata() + .primary_key_columns() + .map(|col| col.column_schema.name.clone()) + .collect() + } } impl ExecutionPlan for RegionScanExec { @@ -301,33 +311,45 @@ impl DfRecordBatchStream for StreamWithMetricWrapper { mod test { use std::sync::Arc; + use api::v1::SemanticType; use common_recordbatch::{RecordBatch, RecordBatches}; use datafusion::prelude::SessionContext; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; - use datatypes::vectors::Int32Vector; + use datatypes::vectors::{Int32Vector, TimestampMillisecondVector}; use futures::TryStreamExt; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::region_engine::SinglePartitionScanner; + use store_api::storage::RegionId; use super::*; #[tokio::test] async fn test_simple_table_scan() { let ctx = SessionContext::new(); - let schema = Arc::new(Schema::new(vec![ColumnSchema::new( - "a", - ConcreteDataType::int32_datatype(), - false, - )])); + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new( + "b", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ])); let batch1 = RecordBatch::new( schema.clone(), - vec![Arc::new(Int32Vector::from_slice([1, 2])) as _], + vec![ + Arc::new(Int32Vector::from_slice([1, 2])) as _, + Arc::new(TimestampMillisecondVector::from_slice([1000, 2000])) as _, + ], ) .unwrap(); let batch2 = RecordBatch::new( schema.clone(), - vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _], + vec![ + Arc::new(Int32Vector::from_slice([3, 4, 5])) as _, + Arc::new(TimestampMillisecondVector::from_slice([3000, 4000, 5000])) as _, + ], ) .unwrap(); @@ -335,7 +357,26 @@ mod test { RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); let stream = recordbatches.as_stream(); - let scanner = Box::new(SinglePartitionScanner::new(stream, false)); + let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "b", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![1]); + let region_metadata = Arc::new(builder.build().unwrap()); + + let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata)); let plan = RegionScanExec::new(scanner); let actual: SchemaRef = Arc::new( plan.properties diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 1cc0ab7720e3..9ecec83d2053 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -1,3 +1,4 @@ +-- Test without PK, with a windowed sort query. CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX); Affected Rows: 0 @@ -69,7 +70,39 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED |_|_|_WindowedSortExec REDACTED -|_|_|_PartSortExec t@1 ASC NULLS LAST REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 5_| ++-+-+-+ + +SELECT * FROM test ORDER BY t DESC LIMIT 5; + ++---+-------------------------+ +| i | t | ++---+-------------------------+ +| 4 | 1970-01-01T00:00:00.012 | +| 4 | 1970-01-01T00:00:00.011 | +| 4 | 1970-01-01T00:00:00.010 | +| 3 | 1970-01-01T00:00:00.009 | +| 3 | 1970-01-01T00:00:00.008 | ++---+-------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED +|_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED +|_|_|_WindowedSortExec REDACTED +|_|_|_PartSortExec t@1 DESC REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -79,3 +112,118 @@ DROP TABLE test; Affected Rows: 0 +-- Test with PK, with a windowed sort query. +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3); + +Affected Rows: 3 + +ADMIN FLUSH_TABLE('test_pk'); + ++------------------------------+ +| ADMIN FLUSH_TABLE('test_pk') | ++------------------------------+ +| 0 | ++------------------------------+ + +INSERT INTO test_pk VALUES (4, 2, 4), (5, 2, 5), (6, NULL, 6); + +Affected Rows: 3 + +ADMIN FLUSH_TABLE('test_pk'); + ++------------------------------+ +| ADMIN FLUSH_TABLE('test_pk') | ++------------------------------+ +| 0 | ++------------------------------+ + +INSERT INTO test_pk VALUES (7, 3, 7), (8, 3, 8), (9, 3, 9); + +Affected Rows: 3 + +ADMIN FLUSH_TABLE('test_pk'); + ++------------------------------+ +| ADMIN FLUSH_TABLE('test_pk') | ++------------------------------+ +| 0 | ++------------------------------+ + +INSERT INTO test_pk VALUES (10, 4, 10), (11, 4, 11), (12, 4, 12); + +Affected Rows: 3 + +SELECT * FROM test_pk ORDER BY t LIMIT 5; + ++----+---+-------------------------+ +| pk | i | t | ++----+---+-------------------------+ +| 1 | 1 | 1970-01-01T00:00:00.001 | +| 2 | | 1970-01-01T00:00:00.002 | +| 3 | 1 | 1970-01-01T00:00:00.003 | +| 4 | 2 | 1970-01-01T00:00:00.004 | +| 5 | 2 | 1970-01-01T00:00:00.005 | ++----+---+-------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED +|_|_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec REDACTED +|_|_|_PartSortExec t@2 ASC NULLS LAST REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 5_| ++-+-+-+ + +SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; + ++----+---+-------------------------+ +| pk | i | t | ++----+---+-------------------------+ +| 12 | 4 | 1970-01-01T00:00:00.012 | +| 11 | 4 | 1970-01-01T00:00:00.011 | +| 10 | 4 | 1970-01-01T00:00:00.010 | +| 9 | 3 | 1970-01-01T00:00:00.009 | +| 8 | 3 | 1970-01-01T00:00:00.008 | ++----+---+-------------------------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED +|_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED +|_|_|_WindowedSortExec REDACTED +|_|_|_PartSortExec t@2 DESC REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 5_| ++-+-+-+ + +DROP TABLE test_pk; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/order/windowed_sort.sql b/tests/cases/standalone/common/order/windowed_sort.sql index 7767825e3d0c..e8006f74ce17 100644 --- a/tests/cases/standalone/common/order/windowed_sort.sql +++ b/tests/cases/standalone/common/order/windowed_sort.sql @@ -1,3 +1,4 @@ +-- Test without PK, with a windowed sort query. CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX); INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3); @@ -23,4 +24,50 @@ SELECT * FROM test ORDER BY t LIMIT 5; -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; +SELECT * FROM test ORDER BY t DESC LIMIT 5; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; + DROP TABLE test; + +-- Test with PK, with a windowed sort query. +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX); + +INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3); + +ADMIN FLUSH_TABLE('test_pk'); + +INSERT INTO test_pk VALUES (4, 2, 4), (5, 2, 5), (6, NULL, 6); + +ADMIN FLUSH_TABLE('test_pk'); + +INSERT INTO test_pk VALUES (7, 3, 7), (8, 3, 8), (9, 3, 9); + +ADMIN FLUSH_TABLE('test_pk'); + +INSERT INTO test_pk VALUES (10, 4, 10), (11, 4, 11), (12, 4, 12); + +SELECT * FROM test_pk ORDER BY t LIMIT 5; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5; + +SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; + +DROP TABLE test_pk;