xref: /netbsd-src/external/zlib/pigz/dist/yarn.c (revision c2f76ff004a2cb67efe5b12d97bd3ef7fe89e18d)
1 /* yarn.c -- generic thread operations implemented using pthread functions
2  * Copyright (C) 2008 Mark Adler
3  * Version 1.1  26 Oct 2008  Mark Adler
4  * For conditions of distribution and use, see copyright notice in yarn.h
5  */
6 
7 /* Basic thread operations implemented using the POSIX pthread library.  All
8    pthread references are isolated within this module to allow alternate
9    implementations with other thread libraries.  See yarn.h for the description
10    of these operations. */
11 
12 /* Version history:
13    1.0    19 Oct 2008  First version
14    1.1    26 Oct 2008  No need to set the stack size -- remove
15                        Add yarn_abort() function for clean-up on error exit
16  */
17 
18 /* for thread portability */
19 #define _POSIX_PTHREAD_SEMANTICS
20 #define _REENTRANT
21 
22 /* external libraries and entities referenced */
23 #include <stdio.h>      /* fprintf(), stderr */
24 #include <stdlib.h>     /* exit(), malloc(), free(), NULL */
25 #include <pthread.h>    /* pthread_t, pthread_create(), pthread_join(), */
26     /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
27        PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
28        pthread_self(), pthread_equal(),
29        pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
30        pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
31        pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
32        pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
33 #include <errno.h>      /* ENOMEM, EAGAIN, EINVAL */
34 
35 /* interface definition */
36 #include "yarn.h"
37 
38 /* constants */
39 #define local static            /* for non-exported functions and globals */
40 
41 /* error handling external globals, resettable by application */
42 char *yarn_prefix = "yarn";
43 void (*yarn_abort)(int) = NULL;
44 
45 
46 /* immediately exit -- use for errors that shouldn't ever happen */
47 local void fail(int err)
48 {
49     fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
50             err == ENOMEM ? "out of memory" : "internal pthread error", err);
51     if (yarn_abort != NULL)
52         yarn_abort(err);
53     exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
54 }
55 
56 /* memory handling routines provided by user -- if none are provided, malloc()
57    and free() are used, which are therefore assumed to be thread-safe */
58 typedef void *(*malloc_t)(size_t);
59 typedef void (*free_t)(void *);
60 local malloc_t my_malloc_f = malloc;
61 local free_t my_free = free;
62 
63 /* use user-supplied allocation routines instead of malloc() and free() */
64 void yarn_mem(malloc_t lease, free_t vacate)
65 {
66     my_malloc_f = lease;
67     my_free = vacate;
68 }
69 
70 /* memory allocation that cannot fail (from the point of view of the caller) */
71 local void *my_malloc(size_t size)
72 {
73     void *block;
74 
75     if ((block = my_malloc_f(size)) == NULL)
76         fail(ENOMEM);
77     return block;
78 }
79 
80 /* -- lock functions -- */
81 
82 struct lock_s {
83     pthread_mutex_t mutex;
84     pthread_cond_t cond;
85     long value;
86 };
87 
88 lock *new_lock(long initial)
89 {
90     int ret;
91     lock *bolt;
92 
93     bolt = my_malloc(sizeof(struct lock_s));
94     if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
95         (ret = pthread_cond_init(&(bolt->cond), NULL)))
96         fail(ret);
97     bolt->value = initial;
98     return bolt;
99 }
100 
101 void possess(lock *bolt)
102 {
103     int ret;
104 
105     if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
106         fail(ret);
107 }
108 
109 void release(lock *bolt)
110 {
111     int ret;
112 
113     if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
114         fail(ret);
115 }
116 
117 void twist(lock *bolt, enum twist_op op, long val)
118 {
119     int ret;
120 
121     if (op == TO)
122         bolt->value = val;
123     else if (op == BY)
124         bolt->value += val;
125     if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
126         (ret = pthread_mutex_unlock(&(bolt->mutex))))
127         fail(ret);
128 }
129 
130 #define until(a) while(!(a))
131 
132 void wait_for(lock *bolt, enum wait_op op, long val)
133 {
134     int ret;
135 
136     switch (op) {
137     case TO_BE:
138         until (bolt->value == val)
139             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
140                 fail(ret);
141         break;
142     case NOT_TO_BE:
143         until (bolt->value != val)
144             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
145                 fail(ret);
146         break;
147     case TO_BE_MORE_THAN:
148         until (bolt->value > val)
149             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
150                 fail(ret);
151         break;
152     case TO_BE_LESS_THAN:
153         until (bolt->value < val)
154             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
155                 fail(ret);
156     }
157 }
158 
159 long peek_lock(lock *bolt)
160 {
161     return bolt->value;
162 }
163 
164 void free_lock(lock *bolt)
165 {
166     int ret;
167     if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
168         (ret = pthread_mutex_destroy(&(bolt->mutex))))
169         fail(ret);
170     my_free(bolt);
171 }
172 
173 /* -- thread functions (uses lock functions above) -- */
174 
175 struct thread_s {
176     pthread_t id;
177     int done;                   /* true if this thread has exited */
178     thread *next;               /* for list of all launched threads */
179 };
180 
181 /* list of threads launched but not joined, count of threads exited but not
182    joined (incremented by ignition() just before exiting) */
183 local lock threads_lock = {
184     PTHREAD_MUTEX_INITIALIZER,
185     PTHREAD_COND_INITIALIZER,
186     0                           /* number of threads exited but not joined */
187 };
188 local thread *threads = NULL;       /* list of extant threads */
189 
190 /* structure in which to pass the probe and its payload to ignition() */
191 struct capsule {
192     void (*probe)(void *);
193     void *payload;
194 };
195 
196 /* mark the calling thread as done and alert join_all() */
197 local void reenter(void *dummy)
198 {
199     thread *match, **prior;
200     pthread_t me;
201 
202     /* find this thread in the threads list by matching the thread id */
203     me = pthread_self();
204     possess(&(threads_lock));
205     prior = &(threads);
206     while ((match = *prior) != NULL) {
207         if (pthread_equal(match->id, me))
208             break;
209         prior = &(match->next);
210     }
211     if (match == NULL)
212         fail(EINVAL);
213 
214     /* mark this thread as done and move it to the head of the list */
215     match->done = 1;
216     if (threads != match) {
217         *prior = match->next;
218         match->next = threads;
219         threads = match;
220     }
221 
222     /* update the count of threads to be joined and alert join_all() */
223     twist(&(threads_lock), BY, +1);
224 }
225 
226 /* all threads go through this routine so that just before the thread exits,
227    it marks itself as done in the threads list and alerts join_all() so that
228    the thread resources can be released -- use cleanup stack so that the
229    marking occurs even if the thread is cancelled */
230 local void *ignition(void *arg)
231 {
232     struct capsule *capsule = arg;
233 
234     /* run reenter() before leaving */
235     pthread_cleanup_push(reenter, NULL);
236 
237     /* execute the requested function with argument */
238     capsule->probe(capsule->payload);
239     my_free(capsule);
240 
241     /* mark this thread as done and let join_all() know */
242     pthread_cleanup_pop(1);
243 
244     /* exit thread */
245     return NULL;
246 }
247 
248 /* not all POSIX implementations create threads as joinable by default, so that
249    is made explicit here */
250 thread *launch(void (*probe)(void *), void *payload)
251 {
252     int ret;
253     thread *th;
254     struct capsule *capsule;
255     pthread_attr_t attr;
256 
257     /* construct the requested call and argument for the ignition() routine
258        (allocated instead of automatic so that we're sure this will still be
259        there when ignition() actually starts up -- ignition() will free this
260        allocation) */
261     capsule = my_malloc(sizeof(struct capsule));
262     capsule->probe = probe;
263     capsule->payload = payload;
264 
265     /* assure this thread is in the list before join_all() or ignition() looks
266        for it */
267     possess(&(threads_lock));
268 
269     /* create the thread and call ignition() from that thread */
270     th = my_malloc(sizeof(struct thread_s));
271     if ((ret = pthread_attr_init(&attr)) ||
272         (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
273         (ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
274         (ret = pthread_attr_destroy(&attr)))
275         fail(ret);
276 
277     /* put the thread in the threads list for join_all() */
278     th->done = 0;
279     th->next = threads;
280     threads = th;
281     release(&(threads_lock));
282     return th;
283 }
284 
285 void join(thread *ally)
286 {
287     int ret;
288     thread *match, **prior;
289 
290     /* wait for thread to exit and return its resources */
291     if ((ret = pthread_join(ally->id, NULL)) != 0)
292         fail(ret);
293 
294     /* find the thread in the threads list */
295     possess(&(threads_lock));
296     prior = &(threads);
297     while ((match = *prior) != NULL) {
298         if (match == ally)
299             break;
300         prior = &(match->next);
301     }
302     if (match == NULL)
303         fail(EINVAL);
304 
305     /* remove thread from list and update exited count, free thread */
306     if (match->done)
307         threads_lock.value--;
308     *prior = match->next;
309     release(&(threads_lock));
310     my_free(ally);
311 }
312 
313 /* This implementation of join_all() only attempts to join threads that have
314    announced that they have exited (see ignition()).  When there are many
315    threads, this is faster than waiting for some random thread to exit while a
316    bunch of other threads have already exited. */
317 int join_all(void)
318 {
319     int ret, count;
320     thread *match, **prior;
321 
322     /* grab the threads list and initialize the joined count */
323     count = 0;
324     possess(&(threads_lock));
325 
326     /* do until threads list is empty */
327     while (threads != NULL) {
328         /* wait until at least one thread has reentered */
329         wait_for(&(threads_lock), NOT_TO_BE, 0);
330 
331         /* find the first thread marked done (should be at or near the top) */
332         prior = &(threads);
333         while ((match = *prior) != NULL) {
334             if (match->done)
335                 break;
336             prior = &(match->next);
337         }
338         if (match == NULL)
339             fail(EINVAL);
340 
341         /* join the thread (will be almost immediate), remove from the threads
342            list, update the reenter count, and free the thread */
343         if ((ret = pthread_join(match->id, NULL)) != 0)
344             fail(ret);
345         threads_lock.value--;
346         *prior = match->next;
347         my_free(match);
348         count++;
349     }
350 
351     /* let go of the threads list and return the number of threads joined */
352     release(&(threads_lock));
353     return count;
354 }
355 
356 /* cancel and join the thread -- the thread will cancel when it gets to a file
357    operation, a sleep or pause, or a condition wait */
358 void destruct(thread *off_course)
359 {
360     int ret;
361 
362     if ((ret = pthread_cancel(off_course->id)) != 0)
363         fail(ret);
364     join(off_course);
365 }
366