xref: /netbsd-src/external/gpl3/binutils.old/dist/gprofng/src/DbeThread.cc (revision c42dbd0ed2e61fe6eda8590caa852ccf34719964)
/* Copyright (C) 2021 Free Software Foundation, Inc.
   Contributed by Oracle.

   This file is part of GNU Binutils.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3, or (at your option)
   any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, 51 Franklin Street - Fifth Floor, Boston,
   MA 02110-1301, USA.  */
20*c42dbd0eSchristos 
21*c42dbd0eSchristos #include "config.h"
22*c42dbd0eSchristos #include <stdio.h>
23*c42dbd0eSchristos #include <stdlib.h>
24*c42dbd0eSchristos #include <string.h>
25*c42dbd0eSchristos #include <sys/types.h>
26*c42dbd0eSchristos #include <unistd.h>
27*c42dbd0eSchristos 
28*c42dbd0eSchristos #include "DbeThread.h"
29*c42dbd0eSchristos #include "util.h"
30*c42dbd0eSchristos #include "vec.h"
31*c42dbd0eSchristos 
/* Cleanup handler installed by thread_pool_loop () through
   pthread_cleanup_push ().

   ARG is the address of the owning pool's mutex.  The unlock that used to
   live here (pthread_mutex_unlock on ARG when it is non-NULL) is disabled,
   so this handler currently does nothing and never touches ARG.  */
static void
cleanup_free_mutex (void *arg)
{
  (void) arg;
}
38*c42dbd0eSchristos 
39*c42dbd0eSchristos static void*
thread_pool_loop(void * arg)40*c42dbd0eSchristos thread_pool_loop (void* arg)
41*c42dbd0eSchristos {
42*c42dbd0eSchristos   DbeThreadPool *thrp = (DbeThreadPool*) arg;
43*c42dbd0eSchristos   Dprintf (DEBUG_THREADS, "thread_pool_loop:%d starting thread=%llu\n",
44*c42dbd0eSchristos 	   __LINE__, (unsigned long long) pthread_self ());
45*c42dbd0eSchristos 
46*c42dbd0eSchristos   /* set my cancel state to 'enabled', and cancel type to 'defered'. */
47*c42dbd0eSchristos   pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
48*c42dbd0eSchristos   pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
49*c42dbd0eSchristos 
50*c42dbd0eSchristos   /* set thread cleanup handler */
51*c42dbd0eSchristos   pthread_cleanup_push (cleanup_free_mutex, (void*) & (thrp->p_mutex));
52*c42dbd0eSchristos   for (;;)
53*c42dbd0eSchristos     {
54*c42dbd0eSchristos       DbeQueue *q = thrp->get_queue ();
55*c42dbd0eSchristos       if (q)
56*c42dbd0eSchristos 	{ /* a request is pending */
57*c42dbd0eSchristos 	  Dprintf (DEBUG_THREADS,
58*c42dbd0eSchristos 		   "thread_pool_loop:%d thread=%llu queue=%d start\n",
59*c42dbd0eSchristos 		   __LINE__, (unsigned long long) pthread_self (), q->id);
60*c42dbd0eSchristos 	  q->func (q->arg);
61*c42dbd0eSchristos 	  Dprintf (DEBUG_THREADS,
62*c42dbd0eSchristos 		   "thread_pool_loop:%d thread=%llu queue=%d done\n",
63*c42dbd0eSchristos 		   __LINE__, (unsigned long long) pthread_self (), q->id);
64*c42dbd0eSchristos 	  delete q;
65*c42dbd0eSchristos 	  continue;
66*c42dbd0eSchristos 	}
67*c42dbd0eSchristos       if (thrp->no_new_queues)
68*c42dbd0eSchristos 	{
69*c42dbd0eSchristos 	  Dprintf (DEBUG_THREADS, "thread_pool_loop:%d exit thread=%llu\n",
70*c42dbd0eSchristos 		   __LINE__, (unsigned long long) pthread_self ());
71*c42dbd0eSchristos 	  pthread_exit (NULL);
72*c42dbd0eSchristos 	}
73*c42dbd0eSchristos       Dprintf (DEBUG_THREADS,
74*c42dbd0eSchristos 	       "thread_pool_loop:%d before pthread_cond_wait thread=%llu\n",
75*c42dbd0eSchristos 	       __LINE__, (unsigned long long) pthread_self ());
76*c42dbd0eSchristos       pthread_mutex_lock (&thrp->p_mutex);
77*c42dbd0eSchristos       pthread_cond_wait (&thrp->p_cond_var, &thrp->p_mutex);
78*c42dbd0eSchristos       Dprintf (DEBUG_THREADS,
79*c42dbd0eSchristos 	       "thread_pool_loop:%d after pthread_cond_wait thread=%llu\n",
80*c42dbd0eSchristos 	       __LINE__, (unsigned long long) pthread_self ());
81*c42dbd0eSchristos       pthread_mutex_unlock (&thrp->p_mutex);
82*c42dbd0eSchristos     }
83*c42dbd0eSchristos 
84*c42dbd0eSchristos   // never reached, but we must use it here. See `man pthread_cleanup_push`
85*c42dbd0eSchristos   pthread_cleanup_pop (0);
86*c42dbd0eSchristos }
87*c42dbd0eSchristos 
DbeThreadPool(int _max_threads)88*c42dbd0eSchristos DbeThreadPool::DbeThreadPool (int _max_threads)
89*c42dbd0eSchristos {
90*c42dbd0eSchristos   static const int DBE_NTHREADS_DEFAULT = 4;
91*c42dbd0eSchristos   char *s = getenv ("GPROFNG_DBE_NTHREADS");
92*c42dbd0eSchristos   if (s)
93*c42dbd0eSchristos     {
94*c42dbd0eSchristos       max_threads = atoi (s);
95*c42dbd0eSchristos       if (max_threads < 0)
96*c42dbd0eSchristos 	max_threads = 0;
97*c42dbd0eSchristos       if (_max_threads > 0 && max_threads < _max_threads)
98*c42dbd0eSchristos 	max_threads = _max_threads;
99*c42dbd0eSchristos     }
100*c42dbd0eSchristos   else
101*c42dbd0eSchristos     {
102*c42dbd0eSchristos       max_threads = _max_threads;
103*c42dbd0eSchristos       if (max_threads < 0)
104*c42dbd0eSchristos 	max_threads = DBE_NTHREADS_DEFAULT;
105*c42dbd0eSchristos     }
106*c42dbd0eSchristos   Dprintf (DEBUG_THREADS, "DbeThreadPool:%d  max_threads %d ---> %d\n",
107*c42dbd0eSchristos 	   __LINE__, _max_threads, max_threads);
108*c42dbd0eSchristos   pthread_mutex_init (&p_mutex, NULL);
109*c42dbd0eSchristos   pthread_cond_init (&p_cond_var, NULL);
110*c42dbd0eSchristos   threads = new Vector <pthread_t>(max_threads);
111*c42dbd0eSchristos   queue = NULL;
112*c42dbd0eSchristos   last_queue = NULL;
113*c42dbd0eSchristos   no_new_queues = false;
114*c42dbd0eSchristos   queues_cnt = 0;
115*c42dbd0eSchristos   total_queues = 0;
116*c42dbd0eSchristos }
117*c42dbd0eSchristos 
~DbeThreadPool()118*c42dbd0eSchristos DbeThreadPool::~DbeThreadPool ()
119*c42dbd0eSchristos {
120*c42dbd0eSchristos   delete threads;
121*c42dbd0eSchristos }
122*c42dbd0eSchristos 
123*c42dbd0eSchristos DbeQueue *
get_queue()124*c42dbd0eSchristos DbeThreadPool::get_queue ()
125*c42dbd0eSchristos {
126*c42dbd0eSchristos   pthread_mutex_lock (&p_mutex);
127*c42dbd0eSchristos   DbeQueue *q = queue;
128*c42dbd0eSchristos   Dprintf (DEBUG_THREADS,
129*c42dbd0eSchristos    "get_queue:%d thr: %lld id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
130*c42dbd0eSchristos 	   __LINE__, (unsigned long long) pthread_self (),
131*c42dbd0eSchristos 	   q ? q->id : -1, queues_cnt, (int) threads->size (), max_threads);
132*c42dbd0eSchristos   if (q)
133*c42dbd0eSchristos     {
134*c42dbd0eSchristos       queue = q->next;
135*c42dbd0eSchristos       queues_cnt--;
136*c42dbd0eSchristos     }
137*c42dbd0eSchristos   pthread_mutex_unlock (&p_mutex);
138*c42dbd0eSchristos   return q;
139*c42dbd0eSchristos }
140*c42dbd0eSchristos 
141*c42dbd0eSchristos void
put_queue(DbeQueue * q)142*c42dbd0eSchristos DbeThreadPool::put_queue (DbeQueue *q)
143*c42dbd0eSchristos {
144*c42dbd0eSchristos   if (max_threads == 0)
145*c42dbd0eSchristos     {
146*c42dbd0eSchristos       // nothing runs in parallel
147*c42dbd0eSchristos       q->id = ++total_queues;
148*c42dbd0eSchristos       Dprintf (DEBUG_THREADS, NTXT ("put_queue:%d thr=%lld max_threads=%d queue (%d) runs on the worked thread\n"),
149*c42dbd0eSchristos 	       __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
150*c42dbd0eSchristos       q->func (q->arg);
151*c42dbd0eSchristos       delete q;
152*c42dbd0eSchristos       return;
153*c42dbd0eSchristos     }
154*c42dbd0eSchristos 
155*c42dbd0eSchristos   pthread_mutex_lock (&p_mutex);
156*c42dbd0eSchristos   // nothing runs in parallel
157*c42dbd0eSchristos   q->id = ++total_queues;
158*c42dbd0eSchristos   Dprintf (DEBUG_THREADS, "put_queue:%d thr=%lld max_threads=%d queue (%d)\n",
159*c42dbd0eSchristos 	   __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
160*c42dbd0eSchristos   if (queue)
161*c42dbd0eSchristos     {
162*c42dbd0eSchristos       last_queue->next = q;
163*c42dbd0eSchristos       last_queue = q;
164*c42dbd0eSchristos     }
165*c42dbd0eSchristos   else
166*c42dbd0eSchristos     {
167*c42dbd0eSchristos       queue = q;
168*c42dbd0eSchristos       last_queue = q;
169*c42dbd0eSchristos     }
170*c42dbd0eSchristos   queues_cnt++;
171*c42dbd0eSchristos   Dprintf (DEBUG_THREADS,
172*c42dbd0eSchristos 	   "put_queue:%d id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
173*c42dbd0eSchristos 	   __LINE__, q->id, queues_cnt, (int) threads->size (), max_threads);
174*c42dbd0eSchristos   if (queues_cnt > threads->size () && threads->size () < max_threads)
175*c42dbd0eSchristos     {
176*c42dbd0eSchristos       pthread_t thr;
177*c42dbd0eSchristos       int r = pthread_create (&thr, NULL, thread_pool_loop, (void *) this);
178*c42dbd0eSchristos       Dprintf (DEBUG_THREADS,
179*c42dbd0eSchristos 	       "put_queue:%d pthread_create returns %d thr=%llu\n",
180*c42dbd0eSchristos 	       __LINE__, r, (unsigned long long) thr);
181*c42dbd0eSchristos       if (r)
182*c42dbd0eSchristos 	fprintf (stderr, GTXT ("pthread_create failed. errnum=%d (%s)\n"), r,
183*c42dbd0eSchristos 		 STR (strerror (r)));
184*c42dbd0eSchristos       else
185*c42dbd0eSchristos 	threads->append (thr);
186*c42dbd0eSchristos     }
187*c42dbd0eSchristos   pthread_cond_signal (&p_cond_var);
188*c42dbd0eSchristos   pthread_mutex_unlock (&p_mutex);
189*c42dbd0eSchristos }
190*c42dbd0eSchristos 
191*c42dbd0eSchristos void
wait_queues()192*c42dbd0eSchristos DbeThreadPool::wait_queues ()
193*c42dbd0eSchristos {
194*c42dbd0eSchristos   pthread_mutex_lock (&p_mutex);
195*c42dbd0eSchristos   no_new_queues = true;
196*c42dbd0eSchristos   pthread_mutex_unlock (&p_mutex);
197*c42dbd0eSchristos   pthread_cond_broadcast (&p_cond_var);
198*c42dbd0eSchristos   for (;;) // Run requests on the worked thread too
199*c42dbd0eSchristos     {
200*c42dbd0eSchristos       DbeQueue *q = get_queue ();
201*c42dbd0eSchristos       if (q == NULL)
202*c42dbd0eSchristos 	break;
203*c42dbd0eSchristos       Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d start\n",
204*c42dbd0eSchristos 	       __LINE__, (unsigned long long) pthread_self (), q->id);
205*c42dbd0eSchristos       q->func (q->arg);
206*c42dbd0eSchristos       Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d done\n",
207*c42dbd0eSchristos 	       __LINE__, (unsigned long long) pthread_self (), q->id);
208*c42dbd0eSchristos       delete q;
209*c42dbd0eSchristos     }
210*c42dbd0eSchristos   for (int i = 0, sz = threads->size (); i < sz; i++)
211*c42dbd0eSchristos     {
212*c42dbd0eSchristos       void *retval;
213*c42dbd0eSchristos       pthread_join (threads->get (i), &retval);
214*c42dbd0eSchristos     }
215*c42dbd0eSchristos }
216*c42dbd0eSchristos 
DbeQueue(int (* _func)(void * arg),void * _arg)217*c42dbd0eSchristos DbeQueue::DbeQueue (int (*_func) (void *arg), void *_arg)
218*c42dbd0eSchristos {
219*c42dbd0eSchristos   func = _func;
220*c42dbd0eSchristos   arg = _arg;
221*c42dbd0eSchristos   next = NULL;
222*c42dbd0eSchristos }
223*c42dbd0eSchristos 
~DbeQueue()224*c42dbd0eSchristos DbeQueue::~DbeQueue () { }
225