From edc03b340504c1843281f18f5600c2f5ed46e7f8 Mon Sep 17 00:00:00 2001
From: Matthew Kotila
Date: Fri, 28 Jul 2023 11:49:01 -0700
Subject: [PATCH] Store sequence ID in timestamps tuple object (#367)

* Store sequence ID in timestamps tuple object

* Fix bug and address feedback

* Address feedback

* Address feedback
---
 src/c++/perf_analyzer/CMakeLists.txt          |   1 +
 .../client_backend/mock_client_backend.h      |  45 ++++--
 src/c++/perf_analyzer/infer_context.cc        |  38 +++--
 src/c++/perf_analyzer/infer_context.h         |  36 ++---
 src/c++/perf_analyzer/inference_profiler.cc   |  40 ++---
 src/c++/perf_analyzer/inference_profiler.h    |  10 +-
 src/c++/perf_analyzer/load_manager.cc         |  19 ++-
 src/c++/perf_analyzer/load_manager.h          |   8 +-
 src/c++/perf_analyzer/mock_infer_context.h    |  28 +++-
 .../perf_analyzer/mock_inference_profiler.h   |   3 +-
 src/c++/perf_analyzer/perf_utils.h            |   4 -
 src/c++/perf_analyzer/request_record.h        |  61 +++++++
 src/c++/perf_analyzer/sequence_manager.cc     |   8 +-
 src/c++/perf_analyzer/sequence_manager.h      |   6 +
 src/c++/perf_analyzer/test_infer_context.cc   |  57 ++++++-
 .../perf_analyzer/test_inference_profiler.cc  |  64 ++++----
 src/c++/perf_analyzer/test_load_manager.cc    | 152 ++++++++++--------
 .../perf_analyzer/test_sequence_manager.cc    |  12 ++
 18 files changed, 390 insertions(+), 202 deletions(-)
 create mode 100644 src/c++/perf_analyzer/request_record.h

diff --git a/src/c++/perf_analyzer/CMakeLists.txt b/src/c++/perf_analyzer/CMakeLists.txt
index 517f5bc9d..089cc1c28 100644
--- a/src/c++/perf_analyzer/CMakeLists.txt
+++ b/src/c++/perf_analyzer/CMakeLists.txt
@@ -104,6 +104,7 @@ set(
     concurrency_ctx_id_tracker.h
     fifo_ctx_id_tracker.h
     rand_ctx_id_tracker.h
+    request_record.h
 )
 
 add_executable(
diff --git a/src/c++/perf_analyzer/client_backend/mock_client_backend.h b/src/c++/perf_analyzer/client_backend/mock_client_backend.h
index ddc14f663..483af914d 100644
--- a/src/c++/perf_analyzer/client_backend/mock_client_backend.h
+++ b/src/c++/perf_analyzer/client_backend/mock_client_backend.h
@@ -103,7 +103,7 @@ class MockInferInput : public InferInput {
 ///
 class MockInferResult : public InferResult {
  public:
-  MockInferResult(const InferOptions& options) : req_id_{options.request_id_} {}
+  MockInferResult(const InferOptions& options) : req_id_(options.request_id_) {}
 
   Error Id(std::string* id) const override
@@ -468,14 +468,36 @@ class MockClientStats {
 /// Mock implementation of ClientBackend interface
 ///
-class MockClientBackend : public ClientBackend {
+class NaggyMockClientBackend : public ClientBackend {
  public:
-  MockClientBackend(std::shared_ptr<MockClientStats> stats) { stats_ = stats; }
+  NaggyMockClientBackend(std::shared_ptr<MockClientStats> stats) : stats_(stats)
+  {
+    ON_CALL(*this, AsyncStreamInfer(testing::_, testing::_, testing::_))
+        .WillByDefault(
+            [this](
+                const InferOptions& options,
+                const std::vector<InferInput*>& inputs,
+                const std::vector<const InferRequestedOutput*>& outputs)
+                -> Error {
+              stats_->CaptureRequest(
+                  MockClientStats::ReqType::ASYNC_STREAM, options, inputs,
+                  outputs);
+
+              LaunchAsyncMockRequest(options, stream_callback_);
+
+              return stats_->GetNextReturnStatus();
+            });
+  }
 
   MOCK_METHOD(
       Error, ModelConfig,
       (rapidjson::Document*, const std::string&, const std::string&),
       (override));
+  MOCK_METHOD(
+      Error, AsyncStreamInfer,
+      (const InferOptions&, const std::vector<InferInput*>&,
+       const std::vector<const InferRequestedOutput*>&),
+      (override));
 
   Error Infer(
       InferResult** result, const InferOptions& options,
@@ -506,18 +528,6 @@ class MockClientBackend : public ClientBackend {
     return stats_->GetNextReturnStatus();
   }
 
-  Error AsyncStreamInfer(
-      const InferOptions& options, const std::vector<InferInput*>& inputs,
-      const std::vector<const InferRequestedOutput*>& outputs)
-  {
-    stats_->CaptureRequest(
-        MockClientStats::ReqType::ASYNC_STREAM, options, inputs, outputs);
-
-    LaunchAsyncMockRequest(options, stream_callback_);
-
-    return stats_->GetNextReturnStatus();
-  }
-
   Error StartStream(OnCompleteFn callback, bool enable_stats)
   {
     stats_->CaptureStreamStart();
@@ -601,6 +611,8 @@ class MockClientBackend : public ClientBackend {
     return Error::Success;
   }
 
+  OnCompleteFn stream_callback_;
+
  private:
   void LaunchAsyncMockRequest(const InferOptions options, OnCompleteFn callback)
   {
@@ -619,9 +631,10 @@ class MockClientBackend : public ClientBackend {
   size_t local_completed_req_count_ = 0;
 
   std::shared_ptr<MockClientStats> stats_;
-  OnCompleteFn stream_callback_;
 };
 
+using MockClientBackend = testing::NiceMock<NaggyMockClientBackend>;
+
 /// Mock factory that always creates a MockClientBackend instead
 /// of a real backend
 ///
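Note: renaming the mock to NaggyMockClientBackend and re-exposing the old name as a testing::NiceMock alias is the standard gMock idiom. The ON_CALL in the constructor keeps the previous concrete behavior as the default action, while NiceMock suppresses "uninteresting mock function call" warnings in tests that never set expectations on AsyncStreamInfer. A minimal standalone sketch of the same pattern, using a hypothetical Foo interface rather than this patch's types:

    #include "gmock/gmock.h"

    class Foo {
     public:
      virtual ~Foo() = default;
      virtual int Bar(int x) = 0;
    };

    class NaggyMockFoo : public Foo {
     public:
      NaggyMockFoo()
      {
        // Default action runs even when a test sets no EXPECT_CALL, so the
        // mock still behaves like a working collaborator by default.
        ON_CALL(*this, Bar(testing::_)).WillByDefault(testing::Return(0));
      }
      MOCK_METHOD(int, Bar, (int), (override));
    };

    // Tests that want call-verification noise can still use the naggy type
    // (or testing::StrictMock<NaggyMockFoo>); everyone else gets the alias.
    using MockFoo = testing::NiceMock<NaggyMockFoo>;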
diff --git a/src/c++/perf_analyzer/infer_context.cc b/src/c++/perf_analyzer/infer_context.cc
index dc65c2adc..f020cd8fa 100644
--- a/src/c++/perf_analyzer/infer_context.cc
+++ b/src/c++/perf_analyzer/infer_context.cc
@@ -74,7 +74,9 @@ InferContext::SendSequenceInferRequest(uint32_t seq_stat_index, bool delayed)
 
     sequence_manager_->DecrementRemainingQueries(seq_stat_index);
 
-    SendRequest(request_id_++, delayed);
+    SendRequest(
+        request_id_++, delayed,
+        sequence_manager_->GetSequenceID(seq_stat_index));
   }
 }
 
@@ -95,12 +97,15 @@ InferContext::CompleteOngoingSequence(uint32_t seq_stat_index)
     sequence_manager_->DecrementRemainingQueries(seq_stat_index);
 
     bool is_delayed = false;
-    SendRequest(request_id_++, is_delayed);
+    SendRequest(
+        request_id_++, is_delayed,
+        sequence_manager_->GetSequenceID(seq_stat_index));
   }
 }
 
 void
-InferContext::SendRequest(const uint64_t request_id, const bool delayed)
+InferContext::SendRequest(
+    const uint64_t request_id, const bool delayed, const uint64_t sequence_id)
 {
   if (!thread_stat_->status_.IsOk()) {
     return;
@@ -111,14 +116,13 @@ InferContext::SendRequest(const uint64_t request_id, const bool delayed)
     infer_data_.options_->request_id_ = std::to_string(request_id);
     {
       std::lock_guard<std::mutex> lock(thread_stat_->mu_);
-      auto it =
-          async_req_map_
-              .emplace(
-                  infer_data_.options_->request_id_, AsyncRequestProperties())
-              .first;
+      auto it = async_req_map_
+                    .emplace(infer_data_.options_->request_id_, RequestRecord())
+                    .first;
       it->second.start_time_ = std::chrono::system_clock::now();
       it->second.sequence_end_ = infer_data_.options_->sequence_end_;
       it->second.delayed_ = delayed;
+      it->second.sequence_id_ = sequence_id;
     }
 
     thread_stat_->idle_timer.Start();
@@ -157,13 +161,13 @@ InferContext::SendRequest(const uint64_t request_id, const bool delayed)
     std::vector<std::chrono::time_point<std::chrono::system_clock>>
         end_time_syncs{end_time_sync};
     {
-      // Add the request timestamp to thread Timestamp vector with proper
+      // Add the request record to thread request records vector with proper
       // locking
       std::lock_guard<std::mutex> lock(thread_stat_->mu_);
       auto total = end_time_sync - start_time_sync;
-      thread_stat_->request_timestamps_.emplace_back(std::make_tuple(
+      thread_stat_->request_records_.emplace_back(RequestRecord(
           start_time_sync, std::move(end_time_syncs),
-          infer_data_.options_->sequence_end_, delayed));
+          infer_data_.options_->sequence_end_, delayed, sequence_id));
       thread_stat_->status_ =
           infer_backend_->ClientInferStat(&(thread_stat_->contexts_stat_[id_]));
       if (!thread_stat_->status_.IsOk()) {
@@ -238,7 +242,7 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
   std::shared_ptr<cb::InferResult> result_ptr(result);
   bool is_final_response{true};
   if (thread_stat_->cb_status_.IsOk()) {
-    // Add the request timestamp to thread Timestamp vector with
+    // Add the request record to thread request records vector with
     // proper locking
     std::lock_guard<std::mutex> lock(thread_stat_->mu_);
     thread_stat_->cb_status_ = result_ptr->RequestStatus();
@@ -254,7 +258,8 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
       return;
     }
     if (is_null_response == false) {
-      it->second.end_times.push_back(std::chrono::system_clock::now());
+      it->second.response_times_.push_back(
+          std::chrono::system_clock::now());
     }
     thread_stat_->cb_status_ =
         result_ptr->IsFinalResponse(&is_final_response);
@@ -262,9 +267,10 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
       return;
     }
     if (is_final_response) {
-      thread_stat_->request_timestamps_.emplace_back(std::make_tuple(
-          it->second.start_time_, it->second.end_times,
-          it->second.sequence_end_, it->second.delayed_));
+      thread_stat_->request_records_.emplace_back(
+          it->second.start_time_, it->second.response_times_,
+          it->second.sequence_end_, it->second.delayed_,
+          it->second.sequence_id_);
       infer_backend_->ClientInferStat(&(thread_stat_->contexts_stat_[id_]));
       thread_stat_->cb_status_ = ValidateOutputs(result);
       async_req_map_.erase(request_id);
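Note: the bookkeeping above has three steps: SendRequest emplaces a RequestRecord into async_req_map_ keyed by request ID, the async callback appends one entry to response_times_ per response, and the record moves into thread_stat_->request_records_ (and out of the map) once the final response arrives. A simplified standalone model of that lifecycle, using plain types rather than the perf_analyzer classes:

    #include <chrono>
    #include <cstdint>
    #include <map>
    #include <string>
    #include <vector>

    using Clock = std::chrono::system_clock;

    struct Record {
      Clock::time_point start_time;
      std::vector<Clock::time_point> response_times;
      uint64_t sequence_id{0};
    };

    std::map<std::string, Record> in_flight;  // analog of async_req_map_
    std::vector<Record> completed;            // analog of request_records_

    void OnSend(const std::string& request_id, uint64_t sequence_id)
    {
      auto it = in_flight.emplace(request_id, Record()).first;
      it->second.start_time = Clock::now();
      it->second.sequence_id = sequence_id;
    }

    void OnResponse(const std::string& request_id, bool is_final)
    {
      auto it = in_flight.find(request_id);
      if (it == in_flight.end()) {
        return;  // unknown ID; the real code reports an error here
      }
      it->second.response_times.push_back(Clock::now());
      if (is_final) {
        // Latency is response_times.back() - start_time once completed.
        completed.push_back(it->second);
        in_flight.erase(it);
      }
    }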
the request timestamp to thread Timestamp vector with + // Add the request record to thread request records vector with // proper locking std::lock_guard lock(thread_stat_->mu_); thread_stat_->cb_status_ = result_ptr->RequestStatus(); @@ -254,7 +258,8 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result) return; } if (is_null_response == false) { - it->second.end_times.push_back(std::chrono::system_clock::now()); + it->second.response_times_.push_back( + std::chrono::system_clock::now()); } thread_stat_->cb_status_ = result_ptr->IsFinalResponse(&is_final_response); @@ -262,9 +267,10 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result) return; } if (is_final_response) { - thread_stat_->request_timestamps_.emplace_back(std::make_tuple( - it->second.start_time_, it->second.end_times, - it->second.sequence_end_, it->second.delayed_)); + thread_stat_->request_records_.emplace_back( + it->second.start_time_, it->second.response_times_, + it->second.sequence_end_, it->second.delayed_, + it->second.sequence_id_); infer_backend_->ClientInferStat(&(thread_stat_->contexts_stat_[id_])); thread_stat_->cb_status_ = ValidateOutputs(result); async_req_map_.erase(request_id); diff --git a/src/c++/perf_analyzer/infer_context.h b/src/c++/perf_analyzer/infer_context.h index c91fbcacc..fb048546e 100644 --- a/src/c++/perf_analyzer/infer_context.h +++ b/src/c++/perf_analyzer/infer_context.h @@ -36,6 +36,7 @@ #include "iinfer_data_manager.h" #include "infer_data.h" #include "perf_utils.h" +#include "request_record.h" #include "sequence_manager.h" namespace triton { namespace perfanalyzer { @@ -55,31 +56,16 @@ struct ThreadStat { // Tracks the amount of time this thread spent sleeping or waiting IdleTimer idle_timer; - // A vector of request timestamps - // Request latency will be end_time - start_time - TimestampVector request_timestamps_; + // A vector of request records + std::vector request_records_; // A lock to protect thread data std::mutex mu_; // The number of sent requests by this thread. std::atomic num_sent_requests_{0}; }; -/// The properties of an asynchronous request required in -/// the callback to effectively interpret the response. -struct AsyncRequestProperties { - AsyncRequestProperties() : sequence_end_(false), delayed_(true) {} - // The timestamp of when the request was started. - std::chrono::time_point start_time_; - // Whether or not the request is at the end of a sequence. - bool sequence_end_; - // Whether or not the request is delayed as per schedule. - bool delayed_; - // Collection of response times - std::vector> end_times; -}; - #ifndef DOCTEST_CONFIG_DISABLE -class MockInferContext; +class NaggyMockInferContext; #endif /// Sends inference requests to the server @@ -146,7 +132,11 @@ class InferContext { /// A helper function to issue inference request to the server. /// \param request_id The unique id to be associated with the request. /// \param delayed Whether the request fell behind its scheduled time. - virtual void SendRequest(const uint64_t request_id, const bool delayed); + /// \param sequence_id Sequence ID of the request. Note that the default of + /// `0` means the request is not a sequence. 
diff --git a/src/c++/perf_analyzer/inference_profiler.cc b/src/c++/perf_analyzer/inference_profiler.cc
index b0dd3f224..76d4d113f 100644
--- a/src/c++/perf_analyzer/inference_profiler.cc
+++ b/src/c++/perf_analyzer/inference_profiler.cc
@@ -681,13 +681,13 @@ InferenceProfiler::ProfileHelper(
   size_t completed_trials = 0;
   std::queue<cb::Error> error;
   std::deque<PerfStatus> measurement_perf_statuses;
-  all_timestamps_.clear();
+  all_request_records_.clear();
   previous_window_end_ns_ = 0;
 
-  // Start with a fresh empty timestamp vector in the manager
+  // Start with a fresh empty request records vector in the manager
   //
-  TimestampVector empty_timestamps;
-  RETURN_IF_ERROR(manager_->SwapTimestamps(empty_timestamps));
+  std::vector<RequestRecord> empty_request_records;
+  RETURN_IF_ERROR(manager_->SwapRequestRecords(empty_request_records));
 
   do {
     PerfStatus measurement_perf_status;
@@ -1193,11 +1193,11 @@ InferenceProfiler::Measure(
   RETURN_IF_ERROR(manager_->GetAccumulatedClientStat(&end_stat));
   prev_client_side_stats_ = end_stat;
 
-  TimestampVector current_timestamps;
-  RETURN_IF_ERROR(manager_->SwapTimestamps(current_timestamps));
-  all_timestamps_.insert(
-      all_timestamps_.end(), current_timestamps.begin(),
-      current_timestamps.end());
+  std::vector<RequestRecord> current_request_records;
+  RETURN_IF_ERROR(manager_->SwapRequestRecords(current_request_records));
+  all_request_records_.insert(
+      all_request_records_.end(), current_request_records.begin(),
+      current_request_records.end());
 
   RETURN_IF_ERROR(Summarize(
       start_status, end_status, start_stat, end_stat, perf_status,
@@ -1257,23 +1257,23 @@ InferenceProfiler::ValidLatencyMeasurement(
   valid_sequence_count = 0;
   response_count = 0;
   std::vector<size_t> erase_indices{};
-  for (size_t i = 0; i < all_timestamps_.size(); i++) {
-    const auto& timestamp = all_timestamps_[i];
-    uint64_t request_start_ns = CHRONO_TO_NANOS(std::get<0>(timestamp));
-    uint64_t request_end_ns = CHRONO_TO_NANOS(std::get<1>(timestamp).back());
+  for (size_t i = 0; i < all_request_records_.size(); i++) {
+    const auto& request_record = all_request_records_[i];
+    uint64_t request_start_ns = CHRONO_TO_NANOS(request_record.start_time_);
+    uint64_t request_end_ns =
+        CHRONO_TO_NANOS(request_record.response_times_.back());
 
     if (request_start_ns <= request_end_ns) {
       // Only counting requests that end within the time interval
       if ((request_end_ns >= valid_range.first) &&
           (request_end_ns <= valid_range.second)) {
         valid_latencies->push_back(request_end_ns - request_start_ns);
-        response_count += std::get<1>(timestamp).size();
+        response_count += request_record.response_times_.size();
         erase_indices.push_back(i);
-        // Just add the sequence_end flag here.
-        if (std::get<2>(timestamp)) {
+        if (request_record.sequence_end_) {
           valid_sequence_count++;
         }
-        if (std::get<3>(timestamp)) {
+        if (request_record.delayed_) {
           delayed_request_count++;
         }
       }
@@ -1281,10 +1281,10 @@ InferenceProfiler::ValidLatencyMeasurement(
     }
   }
 
   // Iterate through erase indices backwards so that erases from
-  // `all_timestamps_` happen from the back to the front to avoid using wrong
-  // indices after subsequent erases
+  // `all_request_records_` happen from the back to the front to avoid using
+  // wrong indices after subsequent erases
   std::for_each(erase_indices.rbegin(), erase_indices.rend(), [this](size_t i) {
-    this->all_timestamps_.erase(this->all_timestamps_.begin() + i);
+    this->all_request_records_.erase(this->all_request_records_.begin() + i);
   });
 
   // Always sort measured latencies as percentile will be reported as default
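Note: the reverse iteration over erase_indices matters because each erase shifts every later element one slot to the left; erasing back-to-front keeps the not-yet-erased indices valid. A standalone sketch demonstrating the invariant:

    #include <algorithm>
    #include <cassert>
    #include <vector>

    int main()
    {
      std::vector<int> records{10, 20, 30, 40, 50};
      std::vector<size_t> erase_indices{1, 3};  // drop 20 and 40

      // Erasing from the back first leaves index 1 still pointing at 20;
      // erasing front-first would have made index 3 point past 50.
      std::for_each(
          erase_indices.rbegin(), erase_indices.rend(),
          [&records](size_t i) { records.erase(records.begin() + i); });

      assert((records == std::vector<int>{10, 30, 50}));
    }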
diff --git a/src/c++/perf_analyzer/inference_profiler.h b/src/c++/perf_analyzer/inference_profiler.h
index b07cd93ae..b76a8dac7 100644
--- a/src/c++/perf_analyzer/inference_profiler.h
+++ b/src/c++/perf_analyzer/inference_profiler.h
@@ -183,9 +183,9 @@ cb::Error ReportPrometheusMetrics(const Metrics& metrics);
 /// time.
 /// 2. After given time interval, the profiler gets end status from the server
 /// and records the end time.
-/// 3. The profiler obtains the timestamps recorded by concurrency manager,
-/// and uses the timestamps that are recorded between start time and end time
-/// to measure client side status and update status_summary.
+/// 3. The profiler obtains the request records recorded by the concurrency
+/// manager, and uses the request records that are recorded between start time
+/// and end time to measure client side status and update status_summary.
 ///
 class InferenceProfiler {
  public:
@@ -678,8 +678,8 @@ class InferenceProfiler {
   bool include_server_stats_;
   std::shared_ptr<MPIDriver> mpi_driver_;
 
-  /// The timestamps of the requests completed during all measurements
-  TimestampVector all_timestamps_;
+  /// The request records of the requests completed during all measurements
+  std::vector<RequestRecord> all_request_records_;
 
   /// The end time of the previous measurement window
   uint64_t previous_window_end_ns_;
diff --git a/src/c++/perf_analyzer/load_manager.cc b/src/c++/perf_analyzer/load_manager.cc
index 5b76c9f4b..369317e66 100644
--- a/src/c++/perf_analyzer/load_manager.cc
+++ b/src/c++/perf_analyzer/load_manager.cc
@@ -59,20 +59,19 @@ LoadManager::CheckHealth()
 }
 
 cb::Error
-LoadManager::SwapTimestamps(TimestampVector& new_timestamps)
+LoadManager::SwapRequestRecords(std::vector<RequestRecord>& new_request_records)
 {
-  TimestampVector total_timestamp;
-  // Gather request timestamps with proper locking from all the worker
-  // threads
+  std::vector<RequestRecord> total_request_records;
+  // Gather request records with proper locking from all the worker threads
   for (auto& thread_stat : threads_stat_) {
     std::lock_guard<std::mutex> lock(thread_stat->mu_);
-    total_timestamp.insert(
-        total_timestamp.end(), thread_stat->request_timestamps_.begin(),
-        thread_stat->request_timestamps_.end());
-    thread_stat->request_timestamps_.clear();
+    total_request_records.insert(
+        total_request_records.end(), thread_stat->request_records_.begin(),
+        thread_stat->request_records_.end());
+    thread_stat->request_records_.clear();
   }
   // Swap the results
-  total_timestamp.swap(new_timestamps);
+  total_request_records.swap(new_request_records);
   return cb::Error::Success;
 }
 
@@ -82,7 +81,7 @@ LoadManager::CountCollectedRequests()
   uint64_t num_of_requests = 0;
   for (auto& thread_stat : threads_stat_) {
     std::lock_guard<std::mutex> lock(thread_stat->mu_);
-    num_of_requests += thread_stat->request_timestamps_.size();
+    num_of_requests += thread_stat->request_records_.size();
   }
   return num_of_requests;
 }
diff --git a/src/c++/perf_analyzer/load_manager.h b/src/c++/perf_analyzer/load_manager.h
index 5a10ae592..5e75ab9ea 100644
--- a/src/c++/perf_analyzer/load_manager.h
+++ b/src/c++/perf_analyzer/load_manager.h
@@ -76,11 +76,11 @@ class LoadManager {
   /// \return cb::Error object indicating success or failure.
   cb::Error CheckHealth();
 
-  /// Swap the content of the timestamp vector recorded by the load
-  /// manager with a new timestamp vector
-  /// \param new_timestamps The timestamp vector to be swapped.
+  /// Swap the content of the request records vector recorded by the load
+  /// manager with a new request records vector
+  /// \param new_request_records The request records vector to be swapped.
   /// \return cb::Error object indicating success or failure.
-  cb::Error SwapTimestamps(TimestampVector& new_timestamps);
+  cb::Error SwapRequestRecords(std::vector<RequestRecord>& new_request_records);
 
   /// Get the sum of all contexts' stat
   /// \param contexts_stat Returned the accumulated stat from all contexts
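Note: SwapRequestRecords uses a gather-then-swap idiom: each worker's records are copied out under that worker's own mutex (so no thread is blocked for the whole pass), the worker's vector is cleared, and std::vector::swap hands the combined batch to the caller in O(1). A standalone sketch of the same shape, with plain ints standing in for RequestRecord:

    #include <mutex>
    #include <vector>

    struct WorkerStat {
      std::mutex mu;
      std::vector<int> records;
    };

    void SwapRecords(std::vector<WorkerStat>& workers, std::vector<int>& out)
    {
      std::vector<int> total;
      for (auto& worker : workers) {
        std::lock_guard<std::mutex> lock(worker.mu);
        total.insert(total.end(), worker.records.begin(), worker.records.end());
        worker.records.clear();
      }
      // Whatever the caller passed in is discarded, mirroring the
      // "dropped on the floor" behavior exercised by the tests later in
      // this patch.
      total.swap(out);
    }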
diff --git a/src/c++/perf_analyzer/mock_infer_context.h b/src/c++/perf_analyzer/mock_infer_context.h
index 3b9d938b3..e1c15d03c 100644
--- a/src/c++/perf_analyzer/mock_infer_context.h
+++ b/src/c++/perf_analyzer/mock_infer_context.h
@@ -1,4 +1,4 @@
-// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
 // modification, are permitted provided that the following conditions
@@ -30,9 +30,22 @@
 
 namespace triton { namespace perfanalyzer {
 
-class MockInferContext : public InferContext {
+class NaggyMockInferContext : public InferContext {
  public:
-  MOCK_METHOD(void, SendRequest, (const uint64_t, const bool), (override));
+  NaggyMockInferContext()
+  {
+    ON_CALL(*this, SendRequest(testing::_, testing::_, testing::_))
+        .WillByDefault(
+            [this](
+                const uint64_t request_id, const bool delayed,
+                const uint64_t sequence_id) -> void {
+              this->InferContext::SendRequest(request_id, delayed, sequence_id);
+            });
+  }
+
+  MOCK_METHOD(
+      void, SendRequest, (const uint64_t, const bool, const uint64_t),
+      (override));
 
   std::shared_ptr<SequenceManager>& sequence_manager_{
       InferContext::sequence_manager_};
@@ -42,6 +55,15 @@ class MockInferContext : public InferContext {
   std::shared_ptr<ThreadStat>& thread_stat_{InferContext::thread_stat_};
   std::reference_wrapper<std::atomic<bool>>& execute_{InferContext::execute_};
   bool& using_json_data_{InferContext::using_json_data_};
+  bool& async_{InferContext::async_};
+  bool& streaming_{InferContext::streaming_};
+  InferData& infer_data_{InferContext::infer_data_};
+  std::unique_ptr<cb::ClientBackend>& infer_backend_{
+      InferContext::infer_backend_};
+  std::function<void(cb::InferResult*)>& async_callback_func_{
+      InferContext::async_callback_func_};
 };
 
+using MockInferContext = testing::NiceMock<NaggyMockInferContext>;
+
 }}  // namespace triton::perfanalyzer
diff --git a/src/c++/perf_analyzer/mock_inference_profiler.h b/src/c++/perf_analyzer/mock_inference_profiler.h
index a31485091..c64b5fb08 100644
--- a/src/c++/perf_analyzer/mock_inference_profiler.h
+++ b/src/c++/perf_analyzer/mock_inference_profiler.h
@@ -113,7 +113,8 @@ class NaggyMockInferenceProfiler : public InferenceProfiler {
   std::shared_ptr<ModelParser>& parser_{InferenceProfiler::parser_};
   std::unique_ptr<LoadManager>& manager_{InferenceProfiler::manager_};
   bool& include_lib_stats_{InferenceProfiler::include_lib_stats_};
-  TimestampVector& all_timestamps_{InferenceProfiler::all_timestamps_};
+  std::vector<RequestRecord>& all_request_records_{
+      InferenceProfiler::all_request_records_};
 };
 
 using MockInferenceProfiler = testing::NiceMock<NaggyMockInferenceProfiler>;
diff --git a/src/c++/perf_analyzer/perf_utils.h b/src/c++/perf_analyzer/perf_utils.h
index 1865b8430..7166936a9 100644
--- a/src/c++/perf_analyzer/perf_utils.h
+++ b/src/c++/perf_analyzer/perf_utils.h
@@ -53,10 +53,6 @@ constexpr uint64_t NANOS_PER_MILLIS = 1000000;
 #define CHRONO_TO_MILLIS(TS) (CHRONO_TO_NANOS(TS) / pa::NANOS_PER_MILLIS)
 
 //==============================================================================
-using TimestampVector = std::vector<std::tuple<
-    std::chrono::time_point<std::chrono::system_clock>,
-    std::vector<std::chrono::time_point<std::chrono::system_clock>>, uint32_t,
-    bool>>;
 
 // Will use the characters specified here to construct random strings
 std::string const character_set =
diff --git a/src/c++/perf_analyzer/request_record.h b/src/c++/perf_analyzer/request_record.h
new file mode 100644
index 000000000..ba441549a
--- /dev/null
+++ b/src/c++/perf_analyzer/request_record.h
@@ -0,0 +1,61 @@
+// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//  * Redistributions of source code must retain the above copyright
+//    notice, this list of conditions and the following disclaimer.
+//  * Redistributions in binary form must reproduce the above copyright
+//    notice, this list of conditions and the following disclaimer in the
+//    documentation and/or other materials provided with the distribution.
+//  * Neither the name of NVIDIA CORPORATION nor the names of its
+//    contributors may be used to endorse or promote products derived
+//    from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <tuple>
+#include <vector>
+
+namespace triton { namespace perfanalyzer {
+
+/// A record of an individual request
+struct RequestRecord {
+  RequestRecord() = default;
+  RequestRecord(
+      std::chrono::time_point<std::chrono::system_clock> start_time,
+      std::vector<std::chrono::time_point<std::chrono::system_clock>>
+          response_times,
+      bool sequence_end, bool delayed, uint64_t sequence_id)
+      : start_time_(start_time), response_times_(response_times),
+        sequence_end_(sequence_end), delayed_(delayed),
+        sequence_id_(sequence_id)
+  {
+  }
+  // The timestamp of when the request was started.
+  std::chrono::time_point<std::chrono::system_clock> start_time_;
+  // Collection of response times
+  std::vector<std::chrono::time_point<std::chrono::system_clock>>
+      response_times_;
+  // Whether or not the request is at the end of a sequence.
+  bool sequence_end_;
+  // Whether or not the request is delayed as per schedule.
+  bool delayed_;
+  // Sequence ID of the request
+  uint64_t sequence_id_;
+};
+
+}}  // namespace triton::perfanalyzer
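Note: a minimal usage sketch for the new RequestRecord, mirroring how the profiler derives a latency and a response count from one record (assumes request_record.h is on the include path):

    #include <cassert>
    #include <chrono>

    #include "request_record.h"

    int main()
    {
      using Clock = std::chrono::system_clock;
      const auto start = Clock::now();
      const auto first_response = start + std::chrono::milliseconds(5);
      const auto final_response = start + std::chrono::milliseconds(9);

      triton::perfanalyzer::RequestRecord record(
          start, {first_response, final_response}, /*sequence_end=*/true,
          /*delayed=*/false, /*sequence_id=*/42);

      // Request latency is always measured to the final response.
      assert(
          record.response_times_.back() - record.start_time_ ==
          std::chrono::milliseconds(9));
      assert(record.response_times_.size() == 2);
      assert(record.sequence_id_ == 42);
    }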
diff --git a/src/c++/perf_analyzer/sequence_manager.cc b/src/c++/perf_analyzer/sequence_manager.cc
index 089768232..eaf5d6e00 100644
--- a/src/c++/perf_analyzer/sequence_manager.cc
+++ b/src/c++/perf_analyzer/sequence_manager.cc
@@ -1,4 +1,4 @@
-// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
 // modification, are permitted provided that the following conditions
@@ -53,6 +53,12 @@ SequenceManager::InitSequenceStatuses(size_t num_sequence_statuses)
   }
 }
 
+const uint64_t
+SequenceManager::GetSequenceID(size_t sequence_status_index) const
+{
+  return sequence_statuses_.at(sequence_status_index)->seq_id_;
+}
+
 std::mutex&
 SequenceManager::GetMutex(size_t sequence_status_index)
 {
diff --git a/src/c++/perf_analyzer/sequence_manager.h b/src/c++/perf_analyzer/sequence_manager.h
index d204b0f0b..c419a87f0 100644
--- a/src/c++/perf_analyzer/sequence_manager.h
+++ b/src/c++/perf_analyzer/sequence_manager.h
@@ -68,6 +68,12 @@ class SequenceManager {
   ///
   void InitSequenceStatuses(size_t num_sequence_statuses);
 
+  /// Gets the sequence ID for the specified sequence status object.
+  /// \param sequence_status_index The index of the sequence status object.
+  /// \return The sequence ID for the specified sequence status object.
+  ///
+  const uint64_t GetSequenceID(size_t sequence_status_index) const;
+
   /// Gets a non-const reference to the mutex for the specified sequence status
   /// object.
   /// \param sequence_status_index The index of the sequence status object.
diff --git a/src/c++/perf_analyzer/test_infer_context.cc b/src/c++/perf_analyzer/test_infer_context.cc
index 0af45b4dd..951fb2b10 100644
--- a/src/c++/perf_analyzer/test_infer_context.cc
+++ b/src/c++/perf_analyzer/test_infer_context.cc
@@ -1,4 +1,4 @@
-// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
 // modification, are permitted provided that the following conditions
@@ -24,6 +24,7 @@
 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+#include "client_backend/mock_client_backend.h"
 #include "doctest.h"
 #include "gmock/gmock.h"
 #include "infer_context.h"
@@ -92,7 +93,7 @@ TEST_CASE("update_seq_json_data: testing the UpdateSeqJsonData function")
 
   std::shared_ptr<MockInferContext> mic{std::make_shared<MockInferContext>()};
 
-  EXPECT_CALL(*mic, SendRequest(testing::_, testing::_))
+  EXPECT_CALL(*mic, SendRequest(testing::_, testing::_, testing::_))
       .Times(6)
       .WillRepeatedly(testing::Return());
 
@@ -122,4 +123,56 @@ TEST_CASE("update_seq_json_data: testing the UpdateSeqJsonData function")
   REQUIRE(testing::Test::HasFailure() == false);
 }
 
+TEST_CASE("send_request: testing the SendRequest function")
+{
+  MockInferContext mock_infer_context{};
+
+  SUBCASE("testing logic relevant to request record sequence ID")
+  {
+    mock_infer_context.thread_stat_ = std::make_shared<ThreadStat>();
+    mock_infer_context.thread_stat_->contexts_stat_.emplace_back();
+    mock_infer_context.async_ = true;
+    mock_infer_context.streaming_ = true;
+    mock_infer_context.infer_data_.options_ =
+        std::make_unique<cb::InferOptions>("my_model");
+    std::shared_ptr<cb::MockClientStats> mock_client_stats{
+        std::make_shared<cb::MockClientStats>()};
+    mock_infer_context.infer_backend_ =
+        std::make_unique<cb::MockClientBackend>(mock_client_stats);
+
+    const uint64_t request_id{5};
+    const bool delayed{false};
+    const uint64_t sequence_id{2};
+
+    mock_infer_context.infer_data_.options_->request_id_ =
+        std::to_string(request_id);
+
+    cb::MockInferResult* mock_infer_result{
+        new cb::MockInferResult(*mock_infer_context.infer_data_.options_)};
+
+    cb::OnCompleteFn& stream_callback{mock_infer_context.async_callback_func_};
+
+    EXPECT_CALL(
+        dynamic_cast<cb::MockClientBackend&>(
+            *mock_infer_context.infer_backend_),
+        AsyncStreamInfer(testing::_, testing::_, testing::_))
+        .WillOnce(
+            [&mock_infer_result, &stream_callback](
+                const cb::InferOptions& options,
+                const std::vector<cb::InferInput*>& inputs,
+                const std::vector<const cb::InferRequestedOutput*>& outputs)
+                -> cb::Error {
+              stream_callback(mock_infer_result);
+              return cb::Error::Success;
+            });
+
+    mock_infer_context.SendRequest(request_id, delayed, sequence_id);
+
+    CHECK(mock_infer_context.thread_stat_->request_records_.size() == 1);
+    CHECK(
+        mock_infer_context.thread_stat_->request_records_[0].sequence_id_ ==
+        sequence_id);
+  }
+}
+
 }}  // namespace triton::perfanalyzer
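Note: the new test drives the full SendRequest path by letting the mocked AsyncStreamInfer complete the request inline; the WillOnce lambda invokes the context's own completion callback before returning, so the request record is already finalized when SendRequest returns. The same expectation-with-lambda shape on a hypothetical Transport interface (not this patch's API):

    #include <functional>

    #include "gmock/gmock.h"
    #include "gtest/gtest.h"

    class Transport {
     public:
      virtual ~Transport() = default;
      virtual bool Send(int payload) = 0;
    };

    class MockTransport : public Transport {
     public:
      MOCK_METHOD(bool, Send, (int), (override));
    };

    TEST(TransportTest, CompletesInline)
    {
      MockTransport transport;
      int completed{-1};
      std::function<void(int)> on_complete{
          [&completed](int payload) { completed = payload; }};

      EXPECT_CALL(transport, Send(testing::_))
          .WillOnce([&on_complete](int payload) -> bool {
            on_complete(payload);  // complete the "async" call synchronously
            return true;
          });

      transport.Send(5);
      EXPECT_EQ(completed, 5);
    }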
diff --git a/src/c++/perf_analyzer/test_inference_profiler.cc b/src/c++/perf_analyzer/test_inference_profiler.cc
index 71ed2eb89..bd3ab3e5b 100644
--- a/src/c++/perf_analyzer/test_inference_profiler.cc
+++ b/src/c++/perf_analyzer/test_inference_profiler.cc
@@ -38,10 +38,10 @@ class TestInferenceProfiler : public InferenceProfiler {
       const std::pair<uint64_t, uint64_t>& valid_range,
       size_t& valid_sequence_count, size_t& delayed_request_count,
       std::vector<uint64_t>* latencies, size_t& response_count,
-      TimestampVector& all_timestamps)
+      std::vector<RequestRecord>& all_request_records)
   {
     InferenceProfiler inference_profiler{};
-    inference_profiler.all_timestamps_ = all_timestamps;
+    inference_profiler.all_request_records_ = all_request_records;
     inference_profiler.ValidLatencyMeasurement(
         valid_range, valid_sequence_count, delayed_request_count, latencies,
         response_count);
@@ -171,53 +171,58 @@ TEST_CASE("testing the ValidLatencyMeasurement function")
   const std::pair<uint64_t, uint64_t> window{4, 17};
   using time_point = std::chrono::time_point<std::chrono::system_clock>;
   using ns = std::chrono::nanoseconds;
-  TimestampVector all_timestamps{
+  std::vector<RequestRecord> all_request_records{
       // request ends before window starts, this should not be possible to
       // exist in the vector of requests, but if it is, we exclude it: not
       // included in current window
-      std::make_tuple(
+      RequestRecord(
           time_point(ns(1)), std::vector<time_point>{time_point(ns(2))}, 0,
-          false),
+          false, 0),
 
       // request starts before window starts and ends inside window: included
       // in current window
-      std::make_tuple(
+      RequestRecord(
           time_point(ns(3)), std::vector<time_point>{time_point(ns(5))}, 0,
-          false),
+          false, 0),
 
       // requests start and end inside window: included in current window
-      std::make_tuple(
+      RequestRecord(
           time_point(ns(6)), std::vector<time_point>{time_point(ns(9))}, 0,
-          false),
-      std::make_tuple(
+          false, 0),
+      RequestRecord(
           time_point(ns(10)), std::vector<time_point>{time_point(ns(14))}, 0,
-          false),
+          false, 0),
 
       // request starts before window ends and ends after window ends: not
       // included in current window
-      std::make_tuple(
+      RequestRecord(
           time_point(ns(15)), std::vector<time_point>{time_point(ns(20))}, 0,
-          false),
+          false, 0),
 
       // request starts after window ends: not included in current window
-      std::make_tuple(
+      RequestRecord(
           time_point(ns(21)), std::vector<time_point>{time_point(ns(27))}, 0,
-          false)};
+          false, 0)};
 
   TestInferenceProfiler::ValidLatencyMeasurement(
       window, valid_sequence_count, delayed_request_count, &latencies,
-      response_count, all_timestamps);
+      response_count, all_request_records);
 
-  const auto& convert_timestamp_to_latency{
-      [](std::tuple<time_point, std::vector<time_point>, uint32_t, bool> t) {
-        return CHRONO_TO_NANOS(std::get<1>(t).back()) -
-               CHRONO_TO_NANOS(std::get<0>(t));
-      }};
+  const auto& convert_request_record_to_latency{[](RequestRecord t) {
+    return CHRONO_TO_NANOS(t.response_times_.back()) -
+           CHRONO_TO_NANOS(t.start_time_);
+  }};
 
   CHECK(latencies.size() == 3);
-  CHECK(latencies[0] == convert_timestamp_to_latency(all_timestamps[1]));
-  CHECK(latencies[1] == convert_timestamp_to_latency(all_timestamps[2]));
-  CHECK(latencies[2] == convert_timestamp_to_latency(all_timestamps[3]));
+  CHECK(
+      latencies[0] ==
+      convert_request_record_to_latency(all_request_records[1]));
+  CHECK(
+      latencies[1] ==
+      convert_request_record_to_latency(all_request_records[2]));
+  CHECK(
+      latencies[2] ==
+      convert_request_record_to_latency(all_request_records[3]));
 }
 
 TEST_CASE("test_check_window_for_stability")
@@ -871,23 +876,24 @@ TEST_CASE(
   auto request1_timestamp{clock_epoch + std::chrono::nanoseconds(1)};
   auto response1_timestamp{clock_epoch + std::chrono::nanoseconds(2)};
   auto response2_timestamp{clock_epoch + std::chrono::nanoseconds(3)};
-  auto timestamp1{std::make_tuple(
+  auto request_record1{RequestRecord(
       request1_timestamp,
       std::vector<std::chrono::time_point<std::chrono::system_clock>>{
          response1_timestamp, response2_timestamp},
-      0, false)};
+      0, false, 0)};
 
   auto request2_timestamp{clock_epoch + std::chrono::nanoseconds(4)};
   auto response3_timestamp{clock_epoch + std::chrono::nanoseconds(5)};
   auto response4_timestamp{clock_epoch + std::chrono::nanoseconds(6)};
   auto response5_timestamp{clock_epoch + std::chrono::nanoseconds(7)};
-  auto timestamp2{std::make_tuple(
+  auto request_record2{RequestRecord(
       request2_timestamp,
       std::vector<std::chrono::time_point<std::chrono::system_clock>>{
          response3_timestamp, response4_timestamp, response5_timestamp},
-      0, false)};
+      0, false, 0)};
 
-  mock_inference_profiler.all_timestamps_ = {timestamp1, timestamp2};
+  mock_inference_profiler.all_request_records_ = {
+      request_record1, request_record2};
 
   const std::pair<uint64_t, uint64_t> valid_range{
      std::make_pair(0, UINT64_MAX)};
diff --git a/src/c++/perf_analyzer/test_load_manager.cc b/src/c++/perf_analyzer/test_load_manager.cc
index 224dc895f..ada49e25b 100644
--- a/src/c++/perf_analyzer/test_load_manager.cc
+++ b/src/c++/perf_analyzer/test_load_manager.cc
@@ -33,6 +33,21 @@ namespace cb = triton::perfanalyzer::clientbackend;
 
 namespace triton { namespace perfanalyzer {
 
+namespace {
+
+bool
+operator==(const RequestRecord& lhs, const RequestRecord& rhs)
+{
+  return std::tie(
+             lhs.start_time_, lhs.response_times_, lhs.sequence_end_,
+             lhs.delayed_, lhs.sequence_id_) ==
+         std::tie(
+             rhs.start_time_, rhs.response_times_, rhs.sequence_end_,
+             rhs.delayed_, rhs.sequence_id_);
+}
+
+}  // namespace
+
 class TestLoadManager : public TestLoadManagerBase, public LoadManager {
  public:
   ~TestLoadManager() = default;
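Note: std::tie packs the listed members into a tuple of references, so the whole comparison above is a single memberwise equality; adding a field to RequestRecord only requires adding it to both tie lists. A minimal standalone sketch of the idiom:

    #include <cassert>
    #include <tuple>

    struct Point {
      int x;
      int y;
    };

    bool operator==(const Point& lhs, const Point& rhs)
    {
      return std::tie(lhs.x, lhs.y) == std::tie(rhs.x, rhs.y);
    }

    int main()
    {
      assert((Point{1, 2} == Point{1, 2}));
      assert(!(Point{1, 2} == Point{2, 1}));
    }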
@@ -108,83 +123,83 @@ class TestLoadManager : public TestLoadManagerBase, public LoadManager {
     CHECK(CheckHealth().IsOk() == expect_ok);
   }
 
-  /// Test the public function SwapTimestamps
+  /// Test the public function SwapRequestRecords
   ///
-  /// It will gather all timestamps from the thread_stats
-  /// and return them, and clear the thread_stats timestamps
+  /// It will gather all request records from the thread_stats
+  /// and return them, and clear the thread_stats request records
   ///
-  void TestSwapTimeStamps()
+  void TestSwapRequestRecords()
   {
     using time_point = std::chrono::time_point<std::chrono::system_clock>;
     using ns = std::chrono::nanoseconds;
-    auto timestamp1 = std::make_tuple(
-        time_point(ns(1)), std::vector<time_point>{time_point(ns(2))}, 0,
-        false);
-    auto timestamp2 = std::make_tuple(
-        time_point(ns(3)), std::vector<time_point>{time_point(ns(4))}, 0,
-        false);
-    auto timestamp3 = std::make_tuple(
-        time_point(ns(5)), std::vector<time_point>{time_point(ns(6))}, 0,
-        false);
-
-    TimestampVector source_timestamps;
+    auto request_record1 = RequestRecord(
+        time_point(ns(1)), std::vector<time_point>{time_point(ns(2))}, 0, false,
+        0);
+    auto request_record2 = RequestRecord(
+        time_point(ns(3)), std::vector<time_point>{time_point(ns(4))}, 0, false,
+        0);
+    auto request_record3 = RequestRecord(
+        time_point(ns(5)), std::vector<time_point>{time_point(ns(6))}, 0, false,
+        0);
+
+    std::vector<RequestRecord> source_request_records;
 
     SUBCASE("No threads")
     {
-      auto ret = SwapTimestamps(source_timestamps);
-      CHECK(source_timestamps.size() == 0);
+      auto ret = SwapRequestRecords(source_request_records);
+      CHECK(source_request_records.size() == 0);
       CHECK(ret.IsOk() == true);
     }
 
-    SUBCASE("Source has timestamps")
+    SUBCASE("Source has request records")
     {
-      // Any timestamps in the vector passed in to Swaptimestamps will
-      // be dropped on the floor
+      // Any request records in the vector passed in to SwapRequestRecords
+      // will be dropped on the floor
       //
-      source_timestamps.push_back(timestamp1);
-      auto ret = SwapTimestamps(source_timestamps);
-      CHECK(source_timestamps.size() == 0);
+      source_request_records.push_back(request_record1);
+      auto ret = SwapRequestRecords(source_request_records);
+      CHECK(source_request_records.size() == 0);
       CHECK(ret.IsOk() == true);
     }
 
     SUBCASE("One thread")
    {
       auto stat1 = std::make_shared<ThreadStat>();
-      stat1->request_timestamps_.push_back(timestamp1);
-      stat1->request_timestamps_.push_back(timestamp2);
-      stat1->request_timestamps_.push_back(timestamp3);
+      stat1->request_records_.push_back(request_record1);
+      stat1->request_records_.push_back(request_record2);
+      stat1->request_records_.push_back(request_record3);
       threads_stat_.push_back(stat1);
 
-      CHECK(stat1->request_timestamps_.size() == 3);
-      auto ret = SwapTimestamps(source_timestamps);
-      CHECK(stat1->request_timestamps_.size() == 0);
+      CHECK(stat1->request_records_.size() == 3);
+      auto ret = SwapRequestRecords(source_request_records);
+      CHECK(stat1->request_records_.size() == 0);
 
-      REQUIRE(source_timestamps.size() == 3);
-      CHECK(source_timestamps[0] == timestamp1);
-      CHECK(source_timestamps[1] == timestamp2);
-      CHECK(source_timestamps[2] == timestamp3);
+      REQUIRE(source_request_records.size() == 3);
+      CHECK(source_request_records[0] == request_record1);
+      CHECK(source_request_records[1] == request_record2);
+      CHECK(source_request_records[2] == request_record3);
       CHECK(ret.IsOk() == true);
     }
 
     SUBCASE("Multiple threads")
     {
       auto stat1 = std::make_shared<ThreadStat>();
-      stat1->request_timestamps_.push_back(timestamp2);
+      stat1->request_records_.push_back(request_record2);
       auto stat2 = std::make_shared<ThreadStat>();
-      stat2->request_timestamps_.push_back(timestamp1);
-      stat2->request_timestamps_.push_back(timestamp3);
+      stat2->request_records_.push_back(request_record1);
+      stat2->request_records_.push_back(request_record3);
       threads_stat_.push_back(stat1);
       threads_stat_.push_back(stat2);
 
-      CHECK(stat1->request_timestamps_.size() == 1);
-      CHECK(stat2->request_timestamps_.size() == 2);
-      auto ret = SwapTimestamps(source_timestamps);
-      CHECK(stat1->request_timestamps_.size() == 0);
-      CHECK(stat2->request_timestamps_.size() == 0);
+      CHECK(stat1->request_records_.size() == 1);
+      CHECK(stat2->request_records_.size() == 2);
+      auto ret = SwapRequestRecords(source_request_records);
+      CHECK(stat1->request_records_.size() == 0);
+      CHECK(stat2->request_records_.size() == 0);
 
-      REQUIRE(source_timestamps.size() == 3);
-      CHECK(source_timestamps[0] == timestamp2);
-      CHECK(source_timestamps[1] == timestamp1);
-      CHECK(source_timestamps[2] == timestamp3);
+      REQUIRE(source_request_records.size() == 3);
+      CHECK(source_request_records[0] == request_record2);
+      CHECK(source_request_records[1] == request_record1);
+      CHECK(source_request_records[2] == request_record3);
       CHECK(ret.IsOk() == true);
     }
   }
@@ -271,22 +286,22 @@ class TestLoadManager : public TestLoadManagerBase, public LoadManager {
 
   /// Test the public function CountCollectedRequests
   ///
-  /// It will count all timestamps in the thread_stats (and not modify
+  /// It will count all request records in the thread_stats (and not modify
   /// the thread_stats in any way)
   ///
   void TestCountCollectedRequests()
   {
     using time_point = std::chrono::time_point<std::chrono::system_clock>;
     using ns = std::chrono::nanoseconds;
-    auto timestamp1 = std::make_tuple(
-        time_point(ns(1)), std::vector<time_point>{time_point(ns(2))}, 0,
-        false);
-    auto timestamp2 = std::make_tuple(
-        time_point(ns(3)), std::vector<time_point>{time_point(ns(4))}, 0,
-        false);
-    auto timestamp3 = std::make_tuple(
-        time_point(ns(5)), std::vector<time_point>{time_point(ns(6))}, 0,
-        false);
+    auto request_record1 = RequestRecord(
+        time_point(ns(1)), std::vector<time_point>{time_point(ns(2))}, 0, false,
+        0);
+    auto request_record2 = RequestRecord(
+        time_point(ns(3)), std::vector<time_point>{time_point(ns(4))}, 0, false,
+        0);
+    auto request_record3 = RequestRecord(
+        time_point(ns(5)), std::vector<time_point>{time_point(ns(6))}, 0, false,
+        0);
SUBCASE("No threads") { @@ -295,32 +310,32 @@ class TestLoadManager : public TestLoadManagerBase, public LoadManager { SUBCASE("One thread") { auto stat1 = std::make_shared(); - stat1->request_timestamps_.push_back(timestamp1); - stat1->request_timestamps_.push_back(timestamp2); - stat1->request_timestamps_.push_back(timestamp3); + stat1->request_records_.push_back(request_record1); + stat1->request_records_.push_back(request_record2); + stat1->request_records_.push_back(request_record3); threads_stat_.push_back(stat1); - CHECK(stat1->request_timestamps_.size() == 3); + CHECK(stat1->request_records_.size() == 3); CHECK(CountCollectedRequests() == 3); - CHECK(stat1->request_timestamps_.size() == 3); + CHECK(stat1->request_records_.size() == 3); } SUBCASE("Multiple threads") { auto stat1 = std::make_shared(); - stat1->request_timestamps_.push_back(timestamp2); + stat1->request_records_.push_back(request_record2); auto stat2 = std::make_shared(); - stat2->request_timestamps_.push_back(timestamp1); - stat2->request_timestamps_.push_back(timestamp3); + stat2->request_records_.push_back(request_record1); + stat2->request_records_.push_back(request_record3); threads_stat_.push_back(stat1); threads_stat_.push_back(stat2); - CHECK(stat1->request_timestamps_.size() == 1); - CHECK(stat2->request_timestamps_.size() == 2); + CHECK(stat1->request_records_.size() == 1); + CHECK(stat2->request_records_.size() == 2); CHECK(CountCollectedRequests() == 3); - CHECK(stat1->request_timestamps_.size() == 1); - CHECK(stat2->request_timestamps_.size() == 2); + CHECK(stat1->request_records_.size() == 1); + CHECK(stat2->request_records_.size() == 2); } } @@ -361,10 +376,11 @@ TEST_CASE("load_manager_check_health: Test the public function CheckHealth()") } TEST_CASE( - "load_manager_swap_timestamps: Test the public function SwapTimeStamps()") + "load_manager_swap_request_records: Test the public function " + "SwapRequestRecords()") { TestLoadManager tlm(PerfAnalyzerParameters{}); - tlm.TestSwapTimeStamps(); + tlm.TestSwapRequestRecords(); } TEST_CASE( diff --git a/src/c++/perf_analyzer/test_sequence_manager.cc b/src/c++/perf_analyzer/test_sequence_manager.cc index 83302bfe1..243500b85 100644 --- a/src/c++/perf_analyzer/test_sequence_manager.cc +++ b/src/c++/perf_analyzer/test_sequence_manager.cc @@ -31,6 +31,18 @@ namespace triton { namespace perfanalyzer { +TEST_CASE("get_sequence_id: testing the GetSequenceID function") +{ + MockSequenceManager msm{}; + + std::shared_ptr sequence_status{ + std::make_shared(5)}; + + msm.sequence_statuses_.push_back(sequence_status); + + CHECK(msm.GetSequenceID(0) == 5); +} + TEST_CASE( "test_set_infer_sequence_options: testing the SetInferSequenceOptions " "function")