//========- unittests/Support/ThreadPool.cpp - ThreadPool.h tests ---========//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"
#include "llvm/TargetParser/Host.h"
#include "llvm/TargetParser/Triple.h"

#ifdef _WIN32
#include "llvm/Support/Windows/WindowsSupport.h"
#endif

#include <chrono>
#include <thread>

#include "gtest/gtest.h"

using namespace llvm;

// Fixture for the unittests, allowing us to *temporarily* disable the tests
// on a particular platform.
class ThreadPoolTest : public testing::Test {
  Triple Host;
  SmallVector<Triple::ArchType, 4> UnsupportedArchs;
  SmallVector<Triple::OSType, 4> UnsupportedOSs;
  SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;

protected:
  // This is intended as a temporary "XFAIL" for a platform.
  bool isUnsupportedOSOrEnvironment() {
    Triple Host(Triple::normalize(sys::getProcessTriple()));

    if (is_contained(UnsupportedEnvironments, Host.getEnvironment()))
      return true;

    if (is_contained(UnsupportedOSs, Host.getOS()))
      return true;

    if (is_contained(UnsupportedArchs, Host.getArch()))
      return true;

    return false;
  }

  ThreadPoolTest() {
    // Add unsupported configurations here, for example:
    // UnsupportedArchs.push_back(Triple::x86_64);

    // See https://llvm.org/bugs/show_bug.cgi?id=25829
    UnsupportedArchs.push_back(Triple::ppc64le);
    UnsupportedArchs.push_back(Triple::ppc64);
  }

  /// Make sure this thread does not progress faster than the main thread.
  void waitForMainThread() { waitForPhase(1); }

  /// Set the readiness of the main thread.
  void setMainThreadReady() { setPhase(1); }

  /// Wait until the given phase is set using setPhase(); the first "main"
  /// phase is 1. See also PhaseResetHelper below.
  void waitForPhase(int Phase) {
    std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
    CurrentPhaseCondition.wait(
        LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
  }

  /// If a thread waits on another phase, the test could bail out on a failed
  /// assertion and the ThreadPool destructor would wait() on all threads,
  /// which would deadlock on the still-waiting task. This helper automatically
  /// resets the phase and unblocks such threads.
  struct PhaseResetHelper {
    PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
    ~PhaseResetHelper() { test->setPhase(-1); }
    ThreadPoolTest *test;
  };

  /// Advance to the given phase.
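  /// Phases must advance one step at a time; setting a negative phase marks
  /// an error and wakes every thread blocked in waitForPhase().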
  void setPhase(int Phase) {
    {
      std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
      assert(Phase == CurrentPhase + 1 || Phase < 0);
      CurrentPhase = Phase;
    }
    CurrentPhaseCondition.notify_all();
  }

  void SetUp() override { CurrentPhase = 0; }

  std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);

  std::condition_variable CurrentPhaseCondition;
  std::mutex CurrentPhaseMutex;
  int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
};

#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0)

TEST_F(ThreadPoolTest, AsyncBarrier) {
  CHECK_UNSUPPORTED();
  // Test that async and barrier work together properly.

  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async([this, &checked_in] {
      waitForMainThread();
      ++checked_in;
    });
  }
  ASSERT_EQ(0, checked_in);
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(5, checked_in);
}

static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; }

TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
  CHECK_UNSUPPORTED();
  // Test that async works with a function requiring multiple parameters.
  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async(TestFunc, std::ref(checked_in), i);
  }
  Pool.wait();
  ASSERT_EQ(10, checked_in);
}

TEST_F(ThreadPoolTest, Async) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  Pool.async([&i] { ++i; });
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFuture) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  // Force the future using get().
  Pool.async([&i] { ++i; }).get();
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFutureWithResult) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto F1 = Pool.async([] { return 1; });
  auto F2 = Pool.async([] { return 2; });

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}

TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto Fn = [](int x) { return x; };
  auto F1 = Pool.async(Fn, 1);
  auto F2 = Pool.async(Fn, 2);

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}

TEST_F(ThreadPoolTest, PoolDestruction) {
  CHECK_UNSUPPORTED();
  // Test that the destructor waits for all queued tasks.
  std::atomic_int checked_in{0};
  {
    ThreadPool Pool;
    for (size_t i = 0; i < 5; ++i) {
      Pool.async([this, &checked_in] {
        waitForMainThread();
        ++checked_in;
      });
    }
    ASSERT_EQ(0, checked_in);
    setMainThreadReady();
  }
  ASSERT_EQ(5, checked_in);
}

// Check running tasks in different groups.
TEST_F(ThreadPoolTest, Groups) {
  CHECK_UNSUPPORTED();
  // Need at least two threads, as the task in group2
  // might block a thread until all tasks in group1 finish.
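  // Skip when the strategy cannot actually provide two threads.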
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    GTEST_SKIP();
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group1(Pool);
  ThreadPoolTaskGroup Group2(Pool);

  // Check that waiting for an empty group is a no-op.
  Group1.wait();

  std::atomic_int checked_in1{0};
  std::atomic_int checked_in2{0};

  for (size_t i = 0; i < 5; ++i) {
    Group1.async([this, &checked_in1] {
      waitForMainThread();
      ++checked_in1;
    });
  }
  Group2.async([this, &checked_in2] {
    waitForPhase(2);
    ++checked_in2;
  });
  ASSERT_EQ(0, checked_in1);
  ASSERT_EQ(0, checked_in2);
  // Start the first group and wait for it.
  setMainThreadReady();
  Group1.wait();
  ASSERT_EQ(5, checked_in1);
  // The second group has not yet finished; start it and wait for it.
  ASSERT_EQ(0, checked_in2);
  setPhase(2);
  Group2.wait();
  ASSERT_EQ(5, checked_in1);
  ASSERT_EQ(1, checked_in2);
}

// Check recursive tasks.
TEST_F(ThreadPoolTest, RecursiveGroups) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  ThreadPoolTaskGroup Group(Pool);

  std::atomic_int checked_in1{0};

  for (size_t i = 0; i < 5; ++i) {
    Group.async([this, &Pool, &checked_in1] {
      waitForMainThread();

      ThreadPoolTaskGroup LocalGroup(Pool);

      // Check that waiting for an empty group is a no-op.
      LocalGroup.wait();

      std::atomic_int checked_in2{0};
      for (size_t i = 0; i < 5; ++i) {
        LocalGroup.async([&checked_in2] { ++checked_in2; });
      }
      LocalGroup.wait();
      ASSERT_EQ(5, checked_in2);

      ++checked_in1;
    });
  }
  ASSERT_EQ(0, checked_in1);
  setMainThreadReady();
  Group.wait();
  ASSERT_EQ(5, checked_in1);
}

TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
  CHECK_UNSUPPORTED();
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    GTEST_SKIP();
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group(Pool);

  // Test that a thread that called wait() on a group, and has gone back to
  // waiting for more tasks to process, returns when the group's last task
  // finishes on a different thread.

  // Task A runs in the first thread. It finishes and leaves
  // the background thread waiting for more tasks.
  Group.async([this] {
    waitForMainThread();
    setPhase(2);
  });
  // Task B runs in a second thread. It launches yet another
  // task C in a different group, which will be handled by the waiting
  // thread started above.
  Group.async([this, &Pool] {
    waitForPhase(2);
    ThreadPoolTaskGroup LocalGroup(Pool);
    LocalGroup.async([this] {
      waitForPhase(3);
      // Give the other thread enough time to check that there's no task
      // to process and to suspend waiting for a notification. This is
      // admittedly racy, but probably the best that can be done.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
    // Only now does task B wait for the tasks in the group (i.e. task C)
    // to finish. This test checks that it does not deadlock: if the
    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
    // task B would be stuck here waiting for tasks to arrive.
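    // Release task C, which is blocked on phase 3 in the other worker thread,
    // then wait for it to complete.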
    setPhase(3);
    LocalGroup.wait();
  });
  setMainThreadReady();
  Group.wait();
}

#if LLVM_ENABLE_THREADS == 1

// FIXME: Skip some tests below on non-Windows because multi-socket systems
// were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
// isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
#ifdef _WIN32

std::vector<llvm::BitVector>
ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
  llvm::SetVector<llvm::BitVector> ThreadsUsed;
  std::mutex Lock;
  {
    std::condition_variable AllThreads;
    std::mutex AllThreadsLock;
    unsigned Active = 0;

    ThreadPool Pool(S);
    for (size_t I = 0; I < S.compute_thread_count(); ++I) {
      Pool.async([&] {
        {
          std::lock_guard<std::mutex> Guard(AllThreadsLock);
          ++Active;
          AllThreads.notify_one();
        }
        waitForMainThread();
        std::lock_guard<std::mutex> Guard(Lock);
        auto Mask = llvm::get_thread_affinity_mask();
        ThreadsUsed.insert(Mask);
      });
    }
    EXPECT_TRUE(ThreadsUsed.empty());
    {
      std::unique_lock<std::mutex> Guard(AllThreadsLock);
      AllThreads.wait(Guard,
                      [&]() { return Active == S.compute_thread_count(); });
    }
    setMainThreadReady();
  }
  return ThreadsUsed.takeVector();
}

TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11, the OS is free to deploy threads on any CPU
  // socket. We cannot reliably ensure that all thread affinity masks are
  // covered, therefore this test should not run there.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();
  std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11, the OS is free to deploy threads on any CPU
  // socket. We cannot reliably ensure that all thread affinity masks are
  // covered, therefore this test should not run there.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();
  std::vector<llvm::BitVector> ThreadsUsed =
      RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

// From TestMain.cpp.
extern const char *TestMainArgv0;

// Just a reachable symbol to ease resolving the executable's path.
static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");

#ifdef _WIN32
#define setenv(name, var, ignore) _putenv_s(name, var)
#endif

TEST_F(ThreadPoolTest, AffinityMask) {
  CHECK_UNSUPPORTED();

  // Skip this test if fewer than 4 threads are available.
  if (llvm::hardware_concurrency().compute_thread_count() < 4)
    GTEST_SKIP();

  using namespace llvm::sys;
  if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
    std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
    // Ensure the threads only ran on CPUs 0-3.
    // NOTE: Don't use ASSERT* here because this runs in a subprocess,
    // and will show up as un-executed in the parent.
    assert(llvm::all_of(ThreadsUsed,
                        [](auto &T) { return T.getData().front() < 16UL; }) &&
           "Threads ran on more CPUs than expected! The affinity mask does "
           "not seem to work.");
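    // This is the child process; stop here so the spawning logic below does
    // not run recursively.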
    GTEST_SKIP();
  }
  std::string Executable =
      sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
  StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};

  // Add environment variable to the environment of the child process.
  int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
  ASSERT_EQ(Res, 0);

  std::string Error;
  bool ExecutionFailed;
  BitVector Affinity;
  Affinity.resize(4);
  Affinity.set(0, 4); // Use CPUs 0,1,2,3.
  int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
                                &ExecutionFailed, nullptr, &Affinity);
  ASSERT_EQ(0, Ret);
}

#endif // #ifdef _WIN32
#endif // #if LLVM_ENABLE_THREADS == 1