-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support AWS Kinesis Data Streams as a Source #4836
Changes from 1 commit
c2dcf76
1523b8a
f8f89fb
0567858
63a2f91
0fd763b
92a797b
806f78d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.source.converter; | ||
|
||
import org.opensearch.dataprepper.model.codec.InputCodec; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import software.amazon.kinesis.retrieval.KinesisClientRecord; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.function.Consumer; | ||
|
||
public class KinesisRecordConverter { | ||
|
||
private final InputCodec codec; | ||
|
||
public KinesisRecordConverter(final InputCodec codec) { | ||
this.codec = codec; | ||
} | ||
|
||
public List<Record<Event>> convert(List<KinesisClientRecord> kinesisClientRecords) throws IOException { | ||
List<Record<Event>> records = new ArrayList<>(); | ||
for (KinesisClientRecord record : kinesisClientRecords) { | ||
processRecord(record, records::add); | ||
} | ||
return records; | ||
} | ||
|
||
private void processRecord(KinesisClientRecord record, Consumer<Record<Event>> eventConsumer) throws IOException { | ||
// Read bytebuffer | ||
byte[] arr = new byte[record.data().remaining()]; | ||
record.data().get(arr); | ||
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr); | ||
codec.parse(byteArrayInputStream, eventConsumer); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.source.processor; | ||
|
||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
import software.amazon.kinesis.processor.RecordProcessorCheckpointer; | ||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; | ||
|
||
@Builder | ||
@Getter | ||
@Setter | ||
public class KinesisCheckpointerRecord { | ||
private RecordProcessorCheckpointer checkpointer; | ||
private ExtendedSequenceNumber extendedSequenceNumber; | ||
private boolean readyToCheckpoint; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.source.processor; | ||
|
||
import software.amazon.kinesis.processor.RecordProcessorCheckpointer; | ||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; | ||
|
||
import java.util.ArrayList; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
public class KinesisCheckpointerTracker { | ||
private final Map<ExtendedSequenceNumber, KinesisCheckpointerRecord> checkpointerRecordList = new LinkedHashMap<>(); | ||
|
||
public synchronized void addRecordForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber, | ||
final RecordProcessorCheckpointer checkpointer) { | ||
checkpointerRecordList.put(extendedSequenceNumber, KinesisCheckpointerRecord.builder() | ||
.extendedSequenceNumber(extendedSequenceNumber) | ||
.checkpointer(checkpointer) | ||
.readyToCheckpoint(false) | ||
.build()); | ||
} | ||
|
||
public synchronized void markSequenceNumberForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber) { | ||
if (!checkpointerRecordList.containsKey(extendedSequenceNumber)) { | ||
throw new IllegalArgumentException("checkpointer not available"); | ||
} | ||
checkpointerRecordList.get(extendedSequenceNumber).setReadyToCheckpoint(true); | ||
} | ||
|
||
public synchronized Optional<KinesisCheckpointerRecord> getLatestAvailableCheckpointRecord() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe rename this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have renamed this method to |
||
Optional<KinesisCheckpointerRecord> kinesisCheckpointerRecordOptional = Optional.empty(); | ||
List<ExtendedSequenceNumber> toRemoveRecords = new ArrayList<>(); | ||
|
||
for (Map.Entry<ExtendedSequenceNumber, KinesisCheckpointerRecord> entry: checkpointerRecordList.entrySet()) { | ||
KinesisCheckpointerRecord kinesisCheckpointerRecord = entry.getValue(); | ||
|
||
// Break out of the loop on the first record which is not ready for checkpoint | ||
if (!kinesisCheckpointerRecord.isReadyToCheckpoint()) { | ||
break; | ||
} | ||
|
||
kinesisCheckpointerRecordOptional = Optional.of(kinesisCheckpointerRecord); | ||
toRemoveRecords.add(entry.getKey()); | ||
} | ||
|
||
//Cleanup the ones which are already marked for checkpoint | ||
for (ExtendedSequenceNumber extendedSequenceNumber: toRemoveRecords) { | ||
checkpointerRecordList.remove(extendedSequenceNumber); | ||
} | ||
|
||
return kinesisCheckpointerRecordOptional; | ||
} | ||
|
||
public synchronized int size() { | ||
return checkpointerRecordList.size(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this codec the type of data they send to Kinesis? It's not just JSON records from Kinesis streams? Or is it always provided from Kinesis as just bytes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we read from Kinesis, it always comes in as bytes. Reference Record