From 3d1624c2fcff13c882d50f6f1517765f300a1d01 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Wed, 30 Oct 2024 21:18:31 -0700 Subject: [PATCH] fmt --- crates/arroyo-connectors/src/kafka/mod.rs | 6 ++-- .../arroyo-connectors/src/kafka/source/mod.rs | 4 +-- crates/arroyo-connectors/src/mqtt/mod.rs | 12 +++---- .../arroyo-connectors/src/mqtt/source/mod.rs | 6 ++-- crates/arroyo-formats/src/de.rs | 2 +- crates/arroyo-operator/src/connector.rs | 22 ++++++++---- .../src/extension/table_source.rs | 6 ++-- crates/arroyo-planner/src/rewriters.rs | 32 ++++++++++------- crates/arroyo-planner/src/tables.rs | 34 ++++++++----------- .../arroyo-rpc/src/api_types/connections.rs | 23 ++++++++----- crates/arroyo-rpc/src/lib.rs | 2 +- crates/arroyo-types/src/lib.rs | 12 ++++--- 12 files changed, 89 insertions(+), 72 deletions(-) diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 1211798fa..f03c97c10 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, bail}; -use arrow::datatypes::{DataType}; +use arrow::datatypes::DataType; use arroyo_formats::de::ArrowDeserializer; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::connector::{Connection, MetadataDef}; @@ -215,7 +215,7 @@ impl Connector for KafkaConnector { format: Some(format), bad_data: schema.bad_data.clone(), framing: schema.framing.clone(), - metadata_fields: schema.metadata_fields() + metadata_fields: schema.metadata_fields(), }; Ok(Connection { @@ -325,7 +325,7 @@ impl Connector for KafkaConnector { MetadataDef { name: "timestamp", data_type: DataType::Int64, - } + }, ] } diff --git a/crates/arroyo-connectors/src/kafka/source/mod.rs b/crates/arroyo-connectors/src/kafka/source/mod.rs index 29af7c94c..89fa297d0 100644 --- a/crates/arroyo-connectors/src/kafka/source/mod.rs +++ b/crates/arroyo-connectors/src/kafka/source/mod.rs @@ -169,7 +169,7 @@ impl KafkaSourceFunc { let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); - + loop { select! { message = consumer.recv() => { @@ -179,7 +179,7 @@ impl KafkaSourceFunc { let timestamp = msg.timestamp().to_millis() .ok_or_else(|| UserError::new("Failed to read timestamp from Kafka record", "The message read from Kafka did not contain a message timestamp"))?; - + let topic = msg.topic(); let connector_metadata = if !self.metadata_fields.is_empty() { diff --git a/crates/arroyo-connectors/src/mqtt/mod.rs b/crates/arroyo-connectors/src/mqtt/mod.rs index 273ed274b..d642b7c5f 100644 --- a/crates/arroyo-connectors/src/mqtt/mod.rs +++ b/crates/arroyo-connectors/src/mqtt/mod.rs @@ -240,14 +240,12 @@ impl Connector for MqttConnector { } fn metadata_defs(&self) -> &'static [MetadataDef] { - &[ - MetadataDef { - name: "topic", - data_type: DataType::Utf8, - } - ] + &[MetadataDef { + name: "topic", + data_type: DataType::Utf8, + }] } - + fn from_options( &self, name: &str, diff --git a/crates/arroyo-connectors/src/mqtt/source/mod.rs b/crates/arroyo-connectors/src/mqtt/source/mod.rs index f0bdcccb3..6f8f9f19d 100644 --- a/crates/arroyo-connectors/src/mqtt/source/mod.rs +++ b/crates/arroyo-connectors/src/mqtt/source/mod.rs @@ -142,14 +142,14 @@ impl MqttSourceFunc { let qos = self.qos; let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); - + loop { select! { event = eventloop.poll() => { match event { Ok(MqttEvent::Incoming(Incoming::Publish(p))) => { let topic = String::from_utf8_lossy(&p.topic).to_string(); - + let connector_metadata = if !self.metadata_fields.is_empty() { let mut connector_metadata = HashMap::new(); for mf in &self.metadata_fields { @@ -162,7 +162,7 @@ impl MqttSourceFunc { } else { None }; - + ctx.deserialize_slice(&p.payload, SystemTime::now(), connector_metadata.as_ref()).await?; rate_limiter.until_ready().await; } diff --git a/crates/arroyo-formats/src/de.rs b/crates/arroyo-formats/src/de.rs index 0a162492b..3938fe0fc 100644 --- a/crates/arroyo-formats/src/de.rs +++ b/crates/arroyo-formats/src/de.rs @@ -500,7 +500,7 @@ pub(crate) fn add_additional_fields( } pub(crate) fn add_additional_fields_using_builder( - additional_fields: Option<&HashMap<&String , FieldValueType<'_>>>, + additional_fields: Option<&HashMap<&String, FieldValueType<'_>>>, additional_fields_builder: &mut Option>>, ) { if let Some(fields) = additional_fields { diff --git a/crates/arroyo-operator/src/connector.rs b/crates/arroyo-operator/src/connector.rs index adfb9fcef..7ae078935 100644 --- a/crates/arroyo-operator/src/connector.rs +++ b/crates/arroyo-operator/src/connector.rs @@ -5,14 +5,14 @@ use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, }; use arroyo_rpc::{primitive_to_sql, OperatorConfig}; +use arroyo_types::DisplayAsSql; +use datafusion::sql::unparser::expr_to_sql; use serde::de::DeserializeOwned; use serde::ser::Serialize; use serde_json::value::Value; use std::collections::HashMap; -use datafusion::sql::unparser::expr_to_sql; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; -use arroyo_types::DisplayAsSql; #[derive(Debug, Clone)] pub struct Connection { @@ -51,7 +51,7 @@ pub trait Connector: Send { } fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector; - + fn metadata_defs(&self) -> &'static [MetadataDef] { &[] } @@ -278,13 +278,21 @@ impl ErasedConnector for C { if let Some(schema) = schema { for sf in schema.fields.iter() { if let Some(key) = &sf.metadata_key { - let field = self.metadata_defs() + let field = self + .metadata_defs() .iter() .find(|f| f.name == key) - .ok_or_else(|| anyhow!("unknown metadata field '{}' for {} connector '{}'", key, self.name(), name))?; + .ok_or_else(|| { + anyhow!( + "unknown metadata field '{}' for {} connector '{}'", + key, + self.name(), + name + ) + })?; let arrow_field: Field = sf.clone().into(); - + if !field.data_type.equals_datatype(arrow_field.data_type()) { bail!("incorrect data type for metadata field '{}'; expected {}, but found {}", arrow_field.name(), DisplayAsSql(&field.data_type), DisplayAsSql(arrow_field.data_type())); @@ -292,7 +300,7 @@ impl ErasedConnector for C { } } } - + self.from_options(name, options, schema, profile) } diff --git a/crates/arroyo-planner/src/extension/table_source.rs b/crates/arroyo-planner/src/extension/table_source.rs index 07db29e30..8ff2d6a0b 100644 --- a/crates/arroyo-planner/src/extension/table_source.rs +++ b/crates/arroyo-planner/src/extension/table_source.rs @@ -8,14 +8,14 @@ use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use prost::Message; +use super::{ArroyoExtension, DebeziumUnrollingExtension, NodeWithIncomingEdges}; +use crate::tables::FieldSpec; use crate::{ builder::{NamedNode, Planner}, schema_from_df_fields, schemas::add_timestamp_field, tables::ConnectorTable, }; -use crate::tables::FieldSpec; -use super::{ArroyoExtension, DebeziumUnrollingExtension, NodeWithIncomingEdges}; pub(crate) const TABLE_SOURCE_NAME: &str = "TableSourceExtension"; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -31,7 +31,7 @@ impl TableSourceExtension { .fields .iter() .filter_map(|field| match field { - FieldSpec::StructField(field) | FieldSpec::MetadataField { field, ..} => { + FieldSpec::StructField(field) | FieldSpec::MetadataField { field, .. } => { Some((Some(name.clone()), Arc::new(field.clone())).into()) } FieldSpec::VirtualField { .. } => None, diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index 3720bb126..2c3952de0 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -49,10 +49,13 @@ impl<'a> SourceRewriter<'a> { .find_map(|f| { if f.field().name() == &watermark_field { return match f { - FieldSpec::StructField(field) | FieldSpec::MetadataField {field, ..} => Some(Expr::Column(Column { - relation: None, - name: field.name().to_string(), - })), + FieldSpec::StructField(field) + | FieldSpec::MetadataField { field, .. } => { + Some(Expr::Column(Column { + relation: None, + name: field.name().to_string(), + })) + } FieldSpec::VirtualField { expression, .. } => Some(expression.clone()), }; } @@ -84,10 +87,12 @@ impl<'a> SourceRewriter<'a> { .fields .iter() .map(|field| match field { - FieldSpec::StructField(field) | FieldSpec::MetadataField { field, ..} => Expr::Column(Column { - relation: Some(qualifier.clone()), - name: field.name().to_string(), - }), + FieldSpec::StructField(field) | FieldSpec::MetadataField { field, .. } => { + Expr::Column(Column { + relation: Some(qualifier.clone()), + name: field.name().to_string(), + }) + } FieldSpec::VirtualField { field, expression } => expression .clone() .alias_qualified(Some(qualifier.clone()), field.name().to_string()), @@ -106,10 +111,13 @@ impl<'a> SourceRewriter<'a> { .find_map(|f| { if f.field().name() == &event_time_field { return match f { - FieldSpec::StructField(field) | FieldSpec::MetadataField { field, ..} => Some(Expr::Column(Column { - relation: Some(qualifier.clone()), - name: field.name().to_string(), - })), + FieldSpec::StructField(field) + | FieldSpec::MetadataField { field, .. } => { + Some(Expr::Column(Column { + relation: Some(qualifier.clone()), + name: field.name().to_string(), + })) + } FieldSpec::VirtualField { expression, .. } => Some(expression.clone()), }; } diff --git a/crates/arroyo-planner/src/tables.rs b/crates/arroyo-planner/src/tables.rs index 1fb97cc51..0583eeb8b 100644 --- a/crates/arroyo-planner/src/tables.rs +++ b/crates/arroyo-planner/src/tables.rs @@ -21,6 +21,7 @@ use arroyo_rpc::api_types::connections::{ use arroyo_rpc::formats::{BadData, Format, Framing, JsonFormat}; use arroyo_rpc::grpc::api::ConnectorOp; use arroyo_types::ArroyoExtensionType; +use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion::common::{config::ConfigOptions, DFSchema, Result, ScalarValue}; use datafusion::common::{plan_err, Column, DataFusionError}; use datafusion::execution::context::SessionState; @@ -61,7 +62,6 @@ use datafusion::{ sqlparser::ast::{ColumnDef, ColumnOption, Statement, Value}, }, }; -use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use syn::Meta; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -99,7 +99,7 @@ impl FieldSpec { pub fn field(&self) -> &Field { match self { FieldSpec::StructField(f) => f, - FieldSpec::MetadataField { field, ..} => field, + FieldSpec::MetadataField { field, .. } => field, FieldSpec::VirtualField { field, .. } => field, } } @@ -236,7 +236,7 @@ impl ConnectorTable { } _ => field_spec, }, - FieldSpec::MetadataField {.. } | FieldSpec::VirtualField { .. } => { + FieldSpec::MetadataField { .. } | FieldSpec::VirtualField { .. } => { unreachable!("delta lake is only a sink, can't have virtual fields") } }) @@ -286,7 +286,7 @@ impl ConnectorTable { if let Some(key) = f.metadata_key() { sf.metadata_key = Some(key.to_string()); } - + Ok(sf) }) .collect::>()?; @@ -305,12 +305,7 @@ impl ConnectorTable { .map_err(|e| DataFusionError::Plan(format!("could not create connection schema: {}", e)))?; let connection = connector - .from_options( - name, - options, - Some(&schema), - connection_profile, - ) + .from_options(name, options, Some(&schema), connection_profile) .map_err(|e| DataFusionError::Plan(e.to_string()))?; let mut table: ConnectorTable = connection.into(); @@ -520,16 +515,18 @@ struct MetadataFinder { depth: usize, } -impl <'a> TreeNodeVisitor<'a> for MetadataFinder { +impl<'a> TreeNodeVisitor<'a> for MetadataFinder { type Node = Expr; fn f_down(&mut self, node: &'a Self::Node) -> Result { if let Expr::ScalarFunction(func) = node { if func.name() == "metadata" { if self.depth > 0 { - return plan_err!("Metadata columns must have only a single call to 'metadata'"); + return plan_err!( + "Metadata columns must have only a single call to 'metadata'" + ); } - + return if let &[arg] = &func.args.as_slice() { if let Expr::Literal(ScalarValue::Utf8(Some(key))) = &arg { self.key = Some(key.clone()); @@ -539,7 +536,7 @@ impl <'a> TreeNodeVisitor<'a> for MetadataFinder { } } else { plan_err!("For metadata columns, metadata call must have a single argument") - } + }; } } self.depth += 1; @@ -622,9 +619,9 @@ impl Table { &mut PlannerContext::default(), )?; - let mut metadata_finder = MetadataFinder::default(); + let mut metadata_finder = MetadataFinder::default(); df_expr.visit(&mut metadata_finder)?; - + if let Some(key) = metadata_finder.key { Ok(FieldSpec::MetadataField { field: struct_field, @@ -666,10 +663,7 @@ impl Table { } let connector = with_map.remove("connector"); - let fields = Self::schema_from_columns( - columns, - schema_provider, - )?; + let fields = Self::schema_from_columns(columns, schema_provider)?; let primary_keys = columns .iter() diff --git a/crates/arroyo-rpc/src/api_types/connections.rs b/crates/arroyo-rpc/src/api_types/connections.rs index 379065cc7..5a2a8f623 100644 --- a/crates/arroyo-rpc/src/api_types/connections.rs +++ b/crates/arroyo-rpc/src/api_types/connections.rs @@ -276,9 +276,13 @@ impl ConnectionSchema { } pub fn validate(self) -> anyhow::Result { - let non_metadata_fields: Vec<_> = self.fields.iter().filter(|f| f.metadata_key.is_none()).collect(); + let non_metadata_fields: Vec<_> = self + .fields + .iter() + .filter(|f| f.metadata_key.is_none()) + .collect(); println!("non metadata fields = {:?}", non_metadata_fields); - + match &self.format { Some(Format::RawString(_)) => { if non_metadata_fields.len() != 1 @@ -310,13 +314,16 @@ impl ConnectionSchema { let fields: Vec = self.fields.iter().map(|f| f.clone().into()).collect(); Arc::new(ArroyoSchema::from_fields(fields)) } - + pub fn metadata_fields(&self) -> Vec { - self.fields.iter() - .filter_map(|f| Some(MetadataField { - field_name: f.field_name.clone(), - key: f.metadata_key.clone()? - })) + self.fields + .iter() + .filter_map(|f| { + Some(MetadataField { + field_name: f.field_name.clone(), + key: f.metadata_key.clone()?, + }) + }) .collect() } } diff --git a/crates/arroyo-rpc/src/lib.rs b/crates/arroyo-rpc/src/lib.rs index 4eac04e74..36130bbe6 100644 --- a/crates/arroyo-rpc/src/lib.rs +++ b/crates/arroyo-rpc/src/lib.rs @@ -210,7 +210,7 @@ impl Default for OperatorConfig { bad_data: None, framing: None, rate_limit: None, - metadata_fields: vec![] + metadata_fields: vec![], } } } diff --git a/crates/arroyo-types/src/lib.rs b/crates/arroyo-types/src/lib.rs index 287dde768..c078c8d14 100644 --- a/crates/arroyo-types/src/lib.rs +++ b/crates/arroyo-types/src/lib.rs @@ -458,7 +458,7 @@ pub struct CheckpointBarrier { pub struct DisplayAsSql<'a>(pub &'a DataType); -impl <'a> Display for DisplayAsSql<'a> { +impl<'a> Display for DisplayAsSql<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.0 { DataType::Boolean => write!(f, "BOOLEAN"), @@ -475,16 +475,18 @@ impl <'a> Display for DisplayAsSql<'a> { DataType::Time64(_) => write!(f, "TIME"), DataType::Duration(_) => write!(f, "INTERVAL"), DataType::Interval(_) => write!(f, "INTERVAL"), - DataType::Binary | DataType::FixedSizeBinary(_) | DataType::LargeBinary => write!(f, "BYTEA"), + DataType::Binary | DataType::FixedSizeBinary(_) | DataType::LargeBinary => { + write!(f, "BYTEA") + } DataType::Utf8 | DataType::LargeUtf8 => write!(f, "TEXT"), DataType::List(inner) => { write!(f, "{}[]", DisplayAsSql(&inner.data_type())) - }, - dt => write!(f, "{}", dt) + } + dt => write!(f, "{}", dt), } } } - + #[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Hash, Serialize)] pub enum DatePart { Year,