diff --git a/src/infer_request.cc b/src/infer_request.cc index c21feeaa..f18900d0 100644 --- a/src/infer_request.cc +++ b/src/infer_request.cc @@ -405,20 +405,6 @@ InferRequest::InferRequest( #endif } -#ifndef TRITON_PB_STUB -TRITONSERVER_Error* -InferRequest::DeleteResponseFactory() -{ - TRITONBACKEND_ResponseFactory* response_factory = - reinterpret_cast( - response_factory_address_); - TRITONSERVER_Error* error = - TRITONBACKEND_ResponseFactoryDelete(response_factory); - - return error; -} -#endif - #ifdef TRITON_PB_STUB bool InferRequest::IsCancelled() diff --git a/src/infer_request.h b/src/infer_request.h index 38850c61..b8dee87c 100644 --- a/src/infer_request.h +++ b/src/infer_request.h @@ -137,10 +137,6 @@ class InferRequest { intptr_t RequestAddress(); ~InferRequest() {} -#ifndef TRITON_PB_STUB - TRITONSERVER_Error* DeleteResponseFactory(); -#endif - private: InferRequest( AllocatedSharedMemory& infer_request_shm, diff --git a/src/ipc_message.h b/src/ipc_message.h index d720a84d..866070f6 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -54,7 +54,8 @@ typedef enum PYTHONSTUB_commandtype_enum { PYTHONSTUB_AutoCompleteRequest, PYTHONSTUB_AutoCompleteResponse, PYTHONSTUB_LogRequest, - PYTHONSTUB_CleanupRequest, + PYTHONSTUB_BLSDecoupledInferPayloadCleanup, + PYTHONSTUB_BLSDecoupledResponseFactoryCleanup, PYTHONSTUB_MetricFamilyRequestNew, PYTHONSTUB_MetricFamilyRequestDelete, PYTHONSTUB_MetricRequestNew, diff --git a/src/pb_response_iterator.cc b/src/pb_response_iterator.cc index 1e0d631a..9abf4997 100644 --- a/src/pb_response_iterator.cc +++ b/src/pb_response_iterator.cc @@ -133,7 +133,7 @@ void ResponseIterator::Clear() { std::unique_ptr& stub = Stub::GetOrCreateInstance(); - stub->EnqueueCleanupId(id_); + stub->EnqueueCleanupId(id_, PYTHONSTUB_BLSDecoupledInferPayloadCleanup); { std::lock_guard lock{mu_}; response_buffer_.push(DUMMY_MESSAGE); diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 4c5e9ae7..53a6c540 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -993,8 +993,12 @@ Stub::ServiceStubToParentRequests() stub_to_parent_buffer_.pop(); if (utils_msg_payload->command_type == PYTHONSTUB_LogRequest) { SendLogMessage(utils_msg_payload); - } else if (utils_msg_payload->command_type == PYTHONSTUB_CleanupRequest) { - SendCleanupId(utils_msg_payload); + } else if ( + (utils_msg_payload->command_type == + PYTHONSTUB_BLSDecoupledInferPayloadCleanup) || + (utils_msg_payload->command_type == + PYTHONSTUB_BLSDecoupledResponseFactoryCleanup)) { + SendCleanupId(utils_msg_payload, utils_msg_payload->command_type); } else if ( utils_msg_payload->command_type == PYTHONSTUB_IsRequestCancelled) { SendIsCancelled(utils_msg_payload); @@ -1040,7 +1044,9 @@ Stub::SendLogMessage(std::unique_ptr& utils_msg_payload) } void -Stub::SendCleanupId(std::unique_ptr& utils_msg_payload) +Stub::SendCleanupId( + std::unique_ptr& utils_msg_payload, + const PYTHONSTUB_CommandType& command_type) { void* id = utils_msg_payload->utils_message_ptr; { @@ -1050,7 +1056,7 @@ Stub::SendCleanupId(std::unique_ptr& utils_msg_payload) std::unique_ptr ipc_message = IPCMessage::Create(shm_pool_, true /* inline_response */); - ipc_message->Command() = PYTHONSTUB_CleanupRequest; + ipc_message->Command() = command_type; AllocatedSharedMemory cleanup_request_message = shm_pool_->Construct( sizeof(CleanupMessage) + @@ -1072,11 +1078,11 @@ Stub::SendCleanupId(std::unique_ptr& utils_msg_payload) } void -Stub::EnqueueCleanupId(void* id) +Stub::EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type) { if (id != nullptr) { std::unique_ptr utils_msg_payload = - std::make_unique(PYTHONSTUB_CleanupRequest, id); + std::make_unique(command_type, id); EnqueueUtilsMessage(std::move(utils_msg_payload)); } } diff --git a/src/pb_stub.h b/src/pb_stub.h index 12b47abc..74a66b95 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -315,10 +315,13 @@ class Stub { std::shared_ptr infer_response); /// Send the id to the python backend for object cleanup - void SendCleanupId(std::unique_ptr& utils_msg_payload); + void SendCleanupId( + std::unique_ptr& utils_msg_payload, + const PYTHONSTUB_CommandType& command_type); - /// Add cleanup id to queue - void EnqueueCleanupId(void* id); + /// Add cleanup id to queue. This is used for cleaning up the infer_payload + /// and the response factory for BLS decoupled response. + void EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type); /// Add request cancellation query to queue void EnqueueIsCancelled(PbCancel* pb_cancel); diff --git a/src/python_be.cc b/src/python_be.cc index 6de5bcf3..8dfa72b1 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -379,21 +379,7 @@ ModelInstanceState::SaveRequestsToSharedMemory( std::unique_ptr infer_request; if (model_state->IsDecoupled()) { TRITONBACKEND_ResponseFactory* factory_ptr; - // Reuse the response factory if there is already a response factory - // associated with the request - std::lock_guard guard{response_factory_map_mutex_}; - { - if (response_factory_map_.find(reinterpret_cast(request)) != - response_factory_map_.end()) { - factory_ptr = - response_factory_map_[reinterpret_cast(request)]; - } else { - RETURN_IF_ERROR( - TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); - response_factory_map_[reinterpret_cast(request)] = - factory_ptr; - } - } + RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); infer_request = std::make_unique( id, correlation_id, pb_input_tensors, requested_output_names, @@ -843,7 +829,8 @@ ModelInstanceState::StubToParentMQMonitor() ProcessLogRequest(message); break; } - case PYTHONSTUB_CleanupRequest: { + case PYTHONSTUB_BLSDecoupledInferPayloadCleanup: + case PYTHONSTUB_BLSDecoupledResponseFactoryCleanup: { ProcessBLSCleanupRequest(message); break; } @@ -941,9 +928,17 @@ ModelInstanceState::ProcessBLSCleanupRequest( Stub()->ShmPool()->Load(message->Args()); CleanupMessage* cleanup_message_ptr = reinterpret_cast(cleanup_request_message.data_.get()); - - void* id = cleanup_message_ptr->id; - infer_payload_.erase(reinterpret_cast(id)); + intptr_t id = reinterpret_cast(cleanup_message_ptr->id); + if (message->Command() == PYTHONSTUB_BLSDecoupledInferPayloadCleanup) { + // Remove the InferPayload object from the map. + infer_payload_.erase(id); + } else if ( + message->Command() == PYTHONSTUB_BLSDecoupledResponseFactoryCleanup) { + // Delete response factory + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + response_factory(reinterpret_cast(id)); + } { bi::scoped_lock lock{*(message->ResponseMutex())}; @@ -1172,12 +1167,6 @@ ModelInstanceState::ResponseSendDecoupled( std::lock_guard guard{closed_requests_mutex_}; closed_requests_.push_back(send_message_payload->request_address); } - - // Clean up the response factory map. - { - std::lock_guard guard{response_factory_map_mutex_}; - response_factory_map_.erase(send_message_payload->request_address); - } } if (send_message_payload->response != 0) { @@ -1195,14 +1184,7 @@ ModelInstanceState::ResponseSendDecoupled( error_message); std::vector, void*>> gpu_output_buffers; - std::unique_ptr< - TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - response_factory_ptr; GPUBuffersHelper gpu_buffer_helper; - if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - response_factory_ptr.reset( - reinterpret_cast(response_factory)); - } #ifdef TRITON_ENABLE_GPU for (auto& output_tensor : infer_response->OutputTensors()) { @@ -1289,13 +1271,6 @@ ModelInstanceState::ResponseSendDecoupled( response_factory, send_message_payload->flags); SetErrorForResponseSendMessage( send_message_payload, WrapTritonErrorInSharedPtr(error), error_message); - - if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - std::unique_ptr< - TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - response_factory(reinterpret_cast( - send_message_payload->response_factory_address)); - } } } @@ -1368,11 +1343,6 @@ ModelInstanceState::ProcessRequestsDecoupled( TRITONSERVER_ERROR_INTERNAL, error->String().c_str()); } - // Reset the release flags for all the requests. - for (auto& infer_request : pb_infer_requests) { - infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - } - return TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests."); } @@ -2499,15 +2469,9 @@ TRITONBACKEND_ModelInstanceExecute( } } - // We should only delete the response factory for the requests that have - // not been closed. for (auto& infer_request : infer_requests) { - if (!instance_state->ExistsInClosedRequests( - infer_request->RequestAddress())) { - LOG_IF_ERROR( - infer_request->DeleteResponseFactory(), - "Failed to delete the response factory."); - } + // Reset the release flags for all the requests. + infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); } } } diff --git a/src/python_be.h b/src/python_be.h index 2fc755ca..e644e159 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -288,9 +288,6 @@ class ModelInstanceState : public BackendModelInstance { std::unique_ptr thread_pool_; std::unordered_map> infer_payload_; std::unique_ptr request_executor_; - std::mutex response_factory_map_mutex_; - std::unordered_map - response_factory_map_; public: static TRITONSERVER_Error* Create( @@ -403,7 +400,8 @@ class ModelInstanceState : public BackendModelInstance { std::unique_ptr* infer_response, bi::managed_external_buffer::handle_t* response_handle); - // Process the bls decoupled cleanup request + // Process the bls decoupled cleanup request for InferPayload and + // ResponseFactory void ProcessBLSCleanupRequest(const std::unique_ptr& message); // Process request cancellation query diff --git a/src/response_sender.cc b/src/response_sender.cc index c6b8f788..fe06e554 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -45,6 +45,13 @@ ResponseSender::ResponseSender( { } +ResponseSender::~ResponseSender() +{ + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + stub->EnqueueCleanupId( + reinterpret_cast(response_factory_address_), + PYTHONSTUB_BLSDecoupledResponseFactoryCleanup); +} void ResponseSender::Send( diff --git a/src/response_sender.h b/src/response_sender.h index fda0d5d3..d29a6ab6 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -38,6 +38,7 @@ class ResponseSender { intptr_t request_address, intptr_t response_factory_address, std::unique_ptr& shm_pool, const std::shared_ptr& pb_cancel); + ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled();