Skip to content

Commit

Permalink
region cache: retry scan or batch scan regions when returned region h…
Browse files Browse the repository at this point in the history
…as no leader (#1480)

 

Signed-off-by: cfzjywxk <[email protected]>
  • Loading branch information
cfzjywxk authored Oct 23, 2024
1 parent 8dfa86b commit 691e80a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 4 deletions.
28 changes: 25 additions & 3 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2223,7 +2223,18 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
backoffErr = errors.Errorf("PD returned regions have gaps, limit: %d", limit)
continue
}
return c.handleRegionInfos(bo, regionsInfo, true)
validRegions, err := c.handleRegionInfos(bo, regionsInfo, true)
if err != nil {
return nil, err
}
// If the region information is loaded from the local disk and the current leader has not
// yet reported a heartbeat to PD, the region information scanned at this time will not include the leader.
// Retry if there are no valid regions with leaders.
if len(validRegions) == 0 {
backoffErr = errors.Errorf("All returned regions have no leaders, limit: %d", limit)
continue
}
return validRegions, nil
}
}

Expand Down Expand Up @@ -2290,7 +2301,18 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa
)
continue
}
return c.handleRegionInfos(bo, regionsInfo, opt.needRegionHasLeaderPeer)
validRegions, err := c.handleRegionInfos(bo, regionsInfo, opt.needRegionHasLeaderPeer)
if err != nil {
return nil, err
}
// If the region information is loaded from the local disk and the current leader has not
// yet reported a heartbeat to PD, the region information scanned at this time will not include the leader.
// Retry if there are no valid regions with leaders.
if len(validRegions) == 0 {
backoffErr = errors.Errorf("All returned regions have no leaders, limit: %d", limit)
continue
}
return validRegions, nil
}
}

Expand Down Expand Up @@ -2397,7 +2419,7 @@ func (c *RegionCache) handleRegionInfos(bo *retry.Backoffer, regionsInfo []*pd.R
regions = append(regions, region)
}
if len(regions) == 0 {
return nil, errors.New("receive Regions with no peer")
return nil, nil
}
if len(regions) < len(regionsInfo) {
logutil.Logger(context.Background()).Debug(
Expand Down
44 changes: 43 additions & 1 deletion internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ import (

// inspectedPDClient wraps a pd.Client for tests so that selected calls can be
// intercepted by optional hook functions. Calls without a hook go to the
// embedded client.
type inspectedPDClient struct {
// Embedded client that serves every call that is not intercepted.
pd.Client
// getRegion is an optional hook; it also receives the wrapped client as cli.
// NOTE(review): presumably consulted by GetRegion when non-nil — the start of
// that method is not visible here, confirm.
getRegion func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error)
getRegion        func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error)
// batchScanRegions, when non-nil, replaces the embedded client's
// BatchScanRegions for calls made through this wrapper.
batchScanRegions func(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error)
}

func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
Expand All @@ -76,6 +77,13 @@ func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...p
return c.Client.GetRegion(ctx, key, opts...)
}

// BatchScanRegions routes the call to the test-supplied batchScanRegions hook
// when one is installed; with no hook it delegates to the embedded pd.Client.
func (c *inspectedPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
	hook := c.batchScanRegions
	if hook == nil {
		// No interception requested: behave exactly like the wrapped client.
		return c.Client.BatchScanRegions(ctx, keyRanges, limit, opts...)
	}
	return hook(ctx, keyRanges, limit, opts...)
}

func TestBackgroundRunner(t *testing.T) {
t.Run("ShutdownWait", func(t *testing.T) {
dur := 100 * time.Millisecond
Expand Down Expand Up @@ -459,6 +467,40 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
}

// TestReturnRegionWithNoLeader checks that scanRegions and batchScanRegions
// still return the expected region when the first BatchScanRegions response
// seen through the PD client contains only a region without a leader.
func (s *testRegionCacheSuite) TestReturnRegionWithNoLeader() {
	region := s.getRegion([]byte("x"))
	// Same metadata as the cached region, but with the leader stripped.
	noLeaderRegion := &pd.Region{
		Meta:   region.meta,
		Leader: nil,
	}

	// Capture the real implementation before the client is replaced so the
	// stub can fall through to it.
	originalBatchScanRegions := s.cache.pdClient.BatchScanRegions

	batchScanCnt := 0
	s.cache.pdClient = &inspectedPDClient{
		Client: s.cache.pdClient,
		batchScanRegions: func(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
			// Only the very first call returns the leaderless region; every
			// later call is answered by the real client.
			if batchScanCnt == 0 {
				batchScanCnt++
				return []*pd.Region{noLeaderRegion}, nil
			}
			return originalBatchScanRegions(ctx, keyRanges, limit, opts...)
		},
	}

	bo := retry.NewBackofferWithVars(context.Background(), 1000, nil)
	returnedRegions, err := s.cache.scanRegions(bo, nil, nil, 100)
	// Use fatal assertions before indexing so a failure is reported as a test
	// failure rather than an index-out-of-range panic.
	s.Require().NoError(err)
	s.Require().Len(returnedRegions, 1)
	s.Equal(region.GetID(), returnedRegions[0].meta.GetId())

	returnedRegions, err = s.cache.batchScanRegions(bo, []pd.KeyRange{{StartKey: nil, EndKey: nil}}, 100, WithNeedRegionHasLeaderPeer())
	s.Require().NoError(err)
	s.Require().Len(returnedRegions, 1)
	s.Equal(region.GetID(), returnedRegions[0].meta.GetId())

	// The stub increments the counter exactly once, on its first invocation;
	// a value of 1 shows the leaderless response was actually served.
	// NOTE(review): assumes batchScanRegions reaches pdClient.BatchScanRegions — confirm.
	s.Equal(1, batchScanCnt)
}

func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() {
s.onClosed = func() { SetRegionCacheTTLWithJitter(600, 60) }
SetRegionCacheTTLWithJitter(2, 0)
Expand Down

0 comments on commit 691e80a

Please sign in to comment.