10b57cec5SDimitry Andric //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// 20b57cec5SDimitry Andric // 30b57cec5SDimitry Andric // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 40b57cec5SDimitry Andric // See https://llvm.org/LICENSE.txt for license information. 50b57cec5SDimitry Andric // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 60b57cec5SDimitry Andric // 70b57cec5SDimitry Andric //===----------------------------------------------------------------------===// 80b57cec5SDimitry Andric // 90b57cec5SDimitry Andric // This file implements a crude C++11 based thread pool. 100b57cec5SDimitry Andric // 110b57cec5SDimitry Andric //===----------------------------------------------------------------------===// 120b57cec5SDimitry Andric 130b57cec5SDimitry Andric #include "llvm/Support/ThreadPool.h" 140b57cec5SDimitry Andric 150b57cec5SDimitry Andric #include "llvm/Config/llvm-config.h" 1604eeddc0SDimitry Andric 1706c3fb27SDimitry Andric #include "llvm/Support/FormatVariadic.h" 180b57cec5SDimitry Andric #include "llvm/Support/Threading.h" 190b57cec5SDimitry Andric #include "llvm/Support/raw_ostream.h" 200b57cec5SDimitry Andric 210b57cec5SDimitry Andric using namespace llvm; 220b57cec5SDimitry Andric 23*0fca6ea1SDimitry Andric ThreadPoolInterface::~ThreadPoolInterface() = default; 240b57cec5SDimitry Andric 2581ad6265SDimitry Andric // A note on thread groups: Tasks are by default in no group (represented 2681ad6265SDimitry Andric // by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality 2781ad6265SDimitry Andric // here normally works on all tasks regardless of their group (functions 2881ad6265SDimitry Andric // in that case receive nullptr ThreadPoolTaskGroup pointer as argument). 2981ad6265SDimitry Andric // A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks 3081ad6265SDimitry Andric // queue, and functions called to work only on tasks from one group take that 3181ad6265SDimitry Andric // pointer. 3281ad6265SDimitry Andric 33*0fca6ea1SDimitry Andric #if LLVM_ENABLE_THREADS 34*0fca6ea1SDimitry Andric 35*0fca6ea1SDimitry Andric StdThreadPool::StdThreadPool(ThreadPoolStrategy S) 360eae32dcSDimitry Andric : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} 370eae32dcSDimitry Andric 38*0fca6ea1SDimitry Andric void StdThreadPool::grow(int requested) { 3981ad6265SDimitry Andric llvm::sys::ScopedWriter LockGuard(ThreadsLock); 400eae32dcSDimitry Andric if (Threads.size() >= MaxThreadCount) 410eae32dcSDimitry Andric return; // Already hit the max thread pool size. 420eae32dcSDimitry Andric int newThreadCount = std::min<int>(requested, MaxThreadCount); 430eae32dcSDimitry Andric while (static_cast<int>(Threads.size()) < newThreadCount) { 440eae32dcSDimitry Andric int ThreadID = Threads.size(); 450eae32dcSDimitry Andric Threads.emplace_back([this, ThreadID] { 4606c3fb27SDimitry Andric set_thread_name(formatv("llvm-worker-{0}", ThreadID)); 470eae32dcSDimitry Andric Strategy.apply_thread_strategy(ThreadID); 4881ad6265SDimitry Andric processTasks(nullptr); 4981ad6265SDimitry Andric }); 5081ad6265SDimitry Andric } 5181ad6265SDimitry Andric } 5281ad6265SDimitry Andric 5381ad6265SDimitry Andric #ifndef NDEBUG 5481ad6265SDimitry Andric // The group of the tasks run by the current thread. 5581ad6265SDimitry Andric static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *> 5681ad6265SDimitry Andric *CurrentThreadTaskGroups = nullptr; 5781ad6265SDimitry Andric #endif 5881ad6265SDimitry Andric 5981ad6265SDimitry Andric // WaitingForGroup == nullptr means all tasks regardless of their group. 60*0fca6ea1SDimitry Andric void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { 610b57cec5SDimitry Andric while (true) { 624824e7fdSDimitry Andric std::function<void()> Task; 6381ad6265SDimitry Andric ThreadPoolTaskGroup *GroupOfTask; 640b57cec5SDimitry Andric { 650b57cec5SDimitry Andric std::unique_lock<std::mutex> LockGuard(QueueLock); 6681ad6265SDimitry Andric bool workCompletedForGroup = false; // Result of workCompletedUnlocked() 670b57cec5SDimitry Andric // Wait for tasks to be pushed in the queue 6881ad6265SDimitry Andric QueueCondition.wait(LockGuard, [&] { 6981ad6265SDimitry Andric return !EnableFlag || !Tasks.empty() || 7081ad6265SDimitry Andric (WaitingForGroup != nullptr && 7181ad6265SDimitry Andric (workCompletedForGroup = 7281ad6265SDimitry Andric workCompletedUnlocked(WaitingForGroup))); 7381ad6265SDimitry Andric }); 740b57cec5SDimitry Andric // Exit condition 750b57cec5SDimitry Andric if (!EnableFlag && Tasks.empty()) 760b57cec5SDimitry Andric return; 7781ad6265SDimitry Andric if (WaitingForGroup != nullptr && workCompletedForGroup) 7881ad6265SDimitry Andric return; 790b57cec5SDimitry Andric // Yeah, we have a task, grab it and release the lock on the queue 800b57cec5SDimitry Andric 810b57cec5SDimitry Andric // We first need to signal that we are active before popping the queue 820b57cec5SDimitry Andric // in order for wait() to properly detect that even if the queue is 830b57cec5SDimitry Andric // empty, there is still a task in flight. 840b57cec5SDimitry Andric ++ActiveThreads; 8581ad6265SDimitry Andric Task = std::move(Tasks.front().first); 8681ad6265SDimitry Andric GroupOfTask = Tasks.front().second; 8781ad6265SDimitry Andric // Need to count active threads in each group separately, ActiveThreads 8881ad6265SDimitry Andric // would never be 0 if waiting for another group inside a wait. 8981ad6265SDimitry Andric if (GroupOfTask != nullptr) 9081ad6265SDimitry Andric ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item 9181ad6265SDimitry Andric Tasks.pop_front(); 920b57cec5SDimitry Andric } 9381ad6265SDimitry Andric #ifndef NDEBUG 9481ad6265SDimitry Andric if (CurrentThreadTaskGroups == nullptr) 9581ad6265SDimitry Andric CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>; 9681ad6265SDimitry Andric CurrentThreadTaskGroups->push_back(GroupOfTask); 9781ad6265SDimitry Andric #endif 9881ad6265SDimitry Andric 990b57cec5SDimitry Andric // Run the task we just grabbed 1000b57cec5SDimitry Andric Task(); 1010b57cec5SDimitry Andric 10281ad6265SDimitry Andric #ifndef NDEBUG 10381ad6265SDimitry Andric CurrentThreadTaskGroups->pop_back(); 10481ad6265SDimitry Andric if (CurrentThreadTaskGroups->empty()) { 10581ad6265SDimitry Andric delete CurrentThreadTaskGroups; 10681ad6265SDimitry Andric CurrentThreadTaskGroups = nullptr; 10781ad6265SDimitry Andric } 10881ad6265SDimitry Andric #endif 10981ad6265SDimitry Andric 1105ffd83dbSDimitry Andric bool Notify; 11181ad6265SDimitry Andric bool NotifyGroup; 1120b57cec5SDimitry Andric { 113*0fca6ea1SDimitry Andric // Adjust `ActiveThreads`, in case someone waits on StdThreadPool::wait() 1145ffd83dbSDimitry Andric std::lock_guard<std::mutex> LockGuard(QueueLock); 1150b57cec5SDimitry Andric --ActiveThreads; 11681ad6265SDimitry Andric if (GroupOfTask != nullptr) { 11781ad6265SDimitry Andric auto A = ActiveGroups.find(GroupOfTask); 11881ad6265SDimitry Andric if (--(A->second) == 0) 11981ad6265SDimitry Andric ActiveGroups.erase(A); 12081ad6265SDimitry Andric } 12181ad6265SDimitry Andric Notify = workCompletedUnlocked(GroupOfTask); 12281ad6265SDimitry Andric NotifyGroup = GroupOfTask != nullptr && Notify; 1230b57cec5SDimitry Andric } 1245ffd83dbSDimitry Andric // Notify task completion if this is the last active thread, in case 125*0fca6ea1SDimitry Andric // someone waits on StdThreadPool::wait(). 1265ffd83dbSDimitry Andric if (Notify) 1270b57cec5SDimitry Andric CompletionCondition.notify_all(); 12881ad6265SDimitry Andric // If this was a task in a group, notify also threads waiting for tasks 12981ad6265SDimitry Andric // in this function on QueueCondition, to make a recursive wait() return 13081ad6265SDimitry Andric // after the group it's been waiting for has finished. 13181ad6265SDimitry Andric if (NotifyGroup) 13281ad6265SDimitry Andric QueueCondition.notify_all(); 1330b57cec5SDimitry Andric } 1340b57cec5SDimitry Andric } 13581ad6265SDimitry Andric 136*0fca6ea1SDimitry Andric bool StdThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { 13781ad6265SDimitry Andric if (Group == nullptr) 13881ad6265SDimitry Andric return !ActiveThreads && Tasks.empty(); 13981ad6265SDimitry Andric return ActiveGroups.count(Group) == 0 && 14081ad6265SDimitry Andric !llvm::any_of(Tasks, 14181ad6265SDimitry Andric [Group](const auto &T) { return T.second == Group; }); 1420b57cec5SDimitry Andric } 1430b57cec5SDimitry Andric 144*0fca6ea1SDimitry Andric void StdThreadPool::wait() { 14581ad6265SDimitry Andric assert(!isWorkerThread()); // Would deadlock waiting for itself. 1460b57cec5SDimitry Andric // Wait for all threads to complete and the queue to be empty 1475ffd83dbSDimitry Andric std::unique_lock<std::mutex> LockGuard(QueueLock); 14881ad6265SDimitry Andric CompletionCondition.wait(LockGuard, 14981ad6265SDimitry Andric [&] { return workCompletedUnlocked(nullptr); }); 15081ad6265SDimitry Andric } 15181ad6265SDimitry Andric 152*0fca6ea1SDimitry Andric void StdThreadPool::wait(ThreadPoolTaskGroup &Group) { 15381ad6265SDimitry Andric // Wait for all threads in the group to complete. 15481ad6265SDimitry Andric if (!isWorkerThread()) { 15581ad6265SDimitry Andric std::unique_lock<std::mutex> LockGuard(QueueLock); 15681ad6265SDimitry Andric CompletionCondition.wait(LockGuard, 15781ad6265SDimitry Andric [&] { return workCompletedUnlocked(&Group); }); 15881ad6265SDimitry Andric return; 15981ad6265SDimitry Andric } 16081ad6265SDimitry Andric // Make sure to not deadlock waiting for oneself. 16181ad6265SDimitry Andric assert(CurrentThreadTaskGroups == nullptr || 16281ad6265SDimitry Andric !llvm::is_contained(*CurrentThreadTaskGroups, &Group)); 16381ad6265SDimitry Andric // Handle the case of recursive call from another task in a different group, 16481ad6265SDimitry Andric // in which case process tasks while waiting to keep the thread busy and avoid 16581ad6265SDimitry Andric // possible deadlock. 16681ad6265SDimitry Andric processTasks(&Group); 1670b57cec5SDimitry Andric } 1680b57cec5SDimitry Andric 169*0fca6ea1SDimitry Andric bool StdThreadPool::isWorkerThread() const { 17081ad6265SDimitry Andric llvm::sys::ScopedReader LockGuard(ThreadsLock); 171fe6060f1SDimitry Andric llvm::thread::id CurrentThreadId = llvm::this_thread::get_id(); 172fe6060f1SDimitry Andric for (const llvm::thread &Thread : Threads) 173fe6060f1SDimitry Andric if (CurrentThreadId == Thread.get_id()) 174fe6060f1SDimitry Andric return true; 175fe6060f1SDimitry Andric return false; 176fe6060f1SDimitry Andric } 177fe6060f1SDimitry Andric 1780b57cec5SDimitry Andric // The destructor joins all threads, waiting for completion. 179*0fca6ea1SDimitry Andric StdThreadPool::~StdThreadPool() { 1800b57cec5SDimitry Andric { 1810b57cec5SDimitry Andric std::unique_lock<std::mutex> LockGuard(QueueLock); 1820b57cec5SDimitry Andric EnableFlag = false; 1830b57cec5SDimitry Andric } 1840b57cec5SDimitry Andric QueueCondition.notify_all(); 18581ad6265SDimitry Andric llvm::sys::ScopedReader LockGuard(ThreadsLock); 1860b57cec5SDimitry Andric for (auto &Worker : Threads) 1870b57cec5SDimitry Andric Worker.join(); 1880b57cec5SDimitry Andric } 1890b57cec5SDimitry Andric 190*0fca6ea1SDimitry Andric #endif // LLVM_ENABLE_THREADS Disabled 1910b57cec5SDimitry Andric 1920b57cec5SDimitry Andric // No threads are launched, issue a warning if ThreadCount is not 0 193*0fca6ea1SDimitry Andric SingleThreadExecutor::SingleThreadExecutor(ThreadPoolStrategy S) { 1940eae32dcSDimitry Andric int ThreadCount = S.compute_thread_count(); 1955ffd83dbSDimitry Andric if (ThreadCount != 1) { 1960b57cec5SDimitry Andric errs() << "Warning: request a ThreadPool with " << ThreadCount 1970b57cec5SDimitry Andric << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; 1980b57cec5SDimitry Andric } 1990b57cec5SDimitry Andric } 2000b57cec5SDimitry Andric 201*0fca6ea1SDimitry Andric void SingleThreadExecutor::wait() { 2020b57cec5SDimitry Andric // Sequential implementation running the tasks 2030b57cec5SDimitry Andric while (!Tasks.empty()) { 20481ad6265SDimitry Andric auto Task = std::move(Tasks.front().first); 20581ad6265SDimitry Andric Tasks.pop_front(); 2060b57cec5SDimitry Andric Task(); 2070b57cec5SDimitry Andric } 2080b57cec5SDimitry Andric } 2090b57cec5SDimitry Andric 210*0fca6ea1SDimitry Andric void SingleThreadExecutor::wait(ThreadPoolTaskGroup &) { 21181ad6265SDimitry Andric // Simply wait for all, this works even if recursive (the running task 21281ad6265SDimitry Andric // is already removed from the queue). 21381ad6265SDimitry Andric wait(); 21481ad6265SDimitry Andric } 21581ad6265SDimitry Andric 216*0fca6ea1SDimitry Andric bool SingleThreadExecutor::isWorkerThread() const { 21704eeddc0SDimitry Andric report_fatal_error("LLVM compiled without multithreading"); 21804eeddc0SDimitry Andric } 21904eeddc0SDimitry Andric 220*0fca6ea1SDimitry Andric SingleThreadExecutor::~SingleThreadExecutor() { wait(); } 221