-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtxn_iterator.go
174 lines (147 loc) · 4.82 KB
/
txn_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package corekv
import (
"bytes"
"github.com/hardcore-os/corekv/lsm"
"github.com/hardcore-os/corekv/utils"
"math"
"sync/atomic"
)
// TxnIterator iterates over the KV pairs visible to a transaction in
// lexicographically sorted order. Instances are created by Txn.NewIterator
// and must be released with Close.
type TxnIterator struct {
// iitr is the underlying merged iterator built in NewIterator; Next, Seek
// and Close are forwarded to it.
iitr utils.Iterator
// txn is the transaction that created this iterator. It is used to record
// read keys and to maintain the transaction's active-iterator counter.
txn *Txn
// readTs is copied from txn.readTs when the iterator is created.
readTs uint64
// opt holds the options the iterator was created with.
opt IteratorOptions
// item is the current entry handed out by Item; a nil item makes Valid
// report false.
// NOTE(review): no method in this file assigns item — confirm where it is set.
item *Item
lastKey []byte // Used to skip over multiple versions of the same key.
// closed is set by Close so that repeated calls are no-ops.
closed bool
// latestTs is the value returned by Seek.
// NOTE(review): no method in this file assigns latestTs — confirm where it is set.
latestTs uint64
}
// IteratorOptions configures the iterators created by Txn.NewIterator and
// Txn.NewKeyIterator.
type IteratorOptions struct {
Reverse bool // Direction of iteration. False is forward, true is backward.
AllVersions bool // Fetch all valid versions of the same key.
InternalAccess bool // Used to allow internal access to keys.
// The following option is used to narrow down the SSTables that iterator
// picks up. If Prefix is specified, only tables which could have this
// prefix are picked based on their range of keys.
// NOTE(review): in this file NewIterator passes nil to lsm.NewIterators, so
// the table narrowing described above is not visible here — confirm.
// prefixIsKey is set by NewKeyIterator; when true, Valid requires the
// current key to equal Prefix exactly instead of merely starting with it.
prefixIsKey bool // If set, use the prefix for bloom filter lookup.
// Prefix is also the fallback seek target when Seek is called with an
// empty key.
Prefix []byte // Only iterate over this given prefix.
SinceTs uint64 // Only read data that has version > SinceTs.
}
// NewIterator creates a new iterator bound to this transaction.
//
// The returned iterator merges the transaction's pending-writes iterator
// (when newPendingWritesIterator returns one) with every iterator handed out
// by the LSM tree, using opt.Reverse as the direction. The transaction's
// readTs is copied into the iterator.
//
// It panics if the transaction has already been discarded or if the database
// is closed. Each call increments txn.numIterators; Close decrements it
// again, so every iterator must be closed once it is no longer needed.
func (txn *Txn) NewIterator(opt IteratorOptions) *TxnIterator {
	switch {
	case txn.discarded:
		panic("Transaction has already been discarded")
	case txn.db.IsClosed():
		panic(utils.ErrDBClosed.Error())
	}

	// Keep track of the number of active iterators.
	atomic.AddInt32(&txn.numIterators, 1)

	// Collect the sources to merge: pending writes first, then the LSM tree.
	var sources []utils.Iterator
	if pending := txn.newPendingWritesIterator(opt.Reverse); pending != nil {
		sources = append(sources, pending)
	}
	for _, src := range txn.db.lsm.NewIterators(nil) {
		sources = append(sources, src)
	}

	merged := lsm.NewMergeIterator(sources, opt.Reverse)
	return &TxnIterator{
		iitr:   merged,
		txn:    txn,
		readTs: txn.readTs,
		opt:    opt,
	}
}
// NewKeyIterator behaves like NewIterator but is meant for walking all
// versions of a single key. It overrides the supplied options so that Prefix
// is the given key (which must not carry a timestamp), prefixIsKey is set and
// AllVersions is enabled, then delegates to NewIterator.
//
// It panics if opt.Prefix is already populated.
func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *TxnIterator {
	if len(opt.Prefix) != 0 {
		panic("opt.Prefix should be nil for NewKeyIterator.")
	}
	// opt is a copy, so the caller's options are left untouched.
	opt.Prefix, opt.prefixIsKey, opt.AllVersions = key, true, true
	return txn.NewIterator(opt)
}
// Item returns a pointer to the current key-value pair and registers its key
// as read by the owning transaction.
// The returned item is only valid until it.Next() gets called.
func (it *TxnIterator) Item() *Item {
	currentKey := it.item.Entry().Key
	it.txn.addReadKey(currentKey)
	return it.item
}
// Valid reports whether the iterator currently points at a usable item.
// It returns false when there is no current item. Otherwise the current key
// must match opt.Prefix: exactly when the iterator was created for a single
// key (prefixIsKey), or as a prefix in every other case.
func (it *TxnIterator) Valid() bool {
	if it.item == nil {
		return false
	}
	key := it.item.Entry().Key
	if it.opt.prefixIsKey {
		return bytes.Equal(key, it.opt.Prefix)
	}
	return bytes.HasPrefix(key, it.opt.Prefix)
}
// ValidForPrefix reports whether the iterator is still valid and the current
// key additionally starts with the given prefix.
func (it *TxnIterator) ValidForPrefix(prefix []byte) bool {
	if !it.Valid() {
		return false
	}
	return bytes.HasPrefix(it.item.Entry().Key, prefix)
}
// Close releases the iterator. It is important to call this once iteration
// is finished. Calling Close more than once is harmless: only the first call
// has any effect.
//
// When an underlying iterator exists it is closed and the value log's
// iterator count is decremented; in every case the transaction's
// active-iterator counter is decremented.
func (it *TxnIterator) Close() {
	if it.closed {
		return
	}
	it.closed = true

	if it.iitr != nil {
		it.iitr.Close()
		// TODO: We could handle this error.
		_ = it.txn.db.vlog.decrIteratorCount()
	}
	atomic.AddInt32(&it.txn.numIterators, -1)
}
// Next advances the underlying iterator by one position; it is a no-op when
// there is no underlying iterator. Always check it.Valid() after calling
// Next to make sure it.Item() is safe to use.
func (it *TxnIterator) Next() {
	if it.iitr != nil {
		it.iitr.Next()
	}
}
// Seek positions the iterator at the provided key if present. If absent, a
// forward iterator lands on the next smallest key greater than the provided
// key; the behavior is mirrored when iterating backwards.
//
// A non-empty key is recorded as read by the transaction. An empty key falls
// back to opt.Prefix, and if that is empty as well the underlying iterator is
// simply rewound. The return value is it.latestTs in every case.
func (it *TxnIterator) Seek(key []byte) uint64 {
	if it.iitr == nil {
		return it.latestTs
	}

	if len(key) > 0 {
		it.txn.addReadKey(key)
	} else {
		key = it.opt.Prefix
	}
	it.lastKey = it.lastKey[:0]

	if len(key) == 0 {
		it.iitr.Rewind()
		return it.latestTs
	}

	// Forward iteration seeks with maxUint64 instead of it.readTs because we
	// want seek to return latestTs of the key. All the keys with ts > readTs
	// will be discarded for iteration by the prefetch function.
	// Reverse iteration seeks with timestamp 0 instead.
	var seekTs uint64 = math.MaxUint64
	if it.opt.Reverse {
		seekTs = 0
	}
	it.iitr.Seek(utils.KeyWithTs(key, seekTs))
	return it.latestTs
}
// Rewind moves the iterator cursor back to the zero-th position, which is the
// smallest key when iterating forward and the largest when iterating
// backward. It is implemented as a Seek with an empty key, so a configured
// opt.Prefix is used as the seek target. It does not keep track of whether
// the cursor started with a Seek().
func (it *TxnIterator) Rewind() {
	_ = it.Seek(nil)
}