Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Mar 27, 2024
2 parents e75e186 + bc92c13 commit 46385af
Show file tree
Hide file tree
Showing 34 changed files with 624 additions and 136 deletions.
43 changes: 31 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
Expand Down Expand Up @@ -649,11 +659,6 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
// We are switching from API service mode to PD service mode, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
Expand All @@ -662,6 +667,13 @@ func (c *client) getTSOClient() *tsoClient {
return c.tsoClient
}

// ResetTSOClient resets the TSO client, only for test.
func (c *client) ResetTSOClient() {
c.Lock()
defer c.Unlock()
c.resetTSOClientLocked(c.serviceMode)
}

func (c *client) getServiceMode() pdpb.ServiceMode {
c.RLock()
defer c.RUnlock()
Expand Down Expand Up @@ -779,15 +791,22 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
defer span.Finish()
}

req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.tryDone(err)
}
return req
}

func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
// Set needed fields in the request before using it.
req.start = time.Now()
req.clientCtx = c.ctx
req.requestCtx = ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation

if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.done <- err
}
return req
}

Expand Down
6 changes: 6 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func TestUpdateURLs(t *testing.T) {
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members)
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[2:])
re.Equal(getURLs([]*pdpb.Member{members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[3:])
re.Equal(getURLs([]*pdpb.Member{members[3]}), cli.GetServiceURLs())
}

const testClientURL = "tmp://test.url:5255"
Expand Down
15 changes: 13 additions & 2 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,22 @@ type RegionsInfo struct {
Regions []RegionInfo `json:"regions"`
}

func newRegionsInfo(count int64) *RegionsInfo {
return &RegionsInfo{
Count: count,
Regions: make([]RegionInfo, 0, count),
}
}

// Merge merges two RegionsInfo together and returns a new one.
func (ri *RegionsInfo) Merge(other *RegionsInfo) *RegionsInfo {
newRegionsInfo := &RegionsInfo{
Regions: make([]RegionInfo, 0, ri.Count+other.Count),
if ri == nil {
ri = newRegionsInfo(0)
}
if other == nil {
other = newRegionsInfo(0)
}
newRegionsInfo := newRegionsInfo(ri.Count + other.Count)
m := make(map[int64]RegionInfo, ri.Count+other.Count)
for _, region := range ri.Regions {
m[region.ID] = region
Expand Down
148 changes: 129 additions & 19 deletions client/http/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,140 @@ import (

func TestMergeRegionsInfo(t *testing.T) {
re := require.New(t)
regionsInfo1 := &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 1,
StartKey: "",
EndKey: "a",
testCases := []struct {
source *RegionsInfo
target *RegionsInfo
}{
// Different regions.
{
source: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 1,
StartKey: "",
EndKey: "a",
},
},
},
target: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 2,
StartKey: "a",
EndKey: "",
},
},
},
},
}
regionsInfo2 := &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 2,
StartKey: "a",
EndKey: "",
// Same region.
{
source: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 1,
StartKey: "",
EndKey: "a",
},
},
},
target: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 1,
StartKey: "",
EndKey: "a",
},
},
},
},
{
source: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 1,
StartKey: "",
EndKey: "a",
},
},
},
target: nil,
},
{
source: nil,
target: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 2,
StartKey: "a",
EndKey: "",
},
},
},
},
{
source: nil,
target: nil,
},
{
source: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 1,
StartKey: "",
EndKey: "a",
},
},
},
target: newRegionsInfo(0),
},
{
source: newRegionsInfo(0),
target: &RegionsInfo{
Count: 1,
Regions: []RegionInfo{
{
ID: 2,
StartKey: "a",
EndKey: "",
},
},
},
},
{
source: newRegionsInfo(0),
target: newRegionsInfo(0),
},
}
for idx, tc := range testCases {
regionsInfo := tc.source.Merge(tc.target)
if tc.source == nil {
tc.source = newRegionsInfo(0)
}
if tc.target == nil {
tc.target = newRegionsInfo(0)
}
m := make(map[int64]RegionInfo, tc.source.Count+tc.target.Count)
for _, region := range tc.source.Regions {
m[region.ID] = region
}
for _, region := range tc.target.Regions {
m[region.ID] = region
}
mergedCount := len(m)
re.Equal(int64(mergedCount), regionsInfo.Count, "case %d", idx)
re.Len(regionsInfo.Regions, mergedCount, "case %d", idx)
// All regions in source and target should be in the merged result.
for _, region := range append(tc.source.Regions, tc.target.Regions...) {
re.Contains(regionsInfo.Regions, region, "case %d", idx)
}
}
regionsInfo := regionsInfo1.Merge(regionsInfo2)
re.Equal(int64(2), regionsInfo.Count)
re.Len(regionsInfo.Regions, 2)
re.Subset(regionsInfo.Regions, append(regionsInfo1.Regions, regionsInfo2.Regions...))
}

func TestRuleStartEndKey(t *testing.T) {
Expand Down
30 changes: 28 additions & 2 deletions client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ package pd

import (
"context"
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
)

type tsoBatchController struct {
Expand Down Expand Up @@ -130,9 +136,29 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
}
}

func (tbc *tsoBatchController) revokePendingRequest(err error) {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End()
tsoReq.tryDone(err)
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
}

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.done <- err
req.tryDone(err)
}
}

func (tbc *tsoBatchController) clear() {
log.Info("[pd] clear the tso batch controller",
zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize),
zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh)))
tsoErr := errors.WithStack(errClosing)
tbc.finishCollectedRequests(0, 0, 0, tsoErr)
tbc.revokePendingRequests(tsoErr)
}
11 changes: 8 additions & 3 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
Expand Down Expand Up @@ -64,6 +63,13 @@ var tsoReqPool = sync.Pool{
},
}

func (req *tsoRequest) tryDone(err error) {
select {
case req.done <- err:
default:
}
}

type tsoClient struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -140,9 +146,8 @@ func (c *tsoClient) Close() {
c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
tsoErr := errors.WithStack(errClosing)
dispatcher.tsoBatchController.revokePendingRequest(tsoErr)
dispatcher.dispatcherCancel()
dispatcher.tsoBatchController.clear()
}
return true
})
Expand Down
Loading

0 comments on commit 46385af

Please sign in to comment.