xref: /llvm-project/llvm/unittests/Support/ThreadPool.cpp (revision 89e6a288674c9fae33aeb5448c7b1fe782b2bf53)
1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/ThreadPool.h"
10 #include "llvm/ADT/STLExtras.h"
11 #include "llvm/ADT/SetVector.h"
12 #include "llvm/ADT/SmallVector.h"
13 #include "llvm/Config/llvm-config.h" // for LLVM_ENABLE_THREADS
14 #include "llvm/Support/CommandLine.h"
15 #include "llvm/Support/Program.h"
16 #include "llvm/Support/TargetSelect.h"
17 #include "llvm/Support/Threading.h"
18 #include "llvm/TargetParser/Host.h"
19 #include "llvm/TargetParser/Triple.h"
20 
21 #ifdef _WIN32
22 #include "llvm/Support/Windows/WindowsSupport.h"
23 #endif
24 
25 #include <chrono>
26 #include <thread>
27 
28 #include "gtest/gtest.h"
29 
30 namespace testing {
31 namespace internal {
32 // Specialize gtest construct to provide friendlier name in the output.
33 #if LLVM_ENABLE_THREADS
34 template <> std::string GetTypeName<llvm::StdThreadPool>() {
35   return "llvm::StdThreadPool";
36 }
37 #endif
38 template <> std::string GetTypeName<llvm::SingleThreadExecutor>() {
39   return "llvm::SingleThreadExecutor";
40 }
41 } // namespace internal
42 } // namespace testing
43 
44 using namespace llvm;
45 
46 // Fixture for the unittests, allowing to *temporarily* disable the unittests
47 // on a particular platform
48 template <typename ThreadPoolImpl> class ThreadPoolTest : public testing::Test {
49   Triple Host;
50   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
51   SmallVector<Triple::OSType, 4> UnsupportedOSs;
52   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
53 
54 protected:
55   // This is intended for platform as a temporary "XFAIL"
56   bool isUnsupportedOSOrEnvironment() {
57     Triple Host(Triple::normalize(sys::getProcessTriple()));
58 
59     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
60         UnsupportedEnvironments.end())
61       return true;
62 
63     if (is_contained(UnsupportedOSs, Host.getOS()))
64       return true;
65 
66     if (is_contained(UnsupportedArchs, Host.getArch()))
67       return true;
68 
69     return false;
70   }
71 
72   ThreadPoolTest() {
73     // Add unsupported configuration here, example:
74     //   UnsupportedArchs.push_back(Triple::x86_64);
75 
76     // See https://llvm.org/bugs/show_bug.cgi?id=25829
77     UnsupportedArchs.push_back(Triple::ppc64le);
78     UnsupportedArchs.push_back(Triple::ppc64);
79   }
80 
81   /// Make sure this thread not progress faster than the main thread.
82   void waitForMainThread() { waitForPhase(1); }
83 
84   /// Set the readiness of the main thread.
85   void setMainThreadReady() { setPhase(1); }
86 
87   /// Wait until given phase is set using setPhase(); first "main" phase is 1.
88   /// See also PhaseResetHelper below.
89   void waitForPhase(int Phase) {
90     std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
91     CurrentPhaseCondition.wait(
92         LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
93   }
94   /// If a thread waits on another phase, the test could bail out on a failed
95   /// assertion and ThreadPool destructor would wait() on all threads, which
96   /// would deadlock on the task waiting. Create this helper to automatically
97   /// reset the phase and unblock such threads.
98   struct PhaseResetHelper {
99     PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
100     ~PhaseResetHelper() { test->setPhase(-1); }
101     ThreadPoolTest *test;
102   };
103 
104   /// Advance to the given phase.
105   void setPhase(int Phase) {
106     {
107       std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
108       assert(Phase == CurrentPhase + 1 || Phase < 0);
109       CurrentPhase = Phase;
110     }
111     CurrentPhaseCondition.notify_all();
112   }
113 
114   void SetUp() override { CurrentPhase = 0; }
115 
116   SmallVector<llvm::BitVector, 0> RunOnAllSockets(ThreadPoolStrategy S);
117 
118   std::condition_variable CurrentPhaseCondition;
119   std::mutex CurrentPhaseMutex;
120   int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
121 };
122 
123 using ThreadPoolImpls = ::testing::Types<
124 #if LLVM_ENABLE_THREADS
125     StdThreadPool,
126 #endif
127     SingleThreadExecutor>;
128 
129 TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolImpls, );
130 
// Skip the current test when the fixture reports the host as unsupported.
// The expansion deliberately has no trailing ';', so `CHECK_UNSUPPORTED();`
// forms exactly one statement, even as the body of an unbraced if/else.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (this->isUnsupportedOSOrEnvironment())                                  \
      GTEST_SKIP();                                                            \
  } while (0)
136 
137 TYPED_TEST(ThreadPoolTest, AsyncBarrier) {
138   CHECK_UNSUPPORTED();
139   // test that async & barrier work together properly.
140 
141   std::atomic_int checked_in{0};
142 
143   DefaultThreadPool Pool;
144   for (size_t i = 0; i < 5; ++i) {
145     Pool.async([this, &checked_in] {
146       this->waitForMainThread();
147       ++checked_in;
148     });
149   }
150   ASSERT_EQ(0, checked_in);
151   this->setMainThreadReady();
152   Pool.wait();
153   ASSERT_EQ(5, checked_in);
154 }
155 
/// Adds \p Delta to the shared counter; used to check that async() forwards
/// extra arguments to the callable.
static void TestFunc(std::atomic_int &Counter, int Delta) {
  Counter.fetch_add(Delta);
}
157 
158 TYPED_TEST(ThreadPoolTest, AsyncBarrierArgs) {
159   CHECK_UNSUPPORTED();
160   // Test that async works with a function requiring multiple parameters.
161   std::atomic_int checked_in{0};
162 
163   DefaultThreadPool Pool;
164   for (size_t i = 0; i < 5; ++i) {
165     Pool.async(TestFunc, std::ref(checked_in), i);
166   }
167   Pool.wait();
168   ASSERT_EQ(10, checked_in);
169 }
170 
171 TYPED_TEST(ThreadPoolTest, Async) {
172   CHECK_UNSUPPORTED();
173   DefaultThreadPool Pool;
174   std::atomic_int i{0};
175   Pool.async([this, &i] {
176     this->waitForMainThread();
177     ++i;
178   });
179   Pool.async([&i] { ++i; });
180   ASSERT_NE(2, i.load());
181   this->setMainThreadReady();
182   Pool.wait();
183   ASSERT_EQ(2, i.load());
184 }
185 
186 TYPED_TEST(ThreadPoolTest, GetFuture) {
187   CHECK_UNSUPPORTED();
188   DefaultThreadPool Pool(hardware_concurrency(2));
189   std::atomic_int i{0};
190   Pool.async([this, &i] {
191     this->waitForMainThread();
192     ++i;
193   });
194   // Force the future using get()
195   Pool.async([&i] { ++i; }).get();
196   ASSERT_NE(2, i.load());
197   this->setMainThreadReady();
198   Pool.wait();
199   ASSERT_EQ(2, i.load());
200 }
201 
202 TYPED_TEST(ThreadPoolTest, GetFutureWithResult) {
203   CHECK_UNSUPPORTED();
204   DefaultThreadPool Pool(hardware_concurrency(2));
205   auto F1 = Pool.async([] { return 1; });
206   auto F2 = Pool.async([] { return 2; });
207 
208   this->setMainThreadReady();
209   Pool.wait();
210   ASSERT_EQ(1, F1.get());
211   ASSERT_EQ(2, F2.get());
212 }
213 
214 TYPED_TEST(ThreadPoolTest, GetFutureWithResultAndArgs) {
215   CHECK_UNSUPPORTED();
216   DefaultThreadPool Pool(hardware_concurrency(2));
217   auto Fn = [](int x) { return x; };
218   auto F1 = Pool.async(Fn, 1);
219   auto F2 = Pool.async(Fn, 2);
220 
221   this->setMainThreadReady();
222   Pool.wait();
223   ASSERT_EQ(1, F1.get());
224   ASSERT_EQ(2, F2.get());
225 }
226 
227 TYPED_TEST(ThreadPoolTest, PoolDestruction) {
228   CHECK_UNSUPPORTED();
229   // Test that we are waiting on destruction
230   std::atomic_int checked_in{0};
231   {
232     DefaultThreadPool Pool;
233     for (size_t i = 0; i < 5; ++i) {
234       Pool.async([this, &checked_in] {
235         this->waitForMainThread();
236         ++checked_in;
237       });
238     }
239     ASSERT_EQ(0, checked_in);
240     this->setMainThreadReady();
241   }
242   ASSERT_EQ(5, checked_in);
243 }
244 
245 // Check running tasks in different groups.
246 TYPED_TEST(ThreadPoolTest, Groups) {
247   CHECK_UNSUPPORTED();
248   // Need at least two threads, as the task in group2
249   // might block a thread until all tasks in group1 finish.
250   ThreadPoolStrategy S = hardware_concurrency(2);
251   if (S.compute_thread_count() < 2)
252     GTEST_SKIP();
253   DefaultThreadPool Pool(S);
254   typename TestFixture::PhaseResetHelper Helper(this);
255   ThreadPoolTaskGroup Group1(Pool);
256   ThreadPoolTaskGroup Group2(Pool);
257 
258   // Check that waiting for an empty group is a no-op.
259   Group1.wait();
260 
261   std::atomic_int checked_in1{0};
262   std::atomic_int checked_in2{0};
263 
264   for (size_t i = 0; i < 5; ++i) {
265     Group1.async([this, &checked_in1] {
266       this->waitForMainThread();
267       ++checked_in1;
268     });
269   }
270   Group2.async([this, &checked_in2] {
271     this->waitForPhase(2);
272     ++checked_in2;
273   });
274   ASSERT_EQ(0, checked_in1);
275   ASSERT_EQ(0, checked_in2);
276   // Start first group and wait for it.
277   this->setMainThreadReady();
278   Group1.wait();
279   ASSERT_EQ(5, checked_in1);
280   // Second group has not yet finished, start it and wait for it.
281   ASSERT_EQ(0, checked_in2);
282   this->setPhase(2);
283   Group2.wait();
284   ASSERT_EQ(5, checked_in1);
285   ASSERT_EQ(1, checked_in2);
286 }
287 
288 // Check recursive tasks.
289 TYPED_TEST(ThreadPoolTest, RecursiveGroups) {
290   CHECK_UNSUPPORTED();
291   DefaultThreadPool Pool;
292   ThreadPoolTaskGroup Group(Pool);
293 
294   std::atomic_int checked_in1{0};
295 
296   for (size_t i = 0; i < 5; ++i) {
297     Group.async([this, &Pool, &checked_in1] {
298       this->waitForMainThread();
299 
300       ThreadPoolTaskGroup LocalGroup(Pool);
301 
302       // Check that waiting for an empty group is a no-op.
303       LocalGroup.wait();
304 
305       std::atomic_int checked_in2{0};
306       for (size_t i = 0; i < 5; ++i) {
307         LocalGroup.async([&checked_in2] { ++checked_in2; });
308       }
309       LocalGroup.wait();
310       ASSERT_EQ(5, checked_in2);
311 
312       ++checked_in1;
313     });
314   }
315   ASSERT_EQ(0, checked_in1);
316   this->setMainThreadReady();
317   Group.wait();
318   ASSERT_EQ(5, checked_in1);
319 }
320 
321 TYPED_TEST(ThreadPoolTest, RecursiveWaitDeadlock) {
322   CHECK_UNSUPPORTED();
323   ThreadPoolStrategy S = hardware_concurrency(2);
324   if (S.compute_thread_count() < 2)
325     GTEST_SKIP();
326   DefaultThreadPool Pool(S);
327   typename TestFixture::PhaseResetHelper Helper(this);
328   ThreadPoolTaskGroup Group(Pool);
329 
330   // Test that a thread calling wait() for a group and is waiting for more tasks
331   // returns when the last task finishes in a different thread while the waiting
332   // thread was waiting for more tasks to process while waiting.
333 
334   // Task A runs in the first thread. It finishes and leaves
335   // the background thread waiting for more tasks.
336   Group.async([this] {
337     this->waitForMainThread();
338     this->setPhase(2);
339   });
340   // Task B is run in a second thread, it launches yet another
341   // task C in a different group, which will be handled by the waiting
342   // thread started above.
343   Group.async([this, &Pool] {
344     this->waitForPhase(2);
345     ThreadPoolTaskGroup LocalGroup(Pool);
346     LocalGroup.async([this] {
347       this->waitForPhase(3);
348       // Give the other thread enough time to check that there's no task
349       // to process and suspend waiting for a notification. This is indeed racy,
350       // but probably the best that can be done.
351       std::this_thread::sleep_for(std::chrono::milliseconds(10));
352     });
353     // And task B only now will wait for the tasks in the group (=task C)
354     // to finish. This test checks that it does not deadlock. If the
355     // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
356     // this task B would be stuck waiting for tasks to arrive.
357     this->setPhase(3);
358     LocalGroup.wait();
359   });
360   this->setMainThreadReady();
361   Group.wait();
362 }
363 
364 #if LLVM_ENABLE_THREADS == 1
365 
366 // FIXME: Skip some tests below on non-Windows because multi-socket systems
367 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
368 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
369 #ifdef _WIN32
370 
371 template <typename ThreadPoolImpl>
372 SmallVector<llvm::BitVector, 0>
373 ThreadPoolTest<ThreadPoolImpl>::RunOnAllSockets(ThreadPoolStrategy S) {
374   llvm::SetVector<llvm::BitVector> ThreadsUsed;
375   std::mutex Lock;
376   {
377     std::condition_variable AllThreads;
378     std::mutex AllThreadsLock;
379     unsigned Active = 0;
380 
381     DefaultThreadPool Pool(S);
382     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
383       Pool.async([&] {
384         {
385           std::lock_guard<std::mutex> Guard(AllThreadsLock);
386           ++Active;
387           AllThreads.notify_one();
388         }
389         this->waitForMainThread();
390         std::lock_guard<std::mutex> Guard(Lock);
391         auto Mask = llvm::get_thread_affinity_mask();
392         ThreadsUsed.insert(Mask);
393       });
394     }
395     EXPECT_EQ(true, ThreadsUsed.empty());
396     {
397       std::unique_lock<std::mutex> Guard(AllThreadsLock);
398       AllThreads.wait(Guard,
399                       [&]() { return Active == S.compute_thread_count(); });
400     }
401     this->setMainThreadReady();
402   }
403   return ThreadsUsed.takeVector();
404 }
405 
406 TYPED_TEST(ThreadPoolTest, AllThreads_UseAllRessources) {
407   CHECK_UNSUPPORTED();
408   // After Windows 11, the OS is free to deploy the threads on any CPU socket.
409   // We cannot relibly ensure that all thread affinity mask are covered,
410   // therefore this test should not run.
411   if (llvm::RunningWindows11OrGreater())
412     GTEST_SKIP();
413   auto ThreadsUsed = this->RunOnAllSockets({});
414   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
415 }
416 
417 TYPED_TEST(ThreadPoolTest, AllThreads_OneThreadPerCore) {
418   CHECK_UNSUPPORTED();
419   // After Windows 11, the OS is free to deploy the threads on any CPU socket.
420   // We cannot relibly ensure that all thread affinity mask are covered,
421   // therefore this test should not run.
422   if (llvm::RunningWindows11OrGreater())
423     GTEST_SKIP();
424   auto ThreadsUsed =
425       this->RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
426   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
427 }
428 
429 // From TestMain.cpp.
430 extern const char *TestMainArgv0;
431 
432 // Just a reachable symbol to ease resolving of the executable's path.
433 static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");
434 
435 #ifdef _WIN32
436 #define setenv(name, var, ignore) _putenv_s(name, var)
437 #endif
438 
439 TYPED_TEST(ThreadPoolTest, AffinityMask) {
440   CHECK_UNSUPPORTED();
441 
442   // Skip this test if less than 4 threads are available.
443   if (llvm::hardware_concurrency().compute_thread_count() < 4)
444     GTEST_SKIP();
445 
446   using namespace llvm::sys;
447   if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
448     auto ThreadsUsed = this->RunOnAllSockets({});
449     // Ensure the threads only ran on CPUs 0-3.
450     // NOTE: Don't use ASSERT* here because this runs in a subprocess,
451     // and will show up as un-executed in the parent.
452     assert(llvm::all_of(ThreadsUsed,
453                         [](auto &T) { return T.getData().front() < 16UL; }) &&
454            "Threads ran on more CPUs than expected! The affinity mask does not "
455            "seem to work.");
456     GTEST_SKIP();
457   }
458   std::string Executable =
459       sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
460   StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};
461 
462   // Add environment variable to the environment of the child process.
463   int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
464   ASSERT_EQ(Res, 0);
465 
466   std::string Error;
467   bool ExecutionFailed;
468   BitVector Affinity;
469   Affinity.resize(4);
470   Affinity.set(0, 4); // Use CPUs 0,1,2,3.
471   int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
472                                 &ExecutionFailed, nullptr, &Affinity);
473   ASSERT_EQ(0, Ret);
474 }
475 
476 #endif // #ifdef _WIN32
477 #endif // #if LLVM_ENABLE_THREADS == 1
478