From 11e574ccc4ee3e29b35ebd9aa5f1660c60956264 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 13 Sep 2021 20:38:53 +0800 Subject: [PATCH] server: when init data dir, skip if get changefeed info failed. (#2778) (#2787) * This is an automated cherry-pick of #2778 Signed-off-by: ti-chi-bot * fix conflicts Signed-off-by: Neil Shen Co-authored-by: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Co-authored-by: Neil Shen --- cdc/kv/etcd.go | 18 ++++++++++ cdc/kv/etcd_test.go | 40 ++++++++++++++++++++++ cdc/server.go | 10 ++---- cdc/server_test.go | 81 ++++++++++++++++++++++++++++----------------- 4 files changed, 112 insertions(+), 37 deletions(-) diff --git a/cdc/kv/etcd.go b/cdc/kv/etcd.go index aa3f7e5584e..bda6a10ca76 100644 --- a/cdc/kv/etcd.go +++ b/cdc/kv/etcd.go @@ -201,6 +201,24 @@ func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*m return revision, details, nil } +// GetAllChangeFeedInfo queries all changefeed information +func (c CDCEtcdClient) GetAllChangeFeedInfo(ctx context.Context) (map[string]*model.ChangeFeedInfo, error) { + _, details, err := c.GetChangeFeeds(ctx) + if err != nil { + return nil, errors.Trace(err) + } + allFeedInfo := make(map[string]*model.ChangeFeedInfo, len(details)) + for id, rawDetail := range details { + info := &model.ChangeFeedInfo{} + if err := info.Unmarshal(rawDetail.Value); err != nil { + return nil, errors.Trace(err) + } + allFeedInfo[id] = info + } + + return allFeedInfo, nil +} + // GetChangeFeedInfo queries the config of a given changefeed func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error) { key := GetEtcdKeyChangeFeedInfo(id) diff --git a/cdc/kv/etcd_test.go b/cdc/kv/etcd_test.go index 569a7953c3f..e577c044a12 100644 --- a/cdc/kv/etcd_test.go +++ b/cdc/kv/etcd_test.go @@ -246,6 +246,46 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) { c.Assert(cerror.ErrChangeFeedNotExists.Equal(err), check.IsTrue) } +func (s etcdSuite) TestGetAllChangeFeedInfo(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + + ctx := context.Background() + infos := []struct { + id string + info *model.ChangeFeedInfo + }{ + { + id: "a", + info: &model.ChangeFeedInfo{ + SinkURI: "root@tcp(127.0.0.1:3306)/mysql", + SortDir: "/old-version/sorter", + }, + }, + { + id: "b", + info: &model.ChangeFeedInfo{ + SinkURI: "root@tcp(127.0.0.1:4000)/mysql", + }, + }, + } + + for _, item := range infos { + err := s.client.SaveChangeFeedInfo(ctx, item.info, item.id) + c.Assert(err, check.IsNil) + } + + allChangFeedInfo, err := s.client.GetAllChangeFeedInfo(ctx) + c.Assert(err, check.IsNil) + + for _, item := range infos { + obtained, found := allChangFeedInfo[item.id] + c.Assert(found, check.IsTrue) + c.Assert(item.info.SinkURI, check.Equals, obtained.SinkURI) + c.Assert(item.info.SortDir, check.Equals, obtained.SortDir) + } +} + func (s *etcdSuite) TestRemoveAllTaskXXX(c *check.C) { defer testleak.AfterTest(c)() defer s.TearDownTest(c) diff --git a/cdc/server.go b/cdc/server.go index f5668792426..2a9d2a92c88 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -427,17 +427,13 @@ func (s *Server) setUpDataDir(ctx context.Context) error { } // data-dir will be decide by exist changefeed for backward compatibility - allStatus, err := cli.GetAllChangeFeedStatus(ctx) + allInfo, err := cli.GetAllChangeFeedInfo(ctx) if err != nil { return errors.Trace(err) } - candidates := make([]string, 0, len(allStatus)) - for id := range allStatus { - info, err := cli.GetChangeFeedInfo(ctx, id) - if err != nil { - return errors.Trace(err) - } + candidates := make([]string, 0, len(allInfo)) + for _, info := range allInfo { if info.SortDir != "" { candidates = append(candidates, info.SortDir) } diff --git a/cdc/server_test.go b/cdc/server_test.go index 2c1c8de363c..1020168c8d5 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -20,15 +20,19 @@ import ( "time" "github.com/pingcap/check" + "github.com/pingcap/ticdc/cdc/kv" + "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/pkg/util/testleak" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "golang.org/x/sync/errgroup" ) type serverSuite struct { + server *Server e *embed.Etcd clientURL *url.URL ctx context.Context @@ -37,15 +41,35 @@ type serverSuite struct { } func (s *serverSuite) SetUpTest(c *check.C) { - dir := c.MkDir() var err error + dir := c.MkDir() s.clientURL, s.e, err = etcd.SetupEmbedEtcd(dir) c.Assert(err, check.IsNil) + + pdEndpoints := []string{ + "http://" + s.clientURL.Host, + "http://invalid-pd-host:2379", + } + server, err := NewServer(pdEndpoints) + c.Assert(err, check.IsNil) + c.Assert(server, check.NotNil) + s.server = server + s.ctx, s.cancel = context.WithCancel(context.Background()) + client, err := clientv3.New(clientv3.Config{ + Endpoints: s.server.pdEndpoints, + Context: s.ctx, + DialTimeout: 5 * time.Second, + }) + c.Assert(err, check.IsNil) + etcdClient := kv.NewCDCEtcdClient(s.ctx, client) + s.server.etcdClient = &etcdClient + s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { c.Log(e) }) } func (s *serverSuite) TearDownTest(c *check.C) { + s.server.Close() s.e.Close() s.cancel() err := s.errg.Wait() @@ -60,52 +84,49 @@ func (s *serverSuite) TestEtcdHealthChecker(c *check.C) { defer testleak.AfterTest(c)() defer s.TearDownTest(c) - ctx, cancel := context.WithCancel(context.Background()) - pdEndpoints := []string{ - "http://" + s.clientURL.Host, - "http://invalid-pd-host:2379", - } - server, err := NewServer(pdEndpoints) - c.Assert(err, check.IsNil) - c.Assert(server, check.NotNil) - s.errg.Go(func() error { - err := server.etcdHealthChecker(ctx) + err := s.server.etcdHealthChecker(s.ctx) c.Assert(err, check.Equals, context.Canceled) return nil }) // longer than one check tick 3s time.Sleep(time.Second * 4) - cancel() + s.cancel() } -func (s *serverSuite) TestInitDataDir(c *check.C) { +func (s *serverSuite) TestSetUpDataDir(c *check.C) { defer testleak.AfterTest(c)() defer s.TearDownTest(c) - ctx, cancel := context.WithCancel(context.Background()) - pdEndpoints := []string{ - "http://" + s.clientURL.Host, - "http://invalid-pd-host:2379", - } - server, err := NewServer(pdEndpoints) + conf := config.GetGlobalServerConfig() + // DataDir is not set, and no changefeed exist, use the default + conf.DataDir = "" + err := s.server.setUpDataDir(s.ctx) c.Assert(err, check.IsNil) - c.Assert(server, check.NotNil) + c.Assert(conf.DataDir, check.Equals, defaultDataDir) + c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(defaultDataDir, config.DefaultSortDir)) - conf := config.GetGlobalServerConfig() - conf.DataDir = c.MkDir() + // DataDir is not set, but has existed changefeed, use the one with the largest available space + conf.DataDir = "" + dir := c.MkDir() + err = s.server.etcdClient.SaveChangeFeedInfo(s.ctx, &model.ChangeFeedInfo{SortDir: dir}, "a") + c.Assert(err, check.IsNil) - err = server.initDataDir(ctx) + err = s.server.etcdClient.SaveChangeFeedInfo(s.ctx, &model.ChangeFeedInfo{}, "b") c.Assert(err, check.IsNil) - c.Assert(conf.DataDir, check.Not(check.Equals), "") - c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(conf.DataDir, "/tmp/sorter")) - config.StoreGlobalServerConfig(conf) - server.etcdClient = nil - conf.DataDir = "" - err = server.initDataDir(ctx) + err = s.server.setUpDataDir(s.ctx) + c.Assert(err, check.IsNil) + + c.Assert(conf.DataDir, check.Equals, dir) + c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(dir, config.DefaultSortDir)) + + conf.DataDir = c.MkDir() + // DataDir has been set, just use it + err = s.server.setUpDataDir(s.ctx) c.Assert(err, check.IsNil) c.Assert(conf.DataDir, check.Not(check.Equals), "") + c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(conf.DataDir, config.DefaultSortDir)) - cancel() + s.cancel() }