diff --git a/changelog.d/17893.bugfix b/changelog.d/17893.bugfix
new file mode 100644
index 00000000000..9df676e35c4
--- /dev/null
+++ b/changelog.d/17893.bugfix
@@ -0,0 +1 @@
+Fix a bug where all messages from a server could be blocked because of one bad event. Contributed by @morguldir.
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 2e56b671f06..d195755655b 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -405,7 +405,19 @@ def __init__(
             for name, sigs in event_dict.pop("signatures", {}).items()
         }
 
-        assert "event_id" not in event_dict
+        # In newer room versions (3+), the `event_id` is derived from a hash of the
+        # event canonical JSON, so it should not be explicitly provided in the event
+        # dictionary.
+        #
+        # If we see an `event_id` in a newer room version, then it's an invalid event
+        # and we should reject it.
+        #
+        # Raise explicitly (rather than via an `assert` statement) so that the check
+        # is not stripped when Python runs with optimisations enabled (`-O`).
+        if "event_id" in event_dict:
+            raise AssertionError(
+                "Event ID should not be provided for events in non-v1/v2 room versions"
+            )
 
         unsigned = dict(event_dict.pop("unsigned", {}))
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 1932fa82a4a..8b10ad368ec 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -56,7 +56,10 @@
     SynapseError,
     UnsupportedRoomVersionError,
 )
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
+from synapse.api.room_versions import (
+    KNOWN_ROOM_VERSIONS,
+    RoomVersion,
+)
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
@@ -129,6 +132,8 @@
 # federation.
 _INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"
 
+_UNKNOWN_EVENT_ID = ""
+
 
 class FederationServer(FederationBase):
     def __init__(self, hs: "HomeServer"):
@@ -432,6 +437,8 @@ async def _handle_pdus_in_txn(
 
         newest_pdu_ts = 0
 
+        pdu_results = {}
+
         for p in transaction.pdus:
             # FIXME (richardv): I don't think this works:
             # https://github.com/matrix-org/synapse/issues/8429
@@ -446,7 +453,7 @@ async def _handle_pdus_in_txn(
             # We try and pull out an event ID so that if later checks fail we
             # can log something sensible. We don't mandate an event ID here in
             # case future event formats get rid of the key.
-            possible_event_id = p.get("event_id", "")
+            possible_event_id = p.get("event_id", _UNKNOWN_EVENT_ID)
 
             # Now we get the room ID so that we can check that we know the
             # version of the room.
@@ -469,14 +476,34 @@ async def _handle_pdus_in_txn(
                 logger.info("Ignoring PDU: %s", e)
                 continue
 
-            event = event_from_pdu_json(p, room_version)
+            try:
+                event = event_from_pdu_json(p, room_version)
+            except Exception as e:
+                # We can only provide feedback to the federating server if we can
+                # determine what the `event_id` is but since we failed to parse the
+                # event, we can't derive the `event_id` so there is nothing to use as
+                # the `pdu_results` key. Best we can do is just log for our own record
+                # and move on.
+
+                # If an `event_id` happened to be provided in the
+                # event dictionary, then use that.
+                if possible_event_id != _UNKNOWN_EVENT_ID:
+                    pdu_results[possible_event_id] = {
+                        "error": f"Failed to convert JSON into event: {e}"
+                    }
+                logger.warning(
+                    "Failed to parse event %s in transaction from %s, due to: %s",
+                    possible_event_id,
+                    origin,
+                    e,
+                )
+                continue
+
             pdus_by_room.setdefault(room_id, []).append(event)
 
             if event.origin_server_ts > newest_pdu_ts:
                 newest_pdu_ts = event.origin_server_ts
 
-        pdu_results = {}
-
         # we can process different rooms in parallel (which is useful if they
         # require callouts to other servers to fetch missing events), but
         # impose a limit to avoid going too crazy with ram/cpu.
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py
index 88261450b1f..2858c01a85e 100644
--- a/tests/federation/test_federation_server.py
+++ b/tests/federation/test_federation_server.py
@@ -25,9 +25,12 @@
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.constants import EventTypes
+from synapse.api.errors import NotFoundError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.config.server import DEFAULT_ROOM_VERSION
 from synapse.events import EventBase, make_event_from_dict
+from synapse.events.builder import EventBuilder
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.server import HomeServer
@@ -84,6 +87,80 @@ async def failing_handler(_origin: str, _content: JsonDict) -> None:
         )
         self.assertEqual(500, channel.code, channel.result)
 
+    def test_accept_valid_pdus_and_ignore_invalid(self) -> None:
+        """
+        Test to make sure that old v1/v2 formatted events (that include `event_id`) are
+        rejected from a newer room version that doesn't support it but we still accept
+        properly formatted/valid events from the same batch.
+        """
+        user = self.register_user("user1", "test")
+        tok = self.login("user1", "test")
+        room_id = self.helper.create_room_as("user1", tok=tok)
+
+        def builder(message: str) -> EventBuilder:
+            return self.hs.get_event_builder_factory().for_room_version(
+                RoomVersions.V10,
+                {
+                    "type": EventTypes.Message,
+                    "sender": user,
+                    "room_id": room_id,
+                    "content": {"body": message, "msgtype": "m.text"},
+                },
+            )
+
+        def make_event(message: str) -> EventBase:
+            event, _ = self.get_success(
+                self.hs.get_event_creation_handler().create_new_client_event(
+                    builder(message),
+                )
+            )
+            return event
+
+        event1 = make_event("event1")
+        event2 = make_event("event2")
+        event3 = make_event("event3")
+        event1_json = event1.get_pdu_json()
+        event2_json = event2.get_pdu_json()
+        event3_json = event3.get_pdu_json()
+
+        # Purposely adding `event_id` that shouldn't be there
+        event2_json["event_id"] = event2.event_id
+
+        channel = self.make_signed_federation_request(
+            "PUT",
+            "/_matrix/federation/v1/send/txn",
+            {"pdus": [event1_json, event2_json, event3_json]},
+        )
+        self.assertEqual(200, channel.code, channel.result)
+        body = channel.json_body
+        # Ensure the response indicates an error for the corrupt event
+        # and that it indicates success for valid events
+        pdus: JsonDict = body["pdus"]
+        self.assertIncludes(
+            set(pdus.keys()),
+            {event1.event_id, event2.event_id, event3.event_id},
+            exact=True,
+        )
+        self.assertEqual(pdus[event1.event_id], {})
+        self.assertNotEqual(pdus[event2.event_id]["error"], "")
+        self.assertEqual(pdus[event3.event_id], {})
+
+        # Make sure other valid events from the send transaction were persisted successfully
+        self.get_success(
+            self.hs.get_storage_controllers().main.get_event(event1.event_id)
+        )
+
+        # Make sure the corrupt event isn't persisted
+        self.get_failure(
+            self.hs.get_storage_controllers().main.get_event(event2.event_id),
+            NotFoundError,
+        )
+
+        # Verify that we continue looking at events that come after the corrupted one
+        self.get_success(
+            self.hs.get_storage_controllers().main.get_event(event3.event_id)
+        )
+
 
 class ServerACLsTestCase(unittest.TestCase):
     def test_blocked_server(self) -> None: