Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
use a locking mechanism for the scheduler instead of the singleton mo…
Browse files Browse the repository at this point in the history
…del in the cron lib
  • Loading branch information
andremedeiros committed Jun 8, 2023
1 parent 0b48a13 commit 755a39e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
20 changes: 15 additions & 5 deletions internal/cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ import (
"github.com/christianselig/apollo-backend/internal/repository"
)

const batchSize = 250
const accountEnqueueSeconds = 60
const (
batchSize = 250
accountEnqueueSeconds = 60
)

var (
enqueueAccountsMutex sync.Mutex
)

func SchedulerCmd(ctx context.Context) *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -103,9 +109,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(8, gocron.WaitMode)

eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
eaj.SingletonMode()

_, _ = s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) })
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) })
Expand Down Expand Up @@ -456,6 +460,12 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
}

func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
if enqueueAccountsMutex.TryLock() {
defer enqueueAccountsMutex.Unlock()
} else {
return
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
2 changes: 1 addition & 1 deletion internal/itunes/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func NewIAPResponse(receipt string, production bool) (*IAPResponse, error) {

verificationPayload := map[string]string{
"receipt-data": receipt,
"password": "***REMOVED***",
"password": "0c03b67512c5445d89dc70b81b02959c",
}

bb, err := json.Marshal(verificationPayload)
Expand Down
4 changes: 2 additions & 2 deletions internal/reddit/testdata/refresh_token.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"access_token": "***REMOVED***",
"access_token": "xxx",
"token_type": "bearer",
"expires_in": 3600,
"refresh_token": "***REMOVED***",
"refresh_token": "yyy",
"scope": "account creddits edit flair history identity livemanage modconfig modcontributors modflair modlog modmail modothers modposts modself modtraffic modwiki mysubreddits privatemessages read report save structuredstyles submit subscribe vote wikiedit wikiread"
}
4 changes: 2 additions & 2 deletions internal/reddit/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func TestRefreshTokenResponseParsing(t *testing.T) {
rtr := ret.(*reddit.RefreshTokenResponse)
assert.NotNil(t, rtr)

assert.Equal(t, "***REMOVED***", rtr.AccessToken)
assert.Equal(t, "***REMOVED***", rtr.RefreshToken)
assert.Equal(t, "xxx", rtr.AccessToken)
assert.Equal(t, "yyy", rtr.RefreshToken)
assert.Equal(t, 1*time.Hour, rtr.Expiry)
}

Expand Down

0 comments on commit 755a39e

Please sign in to comment.