
source-kafka: support schema registry and Avro decoding #2089

Merged
merged 16 commits into main from wb/kafka-avro-etc on Nov 15, 2024

Conversation

williamhbaker
Member

@williamhbaker williamhbaker commented Oct 25, 2024

Description:

Reworks the capture to support connecting to a schema registry, which enables parsing Avro messages and discovering collection keys based on an available topic key schema.

The first few commits are a general refactoring of the existing code. The ones after that implement the schema registry, discovery, and message parsing additions. There are a few other improvements thrown in with that, as detailed in the individual commits.

I restructured things pretty comprehensively, so there's not a whole lot left of the original connector. These changes are not backward compatible, but I don't think this will be a real problem considering the existing Kafka captures we have running.

Closes #29
Closes #1211
Closes #2137

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

The Kafka source connector documentation needs to be updated to describe the schema registry configuration, and how collections are discovered based on available key schemas.

Notes for reviewers:

(anything that might help someone review this PR)



Some general refactoring of the source-kafka code, which is mostly consolidation
of separate modules that had gotten kind of fragmented.

Along with this, adds support for backfilling bindings by re-structuring the
state checkpoint.

I'll need to go back in and work on better error messages after the schema
registry and Avro support is added, since right now it is mostly going to panic
when things aren't as expected.
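
As a rough illustration of the checkpoint restructuring mentioned above (not this PR's actual layout; the struct and field names below are hypothetical), keying per-topic state by binding is what lets a single binding be reset for a backfill without disturbing the others:

```rust
// Hypothetical checkpoint shape: per-topic state keyed by binding, so one
// binding's entry can be cleared for a backfill independently of the rest.
use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Default)]
struct Checkpoint {
    /// State for each bound topic, keyed by the binding's state key.
    bindings: BTreeMap<String, TopicState>,
}

#[derive(Serialize, Deserialize, Default)]
struct TopicState {
    /// Next offset to read for each partition of the topic.
    partitions: BTreeMap<i32, i64>,
}
```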
Changes the source-kafka tests to use `flowctl preview`, similar to how the
native Python captures are tested.
Use the same compose file for integration tests as for unit tests, and simplify
the test setup.
…registry

Use the schema registry API to look up registered schemas for topics and
discover appropriate collection keys for the resulting collections.

This initial implementation only works with Avro schemas. Support for JSON
schemas will follow. Protobuf is also an option but that will probably be an
enhancement for a later time.

Decoding Avro documents will also need to be added, and that will come in a
following commit.
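
For readers unfamiliar with the registry API: the lookup described above can be done against the standard Confluent-style REST endpoint `GET /subjects/{topic}-key/versions/latest`. The sketch below is illustrative only (it assumes `reqwest` with the `blocking` and `json` features) and is not the connector's actual client code:

```rust
use serde::Deserialize;

// Shape of a registry "get latest version" response; `schemaType` is omitted
// by the registry when the schema is Avro.
#[derive(Deserialize)]
struct RegisteredSchema {
    id: u32,
    schema: String,
    #[serde(rename = "schemaType", default)]
    schema_type: Option<String>,
}

// Fetch the latest registered key schema for a topic, using the conventional
// "<topic>-key" subject name.
fn fetch_key_schema(
    registry_url: &str,
    topic: &str,
    user: &str,
    password: &str,
) -> Result<RegisteredSchema, reqwest::Error> {
    let url = format!("{registry_url}/subjects/{topic}-key/versions/latest");
    reqwest::blocking::Client::new()
        .get(&url)
        .basic_auth(user, Some(password))
        .send()?
        .error_for_status()?
        .json()
}
```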
Implements capturing Avro encoded messages using schemas retrieved from the
configured schema registry.

The hard work is mostly translating an Avro value into a JSON value in a way
that is maximally useful. The implementation here is a lot like the one in the
Flow parser, but since we are unconstrained with backward-compatibility in the
Kafka capture it includes some enhancements that would be difficult to include
in the parser's version.
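
To make the translation concrete, here is a minimal sketch of mapping a decoded Avro value into JSON, assuming the `apache_avro` crate's `Value` type. It is deliberately simplified (bytes, fixed, decimals, and date/time logical types are elided) and is not the implementation in this PR:

```rust
use apache_avro::types::Value as AvroValue;
use serde_json::{json, Value as JsonValue};

// Simplified Avro-to-JSON translation: unions collapse to their inner value,
// enums become their symbol, records and maps become JSON objects.
fn avro_to_json(value: AvroValue) -> JsonValue {
    match value {
        AvroValue::Null => JsonValue::Null,
        AvroValue::Boolean(b) => json!(b),
        AvroValue::Int(i) => json!(i),
        AvroValue::Long(l) => json!(l),
        AvroValue::Float(f) => json!(f),
        AvroValue::Double(d) => json!(d),
        AvroValue::String(s) => json!(s),
        AvroValue::Enum(_, symbol) => json!(symbol),
        AvroValue::Union(_, inner) => avro_to_json(*inner),
        AvroValue::Array(items) => {
            JsonValue::Array(items.into_iter().map(avro_to_json).collect())
        }
        AvroValue::Map(entries) => JsonValue::Object(
            entries.into_iter().map(|(k, v)| (k, avro_to_json(v))).collect(),
        ),
        AvroValue::Record(fields) => JsonValue::Object(
            fields.into_iter().map(|(k, v)| (k, avro_to_json(v))).collect(),
        ),
        // Bytes, fixed, decimal, and date/time logical types need a considered
        // JSON representation and are skipped in this sketch.
        other => json!(format!("{other:?}")),
    }
}
```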
Adds support for discovering collection keys when there is a message key schema
in JSON format. Supporting messages encoded with JSON schemas was already
implemented and generally trivial since JSON messages are self-describing, but a
test case was added for that too.
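
As a rough illustration of key discovery from a JSON key schema (a simplification, not the connector's actual logic): take the schema's required top-level properties as the collection key pointers.

```rust
use serde_json::Value;

// Derive collection key pointers from a JSON key schema by using its required
// top-level properties. JSON Pointer escaping of "/" and "~" is ignored here.
fn key_pointers_from_json_schema(schema: &Value) -> Vec<String> {
    schema
        .get("required")
        .and_then(Value::as_array)
        .map(|required| {
            required
                .iter()
                .filter_map(Value::as_str)
                .map(|name| format!("/{name}"))
                .collect()
        })
        .unwrap_or_default()
}
```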
Adds deletion records, where the payload is absent, to the snapshot tests to
verify these work end-to-end.
Capture header key/value pairs and message timestamps as additional metadata.
Adds some additional context to error messages and refactors and cleans up the
test code to be more maintainable.

Also formalizes punting on schema reference resolution, since I'm not sure how
widely that is used and it would be much easier to implement if we had an actual
example of somebody using it.
As a general standard, deletion events contain only the document key, with an
"op" property set to "d", and these documents should be reduced with a merge
strategy instead of a last-write-wins to allow for "soft" deletions in a
materialization.
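
For example, a deletion document might look like the following (a hypothetical shape; the placement of the "op" property under `/_meta` is an assumption, not spelled out here). Reducing such documents with a merge strategy keeps previously captured fields while still recording the deletion:

```rust
use serde_json::json;

fn main() {
    // Hypothetical deletion document: only the key fields are present, plus an
    // "op" marker set to "d"; its placement under "_meta" is assumed here.
    let deletion = json!({
        "id": "some-key-value",
        "_meta": { "op": "d" }
    });
    println!("{deletion}");
}
```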
* Include the zstd feature so we can capture messages written in zstd
* Correctly discover JSON schemas with `additionalProperties: false`
* Small updates to the config so that it shows up better in the UI
Using the regular stdout gives a pretty huge performance increase, so use that
instead of the async version.
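
As a minimal sketch of the pattern (illustrative names, not the connector's output path): lock stdout once, wrap it in a buffer, and write newline-delimited JSON synchronously.

```rust
use std::io::{self, Write};

fn main() -> io::Result<()> {
    // Lock stdout once and buffer writes; each document goes out as one line
    // of JSON, followed by an explicit flush at the end.
    let stdout = io::stdout();
    let mut out = io::BufWriter::new(stdout.lock());

    let doc = serde_json::json!({"example": "document"});
    serde_json::to_writer(&mut out, &doc)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
    out.write_all(b"\n")?;
    out.flush()
}
```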
We have to call `poll` to generate an OAuth token for the consumer to use prior
to fetching metadata etc., and the streaming form of the consumer does not seem
to provide a way to do that without blocking.

Maybe there is some other way this could be done, but it was easy enough to
switch (back) to the non-streaming form of the consumer to have a no-op `poll`
call work as it did before. I had originally switched to using the streaming
consumer for unrelated reasons that are no longer relevant, and had kept it that
way since it was slightly more ergonomic, but the BaseConsumer works just fine
too and gets AWS MSK working again.
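
Roughly, the pattern described above looks like this with rdkafka's `BaseConsumer` (a sketch assuming a plain consumer with no custom context; the connector's real setup differs):

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaError;

fn list_topics(bootstrap_servers: &str) -> Result<Vec<String>, KafkaError> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()?;

    // A zero-timeout poll gives the client a chance to run its OAuth token
    // refresh before any metadata request is made.
    let _ = consumer.poll(Duration::ZERO);

    let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?;
    Ok(metadata.topics().iter().map(|t| t.name().to_string()).collect())
}
```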
…istry is not available

We might as well try to parse messages as JSON if they have a schema magic byte,
even if the schema registry isn't configured.
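
Concretely, the framing involved is a single 0x00 magic byte followed by a 4-byte big-endian schema ID. A sketch of the fallback (not the connector's exact code):

```rust
use serde_json::Value;

// If the payload carries the 5-byte schema-registry framing (magic byte plus
// schema ID), skip it; then try to parse whatever remains as JSON.
fn parse_maybe_framed_json(payload: &[u8]) -> Option<Value> {
    let body = match payload {
        [0x00, _, _, _, _, rest @ ..] => rest,
        other => other,
    };
    serde_json::from_slice(body).ok()
}
```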
@williamhbaker williamhbaker changed the title from "source-kafka: add support for schema registry, AVRO, etc." to "source-kafka: support schema registry and Avro decoding" on Nov 7, 2024
@williamhbaker williamhbaker added the change:planned This is a planned change label Nov 7, 2024
@williamhbaker williamhbaker marked this pull request as ready for review November 7, 2024 19:34
@williamhbaker williamhbaker requested a review from a team November 7, 2024 19:34
Member

@willdonnelly willdonnelly left a comment

LGTM

I read through the code and didn't see any obvious issues, but this is such a large refactoring that my LGTM is still more of a rubber-stamp than I'd prefer.

But since you say this is breaking backwards compatibility anyway, and I see a grand total of one Kafka capture in production that's actually green, I think that's sufficient.

… configuration

* Force a more explicit distinction between using a schema registry and not
  using one, and add an opt-in clarification that not using a schema registry
  requires all messages to be encoded as JSON, with collections falling back to
  the generic keys of partition and offset values (see the config sketch after
  this list).

* Minor adjustments to error messages for common cases of incorrect auth or
  connection details.
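
Purely as an illustration of that explicit opt-in/opt-out shape (the field and variant names below are assumptions, not the connector's actual configuration schema):

```rust
use serde::Deserialize;

// Hypothetical config shape: the user must explicitly choose a registry or
// explicitly opt out of using one.
#[derive(Deserialize)]
#[serde(tag = "schema_registry_type", rename_all = "snake_case")]
enum SchemaRegistryConfig {
    /// Connect to a Confluent-compatible schema registry.
    ConfluentSchemaRegistry {
        endpoint: String,
        username: String,
        password: String,
    },
    /// No registry: all messages must be JSON, and discovered collections use
    /// the generic partition/offset keys.
    NoSchemaRegistry,
}
```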
@williamhbaker williamhbaker merged commit c48bf72 into main Nov 15, 2024
51 of 53 checks passed
@williamhbaker williamhbaker deleted the wb/kafka-avro-etc branch November 15, 2024 16:08
williamhbaker added a commit to estuary/flow that referenced this pull request Nov 15, 2024
Updates for estuary/connectors#2089, mostly related to
configuring schema registry on the capture.