-
Notifications
You must be signed in to change notification settings - Fork 29
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a new interface for submitting actor tasks in batches(Batch Remote)
Signed-off-by: 稚鱼 <[email protected]>
- Loading branch information
Showing
1 changed file
with
204 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
## Summary | ||
### General Motivation | ||
**Core Motivation**: | ||
1. Improve the performance of batch calling ActorTask. | ||
2. Implement Ray's native collective communication library through this interface. | ||
|
||
|
||
In distributed computing scenarios, such as AI training and inference, there is a need to send a large number of RPC CALL requests(ActorTask) to multiple Actors in batches. | ||
For example, in a typical heart-shaped architecture with one master Actor and 400 worker Actors, the same computational request needs to be sent to 400 worker Actors. | ||
However, the computing tasks of each Worker Actor are very short. In such a scenario, the performance requirements for executing batch actor task remote of a large number of Actor are very high. | ||
Therefore, for the scenario of batch calling Actor tasks, I want to add a new optimization API, batch_remote(), to improve the performance of batch submission of Actor Task calls. | ||
After my own performance testing and comparison, this interface has significant performance improvements. | ||
|
||
Current situation of batch calling actor tasks: | ||
``` | ||
actors = [WorkerActor.remote() for _ in range(400)] | ||
# This loop's repeated invocation actually wastes a lot of performance. | ||
for actor in actors: | ||
actor.compute.remote(args) | ||
``` | ||
|
||
Using the new Batch Remote API: | ||
``` | ||
actors = [WorkerActor.remote() for _ in range(400)] | ||
# Calling it only once can greatly improve performance. | ||
ray.batch_remote(actors).compute.remote(args) | ||
``` | ||
|
||
The Batch Remote API can save the following performance costs(The N is the number of Actors): | ||
1. Reduce (N-1) times of parameter serialization performance time. | ||
2. Reduce (N-1) times of putting parameter into object store performance time for scenarios with large parameters. | ||
3. Reduce (N-1) times of python and C++ execution layer switching and repeated parameter verification performance time. | ||
|
||
|
||
### Should this change be within `ray` or outside? | ||
This requires adding a new interface in Ray Core. | ||
|
||
## Stewardship | ||
### Required Reviewers | ||
... | ||
|
||
### Shepherd of the Proposal (should be a senior committer) | ||
... | ||
|
||
## Design and Architecture | ||
### API | ||
Use case | ||
``` | ||
ray.batch_remote(actors).compute.remote(args) | ||
``` | ||
|
||
### Implementation | ||
|
||
1. In the ActorTask calling logic, the parameter verification, parameter serialization, and parameter putting into the object store can be reused from the original logic. | ||
2. Then, add a BatchSubmitActorTask interface in the C++ CoreWorker layer, and loop through the original SubmitActorTask interface to submit multiple ActorTasks. This way, the batch submission of ActorTasks can be handled in the C++ CoreWorker layer, saving time on parameter serialization and putting into the object store. | ||
|
||
Overall, the implementation is relatively simple, with most of it reusing the original ActorTask submission logic. | ||
|
||
``` | ||
def batch_remote(actors: List[ray.actor.ActorHandle]) -> BatchRemoteHandle: | ||
... | ||
class BatchRemoteHandle: | ||
def __init__(self, actors: List[ray.actor.ActorHandle]): | ||
... | ||
def _actor_method_call(): | ||
object_refs = worker.core_worker.batch_submit_actor_task(actor_ids, ...) | ||
return object_refs | ||
C++: | ||
class CoreWorker: | ||
BatchSubmitActorTask(actor_ids, &return_ids) { | ||
for( auto actor_id : actor_ids) { | ||
SubmitActorTask(actor_id, ..., return_ids) | ||
} | ||
} | ||
``` | ||
|
||
### Performance improvement comparison: | ||
We have already implemented and conducted extensive performance testing internally. | ||
The following are the performance comparison results. | ||
|
||
**Table 1: Comparison of remote call time with varying parameter sizes and 400 Actors** | ||
|
||
|
||
Parameter Size (byte) | Time taken for foreach_remote(ms) | Time taken for batch_remote(ms) | The improvement rate | ||
-- | -- | -- | -- | ||
10 | 40.532 | 9.226 | 77.2% | ||
409846 | 584.345 | 24.106 | 95.9% | ||
819446 | 606.725 | 16.019 | 97.4% | ||
1229047 | 976.974 | 19.403 | 98.0% | ||
1638647 | 993.454 | 23.184 | 97.7% | ||
2048247 | 972.438 | 19.028 | 98.0% | ||
2457850 | 987.04 | 17.642 | 98.2% | ||
2867450 | 976.165 | 15.07 | 98.5% | ||
3277050 | 1108.331 | 18.272 | 98.4% | ||
3686650 | 1186.371 | 16.011 | 98.7% | ||
4096250 | 1335.575 | 15.951 | 98.8% | ||
4505850 | 1490.914 | 20.928 | 98.6% | ||
4915450 | 1511.744 | 23.041 | 98.5% | ||
5325050 | 1716.752 | 17.515 | 99.0% | ||
5734650 | 2009.711 | 22.891 | 98.9% | ||
6144250 | 2424.166 | 23.129 | 99.0% | ||
6553850 | 2354.033 | 20.271 | 99.1% | ||
6963450 | 2599.015 | 24.347 | 99.1% | ||
7373050 | 2610.843 | 17.91 | 99.3% | ||
7782650 | 2751.179 | 17.258 | 99.4% | ||
|
||
![Comparison of remote call time with varying parameter sizes and 400 Actors](https://github.com/ray-project/ray/assets/11072802/87591859-d247-4444-95ee-e37f04efc095) | ||
|
||
|
||
**Conclusion:** | ||
1. The larger the parameter size, the greater the performance gain of batch_remote(). | ||
|
||
|
||
**Table 2: Comparison of remote call time with varying numbers of Actors and a fixed parameter size (1MB)** | ||
|
||
actor counts | Time taken for foreach_remote(ms) | Time taken for batch_remote(ms) | The improvement rate | ||
-- | -- | -- | -- | ||
50 | 95.889 | 4.657 | 95.1% | ||
100 | 196.184 | 8.447 | 95.7% | ||
150 | 291.879 | 15.228 | 94.8% | ||
200 | 373.806 | 21.161 | 94.3% | ||
250 | 475.482 | 20.768 | 95.6% | ||
300 | 577.406 | 24.323 | 95.8% | ||
350 | 646.415 | 24.59 | 96.2% | ||
400 | 763.94 | 34.806 | 95.4% | ||
450 | 874.601 | 30.723 | 96.5% | ||
500 | 955.915 | 45.888 | 95.2% | ||
550 | 1041.728 | 39.736 | 96.2% | ||
600 | 1124.786 | 45.122 | 96.0% | ||
650 | 1262.363 | 46.687 | 96.3% | ||
700 | 1331.427 | 60.543 | 95.5% | ||
750 | 1407.485 | 47.386 | 96.6% | ||
800 | 1555.571 | 55.297 | 96.4% | ||
850 | 1549.03 | 60.493 | 96.1% | ||
900 | 1675.685 | 58.268 | 96.5% | ||
950 | 2314.186 | 48.785 | 97.9% | ||
|
||
![Comparison of remote call time with varying numbers of Actors and 1MB parameter size](https://github.com/ray-project/ray/assets/11072802/fe41eb01-24bd-4418-90c1-81b6f413e548) | ||
|
||
**Conclusion:** | ||
The more actors, the greater the performance gain. | ||
|
||
|
||
**Table 3: Comparison of remote call time with varying numbers of Actors and no parameters in remote calls** | ||
|
||
This test is to confirm the degree of performance optimization after reducing the frequency of switching between the Python and C++ execution layers. | ||
|
||
actor counts | Time taken for foreach_remote(ms) | Time taken for batch_remote(ms) | The improvement rate | ||
-- | -- | -- | -- | ||
50 | 2.083 | 1.257 | 39.7% | ||
100 | 4.005 | 2.314 | 42.2% | ||
150 | 5.582 | 3.467 | 37.9% | ||
200 | 8.104 | 3.574 | 55.9% | ||
250 | 10.104 | 4.64 | 54.1% | ||
300 | 11.858 | 6.224 | 47.5% | ||
350 | 13.826 | 8.017 | 42.0% | ||
400 | 15.862 | 8.145 | 48.7% | ||
450 | 18.368 | 9.261 | 49.6% | ||
500 | 18.881 | 10.722 | 43.2% | ||
550 | 21.129 | 11.944 | 43.5% | ||
600 | 23.413 | 12.925 | 44.8% | ||
650 | 26.485 | 13.328 | 49.7% | ||
700 | 27.855 | 14.303 | 48.7% | ||
750 | 29.432 | 14.922 | 49.3% | ||
800 | 31.03 | 16.329 | 47.4% | ||
850 | 32.405 | 17.582 | 45.7% | ||
900 | 34.388 | 18.521 | 46.1% | ||
950 | 36.499 | 19.658 | 46.1% | ||
|
||
![Comparison of remote call time with varying numbers of Actors and no parameters in remote calls](https://github.com/ray-project/ray/assets/11072802/493d036b-8eb2-4397-ac8b-492bc5b526b9) | ||
|
||
|
||
**Conclusion:** | ||
After comparison, in the scenario of 400 actors and remote calls without parameters, the performance is optimized by 40%~50%. | ||
|
||
**Summary:** | ||
The newly added Batch Remote API can improve performance in the case of batch calling Actor task. It can reduce performance costs such as parameter serialization, object store consumption, and Python and C++ execution layer switching, thereby improving the performance of the entire distributed computing system. | ||
Especially in the following scenario: | ||
1. large parameters | ||
2. a large number of Actors | ||
|
||
|
||
## Compatibility, Deprecation, and Migration Plan | ||
N/A | ||
|
||
## Test Plan and Acceptance Criteria | ||
1. The basic test cases. | ||
|
||
## (Optional) Follow-on Work | ||
|
||
### 1. Implement Ray's native collective communication library through this interface. | ||
|
||
The collection communication of Ray's CPU computing scenario is currently implemented through Gloo. However, there are two issues with the collection communication implemented through Gloo: | ||
1. Users need to require other dependency(Gloo), which increases the usage cost. | ||
2. According to our testing, Gloo has many problems when it comes to supporting large-scale scenarios with more than 400+ nodes. | ||
|
||
Therefore, we want to develop a native collection communication library for Ray with the aim of making it more convenient for users and supporting large-scale scenarios. | ||
This native library will undoubtedly rely on the current batch remote API to improve performance. After the API is completed, we will consider implementing the native collection communication library for Ray. | ||
|