Skip to content

Commit

Permalink
Fix multi-topics consumer could receive old messages after seek (#388)
Browse files Browse the repository at this point in the history
### Motivation

See apache/pulsar#21945

### Modifications

In C++ client, the multi-topics consumer receives messages by
configuring internal consumers with a message listener that adds
messages to `incomingMessages_`. So this patch pauses the listeners
before seek and resumes them after seek.

Add `MultiTopicsConsumerTest.testSeekToNewerPosition` for test.
  • Loading branch information
BewareMyPower authored Feb 2, 2024
1 parent d107d32 commit a1e2b4a
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 57 deletions.
2 changes: 0 additions & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,6 @@ class ConsumerImpl : public ConsumerImplBase {
const ClientConnectionPtr& cnx, MessageId& messageId);

friend class PulsarFriend;

// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;

FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages);
Expand Down
51 changes: 0 additions & 51 deletions lib/MultiResultCallback.h

This file was deleted.

38 changes: 34 additions & 4 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "LookupService.h"
#include "MessageImpl.h"
#include "MessagesImpl.h"
#include "MultiResultCallback.h"
#include "MultiTopicsBrokerConsumerStatsImpl.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
Expand Down Expand Up @@ -521,6 +520,9 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
}

void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
if (PULSAR_UNLIKELY(duringSeek_.load(std::memory_order_acquire))) {
return;
}
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
Expand Down Expand Up @@ -907,9 +909,37 @@ void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callb
return;
}

MultiResultCallback multiResultCallback(callback, consumers_.size());
consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr consumer) {
consumer->seekAsync(timestamp, multiResultCallback);
duringSeek_.store(true, std::memory_order_release);
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
unAckedMessageTrackerPtr_->clear();
incomingMessages_.clear();
incomingMessagesSize_ = 0L;

auto weakSelf = weak_from_this();
auto numConsumersLeft = std::make_shared<std::atomic<int64_t>>(consumers_.size());
auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) {
auto self = weakSelf.lock();
if (PULSAR_UNLIKELY(!self)) {
callback(result);
return;
}
if (result != ResultOk) {
*numConsumersLeft = 0; // skip the following callbacks
callback(result);
return;
}
if (--*numConsumersLeft > 0) {
return;
}
duringSeek_.store(false, std::memory_order_release);
listenerExecutor_->postWork([this, self] {
consumers_.forEachValue(
[](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
});
callback(ResultOk);
};
consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) {
consumer->seekAsync(timestamp, wrappedCallback);
});
}

Expand Down
1 change: 1 addition & 0 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
const Commands::SubscriptionMode subscriptionMode_;
boost::optional<MessageId> startMessageId_;
ConsumerInterceptorsPtr interceptors_;
std::atomic_bool duringSeek_{false};

/* methods */
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
Expand Down
105 changes: 105 additions & 0 deletions tests/MultiTopicsConsumerTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>

#include <chrono>

#include "ThreadSafeMessages.h"
#include "lib/LogUtils.h"

static const std::string lookupUrl = "pulsar://localhost:6650";

DECLARE_LOG_OBJECT()

using namespace pulsar;

extern std::string unique_str();

TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) {
const std::string topicPrefix = "multi-topics-consumer-seek-to-newer-position";
Client client{lookupUrl};
std::vector<std::string> topics{topicPrefix + unique_str(), topicPrefix + unique_str()};
Producer producer1;
ASSERT_EQ(ResultOk, client.createProducer(topics[0], producer1));
Producer producer2;
ASSERT_EQ(ResultOk, client.createProducer(topics[1], producer2));
producer1.send(MessageBuilder().setContent("1-0").build());
producer2.send(MessageBuilder().setContent("2-0").build());
producer1.send(MessageBuilder().setContent("1-1").build());
producer2.send(MessageBuilder().setContent("2-1").build());

Consumer consumer;
ConsumerConfiguration conf;
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
std::vector<int64_t> timestamps;
Message msg;
for (int i = 0; i < 4; i++) {
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
timestamps.emplace_back(msg.getPublishTimestamp());
}
std::sort(timestamps.begin(), timestamps.end());
const auto timestamp = timestamps[2];
consumer.close();

ThreadSafeMessages messages{2};

// Test synchronous receive after seek
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-2", conf, consumer));
consumer.seek(timestamp);
for (int i = 0; i < 2; i++) {
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
messages.add(msg);
}
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
consumer.close();

// Test asynchronous receive after seek
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-3", conf, consumer));
messages.clear();
consumer.seek(timestamp);
for (int i = 0; i < 2; i++) {
consumer.receiveAsync([&messages](Result result, const Message& msg) {
if (result == ResultOk) {
messages.add(msg);
} else {
LOG_ERROR("Failed to receive: " << result);
}
});
}
ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
consumer.close();

// Test message listener
conf.setMessageListener([&messages](Consumer consumer, Message msg) { messages.add(msg); });
messages.clear();
messages.setMinNumMsgs(4);
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-4", conf, consumer));
ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-0", "1-1", "2-0", "2-1"}));
messages.clear();
messages.setMinNumMsgs(2);
consumer.seek(timestamp);
ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));

client.close();
}
75 changes: 75 additions & 0 deletions tests/ThreadSafeMessages.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/Message.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>

namespace pulsar {

// When we receive messages in the message listener or the callback of receiveAsync(), we need to verify the
// received messages in the test thread. This class is a helper class for thread-safe access to the messages.
class ThreadSafeMessages {
public:
ThreadSafeMessages(size_t minNumMsgs) : minNumMsgs_(minNumMsgs) {}

template <typename Duration>
bool wait(Duration duration) {
std::unique_lock<std::mutex> lock{mutex_};
return cond_.wait_for(lock, duration, [this] { return msgs_.size() >= minNumMsgs_; });
}

void add(const Message& msg) {
std::lock_guard<std::mutex> lock{mutex_};
msgs_.emplace_back(msg);
if (msgs_.size() >= minNumMsgs_) {
cond_.notify_all();
}
}

void clear() {
std::lock_guard<std::mutex> lock{mutex_};
msgs_.clear();
}

std::vector<std::string> getSortedValues() const {
std::unique_lock<std::mutex> lock{mutex_};
std::vector<std::string> values(msgs_.size());
std::transform(msgs_.cbegin(), msgs_.cend(), values.begin(),
[](const Message& msg) { return msg.getDataAsString(); });
lock.unlock();
std::sort(values.begin(), values.end());
return values;
}

void setMinNumMsgs(size_t minNumMsgs) noexcept { minNumMsgs_ = minNumMsgs; }

private:
std::atomic_size_t minNumMsgs_;
std::vector<Message> msgs_;
mutable std::mutex mutex_;
mutable std::condition_variable cond_;
};

} // namespace pulsar

0 comments on commit a1e2b4a

Please sign in to comment.