1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/Parallel.h" 10 #include "llvm/Config/llvm-config.h" 11 #include "llvm/Support/ManagedStatic.h" 12 #include "llvm/Support/Threading.h" 13 14 #include <atomic> 15 #include <future> 16 #include <stack> 17 #include <thread> 18 #include <vector> 19 20 llvm::ThreadPoolStrategy llvm::parallel::strategy; 21 thread_local unsigned llvm::parallel::threadIndex; 22 23 namespace llvm { 24 namespace parallel { 25 #if LLVM_ENABLE_THREADS 26 namespace detail { 27 28 namespace { 29 30 /// An abstract class that takes closures and runs them asynchronously. 31 class Executor { 32 public: 33 virtual ~Executor() = default; 34 virtual void add(std::function<void()> func) = 0; 35 36 static Executor *getDefaultExecutor(); 37 }; 38 39 /// An implementation of an Executor that runs closures on a thread pool 40 /// in filo order. 41 class ThreadPoolExecutor : public Executor { 42 public: 43 explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) { 44 unsigned ThreadCount = S.compute_thread_count(); 45 // Spawn all but one of the threads in another thread as spawning threads 46 // can take a while. 47 Threads.reserve(ThreadCount); 48 Threads.resize(1); 49 std::lock_guard<std::mutex> Lock(Mutex); 50 Threads[0] = std::thread([this, ThreadCount, S] { 51 for (unsigned I = 1; I < ThreadCount; ++I) { 52 Threads.emplace_back([=] { work(S, I); }); 53 if (Stop) 54 break; 55 } 56 ThreadsCreated.set_value(); 57 work(S, 0); 58 }); 59 } 60 61 void stop() { 62 { 63 std::lock_guard<std::mutex> Lock(Mutex); 64 if (Stop) 65 return; 66 Stop = true; 67 } 68 Cond.notify_all(); 69 ThreadsCreated.get_future().wait(); 70 } 71 72 ~ThreadPoolExecutor() override { 73 stop(); 74 std::thread::id CurrentThreadId = std::this_thread::get_id(); 75 for (std::thread &T : Threads) 76 if (T.get_id() == CurrentThreadId) 77 T.detach(); 78 else 79 T.join(); 80 } 81 82 struct Creator { 83 static void *call() { return new ThreadPoolExecutor(strategy); } 84 }; 85 struct Deleter { 86 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } 87 }; 88 89 void add(std::function<void()> F) override { 90 { 91 std::lock_guard<std::mutex> Lock(Mutex); 92 WorkStack.push(std::move(F)); 93 } 94 Cond.notify_one(); 95 } 96 97 private: 98 void work(ThreadPoolStrategy S, unsigned ThreadID) { 99 threadIndex = ThreadID; 100 S.apply_thread_strategy(ThreadID); 101 while (true) { 102 std::unique_lock<std::mutex> Lock(Mutex); 103 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); 104 if (Stop) 105 break; 106 auto Task = std::move(WorkStack.top()); 107 WorkStack.pop(); 108 Lock.unlock(); 109 Task(); 110 } 111 } 112 113 std::atomic<bool> Stop{false}; 114 std::stack<std::function<void()>> WorkStack; 115 std::mutex Mutex; 116 std::condition_variable Cond; 117 std::promise<void> ThreadsCreated; 118 std::vector<std::thread> Threads; 119 }; 120 121 Executor *Executor::getDefaultExecutor() { 122 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via 123 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This 124 // stops the thread pool and waits for any worker thread creation to complete 125 // but does not wait for the threads to finish. The wait for worker thread 126 // creation to complete is important as it prevents intermittent crashes on 127 // Windows due to a race condition between thread creation and process exit. 128 // 129 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to 130 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor 131 // destructor ensures it has been stopped and waits for worker threads to 132 // finish. The wait is important as it prevents intermittent crashes on 133 // Windows when the process is doing a full exit. 134 // 135 // The Windows crashes appear to only occur with the MSVC static runtimes and 136 // are more frequent with the debug static runtime. 137 // 138 // This also prevents intermittent deadlocks on exit with the MinGW runtime. 139 140 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator, 141 ThreadPoolExecutor::Deleter> 142 ManagedExec; 143 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec)); 144 return Exec.get(); 145 } 146 } // namespace 147 } // namespace detail 148 #endif 149 150 static std::atomic<int> TaskGroupInstances; 151 152 // Latch::sync() called by the dtor may cause one thread to block. If is a dead 153 // lock if all threads in the default executor are blocked. To prevent the dead 154 // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario 155 // of nested parallel_for_each(), only the outermost one runs parallelly. 156 TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {} 157 TaskGroup::~TaskGroup() { 158 // We must ensure that all the workloads have finished before decrementing the 159 // instances count. 160 L.sync(); 161 --TaskGroupInstances; 162 } 163 164 void TaskGroup::spawn(std::function<void()> F) { 165 #if LLVM_ENABLE_THREADS 166 if (Parallel) { 167 L.inc(); 168 detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { 169 F(); 170 L.dec(); 171 }); 172 return; 173 } 174 #endif 175 F(); 176 } 177 178 void TaskGroup::execute(std::function<void()> F) { 179 if (parallel::strategy.ThreadsRequested == 1) 180 F(); 181 else 182 spawn(F); 183 } 184 } // namespace parallel 185 } // namespace llvm 186 187 void llvm::parallelFor(size_t Begin, size_t End, 188 llvm::function_ref<void(size_t)> Fn) { 189 // If we have zero or one items, then do not incur the overhead of spinning up 190 // a task group. They are surprisingly expensive, and because they do not 191 // support nested parallelism, a single entry task group can block parallel 192 // execution underneath them. 193 #if LLVM_ENABLE_THREADS 194 auto NumItems = End - Begin; 195 if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) { 196 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 197 // overhead on large inputs. 198 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; 199 if (TaskSize == 0) 200 TaskSize = 1; 201 202 parallel::TaskGroup TG; 203 for (; Begin + TaskSize < End; Begin += TaskSize) { 204 TG.spawn([=, &Fn] { 205 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) 206 Fn(I); 207 }); 208 } 209 for (; Begin != End; ++Begin) 210 Fn(Begin); 211 return; 212 } 213 #endif 214 215 for (; Begin != End; ++Begin) 216 Fn(Begin); 217 } 218