fmt
mwylde committed Oct 31, 2024
1 parent bbac9e0 commit 3d1624c
Showing 12 changed files with 89 additions and 72 deletions.
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/kafka/mod.rs
@@ -1,5 +1,5 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::{DataType};
use arrow::datatypes::DataType;
use arroyo_formats::de::ArrowDeserializer;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::{Connection, MetadataDef};
@@ -215,7 +215,7 @@ impl Connector for KafkaConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
metadata_fields: schema.metadata_fields()
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
@@ -325,7 +325,7 @@ impl Connector for KafkaConnector {
MetadataDef {
name: "timestamp",
data_type: DataType::Int64,
}
},
]
}

4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
@@ -169,7 +169,7 @@ impl KafkaSourceFunc {

let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
select! {
message = consumer.recv() => {
@@ -179,7 +179,7 @@ impl KafkaSourceFunc {
let timestamp = msg.timestamp().to_millis()
.ok_or_else(|| UserError::new("Failed to read timestamp from Kafka record",
"The message read from Kafka did not contain a message timestamp"))?;

let topic = msg.topic();

let connector_metadata = if !self.metadata_fields.is_empty() {
12 changes: 5 additions & 7 deletions crates/arroyo-connectors/src/mqtt/mod.rs
@@ -240,14 +240,12 @@ impl Connector for MqttConnector {
}

fn metadata_defs(&self) -> &'static [MetadataDef] {
&[
MetadataDef {
name: "topic",
data_type: DataType::Utf8,
}
]
&[MetadataDef {
name: "topic",
data_type: DataType::Utf8,
}]
}

fn from_options(
&self,
name: &str,
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/mqtt/source/mod.rs
@@ -142,14 +142,14 @@ impl MqttSourceFunc {
let qos = self.qos;
let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
select! {
event = eventloop.poll() => {
match event {
Ok(MqttEvent::Incoming(Incoming::Publish(p))) => {
let topic = String::from_utf8_lossy(&p.topic).to_string();

let connector_metadata = if !self.metadata_fields.is_empty() {
let mut connector_metadata = HashMap::new();
for mf in &self.metadata_fields {
@@ -162,7 +162,7 @@
} else {
None
};

ctx.deserialize_slice(&p.payload, SystemTime::now(), connector_metadata.as_ref()).await?;
rate_limiter.until_ready().await;
}
2 changes: 1 addition & 1 deletion crates/arroyo-formats/src/de.rs
@@ -500,7 +500,7 @@ pub(crate) fn add_additional_fields(
}

pub(crate) fn add_additional_fields_using_builder(
additional_fields: Option<&HashMap<&String , FieldValueType<'_>>>,
additional_fields: Option<&HashMap<&String, FieldValueType<'_>>>,
additional_fields_builder: &mut Option<HashMap<String, Box<dyn ArrayBuilder>>>,
) {
if let Some(fields) = additional_fields {
22 changes: 15 additions & 7 deletions crates/arroyo-operator/src/connector.rs
@@ -5,14 +5,14 @@ use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage,
};
use arroyo_rpc::{primitive_to_sql, OperatorConfig};
use arroyo_types::DisplayAsSql;
use datafusion::sql::unparser::expr_to_sql;
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use serde_json::value::Value;
use std::collections::HashMap;
use datafusion::sql::unparser::expr_to_sql;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use arroyo_types::DisplayAsSql;

#[derive(Debug, Clone)]
pub struct Connection {
@@ -51,7 +51,7 @@ pub trait Connector: Send {
}

fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector;

fn metadata_defs(&self) -> &'static [MetadataDef] {
&[]
}
@@ -278,21 +278,29 @@ impl<C: Connector> ErasedConnector for C {
if let Some(schema) = schema {
for sf in schema.fields.iter() {
if let Some(key) = &sf.metadata_key {
let field = self.metadata_defs()
let field = self
.metadata_defs()
.iter()
.find(|f| f.name == key)
.ok_or_else(|| anyhow!("unknown metadata field '{}' for {} connector '{}'", key, self.name(), name))?;
.ok_or_else(|| {
anyhow!(
"unknown metadata field '{}' for {} connector '{}'",
key,
self.name(),
name
)
})?;

let arrow_field: Field = sf.clone().into();

if !field.data_type.equals_datatype(arrow_field.data_type()) {
bail!("incorrect data type for metadata field '{}'; expected {}, but found {}",
arrow_field.name(), DisplayAsSql(&field.data_type), DisplayAsSql(arrow_field.data_type()));
}
}
}
}

self.from_options(name, options, schema, profile)
}

6 changes: 3 additions & 3 deletions crates/arroyo-planner/src/extension/table_source.rs
@@ -8,14 +8,14 @@ use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

use prost::Message;

use super::{ArroyoExtension, DebeziumUnrollingExtension, NodeWithIncomingEdges};
use crate::tables::FieldSpec;
use crate::{
builder::{NamedNode, Planner},
schema_from_df_fields,
schemas::add_timestamp_field,
tables::ConnectorTable,
};
use crate::tables::FieldSpec;
use super::{ArroyoExtension, DebeziumUnrollingExtension, NodeWithIncomingEdges};
pub(crate) const TABLE_SOURCE_NAME: &str = "TableSourceExtension";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -31,7 +31,7 @@ impl TableSourceExtension {
.fields
.iter()
.filter_map(|field| match field {
FieldSpec::StructField(field) | FieldSpec::MetadataField { field, ..} => {
FieldSpec::StructField(field) | FieldSpec::MetadataField { field, .. } => {
Some((Some(name.clone()), Arc::new(field.clone())).into())
}
FieldSpec::VirtualField { .. } => None,
32 changes: 20 additions & 12 deletions crates/arroyo-planner/src/rewriters.rs
@@ -49,10 +49,13 @@ impl<'a> SourceRewriter<'a> {
.find_map(|f| {
if f.field().name() == &watermark_field {
return match f {
FieldSpec::StructField(field) | FieldSpec::MetadataField {field, ..} => Some(Expr::Column(Column {
relation: None,
name: field.name().to_string(),
})),
FieldSpec::StructField(field)
| FieldSpec::MetadataField { field, .. } => {
Some(Expr::Column(Column {
relation: None,
name: field.name().to_string(),
}))
}
FieldSpec::VirtualField { expression, .. } => Some(expression.clone()),
};
}
@@ -84,10 +87,12 @@ impl<'a> SourceRewriter<'a> {
.fields
.iter()
.map(|field| match field {
FieldSpec::StructField(field) | FieldSpec::MetadataField { field, ..} => Expr::Column(Column {
relation: Some(qualifier.clone()),
name: field.name().to_string(),
}),
FieldSpec::StructField(field) | FieldSpec::MetadataField { field, .. } => {
Expr::Column(Column {
relation: Some(qualifier.clone()),
name: field.name().to_string(),
})
}
FieldSpec::VirtualField { field, expression } => expression
.clone()
.alias_qualified(Some(qualifier.clone()), field.name().to_string()),
@@ -106,10 +111,13 @@ impl<'a> SourceRewriter<'a> {
.find_map(|f| {
if f.field().name() == &event_time_field {
return match f {
FieldSpec::StructField(field) | FieldSpec::MetadataField { field, ..} => Some(Expr::Column(Column {
relation: Some(qualifier.clone()),
name: field.name().to_string(),
})),
FieldSpec::StructField(field)
| FieldSpec::MetadataField { field, .. } => {
Some(Expr::Column(Column {
relation: Some(qualifier.clone()),
name: field.name().to_string(),
}))
}
FieldSpec::VirtualField { expression, .. } => Some(expression.clone()),
};
}
34 changes: 14 additions & 20 deletions crates/arroyo-planner/src/tables.rs
@@ -21,6 +21,7 @@ use arroyo_rpc::api_types::connections::{
use arroyo_rpc::formats::{BadData, Format, Framing, JsonFormat};
use arroyo_rpc::grpc::api::ConnectorOp;
use arroyo_types::ArroyoExtensionType;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::common::{config::ConfigOptions, DFSchema, Result, ScalarValue};
use datafusion::common::{plan_err, Column, DataFusionError};
use datafusion::execution::context::SessionState;
@@ -61,7 +62,6 @@ use datafusion::{
sqlparser::ast::{ColumnDef, ColumnOption, Statement, Value},
},
};
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use syn::Meta;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -99,7 +99,7 @@ impl FieldSpec {
pub fn field(&self) -> &Field {
match self {
FieldSpec::StructField(f) => f,
FieldSpec::MetadataField { field, ..} => field,
FieldSpec::MetadataField { field, .. } => field,
FieldSpec::VirtualField { field, .. } => field,
}
}
@@ -236,7 +236,7 @@ impl ConnectorTable {
}
_ => field_spec,
},
FieldSpec::MetadataField {.. } | FieldSpec::VirtualField { .. } => {
FieldSpec::MetadataField { .. } | FieldSpec::VirtualField { .. } => {
unreachable!("delta lake is only a sink, can't have virtual fields")
}
})
@@ -286,7 +286,7 @@ impl ConnectorTable {
if let Some(key) = f.metadata_key() {
sf.metadata_key = Some(key.to_string());
}

Ok(sf)
})
.collect::<Result<_>>()?;
@@ -305,12 +305,7 @@ impl ConnectorTable {
.map_err(|e| DataFusionError::Plan(format!("could not create connection schema: {}", e)))?;

let connection = connector
.from_options(
name,
options,
Some(&schema),
connection_profile,
)
.from_options(name, options, Some(&schema), connection_profile)
.map_err(|e| DataFusionError::Plan(e.to_string()))?;

let mut table: ConnectorTable = connection.into();
@@ -520,16 +515,18 @@ struct MetadataFinder {
depth: usize,
}

impl <'a> TreeNodeVisitor<'a> for MetadataFinder {
impl<'a> TreeNodeVisitor<'a> for MetadataFinder {
type Node = Expr;

fn f_down(&mut self, node: &'a Self::Node) -> Result<TreeNodeRecursion> {
if let Expr::ScalarFunction(func) = node {
if func.name() == "metadata" {
if self.depth > 0 {
return plan_err!("Metadata columns must have only a single call to 'metadata'");
return plan_err!(
"Metadata columns must have only a single call to 'metadata'"
);
}

return if let &[arg] = &func.args.as_slice() {
if let Expr::Literal(ScalarValue::Utf8(Some(key))) = &arg {
self.key = Some(key.clone());
@@ -539,7 +536,7 @@ impl <'a> TreeNodeVisitor<'a> for MetadataFinder {
}
} else {
plan_err!("For metadata columns, metadata call must have a single argument")
}
};
}
}
self.depth += 1;
@@ -622,9 +619,9 @@ impl Table {
&mut PlannerContext::default(),
)?;

let mut metadata_finder = MetadataFinder::default();
let mut metadata_finder = MetadataFinder::default();
df_expr.visit(&mut metadata_finder)?;

if let Some(key) = metadata_finder.key {
Ok(FieldSpec::MetadataField {
field: struct_field,
@@ -666,10 +663,7 @@ impl Table {
}

let connector = with_map.remove("connector");
let fields = Self::schema_from_columns(
columns,
schema_provider,
)?;
let fields = Self::schema_from_columns(columns, schema_provider)?;

let primary_keys = columns
.iter()
23 changes: 15 additions & 8 deletions crates/arroyo-rpc/src/api_types/connections.rs
@@ -276,9 +276,13 @@ impl ConnectionSchema {
}

pub fn validate(self) -> anyhow::Result<Self> {
let non_metadata_fields: Vec<_> = self.fields.iter().filter(|f| f.metadata_key.is_none()).collect();
let non_metadata_fields: Vec<_> = self
.fields
.iter()
.filter(|f| f.metadata_key.is_none())
.collect();
println!("non metadata fields = {:?}", non_metadata_fields);

match &self.format {
Some(Format::RawString(_)) => {
if non_metadata_fields.len() != 1
@@ -310,13 +314,16 @@ impl ConnectionSchema {
let fields: Vec<Field> = self.fields.iter().map(|f| f.clone().into()).collect();
Arc::new(ArroyoSchema::from_fields(fields))
}

pub fn metadata_fields(&self) -> Vec<MetadataField> {
self.fields.iter()
.filter_map(|f| Some(MetadataField {
field_name: f.field_name.clone(),
key: f.metadata_key.clone()?
}))
self.fields
.iter()
.filter_map(|f| {
Some(MetadataField {
field_name: f.field_name.clone(),
key: f.metadata_key.clone()?,
})
})
.collect()
}
}
2 changes: 1 addition & 1 deletion crates/arroyo-rpc/src/lib.rs
@@ -210,7 +210,7 @@ impl Default for OperatorConfig {
bad_data: None,
framing: None,
rate_limit: None,
metadata_fields: vec![]
metadata_fields: vec![],
}
}
}