10#include <QtConcurrentRun>
// Test: several producer threads and one receive loop. The checks visible
// here are that each pass of the receive loop runs on the thread whose id was
// captured before the loop, and that `result` equals the total accumulated
// in `expected`.
// NOTE(review): this excerpt omits lines (braces, whatever sends into `ch`,
// the definition of `result`, the body of the loop over `threads`); the
// comments below describe only what is shown.
21 void CoroChannelTest::testSingleRecv ()
// Brings the `ms` literal into scope for sleepLength.
23 using namespace std::chrono_literals;
25 constexpr auto producersCount = 32;
26 constexpr auto repCount = 100;
27 constexpr auto sleepLength = 1ms;
29 Channel<int> ch {
this };
31 std::vector<std::thread> threads;
// Running total of every value generated by the producers.
// NOTE(review): no explicit initializer — this starts at zero only under the
// C++20 rule that a default-constructed std::atomic is value-initialized;
// confirm the toolchain, or initialize explicitly.
32 std::atomic_int expected;
// One std::thread per producer; `i` is captured by value so each thread keeps
// its own index, everything else is captured by reference.
33 for (
int i = 0; i < producersCount; ++i)
34 threads.emplace_back ([&, i]
36 for (int j = 0; j < repCount; ++j)
// Distinct for every (i, j) pair because i < producersCount.
38 const auto val = j * producersCount + i;
// Presumably `val` is also sent into `ch` on a line not shown — TODO confirm.
40 expected.fetch_add (val, std::memory_order::relaxed);
41 std::this_thread::sleep_for (sleepLength);
// Thread id recorded before the receive loop; compared on every iteration.
45 auto mainThread = std::this_thread::get_id ();
// Loop continues for as long as the awaited Receive () result is truthy.
49 while (
auto next =
co_await ch->Receive ())
53 auto thisThread = std::this_thread::get_id ();
// Fails the test if the loop body runs on a thread other than `mainThread`.
54 QCOMPARE (thisThread, mainThread);
// Presumably joins the producers (loop body not shown) — TODO confirm.
62 for (
auto& thread : threads)
// `result` is defined on a line not shown in this excerpt.
68 QCOMPARE (result, expected);
// Test: several producer threads and several consumer threads. Each consumer
// thread defines a `reader` coroutine lambda that loops on Receive (); the
// final check compares `sum` against the total accumulated in `expected`.
// NOTE(review): this excerpt omits lines (braces, the channel declaration,
// whatever sends into it, how `sum` is built, the bodies of the two loops
// over the thread vectors); comments describe only what is shown.
71 void CoroChannelTest::testManyRecvs ()
// Brings the `ms` literal into scope for sleepLength.
73 using namespace std::chrono_literals;
75 constexpr auto producersCount = 32;
76 constexpr auto consumersCount = 8;
77 constexpr auto repCount = 100;
78 constexpr auto sleepLength = 1ms;
82 std::vector<std::thread> producers;
// Running total of every value generated by the producers.
// NOTE(review): no explicit initializer — this starts at zero only under the
// C++20 rule that a default-constructed std::atomic is value-initialized;
// confirm the toolchain, or initialize explicitly.
83 std::atomic_int expected;
// One std::thread per producer; `i` is captured by value, the rest by
// reference.
84 for (
int i = 0; i < producersCount; ++i)
85 producers.emplace_back ([&, i]
87 for (int j = 0; j < repCount; ++j)
// Distinct for every (i, j) pair because i < producersCount.
89 const auto val = j * producersCount + i;
// Presumably `val` is also sent into the channel on a line not shown —
// TODO confirm.
91 expected.fetch_add (val, std::memory_order::relaxed);
92 std::this_thread::sleep_for (sleepLength);
97 std::vector<std::thread> consumers;
// One std::thread per consumer, capturing the enclosing scope by reference.
98 for (
int i = 0; i < consumersCount; ++i)
99 consumers.emplace_back ([&]
// Coroutine lambda: takes the channel by raw pointer and returns
// Task<int, ThreadSafetyExtension>.
101 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Loop continues for as long as the awaited Receive () result is truthy.
104 while (auto next = co_await ch->Receive ())
// Presumably joins the producers (loop body not shown) — TODO confirm.
111 for (
auto& thread : producers)
// Presumably joins the consumers (loop body not shown) — TODO confirm.
116 for (
auto& thread : consumers)
// `sum` is defined on a line not shown in this excerpt.
119 QCOMPARE (sum, expected);
// Test: a `reader` coroutine lambda loops on Receive () while a plain
// counted loop runs `iterations` times; the final check compares `result`
// with `expected`.
// NOTE(review): this excerpt omits lines (braces, the channel, the body of
// the counted loop, the definitions of `result` and `expected`); comments
// describe only what is shown.
122 void CoroChannelTest::testSingleThreaded ()
124 constexpr auto iterations = 1000;
// Coroutine lambda: takes the channel by raw pointer and returns
// Task<int, ThreadSafetyExtension>.
128 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Loop continues for as long as the awaited Receive () result is truthy.
131 while (
auto next =
co_await ch->Receive ())
// Presumably sends one value per iteration (loop body not shown) —
// TODO confirm.
137 for (
int i = 0; i < iterations; ++i)
146 QCOMPARE (result, expected);
// Test: like the single-threaded case, but the work is driven from a timer
// callback that fires at `interval` and counts its own invocations up to
// `iterations`; the final check compares `result` with `expected`.
// NOTE(review): this excerpt omits lines (braces, the channel, the
// declaration of `timer`, most of the callback body, the definitions of
// `result` and `expected`); comments describe only what is shown.
149 void CoroChannelTest::testSingleThreadedTimered ()
// Brings the `ms` literal into scope for interval.
151 using namespace std::chrono_literals;
153 constexpr auto iterations = 100;
154 constexpr auto interval = 1ms;
// Coroutine lambda: takes the channel by raw pointer and returns
// Task<int, ThreadSafetyExtension>.
158 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Loop continues for as long as the awaited Receive () result is truthy.
161 while (
auto next =
co_await ch->Receive ())
// The callback owns its counter `i` (init-capture, hence `mutable`); the
// enclosing scope is captured by reference.
169 timer.callOnTimeout ([&, i = 0]
mutable
// True on the `iterations`-th invocation; the action taken is not shown —
// presumably stops the timer and/or closes the channel, TODO confirm.
173 if (++i == iterations)
179 timer.start (interval);
182 QCOMPARE (result, expected);
// Test: creates `numChannels` channels and, from a timer callback firing at
// `interval`, sends one value into every channel per tick; on the
// `iterations`-th tick it walks all channels. A `reader` coroutine lambda
// loops on Receive (); the final check compares `result` with `expected`.
// NOTE(review): this excerpt omits lines (braces, where the channels are
// merged and handed to `reader`, the declaration of `timer`, the body of the
// final loop over channels, the definitions of `result` and `expected`);
// comments describe only what is shown.
185 void CoroChannelTest::testMerge ()
// Brings the `ms` literal into scope for interval.
187 using namespace std::chrono_literals;
189 constexpr auto numChannels = 100;
190 constexpr auto iterations = 100;
191 constexpr auto interval = 1ms;
193 QVector<Channel_ptr<int>> channels;
// Appends `numChannels` freshly make_shared'd Channel<int> objects.
194 std::generate_n (std::back_inserter (channels), numChannels, [] {
return std::make_shared<Channel<int>> (); });
// Coroutine lambda: takes a channel by raw pointer and returns
// Task<int, ThreadSafetyExtension>.
198 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Loop continues for as long as the awaited Receive () result is truthy.
201 while (
auto next =
co_await ch->Receive ())
// The callback owns its tick counter `i` (init-capture, hence `mutable`); the
// enclosing scope is captured by reference.
209 timer.callOnTimeout ([&, i = 0]
mutable
// One send per channel on every tick.
211 for (
int j = 0; j < numChannels; ++j)
// Distinct for every (i, j) pair because j < numChannels.
213 auto value = i * numChannels + j;
215 channels [j]->Send (value);
// True on the `iterations`-th tick.
217 if (++i == iterations)
// Presumably closes each channel (loop body not shown) — TODO confirm.
// NOTE(review): iterating by value copies each Channel_ptr; if Channel_ptr is
// a shared_ptr, as the make_shared above suggests, `const auto&` would avoid
// the refcount traffic.
220 for (
auto chan : channels)
224 timer.start (interval);
227 QCOMPARE (result, expected);
T GetTaskResult(Task< T, Extensions... > task)
Channel_ptr< T > MergeChannels(QVector< Channel_ptr< T > > channels)