Skip to content

Commit

Permalink
JsonRpcConnection#HandleIncomingMessages(): don't block #m_IoStrand w…
Browse files Browse the repository at this point in the history
…ith #MessageHandler()
  • Loading branch information
Al2Klimov committed Dec 13, 2024
1 parent e50eb52 commit b044f39
Showing 1 changed file with 48 additions and 34 deletions.
82 changes: 48 additions & 34 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include "base/shared.hpp"
#include "base/tlsstream.hpp"
#include <memory>
#include <utility>
Expand Down Expand Up @@ -66,13 +67,14 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
return ch::duration_cast<ch::milliseconds>(d).count();
});

auto& io (m_IoStrand.context());
auto msg (Shared<std::pair<String, AsioConditionVariable>>::Make(String(), io));

m_Stream->next_layer().SetSeen(&m_Seen);

while (!m_ShuttingDown) {
String jsonString;

try {
jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
msg->first = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
Expand All @@ -83,52 +85,64 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)

m_Seen = Utility::GetTime();
if (m_Endpoint) {
m_Endpoint->AddMessageReceived(jsonString.GetLength());
m_Endpoint->AddMessageReceived(msg->first.GetLength());
}

String rpcMethod("UNKNOWN");
ch::steady_clock::duration cpuBoundDuration(0);
auto start (ch::steady_clock::now());
CpuBoundWork handleMessage (yc);

try {
CpuBoundWork handleMessage (yc);
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;

// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;
io.post([this, keepAlive = Ptr(this), msg, start, toMilliseconds, cpuBoundDuration] {
Defer wakeUp ([this, msg] {
m_IoStrand.post([msg] { msg->second.Set(); });
});

Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
if (String method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = std::move(method);
}
String rpcMethod ("UNKNOWN");

try {
Dictionary::Ptr message = JsonRpc::DecodeMessage(msg->first);
if (String method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = std::move(method);
}

MessageHandler(message);
MessageHandler(message);

l_TaskStats.InsertValue(Utility::GetTime(), 1);
l_TaskStats.InsertValue(Utility::GetTime(), 1);

auto total = ch::steady_clock::now() - start;
auto total = ch::steady_clock::now() - start;
Log msg (total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection");

Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection");
msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity
<< "' (took total " << toMilliseconds(total) << "ms";
msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity
<< "' (took total " << toMilliseconds(total) << "ms";

if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
}
msg << ").";
} catch (const std::exception& ex) {
auto total = ch::steady_clock::now() - start;
if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
}

Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection");
msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '"
<< m_Identity << "' (took total " << toMilliseconds(total) << "ms";
msg << ").";
} catch (const std::exception& ex) {
auto total = ch::steady_clock::now() - start;

Log msg (m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection");

msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '"
<< m_Identity << "' (took total " << toMilliseconds(total) << "ms";

if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
}

if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
msg << "): " << DiagnosticInformation(ex);

Disconnect();
}
msg << "): " << DiagnosticInformation(ex);
});

break;
}
msg->second.Wait(yc);
msg->second.Clear();
}

Disconnect();
Expand Down Expand Up @@ -381,7 +395,7 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
resultMessage->Set("jsonrpc", "2.0");
resultMessage->Set("id", message->Get("id"));

SendMessageInternal(resultMessage);
SendMessage(resultMessage);
}
}

Expand Down

0 comments on commit b044f39

Please sign in to comment.