xref: /llvm-project/llvm/unittests/Support/ThreadPool.cpp (revision d768bf994f508d7eaf9541a568be3d71096febf5)
1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/ThreadPool.h"
10 
11 #include "llvm/ADT/STLExtras.h"
12 #include "llvm/ADT/SetVector.h"
13 #include "llvm/ADT/SmallVector.h"
14 #include "llvm/Support/CommandLine.h"
15 #include "llvm/Support/Program.h"
16 #include "llvm/Support/TargetSelect.h"
17 #include "llvm/Support/Threading.h"
18 #include "llvm/TargetParser/Host.h"
19 #include "llvm/TargetParser/Triple.h"
20 
21 #ifdef _WIN32
22 #include "llvm/Support/Windows/WindowsSupport.h"
23 #endif
24 
25 #include <chrono>
26 #include <thread>
27 
28 #include "gtest/gtest.h"
29 
30 using namespace llvm;
31 
32 // Fixture for the unittests, allowing to *temporarily* disable the unittests
33 // on a particular platform
34 class ThreadPoolTest : public testing::Test {
35   Triple Host;
36   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
37   SmallVector<Triple::OSType, 4> UnsupportedOSs;
38   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
39 
40 protected:
41   // This is intended for platform as a temporary "XFAIL"
42   bool isUnsupportedOSOrEnvironment() {
43     Triple Host(Triple::normalize(sys::getProcessTriple()));
44 
45     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
46         UnsupportedEnvironments.end())
47       return true;
48 
49     if (is_contained(UnsupportedOSs, Host.getOS()))
50       return true;
51 
52     if (is_contained(UnsupportedArchs, Host.getArch()))
53       return true;
54 
55     return false;
56   }
57 
58   ThreadPoolTest() {
59     // Add unsupported configuration here, example:
60     //   UnsupportedArchs.push_back(Triple::x86_64);
61 
62     // See https://llvm.org/bugs/show_bug.cgi?id=25829
63     UnsupportedArchs.push_back(Triple::ppc64le);
64     UnsupportedArchs.push_back(Triple::ppc64);
65   }
66 
67   /// Make sure this thread not progress faster than the main thread.
68   void waitForMainThread() { waitForPhase(1); }
69 
70   /// Set the readiness of the main thread.
71   void setMainThreadReady() { setPhase(1); }
72 
73   /// Wait until given phase is set using setPhase(); first "main" phase is 1.
74   /// See also PhaseResetHelper below.
75   void waitForPhase(int Phase) {
76     std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
77     CurrentPhaseCondition.wait(
78         LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
79   }
80   /// If a thread waits on another phase, the test could bail out on a failed
81   /// assertion and ThreadPool destructor would wait() on all threads, which
82   /// would deadlock on the task waiting. Create this helper to automatically
83   /// reset the phase and unblock such threads.
84   struct PhaseResetHelper {
85     PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
86     ~PhaseResetHelper() { test->setPhase(-1); }
87     ThreadPoolTest *test;
88   };
89 
90   /// Advance to the given phase.
91   void setPhase(int Phase) {
92     {
93       std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
94       assert(Phase == CurrentPhase + 1 || Phase < 0);
95       CurrentPhase = Phase;
96     }
97     CurrentPhaseCondition.notify_all();
98   }
99 
100   void SetUp() override { CurrentPhase = 0; }
101 
102   std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
103 
104   std::condition_variable CurrentPhaseCondition;
105   std::mutex CurrentPhaseMutex;
106   int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
107 };
108 
// Skips the current test when the fixture reports the host platform as
// unsupported. There is deliberately no semicolon after `while (0)`: the
// caller's own `;` terminates the statement, which keeps the macro usable as
// the body of an unbraced if/else.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0)
114 
115 TEST_F(ThreadPoolTest, AsyncBarrier) {
116   CHECK_UNSUPPORTED();
117   // test that async & barrier work together properly.
118 
119   std::atomic_int checked_in{0};
120 
121   ThreadPool Pool;
122   for (size_t i = 0; i < 5; ++i) {
123     Pool.async([this, &checked_in] {
124       waitForMainThread();
125       ++checked_in;
126     });
127   }
128   ASSERT_EQ(0, checked_in);
129   setMainThreadReady();
130   Pool.wait();
131   ASSERT_EQ(5, checked_in);
132 }
133 
/// Atomically adds \p i to \p checked_in; used as a multi-parameter callee
/// for ThreadPool::async().
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
135 
136 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
137   CHECK_UNSUPPORTED();
138   // Test that async works with a function requiring multiple parameters.
139   std::atomic_int checked_in{0};
140 
141   ThreadPool Pool;
142   for (size_t i = 0; i < 5; ++i) {
143     Pool.async(TestFunc, std::ref(checked_in), i);
144   }
145   Pool.wait();
146   ASSERT_EQ(10, checked_in);
147 }
148 
149 TEST_F(ThreadPoolTest, Async) {
150   CHECK_UNSUPPORTED();
151   ThreadPool Pool;
152   std::atomic_int i{0};
153   Pool.async([this, &i] {
154     waitForMainThread();
155     ++i;
156   });
157   Pool.async([&i] { ++i; });
158   ASSERT_NE(2, i.load());
159   setMainThreadReady();
160   Pool.wait();
161   ASSERT_EQ(2, i.load());
162 }
163 
164 TEST_F(ThreadPoolTest, GetFuture) {
165   CHECK_UNSUPPORTED();
166   ThreadPool Pool(hardware_concurrency(2));
167   std::atomic_int i{0};
168   Pool.async([this, &i] {
169     waitForMainThread();
170     ++i;
171   });
172   // Force the future using get()
173   Pool.async([&i] { ++i; }).get();
174   ASSERT_NE(2, i.load());
175   setMainThreadReady();
176   Pool.wait();
177   ASSERT_EQ(2, i.load());
178 }
179 
180 TEST_F(ThreadPoolTest, GetFutureWithResult) {
181   CHECK_UNSUPPORTED();
182   ThreadPool Pool(hardware_concurrency(2));
183   auto F1 = Pool.async([] { return 1; });
184   auto F2 = Pool.async([] { return 2; });
185 
186   setMainThreadReady();
187   Pool.wait();
188   ASSERT_EQ(1, F1.get());
189   ASSERT_EQ(2, F2.get());
190 }
191 
192 TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
193   CHECK_UNSUPPORTED();
194   ThreadPool Pool(hardware_concurrency(2));
195   auto Fn = [](int x) { return x; };
196   auto F1 = Pool.async(Fn, 1);
197   auto F2 = Pool.async(Fn, 2);
198 
199   setMainThreadReady();
200   Pool.wait();
201   ASSERT_EQ(1, F1.get());
202   ASSERT_EQ(2, F2.get());
203 }
204 
205 TEST_F(ThreadPoolTest, PoolDestruction) {
206   CHECK_UNSUPPORTED();
207   // Test that we are waiting on destruction
208   std::atomic_int checked_in{0};
209   {
210     ThreadPool Pool;
211     for (size_t i = 0; i < 5; ++i) {
212       Pool.async([this, &checked_in] {
213         waitForMainThread();
214         ++checked_in;
215       });
216     }
217     ASSERT_EQ(0, checked_in);
218     setMainThreadReady();
219   }
220   ASSERT_EQ(5, checked_in);
221 }
222 
223 // Check running tasks in different groups.
224 TEST_F(ThreadPoolTest, Groups) {
225   CHECK_UNSUPPORTED();
226   // Need at least two threads, as the task in group2
227   // might block a thread until all tasks in group1 finish.
228   ThreadPoolStrategy S = hardware_concurrency(2);
229   if (S.compute_thread_count() < 2)
230     GTEST_SKIP();
231   ThreadPool Pool(S);
232   PhaseResetHelper Helper(this);
233   ThreadPoolTaskGroup Group1(Pool);
234   ThreadPoolTaskGroup Group2(Pool);
235 
236   // Check that waiting for an empty group is a no-op.
237   Group1.wait();
238 
239   std::atomic_int checked_in1{0};
240   std::atomic_int checked_in2{0};
241 
242   for (size_t i = 0; i < 5; ++i) {
243     Group1.async([this, &checked_in1] {
244       waitForMainThread();
245       ++checked_in1;
246     });
247   }
248   Group2.async([this, &checked_in2] {
249     waitForPhase(2);
250     ++checked_in2;
251   });
252   ASSERT_EQ(0, checked_in1);
253   ASSERT_EQ(0, checked_in2);
254   // Start first group and wait for it.
255   setMainThreadReady();
256   Group1.wait();
257   ASSERT_EQ(5, checked_in1);
258   // Second group has not yet finished, start it and wait for it.
259   ASSERT_EQ(0, checked_in2);
260   setPhase(2);
261   Group2.wait();
262   ASSERT_EQ(5, checked_in1);
263   ASSERT_EQ(1, checked_in2);
264 }
265 
266 // Check recursive tasks.
267 TEST_F(ThreadPoolTest, RecursiveGroups) {
268   CHECK_UNSUPPORTED();
269   ThreadPool Pool;
270   ThreadPoolTaskGroup Group(Pool);
271 
272   std::atomic_int checked_in1{0};
273 
274   for (size_t i = 0; i < 5; ++i) {
275     Group.async([this, &Pool, &checked_in1] {
276       waitForMainThread();
277 
278       ThreadPoolTaskGroup LocalGroup(Pool);
279 
280       // Check that waiting for an empty group is a no-op.
281       LocalGroup.wait();
282 
283       std::atomic_int checked_in2{0};
284       for (size_t i = 0; i < 5; ++i) {
285         LocalGroup.async([&checked_in2] { ++checked_in2; });
286       }
287       LocalGroup.wait();
288       ASSERT_EQ(5, checked_in2);
289 
290       ++checked_in1;
291     });
292   }
293   ASSERT_EQ(0, checked_in1);
294   setMainThreadReady();
295   Group.wait();
296   ASSERT_EQ(5, checked_in1);
297 }
298 
299 TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
300   CHECK_UNSUPPORTED();
301   ThreadPoolStrategy S = hardware_concurrency(2);
302   if (S.compute_thread_count() < 2)
303     GTEST_SKIP();
304   ThreadPool Pool(S);
305   PhaseResetHelper Helper(this);
306   ThreadPoolTaskGroup Group(Pool);
307 
308   // Test that a thread calling wait() for a group and is waiting for more tasks
309   // returns when the last task finishes in a different thread while the waiting
310   // thread was waiting for more tasks to process while waiting.
311 
312   // Task A runs in the first thread. It finishes and leaves
313   // the background thread waiting for more tasks.
314   Group.async([this] {
315     waitForMainThread();
316     setPhase(2);
317   });
318   // Task B is run in a second thread, it launches yet another
319   // task C in a different group, which will be handled by the waiting
320   // thread started above.
321   Group.async([this, &Pool] {
322     waitForPhase(2);
323     ThreadPoolTaskGroup LocalGroup(Pool);
324     LocalGroup.async([this] {
325       waitForPhase(3);
326       // Give the other thread enough time to check that there's no task
327       // to process and suspend waiting for a notification. This is indeed racy,
328       // but probably the best that can be done.
329       std::this_thread::sleep_for(std::chrono::milliseconds(10));
330     });
331     // And task B only now will wait for the tasks in the group (=task C)
332     // to finish. This test checks that it does not deadlock. If the
333     // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
334     // this task B would be stuck waiting for tasks to arrive.
335     setPhase(3);
336     LocalGroup.wait();
337   });
338   setMainThreadReady();
339   Group.wait();
340 }
341 
342 #if LLVM_ENABLE_THREADS == 1
343 
344 // FIXME: Skip some tests below on non-Windows because multi-socket systems
345 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
346 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
347 #ifdef _WIN32
348 
349 std::vector<llvm::BitVector>
350 ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
351   llvm::SetVector<llvm::BitVector> ThreadsUsed;
352   std::mutex Lock;
353   {
354     std::condition_variable AllThreads;
355     std::mutex AllThreadsLock;
356     unsigned Active = 0;
357 
358     ThreadPool Pool(S);
359     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
360       Pool.async([&] {
361         {
362           std::lock_guard<std::mutex> Guard(AllThreadsLock);
363           ++Active;
364           AllThreads.notify_one();
365         }
366         waitForMainThread();
367         std::lock_guard<std::mutex> Guard(Lock);
368         auto Mask = llvm::get_thread_affinity_mask();
369         ThreadsUsed.insert(Mask);
370       });
371     }
372     EXPECT_EQ(true, ThreadsUsed.empty());
373     {
374       std::unique_lock<std::mutex> Guard(AllThreadsLock);
375       AllThreads.wait(Guard,
376                       [&]() { return Active == S.compute_thread_count(); });
377     }
378     setMainThreadReady();
379   }
380   return ThreadsUsed.takeVector();
381 }
382 
383 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
384   CHECK_UNSUPPORTED();
385   // After Windows 11, the OS is free to deploy the threads on any CPU socket.
386   // We cannot relibly ensure that all thread affinity mask are covered,
387   // therefore this test should not run.
388   if (llvm::RunningWindows11OrGreater())
389     GTEST_SKIP();
390   std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
391   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
392 }
393 
394 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
395   CHECK_UNSUPPORTED();
396   // After Windows 11, the OS is free to deploy the threads on any CPU socket.
397   // We cannot relibly ensure that all thread affinity mask are covered,
398   // therefore this test should not run.
399   if (llvm::RunningWindows11OrGreater())
400     GTEST_SKIP();
401   std::vector<llvm::BitVector> ThreadsUsed =
402       RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
403   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
404 }
405 
// From TestMain.cpp.
extern const char *TestMainArgv0;

// Just a reachable symbol to ease resolving of the executable's path; its
// address is passed to sys::fs::getMainExecutable() by the AffinityMask test.
static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");

// On Windows, route setenv() to _putenv_s(). The third (overwrite) argument is
// dropped by this mapping.
#ifdef _WIN32
#define setenv(name, var, ignore) _putenv_s(name, var)
#endif
415 
416 TEST_F(ThreadPoolTest, AffinityMask) {
417   CHECK_UNSUPPORTED();
418 
419   // Skip this test if less than 4 threads are available.
420   if (llvm::hardware_concurrency().compute_thread_count() < 4)
421     GTEST_SKIP();
422 
423   using namespace llvm::sys;
424   if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
425     std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
426     // Ensure the threads only ran on CPUs 0-3.
427     // NOTE: Don't use ASSERT* here because this runs in a subprocess,
428     // and will show up as un-executed in the parent.
429     assert(llvm::all_of(ThreadsUsed,
430                         [](auto &T) { return T.getData().front() < 16UL; }) &&
431            "Threads ran on more CPUs than expected! The affinity mask does not "
432            "seem to work.");
433     GTEST_SKIP();
434   }
435   std::string Executable =
436       sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
437   StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};
438 
439   // Add environment variable to the environment of the child process.
440   int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
441   ASSERT_EQ(Res, 0);
442 
443   std::string Error;
444   bool ExecutionFailed;
445   BitVector Affinity;
446   Affinity.resize(4);
447   Affinity.set(0, 4); // Use CPUs 0,1,2,3.
448   int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
449                                 &ExecutionFailed, nullptr, &Affinity);
450   ASSERT_EQ(0, Ret);
451 }
452 
453 #endif // #ifdef _WIN32
454 #endif // #if LLVM_ENABLE_THREADS == 1
455