/*	$NetBSD: work_thread.c,v 1.9 2024/08/18 20:47:13 christos Exp $	*/

/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results... Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif

/* need a good integer to store a pointer...
 */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh)	ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

static	sema_type	worker_mmutex;
static	sem_ref		worker_memlock;

/* --------------------------------------------------------------------
 * locking the global worker state table (and other global stuff)
 */
void
worker_global_lock(
	int inOrOut)
{
	if (worker_memlock) {
		if (inOrOut)
			wait_for_sem(worker_memlock, NULL);
		else
			tickle_sem(worker_memlock);
	}
}

/* --------------------------------------------------------------------
 * implementation isolation wrapper
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * Sleep for a given time or until the wakeup semaphore is tickled.
 */
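/* Return convention: 0 means the whole interval elapsed (the timed
 * wait ended with ETIMEDOUT); -1 means the nap was cut short by the
 * wakeup semaphore or that reading the clock failed.
 */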
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}


/* --------------------------------------------------------------------
 * Wake up a worker that is taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

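/* Both the request and the response queue are ring buffers: the
 * head/tail members are free-running counters and the slot in use is
 * 'counter % alloc'.  Growing a queue re-bases the counters without
 * reordering the stored pointers, which is why the dequeue loops
 * further down skip NULL slots.
 */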
/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Tell whether the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return whether the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t	slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
			c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell whether the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t	slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
			c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data entered into the queue, and the worker
 * signalled.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
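/* CHILD_EXIT_REQ is a sentinel pointer value that is never
 * dereferenced: req_child_exit() queues it verbatim, and the dequeue
 * path below merely compares against it before answering with
 * CHILD_GONE_RESP.
 */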
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}

/* --------------------------------------------------------------------
 * Push a response into the return queue and, if necessary, tickle the
 * receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
# ifdef WORK_PIPE
		if (1 != write(c->resp_write_pipe, "", 1))
			msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
					     " failed to notify main thread!",
				(BLOCKING_GETNAMEINFO == resp->rtype)
				    ? "name"
				    : "addr"
				);
# else
		tickle_sem(c->responses_pending);
# endif
	}
	return 0;
}


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows-)handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context associated with an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
		    c->thread_ref != NULL &&
		    same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not a
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or the like.
 */
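/* Note: the POSIX thread is created detached (PTHREAD_CREATE_DETACHED
 * below), so only the Windows flavour keeps a handle that
 * cleanup_after_child() must wait on and close.
 */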
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL)) {
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
	}
	if (NULL != pSetThreadDescription) {
		(*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
	}
	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

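	/* The pipe (or socket pair) set up below is the worker-to-daemon
	 * notification channel: the read end is registered with the I/O
	 * machinery via addremove_io_fd(), and the worker writes a single
	 * byte whenever the response queue goes non-empty (see
	 * send_blocking_resp_internal).
	 */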
	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		nstacksize = ostacksize;
		/* order is important here: first clamp on upper limit,
		 * and the PTHREAD min stack size is ultimate override!
		 */
		if (nstacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
# ifdef PTHREAD_STACK_MAX
		if (nstacksize > PTHREAD_STACK_MAX)
			nstacksize = PTHREAD_STACK_MAX;
# endif

		/* now clamp on lower stack limit.
		 */
		if (nstacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
# ifdef PTHREAD_STACK_MIN
		if (nstacksize < PTHREAD_STACK_MIN)
			nstacksize = PTHREAD_STACK_MIN;
# endif

		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by the ntpd main thread, so that the
 * signal mask inherited by child threads leaves them blocked. Returns
 * the prior active signal mask via pmask, to be restored by the main
 * thread after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
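/* create_sema() yields NULL when the underlying semaphore cannot be
 * created; wait_for_sem() maps a NULL reference to EINVAL, and
 * worker_global_lock() is simply a no-op without its semaphore.
 */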
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

# ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

# else

	if (obj)
		sem_destroy(obj);

# endif

	return NULL;
}

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
 * and dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
# ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
# endif
}

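/* The producer/consumer handshake described for prepare_child_sems()
 * above is implemented by queue_req_pointer() and
 * receive_blocking_req_internal() for the request queue, and by
 * send_blocking_resp_internal() and receive_blocking_resp_internal()
 * for the response queue.
 */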
/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it will
 * internally resume -- when this function returns, either there was no
 * semaphore at all, a timeout occurred, or the caller successfully took
 * a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore the request.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in the parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

# ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
# endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via IO) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

# ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
# else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
# endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif