1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/Parallel.h" 10 #include "llvm/Config/llvm-config.h" 11 12 #if LLVM_ENABLE_THREADS 13 14 #include "llvm/Support/Threading.h" 15 16 #include <atomic> 17 #include <stack> 18 #include <thread> 19 20 using namespace llvm; 21 22 namespace { 23 24 /// An abstract class that takes closures and runs them asynchronously. 25 class Executor { 26 public: 27 virtual ~Executor() = default; 28 virtual void add(std::function<void()> func) = 0; 29 30 static Executor *getDefaultExecutor(); 31 }; 32 33 #if defined(_MSC_VER) 34 /// An Executor that runs tasks via ConcRT. 35 class ConcRTExecutor : public Executor { 36 struct Taskish { 37 Taskish(std::function<void()> Task) : Task(Task) {} 38 39 std::function<void()> Task; 40 41 static void run(void *P) { 42 Taskish *Self = static_cast<Taskish *>(P); 43 Self->Task(); 44 concurrency::Free(Self); 45 } 46 }; 47 48 public: 49 virtual void add(std::function<void()> F) { 50 Concurrency::CurrentScheduler::ScheduleTask( 51 Taskish::run, new (concurrency::Alloc(sizeof(Taskish))) Taskish(F)); 52 } 53 }; 54 55 Executor *Executor::getDefaultExecutor() { 56 static ConcRTExecutor exec; 57 return &exec; 58 } 59 60 #else 61 /// An implementation of an Executor that runs closures on a thread pool 62 /// in filo order. 63 class ThreadPoolExecutor : public Executor { 64 public: 65 explicit ThreadPoolExecutor(unsigned ThreadCount = hardware_concurrency()) 66 : Done(ThreadCount) { 67 // Spawn all but one of the threads in another thread as spawning threads 68 // can take a while. 69 std::thread([&, ThreadCount] { 70 for (size_t i = 1; i < ThreadCount; ++i) { 71 std::thread([=] { work(); }).detach(); 72 } 73 work(); 74 }).detach(); 75 } 76 77 ~ThreadPoolExecutor() override { 78 std::unique_lock<std::mutex> Lock(Mutex); 79 Stop = true; 80 Lock.unlock(); 81 Cond.notify_all(); 82 // Wait for ~Latch. 83 } 84 85 void add(std::function<void()> F) override { 86 std::unique_lock<std::mutex> Lock(Mutex); 87 WorkStack.push(F); 88 Lock.unlock(); 89 Cond.notify_one(); 90 } 91 92 private: 93 void work() { 94 while (true) { 95 std::unique_lock<std::mutex> Lock(Mutex); 96 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); 97 if (Stop) 98 break; 99 auto Task = WorkStack.top(); 100 WorkStack.pop(); 101 Lock.unlock(); 102 Task(); 103 } 104 Done.dec(); 105 } 106 107 std::atomic<bool> Stop{false}; 108 std::stack<std::function<void()>> WorkStack; 109 std::mutex Mutex; 110 std::condition_variable Cond; 111 parallel::detail::Latch Done; 112 }; 113 114 Executor *Executor::getDefaultExecutor() { 115 static ThreadPoolExecutor exec; 116 return &exec; 117 } 118 #endif 119 } 120 121 void parallel::detail::TaskGroup::spawn(std::function<void()> F) { 122 L.inc(); 123 Executor::getDefaultExecutor()->add([&, F] { 124 F(); 125 L.dec(); 126 }); 127 } 128 #endif // LLVM_ENABLE_THREADS 129