Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add design doc for memory tracking in the plugin #2628

Open
wants to merge 2 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/img/memory_state_machine.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
40 changes: 40 additions & 0 deletions docs/memory_management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
## Memory management

The Spark-RAPIDS plugin manages device memory to effectively allocate the limited device memory resource among concurrent tasks.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
For memory management, the plugin tracks every device memory allocation and de-allocation request during processing.
While there is enough memory available, the allocation request succeeds and the task continues processing.
However, when the allocation request cannot succeed due to lack of memory, the plugin pauses that thread. When all of the active tasks have at least one thread paused, the plugin starts to roll back some of those paused threads to points where all of their input data is spillable, and let the other threads try to complete. If every thread except one has been rolled back and the one remaining thread cannot still make progress, then pluging picks up one thread to split its input and try again.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer it if we reword things a bit

Suggested change
For memory management, the plugin tracks every device memory allocation and de-allocation request during processing.
While there is enough memory available, the allocation request succeeds and the task continues processing.
However, when the allocation request cannot succeed due to lack of memory, the plugin pauses that thread. When all of the active tasks have at least one thread paused, the plugin starts to roll back some of those paused threads to points where all of their input data is spillable, and let the other threads try to complete. If every thread except one has been rolled back and the one remaining thread cannot still make progress, then pluging picks up one thread to split its input and try again.
For memory management, the plugin uses RMM and wraps it to provide the ability to recover from out of memory errors. The first line of defense is spilling which is provided in the spark-rapids plugin itself. The second line of defense is described in this document and is implemented in [SparkResourceAdaptorJni.cpp](../src/main/cpp/src/SparkResourceAdaptorJni.cpp). This code keeps track of each task thread and tracks the state of those threads.
While RMM allocation requests succeed this will not interfere with the running threads.
However, when the allocation request fails, even after spilling, this code will try and pause or roll back threads to free up memory and allow other threads/tasks to succeed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I overhauled this part. Please have another look.

### State machine for OOM handler

The Spark-RAPIDS plugin keeps track of the state of the individual threads. Note that one Spark task can use multiple threads during execution.

The thread can have one of these states at a time:

- `UNKNOWN`: the thread has not been registered with the tracking system.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `UNKNOWN`: the thread has not been registered with the tracking system.
- `UNKNOWN`: the thread has not been registered with the tracking system. In this state the thread will not be messed with. It is here as a fail-safe.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the additional comment is necessary. Is it not clear that the system will do nothing with the unregistered threads?

- `THREAD_RUNNING`: the thread is running normally.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
- `THREAD_ALLOC`: the thread has initiated a memory allocation.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
- `THREAD_ALLOC_FREE`: the thread has requested a memory free before the allocation completes.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
- `THREAD_BLOCKED`: the allocation is blocked due to lack of memory. The thread is waiting for enough memory to be available.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
- `THREAD_BUFN_THROW`: a deadlock has been detected as all threads are blocked, and this thread has been selected to roll back to the point where all its data is spillable.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `THREAD_BUFN_THROW`: a deadlock has been detected as all threads are blocked, and this thread has been selected to roll back to the point where all its data is spillable.
- `THREAD_BUFN_THROW`: a deadlock has been detected as all threads are blocked, and this thread has been selected to roll back to the point where all its data is spillable. An exception will be thrown to trigger this rollback when the thread wakes up. It is expected that the user code will catch the exception and free any non-spillable memory.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is expected that the user code will catch the exception and free any non-spillable memory.

I suppose that the "user code" means the user of this state machine, not the end user?

- `THREAD_BUFN_WAIT`: the thread has initiated the rollback.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
- `THREAD_BUFN`: the thread has rolled back and is now blocked until further notice (BUFN). The task will be unblocked once high priority tasks release enough memory.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `THREAD_BUFN`: the thread has rolled back and is now blocked until further notice (BUFN). The task will be unblocked once high priority tasks release enough memory.
- `THREAD_BUFN`: the thread has rolled back and is now blocked until further notice (BUFN). The task will be unblocked once another task completes. In this case completes may mean that it releases the GPU semaphore instead of fully completing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether I understand the code correctly for when the state transitions from THREAD_BUFN. AFAIT, the state transitions from THREAD_BUFN to THREAD_RUNNING after a free is called (do_deallocate() -> dealloc_core() -> wake_next_highest_priority_blocked()). Another place where the state is transitioned from THREAD_BUFN is pool_thread_finished_for_tasks(), which is called when a data receive is completed during shuffle. I don't seem to see that releasing the semaphore directly triggers unblocking a task. Can you give me some pointers where it happens in the code?

Another question for this state: why is wake_next_highest_priority_blocked() called in post_alloc_success_core()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about your comment, I think this part should be better to just explain what each state is. Perhaps I should add another section to explain when the state transition happens from one to another? I don't think every transition is worth to explain, such as THREAD_RUNNING -> UNKNOWN, so we can probably explain only those important ones.

- `THREAD_SPLIT_THROW`: a deadlock has been detected as all threads are BUFN, and this thread has been selected to roll back, split its input, and retry.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `THREAD_SPLIT_THROW`: a deadlock has been detected as all threads are BUFN, and this thread has been selected to roll back, split its input, and retry.
- `THREAD_SPLIT_THROW`: a deadlock has been detected as all threads are BUFN, and this thread has been selected to roll back, split its input, and retry. Not all code is guaranteed to support splitting its input to try again.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rephrased as

A deadlock has been detected as all threads are BUFN, and this thread has been selected to roll back, split its input, and retry. Note that the processing will fail without retrying if the input cannot be further split.

- `THREAD_REMOVE_THROW`: the task has been unregistered while blocked.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved

The thread state can change based on the diagram below. Note that the thread state can transition from any state to `UNKNOWN`, but it is omitted in the diagram for brevity.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved

![alt text](img/memory_state_machine.png "Thread state machine")

### Thread priority

The Spark-RAPIDS plugin uses the thread priority when it needs to break ties between threads. See the [Deadlock busting](#deadlock-busting) section below for an example use case. The thread priority is currently decoupled with the query priority. That is, the threads processing a high priority query do not necessarily have the same high priority. Instead, each task thread is assigned a priority based on their `task_id` and `thread_id`. Shuffle threads have the highest priority, and thus are always prioritized over task threads. This is because other task threads may depend on shuffle indirectly, and this lets us avoid situations of priority inversion. In the future, we may consider taking the query priority into the thread priority.

### Deadlock busting

The deadlock can occur when every active task has at least one thread that is either directly blocked on a memory allocation or indirectly blocked by shuffle being blocked on a memory allocation. When this happens, the lowest priority thread (see the above [Thread priority](thread-priority) section for the thread priority) is selected to break the deadlock. There are two cases of the deadlock.

1) All threads are blocked and there is at least one thread in the `THREAD_BLOCKED` state. In this case, the lowest priority thread is selected among `THREAD_BLOCKED` threads to break the deadlock. The thread selected transitions its state to `THREAD_BUFN_THROW` and initiates the rollback-and-retry process. After the rollback, all input data of the thread will be spillable and the thread will block before allocating more GPU memory until enough memory is freed up for other threads.
2) If all threads are blocked and are in the `THREAD_BUFN` state, the lowest priority thread is selected to split its input first and then retry with a smaller input. The thread selected transitions its state to `THREAD_SPLIT_THROW` and initiates the rollback-split-and-retry process.

If the thread selected is a task thread and its priority is not the highest priority, the thread will transition its state into the `THREAD_BUFN_THROW` state. Any threads that was just marked as `THREAD_BUFN_THROW` will be awaken to start the rollback process and initiate the retry. After the rollback, all input data of the thread will be spillable and the thread will block before allocating more GPU memory until enough memory is freed up for other threads.
2 changes: 2 additions & 0 deletions src/main/cpp/src/SparkResourceAdaptorJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ void cache_thread_reg_jni(JNIEnv* env)
// again until we know that progress has been made. We might add an API
// in the future to know when a retry section has passed, which would
// probably be a preferable time to restart all BUFN threads.
//
// See `docs/memory_management.md` for the design of the state machine.
enum class thread_state {
UNKNOWN = -1, // unknown state, this is really here for logging and anything transitioning to
// this state should actually be accomplished by deleting the thread from the state
Expand Down