xref: /onnv-gate/usr/src/lib/libc/port/rt/mqueue.c (revision 7175:ca52ae05bc25)
12248Sraf /*
22248Sraf  * CDDL HEADER START
32248Sraf  *
42248Sraf  * The contents of this file are subject to the terms of the
52248Sraf  * Common Development and Distribution License (the "License").
62248Sraf  * You may not use this file except in compliance with the License.
72248Sraf  *
82248Sraf  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
92248Sraf  * or http://www.opensolaris.org/os/licensing.
102248Sraf  * See the License for the specific language governing permissions
112248Sraf  * and limitations under the License.
122248Sraf  *
132248Sraf  * When distributing Covered Code, include this CDDL HEADER in each
142248Sraf  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
152248Sraf  * If applicable, add the following below this CDDL HEADER, with the
162248Sraf  * fields enclosed by brackets "[]" replaced with your own identifying
172248Sraf  * information: Portions Copyright [yyyy] [name of copyright owner]
182248Sraf  *
192248Sraf  * CDDL HEADER END
202248Sraf  */
212248Sraf 
222248Sraf /*
235891Sraf  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
242248Sraf  * Use is subject to license terms.
252248Sraf  */
262248Sraf 
272248Sraf #pragma ident	"%Z%%M%	%I%	%E% SMI"
282248Sraf 
296812Sraf #include "lint.h"
302248Sraf #include "mtlib.h"
312248Sraf #define	_KMEMUSER
322248Sraf #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
332248Sraf #undef	_KMEMUSER
342248Sraf #include <mqueue.h>
352248Sraf #include <sys/types.h>
362248Sraf #include <sys/file.h>
372248Sraf #include <sys/mman.h>
382248Sraf #include <errno.h>
392248Sraf #include <stdarg.h>
402248Sraf #include <limits.h>
412248Sraf #include <pthread.h>
422248Sraf #include <assert.h>
432248Sraf #include <string.h>
442248Sraf #include <unistd.h>
452248Sraf #include <stdlib.h>
462248Sraf #include <sys/stat.h>
472248Sraf #include <inttypes.h>
482248Sraf #include "sigev_thread.h"
492248Sraf #include "pos4obj.h"
502248Sraf 
/*
 * Default values per message queue: mq_open() uses these when a queue is
 * created (O_CREAT) without an mq_attr argument.
 */
#define	MQ_MAXMSG	128
#define	MQ_MAXSIZE	1024

/* Stored in mqd_magic of every open descriptor; checked by mq_is_valid() */
#define	MQ_MAGIC	0x4d534751		/* "MSGQ" */

/*
 * Header that precedes every message buffer in the mmap()ed data file.
 * The payload immediately follows the header (&hdr[1]).  Buffers are
 * chained through msg_next, which holds a byte offset from the start of
 * the mapping (not a pointer); 0 terminates a list.
 */
typedef struct {
	uint64_t 	msg_next;	/* offset of next message in the link */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;

/*
 * Message queue open description.  mq_open() keeps it in its own
 * MAP_SHARED mapping of an immediately-unlinked file; today it only carries
 * the O_NONBLOCK state of the open.
 */
struct mq_dn {
	size_t		mqdn_flags;	/* open description flags */
};
732248Sraf 
/*
 * Per-process message queue descriptor.  mq_open() mallocs one of these and
 * returns its address as the mqd_t.
 */
typedef struct mq_des {
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* MQ_MAGIC while open, 0 once closed */
	int		mqd_flags;	/* FREAD/FWRITE access of this open */
	struct mq_header *mqd_mq;	/* address of the mmap()ed queue header */
	struct mq_dn	*mqd_mqdn;	/* open	description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
	/*
	 * Set when locking mq_exclusive returned an error (EOWNERDEAD or
	 * ENOTRECOVERABLE): mq_exclusive is inconsistent and the close path
	 * must not unlock it.
	 */
	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
} mqdes_t;
872248Sraf 
/*
 * message queue common header, part of the mmap()ed file.
 * It sits at offset 0 of the data file and, as laid out by mq_init(), is
 * followed by the per-priority head offsets, the per-priority tail offsets
 * and then the message buffers.
 *
 * Since message queues may be shared between 32- and 64-bit processes,
 * care must be taken to make sure that the elements of this structure
 * are identical for both _LP64 and _ILP32 cases.  This is also why all
 * list links are uint64_t byte offsets from the start of the mapping
 * rather than pointers.
 */
typedef struct mq_header {
	/*
	 * first field must be mq_totsize, DO NOT insert before this:
	 * mq_open() read()s the first 8 bytes of an existing data file to
	 * learn how much to mmap(), and treats 0 as "not initialized yet".
	 */
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* number of priorities; prio < this */
	uint32_t	mq_curmaxprio;	/* highest non-empty priority (or 0) */
	uint32_t	mq_mask;	/* bit n set while prio n non-empty */
	uint64_t	mq_freep;	/* offset of first free buffer */
	uint64_t	mq_headpp;	/* offset of head-offset array */
	uint64_t	mq_tailpp;	/* offset of tail-offset array */
	signotify_id_t	mq_sigid;	/* notification id (3 ints) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	uint64_t	mq_des;		/* address of registering mqdes_t */
	mutex_t		mq_exclusive;	/* robust lock for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	sem_t		mq_notfull;	/* free buffers; mq_send()'s block */
	sem_t		mq_notempty;	/* queued msgs; mq_receive()'s block */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;
1142248Sraf 
/*
 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
 * mq_open() keeps no count of open descriptors.  If this assumption is
 * somehow invalidated, mq_open() needs to be changed back to the old
 * version, which kept a count and enforced a limit.
 * To make sure this is pointed out to anyone changing <sys/param.h>,
 * _MQ_OPEN_MAX is checked at compile time.
 */
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif
1252248Sraf 
#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

/*
 * Debug-only consistency checks.  The non-DEBUG versions expand to nothing.
 * The DEBUG expansions keep their historical trailing ';' / '{ }' shape so
 * that existing call sites compile unchanged; every macro argument is
 * parenthesized so that expressions may be passed safely.
 */
#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x);

/*
 * _p must be a usable buffer offset inside mapping _m: non-zero,
 * MQ_ALIGNSIZE-aligned and below _m->mq_totsize.  Offsets are integers,
 * so they are compared against 0, not NULL.
 */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) && \
	    !((uintptr_t)(_m) + (uintptr_t)(_p) >= (uintptr_t)(_m) + \
	    (_m)->mq_totsize));

/* Assert that the current value of semaphore sem is <= val */
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= (val)); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/* Convert byte offset n within the mapping at m into a message header */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
/* Address of the head-offset slot for priority n in mapping m */
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
/* Address of the tail-offset slot for priority n in mapping m */
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))
1512248Sraf 
/* Sentinel descriptor value that mq_is_valid() always rejects */
#define	MQ_RESERVED	((mqdes_t *)-1)

/*
 * abs_rel argument of __mq_timedsend(): select sem_timedwait() (absolute
 * deadline) or sem_reltimedwait_np() (relative interval).
 */
#define	ABS_TIME	0
#define	REL_TIME	1

/*
 * Doubly-linked list (mqd_next/mqd_prev) of every descriptor this process
 * has open.  mq_open() adds to it and mq_close_cleanup() removes from it,
 * both while holding mq_list_lock.
 */
static mutex_t mq_list_lock = DEFAULTMUTEX;
static mqdes_t *mq_list = NULL;

/* Defined outside this file; used here with SN_SEND and SN_CANCEL */
extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
1612248Sraf 
1622248Sraf static int
mq_is_valid(mqdes_t * mqdp)1632248Sraf mq_is_valid(mqdes_t *mqdp)
1642248Sraf {
1652248Sraf 	/*
1662248Sraf 	 * Any use of a message queue after it was closed is
1672248Sraf 	 * undefined.  But the standard strongly favours EBADF
1682248Sraf 	 * returns.  Before we dereference which could be fatal,
1692248Sraf 	 * we first do some pointer sanity checks.
1702248Sraf 	 */
1712248Sraf 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
1722248Sraf 	    ((uintptr_t)mqdp & 0x7) == 0) {
1732248Sraf 		return (mqdp->mqd_magic == MQ_MAGIC);
1742248Sraf 	}
1752248Sraf 
1762248Sraf 	return (0);
1772248Sraf }
1782248Sraf 
1792248Sraf static void
mq_init(mqhdr_t * mqhp,size_t msgsize,ssize_t maxmsg)1802248Sraf mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
1812248Sraf {
1822248Sraf 	int		i;
1832248Sraf 	uint64_t	temp;
1842248Sraf 	uint64_t	currentp;
1852248Sraf 	uint64_t	nextp;
1862248Sraf 
1872248Sraf 	/*
1882248Sraf 	 * We only need to initialize the non-zero fields.  The use of
1892248Sraf 	 * ftruncate() on the message queue file assures that the
190*7175Sraf 	 * pages will be zero-filled.
1912248Sraf 	 */
192*7175Sraf 	(void) mutex_init(&mqhp->mq_exclusive,
193*7175Sraf 	    USYNC_PROCESS | LOCK_ROBUST, NULL);
1942248Sraf 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
1952248Sraf 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
1962248Sraf 	(void) sem_init(&mqhp->mq_spawner, 1, 0);
1972248Sraf 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
1982248Sraf 
1992248Sraf 	mqhp->mq_maxsz = msgsize;
2002248Sraf 	mqhp->mq_maxmsg = maxmsg;
2012248Sraf 
2022248Sraf 	/*
2032248Sraf 	 * As of this writing (1997), there are 32 message queue priorities.
2042248Sraf 	 * If this is to change, then the size of the mq_mask will
2052248Sraf 	 * also have to change.  If DEBUG is defined, assert that
2062248Sraf 	 * _MQ_PRIO_MAX hasn't changed.
2072248Sraf 	 */
2082248Sraf 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
2092248Sraf #if defined(DEBUG)
2102248Sraf 	/* LINTED always true */
2112248Sraf 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
2122248Sraf #endif
2132248Sraf 
2142248Sraf 	/*
2152248Sraf 	 * Since the message queue can be mapped into different
2162248Sraf 	 * virtual address ranges by different processes, we don't
2172248Sraf 	 * keep track of pointers, only offsets into the shared region.
2182248Sraf 	 */
2192248Sraf 	mqhp->mq_headpp = sizeof (mqhdr_t);
2202248Sraf 	mqhp->mq_tailpp = mqhp->mq_headpp +
2215891Sraf 	    mqhp->mq_maxprio * sizeof (uint64_t);
2222248Sraf 	mqhp->mq_freep = mqhp->mq_tailpp +
2235891Sraf 	    mqhp->mq_maxprio * sizeof (uint64_t);
2242248Sraf 
2252248Sraf 	currentp = mqhp->mq_freep;
2262248Sraf 	MQ_PTR(mqhp, currentp)->msg_next = 0;
2272248Sraf 
2282248Sraf 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
2292248Sraf 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
2302248Sraf 		nextp = currentp + sizeof (msghdr_t) + temp;
2312248Sraf 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
2322248Sraf 		MQ_PTR(mqhp, nextp)->msg_next = 0;
2332248Sraf 		currentp = nextp;
2342248Sraf 	}
2352248Sraf }
2362248Sraf 
2372248Sraf static size_t
mq_getmsg(mqhdr_t * mqhp,char * msgp,uint_t * msg_prio)2382248Sraf mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
2392248Sraf {
2402248Sraf 	uint64_t currentp;
2412248Sraf 	msghdr_t *curbuf;
2422248Sraf 	uint64_t *headpp;
2432248Sraf 	uint64_t *tailpp;
2442248Sraf 
2452248Sraf 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
2462248Sraf 
2472248Sraf 	/*
2482248Sraf 	 * Get the head and tail pointers for the queue of maximum
2492248Sraf 	 * priority.  We shouldn't be here unless there is a message for
2502248Sraf 	 * us, so it's fair to assert that both the head and tail
2512248Sraf 	 * pointers are non-NULL.
2522248Sraf 	 */
2532248Sraf 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
2542248Sraf 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
2552248Sraf 
2562248Sraf 	if (msg_prio != NULL)
2572248Sraf 		*msg_prio = mqhp->mq_curmaxprio;
2582248Sraf 
2592248Sraf 	currentp = *headpp;
2602248Sraf 	MQ_ASSERT_PTR(mqhp, currentp);
2612248Sraf 	curbuf = MQ_PTR(mqhp, currentp);
2622248Sraf 
2632248Sraf 	if ((*headpp = curbuf->msg_next) == NULL) {
2642248Sraf 		/*
2652248Sraf 		 * We just nuked the last message in this priority's queue.
2662248Sraf 		 * Twiddle this priority's bit, and then find the next bit
2672248Sraf 		 * tipped.
2682248Sraf 		 */
2692248Sraf 		uint_t prio = mqhp->mq_curmaxprio;
2702248Sraf 
2712248Sraf 		mqhp->mq_mask &= ~(1u << prio);
2722248Sraf 
2732248Sraf 		for (; prio != 0; prio--)
2742248Sraf 			if (mqhp->mq_mask & (1u << prio))
2752248Sraf 				break;
2762248Sraf 		mqhp->mq_curmaxprio = prio;
2772248Sraf 
2782248Sraf 		*tailpp = NULL;
2792248Sraf 	}
2802248Sraf 
2812248Sraf 	/*
2822248Sraf 	 * Copy the message, and put the buffer back on the free list.
2832248Sraf 	 */
2842248Sraf 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
2852248Sraf 	curbuf->msg_next = mqhp->mq_freep;
2862248Sraf 	mqhp->mq_freep = currentp;
2872248Sraf 
2882248Sraf 	return (curbuf->msg_len);
2892248Sraf }
2902248Sraf 
2912248Sraf 
2922248Sraf static void
mq_putmsg(mqhdr_t * mqhp,const char * msgp,ssize_t len,uint_t prio)2932248Sraf mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
2942248Sraf {
2952248Sraf 	uint64_t currentp;
2962248Sraf 	msghdr_t *curbuf;
2972248Sraf 	uint64_t *headpp;
2982248Sraf 	uint64_t *tailpp;
2992248Sraf 
3002248Sraf 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
3012248Sraf 
3022248Sraf 	/*
3032248Sraf 	 * Grab a free message block, and link it in.  We shouldn't
3042248Sraf 	 * be here unless there is room in the queue for us;  it's
3052248Sraf 	 * fair to assert that the free pointer is non-NULL.
3062248Sraf 	 */
3072248Sraf 	currentp = mqhp->mq_freep;
3082248Sraf 	MQ_ASSERT_PTR(mqhp, currentp);
3092248Sraf 	curbuf = MQ_PTR(mqhp, currentp);
3102248Sraf 
3112248Sraf 	/*
3122248Sraf 	 * Remove a message from the free list, and copy in the new contents.
3132248Sraf 	 */
3142248Sraf 	mqhp->mq_freep = curbuf->msg_next;
3152248Sraf 	curbuf->msg_next = NULL;
3162248Sraf 	(void) memcpy((char *)&curbuf[1], msgp, len);
3172248Sraf 	curbuf->msg_len = len;
3182248Sraf 
3192248Sraf 	headpp = HEAD_PTR(mqhp, prio);
3202248Sraf 	tailpp = TAIL_PTR(mqhp, prio);
3212248Sraf 
3222248Sraf 	if (*tailpp == 0) {
3232248Sraf 		/*
3242248Sraf 		 * This is the first message on this queue.  Set the
3252248Sraf 		 * head and tail pointers, and tip the appropriate bit
3262248Sraf 		 * in the priority mask.
3272248Sraf 		 */
3282248Sraf 		*headpp = currentp;
3292248Sraf 		*tailpp = currentp;
3302248Sraf 		mqhp->mq_mask |= (1u << prio);
3312248Sraf 		if (prio > mqhp->mq_curmaxprio)
3322248Sraf 			mqhp->mq_curmaxprio = prio;
3332248Sraf 	} else {
3342248Sraf 		MQ_ASSERT_PTR(mqhp, *tailpp);
3352248Sraf 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
3362248Sraf 		*tailpp = currentp;
3372248Sraf 	}
3382248Sraf }
3392248Sraf 
340*7175Sraf /*
341*7175Sraf  * Send a notification and also delete the registration.
342*7175Sraf  */
343*7175Sraf static void
do_notify(mqhdr_t * mqhp)344*7175Sraf do_notify(mqhdr_t *mqhp)
345*7175Sraf {
346*7175Sraf 	(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
347*7175Sraf 	if (mqhp->mq_ntype == SIGEV_THREAD ||
348*7175Sraf 	    mqhp->mq_ntype == SIGEV_PORT)
349*7175Sraf 		(void) sem_post(&mqhp->mq_spawner);
350*7175Sraf 	mqhp->mq_ntype = 0;
351*7175Sraf 	mqhp->mq_des = 0;
352*7175Sraf }
353*7175Sraf 
354*7175Sraf /*
355*7175Sraf  * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
356*7175Sraf  * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
357*7175Sraf  * they fail with errno == EBADMSG.  Trigger any registered notification.
358*7175Sraf  */
359*7175Sraf static void
owner_dead(mqdes_t * mqdp,int error)360*7175Sraf owner_dead(mqdes_t *mqdp, int error)
361*7175Sraf {
362*7175Sraf 	mqhdr_t *mqhp = mqdp->mqd_mq;
363*7175Sraf 
364*7175Sraf 	mqdp->mqd_ownerdead = 1;
365*7175Sraf 	(void) sem_post(&mqhp->mq_notfull);
366*7175Sraf 	(void) sem_post(&mqhp->mq_notempty);
367*7175Sraf 	if (error == EOWNERDEAD) {
368*7175Sraf 		if (mqhp->mq_sigid.sn_pid != 0)
369*7175Sraf 			do_notify(mqhp);
370*7175Sraf 		(void) mutex_unlock(&mqhp->mq_exclusive);
371*7175Sraf 	}
372*7175Sraf 	errno = EBADMSG;
373*7175Sraf }
374*7175Sraf 
3752248Sraf mqd_t
mq_open(const char * path,int oflag,...)3766812Sraf mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
3772248Sraf {
3782248Sraf 	va_list		ap;
379*7175Sraf 	mode_t		mode = 0;
380*7175Sraf 	struct mq_attr	*attr = NULL;
3812248Sraf 	int		fd;
3822248Sraf 	int		err;
3832248Sraf 	int		cr_flag = 0;
3842248Sraf 	int		locked = 0;
3852248Sraf 	uint64_t	total_size;
3862248Sraf 	size_t		msgsize;
3872248Sraf 	ssize_t		maxmsg;
3882248Sraf 	uint64_t	temp;
3892248Sraf 	void		*ptr;
3902248Sraf 	mqdes_t		*mqdp;
3912248Sraf 	mqhdr_t		*mqhp;
3922248Sraf 	struct mq_dn	*mqdnp;
3932248Sraf 
3942248Sraf 	if (__pos4obj_check(path) == -1)
3952248Sraf 		return ((mqd_t)-1);
3962248Sraf 
3972248Sraf 	/* acquire MSGQ lock to have atomic operation */
3982248Sraf 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
3992248Sraf 		goto out;
4002248Sraf 	locked = 1;
4012248Sraf 
4022248Sraf 	va_start(ap, oflag);
4032248Sraf 	/* filter oflag to have READ/WRITE/CREATE modes only */
4042248Sraf 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
4052248Sraf 	if ((oflag & O_CREAT) != 0) {
4062248Sraf 		mode = va_arg(ap, mode_t);
4072248Sraf 		attr = va_arg(ap, struct mq_attr *);
4082248Sraf 	}
4092248Sraf 	va_end(ap);
4102248Sraf 
4112248Sraf 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
4122248Sraf 	    mode, &cr_flag)) < 0)
4132248Sraf 		goto out;
4142248Sraf 
4152248Sraf 	/* closing permission file */
4162248Sraf 	(void) __close_nc(fd);
4172248Sraf 
4182248Sraf 	/* Try to open/create data file */
4192248Sraf 	if (cr_flag) {
4202248Sraf 		cr_flag = PFILE_CREATE;
4212248Sraf 		if (attr == NULL) {
4222248Sraf 			maxmsg = MQ_MAXMSG;
4232248Sraf 			msgsize = MQ_MAXSIZE;
4242248Sraf 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
4252248Sraf 			errno = EINVAL;
4262248Sraf 			goto out;
4272248Sraf 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
4282248Sraf 			errno = ENOSPC;
4292248Sraf 			goto out;
4302248Sraf 		} else {
4312248Sraf 			maxmsg = attr->mq_maxmsg;
4322248Sraf 			msgsize = attr->mq_msgsize;
4332248Sraf 		}
4342248Sraf 
4352248Sraf 		/* adjust for message size at word boundary */
4362248Sraf 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
4372248Sraf 
4382248Sraf 		total_size = sizeof (mqhdr_t) +
4395891Sraf 		    maxmsg * (temp + sizeof (msghdr_t)) +
4405891Sraf 		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);
4412248Sraf 
4422248Sraf 		if (total_size > SSIZE_MAX) {
4432248Sraf 			errno = ENOSPC;
4442248Sraf 			goto out;
4452248Sraf 		}
4462248Sraf 
4472248Sraf 		/*
4482248Sraf 		 * data file is opened with read/write to those
4492248Sraf 		 * who have read or write permission
4502248Sraf 		 */
4512248Sraf 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
4522248Sraf 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
4532248Sraf 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
4542248Sraf 			goto out;
4552248Sraf 
4562248Sraf 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
4572248Sraf 
4582248Sraf 		/* force permissions to avoid umask effect */
4592248Sraf 		if (fchmod(fd, mode) < 0)
4602248Sraf 			goto out;
4612248Sraf 
4622248Sraf 		if (ftruncate64(fd, (off64_t)total_size) < 0)
4632248Sraf 			goto out;
4642248Sraf 	} else {
4652248Sraf 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
4662248Sraf 		    O_RDWR, 0666, &err)) < 0)
4672248Sraf 			goto out;
4682248Sraf 		cr_flag = DFILE_OPEN;
4692248Sraf 
4702248Sraf 		/* Message queue has not been initialized yet */
4712248Sraf 		if (read(fd, &total_size, sizeof (total_size)) !=
4722248Sraf 		    sizeof (total_size) || total_size == 0) {
4732248Sraf 			errno = ENOENT;
4742248Sraf 			goto out;
4752248Sraf 		}
4762248Sraf 
4772248Sraf 		/* Message queue too big for this process to handle */
4782248Sraf 		if (total_size > SSIZE_MAX) {
4792248Sraf 			errno = EFBIG;
4802248Sraf 			goto out;
4812248Sraf 		}
4822248Sraf 	}
4832248Sraf 
4842248Sraf 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
4852248Sraf 		errno = ENOMEM;
4862248Sraf 		goto out;
4872248Sraf 	}
4882248Sraf 	cr_flag |= ALLOC_MEM;
4892248Sraf 
4902248Sraf 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
4912248Sraf 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
4922248Sraf 		goto out;
4932248Sraf 	mqhp = ptr;
4942248Sraf 	cr_flag |= DFILE_MMAP;
4952248Sraf 
4962248Sraf 	/* closing data file */
4972248Sraf 	(void) __close_nc(fd);
4982248Sraf 	cr_flag &= ~DFILE_OPEN;
4992248Sraf 
5002248Sraf 	/*
5012248Sraf 	 * create, unlink, size, mmap, and close description file
5022248Sraf 	 * all for a flag word in anonymous shared memory
5032248Sraf 	 */
5042248Sraf 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
5052248Sraf 	    0666, &err)) < 0)
5062248Sraf 		goto out;
5072248Sraf 	cr_flag |= DFILE_OPEN;
5082248Sraf 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
5092248Sraf 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
5102248Sraf 		goto out;
5112248Sraf 
5122248Sraf 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
5132248Sraf 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
5142248Sraf 		goto out;
5152248Sraf 	mqdnp = ptr;
5162248Sraf 	cr_flag |= MQDNP_MMAP;
5172248Sraf 
5182248Sraf 	(void) __close_nc(fd);
5192248Sraf 	cr_flag &= ~DFILE_OPEN;
5202248Sraf 
5212248Sraf 	/*
5222248Sraf 	 * we follow the same strategy as filesystem open() routine,
5232248Sraf 	 * where fcntl.h flags are changed to flags defined in file.h.
5242248Sraf 	 */
5252248Sraf 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
5262248Sraf 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
5272248Sraf 
5282248Sraf 	/* new message queue requires initialization */
5292248Sraf 	if ((cr_flag & DFILE_CREATE) != 0) {
5302248Sraf 		/* message queue header has to be initialized */
5312248Sraf 		mq_init(mqhp, msgsize, maxmsg);
5322248Sraf 		mqhp->mq_totsize = total_size;
5332248Sraf 	}
5342248Sraf 	mqdp->mqd_mq = mqhp;
5352248Sraf 	mqdp->mqd_mqdn = mqdnp;
5362248Sraf 	mqdp->mqd_magic = MQ_MAGIC;
5372248Sraf 	mqdp->mqd_tcd = NULL;
538*7175Sraf 	mqdp->mqd_ownerdead = 0;
5392248Sraf 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
5402248Sraf 		lmutex_lock(&mq_list_lock);
5412248Sraf 		mqdp->mqd_next = mq_list;
5422248Sraf 		mqdp->mqd_prev = NULL;
5432248Sraf 		if (mq_list)
5442248Sraf 			mq_list->mqd_prev = mqdp;
5452248Sraf 		mq_list = mqdp;
5462248Sraf 		lmutex_unlock(&mq_list_lock);
5472248Sraf 		return ((mqd_t)mqdp);
5482248Sraf 	}
5492248Sraf 
5502248Sraf 	locked = 0;	/* fall into the error case */
5512248Sraf out:
5522248Sraf 	err = errno;
5532248Sraf 	if ((cr_flag & DFILE_OPEN) != 0)
5542248Sraf 		(void) __close_nc(fd);
5552248Sraf 	if ((cr_flag & DFILE_CREATE) != 0)
5562248Sraf 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
5572248Sraf 	if ((cr_flag & PFILE_CREATE) != 0)
5582248Sraf 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
5592248Sraf 	if ((cr_flag & ALLOC_MEM) != 0)
5602248Sraf 		free((void *)mqdp);
5612248Sraf 	if ((cr_flag & DFILE_MMAP) != 0)
5622248Sraf 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
5632248Sraf 	if ((cr_flag & MQDNP_MMAP) != 0)
5642248Sraf 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
5652248Sraf 	if (locked)
5662248Sraf 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
5672248Sraf 	errno = err;
5682248Sraf 	return ((mqd_t)-1);
5692248Sraf }
5702248Sraf 
5712248Sraf static void
mq_close_cleanup(mqdes_t * mqdp)5722248Sraf mq_close_cleanup(mqdes_t *mqdp)
5732248Sraf {
5742248Sraf 	mqhdr_t *mqhp = mqdp->mqd_mq;
5752248Sraf 	struct mq_dn *mqdnp = mqdp->mqd_mqdn;
5762248Sraf 
5772248Sraf 	/* invalidate the descriptor before freeing it */
5782248Sraf 	mqdp->mqd_magic = 0;
579*7175Sraf 	if (!mqdp->mqd_ownerdead)
580*7175Sraf 		(void) mutex_unlock(&mqhp->mq_exclusive);
5812248Sraf 
5822248Sraf 	lmutex_lock(&mq_list_lock);
5832248Sraf 	if (mqdp->mqd_next)
5842248Sraf 		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
5852248Sraf 	if (mqdp->mqd_prev)
5862248Sraf 		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
5872248Sraf 	if (mq_list == mqdp)
5882248Sraf 		mq_list = mqdp->mqd_next;
5892248Sraf 	lmutex_unlock(&mq_list_lock);
5902248Sraf 
5912248Sraf 	free(mqdp);
5922248Sraf 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
5932248Sraf 	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
5942248Sraf }
5952248Sraf 
5962248Sraf int
mq_close(mqd_t mqdes)5976812Sraf mq_close(mqd_t mqdes)
5982248Sraf {
5992248Sraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
6002248Sraf 	mqhdr_t *mqhp;
6012248Sraf 	thread_communication_data_t *tcdp;
602*7175Sraf 	int error;
6032248Sraf 
6042248Sraf 	if (!mq_is_valid(mqdp)) {
6052248Sraf 		errno = EBADF;
6062248Sraf 		return (-1);
6072248Sraf 	}
6082248Sraf 
6092248Sraf 	mqhp = mqdp->mqd_mq;
610*7175Sraf 	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
611*7175Sraf 		mqdp->mqd_ownerdead = 1;
612*7175Sraf 		if (error == EOWNERDEAD)
613*7175Sraf 			(void) mutex_unlock(&mqhp->mq_exclusive);
614*7175Sraf 		/* carry on regardless, without holding mq_exclusive */
615*7175Sraf 	}
6162248Sraf 
6172248Sraf 	if (mqhp->mq_des == (uintptr_t)mqdp &&
6182248Sraf 	    mqhp->mq_sigid.sn_pid == getpid()) {
6192248Sraf 		/* notification is set for this descriptor, remove it */
6202248Sraf 		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
6212248Sraf 		mqhp->mq_ntype = 0;
6222248Sraf 		mqhp->mq_des = 0;
6232248Sraf 	}
6242248Sraf 
6252248Sraf 	pthread_cleanup_push(mq_close_cleanup, mqdp);
6262248Sraf 	if ((tcdp = mqdp->mqd_tcd) != NULL) {
6272248Sraf 		mqdp->mqd_tcd = NULL;
6282248Sraf 		del_sigev_mq(tcdp);	/* possible cancellation point */
6292248Sraf 	}
6302248Sraf 	pthread_cleanup_pop(1);		/* finish in the cleanup handler */
6312248Sraf 
6322248Sraf 	return (0);
6332248Sraf }
6342248Sraf 
6352248Sraf int
mq_unlink(const char * path)6366812Sraf mq_unlink(const char *path)
6372248Sraf {
6382248Sraf 	int err;
6392248Sraf 
6402248Sraf 	if (__pos4obj_check(path) < 0)
6412248Sraf 		return (-1);
6422248Sraf 
6432248Sraf 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
6442248Sraf 		return (-1);
6452248Sraf 	}
6462248Sraf 
6472248Sraf 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
6482248Sraf 
6492248Sraf 	if (err == 0 || (err == -1 && errno == EEXIST)) {
6502248Sraf 		errno = 0;
6512248Sraf 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
6522248Sraf 	}
6532248Sraf 
6542248Sraf 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
6552248Sraf 		return (-1);
6562248Sraf 
6572248Sraf 	return (err);
6582248Sraf 
6592248Sraf }
6602248Sraf 
6612248Sraf static int
__mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * timeout,int abs_rel)6622248Sraf __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
6632248Sraf 	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
6642248Sraf {
6652248Sraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
6662248Sraf 	mqhdr_t *mqhp;
6672248Sraf 	int err;
6682248Sraf 	int notify = 0;
6692248Sraf 
6702248Sraf 	/*
6712248Sraf 	 * sem_*wait() does cancellation, if called.
6722248Sraf 	 * pthread_testcancel() ensures that cancellation takes place if
6732248Sraf 	 * there is a cancellation pending when mq_*send() is called.
6742248Sraf 	 */
6752248Sraf 	pthread_testcancel();
6762248Sraf 
6772248Sraf 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
6782248Sraf 		errno = EBADF;
6792248Sraf 		return (-1);
6802248Sraf 	}
6812248Sraf 
6822248Sraf 	mqhp = mqdp->mqd_mq;
6832248Sraf 
6842248Sraf 	if (msg_prio >= mqhp->mq_maxprio) {
6852248Sraf 		errno = EINVAL;
6862248Sraf 		return (-1);
6872248Sraf 	}
6882248Sraf 	if (msg_len > mqhp->mq_maxsz) {
6892248Sraf 		errno = EMSGSIZE;
6902248Sraf 		return (-1);
6912248Sraf 	}
6922248Sraf 
6932248Sraf 	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
6942248Sraf 		err = sem_trywait(&mqhp->mq_notfull);
6952248Sraf 	else {
6962248Sraf 		/*
6972248Sraf 		 * We might get cancelled here...
6982248Sraf 		 */
6992248Sraf 		if (timeout == NULL)
7002248Sraf 			err = sem_wait(&mqhp->mq_notfull);
7012248Sraf 		else if (abs_rel == ABS_TIME)
7022248Sraf 			err = sem_timedwait(&mqhp->mq_notfull, timeout);
7032248Sraf 		else
7042248Sraf 			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
7052248Sraf 	}
7062248Sraf 	if (err == -1) {
7072248Sraf 		/*
7082248Sraf 		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
7092248Sraf 		 * by sem_*wait(), so we can just return.
7102248Sraf 		 */
7112248Sraf 		return (-1);
7122248Sraf 	}
7132248Sraf 
7142248Sraf 	/*
7152248Sraf 	 * By the time we're here, we know that we've got the capacity
7162248Sraf 	 * to add to the queue...now acquire the exclusive lock.
7172248Sraf 	 */
718*7175Sraf 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
719*7175Sraf 		owner_dead(mqdp, err);
720*7175Sraf 		return (-1);
721*7175Sraf 	}
7222248Sraf 
7232248Sraf 	/*
7242248Sraf 	 * Now determine if we want to kick the notification.  POSIX
7252248Sraf 	 * requires that if a process has registered for notification,
7262248Sraf 	 * we must kick it when the queue makes an empty to non-empty
7272248Sraf 	 * transition, and there are no blocked receivers.  Note that
7282248Sraf 	 * this mechanism does _not_ guarantee that the kicked process
7292248Sraf 	 * will be able to receive a message without blocking;
7302248Sraf 	 * another receiver could intervene in the meantime.  Thus,
7312248Sraf 	 * the notification mechanism is inherently racy; all we can
7322248Sraf 	 * do is hope to minimize the window as much as possible.
7332248Sraf 	 * In general, we want to avoid kicking the notification when
7342248Sraf 	 * there are clearly receivers blocked.  We'll determine if
7352248Sraf 	 * we want to kick the notification before the mq_putmsg(),
7362248Sraf 	 * but the actual signotify() won't be done until the message
7372248Sraf 	 * is on the queue.
7382248Sraf 	 */
7392248Sraf 	if (mqhp->mq_sigid.sn_pid != 0) {
7402248Sraf 		int nmessages, nblocked;
7412248Sraf 
7422248Sraf 		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
7432248Sraf 		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
7442248Sraf 
7452248Sraf 		if (nmessages == 0 && nblocked == 0)
7462248Sraf 			notify = 1;
7472248Sraf 	}
7482248Sraf 
7492248Sraf 	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
7502248Sraf 	(void) sem_post(&mqhp->mq_notempty);
7512248Sraf 
7522248Sraf 	if (notify) {
7532248Sraf 		/* notify and also delete the registration */
754*7175Sraf 		do_notify(mqhp);
7552248Sraf 	}
7562248Sraf 
7572248Sraf 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
7582248Sraf 	(void) mutex_unlock(&mqhp->mq_exclusive);
7592248Sraf 
7602248Sraf 	return (0);
7612248Sraf }
7622248Sraf 
7632248Sraf int
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio)7646812Sraf mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
7652248Sraf {
7662248Sraf 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
7675891Sraf 	    NULL, ABS_TIME));
7682248Sraf }
7692248Sraf 
7702248Sraf int
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * abs_timeout)7716812Sraf mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
7722248Sraf 	uint_t msg_prio, const timespec_t *abs_timeout)
7732248Sraf {
7742248Sraf 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
7755891Sraf 	    abs_timeout, ABS_TIME));
7762248Sraf }
7772248Sraf 
7782248Sraf int
mq_reltimedsend_np(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * rel_timeout)7796812Sraf mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
7802248Sraf 	uint_t msg_prio, const timespec_t *rel_timeout)
7812248Sraf {
7822248Sraf 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
7835891Sraf 	    rel_timeout, REL_TIME));
7842248Sraf }
7852248Sraf 
7862248Sraf static void
decrement_rblocked(mqhdr_t * mqhp)7872248Sraf decrement_rblocked(mqhdr_t *mqhp)
7882248Sraf {
7895891Sraf 	int cancel_state;
7902248Sraf 
7915891Sraf 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
7922248Sraf 	while (sem_wait(&mqhp->mq_rblocked) == -1)
7932248Sraf 		continue;
7945891Sraf 	(void) pthread_setcancelstate(cancel_state, NULL);
7952248Sraf }
7962248Sraf 
7972248Sraf static ssize_t
__mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * timeout,int abs_rel)7982248Sraf __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
7992248Sraf 	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
8002248Sraf {
8012248Sraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
8022248Sraf 	mqhdr_t *mqhp;
8032248Sraf 	ssize_t	msg_size;
8042248Sraf 	int err;
8052248Sraf 
8062248Sraf 	/*
8072248Sraf 	 * sem_*wait() does cancellation, if called.
8082248Sraf 	 * pthread_testcancel() ensures that cancellation takes place if
8092248Sraf 	 * there is a cancellation pending when mq_*receive() is called.
8102248Sraf 	 */
8112248Sraf 	pthread_testcancel();
8122248Sraf 
8132248Sraf 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
8142248Sraf 		errno = EBADF;
8152248Sraf 		return (ssize_t)(-1);
8162248Sraf 	}
8172248Sraf 
8182248Sraf 	mqhp = mqdp->mqd_mq;
8192248Sraf 
8202248Sraf 	if (msg_len < mqhp->mq_maxsz) {
8212248Sraf 		errno = EMSGSIZE;
8222248Sraf 		return (ssize_t)(-1);
8232248Sraf 	}
8242248Sraf 
8252248Sraf 	/*
8262248Sraf 	 * The semaphoring scheme for mq_[timed]receive is a little hairy
8272248Sraf 	 * thanks to POSIX.1b's arcane notification mechanism.  First,
8282248Sraf 	 * we try to take the common case and do a sem_trywait().
8292248Sraf 	 * If that doesn't work, and O_NONBLOCK hasn't been set,
8302248Sraf 	 * then note that we're going to sleep by incrementing the rblocked
8312248Sraf 	 * semaphore.  We decrement that semaphore after waking up.
8322248Sraf 	 */
8332248Sraf 	if (sem_trywait(&mqhp->mq_notempty) == -1) {
8342248Sraf 		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
8352248Sraf 			/*
8362248Sraf 			 * errno has been set to EAGAIN or EINTR by
8372248Sraf 			 * sem_trywait(), so we can just return.
8382248Sraf 			 */
8392248Sraf 			return (-1);
8402248Sraf 		}
8412248Sraf 		/*
8422248Sraf 		 * If we're here, then we're probably going to block...
8432248Sraf 		 * increment the rblocked semaphore.  If we get
8442248Sraf 		 * cancelled, decrement_rblocked() will decrement it.
8452248Sraf 		 */
8462248Sraf 		(void) sem_post(&mqhp->mq_rblocked);
8472248Sraf 
8482248Sraf 		pthread_cleanup_push(decrement_rblocked, mqhp);
8492248Sraf 		if (timeout == NULL)
8502248Sraf 			err = sem_wait(&mqhp->mq_notempty);
8512248Sraf 		else if (abs_rel == ABS_TIME)
8522248Sraf 			err = sem_timedwait(&mqhp->mq_notempty, timeout);
8532248Sraf 		else
8542248Sraf 			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
8552248Sraf 		pthread_cleanup_pop(1);
8562248Sraf 
8572248Sraf 		if (err == -1) {
8582248Sraf 			/*
8592248Sraf 			 * We took a signal or timeout while waiting
8602248Sraf 			 * on mq_notempty...
8612248Sraf 			 */
8622248Sraf 			return (-1);
8632248Sraf 		}
8642248Sraf 	}
8652248Sraf 
866*7175Sraf 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
867*7175Sraf 		owner_dead(mqdp, err);
868*7175Sraf 		return (-1);
869*7175Sraf 	}
8702248Sraf 	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
8712248Sraf 	(void) sem_post(&mqhp->mq_notfull);
8722248Sraf 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
8732248Sraf 	(void) mutex_unlock(&mqhp->mq_exclusive);
8742248Sraf 
8752248Sraf 	return (msg_size);
8762248Sraf }
8772248Sraf 
8782248Sraf ssize_t
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio)8796812Sraf mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
8802248Sraf {
8812248Sraf 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
8825891Sraf 	    NULL, ABS_TIME));
8832248Sraf }
8842248Sraf 
8852248Sraf ssize_t
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * abs_timeout)8866812Sraf mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
8872248Sraf 	uint_t *msg_prio, const timespec_t *abs_timeout)
8882248Sraf {
8892248Sraf 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
8905891Sraf 	    abs_timeout, ABS_TIME));
8912248Sraf }
8922248Sraf 
8932248Sraf ssize_t
mq_reltimedreceive_np(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * rel_timeout)8946812Sraf mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
8952248Sraf 	uint_t *msg_prio, const timespec_t *rel_timeout)
8962248Sraf {
8972248Sraf 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
8985891Sraf 	    rel_timeout, REL_TIME));
8992248Sraf }
9002248Sraf 
9012248Sraf /*
9026812Sraf  * Only used below, in mq_notify().
9032248Sraf  * We already have a spawner thread.
9042248Sraf  * Verify that the attributes match; cancel it if necessary.
9052248Sraf  */
9062248Sraf static int
cancel_if_necessary(thread_communication_data_t * tcdp,const struct sigevent * sigevp)9072248Sraf cancel_if_necessary(thread_communication_data_t *tcdp,
9082248Sraf 	const struct sigevent *sigevp)
9092248Sraf {
9106812Sraf 	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
9115891Sraf 	    sigevp->sigev_notify_attributes);
9122248Sraf 
9132248Sraf 	if (do_cancel) {
9142248Sraf 		/*
9152248Sraf 		 * Attributes don't match, cancel the spawner thread.
9162248Sraf 		 */
9172248Sraf 		(void) pthread_cancel(tcdp->tcd_server_id);
9182248Sraf 	} else {
9192248Sraf 		/*
9202248Sraf 		 * Reuse the existing spawner thread with possibly
9212248Sraf 		 * changed notification function and value.
9222248Sraf 		 */
9232248Sraf 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
9242248Sraf 		tcdp->tcd_notif.sigev_signo = 0;
9252248Sraf 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
9262248Sraf 		tcdp->tcd_notif.sigev_notify_function =
9275891Sraf 		    sigevp->sigev_notify_function;
9282248Sraf 	}
9292248Sraf 
9302248Sraf 	return (do_cancel);
9312248Sraf }
9322248Sraf 
/*
 * mq_notify(): register or remove the asynchronous notification attached
 * to the message queue behind mqdes.
 *
 * sigevp == NULL removes the notification.  This succeeds only if the
 * notification was registered through this same descriptor by this
 * process; otherwise it fails with EBUSY.
 *
 * A non-NULL sigevp registers a notification of type SIGEV_NONE,
 * SIGEV_SIGNAL, SIGEV_THREAD or SIGEV_PORT; any other type is EINVAL.
 * SIGEV_THREAD and SIGEV_PORT are served by a spawner thread whose
 * communication data is kept in mqdp->mqd_tcd.
 *
 * If locking mq_exclusive fails:
 *  - the descriptor is marked mqd_ownerdead;
 *  - the request is downgraded to a removal;
 *  - the call always ends with -1 and errno EBADMSG.
 *
 * Returns 0 on success, or -1 with errno set.
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;	/* mutex_lock() result; consulted again at "bad:" */

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/*
		 * Locking failed (e.g. EOWNERDEAD: a previous holder died).
		 * Mark the descriptor and turn the request into a removal.
		 * With EOWNERDEAD the lock was actually acquired, so drop it;
		 * the exit path below will not unlock when error != 0.
		 */
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				/*
				 * Only an enabled spawner is detached from
				 * the descriptor and cancelled.
				 */
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/*
		 * First pass: capture the user value and port for the two
		 * spawner-based types.  Other types are validated by the
		 * second switch below.
		 */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			/*
			 * NOTE(review): sival_ptr is dereferenced here
			 * without a NULL check.
			 */
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			/*
			 * The spawner is set up from a zeroed SIGEV_PORT
			 * sigevent; the port and user value were saved above.
			 */
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/* Second pass: build the siginfo to register. */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			/*
			 * Drop an existing spawner whose thread attributes
			 * don't match (cancel_if_necessary() cancels it);
			 * a matching one is reused.
			 */
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		/*
		 * For spawner-based types:
		 *  - publish the port, queue descriptor and user value;
		 *  - enable the spawner under tcd_lock;
		 *  - wake it via tcd_cv.
		 */
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	/*
	 * mq_exclusive is still held only if the initial lock succeeded.
	 * Otherwise report the dead owner as EBADMSG, even on a
	 * successful removal.
	 */
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
10762248Sraf 
10772248Sraf int
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)10786812Sraf mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
10792248Sraf {
10802248Sraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
10812248Sraf 	mqhdr_t *mqhp;
10822248Sraf 	uint_t	flag = 0;
10832248Sraf 
10842248Sraf 	if (!mq_is_valid(mqdp)) {
10852248Sraf 		errno = EBADF;
10862248Sraf 		return (-1);
10872248Sraf 	}
10882248Sraf 
10892248Sraf 	/* store current attributes */
10902248Sraf 	if (omqstat != NULL) {
10912248Sraf 		int	count;
10922248Sraf 
10932248Sraf 		mqhp = mqdp->mqd_mq;
10942248Sraf 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
10952248Sraf 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
10962248Sraf 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
10972248Sraf 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
10982248Sraf 		omqstat->mq_curmsgs = count;
10992248Sraf 	}
11002248Sraf 
11012248Sraf 	/* set description attributes */
11022248Sraf 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
11032248Sraf 		flag = FNONBLOCK;
11042248Sraf 	mqdp->mqd_mqdn->mqdn_flags = flag;
11052248Sraf 
11062248Sraf 	return (0);
11072248Sraf }
11082248Sraf 
11092248Sraf int
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)11106812Sraf mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
11112248Sraf {
11122248Sraf 	mqdes_t *mqdp = (mqdes_t *)mqdes;
11132248Sraf 	mqhdr_t *mqhp;
11142248Sraf 	int count;
11152248Sraf 
11162248Sraf 	if (!mq_is_valid(mqdp)) {
11172248Sraf 		errno = EBADF;
11182248Sraf 		return (-1);
11192248Sraf 	}
11202248Sraf 
11212248Sraf 	mqhp = mqdp->mqd_mq;
11222248Sraf 
11232248Sraf 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
11242248Sraf 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
11252248Sraf 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
11262248Sraf 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
11272248Sraf 	mqstat->mq_curmsgs = count;
11282248Sraf 	return (0);
11292248Sraf }
11302248Sraf 
11312248Sraf /*
11322248Sraf  * Cleanup after fork1() in the child process.
11332248Sraf  */
11342248Sraf void
postfork1_child_sigev_mq(void)11352248Sraf postfork1_child_sigev_mq(void)
11362248Sraf {
11372248Sraf 	thread_communication_data_t *tcdp;
11382248Sraf 	mqdes_t *mqdp;
11392248Sraf 
11402248Sraf 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
11412248Sraf 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
11422248Sraf 			mqdp->mqd_tcd = NULL;
11432248Sraf 			tcd_teardown(tcdp);
11442248Sraf 		}
11452248Sraf 	}
11462248Sraf }
1147