/* Parallel for loops

   Copyright (C) 2019-2023 Free Software Foundation, Inc.

   This file is part of GDB.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#ifndef GDBSUPPORT_PARALLEL_FOR_H
#define GDBSUPPORT_PARALLEL_FOR_H

#include <algorithm>
#include <type_traits>
#include "gdbsupport/invoke-result.h"
#include "gdbsupport/thread-pool.h"
#include "gdbsupport/function-view.h"

namespace gdb
{

namespace detail
{

/* This is a helper class that is used to accumulate results for
   parallel_for.  There is a specialization for 'void', below.  */
template<typename T>
struct par_for_accumulator
{
public:

  explicit par_for_accumulator (size_t n_threads)
    : m_futures (n_threads)
  {
  }

  /* The result type that is accumulated.  */
  typedef std::vector<T> result_type;

  /* Post the Ith task to a background thread, and store a future for
     later.  */
  void post (size_t i, std::function<T ()> task)
  {
    m_futures[i]
      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
  }

  /* Invoke TASK in the current thread, then compute all the results
     from all background tasks and put them into a result vector,
     which is returned.  */
  result_type finish (gdb::function_view<T ()> task)
  {
    result_type result (m_futures.size () + 1);

    result.back () = task ();

    for (size_t i = 0; i < m_futures.size (); ++i)
      result[i] = m_futures[i].get ();

    return result;
  }

  /* Resize the results to N.  */
  void resize (size_t n)
  {
    m_futures.resize (n);
  }

private:

  /* A vector of futures coming from the tasks run in the
     background.  */
  std::vector<gdb::future<T>> m_futures;
};

/* See the generic template.  */
template<>
struct par_for_accumulator<void>
{
public:

  explicit par_for_accumulator (size_t n_threads)
    : m_futures (n_threads)
  {
  }

  /* This specialization does not compute results.  */
  typedef void result_type;

  void post (size_t i, std::function<void ()> task)
  {
    m_futures[i]
      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
  }

  result_type finish (gdb::function_view<void ()> task)
  {
    task ();

    for (auto &future : m_futures)
      {
        /* Use 'get' and not 'wait', to propagate any exception.  */
        future.get ();
      }
  }

  /* Resize the results to N.  */
  void resize (size_t n)
  {
    m_futures.resize (n);
  }

private:

  std::vector<gdb::future<void>> m_futures;
};

}

/* A very simple "parallel for".  This splits the range of iterators
   into subranges, and then passes each subrange to the callback.  The
   work may or may not be done in separate threads.

   This approach was chosen over having the callback work on single
   items because it makes it simple for the caller to do
   once-per-subrange initialization and destruction.

   The parameter N says how batching ought to be done -- there will be
   at least N elements processed per thread.  Setting N to 0 is not
   allowed.

   If the function returns a non-void type, then a vector of the
   results is returned.  The size of the resulting vector depends on
   the number of threads that were used.  */
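
/* Illustrative sketch: one way a caller might use parallel_for_each.
   Here 'items' and 'process_item' are hypothetical stand-ins for the
   caller's container and per-element function:

     std::vector<int> items = ...;
     gdb::parallel_for_each (1, items.begin (), items.end (),
                             [] (std::vector<int>::iterator start,
                                 std::vector<int>::iterator end)
                             {
                               for (auto it = start; it != end; ++it)
                                 process_item (*it);
                             });

   Because the callback receives a whole subrange, once-per-subrange
   setup and teardown can simply be done at the top and bottom of the
   lambda.  */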
template<class RandomIt, class RangeFunction>
typename gdb::detail::par_for_accumulator<
    typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type
  >::result_type
parallel_for_each (unsigned n, RandomIt first, RandomIt last,
                   RangeFunction callback,
                   gdb::function_view<size_t(RandomIt)> task_size = nullptr)
{
  using result_type
    = typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type;

  /* If enabled, print debug info about how the work is distributed across
     the threads.  */
  const bool parallel_for_each_debug = false;

  size_t n_worker_threads = thread_pool::g_thread_pool->thread_count ();
  size_t n_threads = n_worker_threads;
  size_t n_elements = last - first;
  size_t elts_per_thread = 0;
  size_t elts_left_over = 0;
  size_t total_size = 0;
  size_t size_per_thread = 0;
  size_t max_element_size = n_elements == 0 ? 1 : SIZE_MAX / n_elements;

  if (n_threads > 1)
    {
      if (task_size != nullptr)
        {
          gdb_assert (n == 1);
          for (RandomIt i = first; i != last; ++i)
            {
              size_t element_size = task_size (i);
              gdb_assert (element_size > 0);
              if (element_size > max_element_size)
                /* We could start scaling here, but that doesn't seem to be
                   worth the effort.  */
                element_size = max_element_size;
              size_t prev_total_size = total_size;
              total_size += element_size;
              /* Check for overflow.  */
              gdb_assert (prev_total_size < total_size);
            }
          size_per_thread = total_size / n_threads;
        }
      else
        {
          /* Require that there should be at least N elements in a
             thread.  */
          gdb_assert (n > 0);
          if (n_elements / n_threads < n)
            n_threads = std::max (n_elements / n, (size_t) 1);
          elts_per_thread = n_elements / n_threads;
          elts_left_over = n_elements % n_threads;
          /* n_elements == n_threads * elts_per_thread + elts_left_over.  */
        }
    }

  size_t count = n_threads == 0 ? 0 : n_threads - 1;
  gdb::detail::par_for_accumulator<result_type> results (count);

  if (parallel_for_each_debug)
    {
      debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
      if (task_size != nullptr)
        {
          debug_printf (_("Parallel for: total_size: %zu\n"), total_size);
          debug_printf (_("Parallel for: size_per_thread: %zu\n"),
                        size_per_thread);
        }
      else
        {
          debug_printf (_("Parallel for: minimum elements per thread: %u\n"),
                        n);
          debug_printf (_("Parallel for: elts_per_thread: %zu\n"),
                        elts_per_thread);
        }
    }

  size_t remaining_size = total_size;
  for (int i = 0; i < count; ++i)
    {
      RandomIt end;
      size_t chunk_size = 0;
      if (task_size == nullptr)
        {
          end = first + elts_per_thread;
          if (i < elts_left_over)
            /* Distribute the leftovers over the worker threads, to avoid
               having to handle all of them in a single thread.  */
            end++;
        }
      else
        {
          RandomIt j;
          for (j = first; j < last && chunk_size < size_per_thread; ++j)
            {
              size_t element_size = task_size (j);
              if (element_size > max_element_size)
                element_size = max_element_size;
              chunk_size += element_size;
            }
          end = j;
          remaining_size -= chunk_size;
        }

      /* This case means we don't have enough elements to really
         distribute them.  Rather than ever submit a task that does
         nothing, we short-circuit here.  */
      if (first == end)
        end = last;

      if (end == last)
        {
          /* We're about to dispatch the last batch of elements, which
             we normally process in the main thread.  So just truncate
             the result list here.  This avoids submitting empty tasks
             to the thread pool.  */
          count = i;
          results.resize (count);
          break;
        }

      if (parallel_for_each_debug)
        {
          debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
                        i, (size_t) (end - first));
          if (task_size != nullptr)
            debug_printf (_("\t(size: %zu)"), chunk_size);
          debug_printf (_("\n"));
        }
      results.post (i, [=] ()
        {
          return callback (first, end);
        });
      first = end;
    }

  for (int i = count; i < n_worker_threads; ++i)
    if (parallel_for_each_debug)
      {
        debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
        if (task_size != nullptr)
          debug_printf (_("\t(size: 0)"));
        debug_printf (_("\n"));
      }

  /* Process all the remaining elements in the main thread.  */
  if (parallel_for_each_debug)
    {
      debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
                    (size_t) (last - first));
      if (task_size != nullptr)
        debug_printf (_("\t(size: %zu)"), remaining_size);
      debug_printf (_("\n"));
    }
  return results.finish ([=] ()
    {
      return callback (first, last);
    });
}
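
/* Illustrative sketch of the TASK_SIZE argument above: when elements
   have very different costs, the caller can supply a per-element size
   estimate so that each chunk carries roughly equal work instead of an
   equal number of elements.  'objs', 'process_item' and
   'estimated_size' are hypothetical names; note that N must be 1 when
   TASK_SIZE is given, and each estimate must be non-zero:

     gdb::parallel_for_each (1, objs.begin (), objs.end (),
                             [] (auto start, auto end)
                             {
                               for (auto it = start; it != end; ++it)
                                 process_item (*it);
                             },
                             [] (auto it) -> size_t
                             {
                               return estimated_size (*it);
                             });  */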

/* A sequential drop-in replacement for parallel_for_each.  This can be
   useful when debugging multi-threading behaviour and you want to
   limit multi-threading in a fine-grained way.  */

template<class RandomIt, class RangeFunction>
typename gdb::detail::par_for_accumulator<
    typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type
  >::result_type
sequential_for_each (unsigned n, RandomIt first, RandomIt last,
                     RangeFunction callback,
                     gdb::function_view<size_t(RandomIt)> task_size = nullptr)
{
  using result_type
    = typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type;

  gdb::detail::par_for_accumulator<result_type> results (0);

  /* Process all the elements in the main thread.  */
  return results.finish ([=] ()
    {
      return callback (first, last);
    });
}

}

#endif /* GDBSUPPORT_PARALLEL_FOR_H */