Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to using the per-thread default stream #54

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 28 additions & 43 deletions parla/cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
from functools import wraps, lru_cache
import os, sys
from typing import Dict, List, Optional, Collection

from parla import array
Expand All @@ -15,6 +16,14 @@

logger = logging.getLogger(__name__)

# Configure cupy and numba to use the per-thread default CUDA stream.
# cupy reads its variable only at import time, so it must not already be
# loaded when parla configures the environment.
if 'cupy_backends' in sys.modules:
    # TODO: This should be dynamically configurable. That needs to be fixed upstream though.
    raise ImportError("cupy must be imported after parla.gpu for per-thread default stream configuration to work properly.")

# numba responds to its env var even if it has already been imported.
for _ptds_var in ("CUPY_CUDA_PER_THREAD_DEFAULT_STREAM",
                  "NUMBA_CUDA_PER_THREAD_DEFAULT_STREAM"):
    os.environ[_ptds_var] = "1"

try:
import cupy
import cupy.cuda
Expand Down Expand Up @@ -167,33 +176,19 @@ def get_array_module(self, a):
# Integration with parla.environments

class _GPUStacksLocal(threading.local):
_stream_stack: List[cupy.cuda.Stream]
_device_stack: List[cupy.cuda.Device]

def __init__(self):
    """Start each thread with empty stream and device stacks."""
    super().__init__()
    self._stream_stack = []
    self._device_stack = []

def push_stream(self, stream):
    """Record *stream* as this thread's innermost active stream."""
    self._stream_stack += [stream]

def pop_stream(self) -> cupy.cuda.Stream:
    """Remove and return this thread's most recently pushed stream."""
    top = self._stream_stack[-1]
    del self._stream_stack[-1]
    return top

def push_device(self, dev):
    """Record *dev* as this thread's innermost active device."""
    self._device_stack += [dev]

def pop_device(self) -> cupy.cuda.Device:
    """Remove and return this thread's most recently pushed device."""
    top = self._device_stack[-1]
    del self._device_stack[-1]
    return top

@property
def stream(self):
    """This thread's innermost active stream, or None if none is active."""
    return self._stream_stack[-1] if self._stream_stack else None
@property
def device(self):
if self._device_stack:
return self._device_stack[-1]
Expand All @@ -213,30 +208,20 @@ def __init__(self, descriptor: "GPUComponent", env: TaskEnvironment):
# Use a stack per thread per GPU component just in case.
self._stack = _GPUStacksLocal()

def _make_stream(self):
    """Create a fresh non-blocking CUDA stream on this component's GPU."""
    device = self.gpu.cupy_device
    with device:
        return cupy.cuda.Stream(null=False, non_blocking=True)

def __enter__(self):
    """Activate this GPU environment on the current thread.

    Publishes the component's GPU list, then enters and records the
    cupy device and a fresh task stream so __exit__ can unwind them
    in reverse order.
    """
    _gpu_locals._gpus = self.gpus
    device = self.gpu.cupy_device
    device.__enter__()
    self._stack.push_device(device)
    task_stream = self._make_stream()
    task_stream.__enter__()
    self._stack.push_stream(task_stream)
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Deactivate the environment set up by __enter__, unwinding in reverse order."""
    dev = self._stack.device
    stream = self._stack.stream
    try:
        # Drain all work queued on the task's stream before tearing it down.
        stream.synchronize()
        stream.__exit__(exc_type, exc_val, exc_tb)
        # NOTE(review): also synchronizes the (per-thread default) current
        # stream — presumably so no work escapes the task; confirm intent.
        cupy.cuda.get_current_stream().synchronize()
        _gpu_locals._gpus = None
        ret = dev.__exit__(exc_type, exc_val, exc_tb)
    finally:
        # Always pop the per-thread stacks, even if synchronization or the
        # device exit raised, so the stacks stay balanced with __enter__.
        self._stack.pop_stream()
        self._stack.pop_device()
    return ret

Expand All @@ -245,15 +230,15 @@ def initialize_thread(self) -> None:
# Trigger cuBLAS/etc. initialization for this GPU in this thread.
with cupy.cuda.Device(gpu.index) as device:
a = cupy.asarray([2.])
cupy.cuda.get_current_stream().synchronize()
with cupy.cuda.Stream(False, True) as stream:
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()
stream = cupy.cuda.get_current_stream()
stream.synchronize()
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()

class GPUComponent(EnvironmentComponentDescriptor):
"""A single GPU CUDA component which configures the environment to use the specific GPU using a single
Expand Down Expand Up @@ -312,15 +297,15 @@ def initialize_thread(self) -> None:
# Trigger cuBLAS/etc. initialization for this GPU in this thread.
with cupy.cuda.Device(gpu.index) as device:
a = cupy.asarray([2.])
cupy.cuda.get_current_stream().synchronize()
with cupy.cuda.Stream(False, True) as stream:
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()
stream = cupy.cuda.get_current_stream()
stream.synchronize()
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()


class MultiGPUComponent(EnvironmentComponentDescriptor):
Expand Down