DBZ-7722 Performance improve in KafkaRecordEmitter class #128

Merged (2 commits, Apr 18, 2024). Changes from 1 commit.
@@ -534,7 +534,7 @@ public int getCommitLogMarkedCompletePollInterval() {
* See {@link EventOrderGuaranteeMode} for details.
*/
public static final Field EVENT_ORDER_GUARANTEE_MODE = Field.create("event.order.guarantee.mode")
.withDisplayName("VarInt Handling")
.withDisplayName("Event order guarantee")
.withEnum(EventOrderGuaranteeMode.class, EventOrderGuaranteeMode.COMMITLOG_FILE)
.withImportance(Importance.MEDIUM)
.withDescription("Specifies how to guarantee the order of change events.");
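
The display name now matches the option it configures. For reference, a hypothetical sketch of supplying this option through connector properties; only the key ("event.order.guarantee.mode") and the default constant come from the Field definition above, and the exact accepted spelling of the value depends on how EventOrderGuaranteeMode parses it.

    // Hypothetical sketch: supplying event.order.guarantee.mode via connector properties.
    // Only the key and the default constant are taken from the Field definition above.
    import java.util.Properties;

    public class EventOrderGuaranteeConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // COMMITLOG_FILE is the default declared by EVENT_ORDER_GUARANTEE_MODE.
            props.setProperty("event.order.guarantee.mode", "COMMITLOG_FILE");
            System.out.println(props);
        }
    }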
@@ -16,7 +16,7 @@ public class ComponentFactoryStandalone implements ComponentFactory {
@Override
public OffsetWriter offsetWriter(CassandraConnectorConfig config) {
try {
return new FileOffsetWriter(config.offsetBackingStoreDir());
return new FileOffsetWriter(config.offsetBackingStoreDir(), config.offsetFlushIntervalMs(), config.maxOffsetFlushSize());
}
catch (IOException e) {
throw new CassandraConnectorConfigException(String.format("cannot create file offset writer into %s", config.offsetBackingStoreDir()), e);
@@ -30,8 +30,6 @@ public Emitter recordEmitter(CassandraConnectorContext context) {
config,
new KafkaProducer<>(config.getKafkaConfigs()),
context.getOffsetWriter(),
config.offsetFlushIntervalMs(),
config.maxOffsetFlushSize(),
config.getKeyConverter(),
config.getValueConverter(),
context.getErroneousCommitLogs(),
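
With this change, the flush cadence settings are passed to FileOffsetWriter instead of the record emitter (see the emitter constructor change further down). A minimal sketch of the resulting construction paths, with an illustrative directory, offset string, and thresholds:

    // Sketch only: exercising the FileOffsetWriter constructors added in this PR.
    // In the connector, the values come from CassandraConnectorConfig
    // (offsetBackingStoreDir(), offsetFlushIntervalMs(), maxOffsetFlushSize()).
    import java.io.IOException;
    import java.time.Duration;

    public class OffsetWriterWiringExample {
        public static void main(String[] args) throws IOException {
            // Periodic policy: flushing is driven by elapsed time and accumulated offsets.
            FileOffsetWriter writer = new FileOffsetWriter("/tmp/offsets-example", Duration.ofSeconds(1), 100);

            // Alternatives (each instance locks its own offset files, so use separate directories):
            //   new FileOffsetWriter(dir, Duration.ZERO, 100)  -> "always" policy
            //   new FileOffsetWriter(dir)                      -> "never" policy (explicit flush() only)

            // Offset string format ("fileName:position") is illustrative here.
            writer.markOffset("test_keyspace.test_table", "CommitLog-6-1.log:0", false);
            writer.flush();
            writer.close();
        }
    }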
@@ -16,7 +16,11 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,11 +58,20 @@ public class FileOffsetWriter implements OffsetWriter {
private final FileLock snapshotOffsetFileLock;
private final FileLock commitLogOffsetFileLock;

public FileOffsetWriter(String offsetDir) throws IOException {
private final OffsetFlushPolicy offsetFlushPolicy;
private final ExecutorService executorService;

private long timeOfLastFlush;
private long unflushedRecordCount;

public FileOffsetWriter(String offsetDir, OffsetFlushPolicy offsetFlushPolicy) throws IOException {
if (offsetDir == null) {
throw new CassandraConnectorConfigException("Offset file directory must be configured at the start");
}

this.offsetFlushPolicy = offsetFlushPolicy;
this.timeOfLastFlush = System.currentTimeMillis();

File offsetDirectory = new File(offsetDir);
if (!offsetDirectory.exists()) {
Files.createDirectories(offsetDirectory.toPath());
@@ -71,58 +84,84 @@ public FileOffsetWriter(String offsetDir) throws IOException {

loadOffset(this.snapshotOffsetFile, snapshotProps);
loadOffset(this.commitLogOffsetFile, commitLogProps);
this.executorService = Executors.newFixedThreadPool(1);
}

public FileOffsetWriter(String offsetDir, Duration offsetFlushIntervalMs, long maxOffsetFlushSize) throws IOException {
this(offsetDir, offsetFlushIntervalMs.isZero() ? OffsetFlushPolicy.always() : OffsetFlushPolicy.periodic(offsetFlushIntervalMs, maxOffsetFlushSize));
}

public FileOffsetWriter(String offsetDir) throws IOException {
this(offsetDir, OffsetFlushPolicy.never());
}

@Override
public void markOffset(String sourceTable, String sourceOffset, boolean isSnapshot) {
executorService.submit(() -> performMarkOffset(sourceTable, sourceOffset, isSnapshot));
}

private void performMarkOffset(String sourceTable, String sourceOffset, boolean isSnapshot) {
if (isSnapshot) {
synchronized (snapshotOffsetFileLock) {
if (!isOffsetProcessed(sourceTable, sourceOffset, isSnapshot)) {
snapshotProps.setProperty(sourceTable, sourceOffset);
}
if (!isOffsetProcessed(sourceTable, sourceOffset, isSnapshot)) {
snapshotProps.setProperty(sourceTable, sourceOffset);
}
}
else {
synchronized (commitLogOffsetFileLock) {
if (!isOffsetProcessed(sourceTable, sourceOffset, isSnapshot)) {
commitLogProps.setProperty(sourceTable, sourceOffset);
}
if (!isOffsetProcessed(sourceTable, sourceOffset, isSnapshot)) {
commitLogProps.setProperty(sourceTable, sourceOffset);
}
}
unflushedRecordCount += 1;
maybeFlushOffset();
}

@Override
public boolean isOffsetProcessed(String sourceTable, String sourceOffset, boolean isSnapshot) {
if (isSnapshot) {
synchronized (snapshotOffsetFileLock) {
return snapshotProps.containsKey(sourceTable);
}
return snapshotProps.containsKey(sourceTable);
}
else {
synchronized (commitLogOffsetFileLock) {
OffsetPosition currentOffset = OffsetPosition.parse(sourceOffset);
OffsetPosition recordedOffset = commitLogProps.containsKey(sourceTable) ? OffsetPosition.parse((String) commitLogProps.get(sourceTable)) : null;
return recordedOffset != null && currentOffset.compareTo(recordedOffset) <= 0;
}
OffsetPosition currentOffset = OffsetPosition.parse(sourceOffset);
OffsetPosition recordedOffset = commitLogProps.containsKey(sourceTable) ? OffsetPosition.parse((String) commitLogProps.get(sourceTable)) : null;
return recordedOffset != null && currentOffset.compareTo(recordedOffset) <= 0;
}
}

private void maybeFlushOffset() {
long now = System.currentTimeMillis();
long timeSinceLastFlush = now - timeOfLastFlush;
if (offsetFlushPolicy.shouldFlush(Duration.ofMillis(timeSinceLastFlush), unflushedRecordCount)) {
this.performFlush();
timeOfLastFlush = now;
unflushedRecordCount = 0;
}
}

@Override
public void flush() {
executorService.submit(this::performFlush);
}

private void performFlush() {
try {
synchronized (snapshotOffsetFileLock) {
saveOffset(snapshotOffsetFile, snapshotProps);
}
synchronized (commitLogOffsetFileLock) {
saveOffset(commitLogOffsetFile, commitLogProps);
}
saveOffset(snapshotOffsetFile, snapshotProps);
saveOffset(commitLogOffsetFile, commitLogProps);
}
catch (IOException e) {
LOGGER.warn("Ignoring flush failure", e);
}
}

@Override
public void close() {
try {
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
executorService.shutdown();
}
}
catch (InterruptedException ignored) {
}

try {
snapshotOffsetFileLock.release();
}
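
The rewrite above replaces per-file-lock synchronization with a single executor thread that owns all offset state: markOffset() and flush() only enqueue work, and maybeFlushOffset() applies the flush policy on that thread. A standalone sketch of the same pattern, using illustrative names and thresholds that are not part of the connector:

    // Standalone sketch of the single-writer pattern used above: all mutation and
    // flushing is funnelled through one executor thread, so no locks are required.
    import java.time.Duration;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class SingleWriterSketch implements AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final Properties offsets = new Properties();
        private long unflushedCount = 0;
        private long timeOfLastFlush = System.currentTimeMillis();

        public void markOffset(String table, String offset) {
            // Callers never block; the executor preserves submission order.
            executor.submit(() -> {
                offsets.setProperty(table, offset);
                unflushedCount++;
                maybeFlush();
            });
        }

        private void maybeFlush() {
            long elapsed = System.currentTimeMillis() - timeOfLastFlush;
            // Stand-in for OffsetFlushPolicy.shouldFlush(...); thresholds are made up.
            if (unflushedCount >= 100 || elapsed >= Duration.ofSeconds(1).toMillis()) {
                // persist "offsets" to disk here (omitted in this sketch)
                unflushedCount = 0;
                timeOfLastFlush = System.currentTimeMillis();
            }
        }

        @Override
        public void close() throws InterruptedException {
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

The trade-off is that offset updates become asynchronous with respect to the caller, which is why the test changes below add waits before asserting on the offset files.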
@@ -5,16 +5,11 @@
*/
package io.debezium.connector.cassandra;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,25 +27,19 @@ public class KafkaRecordEmitter implements Emitter {
private final KafkaProducer<byte[], byte[]> producer;
private final TopicNamingStrategy<KeyspaceTable> topicNamingStrategy;
private final OffsetWriter offsetWriter;
private final OffsetFlushPolicy offsetFlushPolicy;
private final Set<String> erroneousCommitLogs;
private final CommitLogTransfer commitLogTransfer;
private final Map<Record, Future<RecordMetadata>> futures = new LinkedHashMap<>();
private final Object lock = new Object();
private final Converter keyConverter;
private final Converter valueConverter;
private long timeOfLastFlush;
private long emitCount = 0;
private final AtomicLong emitCount = new AtomicLong();

@SuppressWarnings("unchecked")
public KafkaRecordEmitter(CassandraConnectorConfig connectorConfig, KafkaProducer<byte[], byte[]> kafkaProducer,
OffsetWriter offsetWriter, Duration offsetFlushIntervalMs, long maxOffsetFlushSize,
Converter keyConverter, Converter valueConverter, Set<String> erroneousCommitLogs,
CommitLogTransfer commitLogTransfer) {
OffsetWriter offsetWriter, Converter keyConverter, Converter valueConverter,
Set<String> erroneousCommitLogs, CommitLogTransfer commitLogTransfer) {
this.producer = kafkaProducer;
this.topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
this.offsetWriter = offsetWriter;
this.offsetFlushPolicy = offsetFlushIntervalMs.isZero() ? OffsetFlushPolicy.always() : OffsetFlushPolicy.periodic(offsetFlushIntervalMs, maxOffsetFlushSize);
this.erroneousCommitLogs = erroneousCommitLogs;
this.commitLogTransfer = commitLogTransfer;
this.keyConverter = keyConverter;
@@ -60,13 +49,9 @@ public KafkaRecordEmitter(CassandraConnectorConfig connectorConfig, KafkaProduce
@Override
public void emit(Record record) {
try {
synchronized (lock) {
ProducerRecord<byte[], byte[]> producerRecord = toProducerRecord(record);
Future<RecordMetadata> future = producer.send(producerRecord);
LOGGER.trace("Sent to topic {}: {}", producerRecord.topic(), record);
futures.put(record, future);
maybeFlushAndMarkOffset();
}
ProducerRecord<byte[], byte[]> producerRecord = toProducerRecord(record);
producer.send(producerRecord, (metadata, exception) -> callback(record, exception));
LOGGER.trace("Sent to topic {}: {}", producerRecord.topic(), record);
}
catch (Exception e) {
if (record.getSource().snapshot || commitLogTransfer.getClass().getName().equals(CassandraConnectorConfig.DEFAULT_COMMIT_LOG_TRANSFER_CLASS)) {
@@ -84,46 +69,30 @@ protected ProducerRecord<byte[], byte[]> toProducerRecord(Record record) {
return new ProducerRecord<>(topic, serializedKey, serializedValue);
}

private void maybeFlushAndMarkOffset() {
long now = System.currentTimeMillis();
long timeSinceLastFlush = now - timeOfLastFlush;
if (offsetFlushPolicy.shouldFlush(Duration.ofMillis(timeSinceLastFlush), futures.size())) {
flushAndMarkOffset();
timeOfLastFlush = now;
private void callback(Record record, Exception exception) {
if (exception != null) {
LOGGER.error("Failed to emit record {}", record, exception);
return;
}
}

private void flushAndMarkOffset() {
futures.entrySet().stream().filter(this::flush).filter(this::hasOffset).forEach(this::markOffset);
offsetWriter.flush();
futures.clear();
}

private boolean flush(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
try {
recordEntry.getValue().get(); // wait
if (++emitCount % 10_000 == 0) {
LOGGER.debug("Emitted {} records to Kafka Broker", emitCount);
emitCount = 0;
}
return true;
long emitted = emitCount.incrementAndGet();
if (emitted % 10_000 == 0) {
LOGGER.debug("Emitted {} records to Kafka Broker", emitted);
emitCount.addAndGet(-emitted);
}
catch (ExecutionException | InterruptedException e) {
LOGGER.error("Failed to emit record {}", recordEntry.getKey(), e);
return false;
if (hasOffset(record)) {
markOffset(record);
}
}

private boolean hasOffset(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
Record record = recordEntry.getKey();
private boolean hasOffset(Record record) {
if (record.getSource().snapshot || commitLogTransfer.getClass().getName().equals(CassandraConnectorConfig.DEFAULT_COMMIT_LOG_TRANSFER_CLASS)) {
return record.shouldMarkOffset();
}
return record.shouldMarkOffset() && !erroneousCommitLogs.contains(record.getSource().offsetPosition.fileName);
}

private void markOffset(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
SourceInfo source = recordEntry.getKey().getSource();
private void markOffset(Record record) {
SourceInfo source = record.getSource();
String sourceTable = source.keyspaceTable.name();
String sourceOffset = source.offsetPosition.serialize();
boolean isSnapshot = source.snapshot;
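
In the emitter, collecting futures and blocking on them is replaced by producer.send(record, callback): offsets are marked from the callback once the broker acknowledges the record, and flushing is left to the offset writer. A self-contained sketch of that send-with-callback pattern, with placeholder broker, topic, and payload:

    // Sketch of the non-blocking send-with-callback pattern adopted above.
    // Broker address, topic and payload are placeholders.
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class CallbackSendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                byte[] value = "example-change-event".getBytes(StandardCharsets.UTF_8);
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("example-topic", value);

                // send() returns immediately; the callback runs on the producer's I/O thread
                // once the broker acknowledges (or rejects) the record.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        // Mirrors the emitter: log the failure and skip offset marking.
                        System.err.println("Failed to emit record: " + exception);
                        return;
                    }
                    // Success path: this is where the emitter marks the source offset.
                    System.out.println("Acked at offset " + metadata.offset());
                });

                producer.flush();
            }
        }
    }

As with the offset writer, emission is now fully asynchronous, so callers cannot assume the offset has been marked when emit() returns.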
@@ -23,6 +23,10 @@ static OffsetFlushPolicy always() {
return new AlwaysFlushOffsetPolicy();
}

static OffsetFlushPolicy never() {
return new NeverFlushOffsetPolicy();
}

static OffsetFlushPolicy periodic(Duration offsetFlushInterval, long maxOffsetFlushSize) {
return new PeriodicFlushOffsetPolicy(offsetFlushInterval, maxOffsetFlushSize);
}
@@ -49,4 +53,12 @@ public boolean shouldFlush(Duration timeSinceLastFlush, long numOfRecordsSinceLa
return true;
}
}

class NeverFlushOffsetPolicy implements OffsetFlushPolicy {

@Override
public boolean shouldFlush(Duration timeSinceLastFlush, long numOfRecordsSinceLastFlush) {
return false;
}
}
}
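
The new never() factory complements the existing policies: always() is used when the flush interval is zero, periodic() when an interval and size cap are configured, and never() backs the single-argument FileOffsetWriter constructor so data is only persisted on an explicit flush(). A small sketch comparing the three for the same inputs (thresholds are illustrative):

    // Sketch: comparing the three policies for the same inputs. The exact threshold
    // semantics of the periodic policy live in PeriodicFlushOffsetPolicy (not fully
    // shown in this diff), so its result is left to that implementation.
    import java.time.Duration;

    public class FlushPolicyExample {
        public static void main(String[] args) {
            Duration sinceLastFlush = Duration.ofMillis(500);
            long unflushedRecords = 50;

            OffsetFlushPolicy always = OffsetFlushPolicy.always();
            OffsetFlushPolicy never = OffsetFlushPolicy.never();
            OffsetFlushPolicy periodic = OffsetFlushPolicy.periodic(Duration.ofSeconds(1), 100);

            System.out.println(always.shouldFlush(sinceLastFlush, unflushedRecords));   // true
            System.out.println(never.shouldFlush(sinceLastFlush, unflushedRecords));    // false
            System.out.println(periodic.shouldFlush(sinceLastFlush, unflushedRecords)); // depends on thresholds
        }
    }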
@@ -50,8 +50,6 @@ public void setUp() throws Exception {
context.getCassandraConnectorConfig(),
null,
context.getOffsetWriter(),
context.getCassandraConnectorConfig().offsetFlushIntervalMs(),
context.getCassandraConnectorConfig().maxOffsetFlushSize(),
context.getCassandraConnectorConfig().getKeyConverter(),
context.getCassandraConnectorConfig().getValueConverter(),
context.getErroneousCommitLogs(),
@@ -107,6 +107,13 @@ public void testFlush() throws IOException {
process(commitLogRecordDiffTable);

offsetWriter.flush();
// Sleep a little bit to ensure the operation is done
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
try (FileInputStream fis = new FileInputStream(offsetDir.toString() + "/" + FileOffsetWriter.SNAPSHOT_OFFSET_FILE)) {
snapshotProps.load(fis);
}
@@ -153,5 +160,12 @@ private void process(ChangeRecord record) {
record.getSource().keyspaceTable.name(),
record.getSource().offsetPosition.serialize(),
record.getSource().snapshot);
// Sleep a little bit to ensure the operation is done
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
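
Because marking and flushing offsets now happen on the writer's executor, the test sleeps before reading the offset files. If the fixed sleeps ever prove flaky, polling for the observable effect is a common alternative; a hypothetical helper (not part of this PR, name and timings are made up):

    // Hypothetical helper: poll for a condition instead of sleeping a fixed interval.
    import java.util.function.BooleanSupplier;

    public final class WaitUtil {

        private WaitUtil() {
        }

        public static void awaitTrue(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError("Condition not met within " + timeoutMs + " ms");
                }
                Thread.sleep(10); // short poll keeps the test fast when the write completes early
            }
        }
    }

A test could then wait on something observable, for example the offset file appearing on disk, instead of a fixed 100 ms pause.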
@@ -5,7 +5,6 @@
*/
package io.debezium.connector.cassandra;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -19,11 +18,9 @@ public class TestingKafkaRecordEmitter extends KafkaRecordEmitter {
public List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();

public TestingKafkaRecordEmitter(CassandraConnectorConfig connectorConfig, KafkaProducer<byte[], byte[]> kafkaProducer,
OffsetWriter offsetWriter, Duration offsetFlushIntervalMs, long maxOffsetFlushSize,
Converter keyConverter, Converter valueConverter, Set<String> erroneousCommitLogs,
CommitLogTransfer commitLogTransfer) {
super(connectorConfig, kafkaProducer, offsetWriter, offsetFlushIntervalMs, maxOffsetFlushSize, keyConverter, valueConverter,
erroneousCommitLogs, commitLogTransfer);
OffsetWriter offsetWriter, Converter keyConverter, Converter valueConverter,
Set<String> erroneousCommitLogs, CommitLogTransfer commitLogTransfer) {
super(connectorConfig, kafkaProducer, offsetWriter, keyConverter, valueConverter, erroneousCommitLogs, commitLogTransfer);
}

@Override