Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency issues and data race on tests #53

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f1ea19a
Fix data races wip
miguelreiswildlife Apr 8, 2024
abee50a
wip
miguelreiswildlife Apr 9, 2024
54c88be
wip
miguelreiswildlife Apr 9, 2024
c208102
wip
miguelreiswildlife Apr 9, 2024
0fe1cf9
wip
miguelreiswildlife Apr 9, 2024
c643c4e
wip
miguelreiswildlife Apr 9, 2024
208763f
wip
miguelreiswildlife Apr 9, 2024
40b6be8
Fix timeout
miguelreiswildlife Apr 9, 2024
7e632d1
fix test
miguelreiswildlife Apr 9, 2024
b47b27e
Add missing lock
miguelreiswildlife Apr 9, 2024
c407513
Rollback to lower case
miguelreiswildlife Apr 10, 2024
1cb6161
Remove commented code
miguelreiswildlife Apr 10, 2024
9493f8f
Add missing lock/unlock
miguelreiswildlife Apr 10, 2024
75ce4ec
Use single lock/unlock on invalid_token_handler
miguelreiswildlife Apr 10, 2024
c9c08f5
Add missing lock/unlock
miguelreiswildlife Apr 10, 2024
19af568
Prevent deadlock on retry
miguelreiswildlife Apr 11, 2024
6e9f015
More logs on apns initialization
miguelreiswildlife Apr 11, 2024
518cf23
More logs on apns client
miguelreiswildlife Apr 11, 2024
ed9430e
Try fix infinite loop
miguelreiswildlife Apr 11, 2024
ccd487a
Fix mock
miguelreiswildlife Apr 11, 2024
48e9947
More logging
miguelreiswildlife Apr 12, 2024
6187d5c
More logs
miguelreiswildlife Apr 12, 2024
49480fa
Call Resume only when Pause is called
miguelreiswildlife Apr 15, 2024
4e75896
Remove pause + resume
miguelreiswildlife Apr 16, 2024
7f40896
Fix data races
miguelreiswildlife Apr 16, 2024
0613681
Some more data race fixes
miguelreiswildlife Apr 18, 2024
2e30896
Add e2e tests for apns
miguelreiswildlife Apr 23, 2024
f9caa82
Fix test
miguelreiswildlife Apr 23, 2024
8e312ed
Try fix test on CI
miguelreiswildlife Apr 30, 2024
27bd099
Try fix tests
miguelreiswildlife Apr 30, 2024
253b940
Fix test
miguelreiswildlife Apr 30, 2024
54f7688
Fix tests
miguelreiswildlife Apr 30, 2024
9a39608
Fix tests
miguelreiswildlife Apr 30, 2024
6d02dd7
Fix tests
miguelreiswildlife Apr 30, 2024
a3084df
Fix tests
miguelreiswildlife Apr 30, 2024
b3a459a
Fix tests
miguelreiswildlife May 2, 2024
313b638
Fix tests
miguelreiswildlife May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type APNSMessageHandler struct {
teamID string
appName string
PushQueue interfaces.APNSPushQueue
Topic string
ApnsTopic string
Config *viper.Viper
failuresReceived int64
InFlightNotificationsMap map[string]*inFlightNotification
Expand Down Expand Up @@ -99,7 +99,7 @@ func NewAPNSMessageHandler(
authKeyPath: authKeyPath,
keyID: keyID,
teamID: teamID,
Topic: topic,
ApnsTopic: topic,
appName: appName,
Config: config,
failuresReceived: 0,
Expand Down Expand Up @@ -203,7 +203,11 @@ func (a *APNSMessageHandler) HandleMessages(ctx context.Context, message interfa
return
}
statsReporterHandleNotificationSent(a.StatsReporters, a.appName, "apns")

apnsResMutex.Lock()
a.sentMessages++
apnsResMutex.Unlock()

a.inFlightNotificationsMapLock.Lock()
ifn := &inFlightNotification{
notification: notification,
Expand Down Expand Up @@ -261,7 +265,7 @@ func (a *APNSMessageHandler) sendNotification(notification *Notification) error
}
l.WithField("notification", notification).Debug("adding notification to apns push queue")
a.PushQueue.Push(&apns2.Notification{
Topic: a.Topic,
Topic: a.ApnsTopic,
DeviceToken: notification.DeviceToken,
Payload: payload,
ApnsID: notification.ApnsID,
Expand All @@ -288,20 +292,34 @@ func (a *APNSMessageHandler) handleAPNSResponse(responseWithMetadata *structs.Re
sendAttempts := inFlightNotificationInstance.sendAttempts.Load()
if responseWithMetadata.Reason == apns2.ReasonTooManyRequests &&
uint(sendAttempts) < a.maxRetryAttempts {
a.consumptionManager.Pause(inFlightNotificationInstance.kafkaTopic)
l.WithFields(log.Fields{
"sendAttempts": sendAttempts,
"maxRetries": a.maxRetryAttempts,
"apnsID": responseWithMetadata.ApnsID,
}).Debug("retrying notification")

err := a.consumptionManager.Pause(inFlightNotificationInstance.kafkaTopic)
if err != nil {
l.WithError(err).Error("error pausing consumption")
}
inFlightNotificationInstance.sendAttempts.Add(1)
<-time.After(a.retryInterval)
if err := a.sendNotification(inFlightNotificationInstance.notification); err == nil {
return nil
}
}
if uint(sendAttempts) > 0 {
a.consumptionManager.Resume(inFlightNotificationInstance.kafkaTopic)
err := a.consumptionManager.Resume(inFlightNotificationInstance.kafkaTopic)
if err != nil {
l.WithError(err).Error("error resuming consumption")
}
}
responseWithMetadata.Metadata = inFlightNotificationInstance.notification.Metadata
responseWithMetadata.Timestamp = responseWithMetadata.Metadata["timestamp"].(int64)
delete(responseWithMetadata.Metadata, "timestamp")
a.inFlightNotificationsMapLock.Lock()
delete(a.InFlightNotificationsMap, responseWithMetadata.ApnsID)
a.inFlightNotificationsMapLock.Unlock()
}

apnsResMutex.Lock()
Expand Down
Loading
Loading