Commit 096cf21: Merge branch 'main' into main

Signed-off-by: Dinu John <[email protected]>
dinujoh authored May 7, 2024
2 parents 1f320a6 + 60ff9db
Showing 9 changed files with 88 additions and 94 deletions.

This file was deleted.

RecordBufferWriter.java
@@ -1,30 +1,41 @@
 /*
  * Copyright OpenSearch Contributors
  * SPDX-License-Identifier: Apache-2.0
  */
 
 package org.opensearch.dataprepper.plugins.mongo.buffer;
 
+import io.micrometer.core.instrument.Counter;
 import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
-import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
 
 /**
- * Base Record Buffer writer that transform the source data into a JacksonEvent,
+ * Record Buffer writer that transform the source data into a JacksonEvent,
  * and then writes to buffer.
  */
-public abstract class RecordBufferWriter {
+public class RecordBufferWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(RecordBufferWriter.class);
+    static final String RECORDS_PROCESSED_COUNT = "recordsProcessed";
+    static final String RECORDS_PROCESSING_ERROR_COUNT = "recordProcessingErrors";
+    private final PluginMetrics pluginMetrics;
+    private final Counter recordSuccessCounter;
+    private final Counter recordErrorCounter;
     private final BufferAccumulator<Record<Event>> bufferAccumulator;
 
-    public RecordBufferWriter(final BufferAccumulator<Record<Event>> bufferAccumulator) {
+    private RecordBufferWriter(final BufferAccumulator<Record<Event>> bufferAccumulator,
+                               final PluginMetrics pluginMetrics) {
         this.bufferAccumulator = bufferAccumulator;
+        this.pluginMetrics = pluginMetrics;
+        this.recordSuccessCounter = pluginMetrics.counter(RECORDS_PROCESSED_COUNT);
+        this.recordErrorCounter = pluginMetrics.counter(RECORDS_PROCESSING_ERROR_COUNT);
     }
 
-    public abstract void writeToBuffer(final AcknowledgementSet acknowledgementSet,
-                                       final List<Event> records);
+    public static RecordBufferWriter create(final BufferAccumulator<Record<Event>> bufferAccumulator,
+                                            final PluginMetrics pluginMetrics) {
+        return new RecordBufferWriter(bufferAccumulator, pluginMetrics);
+    }
 
     void flushBuffer() throws Exception {
         bufferAccumulator.flush();

@@ -45,4 +56,27 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
 
         bufferAccumulator.add(new Record<>(record));
     }
+
+    public void writeToBuffer(final AcknowledgementSet acknowledgementSet,
+                              final List<Event> records) {
+
+        int eventCount = 0;
+        for (final Event record : records) {
+            try {
+                addToBuffer(acknowledgementSet, record);
+                eventCount++;
+            } catch (Exception e) {
+                // will this cause too many logs?
+                LOG.error("Failed to add event to buffer due to {}", e.getMessage());
+            }
+        }
+
+        try {
+            flushBuffer();
+            recordSuccessCounter.increment(eventCount);
+        } catch (Exception e) {
+            LOG.error("Failed to write {} events to buffer due to {}", eventCount, e.getMessage());
+            recordErrorCounter.increment(eventCount);
+        }
+    }
 }
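
With the subclasses gone, RecordBufferWriter is the single concrete writer shared by the export and stream paths, constructed only through the static create(...) factory. A minimal usage sketch, assuming a Buffer<Record<Event>> and PluginMetrics are handed in by the plugin framework as in ExportWorker below; the wrapper class and the batch size/timeout values are illustrative, not the plugin's defaults.

import java.time.Duration;
import java.util.List;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter;

// Illustrative wrapper; not part of the plugin.
class RecordBufferWriterUsageSketch {
    void writeBatch(final Buffer<Record<Event>> buffer,
                    final PluginMetrics pluginMetrics,
                    final List<Event> events) {
        // Placeholder batch size and timeout, not the plugin's defaults.
        final BufferAccumulator<Record<Event>> accumulator =
                BufferAccumulator.create(buffer, 100, Duration.ofSeconds(10));
        // The constructor is private; create(...) registers the recordsProcessed
        // and recordProcessingErrors counters and returns the writer.
        final RecordBufferWriter writer = RecordBufferWriter.create(accumulator, pluginMetrics);
        // Passing null for the AcknowledgementSet is how the unit tests below
        // exercise the writer when end-to-end acknowledgements are disabled.
        writer.writeToBuffer(null, events);
    }
}

Routing construction through create(...) keeps counter registration in one place instead of duplicating it in export- and stream-specific subclasses.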
ExportPartitionWorker.java
@@ -40,9 +40,11 @@ public class ExportPartitionWorker implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ExportPartitionWorker.class);
     private static final int PARTITION_KEY_PARTS = 4;
     static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5);
-    static final String SUCCESS_ITEM_COUNTER_NAME = "exportRecordsSuccessTotal";
-    static final String FAILURE_ITEM_COUNTER_NAME = "exportRecordsFailedTotal";
+    static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
+    static final String SUCCESS_ITEM_COUNTER_NAME = "exportRecordsProcessed";
+    static final String FAILURE_ITEM_COUNTER_NAME = "exportRecordProcessingErrors";
     static final String BYTES_RECEIVED = "bytesReceived";
+    static final String BYTES_PROCESSED = "bytesProcessed";
     private static final String PARTITION_KEY_SPLITTER = "\\|";
     private static final String COLLECTION_SPLITTER = "\\.";
@@ -67,9 +69,11 @@ public class ExportPartitionWorker implements Runnable {
     private static volatile boolean shouldStop = false;
 
     private final MongoDBSourceConfig sourceConfig;
+    private final Counter exportRecordTotalCounter;
     private final Counter successItemsCounter;
     private final Counter failureItemsCounter;
     private final DistributionSummary bytesReceivedSummary;
+    private final DistributionSummary bytesProcessedSummary;
     private final RecordBufferWriter recordBufferWriter;
     private final PartitionKeyRecordConverter recordConverter;
     private final DataQueryPartition dataQueryPartition;
@@ -95,9 +99,11 @@ public ExportPartitionWorker(final RecordBufferWriter recordBufferWriter,
         this.partitionCheckpoint = partitionCheckpoint;
         this.startLine = 0;// replace it with checkpoint line
         this.exportStartTime = exportStartTime;
+        this.exportRecordTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
         this.successItemsCounter = pluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME);
         this.failureItemsCounter = pluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME);
         this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
+        this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
     }
 
     private boolean shouldWaitForS3Partition(final String collection) {
@@ -149,6 +155,7 @@ public void run() {
         int recordCount = 0;
         int lastRecordNumberProcessed = 0;
         final List<Event> records = new ArrayList<>();
+        final List<Long> recordBytes = new ArrayList<>();
         try (MongoCursor<Document> cursor = col.find(query).iterator()) {
             while (cursor.hasNext() && !Thread.currentThread().isInterrupted()) {
                 if (shouldStop) {
@@ -162,11 +169,13 @@
                 if (totalRecords <= startLine) {
                     continue;
                 }
+                exportRecordTotalCounter.increment();
 
                 try {
                     final Document document = cursor.next();
                     final String record = document.toJson(JSON_WRITER_SETTINGS);
                     final long bytes = record.getBytes().length;
+                    recordBytes.add(bytes);
                     bytesReceivedSummary.record(bytes);
                     final Optional<BsonDocument> primaryKeyDoc = Optional.ofNullable(document.toBsonDocument());
                     final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.get(DOCUMENTDB_ID_FIELD_NAME).getBsonType().name()).orElse(UNKNOWN_TYPE);
@@ -184,7 +193,10 @@ public void run() {
                     if ((recordCount - startLine) % DEFAULT_BATCH_SIZE == 0) {
                         LOG.debug("Write to buffer for line " + (recordCount - DEFAULT_BATCH_SIZE) + " to " + recordCount);
                         recordBufferWriter.writeToBuffer(acknowledgementSet, records);
+                        successItemsCounter.increment(records.size());
+                        bytesProcessedSummary.record(recordBytes.stream().mapToLong(Long::longValue).sum());
                         records.clear();
+                        recordBytes.clear();
                         lastRecordNumberProcessed = recordCount;
                         // checkpointing in finally block when all records are processed
                     }
@@ -195,11 +207,10 @@
                         lastCheckpointTime = System.currentTimeMillis();
                     }
 
-                    successItemsCounter.increment();
                     successRecords += 1;
                 } catch (Exception e) {
                     LOG.error("Failed to add record to buffer with error.", e);
-                    failureItemsCounter.increment();
+                    failureItemsCounter.increment(records.size());
                     failedRecords += 1;
                 }
             }
@@ -209,9 +220,12 @@
                 // If all records were written to buffer, checkpoint will be done in finally block
                 recordBufferWriter.writeToBuffer(acknowledgementSet, records);
                 partitionCheckpoint.checkpoint(recordCount);
+                successItemsCounter.increment(records.size());
+                bytesProcessedSummary.record(recordBytes.stream().mapToLong(Long::longValue).sum());
             }
 
             records.clear();
+            recordBytes.clear();
 
             LOG.info("Completed writing query partition: {} to buffer", query);
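
Two metric changes run through the hunks above: the success counter is now incremented once per flushed batch instead of once per record, and the new bytesProcessed summary records the serialized document sizes only after a successful buffer write, while bytesReceived is still recorded as each document comes off the cursor. A condensed sketch of that accounting follows; the wrapper class and the flushBatch helper are hypothetical, only the field names and metric semantics come from this diff.

import java.util.List;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter;

// Hypothetical condensation of ExportPartitionWorker's batch accounting.
class BatchAccountingSketch {
    private final RecordBufferWriter recordBufferWriter;
    private final Counter successItemsCounter;
    private final DistributionSummary bytesProcessedSummary;

    BatchAccountingSketch(final RecordBufferWriter recordBufferWriter,
                          final Counter successItemsCounter,
                          final DistributionSummary bytesProcessedSummary) {
        this.recordBufferWriter = recordBufferWriter;
        this.successItemsCounter = successItemsCounter;
        this.bytesProcessedSummary = bytesProcessedSummary;
    }

    // flushBatch does not exist in the plugin; it names the repeated pattern
    // that appears both inside the cursor loop and after it.
    void flushBatch(final AcknowledgementSet acknowledgementSet,
                    final List<Event> records,
                    final List<Long> recordBytes) {
        recordBufferWriter.writeToBuffer(acknowledgementSet, records);
        // Success is counted per flushed batch, no longer once per record.
        successItemsCounter.increment(records.size());
        // bytesProcessed sums the sizes of the documents that were just flushed;
        // bytesReceived was already recorded when each document was read.
        bytesProcessedSummary.record(recordBytes.stream().mapToLong(Long::longValue).sum());
        records.clear();
        recordBytes.clear();
    }
}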
ExportWorker.java
@@ -15,7 +15,6 @@
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
-import org.opensearch.dataprepper.plugins.mongo.buffer.ExportRecordBufferWriter;
 import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter;
 import org.opensearch.dataprepper.plugins.mongo.converter.PartitionKeyRecordConverter;
 import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;

@@ -89,7 +88,7 @@ public ExportWorker(final EnhancedSourceCoordinator sourceCoordinator,
         this.sourceCoordinator = sourceCoordinator;
         executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
         final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
-        recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator, pluginMetrics);
+        recordBufferWriter = RecordBufferWriter.create(bufferAccumulator, pluginMetrics);
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.sourceConfig = sourceConfig;
         this.startLine = 0;// replace it with checkpoint line
StreamScheduler.java
@@ -8,7 +8,6 @@
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
-import org.opensearch.dataprepper.plugins.mongo.buffer.ExportRecordBufferWriter;
 import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter;
 import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
 import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;

@@ -53,7 +52,7 @@ public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
                            final PluginMetrics pluginMetrics) {
         this.sourceCoordinator = sourceCoordinator;
         final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
-        recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator, pluginMetrics);
+        recordBufferWriter = RecordBufferWriter.create(bufferAccumulator, pluginMetrics);
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.sourceConfig = sourceConfig;
         checkArgument(Objects.nonNull(s3PathPrefix), "S3 path prefix must not be null");
StreamWorker.java
@@ -48,8 +48,8 @@ public class StreamWorker {
             OperationType.UPDATE, OperationType.REPLACE);
     private static final Set<OperationType> STREAM_TERMINATE_OPERATION_TYPE = Set.of(OperationType.INVALIDATE,
             OperationType.DROP, OperationType.DROP_DATABASE);
-    static final String SUCCESS_ITEM_COUNTER_NAME = "streamRecordsSuccessTotal";
-    static final String FAILURE_ITEM_COUNTER_NAME = "streamRecordsFailedTotal";
+    static final String SUCCESS_ITEM_COUNTER_NAME = "changeEventsProcessed";
+    static final String FAILURE_ITEM_COUNTER_NAME = "changeEventsProcessingErrors";
     static final String BYTES_RECEIVED = "bytesReceived";
     private final RecordBufferWriter recordBufferWriter;
     private final PartitionKeyRecordConverter recordConverter;
ExportRecordBufferWriterTest.java
@@ -32,8 +32,8 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
-import static org.opensearch.dataprepper.plugins.mongo.buffer.ExportRecordBufferWriter.EXPORT_RECORDS_PROCESSED_COUNT;
-import static org.opensearch.dataprepper.plugins.mongo.buffer.ExportRecordBufferWriter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
+import static org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter.RECORDS_PROCESSED_COUNT;
+import static org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter.RECORDS_PROCESSING_ERROR_COUNT;
 
 @ExtendWith(MockitoExtension.class)
 class ExportRecordBufferWriterTest {

@@ -61,8 +61,8 @@ class ExportRecordBufferWriterTest {
 
     @BeforeEach
     void setup() {
-        given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT)).willReturn(exportRecordSuccess);
-        given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT)).willReturn(exportRecordErrors);
+        given(pluginMetrics.counter(RECORDS_PROCESSED_COUNT)).willReturn(exportRecordSuccess);
+        given(pluginMetrics.counter(RECORDS_PROCESSING_ERROR_COUNT)).willReturn(exportRecordErrors);
 
     }

@@ -79,7 +79,7 @@ void test_writeToBuffer() throws Exception {
         int numberOfRecords = random.nextInt(10);
 
         final List<Event> data = generateData(numberOfRecords);
-        final ExportRecordBufferWriter recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator,
+        final RecordBufferWriter recordBufferWriter = RecordBufferWriter.create(bufferAccumulator,
                 pluginMetrics);
 
         recordBufferWriter.writeToBuffer(null, data);

@@ -93,7 +93,7 @@ void test_writeSingleRecordToBuffer() throws Exception {
         final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
 
-        final ExportRecordBufferWriter recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator,
+        final RecordBufferWriter recordBufferWriter = RecordBufferWriter.create(bufferAccumulator,
                 pluginMetrics);
 
         recordBufferWriter.writeToBuffer(null, List.of(event));
         verify(bufferAccumulator).add(recordArgumentCaptor.capture());

@@ -108,7 +108,7 @@ void test_writeSingleRecordToBufferWithAcknowledgementSet() throws Exception {
         final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
 
-        final ExportRecordBufferWriter recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator,
+        final RecordBufferWriter recordBufferWriter = RecordBufferWriter.create(bufferAccumulator,
                 pluginMetrics);
         recordBufferWriter.writeToBuffer(acknowledgementSet, List.of(event));
         verify(bufferAccumulator).add(recordArgumentCaptor.capture());

@@ -123,7 +123,7 @@ void test_writeSingleRecordFlushException() throws Exception {
         final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
 
-        final ExportRecordBufferWriter recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator,
+        final RecordBufferWriter recordBufferWriter = RecordBufferWriter.create(bufferAccumulator,
                 pluginMetrics);
         doThrow(RuntimeException.class).when(bufferAccumulator).flush();
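
Beyond the constant renames, the consolidated writeToBuffer now increments the success counter once with the batch size, which these tests could pin down explicitly. A hypothetical additional test, not part of this commit, reusing the pluginMetrics/bufferAccumulator mocks, the exportRecordSuccess counter stub, and the generateData helper from ExportRecordBufferWriterTest (assumes a static import of org.mockito.ArgumentMatchers.any).

    @Test
    void test_writeToBuffer_incrementsSuccessByBatchSize() throws Exception {
        final List<Event> data = generateData(5);
        final RecordBufferWriter recordBufferWriter = RecordBufferWriter.create(bufferAccumulator,
                pluginMetrics);

        recordBufferWriter.writeToBuffer(null, data);

        // Each event is added to the accumulator individually...
        verify(bufferAccumulator, times(5)).add(any());
        // ...but success is recorded as a single batch-sized increment.
        verify(exportRecordSuccess).increment(5);
    }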