xref: /netbsd-src/external/gpl3/binutils/dist/gprofng/src/DbeThread.cc (revision cb63e24e8d6aae7ddac1859a9015f48b1d8bd90e)
1 /* Copyright (C) 2021-2024 Free Software Foundation, Inc.
2    Contributed by Oracle.
3 
4    This file is part of GNU Binutils.
5 
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3, or (at your option)
9    any later version.
10 
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, 51 Franklin Street - Fifth Floor, Boston,
19    MA 02110-1301, USA.  */
20 
21 #include "config.h"
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <unistd.h>
27 
28 #include "DbeThread.h"
29 #include "util.h"
30 #include "vec.h"
31 
/* Cancellation cleanup handler installed by thread_pool_loop ().
   ARG is the address of the pool's mutex.  The handler is deliberately
   a no-op: the code that would unlock the mutex is disabled, so ARG is
   not touched.  */
static void
cleanup_free_mutex (void *arg)
{
  (void) arg;	/* Intentionally unused.  */
}
38 
39 static void*
thread_pool_loop(void * arg)40 thread_pool_loop (void* arg)
41 {
42   DbeThreadPool *thrp = (DbeThreadPool*) arg;
43   Dprintf (DEBUG_THREADS, "thread_pool_loop:%d starting thread=%llu\n",
44 	   __LINE__, (unsigned long long) pthread_self ());
45 
46   /* set my cancel state to 'enabled', and cancel type to 'defered'. */
47   pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
48   pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
49 
50   /* set thread cleanup handler */
51   pthread_cleanup_push (cleanup_free_mutex, (void*) & (thrp->p_mutex));
52   for (;;)
53     {
54       DbeQueue *q = thrp->get_queue ();
55       if (q)
56 	{ /* a request is pending */
57 	  Dprintf (DEBUG_THREADS,
58 		   "thread_pool_loop:%d thread=%llu queue=%d start\n",
59 		   __LINE__, (unsigned long long) pthread_self (), q->id);
60 	  q->func (q->arg);
61 	  Dprintf (DEBUG_THREADS,
62 		   "thread_pool_loop:%d thread=%llu queue=%d done\n",
63 		   __LINE__, (unsigned long long) pthread_self (), q->id);
64 	  delete q;
65 	  continue;
66 	}
67       if (thrp->no_new_queues)
68 	{
69 	  Dprintf (DEBUG_THREADS, "thread_pool_loop:%d exit thread=%llu\n",
70 		   __LINE__, (unsigned long long) pthread_self ());
71 	  pthread_exit (NULL);
72 	}
73       Dprintf (DEBUG_THREADS,
74 	       "thread_pool_loop:%d before pthread_cond_wait thread=%llu\n",
75 	       __LINE__, (unsigned long long) pthread_self ());
76       pthread_mutex_lock (&thrp->p_mutex);
77       pthread_cond_wait (&thrp->p_cond_var, &thrp->p_mutex);
78       Dprintf (DEBUG_THREADS,
79 	       "thread_pool_loop:%d after pthread_cond_wait thread=%llu\n",
80 	       __LINE__, (unsigned long long) pthread_self ());
81       pthread_mutex_unlock (&thrp->p_mutex);
82     }
83 
84   // never reached, but we must use it here. See `man pthread_cleanup_push`
85   pthread_cleanup_pop (0);
86 }
87 
DbeThreadPool(int _max_threads)88 DbeThreadPool::DbeThreadPool (int _max_threads)
89 {
90   static const int DBE_NTHREADS_DEFAULT = 4;
91   char *s = getenv ("GPROFNG_DBE_NTHREADS");
92   if (s)
93     {
94       max_threads = atoi (s);
95       if (max_threads < 0)
96 	max_threads = 0;
97       if (_max_threads > 0 && max_threads < _max_threads)
98 	max_threads = _max_threads;
99     }
100   else
101     {
102       max_threads = _max_threads;
103       if (max_threads < 0)
104 	max_threads = DBE_NTHREADS_DEFAULT;
105     }
106   Dprintf (DEBUG_THREADS, "DbeThreadPool:%d  max_threads %d ---> %d\n",
107 	   __LINE__, _max_threads, max_threads);
108   pthread_mutex_init (&p_mutex, NULL);
109   pthread_cond_init (&p_cond_var, NULL);
110   threads = new Vector <pthread_t>(max_threads);
111   queue = NULL;
112   last_queue = NULL;
113   no_new_queues = false;
114   queues_cnt = 0;
115   total_queues = 0;
116 }
117 
~DbeThreadPool()118 DbeThreadPool::~DbeThreadPool ()
119 {
120   delete threads;
121 }
122 
123 DbeQueue *
get_queue()124 DbeThreadPool::get_queue ()
125 {
126   pthread_mutex_lock (&p_mutex);
127   DbeQueue *q = queue;
128   Dprintf (DEBUG_THREADS,
129    "get_queue:%d thr: %lld id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
130 	   __LINE__, (unsigned long long) pthread_self (),
131 	   q ? q->id : -1, queues_cnt, (int) threads->size (), max_threads);
132   if (q)
133     {
134       queue = q->next;
135       queues_cnt--;
136     }
137   pthread_mutex_unlock (&p_mutex);
138   return q;
139 }
140 
141 void
put_queue(DbeQueue * q)142 DbeThreadPool::put_queue (DbeQueue *q)
143 {
144   if (max_threads == 0)
145     {
146       // nothing runs in parallel
147       q->id = ++total_queues;
148       Dprintf (DEBUG_THREADS, NTXT ("put_queue:%d thr=%lld max_threads=%d queue (%d) runs on the worked thread\n"),
149 	       __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
150       q->func (q->arg);
151       delete q;
152       return;
153     }
154 
155   pthread_mutex_lock (&p_mutex);
156   // nothing runs in parallel
157   q->id = ++total_queues;
158   Dprintf (DEBUG_THREADS, "put_queue:%d thr=%lld max_threads=%d queue (%d)\n",
159 	   __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
160   if (queue)
161     {
162       last_queue->next = q;
163       last_queue = q;
164     }
165   else
166     {
167       queue = q;
168       last_queue = q;
169     }
170   queues_cnt++;
171   Dprintf (DEBUG_THREADS,
172 	   "put_queue:%d id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
173 	   __LINE__, q->id, queues_cnt, (int) threads->size (), max_threads);
174   if (queues_cnt > threads->size () && threads->size () < max_threads)
175     {
176       pthread_t thr;
177       int r = pthread_create (&thr, NULL, thread_pool_loop, (void *) this);
178       Dprintf (DEBUG_THREADS,
179 	       "put_queue:%d pthread_create returns %d thr=%llu\n",
180 	       __LINE__, r, (unsigned long long) thr);
181       if (r)
182 	fprintf (stderr, GTXT ("pthread_create failed. errnum=%d (%s)\n"), r,
183 		 STR (strerror (r)));
184       else
185 	threads->append (thr);
186     }
187   pthread_cond_signal (&p_cond_var);
188   pthread_mutex_unlock (&p_mutex);
189 }
190 
191 void
wait_queues()192 DbeThreadPool::wait_queues ()
193 {
194   pthread_mutex_lock (&p_mutex);
195   no_new_queues = true;
196   pthread_mutex_unlock (&p_mutex);
197   pthread_cond_broadcast (&p_cond_var);
198   for (;;) // Run requests on the worked thread too
199     {
200       DbeQueue *q = get_queue ();
201       if (q == NULL)
202 	break;
203       Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d start\n",
204 	       __LINE__, (unsigned long long) pthread_self (), q->id);
205       q->func (q->arg);
206       Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d done\n",
207 	       __LINE__, (unsigned long long) pthread_self (), q->id);
208       delete q;
209     }
210   for (int i = 0, sz = threads->size (); i < sz; i++)
211     {
212       void *retval;
213       pthread_join (threads->get (i), &retval);
214     }
215 }
216 
DbeQueue(int (* _func)(void * arg),void * _arg)217 DbeQueue::DbeQueue (int (*_func) (void *arg), void *_arg)
218 {
219   func = _func;
220   arg = _arg;
221   next = NULL;
222 }
223 
~DbeQueue()224 DbeQueue::~DbeQueue () { }
225