From a1df576ac72808b0fb6cb73eaef15dfb63e5beac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Enrique=20J=2E=20Hern=C3=A1ndez?= Date: Wed, 17 Nov 2021 16:50:30 +0100 Subject: [PATCH] Add -value-avro-schema-id feature To look for the schema using the identifier directly (low level) --- README.md | 9 +++++++++ coder.go | 12 +++++++++++- produce.go | 17 +++++++++++++++++ system_test.go | 14 ++++++++------ testdata/avromessages-with-id.txt | 21 +++++++++++++++++++++ 5 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 testdata/avromessages-with-id.txt diff --git a/README.md b/README.md index db67cd9..6fb08d3 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,15 @@ Not only `TopicNameStrategy` is supported but `TopicRecordNameStrategy` is suppo a schema using `-value-avro-record-name`. This can be used in conjunction with `-value-avro-schema` or get latest version of the schema from the Schema Registry without providing any parameter. +If the globally unique schema identifier is already known, it can be used instead: + +```sh +$ echo '{"value": {"FirstName": "Cristina"}, "key": "id-44"}' | hkt produce -topic actors -registry http://localhost:8081 -value-avro-schema-id 100 +``` + +Produces the record using schema whose identifier is `100`. + + diff --git a/coder.go b/coder.go index 40a5fa4..f570541 100644 --- a/coder.go +++ b/coder.go @@ -31,6 +31,7 @@ func (c *coder) addFlags(flags *flag.FlagSet) { flags.StringVar(&c.registryURL, "registry", "", "The Avro schema registry server URL.") flags.StringVar(&c.avscFile, "value-avro-schema", "", `Path to AVSC file to format the file. If it is set, then -valuecodec is set to "avro"`) flags.StringVar(&c.avroRecordName, "value-avro-record-name", "", "Record name to use when using TopicRecordNameStrategy to find the schema subject in Schema Registry") + flags.Int64Var(&c.avroSchemaID, "value-avro-schema-id", 0, "Use it to select schema to produce Avro formatted schemas.") } // decoderForType returns a function to decode key or value depending of the expected format defined in typ @@ -75,9 +76,9 @@ func (c *coder) makeAvroDecoder(keyOrValue string) func(m json.RawMessage) ([]by } enc := c.avroRegistry.Encoder(subject) + ctx := context.Background() if c.avroSchemaID == 0 { - ctx := context.Background() if c.avroSchema == nil { sch, err := c.avroRegistry.Schema(ctx, subject, "latest") if err != nil { @@ -100,6 +101,15 @@ func (c *coder) makeAvroDecoder(keyOrValue string) func(m json.RawMessage) ([]by c.avroSchemaID = id } } + if c.avroSchema == nil { + // only got the schemaID, gather from the registry + dec := c.avroRegistry.Decoder() + sch, err := dec.SchemaForID(ctx, c.avroSchemaID) + if err != nil { + return nil, err + } + c.avroSchema = sch + } // Canonicalize the schema to remove default values and logical types // to work around https://github.com/linkedin/goavro/issues/198 diff --git a/produce.go b/produce.go index 3f5404d..418991b 100644 --- a/produce.go +++ b/produce.go @@ -97,6 +97,17 @@ func (cmd *produceCmd) run(args []string) error { cmd.partitioner = partitioner var err error + if cmd.avroSchemaID != 0 { + // Override -valuecodec flag + cmd.valueCodecType = "avro" + if cmd.avscFile != "" { + return fmt.Errorf("cannot use -value-avro-schema with -value-avro-schema-id") + } + if cmd.avroRecordName != "" { + return fmt.Errorf("cannot use -value-avro-record-name with -value-avro-schema-id") + } + } + if cmd.avscFile != "" { // Override -valuecodec flag cmd.valueCodecType = "avro" @@ -363,4 +374,10 @@ Running the following command: $ echo '{"key": "id-44", "value": {"Band": "cloud nothings"}}' | hkt produce -topic topic-3 -registry http://localhost:8081 -value-avro-schema band.avsc In order to know the subject, "-value-avro-record-name" can be also used. + +3. Use the globally unique schema identifier + +It uses the schema identifier defined in "-value-avro-schema-id". (Warning: It does not perform any check against the registry) + + $ echo '{"key": "id-45", "value": {"Field": 2}}' | hkt produce -topic topic-1 -registry http://localhost:8081 -value-avro-schema-id 109 `, ENV_BROKERS, ENV_REGISTRY) diff --git a/system_test.go b/system_test.go index f3de968..e3795fd 100644 --- a/system_test.go +++ b/system_test.go @@ -54,10 +54,12 @@ func TestSystem(t *testing.T) { Setup: func(e *testscript.Env) error { topic := randomString(6) recordName := "alt" - deregisterFn := registerSchemas(t, topic, recordName) + schemaIDs, deregisterFn := registerSchemas(t, topic, recordName) e.Vars = append(e.Vars, "topic="+topic, "recordName="+recordName, + fmt.Sprintf("schemaID=%d", schemaIDs[0]), + fmt.Sprintf("recordSchemaID=%d", schemaIDs[1]), ENV_BROKERS+"="+testBrokerAddr, ENV_REGISTRY+"="+testRegistryAddr, "now="+time.Now().UTC().Format(time.RFC3339), @@ -85,8 +87,8 @@ func randomString(length int) string { // registerSchemas registers to testRegistry two schemas: // 1. using ${topic}-value as subject -> TopicNameStrategy // 2. using ${topic}-${recordName} as subject -> TopicRecordNameStrategy -// returns the function to call when the test ends. -func registerSchemas(t *testing.T, topic, recordName string) func() { +// returns the schema identifiers and the function to call when the test ends. +func registerSchemas(t *testing.T, topic, recordName string) ([]int64, func()) { c := qt.New(t) ctx := context.Background() @@ -103,7 +105,7 @@ func registerSchemas(t *testing.T, topic, recordName string) func() { c.Check(err, qt.IsNil) subject := topic + "-value" - _, err = reg.Register(ctx, subject, typ) + id, err := reg.Register(ctx, subject, typ) c.Check(err, qt.IsNil) type AltR struct { @@ -114,10 +116,10 @@ func registerSchemas(t *testing.T, topic, recordName string) func() { c.Check(err, qt.IsNil) altSubject := topic + "-" + recordName - _, err = reg.Register(ctx, altSubject, typ) + altID, err := reg.Register(ctx, altSubject, typ) c.Check(err, qt.IsNil) - return func() { + return []int64{id, altID}, func() { err := reg.DeleteSubject(ctx, subject) c.Check(err, qt.IsNil) err = reg.DeleteSubject(ctx, altSubject) diff --git a/testdata/avromessages-with-id.txt b/testdata/avromessages-with-id.txt new file mode 100644 index 0000000..658d09e --- /dev/null +++ b/testdata/avromessages-with-id.txt @@ -0,0 +1,21 @@ +# Schemas are registered in system_test.go +kt admin -createtopic $topic -topicdetail topic-detail.json + +stdin produce-schema-1-stdin.json +kt produce -topic $topic -value-avro-schema-id $schemaID + +stdin produce-schema-2-stdin.json +kt produce -topic $topic -value-avro-schema-id $recordSchemaID + +kt consume -valuecodec avro $topic +cmp stdout consume-stdout.json + +-- topic-detail.json -- +{"NumPartitions": 1, "ReplicationFactor": 1} +-- produce-schema-1-stdin.json -- +{"key":"k1","value":{"Foo":1},"time":"2021-11-16T01:01:01Z"} +-- produce-schema-2-stdin.json -- +{"key":"k2","value":{"Bar":2.122121},"time":"2021-11-17T01:01:02Z"} +-- consume-stdout.json -- +{"partition":0,"offset":0,"key":"k1","value":{"Foo":1},"time":"2021-11-16T01:01:01Z"} +{"partition":0,"offset":1,"key":"k2","value":{"Bar":2.122121},"time":"2021-11-17T01:01:02Z"}