Skip to content

Commit

Permalink
Merge pull request #40 from heetch/producers-using-schema-id
Browse files Browse the repository at this point in the history
Add -value-avro-schema-id to produce command
  • Loading branch information
sixstone-qq authored Nov 19, 2021
2 parents d7131b4 + a1df576 commit 5423271
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 8 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,15 @@ Not only `TopicNameStrategy` is supported but `TopicRecordNameStrategy` is suppo
a schema using `-value-avro-record-name`. This can be used in conjunction with `-value-avro-schema` or get
latest version of the schema from the Schema Registry without providing any parameter.

If the globally unique schema identifier is already known, it can be used instead:

```sh
$ echo '{"value": {"FirstName": "Cristina"}, "key": "id-44"}' | hkt produce -topic actors -registry http://localhost:8081 -value-avro-schema-id 100
```

Produces the record using schema whose identifier is `100`.


</details>


Expand Down
12 changes: 11 additions & 1 deletion coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (c *coder) addFlags(flags *flag.FlagSet) {
flags.StringVar(&c.registryURL, "registry", "", "The Avro schema registry server URL.")
flags.StringVar(&c.avscFile, "value-avro-schema", "", `Path to AVSC file to format the file. If it is set, then -valuecodec is set to "avro"`)
flags.StringVar(&c.avroRecordName, "value-avro-record-name", "", "Record name to use when using TopicRecordNameStrategy to find the schema subject in Schema Registry")
flags.Int64Var(&c.avroSchemaID, "value-avro-schema-id", 0, "Use it to select schema to produce Avro formatted schemas.")
}

// decoderForType returns a function to decode key or value depending of the expected format defined in typ
Expand Down Expand Up @@ -75,9 +76,9 @@ func (c *coder) makeAvroDecoder(keyOrValue string) func(m json.RawMessage) ([]by
}

enc := c.avroRegistry.Encoder(subject)
ctx := context.Background()

if c.avroSchemaID == 0 {
ctx := context.Background()
if c.avroSchema == nil {
sch, err := c.avroRegistry.Schema(ctx, subject, "latest")
if err != nil {
Expand All @@ -100,6 +101,15 @@ func (c *coder) makeAvroDecoder(keyOrValue string) func(m json.RawMessage) ([]by
c.avroSchemaID = id
}
}
if c.avroSchema == nil {
// only got the schemaID, gather from the registry
dec := c.avroRegistry.Decoder()
sch, err := dec.SchemaForID(ctx, c.avroSchemaID)
if err != nil {
return nil, err
}
c.avroSchema = sch
}

// Canonicalize the schema to remove default values and logical types
// to work around https://github.com/linkedin/goavro/issues/198
Expand Down
19 changes: 18 additions & 1 deletion produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ func (cmd *produceCmd) run(args []string) error {
cmd.partitioner = partitioner
var err error

if cmd.avroSchemaID != 0 {
// Override -valuecodec flag
cmd.valueCodecType = "avro"
if cmd.avscFile != "" {
return fmt.Errorf("cannot use -value-avro-schema with -value-avro-schema-id")
}
if cmd.avroRecordName != "" {
return fmt.Errorf("cannot use -value-avro-record-name with -value-avro-schema-id")
}
}

if cmd.avscFile != "" {
// Override -valuecodec flag
cmd.valueCodecType = "avro"
Expand Down Expand Up @@ -326,7 +337,7 @@ AVRO
It can produce messages using Avro format provided -registry flag and -valuecodec avro.
In order to know which schema to use, there are several ways:
In order to know which schema to use, there are several ways:
1. Kafka Schema Registry TopicNameStrategy
Expand Down Expand Up @@ -363,4 +374,10 @@ Running the following command:
$ echo '{"key": "id-44", "value": {"Band": "cloud nothings"}}' | hkt produce -topic topic-3 -registry http://localhost:8081 -value-avro-schema band.avsc
In order to know the subject, "-value-avro-record-name" can be also used.
3. Use the globally unique schema identifier
It uses the schema identifier defined in "-value-avro-schema-id". (Warning: It does not perform any check against the registry)
$ echo '{"key": "id-45", "value": {"Field": 2}}' | hkt produce -topic topic-1 -registry http://localhost:8081 -value-avro-schema-id 109
`, ENV_BROKERS, ENV_REGISTRY)
14 changes: 8 additions & 6 deletions system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func TestSystem(t *testing.T) {
Setup: func(e *testscript.Env) error {
topic := randomString(6)
recordName := "alt"
deregisterFn := registerSchemas(t, topic, recordName)
schemaIDs, deregisterFn := registerSchemas(t, topic, recordName)
e.Vars = append(e.Vars,
"topic="+topic,
"recordName="+recordName,
fmt.Sprintf("schemaID=%d", schemaIDs[0]),
fmt.Sprintf("recordSchemaID=%d", schemaIDs[1]),
ENV_BROKERS+"="+testBrokerAddr,
ENV_REGISTRY+"="+testRegistryAddr,
"now="+time.Now().UTC().Format(time.RFC3339),
Expand Down Expand Up @@ -85,8 +87,8 @@ func randomString(length int) string {
// registerSchemas registers to testRegistry two schemas:
// 1. using ${topic}-value as subject -> TopicNameStrategy
// 2. using ${topic}-${recordName} as subject -> TopicRecordNameStrategy
// returns the function to call when the test ends.
func registerSchemas(t *testing.T, topic, recordName string) func() {
// returns the schema identifiers and the function to call when the test ends.
func registerSchemas(t *testing.T, topic, recordName string) ([]int64, func()) {
c := qt.New(t)
ctx := context.Background()

Expand All @@ -103,7 +105,7 @@ func registerSchemas(t *testing.T, topic, recordName string) func() {
c.Check(err, qt.IsNil)

subject := topic + "-value"
_, err = reg.Register(ctx, subject, typ)
id, err := reg.Register(ctx, subject, typ)
c.Check(err, qt.IsNil)

type AltR struct {
Expand All @@ -114,10 +116,10 @@ func registerSchemas(t *testing.T, topic, recordName string) func() {
c.Check(err, qt.IsNil)

altSubject := topic + "-" + recordName
_, err = reg.Register(ctx, altSubject, typ)
altID, err := reg.Register(ctx, altSubject, typ)
c.Check(err, qt.IsNil)

return func() {
return []int64{id, altID}, func() {
err := reg.DeleteSubject(ctx, subject)
c.Check(err, qt.IsNil)
err = reg.DeleteSubject(ctx, altSubject)
Expand Down
21 changes: 21 additions & 0 deletions testdata/avromessages-with-id.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Schemas are registered in system_test.go
kt admin -createtopic $topic -topicdetail topic-detail.json

stdin produce-schema-1-stdin.json
kt produce -topic $topic -value-avro-schema-id $schemaID

stdin produce-schema-2-stdin.json
kt produce -topic $topic -value-avro-schema-id $recordSchemaID

kt consume -valuecodec avro $topic
cmp stdout consume-stdout.json

-- topic-detail.json --
{"NumPartitions": 1, "ReplicationFactor": 1}
-- produce-schema-1-stdin.json --
{"key":"k1","value":{"Foo":1},"time":"2021-11-16T01:01:01Z"}
-- produce-schema-2-stdin.json --
{"key":"k2","value":{"Bar":2.122121},"time":"2021-11-17T01:01:02Z"}
-- consume-stdout.json --
{"partition":0,"offset":0,"key":"k1","value":{"Foo":1},"time":"2021-11-16T01:01:01Z"}
{"partition":0,"offset":1,"key":"k2","value":{"Bar":2.122121},"time":"2021-11-17T01:01:02Z"}

0 comments on commit 5423271

Please sign in to comment.