diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 44f1662dde1..66ce2a2f146 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -60,13 +60,15 @@ void JsonRpcConnection::Start() void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) { + namespace ch = std::chrono; + m_Stream->next_layer().SetSeen(&m_Seen); for (;;) { - String message; + String jsonString; try { - message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); + jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); } catch (const std::exception& ex) { Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity @@ -76,20 +78,54 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) } m_Seen = Utility::GetTime(); + if (m_Endpoint) { + m_Endpoint->AddMessageReceived(jsonString.GetLength()); + } + + String rpcMethod("UNKNOWN"); + String diagnosticInfo; // Contains the diagnostic/debug information in case of an error. + + ch::steady_clock::duration cpuBoundDuration, totalDuration; + auto start (ch::steady_clock::now()); try { + Defer extractTotalDuration ([&start, &totalDuration]() { + totalDuration = ch::steady_clock::now() - start; + }); + CpuBoundWork handleMessage (yc); + // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. + cpuBoundDuration = ch::steady_clock::now() - start; + + Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); + if (auto method = message->Get("method"); !method.IsEmpty()) { + rpcMethod = method; + } + MessageHandler(message); l_TaskStats.InsertValue(Utility::GetTime(), 1); } catch (const std::exception& ex) { - Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection") - << "Error while processing JSON-RPC message for identity '" << m_Identity - << "': " << DiagnosticInformation(ex); + diagnosticInfo = DiagnosticInformation(ex); + } - break; + auto severity = LogDebug; + if (totalDuration >= ch::seconds(5) || (!m_ShuttingDown && !diagnosticInfo.IsEmpty())) { + // Either this RPC message took an unexpectedly long time to process, or a fatal error + // has occurred, so promote the log entry from debug to warning. + severity = LogWarning; + } + + Log statsLog(severity, "JsonRpcConnection"); + statsLog << "Processing JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity << "' "; + + if (cpuBoundDuration >= ch::seconds(1)) { + statsLog << "waited " << ch::duration_cast(cpuBoundDuration).count() << "ms on semaphore and "; } + + statsLog << "took total " << ch::duration_cast(totalDuration).count() + << "ms" << (diagnosticInfo.IsEmpty() ? "." : ": Error: "+diagnosticInfo); } Disconnect(); @@ -259,10 +295,8 @@ void JsonRpcConnection::Disconnect() } } -void JsonRpcConnection::MessageHandler(const String& jsonString) +void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) { - Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); - if (m_Endpoint && message->Contains("ts")) { double ts = message->Get("ts"); @@ -281,8 +315,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString) origin->FromZone = m_Endpoint->GetZone(); else origin->FromZone = Zone::GetByName(message->Get("originZone")); - - m_Endpoint->AddMessageReceived(jsonString.GetLength()); } Value vmethod; diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 3515573bb7c..8261bb34c24 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -89,7 +89,19 @@ class JsonRpcConnection final : public Object void CheckLiveness(boost::asio::yield_context yc); bool ProcessMessage(); - void MessageHandler(const String& jsonString); + + /** + * MessageHandler routes the provided message to its corresponding handler (if any). + * + * This will first verify the timestamp of that RPC message (if any) and subsequently, rejects any message whose + * timestamp is less than the remote log position of the client Endpoint; otherwise, the endpoint's remote log + * position is updated to that timestamp. It is not expected to happen, but any message lacking an RPC method or + * referring to a non-existent one is also discarded. Afterwards, the RPC handler is then called for that message + * and sends it's result back to the sender if the message contains an ID. + * + * @param message Dictionary::Ptr The RPC message you want to process. + */ + void MessageHandler(const Dictionary::Ptr& message); void CertificateRequestResponseHandler(const Dictionary::Ptr& message);