Skip to content

Commit

Permalink
Db Package Updates
Browse files Browse the repository at this point in the history
- Added Comments
- Did some linting fixes
- Split up insert to call helper functions
- Added some unit tests; some difficult due to no interfaces in gocb itself
  • Loading branch information
kristinapathak committed Jan 17, 2019
1 parent f3bb9e7 commit d627811
Show file tree
Hide file tree
Showing 4 changed files with 434 additions and 64 deletions.
56 changes: 49 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

137 changes: 80 additions & 57 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,41 @@ import (
"time"
)

// Interface describes the main functionality needed to connect to a db
type Interface interface {
Initialize() error
GetHistory(deviceId string) (History, error)
GetTombstone(deviceId string) (map[string]Event, error)
UpdateHistory(deviceId string, events []Event) error
InsertEvent(deviceId string, event Event, tombstoneKey string) error
GetHistory(deviceI string) (History, error)
GetTombstone(deviceID string) (map[string]Event, error)
UpdateHistory(deviceID string, events []Event) error
InsertEvent(deviceID string, event Event, tombstoneKey string) error
RemoveAll() error
}

type bucketWrapper interface {
Manager(username, password string) *gocb.BucketManager
Get(key string, valuePtr interface{}) (gocb.Cas, error)
MutateIn(key string, cas gocb.Cas, expiry uint32) *gocb.MutateInBuilder
Counter(key string, delta, initial int64, expiry uint32) (uint64, gocb.Cas, error)
Insert(key string, value interface{}, expiry uint32) (gocb.Cas, error)
ExecuteN1qlQuery(q *gocb.N1qlQuery, params interface{}) (gocb.QueryResults, error)
}

// the prefixes for the different documents being stored in couchbase
const (
historyDoc = "history"
counterDoc = "counter"
tombstoneDoc = "tombstone"
)

var (
errInvaliddeviceID = errors.New("Invalid device ID")
errInvalidEvent = errors.New("Invalid event")
)

// TODO: Add a way to try to reconnect to the database after a command fails because the connection broke

// DbConnection contains the bucket connection and configuration values
type DbConnection struct {
// Connection contains the bucket connection and configuration values
type Connection struct {
Server string
Username string
Password string
Expand All @@ -54,7 +69,7 @@ type DbConnection struct {
NumRetries int
// the time duration to add when creating TTLs for history documents
Timeout time.Duration
bucketConn *gocb.Bucket
bucketConn bucketWrapper
}

// History is a list of events related to a device id,
Expand All @@ -73,7 +88,7 @@ type Event struct {
// the id for the event
//
// required: true
Id string `json:"id"`
ID string `json:"id"`

// the time this event was found
//
Expand Down Expand Up @@ -112,7 +127,7 @@ type Event struct {
}

// Initialize creates the connection with couchbase and opens the specified bucket
func (db *DbConnection) Initialize() error {
func (db *Connection) Initialize() error {
var err error

cluster, err := gocb.Connect("couchbase://" + db.Server)
Expand Down Expand Up @@ -152,122 +167,130 @@ func (db *DbConnection) Initialize() error {
}

// GetHistory returns the history (list of events) for a given device
func (db *DbConnection) GetHistory(deviceId string) (History, error) {
func (db *Connection) GetHistory(deviceID string) (History, error) {
var (
deviceInfo History
)
if deviceId == "" {
if deviceID == "" {
return History{}, emperror.WrapWith(errors.New("Invalid device id"), "Get history not attempted",
"device id", deviceId)
"device id", deviceID)
}
key := strings.Join([]string{historyDoc, deviceId}, ":")
key := strings.Join([]string{historyDoc, deviceID}, ":")
_, err := db.bucketConn.Get(key, &deviceInfo)
if err != nil {
return History{}, emperror.WrapWith(err, "Getting history from database failed", "device id", deviceId)
return History{}, emperror.WrapWith(err, "Getting history from database failed", "device id", deviceID)
}
return deviceInfo, nil
}

// GetTombstone returns the tombstone (map of events) for a given device
func (db *DbConnection) GetTombstone(deviceId string) (map[string]Event, error) {
func (db *Connection) GetTombstone(deviceID string) (map[string]Event, error) {
var (
deviceInfo map[string]Event
)
if deviceId == "" {
if deviceID == "" {
return map[string]Event{}, emperror.WrapWith(errors.New("Invalid device id"), "Get tombstone not attempted",
"device id", deviceId)
"device id", deviceID)
}
key := strings.Join([]string{tombstoneDoc, deviceId}, ":")
key := strings.Join([]string{tombstoneDoc, deviceID}, ":")
_, err := db.bucketConn.Get(key, &deviceInfo)
if err != nil {
return map[string]Event{}, emperror.WrapWith(err, "Getting tombstone from database failed", "device id", deviceId)
return map[string]Event{}, emperror.WrapWith(err, "Getting tombstone from database failed", "device id", deviceID)
}
return deviceInfo, nil
}

// UpdateHistory updates the history to the list of events given for a given device
func (db *DbConnection) UpdateHistory(deviceId string, events []Event) error {
key := strings.Join([]string{historyDoc, deviceId}, ":")
func (db *Connection) UpdateHistory(deviceID string, events []Event) error {
key := strings.Join([]string{historyDoc, deviceID}, ":")
newTimeout := uint32(time.Now().Add(db.Timeout).Unix())
_, err := db.bucketConn.MutateIn(key, 0, newTimeout).Upsert("events", &events, false).Execute()
if err != nil {
return emperror.WrapWith(err, "Update history failed", "device id", deviceId,
return emperror.WrapWith(err, "Update history failed", "device id", deviceID,
"events", events)
}
return nil
}

// InsertEvent adds an event to the history of the given device id and adds it to the tombstone if a key is given
func (db *DbConnection) InsertEvent(deviceId string, event Event, tombstoneMapKey string) error {
if valid, err := isEventValid(deviceId, event); !valid {
return emperror.WrapWith(err, "Insert event not attempted", "device id", deviceId,
func (db *Connection) InsertEvent(deviceID string, event Event, tombstoneMapKey string) error {
if valid, err := isEventValid(deviceID, event); !valid {
return emperror.WrapWith(err, "Insert event not attempted", "device id", deviceID,
"event", event)
}

// get event id using the device id
counterKey := strings.Join([]string{counterDoc, deviceId}, ":")
counterKey := strings.Join([]string{counterDoc, deviceID}, ":")
eventID, _, err := db.bucketConn.Counter(counterKey, 1, 0, 0)
if err != nil {
return emperror.WrapWith(err, "Failed to get event id", "device id", deviceId)
return emperror.WrapWith(err, "Failed to get event id", "device id", deviceID)
}

event.Id = strconv.FormatUint(eventID, 10)
event.ID = strconv.FormatUint(eventID, 10)

//if tombstonekey isn't empty string, then set the tombstone map at that key
if tombstoneMapKey != "" {
tombstoneKey := strings.Join([]string{tombstoneDoc, deviceId}, ":")
events := make(map[string]Event)
events[tombstoneMapKey] = event
_, err = db.bucketConn.Insert(tombstoneKey, &events, 0)
if err != nil && err != gocb.ErrKeyExists {
return emperror.WrapWith(err, "Failed to create tombstone", "device id", deviceId,
"event id", eventID, "event", event)
err = db.upsertToTombstone(deviceID, tombstoneMapKey, event)
if err != nil {
return err
}
}
// append to the history, create if it doesn't exist
err = db.upsertToHistory(deviceID, event)
return err
}

func (db *Connection) upsertToTombstone(deviceID string, tombstoneMapKey string, event Event) error {
tombstoneKey := strings.Join([]string{tombstoneDoc, deviceID}, ":")
events := map[string]Event{tombstoneMapKey: event}
_, err := db.bucketConn.Insert(tombstoneKey, &events, 0)
if err != nil && err != gocb.ErrKeyExists {
return emperror.WrapWith(err, "Failed to create tombstone", "device id", deviceID,
"event id", event.ID, "event", event)
}
if err != nil {
_, err = db.bucketConn.MutateIn(tombstoneKey, 0, 0).
Upsert(tombstoneMapKey, &event, false).
Execute()
if err != nil {
_, err = db.bucketConn.MutateIn(tombstoneKey, 0, 0).
Upsert(tombstoneMapKey, &event, false).
Execute()
if err != nil {
return emperror.WrapWith(err, "Failed to add event to tombstone", "device id", deviceId,
"event id", eventID, "event", event)
}
return emperror.WrapWith(err, "Failed to add event to tombstone", "device id", deviceID,
"event id", event.ID, "event", event)
}
}
return nil
}

// append to the history, create if it doesn't exist
func (db *Connection) upsertToHistory(deviceID string, event Event) error {
newTimeout := uint32(time.Now().Add(db.Timeout).Unix())
historyKey := strings.Join([]string{historyDoc, deviceId}, ":")
historyKey := strings.Join([]string{historyDoc, deviceID}, ":")
eventDoc := History{
Events: []Event{event},
}
_, err = db.bucketConn.Insert(historyKey, &eventDoc, newTimeout)
_, err := db.bucketConn.Insert(historyKey, &eventDoc, newTimeout)
if err != nil && err != gocb.ErrKeyExists {
return emperror.WrapWith(err, "Failed to create history document", "device id", deviceId,
"event id", eventID, "event", event)
return emperror.WrapWith(err, "Failed to create history document", "device id", deviceID,
"event id", event.ID, "event", event)
}
if err != nil {
_, err = db.bucketConn.MutateIn(historyKey, 0, newTimeout).ArrayPrepend("events", &event, false).Execute()
if err != nil {
return emperror.WrapWith(err, "Failed to add event to history", "device id", deviceId,
"event id", eventID, "event", event)
return emperror.WrapWith(err, "Failed to add event to history", "device id", deviceID,
"event id", event.ID, "event", event)
}
}

return nil
}

func isEventValid(deviceId string, event Event) (bool, error) {
if deviceId == "" {
return false, errors.New("Invalid device id")
func isEventValid(deviceID string, event Event) (bool, error) {
if deviceID == "" {
return false, errInvaliddeviceID
}
if event.Source == "" || event.Destination == "" || len(event.Details) == 0 {
return false, errors.New("Invalid event")
return false, errInvalidEvent
}
return true, nil
}

// RemoveAll removes everything in the database. Used for testing
func (db *DbConnection) RemoveAll() error {
func (db *Connection) RemoveAll() error {
_, err := db.bucketConn.ExecuteN1qlQuery(gocb.NewN1qlQuery("DELETE FROM devices;"), nil)
if err != nil {
return emperror.Wrap(err, "Removing all devices from database failed")
Expand Down
Loading

0 comments on commit d627811

Please sign in to comment.