diff --git a/pkg/sync/syncWorker.go b/pkg/sync/syncWorker.go index 68e14360..ca0a3630 100644 --- a/pkg/sync/syncWorker.go +++ b/pkg/sync/syncWorker.go @@ -12,6 +12,7 @@ import ( "github.com/xmtp/xmtpd/pkg/db/queries" envUtils "github.com/xmtp/xmtpd/pkg/envelopes" clientInterceptors "github.com/xmtp/xmtpd/pkg/interceptors/client" + "github.com/xmtp/xmtpd/pkg/misbehavior" "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes" "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api" "github.com/xmtp/xmtpd/pkg/registrant" @@ -31,6 +32,7 @@ type syncWorker struct { subscriptions map[uint32]struct{} subscriptionsMutex sync.RWMutex cancel context.CancelFunc + misbehaviorService misbehavior.MisbehaviorService } type originatorStream struct { @@ -58,14 +60,15 @@ func startSyncWorker( ctx, cancel := context.WithCancel(ctx) s := &syncWorker{ - ctx: ctx, - log: log.Named("syncWorker"), - nodeRegistry: nodeRegistry, - registrant: registrant, - store: store, - wg: sync.WaitGroup{}, - subscriptions: make(map[uint32]struct{}), - cancel: cancel, + ctx: ctx, + log: log.Named("syncWorker"), + nodeRegistry: nodeRegistry, + registrant: registrant, + store: store, + wg: sync.WaitGroup{}, + subscriptions: make(map[uint32]struct{}), + cancel: cancel, + misbehaviorService: misbehavior.NewLoggingMisbehaviorService(log), } if err := s.start(); err != nil { return nil, err @@ -362,8 +365,16 @@ func (s *syncWorker) validateAndInsertEnvelope( lastNs = stream.lastEnvelope.OriginatorNs() } if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs { - // TODO(rich) Submit misbehavior report and continue - s.log.Error("Received out of order envelope") + if err = s.misbehaviorService.SafetyFailure( + misbehavior.NewSafetyFailureReport( + stream.nodeID, + message_api.Misbehavior_MISBEHAVIOR_OUT_OF_ORDER, + true, + []*envUtils.OriginatorEnvelope{env, stream.lastEnvelope}, + ), + ); err != nil { + s.log.Debug("Failed to submit misbehavior report", zap.Error(err)) + } } if env.OriginatorSequenceID() > lastSequenceID {