Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code cleanup #63

Merged
merged 9 commits into from
Jun 26, 2024
Merged
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,10 @@ integration-test-container-dev: build-image-dev start-deps-container-dev test-db

.PHONY: mocks
mocks:
$(MOCKGENERATE) -source=interfaces/client.go -destination=mocks/firebase/client.go
$(MOCKGENERATE) -source=interfaces/client.go -destination=mocks/interfaces/client.go
$(MOCKGENERATE) -source=interfaces/apns.go -destination=mocks/interfaces/apns.go
$(MOCKGENERATE) -source=interfaces/statsd.go -destination=mocks/interfaces/statsd.go
$(MOCKGENERATE) -source=interfaces/stats_reporter.go -destination=mocks/interfaces/stats_reporter.go
$(MOCKGENERATE) -source=interfaces/feedback_reporter.go -destination=mocks/interfaces/feedback_reporter.go
$(MOCKGENERATE) -source=interfaces/message_handler.go -destination=mocks/interfaces/message_handler.go
$(MOCKGENERATE) -source=interfaces/rate_limiter.go -destination=mocks/interfaces/rate_limiter.go
10 changes: 5 additions & 5 deletions e2e/fcm_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ import (
"github.com/stretchr/testify/suite"
"github.com/topfreegames/pusher/config"
"github.com/topfreegames/pusher/extensions"
"github.com/topfreegames/pusher/extensions/handler"
"github.com/topfreegames/pusher/extensions/firebase"
"github.com/topfreegames/pusher/interfaces"
firebaseMock "github.com/topfreegames/pusher/mocks/firebase"
mocks "github.com/topfreegames/pusher/mocks/interfaces"
"github.com/topfreegames/pusher/pusher"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -46,7 +45,7 @@ func (s *FcmE2ETestSuite) SetupSuite() {
s.vConfig = v
}

func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*firebaseMock.MockPushClient, *mocks.MockStatsDClient) {
func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*mocks.MockPushClient, *mocks.MockStatsDClient) {
ctrl := gomock.NewController(s.T())

statsdClientMock := mocks.NewMockStatsDClient(ctrl)
Expand Down Expand Up @@ -74,14 +73,15 @@ func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*firebaseMock.MockPush
limit := s.vConfig.GetInt("gcm.rateLimit.rpm")
rateLimiter := extensions.NewRateLimiter(limit, s.vConfig, []interfaces.StatsReporter{statsReport}, logger)

pushClient := firebaseMock.NewMockPushClient(ctrl)
pushClient := mocks.NewMockPushClient(ctrl)
gcmPusher.MessageHandler = map[string]interfaces.MessageHandler{
appName: handler.NewMessageHandler(
appName: firebase.NewMessageHandler(
appName,
pushClient,
[]interfaces.FeedbackReporter{},
[]interfaces.StatsReporter{statsReport},
rateLimiter,
nil,
logger,
s.config.GCM.ConcurrentWorkers,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package extensions
package apns

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/topfreegames/pusher/extensions"
"os"
"sync"
"time"
Expand All @@ -40,8 +41,6 @@ import (
"github.com/topfreegames/pusher/structs"
)

var apnsResMutex sync.Mutex

// pusherAPNSKafkaMessage is the notification format received in Kafka messages.
type pusherAPNSKafkaMessage struct {
ApnsID string
Expand Down Expand Up @@ -72,10 +71,8 @@ type APNSMessageHandler struct {
sentMessages int64
ignoredMessages int64
successesReceived int64
requestsHeap *TimeoutHeap
CacheCleaningInterval int
IsProduction bool
consumptionManager interfaces.ConsumptionManager
retryInterval time.Duration
maxRetryAttempts uint
rateLimiter interfaces.RateLimiter
Expand All @@ -93,31 +90,22 @@ func NewAPNSMessageHandler(
statsReporters []interfaces.StatsReporter,
feedbackReporters []interfaces.FeedbackReporter,
pushQueue interfaces.APNSPushQueue,
consumptionManager interfaces.ConsumptionManager,
rateLimiter interfaces.RateLimiter,
) (*APNSMessageHandler, error) {
a := &APNSMessageHandler{
authKeyPath: authKeyPath,
keyID: keyID,
teamID: teamID,
ApnsTopic: topic,
appName: appName,
Config: config,
failuresReceived: 0,
feedbackReporters: feedbackReporters,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
ignoredMessages: 0,
inFlightNotificationsMapLock: &sync.Mutex{},
responsesReceived: 0,
sentMessages: 0,
StatsReporters: statsReporters,
successesReceived: 0,
requestsHeap: NewTimeoutHeap(config),
PushQueue: pushQueue,
consumptionManager: consumptionManager,
rateLimiter: rateLimiter,
authKeyPath: authKeyPath,
keyID: keyID,
teamID: teamID,
ApnsTopic: topic,
appName: appName,
Config: config,
feedbackReporters: feedbackReporters,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
StatsReporters: statsReporters,
PushQueue: pushQueue,
rateLimiter: rateLimiter,
}

if a.Logger != nil {
Expand Down Expand Up @@ -196,35 +184,26 @@ func (a *APNSMessageHandler) HandleMessages(ctx context.Context, message interfa
if err != nil {
l.WithError(err).Error("error parsing kafka message")
a.waitGroupDone()
apnsResMutex.Lock()
a.ignoredMessages++
apnsResMutex.Unlock()
return
}
l = l.WithField("notification", parsedNotification)

allowed := a.rateLimiter.Allow(ctx, parsedNotification.DeviceToken, a.appName, "apns")
if !allowed {
extensions.StatsReporterNotificationRateLimitReached(a.StatsReporters, a.appName, "apns")
a.waitGroupDone()
return
}

n, err := a.buildAndValidateNotification(parsedNotification)
if err != nil {
l.WithError(err).Error("notification is invalid")
a.waitGroupDone()
apnsResMutex.Lock()
a.ignoredMessages++
apnsResMutex.Unlock()
return
}

allowed := a.rateLimiter.Allow(ctx, parsedNotification.DeviceToken, a.appName, "apns")
if !allowed {
statsReporterNotificationRateLimitReached(a.StatsReporters, a.appName, "apns")
l.WithField("message", message).Warn("rate limit reached")
return
}

a.sendNotification(n)
statsReporterHandleNotificationSent(a.StatsReporters, a.appName, "apns")

apnsResMutex.Lock()
a.sentMessages++
apnsResMutex.Unlock()
extensions.StatsReporterHandleNotificationSent(a.StatsReporters, a.appName, "apns")
}

func (a *APNSMessageHandler) parseKafkaMessage(message interfaces.KafkaMessage) (*pusherAPNSKafkaMessage, error) {
Expand All @@ -240,8 +219,8 @@ func (a *APNSMessageHandler) parseKafkaMessage(message interfaces.KafkaMessage)
}
notification.Metadata["game"] = a.appName
notification.Metadata["deviceToken"] = notification.DeviceToken
hostname, err := os.Hostname()

hostname, err := os.Hostname()
if err != nil {
a.Logger.WithError(err).Error("error retrieving hostname")
} else {
Expand All @@ -253,7 +232,7 @@ func (a *APNSMessageHandler) parseKafkaMessage(message interfaces.KafkaMessage)
}

func (a *APNSMessageHandler) buildAndValidateNotification(notification *pusherAPNSKafkaMessage) (*structs.ApnsNotification, error) {
if notification.PushExpiry > 0 && notification.PushExpiry < MakeTimestamp() {
if notification.PushExpiry > 0 && notification.PushExpiry < extensions.MakeTimestamp() {
return nil, errors.New("push message has expired")
}

Expand All @@ -277,7 +256,7 @@ func (a *APNSMessageHandler) buildAndValidateNotification(notification *pusherAP

func (a *APNSMessageHandler) sendNotification(notification *structs.ApnsNotification) {
before := time.Now()
defer statsReporterReportSendNotificationLatency(a.StatsReporters, time.Since(before), a.appName, "apns", "client", "apns")
defer extensions.StatsReporterReportSendNotificationLatency(a.StatsReporters, time.Since(before), a.appName, "apns", "client", "apns")

notification.SendAttempts += 1
a.PushQueue.Push(notification)
Expand Down Expand Up @@ -311,106 +290,38 @@ func (a *APNSMessageHandler) handleAPNSResponse(responseWithMetadata *structs.Re
}
delete(responseWithMetadata.Metadata, "timestamp")

apnsResMutex.Lock()
a.responsesReceived++
apnsResMutex.Unlock()

parsedTopic := ParsedTopic{
parsedTopic := extensions.ParsedTopic{
Game: a.appName,
Platform: "apns",
}

a.waitGroupDone()

if responseWithMetadata.Reason == "" {
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)
if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
apnsResMutex.Lock()
a.successesReceived++
apnsResMutex.Unlock()
statsReporterHandleNotificationSuccess(a.StatsReporters, a.appName, "apns")
extensions.StatsReporterHandleNotificationSuccess(a.StatsReporters, a.appName, "apns")
return nil
}

apnsResMutex.Lock()
a.failuresReceived++
apnsResMutex.Unlock()

pErr := pusher_errors.NewPushError(a.mapErrorReason(responseWithMetadata.Reason), responseWithMetadata.Reason)
responseWithMetadata.Err = pErr
statsReporterHandleNotificationFailure(a.StatsReporters, a.appName, "apns", pErr)
err := pErr
l.Info("notification failed")
extensions.StatsReporterHandleNotificationFailure(a.StatsReporters, a.appName, "apns", pErr)

switch responseWithMetadata.Reason {
case apns2.ReasonBadDeviceToken, apns2.ReasonUnregistered, apns2.ReasonTopicDisallowed, apns2.ReasonDeviceTokenNotForTopic:
// https://developer.apple.com/library/content/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/CommunicatingwithAPNs.html
l.WithFields(log.Fields{
"category": "TokenError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
if responseWithMetadata.Metadata != nil {
responseWithMetadata.Metadata["deleteToken"] = true
if responseWithMetadata.Metadata == nil {
responseWithMetadata.Metadata = map[string]interface{}{}
}
case apns2.ReasonBadCertificate, apns2.ReasonBadCertificateEnvironment, apns2.ReasonForbidden:
l.WithFields(log.Fields{
"category": "CertificateError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
case apns2.ReasonExpiredProviderToken, apns2.ReasonInvalidProviderToken, apns2.ReasonMissingProviderToken:
l.WithFields(log.Fields{
"category": "ProviderTokenError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
case apns2.ReasonMissingTopic:
l.WithFields(log.Fields{
"category": "TopicError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
case apns2.ReasonIdleTimeout, apns2.ReasonShutdown, apns2.ReasonInternalServerError, apns2.ReasonServiceUnavailable:
l.WithFields(log.Fields{
"category": "AppleError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
default:
l.WithFields(log.Fields{
"category": "DefaultError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
responseWithMetadata.Metadata["deleteToken"] = true
}
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)

sendFeedbackErr := extensions.SendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)
if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
return err
}

// LogStats from time to time.
func (a *APNSMessageHandler) LogStats() {
l := a.Logger.WithFields(log.Fields{
"method": "apnsMessageHandler.logStats",
"interval(ns)": a.LogStatsInterval,
})

ticker := time.NewTicker(a.LogStatsInterval)
for range ticker.C {
apnsResMutex.Lock()
if a.sentMessages > 0 || a.responsesReceived > 0 || a.ignoredMessages > 0 || a.successesReceived > 0 || a.failuresReceived > 0 {
l.WithFields(log.Fields{
"sentMessages": a.sentMessages,
"ignoredMessages": a.ignoredMessages,
"responsesReceived": a.responsesReceived,
"successesReceived": a.successesReceived,
"failuresReceived": a.failuresReceived,
}).Info("flushing stats")
a.sentMessages = 0
a.responsesReceived = 0
a.ignoredMessages = 0
a.successesReceived = 0
a.failuresReceived = 0
}
apnsResMutex.Unlock()
}
return nil
}

func (a *APNSMessageHandler) mapErrorReason(reason string) string {
Expand Down
Loading
Loading