xref: /llvm-project/llvm/unittests/Support/ThreadPool.cpp (revision 6149e57dc1313d32c85524f8009a1249e0b8f4d1)
//===- unittests/Support/ThreadPool.cpp - ThreadPool.h tests --------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/ADT/Triple.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/ErrorHandling.h"
#include "llvm/Support/Host.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"

#include "gtest/gtest.h"
22 
23 using namespace llvm;
24 
25 // Fixture for the unittests, allowing to *temporarily* disable the unittests
26 // on a particular platform
27 class ThreadPoolTest : public testing::Test {
28   Triple Host;
29   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
30   SmallVector<Triple::OSType, 4> UnsupportedOSs;
31   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
32 protected:
33   // This is intended for platform as a temporary "XFAIL"
34   bool isUnsupportedOSOrEnvironment() {
35     Triple Host(Triple::normalize(sys::getProcessTriple()));
36 
37     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
38         UnsupportedEnvironments.end())
39       return true;
40 
41     if (is_contained(UnsupportedOSs, Host.getOS()))
42       return true;
43 
44     if (is_contained(UnsupportedArchs, Host.getArch()))
45       return true;
46 
47     return false;
48   }
49 
50   ThreadPoolTest() {
51     // Add unsupported configuration here, example:
52     //   UnsupportedArchs.push_back(Triple::x86_64);
53 
54     // See https://llvm.org/bugs/show_bug.cgi?id=25829
55     UnsupportedArchs.push_back(Triple::ppc64le);
56     UnsupportedArchs.push_back(Triple::ppc64);
57   }
58 
59   /// Make sure this thread not progress faster than the main thread.
60   void waitForMainThread() {
61     std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
62     WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
63   }
64 
65   /// Set the readiness of the main thread.
66   void setMainThreadReady() {
67     {
68       std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
69       MainThreadReady = true;
70     }
71     WaitMainThread.notify_all();
72   }
73 
74   void SetUp() override { MainThreadReady = false; }
75 
76   std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
77 
78   std::condition_variable WaitMainThread;
79   std::mutex WaitMainThreadMutex;
80   bool MainThreadReady = false;
81 };
82 
// Return from the current test right away when the fixture reports the host
// as unsupported. The macro expands to exactly one statement and deliberately
// has no trailing semicolon: the caller supplies it, which keeps the macro
// usable inside an unbraced if/else.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      return;                                                                  \
  } while (0)
88 
89 TEST_F(ThreadPoolTest, AsyncBarrier) {
90   CHECK_UNSUPPORTED();
91   // test that async & barrier work together properly.
92 
93   std::atomic_int checked_in{0};
94 
95   ThreadPool Pool;
96   for (size_t i = 0; i < 5; ++i) {
97     Pool.async([this, &checked_in] {
98       waitForMainThread();
99       ++checked_in;
100     });
101   }
102   ASSERT_EQ(0, checked_in);
103   setMainThreadReady();
104   Pool.wait();
105   ASSERT_EQ(5, checked_in);
106 }
107 
/// Helper task taking several parameters: atomically adds \p i to
/// \p checked_in.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
109 
110 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
111   CHECK_UNSUPPORTED();
112   // Test that async works with a function requiring multiple parameters.
113   std::atomic_int checked_in{0};
114 
115   ThreadPool Pool;
116   for (size_t i = 0; i < 5; ++i) {
117     Pool.async(TestFunc, std::ref(checked_in), i);
118   }
119   Pool.wait();
120   ASSERT_EQ(10, checked_in);
121 }
122 
123 TEST_F(ThreadPoolTest, Async) {
124   CHECK_UNSUPPORTED();
125   ThreadPool Pool;
126   std::atomic_int i{0};
127   Pool.async([this, &i] {
128     waitForMainThread();
129     ++i;
130   });
131   Pool.async([&i] { ++i; });
132   ASSERT_NE(2, i.load());
133   setMainThreadReady();
134   Pool.wait();
135   ASSERT_EQ(2, i.load());
136 }
137 
138 TEST_F(ThreadPoolTest, GetFuture) {
139   CHECK_UNSUPPORTED();
140   ThreadPool Pool(hardware_concurrency(2));
141   std::atomic_int i{0};
142   Pool.async([this, &i] {
143     waitForMainThread();
144     ++i;
145   });
146   // Force the future using get()
147   Pool.async([&i] { ++i; }).get();
148   ASSERT_NE(2, i.load());
149   setMainThreadReady();
150   Pool.wait();
151   ASSERT_EQ(2, i.load());
152 }
153 
154 TEST_F(ThreadPoolTest, GetFutureWithResult) {
155   CHECK_UNSUPPORTED();
156   ThreadPool Pool(hardware_concurrency(2));
157   auto F1 = Pool.async([] { return 1; });
158   auto F2 = Pool.async([] { return 2; });
159 
160   setMainThreadReady();
161   Pool.wait();
162   ASSERT_EQ(1, F1.get());
163   ASSERT_EQ(2, F2.get());
164 }
165 
166 TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
167   CHECK_UNSUPPORTED();
168   ThreadPool Pool(hardware_concurrency(2));
169   auto Fn = [](int x) { return x; };
170   auto F1 = Pool.async(Fn, 1);
171   auto F2 = Pool.async(Fn, 2);
172 
173   setMainThreadReady();
174   Pool.wait();
175   ASSERT_EQ(1, F1.get());
176   ASSERT_EQ(2, F2.get());
177 }
178 
179 TEST_F(ThreadPoolTest, PoolDestruction) {
180   CHECK_UNSUPPORTED();
181   // Test that we are waiting on destruction
182   std::atomic_int checked_in{0};
183   {
184     ThreadPool Pool;
185     for (size_t i = 0; i < 5; ++i) {
186       Pool.async([this, &checked_in] {
187         waitForMainThread();
188         ++checked_in;
189       });
190     }
191     ASSERT_EQ(0, checked_in);
192     setMainThreadReady();
193   }
194   ASSERT_EQ(5, checked_in);
195 }
196 
197 #if LLVM_ENABLE_THREADS == 1
198 
199 // FIXME: Skip some tests below on non-Windows because multi-socket systems
200 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
201 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
202 #ifdef _WIN32
203 
204 std::vector<llvm::BitVector>
205 ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
206   llvm::SetVector<llvm::BitVector> ThreadsUsed;
207   std::mutex Lock;
208   {
209     std::condition_variable AllThreads;
210     std::mutex AllThreadsLock;
211     unsigned Active = 0;
212 
213     ThreadPool Pool(S);
214     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
215       Pool.async([&] {
216         {
217           std::lock_guard<std::mutex> Guard(AllThreadsLock);
218           ++Active;
219           AllThreads.notify_one();
220         }
221         waitForMainThread();
222         std::lock_guard<std::mutex> Guard(Lock);
223         auto Mask = llvm::get_thread_affinity_mask();
224         ThreadsUsed.insert(Mask);
225       });
226     }
227     EXPECT_EQ(true, ThreadsUsed.empty());
228     {
229       std::unique_lock<std::mutex> Guard(AllThreadsLock);
230       AllThreads.wait(Guard,
231                       [&]() { return Active == S.compute_thread_count(); });
232     }
233     setMainThreadReady();
234   }
235   return ThreadsUsed.takeVector();
236 }
237 
238 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
239   CHECK_UNSUPPORTED();
240   std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
241   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
242 }
243 
244 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
245   CHECK_UNSUPPORTED();
246   std::vector<llvm::BitVector> ThreadsUsed =
247       RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
248   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
249 }
250 
251 // From TestMain.cpp.
252 extern const char *TestMainArgv0;
253 
254 // Just a reachable symbol to ease resolving of the executable's path.
255 static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");
256 
257 #ifdef _WIN32
258 #define setenv(name, var, ignore) _putenv_s(name, var)
259 #endif
260 
261 TEST_F(ThreadPoolTest, AffinityMask) {
262   CHECK_UNSUPPORTED();
263 
264   // Skip this test if less than 4 threads are available.
265   if (llvm::hardware_concurrency().compute_thread_count() < 4)
266     return;
267 
268   using namespace llvm::sys;
269   if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
270     std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
271     // Ensure the threads only ran on CPUs 0-3.
272     // NOTE: Don't use ASSERT* here because this runs in a subprocess,
273     // and will show up as un-executed in the parent.
274     assert(llvm::all_of(ThreadsUsed,
275                         [](auto &T) { return T.getData().front() < 16UL; }) &&
276            "Threads ran on more CPUs than expected! The affinity mask does not "
277            "seem to work.");
278     return;
279   }
280   std::string Executable =
281       sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
282   StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};
283 
284   // Add environment variable to the environment of the child process.
285   int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
286   ASSERT_EQ(Res, 0);
287 
288   std::string Error;
289   bool ExecutionFailed;
290   BitVector Affinity;
291   Affinity.resize(4);
292   Affinity.set(0, 4); // Use CPUs 0,1,2,3.
293   int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
294                                 &ExecutionFailed, nullptr, &Affinity);
295   ASSERT_EQ(0, Ret);
296 }
297 
298 #endif // #ifdef _WIN32
299 #endif // #if LLVM_ENABLE_THREADS == 1
300