1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/ThreadPool.h" 10 #include "llvm/ADT/STLExtras.h" 11 #include "llvm/ADT/SetVector.h" 12 #include "llvm/ADT/SmallVector.h" 13 #include "llvm/Config/llvm-config.h" // for LLVM_ENABLE_THREADS 14 #include "llvm/Support/CommandLine.h" 15 #include "llvm/Support/Program.h" 16 #include "llvm/Support/TargetSelect.h" 17 #include "llvm/Support/Threading.h" 18 #include "llvm/TargetParser/Host.h" 19 #include "llvm/TargetParser/Triple.h" 20 21 #ifdef _WIN32 22 #include "llvm/Support/Windows/WindowsSupport.h" 23 #endif 24 25 #include <chrono> 26 #include <thread> 27 28 #include "gtest/gtest.h" 29 30 namespace testing { 31 namespace internal { 32 // Specialize gtest construct to provide friendlier name in the output. 33 #if LLVM_ENABLE_THREADS 34 template <> std::string GetTypeName<llvm::StdThreadPool>() { 35 return "llvm::StdThreadPool"; 36 } 37 #endif 38 template <> std::string GetTypeName<llvm::SingleThreadExecutor>() { 39 return "llvm::SingleThreadExecutor"; 40 } 41 } // namespace internal 42 } // namespace testing 43 44 using namespace llvm; 45 46 // Fixture for the unittests, allowing to *temporarily* disable the unittests 47 // on a particular platform 48 template <typename ThreadPoolImpl> class ThreadPoolTest : public testing::Test { 49 Triple Host; 50 SmallVector<Triple::ArchType, 4> UnsupportedArchs; 51 SmallVector<Triple::OSType, 4> UnsupportedOSs; 52 SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments; 53 54 protected: 55 // This is intended for platform as a temporary "XFAIL" 56 bool isUnsupportedOSOrEnvironment() { 57 Triple Host(Triple::normalize(sys::getProcessTriple())); 58 59 if (find(UnsupportedEnvironments, Host.getEnvironment()) != 60 UnsupportedEnvironments.end()) 61 return true; 62 63 if (is_contained(UnsupportedOSs, Host.getOS())) 64 return true; 65 66 if (is_contained(UnsupportedArchs, Host.getArch())) 67 return true; 68 69 return false; 70 } 71 72 ThreadPoolTest() { 73 // Add unsupported configuration here, example: 74 // UnsupportedArchs.push_back(Triple::x86_64); 75 76 // See https://llvm.org/bugs/show_bug.cgi?id=25829 77 UnsupportedArchs.push_back(Triple::ppc64le); 78 UnsupportedArchs.push_back(Triple::ppc64); 79 } 80 81 /// Make sure this thread not progress faster than the main thread. 82 void waitForMainThread() { waitForPhase(1); } 83 84 /// Set the readiness of the main thread. 85 void setMainThreadReady() { setPhase(1); } 86 87 /// Wait until given phase is set using setPhase(); first "main" phase is 1. 88 /// See also PhaseResetHelper below. 89 void waitForPhase(int Phase) { 90 std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex); 91 CurrentPhaseCondition.wait( 92 LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; }); 93 } 94 /// If a thread waits on another phase, the test could bail out on a failed 95 /// assertion and ThreadPool destructor would wait() on all threads, which 96 /// would deadlock on the task waiting. Create this helper to automatically 97 /// reset the phase and unblock such threads. 98 struct PhaseResetHelper { 99 PhaseResetHelper(ThreadPoolTest *test) : test(test) {} 100 ~PhaseResetHelper() { test->setPhase(-1); } 101 ThreadPoolTest *test; 102 }; 103 104 /// Advance to the given phase. 105 void setPhase(int Phase) { 106 { 107 std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex); 108 assert(Phase == CurrentPhase + 1 || Phase < 0); 109 CurrentPhase = Phase; 110 } 111 CurrentPhaseCondition.notify_all(); 112 } 113 114 void SetUp() override { CurrentPhase = 0; } 115 116 SmallVector<llvm::BitVector, 0> RunOnAllSockets(ThreadPoolStrategy S); 117 118 std::condition_variable CurrentPhaseCondition; 119 std::mutex CurrentPhaseMutex; 120 int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom 121 }; 122 123 using ThreadPoolImpls = ::testing::Types< 124 #if LLVM_ENABLE_THREADS 125 StdThreadPool, 126 #endif 127 SingleThreadExecutor>; 128 129 TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolImpls, ); 130 131 #define CHECK_UNSUPPORTED() \ 132 do { \ 133 if (this->isUnsupportedOSOrEnvironment()) \ 134 GTEST_SKIP(); \ 135 } while (0); 136 137 TYPED_TEST(ThreadPoolTest, AsyncBarrier) { 138 CHECK_UNSUPPORTED(); 139 // test that async & barrier work together properly. 140 141 std::atomic_int checked_in{0}; 142 143 DefaultThreadPool Pool; 144 for (size_t i = 0; i < 5; ++i) { 145 Pool.async([this, &checked_in] { 146 this->waitForMainThread(); 147 ++checked_in; 148 }); 149 } 150 ASSERT_EQ(0, checked_in); 151 this->setMainThreadReady(); 152 Pool.wait(); 153 ASSERT_EQ(5, checked_in); 154 } 155 156 static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; } 157 158 TYPED_TEST(ThreadPoolTest, AsyncBarrierArgs) { 159 CHECK_UNSUPPORTED(); 160 // Test that async works with a function requiring multiple parameters. 161 std::atomic_int checked_in{0}; 162 163 DefaultThreadPool Pool; 164 for (size_t i = 0; i < 5; ++i) { 165 Pool.async(TestFunc, std::ref(checked_in), i); 166 } 167 Pool.wait(); 168 ASSERT_EQ(10, checked_in); 169 } 170 171 TYPED_TEST(ThreadPoolTest, Async) { 172 CHECK_UNSUPPORTED(); 173 DefaultThreadPool Pool; 174 std::atomic_int i{0}; 175 Pool.async([this, &i] { 176 this->waitForMainThread(); 177 ++i; 178 }); 179 Pool.async([&i] { ++i; }); 180 ASSERT_NE(2, i.load()); 181 this->setMainThreadReady(); 182 Pool.wait(); 183 ASSERT_EQ(2, i.load()); 184 } 185 186 TYPED_TEST(ThreadPoolTest, GetFuture) { 187 CHECK_UNSUPPORTED(); 188 DefaultThreadPool Pool(hardware_concurrency(2)); 189 std::atomic_int i{0}; 190 Pool.async([this, &i] { 191 this->waitForMainThread(); 192 ++i; 193 }); 194 // Force the future using get() 195 Pool.async([&i] { ++i; }).get(); 196 ASSERT_NE(2, i.load()); 197 this->setMainThreadReady(); 198 Pool.wait(); 199 ASSERT_EQ(2, i.load()); 200 } 201 202 TYPED_TEST(ThreadPoolTest, GetFutureWithResult) { 203 CHECK_UNSUPPORTED(); 204 DefaultThreadPool Pool(hardware_concurrency(2)); 205 auto F1 = Pool.async([] { return 1; }); 206 auto F2 = Pool.async([] { return 2; }); 207 208 this->setMainThreadReady(); 209 Pool.wait(); 210 ASSERT_EQ(1, F1.get()); 211 ASSERT_EQ(2, F2.get()); 212 } 213 214 TYPED_TEST(ThreadPoolTest, GetFutureWithResultAndArgs) { 215 CHECK_UNSUPPORTED(); 216 DefaultThreadPool Pool(hardware_concurrency(2)); 217 auto Fn = [](int x) { return x; }; 218 auto F1 = Pool.async(Fn, 1); 219 auto F2 = Pool.async(Fn, 2); 220 221 this->setMainThreadReady(); 222 Pool.wait(); 223 ASSERT_EQ(1, F1.get()); 224 ASSERT_EQ(2, F2.get()); 225 } 226 227 TYPED_TEST(ThreadPoolTest, PoolDestruction) { 228 CHECK_UNSUPPORTED(); 229 // Test that we are waiting on destruction 230 std::atomic_int checked_in{0}; 231 { 232 DefaultThreadPool Pool; 233 for (size_t i = 0; i < 5; ++i) { 234 Pool.async([this, &checked_in] { 235 this->waitForMainThread(); 236 ++checked_in; 237 }); 238 } 239 ASSERT_EQ(0, checked_in); 240 this->setMainThreadReady(); 241 } 242 ASSERT_EQ(5, checked_in); 243 } 244 245 // Check running tasks in different groups. 246 TYPED_TEST(ThreadPoolTest, Groups) { 247 CHECK_UNSUPPORTED(); 248 // Need at least two threads, as the task in group2 249 // might block a thread until all tasks in group1 finish. 250 ThreadPoolStrategy S = hardware_concurrency(2); 251 if (S.compute_thread_count() < 2) 252 GTEST_SKIP(); 253 DefaultThreadPool Pool(S); 254 typename TestFixture::PhaseResetHelper Helper(this); 255 ThreadPoolTaskGroup Group1(Pool); 256 ThreadPoolTaskGroup Group2(Pool); 257 258 // Check that waiting for an empty group is a no-op. 259 Group1.wait(); 260 261 std::atomic_int checked_in1{0}; 262 std::atomic_int checked_in2{0}; 263 264 for (size_t i = 0; i < 5; ++i) { 265 Group1.async([this, &checked_in1] { 266 this->waitForMainThread(); 267 ++checked_in1; 268 }); 269 } 270 Group2.async([this, &checked_in2] { 271 this->waitForPhase(2); 272 ++checked_in2; 273 }); 274 ASSERT_EQ(0, checked_in1); 275 ASSERT_EQ(0, checked_in2); 276 // Start first group and wait for it. 277 this->setMainThreadReady(); 278 Group1.wait(); 279 ASSERT_EQ(5, checked_in1); 280 // Second group has not yet finished, start it and wait for it. 281 ASSERT_EQ(0, checked_in2); 282 this->setPhase(2); 283 Group2.wait(); 284 ASSERT_EQ(5, checked_in1); 285 ASSERT_EQ(1, checked_in2); 286 } 287 288 // Check recursive tasks. 289 TYPED_TEST(ThreadPoolTest, RecursiveGroups) { 290 CHECK_UNSUPPORTED(); 291 DefaultThreadPool Pool; 292 ThreadPoolTaskGroup Group(Pool); 293 294 std::atomic_int checked_in1{0}; 295 296 for (size_t i = 0; i < 5; ++i) { 297 Group.async([this, &Pool, &checked_in1] { 298 this->waitForMainThread(); 299 300 ThreadPoolTaskGroup LocalGroup(Pool); 301 302 // Check that waiting for an empty group is a no-op. 303 LocalGroup.wait(); 304 305 std::atomic_int checked_in2{0}; 306 for (size_t i = 0; i < 5; ++i) { 307 LocalGroup.async([&checked_in2] { ++checked_in2; }); 308 } 309 LocalGroup.wait(); 310 ASSERT_EQ(5, checked_in2); 311 312 ++checked_in1; 313 }); 314 } 315 ASSERT_EQ(0, checked_in1); 316 this->setMainThreadReady(); 317 Group.wait(); 318 ASSERT_EQ(5, checked_in1); 319 } 320 321 TYPED_TEST(ThreadPoolTest, RecursiveWaitDeadlock) { 322 CHECK_UNSUPPORTED(); 323 ThreadPoolStrategy S = hardware_concurrency(2); 324 if (S.compute_thread_count() < 2) 325 GTEST_SKIP(); 326 DefaultThreadPool Pool(S); 327 typename TestFixture::PhaseResetHelper Helper(this); 328 ThreadPoolTaskGroup Group(Pool); 329 330 // Test that a thread calling wait() for a group and is waiting for more tasks 331 // returns when the last task finishes in a different thread while the waiting 332 // thread was waiting for more tasks to process while waiting. 333 334 // Task A runs in the first thread. It finishes and leaves 335 // the background thread waiting for more tasks. 336 Group.async([this] { 337 this->waitForMainThread(); 338 this->setPhase(2); 339 }); 340 // Task B is run in a second thread, it launches yet another 341 // task C in a different group, which will be handled by the waiting 342 // thread started above. 343 Group.async([this, &Pool] { 344 this->waitForPhase(2); 345 ThreadPoolTaskGroup LocalGroup(Pool); 346 LocalGroup.async([this] { 347 this->waitForPhase(3); 348 // Give the other thread enough time to check that there's no task 349 // to process and suspend waiting for a notification. This is indeed racy, 350 // but probably the best that can be done. 351 std::this_thread::sleep_for(std::chrono::milliseconds(10)); 352 }); 353 // And task B only now will wait for the tasks in the group (=task C) 354 // to finish. This test checks that it does not deadlock. If the 355 // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place, 356 // this task B would be stuck waiting for tasks to arrive. 357 this->setPhase(3); 358 LocalGroup.wait(); 359 }); 360 this->setMainThreadReady(); 361 Group.wait(); 362 } 363 364 #if LLVM_ENABLE_THREADS == 1 365 366 // FIXME: Skip some tests below on non-Windows because multi-socket systems 367 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask() 368 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc). 369 #ifdef _WIN32 370 371 template <typename ThreadPoolImpl> 372 SmallVector<llvm::BitVector, 0> 373 ThreadPoolTest<ThreadPoolImpl>::RunOnAllSockets(ThreadPoolStrategy S) { 374 llvm::SetVector<llvm::BitVector> ThreadsUsed; 375 std::mutex Lock; 376 { 377 std::condition_variable AllThreads; 378 std::mutex AllThreadsLock; 379 unsigned Active = 0; 380 381 DefaultThreadPool Pool(S); 382 for (size_t I = 0; I < S.compute_thread_count(); ++I) { 383 Pool.async([&] { 384 { 385 std::lock_guard<std::mutex> Guard(AllThreadsLock); 386 ++Active; 387 AllThreads.notify_one(); 388 } 389 this->waitForMainThread(); 390 std::lock_guard<std::mutex> Guard(Lock); 391 auto Mask = llvm::get_thread_affinity_mask(); 392 ThreadsUsed.insert(Mask); 393 }); 394 } 395 EXPECT_EQ(true, ThreadsUsed.empty()); 396 { 397 std::unique_lock<std::mutex> Guard(AllThreadsLock); 398 AllThreads.wait(Guard, 399 [&]() { return Active == S.compute_thread_count(); }); 400 } 401 this->setMainThreadReady(); 402 } 403 return ThreadsUsed.takeVector(); 404 } 405 406 TYPED_TEST(ThreadPoolTest, AllThreads_UseAllRessources) { 407 CHECK_UNSUPPORTED(); 408 // After Windows 11, the OS is free to deploy the threads on any CPU socket. 409 // We cannot relibly ensure that all thread affinity mask are covered, 410 // therefore this test should not run. 411 if (llvm::RunningWindows11OrGreater()) 412 GTEST_SKIP(); 413 auto ThreadsUsed = this->RunOnAllSockets({}); 414 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size()); 415 } 416 417 TYPED_TEST(ThreadPoolTest, AllThreads_OneThreadPerCore) { 418 CHECK_UNSUPPORTED(); 419 // After Windows 11, the OS is free to deploy the threads on any CPU socket. 420 // We cannot relibly ensure that all thread affinity mask are covered, 421 // therefore this test should not run. 422 if (llvm::RunningWindows11OrGreater()) 423 GTEST_SKIP(); 424 auto ThreadsUsed = 425 this->RunOnAllSockets(llvm::heavyweight_hardware_concurrency()); 426 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size()); 427 } 428 429 // From TestMain.cpp. 430 extern const char *TestMainArgv0; 431 432 // Just a reachable symbol to ease resolving of the executable's path. 433 static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1"); 434 435 #ifdef _WIN32 436 #define setenv(name, var, ignore) _putenv_s(name, var) 437 #endif 438 439 TYPED_TEST(ThreadPoolTest, AffinityMask) { 440 CHECK_UNSUPPORTED(); 441 442 // Skip this test if less than 4 threads are available. 443 if (llvm::hardware_concurrency().compute_thread_count() < 4) 444 GTEST_SKIP(); 445 446 using namespace llvm::sys; 447 if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) { 448 auto ThreadsUsed = this->RunOnAllSockets({}); 449 // Ensure the threads only ran on CPUs 0-3. 450 // NOTE: Don't use ASSERT* here because this runs in a subprocess, 451 // and will show up as un-executed in the parent. 452 assert(llvm::all_of(ThreadsUsed, 453 [](auto &T) { return T.getData().front() < 16UL; }) && 454 "Threads ran on more CPUs than expected! The affinity mask does not " 455 "seem to work."); 456 GTEST_SKIP(); 457 } 458 std::string Executable = 459 sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1); 460 StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"}; 461 462 // Add environment variable to the environment of the child process. 463 int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false); 464 ASSERT_EQ(Res, 0); 465 466 std::string Error; 467 bool ExecutionFailed; 468 BitVector Affinity; 469 Affinity.resize(4); 470 Affinity.set(0, 4); // Use CPUs 0,1,2,3. 471 int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error, 472 &ExecutionFailed, nullptr, &Affinity); 473 ASSERT_EQ(0, Ret); 474 } 475 476 #endif // #ifdef _WIN32 477 #endif // #if LLVM_ENABLE_THREADS == 1 478