diff --git a/README.md b/README.md index 658c505..b3b187a 100644 --- a/README.md +++ b/README.md @@ -15,13 +15,10 @@ cmake --build . --target KawaiiMQ ``` to run test, run - ```bash cmake --build . ctest - ``` - in build directory to build documentation, run `./scripts/makeDoc.sh` diff --git a/include/KawaiiMQ/MessageQueueManager.hpp b/include/KawaiiMQ/MessageQueueManager.hpp index 2efdbd4..64299cf 100644 --- a/include/KawaiiMQ/MessageQueueManager.hpp +++ b/include/KawaiiMQ/MessageQueueManager.hpp @@ -79,6 +79,14 @@ namespace messaging { */ bool isRelatedAny(const Topic& topic); + /** + * flush all queues and topics + * @remark this function is for testing only + * @warning this function will clear all queues and topics + * @warning DO NOT USE THIS FUNCTION IN PRODUCTION + */ + void flush(); + private: MessageQueueManager() = default; mutable std::shared_mutex mtx; @@ -167,5 +175,12 @@ namespace messaging { std::shared_lock lock(mtx); return topic_map.find(topic) != topic_map.end(); } + template + requires DerivedFromTemplate + void MessageQueueManager::flush() { + std::lock_guard lock(mtx); + topic_map.clear(); + registered_topic.clear(); + } } #endif //KAWAIIMQ_MESSAGEQUEUEMANAGER_HPP diff --git a/include/KawaiiMQ/Queue.hpp b/include/KawaiiMQ/Queue.hpp index e245f29..d3244b3 100644 --- a/include/KawaiiMQ/Queue.hpp +++ b/include/KawaiiMQ/Queue.hpp @@ -116,9 +116,13 @@ namespace messaging { std::unique_lock lock(mtx); if (timeout_ms == 0) { cond.wait(lock, [this](){return !queue.empty();}); + } else { cond.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this](){return !queue.empty();}); + if(queue.empty()) { + throw std::runtime_error("queue fetch timeout"); + } } auto ret = std::make_shared(std::move(queue.front())); queue.pop(); diff --git a/src/test/ManagerTest/ManagerTest.cpp b/src/test/ManagerTest/ManagerTest.cpp index af79be0..a428a77 100644 --- a/src/test/ManagerTest/ManagerTest.cpp +++ b/src/test/ManagerTest/ManagerTest.cpp @@ -74,3 +74,71 @@ TEST(ManagerTest, RelateAndUnrelateMultiThread) { EXPECT_EQ(manager->getAllRelatedQueue(topic1).size(), 0); EXPECT_EQ(manager->getAllRelatedQueue(topic2).size(), 0); } + +TEST(ManagerTest, ConcurrentAccess) { + auto topic1 = messaging::Topic("topic1"); + auto topic2 = messaging::Topic("topic2"); + auto queue1 = messaging::Queue("test1"); + auto queue2 = messaging::Queue("test2"); + + auto manager = messaging::MessageQueueManager::Instance(); + + std::thread t1([&]() { + manager->relate(topic1, queue1); + manager->relate(topic2, queue2); + }); + + std::thread t2([&]() { + manager->unrelate(topic1, queue1); + manager->unrelate(topic2, queue2); + }); + + t1.join(); + t2.join(); + + EXPECT_EQ(manager->getAllRelatedQueue(topic1).size(), 0); + EXPECT_EQ(manager->getAllRelatedQueue(topic2).size(), 0); +} + +TEST(ProducerConsumerTest, HighDataFlowWithSingleProducerMultipleConsumers) { + messaging::Producer producer; + messaging::Consumer consumer1, consumer2, consumer3; + messaging::Topic topic1{"topic1"}; + messaging::Queue queue; + auto manager = messaging::MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); + producer.subscribe(topic1); + consumer1.subscribe(topic1); + consumer2.subscribe(topic1); + consumer3.subscribe(topic1); + + const int dataFlow = 10000; + int ans = 0; + for (int i = 0; i < dataFlow; ++i) { + producer.publishMessage(topic1, messaging::IntMessage(i)); + ans += i; + } + + auto t1 = std::async(([&]() { + int ret = 0; + for (int i = 0; i < dataFlow / 3; ++i) + ret += consumer1.fetchSingleTopic(topic1)[0]->getContent(); + return ret; + })); + auto t2 = std::async(([&]() { + int ret = 0; + for (int i = 0; i < dataFlow / 3; ++i) + ret += consumer2.fetchSingleTopic(topic1)[0]->getContent(); + return ret; + })); + auto t3 = std::async(([&]() { + int ret = 0; + for (int i = 0; i < dataFlow - (dataFlow / 3) * 2; ++i) + ret += consumer3.fetchSingleTopic(topic1)[0]->getContent(); + return ret; + })); + + ASSERT_EQ(t1.get() + t2.get() + t3.get(), ans); + manager->unrelate(topic1, queue); +} diff --git a/src/test/OperatorTest/ProducerConsumerTest.cpp b/src/test/OperatorTest/ProducerConsumerTest.cpp index fd08429..dd36a65 100644 --- a/src/test/OperatorTest/ProducerConsumerTest.cpp +++ b/src/test/OperatorTest/ProducerConsumerTest.cpp @@ -8,33 +8,52 @@ #include "kawaiiMQ.h" #include "gtest/gtest.h" #include +#include +#include using namespace messaging; TEST(ProducerTest, SubscribeToNewTopic) { Producer producer; + Queue queue; + auto manager = MessageQueueManager::Instance(); Topic topic1{"topic1"}; + manager->relate(topic1, queue); + ASSERT_NO_THROW(producer.subscribe(topic1)); + manager->unrelate(topic1, queue); } TEST(ProducerTest, SubscribeToAlreadySubscribedTopic) { Producer producer; Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->relate(topic1, queue); producer.subscribe(topic1); ASSERT_THROW(producer.subscribe(topic1), std::runtime_error); + manager->unrelate(topic1, queue); } TEST(ProducerTest, UnsubscribeFromSubscribedTopic) { Producer producer; Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->relate(topic1, queue); producer.subscribe(topic1); ASSERT_NO_THROW(producer.unsubscribe(topic1)); + manager->unrelate(topic1, queue); } TEST(ProducerTest, UnsubscribeFromNotSubscribedTopic) { Producer producer; Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->relate(topic1, queue); ASSERT_THROW(producer.unsubscribe(topic1), std::runtime_error); + manager->unrelate(topic1, queue); } TEST(ProducerTest, PublishMessageToSubscribedTopic) { @@ -46,6 +65,7 @@ TEST(ProducerTest, PublishMessageToSubscribedTopic) { auto message = IntMessage(1); producer.subscribe(topic1); ASSERT_NO_THROW(producer.publishMessage(topic1, message)); + manager->unrelate(topic1, queue); } TEST(ProducerTest, PublishMessageToNotSubscribedTopic) { @@ -58,17 +78,18 @@ TEST(ProducerTest, PublishMessageToNotSubscribedTopic) { TEST(ProducerTest, BroadcastMessageWhenNoTopicSubscribed) { Producer producer; auto message = IntMessage(1); - ASSERT_NO_THROW(producer.broadcastMessage(message)); + ASSERT_THROW(producer.broadcastMessage(message), std::runtime_error); } TEST(ProducerTest, BroadcastMessageWhenTopicsSubscribed) { Producer producer; + auto manager = MessageQueueManager::Instance(); + manager->flush(); Topic topic1{"topic1"}; Topic topic2{"topic2"}; auto queue1 = Queue(); auto queue2 = Queue(); auto message = IntMessage(1); - auto manager = MessageQueueManager::Instance(); manager->relate(topic1, queue1); manager->relate(topic2, queue2); producer.subscribe(topic1); @@ -79,13 +100,21 @@ TEST(ProducerTest, BroadcastMessageWhenTopicsSubscribed) { TEST(ConsumerTest, SubscribeToNewTopic) { Consumer consumer; + auto manager = MessageQueueManager::Instance(); + manager->flush(); Topic topic1{"topic1"}; + Queue queue; + manager->relate(topic1, queue); ASSERT_NO_THROW(consumer.subscribe(topic1)); } TEST(ConsumerTest, SubscribeToAlreadySubscribedTopic) { + auto manager = MessageQueueManager::Instance(); + manager->flush(); Consumer consumer; Topic topic1{"topic1"}; + Queue queue; + manager->relate(topic1, queue); consumer.subscribe(topic1); ASSERT_THROW(consumer.subscribe(topic1), std::runtime_error); } @@ -93,6 +122,10 @@ TEST(ConsumerTest, SubscribeToAlreadySubscribedTopic) { TEST(ConsumerTest, UnsubscribeFromSubscribedTopic) { Consumer consumer; Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); consumer.subscribe(topic1); ASSERT_NO_THROW(consumer.unsubscribe(topic1)); } @@ -100,26 +133,48 @@ TEST(ConsumerTest, UnsubscribeFromSubscribedTopic) { TEST(ConsumerTest, UnsubscribeFromNotSubscribedTopic) { Consumer consumer; Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); ASSERT_THROW(consumer.unsubscribe(topic1), std::runtime_error); } -TEST(ConsumerTest, FetchMessageFromSubscribedTopic) { +TEST(ConsumerTest, FetchMessageFromSubscribedEmptyTopic) { Consumer consumer; Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); consumer.subscribe(topic1); - ASSERT_NO_THROW(consumer.fetchSingleTopic(topic1)); + ASSERT_THROW(consumer.fetchSingleTopic(topic1), std::runtime_error); } TEST(ConsumerTest, FetchMessageFromNotSubscribedTopic) { Consumer consumer; Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); ASSERT_THROW(consumer.fetchSingleTopic(topic1), std::runtime_error); } TEST(ConsumerTest, FetchMessageFromAllSubscribedTopics) { + auto manager = MessageQueueManager::Instance(); + manager->flush(); Consumer consumer; Topic topic1{"topic1"}; + Queue queue1; + manager->relate(topic1, queue1); Topic topic2{"topic2"}; + Queue queue2; + manager->relate(topic2, queue2); + Producer producer; + producer.subscribe(topic1); + producer.subscribe(topic2); + producer.broadcastMessage(IntMessage(1)); consumer.subscribe(topic1); consumer.subscribe(topic2); ASSERT_NO_THROW(consumer.fetchMessage()); @@ -129,3 +184,103 @@ TEST(ConsumerTest, FetchMessageWhenNoTopicSubscribed) { Consumer consumer; ASSERT_NO_THROW(consumer.fetchMessage()); } + + +TEST(ProducerTest, ConcurrentPublishToSubscribedTopic) { + Producer producer; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + Topic topic1{"topic1"}; + manager->relate(topic1, queue); + auto message = IntMessage(1); + producer.subscribe(topic1); + + std::thread t1([&]() { producer.publishMessage(topic1, message); }); + std::thread t2([&]() { producer.publishMessage(topic1, message); }); + + t1.join(); + t2.join(); + + ASSERT_EQ(queue.size(), 2); + manager->unrelate(topic1, queue); +} + +TEST(ConsumerTest, ConcurrentFetchFromSubscribedTopic) { + Consumer consumer; + Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); + consumer.subscribe(topic1); + + Producer producer; + producer.subscribe(topic1); + producer.publishMessage(topic1, IntMessage(1)); + producer.publishMessage(topic1, IntMessage(2)); + + std::thread t1([&]() { consumer.fetchSingleTopic(topic1); }); + std::thread t2([&]() { consumer.fetchSingleTopic(topic1); }); + + t1.join(); + t2.join(); + + ASSERT_EQ(queue.size(), 0); + manager->unrelate(topic1, queue); +} + +TEST(ProducerConsumerTest, MultipleConsumersFetchingFromSingleProducer) { + Producer producer; + Consumer consumer1, consumer2, consumer3; + Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); + producer.subscribe(topic1); + consumer1.subscribe(topic1); + consumer2.subscribe(topic1); + consumer3.subscribe(topic1); + + producer.publishMessage(topic1, IntMessage(1)); + producer.publishMessage(topic1, IntMessage(2)); + producer.publishMessage(topic1, IntMessage(3)); + + std::thread t1([&]() { consumer1.fetchSingleTopic(topic1); }); + std::thread t2([&]() { consumer2.fetchSingleTopic(topic1); }); + std::thread t3([&]() { consumer3.fetchSingleTopic(topic1); }); + + t1.join(); + t2.join(); + t3.join(); + + ASSERT_EQ(queue.size(), 0); + manager->unrelate(topic1, queue); +} + +TEST(ProducerConsumerTest, NoDataMissedWithMultipleConsumersFetchingFromSingleProducer) { + Producer producer; + Consumer consumer1, consumer2, consumer3; + Topic topic1{"topic1"}; + Queue queue; + auto manager = MessageQueueManager::Instance(); + manager->flush(); + manager->relate(topic1, queue); + producer.subscribe(topic1); + consumer1.subscribe(topic1); + consumer2.subscribe(topic1); + consumer3.subscribe(topic1); + + producer.publishMessage(topic1, IntMessage(1)); + producer.publishMessage(topic1, IntMessage(2)); + producer.publishMessage(topic1, IntMessage(3)); + + auto f1 = std::async(std::launch::async, [&]() { return consumer1.fetchSingleTopic(topic1); }); + auto f2 = std::async(std::launch::async, [&]() { return consumer1.fetchSingleTopic(topic1); }); + auto f3 = std::async(std::launch::async, [&]() { return consumer1.fetchSingleTopic(topic1); }); + + ASSERT_EQ(f1.get()[0]->getContent() + f2.get()[0]->getContent() + f3.get()[0]->getContent(), 6); + + manager->unrelate(topic1, queue); +}