Bumping async_stream causes test failures #4599

Open
poljar opened this issue Jan 31, 2025 · 0 comments

As discovered in #4596, updating async-stream causes some tests to fail.

The identity_status_changes tests fail because the event handler for the m.room.member event registered over here:

fn wrap_room_member_events(
    room: &Room,
) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
    let own_user_id = room.own_user_id().to_owned();
    let room_id = room.room_id();
    let (sender, receiver) = mpsc::channel(16);
    let handle =
        room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
            if *event.state_key() == own_user_id {
                return;
            }
            let _: Result<_, _> = sender.send(RoomIdentityChange::SyncRoomMemberEvent(event)).await;
        });
    let drop_guard = room.client.event_handler_drop_guard(handle);
    (drop_guard, ReceiverStream::new(receiver))
}

isn't called anymore. As we can see, we create a drop guard for the event handler, so once the guard is dropped, the handler is removed. The drop guard gets moved into a stream over here:

        let (drop_guard, room_member_events) = wrap_room_member_events(&room);
        let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
        let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
        let mut state = IdentityStatusChanges {
            room_identity_state: RoomIdentityState::new(room).await,
            _drop_guard: drop_guard,
        };
        Ok(stream!({
            let mut current_state =
                filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
            if !current_state.is_empty() {
                current_state.sort();
                yield current_state;
            }
            while let Some(item) = unprocessed_stream.next().await {
                let mut update = filter_non_self(
                    state.room_identity_state.process_change(item).await,
                    &own_user_id,
                );
                if !update.is_empty() {
                    update.sort();
                    yield update;
                }
            }
        }))
    }
}

This means that the handler should stay alive as long as the stream is alive, but it seems that something changed between async_stream versions 0.3.5 and 0.3.6, and the drop guard now gets dropped before the stream is.
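
To make the expected behaviour concrete, here is a minimal std-only sketch of the invariant we rely on: a guard that unregisters a handler on drop should live exactly as long as the value that owns it. This is not the SDK code. `HandlerGuard`, `Updates`, `subscribe` and `handler_outlives_consumption` are hypothetical names, a plain iterator stands in for the stream, and an `AtomicBool` stands in for the handler registration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `EventHandlerDropGuard`: clears the flag on
/// drop, similar to how the real guard removes the event handler.
struct HandlerGuard {
    registered: Arc<AtomicBool>,
}

impl Drop for HandlerGuard {
    fn drop(&mut self) {
        self.registered.store(false, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the returned stream: an iterator that owns
/// the guard, so the guard is dropped only when the iterator is dropped.
struct Updates {
    items: std::vec::IntoIter<u32>,
    _guard: HandlerGuard,
}

impl Iterator for Updates {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        self.items.next()
    }
}

/// "Registers" the handler and ties its lifetime to the returned value.
fn subscribe(registered: Arc<AtomicBool>) -> Updates {
    registered.store(true, Ordering::SeqCst);
    let guard = HandlerGuard { registered };
    Updates { items: vec![1, 2, 3].into_iter(), _guard: guard }
}

/// Returns whether the handler was registered while the iterator was
/// alive, and whether it is still registered after the iterator is dropped.
fn handler_outlives_consumption() -> (bool, bool) {
    let registered = Arc::new(AtomicBool::new(false));
    let mut updates = subscribe(registered.clone());
    let _ = updates.next();
    let while_alive = registered.load(Ordering::SeqCst);
    drop(updates);
    let after_drop = registered.load(Ordering::SeqCst);
    (while_alive, after_drop)
}
```

Calling `handler_outlives_consumption()` returns `(true, false)`: the handler stays registered while the owner is alive and is removed once the owner is dropped. The failing tests suggest that with async_stream 0.3.6 the first of these two no longer holds for the real stream.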

Downgrading async_stream fixes the issue. The only relevant change seems to be in this PR: tokio-rs/async-stream@97d1d3e.
