12248Sraf /*
22248Sraf * CDDL HEADER START
32248Sraf *
42248Sraf * The contents of this file are subject to the terms of the
52248Sraf * Common Development and Distribution License (the "License").
62248Sraf * You may not use this file except in compliance with the License.
72248Sraf *
82248Sraf * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
92248Sraf * or http://www.opensolaris.org/os/licensing.
102248Sraf * See the License for the specific language governing permissions
112248Sraf * and limitations under the License.
122248Sraf *
132248Sraf * When distributing Covered Code, include this CDDL HEADER in each
142248Sraf * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
152248Sraf * If applicable, add the following below this CDDL HEADER, with the
162248Sraf * fields enclosed by brackets "[]" replaced with your own identifying
172248Sraf * information: Portions Copyright [yyyy] [name of copyright owner]
182248Sraf *
192248Sraf * CDDL HEADER END
202248Sraf */
212248Sraf
222248Sraf /*
235891Sraf * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
242248Sraf * Use is subject to license terms.
252248Sraf */
262248Sraf
272248Sraf #pragma ident "%Z%%M% %I% %E% SMI"
282248Sraf
29*6812Sraf #include "lint.h"
302248Sraf #include "thr_uberdata.h"
312248Sraf #include <sys/types.h>
322248Sraf #include <pthread.h>
332248Sraf #include <unistd.h>
342248Sraf #include <stdlib.h>
352248Sraf #include <thread.h>
362248Sraf #include <pthread.h>
372248Sraf #include <synch.h>
382248Sraf #include <port.h>
392248Sraf #include <signal.h>
402248Sraf #include <stdio.h>
412248Sraf #include <errno.h>
422248Sraf #include <stdarg.h>
432248Sraf #include <string.h>
442248Sraf #include <sys/aiocb.h>
452248Sraf #include <time.h>
462248Sraf #include <signal.h>
472248Sraf #include <fcntl.h>
482248Sraf #include "sigev_thread.h"
492248Sraf
502248Sraf /*
512248Sraf * There is but one spawner for all aio operations.
522248Sraf */
532248Sraf thread_communication_data_t *sigev_aio_tcd = NULL;
542248Sraf
552248Sraf /*
562248Sraf * Set non-zero via _RT_DEBUG to enable debugging printf's.
572248Sraf */
582248Sraf static int _rt_debug = 0;
592248Sraf
602248Sraf void
init_sigev_thread(void)612248Sraf init_sigev_thread(void)
622248Sraf {
632248Sraf char *ldebug;
642248Sraf
652248Sraf if ((ldebug = getenv("_RT_DEBUG")) != NULL)
662248Sraf _rt_debug = atoi(ldebug);
672248Sraf }
682248Sraf
692248Sraf /*
702248Sraf * Routine to print debug messages:
712248Sraf * If _rt_debug is set, printf the debug message to stderr
722248Sraf * with an appropriate prefix.
732248Sraf */
742248Sraf /*PRINTFLIKE1*/
752248Sraf static void
dprintf(const char * format,...)762248Sraf dprintf(const char *format, ...)
772248Sraf {
782248Sraf if (_rt_debug) {
792248Sraf va_list alist;
802248Sraf
812248Sraf va_start(alist, format);
822248Sraf flockfile(stderr);
835891Sraf pthread_cleanup_push(funlockfile, stderr);
842248Sraf (void) fputs("DEBUG: ", stderr);
852248Sraf (void) vfprintf(stderr, format, alist);
865891Sraf pthread_cleanup_pop(1); /* funlockfile(stderr) */
872248Sraf va_end(alist);
882248Sraf }
892248Sraf }
902248Sraf
912248Sraf /*
922248Sraf * The notify_thread() function can be used as the start function of a new
932248Sraf * thread but it is normally called from notifier(), below, in the context
942248Sraf * of a thread pool worker thread. It is used as the start function of a
952248Sraf * new thread only when individual pthread attributes differ from those
962248Sraf * that are common to all workers. This only occurs in the AIO case.
972248Sraf */
982248Sraf static void *
notify_thread(void * arg)992248Sraf notify_thread(void *arg)
1002248Sraf {
1012248Sraf sigev_thread_data_t *stdp = arg;
1022248Sraf void (*function)(union sigval) = stdp->std_func;
1032248Sraf union sigval argument = stdp->std_arg;
1042248Sraf
1052248Sraf lfree(stdp, sizeof (*stdp));
1062248Sraf function(argument);
1072248Sraf return (NULL);
1082248Sraf }
1092248Sraf
1102248Sraf /*
1112248Sraf * Thread pool interface to call the user-supplied notification function.
1122248Sraf */
1132248Sraf static void
notifier(void * arg)1142248Sraf notifier(void *arg)
1152248Sraf {
1162248Sraf (void) notify_thread(arg);
1172248Sraf }
1182248Sraf
1192248Sraf /*
1202248Sraf * This routine adds a new work request, described by function
1212248Sraf * and argument, to the list of outstanding jobs.
1222248Sraf * It returns 0 indicating success. A value != 0 indicates an error.
1232248Sraf */
1242248Sraf static int
sigev_add_work(thread_communication_data_t * tcdp,void (* function)(union sigval),union sigval argument)1252248Sraf sigev_add_work(thread_communication_data_t *tcdp,
1262248Sraf void (*function)(union sigval), union sigval argument)
1272248Sraf {
1282248Sraf tpool_t *tpool = tcdp->tcd_poolp;
1292248Sraf sigev_thread_data_t *stdp;
1302248Sraf
1312248Sraf if (tpool == NULL)
1322248Sraf return (EINVAL);
1332248Sraf if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
1342248Sraf return (errno);
1352248Sraf stdp->std_func = function;
1362248Sraf stdp->std_arg = argument;
1372248Sraf if (tpool_dispatch(tpool, notifier, stdp) != 0) {
1382248Sraf lfree(stdp, sizeof (*stdp));
1392248Sraf return (errno);
1402248Sraf }
1412248Sraf return (0);
1422248Sraf }
1432248Sraf
1442248Sraf static void
sigev_destroy_pool(thread_communication_data_t * tcdp)1452248Sraf sigev_destroy_pool(thread_communication_data_t *tcdp)
1462248Sraf {
1472248Sraf if (tcdp->tcd_poolp != NULL)
1482248Sraf tpool_abandon(tcdp->tcd_poolp);
1492248Sraf tcdp->tcd_poolp = NULL;
1502248Sraf
1512248Sraf if (tcdp->tcd_subsystem == MQ) {
1522248Sraf /*
1532248Sraf * synchronize with del_sigev_mq()
1542248Sraf */
1552248Sraf sig_mutex_lock(&tcdp->tcd_lock);
1562248Sraf tcdp->tcd_server_id = 0;
1572248Sraf if (tcdp->tcd_msg_closing) {
1582248Sraf (void) cond_broadcast(&tcdp->tcd_cv);
1592248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
1602248Sraf return; /* del_sigev_mq() will free the tcd */
1612248Sraf }
1622248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
1632248Sraf }
1642248Sraf
1652248Sraf /*
1662248Sraf * now delete everything
1672248Sraf */
1682248Sraf free_sigev_handler(tcdp);
1692248Sraf }
1702248Sraf
1712248Sraf /*
1722248Sraf * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
1732248Sraf * functions for the daemon threads that get the event(s) for the
1742248Sraf * respective SIGEV_THREAD subsystems. There is one timer spawner for
1752248Sraf * each timer_create(), one mqueue spawner for every mq_open(), and
1762248Sraf * exactly one aio spawner for all aio requests. These spawners add
1772248Sraf * work requests to be done by a pool of daemon worker threads. In case
1782248Sraf * the event requires creation of a worker thread with different pthread
1792248Sraf * attributes than those from the pool of workers, a new daemon thread
1802248Sraf * with these attributes is spawned apart from the pool of workers.
1812248Sraf * If the spawner fails to add work or fails to create an additional
1822248Sraf * thread because of lacking resources, it puts the event back into
1832248Sraf * the kernel queue and re-tries some time later.
1842248Sraf */
1852248Sraf
1862248Sraf void *
timer_spawner(void * arg)1872248Sraf timer_spawner(void *arg)
1882248Sraf {
1892248Sraf thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
1902248Sraf port_event_t port_event;
1912248Sraf
1922248Sraf /* destroy the pool if we are cancelled */
1932248Sraf pthread_cleanup_push(sigev_destroy_pool, tcdp);
1942248Sraf
1952248Sraf for (;;) {
1962248Sraf if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
1972248Sraf dprintf("port_get on port %d failed with %d <%s>\n",
1982248Sraf tcdp->tcd_port, errno, strerror(errno));
1992248Sraf break;
2002248Sraf }
2012248Sraf switch (port_event.portev_source) {
2022248Sraf case PORT_SOURCE_TIMER:
2032248Sraf break;
2042248Sraf case PORT_SOURCE_ALERT:
2052248Sraf if (port_event.portev_events != SIGEV_THREAD_TERM)
2062248Sraf errno = EPROTO;
2072248Sraf goto out;
2082248Sraf default:
2092248Sraf dprintf("port_get on port %d returned %u "
2102248Sraf "(not PORT_SOURCE_TIMER)\n",
2112248Sraf tcdp->tcd_port, port_event.portev_source);
2122248Sraf errno = EPROTO;
2132248Sraf goto out;
2142248Sraf }
2152248Sraf
2162248Sraf tcdp->tcd_overruns = port_event.portev_events - 1;
2172248Sraf if (sigev_add_work(tcdp,
2182248Sraf tcdp->tcd_notif.sigev_notify_function,
2192248Sraf tcdp->tcd_notif.sigev_value) != 0)
2202248Sraf break;
2212248Sraf /* wait until job is done before looking for another */
2222248Sraf tpool_wait(tcdp->tcd_poolp);
2232248Sraf }
2242248Sraf out:
2252248Sraf pthread_cleanup_pop(1);
2262248Sraf return (NULL);
2272248Sraf }
2282248Sraf
2292248Sraf void *
mqueue_spawner(void * arg)2302248Sraf mqueue_spawner(void *arg)
2312248Sraf {
2322248Sraf thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
2332248Sraf int ret = 0;
2342248Sraf int ntype;
2352248Sraf void (*function)(union sigval);
2362248Sraf union sigval argument;
2372248Sraf
2382248Sraf /* destroy the pool if we are cancelled */
2392248Sraf pthread_cleanup_push(sigev_destroy_pool, tcdp);
2402248Sraf
2412248Sraf while (ret == 0) {
2422248Sraf sig_mutex_lock(&tcdp->tcd_lock);
2432248Sraf pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
2442248Sraf while ((ntype = tcdp->tcd_msg_enabled) == 0)
2452248Sraf (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
2462248Sraf pthread_cleanup_pop(1);
2472248Sraf
2482248Sraf while (sem_wait(tcdp->tcd_msg_avail) == -1)
2492248Sraf continue;
2502248Sraf
2512248Sraf sig_mutex_lock(&tcdp->tcd_lock);
2522248Sraf tcdp->tcd_msg_enabled = 0;
2532248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
2542248Sraf
2552248Sraf /* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
2562248Sraf if (ntype == SIGEV_THREAD) {
2572248Sraf function = tcdp->tcd_notif.sigev_notify_function;
2582248Sraf argument.sival_ptr = tcdp->tcd_msg_userval;
2592248Sraf ret = sigev_add_work(tcdp, function, argument);
2602248Sraf } else { /* ntype == SIGEV_PORT */
2612248Sraf ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
2622248Sraf 0, (uintptr_t)tcdp->tcd_msg_object,
2632248Sraf tcdp->tcd_msg_userval);
2642248Sraf }
2652248Sraf }
2662248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
2672248Sraf
2682248Sraf pthread_cleanup_pop(1);
2692248Sraf return (NULL);
2702248Sraf }
2712248Sraf
2722248Sraf void *
aio_spawner(void * arg)2732248Sraf aio_spawner(void *arg)
2742248Sraf {
2752248Sraf thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
2762248Sraf int error = 0;
2772248Sraf void (*function)(union sigval);
2782248Sraf union sigval argument;
2792248Sraf port_event_t port_event;
2802248Sraf struct sigevent *sigevp;
2812248Sraf timespec_t delta;
2822248Sraf pthread_attr_t *attrp;
2832248Sraf
2842248Sraf /* destroy the pool if we are cancelled */
2852248Sraf pthread_cleanup_push(sigev_destroy_pool, tcdp);
2862248Sraf
2872248Sraf while (error == 0) {
2882248Sraf if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
2892248Sraf error = errno;
2902248Sraf dprintf("port_get on port %d failed with %d <%s>\n",
2912248Sraf tcdp->tcd_port, error, strerror(error));
2922248Sraf break;
2932248Sraf }
2942248Sraf switch (port_event.portev_source) {
2952248Sraf case PORT_SOURCE_AIO:
2962248Sraf break;
2972248Sraf case PORT_SOURCE_ALERT:
2982248Sraf if (port_event.portev_events != SIGEV_THREAD_TERM)
2992248Sraf errno = EPROTO;
3002248Sraf goto out;
3012248Sraf default:
3022248Sraf dprintf("port_get on port %d returned %u "
3032248Sraf "(not PORT_SOURCE_AIO)\n",
3042248Sraf tcdp->tcd_port, port_event.portev_source);
3052248Sraf errno = EPROTO;
3062248Sraf goto out;
3072248Sraf }
3082248Sraf argument.sival_ptr = port_event.portev_user;
3092248Sraf switch (port_event.portev_events) {
3102248Sraf case AIOLIO:
3112248Sraf #if !defined(_LP64)
3122248Sraf case AIOLIO64:
3132248Sraf #endif
3142248Sraf sigevp = (struct sigevent *)port_event.portev_object;
3152248Sraf function = sigevp->sigev_notify_function;
3162248Sraf attrp = sigevp->sigev_notify_attributes;
3172248Sraf break;
3182248Sraf case AIOAREAD:
3192248Sraf case AIOAWRITE:
3202248Sraf case AIOFSYNC:
3215891Sraf {
3222248Sraf aiocb_t *aiocbp =
3232248Sraf (aiocb_t *)port_event.portev_object;
3242248Sraf function = aiocbp->aio_sigevent.sigev_notify_function;
3252248Sraf attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
3262248Sraf break;
3275891Sraf }
3282248Sraf #if !defined(_LP64)
3292248Sraf case AIOAREAD64:
3302248Sraf case AIOAWRITE64:
3312248Sraf case AIOFSYNC64:
3325891Sraf {
3332248Sraf aiocb64_t *aiocbp =
3342248Sraf (aiocb64_t *)port_event.portev_object;
3352248Sraf function = aiocbp->aio_sigevent.sigev_notify_function;
3362248Sraf attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
3372248Sraf break;
3385891Sraf }
3392248Sraf #endif
3402248Sraf default:
3412248Sraf function = NULL;
3422248Sraf attrp = NULL;
3432248Sraf break;
3442248Sraf }
3452248Sraf
3462248Sraf if (function == NULL)
3472248Sraf error = EINVAL;
348*6812Sraf else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
3492248Sraf error = sigev_add_work(tcdp, function, argument);
3502248Sraf else {
3512248Sraf /*
3522248Sraf * The attributes don't match.
3532248Sraf * Spawn a thread with the non-matching attributes.
3542248Sraf */
3552248Sraf pthread_attr_t local_attr;
3562248Sraf sigev_thread_data_t *stdp;
3572248Sraf
3582248Sraf if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
3592248Sraf error = ENOMEM;
3602248Sraf else
361*6812Sraf error = pthread_attr_clone(&local_attr, attrp);
3622248Sraf
3632248Sraf if (error == 0) {
3642248Sraf (void) pthread_attr_setdetachstate(
3652248Sraf &local_attr, PTHREAD_CREATE_DETACHED);
366*6812Sraf (void) pthread_attr_setdaemonstate_np(
3672248Sraf &local_attr, PTHREAD_CREATE_DAEMON_NP);
3682248Sraf stdp->std_func = function;
3692248Sraf stdp->std_arg = argument;
3702248Sraf error = pthread_create(NULL, &local_attr,
3712248Sraf notify_thread, stdp);
3722248Sraf (void) pthread_attr_destroy(&local_attr);
3732248Sraf }
3742248Sraf if (error && stdp != NULL)
3752248Sraf lfree(stdp, sizeof (*stdp));
3762248Sraf }
3772248Sraf
3782248Sraf if (error) {
3792248Sraf dprintf("Cannot add work, error=%d <%s>.\n",
3802248Sraf error, strerror(error));
3812248Sraf if (error == EAGAIN || error == ENOMEM) {
3822248Sraf /* (Temporary) no resources are available. */
3832248Sraf if (_port_dispatch(tcdp->tcd_port, 0,
3842248Sraf PORT_SOURCE_AIO, port_event.portev_events,
3852248Sraf port_event.portev_object,
3862248Sraf port_event.portev_user) != 0)
3872248Sraf break;
3882248Sraf error = 0;
3892248Sraf delta.tv_sec = 0;
3902248Sraf delta.tv_nsec = NANOSEC / 20; /* 50 msec */
3912248Sraf (void) nanosleep(&delta, NULL);
3922248Sraf }
3932248Sraf }
3942248Sraf }
3952248Sraf out:
3962248Sraf pthread_cleanup_pop(1);
3972248Sraf return (NULL);
3982248Sraf }
3992248Sraf
4002248Sraf /*
4012248Sraf * Allocate a thread_communication_data_t block.
4022248Sraf */
4032248Sraf static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)4042248Sraf alloc_sigev_handler(subsystem_t caller)
4052248Sraf {
4062248Sraf thread_communication_data_t *tcdp;
4072248Sraf
4082248Sraf if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
4092248Sraf tcdp->tcd_subsystem = caller;
4102248Sraf tcdp->tcd_port = -1;
4112248Sraf (void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
4122248Sraf (void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
4132248Sraf }
4142248Sraf return (tcdp);
4152248Sraf }
4162248Sraf
4172248Sraf /*
4182248Sraf * Free a thread_communication_data_t block.
4192248Sraf */
4202248Sraf void
free_sigev_handler(thread_communication_data_t * tcdp)4212248Sraf free_sigev_handler(thread_communication_data_t *tcdp)
4222248Sraf {
4232248Sraf if (tcdp->tcd_attrp) {
4242248Sraf (void) pthread_attr_destroy(tcdp->tcd_attrp);
4252248Sraf tcdp->tcd_attrp = NULL;
4262248Sraf }
4272248Sraf (void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
4282248Sraf
4292248Sraf switch (tcdp->tcd_subsystem) {
4302248Sraf case TIMER:
4312248Sraf case AIO:
4322248Sraf if (tcdp->tcd_port >= 0)
4332248Sraf (void) close(tcdp->tcd_port);
4342248Sraf break;
4352248Sraf case MQ:
4362248Sraf tcdp->tcd_msg_avail = NULL;
4372248Sraf tcdp->tcd_msg_object = NULL;
4382248Sraf tcdp->tcd_msg_userval = NULL;
4392248Sraf tcdp->tcd_msg_enabled = 0;
4402248Sraf break;
4412248Sraf }
4422248Sraf
4432248Sraf lfree(tcdp, sizeof (*tcdp));
4442248Sraf }
4452248Sraf
4462248Sraf /*
4472248Sraf * Initialize data structure and create the port.
4482248Sraf */
4492248Sraf thread_communication_data_t *
setup_sigev_handler(const struct sigevent * sigevp,subsystem_t caller)4502248Sraf setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
4512248Sraf {
4522248Sraf thread_communication_data_t *tcdp;
4532248Sraf int error;
4542248Sraf
4552248Sraf if (sigevp == NULL) {
4562248Sraf errno = EINVAL;
4572248Sraf return (NULL);
4582248Sraf }
4592248Sraf
4602248Sraf if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
4612248Sraf errno = ENOMEM;
4622248Sraf return (NULL);
4632248Sraf }
4642248Sraf
4652248Sraf if (sigevp->sigev_notify_attributes == NULL)
4662248Sraf tcdp->tcd_attrp = NULL; /* default attributes */
4672248Sraf else {
4682248Sraf /*
4692248Sraf * We cannot just copy the sigevp->sigev_notify_attributes
4702248Sraf * pointer. We need to initialize a new pthread_attr_t
4712248Sraf * structure with the values from the user-supplied
4722248Sraf * pthread_attr_t.
4732248Sraf */
4742248Sraf tcdp->tcd_attrp = &tcdp->tcd_user_attr;
475*6812Sraf error = pthread_attr_clone(tcdp->tcd_attrp,
4765891Sraf sigevp->sigev_notify_attributes);
4772248Sraf if (error) {
4782248Sraf tcdp->tcd_attrp = NULL;
4792248Sraf free_sigev_handler(tcdp);
4802248Sraf errno = error;
4812248Sraf return (NULL);
4822248Sraf }
4832248Sraf }
4842248Sraf tcdp->tcd_notif = *sigevp;
4852248Sraf tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
4862248Sraf
4872248Sraf if (caller == TIMER || caller == AIO) {
4882248Sraf if ((tcdp->tcd_port = port_create()) < 0 ||
4892248Sraf fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
4902248Sraf free_sigev_handler(tcdp);
4912248Sraf errno = EBADF;
4922248Sraf return (NULL);
4932248Sraf }
4942248Sraf }
4952248Sraf return (tcdp);
4962248Sraf }
4972248Sraf
4982248Sraf /*
4992248Sraf * Create a thread pool and launch the spawner.
5002248Sraf */
5012248Sraf int
launch_spawner(thread_communication_data_t * tcdp)5022248Sraf launch_spawner(thread_communication_data_t *tcdp)
5032248Sraf {
5042248Sraf int ret;
5052248Sraf int maxworkers;
5062248Sraf void *(*spawner)(void *);
5072248Sraf sigset_t set;
5082248Sraf sigset_t oset;
5092248Sraf
5102248Sraf switch (tcdp->tcd_subsystem) {
5112248Sraf case TIMER:
5122248Sraf spawner = timer_spawner;
5132248Sraf maxworkers = 1;
5142248Sraf break;
5152248Sraf case MQ:
5162248Sraf spawner = mqueue_spawner;
5172248Sraf maxworkers = 1;
5182248Sraf break;
5192248Sraf case AIO:
5202248Sraf spawner = aio_spawner;
5212248Sraf maxworkers = 100;
5222248Sraf break;
5232248Sraf default:
5242248Sraf return (-1);
5252248Sraf }
5262248Sraf tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
5272248Sraf tcdp->tcd_notif.sigev_notify_attributes);
5282248Sraf if (tcdp->tcd_poolp == NULL)
5292248Sraf return (-1);
5302248Sraf /* create the spawner with all signals blocked */
5312248Sraf (void) sigfillset(&set);
5322248Sraf (void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
5332248Sraf ret = thr_create(NULL, 0, spawner, tcdp,
5342248Sraf THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
5352248Sraf (void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
5362248Sraf if (ret != 0) {
5372248Sraf tpool_destroy(tcdp->tcd_poolp);
5382248Sraf tcdp->tcd_poolp = NULL;
5392248Sraf return (-1);
5402248Sraf }
5412248Sraf return (0);
5422248Sraf }
5432248Sraf
5442248Sraf /*
5452248Sraf * Delete the data associated with the sigev_thread timer, if timer is
5462248Sraf * associated with such a notification option.
5472248Sraf * Destroy the timer_spawner thread.
5482248Sraf */
5492248Sraf int
del_sigev_timer(timer_t timer)5502248Sraf del_sigev_timer(timer_t timer)
5512248Sraf {
5522248Sraf int rc = 0;
5532248Sraf thread_communication_data_t *tcdp;
5542248Sraf
5552248Sraf if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
5562248Sraf sig_mutex_lock(&tcdp->tcd_lock);
5572248Sraf if (tcdp->tcd_port >= 0) {
5582248Sraf if ((rc = port_alert(tcdp->tcd_port,
5592248Sraf PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
5602248Sraf dprintf("del_sigev_timer(%d) OK.\n", timer);
5612248Sraf }
5622248Sraf }
5632248Sraf timer_tcd[timer] = NULL;
5642248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
5652248Sraf }
5662248Sraf return (rc);
5672248Sraf }
5682248Sraf
5692248Sraf int
sigev_timer_getoverrun(timer_t timer)5702248Sraf sigev_timer_getoverrun(timer_t timer)
5712248Sraf {
5722248Sraf thread_communication_data_t *tcdp;
5732248Sraf
5742248Sraf if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
5752248Sraf return (tcdp->tcd_overruns);
5762248Sraf return (0);
5772248Sraf }
5782248Sraf
5792248Sraf static void
del_sigev_mq_cleanup(thread_communication_data_t * tcdp)5802248Sraf del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
5812248Sraf {
5822248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
5832248Sraf free_sigev_handler(tcdp);
5842248Sraf }
5852248Sraf
5862248Sraf /*
5872248Sraf * Delete the data associated with the sigev_thread message queue,
5882248Sraf * if the message queue is associated with such a notification option.
5892248Sraf * Destroy the mqueue_spawner thread.
5902248Sraf */
5912248Sraf void
del_sigev_mq(thread_communication_data_t * tcdp)5922248Sraf del_sigev_mq(thread_communication_data_t *tcdp)
5932248Sraf {
5942248Sraf pthread_t server_id;
5952248Sraf int rc;
5962248Sraf
5972248Sraf sig_mutex_lock(&tcdp->tcd_lock);
5982248Sraf
5992248Sraf server_id = tcdp->tcd_server_id;
6002248Sraf tcdp->tcd_msg_closing = 1;
6012248Sraf if ((rc = pthread_cancel(server_id)) != 0) { /* "can't happen" */
6022248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
6032248Sraf dprintf("Fail to cancel %u with error %d <%s>.\n",
6042248Sraf server_id, rc, strerror(rc));
6052248Sraf return;
6062248Sraf }
6072248Sraf
6082248Sraf /*
6092248Sraf * wait for sigev_destroy_pool() to finish
6102248Sraf */
6112248Sraf pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
6122248Sraf while (tcdp->tcd_server_id == server_id)
6132248Sraf (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
6142248Sraf pthread_cleanup_pop(1);
6152248Sraf }
6162248Sraf
6172248Sraf /*
6182248Sraf * POSIX aio:
6192248Sraf * If the notification type is SIGEV_THREAD, set up
6202248Sraf * the port number for notifications. Create the
6212248Sraf * thread pool and launch the spawner if necessary.
6222248Sraf * If the notification type is not SIGEV_THREAD, do nothing.
6232248Sraf */
6242248Sraf int
_aio_sigev_thread_init(struct sigevent * sigevp)6252248Sraf _aio_sigev_thread_init(struct sigevent *sigevp)
6262248Sraf {
6272248Sraf static mutex_t sigev_aio_lock = DEFAULTMUTEX;
6282248Sraf static cond_t sigev_aio_cv = DEFAULTCV;
6292248Sraf static int sigev_aio_busy = 0;
6302248Sraf
6312248Sraf thread_communication_data_t *tcdp;
6322248Sraf int port;
6335891Sraf int cancel_state;
6342248Sraf int rc = 0;
6352248Sraf
6362248Sraf if (sigevp == NULL ||
6372248Sraf sigevp->sigev_notify != SIGEV_THREAD ||
6382248Sraf sigevp->sigev_notify_function == NULL)
6392248Sraf return (0);
6402248Sraf
6412248Sraf lmutex_lock(&sigev_aio_lock);
6425891Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
6432248Sraf while (sigev_aio_busy)
6445891Sraf (void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
6455891Sraf (void) pthread_setcancelstate(cancel_state, NULL);
6462248Sraf if ((tcdp = sigev_aio_tcd) != NULL)
6472248Sraf port = tcdp->tcd_port;
6482248Sraf else {
6492248Sraf sigev_aio_busy = 1;
6502248Sraf lmutex_unlock(&sigev_aio_lock);
6512248Sraf
6522248Sraf tcdp = setup_sigev_handler(sigevp, AIO);
6532248Sraf if (tcdp == NULL) {
6542248Sraf port = -1;
6552248Sraf rc = -1;
6562248Sraf } else if (launch_spawner(tcdp) != 0) {
6572248Sraf free_sigev_handler(tcdp);
6582248Sraf tcdp = NULL;
6592248Sraf port = -1;
6602248Sraf rc = -1;
6612248Sraf } else {
6622248Sraf port = tcdp->tcd_port;
6632248Sraf }
6642248Sraf
6652248Sraf lmutex_lock(&sigev_aio_lock);
6662248Sraf sigev_aio_tcd = tcdp;
6672248Sraf sigev_aio_busy = 0;
6682248Sraf (void) cond_broadcast(&sigev_aio_cv);
6692248Sraf }
6702248Sraf lmutex_unlock(&sigev_aio_lock);
6712248Sraf sigevp->sigev_signo = port;
6722248Sraf return (rc);
6732248Sraf }
6742248Sraf
6752248Sraf int
_aio_sigev_thread(aiocb_t * aiocbp)6762248Sraf _aio_sigev_thread(aiocb_t *aiocbp)
6772248Sraf {
6782248Sraf if (aiocbp == NULL)
6792248Sraf return (0);
6802248Sraf return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
6812248Sraf }
6822248Sraf
6832248Sraf #if !defined(_LP64)
6842248Sraf int
_aio_sigev_thread64(aiocb64_t * aiocbp)6852248Sraf _aio_sigev_thread64(aiocb64_t *aiocbp)
6862248Sraf {
6872248Sraf if (aiocbp == NULL)
6882248Sraf return (0);
6892248Sraf return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
6902248Sraf }
6912248Sraf #endif
6922248Sraf
6932248Sraf /*
6942248Sraf * Cleanup POSIX aio after fork1() in the child process.
6952248Sraf */
6962248Sraf void
postfork1_child_sigev_aio(void)6972248Sraf postfork1_child_sigev_aio(void)
6982248Sraf {
6992248Sraf thread_communication_data_t *tcdp;
7002248Sraf
7012248Sraf if ((tcdp = sigev_aio_tcd) != NULL) {
7022248Sraf sigev_aio_tcd = NULL;
7032248Sraf tcd_teardown(tcdp);
7042248Sraf }
7052248Sraf }
7062248Sraf
7072248Sraf /*
7082248Sraf * Utility function for the various postfork1_child_sigev_*() functions.
7092248Sraf * Clean up the tcdp data structure and close the port.
7102248Sraf */
7112248Sraf void
tcd_teardown(thread_communication_data_t * tcdp)7122248Sraf tcd_teardown(thread_communication_data_t *tcdp)
7132248Sraf {
7142248Sraf if (tcdp->tcd_poolp != NULL)
7152248Sraf tpool_abandon(tcdp->tcd_poolp);
7162248Sraf tcdp->tcd_poolp = NULL;
7172248Sraf tcdp->tcd_server_id = 0;
7182248Sraf free_sigev_handler(tcdp);
7192248Sraf }
720