From c5afe6130097b5ae66d7d450333238ebcd6666bf Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 27 Feb 2024 16:42:34 -0800 Subject: [PATCH 1/8] go/store/datas/pull: Create a PullChunkTracker for keeping track of what to pull. The PullChunkTracker is an optimization which can concurrently call HasMany on the destination database while chunks are being pulled from the source database. --- go/store/datas/pull/pull_chunk_tracker.go | 260 ++++++++++++++++++ .../datas/pull/pull_chunk_tracker_test.go | 138 ++++++++++ go/store/datas/pull/puller.go | 137 ++++++--- 3 files changed, 494 insertions(+), 41 deletions(-) create mode 100644 go/store/datas/pull/pull_chunk_tracker.go create mode 100644 go/store/datas/pull/pull_chunk_tracker_test.go diff --git a/go/store/datas/pull/pull_chunk_tracker.go b/go/store/datas/pull/pull_chunk_tracker.go new file mode 100644 index 0000000000..3a43c6d233 --- /dev/null +++ b/go/store/datas/pull/pull_chunk_tracker.go @@ -0,0 +1,260 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pull + +import ( + "context" + "errors" + "sync" + + "github.com/dolthub/dolt/go/store/hash" +) + +type Haser interface { + HasMany(context.Context, hash.HashSet) (hash.HashSet, error) +} + +type TrackerConfig struct { + BatchSize int + + HasManyThreadCount int + + Haser Haser +} + +type PullChunkTracker struct { + ctx context.Context + seen hash.HashSet + cfg TrackerConfig + wg sync.WaitGroup + + uncheckedCh chan hash.Hash + reqCh chan *trackerGetAbsentReq +} + +func NewPullChunkTracker(ctx context.Context, initial hash.HashSet, cfg TrackerConfig) *PullChunkTracker { + ret := &PullChunkTracker{ + ctx: ctx, + seen: make(hash.HashSet), + cfg: cfg, + uncheckedCh: make(chan hash.Hash), + reqCh: make(chan *trackerGetAbsentReq), + } + ret.wg.Add(1) + go func() { + defer ret.wg.Done() + ret.thread(initial) + }() + return ret +} + +func (t *PullChunkTracker) Seen(h hash.Hash) { + if !t.seen.Has(h) { + t.seen.Insert(h) + t.addUnchecked(h) + } +} + +func (t *PullChunkTracker) Close() { + close(t.uncheckedCh) + t.wg.Wait() +} + +func (t *PullChunkTracker) addUnchecked(h hash.Hash) { + select { + case t.uncheckedCh <- h: + case <-t.ctx.Done(): + } +} + +func (t *PullChunkTracker) GetChunksToFetch() (hash.HashSet, bool, error) { + var req trackerGetAbsentReq + req.ready = make(chan struct{}) + + select { + case t.reqCh <- &req: + case <-t.ctx.Done(): + return nil, false, t.ctx.Err() + } + + select { + case <-req.ready: + case <-t.ctx.Done(): + return nil, false, t.ctx.Err() + } + + return req.hs, req.ok, req.err +} + +func (t *PullChunkTracker) thread(initial hash.HashSet) { + doneCh := make(chan struct{}) + hasManyReqCh := make(chan trackerHasManyReq) + hasManyRespCh := make(chan trackerHasManyResp) + + var wg sync.WaitGroup + wg.Add(t.cfg.HasManyThreadCount) + + for i := 0; i < t.cfg.HasManyThreadCount; i++ { + go func() { + defer wg.Done() + hasManyThread(t.ctx, t.cfg.Haser, hasManyReqCh, hasManyRespCh, doneCh) + }() + } + + defer func() { + close(doneCh) + wg.Wait() + }() + + unchecked := make([]hash.HashSet, 0) + absent := make([]hash.HashSet, 0) + + var err error + outstanding := 0 + + if len(initial) > 0 { + unchecked = append(unchecked, initial) + outstanding += 1 + } + + for { + var thisReqCh = t.reqCh + if outstanding != 0 && len(absent) == 0 { + // If we are waiting for a HasMany response and we don't currently have any + // absent addresses to return, block any absent requests. + thisReqCh = nil + } + + var thisHasManyReqCh chan trackerHasManyReq + var hasManyReq trackerHasManyReq + if len(unchecked) > 0 { + hasManyReq.hs = unchecked[0] + thisHasManyReqCh = hasManyReqCh + } + + select { + case h, ok := <-t.uncheckedCh: + if !ok { + return + } + if len(unchecked) == 0 || len(unchecked[len(unchecked)-1]) >= t.cfg.BatchSize { + outstanding += 1 + unchecked = append(unchecked, make(hash.HashSet)) + } + unchecked[len(unchecked)-1].Insert(h) + case resp := <-hasManyRespCh: + outstanding -= 1 + if resp.err != nil { + err = errors.Join(err, resp.err) + } else if len(resp.hs) > 0 { + absent = append(absent, resp.hs) + } + case thisHasManyReqCh <- hasManyReq: + copy(unchecked[:], unchecked[1:]) + if len(unchecked) > 1 { + unchecked[len(unchecked)-1] = nil + } + unchecked = unchecked[:len(unchecked)-1] + case req := <-thisReqCh: + if err != nil { + req.err = err + close(req.ready) + err = nil + } else if len(absent) == 0 { + req.ok = false + close(req.ready) + } else { + req.ok = true + req.hs = absent[0] + var i int + for i = 1; i < len(absent); i++ { + if len(req.hs)+len(absent[i]) < t.cfg.BatchSize { + req.hs.InsertAll(absent[i]) + } else { + break + } + } + copy(absent[:], absent[i:]) + for j := range absent[:len(absent)-i] { + absent[len(absent)-i+j] = nil + } + absent = absent[:len(absent)-i] + close(req.ready) + } + case <-t.ctx.Done(): + return + } + } +} + +// Run by a PullChunkTracker, calls HasMany on a batch of addresses and delivers the results. +func hasManyThread(ctx context.Context, haser Haser, reqCh <-chan trackerHasManyReq, respCh chan<- trackerHasManyResp, doneCh <-chan struct{}) { + for { + select { + case req := <-reqCh: + hs, err := haser.HasMany(ctx, req.hs) + if err != nil { + select { + case respCh <- trackerHasManyResp{err: err}: + case <-ctx.Done(): + return + case <-doneCh: + return + } + } else { + select { + case respCh <- trackerHasManyResp{hs: hs}: + case <-ctx.Done(): + return + case <-doneCh: + return + } + } + case <-doneCh: + return + case <-ctx.Done(): + return + } + } +} + +// Sent by the tracker thread to a HasMany thread, includes a batch of +// addresses to HasMany. The response comes back to the tracker thread on a +// separate channel as a |trackerHasManyResp|. +type trackerHasManyReq struct { + hs hash.HashSet +} + +// Sent by the HasMany thread back to the tracker thread. +// If HasMany returned an error, it will be returned here. +type trackerHasManyResp struct { + hs hash.HashSet + err error +} + +// Sent by a client calling |GetChunksToFetch| to the tracker thread. The +// tracker thread will return a batch of chunk addresses that need to be +// fetched from source and added to destination. +// +// This will block until HasMany requests are completed. +// +// If |ok| is |false|, then the Tracker is closing because every absent address +// has been delivered. +type trackerGetAbsentReq struct { + hs hash.HashSet + err error + ok bool + ready chan struct{} +} diff --git a/go/store/datas/pull/pull_chunk_tracker_test.go b/go/store/datas/pull/pull_chunk_tracker_test.go new file mode 100644 index 0000000000..44c3427c33 --- /dev/null +++ b/go/store/datas/pull/pull_chunk_tracker_test.go @@ -0,0 +1,138 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pull + +import ( + "context" + "errors" + "testing" + + "github.com/dolthub/dolt/go/store/hash" + + "github.com/stretchr/testify/assert" +) + +func TestPullChunkTracker(t *testing.T) { + t.Run("Empty", func(t *testing.T) { + tracker := NewPullChunkTracker(context.Background(), make(hash.HashSet), TrackerConfig{ + BatchSize: 64 * 1024, + HasManyThreadCount: 3, + Haser: nil, + }) + hs, ok, err := tracker.GetChunksToFetch() + assert.Len(t, hs, 0) + assert.False(t, ok) + assert.NoError(t, err) + tracker.Close() + }) + + t.Run("HasAllInitial", func(t *testing.T) { + hs := make(hash.HashSet) + for i := byte(0); i < byte(10); i++ { + var h hash.Hash + h[0] = i + hs.Insert(h) + } + tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ + BatchSize: 64 * 1024, + HasManyThreadCount: 3, + Haser: hasAllHaser{}, + }) + hs, ok, err := tracker.GetChunksToFetch() + assert.Len(t, hs, 0) + assert.False(t, ok) + assert.NoError(t, err) + tracker.Close() + }) + + t.Run("HasNoneInitial", func(t *testing.T) { + hs := make(hash.HashSet) + for i := byte(0); i < byte(10); i++ { + var h hash.Hash + h[0] = i + hs.Insert(h) + } + tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ + BatchSize: 64 * 1024, + HasManyThreadCount: 3, + Haser: hasNoneHaser{}, + }) + hs, ok, err := tracker.GetChunksToFetch() + assert.Len(t, hs, 10) + assert.True(t, ok) + assert.NoError(t, err) + hs, ok, err = tracker.GetChunksToFetch() + assert.Len(t, hs, 0) + assert.False(t, ok) + assert.NoError(t, err) + + for i := byte(0); i < byte(10); i++ { + var h hash.Hash + h[1] = i + tracker.Seen(h) + } + + cnt := 0 + for { + hs, ok, err := tracker.GetChunksToFetch() + assert.NoError(t, err) + if !ok { + assert.Equal(t, 10, cnt) + break + } + cnt += len(hs) + } + + tracker.Close() + }) + + t.Run("HasManyError", func(t *testing.T) { + hs := make(hash.HashSet) + for i := byte(0); i < byte(10); i++ { + var h hash.Hash + h[0] = i + hs.Insert(h) + } + tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ + BatchSize: 64 * 1024, + HasManyThreadCount: 3, + Haser: errHaser{}, + }) + _, _, err := tracker.GetChunksToFetch() + assert.Error(t, err) + tracker.Close() + }) +} + +type hasAllHaser struct { +} + +func (hasAllHaser) HasMany(context.Context, hash.HashSet) (hash.HashSet, error) { + return make(hash.HashSet), nil +} + +type hasNoneHaser struct { +} + +func (hasNoneHaser) HasMany(ctx context.Context, hs hash.HashSet) (hash.HashSet, error) { + return hs, nil +} + +type errHaser struct { +} + +func (errHaser) HasMany(ctx context.Context, hs hash.HashSet) (hash.HashSet, error) { + return nil, errors.New("always throws an error") +} diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go index 5bc93578eb..71d8ba6c03 100644 --- a/go/store/datas/pull/puller.go +++ b/go/store/datas/pull/puller.go @@ -45,7 +45,6 @@ var ErrDBUpToDate = errors.New("the database does not need to be pulled as it's var ErrIncompatibleSourceChunkStore = errors.New("the chunk store of the source database does not implement NBSCompressedChunkStore.") const ( - maxChunkWorkers = 2 outstandingTableFiles = 2 ) @@ -251,6 +250,7 @@ func emitStats(s *stats, ch chan Stats) (cancel func()) { defer wg.Done() updateduration := 1 * time.Second ticker := time.NewTicker(updateduration) + defer ticker.Stop() for { select { case <-ticker.C: @@ -308,6 +308,11 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile _ = tmpTblFile.read.Remove() }() + // So far, we've added all the bytes for the compressed chunk data. + // We add the remaining bytes here --- bytes for the index and the + // table file footer. + atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize)-tmpTblFile.chunksLen) + // By tracking the number of bytes uploaded here, // we can add bytes on to our bufferedSendBytes when // we have to retry a table file write. @@ -318,22 +323,89 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile return nil, 0, err } - if localUploaded == 0 { - // So far, we've added all the bytes for the compressed chunk data. - // We add the remaining bytes here --- bytes for the index and the - // table file footer. - atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize)-tmpTblFile.chunksLen) - } else { - // A retry. We treat it as if what was already uploaded was rebuffered. - atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(localUploaded)) - localUploaded = 0 - } + // If we've ever uploaded anything before, this is a retry. We + // treat it as if what was already uploaded was rebuffered. + atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(localUploaded)) + localUploaded = 0 + fWithStats := countingReader{countingReader{rc, &localUploaded}, &p.stats.finishedSendBytes} return fWithStats, uint64(fileSize), nil }) } +// Upload Table Files +// +// This runs a goroutine to upload table files it reads off of completedTables. +// It forwards uploaded tables to to fileIdToNumChunksCh. + +type manifestEntry struct { + fileID string + numChunks int +} + +func (p *Puller) uploadTableFiles(ctx context.Context, completedTables <-chan FilledWriters, fileIdToNumChunksCh chan<- manifestEntry) error { + for { + select { + case tblFile, ok := <-completedTables: + if !ok { + return nil + } + p.tablefileSema.Release(1) + + // content length before we finish the write, which will + // add the index and table file footer. + chunksLen := tblFile.wr.ContentLength() + + id, err := tblFile.wr.Finish() + if err != nil { + return err + } + + ttf := tempTblFile{ + id: id, + read: tblFile.wr, + numChunks: tblFile.wr.ChunkCount(), + chunksLen: chunksLen, + contentLen: tblFile.wr.ContentLength(), + contentHash: tblFile.wr.GetMD5(), + } + err = p.uploadTempTableFile(ctx, ttf) + if err != nil { + return err + } + + select { + case fileIdToNumChunksCh <- manifestEntry{ttf.id, ttf.numChunks}: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Add Table Files to Manifest +// +// At the end, after all table files are uploaded, we add them all to the +// destination manifest at once. + +func (p *Puller) addTableFilesToManifest(ctx context.Context, fileIdToNumChunksCh <-chan manifestEntry) error { + fileIdToNumChunks := make(map[string]int) + for { + select { + case entry, ok := <-fileIdToNumChunksCh: + if !ok { + return p.sinkDBCS.(chunks.TableFileStore).AddTableFilesToManifest(ctx, fileIdToNumChunks) + } + fileIdToNumChunks[entry.fileID] = entry.numChunks + case <-ctx.Done(): + return ctx.Err() + } + } +} + func (p *Puller) processCompletedTables(ctx context.Context, completedTables <-chan FilledWriters) error { fileIdToNumChunks := make(map[string]int) @@ -398,35 +470,22 @@ func (p *Puller) Pull(ctx context.Context) error { } const batchSize = 64 * 1024 - // refs are added to |visited| on first sight - visited := p.hashes - // |absent| are visited, un-batched refs - absent := p.hashes.Copy() - // |batches| are visited, un-fetched refs - batches := make([]hash.HashSet, 0, 64) - - for absent.Size() > 0 || len(batches) > 0 { - if absent.Size() >= batchSize { - var bb []hash.HashSet - absent, bb = batchNovel(absent, batchSize) - batches = append(batches, bb...) - } - if len(batches) == 0 { - batches = append(batches, absent) - absent = make(hash.HashSet) - } - - b := batches[len(batches)-1] - batches = batches[:len(batches)-1] + tracker := NewPullChunkTracker(ctx, p.hashes, TrackerConfig{ + BatchSize: batchSize, + HasManyThreadCount: 3, + Haser: p.sinkDBCS, + }) - b, err = p.sinkDBCS.HasMany(ctx, b) + for { + toFetch, hasMore, err := tracker.GetChunksToFetch() if err != nil { return err - } else if b.Size() == 0 { - continue + } + if !hasMore { + break } - err = p.getCmp(ctx, b, absent, visited, completedTables) + err = p.getCmp(ctx, toFetch, tracker, completedTables) if err != nil { return err } @@ -460,7 +519,7 @@ func batchNovel(absent hash.HashSet, batch int) (remainder hash.HashSet, batches return } -func (p *Puller) getCmp(ctx context.Context, batch, absent, visited hash.HashSet, completedTables chan FilledWriters) error { +func (p *Puller) getCmp(ctx context.Context, batch hash.HashSet, tracker *PullChunkTracker, completedTables chan FilledWriters) error { found := make(chan nbs.CompressedChunk, 4096) processed := make(chan CmpChnkAndRefs, 4096) @@ -496,11 +555,7 @@ func (p *Puller) getCmp(ctx context.Context, batch, absent, visited hash.HashSet return err } err = p.waf(chnk, func(h hash.Hash, _ bool) error { - if !visited.Has(h) { - // first sight of |h| - visited.Insert(h) - absent.Insert(h) - } + tracker.Seen(h) return nil }) if err != nil { From 1a6bf25c2d6f2f94d572dbd51b508ed62e8a9862 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 29 Feb 2024 10:26:07 -0800 Subject: [PATCH 2/8] go/store/datas/pull: PullChunkTracker: Make sure that the initial set of hashes to pull are also seen. --- go/store/datas/pull/pull_chunk_tracker.go | 1 + .../datas/pull/pull_chunk_tracker_test.go | 35 +++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/go/store/datas/pull/pull_chunk_tracker.go b/go/store/datas/pull/pull_chunk_tracker.go index 3a43c6d233..523bcb9796 100644 --- a/go/store/datas/pull/pull_chunk_tracker.go +++ b/go/store/datas/pull/pull_chunk_tracker.go @@ -52,6 +52,7 @@ func NewPullChunkTracker(ctx context.Context, initial hash.HashSet, cfg TrackerC uncheckedCh: make(chan hash.Hash), reqCh: make(chan *trackerGetAbsentReq), } + ret.seen.InsertAll(initial) ret.wg.Add(1) go func() { defer ret.wg.Done() diff --git a/go/store/datas/pull/pull_chunk_tracker_test.go b/go/store/datas/pull/pull_chunk_tracker_test.go index 44c3427c33..8de82cb758 100644 --- a/go/store/datas/pull/pull_chunk_tracker_test.go +++ b/go/store/datas/pull/pull_chunk_tracker_test.go @@ -59,7 +59,7 @@ func TestPullChunkTracker(t *testing.T) { t.Run("HasNoneInitial", func(t *testing.T) { hs := make(hash.HashSet) - for i := byte(0); i < byte(10); i++ { + for i := byte(1); i <= byte(10); i++ { var h hash.Hash h[0] = i hs.Insert(h) @@ -78,7 +78,7 @@ func TestPullChunkTracker(t *testing.T) { assert.False(t, ok) assert.NoError(t, err) - for i := byte(0); i < byte(10); i++ { + for i := byte(1); i <= byte(10); i++ { var h hash.Hash h[1] = i tracker.Seen(h) @@ -114,6 +114,37 @@ func TestPullChunkTracker(t *testing.T) { assert.Error(t, err) tracker.Close() }) + + t.Run("InitialAreSeen", func(t *testing.T) { + hs := make(hash.HashSet) + for i := byte(0); i < byte(10); i++ { + var h hash.Hash + h[0] = i + hs.Insert(h) + } + tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ + BatchSize: 64 * 1024, + HasManyThreadCount: 3, + Haser: hasNoneHaser{}, + }) + hs, ok, err := tracker.GetChunksToFetch() + assert.Len(t, hs, 10) + assert.True(t, ok) + assert.NoError(t, err) + + for i := byte(0); i < byte(10); i++ { + var h hash.Hash + h[0] = i + tracker.Seen(h) + } + + hs, ok, err = tracker.GetChunksToFetch() + assert.Len(t, hs, 0) + assert.False(t, ok) + assert.NoError(t, err) + + tracker.Close() + }) } type hasAllHaser struct { From 383a196d53326141e7e0d162bdb881ebb158077d Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 4 Mar 2024 12:13:46 -0800 Subject: [PATCH 3/8] go/store/datas/pull: puller.go: Simplify diff to relevant portion of this change. --- go/store/datas/pull/puller.go | 91 ++++------------------------------- 1 file changed, 10 insertions(+), 81 deletions(-) diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go index 71d8ba6c03..3387257567 100644 --- a/go/store/datas/pull/puller.go +++ b/go/store/datas/pull/puller.go @@ -308,11 +308,6 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile _ = tmpTblFile.read.Remove() }() - // So far, we've added all the bytes for the compressed chunk data. - // We add the remaining bytes here --- bytes for the index and the - // table file footer. - atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize)-tmpTblFile.chunksLen) - // By tracking the number of bytes uploaded here, // we can add bytes on to our bufferedSendBytes when // we have to retry a table file write. @@ -323,10 +318,16 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile return nil, 0, err } - // If we've ever uploaded anything before, this is a retry. We - // treat it as if what was already uploaded was rebuffered. - atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(localUploaded)) - localUploaded = 0 + if localUploaded == 0 { + // So far, we've added all the bytes for the compressed chunk data. + // We add the remaining bytes here --- bytes for the index and the + // table file footer. + atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize)-tmpTblFile.chunksLen) + } else { + // A retry. We treat it as if what was already uploaded was rebuffered. + atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(localUploaded)) + localUploaded = 0 + } fWithStats := countingReader{countingReader{rc, &localUploaded}, &p.stats.finishedSendBytes} @@ -334,78 +335,6 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile }) } -// Upload Table Files -// -// This runs a goroutine to upload table files it reads off of completedTables. -// It forwards uploaded tables to to fileIdToNumChunksCh. - -type manifestEntry struct { - fileID string - numChunks int -} - -func (p *Puller) uploadTableFiles(ctx context.Context, completedTables <-chan FilledWriters, fileIdToNumChunksCh chan<- manifestEntry) error { - for { - select { - case tblFile, ok := <-completedTables: - if !ok { - return nil - } - p.tablefileSema.Release(1) - - // content length before we finish the write, which will - // add the index and table file footer. - chunksLen := tblFile.wr.ContentLength() - - id, err := tblFile.wr.Finish() - if err != nil { - return err - } - - ttf := tempTblFile{ - id: id, - read: tblFile.wr, - numChunks: tblFile.wr.ChunkCount(), - chunksLen: chunksLen, - contentLen: tblFile.wr.ContentLength(), - contentHash: tblFile.wr.GetMD5(), - } - err = p.uploadTempTableFile(ctx, ttf) - if err != nil { - return err - } - - select { - case fileIdToNumChunksCh <- manifestEntry{ttf.id, ttf.numChunks}: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -// Add Table Files to Manifest -// -// At the end, after all table files are uploaded, we add them all to the -// destination manifest at once. - -func (p *Puller) addTableFilesToManifest(ctx context.Context, fileIdToNumChunksCh <-chan manifestEntry) error { - fileIdToNumChunks := make(map[string]int) - for { - select { - case entry, ok := <-fileIdToNumChunksCh: - if !ok { - return p.sinkDBCS.(chunks.TableFileStore).AddTableFilesToManifest(ctx, fileIdToNumChunks) - } - fileIdToNumChunks[entry.fileID] = entry.numChunks - case <-ctx.Done(): - return ctx.Err() - } - } -} - func (p *Puller) processCompletedTables(ctx context.Context, completedTables <-chan FilledWriters) error { fileIdToNumChunks := make(map[string]int) From b3e3082bb02aca133a08e7baf1b4970e08b71861 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 5 Mar 2024 11:01:39 -0800 Subject: [PATCH 4/8] go/store/datas/pull: pull_chunk_tracker.go: PR feedback: Make HasManyThreadCount a constant. --- go/store/datas/pull/pull_chunk_tracker.go | 18 +++++++++--------- go/store/datas/pull/pull_chunk_tracker_test.go | 15 +++++---------- go/store/datas/pull/puller.go | 3 +-- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/go/store/datas/pull/pull_chunk_tracker.go b/go/store/datas/pull/pull_chunk_tracker.go index 523bcb9796..77b00ca118 100644 --- a/go/store/datas/pull/pull_chunk_tracker.go +++ b/go/store/datas/pull/pull_chunk_tracker.go @@ -22,18 +22,18 @@ import ( "github.com/dolthub/dolt/go/store/hash" ) -type Haser interface { +type HasManyer interface { HasMany(context.Context, hash.HashSet) (hash.HashSet, error) } type TrackerConfig struct { BatchSize int - HasManyThreadCount int - - Haser Haser + HasManyer HasManyer } +const hasManyThreadCount = 3 + type PullChunkTracker struct { ctx context.Context seen hash.HashSet @@ -105,12 +105,12 @@ func (t *PullChunkTracker) thread(initial hash.HashSet) { hasManyRespCh := make(chan trackerHasManyResp) var wg sync.WaitGroup - wg.Add(t.cfg.HasManyThreadCount) + wg.Add(hasManyThreadCount) - for i := 0; i < t.cfg.HasManyThreadCount; i++ { + for i := 0; i < hasManyThreadCount; i++ { go func() { defer wg.Done() - hasManyThread(t.ctx, t.cfg.Haser, hasManyReqCh, hasManyRespCh, doneCh) + hasManyThread(t.ctx, t.cfg.HasManyer, hasManyReqCh, hasManyRespCh, doneCh) }() } @@ -201,11 +201,11 @@ func (t *PullChunkTracker) thread(initial hash.HashSet) { } // Run by a PullChunkTracker, calls HasMany on a batch of addresses and delivers the results. -func hasManyThread(ctx context.Context, haser Haser, reqCh <-chan trackerHasManyReq, respCh chan<- trackerHasManyResp, doneCh <-chan struct{}) { +func hasManyThread(ctx context.Context, hasManyer HasManyer, reqCh <-chan trackerHasManyReq, respCh chan<- trackerHasManyResp, doneCh <-chan struct{}) { for { select { case req := <-reqCh: - hs, err := haser.HasMany(ctx, req.hs) + hs, err := hasManyer.HasMany(ctx, req.hs) if err != nil { select { case respCh <- trackerHasManyResp{err: err}: diff --git a/go/store/datas/pull/pull_chunk_tracker_test.go b/go/store/datas/pull/pull_chunk_tracker_test.go index 8de82cb758..28b7b73a86 100644 --- a/go/store/datas/pull/pull_chunk_tracker_test.go +++ b/go/store/datas/pull/pull_chunk_tracker_test.go @@ -28,8 +28,7 @@ func TestPullChunkTracker(t *testing.T) { t.Run("Empty", func(t *testing.T) { tracker := NewPullChunkTracker(context.Background(), make(hash.HashSet), TrackerConfig{ BatchSize: 64 * 1024, - HasManyThreadCount: 3, - Haser: nil, + HasManyer: nil, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 0) @@ -47,8 +46,7 @@ func TestPullChunkTracker(t *testing.T) { } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ BatchSize: 64 * 1024, - HasManyThreadCount: 3, - Haser: hasAllHaser{}, + HasManyer: hasAllHaser{}, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 0) @@ -66,8 +64,7 @@ func TestPullChunkTracker(t *testing.T) { } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ BatchSize: 64 * 1024, - HasManyThreadCount: 3, - Haser: hasNoneHaser{}, + HasManyer: hasNoneHaser{}, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 10) @@ -107,8 +104,7 @@ func TestPullChunkTracker(t *testing.T) { } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ BatchSize: 64 * 1024, - HasManyThreadCount: 3, - Haser: errHaser{}, + HasManyer: errHaser{}, }) _, _, err := tracker.GetChunksToFetch() assert.Error(t, err) @@ -124,8 +120,7 @@ func TestPullChunkTracker(t *testing.T) { } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ BatchSize: 64 * 1024, - HasManyThreadCount: 3, - Haser: hasNoneHaser{}, + HasManyer: hasNoneHaser{}, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 10) diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go index 3387257567..915c2859ae 100644 --- a/go/store/datas/pull/puller.go +++ b/go/store/datas/pull/puller.go @@ -401,8 +401,7 @@ func (p *Puller) Pull(ctx context.Context) error { const batchSize = 64 * 1024 tracker := NewPullChunkTracker(ctx, p.hashes, TrackerConfig{ BatchSize: batchSize, - HasManyThreadCount: 3, - Haser: p.sinkDBCS, + HasManyer: p.sinkDBCS, }) for { From 7b7c3b267982d442050f658a2b10735163a05468 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 5 Mar 2024 11:11:42 -0800 Subject: [PATCH 5/8] go/store/datas/pull: pull_chunk_tracker.go: PR feedback: Add a comment about PullChunkTracker. --- go/store/datas/pull/pull_chunk_tracker.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/go/store/datas/pull/pull_chunk_tracker.go b/go/store/datas/pull/pull_chunk_tracker.go index 77b00ca118..7d91fe1510 100644 --- a/go/store/datas/pull/pull_chunk_tracker.go +++ b/go/store/datas/pull/pull_chunk_tracker.go @@ -34,6 +34,17 @@ type TrackerConfig struct { const hasManyThreadCount = 3 +// A PullChunkTracker keeps track of seen chunk addresses and returns every +// seen chunk address which is not already in the destination database exactly +// once. A Puller instantiantes one of these with the initial set of addresses +// to pull, and repeatedly calls |GetChunksToFetch|. It passes in all +// references it finds in the fetched chunks to |Seen|, and continues to call +// |GetChunksToFetch| and deliver new addresses to |Seen| until +// |GetChunksToFetch| returns |false| from its |more| return boolean. +// +// PullChunkTracker is able to call |HasMany| on the destination database in +// parallel with other work the Puller does and abstracts out the logic for +// keeping track of seen, unchecked and to pull hcunk addresses. type PullChunkTracker struct { ctx context.Context seen hash.HashSet @@ -56,7 +67,7 @@ func NewPullChunkTracker(ctx context.Context, initial hash.HashSet, cfg TrackerC ret.wg.Add(1) go func() { defer ret.wg.Done() - ret.thread(initial) + ret.reqRespThread(initial) }() return ret } @@ -99,7 +110,9 @@ func (t *PullChunkTracker) GetChunksToFetch() (hash.HashSet, bool, error) { return req.hs, req.ok, req.err } -func (t *PullChunkTracker) thread(initial hash.HashSet) { +// The main logic of the PullChunkTracker, receives requests from other threads +// and responds to them. +func (t *PullChunkTracker) reqRespThread(initial hash.HashSet) { doneCh := make(chan struct{}) hasManyReqCh := make(chan trackerHasManyReq) hasManyRespCh := make(chan trackerHasManyResp) From a19717a9756a5a8cd1d00b7a29732b586dc204ac Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 5 Mar 2024 11:38:33 -0800 Subject: [PATCH 6/8] go/store/datas/pull: pull_chunk_tracker.go: PR feedback: Add a test for when HasMany returns a subset of queried chunks. --- .../datas/pull/pull_chunk_tracker_test.go | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/go/store/datas/pull/pull_chunk_tracker_test.go b/go/store/datas/pull/pull_chunk_tracker_test.go index 28b7b73a86..8dac6ae97d 100644 --- a/go/store/datas/pull/pull_chunk_tracker_test.go +++ b/go/store/datas/pull/pull_chunk_tracker_test.go @@ -140,6 +140,60 @@ func TestPullChunkTracker(t *testing.T) { tracker.Close() }) + + t.Run("StaticHaser", func(t *testing.T) { + haser := staticHaser{make(hash.HashSet)} + initial := make([]hash.Hash, 4) + initial[0][0] = 1 + initial[1][0] = 2 + initial[2][0] = 1 + initial[2][1] = 1 + initial[3][0] = 1 + initial[3][1] = 2 + haser.has.Insert(initial[0]) + haser.has.Insert(initial[1]) + haser.has.Insert(initial[2]) + haser.has.Insert(initial[3]) + + hs := make(hash.HashSet) + // Start with 1 - 5 + for i := byte(1); i <= byte(5); i++ { + var h hash.Hash + h[0] = i + hs.Insert(h) + } + tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ + BatchSize: 64 * 1024, + HasManyer: haser, + }) + + // Should get back 03, 04, 05 + hs, ok, err := tracker.GetChunksToFetch() + assert.Len(t, hs, 3) + assert.True(t, ok) + assert.NoError(t, err) + + for i := byte(1); i <= byte(10); i++ { + var h hash.Hash + h[0] = 1 + h[1] = i + tracker.Seen(h) + } + + // Should get back 13, 14, 15, 16, 17, 18, 19, 1(10). + cnt := 0 + for { + hs, ok, err := tracker.GetChunksToFetch() + assert.NoError(t, err) + if !ok { + break + } + cnt += len(hs) + } + assert.Equal(t, 8, cnt) + + tracker.Close() + }) } type hasAllHaser struct { @@ -156,6 +210,20 @@ func (hasNoneHaser) HasMany(ctx context.Context, hs hash.HashSet) (hash.HashSet, return hs, nil } +type staticHaser struct { + has hash.HashSet +} + +func (s staticHaser) HasMany(ctx context.Context, query hash.HashSet) (hash.HashSet, error) { + ret := make(hash.HashSet) + for h := range query { + if !s.has.Has(h) { + ret.Insert(h) + } + } + return ret, nil +} + type errHaser struct { } From dce814d7490ee9972eedd2e6b940dea07348afe3 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 5 Mar 2024 12:04:06 -0800 Subject: [PATCH 7/8] go/store/datas/pull: pull_chunk_tracker.go: PR feedback: Round out the last batch returned from GetChunksToFetch. --- go/store/datas/pull/pull_chunk_tracker.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/go/store/datas/pull/pull_chunk_tracker.go b/go/store/datas/pull/pull_chunk_tracker.go index 7d91fe1510..1e6a5bf772 100644 --- a/go/store/datas/pull/pull_chunk_tracker.go +++ b/go/store/datas/pull/pull_chunk_tracker.go @@ -194,9 +194,17 @@ func (t *PullChunkTracker) reqRespThread(initial hash.HashSet) { req.hs = absent[0] var i int for i = 1; i < len(absent); i++ { - if len(req.hs)+len(absent[i]) < t.cfg.BatchSize { + l := len(absent[i]) + if len(req.hs)+l < t.cfg.BatchSize { req.hs.InsertAll(absent[i]) } else { + for h := range absent[i] { + if len(req.hs) >= t.cfg.BatchSize { + break + } + req.hs.Insert(h) + absent[i].Remove(h) + } break } } From ccb3dd3e17e0886c1c9a5425c6427ad8487a53b2 Mon Sep 17 00:00:00 2001 From: reltuk Date: Tue, 5 Mar 2024 20:13:26 +0000 Subject: [PATCH 8/8] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- .../datas/pull/pull_chunk_tracker_test.go | 24 +++++++++---------- go/store/datas/pull/puller.go | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/go/store/datas/pull/pull_chunk_tracker_test.go b/go/store/datas/pull/pull_chunk_tracker_test.go index 8dac6ae97d..8c34d61d82 100644 --- a/go/store/datas/pull/pull_chunk_tracker_test.go +++ b/go/store/datas/pull/pull_chunk_tracker_test.go @@ -27,8 +27,8 @@ import ( func TestPullChunkTracker(t *testing.T) { t.Run("Empty", func(t *testing.T) { tracker := NewPullChunkTracker(context.Background(), make(hash.HashSet), TrackerConfig{ - BatchSize: 64 * 1024, - HasManyer: nil, + BatchSize: 64 * 1024, + HasManyer: nil, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 0) @@ -45,8 +45,8 @@ func TestPullChunkTracker(t *testing.T) { hs.Insert(h) } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ - BatchSize: 64 * 1024, - HasManyer: hasAllHaser{}, + BatchSize: 64 * 1024, + HasManyer: hasAllHaser{}, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 0) @@ -63,8 +63,8 @@ func TestPullChunkTracker(t *testing.T) { hs.Insert(h) } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ - BatchSize: 64 * 1024, - HasManyer: hasNoneHaser{}, + BatchSize: 64 * 1024, + HasManyer: hasNoneHaser{}, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 10) @@ -103,8 +103,8 @@ func TestPullChunkTracker(t *testing.T) { hs.Insert(h) } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ - BatchSize: 64 * 1024, - HasManyer: errHaser{}, + BatchSize: 64 * 1024, + HasManyer: errHaser{}, }) _, _, err := tracker.GetChunksToFetch() assert.Error(t, err) @@ -119,8 +119,8 @@ func TestPullChunkTracker(t *testing.T) { hs.Insert(h) } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ - BatchSize: 64 * 1024, - HasManyer: hasNoneHaser{}, + BatchSize: 64 * 1024, + HasManyer: hasNoneHaser{}, }) hs, ok, err := tracker.GetChunksToFetch() assert.Len(t, hs, 10) @@ -163,8 +163,8 @@ func TestPullChunkTracker(t *testing.T) { hs.Insert(h) } tracker := NewPullChunkTracker(context.Background(), hs, TrackerConfig{ - BatchSize: 64 * 1024, - HasManyer: haser, + BatchSize: 64 * 1024, + HasManyer: haser, }) // Should get back 03, 04, 05 diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go index 915c2859ae..ed89c3466c 100644 --- a/go/store/datas/pull/puller.go +++ b/go/store/datas/pull/puller.go @@ -400,8 +400,8 @@ func (p *Puller) Pull(ctx context.Context) error { const batchSize = 64 * 1024 tracker := NewPullChunkTracker(ctx, p.hashes, TrackerConfig{ - BatchSize: batchSize, - HasManyer: p.sinkDBCS, + BatchSize: batchSize, + HasManyer: p.sinkDBCS, }) for {