diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 0641809e..844d58f2 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -24,6 +24,7 @@ #include #include "AsioDefines.h" +#include "ClientConnectionAdaptor.h" #include "ClientImpl.h" #include "Commands.h" #include "ConnectionPool.h" @@ -1469,19 +1470,8 @@ Future ClientConnection::newGetSchema(const std::string& top return promise.getFuture(); } -void ClientConnection::checkServerError(ServerError error) { - switch (error) { - case proto::ServerError::ServiceNotReady: - close(ResultDisconnected); - break; - case proto::ServerError::TooManyRequests: - // TODO: Implement maxNumberOfRejectedRequestPerConnection like - // https://github.com/apache/pulsar/pull/274 - close(ResultDisconnected); - break; - default: - break; - } +void ClientConnection::checkServerError(ServerError error, const std::string& message) { + pulsar::adaptor::checkServerError(*this, error, message); } void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendReceipt) { @@ -1573,7 +1563,7 @@ void ClientConnection::handlePartitionedMetadataResponse( << partitionMetadataResponse.request_id() << " error: " << partitionMetadataResponse.error() << " msg: " << partitionMetadataResponse.message()); - checkServerError(partitionMetadataResponse.error()); + checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message()); lookupDataPromise->setFailed( getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message())); } else { @@ -1650,7 +1640,7 @@ void ClientConnection::handleLookupTopicRespose( LOG_ERROR(cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " error: " << lookupTopicResponse.error() << " msg: " << lookupTopicResponse.message()); - checkServerError(lookupTopicResponse.error()); + checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message()); lookupDataPromise->setFailed( getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); } else { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 851ec0c3..b16fc694 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -404,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this + +#include "ProtoApiEnums.h" +#include "PulsarApi.pb.h" + +namespace pulsar { + +namespace adaptor { + +template +inline void checkServerError(Connection& connection, ServerError error, const std::string& message) { + switch (error) { + case proto::ServerError::ServiceNotReady: + // There are some typical error messages that should not trigger the + // close() of the connection. + // "Namespace bundle ... is being unloaded" + // "KeeperException$..." + // "Failed to acquire ownership for namespace bundle ..." + // Before https://github.com/apache/pulsar/pull/21211, the error of the 1st and 2nd messages + // is ServiceNotReady. Before https://github.com/apache/pulsar/pull/21993, the error of the 3rd + // message is ServiceNotReady. + if (message.find("Failed to acquire ownership") == std::string::npos && + message.find("KeeperException") == std::string::npos && + message.find("is being unloaded") == std::string::npos) { + connection.close(ResultDisconnected); + } + break; + case proto::ServerError::TooManyRequests: + // TODO: Implement maxNumberOfRejectedRequestPerConnection like + // https://github.com/apache/pulsar/pull/274 + connection.close(ResultDisconnected); + break; + default: + break; + } +} + +} // namespace adaptor + +} // namespace pulsar diff --git a/tests/ConnectionTest.cc b/tests/ConnectionTest.cc new file mode 100644 index 00000000..e0d063e9 --- /dev/null +++ b/tests/ConnectionTest.cc @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + */ +#include +#include + +#include + +#include "lib/ClientConnection.h" +#include "lib/ClientConnectionAdaptor.h" + +using namespace pulsar; + +class MockClientConnection { + public: + MOCK_METHOD(void, close, (Result)); + + void checkServerError(ServerError error, const std::string& message) { + ::pulsar::adaptor::checkServerError(*this, error, message); + } +}; + +// These error messages come from +// https://github.com/apache/pulsar/blob/a702e5a582eaa8292720f9e25fc2dcf858078813/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L334-L351 +static const std::vector retryableErrorMessages{ + "Namespace bundle public/default/0x00000000_0xffffffff is being unloaded", + "org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout for " + "/namespace/public/default/0x00000000_0xffffffff", + "Failed to acquire ownership for namespace bundle public/default/0x00000000_0xffffffff"}; + +TEST(ConnectionTest, testCheckServerError) { + MockClientConnection conn; + EXPECT_CALL(conn, close(ResultDisconnected)).Times(0); + for (auto&& msg : retryableErrorMessages) { + conn.checkServerError(pulsar::proto::ServiceNotReady, msg); + } +}