Skip to content

Commit

Permalink
Fixing timeout in unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
sean-parent committed Mar 15, 2024
1 parent 2f4037b commit b657d25
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 320 deletions.
22 changes: 10 additions & 12 deletions docs/libraries/concurrency/channel/merge_channel_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <iostream>
#include <thread>

#include <stlab/concurrency/await.hpp>
#include <stlab/concurrency/channel.hpp>
#include <stlab/concurrency/default_executor.hpp>

Expand All @@ -16,17 +17,14 @@ int main() {
tie(send2, receive2) = channel<int>(default_executor);
tie(send3, receive3) = channel<int>(default_executor);

std::atomic_int all_done{ 0 };
std::atomic_int all_done{0};

auto merged = merge_channel<unordered_t>(default_executor,
[](int x) { return x; },
receive1,
receive2,
receive3)
| [&_all_done = all_done](int x) {
cout << x << '\n';
++_all_done;
};
auto merged = merge_channel<unordered_t>(
default_executor, [](int x) { return x; }, receive1, receive2, receive3) |
[&_all_done = all_done](int x) {
cout << x << '\n';
++_all_done;
};

receive1.set_ready();
receive2.set_ready();
Expand All @@ -41,7 +39,7 @@ int main() {

// Waiting just for illustrational purpose
while (all_done < 6) {
this_thread::sleep_for(std::chrono::milliseconds(1));
invoke_waiting([] { this_thread::sleep_for(std::chrono::milliseconds(1)); });
}

pre_exit();
Expand All @@ -58,4 +56,4 @@ int main() {
4
5
6
*/
*/
14 changes: 8 additions & 6 deletions docs/libraries/for_each_argument_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@
#include <mutex>
#include <thread>

#include <stlab/concurrency/future.hpp>
#include <stlab/concurrency/await.hpp>
#include <stlab/concurrency/default_executor.hpp>
#include <stlab/concurrency/future.hpp>
#include <stlab/concurrency/immediate_executor.hpp>
#include <stlab/concurrency/utility.hpp>

using namespace stlab;
using namespace std;

int main(int, char**) {
auto fi = []{
auto fi = [] {
static std::size_t count_s{0};
return make_ready_future<int>(count_s++, immediate_executor);
};

auto f = when_all(default_executor, [](auto... args){
for_each_argument([](auto x) { cout << x << "\n"; }, args...);
}, fi(), fi(), fi());
auto f = when_all(
default_executor,
[](auto... args) { for_each_argument([](auto x) { cout << x << "\n"; }, args...); }, fi(),
fi(), fi());

while (!f.get_try()) this_thread::sleep_for(chrono::milliseconds(1));
await(f);

pre_exit();
}
Expand Down
56 changes: 33 additions & 23 deletions test/channel_process_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <boost/test/unit_test.hpp>

#include <stlab/concurrency/await.hpp>
#include <stlab/concurrency/channel.hpp>
#include <stlab/concurrency/default_executor.hpp>
#include <stlab/concurrency/future.hpp>
Expand All @@ -17,6 +18,8 @@

#include "channel_test_helper.hpp"

using namespace stlab;

using channel_test_fixture_int_1 = channel_test_helper::channel_test_fixture<int, 1>;

BOOST_FIXTURE_TEST_SUITE(int_channel_process_void_functor, channel_test_fixture_int_1)
Expand Down Expand Up @@ -164,14 +167,16 @@ BOOST_AUTO_TEST_CASE(int_channel_split_process_one_step) {
std::atomic_int index2{0};
std::vector<int> results2(10, 0);

auto check1 = _receive[0] | channel_test_helper::sum<1>() | [& _index = index1, &_results = results1](int x) {
_results[x] = x;
++_index;
};
auto check2 = _receive[0] | channel_test_helper::sum<1>() | [& _index = index2, &_results = results2](int x) {
_results[x] = x;
++_index;
};
auto check1 = _receive[0] | channel_test_helper::sum<1>() |
[&_index = index1, &_results = results1](int x) {
_results[x] = x;
++_index;
};
auto check2 = _receive[0] | channel_test_helper::sum<1>() |
[&_index = index2, &_results = results2](int x) {
_results[x] = x;
++_index;
};

_receive[0].set_ready();
for (auto i = 0; i < 10; ++i)
Expand All @@ -193,14 +198,16 @@ BOOST_AUTO_TEST_CASE(int_channel_split_process_two_steps) {
std::atomic_int index2{0};
std::vector<int> results2(5);

auto check1 = _receive[0] | channel_test_helper::sum<2>() | [& _index = index1, &_results = results1](int x) {
_results[_index] = x;
++_index;
};
auto check2 = _receive[0] | channel_test_helper::sum<2>() | [& _index = index2, &_results = results2](int x) {
_results[_index] = x;
++_index;
};
auto check1 = _receive[0] | channel_test_helper::sum<2>() |
[&_index = index1, &_results = results1](int x) {
_results[_index] = x;
++_index;
};
auto check2 = _receive[0] | channel_test_helper::sum<2>() |
[&_index = index2, &_results = results2](int x) {
_results[_index] = x;
++_index;
};

_receive[0].set_ready();
for (auto i = 0; i < 10; ++i)
Expand All @@ -221,8 +228,10 @@ BOOST_AUTO_TEST_CASE(int_channel_split_process_many_steps) {
std::atomic_int result1{0};
std::atomic_int result2{0};

auto check1 = _receive[0] | channel_test_helper::sum<10>() | [& _result = result1](int x) { _result = x; };
auto check2 = _receive[0] | channel_test_helper::sum<10>() | [& _result = result2](int x) { _result = x; };
auto check1 =
_receive[0] | channel_test_helper::sum<10>() | [&_result = result1](int x) { _result = x; };
auto check2 =
_receive[0] | channel_test_helper::sum<10>() | [&_result = result2](int x) { _result = x; };

_receive[0].set_ready();
for (auto i = 0; i < 10; ++i)
Expand Down Expand Up @@ -261,7 +270,7 @@ BOOST_AUTO_TEST_CASE(int_channel_process_with_two_steps_timed) {
channel_test_helper::manual_scheduler::run_next_task();

while (result == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
invoke_waiting([] { std::this_thread::sleep_for(std::chrono::milliseconds(10)); });
}

BOOST_REQUIRE_EQUAL(42, result);
Expand All @@ -282,13 +291,14 @@ BOOST_AUTO_TEST_CASE(int_channel_process_with_two_steps_timed_wo_timeout) {
send(42);

while (channel_test_helper::timed_sum::current_sum() != 42) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
invoke_waiting([] { std::this_thread::sleep_for(std::chrono::milliseconds(10)); });
}

send(43);

while (result == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
invoke_waiting([] {
std::this_thread::sleep_for(std::chrono::milliseconds(10)); });
}

BOOST_REQUIRE_EQUAL(85, result);
Expand Down Expand Up @@ -330,7 +340,7 @@ BOOST_AUTO_TEST_CASE(int_channel_process_set_error_is_called_on_upstream_error)
send(42);

while (!check) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
invoke_waiting([] { std::this_thread::sleep_for(std::chrono::milliseconds(10)); });
}

BOOST_REQUIRE_EQUAL(true, check.load());
Expand Down Expand Up @@ -372,7 +382,7 @@ BOOST_AUTO_TEST_CASE(int_channel_process_close_is_called_on_upstream_error) {
send(42);

while (!check) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
invoke_waiting([] { std::this_thread::sleep_for(std::chrono::milliseconds(10)); });
}

BOOST_REQUIRE_EQUAL(true, check.load());
Expand Down
15 changes: 8 additions & 7 deletions test/channel_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@

#include <stlab/concurrency/channel.hpp>

#include <stlab/concurrency/await.hpp>
#include <stlab/concurrency/default_executor.hpp>
#include <stlab/concurrency/task.hpp>
#include <stlab/scope.hpp>

#include <queue>
#include <thread>


using lock_t = std::unique_lock<std::mutex>;

namespace channel_test_helper {
Expand All @@ -42,7 +42,8 @@ class manual_scheduler {

static void wait_until_queue_size_of(std::size_t n) {
while (stlab::scope<lock_t>(_mutex, [&] { return _tasks.size(); }) < n) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
stlab::invoke_waiting(
[] { std::this_thread::sleep_for(std::chrono::milliseconds(1)); });
}
}

Expand All @@ -63,7 +64,8 @@ struct channel_test_fixture_base {
template <typename F>
void wait_until_done(F&& f) const {
while (!std::forward<F>(f)()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
stlab::invoke_waiting(
[] { std::this_thread::sleep_for(std::chrono::milliseconds(1)); });
}
}
};
Expand All @@ -73,9 +75,7 @@ struct channel_test_fixture : channel_test_fixture_base {
std::array<stlab::sender<T>, N> _send;
std::array<stlab::receiver<T>, N> _receive;

channel_test_fixture() {
test_reset();
}
channel_test_fixture() { test_reset(); }

void test_reset() {
for (std::size_t i = 0; i < N; i++)
Expand Down Expand Up @@ -120,7 +120,8 @@ class channel_combine_test_fixture {
template <typename F>
void wait_until_done(F&& f) const {
while (!std::forward<F>(f)()) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
stlab::invoke_waiting(
[] { std::this_thread::sleep_for(std::chrono::microseconds(1)); });
}
}
};
Expand Down
Loading

0 comments on commit b657d25

Please sign in to comment.