Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/find schema by name #40

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class AvroSinkMapper extends SinkMapper {
private static final String UNDEFINED = "undefined";
private static final String SCHEMA_REGISTRY = "registry";
private static final String SCHEMA_ID = "id";
private static final String SCHEMA_NAME = "name";
public static final String USE_AVRO_SERIALIZER = "use.avro.serializer";

private String[] attributeNameArray;
Expand Down Expand Up @@ -150,7 +151,9 @@ public void init(StreamDefinition streamDefinition, OptionHolder optionHolder, M
optionHolder.validateAndGetStaticValue(DEFAULT_AVRO_MAPPING_PREFIX.concat(".").
concat(SCHEMA_REGISTRY), null),
optionHolder.validateAndGetStaticValue(DEFAULT_AVRO_MAPPING_PREFIX.concat(".").
concat(SCHEMA_ID), null), streamDefinition.getId());
concat(SCHEMA_ID), null), streamDefinition.getId(),
optionHolder.validateAndGetStaticValue(DEFAULT_AVRO_MAPPING_PREFIX.concat(".").
concat(SCHEMA_NAME), null), streamDefinition.getName());
useAvroSerializer = Boolean.parseBoolean(
optionHolder.validateAndGetStaticValue(USE_AVRO_SERIALIZER, "false"));
}
Expand All @@ -162,8 +165,14 @@ private Schema getAvroSchema(String schemaDefinition, String schemaRegistryURL,
if (schemaDefinition != null) {
returnSchema = new Schema.Parser().parse(schemaDefinition);
} else if (schemaRegistryURL != null) {
SchemaRegistryReader schemaRegistryReader = new SchemaRegistryReader();
returnSchema = schemaRegistryReader.getSchemaFromID(schemaRegistryURL, schemaID);
if (schemaID != null) {
SchemaRegistryReader schemaRegistryReader = new SchemaRegistryReader();
returnSchema = schemaRegistryReader.getSchemaFromID(schemaRegistryURL, schemaID);
}
if (streamName != null) {
SchemaRegistryReader schemaRegistryReader = new SchemaRegistryReader();
returnSchema = schemaRegistryReader.getSchemaFromName(schemaRegistryURL, streamName);
}
} else if (attributeList.size() > 0) {
log.warn("Schema Definition and Schema Registry is not specified in Stream. Hence generating " +
"schema from stream attributes.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@
"The schema is retrieved from the schema registry via the specified ID.",

type = {DataType.STRING}),
@Parameter(name = "schema.name",
description =
"This specifies the name of the Avro schema. "
type = {DataType.STRING}),
@Parameter(name = "fail.on.missing.attribute",
description = "If this parameter is set to 'true', a JSON execution failing or returning a " +
"null value results in that message being dropped by the system.\n" +
Expand Down Expand Up @@ -206,7 +210,9 @@ public void init(StreamDefinition streamDefinition, OptionHolder optionHolder, L
optionHolder.validateAndGetStaticValue(DEFAULT_AVRO_MAPPING_PREFIX.concat(".").
concat(SCHEMA_REGISTRY), null),
optionHolder.validateAndGetStaticValue(DEFAULT_AVRO_MAPPING_PREFIX.concat(".").
concat(SCHEMA_ID), null), streamDefinition.getId());
concat(SCHEMA_ID), null), streamDefinition.getId(),
optionHolder.validateAndGetStaticValue(DEFAULT_AVRO_MAPPING_PREFIX.concat(".").
concat(SCHEMA_NAME), null), streamDefinition.getName());
useAvroDeserializer = Boolean.parseBoolean(
optionHolder.validateAndGetStaticValue(USE_AVRO_DESERIALIZER, "false"));
}
Expand All @@ -218,8 +224,14 @@ private Schema getAvroSchema(String schemaDefinition, String schemaRegistryURL,
if (schemaDefinition != null) {
schema = new Schema.Parser().parse(schemaDefinition);
} else if (schemaRegistryURL != null) {
SchemaRegistryReader schemaRegistryReader = new SchemaRegistryReader();
schema = schemaRegistryReader.getSchemaFromID(schemaRegistryURL, schemaID);
if (schemaID != null) {
SchemaRegistryReader schemaRegistryReader = new SchemaRegistryReader();
returnSchema = schemaRegistryReader.getSchemaFromID(schemaRegistryURL, schemaID);
}
if (streamName != null) {
SchemaRegistryReader schemaRegistryReader = new SchemaRegistryReader();
returnSchema = schemaRegistryReader.getSchemaFromName(schemaRegistryURL, streamName);
}
} else if (streamAttributes.size() > 0) {
log.warn("Schema Definition or Schema Registry is not specified in Stream. Hence generating " +
"schema from stream attributes.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ public interface SchemaRegistryClient {
@RequestLine("GET /schemas/ids/{id}")
@Headers("Content-Type: application/json")
LinkedTreeMap findByID(@Param("id") String id);

@RequestLine("GET /subjects/{schemaName}/versions/latest")
@Headers("Content-Type: application/json")
LinkedTreeMap findBySchemaName(@Param("schemaName") schemaName id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,23 @@
*/
public class SchemaRegistryReader {
public Schema getSchemaFromID(String registryURL, String schemaID) throws SchemaParseException, FeignException {
SchemaRegistryClient registryClient = Feign.builder()
SchemaRegistryClient registryClient = getSchemaRegistryClient(registryURL);
LinkedTreeMap returnedSchema = registryClient.findByID(schemaID);
String jsonSchema = returnedSchema.get("schema").toString();
return new Schema.Parser().parse(jsonSchema);
}

public Schema getSchemaFromName(String registryURL, String schemaName) throws SchemaParseException, FeignException {
SchemaRegistryClient registryClient = getSchemaRegistryClient(registryURL);
LinkedTreeMap returnedSchema = registryClient.findBySchemaName(schemaName);
String jsonSchema = returnedSchema.get("schema").toString();
return new Schema.Parser().parse(jsonSchema);
}
private SchemaRegistryClient getSchemaRegistryClient(String registryURL){
return Feign.builder()
.client(new OkHttpClient())
.encoder(new GsonEncoder())
.decoder(new GsonDecoder())
.target(SchemaRegistryClient.class, registryURL);
LinkedTreeMap returnedSchema = registryClient.findByID(schemaID);
String jsonSchema = returnedSchema.get("schema").toString();
return new Schema.Parser().parse(jsonSchema);
}
}