LeechCraft 0.6.70-18450-gabe19ee3b0
Modular cross-platform feature rich live environment.
Loading...
Searching...
No Matches
corochanneltest.cpp
Go to the documentation of this file.
1/**********************************************************************
2 * LeechCraft - modular cross-platform feature rich internet client.
3 * Copyright (C) 2006-2014 Georg Rudoy
4 *
5 * Distributed under the Boost Software License, Version 1.0.
6 * (See accompanying file LICENSE or copy at https://www.boost.org/LICENSE_1_0.txt)
7 **********************************************************************/
8
9#include "corochanneltest.h"
10#include <QtConcurrentRun>
11#include <QtTest>
12#include "coro.h"
13#include "coro/channel.h"
14#include "coro/channelutils.h"
15#include "coro/getresult.h"
16
// Qt Test entry point: generates main () for running CoroChannelTest's test functions
// without a GUI application object.
17QTEST_GUILESS_MAIN (LC::Util::CoroChannelTest)
18
19namespace LC::Util
20{
21 void CoroChannelTest::testSingleRecv ()
22 {
23 using namespace std::chrono_literals;
24
25 constexpr auto producersCount = 32;
26 constexpr auto repCount = 100;
27 constexpr auto sleepLength = 1ms;
28
29 Channel<int> ch { this };
30
31 std::vector<std::thread> threads;
32 std::atomic_int expected;
33 for (int i = 0; i < producersCount; ++i)
34 threads.emplace_back ([&, i]
35 {
36 for (int j = 0; j < repCount; ++j)
37 {
38 const auto val = j * producersCount + i;
39 ch.Send (val);
40 expected.fetch_add (val, std::memory_order::relaxed);
41 std::this_thread::sleep_for (sleepLength);
42 }
43 });
44
45 auto mainThread = std::this_thread::get_id ();
46 auto reader = [] (auto mainThread, Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
47 {
48 int sum = 0;
49 while (auto next = co_await ch->Receive ())
50 {
51 [=]
52 {
53 auto thisThread = std::this_thread::get_id ();
54 QCOMPARE (thisThread, mainThread);
55 } ();
56
57 sum += *next;
58 }
59 co_return sum;
60 } (mainThread, &ch);
61
62 for (auto& thread : threads)
63 thread.join ();
64
65 ch.Close ();
66
67 auto result = GetTaskResult (reader);
68 QCOMPARE (result, expected);
69 }
70
71 void CoroChannelTest::testManyRecvs ()
72 {
73 using namespace std::chrono_literals;
74
75 constexpr auto producersCount = 32;
76 constexpr auto consumersCount = 8;
77 constexpr auto repCount = 100;
78 constexpr auto sleepLength = 1ms;
79
80 Channel<int> ch;
81
82 std::vector<std::thread> producers;
83 std::atomic_int expected;
84 for (int i = 0; i < producersCount; ++i)
85 producers.emplace_back ([&, i]
86 {
87 for (int j = 0; j < repCount; ++j)
88 {
89 const auto val = j * producersCount + i;
90 ch.Send (val);
91 expected.fetch_add (val, std::memory_order::relaxed);
92 std::this_thread::sleep_for (sleepLength);
93 }
94 });
95
96 std::atomic_int sum;
97 std::vector<std::thread> consumers;
98 for (int i = 0; i < consumersCount; ++i)
99 consumers.emplace_back ([&]
100 {
101 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
102 {
103 int sum = 0;
104 while (auto next = co_await ch->Receive ())
105 sum += *next;
106 co_return sum;
107 } (&ch);
108 sum.fetch_add (GetTaskResult (reader));
109 });
110
111 for (auto& thread : producers)
112 thread.join ();
113
114 ch.Close ();
115
116 for (auto& thread : consumers)
117 thread.join ();
118
119 QCOMPARE (sum, expected);
120 }
121
122 void CoroChannelTest::testSingleThreaded ()
123 {
124 constexpr auto iterations = 1000;
125
126 Channel<int> ch;
127
128 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
129 {
130 int sum = 0;
131 while (auto next = co_await ch->Receive ())
132 sum += *next;
133 co_return sum;
134 } (&ch);
135
136 int expected = 0;
137 for (int i = 0; i < iterations; ++i)
138 {
139 expected += i;
140 ch.Send (i);
141 }
142
143 ch.Close ();
144
145 const auto result = GetTaskResult (reader);
146 QCOMPARE (result, expected);
147 }
148
149 void CoroChannelTest::testSingleThreadedTimered ()
150 {
151 using namespace std::chrono_literals;
152
153 constexpr auto iterations = 100;
154 constexpr auto interval = 1ms;
155
156 Channel<int> ch;
157
158 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
159 {
160 int sum = 0;
161 while (auto next = co_await ch->Receive ())
162 sum += *next;
163 co_return sum;
164 } (&ch);
165
166 int expected = 0;
167
168 QTimer timer;
169 timer.callOnTimeout ([&, i = 0] mutable
170 {
171 expected += i;
172 ch.Send (i);
173 if (++i == iterations)
174 {
175 timer.stop ();
176 ch.Close ();
177 }
178 });
179 timer.start (interval);
180
181 const auto result = GetTaskResult (reader);
182 QCOMPARE (result, expected);
183 }
184
185 void CoroChannelTest::testMerge ()
186 {
187 using namespace std::chrono_literals;
188
189 constexpr auto numChannels = 100;
190 constexpr auto iterations = 100;
191 constexpr auto interval = 1ms;
192
193 QVector<Channel_ptr<int>> channels;
194 std::generate_n (std::back_inserter (channels), numChannels, [] { return std::make_shared<Channel<int>> (); });
195
196 auto merged = MergeChannels (channels);
197
198 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
199 {
200 int sum = 0;
201 while (auto next = co_await ch->Receive ())
202 sum += *next;
203 co_return sum;
204 } (merged.get ());
205
206 int expected = 0;
207
208 QTimer timer;
209 timer.callOnTimeout ([&, i = 0] mutable
210 {
211 for (int j = 0; j < numChannels; ++j)
212 {
213 auto value = i * numChannels + j;
214 expected += value;
215 channels [j]->Send (value);
216 }
217 if (++i == iterations)
218 {
219 timer.stop ();
220 for (auto chan : channels)
221 chan->Close ();
222 }
223 });
224 timer.start (interval);
225
226 const auto result = GetTaskResult (reader);
227 QCOMPARE (result, expected);
228 }
229}
T GetTaskResult(Task< T, Extensions... > task)
Definition getresult.h:19
Channel_ptr< T > MergeChannels(QVector< Channel_ptr< T > > channels)