- If Kafka Streams leverages exactly-once semantics, Replicator must be configured with `isolation.level=read_committed`
- The migrated application needs to be restarted with `auto.offset.reset=latest`
- In order to migrate connectors one by one, for source connectors, data must be extracted from the topics `connect-configs` and `connect-offsets` (see the extraction sketch after this list)
- Source connectors (e.g. JDBC Source or FileStream Source) commit their offsets in the Kafka Connect internal topic `connect-offsets`
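A minimal sketch of that extraction, assuming a local broker on `localhost:9092` and the default internal topic name `connect-offsets`; the keys identify each source connector and the values are the offset JSON payloads to replay later on the destination cluster.

```bash
# Dump the keys and JSON offset payloads committed by the source connectors.
# Assumes a local broker on localhost:9092 and the default internal topic name.
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic connect-offsets \
  --from-beginning \
  --property print.key=true \
  --property key.separator='|' \
  --timeout-ms 10000 > source-connector-offsets.txt
```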
- On the source cluster:
  - Stop producers and source connectors
  - Wait for each topology and connector to clear its lag (see the lag check sketch below)
  - Stop the Kafka Streams application
  - Copy the Kafka Connect source connector offset JSON payloads
  - Stop consumers and sink connectors
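One possible way to run that lag check, assuming the Kafka Streams `application.id` is `streams-wordcount` (a Kafka Streams application uses its `application.id` as consumer group name, and a sink connector uses a group named `connect-<connector name>`):

```bash
# Describe the consumer group of the Kafka Streams application and repeat
# until the LAG column reads 0 for every partition. The group name equals
# the application.id, assumed here to be streams-wordcount; do the same for
# the connect-<connector name> groups of the sink connectors.
kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group streams-wordcount
```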
- Migration start:
  - Start Replicator with `isolation.level=read_committed` (see the Replicator sketch after this list)
  - Wait for Replicator to catch up
  - Start the application on the destination cluster
  - Set up the connectors and the source connector offset JSON payloads
  - Start producers, connectors, Kafka Streams and consumers
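A sketch of that Replicator step, assuming Replicator runs as a connector on a local Connect worker (`localhost:8083`) and that the `src.consumer.` prefix is what forwards `isolation.level` to its embedded consumer; security settings for the Confluent Cloud destination are omitted for brevity.

```bash
# Hypothetical Replicator connector config; the connector name, topic list
# and src.consumer.* prefix are assumptions, not the demo's actual ./migrate step.
curl -s -X PUT -H "Content-Type: application/json" \
  localhost:8083/connectors/replicator/config -d '{
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "src.kafka.bootstrap.servers": "localhost:9092",
    "dest.kafka.bootstrap.servers": "'"${CCLOUD_CLUSTER}"'",
    "src.consumer.isolation.level": "read_committed",
    "topic.whitelist": "streams-plaintext-input,streams-wordcount-output",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter"
  }'
```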
- A source connector streams `data/source.txt` and pushes it to `streams-plaintext-input`
- The Kafka Streams application counts the occurrences of each word from the `streams-plaintext-input` topic and pushes the result to `streams-wordcount-output`
- This application has two internal topics: `streams-wordcount-counts-store-repartition` and `streams-wordcount-counts-store-changelog`
- A sink connector consumes the `streams-wordcount-output` topic and writes the value of each message to `data/sink.txt` (both file connectors are sketched below)
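For reference, the two file connectors could look like the following, assuming Kafka's built-in FileStream connectors and a Connect worker on `localhost:8083`; the names, paths and converters actually used by the demo's `./up` script may differ.

```bash
# Hypothetical file connector configs; the demo's ./up script may differ.
curl -s -X PUT -H "Content-Type: application/json" \
  localhost:8083/connectors/file-source/config -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "data/source.txt",
    "topic": "streams-plaintext-input",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }'

curl -s -X PUT -H "Content-Type: application/json" \
  localhost:8083/connectors/file-sink/config -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "streams-wordcount-output",
    "file": "data/sink.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.LongConverter"
  }'
```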
```bash
export CCLOUD_CLUSTER=XXXXX.confluent.cloud:9092
export CLUSTER_API_KEY=XXXXX
export CLUSTER_API_SECRET=XXXXX
```
```bash
# Start the local environment, the connectors and Kafka Streams
./up
# Migrate Kafka Streams and connectors to Confluent Cloud
./migrate
```
echo "World" >> data/source.txt
sleep 10
cat data/sink.txt
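To verify the result directly on the Confluent Cloud cluster rather than through `data/sink.txt`, a throwaway client config built from the variables exported above can be used. This is a sketch, assuming the usual PLAIN-over-SASL_SSL Confluent Cloud setup and that the word counts are serialized as longs, as in the classic word-count example.

```bash
# Build a client config from the exported credentials (assumed SASL_SSL/PLAIN).
cat > /tmp/ccloud.properties <<EOF
bootstrap.servers=${CCLOUD_CLUSTER}
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CLUSTER_API_KEY}" password="${CLUSTER_API_SECRET}";
EOF

# Read the word counts straight from the migrated output topic.
kafka-console-consumer \
  --bootstrap-server "${CCLOUD_CLUSTER}" \
  --consumer.config /tmp/ccloud.properties \
  --topic streams-wordcount-output \
  --from-beginning \
  --property print.key=true \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```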