From 67967e4f5eee7ae5363c86dd41cf7bbb34d9dfcf Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 11 Sep 2024 19:58:15 +0800 Subject: [PATCH 1/4] client: Add benchmark for tsoStream and dispatcher Signed-off-by: MyonKeminta --- client/tso_dispatcher_test.go | 107 +++++++++++++++++++ client/tso_stream_test.go | 188 ++++++++++++++++++++++++++++++++++ 2 files changed, 295 insertions(+) create mode 100644 client/tso_dispatcher_test.go create mode 100644 client/tso_stream_test.go diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go new file mode 100644 index 00000000000..b1a87d0a2cd --- /dev/null +++ b/client/tso_dispatcher_test.go @@ -0,0 +1,107 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap/zapcore" +) + +type mockTSOServiceProvider struct { + option *option +} + +func newMockTSOServiceProvider(option *option) *mockTSOServiceProvider { + return &mockTSOServiceProvider{ + option: option, + } +} + +func (m *mockTSOServiceProvider) getOption() *option { + return m.option +} + +func (m *mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { + return NewMockPDServiceDiscovery([]string{mockStreamURL}, nil) +} + +func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { + _, ok := connectionCtxs.Load(mockStreamURL) + if ok { + return true + } + ctx, cancel := context.WithCancel(ctx) + stream := &tsoStream{ + serverURL: mockStreamURL, + stream: newMockTSOStreamImpl(ctx, true), + } + connectionCtxs.LoadOrStore(mockStreamURL, &tsoConnectionContext{ctx, cancel, mockStreamURL, stream}) + return true +} + +func BenchmarkTSODispatcherHandleRequests(b *testing.B) { + log.SetLevel(zapcore.FatalLevel) + + ctx := context.Background() + + reqPool := &sync.Pool{ + New: func() any { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, + dcLocation: globalDCLocation, + } + }, + } + getReq := func() *tsoRequest { + req := reqPool.Get().(*tsoRequest) + req.clientCtx = ctx + req.requestCtx = ctx + req.physical = 0 + req.logical = 0 + req.start = time.Now() + req.pool = reqPool + return req + } + + dispatcher := newTSODispatcher(ctx, globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption())) + var wg sync.WaitGroup + wg.Add(1) + + go dispatcher.handleDispatcher(&wg) + defer func() { + dispatcher.close() + wg.Wait() + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + req := getReq() + dispatcher.push(req) + _, _, err := req.Wait() + if err != nil { + panic(fmt.Sprintf("unexpected error from tsoReq: %+v", err)) + } + } + // Don't count the time cost in `defer` + b.StopTimer() +} diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go new file mode 100644 index 00000000000..9c68a35d55e --- /dev/null +++ b/client/tso_stream_test.go @@ -0,0 +1,188 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "io" + "testing" + "time" +) + +const mockStreamURL = "mock:///" + +type requestMsg struct { + clusterID uint64 + keyspaceGroupID uint32 + count int64 +} + +type resultMsg struct { + r tsoRequestResult + err error + breakStream bool +} + +type mockTSOStreamImpl struct { + ctx context.Context + requestCh chan requestMsg + resultCh chan resultMsg + keyspaceID uint32 + errorState error + + autoGenerateResult bool + // Current progress of generating TSO results + resGenPhysical, resGenLogical int64 +} + +func newMockTSOStreamImpl(ctx context.Context, autoGenerateResult bool) *mockTSOStreamImpl { + return &mockTSOStreamImpl{ + ctx: ctx, + requestCh: make(chan requestMsg, 64), + resultCh: make(chan resultMsg, 64), + keyspaceID: 0, + + autoGenerateResult: autoGenerateResult, + resGenPhysical: 10000, + resGenLogical: 0, + } +} + +func (s *mockTSOStreamImpl) Send(clusterID uint64, _keyspaceID, keyspaceGroupID uint32, _dcLocation string, count int64) error { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + default: + } + s.requestCh <- requestMsg{ + clusterID: clusterID, + keyspaceGroupID: keyspaceGroupID, + count: count, + } + return nil +} + +func (s *mockTSOStreamImpl) Recv() (ret tsoRequestResult, retErr error) { + // This stream have ever receive an error, it returns the error forever. + if s.errorState != nil { + return tsoRequestResult{}, s.errorState + } + + select { + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + default: + } + + var res resultMsg + needGenRes := false + if s.autoGenerateResult { + select { + case res = <-s.resultCh: + default: + needGenRes = true + } + } else { + select { + case res = <-s.resultCh: + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + } + } + + if !res.breakStream { + var req requestMsg + select { + case req = <-s.requestCh: + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + } + if needGenRes { + + physical := s.resGenPhysical + logical := s.resGenLogical + req.count + if logical >= (1 << 18) { + physical += logical >> 18 + logical &= (1 << 18) - 1 + } + + s.resGenPhysical = physical + s.resGenLogical = logical + + res = resultMsg{ + r: tsoRequestResult{ + physical: s.resGenPhysical, + logical: s.resGenLogical, + count: uint32(req.count), + suffixBits: 0, + respKeyspaceGroupID: 0, + }, + } + } + } + if res.err != nil { + s.errorState = res.err + } + return res.r, res.err +} + +func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count uint32) { + s.resultCh <- resultMsg{ + r: tsoRequestResult{ + physical: physical, + logical: logical, + count: count, + suffixBits: 0, + respKeyspaceGroupID: s.keyspaceID, + }, + } +} + +func (s *mockTSOStreamImpl) returnError(err error) { + s.resultCh <- resultMsg{ + err: err, + } +} + +func (s *mockTSOStreamImpl) breakStream(err error) { + s.resultCh <- resultMsg{ + err: err, + breakStream: true, + } +} + +func (s *mockTSOStreamImpl) stop() { + s.breakStream(io.EOF) +} + +func BenchmarkTSOStreamSendRecv(b *testing.B) { + streamInner := newMockTSOStreamImpl(context.Background(), true) + stream := tsoStream{ + serverURL: mockStreamURL, + stream: streamInner, + } + defer streamInner.stop() + + now := time.Now() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, _, _, _ = stream.processRequests(1, 1, 1, globalDCLocation, 1, now) + } + b.StopTimer() +} From 7dbcd770af83d272e769e71ae4f81dd0ed85204a Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 12 Sep 2024 15:02:02 +0800 Subject: [PATCH 2/4] Reimplement mock receiving Signed-off-by: MyonKeminta --- client/tso_stream_test.go | 102 ++++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 32 deletions(-) diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index 9c68a35d55e..c543d74c827 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -74,7 +74,7 @@ func (s *mockTSOStreamImpl) Send(clusterID uint64, _keyspaceID, keyspaceGroupID return nil } -func (s *mockTSOStreamImpl) Recv() (ret tsoRequestResult, retErr error) { +func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { // This stream have ever receive an error, it returns the error forever. if s.errorState != nil { return tsoRequestResult{}, s.errorState @@ -87,60 +87,98 @@ func (s *mockTSOStreamImpl) Recv() (ret tsoRequestResult, retErr error) { default: } - var res resultMsg - needGenRes := false - if s.autoGenerateResult { + var ( + res resultMsg + hasRes bool + req requestMsg + hasReq bool + ) + + // Try to match a pair of request and result from each channel and allowing breaking the stream at any time. + select { + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + case req = <-s.requestCh: + hasReq = true select { case res = <-s.resultCh: + hasRes = true default: - needGenRes = true } - } else { + case res = <-s.resultCh: + hasRes = true select { - case res = <-s.resultCh: - case <-s.ctx.Done(): + case req = <-s.requestCh: + hasReq = true + default: + } + } + // Either req or res should be ready at this time. + + if hasRes { + if res.breakStream { s.errorState = s.ctx.Err() return tsoRequestResult{}, s.errorState + } else { + // Do not allow manually assigning result. + if s.autoGenerateResult { + panic("trying manually specifying result for mockTSOStreamImpl when it's auto-generating mode") + } } + } else if s.autoGenerateResult { + res = s.autoGenResult(req.count) + hasRes = true } - if !res.breakStream { - var req requestMsg + if !hasReq { + // If req is not ready, the res must be ready. So it's certain that it don't need to be canceled by breakStream. select { - case req = <-s.requestCh: case <-s.ctx.Done(): s.errorState = s.ctx.Err() return tsoRequestResult{}, s.errorState + case req = <-s.requestCh: + hasReq = true } - if needGenRes { - - physical := s.resGenPhysical - logical := s.resGenLogical + req.count - if logical >= (1 << 18) { - physical += logical >> 18 - logical &= (1 << 18) - 1 - } - - s.resGenPhysical = physical - s.resGenLogical = logical - - res = resultMsg{ - r: tsoRequestResult{ - physical: s.resGenPhysical, - logical: s.resGenLogical, - count: uint32(req.count), - suffixBits: 0, - respKeyspaceGroupID: 0, - }, - } + } else if !hasRes { + select { + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + case res = <-s.resultCh: + hasRes = true } } + + // Both res and req should be ready here. if res.err != nil { s.errorState = res.err } return res.r, res.err } +func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { + physical := s.resGenPhysical + logical := s.resGenLogical + count + if logical >= (1 << 18) { + physical += logical >> 18 + logical &= (1 << 18) - 1 + } + + s.resGenPhysical = physical + s.resGenLogical = logical + + return resultMsg{ + r: tsoRequestResult{ + physical: s.resGenPhysical, + logical: s.resGenLogical, + count: uint32(count), + suffixBits: 0, + respKeyspaceGroupID: 0, + }, + } +} + func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count uint32) { s.resultCh <- resultMsg{ r: tsoRequestResult{ From 123a869fb5349867b80f458517b4c53c7a6d089f Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 12 Sep 2024 15:27:09 +0800 Subject: [PATCH 3/4] Fix incorrect error generating Signed-off-by: MyonKeminta --- client/tso_stream_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index c543d74c827..e47fe75eca9 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -118,7 +118,10 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { if hasRes { if res.breakStream { - s.errorState = s.ctx.Err() + if res.err == nil { + panic("breaking mockTSOStreamImpl without error") + } + s.errorState = res.err return tsoRequestResult{}, s.errorState } else { // Do not allow manually assigning result. From ef6e365c7a9a3061db8012b79bfdab9d1ce806a9 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 12 Sep 2024 16:14:33 +0800 Subject: [PATCH 4/4] Fix lint Signed-off-by: MyonKeminta --- client/tso_dispatcher_test.go | 4 ++-- client/tso_stream_test.go | 15 +++++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go index b1a87d0a2cd..7b7ac8f736b 100644 --- a/client/tso_dispatcher_test.go +++ b/client/tso_dispatcher_test.go @@ -39,11 +39,11 @@ func (m *mockTSOServiceProvider) getOption() *option { return m.option } -func (m *mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { +func (*mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { return NewMockPDServiceDiscovery([]string{mockStreamURL}, nil) } -func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { +func (*mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { _, ok := connectionCtxs.Load(mockStreamURL) if ok { return true diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index e47fe75eca9..4709fe04f4c 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -123,11 +123,9 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { } s.errorState = res.err return tsoRequestResult{}, s.errorState - } else { + } else if s.autoGenerateResult { // Do not allow manually assigning result. - if s.autoGenerateResult { - panic("trying manually specifying result for mockTSOStreamImpl when it's auto-generating mode") - } + panic("trying manually specifying result for mockTSOStreamImpl when it's auto-generating mode") } } else if s.autoGenerateResult { res = s.autoGenResult(req.count) @@ -141,7 +139,8 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { s.errorState = s.ctx.Err() return tsoRequestResult{}, s.errorState case req = <-s.requestCh: - hasReq = true + // Skip the assignment to make linter happy. + // hasReq = true } } else if !hasRes { select { @@ -149,7 +148,8 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { s.errorState = s.ctx.Err() return tsoRequestResult{}, s.errorState case res = <-s.resultCh: - hasRes = true + // Skip the assignment to make linter happy. + // hasRes = true } } @@ -182,6 +182,7 @@ func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { } } +// nolint:unused func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count uint32) { s.resultCh <- resultMsg{ r: tsoRequestResult{ @@ -194,12 +195,14 @@ func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count ui } } +// nolint:unused func (s *mockTSOStreamImpl) returnError(err error) { s.resultCh <- resultMsg{ err: err, } } +// nolint:unused func (s *mockTSOStreamImpl) breakStream(err error) { s.resultCh <- resultMsg{ err: err,