//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <future>
#include <stack>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS

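// On Windows, a thread_local variable cannot be accessed directly from another
// DLL with native TLS, so the index is kept internal here and read through
// getThreadIndex(); on other platforms the variable itself is exposed.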
#ifdef _WIN32
static thread_local unsigned threadIndex;

unsigned getThreadIndex() { return threadIndex; }
#else
thread_local unsigned threadIndex;
#endif

namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func) = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
///   in FILO order (the most recently added closure runs first).
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    unsigned ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    Threads[0] = std::thread([this, ThreadCount, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
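        // If stop() has already been requested (e.g. via llvm_shutdown()
        // during startup), do not spawn any more worker threads.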
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
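    // If this destructor ever runs on one of the pool's own worker threads,
    // that thread cannot join itself; detach it and join the others.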
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

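  // Hooks for the ManagedStatic in getDefaultExecutor(): Creator builds the
  // pool with the global strategy; Deleter only stops it (see the comment in
  // getDefaultExecutor() for why it does not destroy it).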
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push(std::move(F));
    }
    Cond.notify_one();
  }

private:
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
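    // Record this worker's index and apply whatever per-thread policy (e.g.
    // core affinity) the strategy defines, then enter the scheduling loop.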
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
      if (Stop)
        break;
      auto Task = std::move(WorkStack.top());
      WorkStack.pop();
      Lock.unlock();
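      // Run the task with the lock released so other workers can make
      // progress.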
      Task();
    }
  }

  std::atomic<bool> Stop{false};
  std::stack<std::function<void()>> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
};

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr
  // to it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace
} // namespace detail
#endif

static std::atomic<int> TaskGroupInstances;

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the first TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
TaskGroup::~TaskGroup() {
  // We must ensure that all the workloads have finished before decrementing
  // the instance count.
  L.sync();
  --TaskGroupInstances;
}

void TaskGroup::spawn(std::function<void()> F) {
#if LLVM_ENABLE_THREADS
  if (Parallel) {
    L.inc();
    detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
      F();
      L.dec();
    });
    return;
  }
#endif
  F();
}

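// Run F inline when the strategy requests a single thread; otherwise hand it
// off to spawn().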
void TaskGroup::execute(std::function<void()> F) {
  if (parallel::strategy.ThreadsRequested == 1)
    F();
  else
    spawn(F);
}
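
// A minimal usage sketch (illustrative only): a TaskGroup collects the
// closures handed to spawn()/execute(), and its destructor waits for all of
// them to finish.
//
//   parallel::TaskGroup TG;
//   TG.spawn([] { /* task A */ });
//   TG.spawn([] { /* task B */ });
//   // ~TaskGroup() syncs here: it returns only after A and B have completed.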
} // namespace parallel
} // namespace llvm

void llvm::parallelFor(size_t Begin, size_t End,
                       llvm::function_ref<void(size_t)> Fn) {
  // If we have zero or one item, do not incur the overhead of spinning up a
  // task group. Task groups are surprisingly expensive, and because they do
  // not support nested parallelism, a single-entry task group can block
  // parallel execution underneath it.
#if LLVM_ENABLE_THREADS
  auto NumItems = End - Begin;
  if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;

    parallel::TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
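    // Whatever is left after the full TaskSize chunks (at most TaskSize items)
    // goes into one final task.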
    if (Begin != End) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin; I != End; ++I)
          Fn(I);
      });
    }
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}
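
// A minimal usage sketch (illustrative only): square every element of a
// vector, with the chunking above bounding per-task scheduling overhead.
//
//   std::vector<int> V(1024, 3);
//   llvm::parallelFor(0, V.size(), [&](size_t I) { V[I] *= V[I]; });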