diff --git a/debezium-server-voyager/debezium-server-voyagerexporter/src/main/java/io/debezium/server/ybexporter/YbExporterConsumer.java b/debezium-server-voyager/debezium-server-voyagerexporter/src/main/java/io/debezium/server/ybexporter/YbExporterConsumer.java index f161fa7c5b..ecf729b9e9 100644 --- a/debezium-server-voyager/debezium-server-voyagerexporter/src/main/java/io/debezium/server/ybexporter/YbExporterConsumer.java +++ b/debezium-server-voyager/debezium-server-voyagerexporter/src/main/java/io/debezium/server/ybexporter/YbExporterConsumer.java @@ -46,6 +46,8 @@ public class YbExporterConsumer extends BaseChangeConsumer { private RecordTransformer recordTransformer; Thread flusherThread; boolean shutDown = false; + Object flushingSnapshotFilesLock = new Object(); + public YbExporterConsumer(String dataDir){ this.dataDir = dataDir; @@ -121,9 +123,11 @@ else if (exporterRole.equals(TARGET_DB_EXPORTER_FB_ROLE)){ } while (true) { - for (RecordWriter writer : snapshotWriters.values()) { - writer.flush(); - writer.sync(); + synchronized (flushingSnapshotFilesLock){ + for (RecordWriter writer : snapshotWriters.values()) { + writer.flush(); + writer.sync(); + } } // TODO: doing more than flushing files to disk. maybe move this call to another thread? if (exportStatus != null) { @@ -282,7 +286,9 @@ private void checkIfSnapshotAlreadyComplete(Record r) { } private void handleSnapshotComplete() { - closeSnapshotWriters(); + synchronized (flushingSnapshotFilesLock){ + closeSnapshotWriters(); + } exportStatus.updateMode(ExportMode.STREAMING); exportStatus.flushToDisk(); openCDCWriter();