refactor: remove StreamScanAdapter
evenyag committed May 13, 2024
1 parent 8dafa39 commit 175dac9
Showing 3 changed files with 13 additions and 106 deletions.
src/query/src/dummy_catalog.rs (2 changes: 1 addition & 1 deletion)

@@ -31,7 +31,7 @@ use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::RegionEngineRef;
 use store_api::storage::{RegionId, ScanRequest};
-use table::table::scan::{ReadFromRegion, StreamScanAdapter};
+use table::table::scan::ReadFromRegion;
 
 use crate::error::{GetRegionMetadataSnafu, Result};
src/table/src/table/adapter.rs (2 changes: 1 addition & 1 deletion)

@@ -29,7 +29,7 @@ use datafusion_physical_expr::PhysicalSortExpr;
 use store_api::region_engine::SinglePartitionScanner;
 use store_api::storage::ScanRequest;
 
-use crate::table::scan::{ReadFromRegion, StreamScanAdapter};
+use crate::table::scan::ReadFromRegion;
 use crate::table::{TableRef, TableType};
 
 /// Adapt greptime's [TableRef] to DataFusion's [TableProvider].
src/table/src/table/scan.rs (115 changes: 11 additions & 104 deletions)

@@ -13,12 +13,10 @@
 // limitations under the License.
 
 use std::any::Any;
-use std::fmt::{self, Debug, Formatter};
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use common_query::error::ExecuteRepeatedlySnafu;
 use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
 use common_telemetry::tracing::Span;
 use common_telemetry::tracing_context::TracingContext;
@@ -30,108 +28,13 @@ use datafusion::physical_plan::{
     RecordBatchStream as DfRecordBatchStream,
 };
 use datafusion_common::DataFusionError;
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr};
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
-use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
-use snafu::OptionExt;
 use store_api::region_engine::RegionScannerRef;
 
 use crate::table::metrics::MemoryUsageMetrics;
-
-/// Adapt greptime's [SendableRecordBatchStream] to [ExecutionPlan].
-pub struct StreamScanAdapter {
-    stream: Mutex<Option<SendableRecordBatchStream>>,
-    schema: SchemaRef,
-    output_ordering: Option<Vec<PhysicalSortExpr>>,
-    metric: ExecutionPlanMetricsSet,
-    properties: PlanProperties,
-}
-
-impl Debug for StreamScanAdapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("StreamScanAdapter")
-            .field("stream", &"<SendableRecordBatchStream>")
-            .finish()
-    }
-}
-
-impl StreamScanAdapter {
-    pub fn new(stream: SendableRecordBatchStream) -> Self {
-        let schema = stream.schema();
-        let properties = PlanProperties::new(
-            EquivalenceProperties::new(schema.arrow_schema().clone()),
-            Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
-        );
-        Self {
-            stream: Mutex::new(Some(stream)),
-            schema,
-            output_ordering: None,
-            metric: ExecutionPlanMetricsSet::new(),
-            properties,
-        }
-    }
-
-    pub fn with_output_ordering(mut self, output_ordering: Vec<PhysicalSortExpr>) -> Self {
-        self.output_ordering = Some(output_ordering);
-        self
-    }
-}
-
-impl ExecutionPlan for StreamScanAdapter {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> ArrowSchemaRef {
-        self.schema.arrow_schema().clone()
-    }
-
-    fn properties(&self) -> &PlanProperties {
-        &self.properties
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> DfResult<Arc<dyn ExecutionPlan>> {
-        Ok(self)
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> DfResult<DfSendableRecordBatchStream> {
-        let tracing_context = TracingContext::from_json(context.session_id().as_str());
-        let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter"));
-
-        let mut stream = self.stream.lock().unwrap();
-        let stream = stream.take().context(ExecuteRepeatedlySnafu)?;
-        let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
-        Ok(Box::pin(StreamWithMetricWrapper {
-            stream,
-            metric: mem_usage_metrics,
-            span,
-        }))
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metric.clone_inner())
-    }
-}
-
-impl DisplayAs for StreamScanAdapter {
-    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{:?}", self)
-    }
-}
 
 /// A plan to read multiple partitions from a region of a table.
 #[derive(Debug)]
 pub struct ReadFromRegion {
@@ -271,12 +174,15 @@ impl DfRecordBatchStream for StreamWithMetricWrapper {

 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use common_recordbatch::{RecordBatch, RecordBatches};
     use datafusion::prelude::SessionContext;
     use datatypes::data_type::ConcreteDataType;
-    use datatypes::schema::{ColumnSchema, Schema};
+    use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
     use datatypes::vectors::Int32Vector;
     use futures::TryStreamExt;
+    use store_api::region_engine::SinglePartitionScanner;
 
     use super::*;

@@ -304,9 +210,10 @@ mod test {
             RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
         let stream = recordbatches.as_stream();
 
-        let scan = StreamScanAdapter::new(stream);
+        let scanner = Arc::new(SinglePartitionScanner::new(stream));
+        let plan = ReadFromRegion::new(scanner);
         let actual: SchemaRef = Arc::new(
-            scan.properties
+            plan.properties
                 .eq_properties
                 .schema()
                 .clone()
@@ -315,12 +222,12 @@
         );
         assert_eq!(actual, schema);
 
-        let stream = scan.execute(0, ctx.task_ctx()).unwrap();
+        let stream = plan.execute(0, ctx.task_ctx()).unwrap();
         let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
         assert_eq!(batch1.df_record_batch(), &recordbatches[0]);
         assert_eq!(batch2.df_record_batch(), &recordbatches[1]);
 
-        let result = scan.execute(0, ctx.task_ctx());
+        let result = plan.execute(0, ctx.task_ctx());
         assert!(result.is_err());
         match result {
             Err(e) => assert!(e
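The updated test doubles as a usage example for the replacement: instead of constructing a StreamScanAdapter from a stream, callers wrap the stream in a SinglePartitionScanner and build a ReadFromRegion plan over it. Below is a minimal sketch of that pattern, assuming only the constructor calls visible in this diff; the helper name plan_for_stream is hypothetical.

    use std::sync::Arc;

    use common_recordbatch::SendableRecordBatchStream;
    use store_api::region_engine::SinglePartitionScanner;
    use table::table::scan::ReadFromRegion;

    // Hypothetical helper: builds a DataFusion ExecutionPlan from a one-shot
    // record batch stream, mirroring the updated test above.
    fn plan_for_stream(stream: SendableRecordBatchStream) -> ReadFromRegion {
        // SinglePartitionScanner exposes the stream as a region scanner with
        // exactly one partition.
        let scanner = Arc::new(SinglePartitionScanner::new(stream));
        // ReadFromRegion adapts a region scanner to an ExecutionPlan, taking
        // over the role StreamScanAdapter played for a bare stream.
        ReadFromRegion::new(scanner)
    }

As with StreamScanAdapter, the wrapped stream can be consumed only once; the test still asserts that a second execute call on the same plan returns an error.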
