DBZ-7722 Performance improvement in KafkaRecordEmitter class
Change OffsetWriter methods to return a Future so callers can use async approaches. Use ElapsedTimeStrategy in the PeriodicFlushOffsetPolicy class. Use io.debezium.util.Threads in the FileOffsetWriter class.
samssh authored and jpechane committed Apr 18, 2024
1 parent cb84533 commit 40943a8
Showing 11 changed files with 87 additions and 80 deletions.
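The core of the change is the OffsetWriter contract: markOffset() and flush() now return a Future<?> instead of void, so the hot path can stay fire-and-forget while callers that need durability can block on the result. A minimal caller sketch against the revised interface (the class and method names below are illustrative, not part of the commit):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    // Illustrative caller of the revised OffsetWriter contract.
    class OffsetWriterUsageSketch {

        // Hot path: submit the mark and move on without waiting.
        static void recordAsync(OffsetWriter writer, String table, String offset) {
            writer.markOffset(table, offset, false);
        }

        // Durability-sensitive path (e.g. finishing a snapshot): block until the
        // mark has been applied and the offsets have been flushed to disk.
        static void recordDurably(OffsetWriter writer, String table, String offset)
                throws InterruptedException, ExecutionException {
            Future<?> marked = writer.markOffset(table, offset, true);
            marked.get();
            writer.flush().get();
        }
    }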
@@ -23,6 +23,6 @@ protected CassandraConnectorContext generateTaskContext(Configuration configurat
return new CassandraConnectorContext(config,
new Cassandra3SchemaLoader(),
new Cassandra3SchemaChangeListenerProvider(),
- new FileOffsetWriter(config.offsetBackingStoreDir()));
+ new FileOffsetWriter(config));
}
}
@@ -23,6 +23,6 @@ protected CassandraConnectorContext generateTaskContext(Configuration configurat
return new CassandraConnectorContext(config,
new Cassandra4SchemaLoader(),
new Cassandra4SchemaChangeListenerProvider(),
- new FileOffsetWriter(config.offsetBackingStoreDir()));
+ new FileOffsetWriter(config));
}
}
@@ -5,6 +5,9 @@
*/
package io.debezium.connector.cassandra;

+ import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.Future;
+
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
@@ -36,8 +39,9 @@ public OffsetWriter offsetWriter(CassandraConnectorConfig config) {
return new OffsetWriter() {

@Override
- public void markOffset(String sourceTable, String sourceOffset, boolean isSnapshot) {
+ public Future<?> markOffset(String sourceTable, String sourceOffset, boolean isSnapshot) {
offset.putOffset(sourceTable, isSnapshot, sourceOffset);
+ return CompletableFuture.completedFuture(new Object());
}

@Override
@@ -50,8 +54,8 @@ public boolean isOffsetProcessed(String sourceTable, String sourceOffset, boolea
}

@Override
- public void flush() {
-
+ public Future<?> flush() {
+ return CompletableFuture.completedFuture(new Object());
}

@Override
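The in-memory test writer above satisfies the new contract with CompletableFuture.completedFuture(...), the standard way to return an already-finished Future when the work completes synchronously inside the method; get() on such a future never blocks. A tiny stand-alone illustration:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    class CompletedFutureSketch {
        public static void main(String[] args) throws Exception {
            // Complete on arrival: isDone() is already true, get() returns at once.
            Future<?> done = CompletableFuture.completedFuture(new Object());
            System.out.println(done.isDone()); // prints: true
            done.get();                        // no blocking, no exception
        }
    }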
@@ -16,7 +16,7 @@ public class ComponentFactoryStandalone implements ComponentFactory {
@Override
public OffsetWriter offsetWriter(CassandraConnectorConfig config) {
try {
- return new FileOffsetWriter(config.offsetBackingStoreDir(), config.offsetFlushIntervalMs(), config.maxOffsetFlushSize());
+ return new FileOffsetWriter(config);
}
catch (IOException e) {
throw new CassandraConnectorConfigException(String.format("cannot create file offset writer into %s", config.offsetBackingStoreDir()), e);
@@ -16,17 +16,17 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
- import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
+ import io.debezium.util.Threads;

/**
* A concrete implementation of {@link OffsetWriter} which tracks the progress of events
@@ -61,18 +61,19 @@ public class FileOffsetWriter implements OffsetWriter {
private final OffsetFlushPolicy offsetFlushPolicy;
private final ExecutorService executorService;

- private long timeOfLastFlush;
- private long unflushedRecordCount;
-
- public FileOffsetWriter(String offsetDir, OffsetFlushPolicy offsetFlushPolicy) throws IOException {
- if (offsetDir == null) {
+ public FileOffsetWriter(CassandraConnectorConfig config) throws IOException {
+ if (config.offsetBackingStoreDir() == null) {
throw new CassandraConnectorConfigException("Offset file directory must be configured at the start");
}

- this.offsetFlushPolicy = offsetFlushPolicy;
- this.timeOfLastFlush = System.currentTimeMillis();
+ if (config.offsetFlushIntervalMs().isZero()) {
+ this.offsetFlushPolicy = OffsetFlushPolicy.always();
+ }
+ else {
+ this.offsetFlushPolicy = OffsetFlushPolicy.periodic(config.offsetFlushIntervalMs(), config.maxOffsetFlushSize());
+ }

- File offsetDirectory = new File(offsetDir);
+ File offsetDirectory = new File(config.offsetBackingStoreDir());
if (!offsetDirectory.exists()) {
Files.createDirectories(offsetDirectory.toPath());
}
@@ -84,20 +85,12 @@ public FileOffsetWriter(String offsetDir, OffsetFlushPolicy offsetFlushPolicy) t

loadOffset(this.snapshotOffsetFile, snapshotProps);
loadOffset(this.commitLogOffsetFile, commitLogProps);
- this.executorService = Executors.newFixedThreadPool(1);
- }
-
- public FileOffsetWriter(String offsetDir, Duration offsetFlushIntervalMs, long maxOffsetFlushSize) throws IOException {
- this(offsetDir, offsetFlushIntervalMs.isZero() ? OffsetFlushPolicy.always() : OffsetFlushPolicy.periodic(offsetFlushIntervalMs, maxOffsetFlushSize));
- }
-
- public FileOffsetWriter(String offsetDir) throws IOException {
- this(offsetDir, OffsetFlushPolicy.never());
+ this.executorService = Threads.newSingleThreadExecutor(AbstractSourceConnector.class, config.getConnectorName(), "offset-writer");
}

@Override
- public void markOffset(String sourceTable, String sourceOffset, boolean isSnapshot) {
- executorService.submit(() -> performMarkOffset(sourceTable, sourceOffset, isSnapshot));
+ public Future<?> markOffset(String sourceTable, String sourceOffset, boolean isSnapshot) {
+ return executorService.submit(() -> performMarkOffset(sourceTable, sourceOffset, isSnapshot));
}

private void performMarkOffset(String sourceTable, String sourceOffset, boolean isSnapshot) {
@@ -111,8 +104,9 @@ private void performMarkOffset(String sourceTable, String sourceOffset, boolean
commitLogProps.setProperty(sourceTable, sourceOffset);
}
}
- unflushedRecordCount += 1;
- maybeFlushOffset();
+ if (offsetFlushPolicy.shouldFlush()) {
+ this.performFlush();
+ }
}

@Override
@@ -127,19 +121,9 @@ public boolean isOffsetProcessed(String sourceTable, String sourceOffset, boolea
}
}

- private void maybeFlushOffset() {
- long now = System.currentTimeMillis();
- long timeSinceLastFlush = now - timeOfLastFlush;
- if (offsetFlushPolicy.shouldFlush(Duration.ofMillis(timeSinceLastFlush), unflushedRecordCount)) {
- this.performFlush();
- timeOfLastFlush = now;
- unflushedRecordCount = 0;
- }
- }
-
@Override
- public void flush() {
- executorService.submit(this::performFlush);
+ public Future<?> flush() {
+ return executorService.submit(this::performFlush);
}

private void performFlush() {
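Funneling every mark and flush through one worker thread is what keeps the Future-returning API safe: submit() order defines execution order, so a flush submitted after a mark always observes it, with no locking in the callers. A stripped-down sketch of that single-writer pattern in plain java.util.concurrent (illustrative names; the commit itself uses io.debezium.util.Threads to get a named thread):

    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Single-writer pattern: all mutation happens on one thread, so tasks run
    // strictly in submission order and the Properties needs no external locks.
    class SingleWriterSketch {
        private final Properties offsets = new Properties();
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        Future<?> mark(String table, String offset) {
            return worker.submit(() -> offsets.setProperty(table, offset));
        }

        Future<?> flush() {
            return worker.submit(() -> {
                // persist `offsets` here; guaranteed to see all earlier marks
            });
        }
    }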
@@ -23,6 +23,7 @@
*/
public class KafkaRecordEmitter implements Emitter {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class);
+ private static final int RECORD_LOG_COUNT = 10_000;

private final KafkaProducer<byte[], byte[]> producer;
private final TopicNamingStrategy<KeyspaceTable> topicNamingStrategy;
@@ -75,7 +76,7 @@ private void callback(Record record, Exception exception) {
return;
}
long emitted = emitCount.incrementAndGet();
- if (emitted % 10_000 == 0) {
+ if (emitted % RECORD_LOG_COUNT == 0) {
LOGGER.debug("Emitted {} records to Kafka Broker", emitted);
emitCount.addAndGet(-emitted);
}
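The KafkaRecordEmitter change itself is small: the magic number moves into a RECORD_LOG_COUNT constant, and the counter logic logs once every 10,000 emitted records, then subtracts what it just reported so the running count never grows without bound. The same pattern in isolation (illustrative class, not the connector's code):

    import java.util.concurrent.atomic.AtomicLong;

    // Sampled logging: report once every LOG_EVERY events, then rebase the counter.
    class SampledCounterSketch {
        private static final int LOG_EVERY = 10_000;
        private final AtomicLong count = new AtomicLong();

        void onEvent() {
            long seen = count.incrementAndGet();
            if (seen % LOG_EVERY == 0) {
                System.out.printf("Emitted %d records%n", seen);
                count.addAndGet(-seen); // rebase so the counter stays small
            }
        }
    }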
@@ -7,6 +7,9 @@

import java.time.Duration;

+ import io.debezium.util.Clock;
+ import io.debezium.util.ElapsedTimeStrategy;
+
/**
* This policy determines how frequently the offset is flushed to disk.
*
@@ -17,48 +20,51 @@
Always means that the offset is flushed to disk every time a record is processed.
*/
public interface OffsetFlushPolicy {
- boolean shouldFlush(Duration timeSinceLastFlush, long numOfRecordsSinceLastFlush);
+ boolean shouldFlush();

static OffsetFlushPolicy always() {
return new AlwaysFlushOffsetPolicy();
}

- static OffsetFlushPolicy never() {
- return new NeverFlushOffsetPolicy();
- }

static OffsetFlushPolicy periodic(Duration offsetFlushInterval, long maxOffsetFlushSize) {
return new PeriodicFlushOffsetPolicy(offsetFlushInterval, maxOffsetFlushSize);
}

class PeriodicFlushOffsetPolicy implements OffsetFlushPolicy {
- private final Duration offsetFlushInterval;
private final long maxOffsetFlushSize;
+ private long unflushedRecordCount;
+ private final ElapsedTimeStrategy elapsedTimeStrategy;

PeriodicFlushOffsetPolicy(Duration offsetFlushInterval, long maxOffsetFlushSize) {
- this.offsetFlushInterval = offsetFlushInterval;
this.maxOffsetFlushSize = maxOffsetFlushSize;
+ this.unflushedRecordCount = 0;
+ this.elapsedTimeStrategy = ElapsedTimeStrategy.constant(Clock.system(), offsetFlushInterval);
}

@Override
- public boolean shouldFlush(Duration timeSinceLastFlush, long numOfRecordsSinceLastFlush) {
- return timeSinceLastFlush.compareTo(offsetFlushInterval) >= 0 || numOfRecordsSinceLastFlush >= this.maxOffsetFlushSize;
+ public boolean shouldFlush() {
+ if (unflushedRecordCount >= this.maxOffsetFlushSize) {
+ clear();
+ return true;
+ }
+ if (elapsedTimeStrategy.hasElapsed()) {
+ clear();
+ return true;
+ }
+ unflushedRecordCount += 1;
+ return false;
}
- }

- class AlwaysFlushOffsetPolicy implements OffsetFlushPolicy {
-
- @Override
- public boolean shouldFlush(Duration timeSinceLastFlush, long numOfRecordsSinceLastFlush) {
- return true;
+ private void clear() {
+ unflushedRecordCount = 0;
}
}

- class NeverFlushOffsetPolicy implements OffsetFlushPolicy {
+ class AlwaysFlushOffsetPolicy implements OffsetFlushPolicy {

@Override
- public boolean shouldFlush(Duration timeSinceLastFlush, long numOfRecordsSinceLastFlush) {
- return false;
+ public boolean shouldFlush() {
+ return true;
}
}
}
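ElapsedTimeStrategy replaces the hand-rolled System.currentTimeMillis() arithmetic the periodic policy previously relied on: the strategy tracks its own last-trigger point, and hasElapsed() both checks and re-arms the interval. A rough usage sketch, assuming only the two calls visible in this diff (constant(Clock, Duration) and hasElapsed()):

    import java.time.Duration;

    import io.debezium.util.Clock;
    import io.debezium.util.ElapsedTimeStrategy;

    // Illustrative periodic gate: fires at most once per interval; the
    // "when did we last fire" bookkeeping lives inside the strategy.
    class PeriodicGateSketch {
        private final ElapsedTimeStrategy interval =
                ElapsedTimeStrategy.constant(Clock.system(), Duration.ofSeconds(10));

        boolean shouldRun() {
            return interval.hasElapsed(); // true once the interval passes, then re-arms
        }
    }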
@@ -5,6 +5,8 @@
*/
package io.debezium.connector.cassandra;

+ import java.util.concurrent.Future;
+
/**
* Interface for recording offset.
*/
@@ -16,7 +18,7 @@ public interface OffsetWriter {
* @param sourceOffset string in the format of <file_name>:<file_position>
* @param isSnapshot whether the offset is coming from a snapshot or commit log
*/
- void markOffset(String sourceTable, String sourceOffset, boolean isSnapshot);
+ Future<?> markOffset(String sourceTable, String sourceOffset, boolean isSnapshot);

/**
* Determine if an offset has been processed based on the table name, offset position, and whether
@@ -31,7 +33,7 @@
/**
* Flush latest offsets to disk.
*/
- void flush();
+ Future<?> flush();

/**
* Close all resources used by this class.
@@ -12,6 +12,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@@ -233,8 +234,19 @@ private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet)
long rowNum = 0L;
// mark snapshot complete immediately if table is empty
if (!rowIter.hasNext()) {
- offsetWriter.markOffset(tableName, OffsetPosition.defaultOffsetPosition().serialize(), true);
- offsetWriter.flush();
+ try {
+ offsetWriter.markOffset(tableName, OffsetPosition.defaultOffsetPosition().serialize(), true).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("error in marking snapshot offset of table {}", tableMetadata.getName(), e);
+ }
+ try {
+ offsetWriter.flush().get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("error in flushing snapshot offsets to disk of table {}", tableMetadata.getName(), e);
+ }
+
}

while (rowIter.hasNext()) {
@@ -11,9 +11,9 @@

import java.io.FileInputStream;
import java.io.IOException;
- import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
+ import java.util.concurrent.ExecutionException;

import org.apache.kafka.connect.data.Schema;
import org.junit.Before;
@@ -30,11 +30,13 @@ public class FileOffsetWriterTest {
private Properties snapshotProps;
private Properties commitLogProps;
private CassandraSchemaFactory schemaFactory = CassandraSchemaFactory.get();
+ private CassandraConnectorConfig config;

@Before
public void setUp() throws IOException {
- offsetDir = Files.createTempDirectory("offset");
- offsetWriter = new FileOffsetWriter(offsetDir.toAbsolutePath().toString());
+ config = new CassandraConnectorConfig(Configuration.from(TestUtils.generateDefaultConfigMap()));
+ offsetDir = Path.of(config.offsetBackingStoreDir());
+ offsetWriter = new FileOffsetWriter(config);
snapshotProps = new Properties();
commitLogProps = new Properties();
}
@@ -106,12 +108,10 @@ public void testFlush() throws IOException {
process(commitLogRecord);
process(commitLogRecordDiffTable);

- offsetWriter.flush();
- // Sleep a little bit to ensure the operation is done
try {
- Thread.sleep(100);
+ offsetWriter.flush().get();
}
- catch (InterruptedException e) {
+ catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
try (FileInputStream fis = new FileInputStream(offsetDir.toString() + "/" + FileOffsetWriter.SNAPSHOT_OFFSET_FILE)) {
Expand All @@ -132,7 +132,7 @@ public void testFlush() throws IOException {

@Test(expected = CassandraConnectorTaskException.class)
public void testTwoFileWriterCannotCoexist() throws IOException {
- new FileOffsetWriter(offsetDir.toAbsolutePath().toString());
+ new FileOffsetWriter(config);
}

private ChangeRecord generateRecord(boolean markOffset, boolean isSnapshot, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
@@ -156,15 +156,13 @@ private boolean isProcessed(ChangeRecord record) {
}

private void process(ChangeRecord record) {
- offsetWriter.markOffset(
- record.getSource().keyspaceTable.name(),
- record.getSource().offsetPosition.serialize(),
- record.getSource().snapshot);
- // Sleep a little bit to ensure the operation is done
try {
- Thread.sleep(100);
+ offsetWriter.markOffset(
+ record.getSource().keyspaceTable.name(),
+ record.getSource().offsetPosition.serialize(),
+ record.getSource().snapshot).get();
}
- catch (InterruptedException e) {
+ catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@@ -25,6 +25,6 @@ protected CassandraConnectorContext generateTaskContext(Configuration configurat
return new CassandraConnectorContext(config,
new DseSchemaLoader(),
new DseSchemaChangeListenerProvider(),
- new FileOffsetWriter(config.offsetBackingStoreDir()));
+ new FileOffsetWriter(config));
}
}
