Skip to content

Commit

Permalink
Switch to using the per-thread default stream instead of a stream per…
Browse files Browse the repository at this point in the history
… task.
  • Loading branch information
insertinterestingnamehere committed Apr 16, 2021
1 parent 5a7457e commit 2c88d82
Showing 1 changed file with 28 additions and 43 deletions.
71 changes: 28 additions & 43 deletions parla/cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
from functools import wraps, lru_cache
import os, sys
from typing import Dict, List, Optional, Collection

from parla import array
Expand All @@ -15,6 +16,14 @@

logger = logging.getLogger(__name__)

# Configure CuPy and Numba to use CUDA's per-thread default stream.
# The guard below implies CuPy only honors the setting when it is in place
# before its backend package is imported, so this must run before the
# `import cupy` that follows.
if 'cupy_backends' in sys.modules:
    # TODO: This should be dynamically configurable. That needs to be fixed upstream though.
    # NOTE(review): the message names "parla.gpu" but this file is
    # parla/cuda.py -- confirm the intended module name before changing it.
    raise ImportError("cupy must be imported after parla.gpu for per-thread default stream configuration to work properly.")

# `os.environ` is the process environment mapping. The previous spelling,
# `os.env`, is not an attribute of the `os` module and raised AttributeError
# as soon as this module was imported.
os.environ["CUPY_CUDA_PER_THREAD_DEFAULT_STREAM"] = "1"
# numba responds to this env var even if it has already been imported.
os.environ["NUMBA_CUDA_PER_THREAD_DEFAULT_STREAM"] = "1"

try:
import cupy
import cupy.cuda
Expand Down Expand Up @@ -167,33 +176,19 @@ def get_array_module(self, a):
# Integration with parla.environments

class _GPUStacksLocal(threading.local):
_stream_stack: List[cupy.cuda.Stream]
_device_stack: List[cupy.cuda.Device]

def __init__(self):
super(_GPUStacksLocal, self).__init__()
self._stream_stack = []
self._device_stack = []

def push_stream(self, stream):
self._stream_stack.append(stream)

def pop_stream(self) -> cupy.cuda.Stream:
return self._stream_stack.pop()

def push_device(self, dev):
self._device_stack.append(dev)

def pop_device(self) -> cupy.cuda.Device:
return self._device_stack.pop()

@property
def stream(self):
if self._stream_stack:
return self._stream_stack[-1]
else:
return None
@property
def device(self):
if self._device_stack:
return self._device_stack[-1]
Expand All @@ -213,30 +208,20 @@ def __init__(self, descriptor: "GPUComponent", env: TaskEnvironment):
# Use a stack per thread per GPU component just in case.
self._stack = _GPUStacksLocal()

def _make_stream(self):
with self.gpu.cupy_device:
return cupy.cuda.Stream(null=False, non_blocking=True)

def __enter__(self):
_gpu_locals._gpus = self.gpus
dev = self.gpu.cupy_device
dev.__enter__()
self._stack.push_device(dev)
stream = self._make_stream()
stream.__enter__()
self._stack.push_stream(stream)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
dev = self._stack.device
stream = self._stack.stream
try:
stream.synchronize()
stream.__exit__(exc_type, exc_val, exc_tb)
cupy.cuda.get_current_stream().synchronize()
_gpu_locals._gpus = None
ret = dev.__exit__(exc_type, exc_val, exc_tb)
finally:
self._stack.pop_stream()
self._stack.pop_device()
return ret

Expand All @@ -245,15 +230,15 @@ def initialize_thread(self) -> None:
# Trigger cuBLAS/etc. initialization for this GPU in this thread.
with cupy.cuda.Device(gpu.index) as device:
a = cupy.asarray([2.])
cupy.cuda.get_current_stream().synchronize()
with cupy.cuda.Stream(False, True) as stream:
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()
stream = cupy.cuda.get_current_stream()
stream.synchronize()
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()

class GPUComponent(EnvironmentComponentDescriptor):
"""A single GPU CUDA component which configures the environment to use the specific GPU using a single
Expand Down Expand Up @@ -312,15 +297,15 @@ def initialize_thread(self) -> None:
# Trigger cuBLAS/etc. initialization for this GPU in this thread.
with cupy.cuda.Device(gpu.index) as device:
a = cupy.asarray([2.])
cupy.cuda.get_current_stream().synchronize()
with cupy.cuda.Stream(False, True) as stream:
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()
stream = cupy.cuda.get_current_stream()
stream.synchronize()
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()


class MultiGPUComponent(EnvironmentComponentDescriptor):
Expand Down

0 comments on commit 2c88d82

Please sign in to comment.