xref: /llvm-project/llvm/lib/Support/Parallel.cpp (revision 2946cd701067404b99c39fb29dc9c74bd7193eb3)
1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/Parallel.h"
10 #include "llvm/Config/llvm-config.h"
11 
12 #if LLVM_ENABLE_THREADS
13 
14 #include "llvm/Support/Threading.h"
15 
16 #include <atomic>
17 #include <stack>
18 #include <thread>
19 
20 using namespace llvm;
21 
22 namespace {
23 
24 /// An abstract class that takes closures and runs them asynchronously.
25 class Executor {
26 public:
27   virtual ~Executor() = default;
28   virtual void add(std::function<void()> func) = 0;
29 
30   static Executor *getDefaultExecutor();
31 };
32 
33 #if defined(_MSC_VER)
34 /// An Executor that runs tasks via ConcRT.
35 class ConcRTExecutor : public Executor {
36   struct Taskish {
37     Taskish(std::function<void()> Task) : Task(Task) {}
38 
39     std::function<void()> Task;
40 
41     static void run(void *P) {
42       Taskish *Self = static_cast<Taskish *>(P);
43       Self->Task();
44       concurrency::Free(Self);
45     }
46   };
47 
48 public:
49   virtual void add(std::function<void()> F) {
50     Concurrency::CurrentScheduler::ScheduleTask(
51         Taskish::run, new (concurrency::Alloc(sizeof(Taskish))) Taskish(F));
52   }
53 };
54 
55 Executor *Executor::getDefaultExecutor() {
56   static ConcRTExecutor exec;
57   return &exec;
58 }
59 
60 #else
61 /// An implementation of an Executor that runs closures on a thread pool
62 ///   in filo order.
63 class ThreadPoolExecutor : public Executor {
64 public:
65   explicit ThreadPoolExecutor(unsigned ThreadCount = hardware_concurrency())
66       : Done(ThreadCount) {
67     // Spawn all but one of the threads in another thread as spawning threads
68     // can take a while.
69     std::thread([&, ThreadCount] {
70       for (size_t i = 1; i < ThreadCount; ++i) {
71         std::thread([=] { work(); }).detach();
72       }
73       work();
74     }).detach();
75   }
76 
77   ~ThreadPoolExecutor() override {
78     std::unique_lock<std::mutex> Lock(Mutex);
79     Stop = true;
80     Lock.unlock();
81     Cond.notify_all();
82     // Wait for ~Latch.
83   }
84 
85   void add(std::function<void()> F) override {
86     std::unique_lock<std::mutex> Lock(Mutex);
87     WorkStack.push(F);
88     Lock.unlock();
89     Cond.notify_one();
90   }
91 
92 private:
93   void work() {
94     while (true) {
95       std::unique_lock<std::mutex> Lock(Mutex);
96       Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
97       if (Stop)
98         break;
99       auto Task = WorkStack.top();
100       WorkStack.pop();
101       Lock.unlock();
102       Task();
103     }
104     Done.dec();
105   }
106 
107   std::atomic<bool> Stop{false};
108   std::stack<std::function<void()>> WorkStack;
109   std::mutex Mutex;
110   std::condition_variable Cond;
111   parallel::detail::Latch Done;
112 };
113 
114 Executor *Executor::getDefaultExecutor() {
115   static ThreadPoolExecutor exec;
116   return &exec;
117 }
118 #endif
119 }
120 
121 void parallel::detail::TaskGroup::spawn(std::function<void()> F) {
122   L.inc();
123   Executor::getDefaultExecutor()->add([&, F] {
124     F();
125     L.dec();
126   });
127 }
128 #endif // LLVM_ENABLE_THREADS
129