109467b48Spatrick //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
209467b48Spatrick //
309467b48Spatrick // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
409467b48Spatrick // See https://llvm.org/LICENSE.txt for license information.
509467b48Spatrick // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
609467b48Spatrick //
709467b48Spatrick //===----------------------------------------------------------------------===//
809467b48Spatrick
909467b48Spatrick #include "llvm/Support/Parallel.h"
1009467b48Spatrick #include "llvm/Config/llvm-config.h"
1109467b48Spatrick #include "llvm/Support/ManagedStatic.h"
1209467b48Spatrick #include "llvm/Support/Threading.h"
1309467b48Spatrick
1409467b48Spatrick #include <atomic>
1509467b48Spatrick #include <future>
1609467b48Spatrick #include <stack>
1709467b48Spatrick #include <thread>
1809467b48Spatrick #include <vector>
1909467b48Spatrick
20097a140dSpatrick llvm::ThreadPoolStrategy llvm::parallel::strategy;
21097a140dSpatrick
2209467b48Spatrick namespace llvm {
2309467b48Spatrick namespace parallel {
24*d415bd75Srobert #if LLVM_ENABLE_THREADS
25*d415bd75Srobert
#if defined(_WIN32)
// On Windows the per-thread index is kept private to this file and is read
// through the out-of-line accessor below.
static thread_local unsigned threadIndex;

/// Returns the index stored for the calling thread.
unsigned getThreadIndex() { return threadIndex; }
#else
// Elsewhere the per-thread index is an externally visible thread-local
// variable.
thread_local unsigned threadIndex;
#endif
33*d415bd75Srobert
3409467b48Spatrick namespace detail {
3509467b48Spatrick
3609467b48Spatrick namespace {
3709467b48Spatrick
/// Abstract interface for an object that accepts closures and runs them
/// asynchronously.
class Executor {
public:
  virtual ~Executor() = default;

  /// Hands the closure \p F to the executor to be run.
  virtual void add(std::function<void()> F) = 0;

  /// Returns the executor shared by the whole process.
  static Executor *getDefaultExecutor();
};
4609467b48Spatrick
4709467b48Spatrick /// An implementation of an Executor that runs closures on a thread pool
4809467b48Spatrick /// in filo order.
4909467b48Spatrick class ThreadPoolExecutor : public Executor {
5009467b48Spatrick public:
ThreadPoolExecutor(ThreadPoolStrategy S=hardware_concurrency ())51097a140dSpatrick explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
52097a140dSpatrick unsigned ThreadCount = S.compute_thread_count();
5309467b48Spatrick // Spawn all but one of the threads in another thread as spawning threads
5409467b48Spatrick // can take a while.
5509467b48Spatrick Threads.reserve(ThreadCount);
5609467b48Spatrick Threads.resize(1);
5709467b48Spatrick std::lock_guard<std::mutex> Lock(Mutex);
58097a140dSpatrick Threads[0] = std::thread([this, ThreadCount, S] {
59097a140dSpatrick for (unsigned I = 1; I < ThreadCount; ++I) {
60097a140dSpatrick Threads.emplace_back([=] { work(S, I); });
6109467b48Spatrick if (Stop)
6209467b48Spatrick break;
6309467b48Spatrick }
6409467b48Spatrick ThreadsCreated.set_value();
65097a140dSpatrick work(S, 0);
6609467b48Spatrick });
6709467b48Spatrick }
6809467b48Spatrick
stop()6909467b48Spatrick void stop() {
7009467b48Spatrick {
7109467b48Spatrick std::lock_guard<std::mutex> Lock(Mutex);
7209467b48Spatrick if (Stop)
7309467b48Spatrick return;
7409467b48Spatrick Stop = true;
7509467b48Spatrick }
7609467b48Spatrick Cond.notify_all();
7709467b48Spatrick ThreadsCreated.get_future().wait();
7809467b48Spatrick }
7909467b48Spatrick
~ThreadPoolExecutor()8009467b48Spatrick ~ThreadPoolExecutor() override {
8109467b48Spatrick stop();
8209467b48Spatrick std::thread::id CurrentThreadId = std::this_thread::get_id();
8309467b48Spatrick for (std::thread &T : Threads)
8409467b48Spatrick if (T.get_id() == CurrentThreadId)
8509467b48Spatrick T.detach();
8609467b48Spatrick else
8709467b48Spatrick T.join();
8809467b48Spatrick }
8909467b48Spatrick
90097a140dSpatrick struct Creator {
callllvm::parallel::detail::__anon2cb6b1800111::ThreadPoolExecutor::Creator91097a140dSpatrick static void *call() { return new ThreadPoolExecutor(strategy); }
92097a140dSpatrick };
9309467b48Spatrick struct Deleter {
callllvm::parallel::detail::__anon2cb6b1800111::ThreadPoolExecutor::Deleter9409467b48Spatrick static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
9509467b48Spatrick };
9609467b48Spatrick
add(std::function<void ()> F)9709467b48Spatrick void add(std::function<void()> F) override {
9809467b48Spatrick {
9909467b48Spatrick std::lock_guard<std::mutex> Lock(Mutex);
100*d415bd75Srobert WorkStack.push(std::move(F));
10109467b48Spatrick }
10209467b48Spatrick Cond.notify_one();
10309467b48Spatrick }
10409467b48Spatrick
10509467b48Spatrick private:
work(ThreadPoolStrategy S,unsigned ThreadID)106097a140dSpatrick void work(ThreadPoolStrategy S, unsigned ThreadID) {
107*d415bd75Srobert threadIndex = ThreadID;
108097a140dSpatrick S.apply_thread_strategy(ThreadID);
10909467b48Spatrick while (true) {
11009467b48Spatrick std::unique_lock<std::mutex> Lock(Mutex);
11109467b48Spatrick Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
11209467b48Spatrick if (Stop)
11309467b48Spatrick break;
114*d415bd75Srobert auto Task = std::move(WorkStack.top());
11509467b48Spatrick WorkStack.pop();
11609467b48Spatrick Lock.unlock();
11709467b48Spatrick Task();
11809467b48Spatrick }
11909467b48Spatrick }
12009467b48Spatrick
12109467b48Spatrick std::atomic<bool> Stop{false};
12209467b48Spatrick std::stack<std::function<void()>> WorkStack;
12309467b48Spatrick std::mutex Mutex;
12409467b48Spatrick std::condition_variable Cond;
12509467b48Spatrick std::promise<void> ThreadsCreated;
12609467b48Spatrick std::vector<std::thread> Threads;
12709467b48Spatrick };
12809467b48Spatrick
getDefaultExecutor()12909467b48Spatrick Executor *Executor::getDefaultExecutor() {
13009467b48Spatrick // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
13109467b48Spatrick // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
13209467b48Spatrick // stops the thread pool and waits for any worker thread creation to complete
13309467b48Spatrick // but does not wait for the threads to finish. The wait for worker thread
13409467b48Spatrick // creation to complete is important as it prevents intermittent crashes on
13509467b48Spatrick // Windows due to a race condition between thread creation and process exit.
13609467b48Spatrick //
13709467b48Spatrick // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
13809467b48Spatrick // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
13909467b48Spatrick // destructor ensures it has been stopped and waits for worker threads to
14009467b48Spatrick // finish. The wait is important as it prevents intermittent crashes on
14109467b48Spatrick // Windows when the process is doing a full exit.
14209467b48Spatrick //
14309467b48Spatrick // The Windows crashes appear to only occur with the MSVC static runtimes and
14409467b48Spatrick // are more frequent with the debug static runtime.
14509467b48Spatrick //
14609467b48Spatrick // This also prevents intermittent deadlocks on exit with the MinGW runtime.
147097a140dSpatrick
148097a140dSpatrick static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
14909467b48Spatrick ThreadPoolExecutor::Deleter>
15009467b48Spatrick ManagedExec;
15109467b48Spatrick static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
15209467b48Spatrick return Exec.get();
15309467b48Spatrick }
15409467b48Spatrick } // namespace
155*d415bd75Srobert } // namespace detail
156*d415bd75Srobert #endif
15709467b48Spatrick
15809467b48Spatrick static std::atomic<int> TaskGroupInstances;
15909467b48Spatrick
16009467b48Spatrick // Latch::sync() called by the dtor may cause one thread to block. If is a dead
16109467b48Spatrick // lock if all threads in the default executor are blocked. To prevent the dead
16209467b48Spatrick // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario
16309467b48Spatrick // of nested parallel_for_each(), only the outermost one runs parallelly.
TaskGroup()16409467b48Spatrick TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
~TaskGroup()165*d415bd75Srobert TaskGroup::~TaskGroup() {
166*d415bd75Srobert // We must ensure that all the workloads have finished before decrementing the
167*d415bd75Srobert // instances count.
168*d415bd75Srobert L.sync();
169*d415bd75Srobert --TaskGroupInstances;
170*d415bd75Srobert }
17109467b48Spatrick
spawn(std::function<void ()> F)17209467b48Spatrick void TaskGroup::spawn(std::function<void()> F) {
173*d415bd75Srobert #if LLVM_ENABLE_THREADS
17409467b48Spatrick if (Parallel) {
17509467b48Spatrick L.inc();
176*d415bd75Srobert detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
17709467b48Spatrick F();
17809467b48Spatrick L.dec();
17909467b48Spatrick });
180*d415bd75Srobert return;
181*d415bd75Srobert }
182*d415bd75Srobert #endif
18309467b48Spatrick F();
18409467b48Spatrick }
18509467b48Spatrick
execute(std::function<void ()> F)186*d415bd75Srobert void TaskGroup::execute(std::function<void()> F) {
187*d415bd75Srobert if (parallel::strategy.ThreadsRequested == 1)
188*d415bd75Srobert F();
189*d415bd75Srobert else
190*d415bd75Srobert spawn(F);
191*d415bd75Srobert }
19209467b48Spatrick } // namespace parallel
19309467b48Spatrick } // namespace llvm
194*d415bd75Srobert
parallelFor(size_t Begin,size_t End,llvm::function_ref<void (size_t)> Fn)195*d415bd75Srobert void llvm::parallelFor(size_t Begin, size_t End,
196*d415bd75Srobert llvm::function_ref<void(size_t)> Fn) {
197*d415bd75Srobert // If we have zero or one items, then do not incur the overhead of spinning up
198*d415bd75Srobert // a task group. They are surprisingly expensive, and because they do not
199*d415bd75Srobert // support nested parallelism, a single entry task group can block parallel
200*d415bd75Srobert // execution underneath them.
201*d415bd75Srobert #if LLVM_ENABLE_THREADS
202*d415bd75Srobert auto NumItems = End - Begin;
203*d415bd75Srobert if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
204*d415bd75Srobert // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
205*d415bd75Srobert // overhead on large inputs.
206*d415bd75Srobert auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
207*d415bd75Srobert if (TaskSize == 0)
208*d415bd75Srobert TaskSize = 1;
209*d415bd75Srobert
210*d415bd75Srobert parallel::TaskGroup TG;
211*d415bd75Srobert for (; Begin + TaskSize < End; Begin += TaskSize) {
212*d415bd75Srobert TG.spawn([=, &Fn] {
213*d415bd75Srobert for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
214*d415bd75Srobert Fn(I);
215*d415bd75Srobert });
216*d415bd75Srobert }
217*d415bd75Srobert if (Begin != End) {
218*d415bd75Srobert TG.spawn([=, &Fn] {
219*d415bd75Srobert for (size_t I = Begin; I != End; ++I)
220*d415bd75Srobert Fn(I);
221*d415bd75Srobert });
222*d415bd75Srobert }
223*d415bd75Srobert return;
224*d415bd75Srobert }
225*d415bd75Srobert #endif
226*d415bd75Srobert
227*d415bd75Srobert for (; Begin != End; ++Begin)
228*d415bd75Srobert Fn(Begin);
229*d415bd75Srobert }
230