diff --git a/compaction.go b/compaction.go index 9e56fb0f08..ce69a033de 100644 --- a/compaction.go +++ b/compaction.go @@ -1279,7 +1279,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { iter := overlaps.Iter() for m := iter.First(); m != nil; m = iter.Next() { - newFiles, err := d.excise(ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l) + newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l) if err != nil { return nil, err } @@ -1298,7 +1298,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { } if len(ingestSplitFiles) > 0 { - if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil { + if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil { return nil, err } } diff --git a/compaction_test.go b/compaction_test.go index 990e9ea60c..edd6283c21 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -2106,7 +2106,7 @@ func TestCompactionErrorCleanup(t *testing.T) { require.NoError(t, w.Set([]byte(k), nil)) } require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{"ext"})) + require.NoError(t, d.Ingest(context.Background(), []string{"ext"})) } ingest("a", "c") ingest("b") @@ -2591,7 +2591,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) { require.NoError(t, w.Set(key, nil)) require.NoError(t, w.Close()) // Ingest the SST. - return db.Ingest([]string{fName}) + return db.Ingest(context.Background(), []string{fName}) } testCases := []struct { @@ -2800,7 +2800,7 @@ func TestCompactionErrorStats(t *testing.T) { require.NoError(t, w.Set([]byte(k), nil)) } require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{"ext"})) + require.NoError(t, d.Ingest(context.Background(), []string{"ext"})) } ingest("a", "c") // Snapshot will preserve the older "a" key during compaction. diff --git a/data_test.go b/data_test.go index d6048bbf29..3e356b2dba 100644 --- a/data_test.go +++ b/data_test.go @@ -1352,7 +1352,7 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } } - if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil { + if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil { return err } return nil @@ -1364,7 +1364,7 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { paths = append(paths, arg.String()) } - if err := d.Ingest(paths); err != nil { + if err := d.Ingest(context.Background(), paths); err != nil { return err } return nil @@ -1444,7 +1444,7 @@ func runIngestExternalCmd( external = append(external, ef) } - if _, err := d.IngestExternalFiles(external); err != nil { + if _, err := d.IngestExternalFiles(context.Background(), external); err != nil { return err } return nil diff --git a/db_test.go b/db_test.go index 8a882ca455..867f6f36b3 100644 --- a/db_test.go +++ b/db_test.go @@ -1110,7 +1110,7 @@ func TestDBClosed(t *testing.T) { require.True(t, errors.Is(catch(func() { _, _, _ = d.Get(nil) }), ErrClosed)) require.True(t, errors.Is(catch(func() { _ = d.Delete(nil, nil) }), ErrClosed)) require.True(t, errors.Is(catch(func() { _ = d.DeleteRange(nil, nil, nil) }), ErrClosed)) - require.True(t, errors.Is(catch(func() { _ = d.Ingest(nil) }), ErrClosed)) + require.True(t, errors.Is(catch(func() { _ = d.Ingest(context.Background(), nil) }), ErrClosed)) require.True(t, errors.Is(catch(func() { _ = d.LogData(nil, nil) }), ErrClosed)) require.True(t, errors.Is(catch(func() { _ = d.Merge(nil, nil, nil) }), ErrClosed)) require.True(t, errors.Is(catch(func() { _ = d.RatchetFormatMajorVersion(internalFormatNewest) }), ErrClosed)) @@ -1182,7 +1182,7 @@ func TestDBConcurrentCompactClose(t *testing.T) { }) require.NoError(t, w.Set([]byte(fmt.Sprint(j)), nil)) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } require.NoError(t, d.Close()) @@ -1642,7 +1642,7 @@ func TestMemtableIngestInversion(t *testing.T) { }) require.NoError(t, w.Set([]byte("cc"), []byte("foo"))) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } { path := "ingest2.sst" @@ -1654,7 +1654,7 @@ func TestMemtableIngestInversion(t *testing.T) { require.NoError(t, w.Set([]byte("bb"), []byte("foo2"))) require.NoError(t, w.Set([]byte("cc"), []byte("foo2"))) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } { path := "ingest3.sst" @@ -1665,7 +1665,7 @@ func TestMemtableIngestInversion(t *testing.T) { }) require.NoError(t, w.Set([]byte("bb"), []byte("foo3"))) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } { path := "ingest4.sst" @@ -1676,7 +1676,7 @@ func TestMemtableIngestInversion(t *testing.T) { }) require.NoError(t, w.Set([]byte("bb"), []byte("foo4"))) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } // We now have a base compaction blocked. Block a memtable flush to cause @@ -1755,7 +1755,7 @@ func TestMemtableIngestInversion(t *testing.T) { }) require.NoError(t, w.DeleteRange([]byte("cc"), []byte("e"))) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } t.Log("main ingest complete") printLSM() @@ -1789,7 +1789,7 @@ func TestMemtableIngestInversion(t *testing.T) { }) require.NoError(t, w.Set([]byte("cc"), []byte("doesntmatter"))) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } // Unblock earlier flushes. We will first finish flushing the blocked diff --git a/event_listener_test.go b/event_listener_test.go index 42b1d97d63..87249935fd 100644 --- a/event_listener_test.go +++ b/event_listener_test.go @@ -6,6 +6,7 @@ package pebble import ( "bytes" + "context" "fmt" "reflect" "runtime" @@ -147,7 +148,7 @@ func TestEventListener(t *testing.T) { if err := w.Close(); err != nil { return err.Error() } - if err := d.Ingest([]string{"ext/0"}); err != nil { + if err := d.Ingest(context.Background(), []string{"ext/0"}); err != nil { return err.Error() } return memLog.String() @@ -190,7 +191,7 @@ func TestEventListener(t *testing.T) { if err := writeTable(tableB, 'b'); err != nil { return err.Error() } - if err := d.Ingest([]string{tableA, tableB}); err != nil { + if err := d.Ingest(context.Background(), []string{tableA, tableB}); err != nil { return err.Error() } diff --git a/flushable_test.go b/flushable_test.go index 82b4420dd2..f8ee20580b 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -2,6 +2,7 @@ package pebble import ( "bytes" + "context" "fmt" "testing" @@ -57,7 +58,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // We can reuse the ingestLoad function for this test even if we're // not actually ingesting a file. - lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs) + lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs) if err != nil { panic(err) } @@ -85,7 +86,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - if err := ingestLinkLocal(jobID, d.opts, d.objProvider, lr.local); err != nil { + if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil { panic("couldn't hard link sstables") } diff --git a/ingest.go b/ingest.go index d0972d1544..7332aac6b6 100644 --- a/ingest.go +++ b/ingest.go @@ -423,6 +423,7 @@ func (r *ingestLoadResult) fileCount() int { } func ingestLoad( + ctx context.Context, opts *Options, fmv FormatMajorVersion, paths []string, @@ -431,8 +432,6 @@ func ingestLoad( cacheID uint64, pending []base.FileNum, ) (ingestLoadResult, error) { - ctx := context.TODO() - localFileNums := pending[:len(paths)] sharedFileNums := pending[len(paths) : len(paths)+len(shared)] externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)] @@ -590,11 +589,15 @@ func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) erro // ingestLinkLocal creates new objects which are backed by either hardlinks to or // copies of the ingested files. func ingestLinkLocal( - jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta, + ctx context.Context, + jobID JobID, + opts *Options, + objProvider objstorage.Provider, + localMetas []ingestLocalMeta, ) error { for i := range localMetas { objMeta, err := objProvider.LinkOrCopyFromLocal( - context.TODO(), opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum, + ctx, opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum, objstorage.CreateOptions{PreferSharedStorage: true}, ) if err != nil { @@ -1027,14 +1030,14 @@ func ingestTargetLevel( // can produce a noticeable hiccup in performance. See // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix // this hiccup. -func (d *DB) Ingest(paths []string) error { +func (d *DB) Ingest(ctx context.Context, paths []string) error { if err := d.closed.Load(); err != nil { panic(err) } if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(paths, nil /* shared */, KeyRange{}, false, nil /* external */) + _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, false, nil /* external */) return err } @@ -1115,21 +1118,23 @@ type ExternalFile struct { // IngestWithStats does the same as Ingest, and additionally returns // IngestOperationStats. -func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) { +func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) } if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(paths, nil, KeyRange{}, false, nil) + return d.ingest(ctx, paths, nil, KeyRange{}, false, nil) } // IngestExternalFiles does the same as IngestWithStats, and additionally // accepts external files (with locator info that can be resolved using // d.opts.SharedStorage). These files must also be non-overlapping with // each other, and must be resolvable through d.objProvider. -func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) { +func (d *DB) IngestExternalFiles( + ctx context.Context, external []ExternalFile, +) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) } @@ -1140,7 +1145,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, if d.opts.Experimental.RemoteStorage == nil { return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured") } - return d.ingest(nil, nil, KeyRange{}, false, external) + return d.ingest(ctx, nil, nil, KeyRange{}, false, external) } // IngestAndExcise does the same as IngestWithStats, and additionally accepts a @@ -1154,6 +1159,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, // Panics if this DB instance was not instantiated with a remote.Storage and // shared sstables are present. func (d *DB) IngestAndExcise( + ctx context.Context, paths []string, shared []SharedSSTMeta, external []ExternalFile, @@ -1181,7 +1187,7 @@ func (d *DB) IngestAndExcise( v, FormatMinForSharedObjects, ) } - return d.ingest(paths, shared, exciseSpan, sstsContainExciseTombstone, external) + return d.ingest(ctx, paths, shared, exciseSpan, sstsContainExciseTombstone, external) } // Both DB.mu and commitPipeline.mu must be held while this is called. @@ -1303,6 +1309,7 @@ func (d *DB) handleIngestAsFlushable( // See comment at Ingest() for details on how this works. func (d *DB) ingest( + ctx context.Context, paths []string, shared []SharedSSTMeta, exciseSpan KeyRange, @@ -1325,7 +1332,6 @@ func (d *DB) ingest( } } } - ctx := context.Background() // Allocate file numbers for all of the files being ingested and mark them as // pending in order to prevent them from being deleted. Note that this causes // the file number ordering to be out of alignment with sequence number @@ -1342,7 +1348,7 @@ func (d *DB) ingest( // Load the metadata for all the files being ingested. This step detects // and elides empty sstables. - loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs) + loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs) if err != nil { return IngestOperationStats{}, err } @@ -1362,7 +1368,7 @@ func (d *DB) ingest( // (e.g. because the files reside on a different filesystem), ingestLinkLocal // will fall back to copying, and if that fails we undo our work and return an // error. - if err := ingestLinkLocal(jobID, d.opts, d.objProvider, loadResult.local); err != nil { + if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil { return IngestOperationStats{}, err } @@ -1697,7 +1703,7 @@ func (d *DB) ingest( // // The manifest lock must be held when calling this method. func (d *DB) excise( - exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int, + ctx context.Context, exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int, ) ([]manifest.NewFileEntry, error) { numCreatedFiles := 0 // Check if there's actually an overlap between m and exciseSpan. @@ -1722,7 +1728,7 @@ func (d *DB) excise( return nil } var err error - iters, err = d.newIters(context.TODO(), m, &IterOptions{ + iters, err = d.newIters(ctx, m, &IterOptions{ CategoryAndQoS: sstable.CategoryAndQoS{ Category: "pebble-ingest", QoSLevel: sstable.LatencySensitiveQoSLevel, @@ -1982,6 +1988,7 @@ type ingestSplitFile struct { // // d.mu as well as the manifest lock must be held when calling this method. func (d *DB) ingestSplit( + ctx context.Context, ve *versionEdit, updateMetrics func(*fileMetadata, int, []newFileEntry), files []ingestSplitFile, @@ -2047,7 +2054,7 @@ func (d *DB) ingestSplit( // as we're guaranteed to not have any data overlap between splitFile and // s.ingestFile. d.excise will return an error if we pass an inclusive user // key bound _and_ we end up seeing data overlap at the end key. - added, err := d.excise(base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level) + added, err := d.excise(ctx, base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level) if err != nil { return err } @@ -2288,7 +2295,7 @@ func (d *DB) ingestApply( iter := overlaps.Iter() for m := iter.First(); m != nil; m = iter.Next() { - newFiles, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level) + newFiles, err := d.excise(ctx, exciseSpan.UserKeyBounds(), m, ve, level) if err != nil { return nil, err } @@ -2308,7 +2315,7 @@ func (d *DB) ingestApply( if len(filesToSplit) > 0 { // For the same reasons as the above call to excise, we hold the db mutex // while calling this method. - if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil { + if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil { return nil, err } } diff --git a/ingest_test.go b/ingest_test.go index 3b68eb5936..fd703e60e2 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -129,7 +129,7 @@ func TestIngestLoad(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.FileNum{1}) + lr, err := ingestLoad(context.Background(), opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.FileNum{1}) if err != nil { return err.Error() } @@ -220,7 +220,7 @@ func TestIngestLoadRand(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - lr, err := ingestLoad(opts, version, paths, nil, nil, 0, pending) + lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, 0, pending) require.NoError(t, err) for _, m := range lr.local { @@ -240,7 +240,7 @@ func TestIngestLoadInvalid(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.FileNum{1}); err == nil { + if _, err := ingestLoad(context.Background(), opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.FileNum{1}); err == nil { t.Fatalf("expected error, but found success") } } @@ -336,7 +336,7 @@ func TestIngestLink(t *testing.T) { opts.FS.Remove(meta[i].path) } - err = ingestLinkLocal(0 /* jobID */, opts, objProvider, meta) + err = ingestLinkLocal(context.Background(), 0 /* jobID */, opts, objProvider, meta) if i < count { if err == nil { t.Fatalf("expected error, but found success") @@ -403,7 +403,7 @@ func TestIngestLinkFallback(t *testing.T) { meta := &fileMetadata{FileNum: 1} meta.InitPhysicalBacking() - err = ingestLinkLocal(0, opts, objProvider, []ingestLocalMeta{{fileMetadata: meta, path: "source"}}) + err = ingestLinkLocal(context.Background(), 0, opts, objProvider, []ingestLocalMeta{{fileMetadata: meta, path: "source"}}) require.NoError(t, err) dest, err := mem.Open("000001.sst") @@ -839,7 +839,7 @@ func TestExcise(t *testing.T) { current.IterAllLevelsAndSublevels(func(iter manifest.LevelIterator, l manifest.Layer) { for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() { - _, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, l.Level()) + _, err := d.excise(context.Background(), exciseSpan.UserKeyBounds(), m, ve, l.Level()) if err != nil { td.Fatalf(t, "error when excising %s: %s", m.FileNum, err.Error()) } @@ -1129,7 +1129,7 @@ func testIngestSharedImpl( require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1191,7 +1191,7 @@ func testIngestSharedImpl( for level := range current.Levels { iter := current.Levels[level].Iter() for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() { - _, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level) + _, err := d.excise(context.Background(), exciseSpan.UserKeyBounds(), m, ve, level) if err != nil { d.mu.Lock() d.mu.versions.logUnlock() @@ -1358,7 +1358,9 @@ func TestSimpleIngestShared(t *testing.T) { Level: 6, Size: uint64(size + 5), } - _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, nil /* external */, KeyRange{Start: []byte("d"), End: []byte("ee")}, false) + _, err = d.IngestAndExcise( + context.Background(), []string{}, []SharedSSTMeta{sharedSSTMeta}, nil, /* external */ + KeyRange{Start: []byte("d"), End: []byte("ee")}, false) require.NoError(t, err) // TODO(bilal): Once reading of shared sstables is in, verify that the values @@ -1628,7 +1630,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1690,7 +1692,7 @@ func TestConcurrentExcise(t *testing.T) { for level := range current.Levels { iter := current.Levels[level].Iter() for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() { - _, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level) + _, err := d.excise(context.Background(), exciseSpan.UserKeyBounds(), m, ve, level) if err != nil { d.mu.Lock() d.mu.versions.logUnlock() @@ -2063,7 +2065,7 @@ func TestIngestExternal(t *testing.T) { ) require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise([]string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}, false) require.NoError(t, err) return fmt.Sprintf("replicated %d external SSTs", len(externalFiles)) @@ -2245,7 +2247,7 @@ func BenchmarkIngestOverlappingMemtable(b *testing.B) { assertNoError(w.Close()) b.StartTimer() - assertNoError(d.Ingest([]string{"ext"})) + assertNoError(d.Ingest(context.Background(), []string{"ext"})) } }) } @@ -2520,8 +2522,8 @@ func TestIngestError(t *testing.T) { }() ii.Store(i) - err1 := d.Ingest([]string{"ext0"}) - err2 := d.Ingest([]string{"ext1"}) + err1 := d.Ingest(context.Background(), []string{"ext0"}) + err2 := d.Ingest(context.Background(), []string{"ext1"}) err := firstError(err1, err2) if err != nil && !errors.Is(err, errorfs.ErrInjected) { t.Fatal(err) @@ -2562,7 +2564,7 @@ func TestIngestIdempotence(t *testing.T) { for i := 0; i < count; i++ { ingestPath := fs.PathJoin(dir, fmt.Sprintf("ext%d", i)) require.NoError(t, fs.Link(path, ingestPath)) - require.NoError(t, d.Ingest([]string{ingestPath})) + require.NoError(t, d.Ingest(context.Background(), []string{ingestPath})) } require.NoError(t, d.Close()) } @@ -2604,7 +2606,7 @@ func TestIngestCompact(t *testing.T) { // flushed. require.NoError(t, d.Set(key, nil, nil)) } - require.NoError(t, d.Ingest([]string{src(i)})) + require.NoError(t, d.Ingest(context.Background(), []string{src(i)})) } require.NoError(t, d.Close()) @@ -2641,7 +2643,7 @@ func TestConcurrentIngest(t *testing.T) { // Perform N ingestions concurrently. for i := 0; i < cap(errCh); i++ { go func(i int) { - err := d.Ingest([]string{src(i)}) + err := d.Ingest(context.Background(), []string{src(i)}) if err == nil { if _, err = d.opts.FS.Stat(src(i)); oserror.IsNotExist(err) { err = nil @@ -2686,7 +2688,7 @@ func TestConcurrentIngestCompact(t *testing.T) { require.NoError(t, w.Set([]byte(k), nil)) } require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{"ext"})) + require.NoError(t, d.Ingest(context.Background(), []string{"ext"})) } compact := func(start, end string) { @@ -2807,7 +2809,7 @@ func TestIngestFlushQueuedMemTable(t *testing.T) { require.NoError(t, w.Set([]byte(k), nil)) } require.NoError(t, w.Close()) - stats, err := d.IngestWithStats([]string{"ext"}) + stats, err := d.IngestWithStats(context.Background(), []string{"ext"}) require.NoError(t, err) require.Equal(t, stats.ApproxIngestedIntoL0Bytes, stats.Bytes) require.Equal(t, 1, stats.MemtableOverlappingFiles) @@ -2836,7 +2838,7 @@ func TestIngestStats(t *testing.T) { require.NoError(t, w.Set([]byte(k), nil)) } require.NoError(t, w.Close()) - stats, err := d.IngestWithStats([]string{"ext"}) + stats, err := d.IngestWithStats(context.Background(), []string{"ext"}) require.NoError(t, err) if expectedLevel == 0 { require.Equal(t, stats.ApproxIngestedIntoL0Bytes, stats.Bytes) @@ -2884,7 +2886,7 @@ func TestIngestFlushQueuedLargeBatch(t *testing.T) { require.NoError(t, w.Set([]byte(k), nil)) } require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{"ext"})) + require.NoError(t, d.Ingest(context.Background(), []string{"ext"})) } ingest("a") @@ -2922,7 +2924,7 @@ func TestIngestMemtablePendingOverlap(t *testing.T) { require.NoError(t, w.Set([]byte(k), nil)) } require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{"ext"})) + require.NoError(t, d.Ingest(context.Background(), []string{"ext"})) } var wg sync.WaitGroup @@ -3047,7 +3049,7 @@ func TestIngestMemtableOverlapRace(t *testing.T) { go untilDone(func() { filename := fmt.Sprintf("ext%d", totalIngests) require.NoError(t, mem.Link("ext", filename)) - require.NoError(t, d.Ingest([]string{filename})) + require.NoError(t, d.Ingest(context.Background(), []string{filename})) totalIngests++ }) @@ -3187,7 +3189,7 @@ func TestIngestFileNumReuseCrash(t *testing.T) { for _, f := range files { func() { defer func() { err = recover().(error) }() - err = d.Ingest([]string{fs.PathJoin(dir, f)}) + err = d.Ingest(context.Background(), []string{fs.PathJoin(dir, f)}) }() if err == nil || !errors.Is(err, errorfs.ErrInjected) { t.Fatalf("expected injected error, got %v", err) @@ -3644,7 +3646,7 @@ func TestIngestValidation(t *testing.T) { } // Ingest the external table. - err = d.Ingest([]string{ingestTableName}) + err = d.Ingest(context.Background(), []string{ingestTableName}) if err != nil { et.errLoc = errReportLocationIngest et.err = err @@ -3734,7 +3736,7 @@ func BenchmarkManySSTables(b *testing.B) { require.NoError(b, w.Close()) paths = append(paths, n) } - require.NoError(b, d.Ingest(paths)) + require.NoError(b, d.Ingest(context.Background(), paths)) { const broadIngest = "broad.sst" @@ -3744,7 +3746,7 @@ func BenchmarkManySSTables(b *testing.B) { require.NoError(b, w.Set([]byte("0"), nil)) require.NoError(b, w.Set([]byte("Z"), nil)) require.NoError(b, w.Close()) - require.NoError(b, d.Ingest([]string{broadIngest})) + require.NoError(b, d.Ingest(context.Background(), []string{broadIngest})) } switch op { @@ -3769,7 +3771,7 @@ func runBenchmarkManySSTablesIngest(b *testing.B, d *DB, fs vfs.FS, count int) { w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(b, w.Set([]byte(n), nil)) require.NoError(b, w.Close()) - require.NoError(b, d.Ingest([]string{n})) + require.NoError(b, d.Ingest(context.Background(), []string{n})) } } diff --git a/iterator_test.go b/iterator_test.go index 23765cb1e9..7d5bc63d50 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -2696,7 +2696,7 @@ func BenchmarkSeekPrefixTombstones(b *testing.B) { w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), wOpts) require.NoError(b, w.DeleteRange(testkeys.Key(ks, i), testkeys.Key(ks, i+1))) require.NoError(b, w.Close()) - require.NoError(b, d.Ingest([]string{filename})) + require.NoError(b, d.Ingest(context.Background(), []string{filename})) }() } diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 06a1b17ce4..845723309d 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -704,7 +704,7 @@ func (o *ingestOp) run(t *Test, h historyRecorder) { } err = firstError(err, t.withRetries(func() error { - return t.getDB(o.dbID).Ingest(paths) + return t.getDB(o.dbID).Ingest(context.Background(), paths) })) h.Recordf("%s // %v", o, err) @@ -920,7 +920,7 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { if t.testOpts.useExcise { err = firstError(err, t.withRetries(func() error { - _, err := db.IngestAndExcise([]string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ + _, err := db.IngestAndExcise(context.Background(), []string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, }, o.sstContainsExciseTombstone) @@ -929,7 +929,7 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { } else { err = firstError(err, o.simulateExcise(db, t)) err = firstError(err, t.withRetries(func() error { - return db.Ingest([]string{path}) + return db.Ingest(context.Background(), []string{path}) })) } @@ -1011,7 +1011,7 @@ func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) { } } if len(paths) > 0 { - err = db.Ingest(paths) + err = db.Ingest(context.Background(), paths) } } else { external := make([]pebble.ExternalFile, len(o.objs)) @@ -1034,7 +1034,7 @@ func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) { external[i].SyntheticPrefix = obj.syntheticPrefix } } - _, err = db.IngestExternalFiles(external) + _, err = db.IngestExternalFiles(context.Background(), external) } h.Recordf("%s // %v", o, err) @@ -1976,7 +1976,7 @@ func (r *replicateOp) runSharedReplicate( return } - _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false) h.Recordf("%s // %v", r, err) } @@ -2039,7 +2039,7 @@ func (r *replicateOp) runExternalReplicate( return } - _, err = dest.IngestAndExcise([]string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */) h.Recordf("%s // %v", r, err) } @@ -2127,7 +2127,7 @@ func (r *replicateOp) run(t *Test, h historyRecorder) { panic(err) } - err = dest.Ingest([]string{sstPath}) + err = dest.Ingest(context.Background(), []string{sstPath}) h.Recordf("%s // %v", r, err) } diff --git a/open_test.go b/open_test.go index 7667d590ef..61763b2ccf 100644 --- a/open_test.go +++ b/open_test.go @@ -613,7 +613,7 @@ func TestOpenReadOnly(t *testing.T) { require.EqualValues(t, ErrReadOnly, d.Delete(nil, nil)) require.EqualValues(t, ErrReadOnly, d.DeleteRange(nil, nil, nil)) - require.EqualValues(t, ErrReadOnly, d.Ingest(nil)) + require.EqualValues(t, ErrReadOnly, d.Ingest(context.Background(), nil)) require.EqualValues(t, ErrReadOnly, d.LogData(nil, nil)) require.EqualValues(t, ErrReadOnly, d.Merge(nil, nil, nil)) require.EqualValues(t, ErrReadOnly, d.Set(nil, nil, nil)) @@ -1222,7 +1222,7 @@ func TestOpenNeverFlushed(t *testing.T) { } db, err := Open("", opts) require.NoError(t, err) - require.NoError(t, db.Ingest([]string{"to-ingest.sst"})) + require.NoError(t, db.Ingest(context.Background(), []string{"to-ingest.sst"})) require.NoError(t, db.Close()) db, err = Open("", opts) diff --git a/range_del_test.go b/range_del_test.go index dd3b9d0103..7a051f56df 100644 --- a/range_del_test.go +++ b/range_del_test.go @@ -6,6 +6,7 @@ package pebble import ( "bytes" + "context" "fmt" "runtime" "strings" @@ -609,7 +610,7 @@ func benchmarkRangeDelIterate(b *testing.B, entries, deleted int, snapshotCompac if err := w.Close(); err != nil { b.Fatal(err) } - if err := d.Ingest([]string{"ext"}); err != nil { + if err := d.Ingest(context.Background(), []string{"ext"}); err != nil { b.Fatal(err) } diff --git a/replay/replay.go b/replay/replay.go index c6e4728574..2691cc414a 100644 --- a/replay/replay.go +++ b/replay/replay.go @@ -683,7 +683,7 @@ func (r *Runner) applyWorkloadSteps(ctx context.Context) error { r.metrics.writeBytes.Store(step.cumulativeWriteBytes) r.stepsApplied <- step case ingestStepKind: - if err := r.d.Ingest(step.tablesToIngest); err != nil { + if err := r.d.Ingest(context.Background(), step.tablesToIngest); err != nil { return err } r.metrics.writeBytes.Store(step.cumulativeWriteBytes) diff --git a/scan_internal_test.go b/scan_internal_test.go index 92f6ae7a96..9a7fff1d25 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -462,7 +462,7 @@ func TestScanInternal(t *testing.T) { file, err := d.opts.FS.Create("temp0.sst", vfs.WriteCategoryUnspecified) require.NoError(t, err) writeSST(points, rangeDels, rangeKeys, objstorageprovider.NewFileWritable(file)) - require.NoError(t, d.Ingest([]string{"temp0.sst"})) + require.NoError(t, d.Ingest(context.Background(), []string{"temp0.sst"})) } else if ingestExternal { points, rangeDels, rangeKeys := batchSort(b) largestUnsafe := points.Last() @@ -485,7 +485,7 @@ func TestScanInternal(t *testing.T) { EndKeyIsInclusive: true, HasPointKey: true, } - _, err = d.IngestExternalFiles([]ExternalFile{ef}) + _, err = d.IngestExternalFiles(context.Background(), []ExternalFile{ef}) require.NoError(t, err) } else if name != "" { batches[name] = b diff --git a/tool/db.go b/tool/db.go index 78e0614d96..51396f52d9 100644 --- a/tool/db.go +++ b/tool/db.go @@ -625,7 +625,7 @@ func (d *dbT) runExcise(cmd *cobra.Command, args []string) { return } - _, err = db.IngestAndExcise([]string{path}, nil, nil, span, true /* sstsContainExciseTombstone */) + _, err = db.IngestAndExcise(context.Background(), []string{path}, nil, nil, span, true /* sstsContainExciseTombstone */) if err != nil { fmt.Fprintf(stderr, "Error excising: %s\n", err) return diff --git a/version_set_test.go b/version_set_test.go index b280ee1563..e76d5cad4c 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -34,7 +34,7 @@ func writeAndIngest(t *testing.T, mem vfs.FS, d *DB, k InternalKey, v []byte, fi w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{}) require.NoError(t, w.Add(k, v)) require.NoError(t, w.Close()) - require.NoError(t, d.Ingest([]string{path})) + require.NoError(t, d.Ingest(context.Background(), []string{path})) } func TestVersionSet(t *testing.T) {