xref: /llvm-project/llvm/lib/Support/ThreadPool.cpp (revision 8404aeb56a73ab24f9b295111de3b37a37f0b841)
1 //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 //
9 // This file implements a crude C++11 based thread pool.
10 //
11 //===----------------------------------------------------------------------===//
12 
13 #include "llvm/Support/ThreadPool.h"
14 
15 #include "llvm/Config/llvm-config.h"
16 #include "llvm/Support/Threading.h"
17 #include "llvm/Support/raw_ostream.h"
18 
19 using namespace llvm;
20 
21 #if LLVM_ENABLE_THREADS
22 
ThreadPool::ThreadPool(ThreadPoolStrategy S)
    : ActiveThreads(0), EnableFlag(true),
      ThreadCount(S.compute_thread_count()) {
  // Create ThreadCount threads that will loop forever, wait on QueueCondition
  // for tasks to be queued or the Pool to be destroyed.
  Threads.reserve(ThreadCount);
  for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
    // S is captured by copy so each worker can apply the placement strategy
    // for its own ThreadID after the constructor has returned.
    Threads.emplace_back([S, ThreadID, this] {
      S.apply_thread_strategy(ThreadID);
      while (true) {
        PackagedTaskTy Task;
        {
          std::unique_lock<std::mutex> LockGuard(QueueLock);
          // Wait for tasks to be pushed in the queue
          QueueCondition.wait(LockGuard,
                              [&] { return !EnableFlag || !Tasks.empty(); });
          // Exit condition: the destructor cleared EnableFlag and there is no
          // queued work left. Remaining queued tasks are drained before exit
          // because the conjunction requires Tasks.empty().
          if (!EnableFlag && Tasks.empty())
            return;
          // Yeah, we have a task, grab it and release the lock on the queue

          // We first need to signal that we are active before popping the queue
          // in order for wait() to properly detect that even if the queue is
          // empty, there is still a task in flight.
          // Lock order here is QueueLock -> CompletionLock; any code taking
          // both locks must use the same order to avoid deadlock.
          {
            std::unique_lock<std::mutex> LockGuard(CompletionLock);
            ++ActiveThreads;
          }
          Task = std::move(Tasks.front());
          Tasks.pop();
        }
        // Run the task we just grabbed, outside of any lock so other workers
        // can make progress and producers can enqueue concurrently.
        Task();

        {
          // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
          std::unique_lock<std::mutex> LockGuard(CompletionLock);
          --ActiveThreads;
        }

        // Notify task completion, in case someone waits on ThreadPool::wait()
        CompletionCondition.notify_all();
      }
    });
  }
}
69 
void ThreadPool::wait() {
  // Wait for all threads to complete and the queue to be empty
  std::unique_lock<std::mutex> LockGuard(CompletionLock);
  // The order of the checks for ActiveThreads and Tasks.empty() matters because
  // any active threads might be modifying the Tasks queue, and this would be a
  // race.
  // NOTE(review): Tasks is otherwise accessed under QueueLock (see the worker
  // loop and asyncImpl), but here it is read while holding only CompletionLock;
  // a concurrent asyncImpl() push under QueueLock can race with this read —
  // confirm whether the intended contract forbids enqueueing during wait().
  CompletionCondition.wait(LockGuard,
                           [&] { return !ActiveThreads && Tasks.empty(); });
}
79 
80 std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
81   /// Wrap the Task in a packaged_task to return a future object.
82   PackagedTaskTy PackagedTask(std::move(Task));
83   auto Future = PackagedTask.get_future();
84   {
85     // Lock the queue and push the new task
86     std::unique_lock<std::mutex> LockGuard(QueueLock);
87 
88     // Don't allow enqueueing after disabling the pool
89     assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
90 
91     Tasks.push(std::move(PackagedTask));
92   }
93   QueueCondition.notify_one();
94   return Future.share();
95 }
96 
97 // The destructor joins all threads, waiting for completion.
98 ThreadPool::~ThreadPool() {
99   {
100     std::unique_lock<std::mutex> LockGuard(QueueLock);
101     EnableFlag = false;
102   }
103   QueueCondition.notify_all();
104   for (auto &Worker : Threads)
105     Worker.join();
106 }
107 
108 #else // LLVM_ENABLE_THREADS Disabled
109 
// No threads are launched; issue a warning if more than one thread is
// requested, since with LLVM_ENABLE_THREADS off tasks run sequentially on
// the calling thread. (The check is against 1: the single implicit
// sequential "thread".)
ThreadPool::ThreadPool(ThreadPoolStrategy S)
    : ActiveThreads(0), ThreadCount(S.compute_thread_count()) {
  if (ThreadCount != 1) {
    errs() << "Warning: request a ThreadPool with " << ThreadCount
           << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
  }
}
118 
119 void ThreadPool::wait() {
120   // Sequential implementation running the tasks
121   while (!Tasks.empty()) {
122     auto Task = std::move(Tasks.front());
123     Tasks.pop();
124     Task();
125   }
126 }
127 
128 std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
129   // Get a Future with launch::deferred execution using std::async
130   auto Future = std::async(std::launch::deferred, std::move(Task)).share();
131   // Wrap the future so that both ThreadPool::wait() can operate and the
132   // returned future can be sync'ed on.
133   PackagedTaskTy PackagedTask([Future]() { Future.get(); });
134   Tasks.push(std::move(PackagedTask));
135   return Future;
136 }
137 
// The destructor drains any remaining deferred tasks before tearing down.
ThreadPool::~ThreadPool() { wait(); }
139 
140 #endif
141