Skip to content

Commit

Permalink
feat: table provider use ReadFromRegion
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed May 13, 2024
1 parent c7472b1 commit 8dafa39
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 86 deletions.
10 changes: 6 additions & 4 deletions src/table/src/table/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ use datafusion_expr::expr::Expr as DfExpr;
use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use store_api::region_engine::SinglePartitionScanner;
use store_api::storage::ScanRequest;

use crate::table::scan::StreamScanAdapter;
use crate::table::scan::{ReadFromRegion, StreamScanAdapter};
use crate::table::{TableRef, TableType};

/// Adapt greptime's [TableRef] to DataFusion's [TableProvider].
Expand Down Expand Up @@ -110,11 +111,12 @@ impl TableProvider for DfTableProviderAdapter {
.collect::<Vec<_>>()
});

let mut stream_adapter = StreamScanAdapter::new(stream);
let scanner = Arc::new(SinglePartitionScanner::new(stream));
let mut plan = ReadFromRegion::new(scanner);
if let Some(sort_expr) = sort_expr {
stream_adapter = stream_adapter.with_output_ordering(sort_expr);
plan = plan.with_output_ordering(sort_expr);
}
Ok(Arc::new(stream_adapter))
Ok(Arc::new(plan))
}

fn supports_filters_pushdown(
Expand Down
82 changes: 41 additions & 41 deletions tests/cases/distributed/optimizer/order_by.result
Original file line number Diff line number Diff line change
@@ -1,61 +1,61 @@
-- SQLNESS REPLACE (peers.*) REDACTED
explain select * from numbers;

+---------------+-------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+-------------------------------------------------------------+
+---------------+-----------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+-----------------------------------------------------+

-- SQLNESS REPLACE (peers.*) REDACTED
explain select * from numbers order by number desc;

+---------------+---------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+---------------------------------------------------------------+
+---------------+-------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+-------------------------------------------------------+

-- SQLNESS REPLACE (peers.*) REDACTED
explain select * from numbers order by number asc;

+---------------+---------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+---------------------------------------------------------------+
+---------------+-------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+-------------------------------------------------------+

-- SQLNESS REPLACE (peers.*) REDACTED
explain select * from numbers order by number desc limit 10;

+---------------+-----------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+-----------------------------------------------------------------+
+---------------+---------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 DESC] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+---------------------------------------------------------+

-- SQLNESS REPLACE (peers.*) REDACTED
explain select * from numbers order by number asc limit 10;

+---------------+-----------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+-----------------------------------------------------------------+
+---------------+------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+------------------------------------------------------------+

82 changes: 41 additions & 41 deletions tests/cases/standalone/optimizer/order_by.result
Original file line number Diff line number Diff line change
@@ -1,56 +1,56 @@
explain select * from numbers;

+---------------+-------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+-------------------------------------------------------------+
+---------------+-----------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+-----------------------------------------------------+

explain select * from numbers order by number desc;

+---------------+---------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+---------------------------------------------------------------+
+---------------+-------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+-------------------------------------------------------+

explain select * from numbers order by number asc;

+---------------+---------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+---------------------------------------------------------------+
+---------------+-------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+-------------------------------------------------------+

explain select * from numbers order by number desc limit 10;

+---------------+-----------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+-----------------------------------------------------------------+
+---------------+---------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 DESC] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+---------------------------------------------------------+

explain select * from numbers order by number asc limit 10;

+---------------+-----------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>" } |
| | |
+---------------+-----------------------------------------------------------------+
+---------------+------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] |
| | SinglePartitionScanner: <SendableRecordBatchStream> |
| | |
+---------------+------------------------------------------------------------+

0 comments on commit 8dafa39

Please sign in to comment.