From d493b7f068f9755fc556031cd0830face8b1e880 Mon Sep 17 00:00:00 2001 From: LeeHao <1838249551@qq.com> Date: Mon, 21 Oct 2024 21:44:52 +0800 Subject: [PATCH] chore(pika_cdc): correct the code format Signed-off-by: LeeHao <1838249551@qq.com> --- tools/pika_cdc/conf/cdc.yml | 4 +++- tools/pika_cdc/conf/conf.go | 3 ++- tools/pika_cdc/consumer/kafka.go | 3 +-- tools/pika_cdc/main.go | 2 +- tools/pika_cdc/pika/replprotocol_test.go | 26 +++++++++++------------- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/tools/pika_cdc/conf/cdc.yml b/tools/pika_cdc/conf/cdc.yml index db122d006a..be123c410f 100644 --- a/tools/pika_cdc/conf/cdc.yml +++ b/tools/pika_cdc/conf/cdc.yml @@ -1,5 +1,7 @@ # pika_server, this port is pika replication port. -pika_server : 127.0.0.1:11221 +pika_repl_server : 127.0.0.1:11221 +# pika_server, this port is pika redis client port. +pika_client_server : 127.0.0.1:9221 # For data from one DB of one pika, a separate MQ topic is created, # and the name of the topic is the dbname of the pika kafka_servers: diff --git a/tools/pika_cdc/conf/conf.go b/tools/pika_cdc/conf/conf.go index f07ac86d60..feef12a570 100644 --- a/tools/pika_cdc/conf/conf.go +++ b/tools/pika_cdc/conf/conf.go @@ -13,7 +13,8 @@ import ( ) type PikaCdcConfig struct { - PikaServer string `yaml:"pika_server"` + PikaReplServer string `yaml:"pika_repl_server"` + PikaClientServer string `yaml:"pika_client_server"` KafkaServers []string `yaml:"kafka_servers"` RedisServers []string `yaml:"redis_servers"` Retries int `yaml:"retries"` diff --git a/tools/pika_cdc/consumer/kafka.go b/tools/pika_cdc/consumer/kafka.go index ea86a2d990..51740cd6fe 100644 --- a/tools/pika_cdc/consumer/kafka.go +++ b/tools/pika_cdc/consumer/kafka.go @@ -35,9 +35,8 @@ func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Ka conn, err := kafka.DialLeader(context.Background(), "tcp", server, dbname, 0) if err != nil { return k, err - } else { - k.kafkaConns[dbname] = conn } + k.kafkaConns[dbname] = conn k.msgChanns[dbname] = chann } k.stopChan = make(chan bool) diff --git a/tools/pika_cdc/main.go b/tools/pika_cdc/main.go index 74705436a9..043dc72271 100644 --- a/tools/pika_cdc/main.go +++ b/tools/pika_cdc/main.go @@ -8,7 +8,7 @@ import ( ) func main() { - if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, conf.ConfigInstance.BufferMsgNumbers); err != nil { + if pikaServer, err := pika.New(conf.ConfigInstance.PikaReplServer, conf.ConfigInstance.BufferMsgNumbers); err != nil { logrus.Fatal("failed to connect pika server, {}", err) } else { if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChanns); err != nil { diff --git a/tools/pika_cdc/pika/replprotocol_test.go b/tools/pika_cdc/pika/replprotocol_test.go index a163ecca82..8c78a4f1fd 100644 --- a/tools/pika_cdc/pika/replprotocol_test.go +++ b/tools/pika_cdc/pika/replprotocol_test.go @@ -14,14 +14,14 @@ import ( "os" "pika_cdc/conf" "pika_cdc/pika/proto/inner" - "strconv" "testing" "time" ) func TestConnect(t *testing.T) { cxt := context.Background() - addr := "127.0.0.1:9221" + + addr := conf.ConfigInstance.PikaClientServer client := redis.NewClient(&redis.Options{ Addr: addr, Password: "", // no password set @@ -31,14 +31,13 @@ func TestConnect(t *testing.T) { } func TestSendMetaSync(t *testing.T) { - ip := string("127.0.0.1") + ip := "127.0.0.1" listener, e := net.Listen("tcp", ":0") if e != nil { os.Exit(1) } selfPort := getPort(listener.Addr().String()) - var masterPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000 - addr := ip + ":" + strconv.Itoa(int(masterPort)) + addr := conf.ConfigInstance.PikaReplServer tt := inner.Type_kMetaSync request := inner.InnerRequest{ Type: &tt, @@ -116,9 +115,7 @@ func getResponse(conn net.Conn) *inner.InnerResponse { func sendReplReq(conn net.Conn, request *inner.InnerRequest) (net.Conn, error) { if conn == nil { - ip := string("127.0.0.1") - var masterReplPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000 - addr := ip + ":" + strconv.Itoa(int(masterReplPort)) + addr := conf.ConfigInstance.PikaReplServer newConn, err := net.Dial("tcp", addr) if err != nil { return nil, err @@ -141,9 +138,7 @@ func sendReplReq(conn net.Conn, request *inner.InnerRequest) (net.Conn, error) { func sendMetaSyncRequest(conn net.Conn) (net.Conn, error) { if conn == nil { - ip := string("127.0.0.1") - var masterReplPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000 - addr := ip + ":" + strconv.Itoa(int(masterReplPort)) + addr := conf.ConfigInstance.PikaReplServer newConn, err := net.Dial("tcp", addr) if err != nil { return nil, err @@ -166,7 +161,7 @@ func sendMetaSyncRequest(conn net.Conn) (net.Conn, error) { } func TestGetOffsetFromMaster(t *testing.T) { - ip := string("127.0.0.1") + ip := "127.0.0.1" listener, e := net.Listen("tcp", ":0") if e != nil { os.Exit(1) @@ -217,7 +212,7 @@ func TestGetOffsetFromMaster(t *testing.T) { } func TestSendDbSyncReqMsg(t *testing.T) { - ip := string("127.0.0.1") + ip := "127.0.0.1" listener, e := net.Listen("tcp", ":0") if e != nil { os.Exit(1) @@ -279,7 +274,7 @@ func BuildInternalTag(resp []byte) (tag string) { return string(buf) } -// CustomData 解析后的自定义数据结构 +// Pika Binlog Item type BinlogItem struct { Type uint16 CreateTime uint32 @@ -291,6 +286,8 @@ type BinlogItem struct { Content []byte } +// Test incremental synchronization of data +// pika -> redis func TestGetIncrementalSync(t *testing.T) { conn, err := sendMetaSyncRequest(nil) if err != nil { @@ -391,6 +388,7 @@ func TestGetIncrementalSync(t *testing.T) { } }() } + // never stop as a backend service for { } }