Example usage of kafka endpoints as an input and output for triton server using Python API #92

Merged
13 commits · Jul 24, 2024

136 changes: 136 additions & 0 deletions Triton_Inference_Server_Python_API/examples/kafka-io/README.md
@@ -0,0 +1,136 @@
<!--
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-->

# Triton Inference Server Kafka I/O Deployment

Using the Triton Inference Server In-Process Python API, you can
integrate Triton-based models into any Python framework
to consume messages from a Kafka topic and produce the inference results
to the Kafka topic of your choice.

This directory contains an example Triton Inference Server
deployment based on Kafka I/O that uses a separate thread for each of the server, the consumer, and the producer.

| [Installation](#installation) | [Run Deployment](#starting-the-pipeline) | [Send Requests](#send-requests-to-deployment) |


## Installation

This Kafka I/O pipeline deploys a pre-processing tokenization stage based on the `transformers` tokenization module; it can be extended to any type of model as needed.

### Prerequisites
1. [Docker](https://docs.docker.com/engine/install/)

### Starting the Docker container
Once you have the Docker service up and running, launch a container by executing the following command:

```bash
docker run --rm -it --gpus all -v <path>/<to>/tutorials/Triton_Inference_Server_Python_API/examples/kafka-io/:/opt/tritonserver/kafka-io -w /opt/tritonserver/kafka-io --entrypoint bash nvcr.io/nvidia/tritonserver:24.06-py3
```

### Clone Repository

```bash
git clone https://github.com/triton-inference-server/tutorials.git
cd tutorials/Triton_Inference_Server_Python_API/examples/kafka-io
```

*Note: Skip this step if you have mounted the git repository from a local directory into the Docker container.*


### Install dependencies

Please note that installation times may vary depending on
your hardware configuration and network connection.


```bash
pip install -r requirements.txt
```

If the `tritonserver` Python package is not already installed, install it using the following command.

```bash
pip install /opt/tritonserver/python/tritonserver-2.44.0-py3-none-any.whl
```
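
As a quick optional sanity check, you can confirm that the `tritonserver` module is importable before proceeding:

```bash
python3 -c "import tritonserver"
```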

Next, run the provided `start-kafka.sh` script, which performs the following actions:
1. Downloads Kafka and its dependencies
2. Starts the Kafka service by launching Zookeeper and the Kafka brokers
3. Creates two new topics: `inference-input` as the input queue and `inference-output` to store the inference results

```bash
chmod +x start-kafka.sh
./start-kafka.sh
```
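
Once the script completes, you can optionally confirm that both topics were created by listing them:

```bash
kafka_2.13-3.7.0/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```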

## Starting the pipeline

### Start the inference pipeline

Run the provided `start-server.sh` script, which performs the following actions:
1. Exports the Kafka producer and consumer configs, the input and output topic names, and the model name and repository.
2. Starts the server.

```bash
chmod +x start-server.sh
./start-server.sh
```

*Note: The above invocation uses the default of 1 worker thread for the Kafka consumer. If you need to increase concurrency, set the environment variable `KAFKA_CONSUMER_MAX_WORKER_THREADS` to the desired value and restart the server; the consumer will then run with the new concurrency, increasing the throughput of the deployment.*
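
For example, to restart the deployment with 4 consumer worker threads (4 is an arbitrary illustrative value):

```bash
export KAFKA_CONSUMER_MAX_WORKER_THREADS=4
./start-server.sh
```

At a high level, the running deployment consumes messages from the input topic, runs them through the in-process Triton server, and publishes the JSON-serialized results to the output topic. The snippet below is only a minimal conceptual sketch of that loop, not the actual `tritonserver_deployment.py` used by this example (which runs under Ray Serve with a pool of consumer worker threads); details such as the exact tensor conversion calls may differ across `tritonserver` versions.

```python
import json
import os

import numpy as np
import tritonserver
from confluent_kafka import Consumer, Producer

# Kafka clients configured from the JSON strings exported by start-server.sh
consumer = Consumer(json.loads(os.environ["CONSUMER_CONFIGS"]))
consumer.subscribe([os.environ["CONSUMER_TOPICS"]])
producer = Producer(json.loads(os.environ["PRODUCER_CONFIGS"]))

# Start Triton in-process and look up the tokenizer model
server = tritonserver.Server(model_repository=os.environ["MODEL_REPOSITORY"])
server.start(wait_until_ready=True)
model = server.model(os.environ["MODEL_NAME"])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    # One Kafka message becomes one entry in the string tensor named TEXT
    # (assumes an object-dtype numpy array is accepted for TYPE_STRING inputs)
    text = np.array([msg.value().decode("utf-8")], dtype=np.object_)
    for response in model.infer(inputs={os.environ["MODEL_INPUT_NAME"]: text}):
        # Convert each output tensor to a list and publish it as a JSON string
        outputs = {
            name: np.from_dlpack(tensor).tolist()
            for name, tensor in response.outputs.items()
        }
        producer.produce(os.environ["PRODUCER_TOPIC"], value=json.dumps(outputs))
    producer.flush()
```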

## Send Requests to Deployment

To send requests to the deployed inference pipeline, produce messages to the input Kafka topic using the following command.

```bash
cd kafka_2.13-3.7.0
bin/kafka-console-producer.sh --topic inference-input --bootstrap-server localhost:9092
```

Once the above command has been executed, you should see a `>` prompt where you can start ingesting messages into the input topic.

```bash
> this is a sample message
>
```

Once you have produced enough messages, you can exit the prompt by pressing `Ctrl+C`.
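
Alternatively, messages can be produced programmatically with the `confluent_kafka` client already installed from `requirements.txt` (a minimal illustration, not part of the example scripts):

```python
from confluent_kafka import Producer

# Broker address and topic name match the defaults set up by start-kafka.sh
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("inference-input", value="this is a sample message")
producer.flush()  # block until the message has been delivered
```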

#### Example Output
Once the workflow consumes the ingested messages from the Kafka topic, it invokes the Triton server and produces the inference output as a `json` string to the output Kafka topic. After messages have been ingested, start a console consumer to see the output messages that the pipeline wrote to the output topic:

```bash
bin/kafka-console-consumer.sh --topic inference-output --from-beginning --bootstrap-server localhost:9092
```

Since our example deploys a tokenizer as a custom model in Triton, you should see output inserted into the Kafka topic as shown below.

```bash
{"model": {"name": "tokenizer", "version": 1, "state": null, "reason": null}, "request_id": "", "parameters": {}, "outputs": {"input_ids": [[101, 1142, 1110, 2774, 3802, 118, 1207, 130, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], "token_type_ids": [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], "attention_mask": [[1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]}, "error": null, "classification_label": null, "final": true}
```
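
Each message on the output topic is a JSON serialization of the inference response, so downstream consumers can decode it directly. For example, a hypothetical reader that extracts the number of non-padding tokens from one result might look like this (illustration only, not part of the example scripts):

```python
import json

from confluent_kafka import Consumer

# The consumer group name here is arbitrary; broker and topic match start-kafka.sh
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "inference-output-reader",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["inference-output"])

msg = consumer.poll(timeout=10.0)
if msg is not None and not msg.error():
    result = json.loads(msg.value())
    # attention_mask is 1 for real tokens and 0 for padding (see the tokenizer model)
    n_tokens = int(sum(result["outputs"]["attention_mask"][0]))
    print(f"{result['model']['name']} produced {n_tokens} non-padding tokens")
consumer.close()
```
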
@@ -0,0 +1,59 @@
from typing import Dict, List

import numpy as np
import triton_python_backend_utils as pb_utils
from transformers import BertTokenizerFast, TensorType


class TritonPythonModel:
    tokenizer: BertTokenizerFast

    def initialize(self, args: Dict[str, str]) -> None:
        """
        Initialize the tokenization process
        :param args: arguments from Triton config file
        """
        self.tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")

    def execute(self, requests) -> "List[List[pb_utils.Tensor]]":
        """
        Parse and tokenize each request
        :param requests: 1 or more requests received by Triton server.
        :return: tokenized text as output tensors
        """
        responses = []
        # for loop for batch requests (disabled in our case)
        for request in requests:
            # binary data typed back to string
            query = [
                t.decode("UTF-8")
                for t in pb_utils.get_input_tensor_by_name(request, "TEXT")
                .as_numpy()
                .tolist()
            ]
            tokens: Dict[str, np.ndarray] = self.tokenizer(
                text=query,
                return_tensors=TensorType.NUMPY,
                padding="max_length",
                max_length=256,
                truncation=True,
            )
            # tensorrt uses int32 as input type, ort uses int64
            tokens = {k: v.astype(np.int64) for k, v in tokens.items()}
            # communicate the tokenization results to Triton server
            outputs = list()
            for input_name in self.tokenizer.model_input_names:
                tensor_input = pb_utils.Tensor(input_name, tokens[input_name])
                outputs.append(tensor_input)

            inference_response = pb_utils.InferenceResponse(output_tensors=outputs)
            responses.append(inference_response)

        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print("Cleaning up...")
@@ -0,0 +1,29 @@
name: "tokenizer"
max_batch_size: 0
backend: "python"

input [
{
name: "TEXT"
data_type: TYPE_STRING
dims: [ -1 ]
}
]

output [
{
name: "input_ids"
data_type: TYPE_INT64
dims: [-1, 256]
},
{
name: "attention_mask"
data_type: TYPE_INT64
dims: [-1, 256]
},
{
name: "token_type_ids"
data_type: TYPE_INT64
dims: [ -1, 256 ]
}
]
@@ -0,0 +1,10 @@
click==8.1.7
confluent_kafka==2.5.0
gcn-kafka==0.3.3
jsonschema==4.23.0
pandas==2.2.2
ray==2.32.0
ray[serve]==2.32.0
torch==2.3.1
transformers==4.42.4
tritonclient==2.47.0
@@ -0,0 +1,30 @@
#!/bin/sh
export DEBIAN_FRONTEND=noninteractive

wget https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

echo "Setting up JAVA 17"
apt-get update -q -y
apt-get install -q -y openjdk-17-jdk openjdk-17-jre

echo "Configuring brokers to localhost for kafka server"
sed -i -e 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:9092/g' config/server.properties

echo "Starting zookeeper"
nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 &
sleep 5
echo "Successfully started zookeeper, starting kafka brokers"
nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 &
sleep 5
echo "Successfully started kafka brokers, creating input and output topics..."

bin/kafka-topics.sh --create --topic inference-input --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic inference-output --bootstrap-server localhost:9092

echo "Successfully created topics.\nInput topic: inference-input\nOutput topic: inference-output"

echo "Topic description:"
bin/kafka-topics.sh --describe --topic inference-input --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic inference-output --bootstrap-server localhost:9092
@@ -0,0 +1,13 @@
#!/bin/sh

export KAFKA_CONSUMER_MAX_WORKER_THREADS=1
export CONSUMER_CONFIGS='{"bootstrap.servers": "localhost:9092", "security.protocol": "PLAINTEXT", "group.id": "triton-server-kafka-consumer"}'
export PRODUCER_CONFIGS='{"bootstrap.servers": "localhost:9092", "security.protocol": "PLAINTEXT"}'
export CONSUMER_TOPICS='inference-input'
export PRODUCER_TOPIC='inference-output'
export MODEL_INPUT_NAME='TEXT'
export MODEL_NAME='tokenizer'
export MODEL_REPOSITORY='./models'

nohup serve run tritonserver_deployment:entrypoint &
tail -f nohup.out