-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathgpq.go
378 lines (314 loc) · 9.97 KB
/
gpq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
package gpq
import (
"errors"
"fmt"
"sync"
"time"
"github.com/JustinTimperio/gpq/disk"
"github.com/JustinTimperio/gpq/ftime"
"github.com/JustinTimperio/gpq/queues"
"github.com/JustinTimperio/gpq/schema"
"github.com/google/uuid"
)
// GPQ is a generic priority queue that supports priority levels and timeouts
// It is implemented using a heap for each priority level and a priority queue of non-empty buckets
// It also supports disk caching using badgerDB with the option to lazily disk writes and deletes
// The GPQ is thread-safe and supports concurrent access
type GPQ[d any] struct {
// options is a struct that contains the options for the GPQ
options schema.GPQOptions
// buckets is a map of priority buckets
queue queues.CorePriorityQueue[d]
// diskCache is a badgerDB used to store items in the GPQ
diskCache *disk.Disk[d]
// activeDBSessions is a wait group for active disk cache sessions
activeDBSessions *sync.WaitGroup
// lazyDiskMessageChan is a channel used to send messages to the lazy disk cache
lazyDiskSendChan chan schema.Item[d]
// lazyDiskDeleteChan is a channel used to send messages to the lazy disk cache
lazyDiskDeleteChan chan schema.DeleteMessage
// batchHandler allows for synchronization of disk cache batches
batchHandler *batchHandler[d]
// batchCounter is used to keep track the current batch number
batchCounter *batchCounter
}
// NewGPQ creates a new GPQ with the given number of buckets
// The number of buckets is the number of priority levels you want to support
// You must provide the number of buckets ahead of time and all priorities you submit
// must be within the range of 0 to NumOfBuckets
func NewGPQ[d any](Options schema.GPQOptions) (uint, *GPQ[d], error) {
var diskCache *disk.Disk[d]
var err error
var sender chan schema.Item[d]
var receiver chan schema.DeleteMessage
if Options.DiskCacheEnabled {
diskCache, err = disk.NewDiskCache[d](nil, Options)
if err != nil {
return 0, nil, err
}
if Options.LazyDiskCacheEnabled {
sender = make(chan schema.Item[d], Options.LazyDiskCacheChannelSize)
receiver = make(chan schema.DeleteMessage, Options.LazyDiskCacheChannelSize)
}
}
gpq := &GPQ[d]{
queue: queues.NewCorePriorityQueue[d](Options, diskCache, receiver),
options: Options,
diskCache: diskCache,
activeDBSessions: &sync.WaitGroup{},
lazyDiskSendChan: sender,
lazyDiskDeleteChan: receiver,
batchHandler: newBatchHandler(diskCache),
batchCounter: newBatchCounter(Options.LazyDiskBatchSize),
}
var restored uint
if Options.DiskCacheEnabled {
items, err := gpq.diskCache.RestoreFromDisk()
if err != nil {
return 0, gpq, err
}
errs := gpq.restoreDB(items)
if errs != nil {
return 0, gpq, fmt.Errorf("Failed to Restore DB, received %d errors! Errors: %v", len(errs), errs)
}
restored = uint(len(items))
if Options.LazyDiskCacheEnabled {
go gpq.lazyDiskWriter(Options.DiskWriteDelay)
go gpq.lazyDiskDeleter()
}
}
return restored, gpq, nil
}
// ItemsInQueue returns the total number of items in the queue
func (g *GPQ[d]) ItemsInQueue() uint {
return g.queue.ItemsInQueue()
}
// ItemsInDB returns the total number of items currently commit to disk
func (g *GPQ[d]) ItemsInDB() uint {
return g.diskCache.ItemsInDB()
}
// ActiveBuckets returns the total number of buckets(priorities) that have messages within
func (g *GPQ[d]) ActiveBuckets() uint {
return g.queue.ActiveBuckets()
}
// Enqueue adds an item to the queue with the given options
func (g *GPQ[d]) Enqueue(item schema.Item[d]) error {
if item.Priority > uint(g.options.MaxPriority) {
return errors.New("Priority bucket does not exist")
}
item.SubmittedAt = ftime.Now()
item.LastEscalated = item.SubmittedAt
if g.options.DiskCacheEnabled && !item.WasRestored {
key, err := uuid.New().MarshalBinary()
if err != nil {
return err
}
item.DiskUUID = key
if g.options.LazyDiskCacheEnabled {
item.BatchNumber = g.batchCounter.increment()
g.lazyDiskSendChan <- item
} else {
err = g.diskCache.WriteSingle(key, item)
if err != nil {
return err
}
}
}
return g.queue.Enqueue(&item)
}
// EnqueueBatch takes a slice of items and attempts to enqueue them in their perspective buckets
// If a error is generated, it is attached to a slice of errors. Currently the batch will be commit
// in the partial state, and it is up to the user to parse the errors and resend messages that failed.
// In the future this will most likely change with the addition of transactions.
func (g *GPQ[d]) EnqueueBatch(items []schema.Item[d]) []error {
var (
errors []error
processedItems []*schema.Item[d]
)
for i := 0; i < len(items); i++ {
if items[i].Priority > uint(g.options.MaxPriority) {
errors = append(errors, fmt.Errorf("No Bucket exists to place message %d with priority %d", i, items[i].Priority))
continue
}
if g.options.DiskCacheEnabled {
key, err := uuid.New().MarshalBinary()
if err != nil {
errors = append(errors, fmt.Errorf("Unable to generate UUID for message %d with priority %d", i, items[i].Priority))
continue
}
items[i].DiskUUID = key
if g.options.LazyDiskCacheEnabled {
items[i].BatchNumber = g.batchCounter.increment()
g.lazyDiskSendChan <- items[i]
} else {
err = g.diskCache.WriteSingle(items[i].DiskUUID, items[i])
if err != nil {
errors = append(errors, fmt.Errorf("Unable to write message %d with priority %d", i, items[i].Priority))
continue
}
}
}
processedItems = append(processedItems, &items[i])
}
return g.queue.EnqueueBatch(processedItems)
}
// Dequeue removes and returns the item with the highest priority in the queue
func (g *GPQ[d]) Dequeue() (item *schema.Item[d], err error) {
item, err = g.queue.Dequeue()
if err != nil {
return item, err
}
if g.options.DiskCacheEnabled {
if g.options.LazyDiskCacheEnabled {
dm := schema.DeleteMessage{
DiskUUID: item.DiskUUID,
BatchNumber: item.BatchNumber,
WasRestored: item.WasRestored,
}
g.lazyDiskDeleteChan <- dm
} else {
err = g.diskCache.DeleteSingle(item.DiskUUID)
if err != nil {
return item, err
}
}
}
return item, nil
}
// DequeueBatch takes a batch size, and returns a slice ordered by priority up to the batchSize provided
// enough messages are present to fill the batch. Partial batches will be returned if a error is encountered.
func (g *GPQ[d]) DequeueBatch(batchSize uint) (items []*schema.Item[d], errs []error) {
items, errs = g.queue.DequeueBatch(batchSize)
if errs != nil {
return items, errs
}
if g.options.DiskCacheEnabled {
for i := 0; i < len(items); i++ {
if g.options.LazyDiskCacheEnabled {
dm := schema.DeleteMessage{
DiskUUID: items[i].DiskUUID,
BatchNumber: items[i].BatchNumber,
WasRestored: items[i].WasRestored,
}
g.lazyDiskDeleteChan <- dm
} else {
err := g.diskCache.DeleteSingle(items[i].DiskUUID)
if err != nil {
return nil, []error{err}
}
}
}
}
return items, nil
}
// Prioritize orders the queue based on the individual options added to
// every message in the queue. Prioritizing the queue is a stop-the-world
// event, so consider your usage carefully.
func (g *GPQ[d]) Prioritize() (escalated, removed uint, err error) {
return g.queue.Prioritize()
}
// Close performs a safe shutdown of the GPQ and the disk cache preventing data loss
func (g *GPQ[d]) Close() {
if g.options.DiskCacheEnabled {
if g.options.LazyDiskCacheEnabled {
close(g.lazyDiskSendChan)
close(g.lazyDiskDeleteChan)
}
// Wait for all db sessions to sync to disk
g.activeDBSessions.Wait()
// Safely close the diskCache
g.diskCache.Close()
}
}
func (g *GPQ[d]) restoreDB(items []*schema.Item[d]) []error {
// Quick sanity check
for i := 0; i < len(items); i++ {
if items[i].Priority > uint(g.options.MaxPriority) {
return []error{fmt.Errorf("You are trying to restore items with priorities higher than the max allowed for this queue")}
}
}
return g.queue.EnqueueBatch(items)
}
func (g *GPQ[d]) lazyDiskWriter(maxDelay time.Duration) {
g.activeDBSessions.Add(1)
defer g.activeDBSessions.Done()
var mux sync.Mutex
var wg sync.WaitGroup
var closer = make(chan struct{}, 1)
batch := make(map[uint][]*schema.Item[d], 0)
ticker := time.NewTicker(maxDelay)
wg.Add(2)
go func() {
defer wg.Done()
for {
select {
case item, ok := <-g.lazyDiskSendChan:
if !ok {
closer <- struct{}{}
mux.Lock()
for k, v := range batch {
g.batchHandler.processBatch(v, k)
batch[k] = batch[k][:0]
}
mux.Unlock()
return
}
mux.Lock()
batch[item.BatchNumber] = append(batch[item.BatchNumber], &item)
mux.Unlock()
}
}
}()
go func() {
defer wg.Done()
for {
select {
case <-ticker.C:
mux.Lock()
for k, v := range batch {
if len(v) >= int(g.options.LazyDiskBatchSize) {
g.batchHandler.processBatch(v, k)
batch[k] = batch[k][:0]
}
}
mux.Unlock()
case <-closer:
return
}
}
}()
wg.Wait()
}
func (g *GPQ[d]) lazyDiskDeleter() {
g.activeDBSessions.Add(1)
defer g.activeDBSessions.Done()
batch := make(map[uint][]*schema.DeleteMessage, 0)
restored := make([]*schema.DeleteMessage, 0)
for {
select {
case item, ok := <-g.lazyDiskDeleteChan:
if !ok {
g.batchHandler.deleteBatch(restored, 0, true)
for i, v := range batch {
g.batchHandler.deleteBatch(v, i, false)
batch[item.BatchNumber] = batch[item.BatchNumber][:0]
}
return
}
if item.WasRestored {
restored = append(restored, &item)
if len(restored) >= int(g.options.LazyDiskBatchSize) {
g.batchHandler.deleteBatch(restored, 0, true)
restored = restored[:0]
}
continue
}
// If the batch is full, process it and delete the items from the disk cache
batch[item.BatchNumber] = append(batch[item.BatchNumber], &item)
if len(batch[item.BatchNumber]) >= int(g.options.LazyDiskBatchSize) {
g.batchHandler.deleteBatch(batch[item.BatchNumber], item.BatchNumber, false)
batch[item.BatchNumber] = batch[item.BatchNumber][:0]
}
}
}
}