Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for writing Avro data #422

Merged
merged 12 commits into from
Nov 27, 2023
Merged

Add support for writing Avro data #422

merged 12 commits into from
Nov 27, 2023

Conversation

mwylde
Copy link
Member

@mwylde mwylde commented Nov 27, 2023

This PR adds support for serializing data as avro in sinks, following on #386 which added support for avro deserialization in sources.

Users can now enable avro in sinks like this:

 create table kafka  with (                                                   
     connector = 'kafka',                                 
     type = 'sink',                                       
     bootstrap_servers = 'localhost:9092',                
     'schema_registry.endpoint' = 'http://localhost:8081',
     format = 'avro',                                     
     'avro.confluent_schema_registry' = 'true',           
     'topic' = 'sink_avr3'                               
 );                                                       
                                                          
 insert into kafka                                        
 select * from nexmark;

The avro.confluent_schema_registry is optional; if set, we will attempt to register the schema in the avro schema registry. If it's not set, we will include the schema in each message (writing an entire avro header for each piece of data).

This work involved a number of steps, including codegen to generate avro serialization functions, creation of avro schemas from arrow schemas, and adding the ability to register avro schemas with confluent schema registry.

One challenging design decision was to figure out where in our system we should handle the schema registration. Typically setup like this is performed in an operator (often restricted to the operator with index 0), but that was not obviously best in this case one for several reasons:

  • We really only want to do this once, not every time the job is run
  • In order to serialize data for confluent schema registry, we need to know the schema id. That means that we'd need some amount of coordination between the operators.
  • There may be failures when registering the schema (for example, it will fail if the new schema is incompatible with the existing one) which would ideally happen earlier than runtime

For these reasons, I opted to do this in the api during pipeline submisison. That allows us to register it once, embed the id in the format for all subtasks to consume, and gives us the ability to give errors messages back to the user at job submission time.

This required some moving around of code so that both the api and worker could have access to it. I decided to introduce a new module—arroyo-formats—for all of the format related code, so it could easily be included in all of the services that need it.

One follow on from this work is using existing schemas for topics that already have a predefined schema in the registry, however that will require supporting more of the avro schema spec for struct generation (like arrays).

.fields
.iter()
.map(|f| {
let name = AvroFormat::sanitize_field(&match f.alias.as_ref() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the constraints on names here? the raw f.name can have math characters, e.g. *. Should we use the field_name which already does _ replacement on non-alpha numeric characters?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by moving this logic into AvroFormat::sanitize_field (I can't use field_name(), as this also needs to be called on raw arrow Fields that don't have our extra methods).

@mwylde mwylde enabled auto-merge (squash) November 27, 2023 19:06
@mwylde mwylde disabled auto-merge November 27, 2023 19:20
@mwylde mwylde force-pushed the avro_serialization branch 2 times, most recently from e5814eb to 39061e1 Compare November 27, 2023 20:15
@mwylde mwylde merged commit ed2bc75 into master Nov 27, 2023
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants