xref: /netbsd-src/external/zlib/pigz/dist/yarn.c (revision 3e407a68a64115001ff3f1b658def92a1368b7e6)
1 /* yarn.c -- generic thread operations implemented using pthread functions
2  * Copyright (C) 2008, 2012 Mark Adler
3  * Version 1.3  13 Jan 2012  Mark Adler
4  * For conditions of distribution and use, see copyright notice in yarn.h
5  */
6 
7 /* Basic thread operations implemented using the POSIX pthread library.  All
8    pthread references are isolated within this module to allow alternate
9    implementations with other thread libraries.  See yarn.h for the description
10    of these operations. */
11 
12 /* Version history:
13    1.0    19 Oct 2008  First version
14    1.1    26 Oct 2008  No need to set the stack size -- remove
15                        Add yarn_abort() function for clean-up on error exit
16    1.2    19 Dec 2011  (changes reversed in 1.3)
17    1.3    13 Jan 2012  Add large file #define for consistency with pigz.c
18                        Update thread portability #defines per IEEE 1003.1-2008
19                        Fix documentation in yarn.h for yarn_prefix
20  */
21 
22 /* for thread portability */
23 #define _XOPEN_SOURCE 700
24 #define _POSIX_C_SOURCE 200809L
25 #define _THREAD_SAFE
26 
27 /* use large file functions if available */
28 #define _FILE_OFFSET_BITS 64
29 
30 /* external libraries and entities referenced */
31 #include <stdio.h>      /* fprintf(), stderr */
32 #include <stdlib.h>     /* exit(), malloc(), free(), NULL */
33 #include <pthread.h>    /* pthread_t, pthread_create(), pthread_join(), */
34     /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
35        PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
36        pthread_self(), pthread_equal(),
37        pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
38        pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
39        pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
40        pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
41 #include <errno.h>      /* ENOMEM, EAGAIN, EINVAL */
42 
43 /* interface definition */
44 #include "yarn.h"
45 
46 /* constants */
47 #define local static            /* for non-exported functions and globals */
48 
49 /* error handling external globals, resettable by application */
50 char *yarn_prefix = "yarn";
51 void (*yarn_abort)(int) = NULL;
52 
53 
54 /* immediately exit -- use for errors that shouldn't ever happen */
fail(int err)55 local void fail(int err)
56 {
57     fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
58             err == ENOMEM ? "out of memory" : "internal pthread error", err);
59     if (yarn_abort != NULL)
60         yarn_abort(err);
61     exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
62 }
63 
64 /* memory handling routines provided by user -- if none are provided, malloc()
65    and free() are used, which are therefore assumed to be thread-safe */
66 typedef void *(*malloc_t)(size_t);
67 typedef void (*free_t)(void *);
68 local malloc_t my_malloc_f = malloc;
69 local free_t my_free = free;
70 
71 /* use user-supplied allocation routines instead of malloc() and free() */
yarn_mem(malloc_t lease,free_t vacate)72 void yarn_mem(malloc_t lease, free_t vacate)
73 {
74     my_malloc_f = lease;
75     my_free = vacate;
76 }
77 
78 /* memory allocation that cannot fail (from the point of view of the caller) */
my_malloc(size_t size)79 local void *my_malloc(size_t size)
80 {
81     void *block;
82 
83     if ((block = my_malloc_f(size)) == NULL)
84         fail(ENOMEM);
85     return block;
86 }
87 
88 /* -- lock functions -- */
89 
90 struct lock_s {
91     pthread_mutex_t mutex;
92     pthread_cond_t cond;
93     long value;
94 };
95 
new_lock(long initial)96 lock *new_lock(long initial)
97 {
98     int ret;
99     lock *bolt;
100 
101     bolt = my_malloc(sizeof(struct lock_s));
102     if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
103         (ret = pthread_cond_init(&(bolt->cond), NULL)))
104         fail(ret);
105     bolt->value = initial;
106     return bolt;
107 }
108 
possess(lock * bolt)109 void possess(lock *bolt)
110 {
111     int ret;
112 
113     if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
114         fail(ret);
115 }
116 
release(lock * bolt)117 void release(lock *bolt)
118 {
119     int ret;
120 
121     if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
122         fail(ret);
123 }
124 
twist(lock * bolt,enum twist_op op,long val)125 void twist(lock *bolt, enum twist_op op, long val)
126 {
127     int ret;
128 
129     if (op == TO)
130         bolt->value = val;
131     else if (op == BY)
132         bolt->value += val;
133     if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
134         (ret = pthread_mutex_unlock(&(bolt->mutex))))
135         fail(ret);
136 }
137 
138 #define until(a) while(!(a))
139 
wait_for(lock * bolt,enum wait_op op,long val)140 void wait_for(lock *bolt, enum wait_op op, long val)
141 {
142     int ret;
143 
144     switch (op) {
145     case TO_BE:
146         until (bolt->value == val)
147             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
148                 fail(ret);
149         break;
150     case NOT_TO_BE:
151         until (bolt->value != val)
152             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
153                 fail(ret);
154         break;
155     case TO_BE_MORE_THAN:
156         until (bolt->value > val)
157             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
158                 fail(ret);
159         break;
160     case TO_BE_LESS_THAN:
161         until (bolt->value < val)
162             if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
163                 fail(ret);
164     }
165 }
166 
peek_lock(lock * bolt)167 long peek_lock(lock *bolt)
168 {
169     return bolt->value;
170 }
171 
free_lock(lock * bolt)172 void free_lock(lock *bolt)
173 {
174     int ret;
175     if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
176         (ret = pthread_mutex_destroy(&(bolt->mutex))))
177         fail(ret);
178     my_free(bolt);
179 }
180 
181 /* -- thread functions (uses lock functions above) -- */
182 
183 struct thread_s {
184     pthread_t id;
185     int done;                   /* true if this thread has exited */
186     thread *next;               /* for list of all launched threads */
187 };
188 
189 /* list of threads launched but not joined, count of threads exited but not
190    joined (incremented by ignition() just before exiting) */
191 local lock threads_lock = {
192     PTHREAD_MUTEX_INITIALIZER,
193     PTHREAD_COND_INITIALIZER,
194     0                           /* number of threads exited but not joined */
195 };
196 local thread *threads = NULL;       /* list of extant threads */
197 
198 /* structure in which to pass the probe and its payload to ignition() */
199 struct capsule {
200     void (*probe)(void *);
201     void *payload;
202 };
203 
204 /* mark the calling thread as done and alert join_all() */
reenter(void * dummy)205 local void reenter(void *dummy)
206 {
207     thread *match, **prior;
208     pthread_t me;
209 
210     (void)dummy;
211 
212     /* find this thread in the threads list by matching the thread id */
213     me = pthread_self();
214     possess(&(threads_lock));
215     prior = &(threads);
216     while ((match = *prior) != NULL) {
217         if (pthread_equal(match->id, me))
218             break;
219         prior = &(match->next);
220     }
221     if (match == NULL)
222         fail(EINVAL);
223 
224     /* mark this thread as done and move it to the head of the list */
225     match->done = 1;
226     if (threads != match) {
227         *prior = match->next;
228         match->next = threads;
229         threads = match;
230     }
231 
232     /* update the count of threads to be joined and alert join_all() */
233     twist(&(threads_lock), BY, +1);
234 }
235 
236 /* all threads go through this routine so that just before the thread exits,
237    it marks itself as done in the threads list and alerts join_all() so that
238    the thread resources can be released -- use cleanup stack so that the
239    marking occurs even if the thread is cancelled */
ignition(void * arg)240 local void *ignition(void *arg)
241 {
242     struct capsule *capsule = arg;
243 
244     /* run reenter() before leaving */
245     pthread_cleanup_push(reenter, NULL);
246 
247     /* execute the requested function with argument */
248     capsule->probe(capsule->payload);
249     my_free(capsule);
250 
251     /* mark this thread as done and let join_all() know */
252     pthread_cleanup_pop(1);
253 
254     /* exit thread */
255     return NULL;
256 }
257 
258 /* not all POSIX implementations create threads as joinable by default, so that
259    is made explicit here */
launch(void (* probe)(void *),void * payload)260 thread *launch(void (*probe)(void *), void *payload)
261 {
262     int ret;
263     thread *th;
264     struct capsule *capsule;
265     pthread_attr_t attr;
266 
267     /* construct the requested call and argument for the ignition() routine
268        (allocated instead of automatic so that we're sure this will still be
269        there when ignition() actually starts up -- ignition() will free this
270        allocation) */
271     capsule = my_malloc(sizeof(struct capsule));
272     capsule->probe = probe;
273     capsule->payload = payload;
274 
275     /* assure this thread is in the list before join_all() or ignition() looks
276        for it */
277     possess(&(threads_lock));
278 
279     /* create the thread and call ignition() from that thread */
280     th = my_malloc(sizeof(struct thread_s));
281     if ((ret = pthread_attr_init(&attr)) ||
282         (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
283         (ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
284         (ret = pthread_attr_destroy(&attr)))
285         fail(ret);
286 
287     /* put the thread in the threads list for join_all() */
288     th->done = 0;
289     th->next = threads;
290     threads = th;
291     release(&(threads_lock));
292     return th;
293 }
294 
join(thread * ally)295 void join(thread *ally)
296 {
297     int ret;
298     thread *match, **prior;
299 
300     /* wait for thread to exit and return its resources */
301     if ((ret = pthread_join(ally->id, NULL)) != 0)
302         fail(ret);
303 
304     /* find the thread in the threads list */
305     possess(&(threads_lock));
306     prior = &(threads);
307     while ((match = *prior) != NULL) {
308         if (match == ally)
309             break;
310         prior = &(match->next);
311     }
312     if (match == NULL)
313         fail(EINVAL);
314 
315     /* remove thread from list and update exited count, free thread */
316     if (match->done)
317         threads_lock.value--;
318     *prior = match->next;
319     release(&(threads_lock));
320     my_free(ally);
321 }
322 
323 /* This implementation of join_all() only attempts to join threads that have
324    announced that they have exited (see ignition()).  When there are many
325    threads, this is faster than waiting for some random thread to exit while a
326    bunch of other threads have already exited. */
join_all(void)327 int join_all(void)
328 {
329     int ret, count;
330     thread *match, **prior;
331 
332     /* grab the threads list and initialize the joined count */
333     count = 0;
334     possess(&(threads_lock));
335 
336     /* do until threads list is empty */
337     while (threads != NULL) {
338         /* wait until at least one thread has reentered */
339         wait_for(&(threads_lock), NOT_TO_BE, 0);
340 
341         /* find the first thread marked done (should be at or near the top) */
342         prior = &(threads);
343         while ((match = *prior) != NULL) {
344             if (match->done)
345                 break;
346             prior = &(match->next);
347         }
348         if (match == NULL)
349             fail(EINVAL);
350 
351         /* join the thread (will be almost immediate), remove from the threads
352            list, update the reenter count, and free the thread */
353         if ((ret = pthread_join(match->id, NULL)) != 0)
354             fail(ret);
355         threads_lock.value--;
356         *prior = match->next;
357         my_free(match);
358         count++;
359     }
360 
361     /* let go of the threads list and return the number of threads joined */
362     release(&(threads_lock));
363     return count;
364 }
365 
366 /* cancel and join the thread -- the thread will cancel when it gets to a file
367    operation, a sleep or pause, or a condition wait */
destruct(thread * off_course)368 void destruct(thread *off_course)
369 {
370     int ret;
371 
372     if ((ret = pthread_cancel(off_course->id)) != 0)
373         fail(ret);
374     join(off_course);
375 }
376