Sj/feat/kafka connect impl #910

Merged
merged 5 commits into redpanda-data:master on Nov 24, 2023

Conversation

@sago2k8 (Contributor) commented Nov 1, 2023:

Initial implementation of the Kafka Connect API for the dataplane API.

ref:

key points:

  • Add support for custom HTTP response status codes, e.g. 202 and 204 (see the sketch after the example response below).

  • Forward the actual status code from the Kafka Connect response.

  • Add preliminary protos for some Kafka Connect methods/endpoints.

  • Kafka Connect endpoints implemented:

    • List Connect Clusters: lists multiple Connect clusters; this is the Console abstraction.
    • Get Connect Cluster: returns a single Connect cluster, the list of plugins, and information about the cluster (commit, version, IDs).
    • Restart Connector: restarts a connector; the REST response is changed to 204 for now, with support for the onlyFailed and includeTasks parameters.
    • Delete Connector: deletes the connector, with the same 204 response in REST.
    • Pause and Resume Connector: pauses/resumes a running connector.
    • List Connectors: lists connectors; the interface was changed to align with our grpc-connect interface.
    • Create Connector: creates a connector.
  • The schema of the requests and responses is more or less similar to Confluent's, except for errors and list requests, which are heavily inspired by our ControlPlane API and grpc-connect/REST (grpc-gateway) style. This was discussed, and we prioritize UX over compatibility.

  • There are also some changes around integration tests, dependency updates, and a refactor of some tests that consume testcontainers.

Instead of a map in the ListConnectors response, as in the Kafka Connect API definition, I used an array, e.g.:

[
    {
        "name": "mongo-import-7",
        "info": {
            "name": "mongo-import-7",
            "config": {
                "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
                "database.include.list": "main",
                "errors.log.enable": "false",
                "errors.log.include.messages": "false",
                "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                "mongodb.connection.string": "mongodb+srv://santiago:[email protected]",
                "mongodb.members.auto.discover": "true",
                "mongodb.ssl.enabled": "false",
                "mongodb.ssl.invalid.hostname.allowed": "false",
                "name": "mongo-import-7",
                "provide.transaction.metadata": "false",
                "sanitize.field.names": "false",
                "tasks.max": "2",
                "tombstones.on.delete": "false",
                "topic.prefix": "mongo",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter"
            },
            "tasks": [
                {
                    "connector": "mongo-import-7",
                    "task": 0
                }
            ],
            "type": "source"
        },
        "status": {
            "name": "mongo-import-7",
            "connector": {
                "state": "RUNNING",
                "worker_id": "172.20.0.4:8083"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "172.20.0.4:8083",
                    "trace": ""
                }
            ],
            "type": "source",
            "trace": ""
        },
        "holistic_state": "HEALTHY",
        "errors": []
    },
    {
        "name": "mong-import",
        "info": {
            "name": "mong-import",
            "config": {
                "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
                "database.include.list": "main",
                "errors.log.enable": "false",
                "errors.log.include.messages": "false",
                "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                "mongodb.connection.string": "mongodb+srv://santiago:[email protected]",
                "mongodb.members.auto.discover": "true",
                "mongodb.ssl.enabled": "false",
                "mongodb.ssl.invalid.hostname.allowed": "false",
                "name": "mong-import",
                "provide.transaction.metadata": "false",
                "sanitize.field.names": "false",
                "tasks.max": "2",
                "tombstones.on.delete": "false",
                "topic.prefix": "mongo",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter"
            },
            "tasks": [
                {
                    "connector": "mong-import",
                    "task": 0
                }
            ],
            "type": "source"
        },
        "status": {
            "name": "mong-import",
            "connector": {
                "state": "RUNNING",
                "worker_id": "172.20.0.4:8083"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "172.20.0.4:8083",
                    "trace": ""
                }
            ],
            "type": "source",
            "trace": ""
        },
        "holistic_state": "HEALTHY",
        "errors": []
    },
    {
        "name": "connector test",
        "info": {
            "name": "connector test",
            "config": {
                "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
                "errors.log.enable": "false",
                "errors.log.include.messages": "false",
                "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
                "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
                "mongodb.connection.string": "mongodb://test:test@localhost:27017",
                "mongodb.members.auto.discover": "true",
                "mongodb.ssl.enabled": "false",
                "mongodb.ssl.invalid.hostname.allowed": "false",
                "name": "connector test",
                "provide.transaction.metadata": "false",
                "sanitize.field.names": "false",
                "tombstones.on.delete": "false",
                "topic.prefix": "sasdas",
                "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
            },
            "tasks": [],
            "type": "source"
        },
        "status": {
            "name": "connector test",
            "connector": {
                "state": "RUNNING",
                "worker_id": "172.20.0.4:8083"
            },
            "tasks": [],
            "type": "source",
            "trace": ""
        },
        "holistic_state": "UNHEALTHY",
        "errors": [
            {
                "type": "ERROR",
                "title": "Connector connector test is in unhealthy state.",
                "content": "Connector connector test is in running state but has no tasks."
            }
        ]
    },
    {
        "name": "mong-import-6",
        "info": {
            "name": "mong-import-6",
            "config": {
                "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
                "database.include.list": "main",
                "errors.log.enable": "false",
                "errors.log.include.messages": "false",
                "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                "mongodb.connection.string": "mongodb+srv://santiago:[email protected]",
                "mongodb.members.auto.discover": "true",
                "mongodb.ssl.enabled": "false",
                "mongodb.ssl.invalid.hostname.allowed": "false",
                "name": "mong-import-6",
                "provide.transaction.metadata": "false",
                "sanitize.field.names": "false",
                "tasks.max": "2",
                "tombstones.on.delete": "false",
                "topic.prefix": "mongo",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter"
            },
            "tasks": [
                {
                    "connector": "mong-import-6",
                    "task": 0
                }
            ],
            "type": "source"
        },
        "status": {
            "name": "mong-import-6",
            "connector": {
                "state": "RUNNING",
                "worker_id": "172.20.0.4:8083"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "172.20.0.4:8083",
                    "trace": ""
                }
            ],
            "type": "source",
            "trace": ""
        },
        "holistic_state": "HEALTHY",
        "errors": []
    }
]
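
On the first key point above (custom HTTP response status codes such as 202/204): how exactly the PR wires this through connect-gateway isn't shown here, so as a sketch only, the analogous, documented grpc-gateway pattern illustrates the idea: a forward-response option reads a status code from response metadata set by the handler (e.g. via grpc.SetHeader with an "x-http-code" pair).

import (
    "context"
    "net/http"
    "strconv"

    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
    "google.golang.org/protobuf/proto"
)

// setCustomHTTPStatus is a sketch of the grpc-gateway v2 pattern, not
// necessarily this PR's code. It reads an "x-http-code" metadata header
// set by the handler and uses it as the HTTP response status.
func setCustomHTTPStatus(ctx context.Context, w http.ResponseWriter, _ proto.Message) error {
    md, ok := runtime.ServerMetadataFromContext(ctx)
    if !ok {
        return nil
    }
    if vals := md.HeaderMD.Get("x-http-code"); len(vals) > 0 {
        code, err := strconv.Atoi(vals[0])
        if err != nil {
            return err
        }
        // Drop the pseudo-header so it is not exposed to HTTP clients.
        delete(md.HeaderMD, "x-http-code")
        delete(w.Header(), "Grpc-Metadata-X-Http-Code")
        w.WriteHeader(code)
    }
    return nil
}

// Registered when constructing the mux:
//   mux := runtime.NewServeMux(runtime.WithForwardResponseOption(setCustomHTTPStatus))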

@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch 3 times, most recently from 41043fd to d5d37d6 on November 6, 2023 03:58
// GetStatusCodeFromAPIError tries to parse the given error as a Kafka Connect
// ApiError and returns its status code; if parsing is not possible, it returns
// a fallback error code.
func GetStatusCodeFromAPIError(err error, fallback int) int {

sago2k8 (Contributor, Author):
Forward the actual error code of the response when possible

Contributor:
Maybe we should check whether the status code is within our expected range (200, 202, 400-503) or the like, to ensure we are not getting nonsense back from Kafka Connect. If it's not within the range, we also return the fallback. The API reference says they will stay in that range, but they may introduce 3xx as well at some point. Just to ensure we don't get ourselves into trouble whenever they change something there (e.g. in Schema Registry they use custom error codes like 40401).

sago2k8 (Contributor, Author):
I only account for the 400 and 500 status codes, since those are the errors that should be present.
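
For illustration, here is a minimal sketch of how such a helper could look with the suggested range check folded in. The ApiError shape (an error_code/message payload, as the Kafka Connect REST API returns) and its field names are assumptions for this sketch, not the PR's actual types.

import "errors"

// ApiError mirrors the Kafka Connect REST error payload, e.g.
// {"error_code": 404, "message": "..."} (shape assumed for this sketch).
type ApiError struct {
    ErrorCode int    `json:"error_code"`
    Message   string `json:"message"`
}

func (e ApiError) Error() string { return e.Message }

// GetStatusCodeFromAPIError returns the status code carried by a Kafka
// Connect ApiError, or the fallback if the error is of another type or
// the code is implausible as an HTTP status.
func GetStatusCodeFromAPIError(err error, fallback int) int {
    var apiErr ApiError
    if !errors.As(err, &apiErr) {
        return fallback
    }
    // Reject out-of-range codes (e.g. Schema-Registry-style 40401), per
    // the review suggestion above, so nonsense never reaches clients.
    if apiErr.ErrorCode < 200 || apiErr.ErrorCode > 599 {
        return fallback
    }
    return apiErr.ErrorCode
}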

@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch from d5d37d6 to a8e6567 on November 6, 2023 04:03
@sago2k8 sago2k8 marked this pull request as ready for review November 6, 2023 04:28
@weeco (Contributor) commented Nov 6, 2023:

@sago2k8 If we use an array in the response, we can't introduce pagination without a breaking API change. We try to provide paginated responses where reasonable. Debatable whether this is the case here (or on other responses).
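
To make the concern concrete, a hedged sketch with hypothetical Go shapes (names are assumptions, not the PR's schema): a top-level JSON array leaves no room for sibling fields, while a wrapper object can grow a page token later without a breaking change.

// ConnectorInfo is a stand-in for the per-connector list item.
type ConnectorInfo struct {
    Name string `json:"name"`
}

// Array response: the top level is the list itself, so a future
// next_page_token has nowhere to live without breaking clients.
type ArrayListConnectorsResponse []ConnectorInfo

// Wrapper response: new fields can be added compatibly later.
type WrappedListConnectorsResponse struct {
    Connectors    []ConnectorInfo `json:"connectors"`
    NextPageToken string          `json:"next_page_token,omitempty"`
}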

@weeco (Contributor) left a review:

Good job, left a few minor comments 👍

Comment on the following lines:
enum ExpandOption {
  // buf:lint:ignore ENUM_ZERO_VALUE_SUFFIX
  UNSPECIFIED = 0;
  // buf:lint:ignore ENUM_VALUE_UPPER_SNAKE_CASE
  status = 1;
  // buf:lint:ignore ENUM_VALUE_UPPER_SNAKE_CASE
  info = 2;
}

Contributor:
To be discussed for all enums in the Kafka Connect API: how much do we want to remain compatible with the Kafka Connect API versus staying consistent and following best practices for the Redpanda API?

Santiago opted for the best possible compatibility with the Kafka Connect API when it comes to enums, as far as I can tell.

@birdayz any opinion?

message ListConnectorsRequest {
  repeated ExpandOption expand = 1 [(buf.validate.field).repeated.max_items = 2];
  string cluster_name = 2 [
    (google.api.field_behavior) = REQUIRED,

Contributor:
Is this field behavior annotation required if we already set the validate annotation? As far as I remember, the buf validate annotations are already considered for client code generation, but I may be wrong. The same applies to the remaining fields where both are set.

@sago2k8 (Contributor, Author) commented Nov 8, 2023:

This is useful for the generated documentation: the field will be marked as required in the generated Swagger documentation.

rpc ListConnectors(ListConnectorsRequest) returns (ListConnectorsResponse) {
  option (google.api.http) = {
    get: "/v1alpha1/{cluster_name}/connectors"
    response_body: "connectors"

Contributor:
A plain array response may prevent us from adding pagination. Let's discuss whether we need it or not.

@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch 16 times, most recently from abbd8a2 to 5767991 on November 14, 2023 09:42
@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch 5 times, most recently from 3d3df09 to 2e4e5bb on November 17, 2023 23:19
@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch from 2e4e5bb to 5382071 on November 17, 2023 23:37
}

message ListConnectorsRequest {
  repeated string expand = 1 [

Contributor:
What do you think about removing the expand parameter and always setting both info and status to true? Personally I don't find it very idiomatic, but at least we have a stable response schema now, so I'm fine with keeping it as well.

To me it feels like they only added these as optional parameters at a later point to remain 100% backwards compatible (no additional fields added in newer versions).

sago2k8 (Contributor, Author):

That makes sense, I'll remove it.


message CreateConnectorResponse {
  string name = 1;
  // cod fig is the connector cron configuration map, every key is a string

Contributor:
I don't understand this comment

sago2k8 (Contributor, Author):
fixed

Comment on the following lines:
optional ConnectorSpec info = 2;
optional ConnectorStatus status = 3;

Contributor:
If we remove the optional expand parameter, we can remove the optional.

// Connect equivalent REST endpoint
rpc GetConnector(GetConnectorRequest) returns (GetConnectorResponse) {
  option (google.api.http) = {
    get: "/v1alpha1/kafka-connect/clusters/{cluster_name}/connectors/{name}"

Contributor:
Let's present the URL path in the sync. I can imagine it may be controversial, because:

  1. It contains kafka-connect. We may want to use managed-connectors for Cloud, but not for community.
  2. We always have a single Kafka Connect cluster in Cloud.

@@ -84,6 +84,8 @@ tasks:
    vars:
      VERSION: 0.2.2

Contributor:
Change to 0.3.1

@@ -84,6 +84,8 @@ tasks:
    vars:
      VERSION: 0.2.2
    cmds:
      - 'go install go.vallahaye.net/connect-gateway/cmd/protoc-gen-connect-gateway@d941a13c0fbf10931098de2fd084d3ee8d0bd1e4'
      - |
        GOBIN={{ .BUILD_ROOT }}/bin/go go install go.vallahaye.net/connect-gateway/cmd/protoc-gen-connect-gateway@d941a13c0fbf10931098de2fd084d3ee8d0bd1e4

Contributor:
Reference version from var again

// This includes the holistic unified connector status, which takes into
// account not just the connector instance state, but also the state of all
// the tasks within the connector.
enum HolisticState {

Contributor:
Maybe ConnectorHolisticState or just ConnectorState. The name HolisticState for the entire API package may be too generic.

Contributor:
ConnectorState may clash with the state that Kafka Connect already reports for a connector, though. That state is the reason why we introduced our own state, which considers the task states as well as the connector state.
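
To illustrate what this holistic state computes, a sketch only (type and function names are assumptions; the rules are inferred from the example response in the PR description, where a RUNNING connector with zero tasks is reported UNHEALTHY):

// TaskStatus is a minimal stand-in for a per-task status entry.
type TaskStatus struct {
    ID    int
    State string // e.g. RUNNING, FAILED, PAUSED
}

// computeHolisticState derives one state from the connector state plus
// all task states, rather than the connector state alone.
func computeHolisticState(connectorState string, tasks []TaskStatus) string {
    // A running connector without any tasks does no work (see the
    // "connector test" example above).
    if connectorState == "RUNNING" && len(tasks) == 0 {
        return "UNHEALTHY"
    }
    for _, t := range tasks {
        if t.State == "FAILED" {
            return "UNHEALTHY"
        }
    }
    switch connectorState {
    case "RUNNING":
        return "HEALTHY"
    case "PAUSED":
        return "PAUSED"
    default:
        return "UNHEALTHY"
    }
}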

@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch 2 times, most recently from dee540f to c3c82d6 on November 23, 2023 20:36

Verified commit (signed by Santiago Jimenez Giraldo):
Implement messages for list connectors and extend tooling for the proto
generate task

Signed-off-by: Santiago Jimenez Giraldo <[email protected]>
@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch from c3c82d6 to 1910290 on November 23, 2023 20:50

Verified commit (signed by Santiago Jimenez Giraldo):
Extend Kafka Connect service abstraction so we expose more details and
the correct http error code

Signed-off-by: Santiago Jimenez Giraldo <[email protected]>

Verified commit (signed by Santiago Jimenez Giraldo):
Implement grpc connect for the kafka-connect service; add a mapper for
transforming the Kafka Connect responses into valid grpc-connect
responses, errors, etc.

Signed-off-by: Santiago Jimenez Giraldo <[email protected]>
@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch from 1910290 to 2c2c984 on November 23, 2023 21:15

Verified commit (signed by Santiago Jimenez Giraldo):
Add a simple integration test to validate an error case

Signed-off-by: Santiago Jimenez Giraldo <[email protected]>

Verified commit (signed by Santiago Jimenez Giraldo):
- Update testcontainers to the latest version.
- Use methods of testcontainers/modules/redpanda for retrieving the schema
  registry.
- Update the action since the issues linked for the override have been
  resolved.

Signed-off-by: Santiago Jimenez Giraldo <[email protected]>
@sago2k8 sago2k8 force-pushed the sj/feat/kafka-connect-impl branch from 2c2c984 to 7797e67 on November 23, 2023 21:20
@sago2k8 (Contributor, Author) commented Nov 23, 2023:

Thanks for the review @weeco, I addressed your comments :)

@sago2k8 sago2k8 merged commit b00af36 into redpanda-data:master Nov 24, 2023