Skip to content

Commit

Permalink
Minor comment
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunsk committed Jan 9, 2024
1 parent 021414e commit a81c86a
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions pkg/txn/c_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ type Scheduler struct {
sync.Mutex
nextTs uint64

readVisibilityWaiter *TsWaiter
// `readTsMarker` marks the visibility(ts) of read operations in a `newTransaction` to other transactions.
// Here we don't need TsWaiter as such, as we are not using the `WaitFor` API. However,
// we are using the `DoneTill` API to get the last completed readTs to remove the old readyToCommitTxns.
readTsMarker *TsWaiter
// `commitVisibilityWaiter` blocks `newTransaction` to ensure previous commits are visible to new reads.
commitVisibilityWaiter *TsWaiter

readyToCommitTxns []ReadyToCommitTxn
Expand All @@ -18,17 +22,17 @@ type Scheduler struct {
func NewScheduler() *Scheduler {
scheduler := &Scheduler{
nextTs: 1,
readVisibilityWaiter: NewTsWaiter(),
readTsMarker: NewTsWaiter(),
commitVisibilityWaiter: NewTsWaiter(),
}

scheduler.readVisibilityWaiter.Done(scheduler.nextTs - 1)
scheduler.readTsMarker.Done(scheduler.nextTs - 1)
scheduler.commitVisibilityWaiter.Done(scheduler.nextTs - 1)
return scheduler
}

func (o *Scheduler) Stop() {
o.readVisibilityWaiter.Stop()
o.readTsMarker.Stop()
o.commitVisibilityWaiter.Stop()
}

Expand All @@ -37,7 +41,7 @@ func (o *Scheduler) NewReadTs() uint64 {
defer o.Unlock()

beginTimestamp := o.nextTs - 1
o.readVisibilityWaiter.Begin(beginTimestamp)
o.readTsMarker.Begin(beginTimestamp)

err := o.commitVisibilityWaiter.WaitFor(context.Background(), beginTimestamp)
if err != nil {
Expand Down Expand Up @@ -73,7 +77,7 @@ func (o *Scheduler) NewCommitTs(transaction *Txn) (uint64, error) {
}

func (o *Scheduler) DoneRead(transaction *Txn) {
o.readVisibilityWaiter.Done(transaction.snapshot.ts)
o.readTsMarker.Done(transaction.snapshot.ts)
}

func (o *Scheduler) DoneCommit(commitTs uint64) {
Expand All @@ -98,7 +102,7 @@ func (o *Scheduler) hasConflictFor(txn *Txn) bool {

func (o *Scheduler) gcOldReadyToCommitTxns() {
updatedReadyToCommitTxns := o.readyToCommitTxns[:0]
lastCommittedTxnTs := o.readVisibilityWaiter.DoneTill()
lastCommittedTxnTs := o.readTsMarker.DoneTill()

for _, readyToCommitTxn := range o.readyToCommitTxns {
if readyToCommitTxn.commitTs <= lastCommittedTxnTs {
Expand Down

0 comments on commit a81c86a

Please sign in to comment.