From 9fba04893bb7885d95d0e74338ea3ea01ee05ca1 Mon Sep 17 00:00:00 2001 From: Laurent Broudoux Date: Wed, 28 Sep 2022 21:24:42 +0200 Subject: [PATCH] feat: add schema registry infos to Kafka binding (#115) * feat: Add schema registry infos to Kafka binding * feat: add schema registry infos to Kafka binding - update to latest spec * feat: Move schema encoding to message and Add topic, partitions and replicas at the channel level * typo Co-authored-by: Khuda Dad Nomani <32505158+KhudaDad414@users.noreply.github.com> --- kafka/README.md | 95 ++++++++++++++++++++++++++----- kafka/json_schemas/channel.json | 46 +++++++++++++++ kafka/json_schemas/message.json | 29 ++++++++-- kafka/json_schemas/operation.json | 10 ++-- kafka/json_schemas/server.json | 37 ++++++++++++ 5 files changed, 193 insertions(+), 24 deletions(-) create mode 100644 kafka/json_schemas/channel.json create mode 100644 kafka/json_schemas/server.json diff --git a/kafka/README.md b/kafka/README.md index adc5606e..6efcf74e 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -6,38 +6,81 @@ This document defines how to describe Kafka-specific information on AsyncAPI. ## Version -Current version is `0.2.0`. +Current version is `0.3.0`. ## Server Binding Object -This object MUST NOT contain any properties. Its name is reserved for future use. +This object contains information about the server representation in Kafka. +##### Fixed Fields + +Field Name | Type | Description | Applicability [default] | Constraints +---|:---:|:---:|:---:|--- +`schemaRegistryUrl` | string (url) | API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used) | OPTIONAL | - +`schemaRegistryVendor` | string | The vendor of Schema Registry and Kafka serdes library that should be used (e.g. `apicurio`, `confluent`, `ibm`, or `karapace`) | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified +`bindingVersion` | string | The version of this binding. 
| OPTIONAL [`latest`] | - +##### Example + +```yaml +servers: + production: + bindings: + kafka: + schemaRegistryUrl: 'https://my-schema-registry.com' + schemaRegistryVendor: 'confluent' + bindingVersion: '0.3.0' +``` ## Channel Binding Object -This object MUST NOT contain any properties. Its name is reserved for future use. +This object contains information about the channel representation in Kafka (e.g. a Kafka topic). + +##### Fixed Fields + +Field Name | Type | Description | Applicability [default] | Constraints +---|:---:|:---:|:---:|--- +`topic` | string | Kafka topic name if different from channel name. | OPTIONAL | - +`partitions` | integer | Number of partitions configured on this topic (useful to know how many parallel consumers you may run). | OPTIONAL | MUST be positive +`replicas` | integer | Number of replicas configured on this topic. | OPTIONAL | MUST be positive +`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [`latest`] | - +This object MUST contain only the properties defined above. + +##### Example + +This example describes a channel mapped to a dedicated Kafka topic whose name differs from the channel name. The topic is configured with 20 partitions and 3 replicas. + +```yaml +channels: + user-signedup: + bindings: + kafka: + topic: 'my-specific-topic-name' + partitions: 20 + replicas: 3 + bindingVersion: '0.3.0' +``` ## Operation Binding Object -This object contains information about the operation representation in Kafka. +This object contains information about the operation representation in Kafka (e.g. the way to consume messages). ##### Fixed Fields -Field Name | Type | Description ----|:---:|--- -`groupId` | [Schema Object][schemaObject] | Id of the consumer group. -`clientId` | [Schema Object][schemaObject] | Id of the consumer inside a consumer group. -`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. 
+Field Name | Type | Description | Applicability [default] | Constraints +---|:---:|:---:|:---:|--- +`groupId` | [Schema Object][schemaObject] | Id of the consumer group. | OPTIONAL | - +`clientId` | [Schema Object][schemaObject] | Id of the consumer inside a consumer group. | OPTIONAL | - +`bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [`latest`] | - This object MUST contain only the properties defined above. @@ -46,7 +89,7 @@ This object MUST contain only the properties defined above. ```yaml channels: user-signedup: - publish: + subscribe: bindings: kafka: groupId: @@ -55,7 +98,7 @@ channels: clientId: type: string enum: ['myClientId'] - bindingVersion: '0.1.0' + bindingVersion: '0.3.0' ``` @@ -69,11 +112,32 @@ This object contains information about the message representation in Kafka. Field Name | Type | Description ---|:---:|--- -`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.1.0#referenceObject) way. +`key` | [Schema Object][schemaObject] \| [AVRO Schema Object](https://avro.apache.org/docs/current/spec.html) | The message key. **NOTE**: You can also use the [reference object](https://asyncapi.io/docs/specifications/v2.4.0#referenceObject) way. +`schemaIdLocation` | string | If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. `header` or `payload`). | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level +`schemaIdPayloadEncoding` | string | Number of bytes or vendor-specific values when schema id is encoded in payload (e.g. `confluent` / `apicurio-legacy` / `apicurio-new`). 
| OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level +`schemaLookupStrategy` | string | Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. | OPTIONAL | MUST NOT be specified if `schemaRegistryUrl` is not specified at the Server level `bindingVersion` | string | The version of this binding. If omitted, "latest" MUST be assumed. This object MUST contain only the properties defined above. +This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store the schema identifier. + +```yaml +channels: + test: + publish: + message: + bindings: + kafka: + key: + type: string + enum: ['myKey'] + schemaIdLocation: 'payload' + schemaIdPayloadEncoding: '4' + bindingVersion: '0.3.0' +``` + +This is another example that describes the use of the Apicurio schema registry. We describe the `apicurio-new` way of serializing without details on how it's implemented. We reference a [specific lookup strategy](https://www.apicur.io/registry/docs/apicurio-registry/2.2.x/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-concepts-strategy_registry) that may be used to retrieve the schema id from the registry during serialization. 
```yaml channels: @@ -85,7 +149,10 @@ channels: key: type: string enum: ['myKey'] - bindingVersion: '0.1.0' + schemaIdLocation: 'payload' + schemaIdPayloadEncoding: 'apicurio-new' + schemaLookupStrategy: 'TopicIdStrategy' + bindingVersion: '0.3.0' ``` -[schemaObject]: https://www.asyncapi.com/docs/specifications/2.0.0/#schemaObject +[schemaObject]: https://www.asyncapi.com/docs/specifications/2.4.0/#schemaObject diff --git a/kafka/json_schemas/channel.json b/kafka/json_schemas/channel.json new file mode 100644 index 00000000..4db52eb5 --- /dev/null +++ b/kafka/json_schemas/channel.json @@ -0,0 +1,46 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://asyncapi.com/bindings/kafka/channel.json", + "title": "Channel Schema", + "description": "This object contains information about the channel representation in Kafka.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^x-[\\w\\d\\.\\-\\_]+$": { + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" + } + }, + "properties": { + "topic": { + "type": "string", + "description": "Kafka topic name if different from channel name." + }, + "partitions": { + "type": "integer", + "minimum": 1, + "description": "Number of partitions configured on this topic." + }, + "replicas": { + "type": "integer", + "minimum": 1, + "description": "Number of replicas configured on this topic." + }, + "bindingVersion": { + "type": "string", + "enum": [ + "0.3.0" + ], + "description": "The version of this binding. If omitted, 'latest' MUST be assumed." 
+ } + + }, + "examples": [ + { + "topic": "my-specific-topic", + "partitions": 20, + "replicas": 3, + "bindingVersion": "0.3.0" + } + ] + } + \ No newline at end of file diff --git a/kafka/json_schemas/message.json b/kafka/json_schemas/message.json index 701beaa4..f1428c9c 100644 --- a/kafka/json_schemas/message.json +++ b/kafka/json_schemas/message.json @@ -6,21 +6,35 @@ "additionalProperties": false, "patternProperties": { "^x-[\\w\\d\\.\\-\\_]+$": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/specificationExtension" + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" } }, "properties": { "key": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "The message key." }, + "schemaIdLocation": { + "type": "string", + "description": "If a Schema Registry is used when performing this operation, tells where the id of schema is stored.", + "enum": ["header", "payload"] + }, + "schemaIdPayloadEncoding": { + "type": "string", + "description": "Number of bytes or vendor specific values when schema id is encoded in payload." + }, + "schemaLookupStrategy": { + "type": "string", + "description": "Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied." + }, "bindingVersion": { "type": "string", "enum": [ - "0.1.0" + "0.3.0" ], "description": "The version of this binding. If omitted, 'latest' MUST be assumed." 
} + }, "examples": [ { @@ -30,13 +44,18 @@ "myKey" ] }, - "bindingVersion": "0.1.0" + "schemaIdLocation": "payload", + "schemaIdPayloadEncoding": "apicurio-new", + "schemaLookupStrategy": "TopicIdStrategy", + "bindingVersion": "0.3.0" }, { "key": { "$ref": "path/to/user-create.avsc#/UserCreate" }, - "bindingVersion": "0.2.0" + "schemaIdLocation": "payload", + "schemaIdPayloadEncoding": "4", + "bindingVersion": "0.3.0" } ] } diff --git a/kafka/json_schemas/operation.json b/kafka/json_schemas/operation.json index 500744ca..ea54cdab 100644 --- a/kafka/json_schemas/operation.json +++ b/kafka/json_schemas/operation.json @@ -7,22 +7,22 @@ "additionalProperties": false, "patternProperties": { "^x-[\\w\\d\\.\\-\\_]+$": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/specificationExtension" + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" } }, "properties": { "groupId": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "Id of the consumer group." }, "clientId": { - "$ref": "https://raw.githubusercontent.com/asyncapi/asyncapi-node/v2.7.7/schemas/2.0.0.json#/definitions/schema", + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/schema", "description": "Id of the consumer inside a consumer group." }, "bindingVersion": { "type": "string", "enum": [ - "0.1.0" + "0.3.0" ], "description": "The version of this binding. If omitted, 'latest' MUST be assumed." 
} @@ -42,7 +42,7 @@ "myClientId" ] }, - "bindingVersion": "0.1.0" + "bindingVersion": "0.3.0" } ] } diff --git a/kafka/json_schemas/server.json b/kafka/json_schemas/server.json new file mode 100644 index 00000000..f29e3d33 --- /dev/null +++ b/kafka/json_schemas/server.json @@ -0,0 +1,37 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://asyncapi.com/bindings/kafka/server.json", + "title": "Server Schema", + "description": "This object contains server connection information to a Kafka broker. This object contains additional information not possible to represent within the core AsyncAPI specification.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^x-[\\w\\d\\.\\-\\_]+$": { + "$ref": "https://raw.githubusercontent.com/asyncapi/spec-json-schemas/v2.14.0/schemas/2.4.0.json#/definitions/specificationExtension" + } + }, + "properties": { + "schemaRegistryUrl": { + "type": "string", + "description": "API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used)." + }, + "schemaRegistryVendor": { + "type": "string", + "description": "The vendor of the Schema Registry and Kafka serdes library that should be used." + }, + "bindingVersion": { + "type": "string", + "enum": [ + "0.3.0" + ], + "description": "The version of this binding. If omitted, 'latest' MUST be assumed." + } + }, + "examples": [ + { + "schemaRegistryUrl": "https://my-schema-registry.com", + "schemaRegistryVendor": "confluent", + "bindingVersion": "0.3.0" + } + ] +}