1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/ThreadPool.h" 10 11 #include "llvm/ADT/STLExtras.h" 12 #include "llvm/ADT/SetVector.h" 13 #include "llvm/ADT/SmallVector.h" 14 #include "llvm/ADT/Triple.h" 15 #include "llvm/Support/CommandLine.h" 16 #include "llvm/Support/Host.h" 17 #include "llvm/Support/Program.h" 18 #include "llvm/Support/TargetSelect.h" 19 #include "llvm/Support/Threading.h" 20 21 #include "gtest/gtest.h" 22 23 using namespace llvm; 24 25 // Fixture for the unittests, allowing to *temporarily* disable the unittests 26 // on a particular platform 27 class ThreadPoolTest : public testing::Test { 28 Triple Host; 29 SmallVector<Triple::ArchType, 4> UnsupportedArchs; 30 SmallVector<Triple::OSType, 4> UnsupportedOSs; 31 SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments; 32 protected: 33 // This is intended for platform as a temporary "XFAIL" 34 bool isUnsupportedOSOrEnvironment() { 35 Triple Host(Triple::normalize(sys::getProcessTriple())); 36 37 if (find(UnsupportedEnvironments, Host.getEnvironment()) != 38 UnsupportedEnvironments.end()) 39 return true; 40 41 if (is_contained(UnsupportedOSs, Host.getOS())) 42 return true; 43 44 if (is_contained(UnsupportedArchs, Host.getArch())) 45 return true; 46 47 return false; 48 } 49 50 ThreadPoolTest() { 51 // Add unsupported configuration here, example: 52 // UnsupportedArchs.push_back(Triple::x86_64); 53 54 // See https://llvm.org/bugs/show_bug.cgi?id=25829 55 UnsupportedArchs.push_back(Triple::ppc64le); 56 UnsupportedArchs.push_back(Triple::ppc64); 57 } 58 59 /// Make sure this thread not progress faster than the main thread. 60 void waitForMainThread() { 61 std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex); 62 WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; }); 63 } 64 65 /// Set the readiness of the main thread. 66 void setMainThreadReady() { 67 { 68 std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex); 69 MainThreadReady = true; 70 } 71 WaitMainThread.notify_all(); 72 } 73 74 void SetUp() override { MainThreadReady = false; } 75 76 std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S); 77 78 std::condition_variable WaitMainThread; 79 std::mutex WaitMainThreadMutex; 80 bool MainThreadReady = false; 81 }; 82 83 #define CHECK_UNSUPPORTED() \ 84 do { \ 85 if (isUnsupportedOSOrEnvironment()) \ 86 return; \ 87 } while (0); 88 89 TEST_F(ThreadPoolTest, AsyncBarrier) { 90 CHECK_UNSUPPORTED(); 91 // test that async & barrier work together properly. 92 93 std::atomic_int checked_in{0}; 94 95 ThreadPool Pool; 96 for (size_t i = 0; i < 5; ++i) { 97 Pool.async([this, &checked_in] { 98 waitForMainThread(); 99 ++checked_in; 100 }); 101 } 102 ASSERT_EQ(0, checked_in); 103 setMainThreadReady(); 104 Pool.wait(); 105 ASSERT_EQ(5, checked_in); 106 } 107 108 static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; } 109 110 TEST_F(ThreadPoolTest, AsyncBarrierArgs) { 111 CHECK_UNSUPPORTED(); 112 // Test that async works with a function requiring multiple parameters. 113 std::atomic_int checked_in{0}; 114 115 ThreadPool Pool; 116 for (size_t i = 0; i < 5; ++i) { 117 Pool.async(TestFunc, std::ref(checked_in), i); 118 } 119 Pool.wait(); 120 ASSERT_EQ(10, checked_in); 121 } 122 123 TEST_F(ThreadPoolTest, Async) { 124 CHECK_UNSUPPORTED(); 125 ThreadPool Pool; 126 std::atomic_int i{0}; 127 Pool.async([this, &i] { 128 waitForMainThread(); 129 ++i; 130 }); 131 Pool.async([&i] { ++i; }); 132 ASSERT_NE(2, i.load()); 133 setMainThreadReady(); 134 Pool.wait(); 135 ASSERT_EQ(2, i.load()); 136 } 137 138 TEST_F(ThreadPoolTest, GetFuture) { 139 CHECK_UNSUPPORTED(); 140 ThreadPool Pool(hardware_concurrency(2)); 141 std::atomic_int i{0}; 142 Pool.async([this, &i] { 143 waitForMainThread(); 144 ++i; 145 }); 146 // Force the future using get() 147 Pool.async([&i] { ++i; }).get(); 148 ASSERT_NE(2, i.load()); 149 setMainThreadReady(); 150 Pool.wait(); 151 ASSERT_EQ(2, i.load()); 152 } 153 154 TEST_F(ThreadPoolTest, GetFutureWithResult) { 155 CHECK_UNSUPPORTED(); 156 ThreadPool Pool(hardware_concurrency(2)); 157 auto F1 = Pool.async([] { return 1; }); 158 auto F2 = Pool.async([] { return 2; }); 159 160 setMainThreadReady(); 161 Pool.wait(); 162 ASSERT_EQ(1, F1.get()); 163 ASSERT_EQ(2, F2.get()); 164 } 165 166 TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) { 167 CHECK_UNSUPPORTED(); 168 ThreadPool Pool(hardware_concurrency(2)); 169 auto Fn = [](int x) { return x; }; 170 auto F1 = Pool.async(Fn, 1); 171 auto F2 = Pool.async(Fn, 2); 172 173 setMainThreadReady(); 174 Pool.wait(); 175 ASSERT_EQ(1, F1.get()); 176 ASSERT_EQ(2, F2.get()); 177 } 178 179 TEST_F(ThreadPoolTest, PoolDestruction) { 180 CHECK_UNSUPPORTED(); 181 // Test that we are waiting on destruction 182 std::atomic_int checked_in{0}; 183 { 184 ThreadPool Pool; 185 for (size_t i = 0; i < 5; ++i) { 186 Pool.async([this, &checked_in] { 187 waitForMainThread(); 188 ++checked_in; 189 }); 190 } 191 ASSERT_EQ(0, checked_in); 192 setMainThreadReady(); 193 } 194 ASSERT_EQ(5, checked_in); 195 } 196 197 #if LLVM_ENABLE_THREADS == 1 198 199 // FIXME: Skip some tests below on non-Windows because multi-socket systems 200 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask() 201 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc). 202 #ifdef _WIN32 203 204 std::vector<llvm::BitVector> 205 ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) { 206 llvm::SetVector<llvm::BitVector> ThreadsUsed; 207 std::mutex Lock; 208 { 209 std::condition_variable AllThreads; 210 std::mutex AllThreadsLock; 211 unsigned Active = 0; 212 213 ThreadPool Pool(S); 214 for (size_t I = 0; I < S.compute_thread_count(); ++I) { 215 Pool.async([&] { 216 { 217 std::lock_guard<std::mutex> Guard(AllThreadsLock); 218 ++Active; 219 AllThreads.notify_one(); 220 } 221 waitForMainThread(); 222 std::lock_guard<std::mutex> Guard(Lock); 223 auto Mask = llvm::get_thread_affinity_mask(); 224 ThreadsUsed.insert(Mask); 225 }); 226 } 227 EXPECT_EQ(true, ThreadsUsed.empty()); 228 { 229 std::unique_lock<std::mutex> Guard(AllThreadsLock); 230 AllThreads.wait(Guard, 231 [&]() { return Active == S.compute_thread_count(); }); 232 } 233 setMainThreadReady(); 234 } 235 return ThreadsUsed.takeVector(); 236 } 237 238 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) { 239 CHECK_UNSUPPORTED(); 240 std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({}); 241 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size()); 242 } 243 244 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) { 245 CHECK_UNSUPPORTED(); 246 std::vector<llvm::BitVector> ThreadsUsed = 247 RunOnAllSockets(llvm::heavyweight_hardware_concurrency()); 248 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size()); 249 } 250 251 // From TestMain.cpp. 252 extern const char *TestMainArgv0; 253 254 // Just a reachable symbol to ease resolving of the executable's path. 255 static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1"); 256 257 #ifdef _WIN32 258 #define setenv(name, var, ignore) _putenv_s(name, var) 259 #endif 260 261 TEST_F(ThreadPoolTest, AffinityMask) { 262 CHECK_UNSUPPORTED(); 263 264 // Skip this test if less than 4 threads are available. 265 if (llvm::hardware_concurrency().compute_thread_count() < 4) 266 return; 267 268 using namespace llvm::sys; 269 if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) { 270 std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({}); 271 // Ensure the threads only ran on CPUs 0-3. 272 // NOTE: Don't use ASSERT* here because this runs in a subprocess, 273 // and will show up as un-executed in the parent. 274 assert(llvm::all_of(ThreadsUsed, 275 [](auto &T) { return T.getData().front() < 16UL; }) && 276 "Threads ran on more CPUs than expected! The affinity mask does not " 277 "seem to work."); 278 return; 279 } 280 std::string Executable = 281 sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1); 282 StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"}; 283 284 // Add environment variable to the environment of the child process. 285 int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false); 286 ASSERT_EQ(Res, 0); 287 288 std::string Error; 289 bool ExecutionFailed; 290 BitVector Affinity; 291 Affinity.resize(4); 292 Affinity.set(0, 4); // Use CPUs 0,1,2,3. 293 int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error, 294 &ExecutionFailed, nullptr, &Affinity); 295 ASSERT_EQ(0, Ret); 296 } 297 298 #endif // #ifdef _WIN32 299 #endif // #if LLVM_ENABLE_THREADS == 1 300