12b15cb3dSCy Schubert /*
22b15cb3dSCy Schubert * work_thread.c - threads implementation for blocking worker child.
32b15cb3dSCy Schubert */
42b15cb3dSCy Schubert #include <config.h>
52b15cb3dSCy Schubert #include "ntp_workimpl.h"
62b15cb3dSCy Schubert
72b15cb3dSCy Schubert #ifdef WORK_THREAD
82b15cb3dSCy Schubert
92b15cb3dSCy Schubert #include <stdio.h>
102b15cb3dSCy Schubert #include <ctype.h>
112b15cb3dSCy Schubert #include <signal.h>
122b15cb3dSCy Schubert #ifndef SYS_WINNT
132b15cb3dSCy Schubert #include <pthread.h>
142b15cb3dSCy Schubert #endif
152b15cb3dSCy Schubert
162b15cb3dSCy Schubert #include "ntp_stdlib.h"
172b15cb3dSCy Schubert #include "ntp_malloc.h"
182b15cb3dSCy Schubert #include "ntp_syslog.h"
192b15cb3dSCy Schubert #include "ntpd.h"
202b15cb3dSCy Schubert #include "ntp_io.h"
212b15cb3dSCy Schubert #include "ntp_assert.h"
222b15cb3dSCy Schubert #include "ntp_unixtime.h"
232b15cb3dSCy Schubert #include "timespecops.h"
242b15cb3dSCy Schubert #include "ntp_worker.h"
252b15cb3dSCy Schubert
/*
 * Sentinel pointer values that travel through the request/response
 * queues in place of a real blocking_pipe_header:
 *  - CHILD_EXIT_REQ is recognised by receive_blocking_req_internal(),
 *    which answers it with CHILD_GONE_RESP and hands NULL to its caller.
 *  - CHILD_GONE_RESP is recognised by receive_blocking_resp_internal(),
 *    which then calls cleanup_after_child().
 * Both are the pointer value -1 and must never be dereferenced.
 */
262b15cb3dSCy Schubert #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
272b15cb3dSCy Schubert #define CHILD_GONE_RESP CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results... Whether this really
 * pays off is debatable.
 */
342b15cb3dSCy Schubert #define WORKITEMS_ALLOC_INC 16
352b15cb3dSCy Schubert #define RESPONSES_ALLOC_INC 4
362b15cb3dSCy Schubert
3768ba7e87SXin LI /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
3868ba7e87SXin LI * set the maximum to 256kB. If the minimum goes below the
3968ba7e87SXin LI * system-defined minimum stack size, we have to adjust accordingly.
4068ba7e87SXin LI */
412b15cb3dSCy Schubert #ifndef THREAD_MINSTACKSIZE
422b15cb3dSCy Schubert # define THREAD_MINSTACKSIZE (64U * 1024)
432b15cb3dSCy Schubert #endif
4468ba7e87SXin LI
4568ba7e87SXin LI #ifndef THREAD_MAXSTACKSIZE
4668ba7e87SXin LI # define THREAD_MAXSTACKSIZE (256U * 1024)
4768ba7e87SXin LI #endif
4868ba7e87SXin LI
494e1ef62aSXin LI /* need a good integer to store a pointer... */
504e1ef62aSXin LI #ifndef UINTPTR_T
514e1ef62aSXin LI # if defined(UINTPTR_MAX)
524e1ef62aSXin LI # define UINTPTR_T uintptr_t
534e1ef62aSXin LI # elif defined(UINT_PTR)
544e1ef62aSXin LI # define UINTPTR_T UINT_PTR
554e1ef62aSXin LI # else
564e1ef62aSXin LI # define UINTPTR_T size_t
574e1ef62aSXin LI # endif
584e1ef62aSXin LI #endif
594e1ef62aSXin LI
602b15cb3dSCy Schubert
612b15cb3dSCy Schubert #ifdef SYS_WINNT
623311ff84SXin LI
632b15cb3dSCy Schubert # define thread_exit(c) _endthreadex(c)
643311ff84SXin LI # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
653311ff84SXin LI u_int WINAPI blocking_thread(void *);
663311ff84SXin LI static BOOL same_os_sema(const sem_ref obj, void * osobj);
673311ff84SXin LI
682b15cb3dSCy Schubert #else
693311ff84SXin LI
704e1ef62aSXin LI # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
712b15cb3dSCy Schubert # define tickle_sem sem_post
723311ff84SXin LI void * blocking_thread(void *);
733311ff84SXin LI static void block_thread_signals(sigset_t *);
743311ff84SXin LI
752b15cb3dSCy Schubert #endif
762b15cb3dSCy Schubert
772b15cb3dSCy Schubert #ifdef WORK_PIPE
782b15cb3dSCy Schubert addremove_io_fd_func addremove_io_fd;
792b15cb3dSCy Schubert #else
802b15cb3dSCy Schubert addremove_io_semaphore_func addremove_io_semaphore;
812b15cb3dSCy Schubert #endif
822b15cb3dSCy Schubert
832b15cb3dSCy Schubert static void start_blocking_thread(blocking_child *);
842b15cb3dSCy Schubert static void start_blocking_thread_internal(blocking_child *);
852b15cb3dSCy Schubert static void prepare_child_sems(blocking_child *);
862b15cb3dSCy Schubert static int wait_for_sem(sem_ref, struct timespec *);
873311ff84SXin LI static int ensure_workitems_empty_slot(blocking_child *);
883311ff84SXin LI static int ensure_workresp_empty_slot(blocking_child *);
892b15cb3dSCy Schubert static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
902b15cb3dSCy Schubert static void cleanup_after_child(blocking_child *);
912b15cb3dSCy Schubert
924990d495SXin LI static sema_type worker_mmutex;
934990d495SXin LI static sem_ref worker_memlock;
942b15cb3dSCy Schubert
954990d495SXin LI /* --------------------------------------------------------------------
964990d495SXin LI * locking the global worker state table (and other global stuff)
974990d495SXin LI */
984990d495SXin LI void
worker_global_lock(int inOrOut)994990d495SXin LI worker_global_lock(
1004990d495SXin LI int inOrOut)
1014990d495SXin LI {
1024990d495SXin LI if (worker_memlock) {
1034990d495SXin LI if (inOrOut)
1044990d495SXin LI wait_for_sem(worker_memlock, NULL);
1054990d495SXin LI else
1064990d495SXin LI tickle_sem(worker_memlock);
1074990d495SXin LI }
1084990d495SXin LI }
1094990d495SXin LI
/* --------------------------------------------------------------------
 * Implementation isolation wrapper: lets a worker terminate itself
 * without knowing the threading API in use. 'exitcode' is handed to
 * the thread_exit() macro selected for this build.
 */
void
exit_worker(
	int	exitcode
	)
{
	/* _endthreadex() on Windows, pthread_exit() elsewhere */
	thread_exit(exitcode);
}
1202b15cb3dSCy Schubert
1213311ff84SXin LI /* --------------------------------------------------------------------
1223311ff84SXin LI * sleep for a given time or until the wakup semaphore is tickled.
1233311ff84SXin LI */
1242b15cb3dSCy Schubert int
worker_sleep(blocking_child * c,time_t seconds)1252b15cb3dSCy Schubert worker_sleep(
1262b15cb3dSCy Schubert blocking_child * c,
1272b15cb3dSCy Schubert time_t seconds
1282b15cb3dSCy Schubert )
1292b15cb3dSCy Schubert {
1302b15cb3dSCy Schubert struct timespec until;
1312b15cb3dSCy Schubert int rc;
1322b15cb3dSCy Schubert
1332b15cb3dSCy Schubert # ifdef HAVE_CLOCK_GETTIME
1342b15cb3dSCy Schubert if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
1352b15cb3dSCy Schubert msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
1362b15cb3dSCy Schubert return -1;
1372b15cb3dSCy Schubert }
1382b15cb3dSCy Schubert # else
1392b15cb3dSCy Schubert if (0 != getclock(TIMEOFDAY, &until)) {
1402b15cb3dSCy Schubert msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
1412b15cb3dSCy Schubert return -1;
1422b15cb3dSCy Schubert }
1432b15cb3dSCy Schubert # endif
1442b15cb3dSCy Schubert until.tv_sec += seconds;
1452b15cb3dSCy Schubert rc = wait_for_sem(c->wake_scheduled_sleep, &until);
1462b15cb3dSCy Schubert if (0 == rc)
1472b15cb3dSCy Schubert return -1;
1482b15cb3dSCy Schubert if (-1 == rc && ETIMEDOUT == errno)
1492b15cb3dSCy Schubert return 0;
1502b15cb3dSCy Schubert msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
1512b15cb3dSCy Schubert return -1;
1522b15cb3dSCy Schubert }
1532b15cb3dSCy Schubert
1542b15cb3dSCy Schubert
1553311ff84SXin LI /* --------------------------------------------------------------------
1563311ff84SXin LI * Wake up a worker that takes a nap.
1573311ff84SXin LI */
1582b15cb3dSCy Schubert void
interrupt_worker_sleep(void)1592b15cb3dSCy Schubert interrupt_worker_sleep(void)
1602b15cb3dSCy Schubert {
1612b15cb3dSCy Schubert u_int idx;
1622b15cb3dSCy Schubert blocking_child * c;
1632b15cb3dSCy Schubert
1642b15cb3dSCy Schubert for (idx = 0; idx < blocking_children_alloc; idx++) {
1652b15cb3dSCy Schubert c = blocking_children[idx];
1662b15cb3dSCy Schubert if (NULL == c || NULL == c->wake_scheduled_sleep)
1672b15cb3dSCy Schubert continue;
1682b15cb3dSCy Schubert tickle_sem(c->wake_scheduled_sleep);
1692b15cb3dSCy Schubert }
1702b15cb3dSCy Schubert }
1712b15cb3dSCy Schubert
1723311ff84SXin LI /* --------------------------------------------------------------------
1733311ff84SXin LI * Make sure there is an empty slot at the head of the request
1743311ff84SXin LI * queue. Tell if the queue is currently empty.
1753311ff84SXin LI */
1763311ff84SXin LI static int
ensure_workitems_empty_slot(blocking_child * c)1772b15cb3dSCy Schubert ensure_workitems_empty_slot(
1782b15cb3dSCy Schubert blocking_child *c
1792b15cb3dSCy Schubert )
1802b15cb3dSCy Schubert {
1813311ff84SXin LI /*
1823311ff84SXin LI ** !!! PRECONDITION: caller holds access lock!
1833311ff84SXin LI **
1843311ff84SXin LI ** This simply tries to increase the size of the buffer if it
1853311ff84SXin LI ** becomes full. The resize operation does *not* maintain the
1863311ff84SXin LI ** order of requests, but that should be irrelevant since the
1873311ff84SXin LI ** processing is considered asynchronous anyway.
1883311ff84SXin LI **
1893311ff84SXin LI ** Return if the buffer is currently empty.
1903311ff84SXin LI */
1913311ff84SXin LI
1923311ff84SXin LI static const size_t each =
1933311ff84SXin LI sizeof(blocking_children[0]->workitems[0]);
1943311ff84SXin LI
1952b15cb3dSCy Schubert size_t new_alloc;
1963311ff84SXin LI size_t slots_used;
19768ba7e87SXin LI size_t sidx;
1982b15cb3dSCy Schubert
1993311ff84SXin LI slots_used = c->head_workitem - c->tail_workitem;
2003311ff84SXin LI if (slots_used >= c->workitems_alloc) {
2012b15cb3dSCy Schubert new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
2023311ff84SXin LI c->workitems = erealloc(c->workitems, new_alloc * each);
20368ba7e87SXin LI for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
20468ba7e87SXin LI c->workitems[sidx] = NULL;
2053311ff84SXin LI c->tail_workitem = 0;
2063311ff84SXin LI c->head_workitem = c->workitems_alloc;
2072b15cb3dSCy Schubert c->workitems_alloc = new_alloc;
2082b15cb3dSCy Schubert }
20968ba7e87SXin LI INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
2103311ff84SXin LI return (0 == slots_used);
2113311ff84SXin LI }
2122b15cb3dSCy Schubert
2133311ff84SXin LI /* --------------------------------------------------------------------
2143311ff84SXin LI * Make sure there is an empty slot at the head of the response
2153311ff84SXin LI * queue. Tell if the queue is currently empty.
2163311ff84SXin LI */
2173311ff84SXin LI static int
ensure_workresp_empty_slot(blocking_child * c)2182b15cb3dSCy Schubert ensure_workresp_empty_slot(
2192b15cb3dSCy Schubert blocking_child *c
2202b15cb3dSCy Schubert )
2212b15cb3dSCy Schubert {
2223311ff84SXin LI /*
2233311ff84SXin LI ** !!! PRECONDITION: caller holds access lock!
2243311ff84SXin LI **
2253311ff84SXin LI ** Works like the companion function above.
2263311ff84SXin LI */
2273311ff84SXin LI
2283311ff84SXin LI static const size_t each =
2293311ff84SXin LI sizeof(blocking_children[0]->responses[0]);
2303311ff84SXin LI
2312b15cb3dSCy Schubert size_t new_alloc;
2323311ff84SXin LI size_t slots_used;
23368ba7e87SXin LI size_t sidx;
2342b15cb3dSCy Schubert
2353311ff84SXin LI slots_used = c->head_response - c->tail_response;
2363311ff84SXin LI if (slots_used >= c->responses_alloc) {
2372b15cb3dSCy Schubert new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
2383311ff84SXin LI c->responses = erealloc(c->responses, new_alloc * each);
23968ba7e87SXin LI for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
24068ba7e87SXin LI c->responses[sidx] = NULL;
2413311ff84SXin LI c->tail_response = 0;
2423311ff84SXin LI c->head_response = c->responses_alloc;
2432b15cb3dSCy Schubert c->responses_alloc = new_alloc;
2442b15cb3dSCy Schubert }
24568ba7e87SXin LI INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
2463311ff84SXin LI return (0 == slots_used);
2473311ff84SXin LI }
2482b15cb3dSCy Schubert
2492b15cb3dSCy Schubert
2503311ff84SXin LI /* --------------------------------------------------------------------
2512b15cb3dSCy Schubert * queue_req_pointer() - append a work item or idle exit request to
2523311ff84SXin LI * blocking_workitems[]. Employ proper locking.
2532b15cb3dSCy Schubert */
2542b15cb3dSCy Schubert static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)2552b15cb3dSCy Schubert queue_req_pointer(
2562b15cb3dSCy Schubert blocking_child * c,
2572b15cb3dSCy Schubert blocking_pipe_header * hdr
2582b15cb3dSCy Schubert )
2592b15cb3dSCy Schubert {
2603311ff84SXin LI size_t qhead;
2612b15cb3dSCy Schubert
2623311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */
2633311ff84SXin LI wait_for_sem(c->accesslock, NULL);
2643311ff84SXin LI ensure_workitems_empty_slot(c);
2653311ff84SXin LI qhead = c->head_workitem;
2663311ff84SXin LI c->workitems[qhead % c->workitems_alloc] = hdr;
2673311ff84SXin LI c->head_workitem = 1 + qhead;
2683311ff84SXin LI tickle_sem(c->accesslock);
2693311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */
2703311ff84SXin LI
2713311ff84SXin LI /* queue consumer wake-up notification */
2723311ff84SXin LI tickle_sem(c->workitems_pending);
2732b15cb3dSCy Schubert
2742b15cb3dSCy Schubert return 0;
2752b15cb3dSCy Schubert }
2762b15cb3dSCy Schubert
2773311ff84SXin LI /* --------------------------------------------------------------------
2783311ff84SXin LI * API function to make sure a worker is running, a proper private copy
2793311ff84SXin LI * of the data is made, the data eneterd into the queue and the worker
2803311ff84SXin LI * is signalled.
2813311ff84SXin LI */
2822b15cb3dSCy Schubert int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)2832b15cb3dSCy Schubert send_blocking_req_internal(
2842b15cb3dSCy Schubert blocking_child * c,
2852b15cb3dSCy Schubert blocking_pipe_header * hdr,
2862b15cb3dSCy Schubert void * data
2872b15cb3dSCy Schubert )
2882b15cb3dSCy Schubert {
2892b15cb3dSCy Schubert blocking_pipe_header * threadcopy;
2902b15cb3dSCy Schubert size_t payload_octets;
2912b15cb3dSCy Schubert
2922b15cb3dSCy Schubert REQUIRE(hdr != NULL);
2932b15cb3dSCy Schubert REQUIRE(data != NULL);
2942b15cb3dSCy Schubert DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
2952b15cb3dSCy Schubert
2962b15cb3dSCy Schubert if (hdr->octets <= sizeof(*hdr))
2972b15cb3dSCy Schubert return 1; /* failure */
2982b15cb3dSCy Schubert payload_octets = hdr->octets - sizeof(*hdr);
2992b15cb3dSCy Schubert
3003311ff84SXin LI if (NULL == c->thread_ref)
3012b15cb3dSCy Schubert start_blocking_thread(c);
3022b15cb3dSCy Schubert threadcopy = emalloc(hdr->octets);
3032b15cb3dSCy Schubert memcpy(threadcopy, hdr, sizeof(*hdr));
3042b15cb3dSCy Schubert memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
3052b15cb3dSCy Schubert
3062b15cb3dSCy Schubert return queue_req_pointer(c, threadcopy);
3072b15cb3dSCy Schubert }
3082b15cb3dSCy Schubert
3093311ff84SXin LI /* --------------------------------------------------------------------
3103311ff84SXin LI * Wait for the 'incoming queue no longer empty' signal, lock the shared
3113311ff84SXin LI * structure and dequeue an item.
3123311ff84SXin LI */
3132b15cb3dSCy Schubert blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)3142b15cb3dSCy Schubert receive_blocking_req_internal(
3152b15cb3dSCy Schubert blocking_child * c
3162b15cb3dSCy Schubert )
3172b15cb3dSCy Schubert {
3182b15cb3dSCy Schubert blocking_pipe_header * req;
3193311ff84SXin LI size_t qhead, qtail;
3202b15cb3dSCy Schubert
3213311ff84SXin LI req = NULL;
3222b15cb3dSCy Schubert do {
3233311ff84SXin LI /* wait for tickle from the producer side */
3243311ff84SXin LI wait_for_sem(c->workitems_pending, NULL);
3252b15cb3dSCy Schubert
3263311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */
3273311ff84SXin LI wait_for_sem(c->accesslock, NULL);
3283311ff84SXin LI qhead = c->head_workitem;
3293311ff84SXin LI do {
3303311ff84SXin LI qtail = c->tail_workitem;
3313311ff84SXin LI if (qhead == qtail)
3323311ff84SXin LI break;
3333311ff84SXin LI c->tail_workitem = qtail + 1;
3343311ff84SXin LI qtail %= c->workitems_alloc;
3353311ff84SXin LI req = c->workitems[qtail];
3363311ff84SXin LI c->workitems[qtail] = NULL;
3373311ff84SXin LI } while (NULL == req);
3383311ff84SXin LI tickle_sem(c->accesslock);
3393311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */
3403311ff84SXin LI
3413311ff84SXin LI } while (NULL == req);
3423311ff84SXin LI
3432b15cb3dSCy Schubert INSIST(NULL != req);
3442b15cb3dSCy Schubert if (CHILD_EXIT_REQ == req) { /* idled out */
3452b15cb3dSCy Schubert send_blocking_resp_internal(c, CHILD_GONE_RESP);
3462b15cb3dSCy Schubert req = NULL;
3472b15cb3dSCy Schubert }
3482b15cb3dSCy Schubert
3492b15cb3dSCy Schubert return req;
3502b15cb3dSCy Schubert }
3512b15cb3dSCy Schubert
3523311ff84SXin LI /* --------------------------------------------------------------------
3533311ff84SXin LI * Push a response into the return queue and eventually tickle the
3543311ff84SXin LI * receiver.
3553311ff84SXin LI */
3562b15cb3dSCy Schubert int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)3572b15cb3dSCy Schubert send_blocking_resp_internal(
3582b15cb3dSCy Schubert blocking_child * c,
3592b15cb3dSCy Schubert blocking_pipe_header * resp
3602b15cb3dSCy Schubert )
3612b15cb3dSCy Schubert {
3623311ff84SXin LI size_t qhead;
3633311ff84SXin LI int empty;
3642b15cb3dSCy Schubert
3653311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */
3663311ff84SXin LI wait_for_sem(c->accesslock, NULL);
3673311ff84SXin LI empty = ensure_workresp_empty_slot(c);
3683311ff84SXin LI qhead = c->head_response;
3693311ff84SXin LI c->responses[qhead % c->responses_alloc] = resp;
3703311ff84SXin LI c->head_response = 1 + qhead;
3713311ff84SXin LI tickle_sem(c->accesslock);
3723311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */
3732b15cb3dSCy Schubert
3743311ff84SXin LI /* queue consumer wake-up notification */
3753311ff84SXin LI if (empty)
3763311ff84SXin LI {
3772b15cb3dSCy Schubert # ifdef WORK_PIPE
3784e1ef62aSXin LI if (1 != write(c->resp_write_pipe, "", 1))
379*f5f40dd6SCy Schubert msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
380*f5f40dd6SCy Schubert " failed to notify main thread!",
381*f5f40dd6SCy Schubert (BLOCKING_GETNAMEINFO == resp->rtype)
382*f5f40dd6SCy Schubert ? "name"
383*f5f40dd6SCy Schubert : "addr"
384*f5f40dd6SCy Schubert );
3852b15cb3dSCy Schubert # else
3863311ff84SXin LI tickle_sem(c->responses_pending);
3872b15cb3dSCy Schubert # endif
3883311ff84SXin LI }
3892b15cb3dSCy Schubert return 0;
3902b15cb3dSCy Schubert }
3912b15cb3dSCy Schubert
3922b15cb3dSCy Schubert
3932b15cb3dSCy Schubert #ifndef WORK_PIPE
3943311ff84SXin LI
3953311ff84SXin LI /* --------------------------------------------------------------------
396a466cc55SCy Schubert * Check if a (Windows-)handle to a semaphore is actually the same we
3973311ff84SXin LI * are using inside the sema wrapper.
3983311ff84SXin LI */
3993311ff84SXin LI static BOOL
same_os_sema(const sem_ref obj,void * osh)4003311ff84SXin LI same_os_sema(
4013311ff84SXin LI const sem_ref obj,
4023311ff84SXin LI void* osh
4033311ff84SXin LI )
4043311ff84SXin LI {
4053311ff84SXin LI return obj && osh && (obj->shnd == (HANDLE)osh);
4063311ff84SXin LI }
4073311ff84SXin LI
4083311ff84SXin LI /* --------------------------------------------------------------------
4093311ff84SXin LI * Find the shared context that associates to an OS handle and make sure
4103311ff84SXin LI * the data is dequeued and processed.
4113311ff84SXin LI */
4122b15cb3dSCy Schubert void
handle_blocking_resp_sem(void * context)4132b15cb3dSCy Schubert handle_blocking_resp_sem(
4142b15cb3dSCy Schubert void * context
4152b15cb3dSCy Schubert )
4162b15cb3dSCy Schubert {
4172b15cb3dSCy Schubert blocking_child * c;
4182b15cb3dSCy Schubert u_int idx;
4192b15cb3dSCy Schubert
4202b15cb3dSCy Schubert c = NULL;
4212b15cb3dSCy Schubert for (idx = 0; idx < blocking_children_alloc; idx++) {
4222b15cb3dSCy Schubert c = blocking_children[idx];
4233311ff84SXin LI if (c != NULL &&
4243311ff84SXin LI c->thread_ref != NULL &&
4253311ff84SXin LI same_os_sema(c->responses_pending, context))
4262b15cb3dSCy Schubert break;
4272b15cb3dSCy Schubert }
4282b15cb3dSCy Schubert if (idx < blocking_children_alloc)
4292b15cb3dSCy Schubert process_blocking_resp(c);
4302b15cb3dSCy Schubert }
4312b15cb3dSCy Schubert #endif /* !WORK_PIPE */
4322b15cb3dSCy Schubert
4333311ff84SXin LI /* --------------------------------------------------------------------
4343311ff84SXin LI * Fetch the next response from the return queue. In case of signalling
4353311ff84SXin LI * via pipe, make sure the pipe is flushed, too.
4363311ff84SXin LI */
4372b15cb3dSCy Schubert blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)4382b15cb3dSCy Schubert receive_blocking_resp_internal(
4392b15cb3dSCy Schubert blocking_child * c
4402b15cb3dSCy Schubert )
4412b15cb3dSCy Schubert {
4422b15cb3dSCy Schubert blocking_pipe_header * removed;
4433311ff84SXin LI size_t qhead, qtail, slot;
4443311ff84SXin LI
4452b15cb3dSCy Schubert #ifdef WORK_PIPE
4462b15cb3dSCy Schubert int rc;
4472b15cb3dSCy Schubert char scratch[32];
4482b15cb3dSCy Schubert
4493311ff84SXin LI do
4502b15cb3dSCy Schubert rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
4513311ff84SXin LI while (-1 == rc && EINTR == errno);
4522b15cb3dSCy Schubert #endif
4533311ff84SXin LI
4543311ff84SXin LI /* >>>> ACCESS LOCKING STARTS >>>> */
4553311ff84SXin LI wait_for_sem(c->accesslock, NULL);
4563311ff84SXin LI qhead = c->head_response;
4573311ff84SXin LI qtail = c->tail_response;
4583311ff84SXin LI for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
4593311ff84SXin LI slot = qtail % c->responses_alloc;
4603311ff84SXin LI removed = c->responses[slot];
4613311ff84SXin LI c->responses[slot] = NULL;
4623311ff84SXin LI }
4633311ff84SXin LI c->tail_response = qtail;
4643311ff84SXin LI tickle_sem(c->accesslock);
4653311ff84SXin LI /* <<<< ACCESS LOCKING ENDS <<<< */
4663311ff84SXin LI
4672b15cb3dSCy Schubert if (NULL != removed) {
4682b15cb3dSCy Schubert DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
4692b15cb3dSCy Schubert BLOCKING_RESP_MAGIC == removed->magic_sig);
4702b15cb3dSCy Schubert }
4712b15cb3dSCy Schubert if (CHILD_GONE_RESP == removed) {
4722b15cb3dSCy Schubert cleanup_after_child(c);
4732b15cb3dSCy Schubert removed = NULL;
4742b15cb3dSCy Schubert }
4752b15cb3dSCy Schubert
4762b15cb3dSCy Schubert return removed;
4772b15cb3dSCy Schubert }
4782b15cb3dSCy Schubert
4793311ff84SXin LI /* --------------------------------------------------------------------
4803311ff84SXin LI * Light up a new worker.
4813311ff84SXin LI */
4822b15cb3dSCy Schubert static void
start_blocking_thread(blocking_child * c)4832b15cb3dSCy Schubert start_blocking_thread(
4842b15cb3dSCy Schubert blocking_child * c
4852b15cb3dSCy Schubert )
4862b15cb3dSCy Schubert {
4872b15cb3dSCy Schubert
4882b15cb3dSCy Schubert DEBUG_INSIST(!c->reusable);
4892b15cb3dSCy Schubert
4902b15cb3dSCy Schubert prepare_child_sems(c);
4912b15cb3dSCy Schubert start_blocking_thread_internal(c);
4922b15cb3dSCy Schubert }
4932b15cb3dSCy Schubert
4943311ff84SXin LI /* --------------------------------------------------------------------
4953311ff84SXin LI * Create a worker thread. There are several differences between POSIX
496*f5f40dd6SCy Schubert * and Windows, of course -- most notably the Windows thread is a
4973311ff84SXin LI * detached thread, and we keep the handle around until we want to get
4983311ff84SXin LI * rid of the thread. The notification scheme also differs: Windows
4993311ff84SXin LI * makes use of semaphores in both directions, POSIX uses a pipe for
5003311ff84SXin LI * integration with 'select()' or alike.
5013311ff84SXin LI */
5022b15cb3dSCy Schubert static void
start_blocking_thread_internal(blocking_child * c)5032b15cb3dSCy Schubert start_blocking_thread_internal(
5042b15cb3dSCy Schubert blocking_child * c
5052b15cb3dSCy Schubert )
5062b15cb3dSCy Schubert #ifdef SYS_WINNT
5072b15cb3dSCy Schubert {
5082b15cb3dSCy Schubert BOOL resumed;
5092b15cb3dSCy Schubert
5103311ff84SXin LI c->thread_ref = NULL;
5113311ff84SXin LI (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
5123311ff84SXin LI c->thr_table[0].thnd =
5132b15cb3dSCy Schubert (HANDLE)_beginthreadex(
5142b15cb3dSCy Schubert NULL,
5152b15cb3dSCy Schubert 0,
5162b15cb3dSCy Schubert &blocking_thread,
5172b15cb3dSCy Schubert c,
5182b15cb3dSCy Schubert CREATE_SUSPENDED,
5193311ff84SXin LI NULL);
5202b15cb3dSCy Schubert
5213311ff84SXin LI if (NULL == c->thr_table[0].thnd) {
5222b15cb3dSCy Schubert msyslog(LOG_ERR, "start blocking thread failed: %m");
5232b15cb3dSCy Schubert exit(-1);
5242b15cb3dSCy Schubert }
5252b15cb3dSCy Schubert /* remember the thread priority is only within the process class */
5263311ff84SXin LI if (!SetThreadPriority(c->thr_table[0].thnd,
527*f5f40dd6SCy Schubert THREAD_PRIORITY_BELOW_NORMAL)) {
5282b15cb3dSCy Schubert msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
529*f5f40dd6SCy Schubert }
530*f5f40dd6SCy Schubert if (NULL != pSetThreadDescription) {
531*f5f40dd6SCy Schubert (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
532*f5f40dd6SCy Schubert }
5333311ff84SXin LI resumed = ResumeThread(c->thr_table[0].thnd);
5342b15cb3dSCy Schubert DEBUG_INSIST(resumed);
5353311ff84SXin LI c->thread_ref = &c->thr_table[0];
5362b15cb3dSCy Schubert }
5372b15cb3dSCy Schubert #else /* pthreads start_blocking_thread_internal() follows */
5382b15cb3dSCy Schubert {
5392b15cb3dSCy Schubert # ifdef NEED_PTHREAD_INIT
5402b15cb3dSCy Schubert static int pthread_init_called;
5412b15cb3dSCy Schubert # endif
5422b15cb3dSCy Schubert pthread_attr_t thr_attr;
5432b15cb3dSCy Schubert int rc;
5442b15cb3dSCy Schubert int pipe_ends[2]; /* read then write */
5452b15cb3dSCy Schubert int is_pipe;
5462b15cb3dSCy Schubert int flags;
54768ba7e87SXin LI size_t ostacksize;
54868ba7e87SXin LI size_t nstacksize;
5492b15cb3dSCy Schubert sigset_t saved_sig_mask;
5502b15cb3dSCy Schubert
5513311ff84SXin LI c->thread_ref = NULL;
5523311ff84SXin LI
5532b15cb3dSCy Schubert # ifdef NEED_PTHREAD_INIT
5542b15cb3dSCy Schubert /*
5552b15cb3dSCy Schubert * from lib/isc/unix/app.c:
5562b15cb3dSCy Schubert * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
5572b15cb3dSCy Schubert */
5582b15cb3dSCy Schubert if (!pthread_init_called) {
5592b15cb3dSCy Schubert pthread_init();
5602b15cb3dSCy Schubert pthread_init_called = TRUE;
5612b15cb3dSCy Schubert }
5622b15cb3dSCy Schubert # endif
5632b15cb3dSCy Schubert
5642b15cb3dSCy Schubert rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
5652b15cb3dSCy Schubert if (0 != rc) {
5662b15cb3dSCy Schubert msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
5672b15cb3dSCy Schubert exit(1);
5682b15cb3dSCy Schubert }
5692b15cb3dSCy Schubert c->resp_read_pipe = move_fd(pipe_ends[0]);
5702b15cb3dSCy Schubert c->resp_write_pipe = move_fd(pipe_ends[1]);
5712b15cb3dSCy Schubert c->ispipe = is_pipe;
5722b15cb3dSCy Schubert flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
5732b15cb3dSCy Schubert if (-1 == flags) {
5742b15cb3dSCy Schubert msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
5752b15cb3dSCy Schubert exit(1);
5762b15cb3dSCy Schubert }
5772b15cb3dSCy Schubert rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
5782b15cb3dSCy Schubert if (-1 == rc) {
5792b15cb3dSCy Schubert msyslog(LOG_ERR,
5802b15cb3dSCy Schubert "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
5812b15cb3dSCy Schubert exit(1);
5822b15cb3dSCy Schubert }
5832b15cb3dSCy Schubert (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
5842b15cb3dSCy Schubert pthread_attr_init(&thr_attr);
5852b15cb3dSCy Schubert pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
5862b15cb3dSCy Schubert #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
5872b15cb3dSCy Schubert defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
58868ba7e87SXin LI rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
58968ba7e87SXin LI if (0 != rc) {
5902b15cb3dSCy Schubert msyslog(LOG_ERR,
59168ba7e87SXin LI "start_blocking_thread: pthread_attr_getstacksize() -> %s",
59268ba7e87SXin LI strerror(rc));
59368ba7e87SXin LI } else {
59468ba7e87SXin LI nstacksize = ostacksize;
595a466cc55SCy Schubert /* order is important here: first clamp on upper limit,
596a466cc55SCy Schubert * and the PTHREAD min stack size is ultimate override!
597a466cc55SCy Schubert */
598a466cc55SCy Schubert if (nstacksize > THREAD_MAXSTACKSIZE)
599a466cc55SCy Schubert nstacksize = THREAD_MAXSTACKSIZE;
600a466cc55SCy Schubert # ifdef PTHREAD_STACK_MAX
601a466cc55SCy Schubert if (nstacksize > PTHREAD_STACK_MAX)
602a466cc55SCy Schubert nstacksize = PTHREAD_STACK_MAX;
603a466cc55SCy Schubert # endif
604a466cc55SCy Schubert
605a466cc55SCy Schubert /* now clamp on lower stack limit. */
606a466cc55SCy Schubert if (nstacksize < THREAD_MINSTACKSIZE)
607a466cc55SCy Schubert nstacksize = THREAD_MINSTACKSIZE;
608a466cc55SCy Schubert # ifdef PTHREAD_STACK_MIN
609a466cc55SCy Schubert if (nstacksize < PTHREAD_STACK_MIN)
610a466cc55SCy Schubert nstacksize = PTHREAD_STACK_MIN;
611a466cc55SCy Schubert # endif
612a466cc55SCy Schubert
61368ba7e87SXin LI if (nstacksize != ostacksize)
61468ba7e87SXin LI rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
61568ba7e87SXin LI if (0 != rc)
6162b15cb3dSCy Schubert msyslog(LOG_ERR,
61768ba7e87SXin LI "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
61868ba7e87SXin LI (u_long)ostacksize, (u_long)nstacksize,
61968ba7e87SXin LI strerror(rc));
6202b15cb3dSCy Schubert }
6212b15cb3dSCy Schubert #else
62268ba7e87SXin LI UNUSED_ARG(nstacksize);
62368ba7e87SXin LI UNUSED_ARG(ostacksize);
6242b15cb3dSCy Schubert #endif
6252b15cb3dSCy Schubert #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
6262b15cb3dSCy Schubert pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
6272b15cb3dSCy Schubert #endif
6282b15cb3dSCy Schubert c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
6292b15cb3dSCy Schubert block_thread_signals(&saved_sig_mask);
6303311ff84SXin LI rc = pthread_create(&c->thr_table[0], &thr_attr,
6312b15cb3dSCy Schubert &blocking_thread, c);
6322b15cb3dSCy Schubert pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
6332b15cb3dSCy Schubert pthread_attr_destroy(&thr_attr);
6342b15cb3dSCy Schubert if (0 != rc) {
63568ba7e87SXin LI msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
63668ba7e87SXin LI strerror(rc));
6372b15cb3dSCy Schubert exit(1);
6382b15cb3dSCy Schubert }
6393311ff84SXin LI c->thread_ref = &c->thr_table[0];
6402b15cb3dSCy Schubert }
6412b15cb3dSCy Schubert #endif
6422b15cb3dSCy Schubert
6433311ff84SXin LI /* --------------------------------------------------------------------
6442b15cb3dSCy Schubert * block_thread_signals()
6452b15cb3dSCy Schubert *
6462b15cb3dSCy Schubert * Temporarily block signals used by ntpd main thread, so that signal
6472b15cb3dSCy Schubert * mask inherited by child threads leaves them blocked. Returns prior
6482b15cb3dSCy Schubert * active signal mask via pmask, to be restored by the main thread
6492b15cb3dSCy Schubert * after pthread_create().
6502b15cb3dSCy Schubert */
6512b15cb3dSCy Schubert #ifndef SYS_WINNT
6522b15cb3dSCy Schubert void
block_thread_signals(sigset_t * pmask)6532b15cb3dSCy Schubert block_thread_signals(
6542b15cb3dSCy Schubert sigset_t * pmask
6552b15cb3dSCy Schubert )
6562b15cb3dSCy Schubert {
6572b15cb3dSCy Schubert sigset_t block;
6582b15cb3dSCy Schubert
6592b15cb3dSCy Schubert sigemptyset(&block);
6602b15cb3dSCy Schubert # ifdef HAVE_SIGNALED_IO
6612b15cb3dSCy Schubert # ifdef SIGIO
6622b15cb3dSCy Schubert sigaddset(&block, SIGIO);
6632b15cb3dSCy Schubert # endif
6642b15cb3dSCy Schubert # ifdef SIGPOLL
6652b15cb3dSCy Schubert sigaddset(&block, SIGPOLL);
6662b15cb3dSCy Schubert # endif
6672b15cb3dSCy Schubert # endif /* HAVE_SIGNALED_IO */
6682b15cb3dSCy Schubert sigaddset(&block, SIGALRM);
6692b15cb3dSCy Schubert sigaddset(&block, MOREDEBUGSIG);
6702b15cb3dSCy Schubert sigaddset(&block, LESSDEBUGSIG);
6712b15cb3dSCy Schubert # ifdef SIGDIE1
6722b15cb3dSCy Schubert sigaddset(&block, SIGDIE1);
6732b15cb3dSCy Schubert # endif
6742b15cb3dSCy Schubert # ifdef SIGDIE2
6752b15cb3dSCy Schubert sigaddset(&block, SIGDIE2);
6762b15cb3dSCy Schubert # endif
6772b15cb3dSCy Schubert # ifdef SIGDIE3
6782b15cb3dSCy Schubert sigaddset(&block, SIGDIE3);
6792b15cb3dSCy Schubert # endif
6802b15cb3dSCy Schubert # ifdef SIGDIE4
6812b15cb3dSCy Schubert sigaddset(&block, SIGDIE4);
6822b15cb3dSCy Schubert # endif
6832b15cb3dSCy Schubert # ifdef SIGBUS
6842b15cb3dSCy Schubert sigaddset(&block, SIGBUS);
6852b15cb3dSCy Schubert # endif
6862b15cb3dSCy Schubert sigemptyset(pmask);
6872b15cb3dSCy Schubert pthread_sigmask(SIG_BLOCK, &block, pmask);
6882b15cb3dSCy Schubert }
6892b15cb3dSCy Schubert #endif /* !SYS_WINNT */
6902b15cb3dSCy Schubert
6912b15cb3dSCy Schubert
6923311ff84SXin LI /* --------------------------------------------------------------------
6933311ff84SXin LI * Create & destroy semaphores. This is sufficiently different between
6943311ff84SXin LI * POSIX and Windows to warrant wrapper functions and close enough to
6953311ff84SXin LI * use the concept of synchronization via semaphore for all platforms.
6963311ff84SXin LI */
6973311ff84SXin LI static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)6983311ff84SXin LI create_sema(
6993311ff84SXin LI sema_type* semptr,
7003311ff84SXin LI u_int inival,
7013311ff84SXin LI u_int maxval)
7023311ff84SXin LI {
7033311ff84SXin LI #ifdef SYS_WINNT
7043311ff84SXin LI
7053311ff84SXin LI long svini, svmax;
7063311ff84SXin LI if (NULL != semptr) {
7073311ff84SXin LI svini = (inival < LONG_MAX)
7083311ff84SXin LI ? (long)inival : LONG_MAX;
7093311ff84SXin LI svmax = (maxval < LONG_MAX && maxval > 0)
7103311ff84SXin LI ? (long)maxval : LONG_MAX;
7113311ff84SXin LI semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
7123311ff84SXin LI if (NULL == semptr->shnd)
7133311ff84SXin LI semptr = NULL;
7143311ff84SXin LI }
7153311ff84SXin LI
7163311ff84SXin LI #else
7173311ff84SXin LI
7183311ff84SXin LI (void)maxval;
7193311ff84SXin LI if (semptr && sem_init(semptr, FALSE, inival))
7203311ff84SXin LI semptr = NULL;
7213311ff84SXin LI
7223311ff84SXin LI #endif
7233311ff84SXin LI
7243311ff84SXin LI return semptr;
7253311ff84SXin LI }
7263311ff84SXin LI
7273311ff84SXin LI /* ------------------------------------------------------------------ */
7283311ff84SXin LI static sem_ref
delete_sema(sem_ref obj)7293311ff84SXin LI delete_sema(
7303311ff84SXin LI sem_ref obj)
7313311ff84SXin LI {
7323311ff84SXin LI
7333311ff84SXin LI # ifdef SYS_WINNT
7343311ff84SXin LI
7353311ff84SXin LI if (obj) {
7363311ff84SXin LI if (obj->shnd)
7373311ff84SXin LI CloseHandle(obj->shnd);
7383311ff84SXin LI obj->shnd = NULL;
7393311ff84SXin LI }
7403311ff84SXin LI
7413311ff84SXin LI # else
7423311ff84SXin LI
7433311ff84SXin LI if (obj)
7443311ff84SXin LI sem_destroy(obj);
7453311ff84SXin LI
7463311ff84SXin LI # endif
7473311ff84SXin LI
7483311ff84SXin LI return NULL;
7493311ff84SXin LI }
7503311ff84SXin LI
7513311ff84SXin LI /* --------------------------------------------------------------------
7522b15cb3dSCy Schubert * prepare_child_sems()
7532b15cb3dSCy Schubert *
7543311ff84SXin LI * create sync & access semaphores
7552b15cb3dSCy Schubert *
7563311ff84SXin LI * All semaphores are cleared, only the access semaphore has 1 unit.
7573311ff84SXin LI * Childs wait on 'workitems_pending', then grabs 'sema_access'
7583311ff84SXin LI * and dequeues jobs. When done, 'sema_access' is given one unit back.
7593311ff84SXin LI *
7603311ff84SXin LI * The producer grabs 'sema_access', manages the queue, restores
7613311ff84SXin LI * 'sema_access' and puts one unit into 'workitems_pending'.
7623311ff84SXin LI *
7633311ff84SXin LI * The story goes the same for the response queue.
7642b15cb3dSCy Schubert */
7652b15cb3dSCy Schubert static void
prepare_child_sems(blocking_child * c)7662b15cb3dSCy Schubert prepare_child_sems(
7672b15cb3dSCy Schubert blocking_child *c
7682b15cb3dSCy Schubert )
7692b15cb3dSCy Schubert {
7704990d495SXin LI if (NULL == worker_memlock)
7714990d495SXin LI worker_memlock = create_sema(&worker_mmutex, 1, 1);
7724990d495SXin LI
7733311ff84SXin LI c->accesslock = create_sema(&c->sem_table[0], 1, 1);
7743311ff84SXin LI c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
7753311ff84SXin LI c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
7763311ff84SXin LI # ifndef WORK_PIPE
7773311ff84SXin LI c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
7782b15cb3dSCy Schubert # endif
7793311ff84SXin LI }
7802b15cb3dSCy Schubert
7813311ff84SXin LI /* --------------------------------------------------------------------
7823311ff84SXin LI * wait for semaphore. Where the wait can be interrupted, it will
7833311ff84SXin LI * internally resume -- When this function returns, there is either no
7843311ff84SXin LI * semaphore at all, a timeout occurred, or the caller could
7853311ff84SXin LI * successfully take a token from the semaphore.
7863311ff84SXin LI *
7873311ff84SXin LI * For untimed wait, not checking the result of this function at all is
7883311ff84SXin LI * definitely an option.
7893311ff84SXin LI */
7902b15cb3dSCy Schubert static int
wait_for_sem(sem_ref sem,struct timespec * timeout)7912b15cb3dSCy Schubert wait_for_sem(
7922b15cb3dSCy Schubert sem_ref sem,
7932b15cb3dSCy Schubert struct timespec * timeout /* wall-clock */
7942b15cb3dSCy Schubert )
7952b15cb3dSCy Schubert #ifdef SYS_WINNT
7962b15cb3dSCy Schubert {
7972b15cb3dSCy Schubert struct timespec now;
7982b15cb3dSCy Schubert struct timespec delta;
7992b15cb3dSCy Schubert DWORD msec;
8002b15cb3dSCy Schubert DWORD rc;
8012b15cb3dSCy Schubert
8023311ff84SXin LI if (!(sem && sem->shnd)) {
8033311ff84SXin LI errno = EINVAL;
8043311ff84SXin LI return -1;
8053311ff84SXin LI }
8063311ff84SXin LI
8072b15cb3dSCy Schubert if (NULL == timeout) {
8082b15cb3dSCy Schubert msec = INFINITE;
8092b15cb3dSCy Schubert } else {
8102b15cb3dSCy Schubert getclock(TIMEOFDAY, &now);
8112b15cb3dSCy Schubert delta = sub_tspec(*timeout, now);
8122b15cb3dSCy Schubert if (delta.tv_sec < 0) {
8132b15cb3dSCy Schubert msec = 0;
8142b15cb3dSCy Schubert } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
8152b15cb3dSCy Schubert msec = INFINITE;
8162b15cb3dSCy Schubert } else {
8172b15cb3dSCy Schubert msec = 1000 * (DWORD)delta.tv_sec;
8182b15cb3dSCy Schubert msec += delta.tv_nsec / (1000 * 1000);
8192b15cb3dSCy Schubert }
8202b15cb3dSCy Schubert }
8213311ff84SXin LI rc = WaitForSingleObject(sem->shnd, msec);
8222b15cb3dSCy Schubert if (WAIT_OBJECT_0 == rc)
8232b15cb3dSCy Schubert return 0;
8242b15cb3dSCy Schubert if (WAIT_TIMEOUT == rc) {
8252b15cb3dSCy Schubert errno = ETIMEDOUT;
8262b15cb3dSCy Schubert return -1;
8272b15cb3dSCy Schubert }
8282b15cb3dSCy Schubert msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
8292b15cb3dSCy Schubert errno = EFAULT;
8302b15cb3dSCy Schubert return -1;
8312b15cb3dSCy Schubert }
8322b15cb3dSCy Schubert #else /* pthreads wait_for_sem() follows */
8332b15cb3dSCy Schubert {
8343311ff84SXin LI int rc = -1;
8352b15cb3dSCy Schubert
8363311ff84SXin LI if (sem) do {
8372b15cb3dSCy Schubert if (NULL == timeout)
8382b15cb3dSCy Schubert rc = sem_wait(sem);
8392b15cb3dSCy Schubert else
8402b15cb3dSCy Schubert rc = sem_timedwait(sem, timeout);
8413311ff84SXin LI } while (rc == -1 && errno == EINTR);
8423311ff84SXin LI else
8433311ff84SXin LI errno = EINVAL;
8442b15cb3dSCy Schubert
8452b15cb3dSCy Schubert return rc;
8462b15cb3dSCy Schubert }
8472b15cb3dSCy Schubert #endif
8482b15cb3dSCy Schubert
8493311ff84SXin LI /* --------------------------------------------------------------------
8503311ff84SXin LI * blocking_thread - thread functions have WINAPI (aka 'stdcall')
8513311ff84SXin LI * calling conventions under Windows and POSIX-defined signature
8523311ff84SXin LI * otherwise.
8532b15cb3dSCy Schubert */
8542b15cb3dSCy Schubert #ifdef SYS_WINNT
8553311ff84SXin LI u_int WINAPI
8562b15cb3dSCy Schubert #else
8572b15cb3dSCy Schubert void *
8582b15cb3dSCy Schubert #endif
blocking_thread(void * ThreadArg)8592b15cb3dSCy Schubert blocking_thread(
8602b15cb3dSCy Schubert void * ThreadArg
8612b15cb3dSCy Schubert )
8622b15cb3dSCy Schubert {
8632b15cb3dSCy Schubert blocking_child *c;
8642b15cb3dSCy Schubert
8652b15cb3dSCy Schubert c = ThreadArg;
8662b15cb3dSCy Schubert exit_worker(blocking_child_common(c));
8672b15cb3dSCy Schubert
8682b15cb3dSCy Schubert /* NOTREACHED */
8692b15cb3dSCy Schubert return 0;
8702b15cb3dSCy Schubert }
8712b15cb3dSCy Schubert
8723311ff84SXin LI /* --------------------------------------------------------------------
8732b15cb3dSCy Schubert * req_child_exit() runs in the parent.
8743311ff84SXin LI *
8753311ff84SXin LI * This function is called from from the idle timer, too, and possibly
8763311ff84SXin LI * without a thread being there any longer. Since we have folded up our
8773311ff84SXin LI * tent in that case and all the semaphores are already gone, we simply
8783311ff84SXin LI * ignore this request in this case.
8793311ff84SXin LI *
8803311ff84SXin LI * Since the existence of the semaphores is controlled exclusively by
8813311ff84SXin LI * the parent, there's no risk of data race here.
8822b15cb3dSCy Schubert */
8832b15cb3dSCy Schubert int
req_child_exit(blocking_child * c)8842b15cb3dSCy Schubert req_child_exit(
8852b15cb3dSCy Schubert blocking_child *c
8862b15cb3dSCy Schubert )
8872b15cb3dSCy Schubert {
8883311ff84SXin LI return (c->accesslock)
8893311ff84SXin LI ? queue_req_pointer(c, CHILD_EXIT_REQ)
8903311ff84SXin LI : 0;
8912b15cb3dSCy Schubert }
8922b15cb3dSCy Schubert
8933311ff84SXin LI /* --------------------------------------------------------------------
8942b15cb3dSCy Schubert * cleanup_after_child() runs in parent.
8952b15cb3dSCy Schubert */
8962b15cb3dSCy Schubert static void
cleanup_after_child(blocking_child * c)8972b15cb3dSCy Schubert cleanup_after_child(
8982b15cb3dSCy Schubert blocking_child * c
8992b15cb3dSCy Schubert )
9002b15cb3dSCy Schubert {
9012b15cb3dSCy Schubert DEBUG_INSIST(!c->reusable);
9023311ff84SXin LI
9032b15cb3dSCy Schubert # ifdef SYS_WINNT
9043311ff84SXin LI /* The thread was not created in detached state, so we better
9053311ff84SXin LI * clean up.
9063311ff84SXin LI */
9073311ff84SXin LI if (c->thread_ref && c->thread_ref->thnd) {
9083311ff84SXin LI WaitForSingleObject(c->thread_ref->thnd, INFINITE);
9093311ff84SXin LI INSIST(CloseHandle(c->thread_ref->thnd));
9103311ff84SXin LI c->thread_ref->thnd = NULL;
9113311ff84SXin LI }
9122b15cb3dSCy Schubert # endif
9132b15cb3dSCy Schubert c->thread_ref = NULL;
9143311ff84SXin LI
9153311ff84SXin LI /* remove semaphores and (if signalling vi IO) pipes */
9163311ff84SXin LI
9173311ff84SXin LI c->accesslock = delete_sema(c->accesslock);
9183311ff84SXin LI c->workitems_pending = delete_sema(c->workitems_pending);
9193311ff84SXin LI c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
9203311ff84SXin LI
9212b15cb3dSCy Schubert # ifdef WORK_PIPE
9222b15cb3dSCy Schubert DEBUG_INSIST(-1 != c->resp_read_pipe);
9232b15cb3dSCy Schubert DEBUG_INSIST(-1 != c->resp_write_pipe);
9242b15cb3dSCy Schubert (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
9252b15cb3dSCy Schubert close(c->resp_write_pipe);
9262b15cb3dSCy Schubert close(c->resp_read_pipe);
9272b15cb3dSCy Schubert c->resp_write_pipe = -1;
9282b15cb3dSCy Schubert c->resp_read_pipe = -1;
9292b15cb3dSCy Schubert # else
9303311ff84SXin LI DEBUG_INSIST(NULL != c->responses_pending);
9313311ff84SXin LI (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
9323311ff84SXin LI c->responses_pending = delete_sema(c->responses_pending);
9332b15cb3dSCy Schubert # endif
9343311ff84SXin LI
9353311ff84SXin LI /* Is it necessary to check if there are pending requests and
9363311ff84SXin LI * responses? If so, and if there are, what to do with them?
9373311ff84SXin LI */
9383311ff84SXin LI
9393311ff84SXin LI /* re-init buffer index sequencers */
9403311ff84SXin LI c->head_workitem = 0;
9413311ff84SXin LI c->tail_workitem = 0;
9423311ff84SXin LI c->head_response = 0;
9433311ff84SXin LI c->tail_response = 0;
9443311ff84SXin LI
9452b15cb3dSCy Schubert c->reusable = TRUE;
9462b15cb3dSCy Schubert }
9472b15cb3dSCy Schubert
9482b15cb3dSCy Schubert
9492b15cb3dSCy Schubert #else /* !WORK_THREAD follows */
/* Dummy object so this file still contains a declaration when
 * WORK_THREAD is not defined.
 */
char work_thread_nonempty_compilation_unit;
9512b15cb3dSCy Schubert #endif
952