Skip to content

Commit

Permalink
requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
H-Shay committed Jan 30, 2025
1 parent 115ff00 commit b0f804e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 57 deletions.
136 changes: 80 additions & 56 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,33 +1606,60 @@ def _get_rooms_for_user_by_join_date_txn(
from_ts,
)

async def set_room_participation(self, room_id: str, user_id: str) -> None:
async def set_room_participation(self, user_id: str, room_id: str) -> None:
"""
Record the provided user as participating in the given room
Args:
room_id: ID of the room to set the participant in
user_id: the user ID of the user
room_id: ID of the room to set the participant in
"""
await self.db_pool.simple_update(
"room_memberships",
{"user_id": user_id, "room_id": room_id},
{"participant": True},
"update_room_participation",

def _set_room_participation_txn(
txn: LoggingTransaction, user_id: str, room_id: str
) -> None:
sql = """
UPDATE room_memberships
SET participant = True
WHERE user_id = ?
AND room_id = ?
ORDER BY event_stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (user_id, room_id))

await self.db_pool.runInteraction(
"_set_room_participation_txn", _set_room_participation_txn, user_id, room_id
)

async def get_room_participation(self, room_id: str, user_id: str) -> bool:
async def get_room_participation(self, user_id: str, room_id: str) -> bool:
"""
Check whether a user is listed as a participant in a room
Args:
room_id: ID of the room to check in
user_id: user ID of the user
"""
return await self.db_pool.simple_select_one_onecol(
"room_memberships",
{"user_id": user_id, "room_id": room_id},
"participant",

def _get_room_participation_txn(
txn: LoggingTransaction, user_id: str, room_id: str
) -> bool:
sql = """
SELECT participant
FROM room_memberships
WHERE user_id = ?
AND room_id = ?
ORDER BY event_stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (user_id, room_id))
res = txn.fetchone()
if res:
return res[0]
return False

return await self.db_pool.runInteraction(
"_get_room_participation_txn", _get_room_participation_txn, user_id, room_id
)


Expand Down Expand Up @@ -1672,76 +1699,73 @@ def __init__(
async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int:
"""
Background update to populate column `participant` on `room_memberships` table
one room at a time
A 'participant' is someone who is currently joined to a room and has sent at least
one `m.room.message` or `m.room.encrypted` event.
"""
last_room_id = progress.get("last_room_id", "")
stream_token = progress.get("last_stream_token", None)

def _get_current_room_txn(
txn: LoggingTransaction, last_room_id: str
) -> Optional[str]:
def _get_max_stream_token_txn(txn: LoggingTransaction) -> Optional[str]:
sql = """
SELECT room_id from room_memberships WHERE room_id > ?
ORDER BY room_id
SELECT event_stream_ordering from room_memberships
ORDER BY event_stream_ordering DESC
LIMIT 1;
"""
txn.execute(sql, (last_room_id,))
txn.execute(sql)
res = txn.fetchone()
if res:
room_id = res[0]
return room_id
else:
return None
assert res is not None
return res[0]

def _background_populate_participant_per_room_txn(
txn: LoggingTransaction, current_room_id: str
def _background_populate_participant_txn(
txn: LoggingTransaction, stream_token: str
) -> None:
sql = """
SELECT DISTINCT c.state_key
FROM current_state_events AS c
INNER JOIN events AS e USING(room_id)
WHERE room_id = ?
AND c.membership = 'join'
AND e.type = 'm.room.message'
OR e.type = 'm.room.encrypted'
AND c.state_key = e.sender;
UPDATE room_memberships
SET participant = True
FROM (
SELECT DISTINCT c.state_key, e.room_id
FROM current_state_events AS c
INNER JOIN events AS e ON c.room_id = e.room_id
WHERE c.membership = 'join'
AND c.state_key = e.sender
AND (
e.type = 'm.room.message'
OR e.type = 'm.room.encrypted'
)
) AS subquery
WHERE room_memberships.user_id = subquery.state_key
AND room_memberships.room_id = subquery.room_id
AND room_memberships.event_stream_ordering <= ?;
"""

txn.execute(sql, (current_room_id,))
res = txn.fetchall()
txn.execute(sql, (stream_token,))

if res:
participants = [user[0] for user in res]
for participant in participants:
self.db_pool.simple_update_txn(
txn,
table="room_memberships",
keyvalues={"user_id": participant, "room_id": current_room_id},
updatevalues={"participant": True},
)
if stream_token is None:
stream_token = await self.db_pool.runInteraction(
"_get_max_stream_token", _get_max_stream_token_txn
)

current_room_id = await self.db_pool.runInteraction(
"_get_current_room_txn", _get_current_room_txn, last_room_id
)
if not current_room_id:
if stream_token <= 0:
await self.db_pool.updates._end_background_update(
"populate_participant_bg_update"
)
return 1
return 1000

logger.info(f"stream token is {stream_token}")
await self.db_pool.runInteraction(
"_background_populate_participant_per_room_txn",
_background_populate_participant_per_room_txn,
current_room_id,
"_background_populate_participant_txn",
_background_populate_participant_txn,
stream_token,
)

progress["last_room_id"] = current_room_id
progress["last_stream_token"] = stream_token - 1000
await self.db_pool.runInteraction(
"populate_participant_bg_update",
self.db_pool.updates._background_update_progress_txn,
"populate_participant_bg_update",
progress,
)
return 1
return 1000

async def _background_add_membership_profile(
self, progress: JsonDict, batch_size: int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

-- Add a column `participant` to `room_memberships` table to track whether a room member has sent
-- a `m.room.message` event into a room they are a member of
-- a `m.room.message` or `m.room.encrypted` event into a room they are a member of
ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE;

-- Add a background update to populate `participant` column
Expand Down

0 comments on commit b0f804e

Please sign in to comment.