Aggregator setup pipelining #487

Draft: wants to merge 2 commits into base: dev

doscortados

Closes #460

@doscortados doscortados requested a review from a team January 28, 2025 14:39
@doscortados doscortados changed the base branch from main to dev January 28, 2025 14:40
@ozankaymak ozankaymak requested review from mmtftr, ekrembal, ceyhunsen and ozankaymak and removed request for a team January 28, 2025 14:42

@mmtftr mmtftr left a comment

Thanks a lot for the changes.

I believe the original issue also wants the try_join_all calls to run in background tasks so that they happen in parallel (you can check the aggregator for details).

We're also going to be working on moving the business logic out of the gRPC handlers, and moving parsing/validation logic out into trait impls so that they're more easily tested and the code has less clutter.
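As a rough illustration of moving parsing and validation into a trait impl, the sketch below converts a wire message into a validated domain type through `TryFrom`. All names here (`RawOperatorParams`, `OperatorParams`, `ParseError`) are hypothetical and are not taken from the repository; the raw struct only stands in for a generated gRPC message.

```rust
use std::convert::TryFrom;

/// Hypothetical wire message, standing in for a generated gRPC type.
#[derive(Debug, Clone, PartialEq)]
pub struct RawOperatorParams {
    pub operator_idx: u32,
    pub xonly_pk: Vec<u8>,
}

/// Hypothetical validated type that the business logic would consume.
#[derive(Debug, Clone, PartialEq)]
pub struct OperatorParams {
    pub operator_idx: u32,
    pub xonly_pk: [u8; 32],
}

/// Hypothetical parse error; a real project would likely reuse its own error enum.
#[derive(Debug, Clone, PartialEq)]
pub enum ParseError {
    InvalidKeyLength { expected: usize, got: usize },
}

impl TryFrom<RawOperatorParams> for OperatorParams {
    type Error = ParseError;

    fn try_from(raw: RawOperatorParams) -> Result<Self, Self::Error> {
        // Record the length first, because the conversion below consumes the Vec.
        let got = raw.xonly_pk.len();
        let xonly_pk = <[u8; 32]>::try_from(raw.xonly_pk)
            .map_err(|_| ParseError::InvalidKeyLength { expected: 32, got })?;
        Ok(OperatorParams {
            operator_idx: raw.operator_idx,
            xonly_pk,
        })
    }
}
```

With validation living in the impl, a handler would only need to call `OperatorParams::try_from(raw)`, and the conversion can be unit-tested without starting a gRPC server.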

@doscortados
Author

Thank you for the comments, @mmtftr, and sorry for my late reply!
This is a rather initial version, and significant rework will follow in further commits.

At this point I see that an mpsc channel isn't working out for multiplexing N-over-M streams, so I am trying out an mpmc one to avoid pulling all the params into memory before streaming them to consumers.
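For readers unfamiliar with the trade-off, here is a minimal sketch of streaming items to several consumers without first collecting them in memory. It is not the approach taken in this PR: it assumes one bounded standard-library channel per consumer and OS threads instead of an mpmc channel and async tasks, and the function name `fan_out` is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Forwards every item from `source` to each of `consumers` receivers as soon
/// as it is produced. At most `capacity` items are buffered per consumer.
pub fn fan_out<I>(source: I, consumers: usize, capacity: usize) -> Vec<Vec<u32>>
where
    I: Iterator<Item = u32>,
{
    let mut senders: Vec<SyncSender<u32>> = Vec::with_capacity(consumers);
    let mut workers = Vec::with_capacity(consumers);

    for _ in 0..consumers {
        let (tx, rx) = sync_channel::<u32>(capacity);
        senders.push(tx);
        // Each consumer drains its own channel concurrently, so the producer
        // only waits until a slot frees up rather than blocking indefinitely.
        workers.push(thread::spawn(move || rx.iter().collect::<Vec<u32>>()));
    }

    for item in source {
        for tx in &senders {
            tx.send(item).expect("consumer hung up");
        }
    }

    // Dropping the senders closes the channels and lets the consumers finish.
    drop(senders);

    workers
        .into_iter()
        .map(|w| w.join().expect("consumer panicked"))
        .collect()
}
```

The bounded capacity is what keeps memory use flat, but it only works if every receiver is actively drained; a full channel with no running consumer would block the sender.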

@doscortados doscortados reopened this Feb 3, 2025
@doscortados doscortados requested a review from mmtftr February 3, 2025 09:55
watchtower_params_setup_task_handle,
])
.await
.map_err(|e| BridgeError::Error(format!("aggregator setup failed: {e:?}")))?;

@doscortados doscortados Feb 3, 2025

Dear reviewer,
Please suggest a better error type here; this one seems too general.

Member

Maybe BridgeError::RPCStreamEndedUnexpectedly? Or we can create a new one in core/src/rpc/error.rs.
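If a new error is created instead, it might look roughly like the sketch below. The enum `SetupError`, its variants, and the helper `task_failed` are hypothetical names chosen for illustration; they do not come from the project's `BridgeError` or from core/src/rpc/error.rs, and the sketch uses only the standard library.

```rust
use std::fmt;

/// Hypothetical error type for aggregator setup failures.
#[derive(Debug, PartialEq)]
pub enum SetupError {
    /// A background setup task failed, panicked, or was cancelled.
    TaskFailed { task: &'static str, reason: String },
    /// A stream closed before all expected messages arrived.
    StreamEndedUnexpectedly { stream: &'static str },
}

impl fmt::Display for SetupError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SetupError::TaskFailed { task, reason } => {
                write!(f, "aggregator setup task `{task}` failed: {reason}")
            }
            SetupError::StreamEndedUnexpectedly { stream } => {
                write!(f, "stream `{stream}` ended unexpectedly")
            }
        }
    }
}

impl std::error::Error for SetupError {}

/// Wraps any debuggable failure (for example a join error) in `TaskFailed`.
pub fn task_failed<E: fmt::Debug>(task: &'static str, err: E) -> SetupError {
    SetupError::TaskFailed {
        task,
        reason: format!("{err:?}"),
    }
}
```

A call site could then use something like `.map_err(|e| task_failed("watchtower_params", e))`, which records which task failed instead of folding everything into one general message.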

Member

@ceyhunsen ceyhunsen left a comment

Overall, it looks good. However, there appears to be a deadlock or something similar: the CI/CD tests are not finishing.


future.await?; // TODO: This is dangerous: If channel size becomes not sufficient, this will block forever.
let operator_params_setup_task_handle = tokio::spawn(async move {
for response in operator_params_response_streams.iter_mut() {

Currently, this loop does the following:

  1. For each operator
  2. Stream all messages to that operator (await)

Because each iteration awaits before moving on, this blocks on every operator in turn and won't pipeline the operators in parallel.

Can we collect all the async blocks inside the loop and join them outside of it?
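The shape being asked for can be sketched as follows. To keep the example runnable without an async runtime, it uses standard-library threads as a stand-in for `tokio::spawn` plus a joined set of futures; `stream_to_operator` and `stream_to_all` are hypothetical names, and the sleep merely simulates network latency.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for streaming every setup message to one operator.
/// Returns the operator index and the number of messages sent.
fn stream_to_operator(operator: usize, messages: &[u32]) -> Result<(usize, usize), String> {
    for _msg in messages {
        thread::sleep(Duration::from_millis(5)); // simulated network latency
    }
    Ok((operator, messages.len()))
}

/// Starts one worker per operator inside the loop and joins all of them
/// after the loop, so the operators are served concurrently.
pub fn stream_to_all(operators: usize, messages: Vec<u32>) -> Result<Vec<(usize, usize)>, String> {
    let mut handles = Vec::with_capacity(operators);

    for operator in 0..operators {
        let messages = messages.clone();
        // Nothing is awaited or joined here; the work only gets started.
        handles.push(thread::spawn(move || stream_to_operator(operator, &messages)));
    }

    // Join outside the loop: the first error (or panic) fails the whole setup.
    handles
        .into_iter()
        .map(|h| {
            h.join()
                .map_err(|_| "worker panicked".to_string())
                .and_then(|result| result)
        })
        .collect()
}
```

With this structure the total time is roughly that of the slowest operator rather than the sum over all operators. In async code the same idea would be to push each future (or spawned task handle) into a collection inside the loop and await them together afterwards.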

.map_err(|_| output_stream_ended_prematurely())?;
let watchtower_params_setup_task_handle =
tokio::spawn(async move {
for response in watchtower_params_response_streams.iter_mut() {

Same as above: let's collect the inner blocks and join them outside the loop.

Successfully merging this pull request may close these issues.

Pipeline for aggregator setup
3 participants