diff --git a/tools/pika_cdc/conf/cdc.yml b/tools/pika_cdc/conf/cdc.yml index e09d827b0..db122d006 100644 --- a/tools/pika_cdc/conf/cdc.yml +++ b/tools/pika_cdc/conf/cdc.yml @@ -1,4 +1,4 @@ -# pika_server +# pika_server, this port is pika replication port. pika_server : 127.0.0.1:11221 # For data from one DB of one pika, a separate MQ topic is created, # and the name of the topic is the dbname of the pika @@ -6,8 +6,6 @@ kafka_servers: - 127.0.0.1:9092 redis_servers: - 127.0.0.1:6379 -pulsar_servers: - - 127.0.0.1:6650 # retry times while send message failed retries : 0 # retry interval while send message failed(ms) diff --git a/tools/pika_cdc/conf/conf.go b/tools/pika_cdc/conf/conf.go index 6cee119d0..f07ac86d6 100644 --- a/tools/pika_cdc/conf/conf.go +++ b/tools/pika_cdc/conf/conf.go @@ -16,7 +16,6 @@ type PikaCdcConfig struct { PikaServer string `yaml:"pika_server"` KafkaServers []string `yaml:"kafka_servers"` RedisServers []string `yaml:"redis_servers"` - PulsarServers []string `yaml:"pulsar_servers"` Retries int `yaml:"retries"` RetryInterval int `yaml:"retry_interval"` ParallelThreadSize int `yaml:"parallel_thread_size"` diff --git a/tools/pika_cdc/consumer/kafka.go b/tools/pika_cdc/consumer/kafka.go index 3fdcd243a..ea86a2d99 100644 --- a/tools/pika_cdc/consumer/kafka.go +++ b/tools/pika_cdc/consumer/kafka.go @@ -14,7 +14,6 @@ type Kafka struct { wg sync.WaitGroup msgChanns map[string]chan []byte stopChan chan bool - once sync.Once protocol KafkaProtocol } @@ -46,26 +45,13 @@ func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Ka return k, nil } -func (k *Kafka) close() error { - //k.stopChan <- true - //close(k.stopChan) - //close(*k.messageChan) - //for _, conn := range k.kafkaConns { - // err := conn.Close() - // if err != nil { - // logrus.Warn(err) - // return err - // } - //} - return nil -} func (k *Kafka) Close() error { - //var err error - //err = nil - //k.once.Do(func() { - // err = k.close() - //}) - //return err + k.Stop() + for _, conn := range k.kafkaConns { + if err := conn.Close(); err != nil { + return err + } + } return nil } func (k *Kafka) Run() {