-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathbucket.go
508 lines (423 loc) · 23.2 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
// Copyright 2013-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package sgbucket
import (
"context"
"errors"
"expvar"
"fmt"
)
// BucketDocument is a raw representation of a document: its body and xattrs as bytes, along with its CAS,
// expiry and tombstone state.
type BucketDocument struct {
Body []byte // Raw document body
Xattrs map[string][]byte // Raw xattr values; presumably keyed by xattr name (verify against callers)
Cas uint64 // CAS value of the document
Expiry uint32 // Item expiration time (UNIX Epoch time)
IsTombstone bool // IsTombstone is true if the document is a tombstone
}
// BucketStoreFeature identifies a feature that a bucket or data store may or may not support.
// Support can be tested for with BucketStoreFeatureIsSupported.IsSupported.
type BucketStoreFeature int
// The known features. Values are assigned sequentially by iota starting at zero, so inserting or
// reordering entries changes the numeric value of every entry that follows.
const (
BucketStoreFeatureXattrs = BucketStoreFeature(iota)
BucketStoreFeatureN1ql
BucketStoreFeatureCrc32cMacroExpansion
BucketStoreFeatureCreateDeletedWithXattr
BucketStoreFeatureSubdocOperations
BucketStoreFeaturePreserveExpiry
BucketStoreFeatureCollections
BucketStoreFeatureSystemCollections
BucketStoreFeatureMobileXDCR
BucketStoreFeatureMultiXattrSubdocOperations
BucketStoreFeatureN1qlIfNotExistsDDL
)
// BucketStore is a basic interface that describes a bucket - with one or many underlying DataStore.
type BucketStore interface {
GetName() string // GetName returns the bucket's name
UUID() (string, error) // UUID returns the bucket's UUID
Close(context.Context) // Close closes the bucket
// ListDataStores returns a list of all DataStore names in the bucket.
ListDataStores() ([]DataStoreName, error)
// DefaultDataStore returns the default data store of the bucket (always exists.)
DefaultDataStore() DataStore
// NamedDataStore returns a named data store in the bucket, or an error if it doesn't exist.
NamedDataStore(DataStoreName) (DataStore, error)
// MutationFeedStore adds the streaming mutation feed methods.
MutationFeedStore
// BucketStoreFeatureIsSupported adds IsSupported, for testing which features the bucket supports.
BucketStoreFeatureIsSupported
}
// DynamicDataStoreBucket is an interface that describes a bucket that can change its set of DataStores.
type DynamicDataStoreBucket interface {
// CreateDataStore creates a new DataStore with the given name in the bucket.
CreateDataStore(context.Context, DataStoreName) error // CreateDataStore creates a new DataStore in the bucket
// DropDataStore drops the DataStore with the given name from the bucket.
DropDataStore(DataStoreName) error // DropDataStore drops a DataStore from the bucket
}
// MutationFeedStore is a store that supports a DCP or TAP streaming mutation feed.
type MutationFeedStore interface {
// GetMaxVbno returns the number of vBuckets of this store; usually 1024.
GetMaxVbno() (uint16, error)
// StartDCPFeed starts a new DCP event feed. The callback will be called for each event processed.
// To close the feed, pass a channel in args.Terminator and close that channel.
// dbStats is optional; when provided it is used to record metrics.
StartDCPFeed(ctx context.Context, args FeedArguments, callback FeedEventCallbackFunc, dbStats *expvar.Map) error
}
// BucketStoreFeatureIsSupported allows a BucketStore or DataStore to be tested for support for various features.
type BucketStoreFeatureIsSupported interface {
// IsSupported reports whether the bucket/datastore supports a given feature.
IsSupported(feature BucketStoreFeature) bool // IsSupported reports whether the bucket/datastore supports a given feature
}
// DataStore is a basic key-value store with extended attributes and subdoc operations.
// A Couchbase Server collection within a bucket is an example of a DataStore.
// The expiry field (exp) can take offsets or UNIX Epoch times. See https://developer.couchbase.com/documentation/server/3.x/developer/dev-guide-3.0/doc-expiration.html
type DataStore interface {
// GetName returns bucket.scope.collection
GetName() string
// GetCollectionID returns an integer that uniquely identifies this Collection in its Bucket.
// The default collection always has the ID zero.
GetCollectionID() uint32
// KVStore adds the basic key-value CRUD operations.
KVStore
// XattrStore adds operations on extended attributes (document metadata).
XattrStore
// SubdocStore adds access to individual properties within a document.
SubdocStore
// BucketStoreFeatureIsSupported adds IsSupported, for testing which features the data store supports.
BucketStoreFeatureIsSupported
// DataStoreName is declared outside this view of the file; presumably it exposes the scope and collection names (verify).
DataStoreName
}
// UpsertOptions are the options to use with the set operations (KVStore.Set and KVStore.SetRaw).
type UpsertOptions struct {
PreserveExpiry bool // PreserveExpiry will keep the existing expiry of an existing document if available
}
// MutateInOptions is a struct of options for mutate in operations, to be used by both sync gateway and rosmar
type MutateInOptions struct {
PreserveExpiry bool // PreserveExpiry will keep the existing document expiry on modification
MacroExpansion []MacroExpansionSpec // MacroExpansion lists the xattr paths to populate by macro expansion, one spec per path
}
// MacroExpansionSpec is a (path, macro) pair: Path is an xattr path, and Type is the macro used to populate that path.
type MacroExpansionSpec struct {
Path string // Path is the xattr path to populate
Type MacroExpansionType // Type is the macro whose expansion is written at Path
}
// MacroExpansionType defines the macro expansion types used by Sync Gateway and supported by CBS and rosmar.
type MacroExpansionType int

// The known macro expansion types. Their numeric values index macroExpansionTypeStrings, so the
// two declarations must be kept in the same order.
const (
	MacroCas    MacroExpansionType = iota // Document CAS
	MacroCrc32c                           // crc32c hash of the document body
)

// macroExpansionTypeStrings holds the display name of each MacroExpansionType, indexed by its value.
var (
	macroExpansionTypeStrings = []string{"CAS", "crc32c"}
)

// String returns the display name of the macro expansion type.
//
// Values outside the known range (negative, or not covered by macroExpansionTypeStrings) are
// rendered as "MacroExpansionType(N)" rather than indexing out of bounds, so that formatting or
// logging an unexpected value can never panic.
func (t MacroExpansionType) String() string {
	if t < 0 || int(t) >= len(macroExpansionTypeStrings) {
		return fmt.Sprintf("MacroExpansionType(%d)", int(t))
	}
	return macroExpansionTypeStrings[t]
}
// NewMacroExpansionSpec builds a MacroExpansionSpec whose Path is specPath and whose Type is macro.
func NewMacroExpansionSpec(specPath string, macro MacroExpansionType) MacroExpansionSpec {
	var spec MacroExpansionSpec
	spec.Path = specPath
	spec.Type = macro
	return spec
}
// A KVStore implements the basic key-value CRUD operations.
type KVStore interface {
// Get retrieves the document value of a key and unmarshals it.
// Parameters:
// - k: The key (document ID)
// - rv: The value, if any, is stored here. Must be a pointer.
// If it is a `*[]byte` the raw value will be stored in it.
// Otherwise it's written to by json.Unmarshal; the usual type is `*map[string]any`.
// If the document is a tombstone, nothing is stored.
// Return values:
// - cas: The document's current CAS (sequence) number.
// - err: Error, if any. Returns an error if the key does not exist.
Get(k string, rv interface{}) (cas uint64, err error)
// GetRaw returns the value of a key as a raw byte array.
// Parameters:
// - k: The key (document ID)
// Return values:
// - rv: The raw value. Nil if the document is a tombstone.
// - cas: The document's current CAS (sequence) number.
// - err: Error, if any. Returns an error if the key does not exist.
GetRaw(k string) (rv []byte, cas uint64, err error)
// GetAndTouchRaw is like GetRaw, but also sets the document's expiration time.
// Since this changes the document, it generates a new CAS value and posts an event.
GetAndTouchRaw(k string, exp uint32) (rv []byte, cas uint64, err error)
// Touch is equivalent to GetAndTouchRaw, but does not return the value.
Touch(k string, exp uint32) (cas uint64, err error)
// Add creates a document; similar to Set but gives up if the key exists with a non-nil value.
// Parameters:
// - k: The key (document ID)
// - exp: Expiration timestamp (0 for never)
// - v: The value to set. Will be marshaled to JSON unless it is a `[]byte` or `*[]byte`.
// Return values:
// - added: True if the document was added, false if it already has a value.
// - err: Error, if any. Does not return ErrKeyExists.
Add(k string, exp uint32, v interface{}) (added bool, err error)
// AddRaw creates a document; similar to SetRaw but gives up if the key exists with a non-nil value.
// Parameters:
// - k: The key (document ID)
// - exp: Expiration timestamp (0 for never)
// - v: The raw value to set.
// Return values:
// - added: True if the document was added, false if it already has a value.
// - err: Error, if any. Does not return ErrKeyExists.
AddRaw(k string, exp uint32, v []byte) (added bool, err error)
// Set upserts a document, creating it if it doesn't exist.
// Parameters:
// - k: The key (document ID)
// - exp: Expiration timestamp (0 for never)
// - opts: Options. Use PreserveExpiry=true to leave the expiration alone
// - v: The value to set. Will be marshaled to JSON unless it is a `[]byte` or `*[]byte`
// Return values:
// - err: Error, if any
Set(k string, exp uint32, opts *UpsertOptions, v interface{}) error
// SetRaw upserts a document from a raw value, creating it if it doesn't exist.
// Parameters:
// - k: The key (document ID)
// - exp: Expiration timestamp (0 for never)
// - opts: Options. Use PreserveExpiry=true to leave the expiration alone
// - v: The raw value to set
// Return values:
// - err: Error, if any. Does not return ErrKeyExists
SetRaw(k string, exp uint32, opts *UpsertOptions, v []byte) error
// WriteCas is the most general write method. Sets the value of a document, creating it if it doesn't
// exist, but checks for CAS conflicts:
// If the document has a value, and its CAS differs from the input `cas` parameter, the method
// fails and returns a CasMismatchErr.
// Parameters:
// - k: The key (document ID)
// - exp: Expiration timestamp (0 for never)
// - cas: Expected CAS value
// - v: The value to set. Will be marshaled to JSON unless it is a `[]byte` or `*[]byte`
// - opt: Options; see WriteOptions for details
// Return values:
// - casOut: The new CAS value
// - err: Error, if any. May be CasMismatchErr
WriteCas(k string, exp uint32, cas uint64, v interface{}, opt WriteOptions) (casOut uint64, err error)
// Delete removes a document by setting its value to nil, making it a tombstone.
// System xattrs are preserved but user xattrs are removed.
// Returns an error if the document doesn't exist or has no value.
Delete(k string) error
// Remove removes a document if its CAS matches the given value.
// System xattrs are preserved but user xattrs are removed.
// Returns an error if the document doesn't exist or has no value. Returns a CasMismatchErr if the CAS doesn't match.
Remove(k string, cas uint64) (casOut uint64, err error)
// Update interactively updates a document. The document's current value (nil if none) is passed to
// the callback, then the result of the callback is used to update the value.
//
// Warning: If the document's CAS changes between the read and the write, the method retries;
// therefore you must be prepared for your callback to be called multiple times.
//
// Note: The new value is assumed to be JSON, i.e. when the document is updated its "is JSON"
// flag is set. The UpdateFunc callback unfortunately has no way to override this.
//
// Parameters:
// - k: The key (document ID)
// - exp: Expiration timestamp to set (0 for never)
// - callback: Will be called to compute the new value
// Return values:
// - casOut: The document's new CAS
// - err: Error, if any (including an error returned by the callback)
Update(k string, exp uint32, callback UpdateFunc) (casOut uint64, err error)
// Incr adds a number to a document serving as a counter.
// The document's value must be an ASCII decimal integer.
// Parameters:
// - k: The key (document ID)
// - amt: The amount to add to the existing value
// - def: The number to store if there is no existing value
// - exp: Expiration timestamp to set (0 for never)
// Return values:
// - casOut: The document's new CAS
// - err: Error, if any
Incr(k string, amt, def uint64, exp uint32) (casOut uint64, err error)
// GetExpiry returns the document's current expiration timestamp.
GetExpiry(ctx context.Context, k string) (expiry uint32, err error)
// Exists tests whether a document exists.
// A tombstone with a nil value is still considered to exist.
Exists(k string) (exists bool, err error)
}
// SubdocStore is an extension of KVStore that allows individual properties in a document to be accessed.
// Documents accessed through this API must have values that are JSON objects.
// Properties are specified by SQL++ paths that look like "foo.bar.baz" or "foo.bar[3].baz".
type SubdocStore interface {
// SubdocInsert adds an individual JSON property to a document. The document must exist.
// If the property already exists, returns `ErrPathExists`.
// If the parent property doesn't exist, returns `ErrPathNotFound`.
// If a parent property has the wrong type, returns `ErrPathMismatch`.
// Parameters:
// - k: The key (document ID)
// - subdocPath: The JSON path of the property to set
// - cas: Expected CAS value, or 0 to ignore CAS conflicts
// - value: The value to set. Will be marshaled to JSON.
SubdocInsert(ctx context.Context, k string, subdocPath string, cas uint64, value interface{}) error
// GetSubDocRaw returns the raw JSON value of a document property.
// If the property doesn't exist, returns `ErrPathNotFound`.
// If a parent property has the wrong type, returns `ErrPathMismatch`.
// Parameters:
// - k: The key (document ID)
// - subdocPath: The JSON path of the property to get
// Return values:
// - value: The property value as JSON
// - casOut: The document's current CAS (sequence) number.
// - err: Error, if any.
GetSubDocRaw(ctx context.Context, k string, subdocPath string) (value []byte, casOut uint64, err error)
// WriteSubDoc sets an individual JSON property in a document.
// Creates the document if it didn't exist.
// If the parent property doesn't exist, returns `ErrPathNotFound`.
// If a parent property has the wrong type, returns `ErrPathMismatch`.
// Parameters:
// - k: The key (document ID)
// - subdocPath: The JSON path of the property to set
// - cas: Expected CAS value, or 0 to ignore CAS conflicts
// - value: The raw value to set. Must be valid JSON.
// Return values:
// - casOut: The document's new CAS
// - err: Error, if any
WriteSubDoc(ctx context.Context, k string, subdocPath string, cas uint64, value []byte) (casOut uint64, err error)
}
// XattrStore is a data store that supports extended attributes, i.e. document metadata.
type XattrStore interface {
// WriteWithXattrs writes a document and updates xattr values. Fails on a CAS mismatch.
// Parameters:
// - k: The key (document ID)
// - exp: Expiration timestamp (0 for never)
// - cas: Expected CAS value
// - value: The raw value to set, or nil to *leave unchanged*
// - xattrsValues: Each key represents an xattr whose raw value is to be set; setting any of these values to nil will result in an error.
// - xattrsToDelete: The names of xattrs to delete.
// - opts: Options; use PreserveExpiry to avoid setting expiry
WriteWithXattrs(ctx context.Context, k string, exp uint32, cas uint64, value []byte, xattrsValues map[string][]byte, xattrsToDelete []string, opts *MutateInOptions) (casOut uint64, err error)
// WriteTombstoneWithXattrs is used when writing a tombstone. This is used when creating a tombstone from an existing document, modifying a tombstone, or creating a tombstone from no document. If deleteBody=true, will delete an existing body.
WriteTombstoneWithXattrs(ctx context.Context, k string, exp uint32, cas uint64, xattrValue map[string][]byte, xattrsToDelete []string, deleteBody bool, opts *MutateInOptions) (casOut uint64, err error)
// WriteResurrectionWithXattrs is used when resurrecting a tombstone. Any existing xattrs on a document will be overwritten.
WriteResurrectionWithXattrs(ctx context.Context, k string, exp uint32, body []byte, xattrs map[string][]byte, opts *MutateInOptions) (casOut uint64, err error)
// SetXattrs updates xattrs of a document.
// Parameters:
// - k: The key (document ID)
// - xattrs: Each xattr value is stored as a key with the raw value to set or nil to delete.
SetXattrs(ctx context.Context, k string, xattrs map[string][]byte) (casOut uint64, err error)
// RemoveXattrs removes xattrs by name. Fails on a CAS mismatch.
// Parameters:
// - k: The key (document ID)
// - xattrKeys: The names of the xattrs to remove
// - cas: Expected CAS value
RemoveXattrs(ctx context.Context, k string, xattrKeys []string, cas uint64) (err error)
// DeleteSubDocPaths removes any SQL++ subdoc paths from a document.
DeleteSubDocPaths(ctx context.Context, k string, paths ...string) (err error)
// GetXattrs returns the xattrs with the following keys. If the key is not present, it will not be present in the returned map.
GetXattrs(ctx context.Context, k string, xattrKeys []string) (xattrs map[string][]byte, casOut uint64, err error)
// GetWithXattrs returns a document's value as well as its xattrs.
GetWithXattrs(ctx context.Context, k string, xattrKeys []string) (v []byte, xv map[string][]byte, cas uint64, err error)
// DeleteWithXattrs removes a document and its named xattrs. User xattrs will always be deleted, but system xattrs must be manually removed when a document becomes a tombstone.
DeleteWithXattrs(ctx context.Context, k string, xattrKeys []string) error
// WriteUpdateWithXattrs performs an interactive update of a document with MVCC.
// See the documentation of WriteUpdateWithXattrsFunc for details.
// Parameters:
// - k: The key (document ID)
// - xattrs: The names of the xattrs to view or update.
// - exp: Expiration timestamp (0 for never)
// - previous: The current document, if known. Will be used in place of the initial Get
// - opts: Options; use PreserveExpiry to avoid setting expiry
// - callback: The callback that mutates the document
WriteUpdateWithXattrs(ctx context.Context, k string, xattrs []string, exp uint32, previous *BucketDocument, opts *MutateInOptions, callback WriteUpdateWithXattrsFunc) (casOut uint64, err error)
// UpdateXattrs will update the xattrs for a document. Use MutateInOptions to preserve the expiry value of a document. This operation returns an error on a CAS mismatch.
UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xv map[string][]byte, opts *MutateInOptions) (casOut uint64, err error)
}
// DeleteableStore is a data store that supports deletion of the underlying persistent storage.
type DeleteableStore interface {
// CloseAndDelete closes the store and removes its persistent storage.
CloseAndDelete(ctx context.Context) error
}
// DeletableBucket is an alias of DeleteableStore.
type DeletableBucket = DeleteableStore
// FlushableStore is a data store that supports flush.
type FlushableStore interface {
// Flush flushes the store and reports any error encountered.
Flush() error
}
// WriteOptions are option flags for write methods that accept them, such as KVStore.WriteCas.
// Each flag occupies its own bit (1 << iota).
type WriteOptions int
const (
Raw = WriteOptions(1 << iota) // Value is raw []byte; don't JSON-encode it
AddOnly // Fail with ErrKeyExists if key already has a value
Persist // After write, wait until it's written to disk
Indexable // After write, wait until it's ready for views to index
Append // Appends to value instead of replacing it
)
// MissingError is the error the Bucket API returns when a document is missing.
type MissingError struct {
	// Key is the ID of the missing document.
	Key string
}

// Error implements the error interface; the message contains the key in quoted form.
func (e MissingError) Error() string {
	msg := fmt.Sprintf("key %q missing", e.Key)
	return msg
}
// XattrMissingError is the error the Bucket API returns when one or more xattrs are missing.
type XattrMissingError struct {
	// Key is the ID of the document.
	Key string
	// Xattrs lists the xattrs that are missing.
	Xattrs []string
}

// Error implements the error interface; the message contains the quoted document key followed by
// the quoted list of missing xattrs.
func (e XattrMissingError) Error() string {
	docKey, missing := e.Key, e.Xattrs
	return fmt.Sprintf("key %q's xattr %q missing", docKey, missing)
}
// ErrKeyExists is returned from a write with the AddOnly flag, when the key already exists in the bucket.
// (This is *not* returned from the Add method! Add has an extra boolean return value to
// indicate this state, so it returns (false,nil).)
var ErrKeyExists = errors.New("Key exists")
// ErrTimeout is returned from a write with the Persist or Indexable flags, if the value doesn't become
// persistent or indexable within the timeout period.
var ErrTimeout = errors.New("Timeout")
// ErrCasFailureShouldRetry, when returned from an update callback, causes the function to re-fetch the doc and try again.
var ErrCasFailureShouldRetry = errors.New("CAS failure should retry")
// DocTooBigErr is the error returned when a document value being stored exceeds the size limit
// (usually 20MB.)
type DocTooBigErr struct{}

// Error implements the error interface with a fixed message.
func (DocTooBigErr) Error() string {
	const msg = "document value too large"
	return msg
}
// CasMismatchErr is the error returned when the input CAS does not match the document's current CAS.
type CasMismatchErr struct {
	Expected uint64 // Expected CAS value
	Actual   uint64 // Actual CAS value
}

// Error implements the error interface; both CAS values are rendered in lowercase hexadecimal.
func (e CasMismatchErr) Error() string {
	const format = "cas mismatch: expected %x, really %x"
	return fmt.Sprintf(format, e.Expected, e.Actual)
}
// ErrPathNotFound is returned by subdoc operations when the path is not found.
var ErrPathNotFound = errors.New("subdocument path not found in document")
// ErrPathExists is returned by subdoc operations when the path already exists, and is expected to not exist.
var ErrPathExists = errors.New("subdocument path already exists in document")
// ErrPathMismatch is returned by subdoc operations when the path exists but has the wrong type.
var ErrPathMismatch = errors.New("type mismatch in subdocument path")
// ErrDeleteXattrOnTombstone is returned when trying to delete an xattr on a tombstone document.
var ErrDeleteXattrOnTombstone = errors.New("cannot delete xattr on tombstone")
// ErrDeleteXattrOnTombstoneResurrection is returned when trying to delete an xattr on a resurrection of a tombstone document.
var ErrDeleteXattrOnTombstoneResurrection = errors.New("cannot delete xattr on resurrection of a tombstone")
// ErrDeleteXattrOnDocumentInsert is returned when trying to specify xattrs to delete on a document insert, which is invalid.
var ErrDeleteXattrOnDocumentInsert = errors.New("cannot delete xattrs on document insert")
// ErrUpsertAndDeleteSameXattr is returned when trying to upsert and delete the same xattr in the same operation.
var ErrUpsertAndDeleteSameXattr = errors.New("cannot upsert and delete the same xattr in the same operation")
// ErrNilXattrValue is returned when trying to set a named xattr to a nil value. This is allowed in Couchbase Server, but not in rosmar, and has no use in Sync Gateway.
var ErrNilXattrValue = errors.New("nil xattr value not allowed")
// ErrDocumentExistsOnResurrection is returned when trying to resurrect a document that already exists in a live form.
var ErrDocumentExistsOnResurrection = errors.New("document already exists on resurrection")
// ErrNeedXattrs is returned when no xattrs are specified for an operation that updates or deletes xattrs.
var ErrNeedXattrs = errors.New("xattrs must be specified to update or to delete")
// ErrNeedBody is returned when a function requires a non-nil body.
var ErrNeedBody = errors.New("body must be specified")
// UpdateFunc is a callback passed to KVStore.Update. It may be called more than once for a
// single Update, because Update retries when the document's CAS changes between read and write.
// Parameters:
// - current: The document's current raw value. nil if it's a tombstone or doesn't exist.
// Results:
// - updated: The new value to store, or nil to leave the value alone.
// - expiry: Nil to leave expiry alone, else a pointer to a new timestamp.
// - delete: If true, the document will be deleted.
// - err: Returning an error aborts the update.
type UpdateFunc func(current []byte) (updated []byte, expiry *uint32, delete bool, err error)
// UpdatedDoc is returned by WriteUpdateWithXattrsFunc, to indicate the new document value and xattrs.
type UpdatedDoc struct {
Doc []byte // Raw value of the document
Xattrs map[string][]byte // Each xattr found with its value. If the xattr is specified, it will be preserved.
XattrsToDelete []string // Names of xattrs to delete. This must be empty if the updated document will be a resurrection of a tombstone.
IsTombstone bool // IsTombstone is true if the document is a tombstone
Expiry *uint32 // Expiry is non-nil to set an expiry
Spec []MacroExpansionSpec // Spec represents which macros to expand
}
// WriteUpdateWithXattrsFunc is used by XattrStore.WriteUpdateWithXattrs, used to transform the doc in preparation for update.
// Parameters:
// - doc: Current document raw value
// - xattrs: Current value of xattrs
// - cas: Document's current CAS
// Return values:
// - UpdatedDoc: The new document value and xattrs to store
// - err: If non-nil, cancels update.
type WriteUpdateWithXattrsFunc func(doc []byte, xattrs map[string][]byte, cas uint64) (UpdatedDoc, error)