Skip to content

Commit

Permalink
chore(pika_cdc): remove some redundant code
Browse files Browse the repository at this point in the history
Signed-off-by: LeeHao <[email protected]>
  • Loading branch information
ForestLH committed Sep 28, 2024
1 parent f4f52bc commit b74b686
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 24 deletions.
4 changes: 1 addition & 3 deletions tools/pika_cdc/conf/cdc.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# pika_server
# pika_server, this port is pika replication port.
pika_server : 127.0.0.1:11221
# For data from one DB of one pika, a separate MQ topic is created,
# and the name of the topic is the dbname of the pika
kafka_servers:
- 127.0.0.1:9092
redis_servers:
- 127.0.0.1:6379
pulsar_servers:
- 127.0.0.1:6650
# retry times while send message failed
retries : 0
# retry interval while send message failed(ms)
Expand Down
1 change: 0 additions & 1 deletion tools/pika_cdc/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type PikaCdcConfig struct {
PikaServer string `yaml:"pika_server"`
KafkaServers []string `yaml:"kafka_servers"`
RedisServers []string `yaml:"redis_servers"`
PulsarServers []string `yaml:"pulsar_servers"`
Retries int `yaml:"retries"`
RetryInterval int `yaml:"retry_interval"`
ParallelThreadSize int `yaml:"parallel_thread_size"`
Expand Down
26 changes: 6 additions & 20 deletions tools/pika_cdc/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type Kafka struct {
wg sync.WaitGroup
msgChanns map[string]chan []byte
stopChan chan bool
once sync.Once
protocol KafkaProtocol
}

Expand Down Expand Up @@ -46,26 +45,13 @@ func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Ka
return k, nil
}

func (k *Kafka) close() error {
//k.stopChan <- true
//close(k.stopChan)
//close(*k.messageChan)
//for _, conn := range k.kafkaConns {
// err := conn.Close()
// if err != nil {
// logrus.Warn(err)
// return err
// }
//}
return nil
}
func (k *Kafka) Close() error {
//var err error
//err = nil
//k.once.Do(func() {
// err = k.close()
//})
//return err
k.Stop()
for _, conn := range k.kafkaConns {
if err := conn.Close(); err != nil {
return err
}
}
return nil
}
func (k *Kafka) Run() {
Expand Down

0 comments on commit b74b686

Please sign in to comment.