
Initial support for Avro formats #386

Merged
merged 12 commits into from
Oct 31, 2023

Conversation

@mwylde (Member) commented Oct 30, 2023

This PR adds initial support for the Avro format, currently only for deserialization in sources:

CREATE TABLE pizza_orders_avro (
    value TEXT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'pizza_orders_avro',
    type = 'source',
    format = 'avro',
    'schema_registry.endpoint' = 'http://localhost:8081',
    'avro.confluent_schema_registry' = 'true'
);

Initially supported:

  • Reading Avro data with either an embedded schema, a pre-registered schema, or schemas fetched from the Confluent Schema Registry
  • Struct generation from a subset of Avro features (records, primitives, and unions of [T, null])
  • "Unstructured" Avro (via Avro-to-JSON conversion)
  • Schema evolution (via confluent schema registry or pre-registered schema)
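
As an illustration of the supported subset, a hypothetical Avro schema for the pizza_orders_avro topic in the example above might look like the following (the field names here are invented for illustration; note the [T, null]-style union for the nullable field):

```json
{
  "type": "record",
  "name": "PizzaOrder",
  "fields": [
    {"name": "store_id", "type": "long"},
    {"name": "order_total", "type": "double"},
    {"name": "coupon_code", "type": ["null", "string"], "default": null}
  ]
}
```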

Follow-up work for more complete Avro support will include:

  • Serialization (sink) support
  • Complete union/array support

The current implementation is fairly inefficient. To support Avro records with features that we don't yet handle statically, Avro is first parsed, then converted to JSON, and then deserialized into the struct via our normal JSON deserialization pathway. This lets us reuse the existing RawJson approach for unsupported features.
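To make the interim pathway concrete, here is a minimal, self-contained sketch of the Avro-to-JSON step described above. This is not the actual Arroyo code: `AvroValue` is a hypothetical stand-in for the parsed Avro value type, and the real implementation would cover many more types; the point is only the shape of the parse → JSON text → existing JSON deserialization flow.

```rust
// Hypothetical stand-in for a parsed Avro value (the real parser's value
// type is much richer); shown only to illustrate the conversion pathway.
#[derive(Debug)]
enum AvroValue {
    Null,
    Boolean(bool),
    Long(i64),
    Str(String),
    Record(Vec<(String, AvroValue)>),
}

// Convert a parsed Avro value into JSON text; in the pathway described
// above, this text would then flow through the normal JSON deserializer.
fn avro_to_json(v: &AvroValue) -> String {
    match v {
        AvroValue::Null => "null".to_string(),
        AvroValue::Boolean(b) => b.to_string(),
        AvroValue::Long(n) => n.to_string(),
        AvroValue::Str(s) => format!("\"{}\"", s.replace('"', "\\\"")),
        AvroValue::Record(fields) => {
            let body: Vec<String> = fields
                .iter()
                .map(|(k, v)| format!("\"{}\":{}", k, avro_to_json(v)))
                .collect();
            format!("{{{}}}", body.join(","))
        }
    }
}

fn main() {
    let record = AvroValue::Record(vec![
        ("store_id".to_string(), AvroValue::Long(4)),
        ("coupon_code".to_string(), AvroValue::Null),
    ]);
    // Prints {"store_id":4,"coupon_code":null}
    println!("{}", avro_to_json(&record));
}
```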

This PR also reworks how we interact with Confluent Schema Registry. The schema registry is now configured as part of the Kafka connection, so you don't need to redefine it each time you create a table. We also now do schema resolution as part of connection table creation, rather than having the frontend request the schema and then fill it in.

@jacksonrnewhouse (Contributor) left a comment


Just a couple of comments

}

/// A schema resolver that returns errors when schemas are requested; this is intended
/// to be used when schemas are embedded into the message and we do not expect to
Contributor

There's a wording issue here.


#[async_trait]
pub trait SchemaResolver: Send {
    async fn resolve_schema(&self, id: u32) -> Result<Option<String>, String>;
Contributor

Why not use anyhow errors?

Member Author

Because the errors are meant to be used as the message in a UserError, which takes a string. You could rely on anyhow::Error::to_string, but that includes various formatting logic that I don't want; the SchemaResolver should be responsible for producing a good String error message that can be shown to the user.
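To illustrate the convention described here, below is a minimal, dependency-free sketch of a resolver that reports a user-facing String error. This is hypothetical, not the actual Arroyo implementation: the real trait is async (see the #[async_trait] snippet above), while this version is synchronous so it can stand alone without the async-trait crate, and the `FailingSchemaResolver` name is invented for the example.

```rust
// Synchronous stand-in for the async SchemaResolver trait; the point is the
// error type: a plain String suitable for showing directly to the user in a
// UserError, rather than an anyhow::Error with its own formatting.
pub trait SyncSchemaResolver {
    fn resolve_schema(&self, id: u32) -> Result<Option<String>, String>;
}

/// A resolver for sources whose messages embed their schemas; any attempt to
/// look up a schema by id is unexpected and reported as a user-facing error.
pub struct FailingSchemaResolver;

impl SyncSchemaResolver for FailingSchemaResolver {
    fn resolve_schema(&self, id: u32) -> Result<Option<String>, String> {
        Err(format!(
            "received a message referencing schema id {}, but this source is \
             not configured to fetch schemas from a registry",
            id
        ))
    }
}

fn main() {
    let resolver = FailingSchemaResolver;
    // The Err value is a ready-to-display message, no extra formatting needed.
    let err = resolver.resolve_schema(42).unwrap_err();
    println!("{}", err);
}
```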

@mwylde mwylde enabled auto-merge (squash) October 31, 2023 17:58
@mwylde mwylde merged commit 5fcec8c into master Oct 31, 2023
8 checks passed
zh4ngx pushed a commit to StrikeTeamOne/arroyo that referenced this pull request Nov 16, 2023