Skip to content

Commit

Permalink
fixed some tests
Browse files Browse the repository at this point in the history
Took 1 hour 12 minutes
  • Loading branch information
kagurazaka-ayano committed Jan 28, 2024
1 parent a72c0a6 commit 1359a71
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 7 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@ cmake --build . --target KawaiiMQ
```

to run test, run

```bash
cmake --build .
ctest

```

in build directory

to build documentation, run `./scripts/makeDoc.sh`
Expand Down
15 changes: 15 additions & 0 deletions include/KawaiiMQ/MessageQueueManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ namespace messaging {
*/
bool isRelatedAny(const Topic& topic);

/**
* flush all queues and topics
* @remark this function is for testing only
* @warning this function will clear all queues and topics
* @warning DO NOT USE THIS FUNCTION IN PRODUCTION
*/
void flush();

private:
MessageQueueManager() = default;
mutable std::shared_mutex mtx;
Expand Down Expand Up @@ -167,5 +175,12 @@ namespace messaging {
std::shared_lock lock(mtx);
return topic_map.find(topic) != topic_map.end();
}
template<typename T>
requires DerivedFromTemplate<IMessage, T>
void MessageQueueManager<T>::flush() {
std::lock_guard lock(mtx);
topic_map.clear();
registered_topic.clear();
}
}
#endif //KAWAIIMQ_MESSAGEQUEUEMANAGER_HPP
4 changes: 4 additions & 0 deletions include/KawaiiMQ/Queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,13 @@ namespace messaging {
std::unique_lock lock(mtx);
if (timeout_ms == 0) {
cond.wait(lock, [this](){return !queue.empty();});

}
else {
cond.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this](){return !queue.empty();});
if(queue.empty()) {
throw std::runtime_error("queue fetch timeout");
}
}
auto ret = std::make_shared<T>(std::move(queue.front()));
queue.pop();
Expand Down
68 changes: 68 additions & 0 deletions src/test/ManagerTest/ManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,71 @@ TEST(ManagerTest, RelateAndUnrelateMultiThread) {
EXPECT_EQ(manager->getAllRelatedQueue(topic1).size(), 0);
EXPECT_EQ(manager->getAllRelatedQueue(topic2).size(), 0);
}

TEST(ManagerTest, ConcurrentAccess) {
auto topic1 = messaging::Topic("topic1");
auto topic2 = messaging::Topic("topic2");
auto queue1 = messaging::Queue<messaging::IntMessage>("test1");
auto queue2 = messaging::Queue<messaging::IntMessage>("test2");

auto manager = messaging::MessageQueueManager<messaging::IntMessage>::Instance();

std::thread t1([&]() {
manager->relate(topic1, queue1);
manager->relate(topic2, queue2);
});

std::thread t2([&]() {
manager->unrelate(topic1, queue1);
manager->unrelate(topic2, queue2);
});

t1.join();
t2.join();

EXPECT_EQ(manager->getAllRelatedQueue(topic1).size(), 0);
EXPECT_EQ(manager->getAllRelatedQueue(topic2).size(), 0);
}

TEST(ProducerConsumerTest, HighDataFlowWithSingleProducerMultipleConsumers) {
messaging::Producer<messaging::IntMessage> producer;
messaging::Consumer<messaging::IntMessage> consumer1, consumer2, consumer3;
messaging::Topic topic1{"topic1"};
messaging::Queue<messaging::IntMessage> queue;
auto manager = messaging::MessageQueueManager<messaging::IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
producer.subscribe(topic1);
consumer1.subscribe(topic1);
consumer2.subscribe(topic1);
consumer3.subscribe(topic1);

const int dataFlow = 10000;
int ans = 0;
for (int i = 0; i < dataFlow; ++i) {
producer.publishMessage(topic1, messaging::IntMessage(i));
ans += i;
}

auto t1 = std::async(([&]() {
int ret = 0;
for (int i = 0; i < dataFlow / 3; ++i)
ret += consumer1.fetchSingleTopic(topic1)[0]->getContent();
return ret;
}));
auto t2 = std::async(([&]() {
int ret = 0;
for (int i = 0; i < dataFlow / 3; ++i)
ret += consumer2.fetchSingleTopic(topic1)[0]->getContent();
return ret;
}));
auto t3 = std::async(([&]() {
int ret = 0;
for (int i = 0; i < dataFlow - (dataFlow / 3) * 2; ++i)
ret += consumer3.fetchSingleTopic(topic1)[0]->getContent();
return ret;
}));

ASSERT_EQ(t1.get() + t2.get() + t3.get(), ans);
manager->unrelate(topic1, queue);
}
163 changes: 159 additions & 4 deletions src/test/OperatorTest/ProducerConsumerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,52 @@
#include "kawaiiMQ.h"
#include "gtest/gtest.h"
#include <string>
#include <thread>
#include <future>

using namespace messaging;

TEST(ProducerTest, SubscribeToNewTopic) {
Producer<IntMessage> producer;
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
Topic topic1{"topic1"};
manager->relate(topic1, queue);

ASSERT_NO_THROW(producer.subscribe(topic1));
manager->unrelate(topic1, queue);
}

TEST(ProducerTest, SubscribeToAlreadySubscribedTopic) {
Producer<IntMessage> producer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->relate(topic1, queue);
producer.subscribe(topic1);
ASSERT_THROW(producer.subscribe(topic1), std::runtime_error);
manager->unrelate(topic1, queue);
}

TEST(ProducerTest, UnsubscribeFromSubscribedTopic) {
Producer<IntMessage> producer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->relate(topic1, queue);
producer.subscribe(topic1);
ASSERT_NO_THROW(producer.unsubscribe(topic1));
manager->unrelate(topic1, queue);
}

TEST(ProducerTest, UnsubscribeFromNotSubscribedTopic) {
Producer<IntMessage> producer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->relate(topic1, queue);
ASSERT_THROW(producer.unsubscribe(topic1), std::runtime_error);
manager->unrelate(topic1, queue);
}

TEST(ProducerTest, PublishMessageToSubscribedTopic) {
Expand All @@ -46,6 +65,7 @@ TEST(ProducerTest, PublishMessageToSubscribedTopic) {
auto message = IntMessage(1);
producer.subscribe(topic1);
ASSERT_NO_THROW(producer.publishMessage(topic1, message));
manager->unrelate(topic1, queue);
}

TEST(ProducerTest, PublishMessageToNotSubscribedTopic) {
Expand All @@ -58,17 +78,18 @@ TEST(ProducerTest, PublishMessageToNotSubscribedTopic) {
TEST(ProducerTest, BroadcastMessageWhenNoTopicSubscribed) {
Producer<IntMessage> producer;
auto message = IntMessage(1);
ASSERT_NO_THROW(producer.broadcastMessage(message));
ASSERT_THROW(producer.broadcastMessage(message), std::runtime_error);
}

TEST(ProducerTest, BroadcastMessageWhenTopicsSubscribed) {
Producer<IntMessage> producer;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Topic topic1{"topic1"};
Topic topic2{"topic2"};
auto queue1 = Queue<IntMessage>();
auto queue2 = Queue<IntMessage>();
auto message = IntMessage(1);
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->relate(topic1, queue1);
manager->relate(topic2, queue2);
producer.subscribe(topic1);
Expand All @@ -79,47 +100,81 @@ TEST(ProducerTest, BroadcastMessageWhenTopicsSubscribed) {

TEST(ConsumerTest, SubscribeToNewTopic) {
Consumer<IntMessage> consumer;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Topic topic1{"topic1"};
Queue<IntMessage> queue;
manager->relate(topic1, queue);
ASSERT_NO_THROW(consumer.subscribe(topic1));
}

TEST(ConsumerTest, SubscribeToAlreadySubscribedTopic) {
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
manager->relate(topic1, queue);
consumer.subscribe(topic1);
ASSERT_THROW(consumer.subscribe(topic1), std::runtime_error);
}

TEST(ConsumerTest, UnsubscribeFromSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
consumer.subscribe(topic1);
ASSERT_NO_THROW(consumer.unsubscribe(topic1));
}

TEST(ConsumerTest, UnsubscribeFromNotSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
ASSERT_THROW(consumer.unsubscribe(topic1), std::runtime_error);
}

TEST(ConsumerTest, FetchMessageFromSubscribedTopic) {
TEST(ConsumerTest, FetchMessageFromSubscribedEmptyTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
consumer.subscribe(topic1);
ASSERT_NO_THROW(consumer.fetchSingleTopic(topic1));
ASSERT_THROW(consumer.fetchSingleTopic(topic1), std::runtime_error);
}

TEST(ConsumerTest, FetchMessageFromNotSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
ASSERT_THROW(consumer.fetchSingleTopic(topic1), std::runtime_error);
}

TEST(ConsumerTest, FetchMessageFromAllSubscribedTopics) {
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue1;
manager->relate(topic1, queue1);
Topic topic2{"topic2"};
Queue<IntMessage> queue2;
manager->relate(topic2, queue2);
Producer<IntMessage> producer;
producer.subscribe(topic1);
producer.subscribe(topic2);
producer.broadcastMessage(IntMessage(1));
consumer.subscribe(topic1);
consumer.subscribe(topic2);
ASSERT_NO_THROW(consumer.fetchMessage());
Expand All @@ -129,3 +184,103 @@ TEST(ConsumerTest, FetchMessageWhenNoTopicSubscribed) {
Consumer<IntMessage> consumer;
ASSERT_NO_THROW(consumer.fetchMessage());
}


TEST(ProducerTest, ConcurrentPublishToSubscribedTopic) {
Producer<IntMessage> producer;
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Topic topic1{"topic1"};
manager->relate(topic1, queue);
auto message = IntMessage(1);
producer.subscribe(topic1);

std::thread t1([&]() { producer.publishMessage(topic1, message); });
std::thread t2([&]() { producer.publishMessage(topic1, message); });

t1.join();
t2.join();

ASSERT_EQ(queue.size(), 2);
manager->unrelate(topic1, queue);
}

TEST(ConsumerTest, ConcurrentFetchFromSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
consumer.subscribe(topic1);

Producer<IntMessage> producer;
producer.subscribe(topic1);
producer.publishMessage(topic1, IntMessage(1));
producer.publishMessage(topic1, IntMessage(2));

std::thread t1([&]() { consumer.fetchSingleTopic(topic1); });
std::thread t2([&]() { consumer.fetchSingleTopic(topic1); });

t1.join();
t2.join();

ASSERT_EQ(queue.size(), 0);
manager->unrelate(topic1, queue);
}

TEST(ProducerConsumerTest, MultipleConsumersFetchingFromSingleProducer) {
Producer<IntMessage> producer;
Consumer<IntMessage> consumer1, consumer2, consumer3;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
producer.subscribe(topic1);
consumer1.subscribe(topic1);
consumer2.subscribe(topic1);
consumer3.subscribe(topic1);

producer.publishMessage(topic1, IntMessage(1));
producer.publishMessage(topic1, IntMessage(2));
producer.publishMessage(topic1, IntMessage(3));

std::thread t1([&]() { consumer1.fetchSingleTopic(topic1); });
std::thread t2([&]() { consumer2.fetchSingleTopic(topic1); });
std::thread t3([&]() { consumer3.fetchSingleTopic(topic1); });

t1.join();
t2.join();
t3.join();

ASSERT_EQ(queue.size(), 0);
manager->unrelate(topic1, queue);
}

TEST(ProducerConsumerTest, NoDataMissedWithMultipleConsumersFetchingFromSingleProducer) {
Producer<IntMessage> producer;
Consumer<IntMessage> consumer1, consumer2, consumer3;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
producer.subscribe(topic1);
consumer1.subscribe(topic1);
consumer2.subscribe(topic1);
consumer3.subscribe(topic1);

producer.publishMessage(topic1, IntMessage(1));
producer.publishMessage(topic1, IntMessage(2));
producer.publishMessage(topic1, IntMessage(3));

auto f1 = std::async(std::launch::async, [&]() { return consumer1.fetchSingleTopic(topic1); });
auto f2 = std::async(std::launch::async, [&]() { return consumer1.fetchSingleTopic(topic1); });
auto f3 = std::async(std::launch::async, [&]() { return consumer1.fetchSingleTopic(topic1); });

ASSERT_EQ(f1.get()[0]->getContent() + f2.get()[0]->getContent() + f3.get()[0]->getContent(), 6);

manager->unrelate(topic1, queue);
}

0 comments on commit 1359a71

Please sign in to comment.