Skip to content

Commit

Permalink
sqlite: fix too many parameters on trim sectors (#410)
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger authored Jun 10, 2024
1 parent cdd3745 commit 94db73b
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 48 deletions.
79 changes: 31 additions & 48 deletions persist/sqlite/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,71 +658,54 @@ ORDER BY root_index ASC;`, contractID, i, j)
}, nil
}

// lastNContractSectors returns the last n sector IDs for a contract.
func lastNContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) {
const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
// trimSectors deletes the last n sector roots for a contract and returns the
// deleted sector roots in order.
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) ([]types.Hash256, error) {
selectStmt, err := tx.Prepare(`SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
INNER JOIN stored_sectors ss ON (csr.sector_id=ss.id)
WHERE csr.contract_id=$1
ORDER BY root_index DESC
LIMIT $2;`

rows, err := tx.Query(query, contractID, n)
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
ref, err := scanContractSectorRootRef(rows)
if err != nil {
return nil, fmt.Errorf("failed to scan sector ref: %w", err)
}
roots = append(roots, ref)
}
return
}

// deleteContractSectorRoots deletes the contract sector roots with the given IDs.
func deleteContractSectorRoots(tx txn, ids []int64) error {
query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(ids)) + `);`
res, err := tx.Exec(query, queryArgs(ids)...)
LIMIT 1`)
if err != nil {
return fmt.Errorf("failed to delete contract sector roots: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n != int64(len(ids)) {
return fmt.Errorf("expected %v rows affected, got %v", len(ids), n)
return nil, fmt.Errorf("failed to prepare select statement: %w", err)
}
return nil
}
defer selectStmt.Close()

// trimSectors deletes the last n sector roots for a contract and returns the
// deleted sector roots in order.
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) ([]types.Hash256, error) {
refs, err := lastNContractSectors(tx, contractID, n)
deleteStmt, err := tx.Prepare(`DELETE FROM contract_sector_roots WHERE id=$1;`)
if err != nil {
return nil, fmt.Errorf("failed to get sector roots: %w", err)
}
return nil, fmt.Errorf("failed to prepare delete statement: %w", err)
}

sectorIDs := make([]int64, 0, n)
roots := make([]types.Hash256, n)
for i := 0; i < int(n); i++ {
var contractSectorID int64
var root types.Hash256
var sectorID int64

if err := selectStmt.QueryRow(contractID).Scan(&contractSectorID, &sectorID, (*sqlHash256)(&root)); err != nil {
return nil, fmt.Errorf("failed to get sector root: %w", err)
} else if res, err := deleteStmt.Exec(contractSectorID); err != nil {
return nil, fmt.Errorf("failed to delete sector root: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return nil, fmt.Errorf("failed to get rows affected: %w", err)
} else if n != 1 {
return nil, fmt.Errorf("expected 1 row affected, got %v", n)
}

var contractSectorRootIDs []int64
roots := make([]types.Hash256, len(refs))
var sectorIDs []int64
for i, ref := range refs {
contractSectorRootIDs = append(contractSectorRootIDs, ref.dbID)
roots[len(roots)-i-1] = ref.root // reverse the order to match the contract sector roots
sectorIDs = append(sectorIDs, ref.sectorID)
sectorIDs = append(sectorIDs, sectorID)
roots[len(roots)-i-1] = root // reverse order
}

if err := deleteContractSectorRoots(tx, contractSectorRootIDs); err != nil {
return nil, fmt.Errorf("failed to delete contract sector roots: %w", err)
} else if err := incrementNumericStat(tx, metricContractSectors, -len(contractSectorRootIDs), time.Now()); err != nil {
if err := incrementNumericStat(tx, metricContractSectors, -int(n), time.Now()); err != nil {
return nil, fmt.Errorf("failed to decrement contract sectors: %w", err)
}

removed, err := pruneSectors(tx, sectorIDs)
if err != nil {
return nil, fmt.Errorf("failed to prune sectors: %w", err)
}

log.Debug("trimmed sectors", zap.Stringers("trimmed", roots), zap.Stringers("removed", removed))
return roots, nil
}
Expand Down
80 changes: 80 additions & 0 deletions persist/sqlite/contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,83 @@ func TestContracts(t *testing.T) {
t.Fatal("expected no contracts")
}
}

func BenchmarkTrimSectors(b *testing.B) {
log := zaptest.NewLogger(b)
db, err := OpenDatabase(filepath.Join(b.TempDir(), "test.db"), log)
if err != nil {
b.Fatal(err)
}
defer db.Close()

renterKey := types.NewPrivateKeyFromSeed(frand.Bytes(32))
hostKey := types.NewPrivateKeyFromSeed(frand.Bytes(32))

contractUnlockConditions := types.UnlockConditions{
PublicKeys: []types.UnlockKey{
renterKey.PublicKey().UnlockKey(),
hostKey.PublicKey().UnlockKey(),
},
SignaturesRequired: 2,
}

// add a contract to the database
contract := contracts.SignedRevision{
Revision: types.FileContractRevision{
ParentID: frand.Entropy256(),
UnlockConditions: contractUnlockConditions,
FileContract: types.FileContract{
UnlockHash: types.Hash256(contractUnlockConditions.UnlockHash()),
RevisionNumber: 1,
WindowStart: 100,
WindowEnd: 200,
},
},
}

if err := db.AddContract(contract, []types.Transaction{}, types.ZeroCurrency, contracts.Usage{}, 0); err != nil {
b.Fatal(err)
}

volumeID, err := db.AddVolume("test.dat", false)
if err != nil {
b.Fatal(err)
} else if err := db.SetAvailable(volumeID, true); err != nil {
b.Fatal(err)
} else if err = db.GrowVolume(volumeID, uint64(b.N)); err != nil {
b.Fatal(err)
}

roots := make([]types.Hash256, 0, b.N)
appendActions := make([]contracts.SectorChange, 0, b.N)
releaseFuncs := make([]func() error, 0, b.N)

for i := 0; i < b.N; i++ {
root := frand.Entropy256()
roots = append(roots, root)
appendActions = append(appendActions, contracts.SectorChange{Action: contracts.SectorActionAppend, Root: root})

release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil })
if err != nil {
b.Fatal(err)
}
releaseFuncs = append(releaseFuncs, release)
}

if err := db.ReviseContract(contract, nil, contracts.Usage{}, appendActions); err != nil {
b.Fatal(err)
}
for _, fn := range releaseFuncs {
if err := fn(); err != nil {
b.Fatal(err)
}
}

b.ResetTimer()
b.ReportAllocs()
b.ReportMetric(float64(b.N), "sectors")

if err := db.ReviseContract(contract, roots, contracts.Usage{}, []contracts.SectorChange{{Action: contracts.SectorActionTrim, A: uint64(b.N)}}); err != nil {
b.Fatal(err)
}
}

0 comments on commit 94db73b

Please sign in to comment.