diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index e9856cc5dc..70b72bdd16 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -21,16 +21,15 @@ import ( "fmt" "github.com/getsentry/sentry-go" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/sirupsen/logrus" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // updateLatestEvents updates the list of latest events for this room in the database and writes the @@ -58,7 +57,7 @@ func (r *Inputer) updateLatestEvents( transactionID *api.TransactionID, rewritesState bool, historyVisibility gomatrixserverlib.HistoryVisibility, -) (err error) { +) error { trace, ctx := internal.StartRegion(ctx, "updateLatestEvents") defer trace.EndRegion() @@ -70,25 +69,60 @@ func (r *Inputer) updateLatestEvents( defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) + // If the event has already been written to the output log then we + // don't need to do anything, as we've handled it already. + hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID) + if err != nil { + return fmt.Errorf("u.updater.HasEventBeenSent: %w", err) + } + if hasBeenSent { + return nil + } + u := latestEventsUpdater{ - ctx: ctx, - api: r, - updater: updater, - roomInfo: roomInfo, - stateAtEvent: stateAtEvent, - event: event, - sendAsServer: sendAsServer, - transactionID: transactionID, - rewritesState: rewritesState, - historyVisibility: historyVisibility, + api: r, + updater: updater, + stateAtEvent: stateAtEvent, + event: event, + rewritesState: rewritesState, } - if err = u.doUpdateLatestEvents(); err != nil { + var updates []api.OutputEvent + updates, err = u.doUpdateLatestEvents(ctx, roomInfo) + if err != nil { return fmt.Errorf("u.doUpdateLatestEvents: %w", err) } + update, err := u.makeOutputNewRoomEvent(ctx, transactionID, sendAsServer, updater.LastEventIDSent(), historyVisibility) + if err != nil { + return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) + } + updates = append(updates, *update) + + // Send the event to the output logs. + // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. + // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but + // the write to the output log succeeds) + // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to + // send the event asynchronously but we would need to ensure that 1) the events are written to the log in + // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the + // necessary bookkeeping we'll keep the event sending synchronous for now. + if len(updates) > 0 { + if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil { + return fmt.Errorf("u.api.WriteOutputEvents: %w", err) + } + + if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { + return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) + } + } + + if err = u.updater.SetLatestEvents(roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { + return fmt.Errorf("u.updater.SetLatestEvents: %w", err) + } + succeeded = true - return + return nil } // latestEventsUpdater tracks the state used to update the latest events in the @@ -96,18 +130,11 @@ func (r *Inputer) updateLatestEvents( // The state could be passed using function arguments, but it becomes impractical // when there are so many variables to pass around. type latestEventsUpdater struct { - ctx context.Context api *Inputer updater *shared.RoomUpdater - roomInfo *types.RoomInfo stateAtEvent types.StateAtEvent event gomatrixserverlib.PDU - transactionID *api.TransactionID rewritesState bool - // Which server to send this event as. - sendAsServer string - // The eventID of the event that was processed before this one. - lastEventIDSent string // The latest events in the room after processing this event. oldLatest types.StateAtEventAndReferences latest types.StateAtEventAndReferences @@ -122,13 +149,9 @@ type latestEventsUpdater struct { // The snapshots of current state before and after processing this event oldStateNID types.StateSnapshotNID newStateNID types.StateSnapshotNID - // The history visibility of the event itself (from the state before the event). - historyVisibility gomatrixserverlib.HistoryVisibility } -func (u *latestEventsUpdater) doUpdateLatestEvents() error { - u.lastEventIDSent = u.updater.LastEventIDSent() - +func (u *latestEventsUpdater) doUpdateLatestEvents(ctx context.Context, roomInfo *types.RoomInfo) ([]api.OutputEvent, error) { // If we are doing a regular event update then we will get the // previous latest events to use as a part of the calculation. If // we are overwriting the latest events because we have a complete @@ -141,17 +164,10 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { u.oldLatest = u.updater.LatestEvents() } - // If the event has already been written to the output log then we - // don't need to do anything, as we've handled it already. - if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil { - return fmt.Errorf("u.updater.HasEventBeenSent: %w", err) - } else if hasBeenSent { - return nil - } - // Work out what the latest events are. This will include the new // event if it is not already referenced. extremitiesChanged, err := u.calculateLatest( + ctx, u.oldLatest, u.event, types.StateAtEventAndReference{ EventID: u.event.EventID(), @@ -159,61 +175,35 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { }, ) if err != nil { - return fmt.Errorf("u.calculateLatest: %w", err) + return nil, fmt.Errorf("u.calculateLatest: %w", err) } // Now that we know what the latest events are, it's time to get the // latest state. - var updates []api.OutputEvent + var membershipUpdates []api.OutputEvent if extremitiesChanged || u.rewritesState { - if err = u.latestState(); err != nil { - return fmt.Errorf("u.latestState: %w", err) + if err = u.latestState(ctx, roomInfo); err != nil { + return nil, fmt.Errorf("u.latestState: %w", err) } // If we need to generate any output events then here's where we do it. // TODO: Move this! - if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil { - return fmt.Errorf("u.api.updateMemberships: %w", err) + if membershipUpdates, err = u.api.updateMemberships(ctx, u.updater, u.removed, u.added); err != nil { + return nil, fmt.Errorf("u.api.updateMemberships: %w", err) } } else { u.newStateNID = u.oldStateNID } - if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { - return fmt.Errorf("u.updater.SetLatestEvents: %w", err) - } - - update, err := u.makeOutputNewRoomEvent() - if err != nil { - return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) - } - updates = append(updates, *update) - - // Send the event to the output logs. - // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. - // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but - // the write to the output log succeeds) - // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to - // send the event asynchronously but we would need to ensure that 1) the events are written to the log in - // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the - // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil { - return fmt.Errorf("u.api.WriteOutputEvents: %w", err) - } - - if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { - return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) - } - - return nil + return membershipUpdates, nil } -func (u *latestEventsUpdater) latestState() error { - trace, ctx := internal.StartRegion(u.ctx, "processEventWithMissingState") +func (u *latestEventsUpdater) latestState(ctx context.Context, roomInfo *types.RoomInfo) error { + trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState") defer trace.EndRegion() var err error - roomState := state.NewStateResolution(u.updater, u.roomInfo, u.api.Queryer) + roomState := state.NewStateResolution(u.updater, roomInfo, u.api.Queryer) // Work out if the state at the extremities has actually changed // or not. If they haven't then we won't bother doing all of the @@ -289,23 +279,27 @@ func (u *latestEventsUpdater) latestState() error { if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 { logrus.WithFields(logrus.Fields{ - "event_id": u.event.EventID(), - "room_id": u.event.RoomID().String(), - "old_state_nid": u.oldStateNID, - "new_state_nid": u.newStateNID, - "old_latest": u.oldLatest.EventIDs(), - "new_latest": u.latest.EventIDs(), + "event_id": u.event.EventID(), + "room_id": u.event.RoomID().String(), + "old_state_nid": u.oldStateNID, + "new_state_nid": u.newStateNID, + "old_latest": u.oldLatest.EventIDs(), + "new_latest": u.latest.EventIDs(), + "rewrites_state": u.rewritesState, + "state_at_event": fmt.Sprintf("%#v", u.stateAtEvent), }).Warnf("State reset detected (removing %d events)", removed) sentry.WithScope(func(scope *sentry.Scope) { scope.SetLevel("warning") scope.SetTag("room_id", u.event.RoomID().String()) scope.SetContext("State reset", map[string]interface{}{ - "Event ID": u.event.EventID(), - "Old state NID": fmt.Sprintf("%d", u.oldStateNID), - "New state NID": fmt.Sprintf("%d", u.newStateNID), - "Old latest": u.oldLatest.EventIDs(), - "New latest": u.latest.EventIDs(), - "State removed": removed, + "Event ID": u.event.EventID(), + "Old state NID": fmt.Sprintf("%d", u.oldStateNID), + "New state NID": fmt.Sprintf("%d", u.newStateNID), + "Old latest": u.oldLatest.EventIDs(), + "New latest": u.latest.EventIDs(), + "State removed": removed, + "State rewritten": fmt.Sprintf("%v", u.rewritesState), + "State at event": fmt.Sprintf("%#v", u.stateAtEvent), }) sentry.CaptureMessage("State reset detected") }) @@ -326,11 +320,12 @@ func (u *latestEventsUpdater) latestState() error { // calculateLatest works out the new set of forward extremities. Returns // true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( + ctx context.Context, oldLatest []types.StateAtEventAndReference, newEvent gomatrixserverlib.PDU, newStateAndRef types.StateAtEventAndReference, ) (bool, error) { - trace, _ := internal.StartRegion(u.ctx, "calculateLatest") + trace, _ := internal.StartRegion(ctx, "calculateLatest") defer trace.EndRegion() // First of all, get a list of all of the events in our current @@ -387,7 +382,13 @@ func (u *latestEventsUpdater) calculateLatest( return true, nil } -func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { +func (u *latestEventsUpdater) makeOutputNewRoomEvent( + ctx context.Context, + transactionID *api.TransactionID, + sendAsServer string, + lastEventIDSent string, + historyVisibility gomatrixserverlib.HistoryVisibility, +) (*api.OutputEvent, error) { latestEventIDs := make([]string, len(u.latest)) for i := range u.latest { latestEventIDs[i] = u.latest[i].EventID @@ -396,14 +397,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) ore := api.OutputNewRoomEvent{ Event: &types.HeaderedEvent{PDU: u.event}, RewritesState: u.rewritesState, - LastSentEventID: u.lastEventIDSent, + LastSentEventID: lastEventIDSent, LatestEventIDs: latestEventIDs, - TransactionID: u.transactionID, - SendAsServer: u.sendAsServer, - HistoryVisibility: u.historyVisibility, + TransactionID: transactionID, + SendAsServer: sendAsServer, + HistoryVisibility: historyVisibility, } - eventIDMap, err := u.stateEventMap() + eventIDMap, err := u.stateEventMap(ctx) if err != nil { return nil, err } @@ -427,7 +428,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } // retrieve an event nid -> event ID map for all events that need updating -func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) { +func (u *latestEventsUpdater) stateEventMap(ctx context.Context) (map[types.EventNID]string, error) { cap := len(u.added) + len(u.removed) + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds) stateEventNIDs := make(types.EventNIDs, 0, cap) allStateEntries := make([]types.StateEntry, 0, cap) @@ -439,5 +440,5 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) stateEventNIDs = append(stateEventNIDs, entry.EventNID) } stateEventNIDs = stateEventNIDs[:util.SortAndUnique(stateEventNIDs)] - return u.updater.EventIDs(u.ctx, stateEventNIDs) + return u.updater.EventIDs(ctx, stateEventNIDs) }