Skip to content

Commit

Permalink
Add timestamp metadata field to kafka connector (#776)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Oct 31, 2024
1 parent 066fbe8 commit 4f56baf
Show file tree
Hide file tree
Showing 38 changed files with 380 additions and 334 deletions.
1 change: 0 additions & 1 deletion crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async fn compile_sql<'a>(
.unwrap_or(json!({})),
&table.config,
Some(&table.schema),
None,
)
.map_err(log_and_map)?;

Expand Down
7 changes: 2 additions & 5 deletions crates/arroyo-connectors/src/blackhole/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::blackhole::operator::BlackholeSinkFunc;
use anyhow::anyhow;
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::{
Expand Down Expand Up @@ -79,9 +78,8 @@ impl Connector for BlackholeConnector {
_options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema, None)
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema)
}

fn from_config(
Expand All @@ -91,7 +89,6 @@ impl Connector for BlackholeConnector {
config: Self::ProfileT,
table: Self::TableT,
s: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = "Blackhole".to_string();

Expand All @@ -102,7 +99,7 @@ impl Connector for BlackholeConnector {
format: None,
bad_data: None,
framing: None,
additional_fields: None,
metadata_fields: vec![],
};

Ok(Connection {
Expand Down
7 changes: 2 additions & 5 deletions crates/arroyo-connectors/src/confluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::kafka::{
};
use crate::{kafka, pull_opt};
use anyhow::anyhow;
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::{
Expand Down Expand Up @@ -162,7 +161,6 @@ impl Connector for ConfluentConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
Expand All @@ -174,7 +172,7 @@ impl Connector for ConfluentConnector {

let table = KafkaConnector::table_from_options(options)?;

self.from_config(None, name, connection, table, schema, None)
self.from_config(None, name, connection, table, schema)
}

fn from_config(
Expand All @@ -184,12 +182,11 @@ impl Connector for ConfluentConnector {
config: Self::ProfileT,
mut table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
table
.client_configs
.insert("client.id".to_string(), CLIENT_ID.to_string());
KafkaConnector {}.from_config(id, name, config.into(), table, schema, None)
KafkaConnector {}.from_config(id, name, config.into(), table, schema)
}

fn make_operator(
Expand Down
7 changes: 2 additions & 5 deletions crates/arroyo-connectors/src/filesystem/delta.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_operator::connector::Connection;
use arroyo_storage::BackendConfig;
use std::collections::HashMap;
Expand Down Expand Up @@ -78,7 +77,6 @@ impl Connector for DeltaLakeConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<arroyo_operator::connector::Connection> {
let TableType::Sink {
write_path,
Expand Down Expand Up @@ -125,7 +123,7 @@ impl Connector for DeltaLakeConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand All @@ -145,11 +143,10 @@ impl Connector for DeltaLakeConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let table = file_system_sink_from_options(options, schema, CommitStyle::DeltaLake)?;

self.from_config(None, name, EmptyConfig {}, table, schema, None)
self.from_config(None, name, EmptyConfig {}, table, schema)
}

fn make_operator(
Expand Down
8 changes: 2 additions & 6 deletions crates/arroyo-connectors/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ mod sink;
mod source;

use anyhow::{anyhow, bail, Result};
use arrow::datatypes::DataType;
use arroyo_storage::BackendConfig;
use regex::Regex;
use std::collections::HashMap;
Expand Down Expand Up @@ -115,7 +114,6 @@ impl Connector for FileSystemConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (description, connection_type) = match table.table_type {
TableType::Source { .. } => ("FileSystem".to_string(), ConnectionType::Source),
Expand Down Expand Up @@ -170,7 +168,7 @@ impl Connector for FileSystemConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand All @@ -190,7 +188,6 @@ impl Connector for FileSystemConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
match options.remove("type") {
Some(t) if t == "source" => {
Expand All @@ -214,13 +211,12 @@ impl Connector for FileSystemConnector {
},
},
schema,
None,
)
}
Some(t) if t == "sink" => {
let table = file_system_sink_from_options(options, schema, CommitStyle::Direct)?;

self.from_config(None, name, EmptyConfig {}, table, schema, None)
self.from_config(None, name, EmptyConfig {}, table, schema)
}
Some(t) => bail!("unknown type: {}", t),
None => bail!("must have type set"),
Expand Down
7 changes: 2 additions & 5 deletions crates/arroyo-connectors/src/fluvio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
Expand Down Expand Up @@ -89,7 +88,6 @@ impl Connector for FluvioConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let endpoint = options.remove("endpoint");
let topic = pull_opt("topic", options)?;
Expand Down Expand Up @@ -118,7 +116,7 @@ impl Connector for FluvioConnector {
type_: table_type,
};

Self::from_config(self, None, name, EmptyConfig {}, table, schema, None)
Self::from_config(self, None, name, EmptyConfig {}, table, schema)
}

fn from_config(
Expand All @@ -128,7 +126,6 @@ impl Connector for FluvioConnector {
config: EmptyConfig,
table: FluvioTable,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
Expand Down Expand Up @@ -157,7 +154,7 @@ impl Connector for FluvioConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down
6 changes: 1 addition & 5 deletions crates/arroyo-connectors/src/impulse/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod operator;

use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::FieldType::Primitive;
Expand Down Expand Up @@ -102,7 +101,6 @@ impl Connector for ImpulseConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let event_rate = f64::from_str(&pull_opt("event_rate", options)?)
.map_err(|_| anyhow!("invalid value for event_rate; expected float"))?;
Expand Down Expand Up @@ -136,7 +134,6 @@ impl Connector for ImpulseConnector {
message_count,
},
None,
None,
)
}

Expand All @@ -147,7 +144,6 @@ impl Connector for ImpulseConnector {
config: Self::ProfileT,
table: Self::TableT,
_: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = format!(
"{}Impulse<{} eps{}>",
Expand All @@ -170,7 +166,7 @@ impl Connector for ImpulseConnector {
format: None,
bad_data: None,
framing: None,
additional_fields: None,
metadata_fields: vec![],
};

Ok(Connection {
Expand Down
68 changes: 25 additions & 43 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::de::ArrowDeserializer;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::Connection;
use arroyo_operator::connector::{Connection, MetadataDef};
use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{BadData, Format, JsonFormat};
Expand Down Expand Up @@ -189,7 +189,6 @@ impl Connector for KafkaConnector {
config: KafkaConfig,
table: KafkaTable,
schema: Option<&ConnectionSchema>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
Expand All @@ -209,21 +208,14 @@ impl Connector for KafkaConnector {
.map(|t| t.to_owned())
.ok_or_else(|| anyhow!("'format' must be set for Kafka connection"))?;

let metadata_fields = metadata_fields.map(|fields| {
fields
.into_iter()
.map(|(k, (v, _))| (k, v))
.collect::<HashMap<String, String>>()
});

let config = OperatorConfig {
connection: serde_json::to_value(config).unwrap(),
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: metadata_fields,
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down Expand Up @@ -316,13 +308,33 @@ impl Connector for KafkaConnector {
}
}

/// Returns the metadata fields this Kafka connector defines, each as a name
/// paired with its Arrow data type:
///
/// * `offset_id` — `Int64`
/// * `partition` — `Int32`
/// * `topic` — `Utf8`
/// * `timestamp` — `Int64`
///
/// NOTE(review): the unit/epoch of `timestamp` is not visible from this
/// definition; confirm against the source operator that populates it.
fn metadata_defs(&self) -> &'static [MetadataDef] {
    // Kept as a single named table so the supported fields and their types
    // can be read at a glance; order matches the original declaration.
    const KAFKA_METADATA_DEFS: &[MetadataDef] = &[
        MetadataDef { name: "offset_id", data_type: DataType::Int64 },
        MetadataDef { name: "partition", data_type: DataType::Int32 },
        MetadataDef { name: "topic", data_type: DataType::Utf8 },
        MetadataDef { name: "timestamp", data_type: DataType::Int64 },
    ];

    KAFKA_METADATA_DEFS
}

fn from_options(
&self,
name: &str,
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
Expand All @@ -334,37 +346,7 @@ impl Connector for KafkaConnector {

let table = Self::table_from_options(options)?;

let allowed_metadata_udf_args: HashMap<&str, DataType> = [
("offset_id", DataType::Int64),
("partition", DataType::Int32),
("topic", DataType::Utf8),
]
.iter()
.cloned()
.collect();

if let Some(fields) = &metadata_fields {
for (field_name, data_type) in fields.values() {
match allowed_metadata_udf_args.get(field_name.as_str()) {
Some(expected_type) => {
if expected_type != data_type {
return Err(anyhow!(
"Invalid datatype for metadata field '{}': expected '{:?}', found '{:?}'",
field_name, expected_type, data_type
));
}
}
None => {
return Err(anyhow!(
"Invalid metadata field name for Kafka connector: '{}'",
field_name
));
}
}
}
}

Self::from_config(self, None, name, connection, table, schema, metadata_fields)
Self::from_config(self, None, name, connection, table, schema)
}

fn make_operator(
Expand Down Expand Up @@ -424,7 +406,7 @@ impl Connector for KafkaConnector {
.unwrap_or(u32::MAX),
)
.unwrap(),
metadata_fields: config.additional_fields,
metadata_fields: config.metadata_fields,
})))
}
TableType::Sink {
Expand Down
Loading

0 comments on commit 4f56baf

Please sign in to comment.