Skip to content

Commit

Permalink
Merge pull request #87 from ifwe/switch-to-json-response
Browse files Browse the repository at this point in the history
Change the response format from logger output to a structured JSON response
  • Loading branch information
mishan committed Feb 17, 2016
2 parents 4f49f4d + 2a3f6c1 commit e17e001
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 62 deletions.
15 changes: 8 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@ import (
"fmt"
"os"
"runtime"
. "github.com/uniqush/uniqush-push/srv"

"github.com/uniqush/uniqush-push/srv"
)

var uniqushPushConfFlags = flag.String("config", "/etc/uniqush/uniqush-push.conf", "Config file path")
var uniqushPushShowVersionFlag = flag.Bool("version", false, "Version info")

var uniqushPushVersion = "uniqush-push 1.5.2"
var uniqushPushVersion = "uniqush-push 2.0.0"

func installPushSrvices() {
InstallGCM()
InstallAPNS()
InstallADM()
func installPushServices() {
srv.InstallGCM()
srv.InstallAPNS()
srv.InstallADM()
}

func main() {
Expand All @@ -43,7 +44,7 @@ func main() {
return
}
runtime.GOMAXPROCS(runtime.NumCPU() + 1)
installPushSrvices()
installPushServices()

err := Run(*uniqushPushConfFlags, uniqushPushVersion)
if err != nil {
Expand Down
70 changes: 49 additions & 21 deletions pushbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"sync"
"time"

. "github.com/uniqush/log"
. "github.com/uniqush/uniqush-push/db"
. "github.com/uniqush/uniqush-push/push"
Expand Down Expand Up @@ -84,7 +85,8 @@ func (self *PushBackEnd) Unsubscribe(service, sub string, dp *DeliveryPoint) err
func (self *PushBackEnd) processError() {
for err := range self.errChan {
rid := randomUniqId()
e := self.fixError(rid, err, self.loggers[LOGGER_PUSH], 0*time.Second)
nullHandler := &NullApiResponseHandler{}
e := self.fixError(rid, "", err, self.loggers[LOGGER_PUSH], 0*time.Second, nullHandler)
switch e0 := e.(type) {
case *InfoReport:
self.loggers[LOGGER_PUSH].Infof("%v", e0)
Expand All @@ -94,7 +96,7 @@ func (self *PushBackEnd) processError() {
}
}

func (self *PushBackEnd) fixError(reqId string, event error, logger Logger, after time.Duration) error {
func (self *PushBackEnd) fixError(reqId string, remoteAddr string, event error, logger Logger, after time.Duration, handler ApiResponseHandler) error {
var service string
var sub string
var ok bool
Expand All @@ -115,17 +117,20 @@ func (self *PushBackEnd) fixError(reqId string, event error, logger Logger, afte
if after <= 1*time.Second {
after = 5 * time.Second
}
providerName := err.Provider.Name()
destinationName := err.Destination.Name()
if after > 1*time.Minute {
logger.Errorf("RequestID=%v Service=%v Subscriber=%v PushServiceProvider=%v DeliveryPoint=%v Failed after retry", reqId, service, sub, err.Provider.Name(), err.Destination.Name())
logger.Errorf("RequestID=%v Service=%v Subscriber=%v PushServiceProvider=%v DeliveryPoint=%v Failed after retry", reqId, service, sub, providerName, destinationName)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, PushServiceProvider: &providerName, DeliveryPoint: &destinationName, Code: UNIQUSH_ERROR_FAILED_RETRY})
return nil
}
logger.Infof("RequestID=%v Service=%v Subscriber=%v PushServiceProvider=%v DeliveryPoint=%v Retry after %v", reqId, service, sub, err.Provider.Name(), err.Destination.Name(), after)
logger.Infof("RequestID=%v Service=%v Subscriber=%v PushServiceProvider=%v DeliveryPoint=%v Retry after %v", reqId, service, sub, providerName, destinationName, after)
go func() {
<-time.After(after)
subs := make([]string, 1)
subs[0] = sub
after = 2 * after
self.pushImpl(reqId, service, subs, err.Content, nil, self.loggers[LOGGER_PUSH], err.Provider, err.Destination, after)
self.pushImpl(reqId, remoteAddr, service, subs, err.Content, nil, self.loggers[LOGGER_PUSH], err.Provider, err.Destination, after, handler)
}()
case *PushServiceProviderUpdate:
if err.Provider == nil {
Expand All @@ -136,10 +141,13 @@ func (self *PushBackEnd) fixError(reqId string, event error, logger Logger, afte
}
psp := err.Provider
e := self.db.ModifyPushServiceProvider(psp)
pspName := psp.Name()
if e != nil {
logger.Errorf("RequestID=%v Service=%v PushServiceProvider=%v Update Failed: %v", reqId, service, psp.Name(), e)
logger.Errorf("RequestID=%v Service=%v PushServiceProvider=%v Update Failed: %v", reqId, service, pspName, e)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, PushServiceProvider: &pspName, Code: UNIQUSH_ERROR_UPDATE_PUSH_SERVICE_PROVIDER})
} else {
logger.Infof("RequestID=%v Service=%v PushServiceProvider=%v Update Success", reqId, service, psp.Name())
logger.Infof("RequestID=%v Service=%v PushServiceProvider=%v Update Success", reqId, service, pspName)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, PushServiceProvider: &pspName, Code: UNIQUSH_SUCCESS})
}
case *DeliveryPointUpdate:
if err.Destination == nil {
Expand All @@ -150,10 +158,13 @@ func (self *PushBackEnd) fixError(reqId string, event error, logger Logger, afte
}
dp := err.Destination
e := self.db.ModifyDeliveryPoint(dp)
dpName := dp.Name()
if e != nil {
logger.Errorf("Subscriber=%v DeliveryPoint=%v Update Failed: %v", sub, dp.Name(), e)
logger.Errorf("Subscriber=%v DeliveryPoint=%v Update Failed: %v", sub, dpName, e)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Subscriber: &sub, Service: &service, DeliveryPoint: &dpName, Code: UNIQUSH_ERROR_UPDATE_DELIVERY_POINT})
} else {
logger.Infof("Service=%v Subscriber=%v DeliveryPoint=%v Update Success", service, sub, dp.Name())
logger.Infof("Service=%v Subscriber=%v DeliveryPoint=%v Update Success", service, sub, dpName)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Subscriber: &sub, Service: &service, DeliveryPoint: &dpName, Code: UNIQUSH_SUCCESS})
}
case *InvalidRegistrationUpdate:
if err.Provider == nil || err.Destination == nil {
Expand All @@ -167,10 +178,13 @@ func (self *PushBackEnd) fixError(reqId string, event error, logger Logger, afte
}
dp := err.Destination
e := self.Unsubscribe(service, sub, dp)
dpName := dp.Name()
if e != nil {
logger.Errorf("Service=%v Subscriber=%v DeliveryPoint=%v Removing invalid reg failed: %v", service, sub, dp.Name(), e)
logger.Errorf("Service=%v Subscriber=%v DeliveryPoint=%v Removing invalid reg failed: %v", service, sub, dpName, e)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, DeliveryPoint: &dpName, Code: UNIQUSH_REMOVE_INVALID_REG})
} else {
logger.Infof("Service=%v Subscriber=%v DeliveryPoint=%v Invalid registration removed", service, sub, dp.Name())
logger.Infof("Service=%v Subscriber=%v DeliveryPoint=%v Invalid registration removed", service, sub, dpName)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, DeliveryPoint: &dpName, Code: UNIQUSH_REMOVE_INVALID_REG})
}
case *UnsubscribeUpdate:
if err.Provider == nil || err.Destination == nil {
Expand All @@ -184,32 +198,41 @@ func (self *PushBackEnd) fixError(reqId string, event error, logger Logger, afte
}
dp := err.Destination
e := self.Unsubscribe(service, sub, dp)
dpName := dp.Name()
if e != nil {
logger.Errorf("Service=%v Subscriber=%v DeliveryPoint=%v Unsubscribe failed: %v", service, sub, dp.Name(), e)
logger.Errorf("Service=%v Subscriber=%v DeliveryPoint=%v Unsubscribe failed: %v", service, sub, dpName, e)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, DeliveryPoint: &dpName, Code: UNIQUSH_UPDATE_UNSUBSCRIBE})
} else {
logger.Infof("Service=%v Subscriber=%v DeliveryPoint=%v Unsubscribe success", service, sub, dp.Name())
logger.Infof("Service=%v Subscriber=%v DeliveryPoint=%v Unsubscribe success", service, sub, dpName)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, DeliveryPoint: &dpName, Code: UNIQUSH_UPDATE_UNSUBSCRIBE})
}
default:
return err
}
return nil
}

func (self *PushBackEnd) collectResult(reqId string, service string, resChan <-chan *PushResult, logger Logger, after time.Duration) {
func (self *PushBackEnd) collectResult(reqId string, remoteAddr string, service string, resChan <-chan *PushResult, logger Logger, after time.Duration, handler ApiResponseHandler) {
for res := range resChan {
var sub string
var ok bool
if res.Provider != nil && res.Destination != nil {
if sub, ok = res.Destination.FixedData["subscriber"]; !ok {
logger.Errorf("RequestID=%v Subscriber=%v DeliveryPoint=%v Bad Delivery Point: No subscriber", reqId, sub, res.Destination.Name())
destinationName := res.Destination.Name()
logger.Errorf("RequestID=%v Subscriber=%v DeliveryPoint=%v Bad Delivery Point: No subscriber", reqId, sub, destinationName)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, DeliveryPoint: &destinationName, Code: UNIQUSH_ERROR_BAD_DELIVERY_POINT})
continue
}
}
if res.Err == nil {
logger.Infof("RequestID=%v Service=%v Subscriber=%v PushServiceProvider=%v DeliveryPoint=%v MsgId=%v Success!", reqId, service, sub, res.Provider.Name(), res.Destination.Name(), res.MsgId)
providerName := res.Provider.Name()
destinationName := res.Destination.Name()
msgId := res.MsgId
logger.Infof("RequestID=%v Service=%v Subscriber=%v PushServiceProvider=%v DeliveryPoint=%v MsgId=%v Success!", reqId, service, sub, providerName, destinationName, msgId)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, PushServiceProvider: &providerName, DeliveryPoint: &destinationName, MessageId: &msgId, Code: UNIQUSH_SUCCESS})
continue
}
err := self.fixError(reqId, res.Err, logger, after)
err := self.fixError(reqId, remoteAddr, res.Err, logger, after, handler)
if err != nil {
pspName := "Unknown"
dpName := "Unknown"
Expand All @@ -220,6 +243,7 @@ func (self *PushBackEnd) collectResult(reqId string, service string, resChan <-c
dpName = res.Destination.Name()
}
logger.Errorf("RequestID=%v Service=%v Subscriber=%v PushServiceProvider=%v DeliveryPoint=%v Failed: %v", reqId, service, sub, pspName, dpName, err)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, PushServiceProvider: &pspName, DeliveryPoint: &dpName, Code: UNIQUSH_ERROR_GENERIC})
}
}
}
Expand All @@ -233,11 +257,11 @@ func (self *PushBackEnd) NumberOfDeliveryPoints(service, sub string, logger Logg
return len(pspDpList)
}

func (self *PushBackEnd) Push(reqId string, service string, subs []string, notif *Notification, perdp map[string][]string, logger Logger) {
self.pushImpl(reqId, service, subs, notif, perdp, logger, nil, nil, 0*time.Second)
func (self *PushBackEnd) Push(reqId string, remoteAddr string, service string, subs []string, notif *Notification, perdp map[string][]string, logger Logger, handler ApiResponseHandler) {
self.pushImpl(reqId, remoteAddr, service, subs, notif, perdp, logger, nil, nil, 0*time.Second, handler)
}

func (self *PushBackEnd) pushImpl(reqId string, service string, subs []string, notif *Notification, perdp map[string][]string, logger Logger, provider *PushServiceProvider, dest *DeliveryPoint, after time.Duration) {
func (self *PushBackEnd) pushImpl(reqId string, remoteAddr string, service string, subs []string, notif *Notification, perdp map[string][]string, logger Logger, provider *PushServiceProvider, dest *DeliveryPoint, after time.Duration, handler ApiResponseHandler) {
dpChanMap := make(map[string]chan *DeliveryPoint)
wg := new(sync.WaitGroup)
for _, sub := range subs {
Expand All @@ -252,12 +276,14 @@ func (self *PushBackEnd) pushImpl(reqId string, service string, subs []string, n
pspDpList, err = self.db.GetPushServiceProviderDeliveryPointPairs(service, sub)
if err != nil {
logger.Errorf("RequestID=%v Service=%v Subscriber=%v Failed: Database Error %v", reqId, service, sub, err)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, Code: UNIQUSH_ERROR_DATABASE})
continue
}
}

if len(pspDpList) == 0 {
logger.Errorf("RequestID=%v Service=%v Subscriber=%v Failed: No device", reqId, service, sub)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, Code: UNIQUSH_ERROR_NO_DEVICE})
continue
}

Expand All @@ -266,10 +292,12 @@ func (self *PushBackEnd) pushImpl(reqId string, service string, subs []string, n
dp := pair.DeliveryPoint
if psp == nil {
logger.Errorf("RequestID=%v Service=%v Subscriber=%v Failed once: nil Push Service Provider", reqId, service, sub)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, Code: UNIQUSH_ERROR_NO_PUSH_SERVICE_PROVIDER})
continue
}
if dp == nil {
logger.Errorf("RequestID=%v Service=%v Subscriber=%v Failed once: nil Delivery Point", reqId, service, sub)
handler.AddDetailsToHandler(ApiResponseDetails{RequestId: &reqId, From: &remoteAddr, Service: &service, Subscriber: &sub, Code: UNIQUSH_ERROR_NO_DELIVERY_POINT})
continue
}
var ch chan *DeliveryPoint
Expand All @@ -294,7 +322,7 @@ func (self *PushBackEnd) pushImpl(reqId string, service string, subs []string, n
}()
wg.Add(1)
go func() {
self.collectResult(reqId, service, resChan, logger, after)
self.collectResult(reqId, remoteAddr, service, resChan, logger, after, handler)
wg.Done()
}()
}
Expand Down
Loading

0 comments on commit e17e001

Please sign in to comment.