leak memory #8026

Open
aTunass opened this issue Feb 21, 2025 · 1 comment
Labels
memory Related to memory usage, memory growth, and memory leaks

Comments


aTunass commented Feb 21, 2025

I wrote a custom backend for my text-generation model. I allocate some memory and release it, but to match the model's logic I changed how the request and response are handled. The backend loads the model successfully, but I'm seeing a memory leak. My code is based on the bls_custom example. Comparing memory usage between the original example and my code, my code's memory keeps growing (about 3 MB per 1000 requests), while the original code's memory goes up and comes back down. Can you take a look and point out where the problem is? I'm also new to C++, so the code probably has a lot of confusing places.

#include "bls.h"
#include <algorithm>
#include <cmath>
#include <numeric>  

namespace triton { namespace backend { namespace bls {

BLSExecutor::BLSExecutor(TRITONSERVER_Server* server)
    : server_(server), model_executor_(server)
{
}

std::vector<int64_t> BLSExecutor::concat_along_column(
    const std::vector<int64_t>& original, int batch_size, int old_columns,
    const std::vector<int64_t>& new_column) 
{
    int new_columns = old_columns + 1; // number of columns after concatenation
    std::vector<int64_t> result(batch_size * new_columns); // result buffer

    for (int i = 0; i < batch_size; ++i) {
        
        for (int j = 0; j < old_columns; ++j) {
            result[i * new_columns + j] = original[i * old_columns + j];
        }
       
        result[i * new_columns + old_columns] = new_column[i];
    }

    return result;
}

std::vector<float> BLSExecutor::concat_along_column_float(
    const std::vector<float>& original, int batch_size, int old_columns,
    const std::vector<float>& new_column) 
{
    int new_columns = old_columns + 1; // number of columns after concatenation
    std::vector<float> result(batch_size * new_columns); // result buffer

    for (int i = 0; i < batch_size; ++i) {
        // Copy the existing data
        for (int j = 0; j < old_columns; ++j) {
            result[i * new_columns + j] = original[i * old_columns + j];
        }
        // Append the value from new_column as the new last column
        result[i * new_columns + old_columns] = new_column[i];
    }

    return result;
}

std::vector<std::vector<float>> BLSExecutor::splitIntoBatches(const std::vector<float>& data, size_t batch_size) {
    std::vector<std::vector<float>> batches;

    if (data.size() % batch_size != 0) {
        throw std::runtime_error("Data size is not a multiple of batch size!");
    }

    size_t num_batches = data.size() / batch_size;
    
    for (size_t i = 0; i < num_batches; i++) {
        batches.emplace_back(data.begin() + i * batch_size, data.begin() + (i + 1) * batch_size);
    }

    return batches;
}

std::vector<float> BLSExecutor::softmax(const std::vector<float>& input) {
    std::vector<float> output(input.size());
    float max_val = *std::max_element(input.begin(), input.end()); // subtract the max to avoid overflow
    float sum_exp = 0.0;

    for (size_t i = 0; i < input.size(); i++) {
        output[i] = std::exp(input[i] - max_val);
        sum_exp += output[i];
    }

    if (sum_exp == 0.0) {
        throw std::runtime_error("Softmax sum is zero, possible numerical instability.");
    }


    for (size_t i = 0; i < input.size(); i++) {
        output[i] /= sum_exp;
    }

    return output;
}

std::pair<std::vector<float>, std::vector<int>> BLSExecutor::top_k(const std::vector<float>& scores, int k) {
    int n = scores.size();
    std::vector<int> indices(n);
    std::iota(indices.begin(), indices.end(), 0); // build the index list [0, 1, 2, ...]

    std::partial_sort(indices.begin(), indices.begin() + k, indices.end(),
        [&](int a, int b) { return scores[a] > scores[b]; });

    std::vector<float> top_values(k);
    std::vector<int> top_indices(k);
    for (int i = 0; i < k; i++) {
        top_values[i] = scores[indices[i]];
        top_indices[i] = indices[i];
    }

    return {top_values, top_indices};
}

void BLSExecutor::FreeAllocatedBuffers() {
    for (void* ptr : allocated_buffers) {
        free(ptr);
    }
    allocated_buffers.clear();  // clear the list after freeing
}

BLSExecutor::~BLSExecutor() {
    FreeAllocatedBuffers();
}

TRITONSERVER_Error*
BLSExecutor::PrepareInferenceRequest(
    TRITONBACKEND_Request* bls_request,
    TRITONSERVER_InferenceRequest** irequest, const std::string model_name)
{

  const char* request_id;
  uint64_t correlation_id;
  uint32_t flags;
  RETURN_IF_ERROR(TRITONBACKEND_RequestId(bls_request, &request_id));
  RETURN_IF_ERROR(
      TRITONBACKEND_RequestCorrelationId(bls_request, &correlation_id));
  RETURN_IF_ERROR(TRITONBACKEND_RequestFlags(bls_request, &flags));


  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestNew(
      irequest, server_, model_name.c_str(), -1 /* model_version */));

  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestSetId(*irequest, request_id));
  RETURN_IF_ERROR(
      TRITONSERVER_InferenceRequestSetCorrelationId(*irequest, correlation_id));
  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestSetFlags(*irequest, flags));
  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestSetReleaseCallback(
      *irequest, InferRequestComplete, nullptr /* request_release_userp */));

  return nullptr;  // success
}

TRITONSERVER_Error*
BLSExecutor::PrepareInferenceInput(
    TRITONBACKEND_Request* bls_request, TRITONSERVER_InferenceRequest* irequest,
    std::vector<int64_t>& tgt_inp)
{

  uint32_t input_count;
  RETURN_IF_ERROR(TRITONBACKEND_RequestInputCount(bls_request, &input_count));

  TRITONBACKEND_Input* input;
  const char* name;
  TRITONSERVER_DataType datatype;
  const int64_t* shape;
  uint32_t dims_count;
  size_t data_byte_size;
  TRITONSERVER_MemoryType data_memory_type;
  int64_t data_memory_id;
  const char* data_buffer;

  for (size_t count = 0; count < input_count; count++) { 
    RETURN_IF_ERROR(TRITONBACKEND_RequestInputByIndex(
        bls_request, count /* index */, &input));
    RETURN_IF_ERROR(TRITONBACKEND_InputProperties(
        input, &name, &datatype, &shape, &dims_count, nullptr, nullptr));
    RETURN_IF_ERROR(TRITONBACKEND_InputBuffer(
        input, 0 /* idx */, reinterpret_cast<const void**>(&data_buffer),
        &data_byte_size, &data_memory_type, &data_memory_id));

    const char* modified_name = (std::strcmp(name, "hidden") == 0) ? "hidden_input" : name;

    RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAddInput(
        irequest, modified_name, datatype, shape, dims_count));
    RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAppendInputData(
        irequest, modified_name, data_buffer, data_byte_size, data_memory_type,
        data_memory_id));
  }


  const char* new_input_name = "tgt_inp";
  batch_size_ = shape[0];
  TRITONSERVER_DataType new_input_dtype = TRITONSERVER_TYPE_INT64;  // torch.long corresponds to int64

  size_t total_elements = tgt_inp.size();
  const void* input_data_ptr = tgt_inp.data();
  size_t input_data_byte_size = tgt_inp.size() * sizeof(int64_t);

  int remaining_dim = total_elements / batch_size_;
  int64_t new_input_shape[] = {batch_size_, remaining_dim};  // like the tensor torch.full((1, batch_size))
  uint32_t new_input_dims_count = 2;
  
  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAddInput(
    irequest, new_input_name, new_input_dtype, new_input_shape, new_input_dims_count));
  std::vector<int64_t> translated_sentence(batch_size_ * 1, 1); 
  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAppendInputData(
    irequest, new_input_name, input_data_ptr, input_data_byte_size, 
    data_memory_type, data_memory_id));  

  return nullptr;  // success
}

TRITONSERVER_Error* 
BLSExecutor::GetBatchSize(
    TRITONBACKEND_Request* bls_request) {
  uint32_t input_count;
  RETURN_IF_ERROR(TRITONBACKEND_RequestInputCount(bls_request, &input_count));
  
  if (input_count == 0) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INVALID_ARG,
        "Request does not contain any inputs");
  }

  TRITONBACKEND_Input* input;
  const int64_t* shape;
  uint32_t dims_count;


  RETURN_IF_ERROR(TRITONBACKEND_RequestInputByIndex(
      bls_request, 0 /* index */, &input));
  RETURN_IF_ERROR(TRITONBACKEND_InputProperties(
      input, nullptr, nullptr, &shape, &dims_count, nullptr, nullptr));
  
  if (dims_count == 0) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INVALID_ARG,
        "Input tensor has no dimensions");
  }

  batch_size_ = shape[0];
  return nullptr;  // success
}

TRITONSERVER_Error*
BLSExecutor::CustomPrepareInferenceInput(
    TRITONBACKEND_Request* bls_request, TRITONSERVER_InferenceRequest* irequest, std::vector<uint8_t>& output_data, std::vector<int64_t>& tgt_inp)
{

  uint32_t input_count;
  RETURN_IF_ERROR(TRITONBACKEND_RequestInputCount(bls_request, &input_count));

  TRITONBACKEND_Input* input;
  const char* name;
  TRITONSERVER_DataType datatype;
  const int64_t* shape;
  uint32_t dims_count;
  size_t data_byte_size;
  TRITONSERVER_MemoryType data_memory_type;
  int64_t data_memory_id;
  const char* data_buffer;

  for (size_t count = 0; count < input_count; count++) { 
    RETURN_IF_ERROR(TRITONBACKEND_RequestInputByIndex(
        bls_request, count /* index */, &input));
    RETURN_IF_ERROR(TRITONBACKEND_InputProperties(
        input, &name, &datatype, &shape, &dims_count, nullptr, nullptr));
    RETURN_IF_ERROR(TRITONBACKEND_InputBuffer(
        input, 0 /* idx */, reinterpret_cast<const void**>(&data_buffer),
        &data_byte_size, &data_memory_type, &data_memory_id));

    const char* modified_name = (std::strcmp(name, "hidden") == 0) ? "hidden_input" : name;

    if (std::strcmp(modified_name, "hidden_input") == 0) {
      // Feed the previous iteration's output back in as this input: copy it into
      // a 64-byte-aligned buffer and remember the pointer so it can be freed later.
      void* aligned_memory = std::aligned_alloc(64, output_data.size()*sizeof(float));
      memcpy(aligned_memory, output_data.data(), output_data.size());
      allocated_buffers.push_back(aligned_memory);
      RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAddInput(
        irequest, modified_name, datatype, shape, dims_count));
      RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAppendInputData(
        irequest, modified_name, aligned_memory, data_byte_size, data_memory_type,
        data_memory_id));
    } else {
      RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAddInput(
        irequest, modified_name, datatype, shape, dims_count));
      RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAppendInputData(
          irequest, modified_name, data_buffer, data_byte_size, data_memory_type,
          data_memory_id));
    }
  }

  const char* new_input_name = "tgt_inp";
  batch_size_ = shape[0];
  TRITONSERVER_DataType new_input_dtype = TRITONSERVER_TYPE_INT64;  // torch.long corresponds to int64

  size_t total_elements = tgt_inp.size();
  const void* input_data_ptr = tgt_inp.data();
  size_t input_data_byte_size = tgt_inp.size() * sizeof(int64_t);

  int remaining_dim = total_elements / batch_size_;
  int64_t new_input_shape[] = {batch_size_, remaining_dim};  // like the tensor torch.full((1, batch_size))
  uint32_t new_input_dims_count = 2;
  
  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAddInput(
    irequest, new_input_name, new_input_dtype, new_input_shape, new_input_dims_count));
  std::vector<int64_t> translated_sentence(batch_size_ * 1, 1);  
  RETURN_IF_ERROR(TRITONSERVER_InferenceRequestAppendInputData(
    irequest, new_input_name, input_data_ptr, input_data_byte_size, 
    data_memory_type, data_memory_id));  
  return nullptr;  // success
}

TRITONSERVER_Error*
BLSExecutor::PrepareInferenceOutput(
    TRITONBACKEND_Request* bls_request, TRITONSERVER_InferenceRequest* irequest)
{

  uint32_t output_count;
  RETURN_IF_ERROR(TRITONBACKEND_RequestOutputCount(bls_request, &output_count));
  const char* output_name;
  for (size_t count = 0; count < output_count; count++) {
    RETURN_IF_ERROR(TRITONBACKEND_RequestOutputName(
        bls_request, count /* index */, &output_name));
    
    if (std::strcmp(output_name, "token_text") == 0) {
      output_name = "output";
    }
    if (std::strcmp(output_name, "prob_text") == 0) {
      output_name = "hidden_output";
    }

    RETURN_IF_ERROR(
        TRITONSERVER_InferenceRequestAddRequestedOutput(irequest, output_name));
  }

  return nullptr;  // success
}

void
BLSExecutor::Execute(
    TRITONBACKEND_Request* bls_request, TRITONBACKEND_Response** response)
{

  std::vector<std::string> model_names = {"vietocr_decoder.onnx.gpu_t4"};
  size_t num_model = model_names.size();

  // Check if both models are valid before executing request.
  try {
    for (size_t i = 0; i < num_model; i++) {
      // Check if the model is ready.
      bool is_ready = false;
      THROW_IF_TRITON_ERROR(TRITONSERVER_ServerModelIsReady(
          server_, model_names[i].c_str(), -1 /* model_version */, &is_ready));
      if (!is_ready) {
        throw BLSBackendException(
            (std::string("Failed to execute the inference request. Model '") +
             model_names[i].c_str() + "' is not ready.")
                .c_str());
      }

      uint32_t txn_flags;
      THROW_IF_TRITON_ERROR(TRITONSERVER_ServerModelTransactionProperties(
          server_, model_names[i].c_str(), -1 /* model_version */, &txn_flags,
          nullptr /* voidp */));
      if ((txn_flags & TRITONSERVER_TXN_DECOUPLED) != 0) {
        throw BLSBackendException(
            std::string("Model '") + model_names[i].c_str() +
            "' is using the decoupled. This BLS Backend doesn't support models "
            "using the decoupled transaction policy.");
      }
    }
  }
  catch (const BLSBackendException& bls_exception) {
    LOG_MESSAGE(TRITONSERVER_LOG_ERROR, bls_exception.what());
    RESPOND_AND_SET_NULL_IF_ERROR(
        response,
        TRITONSERVER_ErrorNew(
            TRITONSERVER_ERROR_INTERNAL, "Failed to send inference requests"));
    return;
  }

  GetBatchSize(bls_request);

  std::vector<std::vector<uint8_t>> output_data;
  int max_length = 0;
  int max_seq_length = 1;
  std::vector<int64_t> translated_sentence(batch_size_ * 1, 1); 
  std::vector<float> char_probs(batch_size_ * 1, 1.0f); 
  
  while (max_length <= max_seq_length) {
    std::vector<std::future<TRITONSERVER_InferenceResponse*>> futures(num_model);
    TRITONSERVER_InferenceRequest* irequest = nullptr;

    try {
      for (size_t icount = 0; icount < num_model; icount++) {
        THROW_IF_TRITON_ERROR(
          PrepareInferenceRequest(bls_request, &irequest, model_names[icount]));
        if (max_length==0){
          THROW_IF_TRITON_ERROR(PrepareInferenceInput(bls_request, irequest, translated_sentence));
        } else {
          THROW_IF_TRITON_ERROR(CustomPrepareInferenceInput(bls_request, irequest, output_data[0], translated_sentence));
        }
        THROW_IF_TRITON_ERROR(PrepareInferenceOutput(bls_request, irequest));
        THROW_IF_TRITON_ERROR(
            model_executor_.AsyncExecute(irequest, &futures[icount]));
      }
    }
    catch (const BLSBackendException& bls_exception) {
      LOG_MESSAGE(TRITONSERVER_LOG_ERROR, bls_exception.what());
      LOG_IF_ERROR(
          TRITONSERVER_InferenceRequestDelete(irequest),
          "Failed to delete inference request.");
      RESPOND_AND_SET_NULL_IF_ERROR(
          response,
          TRITONSERVER_ErrorNew(
              TRITONSERVER_ERROR_INTERNAL, "Failed to send inference requests"));
      return;
    }

    for (auto& future : futures) {
      future.wait();
    }
    FreeAllocatedBuffers();

    if (max_length==0){
      int64_t output_shape_token_text[] = {batch_size_, static_cast<int64_t>(translated_sentence.size()/batch_size_)};
      SendData(response, "token_text", TRITONSERVER_TYPE_INT64,
              output_shape_token_text, 2,
              translated_sentence.data(), translated_sentence.size() * sizeof(int64_t));

      int64_t output_shape_prob_text[] = {batch_size_, static_cast<int64_t>(char_probs.size()/batch_size_)};
      SendData(response, "prob_text", TRITONSERVER_TYPE_FP32,
              output_shape_prob_text, 2,
              char_probs.data(), char_probs.size() * sizeof(float));

      break;
    } else {
      output_data.clear();
      output_data = CustomConstructFinalResponse(response, std::move(futures));

      if (output_data.size() < 2 || output_data[1].empty()) {
        LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Invalid output_data size. Skipping iteration.");
        return;
      }

      std::vector<float> float_data(
          reinterpret_cast<const float*>(output_data[1].data()),
          reinterpret_cast<const float*>(output_data[1].data()) + (output_data[1].size() / sizeof(float))
      );

      if (float_data.size() % 233 != 0) {
        LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Output tensor size is not divisible by 233. Skipping iteration.");
        return;
      }

      auto batches = splitIntoBatches(float_data, 233);

      std::vector<int64_t> another_vector_indices;
      another_vector_indices.reserve(batch_size_);

      std::vector<float> another_vector_value;
      another_vector_value.reserve(batch_size_);

      for (size_t i = 0; i < batches.size(); i++) {
          std::vector<float> probs = softmax(batches[i]);
          auto [top_values, top_indices] = top_k(probs, 5);

          another_vector_indices.push_back(top_indices[0]);
          another_vector_value.push_back(top_values[0]);
      }
      int old_columns = translated_sentence.size()/batch_size_;
      translated_sentence = concat_along_column(translated_sentence, batch_size_, old_columns, another_vector_indices);
      another_vector_indices.clear();

      char_probs = concat_along_column_float(char_probs, batch_size_, old_columns, another_vector_value);
      another_vector_value.clear();
    
      max_length++;
    }

  }
}

TRITONSERVER_Error* BLSExecutor::SendData(
  TRITONBACKEND_Response** response,
  const char* output_name,
  TRITONSERVER_DataType output_datatype,
  const int64_t* output_shape,
  uint64_t dims_count,
  const void* output_data,
  size_t output_byte_size)
{
  TRITONBACKEND_Output* output;
  RESPOND_AND_SET_NULL_IF_ERROR(
      response, TRITONBACKEND_ResponseOutput(
              *response, &output, output_name, output_datatype,
              output_shape, dims_count));
  
  void* output_buffer;
  TRITONSERVER_MemoryType output_memory_type = TRITONSERVER_MEMORY_CPU;
  int64_t output_memory_id = 0;
  
  RESPOND_AND_SET_NULL_IF_ERROR(
      response, TRITONBACKEND_OutputBuffer(
                    output, &output_buffer, output_byte_size,
                    &output_memory_type, &output_memory_id));

  if (output_memory_type == TRITONSERVER_MEMORY_GPU) {
      return TRITONSERVER_ErrorNew(
          TRITONSERVER_ERROR_INTERNAL,
          "Failed to create output buffer in CPU memory");
  }
  memcpy(output_buffer, output_data, output_byte_size);
  return nullptr;  // return nullptr when there is no error
}


std::vector<std::vector<uint8_t>> 
BLSExecutor::CustomConstructFinalResponse(
    TRITONBACKEND_Response** response,
    std::vector<std::future<TRITONSERVER_InferenceResponse*>> futures)
{
  std::vector<std::vector<uint8_t>>  output_data;
  std::vector<TRITONSERVER_InferenceResponse*> completed_responses(futures.size(), nullptr);

  for (size_t icount = 0; icount < futures.size(); icount++) {
    if (!futures[icount].valid()) {
        std::cerr << "Future at index " << icount << " is invalid!" << std::endl;
        continue;  // skip invalid futures
    }
    completed_responses[icount] = futures[icount].get();
    uint32_t output_count = 1;
    THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceResponseOutputCount(completed_responses[icount], &output_count));
    FreeAllocatedBuffers();

    for (uint32_t output_idx = 0; output_idx < output_count; output_idx++) {
      const char* output_name;
      TRITONSERVER_DataType output_datatype;
      const int64_t* output_shape;
      uint64_t dims_count;
      size_t output_byte_size;
      TRITONSERVER_MemoryType output_memory_type;
      int64_t output_memory_id;
      const void* output_base;
      void* userp;

      try {
        THROW_IF_TRITON_ERROR(
            TRITONSERVER_InferenceResponseError(completed_responses[icount]));
      }
      catch (const BLSBackendException& bls_exception) {
        LOG_MESSAGE(TRITONSERVER_LOG_ERROR, bls_exception.what());

        if (completed_responses[icount] != nullptr) {
          LOG_IF_ERROR(
              TRITONSERVER_InferenceResponseDelete(completed_responses[icount]),
              "Failed to delete inference response.");
        }
        return std::vector<std::vector<uint8_t>>();
      }

      RESPOND_AND_SET_NULL_IF_ERROR(
          response,
          TRITONSERVER_InferenceResponseOutput(
              completed_responses[icount], output_idx, &output_name, &output_datatype,
              &output_shape, &dims_count, &output_base, &output_byte_size,
              &output_memory_type, &output_memory_id, &userp));

      output_data.emplace_back();  
      output_data.back().resize(output_byte_size);  
      memcpy(output_data.back().data(), output_base, output_byte_size);

    }
    LOG_IF_ERROR(
        TRITONSERVER_InferenceResponseDelete(completed_responses[icount]),
        "Failed to delete inference response.");
  }
  FreeAllocatedBuffers();
  return output_data;
}

}}}  // namespace triton::backend::bls
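
For reference, here is a minimal, self-contained C++ sketch (no Triton API) of one way to own the 64-byte-aligned scratch buffers through RAII instead of tracking raw pointers in allocated_buffers and calling FreeAllocatedBuffers() by hand. It assumes the observed growth is connected to those manually managed buffers, which the code alone does not confirm, and the helper names (FreeDeleter, CopyToAlignedBuffer) are invented for the illustration. As a side observation, in CustomPrepareInferenceInput the allocation size (output_data.size() * sizeof(float)) and the memcpy size (output_data.size()) differ even though output_data is already a byte vector; whether that is related to the growth is not certain.

#include <cstdlib>
#include <cstring>
#include <memory>
#include <vector>

// Sketch: RAII ownership of 64-byte-aligned scratch buffers, assuming the goal
// is simply "keep the copy alive until the downstream request is done, then free it".
struct FreeDeleter {
  void operator()(void* p) const noexcept { std::free(p); }
};
using AlignedBuffer = std::unique_ptr<void, FreeDeleter>;

// Copy byte_size bytes into a freshly allocated 64-byte-aligned buffer.
// aligned_alloc expects the size to be a multiple of the alignment, so round up.
AlignedBuffer CopyToAlignedBuffer(const void* src, size_t byte_size)
{
  size_t padded = ((byte_size + 63) / 64) * 64;
  AlignedBuffer buf(std::aligned_alloc(64, padded));
  if (buf) {
    std::memcpy(buf.get(), src, byte_size);
  }
  return buf;  // freed automatically when the owner is destroyed
}

int main()
{
  std::vector<float> output_data(233, 0.5f);

  // Buffers owned per iteration; clearing the vector frees every buffer,
  // so there is no list of raw pointers that can be forgotten on an early break.
  std::vector<AlignedBuffer> iteration_buffers;
  iteration_buffers.push_back(CopyToAlignedBuffer(
      output_data.data(), output_data.size() * sizeof(float)));

  // ... hand iteration_buffers.back().get() to whatever consumes the data ...

  iteration_buffers.clear();  // all buffers freed here
  return 0;
}

With unique_ptr owning each allocation, an early return or break cannot leak a buffer, because the buffer is released whenever its owner goes out of scope.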
rmccorm4 added the memory label Feb 21, 2025

rmccorm4 (Contributor) commented Feb 21, 2025

Hi @aTunass thanks for raising an issue!

CC @krishung5 for viz
