Skip to content

Commit

Permalink
Fix for testcase assuming non-blocking removeByID (ros#2283)
Browse files Browse the repository at this point in the history
Rationale for the testcase was the following deadlock scenario:
https://gist.github.com/iwanders/ede48fb649fd47f9b1f9a52c527b463c

Changed testcase presents how the same scenario can be carried
out with blocking removeByID (with exception for self-removal).
The external mutex from the scenario must be unlocked for the
call of ros::Timer::stop, otherwise scenario stays as it is.
External thread returns from removeByID once cb call finishes,
spinner thread returns immediately.
  • Loading branch information
aurzenligl committed Sep 28, 2022
1 parent 35aab20 commit 25fb5a8
Showing 1 changed file with 45 additions and 45 deletions.
90 changes: 45 additions & 45 deletions test/test_roscpp/test/test_callback_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/thread/reverse_lock.hpp>
#include <boost/function.hpp>

using namespace ros;
Expand All @@ -66,6 +67,26 @@ class CountingCallback : public CallbackInterface
};
typedef boost::shared_ptr<CountingCallback> CountingCallbackPtr;

struct CustomCallback : public CallbackInterface
{
template <typename Func>
CustomCallback(Func fun)
: function(fun), count(0)
{}

virtual CallResult call()
{
function();
++count;

return Success;
}

boost::function<void()> function;
size_t count;
};
typedef boost::shared_ptr<CustomCallback> CustomCallbackPtr;

void callAvailableThread(CallbackQueue* queue, bool& done)
{
while (!done)
Expand Down Expand Up @@ -184,73 +205,52 @@ TEST(CallbackQueue, removeSelf)
EXPECT_EQ(cb2->count, 1U);
}

class BlockingCallback : public CallbackInterface
{
public:
BlockingCallback()
: count(0)
{}

virtual CallResult call()
{
mutex_.lock();
++count;

return Success;
}

boost::mutex mutex_;
size_t count;
};
typedef boost::shared_ptr<BlockingCallback> BlockingCallbackPtr;


// This test checks whether removing callbacks by an id doesn't block if one of those callback is being executed.
TEST(CallbackQueue, removeCallbackWhileExecuting)
// This test checks whether self-removing callbacks by an id doesn't block if one of those callback is being executed.
TEST(CallbackQueue, selfRemoveCallbackWhileExecuting)
{
const uint64_t owner_id = 1;
const uint64_t unrelated_id = 2;

boost::mutex external_mtx;

CallbackQueue queue;
BlockingCallbackPtr cb1(boost::make_shared<BlockingCallback>());
CustomCallbackPtr cb1(boost::make_shared<CustomCallback>([&]() {
boost::unique_lock<boost::mutex> external_lock(external_mtx);
boost::this_thread::sleep_for(boost::chrono::milliseconds(300));

{
boost::reverse_lock<boost::unique_lock<boost::mutex>> unlocker(external_lock);
queue.removeByID(owner_id); // external thread blocks here, spinner doesn't
}
}));
CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
CountingCallbackPtr cb3(boost::make_shared<CountingCallback>());

cb1->mutex_.lock(); // lock the mutex to ensure the blocking callback will stall processing of callback queue.

queue.addCallback(cb1, owner_id); // Add the blocking callback.

// Now, we need to serve the callback queue from another thread.
bool done = false;
boost::thread t = boost::thread(boost::bind(&callAvailableThread, &queue, boost::ref(done)));

ros::WallDuration(1.0).sleep(); // Callback 1 should be being served now.

queue.addCallback(cb1, owner_id); // Add the self-removing callback.
queue.addCallback(cb2, unrelated_id); // Add a second callback with different owner.
queue.addCallback(cb3, owner_id); // Add a third with same owner, this one should never get executed.

// Now try to remove the callback that's being executed.
queue.removeByID(owner_id); // This should not block because cb1 is being served, it should prevent cb3 from running.

ros::WallDuration(1.0).sleep();
// Let's use an external thread to execute cb function and hold its external lock
boost::thread t1([&]() { cb1->call(); });
boost::this_thread::sleep_for(boost::chrono::milliseconds(100));

// The removeByID should not block, so now we can unblock the blocking callback.
cb1->mutex_.unlock(); // This allows processing of cb1 to continue.
// Now, we need to serve the callback queue from another thread.
bool done = false;
boost::thread t2([&]() { callAvailableThread(&queue, done); });

while (!queue.isEmpty()) // Wait until the queue is empty.
{
ros::WallDuration(0.01).sleep();
}

// Properly shut down our callback serving thread.
// Properly shut down our threads.
done = true;
t.join();
t2.join();
t1.join();

EXPECT_EQ(cb1->count, 1U);
EXPECT_EQ(cb1->count, 2U);
EXPECT_EQ(cb2->count, 1U);
EXPECT_EQ(cb3->count, 0U);

cb1->mutex_.unlock(); // Ensure the mutex is unlocked before destruction.
}

class RecursiveCallback : public CallbackInterface
Expand Down

0 comments on commit 25fb5a8

Please sign in to comment.