/* Parallel for loops

   Copyright (C) 2019-2023 Free Software Foundation, Inc.

   This file is part of GDB.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#ifndef GDBSUPPORT_PARALLEL_FOR_H
#define GDBSUPPORT_PARALLEL_FOR_H

#include <algorithm>
#include <type_traits>
#include "gdbsupport/invoke-result.h"
#include "gdbsupport/thread-pool.h"
#include "gdbsupport/function-view.h"

namespace gdb
{

namespace detail
{

/* This is a helper class that is used to accumulate results for
   parallel_for.  There is a specialization for 'void', below.  */
template<typename T>
struct par_for_accumulator
{
public:

  explicit par_for_accumulator (size_t n_threads)
    : m_futures (n_threads)
  {
  }

  /* The result type that is accumulated.  */
  typedef std::vector<T> result_type;

  /* Post the Ith task to a background thread, and store a future for
     later.  */
  void post (size_t i, std::function<T ()> task)
  {
    m_futures[i]
      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
  }

  /* Invoke TASK in the current thread, then compute all the results
     from all background tasks and put them into a result vector,
     which is returned.  */
  result_type finish (gdb::function_view<T ()> task)
  {
    result_type result (m_futures.size () + 1);

    result.back () = task ();

    for (size_t i = 0; i < m_futures.size (); ++i)
      result[i] = m_futures[i].get ();

    return result;
  }

  /* Resize the results to N.  */
  void resize (size_t n)
  {
    m_futures.resize (n);
  }

private:

  /* A vector of futures coming from the tasks run in the
     background.  */
  std::vector<gdb::future<T>> m_futures;
};

/* See the generic template.  */
template<>
struct par_for_accumulator<void>
{
public:

  explicit par_for_accumulator (size_t n_threads)
    : m_futures (n_threads)
  {
  }

  /* This specialization does not compute results.  */
  typedef void result_type;

  void post (size_t i, std::function<void ()> task)
  {
    m_futures[i]
      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
  }

  result_type finish (gdb::function_view<void ()> task)
  {
    task ();

    for (auto &future : m_futures)
      {
        /* Use 'get' and not 'wait', to propagate any exception.  */
        future.get ();
      }
  }

  /* Resize the results to N.  */
  void resize (size_t n)
  {
    m_futures.resize (n);
  }

private:

  std::vector<gdb::future<void>> m_futures;
};
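
/* An illustrative sketch (not compiled as part of this header) of how
   parallel_for_each drives the accumulator above: one task is posted
   per worker thread, and the final chunk runs in the calling thread
   via 'finish'.  COMPUTE and the CHUNK values are hypothetical:

     par_for_accumulator<int> acc (2);
     acc.post (0, [] () { return compute (chunk0); });
     acc.post (1, [] () { return compute (chunk1); });
     std::vector<int> all = acc.finish ([] () { return compute (chunk2); });

   The result vector holds one entry per posted task, followed by the
   result of the 'finish' callback.  */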

}

/* A very simple "parallel for".  This splits the range of iterators
   into subranges, and then passes each subrange to the callback.  The
   work may or may not be done in separate threads.

   This approach was chosen over having the callback work on single
   items because it makes it simple for the caller to do
   once-per-subrange initialization and destruction.

   The parameter N says how batching ought to be done -- there will be
   at least N elements processed per thread.  Setting N to 0 is not
   allowed.

   If TASK_SIZE is non-null, it is used to compute a relative, non-zero
   size for each element, and the subranges are chosen so that each
   thread handles roughly the same total size rather than the same
   number of elements.  In this case N must be 1.

   If the function returns a non-void type, then a vector of the
   results is returned.  The size of the resulting vector depends on
   the number of threads that were used.  */
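
/* For example, a hypothetical caller (not part of this header) could
   sum a vector in parallel, asking for at least 1000 elements per
   thread, and combine the per-batch results afterwards:

     std::vector<int> vals (100000, 1);
     std::vector<long> partial
       = gdb::parallel_for_each (1000, vals.begin (), vals.end (),
                                 [] (std::vector<int>::iterator start,
                                     std::vector<int>::iterator end)
                                 {
                                   long sum = 0;
                                   for (auto it = start; it != end; ++it)
                                     sum += *it;
                                   return sum;
                                 });
     long total = 0;
     for (long p : partial)
       total += p;

   The number of entries in PARTIAL depends on how many worker threads
   the thread pool provides.  */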

template<class RandomIt, class RangeFunction>
typename gdb::detail::par_for_accumulator<
    typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type
  >::result_type
parallel_for_each (unsigned n, RandomIt first, RandomIt last,
                   RangeFunction callback,
                   gdb::function_view<size_t(RandomIt)> task_size = nullptr)
{
  using result_type
    = typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type;

  /* If enabled, print debug info about how the work is distributed across
     the threads.  */
  const bool parallel_for_each_debug = false;

  size_t n_worker_threads = thread_pool::g_thread_pool->thread_count ();
  size_t n_threads = n_worker_threads;
  size_t n_elements = last - first;
  size_t elts_per_thread = 0;
  size_t elts_left_over = 0;
  size_t total_size = 0;
  size_t size_per_thread = 0;
  size_t max_element_size = n_elements == 0 ? 1 : SIZE_MAX / n_elements;

  if (n_threads > 1)
    {
      if (task_size != nullptr)
        {
          gdb_assert (n == 1);
          for (RandomIt i = first; i != last; ++i)
            {
              size_t element_size = task_size (i);
              gdb_assert (element_size > 0);
              if (element_size > max_element_size)
                /* We could start scaling here, but that doesn't seem to be
                   worth the effort.  */
                element_size = max_element_size;
              size_t prev_total_size = total_size;
              total_size += element_size;
              /* Check for overflow.  */
              gdb_assert (prev_total_size < total_size);
            }
          size_per_thread = total_size / n_threads;
        }
      else
        {
          /* Require that there should be at least N elements in a
             thread.  */
          gdb_assert (n > 0);
          if (n_elements / n_threads < n)
            n_threads = std::max (n_elements / n, (size_t) 1);
          elts_per_thread = n_elements / n_threads;
          elts_left_over = n_elements % n_threads;
          /* n_elements == n_threads * elts_per_thread + elts_left_over. */
        }
    }

  size_t count = n_threads == 0 ? 0 : n_threads - 1;
  gdb::detail::par_for_accumulator<result_type> results (count);

  if (parallel_for_each_debug)
    {
      debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
      if (task_size != nullptr)
        {
          debug_printf (_("Parallel for: total_size: %zu\n"), total_size);
          debug_printf (_("Parallel for: size_per_thread: %zu\n"), size_per_thread);
        }
      else
        {
          debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
          debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
        }
    }

  size_t remaining_size = total_size;
  for (int i = 0; i < count; ++i)
    {
      RandomIt end;
      size_t chunk_size = 0;
      if (task_size == nullptr)
        {
          end = first + elts_per_thread;
          if (i < elts_left_over)
            /* Distribute the leftovers over the worker threads, to avoid having
               to handle all of them in a single thread.  */
            end++;
        }
      else
        {
          RandomIt j;
          for (j = first; j < last && chunk_size < size_per_thread; ++j)
            {
              size_t element_size = task_size (j);
              if (element_size > max_element_size)
                element_size = max_element_size;
              chunk_size += element_size;
            }
          end = j;
          remaining_size -= chunk_size;
        }

      /* This case means we don't have enough elements to really
         distribute them.  Rather than ever submit a task that does
         nothing, we short-circuit here.  */
      if (first == end)
        end = last;

      if (end == last)
        {
          /* We're about to dispatch the last batch of elements, which
             we normally process in the main thread.  So just truncate
             the result list here.  This avoids submitting empty tasks
             to the thread pool.  */
          count = i;
          results.resize (count);
          break;
        }

      if (parallel_for_each_debug)
        {
          debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
                        i, (size_t)(end - first));
          if (task_size != nullptr)
            debug_printf (_("\t(size: %zu)"), chunk_size);
          debug_printf (_("\n"));
        }
      results.post (i, [=] ()
        {
          return callback (first, end);
        });
      first = end;
    }

  for (int i = count; i < n_worker_threads; ++i)
    if (parallel_for_each_debug)
      {
        debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
        if (task_size != nullptr)
          debug_printf (_("\t(size: 0)"));
        debug_printf (_("\n"));
      }

  /* Process all the remaining elements in the main thread.  */
  if (parallel_for_each_debug)
    {
      debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
                    (size_t)(last - first));
      if (task_size != nullptr)
        debug_printf (_("\t(size: %zu)"), remaining_size);
      debug_printf (_("\n"));
    }
  return results.finish ([=] ()
    {
      return callback (first, last);
    });
}
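
/* A hypothetical sketch of the size-weighted form: supplying a
   TASK_SIZE callback (with N set to 1) splits the range by the total
   reported size of each subrange rather than by element count.  Each
   reported size must be non-zero, hence the '+ 1'.  PROCESS and
   GET_STRINGS are assumed helpers, not part of GDB:

     std::vector<std::string> strs = get_strings ();
     gdb::parallel_for_each (1, strs.begin (), strs.end (),
                             [] (std::vector<std::string>::iterator start,
                                 std::vector<std::string>::iterator end)
                             {
                               for (auto it = start; it != end; ++it)
                                 process (*it);
                             },
                             [] (std::vector<std::string>::iterator it)
                             { return it->size () + 1; });  */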

/* A sequential drop-in replacement for parallel_for_each.  This can be
   useful when debugging multi-threading behaviour and you want to
   limit multi-threading in a fine-grained way.  */
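
/* For instance (a hypothetical edit), to force one particular call to
   run single-threaded while debugging, change

     gdb::parallel_for_each (1, first, last, callback);

   into

     gdb::sequential_for_each (1, first, last, callback);

   with every argument left exactly as it was.  */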

template<class RandomIt, class RangeFunction>
typename gdb::detail::par_for_accumulator<
    typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type
  >::result_type
sequential_for_each (unsigned n, RandomIt first, RandomIt last,
                     RangeFunction callback,
                     gdb::function_view<size_t(RandomIt)> task_size = nullptr)
{
  using result_type = typename gdb::invoke_result<RangeFunction, RandomIt, RandomIt>::type;

  gdb::detail::par_for_accumulator<result_type> results (0);

  /* Process all the elements in the main thread.  */
  return results.finish ([=] ()
    {
      return callback (first, last);
    });
}

}

#endif /* GDBSUPPORT_PARALLEL_FOR_H */