diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 5efd1af424..3125969461 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -94,6 +94,11 @@ apicurio-registry-avro-serde-pulsar test + + io.apicurio + apicurio-registry-avro-serde-nats + test + io.apicurio apicurio-registry-utils-converter diff --git a/integration-tests/src/test/java/io/apicurio/tests/serdes/apicurio/nats/AvroNatsSerdeIT.java b/integration-tests/src/test/java/io/apicurio/tests/serdes/apicurio/nats/AvroNatsSerdeIT.java new file mode 100644 index 0000000000..3c69ac0df9 --- /dev/null +++ b/integration-tests/src/test/java/io/apicurio/tests/serdes/apicurio/nats/AvroNatsSerdeIT.java @@ -0,0 +1,110 @@ +package io.apicurio.tests.serdes.apicurio.nats; + +import io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumer; +import io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumerImpl; +import io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumerRecord; +import io.apicurio.registry.serde.avro.nats.client.streaming.producers.NatsProducer; +import io.apicurio.registry.serde.avro.nats.client.streaming.producers.NatsProducerImpl; +import io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy; +import io.apicurio.registry.serde.config.SerdeConfig; +import io.apicurio.tests.ApicurioRegistryBaseIT; +import io.apicurio.tests.utils.Constants; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.PullSubscribeOptions; +import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.api.StreamConfiguration; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; + +import java.io.IOException; +import java.util.Date; +import java.util.Map; + +@Tag(Constants.SERDES) +@QuarkusIntegrationTest +public class AvroNatsSerdeIT extends ApicurioRegistryBaseIT { + + private static final String SCHEMAV1 = "{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"com.github" + + ".sourabhagrawal\",\"fields\":[{\"name\":\"Message\",\"type\":\"string\"},{\"name\":\"Time\",\"type\":\"long\"}]}"; + + private GenericContainer nats; + + public static final Integer NATS_PORT = 4222; + + public static final Integer NATS_MGMT_PORT = 8222; + + @BeforeAll + void setupEnvironment() { + if (nats == null || !nats.isRunning()) { + nats = new GenericContainer<>("nats:2.10.20").withExposedPorts(NATS_PORT, NATS_MGMT_PORT) + .withCommand("--jetstream"); + nats.start(); + } + } + + @AfterAll + void teardownEnvironment() throws Exception { + if (nats != null && nats.isRunning()) { + nats.stop(); + } + } + + @Test + public void testNatsJsonSchema() throws IOException, InterruptedException, JetStreamApiException { + String subjectId = generateArtifactId(); + Schema schema = new Schema.Parser().parse(SCHEMAV1); + GenericRecord record = new GenericData.Record(schema); + Date now = new Date(); + record.put("Message", "Hello!"); + record.put("Time", now.getTime()); + + JetStreamManagement jsm; + try (Connection connection = Nats.connect(new Options.Builder() + .server("nats://" + nats.getHost() + ":" + nats.getMappedPort(NATS_PORT)).build())) { + jsm = connection.jetStreamManagement(); + + StreamConfiguration stream = new StreamConfiguration.Builder().subjects(subjectId).name(subjectId) + .build(); + + ConsumerConfiguration consumerConfiguration = ConsumerConfiguration.builder().durable(subjectId) + .durable(subjectId).filterSubject(subjectId).build(); + + jsm.addStream(stream); // Create Stream in advance + jsm.addOrUpdateConsumer(stream.getName(), consumerConfiguration); // Create Consumer in advance + + PullSubscribeOptions options = PullSubscribeOptions.builder().bind(true).stream(stream.getName()) + .durable(consumerConfiguration.getDurable()).build(); + + Map configs = Map.of(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true", + SerdeConfig.REGISTRY_URL, ApicurioRegistryBaseIT.getRegistryV3ApiUrl(), + SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName()); + + NatsProducer producer = new NatsProducerImpl<>(connection, subjectId, configs); + NatsConsumer consumer = new NatsConsumerImpl<>(connection, subjectId, options, + configs); + + producer.send(record); + + NatsConsumerRecord message = consumer.receive(); + + if (message.getPayload() != null) { + GenericRecord event1 = message.getPayload(); + Assertions.assertEquals(record, event1); + } + + message.ack(); + } + } +} diff --git a/pom.xml b/pom.xml index 0c4450da80..64a6796ba3 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ serdes/kafka/protobuf-serde serdes/kafka/jsonschema-serde serdes/pulsar/avro-serde + serdes/nats/avro-serde distro docs java-sdk @@ -208,6 +209,7 @@ 3.6.0 3.3.1 + 2.17.1 2.3 33.3.0-jre 7.0.0 @@ -378,6 +380,11 @@ apicurio-registry-avro-serde-pulsar ${project.version} + + io.apicurio + apicurio-registry-avro-serde-nats + ${project.version} + io.apicurio apicurio-registry-utils-import-export @@ -526,6 +533,11 @@ kafka-clients ${kafka-clients.version} + + io.nats + jnats + ${nats.version} + io.apicurio apicurio-data-models diff --git a/serdes/generic/serde-common-jsonschema/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaDeserializer.java b/serdes/generic/serde-common-jsonschema/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaDeserializer.java index b28b191d2d..194746f9af 100644 --- a/serdes/generic/serde-common-jsonschema/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaDeserializer.java +++ b/serdes/generic/serde-common-jsonschema/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaDeserializer.java @@ -101,7 +101,7 @@ public SchemaParser schemaParser() { * int, int) */ @Override - protected T readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + public T readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { return internalReadData(schema, buffer, start, length); } diff --git a/serdes/kafka/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaDeserializer.java b/serdes/kafka/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaDeserializer.java index b162c42166..93cb081df0 100644 --- a/serdes/kafka/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaDeserializer.java +++ b/serdes/kafka/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaDeserializer.java @@ -65,7 +65,7 @@ public SchemaParser schemaParser() { * java.nio.ByteBuffer, int, int) */ @Override - protected T readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + public T readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { return jsonSchemaDeserializer.readData(schema, buffer, start, length); } diff --git a/serdes/nats/avro-serde/pom.xml b/serdes/nats/avro-serde/pom.xml new file mode 100644 index 0000000000..e09e90f251 --- /dev/null +++ b/serdes/nats/avro-serde/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + io.apicurio + apicurio-registry + 3.0.0-SNAPSHOT + ../../../pom.xml + + + apicurio-registry-avro-serde-nats + jar + apicurio-registry-avro-serde-nats + + + ${project.basedir}/../../.. + + + + + + io.apicurio + apicurio-registry-serde-common-avro + + + + io.nats + jnats + + + + + diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/exceptions/ApicurioNatsException.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/exceptions/ApicurioNatsException.java new file mode 100644 index 0000000000..6cbb867be5 --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/exceptions/ApicurioNatsException.java @@ -0,0 +1,8 @@ +package io.apicurio.registry.serde.avro.nats.client.exceptions; + +public class ApicurioNatsException extends RuntimeException { + + public ApicurioNatsException(Throwable e) { + super(e); + } +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumer.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumer.java new file mode 100644 index 0000000000..c58bd5bebf --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumer.java @@ -0,0 +1,19 @@ +package io.apicurio.registry.serde.avro.nats.client.streaming.consumers; + +import io.nats.client.JetStreamApiException; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; + +public interface NatsConsumer extends AutoCloseable { + + String getSubject(); + + NatsConsumerRecord receive() throws JetStreamApiException, IOException; + + NatsConsumerRecord receive(Duration timeout) throws JetStreamApiException, IOException; + + Collection> receive(int batchSize, Duration timeout) + throws JetStreamApiException, IOException; +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerImpl.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerImpl.java new file mode 100644 index 0000000000..9213c3fb12 --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerImpl.java @@ -0,0 +1,97 @@ +package io.apicurio.registry.serde.avro.nats.client.streaming.consumers; + +import io.apicurio.registry.serde.avro.AvroDeserializer; +import io.apicurio.registry.serde.avro.AvroSerdeConfig; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamSubscription; +import io.nats.client.Message; +import io.nats.client.PullSubscribeOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class NatsConsumerImpl implements NatsConsumer { + + private final AvroDeserializer deserializer; + + private final Connection connection; + + private final String subject; + + private final PullSubscribeOptions subscribeOptions; + + private JetStreamSubscription subscription; + + private static final Logger logger = LoggerFactory.getLogger(NatsConsumerImpl.class); + + public NatsConsumerImpl(Connection connection, String subject, PullSubscribeOptions subscribeOptions, + Map config) { + this.connection = connection; + this.subject = subject; + this.subscribeOptions = subscribeOptions; + + AvroSerdeConfig serializerConfig = new AvroSerdeConfig(config); + deserializer = new AvroDeserializer<>(); + + deserializer.configure(serializerConfig, false); + } + + private JetStreamSubscription getLazySubscription() throws IOException, JetStreamApiException { + if (subscription == null) { + subscription = connection.jetStream().subscribe(subject, subscribeOptions); + } + return subscription; + } + + @Override + public String getSubject() { + return subject; + } + + @Override + public NatsConsumerRecord receive() throws JetStreamApiException, IOException { + return receive(Duration.ofSeconds(3)); + } + + @Override + public NatsConsumerRecord receive(Duration timeout) throws JetStreamApiException, IOException { + Collection> messages = receive(1, timeout); + Optional> record = messages.stream().findFirst(); + return record.orElse(null); + } + + @Override + public List> receive(int batchSize, Duration timeout) + throws JetStreamApiException, IOException { + List messages = getLazySubscription().fetch(batchSize, timeout); + + if (messages == null || messages.isEmpty()) { + logger.info("Receive timeout ({} ms)", timeout.toMillis()); + // TODO consider throwing an exception? + return Collections.emptyList(); + } + + List> records = new ArrayList<>(); + for (Message message : messages) { + DATA payload = deserializer.deserializeData(subject, message.getData()); + records.add(new NatsConsumerRecordImpl<>(message, payload)); + } + return records; + } + + @Override + public void close() throws Exception { + if (subscription != null) { + subscription.unsubscribe(); + } + } +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecord.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecord.java new file mode 100644 index 0000000000..9f20ee9b4b --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecord.java @@ -0,0 +1,12 @@ +package io.apicurio.registry.serde.avro.nats.client.streaming.consumers; + +import io.nats.client.Message; + +public interface NatsConsumerRecord { + + Message getNatsMessage(); + + T getPayload(); + + void ack(); +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecordImpl.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecordImpl.java new file mode 100644 index 0000000000..1efda861bd --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerRecordImpl.java @@ -0,0 +1,30 @@ +package io.apicurio.registry.serde.avro.nats.client.streaming.consumers; + +import io.nats.client.Message; + +public class NatsConsumerRecordImpl implements NatsConsumerRecord { + + private final Message natsMessage; + + private final T payload; + + public NatsConsumerRecordImpl(Message natsMessage, T payload) { + this.natsMessage = natsMessage; + this.payload = payload; + } + + @Override + public Message getNatsMessage() { + return natsMessage; + } + + @Override + public T getPayload() { + return payload; + } + + @Override + public void ack() { + natsMessage.ack(); + } +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducer.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducer.java new file mode 100644 index 0000000000..f4a8a6d142 --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducer.java @@ -0,0 +1,8 @@ +package io.apicurio.registry.serde.avro.nats.client.streaming.producers; + +import io.apicurio.registry.serde.avro.nats.client.exceptions.ApicurioNatsException; + +public interface NatsProducer extends AutoCloseable { + + void send(T message) throws ApicurioNatsException; +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducerImpl.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducerImpl.java new file mode 100644 index 0000000000..3464540858 --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/client/streaming/producers/NatsProducerImpl.java @@ -0,0 +1,49 @@ +package io.apicurio.registry.serde.avro.nats.client.streaming.producers; + +import io.apicurio.registry.serde.avro.AvroSerdeConfig; +import io.apicurio.registry.serde.avro.AvroSerializer; +import io.apicurio.registry.serde.avro.nats.client.exceptions.ApicurioNatsException; +import io.nats.client.Connection; +import io.nats.client.JetStream; + +import java.io.IOException; +import java.util.Map; + +public class NatsProducerImpl implements NatsProducer { + + private final Connection connection; + + private final JetStream jetStream; + + private final AvroSerializer serializer; + + private final String subject; + + public NatsProducerImpl(Connection connection, String subject, Map config) + throws IOException { + this.connection = connection; + this.subject = subject; + AvroSerdeConfig deserializerConfig = new AvroSerdeConfig(config); + serializer = new AvroSerializer<>(); + serializer.configure(deserializerConfig, false); + // config.get(NatsProducerConfig.SERIALIZER_CLASS_CONFIG) + + jetStream = connection.jetStream(); + } + + @Override + public void send(DATA message) throws ApicurioNatsException { + byte[] data = serializer.serializeData(subject, message); + + try { + jetStream.publish(subject, data); + } catch (Exception ex) { + throw new ApicurioNatsException(ex); + } + } + + @Override + public void close() throws Exception { + connection.close(); + } +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/config/NatsConsumerConfig.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/config/NatsConsumerConfig.java new file mode 100644 index 0000000000..3028714b11 --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/config/NatsConsumerConfig.java @@ -0,0 +1,6 @@ +package io.apicurio.registry.serde.avro.nats.config; + +public class NatsConsumerConfig { + + public static String DESERIALIZER_CLASS_CONFIG = "apicurio.registry.nats.deserializer-class"; +} diff --git a/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/config/NatsProducerConfig.java b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/config/NatsProducerConfig.java new file mode 100644 index 0000000000..169eb00707 --- /dev/null +++ b/serdes/nats/avro-serde/src/main/java/io/apicurio/registry/serde/avro/nats/config/NatsProducerConfig.java @@ -0,0 +1,6 @@ +package io.apicurio.registry.serde.avro.nats.config; + +public class NatsProducerConfig { + + public static String SERIALIZER_CLASS_CONFIG = "apicurio.registry.nats.serializer-class"; +}