Skip to content

Commit

Permalink
Changed History Document Type and Added Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kristinapathak committed Jan 16, 2019
1 parent bab50fe commit a77236b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 84 deletions.
16 changes: 8 additions & 8 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 0 additions & 27 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -1,30 +1,3 @@
# Gopkg.toml example
#
# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true


[[constraint]]
name = "github.com/spf13/viper"
version = "1.2.1"
Expand Down
118 changes: 69 additions & 49 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,42 @@ import (

type Interface interface {
Initialize() error
GetHistory(deviceId string) ([]Event, error)
GetHistory(deviceId string) (History, error)
GetTombstone(deviceId string) (map[string]Event, error)
RemoveHistory(deviceId string, numToRemove int) error
UpdateHistory(deviceId string, events []Event) error
InsertEvent(deviceId string, event Event, tombstoneKey string) error
RemoveAll() error
}

// the prefixes for the different documents being stored in couchbase
const (
historyDoc = "history"
counterDoc = "counter"
tombstoneDoc = "tombstone"
)

// TODO: Add a way to try to reconnect to the database after a command fails because the connection broke

// DbConnection contains the bucket connection and configuration values
type DbConnection struct {
Server string
Username string
Password string
Bucket string
Server string
Username string
Password string
Bucket string
// number of times to try when initially connecting to the database
NumRetries int
// the time duration to add when creating TTLs for history documents
Timeout time.Duration
bucketConn *gocb.Bucket
}

// Tombstone hold the map of the last of certain events
// that are saved so that they are not deleted
// History is a list of events related to a device id,
// and has a TTL
//
// swagger:model Tombstone
type Tombstone struct {
Events map[string]Event `json:"events"`
// swagger:model History
type History struct {
// the list of events from newest to oldest
Events []Event `json:"events"`
}

// Event represents the event information in the database
Expand Down Expand Up @@ -104,6 +111,7 @@ type Event struct {
Details map[string]interface{} `json:"details"`
}

// Initialize creates the connection with couchbase and opens the specified bucket
func (db *DbConnection) Initialize() error {
var err error

Expand Down Expand Up @@ -143,22 +151,24 @@ func (db *DbConnection) Initialize() error {
return nil
}

func (db *DbConnection) GetHistory(deviceId string) ([]Event, error) {
// GetHistory returns the history (list of events) for a given device
func (db *DbConnection) GetHistory(deviceId string) (History, error) {
var (
deviceInfo []Event
deviceInfo History
)
if deviceId == "" {
return []Event{}, emperror.WrapWith(errors.New("Invalid device id"), "Get history not attempted",
return History{}, emperror.WrapWith(errors.New("Invalid device id"), "Get history not attempted",
"device id", deviceId)
}
key := strings.Join([]string{historyDoc, deviceId}, ":")
_, err := db.bucketConn.Get(key, &deviceInfo)
if err != nil {
return []Event{}, emperror.WrapWith(err, "Getting history from database failed", "device id", deviceId)
return History{}, emperror.WrapWith(err, "Getting history from database failed", "device id", deviceId)
}
return deviceInfo, nil
}

// GetTombstone returns the tombstone (map of events) for a given device
func (db *DbConnection) GetTombstone(deviceId string) (map[string]Event, error) {
var (
deviceInfo map[string]Event
Expand All @@ -175,78 +185,88 @@ func (db *DbConnection) GetTombstone(deviceId string) (map[string]Event, error)
return deviceInfo, nil
}

func (db *DbConnection) RemoveHistory(deviceId string, numToRemove int) error {
// UpdateHistory updates the history to the list of events given for a given device
func (db *DbConnection) UpdateHistory(deviceId string, events []Event) error {
key := strings.Join([]string{historyDoc, deviceId}, ":")
for a := 0; a < numToRemove; a++ {
_, err := db.bucketConn.ListRemove(key, 0)
return emperror.WrapWith(err, "Removing from history failed", "number of events successfully removed", a,
"device id", deviceId)
newTimeout := uint32(time.Now().Add(db.Timeout).Unix())
_, err := db.bucketConn.MutateIn(key, 0, newTimeout).Upsert("events", &events, false).Execute()
if err != nil {
return emperror.WrapWith(err, "Update history failed", "device id", deviceId,
"events", events)
}
return nil
}

// InsertEvent adds an event to the history of the given device id and adds it to the tombstone if a key is given
func (db *DbConnection) InsertEvent(deviceId string, event Event, tombstoneMapKey string) error {
if valid, err := isStateValid(deviceId, event); !valid {
if valid, err := isEventValid(deviceId, event); !valid {
return emperror.WrapWith(err, "Insert event not attempted", "device id", deviceId,
"event", event)
}

// get event id given device id
// get event id using the device id
counterKey := strings.Join([]string{counterDoc, deviceId}, ":")
eventId, _, err := db.bucketConn.Counter(counterKey, 1, 0, 0)
eventID, _, err := db.bucketConn.Counter(counterKey, 1, 0, 0)
if err != nil {
return emperror.WrapWith(err, "Failed to get event id", "device id", deviceId)
}

event.Id = strconv.FormatUint(eventId, 10)

// append to the history, create if it doesn't exist (like that java thing?)
historyKey := strings.Join([]string{historyDoc, deviceId}, ":")
_, err = db.bucketConn.ListAppend(historyKey, &event, true)
if err != nil {
return emperror.WrapWith(err, "Failed to add event to history", "device id", deviceId,
"event id", eventId, "event", event)
}
// update expiry time of the list document
newTimeout := time.Now().Add(db.Timeout).Unix()
_, err = db.bucketConn.Touch(historyKey, 0, uint32(newTimeout))
if err != nil {
return emperror.WrapWith(err, "Failed to update timeout", "device id", deviceId,
"event id", eventId, "event", event)
}
event.Id = strconv.FormatUint(eventID, 10)

//if tombstonekey isn't empty string, then set the tombstone map at that key
if tombstoneMapKey != "" {
tombstoneKey := strings.Join([]string{tombstoneDoc, deviceId}, ":")
events := make(map[string]Event)
events[tombstoneKey] = event
_, err := db.bucketConn.Insert(tombstoneKey, &events, 0)
events[tombstoneMapKey] = event
_, err = db.bucketConn.Insert(tombstoneKey, &events, 0)
if err != nil && err != gocb.ErrKeyExists {
return emperror.WrapWith(err, "Failed to create tombstone", "device id", deviceId,
"event id", eventId, "event", event)
"event id", eventID, "event", event)
}
_, err = db.bucketConn.MutateIn(tombstoneKey, 0, 0).
Upsert(tombstoneMapKey, &event, false).
Execute()
if err != nil {
return emperror.WrapWith(err, "Failed to add event to tombstone", "device id", deviceId,
"event id", eventId, "event", event)
_, err = db.bucketConn.MutateIn(tombstoneKey, 0, 0).
Upsert(tombstoneMapKey, &event, false).
Execute()
if err != nil {
return emperror.WrapWith(err, "Failed to add event to tombstone", "device id", deviceId,
"event id", eventID, "event", event)
}
}
}

// append to the history, create if it doesn't exist
newTimeout := uint32(time.Now().Add(db.Timeout).Unix())
historyKey := strings.Join([]string{historyDoc, deviceId}, ":")
eventDoc := History{
Events: []Event{event},
}
_, err = db.bucketConn.Insert(historyKey, &eventDoc, newTimeout)
if err != nil && err != gocb.ErrKeyExists {
return emperror.WrapWith(err, "Failed to create history document", "device id", deviceId,
"event id", eventID, "event", event)
}
if err != nil {
_, err = db.bucketConn.MutateIn(historyKey, 0, newTimeout).ArrayPrepend("events", &event, false).Execute()
if err != nil {
return emperror.WrapWith(err, "Failed to add event to history", "device id", deviceId,
"event id", eventID, "event", event)
}
}

return nil
}

func isStateValid(deviceId string, event Event) (bool, error) {
func isEventValid(deviceId string, event Event) (bool, error) {
if deviceId == "" {
return false, errors.New("Invalid device id")
}
if event.Source == "" {
if event.Source == "" || event.Destination == "" || len(event.Details) == 0 {
return false, errors.New("Invalid event")
}
return true, nil
}

// RemoveAll removes everything in the database. Used for testing
func (db *DbConnection) RemoveAll() error {
_, err := db.bucketConn.ExecuteN1qlQuery(gocb.NewN1qlQuery("DELETE FROM devices;"), nil)
if err != nil {
Expand Down

0 comments on commit a77236b

Please sign in to comment.