Skip to content

Commit

Permalink
fix: mark message as found
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Aug 8, 2024
1 parent a9d0eaf commit 0c3ceef
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
22 changes: 13 additions & 9 deletions cmd/storemsgcounter/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ func Execute(ctx context.Context, options Options) error {
runIdLogger := logger.With(zap.String("syncRunId", runId))
runIdLogger.Info("rechecking missing messages status")

err := application.checkMissingMessageStatus(ctx, runId, runIdLogger)
err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger)
if err != nil {
logger.Error("could not recheck the status of missing messages", zap.Error(err))
return
}

err = application.countMissingMessages()
err = application.countMissingMessages(storenodeIDs)
if err != nil {
logger.Error("could not count missing messages", zap.Error(err))
return
Expand Down Expand Up @@ -338,7 +338,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
return nil
}

func (app *Application) checkMissingMessageStatus(ctx context.Context, runId string, logger *zap.Logger) error {
func (app *Application) checkMissingMessageStatus(ctx context.Context, storenodes []peer.ID, runId string, logger *zap.Logger) error {
now := app.node.Timesource().Now()

// Get all messages whose status is missing or does not exist, and the column found_on_recheck is false
Expand All @@ -349,7 +349,8 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str
}

wg := sync.WaitGroup{}
for storenodeID, messageHashes := range missingMessages {

for _, storenodeID := range storenodes {
wg.Add(1)
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
defer wg.Done()
Expand All @@ -369,15 +370,14 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str

app.metrics.RecordMissingMessagesPrevHour(peerID, len(messageHashes)-len(foundMissingMessages))

}(storenodeID, messageHashes)
}(storenodeID, missingMessages[storenodeID])
}

wg.Wait()

return nil
}

func (app *Application) countMissingMessages() error {
func (app *Application) countMissingMessages(storenodes []peer.ID) error {

// not including last two hours in now to let sync work
now := app.node.Timesource().Now().Add(-2 * time.Hour)
Expand All @@ -396,8 +396,8 @@ func (app *Application) countMissingMessages() error {
if err != nil {
return err
}
for storenode, cnt := range results {
app.metrics.RecordMissingMessagesLastWeek(storenode, cnt)
for _, storenodeID := range storenodes {
app.metrics.RecordMissingMessagesLastWeek(storenodeID, results[storenodeID])
}
return nil
}
Expand Down Expand Up @@ -539,6 +539,10 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
var result *store.Result
var err error

if len(messageHashes) == 0 {
return
}

peerInfo := app.node.Host().Peerstore().PeerInfo(peerID)

queryLogger := logger.With(zap.Stringer("storenode", peerID))
Expand Down
8 changes: 6 additions & 2 deletions internal/persistence/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub
}

func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) {
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist'", from.UnixNano(), to.UnixNano(), clusterID)
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND fundOnRecheck = false", from.UnixNano(), to.UnixNano(), clusterID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,6 +307,10 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
}

func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error {
if len(messageHashes) == 0 {
return nil
}

query := "UPDATE missingMessages SET foundOnRecheck = true WHERE clusterID = $1 AND messageHash IN ("
for i := range messageHashes {
if i > 0 {
Expand Down Expand Up @@ -343,7 +347,7 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) err
}

func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) {
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0c3ceef

Please sign in to comment.