Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support TopicRecordNameStrategy #1342

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from

Conversation

manuelarte
Copy link

@manuelarte manuelarte commented Nov 20, 2024

ok, there are a lot of changes in this PR. I will try to summarize what I changed:

  • I changed how internally the Serializer/Deserializer is created, to accept a SubjectNameStrategy interface, that right now is implemented by two structs:
    • TopicNameStrategy
    • TopicRecordNameStrategy
  • That interface implements how to get the subject, and that's used to upload/get the schema(s) for the topic.
  • I Added 2 examples, avro producer topicrecordnamestrategy and avro consumer topicrecordnamestrategy, and they are tested against a Kotlin producer/consumer also using TopicRecordNameStrategy (https://github.com/manuelarte/kafka-kotlin-multiple-schemas-one-topic). So I tested:
    • Go Producer -> Go Consumer
    • Go Producer -> Kotlin Consumer
    • Kotlin Producer (2 different messages) -> Go Consuming (2 different messages)

I am, more or less, happy on how the serialization works, but I am not so happy on how the deserialization works.
For the deserialization to work I need the fully qualified name (-), and I need to extract that from the schemaregistry_client.SchemaInfo, and I could not thing of a better idea now than, parsing the values and check what type of schema type is (AVRO, JSON, PROTOBUF), and then build it based on the field values.
Also, because you can have more than one message, I don't use DeserializeInto right away, but first I retrieve the schema, then I compare that schema with the possible messages for that topic

    schemaInfo, err := deser.GetSchema(*e.TopicPartition.Topic, e.Value, "AVRO")
    if err != nil {
	fmt.Printf("Failed to retrieve schema: %s\n", err)
    }
    schemaInfo.SchemaType = "AVRO"
    switch schemaInfo.Name() {
        case User{}.SchemaName():
            value := User{}
	    err = deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value)
	    ...
        case UserFavoriteColorUpdated{}.SchemaName():
           value := UserFavoriteColorUpdated{}
           err = deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value)
    ...

IMHO this PR is already a bit big and it's still not covering more cases like, TopicRecordName for JSON or PROTOBUF, but I think it's a good start to get feedback and possible improvements.

PS: in case this pr is too big, I could split it in several isolared steps, starting with refactoring the ConfigureSerializer method.

@manuelarte manuelarte requested review from a team as code owners November 20, 2024 16:39
@confluent-cla-assistant
Copy link

❌ Error getting contributor login(s).
Please ensure the email address associated with this commit is [added to your Github account].(https://docs.github.com/en/pull-requests/committing-changes-to-your-project/troubleshooting-commits/why-are-my-commits-linked-to-the-wrong-user#commits-are-not-linked-to-any-user)

@manuelarte manuelarte marked this pull request as draft November 28, 2024 20:29
@manuelarte
Copy link
Author

I put this PR in draft because I am looking for feedback. It would be amazing if I get it since I spent some time making TopicRecordName strategy to work

@rayokota
Copy link
Member

rayokota commented Dec 9, 2024

I put this PR in draft because I am looking for feedback. It would be amazing if I get it since I spent some time making TopicRecordName strategy to work

Thanks @manuelarte . I will try to take a look, but can't promise as to when since we're in crunch time before the holidays.

@manuelarte
Copy link
Author

manuelarte commented Dec 9, 2024

Thanks @rayokota . I completely understand. As mentioned before, I think this pr could be spit in several smaller ones to facilitate the functionality of supporting TopicRecordName strategy.

@ekazakas
Copy link

ekazakas commented Dec 13, 2024

I think it might need a few improvements. There is

type SerializerConfig struct {
	// UseSchemaID specifies a schema ID to use during serialization
	UseSchemaID int
	// UseLatestVersion specifies whether to use the latest schema version during serialization
	UseLatestVersion bool
	// UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization
	UseLatestWithMetadata map[string]string
	// RuleConfig specifies configuration options to the rules
	RuleConfig map[string]string
}

Maybe, for TopicRecordNameStrategy the UseSchemaID,UseLatestVersion and UseLatestWithMetadata do not make sense anymore, because if the user will have 2 different schema IDs, he will be unable to set them per record. Maybe this should be put as some sort of "hint" struct for Serialize(...) function? Maybe it would be better to put the SerializerConfig as a parameter on Serialize(...) function rather than globally for the whole serializer?

@ekazakas
Copy link

ekazakas commented Dec 13, 2024

I've made PR that implements a more fine grained control when serializing according to comments above:
#1365

@manuelarte
Copy link
Author

I think it might need a few improvements. There is

type SerializerConfig struct {
	// UseSchemaID specifies a schema ID to use during serialization
	UseSchemaID int
	// UseLatestVersion specifies whether to use the latest schema version during serialization
	UseLatestVersion bool
	// UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization
	UseLatestWithMetadata map[string]string
	// RuleConfig specifies configuration options to the rules
	RuleConfig map[string]string
}

Maybe, for TopicRecordNameStrategy the UseSchemaID,UseLatestVersion and UseLatestWithMetadata do not make sense anymore, because if the user will have 2 different schema IDs, he will be unable to set them per record. Maybe this should be put as some sort of "hint" struct for Serialize(...) function? Maybe it would be better to put the SerializerConfig as a parameter on Serialize(...) function rather than globally for the whole serializer?

Thanks @ekazakas for your comment. Did you get any feedback about the serialisation hint?

@manuelarte
Copy link
Author

Hi @rayokota , did you guys have time to check this PR?

@ekazakas
Copy link

Thanks @ekazakas for your comment. Did you get any feedback about the serialisation hint?

None so far, it would be nice if someone could check my PRs as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants