diff --git a/Gopkg.lock b/Gopkg.lock index fb495f1..28c376f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -66,12 +66,12 @@ version = "v1.0.0" [[projects]] - digest = "1:4112546e6964796e1c92a9ffdea8fd7ae81ffbf81eda4f946f50937e178f53da" + digest = "1:ff41a98e14bb578ecb9be42c98bcb0465e158988a1f6637a1fcd8c52522068e7" name = "github.com/hashicorp/go-retryablehttp" packages = ["."] pruneopts = "UT" - revision = "4502c0ecdaf0b50d857611af23831260f99be6bf" - version = "v0.5.0" + revision = "d3a63d3c72068412f9ecd7c36faafd874d2e2888" + version = "v0.5.1" [[projects]] branch = "master" @@ -109,7 +109,7 @@ version = "v1.0.0" [[projects]] - digest = "1:ac52144f192f9c6a37ac9d0abdb9c837da80446a25a881db2b63c04e1259bd8a" + digest = "1:5b9aac5494a84e9e098d31ae0b6ef87983998b500f8bb08de0656712e7f81b1d" name = "github.com/hashicorp/vault" packages = [ "api", @@ -121,8 +121,8 @@ "helper/strutil", ] pruneopts = "UT" - revision = "08df121c8b9adcc2b8fd55fc8506c3f9714c7e61" - version = "v1.0.1" + revision = "37a1dc9c477c1c68c022d2084550f25bf20cac33" + version = "v1.0.2" [[projects]] digest = "1:c568d7727aa262c32bdf8a3f7db83614f7af0ed661474b24588de635c20024c7" @@ -269,11 +269,11 @@ [[projects]] branch = "master" - digest = "1:91137b48dc3eb34409f731b49f63a5ebf73218168a065e1a93af24eb5b2f99e8" + digest = "1:fe2af5c0e6b4188bb1907e051cd086dae4f7ab3a2f4c1b62c03fefca848ab900" name = "golang.org/x/sys" packages = ["unix"] pruneopts = "UT" - revision = "48ac38b7c8cbedd50b1613c0fccacfc7d88dfcdf" + revision = "a457fd036447854c0c02e89ea439481bdcf941a2" [[projects]] digest = "1:a2ab62866c75542dd18d2b069fec854577a20211d7c0ea6ae746072a1dccdd18" diff --git a/Gopkg.toml b/Gopkg.toml index 801e543..984f62d 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -1,30 +1,3 @@ -# Gopkg.toml example -# -# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" -# -# [prune] -# non-go = false -# go-tests = true -# unused-packages = true - - [[constraint]] name = "github.com/spf13/viper" version = "1.2.1" diff --git a/db/db.go b/db/db.go index 6625516..a0d0fd7 100644 --- a/db/db.go +++ b/db/db.go @@ -28,35 +28,42 @@ import ( type Interface interface { Initialize() error - GetHistory(deviceId string) ([]Event, error) + GetHistory(deviceId string) (History, error) GetTombstone(deviceId string) (map[string]Event, error) - RemoveHistory(deviceId string, numToRemove int) error + UpdateHistory(deviceId string, events []Event) error InsertEvent(deviceId string, event Event, tombstoneKey string) error RemoveAll() error } +// the prefixes for the different documents being stored in couchbase const ( historyDoc = "history" counterDoc = "counter" tombstoneDoc = "tombstone" ) +// TODO: Add a way to try to reconnect to the database after a command fails because the connection broke + +// DbConnection contains the bucket connection and configuration values type DbConnection struct { - Server string - Username string - Password string - Bucket string + Server string + Username string + Password string + Bucket string + // number of times to try when initially connecting to the database NumRetries int + // the time duration to add when creating TTLs for history documents Timeout time.Duration bucketConn *gocb.Bucket } -// Tombstone hold the map of the last of certain events -// that are saved so that they are not deleted +// History is a list of events related to a device id, +// and has a TTL // -// swagger:model Tombstone -type Tombstone struct { - Events map[string]Event `json:"events"` +// swagger:model History +type History struct { + // the list of events from newest to oldest + Events []Event `json:"events"` } // Event represents the event information in the database @@ -104,6 +111,7 @@ type Event struct { Details map[string]interface{} `json:"details"` } +// Initialize creates the connection with couchbase and opens the specified bucket func (db *DbConnection) Initialize() error { var err error @@ -143,22 +151,24 @@ func (db *DbConnection) Initialize() error { return nil } -func (db *DbConnection) GetHistory(deviceId string) ([]Event, error) { +// GetHistory returns the history (list of events) for a given device +func (db *DbConnection) GetHistory(deviceId string) (History, error) { var ( - deviceInfo []Event + deviceInfo History ) if deviceId == "" { - return []Event{}, emperror.WrapWith(errors.New("Invalid device id"), "Get history not attempted", + return History{}, emperror.WrapWith(errors.New("Invalid device id"), "Get history not attempted", "device id", deviceId) } key := strings.Join([]string{historyDoc, deviceId}, ":") _, err := db.bucketConn.Get(key, &deviceInfo) if err != nil { - return []Event{}, emperror.WrapWith(err, "Getting history from database failed", "device id", deviceId) + return History{}, emperror.WrapWith(err, "Getting history from database failed", "device id", deviceId) } return deviceInfo, nil } +// GetTombstone returns the tombstone (map of events) for a given device func (db *DbConnection) GetTombstone(deviceId string) (map[string]Event, error) { var ( deviceInfo map[string]Event @@ -175,78 +185,88 @@ func (db *DbConnection) GetTombstone(deviceId string) (map[string]Event, error) return deviceInfo, nil } -func (db *DbConnection) RemoveHistory(deviceId string, numToRemove int) error { +// UpdateHistory updates the history to the list of events given for a given device +func (db *DbConnection) UpdateHistory(deviceId string, events []Event) error { key := strings.Join([]string{historyDoc, deviceId}, ":") - for a := 0; a < numToRemove; a++ { - _, err := db.bucketConn.ListRemove(key, 0) - return emperror.WrapWith(err, "Removing from history failed", "number of events successfully removed", a, - "device id", deviceId) + newTimeout := uint32(time.Now().Add(db.Timeout).Unix()) + _, err := db.bucketConn.MutateIn(key, 0, newTimeout).Upsert("events", &events, false).Execute() + if err != nil { + return emperror.WrapWith(err, "Update history failed", "device id", deviceId, + "events", events) } return nil } +// InsertEvent adds an event to the history of the given device id and adds it to the tombstone if a key is given func (db *DbConnection) InsertEvent(deviceId string, event Event, tombstoneMapKey string) error { - if valid, err := isStateValid(deviceId, event); !valid { + if valid, err := isEventValid(deviceId, event); !valid { return emperror.WrapWith(err, "Insert event not attempted", "device id", deviceId, "event", event) } - // get event id given device id + // get event id using the device id counterKey := strings.Join([]string{counterDoc, deviceId}, ":") - eventId, _, err := db.bucketConn.Counter(counterKey, 1, 0, 0) + eventID, _, err := db.bucketConn.Counter(counterKey, 1, 0, 0) if err != nil { return emperror.WrapWith(err, "Failed to get event id", "device id", deviceId) } - event.Id = strconv.FormatUint(eventId, 10) - - // append to the history, create if it doesn't exist (like that java thing?) - historyKey := strings.Join([]string{historyDoc, deviceId}, ":") - _, err = db.bucketConn.ListAppend(historyKey, &event, true) - if err != nil { - return emperror.WrapWith(err, "Failed to add event to history", "device id", deviceId, - "event id", eventId, "event", event) - } - // update expiry time of the list document - newTimeout := time.Now().Add(db.Timeout).Unix() - _, err = db.bucketConn.Touch(historyKey, 0, uint32(newTimeout)) - if err != nil { - return emperror.WrapWith(err, "Failed to update timeout", "device id", deviceId, - "event id", eventId, "event", event) - } + event.Id = strconv.FormatUint(eventID, 10) //if tombstonekey isn't empty string, then set the tombstone map at that key if tombstoneMapKey != "" { tombstoneKey := strings.Join([]string{tombstoneDoc, deviceId}, ":") events := make(map[string]Event) - events[tombstoneKey] = event - _, err := db.bucketConn.Insert(tombstoneKey, &events, 0) + events[tombstoneMapKey] = event + _, err = db.bucketConn.Insert(tombstoneKey, &events, 0) if err != nil && err != gocb.ErrKeyExists { return emperror.WrapWith(err, "Failed to create tombstone", "device id", deviceId, - "event id", eventId, "event", event) + "event id", eventID, "event", event) } - _, err = db.bucketConn.MutateIn(tombstoneKey, 0, 0). - Upsert(tombstoneMapKey, &event, false). - Execute() if err != nil { - return emperror.WrapWith(err, "Failed to add event to tombstone", "device id", deviceId, - "event id", eventId, "event", event) + _, err = db.bucketConn.MutateIn(tombstoneKey, 0, 0). + Upsert(tombstoneMapKey, &event, false). + Execute() + if err != nil { + return emperror.WrapWith(err, "Failed to add event to tombstone", "device id", deviceId, + "event id", eventID, "event", event) + } + } + } + + // append to the history, create if it doesn't exist + newTimeout := uint32(time.Now().Add(db.Timeout).Unix()) + historyKey := strings.Join([]string{historyDoc, deviceId}, ":") + eventDoc := History{ + Events: []Event{event}, + } + _, err = db.bucketConn.Insert(historyKey, &eventDoc, newTimeout) + if err != nil && err != gocb.ErrKeyExists { + return emperror.WrapWith(err, "Failed to create history document", "device id", deviceId, + "event id", eventID, "event", event) + } + if err != nil { + _, err = db.bucketConn.MutateIn(historyKey, 0, newTimeout).ArrayPrepend("events", &event, false).Execute() + if err != nil { + return emperror.WrapWith(err, "Failed to add event to history", "device id", deviceId, + "event id", eventID, "event", event) } } return nil } -func isStateValid(deviceId string, event Event) (bool, error) { +func isEventValid(deviceId string, event Event) (bool, error) { if deviceId == "" { return false, errors.New("Invalid device id") } - if event.Source == "" { + if event.Source == "" || event.Destination == "" || len(event.Details) == 0 { return false, errors.New("Invalid event") } return true, nil } +// RemoveAll removes everything in the database. Used for testing func (db *DbConnection) RemoveAll() error { _, err := db.bucketConn.ExecuteN1qlQuery(gocb.NewN1qlQuery("DELETE FROM devices;"), nil) if err != nil {