-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmigrate
executable file
·102 lines (79 loc) · 4.95 KB
/
migrate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/bin/bash
#
# Moves the local demo (connectors + Kafka Streams word count) over to a
# Confluent Cloud cluster.
#
# Required environment variables:
#   CCLOUD_CLUSTER      broker list of the cloud cluster (used as --broker-list)
#   CLUSTER_API_KEY     SASL/PLAIN username for the cloud cluster
#   CLUSTER_API_SECRET  SASL/PLAIN password for the cloud cluster

# Fail before touching anything if the cloud settings are missing: they are
# substituted into the Replicator config and the producer commands further
# down, where an empty value would only surface after local services have
# already been paused or stopped.
: "${CCLOUD_CLUSTER:?CCLOUD_CLUSTER must be set}"
: "${CLUSTER_API_KEY:?CLUSTER_API_KEY must be set}"
: "${CLUSTER_API_SECRET:?CLUSTER_API_SECRET must be set}"

echo "Cleaning Confluent Cloud cluster"

# Topics that are deleted and then re-created empty.
streams_topics=(
  cloud-wordcount-counts-store-changelog
  cloud-wordcount-counts-store-repartition
  streams-plaintext-output
  streams-plaintext-input
)
# Topics that are only deleted here.
delete_only_topics=(
  migration-demo-connect-configs
  migration-demo-connect-offsets
  migration-demo-connect-statuses
  _confluent-command
)

# errexit is only switched on after this section, so a failing delete or
# create does not stop the script.
for topic in "${streams_topics[@]}" "${delete_only_topics[@]}"; do
  confluent kafka topic delete "$topic"
done
for topic in "${streams_topics[@]}"; do
  confluent kafka topic create "$topic"
done
# From this point on, stop at the first command that fails.
set -o errexit

echo Pause source connectors
# Ask the Connect worker on port 8083 to pause connector-source.
curl --silent --fail --request PUT \
  --header 'Content-Type:application/json' \
  http://localhost:8083/connectors/connector-source/pause
#######################################
# Counts consumer-group rows whose lag has not settled yet.
# Reads `kafka-consumer-groups --describe` output on stdin, keeps the rows
# that contain PATTERN and counts those whose 6th column (the column this
# script treats as the lag) is not EXACTLY one of the settled values.
# An exact comparison matters: a substring filter would also discard lags
# such as 10 or 21 and report them as caught up.
# Arguments: $1   - fixed string a row must contain to be considered
#            $2.. - lag values that count as "caught up"
# Outputs:   number of pending rows on stdout (0 for empty input)
#######################################
count_pending_lag() {
  local pattern=$1
  shift
  local IFS=' '
  awk -v pattern="$pattern" -v settled="$*" '
    BEGIN {
      n = split(settled, values, " ")
      for (i = 1; i <= n; i++) ok[values[i]] = 1
    }
    index($0, pattern) && !($6 in ok) { pending++ }
    END { print pending + 0 }
  '
}

#######################################
# Polls a consumer group every 5 seconds until no row is pending, printing a
# dot per retry and a final newline.
# Arguments: $1   - consumer group name
#            $2   - fixed string identifying the rows to look at
#            $3.. - lag values that count as "caught up"
#######################################
wait_for_group_lag() {
  local group=$1 pattern=$2 pending
  shift 2
  while true; do
    pending=$(docker-compose exec broker kafka-consumer-groups --describe \
      --bootstrap-server localhost:19092 --group "$group" \
      | count_pending_lag "$pattern" "$@")
    if ((pending == 0)); then
      break
    fi
    echo -n .
    sleep 5
  done
  echo
}

echo Wait for Kafka Streams lag to reach 0
# EoS could return lag of 0 or 1 due to the commit mark; a lag shown as "-"
# is accepted as well.
wait_for_group_lag streams-wordcount streams-wordcount- 0 1 -
echo Wait for sink connect lag to reach 0
# Only an exact lag of 0 is accepted for the sink connector.
wait_for_group_lag connect-connector-sink connect-connector-sink- 0
# The request below pauses connector-sink on the worker at port 8083; it does
# not delete anything, so the progress message says so.
echo Pause sink connector
curl -s -f -X PUT http://localhost:8083/connectors/connector-sink/pause -H 'Content-Type:application/json'
echo Stop Kafka Streams
docker-compose stop kafka-streams
echo Start Kafka Connect that uses cloud
docker-compose up -d connect-cloud
echo Waiting for Kafka Connect to be up and running
# Probe the REST root on port 8084 every 5 seconds, one dot per failed probe.
# On success curl prints the response body, then the loop ends.
while ! curl --silent --fail --request GET http://localhost:8084/; do
  printf '.'
  sleep 5
done
echo
echo Starting Replicator
# Build the connector config from the template: flatten it to one line and
# fill in the cloud settings. The secret uses '#' as the sed delimiter, as in
# the original, so that '/' inside the secret does not break the expression.
# NOTE(review): without a 'g' flag each placeholder is replaced only at its
# first occurrence — confirm the template uses each placeholder once.
replicator_config=$(tr -d '\r\n' < connect/replicator.json \
  | sed -e "s/CCLOUD_CLUSTER/$CCLOUD_CLUSTER/" \
        -e "s/CLUSTER_API_KEY/$CLUSTER_API_KEY/" \
        -e "s#CLUSTER_API_SECRET#$CLUSTER_API_SECRET#")

# Capture the response instead of piping curl straight into jq: in a pipeline
# only jq's exit status counts, so a rejected POST would go unnoticed and the
# rest of the migration would run without a Replicator.
replicator_response=$(curl -s -f -X POST http://localhost:8084/connectors \
  -H 'Content-Type: application/json' --data "$replicator_config") || {
  echo "Failed to create the Replicator connector" >&2
  exit 1
}
jq . <<<"$replicator_response"

echo Wait for Replicator to finish moving local data to cloud
# Show the connector status three times, 10 seconds apart.
for attempt in 1 2 3; do
  sleep 10
  curl -s http://localhost:8084/connectors/replicator/status | jq .
done
#######################################
# Checks whether the summed messageLag reported for the replicator is zero.
# A failed metrics request is reported as "not zero": otherwise an
# unreachable endpoint would yield an empty sum, which compares equal to 0
# and would let the script delete Replicator before it has finished.
# Returns: 0 when the request succeeded and the lag sum is 0, 1 otherwise
#######################################
replicator_lag_is_zero() {
  local metrics
  metrics=$(curl -s -f http://localhost:8084/WorkerMetrics/replicator) || return 1
  jq '.tasks[].metrics[].messageLag' <<<"$metrics" \
    | awk '{ sum += $0 } END { exit (sum != 0) }'
}

echo Ensure Replicator lag is 0
# Re-check every 5 seconds, printing a dot per retry.
until replicator_lag_is_zero; do
  echo -n .
  sleep 5
done
echo
echo Stopping Replicator
# DELETE removes the replicator connector from the worker on port 8084.
curl -s -f -X DELETE http://localhost:8084/connectors/replicator
echo Stopping Kafka Connect as we will manually copy connector configs and offsets
docker-compose stop connect-cloud
#######################################
# Produces one "key|value" record to a topic on the cloud cluster by running
# kafka-console-producer inside the broker container. The record is handed
# over through the container environment variable "msg".
# Globals:   CCLOUD_CLUSTER, CLUSTER_API_KEY, CLUSTER_API_SECRET (read)
# Arguments: $1 - destination topic
#            $2 - record in "key|value" form
#######################################
produce_to_cloud() {
  local topic=$1 record=$2
  docker-compose exec -e "msg=$record" broker bash -c "echo \"\$msg\" | kafka-console-producer --broker-list $CCLOUD_CLUSTER --topic $topic --property 'parse.key=true' --property 'key.separator=|' --producer-property \"sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='$CLUSTER_API_KEY' password='$CLUSTER_API_SECRET';\" --producer-property \"security.protocol=SASL_SSL\" --producer-property \"sasl.mechanism=PLAIN\""
}

echo Copying connector-source and connector-sink configs to kafka
# NOTE(review): this consumer uses localhost:29092 while the rest of the
# script talks to localhost:19092 — confirm that is intended.
# "|| true": grep finding nothing must not abort the script under errexit.
config_records=$(docker-compose exec broker kafka-console-consumer --timeout-ms 15000 --bootstrap-server localhost:29092 --topic docker-connect-configs --property print.key=true --property 'key.separator=|' --from-beginning | grep -E 'connector-source|connector-sink') || true
# Read whole lines (no word splitting, no globbing) so that a record
# containing a space or a glob character is copied as one record. The lines
# are fed through fd 3 so stdin stays untouched for docker-compose exec.
while IFS= read -r -u 3 line; do
  [[ -n "$line" ]] || continue
  # Records mentioning PAUSED are not copied.
  if [[ "$line" == *PAUSED* ]]; then
    continue
  fi
  produce_to_cloud migration-demo-connect-configs "$line"
done 3<<<"$config_records"

echo Copying connector-source and connector-sink offsets to kafka
# Only the last connector-source record of the offsets topic is copied.
offset_record=$(docker-compose exec broker kafka-console-consumer --timeout-ms 5000 --bootstrap-server localhost:19092 --topic docker-connect-offsets --property print.key=true --property 'key.separator=|' --from-beginning | grep 'connector-source' | tail -n 1)
if [[ -n "$offset_record" ]]; then
  produce_to_cloud migration-demo-connect-offsets "$offset_record"
fi
echo Restarting Kafka Connect with
docker-compose up -d connect-cloud

echo Waiting for Kafka Connect using cloud to be up and running
# Keep probing the REST root on port 8084; print a dot and pause 5 seconds
# after every failed probe.
while true; do
  if curl -s -f -X GET http://localhost:8084/; then
    break
  fi
  printf '.'
  sleep 5
done
echo
# Resume both connectors on the worker at port 8084, source first. These
# requests are sent without --fail, so an HTTP error status does not abort
# the script.
for role in source sink; do
  echo "Resuming ${role} connector"
  curl --silent --request PUT \
    --header 'Content-Type:application/json' \
    "http://localhost:8084/connectors/connector-${role}/resume"
done

echo Restarting Kafka Streams
docker-compose up -d kafka-streams-cloud