//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <deque>
#include <future>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS

#ifdef _WIN32
static thread_local unsigned threadIndex;

unsigned getThreadIndex() { return threadIndex; }
#else
thread_local unsigned threadIndex;
#endif

namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
/// in FILO order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    unsigned ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in “safe libc++” mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, ThreadCount, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // Signal all workers to finish, then wait until the thread-spawning thread
  // has finished creating workers, so no new threads start after stop()
  // returns.
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  // Hooks for ManagedStatic (see getDefaultExecutor() below): create the
  // executor on first use; on llvm_shutdown(), stop it without destroying it.
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F, bool Sequential = false) override {
    {
      bool UseSequentialQueue =
          Sequential || parallel::strategy.ThreadsRequested == 1;
      std::lock_guard<std::mutex> Lock(Mutex);
      if (UseSequentialQueue)
        WorkQueueSequential.emplace_front(std::move(F));
      else
        WorkQueue.emplace_back(std::move(F));
    }
    Cond.notify_one();
  }
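  // Ordering sketch (illustrative comment, not part of the upstream file;
  // "Exec" stands for any Executor *). General tasks are pushed to the back
  // of WorkQueue and popped from the back, i.e. FILO; sequential tasks are
  // pushed to the *front* of WorkQueueSequential and popped from the back,
  // i.e. FIFO, and SequentialQueueIsLocked (below) ensures at most one of
  // them runs at a time:
  //
  //   Exec->add(A);                      // general queue: [A]
  //   Exec->add(B);                      // general queue: [A, B]; B may run
  //                                      // before A
  //   Exec->add(C, /*Sequential=*/true); // sequential queue: [C]
  //   Exec->add(D, /*Sequential=*/true); // sequential queue: [D, C]; C runs
  //                                      // before D, never concurrently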
private:
  bool hasSequentialTasks() const {
    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
  }

  bool hasGeneralTasks() const { return !WorkQueue.empty(); }

  // Worker loop: claim one task at a time under the lock and run it with the
  // lock released.
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] {
        return Stop || hasGeneralTasks() || hasSequentialTasks();
      });
      if (Stop)
        break;
      bool Sequential = hasSequentialTasks();
      if (Sequential)
        SequentialQueueIsLocked = true;
      else
        assert(hasGeneralTasks());

      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
      auto Task = std::move(Queue.back());
      Queue.pop_back();
      Lock.unlock();
      Task();
      if (Sequential)
        SequentialQueueIsLocked = false;
    }
  }

  std::atomic<bool> Stop{false};
  std::atomic<bool> SequentialQueueIsLocked{false};
  std::deque<std::function<void()>> WorkQueue;
  std::deque<std::function<void()>> WorkQueueSequential;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
};

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr
  // to it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes
  // and are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace
} // namespace detail
#endif

static std::atomic<int> TaskGroupInstances;

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the first TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel; see the usage sketch after spawn() below.
TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
TaskGroup::~TaskGroup() {
  // We must ensure that all of the workloads have finished before decrementing
  // the instances count.
  L.sync();
  --TaskGroupInstances;
}

void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
#if LLVM_ENABLE_THREADS
  if (Parallel) {
    L.inc();
    detail::Executor::getDefaultExecutor()->add(
        [&, F = std::move(F)] {
          F();
          L.dec();
        },
        Sequential);
    return;
  }
#endif
  F();
}
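// Usage sketch (illustrative only; TaskGroup is declared in
// llvm/Support/Parallel.h). The outermost TaskGroup claims the parallel slot
// via TaskGroupInstances, so Inner.spawn() below degrades to running its
// closure inline on the calling thread instead of deadlocking the pool:
//
//   parallel::TaskGroup Outer;        // Parallel == true
//   Outer.spawn([] {
//     parallel::TaskGroup Inner;      // Parallel == false (nested)
//     Inner.spawn([] { /* runs synchronously inside spawn() */ });
//   });                               // Outer's dtor syncs the latch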
} // namespace parallel
} // namespace llvm

void llvm::parallelFor(size_t Begin, size_t End,
                       llvm::function_ref<void(size_t)> Fn) {
  // If we have zero or one items, then do not incur the overhead of spinning
  // up a task group. They are surprisingly expensive, and because they do not
  // support nested parallelism, a single-entry task group can block parallel
  // execution underneath them.
#if LLVM_ENABLE_THREADS
  auto NumItems = End - Begin;
  if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;

    parallel::TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
    if (Begin != End) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin; I != End; ++I)
          Fn(I);
      });
    }
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}
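// Worked example of the chunking above (illustrative; assumes
// parallel::detail::MaxTasksPerGroup == 128 for the sake of the arithmetic --
// its actual value lives in llvm/Support/Parallel.h):
//
//   parallelFor(0, 1024, Fn);
//   // NumItems = 1024, TaskSize = 1024 / 128 = 8. The loop spawns 127 tasks
//   // of 8 items each, covering [0, 1016), and the trailing spawn covers the
//   // remainder [1016, 1024): 128 tasks total, independent of how many
//   // worker threads the strategy actually provides.
//
//   parallelFor(0, 100, Fn);
//   // NumItems = 100 < MaxTasksPerGroup, so TaskSize is clamped from 0 up to
//   // 1 and each index becomes its own task.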