//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <future>
#include <thread>
#include <vector>

// Process-wide strategy consulted when the default executor is lazily
// constructed (see ThreadPoolExecutor::Creator / getDefaultExecutor below);
// clients may overwrite it before the first parallel call takes effect.
llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS

#ifdef _WIN32
// Index of the calling thread within the default executor's pool, or UINT_MAX
// when the caller is not a pool worker (set in work() below). On Windows it is
// TU-local and exposed through getThreadIndex(); GET_THREAD_INDEX_IMPL is
// presumably supplied by llvm/Support/Threading.h — not visible here.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Elsewhere the variable has external linkage so it can be read directly
// (presumably declared in Parallel.h — TODO confirm against the header).
thread_local unsigned threadIndex = UINT_MAX;
#endif

namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func) = 0;
  virtual size_t getThreadCount() const = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
/// in filo order (WorkStack is popped from the back, so the most recently
/// added closure runs first).
class ThreadPoolExecutor : public Executor {
public:
  // Computes the thread count from the strategy, then launches worker 0,
  // which in turn launches workers 1..ThreadCount-1 before doing work itself.
  explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      // reserve(ThreadCount) above guarantees these emplace_backs never
      // reallocate, so this thread may append without holding Mutex; the
      // destructor only iterates Threads after stop() has waited on
      // ThreadsCreated, so the appends are ordered before that iteration.
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      // Signal that worker-thread creation is complete; stop() waits on this.
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // Requests shutdown: sets Stop under Mutex (so a worker sleeping in
  // Cond.wait cannot miss the transition), wakes every worker, and waits for
  // worker-thread creation to finish. It does NOT wait for the workers
  // themselves to exit — only the destructor joins them.
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach(); // Joining the current thread would deadlock.
      else
        T.join();
  }

  // Factory/disposer hooks for ManagedStatic (see getDefaultExecutor). Note
  // that Deleter only *stops* the pool; it intentionally does not delete the
  // executor — destruction is handled separately (see the Windows comments in
  // getDefaultExecutor).
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  // Enqueues a closure and wakes one worker. Closures are run in LIFO order.
  void add(std::function<void()> F) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push_back(std::move(F));
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const override { return ThreadCount; }

private:
  // Worker loop: records this thread's pool index in the thread-local
  // threadIndex, applies the per-thread strategy (semantics live in
  // ThreadPoolStrategy — not visible here), then repeatedly pops and runs
  // tasks until stop() is observed.
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
      if (Stop)
        break;
      auto Task = std::move(WorkStack.back());
      WorkStack.pop_back();
      // Run the task without holding Mutex so other workers can make progress
      // and the task itself may safely call add().
      Lock.unlock();
      Task();
    }
  }

  std::atomic<bool> Stop{false}; // Set once by stop(); read locklessly too.
  std::vector<std::function<void()>> WorkStack; // Pending tasks (LIFO).
  std::mutex Mutex; // Guards WorkStack and the Stop transition.
  std::condition_variable Cond; // Signals new work or shutdown.
  std::promise<void> ThreadsCreated; // Fulfilled once all workers are spawned.
  std::vector<std::thread> Threads; // Workers; Threads[0] spawns the rest.
  unsigned ThreadCount; // From ThreadPoolStrategy::compute_thread_count().
};

Executor *Executor::getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}
} // namespace
} // namespace detail

// Returns the number of threads in the default executor, constructing the
// executor on first use.
size_t getThreadCount() {
  return detail::Executor::getDefaultExecutor()->getThreadCount();
}
#endif

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the root TaskGroup to run tasks parallelly.
In the scenario 180 // of nested parallel_for_each(), only the outermost one runs parallelly. 181 TaskGroup::TaskGroup() 182 #if LLVM_ENABLE_THREADS 183 : Parallel((parallel::strategy.ThreadsRequested != 1) && 184 (threadIndex == UINT_MAX)) {} 185 #else 186 : Parallel(false) {} 187 #endif 188 TaskGroup::~TaskGroup() { 189 // We must ensure that all the workloads have finished before decrementing the 190 // instances count. 191 L.sync(); 192 } 193 194 void TaskGroup::spawn(std::function<void()> F) { 195 #if LLVM_ENABLE_THREADS 196 if (Parallel) { 197 L.inc(); 198 detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { 199 F(); 200 L.dec(); 201 }); 202 return; 203 } 204 #endif 205 F(); 206 } 207 208 } // namespace parallel 209 } // namespace llvm 210 211 void llvm::parallelFor(size_t Begin, size_t End, 212 llvm::function_ref<void(size_t)> Fn) { 213 #if LLVM_ENABLE_THREADS 214 if (parallel::strategy.ThreadsRequested != 1) { 215 auto NumItems = End - Begin; 216 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 217 // overhead on large inputs. 218 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; 219 if (TaskSize == 0) 220 TaskSize = 1; 221 222 parallel::TaskGroup TG; 223 for (; Begin + TaskSize < End; Begin += TaskSize) { 224 TG.spawn([=, &Fn] { 225 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) 226 Fn(I); 227 }); 228 } 229 if (Begin != End) { 230 TG.spawn([=, &Fn] { 231 for (size_t I = Begin; I != End; ++I) 232 Fn(I); 233 }); 234 } 235 return; 236 } 237 #endif 238 239 for (; Begin != End; ++Begin) 240 Fn(Begin); 241 } 242