10b57cec5SDimitry Andric //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 20b57cec5SDimitry Andric // 30b57cec5SDimitry Andric // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 40b57cec5SDimitry Andric // See https://llvm.org/LICENSE.txt for license information. 50b57cec5SDimitry Andric // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 60b57cec5SDimitry Andric // 70b57cec5SDimitry Andric //===----------------------------------------------------------------------===// 80b57cec5SDimitry Andric 90b57cec5SDimitry Andric #include "llvm/Support/Parallel.h" 100b57cec5SDimitry Andric #include "llvm/Config/llvm-config.h" 11480093f4SDimitry Andric #include "llvm/Support/ManagedStatic.h" 120b57cec5SDimitry Andric #include "llvm/Support/Threading.h" 130b57cec5SDimitry Andric 140b57cec5SDimitry Andric #include <atomic> 15480093f4SDimitry Andric #include <future> 160b57cec5SDimitry Andric #include <thread> 17480093f4SDimitry Andric #include <vector> 180b57cec5SDimitry Andric 195ffd83dbSDimitry Andric llvm::ThreadPoolStrategy llvm::parallel::strategy; 205ffd83dbSDimitry Andric 210b57cec5SDimitry Andric namespace llvm { 220b57cec5SDimitry Andric namespace parallel { 23bdd1243dSDimitry Andric #if LLVM_ENABLE_THREADS 24bdd1243dSDimitry Andric 25bdd1243dSDimitry Andric #ifdef _WIN32 2606c3fb27SDimitry Andric static thread_local unsigned threadIndex = UINT_MAX; 27bdd1243dSDimitry Andric 2806c3fb27SDimitry Andric unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; } 29bdd1243dSDimitry Andric #else 3006c3fb27SDimitry Andric thread_local unsigned threadIndex = UINT_MAX; 31bdd1243dSDimitry Andric #endif 32bdd1243dSDimitry Andric 330b57cec5SDimitry Andric namespace detail { 340b57cec5SDimitry Andric 350b57cec5SDimitry Andric namespace { 360b57cec5SDimitry Andric 370b57cec5SDimitry Andric /// An abstract class that takes closures and runs them asynchronously. 380b57cec5SDimitry Andric class Executor { 390b57cec5SDimitry Andric public: 400b57cec5SDimitry Andric virtual ~Executor() = default; 4154521a2fSDimitry Andric virtual void add(std::function<void()> func) = 0; 4206c3fb27SDimitry Andric virtual size_t getThreadCount() const = 0; 430b57cec5SDimitry Andric 440b57cec5SDimitry Andric static Executor *getDefaultExecutor(); 450b57cec5SDimitry Andric }; 460b57cec5SDimitry Andric 470b57cec5SDimitry Andric /// An implementation of an Executor that runs closures on a thread pool 480b57cec5SDimitry Andric /// in filo order. 490b57cec5SDimitry Andric class ThreadPoolExecutor : public Executor { 500b57cec5SDimitry Andric public: 515ffd83dbSDimitry Andric explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) { 5206c3fb27SDimitry Andric ThreadCount = S.compute_thread_count(); 530b57cec5SDimitry Andric // Spawn all but one of the threads in another thread as spawning threads 540b57cec5SDimitry Andric // can take a while. 55480093f4SDimitry Andric Threads.reserve(ThreadCount); 56480093f4SDimitry Andric Threads.resize(1); 57480093f4SDimitry Andric std::lock_guard<std::mutex> Lock(Mutex); 5806c3fb27SDimitry Andric // Use operator[] before creating the thread to avoid data race in .size() 59*0fca6ea1SDimitry Andric // in 'safe libc++' mode. 6006c3fb27SDimitry Andric auto &Thread0 = Threads[0]; 6106c3fb27SDimitry Andric Thread0 = std::thread([this, S] { 625ffd83dbSDimitry Andric for (unsigned I = 1; I < ThreadCount; ++I) { 635ffd83dbSDimitry Andric Threads.emplace_back([=] { work(S, I); }); 64480093f4SDimitry Andric if (Stop) 65480093f4SDimitry Andric break; 660b57cec5SDimitry Andric } 67480093f4SDimitry Andric ThreadsCreated.set_value(); 685ffd83dbSDimitry Andric work(S, 0); 69480093f4SDimitry Andric }); 70480093f4SDimitry Andric } 71480093f4SDimitry Andric 72480093f4SDimitry Andric void stop() { 73480093f4SDimitry Andric { 74480093f4SDimitry Andric std::lock_guard<std::mutex> Lock(Mutex); 75480093f4SDimitry Andric if (Stop) 76480093f4SDimitry Andric return; 77480093f4SDimitry Andric Stop = true; 78480093f4SDimitry Andric } 79480093f4SDimitry Andric Cond.notify_all(); 80480093f4SDimitry Andric ThreadsCreated.get_future().wait(); 810b57cec5SDimitry Andric } 820b57cec5SDimitry Andric 830b57cec5SDimitry Andric ~ThreadPoolExecutor() override { 84480093f4SDimitry Andric stop(); 85480093f4SDimitry Andric std::thread::id CurrentThreadId = std::this_thread::get_id(); 86480093f4SDimitry Andric for (std::thread &T : Threads) 87480093f4SDimitry Andric if (T.get_id() == CurrentThreadId) 88480093f4SDimitry Andric T.detach(); 89480093f4SDimitry Andric else 90480093f4SDimitry Andric T.join(); 910b57cec5SDimitry Andric } 920b57cec5SDimitry Andric 935ffd83dbSDimitry Andric struct Creator { 945ffd83dbSDimitry Andric static void *call() { return new ThreadPoolExecutor(strategy); } 955ffd83dbSDimitry Andric }; 96480093f4SDimitry Andric struct Deleter { 97480093f4SDimitry Andric static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } 98480093f4SDimitry Andric }; 99480093f4SDimitry Andric 10054521a2fSDimitry Andric void add(std::function<void()> F) override { 101480093f4SDimitry Andric { 102480093f4SDimitry Andric std::lock_guard<std::mutex> Lock(Mutex); 10354521a2fSDimitry Andric WorkStack.push_back(std::move(F)); 104480093f4SDimitry Andric } 1050b57cec5SDimitry Andric Cond.notify_one(); 1060b57cec5SDimitry Andric } 1070b57cec5SDimitry Andric 10806c3fb27SDimitry Andric size_t getThreadCount() const override { return ThreadCount; } 10906c3fb27SDimitry Andric 1100b57cec5SDimitry Andric private: 1115ffd83dbSDimitry Andric void work(ThreadPoolStrategy S, unsigned ThreadID) { 112bdd1243dSDimitry Andric threadIndex = ThreadID; 1135ffd83dbSDimitry Andric S.apply_thread_strategy(ThreadID); 1140b57cec5SDimitry Andric while (true) { 1150b57cec5SDimitry Andric std::unique_lock<std::mutex> Lock(Mutex); 11654521a2fSDimitry Andric Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); 1170b57cec5SDimitry Andric if (Stop) 1180b57cec5SDimitry Andric break; 11954521a2fSDimitry Andric auto Task = std::move(WorkStack.back()); 12054521a2fSDimitry Andric WorkStack.pop_back(); 1210b57cec5SDimitry Andric Lock.unlock(); 1220b57cec5SDimitry Andric Task(); 1230b57cec5SDimitry Andric } 1240b57cec5SDimitry Andric } 1250b57cec5SDimitry Andric 1260b57cec5SDimitry Andric std::atomic<bool> Stop{false}; 12754521a2fSDimitry Andric std::vector<std::function<void()>> WorkStack; 1280b57cec5SDimitry Andric std::mutex Mutex; 1290b57cec5SDimitry Andric std::condition_variable Cond; 130480093f4SDimitry Andric std::promise<void> ThreadsCreated; 131480093f4SDimitry Andric std::vector<std::thread> Threads; 13206c3fb27SDimitry Andric unsigned ThreadCount; 1330b57cec5SDimitry Andric }; 1340b57cec5SDimitry Andric 1350b57cec5SDimitry Andric Executor *Executor::getDefaultExecutor() { 136480093f4SDimitry Andric // The ManagedStatic enables the ThreadPoolExecutor to be stopped via 137480093f4SDimitry Andric // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This 138480093f4SDimitry Andric // stops the thread pool and waits for any worker thread creation to complete 139480093f4SDimitry Andric // but does not wait for the threads to finish. The wait for worker thread 140480093f4SDimitry Andric // creation to complete is important as it prevents intermittent crashes on 141480093f4SDimitry Andric // Windows due to a race condition between thread creation and process exit. 142480093f4SDimitry Andric // 143480093f4SDimitry Andric // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to 144480093f4SDimitry Andric // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor 145480093f4SDimitry Andric // destructor ensures it has been stopped and waits for worker threads to 146480093f4SDimitry Andric // finish. The wait is important as it prevents intermittent crashes on 147480093f4SDimitry Andric // Windows when the process is doing a full exit. 148480093f4SDimitry Andric // 149480093f4SDimitry Andric // The Windows crashes appear to only occur with the MSVC static runtimes and 150480093f4SDimitry Andric // are more frequent with the debug static runtime. 151480093f4SDimitry Andric // 152480093f4SDimitry Andric // This also prevents intermittent deadlocks on exit with the MinGW runtime. 1535ffd83dbSDimitry Andric 1545ffd83dbSDimitry Andric static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator, 155480093f4SDimitry Andric ThreadPoolExecutor::Deleter> 156480093f4SDimitry Andric ManagedExec; 157480093f4SDimitry Andric static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec)); 158480093f4SDimitry Andric return Exec.get(); 1590b57cec5SDimitry Andric } 1608bcb0991SDimitry Andric } // namespace 161bdd1243dSDimitry Andric } // namespace detail 1620b57cec5SDimitry Andric 16306c3fb27SDimitry Andric size_t getThreadCount() { 16406c3fb27SDimitry Andric return detail::Executor::getDefaultExecutor()->getThreadCount(); 16506c3fb27SDimitry Andric } 16606c3fb27SDimitry Andric #endif 1670b57cec5SDimitry Andric 1680b57cec5SDimitry Andric // Latch::sync() called by the dtor may cause one thread to block. If is a dead 1690b57cec5SDimitry Andric // lock if all threads in the default executor are blocked. To prevent the dead 17006c3fb27SDimitry Andric // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario 1710b57cec5SDimitry Andric // of nested parallel_for_each(), only the outermost one runs parallelly. 17206c3fb27SDimitry Andric TaskGroup::TaskGroup() 17306c3fb27SDimitry Andric #if LLVM_ENABLE_THREADS 17406c3fb27SDimitry Andric : Parallel((parallel::strategy.ThreadsRequested != 1) && 17506c3fb27SDimitry Andric (threadIndex == UINT_MAX)) {} 17606c3fb27SDimitry Andric #else 17706c3fb27SDimitry Andric : Parallel(false) {} 17806c3fb27SDimitry Andric #endif 179349cc55cSDimitry Andric TaskGroup::~TaskGroup() { 180349cc55cSDimitry Andric // We must ensure that all the workloads have finished before decrementing the 181349cc55cSDimitry Andric // instances count. 182349cc55cSDimitry Andric L.sync(); 183349cc55cSDimitry Andric } 1840b57cec5SDimitry Andric 18554521a2fSDimitry Andric void TaskGroup::spawn(std::function<void()> F) { 186bdd1243dSDimitry Andric #if LLVM_ENABLE_THREADS 1870b57cec5SDimitry Andric if (Parallel) { 1880b57cec5SDimitry Andric L.inc(); 18954521a2fSDimitry Andric detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { 1900b57cec5SDimitry Andric F(); 1910b57cec5SDimitry Andric L.dec(); 19254521a2fSDimitry Andric }); 193bdd1243dSDimitry Andric return; 194bdd1243dSDimitry Andric } 195bdd1243dSDimitry Andric #endif 1960b57cec5SDimitry Andric F(); 1970b57cec5SDimitry Andric } 1980b57cec5SDimitry Andric 1990b57cec5SDimitry Andric } // namespace parallel 2000b57cec5SDimitry Andric } // namespace llvm 20104eeddc0SDimitry Andric 20281ad6265SDimitry Andric void llvm::parallelFor(size_t Begin, size_t End, 20304eeddc0SDimitry Andric llvm::function_ref<void(size_t)> Fn) { 20404eeddc0SDimitry Andric #if LLVM_ENABLE_THREADS 20506c3fb27SDimitry Andric if (parallel::strategy.ThreadsRequested != 1) { 20604eeddc0SDimitry Andric auto NumItems = End - Begin; 20704eeddc0SDimitry Andric // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 20804eeddc0SDimitry Andric // overhead on large inputs. 20904eeddc0SDimitry Andric auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; 21004eeddc0SDimitry Andric if (TaskSize == 0) 21104eeddc0SDimitry Andric TaskSize = 1; 21204eeddc0SDimitry Andric 213bdd1243dSDimitry Andric parallel::TaskGroup TG; 21404eeddc0SDimitry Andric for (; Begin + TaskSize < End; Begin += TaskSize) { 21504eeddc0SDimitry Andric TG.spawn([=, &Fn] { 21604eeddc0SDimitry Andric for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) 21704eeddc0SDimitry Andric Fn(I); 21804eeddc0SDimitry Andric }); 21904eeddc0SDimitry Andric } 2201ac55f4cSDimitry Andric if (Begin != End) { 2211ac55f4cSDimitry Andric TG.spawn([=, &Fn] { 2221ac55f4cSDimitry Andric for (size_t I = Begin; I != End; ++I) 2231ac55f4cSDimitry Andric Fn(I); 2241ac55f4cSDimitry Andric }); 2251ac55f4cSDimitry Andric } 22604eeddc0SDimitry Andric return; 22704eeddc0SDimitry Andric } 22804eeddc0SDimitry Andric #endif 22904eeddc0SDimitry Andric 23004eeddc0SDimitry Andric for (; Begin != End; ++Begin) 23104eeddc0SDimitry Andric Fn(Begin); 23204eeddc0SDimitry Andric } 233