Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Kafka REST, SR and message consumer APIs using direct connections #141

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ public Future<MessageViewerContext> process(MessageViewerContext context) {
public ConsumeStrategy chooseStrategy(MessageViewerContext context) {
var connectionType = context.getConnectionState().getType();
return switch (connectionType) {
case LOCAL -> nativeConsumeStrategy;
case DIRECT, LOCAL -> nativeConsumeStrategy;
case CCLOUD -> confluentCloudConsumeStrategy;
// TODO: DIRECT connection strategy is needed
case PLATFORM, DIRECT -> throw new ProcessorFailedException(
case PLATFORM -> throw new ProcessorFailedException(
context.failf(
501,
"This endpoint does not yet support connection-type=%s",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.confluent.idesidecar.restapi.proxy.clusters.processors;

import io.confluent.idesidecar.restapi.connections.CCloudConnectionState;
import io.confluent.idesidecar.restapi.connections.DirectConnectionState;
import io.confluent.idesidecar.restapi.connections.LocalConnectionState;
import io.confluent.idesidecar.restapi.connections.PlatformConnectionState;
import io.confluent.idesidecar.restapi.exceptions.ProcessorFailedException;
Expand Down Expand Up @@ -32,6 +33,9 @@ public Future<ClusterProxyContext> process(ClusterProxyContext context) {
case LocalConnectionState localConnection -> {
// Do nothing
}
case DirectConnectionState directConnection -> {
// TODO: DIRECT check auth status and fail if not connected/authenticated
}
case PlatformConnectionState platformConnection -> {
// Do nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.confluent.idesidecar.restapi.proxy.clusters.strategy.ConfluentCloudSchemaRegistryClusterStrategy;
import io.confluent.idesidecar.restapi.proxy.clusters.strategy.ConfluentLocalKafkaClusterStrategy;
import io.confluent.idesidecar.restapi.proxy.clusters.strategy.ConfluentLocalSchemaRegistryClusterStrategy;
import io.confluent.idesidecar.restapi.proxy.clusters.strategy.DirectKafkaClusterStrategy;
import io.confluent.idesidecar.restapi.proxy.clusters.strategy.DirectSchemaRegistryClusterStrategy;
import io.vertx.core.Future;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -27,12 +29,18 @@ public class ClusterStrategyProcessor extends
@Inject
ConfluentLocalKafkaClusterStrategy confluentLocalKafkaClusterStrategy;

@Inject
DirectKafkaClusterStrategy directKafkaClusterStrategy;

@Inject
ConfluentCloudSchemaRegistryClusterStrategy confluentCloudSchemaRegistryClusterStrategy;

@Inject
ConfluentLocalSchemaRegistryClusterStrategy confluentLocalSchemaRegistryClusterStrategy;

@Inject
DirectSchemaRegistryClusterStrategy directSchemaRegistryClusterStrategy;


@Override
public Future<ClusterProxyContext> process(ClusterProxyContext context) {
Expand All @@ -57,8 +65,9 @@ public ClusterStrategy chooseStrategy(
case LOCAL ->
clusterType == ClusterType.KAFKA
? confluentLocalKafkaClusterStrategy : confluentLocalSchemaRegistryClusterStrategy;
// TODO: DIRECT proxy strategy
case DIRECT -> null;
case DIRECT ->
clusterType == ClusterType.KAFKA
? directKafkaClusterStrategy : directSchemaRegistryClusterStrategy;
case PLATFORM -> null;
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.confluent.idesidecar.restapi.proxy.clusters.strategy;

import static io.confluent.idesidecar.restapi.util.RequestHeadersConstants.CONNECTION_ID_HEADER;

import io.confluent.idesidecar.restapi.application.SidecarAccessTokenBean;
import io.confluent.idesidecar.restapi.proxy.clusters.ClusterProxyContext;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpHeaders;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
* Strategy for processing requests and responses for a Kafka cluster.
*/
@ApplicationScoped
public class DirectKafkaClusterStrategy extends ClusterStrategy {

@ConfigProperty(name = "ide-sidecar.api.host")
String sidecarHost;

@Inject
SidecarAccessTokenBean accessToken;

/**
* We require the connection ID header to be passed to our implementation of the Kafka REST API
* at the /internal/kafka path. This is used to identify the connection that the request is
* associated with. We also pass the access token as a Bearer token in the Authorization header.
* @param context The context of the proxy request.
* @return The headers to be passed to our implementation of the Kafka REST API.
*/
@Override
public MultiMap constructProxyHeaders(ClusterProxyContext context) {
var headers = super.constructProxyHeaders(context);
return headers
.add(CONNECTION_ID_HEADER, context.getConnectionId())
.add(HttpHeaders.AUTHORIZATION, "Bearer %s".formatted(accessToken.getToken()));
}

/**
* Route the request back to ourselves at the /internal/kafka path.
* Context: We used to send this proxy request to the Kafka REST server running in the
* confluent-local container, but now we route the request to our own implementation of the
* Kafka REST API, served at the /internal/kafka path. This was done to get early feedback
* on our in-house implementation of the Kafka REST API.
* @param requestUri The URI of the incoming request.
* @param clusterUri The Kafka REST URI running alongside the Kafka cluster.
* (unused here)
* @return The URI of the Kafka REST API running in the sidecar.
*/
@Override
public String constructProxyUri(String requestUri, String clusterUri) {
return uriUtil.combine(
sidecarHost, requestUri.replaceFirst("^(/kafka|kafka)", "/internal/kafka")
);
}

/**
* In addition to replacing the cluster URLs with the sidecar host, we also need to replace
* the internal Kafka REST path /internal/kafka with the external facing /kafka path.
* @param proxyResponse The response body from the Kafka REST API.
* @param clusterUri The URI of the Kafka REST API running alongside the Kafka cluster.
* (unused here)
* @param sidecarHost The host of the sidecar.
* @return The response body with the internal Kafka REST path replaced with the external path.
*/
@Override
public String processProxyResponse(String proxyResponse, String clusterUri, String sidecarHost) {
return super
.processProxyResponse(proxyResponse, sidecarHost, sidecarHost)
.replaceAll("%s/internal/kafka".formatted(sidecarHost), "%s/kafka".formatted(sidecarHost));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.confluent.idesidecar.restapi.proxy.clusters.strategy;

import jakarta.enterprise.context.ApplicationScoped;

/**
* Strategy for processing requests and responses for a Confluent local Kafka cluster.
*/
@ApplicationScoped
public class DirectSchemaRegistryClusterStrategy extends ClusterStrategy {
// No specific configuration for this strategy
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,26 @@ private static String getProductSchema(SchemaFormat format) {
@Nested
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
class SadPath extends AbstractSidecarIT {
class SadPathWithLocal extends SadPath {

@BeforeEach
public void beforeEach() {
setupLocalConnection(SadPath.class);
setupLocalConnection(this);
}
}

@Nested
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
class SadPathWithDirect extends SadPath {

@BeforeEach
public void beforeEach() {
setupDirectConnection(this);
}
}

abstract class SadPath extends AbstractSidecarIT {

@Test
void shouldThrowNotFoundWhenClusterDoesNotExist() {
Expand Down Expand Up @@ -264,15 +278,30 @@ void shouldThrowBadRequestForInvalidProtobufData(Object badData) {
}
}


@Nested
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
class HappyPath extends AbstractSidecarIT {
class HappyPathWithLocal extends HappyPath {

@BeforeEach
public void beforeEach() {
setupLocalConnection(HappyPath.class);
setupLocalConnection(this);
}
}

@Nested
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
class HappyPathWithDirect extends HappyPath {

@BeforeEach
public void beforeEach() {
setupDirectConnection(this);
}
}

abstract class HappyPath extends AbstractSidecarIT {

private static RecordData jsonData(Object data) {
return new RecordData(null, null, data);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright [2024 - 2024] Confluent Inc.
*/

package io.confluent.idesidecar.restapi.kafkarest.api;

import io.confluent.idesidecar.restapi.testutil.NoAccessFilterProfile;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;

@QuarkusIntegrationTest
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
public class ClusterV3ApiImplDirectIT extends ClusterV3ApiImplIT {

@BeforeEach
public void beforeEach() {
setupDirectConnection(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,11 @@

import static org.hamcrest.Matchers.equalTo;

import io.confluent.idesidecar.restapi.testutil.NoAccessFilterProfile;
import io.confluent.idesidecar.restapi.util.AbstractSidecarIT;
import io.confluent.idesidecar.restapi.util.ConfluentLocalKafkaWithRestProxyContainer;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@QuarkusIntegrationTest
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
public class ClusterV3ApiImplIT extends AbstractSidecarIT {

@BeforeEach
public void beforeEach() {
setupLocalConnection(ClusterV3ApiImplIT.class);
}
abstract class ClusterV3ApiImplIT extends AbstractSidecarIT {

@Test
void shouldListKafkaClusters() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright [2024 - 2024] Confluent Inc.
*/

package io.confluent.idesidecar.restapi.kafkarest.api;

import io.confluent.idesidecar.restapi.testutil.NoAccessFilterProfile;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;

@QuarkusIntegrationTest
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
public class ClusterV3ApiImplLocalIT extends ClusterV3ApiImplIT {

@BeforeEach
public void beforeEach() {
setupDirectConnection(this);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright [2024 - 2024] Confluent Inc.
*/

package io.confluent.idesidecar.restapi.kafkarest.api;

import io.confluent.idesidecar.restapi.testutil.NoAccessFilterProfile;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;

@QuarkusIntegrationTest
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
public class PartitionV3ApiImplDirectIT extends PartitionV3ApiImplIT {

@BeforeEach
public void beforeEach() {
setupDirectConnection(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,10 @@
import static io.confluent.idesidecar.restapi.util.ConfluentLocalKafkaWithRestProxyContainer.CLUSTER_ID;
import static org.hamcrest.Matchers.equalTo;

import io.confluent.idesidecar.restapi.testutil.NoAccessFilterProfile;
import io.confluent.idesidecar.restapi.util.AbstractSidecarIT;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@QuarkusIntegrationTest
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
public class PartitionV3ApiImplIT extends AbstractSidecarIT {

@BeforeEach
public void beforeEach() {
setupLocalConnection(PartitionV3ApiImplIT.class);
}
abstract class PartitionV3ApiImplIT extends AbstractSidecarIT {

@Test
void shouldListTopicPartitions() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright [2024 - 2024] Confluent Inc.
*/

package io.confluent.idesidecar.restapi.kafkarest.api;

import io.confluent.idesidecar.restapi.testutil.NoAccessFilterProfile;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;

@QuarkusIntegrationTest
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
public class PartitionV3ApiImplLocalIT extends PartitionV3ApiImplIT {

@BeforeEach
public void beforeEach() {
setupLocalConnection(this);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright [2024 - 2024] Confluent Inc.
*/

package io.confluent.idesidecar.restapi.kafkarest.api;

import io.confluent.idesidecar.restapi.testutil.NoAccessFilterProfile;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;

@QuarkusIntegrationTest
@Tag("io.confluent.common.utils.IntegrationTest")
@TestProfile(NoAccessFilterProfile.class)
public class TopicV3ApiImplDirectIT extends TopicV3ApiImplIT {

@BeforeEach
public void beforeEach() {
setupDirectConnection(this);
}

}
Loading