-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka sink #3127
Kafka sink #3127
Conversation
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
|
||
private static final Integer NUM_OF_PARTITIONS = 3; | ||
private static final Short REPLICATION_FACTOR = 1; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: rename NUM_OF_PARTITIONS to DEFAULT_NUM_OF_PARTITIONS
rename REPLICATION_FACTOR to DEFAULT_REPLICATION_FACTOR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes have been incorporated.
} else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) { | ||
publishAvroMessage(record, topic, key, dataForDlq); | ||
publishAvroMessage(record, key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this check Why not serdeFormat == MessageFormat.AVRO
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes have been incorporated.
import java.util.Map; | ||
import java.util.Properties; | ||
|
||
public class SchemaService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this only support Confluent Schema Registry? How about AWS GLUE Registry? Even if you do not implement it, it would be great if this can support that without having to change much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now it only supports confluent schema registry and we are not sure about AWS GLUE Registry.
@@ -97,10 +105,12 @@ private void doInitializeInternal() { | |||
|
|||
@Override | |||
public void doOutput(Collection<Record<Event>> records) { | |||
reentrantLock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this lock protecting? Is it possible to use synchronized(<object>) {
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have imlemented this as other sinks like s3 sink.
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
|
||
public static class S3FileConfig { | ||
@Valid | ||
@Size(max = 0, message = "Schema from file is not supported.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all 3 messages needs to be updated as per the field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes have been incorporated.
return s3FileConfig; | ||
} | ||
|
||
@AssertTrue(message = "Only one of Inline schema or Schema file location or S3 file config config must be specified") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minro: config repeated 2 times after S3 file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes have been incorporated.
@JsonProperty("s3_file_config") | ||
private S3FileConfig s3FileConfig; | ||
|
||
@JsonProperty("is_create") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure what does is_create mean? create topic? should be renamed as per functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes have been incorporated.
@@ -38,6 +38,11 @@ public class TopicConfig { | |||
static final Integer DEFAULT_NUM_OF_WORKERS = 2; | |||
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5); | |||
|
|||
|
|||
private static final Integer DEFAULT_NUM_OF_PARTITIONS = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pleasae set values as per kafka defaults. if its 3 and then ignore this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes have been incorporated.
try { | ||
final NewTopic newTopic = new NewTopic(topicName, numberOfPartitions, replicationFactor); | ||
adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); | ||
LOG.info(topicName + " created successfully"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's also retention period.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes have been incorporated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few minro comments, can be addressed in next review as well.
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
Signed-off-by: rajeshLovesToCode <[email protected]>
* -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]> * -Support for kafka-sink Signed-off-by: rajeshLovesToCode <[email protected]>
Description
Kafka-sink auto topic creation and schema registration on the fly
Issues Resolved
Github issue #1986
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.