Skip to content

Commit

Permalink
Implement nats avro serdes (#5195)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal authored Sep 18, 2024
1 parent 09b4fc6 commit dbb071b
Show file tree
Hide file tree
Showing 15 changed files with 397 additions and 2 deletions.
5 changes: 5 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<artifactId>apicurio-registry-avro-serde-pulsar</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-avro-serde-nats</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-converter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.apicurio.tests.serdes.apicurio.nats;

import io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumer;
import io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumerImpl;
import io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumerRecord;
import io.apicurio.registry.serde.avro.nats.client.streaming.producers.NatsProducer;
import io.apicurio.registry.serde.avro.nats.client.streaming.producers.NatsProducerImpl;
import io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy;
import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.tests.ApicurioRegistryBaseIT;
import io.apicurio.tests.utils.Constants;
import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;

import java.io.IOException;
import java.util.Date;
import java.util.Map;

@Tag(Constants.SERDES)
@QuarkusIntegrationTest
public class AvroNatsSerdeIT extends ApicurioRegistryBaseIT {

private static final String SCHEMAV1 = "{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"com.github"
+ ".sourabhagrawal\",\"fields\":[{\"name\":\"Message\",\"type\":\"string\"},{\"name\":\"Time\",\"type\":\"long\"}]}";

private GenericContainer<?> nats;

public static final Integer NATS_PORT = 4222;

public static final Integer NATS_MGMT_PORT = 8222;

@BeforeAll
void setupEnvironment() {
if (nats == null || !nats.isRunning()) {
nats = new GenericContainer<>("nats:2.10.20").withExposedPorts(NATS_PORT, NATS_MGMT_PORT)
.withCommand("--jetstream");
nats.start();
}
}

@AfterAll
void teardownEnvironment() throws Exception {
if (nats != null && nats.isRunning()) {
nats.stop();
}
}

@Test
public void testNatsJsonSchema() throws IOException, InterruptedException, JetStreamApiException {
String subjectId = generateArtifactId();
Schema schema = new Schema.Parser().parse(SCHEMAV1);
GenericRecord record = new GenericData.Record(schema);
Date now = new Date();
record.put("Message", "Hello!");
record.put("Time", now.getTime());

JetStreamManagement jsm;
try (Connection connection = Nats.connect(new Options.Builder()
.server("nats://" + nats.getHost() + ":" + nats.getMappedPort(NATS_PORT)).build())) {
jsm = connection.jetStreamManagement();

StreamConfiguration stream = new StreamConfiguration.Builder().subjects(subjectId).name(subjectId)
.build();

ConsumerConfiguration consumerConfiguration = ConsumerConfiguration.builder().durable(subjectId)
.durable(subjectId).filterSubject(subjectId).build();

jsm.addStream(stream); // Create Stream in advance
jsm.addOrUpdateConsumer(stream.getName(), consumerConfiguration); // Create Consumer in advance

PullSubscribeOptions options = PullSubscribeOptions.builder().bind(true).stream(stream.getName())
.durable(consumerConfiguration.getDurable()).build();

Map<String, Object> configs = Map.of(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true",
SerdeConfig.REGISTRY_URL, ApicurioRegistryBaseIT.getRegistryV3ApiUrl(),
SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, TopicRecordIdStrategy.class.getName());

NatsProducer<GenericRecord> producer = new NatsProducerImpl<>(connection, subjectId, configs);
NatsConsumer<GenericRecord> consumer = new NatsConsumerImpl<>(connection, subjectId, options,
configs);

producer.send(record);

NatsConsumerRecord<GenericRecord> message = consumer.receive();

if (message.getPayload() != null) {
GenericRecord event1 = message.getPayload();
Assertions.assertEquals(record, event1);
}

message.ack();
}
}
}
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<module>serdes/kafka/protobuf-serde</module>
<module>serdes/kafka/jsonschema-serde</module>
<module>serdes/pulsar/avro-serde</module>
<module>serdes/nats/avro-serde</module>
<module>distro</module>
<module>docs</module>
<module>java-sdk</module>
Expand Down Expand Up @@ -208,6 +209,7 @@
<kafka-clients.version>3.6.0</kafka-clients.version>
<pulsar-clients.version>3.3.1</pulsar-clients.version>

<nats.version>2.17.1</nats.version>
<snakeyaml.version>2.3</snakeyaml.version>
<guava.version>33.3.0-jre</guava.version>
<woodstox-core.version>7.0.0</woodstox-core.version>
Expand Down Expand Up @@ -378,6 +380,11 @@
<artifactId>apicurio-registry-avro-serde-pulsar</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-avro-serde-nats</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-import-export</artifactId>
Expand Down Expand Up @@ -526,6 +533,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>${nats.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-data-models</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public SchemaParser<JsonSchema, T> schemaParser() {
* int, int)
*/
@Override
protected T readData(ParsedSchema<JsonSchema> schema, ByteBuffer buffer, int start, int length) {
public T readData(ParsedSchema<JsonSchema> schema, ByteBuffer buffer, int start, int length) {
return internalReadData(schema, buffer, start, length);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public SchemaParser<JsonSchema, T> schemaParser() {
* java.nio.ByteBuffer, int, int)
*/
@Override
protected T readData(ParsedSchema<JsonSchema> schema, ByteBuffer buffer, int start, int length) {
public T readData(ParsedSchema<JsonSchema> schema, ByteBuffer buffer, int start, int length) {
return jsonSchemaDeserializer.readData(schema, buffer, start, length);
}

Expand Down
33 changes: 33 additions & 0 deletions serdes/nats/avro-serde/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>apicurio-registry-avro-serde-nats</artifactId>
<packaging>jar</packaging>
<name>apicurio-registry-avro-serde-nats</name>

<properties>
<projectRoot>${project.basedir}/../../..</projectRoot>
</properties>

<dependencies>

<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serde-common-avro</artifactId>
</dependency>

<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.apicurio.registry.serde.avro.nats.client.exceptions;

public class ApicurioNatsException extends RuntimeException {

public ApicurioNatsException(Throwable e) {
super(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.apicurio.registry.serde.avro.nats.client.streaming.consumers;

import io.nats.client.JetStreamApiException;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;

public interface NatsConsumer<T> extends AutoCloseable {

String getSubject();

NatsConsumerRecord<T> receive() throws JetStreamApiException, IOException;

NatsConsumerRecord<T> receive(Duration timeout) throws JetStreamApiException, IOException;

Collection<NatsConsumerRecord<T>> receive(int batchSize, Duration timeout)
throws JetStreamApiException, IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.apicurio.registry.serde.avro.nats.client.streaming.consumers;

import io.apicurio.registry.serde.avro.AvroDeserializer;
import io.apicurio.registry.serde.avro.AvroSerdeConfig;
import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PullSubscribeOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class NatsConsumerImpl<DATA> implements NatsConsumer<DATA> {

private final AvroDeserializer<DATA> deserializer;

private final Connection connection;

private final String subject;

private final PullSubscribeOptions subscribeOptions;

private JetStreamSubscription subscription;

private static final Logger logger = LoggerFactory.getLogger(NatsConsumerImpl.class);

public NatsConsumerImpl(Connection connection, String subject, PullSubscribeOptions subscribeOptions,
Map<String, Object> config) {
this.connection = connection;
this.subject = subject;
this.subscribeOptions = subscribeOptions;

AvroSerdeConfig serializerConfig = new AvroSerdeConfig(config);
deserializer = new AvroDeserializer<>();

deserializer.configure(serializerConfig, false);
}

private JetStreamSubscription getLazySubscription() throws IOException, JetStreamApiException {
if (subscription == null) {
subscription = connection.jetStream().subscribe(subject, subscribeOptions);
}
return subscription;
}

@Override
public String getSubject() {
return subject;
}

@Override
public NatsConsumerRecord<DATA> receive() throws JetStreamApiException, IOException {
return receive(Duration.ofSeconds(3));
}

@Override
public NatsConsumerRecord<DATA> receive(Duration timeout) throws JetStreamApiException, IOException {
Collection<NatsConsumerRecord<DATA>> messages = receive(1, timeout);
Optional<NatsConsumerRecord<DATA>> record = messages.stream().findFirst();
return record.orElse(null);
}

@Override
public List<NatsConsumerRecord<DATA>> receive(int batchSize, Duration timeout)
throws JetStreamApiException, IOException {
List<Message> messages = getLazySubscription().fetch(batchSize, timeout);

if (messages == null || messages.isEmpty()) {
logger.info("Receive timeout ({} ms)", timeout.toMillis());
// TODO consider throwing an exception?
return Collections.emptyList();
}

List<NatsConsumerRecord<DATA>> records = new ArrayList<>();
for (Message message : messages) {
DATA payload = deserializer.deserializeData(subject, message.getData());
records.add(new NatsConsumerRecordImpl<>(message, payload));
}
return records;
}

@Override
public void close() throws Exception {
if (subscription != null) {
subscription.unsubscribe();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.apicurio.registry.serde.avro.nats.client.streaming.consumers;

import io.nats.client.Message;

public interface NatsConsumerRecord<T> {

Message getNatsMessage();

T getPayload();

void ack();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.apicurio.registry.serde.avro.nats.client.streaming.consumers;

import io.nats.client.Message;

public class NatsConsumerRecordImpl<T> implements NatsConsumerRecord<T> {

private final Message natsMessage;

private final T payload;

public NatsConsumerRecordImpl(Message natsMessage, T payload) {
this.natsMessage = natsMessage;
this.payload = payload;
}

@Override
public Message getNatsMessage() {
return natsMessage;
}

@Override
public T getPayload() {
return payload;
}

@Override
public void ack() {
natsMessage.ack();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.apicurio.registry.serde.avro.nats.client.streaming.producers;

import io.apicurio.registry.serde.avro.nats.client.exceptions.ApicurioNatsException;

public interface NatsProducer<T> extends AutoCloseable {

void send(T message) throws ApicurioNatsException;
}
Loading

0 comments on commit dbb071b

Please sign in to comment.