xref: /freebsd-src/contrib/llvm-project/llvm/lib/Support/Parallel.cpp (revision 0fca6ea1d4eea4c934cfff25ac9ee8ad6fe95583)
10b57cec5SDimitry Andric //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
20b57cec5SDimitry Andric //
30b57cec5SDimitry Andric // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
40b57cec5SDimitry Andric // See https://llvm.org/LICENSE.txt for license information.
50b57cec5SDimitry Andric // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
60b57cec5SDimitry Andric //
70b57cec5SDimitry Andric //===----------------------------------------------------------------------===//
80b57cec5SDimitry Andric 
90b57cec5SDimitry Andric #include "llvm/Support/Parallel.h"
100b57cec5SDimitry Andric #include "llvm/Config/llvm-config.h"
11480093f4SDimitry Andric #include "llvm/Support/ManagedStatic.h"
120b57cec5SDimitry Andric #include "llvm/Support/Threading.h"
130b57cec5SDimitry Andric 
140b57cec5SDimitry Andric #include <atomic>
15480093f4SDimitry Andric #include <future>
160b57cec5SDimitry Andric #include <thread>
17480093f4SDimitry Andric #include <vector>
180b57cec5SDimitry Andric 
195ffd83dbSDimitry Andric llvm::ThreadPoolStrategy llvm::parallel::strategy;
205ffd83dbSDimitry Andric 
210b57cec5SDimitry Andric namespace llvm {
220b57cec5SDimitry Andric namespace parallel {
23bdd1243dSDimitry Andric #if LLVM_ENABLE_THREADS
24bdd1243dSDimitry Andric 
25bdd1243dSDimitry Andric #ifdef _WIN32
2606c3fb27SDimitry Andric static thread_local unsigned threadIndex = UINT_MAX;
27bdd1243dSDimitry Andric 
2806c3fb27SDimitry Andric unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
29bdd1243dSDimitry Andric #else
3006c3fb27SDimitry Andric thread_local unsigned threadIndex = UINT_MAX;
31bdd1243dSDimitry Andric #endif
32bdd1243dSDimitry Andric 
330b57cec5SDimitry Andric namespace detail {
340b57cec5SDimitry Andric 
350b57cec5SDimitry Andric namespace {
360b57cec5SDimitry Andric 
370b57cec5SDimitry Andric /// An abstract class that takes closures and runs them asynchronously.
380b57cec5SDimitry Andric class Executor {
390b57cec5SDimitry Andric public:
400b57cec5SDimitry Andric   virtual ~Executor() = default;
4154521a2fSDimitry Andric   virtual void add(std::function<void()> func) = 0;
4206c3fb27SDimitry Andric   virtual size_t getThreadCount() const = 0;
430b57cec5SDimitry Andric 
440b57cec5SDimitry Andric   static Executor *getDefaultExecutor();
450b57cec5SDimitry Andric };
460b57cec5SDimitry Andric 
470b57cec5SDimitry Andric /// An implementation of an Executor that runs closures on a thread pool
480b57cec5SDimitry Andric ///   in filo order.
490b57cec5SDimitry Andric class ThreadPoolExecutor : public Executor {
500b57cec5SDimitry Andric public:
515ffd83dbSDimitry Andric   explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
5206c3fb27SDimitry Andric     ThreadCount = S.compute_thread_count();
530b57cec5SDimitry Andric     // Spawn all but one of the threads in another thread as spawning threads
540b57cec5SDimitry Andric     // can take a while.
55480093f4SDimitry Andric     Threads.reserve(ThreadCount);
56480093f4SDimitry Andric     Threads.resize(1);
57480093f4SDimitry Andric     std::lock_guard<std::mutex> Lock(Mutex);
5806c3fb27SDimitry Andric     // Use operator[] before creating the thread to avoid data race in .size()
59*0fca6ea1SDimitry Andric     // in 'safe libc++' mode.
6006c3fb27SDimitry Andric     auto &Thread0 = Threads[0];
6106c3fb27SDimitry Andric     Thread0 = std::thread([this, S] {
625ffd83dbSDimitry Andric       for (unsigned I = 1; I < ThreadCount; ++I) {
635ffd83dbSDimitry Andric         Threads.emplace_back([=] { work(S, I); });
64480093f4SDimitry Andric         if (Stop)
65480093f4SDimitry Andric           break;
660b57cec5SDimitry Andric       }
67480093f4SDimitry Andric       ThreadsCreated.set_value();
685ffd83dbSDimitry Andric       work(S, 0);
69480093f4SDimitry Andric     });
70480093f4SDimitry Andric   }
71480093f4SDimitry Andric 
72480093f4SDimitry Andric   void stop() {
73480093f4SDimitry Andric     {
74480093f4SDimitry Andric       std::lock_guard<std::mutex> Lock(Mutex);
75480093f4SDimitry Andric       if (Stop)
76480093f4SDimitry Andric         return;
77480093f4SDimitry Andric       Stop = true;
78480093f4SDimitry Andric     }
79480093f4SDimitry Andric     Cond.notify_all();
80480093f4SDimitry Andric     ThreadsCreated.get_future().wait();
810b57cec5SDimitry Andric   }
820b57cec5SDimitry Andric 
830b57cec5SDimitry Andric   ~ThreadPoolExecutor() override {
84480093f4SDimitry Andric     stop();
85480093f4SDimitry Andric     std::thread::id CurrentThreadId = std::this_thread::get_id();
86480093f4SDimitry Andric     for (std::thread &T : Threads)
87480093f4SDimitry Andric       if (T.get_id() == CurrentThreadId)
88480093f4SDimitry Andric         T.detach();
89480093f4SDimitry Andric       else
90480093f4SDimitry Andric         T.join();
910b57cec5SDimitry Andric   }
920b57cec5SDimitry Andric 
935ffd83dbSDimitry Andric   struct Creator {
945ffd83dbSDimitry Andric     static void *call() { return new ThreadPoolExecutor(strategy); }
955ffd83dbSDimitry Andric   };
96480093f4SDimitry Andric   struct Deleter {
97480093f4SDimitry Andric     static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
98480093f4SDimitry Andric   };
99480093f4SDimitry Andric 
10054521a2fSDimitry Andric   void add(std::function<void()> F) override {
101480093f4SDimitry Andric     {
102480093f4SDimitry Andric       std::lock_guard<std::mutex> Lock(Mutex);
10354521a2fSDimitry Andric       WorkStack.push_back(std::move(F));
104480093f4SDimitry Andric     }
1050b57cec5SDimitry Andric     Cond.notify_one();
1060b57cec5SDimitry Andric   }
1070b57cec5SDimitry Andric 
10806c3fb27SDimitry Andric   size_t getThreadCount() const override { return ThreadCount; }
10906c3fb27SDimitry Andric 
1100b57cec5SDimitry Andric private:
1115ffd83dbSDimitry Andric   void work(ThreadPoolStrategy S, unsigned ThreadID) {
112bdd1243dSDimitry Andric     threadIndex = ThreadID;
1135ffd83dbSDimitry Andric     S.apply_thread_strategy(ThreadID);
1140b57cec5SDimitry Andric     while (true) {
1150b57cec5SDimitry Andric       std::unique_lock<std::mutex> Lock(Mutex);
11654521a2fSDimitry Andric       Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
1170b57cec5SDimitry Andric       if (Stop)
1180b57cec5SDimitry Andric         break;
11954521a2fSDimitry Andric       auto Task = std::move(WorkStack.back());
12054521a2fSDimitry Andric       WorkStack.pop_back();
1210b57cec5SDimitry Andric       Lock.unlock();
1220b57cec5SDimitry Andric       Task();
1230b57cec5SDimitry Andric     }
1240b57cec5SDimitry Andric   }
1250b57cec5SDimitry Andric 
1260b57cec5SDimitry Andric   std::atomic<bool> Stop{false};
12754521a2fSDimitry Andric   std::vector<std::function<void()>> WorkStack;
1280b57cec5SDimitry Andric   std::mutex Mutex;
1290b57cec5SDimitry Andric   std::condition_variable Cond;
130480093f4SDimitry Andric   std::promise<void> ThreadsCreated;
131480093f4SDimitry Andric   std::vector<std::thread> Threads;
13206c3fb27SDimitry Andric   unsigned ThreadCount;
1330b57cec5SDimitry Andric };
1340b57cec5SDimitry Andric 
1350b57cec5SDimitry Andric Executor *Executor::getDefaultExecutor() {
136480093f4SDimitry Andric   // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
137480093f4SDimitry Andric   // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
138480093f4SDimitry Andric   // stops the thread pool and waits for any worker thread creation to complete
139480093f4SDimitry Andric   // but does not wait for the threads to finish. The wait for worker thread
140480093f4SDimitry Andric   // creation to complete is important as it prevents intermittent crashes on
141480093f4SDimitry Andric   // Windows due to a race condition between thread creation and process exit.
142480093f4SDimitry Andric   //
143480093f4SDimitry Andric   // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
144480093f4SDimitry Andric   // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
145480093f4SDimitry Andric   // destructor ensures it has been stopped and waits for worker threads to
146480093f4SDimitry Andric   // finish. The wait is important as it prevents intermittent crashes on
147480093f4SDimitry Andric   // Windows when the process is doing a full exit.
148480093f4SDimitry Andric   //
149480093f4SDimitry Andric   // The Windows crashes appear to only occur with the MSVC static runtimes and
150480093f4SDimitry Andric   // are more frequent with the debug static runtime.
151480093f4SDimitry Andric   //
152480093f4SDimitry Andric   // This also prevents intermittent deadlocks on exit with the MinGW runtime.
1535ffd83dbSDimitry Andric 
1545ffd83dbSDimitry Andric   static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
155480093f4SDimitry Andric                        ThreadPoolExecutor::Deleter>
156480093f4SDimitry Andric       ManagedExec;
157480093f4SDimitry Andric   static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
158480093f4SDimitry Andric   return Exec.get();
1590b57cec5SDimitry Andric }
1608bcb0991SDimitry Andric } // namespace
161bdd1243dSDimitry Andric } // namespace detail
1620b57cec5SDimitry Andric 
16306c3fb27SDimitry Andric size_t getThreadCount() {
16406c3fb27SDimitry Andric   return detail::Executor::getDefaultExecutor()->getThreadCount();
16506c3fb27SDimitry Andric }
16606c3fb27SDimitry Andric #endif
1670b57cec5SDimitry Andric 
1680b57cec5SDimitry Andric // Latch::sync() called by the dtor may cause one thread to block. If is a dead
1690b57cec5SDimitry Andric // lock if all threads in the default executor are blocked. To prevent the dead
17006c3fb27SDimitry Andric // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
1710b57cec5SDimitry Andric // of nested parallel_for_each(), only the outermost one runs parallelly.
17206c3fb27SDimitry Andric TaskGroup::TaskGroup()
17306c3fb27SDimitry Andric #if LLVM_ENABLE_THREADS
17406c3fb27SDimitry Andric     : Parallel((parallel::strategy.ThreadsRequested != 1) &&
17506c3fb27SDimitry Andric                (threadIndex == UINT_MAX)) {}
17606c3fb27SDimitry Andric #else
17706c3fb27SDimitry Andric     : Parallel(false) {}
17806c3fb27SDimitry Andric #endif
179349cc55cSDimitry Andric TaskGroup::~TaskGroup() {
180349cc55cSDimitry Andric   // We must ensure that all the workloads have finished before decrementing the
181349cc55cSDimitry Andric   // instances count.
182349cc55cSDimitry Andric   L.sync();
183349cc55cSDimitry Andric }
1840b57cec5SDimitry Andric 
18554521a2fSDimitry Andric void TaskGroup::spawn(std::function<void()> F) {
186bdd1243dSDimitry Andric #if LLVM_ENABLE_THREADS
1870b57cec5SDimitry Andric   if (Parallel) {
1880b57cec5SDimitry Andric     L.inc();
18954521a2fSDimitry Andric     detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
1900b57cec5SDimitry Andric       F();
1910b57cec5SDimitry Andric       L.dec();
19254521a2fSDimitry Andric     });
193bdd1243dSDimitry Andric     return;
194bdd1243dSDimitry Andric   }
195bdd1243dSDimitry Andric #endif
1960b57cec5SDimitry Andric   F();
1970b57cec5SDimitry Andric }
1980b57cec5SDimitry Andric 
1990b57cec5SDimitry Andric } // namespace parallel
2000b57cec5SDimitry Andric } // namespace llvm
20104eeddc0SDimitry Andric 
20281ad6265SDimitry Andric void llvm::parallelFor(size_t Begin, size_t End,
20304eeddc0SDimitry Andric                        llvm::function_ref<void(size_t)> Fn) {
20404eeddc0SDimitry Andric #if LLVM_ENABLE_THREADS
20506c3fb27SDimitry Andric   if (parallel::strategy.ThreadsRequested != 1) {
20604eeddc0SDimitry Andric     auto NumItems = End - Begin;
20704eeddc0SDimitry Andric     // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
20804eeddc0SDimitry Andric     // overhead on large inputs.
20904eeddc0SDimitry Andric     auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
21004eeddc0SDimitry Andric     if (TaskSize == 0)
21104eeddc0SDimitry Andric       TaskSize = 1;
21204eeddc0SDimitry Andric 
213bdd1243dSDimitry Andric     parallel::TaskGroup TG;
21404eeddc0SDimitry Andric     for (; Begin + TaskSize < End; Begin += TaskSize) {
21504eeddc0SDimitry Andric       TG.spawn([=, &Fn] {
21604eeddc0SDimitry Andric         for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
21704eeddc0SDimitry Andric           Fn(I);
21804eeddc0SDimitry Andric       });
21904eeddc0SDimitry Andric     }
2201ac55f4cSDimitry Andric     if (Begin != End) {
2211ac55f4cSDimitry Andric       TG.spawn([=, &Fn] {
2221ac55f4cSDimitry Andric         for (size_t I = Begin; I != End; ++I)
2231ac55f4cSDimitry Andric           Fn(I);
2241ac55f4cSDimitry Andric       });
2251ac55f4cSDimitry Andric     }
22604eeddc0SDimitry Andric     return;
22704eeddc0SDimitry Andric   }
22804eeddc0SDimitry Andric #endif
22904eeddc0SDimitry Andric 
23004eeddc0SDimitry Andric   for (; Begin != End; ++Begin)
23104eeddc0SDimitry Andric     Fn(Begin);
23204eeddc0SDimitry Andric }
233