From fe2cd508fba03ffb101d89b1c65b76e1adab6038 Mon Sep 17 00:00:00 2001 From: krishung5 Date: Mon, 11 Dec 2023 17:36:21 -0800 Subject: [PATCH 1/4] Set release flags and clean up response factory map before returning error --- src/python_be.cc | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/python_be.cc b/src/python_be.cc index 6de5bcf3..08fa641c 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1361,6 +1361,17 @@ ModelInstanceState::ProcessRequestsDecoupled( reporter.SetBatchStatistics(request_count); if (response_batch.data_->has_error) { + for (auto& infer_request : pb_infer_requests) { + // Reset the release flags for all the requests. + infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); + // Clean up the response factory map. + { + std::lock_guard guard{response_factory_map_mutex_}; + response_factory_map_.erase( + reinterpret_cast(infer_request->RequestAddress())); + } + } + if (response_batch.data_->is_error_set) { auto error = PbString::LoadFromSharedMemory( Stub()->ShmPool(), response_batch.data_->error); @@ -1368,11 +1379,6 @@ ModelInstanceState::ProcessRequestsDecoupled( TRITONSERVER_ERROR_INTERNAL, error->String().c_str()); } - // Reset the release flags for all the requests. - for (auto& infer_request : pb_infer_requests) { - infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - } - return TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests."); } From 419a3a55580798d1086fa930aeab4051da6f5ef3 Mon Sep 17 00:00:00 2001 From: krishung5 Date: Tue, 12 Dec 2023 11:40:34 -0800 Subject: [PATCH 2/4] Address comments --- src/python_be.cc | 23 +++++++++++++++-------- src/python_be.h | 4 ++++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/python_be.cc b/src/python_be.cc index 08fa641c..d99efe58 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1362,14 +1362,7 @@ ModelInstanceState::ProcessRequestsDecoupled( if (response_batch.data_->has_error) { for (auto& infer_request : pb_infer_requests) { - // Reset the release flags for all the requests. - infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - // Clean up the response factory map. - { - std::lock_guard guard{response_factory_map_mutex_}; - response_factory_map_.erase( - reinterpret_cast(infer_request->RequestAddress())); - } + CleanupDecoupledRequests(infer_request); } if (response_batch.data_->is_error_set) { @@ -1875,6 +1868,20 @@ ModelInstanceState::ShareCUDAMemoryPool(const int32_t device_id) #endif // TRITON_ENABLE_GPU } +void +ModelInstanceState::CleanupDecoupledRequests( + const std::unique_ptr& infer_request) +{ + // Reset the release flags for all the requests. + infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); + // Clean up the response factory map. + { + std::lock_guard guard{response_factory_map_mutex_}; + response_factory_map_.erase( + reinterpret_cast(infer_request->RequestAddress())); + } +} + ModelInstanceState::~ModelInstanceState() { ModelState* model_state = reinterpret_cast(Model()); diff --git a/src/python_be.h b/src/python_be.h index 2fc755ca..75d76955 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -429,5 +429,9 @@ class ModelInstanceState : public BackendModelInstance { // Attempt to share CUDA memory pool with the stub process void ShareCUDAMemoryPool(const int32_t device_id); + + // Cleanup the decoupled requests when there is an error in the response. 
+ void CleanupDecoupledRequests( + const std::unique_ptr& infer_request); }; }}} // namespace triton::backend::python From f8b2eb6eaf00dd41a85d8338f70190f7d37fe2c9 Mon Sep 17 00:00:00 2001 From: krishung5 Date: Tue, 12 Dec 2023 12:02:39 -0800 Subject: [PATCH 3/4] Move the cleanup function to the outside scope --- src/python_be.cc | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/python_be.cc b/src/python_be.cc index d99efe58..3dac6911 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1361,10 +1361,6 @@ ModelInstanceState::ProcessRequestsDecoupled( reporter.SetBatchStatistics(request_count); if (response_batch.data_->has_error) { - for (auto& infer_request : pb_infer_requests) { - CleanupDecoupledRequests(infer_request); - } - if (response_batch.data_->is_error_set) { auto error = PbString::LoadFromSharedMemory( Stub()->ShmPool(), response_batch.data_->error); @@ -1874,12 +1870,21 @@ ModelInstanceState::CleanupDecoupledRequests( { // Reset the release flags for all the requests. infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); + // Clean up the response factory map. { std::lock_guard guard{response_factory_map_mutex_}; response_factory_map_.erase( reinterpret_cast(infer_request->RequestAddress())); } + + // We should only delete the response factory for the requests that have + // not been closed. + if (!ExistsInClosedRequests(infer_request->RequestAddress())) { + LOG_IF_ERROR( + infer_request->DeleteResponseFactory(), + "Failed to delete the response factory."); + } } ModelInstanceState::~ModelInstanceState() @@ -2512,15 +2517,8 @@ TRITONBACKEND_ModelInstanceExecute( } } - // We should only delete the response factory for the requests that have - // not been closed. for (auto& infer_request : infer_requests) { - if (!instance_state->ExistsInClosedRequests( - infer_request->RequestAddress())) { - LOG_IF_ERROR( - infer_request->DeleteResponseFactory(), - "Failed to delete the response factory."); - } + instance_state->CleanupDecoupledRequests(infer_request); } } } From 489217f1ee0d21cdf16e5e12eebb215c5650f94b Mon Sep 17 00:00:00 2001 From: krishung5 Date: Thu, 14 Dec 2023 14:44:35 -0800 Subject: [PATCH 4/4] Delete response factory when response sender goes out of scope --- src/infer_request.cc | 14 ------- src/infer_request.h | 4 -- src/ipc_message.h | 3 +- src/pb_response_iterator.cc | 2 +- src/pb_stub.cc | 18 ++++++--- src/pb_stub.h | 9 +++-- src/python_be.cc | 79 ++++++++----------------------------- src/python_be.h | 10 +---- src/response_sender.cc | 7 ++++ src/response_sender.h | 1 + 10 files changed, 47 insertions(+), 100 deletions(-) diff --git a/src/infer_request.cc b/src/infer_request.cc index c21feeaa..f18900d0 100644 --- a/src/infer_request.cc +++ b/src/infer_request.cc @@ -405,20 +405,6 @@ InferRequest::InferRequest( #endif } -#ifndef TRITON_PB_STUB -TRITONSERVER_Error* -InferRequest::DeleteResponseFactory() -{ - TRITONBACKEND_ResponseFactory* response_factory = - reinterpret_cast( - response_factory_address_); - TRITONSERVER_Error* error = - TRITONBACKEND_ResponseFactoryDelete(response_factory); - - return error; -} -#endif - #ifdef TRITON_PB_STUB bool InferRequest::IsCancelled() diff --git a/src/infer_request.h b/src/infer_request.h index 38850c61..b8dee87c 100644 --- a/src/infer_request.h +++ b/src/infer_request.h @@ -137,10 +137,6 @@ class InferRequest { intptr_t RequestAddress(); ~InferRequest() {} -#ifndef TRITON_PB_STUB - TRITONSERVER_Error* DeleteResponseFactory(); -#endif - private: InferRequest( 
AllocatedSharedMemory& infer_request_shm, diff --git a/src/ipc_message.h b/src/ipc_message.h index d720a84d..866070f6 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -54,7 +54,8 @@ typedef enum PYTHONSTUB_commandtype_enum { PYTHONSTUB_AutoCompleteRequest, PYTHONSTUB_AutoCompleteResponse, PYTHONSTUB_LogRequest, - PYTHONSTUB_CleanupRequest, + PYTHONSTUB_BLSDecoupledInferPayloadCleanup, + PYTHONSTUB_BLSDecoupledResponseFactoryCleanup, PYTHONSTUB_MetricFamilyRequestNew, PYTHONSTUB_MetricFamilyRequestDelete, PYTHONSTUB_MetricRequestNew, diff --git a/src/pb_response_iterator.cc b/src/pb_response_iterator.cc index 1e0d631a..9abf4997 100644 --- a/src/pb_response_iterator.cc +++ b/src/pb_response_iterator.cc @@ -133,7 +133,7 @@ void ResponseIterator::Clear() { std::unique_ptr& stub = Stub::GetOrCreateInstance(); - stub->EnqueueCleanupId(id_); + stub->EnqueueCleanupId(id_, PYTHONSTUB_BLSDecoupledInferPayloadCleanup); { std::lock_guard lock{mu_}; response_buffer_.push(DUMMY_MESSAGE); diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 4c5e9ae7..53a6c540 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -993,8 +993,12 @@ Stub::ServiceStubToParentRequests() stub_to_parent_buffer_.pop(); if (utils_msg_payload->command_type == PYTHONSTUB_LogRequest) { SendLogMessage(utils_msg_payload); - } else if (utils_msg_payload->command_type == PYTHONSTUB_CleanupRequest) { - SendCleanupId(utils_msg_payload); + } else if ( + (utils_msg_payload->command_type == + PYTHONSTUB_BLSDecoupledInferPayloadCleanup) || + (utils_msg_payload->command_type == + PYTHONSTUB_BLSDecoupledResponseFactoryCleanup)) { + SendCleanupId(utils_msg_payload, utils_msg_payload->command_type); } else if ( utils_msg_payload->command_type == PYTHONSTUB_IsRequestCancelled) { SendIsCancelled(utils_msg_payload); @@ -1040,7 +1044,9 @@ Stub::SendLogMessage(std::unique_ptr& utils_msg_payload) } void -Stub::SendCleanupId(std::unique_ptr& utils_msg_payload) +Stub::SendCleanupId( + std::unique_ptr& utils_msg_payload, + const PYTHONSTUB_CommandType& command_type) { void* id = utils_msg_payload->utils_message_ptr; { @@ -1050,7 +1056,7 @@ Stub::SendCleanupId(std::unique_ptr& utils_msg_payload) std::unique_ptr ipc_message = IPCMessage::Create(shm_pool_, true /* inline_response */); - ipc_message->Command() = PYTHONSTUB_CleanupRequest; + ipc_message->Command() = command_type; AllocatedSharedMemory cleanup_request_message = shm_pool_->Construct( sizeof(CleanupMessage) + @@ -1072,11 +1078,11 @@ Stub::SendCleanupId(std::unique_ptr& utils_msg_payload) } void -Stub::EnqueueCleanupId(void* id) +Stub::EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type) { if (id != nullptr) { std::unique_ptr utils_msg_payload = - std::make_unique(PYTHONSTUB_CleanupRequest, id); + std::make_unique(command_type, id); EnqueueUtilsMessage(std::move(utils_msg_payload)); } } diff --git a/src/pb_stub.h b/src/pb_stub.h index 12b47abc..74a66b95 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -315,10 +315,13 @@ class Stub { std::shared_ptr infer_response); /// Send the id to the python backend for object cleanup - void SendCleanupId(std::unique_ptr& utils_msg_payload); + void SendCleanupId( + std::unique_ptr& utils_msg_payload, + const PYTHONSTUB_CommandType& command_type); - /// Add cleanup id to queue - void EnqueueCleanupId(void* id); + /// Add cleanup id to queue. This is used for cleaning up the infer_payload + /// and the response factory for BLS decoupled response. 
+ void EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type); /// Add request cancellation query to queue void EnqueueIsCancelled(PbCancel* pb_cancel); diff --git a/src/python_be.cc b/src/python_be.cc index 3dac6911..8dfa72b1 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -379,21 +379,7 @@ ModelInstanceState::SaveRequestsToSharedMemory( std::unique_ptr infer_request; if (model_state->IsDecoupled()) { TRITONBACKEND_ResponseFactory* factory_ptr; - // Reuse the response factory if there is already a response factory - // associated with the request - std::lock_guard guard{response_factory_map_mutex_}; - { - if (response_factory_map_.find(reinterpret_cast(request)) != - response_factory_map_.end()) { - factory_ptr = - response_factory_map_[reinterpret_cast(request)]; - } else { - RETURN_IF_ERROR( - TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); - response_factory_map_[reinterpret_cast(request)] = - factory_ptr; - } - } + RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); infer_request = std::make_unique( id, correlation_id, pb_input_tensors, requested_output_names, @@ -843,7 +829,8 @@ ModelInstanceState::StubToParentMQMonitor() ProcessLogRequest(message); break; } - case PYTHONSTUB_CleanupRequest: { + case PYTHONSTUB_BLSDecoupledInferPayloadCleanup: + case PYTHONSTUB_BLSDecoupledResponseFactoryCleanup: { ProcessBLSCleanupRequest(message); break; } @@ -941,9 +928,17 @@ ModelInstanceState::ProcessBLSCleanupRequest( Stub()->ShmPool()->Load(message->Args()); CleanupMessage* cleanup_message_ptr = reinterpret_cast(cleanup_request_message.data_.get()); - - void* id = cleanup_message_ptr->id; - infer_payload_.erase(reinterpret_cast(id)); + intptr_t id = reinterpret_cast(cleanup_message_ptr->id); + if (message->Command() == PYTHONSTUB_BLSDecoupledInferPayloadCleanup) { + // Remove the InferPayload object from the map. + infer_payload_.erase(id); + } else if ( + message->Command() == PYTHONSTUB_BLSDecoupledResponseFactoryCleanup) { + // Delete response factory + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + response_factory(reinterpret_cast(id)); + } { bi::scoped_lock lock{*(message->ResponseMutex())}; @@ -1172,12 +1167,6 @@ ModelInstanceState::ResponseSendDecoupled( std::lock_guard guard{closed_requests_mutex_}; closed_requests_.push_back(send_message_payload->request_address); } - - // Clean up the response factory map. 
- { - std::lock_guard guard{response_factory_map_mutex_}; - response_factory_map_.erase(send_message_payload->request_address); - } } if (send_message_payload->response != 0) { @@ -1195,14 +1184,7 @@ ModelInstanceState::ResponseSendDecoupled( error_message); std::vector, void*>> gpu_output_buffers; - std::unique_ptr< - TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - response_factory_ptr; GPUBuffersHelper gpu_buffer_helper; - if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - response_factory_ptr.reset( - reinterpret_cast(response_factory)); - } #ifdef TRITON_ENABLE_GPU for (auto& output_tensor : infer_response->OutputTensors()) { @@ -1289,13 +1271,6 @@ ModelInstanceState::ResponseSendDecoupled( response_factory, send_message_payload->flags); SetErrorForResponseSendMessage( send_message_payload, WrapTritonErrorInSharedPtr(error), error_message); - - if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - std::unique_ptr< - TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - response_factory(reinterpret_cast( - send_message_payload->response_factory_address)); - } } } @@ -1864,29 +1839,6 @@ ModelInstanceState::ShareCUDAMemoryPool(const int32_t device_id) #endif // TRITON_ENABLE_GPU } -void -ModelInstanceState::CleanupDecoupledRequests( - const std::unique_ptr& infer_request) -{ - // Reset the release flags for all the requests. - infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - - // Clean up the response factory map. - { - std::lock_guard guard{response_factory_map_mutex_}; - response_factory_map_.erase( - reinterpret_cast(infer_request->RequestAddress())); - } - - // We should only delete the response factory for the requests that have - // not been closed. - if (!ExistsInClosedRequests(infer_request->RequestAddress())) { - LOG_IF_ERROR( - infer_request->DeleteResponseFactory(), - "Failed to delete the response factory."); - } -} - ModelInstanceState::~ModelInstanceState() { ModelState* model_state = reinterpret_cast(Model()); @@ -2518,7 +2470,8 @@ TRITONBACKEND_ModelInstanceExecute( } for (auto& infer_request : infer_requests) { - instance_state->CleanupDecoupledRequests(infer_request); + // Reset the release flags for all the requests. + infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); } } } diff --git a/src/python_be.h b/src/python_be.h index 75d76955..e644e159 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -288,9 +288,6 @@ class ModelInstanceState : public BackendModelInstance { std::unique_ptr thread_pool_; std::unordered_map> infer_payload_; std::unique_ptr request_executor_; - std::mutex response_factory_map_mutex_; - std::unordered_map - response_factory_map_; public: static TRITONSERVER_Error* Create( @@ -403,7 +400,8 @@ class ModelInstanceState : public BackendModelInstance { std::unique_ptr* infer_response, bi::managed_external_buffer::handle_t* response_handle); - // Process the bls decoupled cleanup request + // Process the bls decoupled cleanup request for InferPayload and + // ResponseFactory void ProcessBLSCleanupRequest(const std::unique_ptr& message); // Process request cancellation query @@ -429,9 +427,5 @@ class ModelInstanceState : public BackendModelInstance { // Attempt to share CUDA memory pool with the stub process void ShareCUDAMemoryPool(const int32_t device_id); - - // Cleanup the decoupled requests when there is an error in the response. 
- void CleanupDecoupledRequests( - const std::unique_ptr& infer_request); }; }}} // namespace triton::backend::python diff --git a/src/response_sender.cc b/src/response_sender.cc index c6b8f788..fe06e554 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -45,6 +45,13 @@ ResponseSender::ResponseSender( { } +ResponseSender::~ResponseSender() +{ + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + stub->EnqueueCleanupId( + reinterpret_cast(response_factory_address_), + PYTHONSTUB_BLSDecoupledResponseFactoryCleanup); +} void ResponseSender::Send( diff --git a/src/response_sender.h b/src/response_sender.h index fda0d5d3..d29a6ab6 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -38,6 +38,7 @@ class ResponseSender { intptr_t request_address, intptr_t response_factory_address, std::unique_ptr& shm_pool, const std::shared_ptr& pb_cancel); + ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled();
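
Note on the final design in PATCH 4/4: with the map-based tracking removed, the response factory's lifetime is tied to the ResponseSender object itself. ~ResponseSender() enqueues the factory address under the new PYTHONSTUB_BLSDecoupledResponseFactoryCleanup command, and the parent process deletes the factory when ProcessBLSCleanupRequest() dequeues that message. The standalone C++ sketch below illustrates the same destructor-driven cleanup pattern in miniature; CleanupQueue, FakeResponseFactory, and ScopedResponseSender are illustrative placeholders only (the real backend ships the id over its shared-memory stub-to-parent message queue and releases a TRITONBACKEND_ResponseFactory), so treat it as a sketch of the idea, not backend code.

    // Minimal sketch of the destructor-driven cleanup pattern from PATCH 4/4.
    // All names below are placeholders introduced for illustration.
    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <unordered_map>

    enum CleanupCommand { kInferPayloadCleanup, kResponseFactoryCleanup };

    struct CleanupMessage {
      CleanupCommand command;
      intptr_t id;  // address of the object to release on the parent side
    };

    // Thread-safe stand-in for the stub-to-parent message queue.
    class CleanupQueue {
     public:
      void Enqueue(CleanupCommand command, intptr_t id)
      {
        std::lock_guard<std::mutex> guard{mu_};
        messages_.push({command, id});
      }
      bool Dequeue(CleanupMessage* out)
      {
        std::lock_guard<std::mutex> guard{mu_};
        if (messages_.empty()) {
          return false;
        }
        *out = messages_.front();
        messages_.pop();
        return true;
      }

     private:
      std::mutex mu_;
      std::queue<CleanupMessage> messages_;
    };

    // Stand-in for the response factory owned by the parent process.
    struct FakeResponseFactory {
      ~FakeResponseFactory() { std::cout << "response factory deleted\n"; }
    };

    // Stand-in for ResponseSender: its destructor enqueues the factory
    // address so the parent deletes the factory exactly once, on both the
    // normal and the error path.
    class ScopedResponseSender {
     public:
      ScopedResponseSender(intptr_t factory_address, CleanupQueue& queue)
          : factory_address_(factory_address), queue_(queue)
      {
      }
      ~ScopedResponseSender()
      {
        queue_.Enqueue(kResponseFactoryCleanup, factory_address_);
      }

     private:
      intptr_t factory_address_;
      CleanupQueue& queue_;
    };

    int main()
    {
      CleanupQueue queue;
      std::unordered_map<intptr_t, std::unique_ptr<FakeResponseFactory>>
          factories;

      auto factory = std::make_unique<FakeResponseFactory>();
      intptr_t address = reinterpret_cast<intptr_t>(factory.get());
      factories.emplace(address, std::move(factory));

      {
        ScopedResponseSender sender(address, queue);
      }  // going out of scope enqueues the cleanup id

      // Parent-side dispatch, analogous to ProcessBLSCleanupRequest().
      CleanupMessage msg;
      while (queue.Dequeue(&msg)) {
        if (msg.command == kResponseFactoryCleanup) {
          factories.erase(msg.id);  // unique_ptr deletes the factory here
        }
      }
      return 0;
    }

Tying the deletion to the sender's destructor keeps the normal and error paths identical, which is what allows PATCH 4/4 to drop response_factory_map_, CleanupDecoupledRequests(), and InferRequest::DeleteResponseFactory() entirely.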