Skip to content

Commit

Permalink
Merge pull request #100 from ifwe/add-query-subscribers
Browse files Browse the repository at this point in the history
Commit /subscriptions, /services and /rebuildserviceset API endpoints
  • Loading branch information
mishan committed Mar 7, 2016
2 parents 6884c4c + cd0ec9b commit 0d8cf50
Show file tree
Hide file tree
Showing 10 changed files with 436 additions and 1 deletion.
12 changes: 12 additions & 0 deletions conf/uniqush-push.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ loglevel=standard
log=on
loglevel=standard

[PSPs]
log=on
loglevel=standard

[Subscribe]
log=on
loglevel=standard
Expand All @@ -25,6 +29,14 @@ loglevel=standard
log=on
loglevel=standard

[Subscriptions]
log=on
loglevel=standard

[Services]
log=on
loglevel=standard

[Database]
engine=redis
port=0
Expand Down
20 changes: 20 additions & 0 deletions configparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ const (
LOGGER_WEB = iota
LOGGER_ADDPSP
LOGGER_RMPSP
LOGGER_PSPS
LOGGER_SUB
LOGGER_UNSUB
LOGGER_PUSH
LOGGER_SUBSCRIPTIONS
LOGGER_SERVICES
LOGGER_NR_LOGGERS
)

Expand Down Expand Up @@ -165,6 +168,12 @@ func LoadLoggers(c *conf.ConfigFile) (loggers []log.Logger, err error) {
return
}

loggers[LOGGER_PSPS], err = loadLogger(logfile, c, "PSPs", "[PSPs]")
if err != nil {
loggers = nil
return
}

loggers[LOGGER_SUB], err = loadLogger(logfile, c, "Subscribe", "[Subscribe]")
if err != nil {
loggers = nil
Expand All @@ -182,6 +191,17 @@ func LoadLoggers(c *conf.ConfigFile) (loggers []log.Logger, err error) {
loggers = nil
return
}

loggers[LOGGER_SUBSCRIPTIONS], err = loadLogger(logfile, c, "Subscriptions", "[Subscriptions]")
if err != nil {
loggers = nil
return
}
loggers[LOGGER_SERVICES], err = loadLogger(logfile, c, "Services", "[Services]")
if err != nil {
loggers = nil
return
}
return
}

Expand Down
57 changes: 57 additions & 0 deletions db/pushdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"sync"

"github.com/uniqush/log"
"github.com/uniqush/uniqush-push/push"
)

Expand All @@ -46,6 +47,12 @@ type PushDatabase interface {

ModifyPushServiceProvider(psp *push.PushServiceProvider) error

// Get a set of all push service providers
GetPushServiceProviderConfigs() ([]*push.PushServiceProvider, error)

// RebuildServiceSet() ensures that a set of all PSPs exists. After FixServiceSet is called on a pre-existing uniqush setup, the set of all PSPs will be accurate (Even after calls to AddPushServiceProvider/RemovePushServiceProvider)
RebuildServiceSet() error

// The delivery point may be anonymous whose Name is empty string
// For anonymous delivery point, it will be added to database and its Name will be set
// Return value: selected push service provider, error
Expand All @@ -65,6 +72,8 @@ type PushDatabase interface {
GetPushServiceProviderDeliveryPointPairs(service string,
subscriber string) ([]PushServiceProviderDeliveryPointPair, error)

GetSubscriptions(services []string, user string, logger log.Logger) ([]map[string]string, error)

FlushCache() error
}

Expand Down Expand Up @@ -264,6 +273,38 @@ func (f *pushDatabaseOpts) ModifyPushServiceProvider(psp *push.PushServiceProvid
return addErrorSource("ModifyPushServiceProvider", f.db.SetPushServiceProvider(psp))
}

func (f *pushDatabaseOpts) GetServiceNames() ([]string, error) {
f.dblock.RLock()
defer f.dblock.RUnlock()
psps, err := f.db.GetServiceNames()
if err != nil {
return nil, fmt.Errorf("GetServiceNames: %v", err)
}
return psps, nil
}

func (f *pushDatabaseOpts) GetPushServiceProviderConfigs() ([]*push.PushServiceProvider, error) {
serviceNames, err := f.GetServiceNames()
if err != nil {
return nil, err
}
f.dblock.RLock()
defer f.dblock.RUnlock()
var pspNames []string
for _, serviceName := range serviceNames {
pspsForService, err := f.db.GetPushServiceProvidersByService(serviceName)
if err != nil {
return nil, fmt.Errorf("GetPushServiceProvidersByService couldn't get psps for service %q: %v", serviceName, err)
}
pspNames = append(pspNames, pspsForService...)
}
psps, errs := f.db.GetPushServiceProviderConfigs(pspNames)
if len(errs) > 0 {
return nil, fmt.Errorf("GetServiceNames has invalid configs: %v", errs)
}
return psps, nil
}

func (f *pushDatabaseOpts) ModifyDeliveryPoint(dp *push.DeliveryPoint) error {
if len(dp.Name()) == 0 {
return nil
Expand All @@ -273,6 +314,22 @@ func (f *pushDatabaseOpts) ModifyDeliveryPoint(dp *push.DeliveryPoint) error {
return addErrorSource("ModifyDeliveryPoint", f.db.SetDeliveryPoint(dp))
}

func (f *pushDatabaseOpts) GetSubscriptions(services []string, user string, logger log.Logger) ([]map[string]string, error) {
f.dblock.RLock()
defer f.dblock.RUnlock()
subs, err := f.db.GetSubscriptions(services, user, logger)
if err != nil {
return nil, fmt.Errorf("GetSubscriptions: %v", err)
}
return subs, nil
}

func (f *pushDatabaseOpts) RebuildServiceSet() error {
f.dblock.Lock()
defer f.dblock.Unlock()
return f.db.RebuildServiceSet()
}

func addErrorSource(fnName string, err error) error {
if err == nil {
return nil
Expand Down
164 changes: 164 additions & 0 deletions db/pushredisdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"

redis "github.com/monnand/goredis"
"github.com/uniqush/log"
"github.com/uniqush/uniqush-push/push"
)

Expand All @@ -41,6 +42,7 @@ const (
SERVICE_DELIVERY_POINT_TO_PUSH_SERVICE_PROVIDER_PREFIX string = "srv.dp-2-psp:" // STRING (prefix of) - Maps a service name + delivery point name to the push service provider
SERVICE_TO_PUSH_SERVICE_PROVIDERS_PREFIX string = "srv-2-psp:" // SET (prefix of) - Maps a service name to a set of PSP names
DELIVERY_POINT_COUNTER_PREFIX string = "delivery.point.counter:" // STRING (prefix of) - Maps a delivery point name to the number of subcribers(summed across each service).
SERVICES_SET string = "services{0}" // SET - This is a set of service names.
)

func newPushRedisDB(c *DatabaseConfig) (*PushRedisDB, error) {
Expand Down Expand Up @@ -144,6 +146,35 @@ func (r *PushRedisDB) GetPushServiceProvider(name string) (*push.PushServiceProv
return r.keyValueToPushServiceProvider(name, b)
}

func (r *PushRedisDB) GetPushServiceProviderConfigs(names []string) ([]*push.PushServiceProvider, []error) {
if len(names) == 0 {
return nil, nil
}
keys := make([]string, len(names))
for i, name := range names {
keys[i] = PUSH_SERVICE_PROVIDER_PREFIX + name
}
values, err := r.client.Mget(keys...)
if err != nil {
return nil, []error{fmt.Errorf("GetPushServiceProviderConfigs: %v", err)}
}
errors := make([]error, 0)
psps := make([]*push.PushServiceProvider, 0)
for i, value := range values {
if value == nil {
errors = append(errors, fmt.Errorf("Missing a PushServiceProvider for %q, key %q", names[i], keys[i]))
continue
}
psp, err := r.keyValueToPushServiceProvider(names[i], value)
if err != nil {
errors = append(errors, fmt.Errorf("Invalid psp for %s: %v", names[i], err))
} else {
psps = append(psps, psp)
}
}
return psps, errors
}

func (r *PushRedisDB) SetPushServiceProvider(psp *push.PushServiceProvider) error {
if err := r.client.Set(PUSH_SERVICE_PROVIDER_PREFIX+psp.Name(), pushServiceProviderToValue(psp)); err != nil {
return fmt.Errorf("SetPushServiceProvider %q failed: %v", psp.Name(), err)
Expand Down Expand Up @@ -290,17 +321,150 @@ func (r *PushRedisDB) RemovePushServiceProviderFromService(srv, psp string) erro
if err != nil {
return fmt.Errorf("RemovePSPFromService failed for psp %q of service %q: %v", psp, srv, err)
}
// Unfortunately, a service name might be associated with multiple push service providers, so the check seems to be needed. (/addpsp allows psps with the same service name but different pushservicetypes, if I understand correctly)
exists, err := r.client.Exists(SERVICE_TO_PUSH_SERVICE_PROVIDERS_PREFIX + srv)
if err != nil {
return fmt.Errorf("Unable to determine if service %q still exists after removing psp %q: %v", srv, psp, err)
}
if !exists {
_, err := r.client.Srem(SERVICES_SET, []byte(srv)) // Non-essential. Used to list services in API.
if err != nil {
return fmt.Errorf("Unable to remove %q from set of services", srv)
}
}
return nil
}

func (r *PushRedisDB) AddPushServiceProviderToService(srv, psp string) error {
r.client.Sadd(SERVICES_SET, []byte(srv)) // Non-essential. Used to list services in API.
_, err := r.client.Sadd(SERVICE_TO_PUSH_SERVICE_PROVIDERS_PREFIX+srv, []byte(psp))
if err != nil {
return fmt.Errorf("AddPSPToService failed for psp %q of service %q: %v", psp, srv, err)
}
return nil
}

func (r *PushRedisDB) GetServiceNames() ([]string, error) {
b, err := r.client.Smembers(SERVICES_SET)
if err != nil {
return nil, fmt.Errorf("Could not get services from redis: %v", err)
}
serviceList := make([]string, 0)

for _, service := range b {
serviceList = append(serviceList, string(service))
}
return serviceList, nil
}

// RebuildServiceSet builds the set of unique service
func (r *PushRedisDB) RebuildServiceSet() error {
// Run KEYS, then replace the PSP set with the result of KEYS.
// If any step fails, then return an error.
pspKeys, err := r.client.Keys(PUSH_SERVICE_PROVIDER_PREFIX + "*")
if err != nil {
return fmt.Errorf("Failed to fetch PSPs using redis KEYS command: %v", err)
}

if len(pspKeys) == 0 {
return nil
}

pspNames := make([]string, len(pspKeys))
N := len(PUSH_SERVICE_PROVIDER_PREFIX)
for i, key := range pspKeys {
if len(key) < N || key[:N] != PUSH_SERVICE_PROVIDER_PREFIX {
return fmt.Errorf("KEYS %s* returned %q - this shouldn't happen", PUSH_SERVICE_PROVIDER_PREFIX, key)
}
pspNames[i] = key[N:]
}

psps, errs := r.GetPushServiceProviderConfigs(pspNames)
if len(errs) > 0 {
return fmt.Errorf("RebuildServiceSet: found one or more invalid psps: %v", errs)
}
serviceNameSet := make(map[string]bool)
for i, psp := range psps {
serviceName, ok := psp.FixedData["service"]
if !ok || serviceName == "" {
return fmt.Errorf("RebuildServiceSet: found PSP %q with empty service name: data=%v", pspNames[i], psp)
}
serviceNameSet[serviceName] = true
}
// TODO: Sadd adding multiple values at once.
for serviceName, _ := range serviceNameSet {
_, err := r.client.Sadd(SERVICES_SET, []byte(serviceName))
if err != nil {
return err
}
}
return nil
}

func (r *PushRedisDB) FlushCache() error {
return r.client.Save()
}

func (r *PushRedisDB) GetSubscriptions(queryServices []string, subscriber string, logger log.Logger) ([]map[string]string, error) {
if len(queryServices) == 0 {
definedServices, err := r.GetServiceNames()
if err != nil {
return nil, fmt.Errorf("GetSubscriptions: %v", err)
}
queryServices = definedServices
}

var serviceForDeliveryPointNames []string
var deliveryPointNames []string
for _, service := range queryServices {
if service == "" {
logger.Errorf("empty service defined")
continue
}

deliveryPoints, err := r.client.Smembers(SERVICE_SUBSCRIBER_TO_DELIVERY_POINTS_PREFIX + service + ":" + subscriber)

if err != nil {
return nil, fmt.Errorf("Could not get subscriber information")
}
if len(deliveryPoints) == 0 {
// it is OK to not have delivery points for a service
continue
}
for _, deliveryPointName := range deliveryPoints {
deliveryPointNames = append(deliveryPointNames, string(deliveryPointName))
serviceForDeliveryPointNames = append(serviceForDeliveryPointNames, service)
}
}

if len(deliveryPointNames) == 0 {
// Return empty map without error.
return make([]map[string]string, 0), nil
}

deliveryPointData, err := r.mgetRawDeliveryPoints(deliveryPointNames...)
if err != nil {
return nil, err
}

// Unserialize the subscriptions. If there are any invalid subscriptions, remove them and log it.
// serviceForDeliveryPointNames, deliveryPointNames, and deliveryPointData all use the same index i.
var subscriptions []map[string]string
for i, data := range deliveryPointData {
dpName := deliveryPointNames[i]
service := serviceForDeliveryPointNames[i]
if data != nil {
subscriptionData, err := push.UnserializeSubscription(data)
if err != nil {
logger.Errorf("Error unserializing subscription for delivery point data for dp %q user %q service %q data %v: %v", dpName, subscriber, service, subscriptionData, err)
continue
}
subscriptions = append(subscriptions, subscriptionData)
} else {
logger.Errorf("Redis error fetching subscriber delivery point data for dp %q user %q service %q, removing...", dpName, subscriber, service)
// TODO: Remove corrupt/invalid redis keys for subscribers.
}
}

return subscriptions, nil
}
5 changes: 5 additions & 0 deletions db/rawdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package db

import (
"github.com/uniqush/log"
"github.com/uniqush/uniqush-push/push"
)

Expand Down Expand Up @@ -50,6 +51,7 @@ type pushRawDatabaseWriter interface {
SetPushServiceProvider(psp *push.PushServiceProvider) error
RemoveDeliveryPoint(dp string) error
RemovePushServiceProvider(psp string) error
RebuildServiceSet() error

AddDeliveryPointToServiceSubscriber(srv, sub, dp string) error
RemoveDeliveryPointFromServiceSubscriber(srv, sub, dp string) error
Expand All @@ -66,6 +68,9 @@ type pushRawDatabaseWriter interface {
type pushRawDatabaseReader interface {
GetDeliveryPoint(name string) (*push.DeliveryPoint, error)
GetPushServiceProvider(name string) (*push.PushServiceProvider, error)
GetServiceNames() ([]string, error)
GetPushServiceProviderConfigs([]string) ([]*push.PushServiceProvider, []error)
GetSubscriptions(queryServices []string, subscriber string, logger log.Logger) ([]map[string]string, error)

GetDeliveryPointsNameByServiceSubscriber(srv, sub string) (map[string][]string, error)
GetPushServiceProviderNameByServiceDeliveryPoint(srv, dp string) (string, error)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
var uniqushPushConfFlags = flag.String("config", "/etc/uniqush/uniqush-push.conf", "Config file path")
var uniqushPushShowVersionFlag = flag.Bool("version", false, "Version info")

var uniqushPushVersion = "uniqush-push 2.0.0"
var uniqushPushVersion = "uniqush-push 2.1.0"

func installPushServices() {
srv.InstallGCM()
Expand Down
Loading

0 comments on commit 0d8cf50

Please sign in to comment.