Skip to content

Commit

Permalink
Merge branch 'master' into improve-big-query-only
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Aug 5, 2024
2 parents 881b24d + 1335ff9 commit 0945a1b
Show file tree
Hide file tree
Showing 125 changed files with 1,408 additions and 1,255 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,6 @@ issues:
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
include:
# remove the comment after the path is ready
# - EXC0012
6 changes: 2 additions & 4 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,7 @@ type tsoConnectionContext struct {
// Current URL of the stream connection.
streamURL string
// Current stream to send gRPC requests.
// - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster.
// - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster.
stream tsoStream
stream *tsoStream
}

// updateConnectionCtxs will choose the proper way to update the connections for the given dc-location.
Expand Down Expand Up @@ -382,7 +380,7 @@ func (c *tsoClient) tryConnectToTSO(
var (
networkErrNum uint64
err error
stream tsoStream
stream *tsoStream
url string
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
Expand Down
4 changes: 2 additions & 2 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
streamCtx context.Context
cancel context.CancelFunc
streamURL string
stream tsoStream
stream *tsoStream
)
// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(option.timeout)
Expand Down Expand Up @@ -393,7 +393,7 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext
}

func (td *tsoDispatcher) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController,
stream *tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
var (
requests = tbc.getCollectedRequests()
Expand Down
138 changes: 68 additions & 70 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ func (*tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBui
// TSO Stream Builder

type tsoStreamBuilder interface {
build(context.Context, context.CancelFunc, time.Duration) (tsoStream, error)
build(context.Context, context.CancelFunc, time.Duration) (*tsoStream, error)
}

type pdTSOStreamBuilder struct {
serverURL string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (*tsoStream, error) {
done := make(chan struct{})
// TODO: we need to handle a corner case where this goroutine times out while the stream is successfully created.
go checkStreamTimeout(ctx, cancel, done, timeout)
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream, serverURL: b.serverURL}, nil
return &tsoStream{stream: pdTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil
}
return nil, err
}
Expand All @@ -74,14 +74,14 @@ type tsoTSOStreamBuilder struct {

func (b *tsoTSOStreamBuilder) build(
ctx context.Context, cancel context.CancelFunc, timeout time.Duration,
) (tsoStream, error) {
) (*tsoStream, error) {
done := make(chan struct{})
// TODO: we need to handle a corner case where this goroutine times out while the stream is successfully created.
go checkStreamTimeout(ctx, cancel, done, timeout)
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream, serverURL: b.serverURL}, nil
return &tsoStream{stream: tsoTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil
}
return nil, err
}
Expand All @@ -99,86 +99,53 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
<-done
}

// TSO Stream

type tsoStream interface {
getServerURL() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
// tsoRequestResult holds one TSO gRPC response in a protocol-independent
// form, so the PD and TSO-service stream variants can share a single
// processing path in tsoStream.
type tsoRequestResult struct {
	// physical and logical are the two parts of the allocated timestamp.
	physical, logical int64
	// count is the number of timestamps granted by this response; callers
	// compare it against the requested count.
	count uint32
	// suffixBits comes from the response timestamp's suffix-bits field.
	suffixBits uint32
	// respKeyspaceGroupID is the keyspace group ID reported by the server
	// (the TSO protocol carries it in the response header; the PD protocol
	// has none, so a default is used there).
	respKeyspaceGroupID uint32
}

type pdTSOStream struct {
serverURL string
stream pdpb.PD_TsoClient
// grpcTSOStreamAdapter abstracts the difference between the PD and the
// TSO-service gRPC TSO protocols behind a common Send/Recv pair, letting
// tsoStream hold either kind of underlying stream.
type grpcTSOStreamAdapter interface {
	// Send sends one TSO request asking for `count` timestamps on behalf
	// of the given cluster/keyspace identifiers and dc-location.
	Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
		count int64) error
	// Recv receives one response and converts it into a tsoRequestResult.
	Recv() (tsoRequestResult, error)
}

func (s *pdTSOStream) getServerURL() string {
return s.serverURL
// pdTSOStreamAdapter adapts a raw pdpb.PD_TsoClient stream (a PD
// leader/follower) to the grpcTSOStreamAdapter interface.
type pdTSOStreamAdapter struct {
	stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
// Send implements grpcTSOStreamAdapter for the PD protocol. The PD TSO
// request has no keyspace fields, so the keyspaceID and keyspaceGroupID
// arguments are ignored (blank parameters).
func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error {
	req := &pdpb.TsoRequest{
		Header: &pdpb.RequestHeader{
			ClusterId: clusterID,
		},
		Count:      uint32(count),
		DcLocation: dcLocation,
	}
	return s.stream.Send(req)
}

if err = s.stream.Send(req); err != nil {
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
err = errors.WithStack(err)
}
return
}
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) {
resp, err := s.stream.Recv()
duration := time.Since(start).Seconds()
if err != nil {
requestFailedDurationTSO.Observe(duration)
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
err = errors.WithStack(err)
}
return
return tsoRequestResult{}, err
}
requestDurationTSO.Observe(duration)
tsoBatchSize.Observe(float64(count))

if resp.GetCount() != uint32(count) {
err = errors.WithStack(errTSOLength)
return
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = defaultKeySpaceGroupID
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
return tsoRequestResult{
physical: resp.GetTimestamp().GetPhysical(),
logical: resp.GetTimestamp().GetLogical(),
count: resp.GetCount(),
suffixBits: resp.GetTimestamp().GetSuffixBits(),
respKeyspaceGroupID: defaultKeySpaceGroupID,
}, nil
}

type tsoTSOStream struct {
serverURL string
stream tsopb.TSO_TsoClient
// tsoTSOStreamAdapter adapts a raw tsopb.TSO_TsoClient stream (a TSO
// service primary/secondary) to the grpcTSOStreamAdapter interface.
type tsoTSOStreamAdapter struct {
	stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) getServerURL() string {
return s.serverURL
}

func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error {
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
Expand All @@ -188,8 +155,40 @@ func (s *tsoTSOStream) processRequests(
Count: uint32(count),
DcLocation: dcLocation,
}
return s.stream.Send(req)
}

if err = s.stream.Send(req); err != nil {
// Recv implements grpcTSOStreamAdapter for the TSO-service protocol: it
// receives one tsopb response and flattens it into a tsoRequestResult.
// Unlike the PD variant, the keyspace group ID is taken from the response
// header rather than defaulted.
func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) {
	resp, err := s.stream.Recv()
	if err != nil {
		// Return the zero result; callers must check err first.
		return tsoRequestResult{}, err
	}
	return tsoRequestResult{
		physical:            resp.GetTimestamp().GetPhysical(),
		logical:             resp.GetTimestamp().GetLogical(),
		count:               resp.GetCount(),
		suffixBits:          resp.GetTimestamp().GetSuffixBits(),
		respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(),
	}, nil
}

// tsoStream wraps a protocol-specific TSO gRPC stream adapter together with
// the URL of the server it is connected to, providing the shared
// request-processing logic regardless of which cluster type serves TSO.
type tsoStream struct {
	serverURL string
	// The internal gRPC stream.
	//   - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster.
	//   - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster.
	stream grpcTSOStreamAdapter
}

// getServerURL returns the URL of the server this stream is connected to.
func (s *tsoStream) getServerURL() string {
	return s.serverURL
}

func (s *tsoStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
if err = s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, dcLocation, count); err != nil {
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
Expand All @@ -198,7 +197,7 @@ func (s *tsoTSOStream) processRequests(
return
}
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
resp, err := s.stream.Recv()
res, err := s.stream.Recv()
duration := time.Since(start).Seconds()
if err != nil {
requestFailedDurationTSO.Observe(duration)
Expand All @@ -212,13 +211,12 @@ func (s *tsoTSOStream) processRequests(
requestDurationTSO.Observe(duration)
tsoBatchSize.Observe(float64(count))

if resp.GetCount() != uint32(count) {
if res.count != uint32(count) {
err = errors.WithStack(errTSOLength)
return
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
respKeyspaceGroupID = res.respKeyspaceGroupID
physical, logical, suffixBits = res.physical, res.logical, res.suffixBits
return
}
3 changes: 3 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func NewTSOServiceCommand() *cobra.Command {
Short: "Run the TSO service",
Run: tso.CreateServerWrapper,
}
cmd.Flags().StringP("name", "", "", "human-readable name for this tso member")
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
Expand All @@ -114,6 +115,7 @@ func NewSchedulingServiceCommand() *cobra.Command {
Short: "Run the scheduling service",
Run: scheduling.CreateServerWrapper,
}
cmd.Flags().StringP("name", "", "", "human-readable name for this scheduling member")
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
Expand All @@ -134,6 +136,7 @@ func NewResourceManagerServiceCommand() *cobra.Command {
Short: "Run the resource manager service",
Run: resource_manager.CreateServerWrapper,
}
cmd.Flags().StringP("name", "", "", "human-readable name for this resource manager member")
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
Expand Down
4 changes: 2 additions & 2 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@

[schedule]
## Controls the size limit of Region Merge.
# max-merge-region-size = 20
# max-merge-region-size = 54
## Specifies the upper limit of the Region Merge key.
# max-merge-region-keys = 200000
# max-merge-region-keys = 540000
## Controls the time interval between the split and merge operations on the same Region.
# split-merge-interval = "1h"
## When PD fails to receive the heartbeat from a store after the specified period of time,
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,23 +371,23 @@ func TestPriorityQueue(t *testing.T) {
pq.Remove(uint64(1))
re.Nil(pq.Get(1))
re.Equal(2, pq.Len())
entry := pq.Peek()
entry := pq.peek()
re.Equal(2, entry.Priority)
re.Equal(testData[2], entry.Value)

// case3 update 3's priority to highest
pq.Put(-1, testData[3])
entry = pq.Peek()
entry = pq.peek()
re.Equal(-1, entry.Priority)
re.Equal(testData[3], entry.Value)
pq.Remove(entry.Value.ID())
re.Equal(testData[2], pq.Peek().Value)
re.Equal(testData[2], pq.peek().Value)
re.Equal(1, pq.Len())

// case4 remove all element
pq.Remove(uint64(2))
re.Equal(0, pq.Len())
re.Empty(pq.items)
re.Nil(pq.Peek())
re.Nil(pq.Tail())
re.Nil(pq.peek())
re.Nil(pq.tail())
}
Loading

0 comments on commit 0945a1b

Please sign in to comment.