diff --git a/writers/iceberg/debezium-server-iceberg-sink/pom.xml b/writers/iceberg/debezium-server-iceberg-sink/pom.xml
index 0d20e71..c14466b 100644
--- a/writers/iceberg/debezium-server-iceberg-sink/pom.xml
+++ b/writers/iceberg/debezium-server-iceberg-sink/pom.xml
@@ -311,6 +311,11 @@
5.0.0
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.15.2
+
org.apache.spark
spark-core_${version.spark.scala}
diff --git a/writers/iceberg/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java b/writers/iceberg/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java
index 6acf062..194af5b 100644
--- a/writers/iceberg/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java
+++ b/writers/iceberg/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java
@@ -9,25 +9,26 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
-
import java.util.Map;
import java.util.stream.Collectors;
-
// This class is used to receive rows from the Olake Golang project and dump it into iceberg using prebuilt code here.
@Dependent
public class OlakeRowsIngester extends StringArrayServiceGrpc.StringArrayServiceImplBase {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(OlakeRowsIngester.class);
private String icebergNamespace = "public";
Catalog icebergCatalog;
-
private final IcebergTableOperator icebergTableOperator;
+ // Create a single reusable ObjectMapper instance
+ private static final ObjectMapper objectMapper = new ObjectMapper();
public OlakeRowsIngester() {
icebergTableOperator = new IcebergTableOperator();
@@ -45,56 +46,55 @@ public void setIcebergCatalog(Catalog icebergCatalog) {
this.icebergCatalog = icebergCatalog;
}
-
@Override
public void sendStringArray(Messaging.StringArrayRequest request, StreamObserver responseObserver) {
+ String requestId = String.format("[Thread-%d-%d]", Thread.currentThread().getId(), System.nanoTime());
+ long startTime = System.currentTimeMillis();
// Retrieve the array of strings from the request
List messages = request.getMessagesList();
+ long parsingStartTime = System.currentTimeMillis();
Map> result =
- messages.stream()
+ messages.parallelStream() // Use parallel stream for concurrent processing
.map(message -> {
try {
- ObjectMapper objectMapper = new ObjectMapper();
-
// Read the entire JSON message into a Map:
- Map messageMap =
- objectMapper.readValue(message, new TypeReference