Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Oct 31, 2024
1 parent 066fbe8 commit bbac9e0
Show file tree
Hide file tree
Showing 36 changed files with 332 additions and 278 deletions.
1 change: 0 additions & 1 deletion crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async fn compile_sql<'a>(
.unwrap_or(json!({})),
&table.config,
Some(&table.schema),
None,
)
.map_err(log_and_map)?;

Expand Down
6 changes: 2 additions & 4 deletions crates/arroyo-connectors/src/blackhole/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ impl Connector for BlackholeConnector {
_options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema, None)
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema)
}

fn from_config(
Expand All @@ -91,7 +90,6 @@ impl Connector for BlackholeConnector {
config: Self::ProfileT,
table: Self::TableT,
s: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = "Blackhole".to_string();

Expand All @@ -102,7 +100,7 @@ impl Connector for BlackholeConnector {
format: None,
bad_data: None,
framing: None,
additional_fields: None,
metadata_fields: vec![],
};

Ok(Connection {
Expand Down
6 changes: 2 additions & 4 deletions crates/arroyo-connectors/src/confluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ impl Connector for ConfluentConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
Expand All @@ -174,7 +173,7 @@ impl Connector for ConfluentConnector {

let table = KafkaConnector::table_from_options(options)?;

self.from_config(None, name, connection, table, schema, None)
self.from_config(None, name, connection, table, schema)
}

fn from_config(
Expand All @@ -184,12 +183,11 @@ impl Connector for ConfluentConnector {
config: Self::ProfileT,
mut table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
table
.client_configs
.insert("client.id".to_string(), CLIENT_ID.to_string());
KafkaConnector {}.from_config(id, name, config.into(), table, schema, None)
KafkaConnector {}.from_config(id, name, config.into(), table, schema)
}

fn make_operator(
Expand Down
6 changes: 2 additions & 4 deletions crates/arroyo-connectors/src/filesystem/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl Connector for DeltaLakeConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<arroyo_operator::connector::Connection> {
let TableType::Sink {
write_path,
Expand Down Expand Up @@ -125,7 +124,7 @@ impl Connector for DeltaLakeConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand All @@ -145,11 +144,10 @@ impl Connector for DeltaLakeConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let table = file_system_sink_from_options(options, schema, CommitStyle::DeltaLake)?;

self.from_config(None, name, EmptyConfig {}, table, schema, None)
self.from_config(None, name, EmptyConfig {}, table, schema)
}

fn make_operator(
Expand Down
7 changes: 2 additions & 5 deletions crates/arroyo-connectors/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ impl Connector for FileSystemConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (description, connection_type) = match table.table_type {
TableType::Source { .. } => ("FileSystem".to_string(), ConnectionType::Source),
Expand Down Expand Up @@ -170,7 +169,7 @@ impl Connector for FileSystemConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand All @@ -190,7 +189,6 @@ impl Connector for FileSystemConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
match options.remove("type") {
Some(t) if t == "source" => {
Expand All @@ -214,13 +212,12 @@ impl Connector for FileSystemConnector {
},
},
schema,
None,
)
}
Some(t) if t == "sink" => {
let table = file_system_sink_from_options(options, schema, CommitStyle::Direct)?;

self.from_config(None, name, EmptyConfig {}, table, schema, None)
self.from_config(None, name, EmptyConfig {}, table, schema)
}
Some(t) => bail!("unknown type: {}", t),
None => bail!("must have type set"),
Expand Down
6 changes: 2 additions & 4 deletions crates/arroyo-connectors/src/fluvio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ impl Connector for FluvioConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let endpoint = options.remove("endpoint");
let topic = pull_opt("topic", options)?;
Expand Down Expand Up @@ -118,7 +117,7 @@ impl Connector for FluvioConnector {
type_: table_type,
};

Self::from_config(self, None, name, EmptyConfig {}, table, schema, None)
Self::from_config(self, None, name, EmptyConfig {}, table, schema)
}

fn from_config(
Expand All @@ -128,7 +127,6 @@ impl Connector for FluvioConnector {
config: EmptyConfig,
table: FluvioTable,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
Expand Down Expand Up @@ -157,7 +155,7 @@ impl Connector for FluvioConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
metadata_fields: schema.metadata_fields(),
};

Ok(Connection {
Expand Down
5 changes: 1 addition & 4 deletions crates/arroyo-connectors/src/impulse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl Connector for ImpulseConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let event_rate = f64::from_str(&pull_opt("event_rate", options)?)
.map_err(|_| anyhow!("invalid value for event_rate; expected float"))?;
Expand Down Expand Up @@ -136,7 +135,6 @@ impl Connector for ImpulseConnector {
message_count,
},
None,
None,
)
}

Expand All @@ -147,7 +145,6 @@ impl Connector for ImpulseConnector {
config: Self::ProfileT,
table: Self::TableT,
_: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = format!(
"{}Impulse<{} eps{}>",
Expand All @@ -170,7 +167,7 @@ impl Connector for ImpulseConnector {
format: None,
bad_data: None,
framing: None,
additional_fields: None,
metadata_fields: vec![],
};

Ok(Connection {
Expand Down
70 changes: 26 additions & 44 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType};
use arroyo_formats::de::ArrowDeserializer;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::Connection;
use arroyo_operator::connector::{Connection, MetadataDef};
use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{BadData, Format, JsonFormat};
Expand Down Expand Up @@ -189,7 +189,6 @@ impl Connector for KafkaConnector {
config: KafkaConfig,
table: KafkaTable,
schema: Option<&ConnectionSchema>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
Expand All @@ -209,21 +208,14 @@ impl Connector for KafkaConnector {
.map(|t| t.to_owned())
.ok_or_else(|| anyhow!("'format' must be set for Kafka connection"))?;

let metadata_fields = metadata_fields.map(|fields| {
fields
.into_iter()
.map(|(k, (v, _))| (k, v))
.collect::<HashMap<String, String>>()
});

let config = OperatorConfig {
connection: serde_json::to_value(config).unwrap(),
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: metadata_fields,
metadata_fields: schema.metadata_fields()
};

Ok(Connection {
Expand Down Expand Up @@ -316,13 +308,33 @@ impl Connector for KafkaConnector {
}
}

/// Declares the metadata columns that the Kafka connector can expose on
/// source tables. Each entry pairs a metadata key with the Arrow type it
/// is materialized as.
///
/// NOTE(review): the keys listed here must stay in sync with the match in
/// the Kafka source's read loop (which maps "offset_id" -> message offset,
/// "partition" -> partition id, "topic" -> topic name, and "timestamp" ->
/// the record's millisecond timestamp); an unrecognized key there is
/// treated as unreachable.
fn metadata_defs(&self) -> &'static [MetadataDef] {
&[
MetadataDef {
name: "offset_id",
// Kafka offsets are 64-bit integers.
data_type: DataType::Int64,
},
MetadataDef {
name: "partition",
// Kafka partition ids are 32-bit integers.
data_type: DataType::Int32,
},
MetadataDef {
name: "topic",
data_type: DataType::Utf8,
},
MetadataDef {
name: "timestamp",
// Record timestamp in milliseconds (see timestamp extraction in the
// source read loop) — stored as a raw Int64, not an Arrow timestamp.
data_type: DataType::Int64,
}
]
}

fn from_options(
&self,
name: &str,
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
Expand All @@ -334,37 +346,7 @@ impl Connector for KafkaConnector {

let table = Self::table_from_options(options)?;

let allowed_metadata_udf_args: HashMap<&str, DataType> = [
("offset_id", DataType::Int64),
("partition", DataType::Int32),
("topic", DataType::Utf8),
]
.iter()
.cloned()
.collect();

if let Some(fields) = &metadata_fields {
for (field_name, data_type) in fields.values() {
match allowed_metadata_udf_args.get(field_name.as_str()) {
Some(expected_type) => {
if expected_type != data_type {
return Err(anyhow!(
"Invalid datatype for metadata field '{}': expected '{:?}', found '{:?}'",
field_name, expected_type, data_type
));
}
}
None => {
return Err(anyhow!(
"Invalid metadata field name for Kafka connector: '{}'",
field_name
));
}
}
}
}

Self::from_config(self, None, name, connection, table, schema, metadata_fields)
Self::from_config(self, None, name, connection, table, schema)
}

fn make_operator(
Expand Down Expand Up @@ -424,7 +406,7 @@ impl Connector for KafkaConnector {
.unwrap_or(u32::MAX),
)
.unwrap(),
metadata_fields: config.additional_fields,
metadata_fields: config.metadata_fields,
})))
}
TableType::Sink {
Expand Down
34 changes: 15 additions & 19 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use arroyo_formats::de::FieldValueType;
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_rpc::grpc::rpc::TableConfig;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp};
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField};

use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::SourceOperator;
Expand Down Expand Up @@ -36,7 +36,7 @@ pub struct KafkaSourceFunc {
pub schema_resolver: Option<Arc<dyn SchemaResolver + Sync>>,
pub client_configs: HashMap<String, String>,
pub messages_per_second: NonZeroU32,
pub metadata_fields: Option<HashMap<String, String>>,
pub metadata_fields: Vec<MetadataField>,
}

#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
Expand Down Expand Up @@ -169,7 +169,7 @@ impl KafkaSourceFunc {

let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
select! {
message = consumer.recv() => {
Expand All @@ -179,30 +179,26 @@ impl KafkaSourceFunc {
let timestamp = msg.timestamp().to_millis()
.ok_or_else(|| UserError::new("Failed to read timestamp from Kafka record",
"The message read from Kafka did not contain a message timestamp"))?;

let topic = msg.topic();

let connector_metadata = if let Some(metadata_fields) = &self.metadata_fields {
let connector_metadata = if !self.metadata_fields.is_empty() {
let mut connector_metadata = HashMap::new();
for (key, value) in metadata_fields {
match value.as_str() {
"offset_id" => {
connector_metadata.insert(key, FieldValueType::Int64(msg.offset()));
}
"partition" => {
connector_metadata.insert(key, FieldValueType::Int32(msg.partition()));
}
"topic" => {
connector_metadata.insert(key, FieldValueType::String(&self.topic));
}
_ => {}
}
for f in &self.metadata_fields {
connector_metadata.insert(&f.field_name, match f.key.as_str() {
"offset_id" => FieldValueType::Int64(msg.offset()),
"partition" => FieldValueType::Int32(msg.partition()),
"topic" => FieldValueType::String(topic),
"timestamp" => FieldValueType::Int64(timestamp),
k => unreachable!("Invalid metadata key '{}'", k),
});
}
Some(connector_metadata)
} else {
None
};


ctx.deserialize_slice(v, from_millis(timestamp as u64), connector_metadata).await?;
ctx.deserialize_slice(v, from_millis(timestamp.max(0) as u64), connector_metadata.as_ref()).await?;


if ctx.should_flush() {
Expand Down
Loading

0 comments on commit bbac9e0

Please sign in to comment.