diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go new file mode 100644 index 00000000000..7b7ac8f736b --- /dev/null +++ b/client/tso_dispatcher_test.go @@ -0,0 +1,107 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap/zapcore" +) + +type mockTSOServiceProvider struct { + option *option +} + +func newMockTSOServiceProvider(option *option) *mockTSOServiceProvider { + return &mockTSOServiceProvider{ + option: option, + } +} + +func (m *mockTSOServiceProvider) getOption() *option { + return m.option +} + +func (*mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { + return NewMockPDServiceDiscovery([]string{mockStreamURL}, nil) +} + +func (*mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { + _, ok := connectionCtxs.Load(mockStreamURL) + if ok { + return true + } + ctx, cancel := context.WithCancel(ctx) + stream := &tsoStream{ + serverURL: mockStreamURL, + stream: newMockTSOStreamImpl(ctx, true), + } + connectionCtxs.LoadOrStore(mockStreamURL, &tsoConnectionContext{ctx, cancel, mockStreamURL, stream}) + return true +} + +func BenchmarkTSODispatcherHandleRequests(b *testing.B) { + log.SetLevel(zapcore.FatalLevel) + + ctx := context.Background() + + reqPool := &sync.Pool{ + New: func() any { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, + dcLocation: globalDCLocation, + } + }, + } + getReq := func() *tsoRequest { + req := reqPool.Get().(*tsoRequest) + req.clientCtx = ctx + req.requestCtx = ctx + req.physical = 0 + req.logical = 0 + req.start = time.Now() + req.pool = reqPool + return req + } + + dispatcher := newTSODispatcher(ctx, globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption())) + var wg sync.WaitGroup + wg.Add(1) + + go dispatcher.handleDispatcher(&wg) + defer func() { + dispatcher.close() + wg.Wait() + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + req := getReq() + dispatcher.push(req) + _, _, err := req.Wait() + if err != nil { + panic(fmt.Sprintf("unexpected error from tsoReq: %+v", err)) + } + } + // Don't count the time cost in `defer` + b.StopTimer() +} diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go new file mode 100644 index 00000000000..4709fe04f4c --- /dev/null +++ b/client/tso_stream_test.go @@ -0,0 +1,232 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "io" + "testing" + "time" +) + +const mockStreamURL = "mock:///" + +type requestMsg struct { + clusterID uint64 + keyspaceGroupID uint32 + count int64 +} + +type resultMsg struct { + r tsoRequestResult + err error + breakStream bool +} + +type mockTSOStreamImpl struct { + ctx context.Context + requestCh chan requestMsg + resultCh chan resultMsg + keyspaceID uint32 + errorState error + + autoGenerateResult bool + // Current progress of generating TSO results + resGenPhysical, resGenLogical int64 +} + +func newMockTSOStreamImpl(ctx context.Context, autoGenerateResult bool) *mockTSOStreamImpl { + return &mockTSOStreamImpl{ + ctx: ctx, + requestCh: make(chan requestMsg, 64), + resultCh: make(chan resultMsg, 64), + keyspaceID: 0, + + autoGenerateResult: autoGenerateResult, + resGenPhysical: 10000, + resGenLogical: 0, + } +} + +func (s *mockTSOStreamImpl) Send(clusterID uint64, _keyspaceID, keyspaceGroupID uint32, _dcLocation string, count int64) error { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + default: + } + s.requestCh <- requestMsg{ + clusterID: clusterID, + keyspaceGroupID: keyspaceGroupID, + count: count, + } + return nil +} + +func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { + // This stream have ever receive an error, it returns the error forever. + if s.errorState != nil { + return tsoRequestResult{}, s.errorState + } + + select { + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + default: + } + + var ( + res resultMsg + hasRes bool + req requestMsg + hasReq bool + ) + + // Try to match a pair of request and result from each channel and allowing breaking the stream at any time. + select { + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + case req = <-s.requestCh: + hasReq = true + select { + case res = <-s.resultCh: + hasRes = true + default: + } + case res = <-s.resultCh: + hasRes = true + select { + case req = <-s.requestCh: + hasReq = true + default: + } + } + // Either req or res should be ready at this time. + + if hasRes { + if res.breakStream { + if res.err == nil { + panic("breaking mockTSOStreamImpl without error") + } + s.errorState = res.err + return tsoRequestResult{}, s.errorState + } else if s.autoGenerateResult { + // Do not allow manually assigning result. + panic("trying manually specifying result for mockTSOStreamImpl when it's auto-generating mode") + } + } else if s.autoGenerateResult { + res = s.autoGenResult(req.count) + hasRes = true + } + + if !hasReq { + // If req is not ready, the res must be ready. So it's certain that it don't need to be canceled by breakStream. + select { + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + case req = <-s.requestCh: + // Skip the assignment to make linter happy. + // hasReq = true + } + } else if !hasRes { + select { + case <-s.ctx.Done(): + s.errorState = s.ctx.Err() + return tsoRequestResult{}, s.errorState + case res = <-s.resultCh: + // Skip the assignment to make linter happy. + // hasRes = true + } + } + + // Both res and req should be ready here. + if res.err != nil { + s.errorState = res.err + } + return res.r, res.err +} + +func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { + physical := s.resGenPhysical + logical := s.resGenLogical + count + if logical >= (1 << 18) { + physical += logical >> 18 + logical &= (1 << 18) - 1 + } + + s.resGenPhysical = physical + s.resGenLogical = logical + + return resultMsg{ + r: tsoRequestResult{ + physical: s.resGenPhysical, + logical: s.resGenLogical, + count: uint32(count), + suffixBits: 0, + respKeyspaceGroupID: 0, + }, + } +} + +// nolint:unused +func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count uint32) { + s.resultCh <- resultMsg{ + r: tsoRequestResult{ + physical: physical, + logical: logical, + count: count, + suffixBits: 0, + respKeyspaceGroupID: s.keyspaceID, + }, + } +} + +// nolint:unused +func (s *mockTSOStreamImpl) returnError(err error) { + s.resultCh <- resultMsg{ + err: err, + } +} + +// nolint:unused +func (s *mockTSOStreamImpl) breakStream(err error) { + s.resultCh <- resultMsg{ + err: err, + breakStream: true, + } +} + +func (s *mockTSOStreamImpl) stop() { + s.breakStream(io.EOF) +} + +func BenchmarkTSOStreamSendRecv(b *testing.B) { + streamInner := newMockTSOStreamImpl(context.Background(), true) + stream := tsoStream{ + serverURL: mockStreamURL, + stream: streamInner, + } + defer streamInner.stop() + + now := time.Now() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, _, _, _ = stream.processRequests(1, 1, 1, globalDCLocation, 1, now) + } + b.StopTimer() +}