diff --git a/pkg/txn/c_scheduler.go b/pkg/txn/c_scheduler.go index ada8399..1e432d9 100644 --- a/pkg/txn/c_scheduler.go +++ b/pkg/txn/c_scheduler.go @@ -9,7 +9,11 @@ type Scheduler struct { sync.Mutex nextTs uint64 - readVisibilityWaiter *TsWaiter + // `readTsMarker` marks the visibility(ts) of read operations in a `newTransaction` to other transactions. + // Here we don't need TsWaiter as such, as we are not using the `WaitFor` API. However, + // we are using the `DoneTill` API to get the last completed readTs to remove the old readyToCommitTxns. + readTsMarker *TsWaiter + // `commitVisibilityWaiter` blocks `newTransaction` to ensure previous commits are visible to new reads. commitVisibilityWaiter *TsWaiter readyToCommitTxns []ReadyToCommitTxn @@ -18,17 +22,17 @@ type Scheduler struct { func NewScheduler() *Scheduler { scheduler := &Scheduler{ nextTs: 1, - readVisibilityWaiter: NewTsWaiter(), + readTsMarker: NewTsWaiter(), commitVisibilityWaiter: NewTsWaiter(), } - scheduler.readVisibilityWaiter.Done(scheduler.nextTs - 1) + scheduler.readTsMarker.Done(scheduler.nextTs - 1) scheduler.commitVisibilityWaiter.Done(scheduler.nextTs - 1) return scheduler } func (o *Scheduler) Stop() { - o.readVisibilityWaiter.Stop() + o.readTsMarker.Stop() o.commitVisibilityWaiter.Stop() } @@ -37,7 +41,7 @@ func (o *Scheduler) NewReadTs() uint64 { defer o.Unlock() beginTimestamp := o.nextTs - 1 - o.readVisibilityWaiter.Begin(beginTimestamp) + o.readTsMarker.Begin(beginTimestamp) err := o.commitVisibilityWaiter.WaitFor(context.Background(), beginTimestamp) if err != nil { @@ -73,7 +77,7 @@ func (o *Scheduler) NewCommitTs(transaction *Txn) (uint64, error) { } func (o *Scheduler) DoneRead(transaction *Txn) { - o.readVisibilityWaiter.Done(transaction.snapshot.ts) + o.readTsMarker.Done(transaction.snapshot.ts) } func (o *Scheduler) DoneCommit(commitTs uint64) { @@ -98,7 +102,7 @@ func (o *Scheduler) hasConflictFor(txn *Txn) bool { func (o *Scheduler) gcOldReadyToCommitTxns() { updatedReadyToCommitTxns := o.readyToCommitTxns[:0] - lastCommittedTxnTs := o.readVisibilityWaiter.DoneTill() + lastCommittedTxnTs := o.readTsMarker.DoneTill() for _, readyToCommitTxn := range o.readyToCommitTxns { if readyToCommitTxn.commitTs <= lastCommittedTxnTs {