xref: /onnv-gate/usr/src/lib/libc/port/rt/sigev_thread.c (revision 6812:febeba71273d)
12248Sraf /*
22248Sraf  * CDDL HEADER START
32248Sraf  *
42248Sraf  * The contents of this file are subject to the terms of the
52248Sraf  * Common Development and Distribution License (the "License").
62248Sraf  * You may not use this file except in compliance with the License.
72248Sraf  *
82248Sraf  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
92248Sraf  * or http://www.opensolaris.org/os/licensing.
102248Sraf  * See the License for the specific language governing permissions
112248Sraf  * and limitations under the License.
122248Sraf  *
132248Sraf  * When distributing Covered Code, include this CDDL HEADER in each
142248Sraf  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
152248Sraf  * If applicable, add the following below this CDDL HEADER, with the
162248Sraf  * fields enclosed by brackets "[]" replaced with your own identifying
172248Sraf  * information: Portions Copyright [yyyy] [name of copyright owner]
182248Sraf  *
192248Sraf  * CDDL HEADER END
202248Sraf  */
212248Sraf 
222248Sraf /*
235891Sraf  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
242248Sraf  * Use is subject to license terms.
252248Sraf  */
262248Sraf 
272248Sraf #pragma ident	"%Z%%M%	%I%	%E% SMI"
282248Sraf 
29*6812Sraf #include "lint.h"
302248Sraf #include "thr_uberdata.h"
312248Sraf #include <sys/types.h>
322248Sraf #include <pthread.h>
332248Sraf #include <unistd.h>
342248Sraf #include <stdlib.h>
352248Sraf #include <thread.h>
362248Sraf #include <pthread.h>
372248Sraf #include <synch.h>
382248Sraf #include <port.h>
392248Sraf #include <signal.h>
402248Sraf #include <stdio.h>
412248Sraf #include <errno.h>
422248Sraf #include <stdarg.h>
432248Sraf #include <string.h>
442248Sraf #include <sys/aiocb.h>
452248Sraf #include <time.h>
462248Sraf #include <signal.h>
472248Sraf #include <fcntl.h>
482248Sraf #include "sigev_thread.h"
492248Sraf 
502248Sraf /*
512248Sraf  * There is but one spawner for all aio operations.
522248Sraf  */
532248Sraf thread_communication_data_t *sigev_aio_tcd = NULL;
542248Sraf 
552248Sraf /*
562248Sraf  * Set non-zero via _RT_DEBUG to enable debugging printf's.
572248Sraf  */
582248Sraf static int _rt_debug = 0;
592248Sraf 
602248Sraf void
init_sigev_thread(void)612248Sraf init_sigev_thread(void)
622248Sraf {
632248Sraf 	char *ldebug;
642248Sraf 
652248Sraf 	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
662248Sraf 		_rt_debug = atoi(ldebug);
672248Sraf }
682248Sraf 
692248Sraf /*
702248Sraf  * Routine to print debug messages:
712248Sraf  * If _rt_debug is set, printf the debug message to stderr
722248Sraf  * with an appropriate prefix.
732248Sraf  */
742248Sraf /*PRINTFLIKE1*/
752248Sraf static void
dprintf(const char * format,...)762248Sraf dprintf(const char *format, ...)
772248Sraf {
782248Sraf 	if (_rt_debug) {
792248Sraf 		va_list alist;
802248Sraf 
812248Sraf 		va_start(alist, format);
822248Sraf 		flockfile(stderr);
835891Sraf 		pthread_cleanup_push(funlockfile, stderr);
842248Sraf 		(void) fputs("DEBUG: ", stderr);
852248Sraf 		(void) vfprintf(stderr, format, alist);
865891Sraf 		pthread_cleanup_pop(1);		/* funlockfile(stderr) */
872248Sraf 		va_end(alist);
882248Sraf 	}
892248Sraf }
902248Sraf 
912248Sraf /*
922248Sraf  * The notify_thread() function can be used as the start function of a new
932248Sraf  * thread but it is normally called from notifier(), below, in the context
942248Sraf  * of a thread pool worker thread.  It is used as the start function of a
952248Sraf  * new thread only when individual pthread attributes differ from those
962248Sraf  * that are common to all workers.  This only occurs in the AIO case.
972248Sraf  */
982248Sraf static void *
notify_thread(void * arg)992248Sraf notify_thread(void *arg)
1002248Sraf {
1012248Sraf 	sigev_thread_data_t *stdp = arg;
1022248Sraf 	void (*function)(union sigval) = stdp->std_func;
1032248Sraf 	union sigval argument = stdp->std_arg;
1042248Sraf 
1052248Sraf 	lfree(stdp, sizeof (*stdp));
1062248Sraf 	function(argument);
1072248Sraf 	return (NULL);
1082248Sraf }
1092248Sraf 
/*
 * Adapter used as the thread pool job function: forwards arg to
 * notify_thread() and drops its (always NULL) result.
 */
static void
notifier(void *arg)
{
	(void) notify_thread(arg);
}
1182248Sraf 
1192248Sraf /*
1202248Sraf  * This routine adds a new work request, described by function
1212248Sraf  * and argument, to the list of outstanding jobs.
1222248Sraf  * It returns 0 indicating success.  A value != 0 indicates an error.
1232248Sraf  */
1242248Sraf static int
sigev_add_work(thread_communication_data_t * tcdp,void (* function)(union sigval),union sigval argument)1252248Sraf sigev_add_work(thread_communication_data_t *tcdp,
1262248Sraf 	void (*function)(union sigval), union sigval argument)
1272248Sraf {
1282248Sraf 	tpool_t *tpool = tcdp->tcd_poolp;
1292248Sraf 	sigev_thread_data_t *stdp;
1302248Sraf 
1312248Sraf 	if (tpool == NULL)
1322248Sraf 		return (EINVAL);
1332248Sraf 	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
1342248Sraf 		return (errno);
1352248Sraf 	stdp->std_func = function;
1362248Sraf 	stdp->std_arg = argument;
1372248Sraf 	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
1382248Sraf 		lfree(stdp, sizeof (*stdp));
1392248Sraf 		return (errno);
1402248Sraf 	}
1412248Sraf 	return (0);
1422248Sraf }
1432248Sraf 
1442248Sraf static void
sigev_destroy_pool(thread_communication_data_t * tcdp)1452248Sraf sigev_destroy_pool(thread_communication_data_t *tcdp)
1462248Sraf {
1472248Sraf 	if (tcdp->tcd_poolp != NULL)
1482248Sraf 		tpool_abandon(tcdp->tcd_poolp);
1492248Sraf 	tcdp->tcd_poolp = NULL;
1502248Sraf 
1512248Sraf 	if (tcdp->tcd_subsystem == MQ) {
1522248Sraf 		/*
1532248Sraf 		 * synchronize with del_sigev_mq()
1542248Sraf 		 */
1552248Sraf 		sig_mutex_lock(&tcdp->tcd_lock);
1562248Sraf 		tcdp->tcd_server_id = 0;
1572248Sraf 		if (tcdp->tcd_msg_closing) {
1582248Sraf 			(void) cond_broadcast(&tcdp->tcd_cv);
1592248Sraf 			sig_mutex_unlock(&tcdp->tcd_lock);
1602248Sraf 			return;		/* del_sigev_mq() will free the tcd */
1612248Sraf 		}
1622248Sraf 		sig_mutex_unlock(&tcdp->tcd_lock);
1632248Sraf 	}
1642248Sraf 
1652248Sraf 	/*
1662248Sraf 	 * now delete everything
1672248Sraf 	 */
1682248Sraf 	free_sigev_handler(tcdp);
1692248Sraf }
1702248Sraf 
/*
 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
 * functions for the daemon threads that get the event(s) for the
 * respective SIGEV_THREAD subsystems.  There is one timer spawner for
 * each timer_create(), one mqueue spawner for every mq_open(), and
 * exactly one aio spawner for all aio requests.  These spawners add
 * work requests to be done by a pool of daemon worker threads.  If an
 * event requires a worker thread with pthread attributes different
 * from those of the pool of workers, a new daemon thread with these
 * attributes is spawned apart from the pool of workers.
 * If the spawner fails to add work or fails to create an additional
 * thread because of a lack of resources, it puts the event back into
 * the kernel queue and retries some time later.
 */
1852248Sraf 
1862248Sraf void *
timer_spawner(void * arg)1872248Sraf timer_spawner(void *arg)
1882248Sraf {
1892248Sraf 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
1902248Sraf 	port_event_t port_event;
1912248Sraf 
1922248Sraf 	/* destroy the pool if we are cancelled */
1932248Sraf 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
1942248Sraf 
1952248Sraf 	for (;;) {
1962248Sraf 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
1972248Sraf 			dprintf("port_get on port %d failed with %d <%s>\n",
1982248Sraf 			    tcdp->tcd_port, errno, strerror(errno));
1992248Sraf 			break;
2002248Sraf 		}
2012248Sraf 		switch (port_event.portev_source) {
2022248Sraf 		case PORT_SOURCE_TIMER:
2032248Sraf 			break;
2042248Sraf 		case PORT_SOURCE_ALERT:
2052248Sraf 			if (port_event.portev_events != SIGEV_THREAD_TERM)
2062248Sraf 				errno = EPROTO;
2072248Sraf 			goto out;
2082248Sraf 		default:
2092248Sraf 			dprintf("port_get on port %d returned %u "
2102248Sraf 			    "(not PORT_SOURCE_TIMER)\n",
2112248Sraf 			    tcdp->tcd_port, port_event.portev_source);
2122248Sraf 			errno = EPROTO;
2132248Sraf 			goto out;
2142248Sraf 		}
2152248Sraf 
2162248Sraf 		tcdp->tcd_overruns = port_event.portev_events - 1;
2172248Sraf 		if (sigev_add_work(tcdp,
2182248Sraf 		    tcdp->tcd_notif.sigev_notify_function,
2192248Sraf 		    tcdp->tcd_notif.sigev_value) != 0)
2202248Sraf 			break;
2212248Sraf 		/* wait until job is done before looking for another */
2222248Sraf 		tpool_wait(tcdp->tcd_poolp);
2232248Sraf 	}
2242248Sraf out:
2252248Sraf 	pthread_cleanup_pop(1);
2262248Sraf 	return (NULL);
2272248Sraf }
2282248Sraf 
2292248Sraf void *
mqueue_spawner(void * arg)2302248Sraf mqueue_spawner(void *arg)
2312248Sraf {
2322248Sraf 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
2332248Sraf 	int ret = 0;
2342248Sraf 	int ntype;
2352248Sraf 	void (*function)(union sigval);
2362248Sraf 	union sigval argument;
2372248Sraf 
2382248Sraf 	/* destroy the pool if we are cancelled */
2392248Sraf 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
2402248Sraf 
2412248Sraf 	while (ret == 0) {
2422248Sraf 		sig_mutex_lock(&tcdp->tcd_lock);
2432248Sraf 		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
2442248Sraf 		while ((ntype = tcdp->tcd_msg_enabled) == 0)
2452248Sraf 			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
2462248Sraf 		pthread_cleanup_pop(1);
2472248Sraf 
2482248Sraf 		while (sem_wait(tcdp->tcd_msg_avail) == -1)
2492248Sraf 			continue;
2502248Sraf 
2512248Sraf 		sig_mutex_lock(&tcdp->tcd_lock);
2522248Sraf 		tcdp->tcd_msg_enabled = 0;
2532248Sraf 		sig_mutex_unlock(&tcdp->tcd_lock);
2542248Sraf 
2552248Sraf 		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
2562248Sraf 		if (ntype == SIGEV_THREAD) {
2572248Sraf 			function = tcdp->tcd_notif.sigev_notify_function;
2582248Sraf 			argument.sival_ptr = tcdp->tcd_msg_userval;
2592248Sraf 			ret = sigev_add_work(tcdp, function, argument);
2602248Sraf 		} else {	/* ntype == SIGEV_PORT */
2612248Sraf 			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
2622248Sraf 			    0, (uintptr_t)tcdp->tcd_msg_object,
2632248Sraf 			    tcdp->tcd_msg_userval);
2642248Sraf 		}
2652248Sraf 	}
2662248Sraf 	sig_mutex_unlock(&tcdp->tcd_lock);
2672248Sraf 
2682248Sraf 	pthread_cleanup_pop(1);
2692248Sraf 	return (NULL);
2702248Sraf }
2712248Sraf 
2722248Sraf void *
aio_spawner(void * arg)2732248Sraf aio_spawner(void *arg)
2742248Sraf {
2752248Sraf 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
2762248Sraf 	int error = 0;
2772248Sraf 	void (*function)(union sigval);
2782248Sraf 	union sigval argument;
2792248Sraf 	port_event_t port_event;
2802248Sraf 	struct sigevent *sigevp;
2812248Sraf 	timespec_t delta;
2822248Sraf 	pthread_attr_t *attrp;
2832248Sraf 
2842248Sraf 	/* destroy the pool if we are cancelled */
2852248Sraf 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
2862248Sraf 
2872248Sraf 	while (error == 0) {
2882248Sraf 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
2892248Sraf 			error = errno;
2902248Sraf 			dprintf("port_get on port %d failed with %d <%s>\n",
2912248Sraf 			    tcdp->tcd_port, error, strerror(error));
2922248Sraf 			break;
2932248Sraf 		}
2942248Sraf 		switch (port_event.portev_source) {
2952248Sraf 		case PORT_SOURCE_AIO:
2962248Sraf 			break;
2972248Sraf 		case PORT_SOURCE_ALERT:
2982248Sraf 			if (port_event.portev_events != SIGEV_THREAD_TERM)
2992248Sraf 				errno = EPROTO;
3002248Sraf 			goto out;
3012248Sraf 		default:
3022248Sraf 			dprintf("port_get on port %d returned %u "
3032248Sraf 			    "(not PORT_SOURCE_AIO)\n",
3042248Sraf 			    tcdp->tcd_port, port_event.portev_source);
3052248Sraf 			errno = EPROTO;
3062248Sraf 			goto out;
3072248Sraf 		}
3082248Sraf 		argument.sival_ptr = port_event.portev_user;
3092248Sraf 		switch (port_event.portev_events) {
3102248Sraf 		case AIOLIO:
3112248Sraf #if !defined(_LP64)
3122248Sraf 		case AIOLIO64:
3132248Sraf #endif
3142248Sraf 			sigevp = (struct sigevent *)port_event.portev_object;
3152248Sraf 			function = sigevp->sigev_notify_function;
3162248Sraf 			attrp = sigevp->sigev_notify_attributes;
3172248Sraf 			break;
3182248Sraf 		case AIOAREAD:
3192248Sraf 		case AIOAWRITE:
3202248Sraf 		case AIOFSYNC:
3215891Sraf 			{
3222248Sraf 			aiocb_t *aiocbp =
3232248Sraf 			    (aiocb_t *)port_event.portev_object;
3242248Sraf 			function = aiocbp->aio_sigevent.sigev_notify_function;
3252248Sraf 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
3262248Sraf 			break;
3275891Sraf 			}
3282248Sraf #if !defined(_LP64)
3292248Sraf 		case AIOAREAD64:
3302248Sraf 		case AIOAWRITE64:
3312248Sraf 		case AIOFSYNC64:
3325891Sraf 			{
3332248Sraf 			aiocb64_t *aiocbp =
3342248Sraf 			    (aiocb64_t *)port_event.portev_object;
3352248Sraf 			function = aiocbp->aio_sigevent.sigev_notify_function;
3362248Sraf 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
3372248Sraf 			break;
3385891Sraf 			}
3392248Sraf #endif
3402248Sraf 		default:
3412248Sraf 			function = NULL;
3422248Sraf 			attrp = NULL;
3432248Sraf 			break;
3442248Sraf 		}
3452248Sraf 
3462248Sraf 		if (function == NULL)
3472248Sraf 			error = EINVAL;
348*6812Sraf 		else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
3492248Sraf 			error = sigev_add_work(tcdp, function, argument);
3502248Sraf 		else {
3512248Sraf 			/*
3522248Sraf 			 * The attributes don't match.
3532248Sraf 			 * Spawn a thread with the non-matching attributes.
3542248Sraf 			 */
3552248Sraf 			pthread_attr_t local_attr;
3562248Sraf 			sigev_thread_data_t *stdp;
3572248Sraf 
3582248Sraf 			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
3592248Sraf 				error = ENOMEM;
3602248Sraf 			else
361*6812Sraf 				error = pthread_attr_clone(&local_attr, attrp);
3622248Sraf 
3632248Sraf 			if (error == 0) {
3642248Sraf 				(void) pthread_attr_setdetachstate(
3652248Sraf 				    &local_attr, PTHREAD_CREATE_DETACHED);
366*6812Sraf 				(void) pthread_attr_setdaemonstate_np(
3672248Sraf 				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
3682248Sraf 				stdp->std_func = function;
3692248Sraf 				stdp->std_arg = argument;
3702248Sraf 				error = pthread_create(NULL, &local_attr,
3712248Sraf 				    notify_thread, stdp);
3722248Sraf 				(void) pthread_attr_destroy(&local_attr);
3732248Sraf 			}
3742248Sraf 			if (error && stdp != NULL)
3752248Sraf 				lfree(stdp, sizeof (*stdp));
3762248Sraf 		}
3772248Sraf 
3782248Sraf 		if (error) {
3792248Sraf 			dprintf("Cannot add work, error=%d <%s>.\n",
3802248Sraf 			    error, strerror(error));
3812248Sraf 			if (error == EAGAIN || error == ENOMEM) {
3822248Sraf 				/* (Temporary) no resources are available. */
3832248Sraf 				if (_port_dispatch(tcdp->tcd_port, 0,
3842248Sraf 				    PORT_SOURCE_AIO, port_event.portev_events,
3852248Sraf 				    port_event.portev_object,
3862248Sraf 				    port_event.portev_user) != 0)
3872248Sraf 					break;
3882248Sraf 				error = 0;
3892248Sraf 				delta.tv_sec = 0;
3902248Sraf 				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
3912248Sraf 				(void) nanosleep(&delta, NULL);
3922248Sraf 			}
3932248Sraf 		}
3942248Sraf 	}
3952248Sraf out:
3962248Sraf 	pthread_cleanup_pop(1);
3972248Sraf 	return (NULL);
3982248Sraf }
3992248Sraf 
4002248Sraf /*
4012248Sraf  * Allocate a thread_communication_data_t block.
4022248Sraf  */
4032248Sraf static thread_communication_data_t *
alloc_sigev_handler(subsystem_t caller)4042248Sraf alloc_sigev_handler(subsystem_t caller)
4052248Sraf {
4062248Sraf 	thread_communication_data_t *tcdp;
4072248Sraf 
4082248Sraf 	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
4092248Sraf 		tcdp->tcd_subsystem = caller;
4102248Sraf 		tcdp->tcd_port = -1;
4112248Sraf 		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
4122248Sraf 		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
4132248Sraf 	}
4142248Sraf 	return (tcdp);
4152248Sraf }
4162248Sraf 
4172248Sraf /*
4182248Sraf  * Free a thread_communication_data_t block.
4192248Sraf  */
4202248Sraf void
free_sigev_handler(thread_communication_data_t * tcdp)4212248Sraf free_sigev_handler(thread_communication_data_t *tcdp)
4222248Sraf {
4232248Sraf 	if (tcdp->tcd_attrp) {
4242248Sraf 		(void) pthread_attr_destroy(tcdp->tcd_attrp);
4252248Sraf 		tcdp->tcd_attrp = NULL;
4262248Sraf 	}
4272248Sraf 	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
4282248Sraf 
4292248Sraf 	switch (tcdp->tcd_subsystem) {
4302248Sraf 	case TIMER:
4312248Sraf 	case AIO:
4322248Sraf 		if (tcdp->tcd_port >= 0)
4332248Sraf 			(void) close(tcdp->tcd_port);
4342248Sraf 		break;
4352248Sraf 	case MQ:
4362248Sraf 		tcdp->tcd_msg_avail = NULL;
4372248Sraf 		tcdp->tcd_msg_object = NULL;
4382248Sraf 		tcdp->tcd_msg_userval = NULL;
4392248Sraf 		tcdp->tcd_msg_enabled = 0;
4402248Sraf 		break;
4412248Sraf 	}
4422248Sraf 
4432248Sraf 	lfree(tcdp, sizeof (*tcdp));
4442248Sraf }
4452248Sraf 
4462248Sraf /*
4472248Sraf  * Initialize data structure and create the port.
4482248Sraf  */
4492248Sraf thread_communication_data_t *
setup_sigev_handler(const struct sigevent * sigevp,subsystem_t caller)4502248Sraf setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
4512248Sraf {
4522248Sraf 	thread_communication_data_t *tcdp;
4532248Sraf 	int error;
4542248Sraf 
4552248Sraf 	if (sigevp == NULL) {
4562248Sraf 		errno = EINVAL;
4572248Sraf 		return (NULL);
4582248Sraf 	}
4592248Sraf 
4602248Sraf 	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
4612248Sraf 		errno = ENOMEM;
4622248Sraf 		return (NULL);
4632248Sraf 	}
4642248Sraf 
4652248Sraf 	if (sigevp->sigev_notify_attributes == NULL)
4662248Sraf 		tcdp->tcd_attrp = NULL;		/* default attributes */
4672248Sraf 	else {
4682248Sraf 		/*
4692248Sraf 		 * We cannot just copy the sigevp->sigev_notify_attributes
4702248Sraf 		 * pointer.  We need to initialize a new pthread_attr_t
4712248Sraf 		 * structure with the values from the user-supplied
4722248Sraf 		 * pthread_attr_t.
4732248Sraf 		 */
4742248Sraf 		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
475*6812Sraf 		error = pthread_attr_clone(tcdp->tcd_attrp,
4765891Sraf 		    sigevp->sigev_notify_attributes);
4772248Sraf 		if (error) {
4782248Sraf 			tcdp->tcd_attrp = NULL;
4792248Sraf 			free_sigev_handler(tcdp);
4802248Sraf 			errno = error;
4812248Sraf 			return (NULL);
4822248Sraf 		}
4832248Sraf 	}
4842248Sraf 	tcdp->tcd_notif = *sigevp;
4852248Sraf 	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
4862248Sraf 
4872248Sraf 	if (caller == TIMER || caller == AIO) {
4882248Sraf 		if ((tcdp->tcd_port = port_create()) < 0 ||
4892248Sraf 		    fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
4902248Sraf 			free_sigev_handler(tcdp);
4912248Sraf 			errno = EBADF;
4922248Sraf 			return (NULL);
4932248Sraf 		}
4942248Sraf 	}
4952248Sraf 	return (tcdp);
4962248Sraf }
4972248Sraf 
4982248Sraf /*
4992248Sraf  * Create a thread pool and launch the spawner.
5002248Sraf  */
5012248Sraf int
launch_spawner(thread_communication_data_t * tcdp)5022248Sraf launch_spawner(thread_communication_data_t *tcdp)
5032248Sraf {
5042248Sraf 	int ret;
5052248Sraf 	int maxworkers;
5062248Sraf 	void *(*spawner)(void *);
5072248Sraf 	sigset_t set;
5082248Sraf 	sigset_t oset;
5092248Sraf 
5102248Sraf 	switch (tcdp->tcd_subsystem) {
5112248Sraf 	case TIMER:
5122248Sraf 		spawner = timer_spawner;
5132248Sraf 		maxworkers = 1;
5142248Sraf 		break;
5152248Sraf 	case MQ:
5162248Sraf 		spawner = mqueue_spawner;
5172248Sraf 		maxworkers = 1;
5182248Sraf 		break;
5192248Sraf 	case AIO:
5202248Sraf 		spawner = aio_spawner;
5212248Sraf 		maxworkers = 100;
5222248Sraf 		break;
5232248Sraf 	default:
5242248Sraf 		return (-1);
5252248Sraf 	}
5262248Sraf 	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
5272248Sraf 	    tcdp->tcd_notif.sigev_notify_attributes);
5282248Sraf 	if (tcdp->tcd_poolp == NULL)
5292248Sraf 		return (-1);
5302248Sraf 	/* create the spawner with all signals blocked */
5312248Sraf 	(void) sigfillset(&set);
5322248Sraf 	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
5332248Sraf 	ret = thr_create(NULL, 0, spawner, tcdp,
5342248Sraf 	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
5352248Sraf 	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
5362248Sraf 	if (ret != 0) {
5372248Sraf 		tpool_destroy(tcdp->tcd_poolp);
5382248Sraf 		tcdp->tcd_poolp = NULL;
5392248Sraf 		return (-1);
5402248Sraf 	}
5412248Sraf 	return (0);
5422248Sraf }
5432248Sraf 
5442248Sraf /*
5452248Sraf  * Delete the data associated with the sigev_thread timer, if timer is
5462248Sraf  * associated with such a notification option.
5472248Sraf  * Destroy the timer_spawner thread.
5482248Sraf  */
5492248Sraf int
del_sigev_timer(timer_t timer)5502248Sraf del_sigev_timer(timer_t timer)
5512248Sraf {
5522248Sraf 	int rc = 0;
5532248Sraf 	thread_communication_data_t *tcdp;
5542248Sraf 
5552248Sraf 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
5562248Sraf 		sig_mutex_lock(&tcdp->tcd_lock);
5572248Sraf 		if (tcdp->tcd_port >= 0) {
5582248Sraf 			if ((rc = port_alert(tcdp->tcd_port,
5592248Sraf 			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
5602248Sraf 				dprintf("del_sigev_timer(%d) OK.\n", timer);
5612248Sraf 			}
5622248Sraf 		}
5632248Sraf 		timer_tcd[timer] = NULL;
5642248Sraf 		sig_mutex_unlock(&tcdp->tcd_lock);
5652248Sraf 	}
5662248Sraf 	return (rc);
5672248Sraf }
5682248Sraf 
5692248Sraf int
sigev_timer_getoverrun(timer_t timer)5702248Sraf sigev_timer_getoverrun(timer_t timer)
5712248Sraf {
5722248Sraf 	thread_communication_data_t *tcdp;
5732248Sraf 
5742248Sraf 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
5752248Sraf 		return (tcdp->tcd_overruns);
5762248Sraf 	return (0);
5772248Sraf }
5782248Sraf 
5792248Sraf static void
del_sigev_mq_cleanup(thread_communication_data_t * tcdp)5802248Sraf del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
5812248Sraf {
5822248Sraf 	sig_mutex_unlock(&tcdp->tcd_lock);
5832248Sraf 	free_sigev_handler(tcdp);
5842248Sraf }
5852248Sraf 
5862248Sraf /*
5872248Sraf  * Delete the data associated with the sigev_thread message queue,
5882248Sraf  * if the message queue is associated with such a notification option.
5892248Sraf  * Destroy the mqueue_spawner thread.
5902248Sraf  */
5912248Sraf void
del_sigev_mq(thread_communication_data_t * tcdp)5922248Sraf del_sigev_mq(thread_communication_data_t *tcdp)
5932248Sraf {
5942248Sraf 	pthread_t server_id;
5952248Sraf 	int rc;
5962248Sraf 
5972248Sraf 	sig_mutex_lock(&tcdp->tcd_lock);
5982248Sraf 
5992248Sraf 	server_id = tcdp->tcd_server_id;
6002248Sraf 	tcdp->tcd_msg_closing = 1;
6012248Sraf 	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
6022248Sraf 		sig_mutex_unlock(&tcdp->tcd_lock);
6032248Sraf 		dprintf("Fail to cancel %u with error %d <%s>.\n",
6042248Sraf 		    server_id, rc, strerror(rc));
6052248Sraf 		return;
6062248Sraf 	}
6072248Sraf 
6082248Sraf 	/*
6092248Sraf 	 * wait for sigev_destroy_pool() to finish
6102248Sraf 	 */
6112248Sraf 	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
6122248Sraf 	while (tcdp->tcd_server_id == server_id)
6132248Sraf 		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
6142248Sraf 	pthread_cleanup_pop(1);
6152248Sraf }
6162248Sraf 
6172248Sraf /*
6182248Sraf  * POSIX aio:
6192248Sraf  * If the notification type is SIGEV_THREAD, set up
6202248Sraf  * the port number for notifications.  Create the
6212248Sraf  * thread pool and launch the spawner if necessary.
6222248Sraf  * If the notification type is not SIGEV_THREAD, do nothing.
6232248Sraf  */
6242248Sraf int
_aio_sigev_thread_init(struct sigevent * sigevp)6252248Sraf _aio_sigev_thread_init(struct sigevent *sigevp)
6262248Sraf {
6272248Sraf 	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
6282248Sraf 	static cond_t sigev_aio_cv = DEFAULTCV;
6292248Sraf 	static int sigev_aio_busy = 0;
6302248Sraf 
6312248Sraf 	thread_communication_data_t *tcdp;
6322248Sraf 	int port;
6335891Sraf 	int cancel_state;
6342248Sraf 	int rc = 0;
6352248Sraf 
6362248Sraf 	if (sigevp == NULL ||
6372248Sraf 	    sigevp->sigev_notify != SIGEV_THREAD ||
6382248Sraf 	    sigevp->sigev_notify_function == NULL)
6392248Sraf 		return (0);
6402248Sraf 
6412248Sraf 	lmutex_lock(&sigev_aio_lock);
6425891Sraf 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
6432248Sraf 	while (sigev_aio_busy)
6445891Sraf 		(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
6455891Sraf 	(void) pthread_setcancelstate(cancel_state, NULL);
6462248Sraf 	if ((tcdp = sigev_aio_tcd) != NULL)
6472248Sraf 		port = tcdp->tcd_port;
6482248Sraf 	else {
6492248Sraf 		sigev_aio_busy = 1;
6502248Sraf 		lmutex_unlock(&sigev_aio_lock);
6512248Sraf 
6522248Sraf 		tcdp = setup_sigev_handler(sigevp, AIO);
6532248Sraf 		if (tcdp == NULL) {
6542248Sraf 			port = -1;
6552248Sraf 			rc = -1;
6562248Sraf 		} else if (launch_spawner(tcdp) != 0) {
6572248Sraf 			free_sigev_handler(tcdp);
6582248Sraf 			tcdp = NULL;
6592248Sraf 			port = -1;
6602248Sraf 			rc = -1;
6612248Sraf 		} else {
6622248Sraf 			port = tcdp->tcd_port;
6632248Sraf 		}
6642248Sraf 
6652248Sraf 		lmutex_lock(&sigev_aio_lock);
6662248Sraf 		sigev_aio_tcd = tcdp;
6672248Sraf 		sigev_aio_busy = 0;
6682248Sraf 		(void) cond_broadcast(&sigev_aio_cv);
6692248Sraf 	}
6702248Sraf 	lmutex_unlock(&sigev_aio_lock);
6712248Sraf 	sigevp->sigev_signo = port;
6722248Sraf 	return (rc);
6732248Sraf }
6742248Sraf 
6752248Sraf int
_aio_sigev_thread(aiocb_t * aiocbp)6762248Sraf _aio_sigev_thread(aiocb_t *aiocbp)
6772248Sraf {
6782248Sraf 	if (aiocbp == NULL)
6792248Sraf 		return (0);
6802248Sraf 	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
6812248Sraf }
6822248Sraf 
#if !defined(_LP64)
/*
 * Large-file counterpart of _aio_sigev_thread(): apply
 * _aio_sigev_thread_init() to the sigevent embedded in an aiocb64.
 * A NULL aiocbp is accepted and yields 0.
 */
int
_aio_sigev_thread64(aiocb64_t *aiocbp)
{
	if (aiocbp != NULL)
		return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
	return (0);
}
#endif
6922248Sraf 
6932248Sraf /*
6942248Sraf  * Cleanup POSIX aio after fork1() in the child process.
6952248Sraf  */
6962248Sraf void
postfork1_child_sigev_aio(void)6972248Sraf postfork1_child_sigev_aio(void)
6982248Sraf {
6992248Sraf 	thread_communication_data_t *tcdp;
7002248Sraf 
7012248Sraf 	if ((tcdp = sigev_aio_tcd) != NULL) {
7022248Sraf 		sigev_aio_tcd = NULL;
7032248Sraf 		tcd_teardown(tcdp);
7042248Sraf 	}
7052248Sraf }
7062248Sraf 
7072248Sraf /*
7082248Sraf  * Utility function for the various postfork1_child_sigev_*() functions.
7092248Sraf  * Clean up the tcdp data structure and close the port.
7102248Sraf  */
7112248Sraf void
tcd_teardown(thread_communication_data_t * tcdp)7122248Sraf tcd_teardown(thread_communication_data_t *tcdp)
7132248Sraf {
7142248Sraf 	if (tcdp->tcd_poolp != NULL)
7152248Sraf 		tpool_abandon(tcdp->tcd_poolp);
7162248Sraf 	tcdp->tcd_poolp = NULL;
7172248Sraf 	tcdp->tcd_server_id = 0;
7182248Sraf 	free_sigev_handler(tcdp);
7192248Sraf }
720