From b044f39843def881d0971e39579b44198c095ce5 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 13 Dec 2024 17:07:21 +0100 Subject: [PATCH] JsonRpcConnection#HandleIncomingMessages(): don't block #m_IoStrand with #MessageHandler() --- lib/remote/jsonrpcconnection.cpp | 82 +++++++++++++++++++------------- 1 file changed, 48 insertions(+), 34 deletions(-) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index c70b82d94e..bcbc29f004 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -13,6 +13,7 @@ #include "base/logger.hpp" #include "base/exception.hpp" #include "base/convert.hpp" +#include "base/shared.hpp" #include "base/tlsstream.hpp" #include #include @@ -66,13 +67,14 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) return ch::duration_cast(d).count(); }); + auto& io (m_IoStrand.context()); + auto msg (Shared>::Make(String(), io)); + m_Stream->next_layer().SetSeen(&m_Seen); while (!m_ShuttingDown) { - String jsonString; - try { - jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); + msg->first = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); } catch (const std::exception& ex) { Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity @@ -83,52 +85,64 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) m_Seen = Utility::GetTime(); if (m_Endpoint) { - m_Endpoint->AddMessageReceived(jsonString.GetLength()); + m_Endpoint->AddMessageReceived(msg->first.GetLength()); } - String rpcMethod("UNKNOWN"); ch::steady_clock::duration cpuBoundDuration(0); auto start (ch::steady_clock::now()); + CpuBoundWork handleMessage (yc); - try { - CpuBoundWork handleMessage (yc); + // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. + cpuBoundDuration = ch::steady_clock::now() - start; - // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. - cpuBoundDuration = ch::steady_clock::now() - start; + io.post([this, keepAlive = Ptr(this), msg, start, toMilliseconds, cpuBoundDuration] { + Defer wakeUp ([this, msg] { + m_IoStrand.post([msg] { msg->second.Set(); }); + }); - Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); - if (String method = message->Get("method"); !method.IsEmpty()) { - rpcMethod = std::move(method); - } + String rpcMethod ("UNKNOWN"); + + try { + Dictionary::Ptr message = JsonRpc::DecodeMessage(msg->first); + if (String method = message->Get("method"); !method.IsEmpty()) { + rpcMethod = std::move(method); + } - MessageHandler(message); + MessageHandler(message); - l_TaskStats.InsertValue(Utility::GetTime(), 1); + l_TaskStats.InsertValue(Utility::GetTime(), 1); - auto total = ch::steady_clock::now() - start; + auto total = ch::steady_clock::now() - start; + Log msg (total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection"); - Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection"); - msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity - << "' (took total " << toMilliseconds(total) << "ms"; + msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity + << "' (took total " << toMilliseconds(total) << "ms"; - if (cpuBoundDuration >= ch::seconds(1)) { - msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; - } - msg << ")."; - } catch (const std::exception& ex) { - auto total = ch::steady_clock::now() - start; + if (cpuBoundDuration >= ch::seconds(1)) { + msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; + } - Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection"); - msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '" - << m_Identity << "' (took total " << toMilliseconds(total) << "ms"; + msg << ")."; + } catch (const std::exception& ex) { + auto total = ch::steady_clock::now() - start; + + Log msg (m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection"); + + msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '" + << m_Identity << "' (took total " << toMilliseconds(total) << "ms"; + + if (cpuBoundDuration >= ch::seconds(1)) { + msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; + } - if (cpuBoundDuration >= ch::seconds(1)) { - msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; + msg << "): " << DiagnosticInformation(ex); + + Disconnect(); } - msg << "): " << DiagnosticInformation(ex); + }); - break; - } + msg->second.Wait(yc); + msg->second.Clear(); } Disconnect(); @@ -381,7 +395,7 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) resultMessage->Set("jsonrpc", "2.0"); resultMessage->Set("id", message->Get("id")); - SendMessageInternal(resultMessage); + SendMessage(resultMessage); } }