//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"

#if LLVM_ENABLE_THREADS

#include "llvm/Support/Threading.h"

#include <atomic>
#include <future>
#include <stack>
#include <thread>
#include <vector>

namespace llvm {
namespace parallel {
namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func) = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
/// in FILO order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    unsigned ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    Threads[0] = std::thread([this, ThreadCount, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push(F);
    }
    Cond.notify_one();
  }

private:
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
      if (Stop)
        break;
      auto Task = WorkStack.top();
      WorkStack.pop();
      Lock.unlock();
      Task();
    }
  }

  std::atomic<bool> Stop{false};
  std::stack<std::function<void()>> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
};

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr
  // to it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes
  // and are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.
  static ManagedStatic<ThreadPoolExecutor, object_creator<ThreadPoolExecutor>,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace

static std::atomic<int> TaskGroupInstances;

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the first TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
TaskGroup::~TaskGroup() { --TaskGroupInstances; }

void TaskGroup::spawn(std::function<void()> F) {
  if (Parallel) {
    L.inc();
    Executor::getDefaultExecutor()->add([&, F] {
      F();
      L.dec();
    });
  } else {
    F();
  }
}

} // namespace detail
} // namespace parallel
} // namespace llvm
#endif // LLVM_ENABLE_THREADS
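
// A minimal sketch of the fast-exit path that the comment in
// getDefaultExecutor() above describes, kept as a comment so it does not
// affect this translation unit. It assumes only llvm_shutdown() from
// llvm/Support/ManagedStatic.h and the POSIX _exit() the comment mentions;
// the main() shown is purely illustrative.
//
//   #include "llvm/Support/ManagedStatic.h"
//   #include <unistd.h>
//
//   int main() {
//     // ... queue and run parallel work ...
//     llvm::llvm_shutdown(); // Runs ThreadPoolExecutor::Deleter::call(),
//                            // i.e. stop(): waits for worker-thread creation
//                            // to finish, but not for the workers themselves.
//     _exit(0);              // Fast exit without full static destruction.
//   }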
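
// A minimal usage sketch of the nesting rule documented above
// TaskGroup::TaskGroup(), kept as a comment so it does not affect this
// translation unit. It assumes the parallelForEach(Begin, End, Fn) wrapper
// declared in llvm/Support/Parallel.h; the function and data below are
// hypothetical. The outermost call creates the first TaskGroup and schedules
// its tasks on the default executor, while a call nested inside one of those
// tasks creates a second TaskGroup and therefore runs its closures serially
// on the calling thread, avoiding the deadlock described above.
//
//   #include "llvm/Support/Parallel.h"
//   #include <vector>
//
//   void doubleAll(std::vector<std::vector<int>> &Rows) {
//     // Outermost call: first TaskGroup, tasks run on the thread pool.
//     llvm::parallelForEach(Rows.begin(), Rows.end(),
//                           [](std::vector<int> &Row) {
//       // Nested call from inside a task: not the first TaskGroup, so these
//       // closures execute serially on the worker thread.
//       llvm::parallelForEach(Row.begin(), Row.end(), [](int &V) { V *= 2; });
//     });
//   }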