WIP Allow Python backend to directly write Numpy arrays to SHM #264

Draft
Wants to merge 6 commits into base: r23.05
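In brief, this draft adds a pb_utils.new_shm_tensor entry point so a Python model can allocate a tensor whose backing buffer lives directly in Triton's shared-memory pool and fill it in place, rather than building an ordinary NumPy array that is copied into shared memory afterwards. A minimal sketch of the intended usage, following models/test_bls/1/model.py below (n, cache, and indices are placeholder data, not part of the PR; new_shm_tensor and pb_utils.shared_memory are the entry points wired up in src/pb_stub.cc):

import numpy as np
import triton_python_backend_utils as pb_utils

n = 4096
cache = np.random.random((500000, 200)).astype(np.float32)   # placeholder data
indices = np.random.randint(100000, size=n)                   # placeholder data

# Allocate an (n, 200) float32 tensor whose buffer lives in shared memory ...
tensor = pb_utils.new_shm_tensor("candidatesss", pb_utils.shared_memory, (n, 200), np.float32)
# ... and gather rows straight into that buffer; no intermediate array, no copy.
np.take(cache, indices, axis=0, out=tensor.as_numpy(), mode='clip')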
31 changes: 31 additions & 0 deletions Dockerfile
@@ -0,0 +1,31 @@
FROM asnpdsacr.azurecr.io/public/tritonserver:23.05-tf2-python-py3
Review comment (Member): Remove

#RUN DEBIAN_FRONTEND="noninteractive" apt-get update && apt-get -y install tzdata

RUN apt-get update \
    && apt-get install -y build-essential \
        gcc \
        g++ \
        gdb \
        clang \
        make \
        ninja-build \
        cmake \
        autoconf \
        automake \
        libtool \
        valgrind \
        locales-all \
        dos2unix \
        rsync \
        tar
RUN apt-get install -y python3-pip python3.10-dev
RUN apt-get install -y rapidjson-dev libarchive-dev zlib1g-dev
RUN apt-get install -y git
RUN pip3 install numpy
RUN rm -r /opt/tritonserver/backends/python
RUN git config --global --add safe.directory '*'
RUN apt-get install -y ssh

RUN useradd -m user && yes password | passwd user
RUN apt-get install -y gdbserver
Binary file added models/.DS_Store
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
43 changes: 43 additions & 0 deletions models/category_tensorflow_model/config.pbtxt
@@ -0,0 +1,43 @@
name: "category_tensorflow_model"
platform: "tensorflow_savedmodel"
max_batch_size: 0

parameters: {
key: "TF_SIGNATURE_DEF"
value: {
string_value: "call"
}
}

input [
{
name: "candidatesss"
data_type: TYPE_FP32
dims: [ -1 , -1]

}
]
input [
{
name: "user_history"
data_type: TYPE_FP32
dims: [ -1 , -1]

}
]
output [
{
name: "scores"
data_type: TYPE_FP32
dims: [ -1 ]
}
]

instance_group [
{
count: 2
kind: KIND_CPU
}
]

dynamic_batching { }
63 changes: 63 additions & 0 deletions models/test_bls/1/model.py
@@ -0,0 +1,63 @@
"""
Category model
"""
import time
from typing import cast

import numpy as np

try:
import triton_python_backend_utils as pb_utils
except ImportError:
import tests.stub.triton_python_backend_utils

pb_utils: tests.stub.triton_python_backend_utils = cast(
tests.stub.triton_python_backend_utils, None
)


def breakpoint():
import pydevd_pycharm

pydevd_pycharm.settrace(
'host.docker.internal', port=5858, stdoutToServer=True, stderrToServer=True
)


class TritonPythonModel:
def initialize(self, args):
import triton_python_backend_utils
self.shm = triton_python_backend_utils.shared_memory
self.candidates_cache = np.random.random((500000, 200)).astype(np.float32)

def execute_request(self, request):
n = int(pb_utils.get_input_tensor_by_name(request, "n").as_numpy()[0])
candidates = np.random.randint(100000, size=n)
candidate_tensor: pb_utils.Tensor = pb_utils.new_shm_tensor("candidatesss", self.shm, (n, 200), np.float32)
np.take(self.candidates_cache, candidates, axis=0, out=candidate_tensor.as_numpy(), mode='clip')

context_array = np.random.random((10, 200)).astype(np.float32)
context_tensor = pb_utils.Tensor(
"user_history",
context_array,
)

inference_response = pb_utils.InferenceRequest(
model_name="category_tensorflow_model",
requested_output_names=["scores"],
inputs=[candidate_tensor, context_tensor],
).exec()

if inference_response.has_error():
raise pb_utils.TritonModelException(inference_response.error().message())
else:
scores = pb_utils.get_output_tensor_by_name(inference_response, "scores")

out_scores = pb_utils.Tensor("scores", scores.as_numpy()[:400])

response = pb_utils.InferenceResponse(output_tensors=[out_scores])

return response

def execute(self, requests):
return [self.execute_request(request) for request in requests]
22 changes: 22 additions & 0 deletions models/test_bls/config.pbtxt
@@ -0,0 +1,22 @@
name: "test_bls"
backend: "python"

input [
{
name: "n"
data_type: TYPE_INT32
dims: [ -1]

}
]

output [
{
name: "scores"
data_type: TYPE_FP32
dims: [ -1 ]
}
]


instance_group [{ kind: KIND_CPU }]
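For reference, a minimal client call against this model might look like the following (a sketch, not part of the PR; it assumes a Triton server listening on localhost:8000 and the tritonclient package installed):

import numpy as np
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")

# The model takes a single INT32 tensor "n": the number of candidates to score.
n = np.array([1000], dtype=np.int32)
infer_input = httpclient.InferInput("n", [1], "INT32")
infer_input.set_data_from_numpy(n)

result = client.infer(model_name="test_bls", inputs=[infer_input])
scores = result.as_numpy("scores")   # FP32 vector (the first 400 scores)
print(scores.shape)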
68 changes: 68 additions & 0 deletions models/test_bls_before/1/model.py
@@ -0,0 +1,68 @@
"""
Category model
"""
import time
from typing import cast

import numpy as np

try:
import triton_python_backend_utils as pb_utils
except ImportError:
import tests.stub.triton_python_backend_utils

pb_utils: tests.stub.triton_python_backend_utils = cast(
tests.stub.triton_python_backend_utils, None
)


def breakpoint():
import pydevd_pycharm

pydevd_pycharm.settrace(
'host.docker.internal', port=5858, stdoutToServer=True, stderrToServer=True
)


class TritonPythonModel:
def initialize(self, args):
import triton_python_backend_utils
self.shm = triton_python_backend_utils.shared_memory
self.candidates_cache = np.random.random((500000, 200)).astype(np.float32)

def execute_request(self, request):
n = pb_utils.get_input_tensor_by_name(request, "n").as_numpy()[0]
candidates = np.random.randint(100000, size=int(n))

context_array = np.random.random((10, 200)).astype(np.float32)
candidates_array = np.take(self.candidates_cache, candidates, axis=0)

candidate_tensor = pb_utils.Tensor(
"candidatesss",
candidates_array,
)

context_tensor = pb_utils.Tensor(
"user_history",
context_array,
)

inference_response = pb_utils.InferenceRequest(
model_name="category_tensorflow_model",
requested_output_names=["scores"],
inputs=[candidate_tensor, context_tensor],
).exec()

if inference_response.has_error():
raise pb_utils.TritonModelException(inference_response.error().message())
else:
scores = pb_utils.get_output_tensor_by_name(inference_response, "scores")

out_scores = pb_utils.Tensor("scores", scores.as_numpy()[:400])

response = pb_utils.InferenceResponse(output_tensors=[out_scores])

return response

def execute(self, requests):
return [self.execute_request(request) for request in requests]
22 changes: 22 additions & 0 deletions models/test_bls_before/config.pbtxt
@@ -0,0 +1,22 @@
name: "test_bls_before"
backend: "python"

input [
{
name: "n"
data_type: TYPE_INT32
dims: [ -1]

}
]

output [
{
name: "scores"
data_type: TYPE_FP32
dims: [ -1 ]
}
]


instance_group [{ kind: KIND_CPU }]
56 changes: 56 additions & 0 deletions models/test_take/1/model.py
@@ -0,0 +1,56 @@
"""
Category model
"""
import time
from typing import cast
import timeit
import numpy as np

try:
import triton_python_backend_utils as pb_utils
except ImportError:
import tests.stub.triton_python_backend_utils

pb_utils: tests.stub.triton_python_backend_utils = cast(
tests.stub.triton_python_backend_utils, None
)


def breakpoint():
import pydevd_pycharm

pydevd_pycharm.settrace(
'host.docker.internal', port=5858, stdoutToServer=True, stderrToServer=True
)


class TritonPythonModel:
def initialize(self, args):
import triton_python_backend_utils
shm = triton_python_backend_utils.shared_memory
n = 100000
candidate_tensor = pb_utils.new_shm_tensor("candidatesss", shm, (n, 200), np.float32) # Offset is 68
buffer = candidate_tensor.as_numpy()

pb_utils.Logger.log_error(f"buffer - {buffer}, {buffer.dtype}, {buffer.shape}, {buffer.flags}, {buffer.base}")
candidates_cache = np.random.random((500000, 200)).astype(np.float32)
candidates = np.random.randint(100000, size=n)
np_out = np.empty((n, 200), dtype=np.float32)

r1 = timeit.timeit("buffer[:] = np.take(candidates_cache, candidates, axis=0, mode='clip')", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np})*10
r2 = timeit.timeit("np.take(candidates_cache, candidates, axis=0, mode='clip', out=buffer)", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np})*10
r3 = timeit.timeit("r = np.take(candidates_cache, candidates, axis=0, mode='clip')", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np})*10
r4 = timeit.timeit("np.take(candidates_cache, candidates, axis=0, mode='clip', out=np_out)", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np, "np_out":np_out})*10

pb_utils.Logger.log_error(f"Buffer - assignment - {r1}")
pb_utils.Logger.log_error(f"Buffer - output - {r2}")
pb_utils.Logger.log_error(f"Baseline - assignment - {r3}")
pb_utils.Logger.log_error(f"Baseline - np out - {r4}")
pb_utils.Logger.log_error(f"numpy version {np.__version__}")


def execute_request(self, request):
pass

def execute(self, requests):
return [self.execute_request(request) for request in requests]
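The Logger call above dumps the buffer's dtype, shape, flags, and base to confirm that as_numpy() hands back a zero-copy view over the shared-memory allocation rather than an owning copy. The same property can be checked in plain NumPy (a stand-alone illustration, not part of this PR):

import numpy as np

# A view over externally owned memory, standing in for the SHM-backed buffer.
raw = bytearray(8 * 200 * 4)    # room for 8 x 200 float32 values
view = np.frombuffer(raw, dtype=np.float32).reshape(8, 200)
assert not view.flags['OWNDATA']   # the array does not own its memory
assert view.base is not None       # it is a view over another object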
22 changes: 22 additions & 0 deletions models/test_take/config.pbtxt
@@ -0,0 +1,22 @@
name: "test_take"
backend: "python"

input [
{
name: "n"
data_type: TYPE_INT32
dims: [ -1]

}
]

output [
{
name: "scores"
data_type: TYPE_FP32
dims: [ -1 ]
}
]


instance_group [{ kind: KIND_CPU }]
9 changes: 8 additions & 1 deletion src/pb_stub.cc
@@ -431,8 +431,12 @@ Stub::StubSetup()
py::setattr(
    python_backend_utils, "MetricFamily",
    c_python_backend_utils.attr("MetricFamily"));

Review comment (Member): Remove all the changes except the ones in the src directory.

py::setattr(
    python_backend_utils, "new_shm_tensor",
    c_python_backend_utils.attr("new_shm_tensor"));

c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
Review comment (Member): This is not needed.

deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
serialize_bytes_ = python_backend_utils.attr("serialize_byte_tensor");
@@ -494,6 +498,7 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
python_backend_utils, "InferenceResponse",
c_python_backend_utils.attr("InferenceResponse"));
c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
Review comment (Member): Not required.

py::object TritonPythonModel = sys.attr("TritonPythonModel");
deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
@@ -1516,7 +1521,7 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
.def("get_response_sender", &InferRequest::GetResponseSender);

py::class_<PbTensor, std::shared_ptr<PbTensor>>(module, "Tensor")
.def(py::init(&PbTensor::FromNumpy))
.def(py::init(&PbTensor::FromNumpy), py::arg("name"), py::arg("numpy_array"))
.def("name", &PbTensor::Name)
// The reference_internal is added to make sure that the NumPy object has
// the same lifetime as the tensor object. This means even when the NumPy
@@ -1603,6 +1608,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)

py::register_exception<PythonBackendException>(
module, "TritonModelException");

module.def(
    "new_shm_tensor", &PbTensor::CreateInSHM,
    "Creates a new Tensor directly into shared memory");
Review comment (Member): Can we rename this to pb.Tensor.new(shape, dtype, device='cpu')?
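A sketch of the interface shape that comment proposes (hypothetical; this draft exposes pb_utils.new_shm_tensor instead, and how the tensor name would be supplied is left open; n, cache, and indices are placeholders):

# Hypothetical, following the review suggestion; not what this draft implements.
tensor = pb_utils.Tensor.new(shape=(n, 200), dtype=np.float32, device='cpu')
np.take(cache, indices, axis=0, out=tensor.as_numpy(), mode='clip')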

}

extern "C" {