MAINT: allow latest schema version if not specified in confluent schema (opensearch-project#4453)

Signed-off-by: George Chen <[email protected]>
chenqi0805 authored Apr 24, 2024
1 parent 2f0a564 commit c31aa6a
Showing 4 changed files with 43 additions and 12 deletions.
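The change is small: the schema registry `version` in the Kafka source configuration becomes optional, and when it is not specified the plugin resolves the latest schema registered for the topic's subject instead of requiring an explicit version. As context for the diff below, here is a minimal sketch of that fallback using the same Confluent `CachedSchemaRegistryClient` calls that appear in the changed files; the class and method names (`SchemaTypeLookup`, `resolveSchemaType`) and the standalone setup are illustrative only, not part of this commit.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

import java.io.IOException;
import java.util.Collections;

public class SchemaTypeLookup {
    // Illustrative helper: resolve the schema type for a topic, using an explicit
    // version when one is configured and the latest registered schema otherwise.
    public static String resolveSchemaType(final String registryUrl, final String topicName,
                                           final Integer version) throws IOException, RestClientException {
        final CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient(registryUrl, 100, Collections.emptyMap());
        final String subject = topicName + "-value"; // default TopicNameStrategy subject, as in the diff
        if (version != null) {
            return client.getSchemaMetadata(subject, version).getSchemaType();
        }
        return client.getLatestSchemaMetadata(subject).getSchemaType();
    }
}

The `"-value"` suffix follows the same subject naming (`topic.getName() + "-value"`) that the changed consumer code relies on.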
@@ -154,6 +154,7 @@ public void setup() {
jsonSourceConfig = mock(KafkaSourceConfig.class);
avroSourceConfig = mock(KafkaSourceConfig.class);
pluginMetrics = mock(PluginMetrics.class);
pluginConfigObservable = mock(PluginConfigObservable.class);
Random random = new Random();
testId = random.nextInt();
testValue = random.nextDouble();
@@ -251,12 +252,21 @@ public void setup() {
}

@Test
public void KafkaJsonProducerConsumerTest() {
public void KafkaJsonProducerConsumerTestWithSpecifiedSchemaVersion() {
when(schemaConfig.getVersion()).thenReturn(2);
produceJsonRecords(bootstrapServers, numRecordsProduced);
consumeRecords(bootstrapServers, jsonSourceConfig);
await().atMost(Duration.ofSeconds(20)).
untilAsserted(() -> assertThat(numRecordsReceived.get(), equalTo(numRecordsProduced)));
produceJsonRecords(bootstrapServers, numRecordsProduced);
consumeRecords(bootstrapServers, jsonSourceConfig);
await().atMost(Duration.ofSeconds(20)).
untilAsserted(() -> assertThat(numRecordsReceived.get(), equalTo(numRecordsProduced)));
}

@Test
public void KafkaJsonProducerConsumerTestWithLatestSchemaVersion() {
when(schemaConfig.getVersion()).thenReturn(null);
produceJsonRecords(bootstrapServers, numRecordsProduced);
consumeRecords(bootstrapServers, jsonSourceConfig);
await().atMost(Duration.ofSeconds(20)).
untilAsserted(() -> assertThat(numRecordsReceived.get(), equalTo(numRecordsProduced)));
}

public void consumeRecords(String servers, KafkaSourceConfig sourceConfig) {
@@ -293,14 +303,23 @@ public void produceJsonRecords(String servers, int numRecords) throws Serializat


@Test
public void KafkaAvroProducerConsumerTest() {
public void KafkaAvroProducerConsumerTestWithSpecifiedSchemaVersion() {
when(schemaConfig.getVersion()).thenReturn(1);
produceAvroRecords(bootstrapServers, numRecordsProduced);
consumeRecords(bootstrapServers, avroSourceConfig);
await().atMost(Duration.ofSeconds(20)).
untilAsserted(() -> assertThat(numRecordsReceived.get(), equalTo(numRecordsProduced)));
}

@Test
public void KafkaAvroProducerConsumerTestWithLatestSchemaVersion() {
when(schemaConfig.getVersion()).thenReturn(null);
produceAvroRecords(bootstrapServers, numRecordsProduced);
consumeRecords(bootstrapServers, avroSourceConfig);
await().atMost(Duration.ofSeconds(20)).
untilAsserted(() -> assertThat(numRecordsReceived.get(), equalTo(numRecordsProduced)));
}

public void produceAvroRecords(String servers, int numRecords) throws SerializationException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
@@ -30,7 +30,7 @@ public class SchemaConfig {
private String registryURL;

@JsonProperty("version")
private int version;
private Integer version;

@JsonAlias("schema_registry_api_key")
@JsonProperty("api_key")
@@ -103,7 +103,7 @@ public String getRegistryURL() {
return registryURL;
}

public int getVersion() {
public Integer getVersion() {
return version;
}

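The `SchemaConfig` change above is what makes the fallback detectable: with a boxed `Integer`, Jackson maps an omitted `version` key to `null`, whereas a primitive `int` would silently default to `0` and be indistinguishable from an explicitly configured version of 0. A minimal standalone sketch of that behavior, assuming only Jackson databind; the `VersionHolder` class and the inline JSON are illustrative, not part of the plugin:

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class VersionHolder {
    // Boxed Integer: an absent "version" key stays null; a primitive int would default to 0.
    @JsonProperty("version")
    private Integer version;

    public Integer getVersion() {
        return version;
    }

    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        System.out.println(mapper.readValue("{\"version\": 2}", VersionHolder.class).getVersion()); // 2
        System.out.println(mapper.readValue("{}", VersionHolder.class).getVersion());               // null -> use latest schema
    }
}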
@@ -218,9 +218,15 @@ private void setPropertiesForSchemaType(final KafkaConsumerConfig kafkaConsumerC
properties.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false);
final CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(properties.getProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG),
100, propertyMap);
final SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig();
try {
schemaType = schemaRegistryClient.getSchemaMetadata(topic.getName() + "-value",
kafkaConsumerConfig.getSchemaConfig().getVersion()).getSchemaType();
final String subject = topic.getName() + "-value";
if (schemaConfig.getVersion() != null) {
schemaType = schemaRegistryClient.getSchemaMetadata(subject,
kafkaConsumerConfig.getSchemaConfig().getVersion()).getSchemaType();
} else {
schemaType = schemaRegistryClient.getLatestSchemaMetadata(subject).getSchemaType();
}
} catch (IOException | RestClientException e) {
LOG.error("Failed to connect to the schema registry...");
throw new RuntimeException(e);
Expand Down
@@ -293,9 +293,15 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
properties.put("auto.register.schemas", false);
schemaRegistryClient = new CachedSchemaRegistryClient(getSchemaRegistryUrl(),
100, propertyMap);
final SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
try {
schemaType = schemaRegistryClient.getSchemaMetadata(topic.getName() + "-value",
sourceConfig.getSchemaConfig().getVersion()).getSchemaType();
final String subject = topic.getName() + "-value";
if (schemaConfig.getVersion() != null) {
schemaType = schemaRegistryClient.getSchemaMetadata(subject,
schemaConfig.getVersion()).getSchemaType();
} else {
schemaType = schemaRegistryClient.getLatestSchemaMetadata(subject).getSchemaType();
}
} catch (IOException | RestClientException e) {
LOG.error("Failed to connect to the schema registry...");
throw new RuntimeException(e);
