From b6de60cf586787dc0e0cd2d90c5357b64568378f Mon Sep 17 00:00:00 2001 From: mcalinghee Date: Fri, 17 Nov 2023 17:46:01 +0100 Subject: [PATCH] Default users permission should change to admin only on private/public room (#13) * code refactoring * users default permission on a room should to admin only on private/public room --- manage_last_admin/__init__.py | 81 ++++++++++-- tests/test_manage_last_admin.py | 215 +++++++++++++++++++++++++------- 2 files changed, 246 insertions(+), 50 deletions(-) diff --git a/manage_last_admin/__init__.py b/manage_last_admin/__init__.py index e360fad..1126236 100644 --- a/manage_last_admin/__init__.py +++ b/manage_last_admin/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. import copy import logging -from typing import Any, Dict, Iterable, Optional, Tuple +from typing import Any, Dict, Final, Iterable, Optional, Tuple import attr from synapse.api.constants import EventTypes, Membership @@ -27,6 +27,22 @@ logger = logging.getLogger(__name__) +class RoomType: + DIRECT: Final = "DIRECT" + PUBLIC: Final = "PUBLIC" + PRIVATE: Final = "PRIVATE" + EXTERNAL: Final = "EXTERNAL" + UNKNOWN: Final = "UNKNOWN" + + +ACCESS_RULES_TYPE = "im.vector.room.access_rules" + + +class AccessRules: + RESTRICTED = "restricted" + UNRESTRICTED = "unrestricted" + + @attr.s(auto_attribs=True, frozen=True) class ManageLastAdminConfig: promote_moderators: bool = False @@ -128,9 +144,18 @@ async def _on_room_leave( # If not, we see the default power level as admin logger.info("Make admin as default level in room %s", event.room_id) + await self._set_room_users_default_to_admin(event, state_events) + return - current_power_levels = state_events.get((EventTypes.PowerLevels, "")) + async def _set_room_users_default_to_admin( + self, event: EventBase, state_events: StateMap[EventBase] + ) -> None: + # We make sure to change default permission only on public or private rooms + is_room_public_or_private = _is_room_public_or_private(state_events) + if not is_room_public_or_private: + return + current_power_levels = state_events.get((EventTypes.PowerLevels, "")) # Make a deep copy of the content so we don't edit the "users" dict from # the event that's currently in the room's state. power_levels_content = ( @@ -138,11 +163,9 @@ async def _on_room_leave( if current_power_levels is None else copy.deepcopy(current_power_levels.content) ) - # Send a new power levels event with a similar content to the previous one # except users_default is 100 to allow any user to be admin of the room. power_levels_content["users_default"] = 100 - # Just to be safe, also delete all users that don't have a power level of # 100, in order to prevent anyone from being unable to be admin the room. # Julien : I am not why it's needed @@ -151,7 +174,6 @@ async def _on_room_leave( if level == 100: users[user] = level power_levels_content["users"] = users - await self._api.create_and_send_event_into_room( { "room_id": event.room_id, @@ -165,8 +187,6 @@ async def _on_room_leave( } ) - return - async def _promote_to_admins( self, users_to_promote: Iterable[str], @@ -213,6 +233,53 @@ def _maybe_get_event_id_dict_for_room_version( return {"event_id": "!%s:%s" % (random_id, server_name)} +def _is_room_encrypted( + state_events: StateMap[EventBase], +) -> bool: + room_encryption = state_events.get((EventTypes.RoomEncryption, "")) + if room_encryption: + return True + return False + + +def _get_access_rule_type( + state_events: StateMap[EventBase], +) -> Optional[Any]: + access_rule_type_event = state_events.get((ACCESS_RULES_TYPE, "")) + if access_rule_type_event is None: + return None + return access_rule_type_event.content["rule"] + + +def _is_room_public_or_private( + state_events: StateMap[EventBase], +) -> bool: + """Checks if the room is public or private + + Args: + state_events: The current state of the room, from which we can check the room's type. + + Returns: + True if this room is public or private otherwise false. + """ + room_type = _get_room_type(state_events) + return room_type in [RoomType.PRIVATE, RoomType.PUBLIC] + + +def _get_room_type( + state_events: StateMap[EventBase], +) -> str: + is_room_encrypted = _is_room_encrypted(state_events) + if not is_room_encrypted: + return RoomType.PUBLIC + access_rule_type = _get_access_rule_type(state_events) + if access_rule_type == AccessRules.RESTRICTED: + return RoomType.PRIVATE + if access_rule_type == AccessRules.UNRESTRICTED: + return RoomType.EXTERNAL + return RoomType.UNKNOWN + + def _is_last_admin_leaving( event: EventBase, power_level_content: Dict[str, Any], diff --git a/tests/test_manage_last_admin.py b/tests/test_manage_last_admin.py index 64a758c..71469c3 100644 --- a/tests/test_manage_last_admin.py +++ b/tests/test_manage_last_admin.py @@ -23,9 +23,10 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, make_event_from_dict -from synapse.types import JsonDict +from synapse.types import JsonDict, MutableStateMap from synapse.util.stringutils import random_string +from manage_last_admin import ACCESS_RULES_TYPE from tests import create_module @@ -39,8 +40,12 @@ def setUp(self) -> None: self.user_id = "@alice:example.com" self.left_user_id = "@nothere:example.com" self.mod_user_id = "@mod:example.com" + self.regular_user_id = "@someuser:example.com" self.room_id = "!someroom:example.com" - self.state = { + self.state = self.get_public_room() + + def get_basic_room_state(self) -> MutableStateMap[EventBase]: + return { (EventTypes.PowerLevels, ""): self.create_event( { "sender": self.user_id, @@ -67,18 +72,19 @@ def setUp(self) -> None: self.user_id: 100, self.left_user_id: 75, self.mod_user_id: 50, + self.regular_user_id: 0, }, "users_default": 0, }, "room_id": self.room_id, }, ), - (EventTypes.JoinRules, ""): self.create_event( + (EventTypes.Member, self.user_id): self.create_event( { "sender": self.user_id, - "type": EventTypes.JoinRules, - "state_key": "", - "content": {"join_rule": "public"}, + "type": EventTypes.Member, + "state_key": self.user_id, + "content": {"membership": Membership.JOIN}, "room_id": self.room_id, }, ), @@ -91,6 +97,15 @@ def setUp(self) -> None: "room_id": self.room_id, }, ), + (EventTypes.Member, self.regular_user_id): self.create_event( + { + "sender": self.regular_user_id, + "type": EventTypes.Member, + "state_key": self.regular_user_id, + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + }, + ), (EventTypes.Member, self.left_user_id): self.create_event( { "sender": self.left_user_id, @@ -102,10 +117,107 @@ def setUp(self) -> None: ), } - async def test_power_levels_sent_when_last_admin_leaves(self) -> None: - """Tests that the module sends the right power levels update when it sees its last admin leave.""" - module = create_module() + def get_public_room(self) -> MutableStateMap[EventBase]: + state = self.get_basic_room_state() + return self.make_room_public(state) + + def get_private_room(self) -> MutableStateMap[EventBase]: + state = self.get_basic_room_state() + return self.make_room_private(state) + + def get_other_room(self) -> MutableStateMap[EventBase]: + state = self.get_basic_room_state() + return self.make_room_unknown(state) + + def make_room_public( + self, state: MutableStateMap[EventBase] + ) -> MutableStateMap[EventBase]: + state[(EventTypes.JoinRules, "")] = self.create_event( + { + "sender": self.user_id, + "type": EventTypes.JoinRules, + "state_key": "", + "content": {"join_rule": "public"}, + "room_id": self.room_id, + }, + ) + state[(ACCESS_RULES_TYPE, "")] = self.create_event( + { + "sender": self.user_id, + "type": ACCESS_RULES_TYPE, + "state_key": "", + "content": {"rule": "restricted"}, + "room_id": self.room_id, + } + ) + return state + + def make_room_private( + self, state: MutableStateMap[EventBase] + ) -> MutableStateMap[EventBase]: + state[(EventTypes.JoinRules, "")] = self.create_event( + { + "sender": self.user_id, + "type": EventTypes.JoinRules, + "state_key": "", + "content": {"join_rule": "invite"}, + "room_id": self.room_id, + }, + ) + state[(ACCESS_RULES_TYPE, "")] = self.create_event( + { + "sender": self.user_id, + "type": ACCESS_RULES_TYPE, + "state_key": "", + "content": {"rule": "restricted"}, + "room_id": self.room_id, + }, + ) + state[(EventTypes.RoomEncryption, "")] = self.create_event( + { + "sender": self.user_id, + "type": EventTypes.RoomEncryption, + "state_key": "", + "content": {"algorithm": "m.megolm.v1.aes-sha2"}, + "room_id": self.room_id, + }, + ) + return state + + def make_room_unknown( + self, state: MutableStateMap[EventBase] + ) -> MutableStateMap[EventBase]: + state[(EventTypes.JoinRules, "")] = self.create_event( + { + "sender": self.user_id, + "type": EventTypes.JoinRules, + "state_key": "", + "content": {"join_rule": "invite"}, + "room_id": self.room_id, + }, + ) + state[(ACCESS_RULES_TYPE, "")] = self.create_event( + { + "sender": self.user_id, + "type": ACCESS_RULES_TYPE, + "state_key": "", + "content": {"rule": "unrestricted"}, + "room_id": self.room_id, + }, + ) + state[(EventTypes.RoomEncryption, "")] = self.create_event( + { + "sender": self.user_id, + "type": EventTypes.RoomEncryption, + "state_key": "", + "content": {"algorithm": "m.megolm.v1.aes-sha2"}, + "room_id": self.room_id, + }, + ) + return state + async def do_set_room_users_default_when_last_admin_leaves(self) -> None: + module = create_module() leave_event = self.create_event( { "sender": self.user_id, @@ -115,31 +227,24 @@ async def test_power_levels_sent_when_last_admin_leaves(self) -> None: "state_key": self.user_id, }, ) - allowed, replacement = await module.check_event_allowed( leave_event, self.state ) self.assertTrue(allowed) self.assertEqual(replacement, None) - # Test that the leave triggered a freeze of the room. self.assertTrue(module._api.create_and_send_event_into_room.called) # type: ignore[attr-defined] args, _ = module._api.create_and_send_event_into_room.call_args # type: ignore[attr-defined] self.assertEqual(len(args), 1) - pl_event_dict = args[0] - self.assertEqual(pl_event_dict["content"]["users_default"], 100) + # We make sure that user with pl=100 remains for user, pl in pl_event_dict["content"]["users"].items(): self.assertEqual(pl, 100, user) - async def test_promote_when_last_admin_leaves(self) -> None: - """Tests that the module promotes whoever has the highest non-default PL to admin - when the last admin leaves, if the config allows it. - """ + async def do_promote_when_last_admin_leaves(self) -> None: # Set the config flag to allow promoting custom PLs before freezing the room. module = create_module(config_override={"promote_moderators": True}) - # Make the last admin leave. leave_event = self.create_event( { @@ -150,19 +255,16 @@ async def test_promote_when_last_admin_leaves(self) -> None: "state_key": self.user_id, }, ) - # Check that we get the right result back from the callback. allowed, replacement = await module.check_event_allowed( leave_event, self.state ) self.assertTrue(allowed) self.assertEqual(replacement, None) - # Test that a new event was sent into the room. self.assertTrue(module._api.create_and_send_event_into_room.called) # type: ignore[attr-defined] args, _ = module._api.create_and_send_event_into_room.call_args # type: ignore[attr-defined] self.assertEqual(len(args), 1) - # Test that: # * the event is a power levels update # * the user who is PL 75 but left the room didn't get promoted @@ -177,41 +279,68 @@ async def test_promote_when_last_admin_leaves(self) -> None: evt_dict["content"]["users"][self.mod_user_id], 100, evt_dict ) - # Now we push both the leave event and the power levels update into the state of - # the room. - self.state[(EventTypes.Member, self.user_id)] = leave_event - self.state[(EventTypes.PowerLevels, "")] = self.create_event(evt_dict) - - # Make the mod (newly admin) leave the room. - new_leave_event = self.create_event( + async def do_nothing_when_admin_leaves(self, module: Any) -> None: + leave_event = self.create_event( { - "sender": self.mod_user_id, + "sender": self.user_id, "type": EventTypes.Member, "content": {"membership": Membership.LEAVE}, "room_id": self.room_id, - "state_key": self.mod_user_id, + "state_key": self.user_id, }, ) - - # Check that we get the right result back from the callback. allowed, replacement = await module.check_event_allowed( - new_leave_event, - self.state, + leave_event, self.state ) self.assertTrue(allowed) self.assertEqual(replacement, None) + # Test that no event is generated + self.assertFalse(module._api.create_and_send_event_into_room.called) - # Test that a new event was sent into the room. - self.assertTrue(module._api.create_and_send_event_into_room.called) # type: ignore[attr-defined] - args, _ = module._api.create_and_send_event_into_room.call_args # type: ignore[attr-defined] - self.assertEqual(len(args), 1) + # TEST SCENARIOS # - # Test that now that there's no user to promote anymore, the room default user level is 100. - pl_event_dict = args[0] + async def test_set_room_users_default_when_last_admin_leaves_on_public_room( + self, + ) -> None: + """Tests that the module sends the right power levels update + when it sees its last admin leaving a public room.""" + await self.do_set_room_users_default_when_last_admin_leaves() - self.assertEqual(pl_event_dict["content"]["users_default"], 100) - for user, pl in pl_event_dict["content"]["users"].items(): - self.assertEqual(pl, 100, user) + async def test_promote_when_last_admin_leaves_on_public_room(self) -> None: + """Tests that the module promotes whoever has the highest non-default PL to admin + when the last admin leaves a public room, if the config allows it. + """ + await self.do_promote_when_last_admin_leaves() + + async def test_set_room_users_default_when_last_admin_leaves_on_private_room( + self, + ) -> None: + """Tests that the module sends the right power levels update + when it sees its last admin leaving a private room.""" + self.state = self.get_private_room() + await self.do_set_room_users_default_when_last_admin_leaves() + + async def test_promote_when_last_admin_leaves_on_private_room(self) -> None: + """Tests that the module promotes whoever has the highest non-default PL to admin + when the last admin leaves a private room, if the config allows it. + """ + self.state = self.get_private_room() + await self.do_promote_when_last_admin_leaves() + + async def test_do_not_set_room_users_default_when_last_admin_leaves_on_other_room( + self, + ) -> None: + """Tests that the module do not send any event when last member leaves an unknown room.""" + self.state = self.get_other_room() + module = create_module() + await self.do_nothing_when_admin_leaves(module) + + async def test_promote_when_last_admin_leaves_on_other_room(self) -> None: + """Tests that the module promotes whoever has the highest non-default PL to admin + when the last admin leaves an unknown room, if the config allows it. + """ + self.state = self.get_other_room() + await self.do_promote_when_last_admin_leaves() class ManageLastAdminTestRoomV9(ManageLastAdminTestCases.BaseManageLastAdminTest):