12248Sraf /*
22248Sraf * CDDL HEADER START
32248Sraf *
42248Sraf * The contents of this file are subject to the terms of the
52248Sraf * Common Development and Distribution License (the "License").
62248Sraf * You may not use this file except in compliance with the License.
72248Sraf *
82248Sraf * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
92248Sraf * or http://www.opensolaris.org/os/licensing.
102248Sraf * See the License for the specific language governing permissions
112248Sraf * and limitations under the License.
122248Sraf *
132248Sraf * When distributing Covered Code, include this CDDL HEADER in each
142248Sraf * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
152248Sraf * If applicable, add the following below this CDDL HEADER, with the
162248Sraf * fields enclosed by brackets "[]" replaced with your own identifying
172248Sraf * information: Portions Copyright [yyyy] [name of copyright owner]
182248Sraf *
192248Sraf * CDDL HEADER END
202248Sraf */
212248Sraf
222248Sraf /*
235891Sraf * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
242248Sraf * Use is subject to license terms.
252248Sraf */
262248Sraf
272248Sraf #pragma ident "%Z%%M% %I% %E% SMI"
282248Sraf
296812Sraf #include "lint.h"
302248Sraf #include "mtlib.h"
312248Sraf #define _KMEMUSER
322248Sraf #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
332248Sraf #undef _KMEMUSER
342248Sraf #include <mqueue.h>
352248Sraf #include <sys/types.h>
362248Sraf #include <sys/file.h>
372248Sraf #include <sys/mman.h>
382248Sraf #include <errno.h>
392248Sraf #include <stdarg.h>
402248Sraf #include <limits.h>
412248Sraf #include <pthread.h>
422248Sraf #include <assert.h>
432248Sraf #include <string.h>
442248Sraf #include <unistd.h>
452248Sraf #include <stdlib.h>
462248Sraf #include <sys/stat.h>
472248Sraf #include <inttypes.h>
482248Sraf #include "sigev_thread.h"
492248Sraf #include "pos4obj.h"
502248Sraf
/*
 * Per-queue defaults, applied by mq_open() when a queue is created
 * without an mq_attr.
 */
#define	MQ_MAXMSG	128	/* default number of messages */
#define	MQ_MAXSIZE	1024	/* default maximum message size */

/* value kept in mqd_magic while a descriptor is open: "MSGQ" */
#define	MQ_MAGIC	0x4d534751
582248Sraf
/*
 * Header placed in front of every message buffer in the queue file.
 * Buffers are chained by offset from the start of the mapping, and the
 * message payload immediately follows the header.
 */
typedef struct {
	uint64_t	msg_next;	/* offset of next message in chain */
	uint64_t	msg_len;	/* payload length of this message */
} msghdr_t;
662248Sraf
/*
 * Open description of a message queue.  mq_open() places one of these
 * in its own shared mapping for each successful open.
 */
struct mq_dn {
	size_t		mqdn_flags;	/* flags of the open description */
};
732248Sraf
742248Sraf /*
752248Sraf * message queue descriptor structure
762248Sraf */
772248Sraf typedef struct mq_des {
782248Sraf struct mq_des *mqd_next; /* list of all open mq descriptors, */
792248Sraf struct mq_des *mqd_prev; /* needed for fork-safety */
802248Sraf int mqd_magic; /* magic # to identify mq_des */
812248Sraf int mqd_flags; /* operation flag per open */
822248Sraf struct mq_header *mqd_mq; /* address pointer of message Q */
832248Sraf struct mq_dn *mqd_mqdn; /* open description */
842248Sraf thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */
85*7175Sraf int mqd_ownerdead; /* mq_exclusive is inconsistent */
862248Sraf } mqdes_t;
872248Sraf
882248Sraf /*
892248Sraf * message queue common header, part of the mmap()ed file.
902248Sraf * Since message queues may be shared between 32- and 64-bit processes,
912248Sraf * care must be taken to make sure that the elements of this structure
922248Sraf * are identical for both _LP64 and _ILP32 cases.
932248Sraf */
942248Sraf typedef struct mq_header {
952248Sraf /* first field must be mq_totsize, DO NOT insert before this */
962248Sraf int64_t mq_totsize; /* total size of the Queue */
972248Sraf int64_t mq_maxsz; /* max size of each message */
982248Sraf uint32_t mq_maxmsg; /* max messages in the queue */
992248Sraf uint32_t mq_maxprio; /* maximum mqueue priority */
1002248Sraf uint32_t mq_curmaxprio; /* current maximum MQ priority */
1012248Sraf uint32_t mq_mask; /* priority bitmask */
1022248Sraf uint64_t mq_freep; /* free message's head pointer */
1032248Sraf uint64_t mq_headpp; /* pointer to head pointers */
1042248Sraf uint64_t mq_tailpp; /* pointer to tail pointers */
1052248Sraf signotify_id_t mq_sigid; /* notification id (3 int's) */
1062248Sraf uint32_t mq_ntype; /* notification type (SIGEV_*) */
1072248Sraf uint64_t mq_des; /* pointer to msg Q descriptor */
1082248Sraf mutex_t mq_exclusive; /* acquire for exclusive access */
1092248Sraf sem_t mq_rblocked; /* number of processes rblocked */
1102248Sraf sem_t mq_notfull; /* mq_send()'s block on this */
1112248Sraf sem_t mq_notempty; /* mq_receive()'s block on this */
1122248Sraf sem_t mq_spawner; /* spawner thread blocks on this */
1132248Sraf } mqhdr_t;
1142248Sraf
1152248Sraf /*
1162248Sraf * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
1172248Sraf * If this assumption is somehow invalidated, mq_open() needs to be changed
1182248Sraf * back to the old version which kept a count and enforced a limit.
1192248Sraf * We make sure that this is pointed out to those changing <sys/param.h>
1202248Sraf * by checking _MQ_OPEN_MAX at compile time.
1212248Sraf */
1222248Sraf #if _MQ_OPEN_MAX != -1
1232248Sraf #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
1242248Sraf #endif
1252248Sraf
1262248Sraf #define MQ_ALIGNSIZE 8 /* 64-bit alignment */
1272248Sraf
/*
 * Debug-only consistency checks.
 *
 * Each macro expands to exactly one statement and carries no trailing
 * semicolon of its own, so a use such as "MQ_ASSERT(x);" is safe as the
 * body of an unbraced if/else.  Without DEBUG the macros evaluate none of
 * their arguments.
 *
 *   MQ_ASSERT(x)                   - plain assertion
 *   MQ_ASSERT_PTR(_m, _p)          - offset _p is non-zero, 8-byte aligned
 *                                    and lies inside queue _m
 *   MQ_ASSERT_SEMVAL_LEQ(sem, val) - current value of sem is <= val
 */
#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x)

#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) && \
	    !((uintptr_t)(_m) + (uintptr_t)(_p) >= (uintptr_t)(_m) + \
	    (_m)->mq_totsize))

#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)	do { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert(_val <= (val)); \
} while (0)
#else
#define	MQ_ASSERT(x)			((void)0)
#define	MQ_ASSERT_PTR(_m, _p)		((void)0)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)	((void)0)
#endif
1452248Sraf
/*
 * Translate offsets kept in the shared queue into addresses that are
 * valid in this process.
 *
 *   MQ_PTR(m, n)   - message header located n bytes into queue m
 *   HEAD_PTR(m, n) - entry n of the head pointer table of queue m
 *   TAIL_PTR(m, n) - entry n of the tail pointer table of queue m
 *
 * Every argument is parenthesized so that an expression (for example
 * "prio + 1" or "&hdr") can be passed safely.  'm' is evaluated twice
 * by HEAD_PTR() and TAIL_PTR(), so it must have no side effects.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
	(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
	(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))
1512248Sraf
1522248Sraf #define MQ_RESERVED ((mqdes_t *)-1)
1532248Sraf
1542248Sraf #define ABS_TIME 0
1552248Sraf #define REL_TIME 1
1562248Sraf
1572248Sraf static mutex_t mq_list_lock = DEFAULTMUTEX;
1582248Sraf static mqdes_t *mq_list = NULL;
1592248Sraf
1602248Sraf extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
1612248Sraf
1622248Sraf static int
mq_is_valid(mqdes_t * mqdp)1632248Sraf mq_is_valid(mqdes_t *mqdp)
1642248Sraf {
1652248Sraf /*
1662248Sraf * Any use of a message queue after it was closed is
1672248Sraf * undefined. But the standard strongly favours EBADF
1682248Sraf * returns. Before we dereference which could be fatal,
1692248Sraf * we first do some pointer sanity checks.
1702248Sraf */
1712248Sraf if (mqdp != NULL && mqdp != MQ_RESERVED &&
1722248Sraf ((uintptr_t)mqdp & 0x7) == 0) {
1732248Sraf return (mqdp->mqd_magic == MQ_MAGIC);
1742248Sraf }
1752248Sraf
1762248Sraf return (0);
1772248Sraf }
1782248Sraf
1792248Sraf static void
mq_init(mqhdr_t * mqhp,size_t msgsize,ssize_t maxmsg)1802248Sraf mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
1812248Sraf {
1822248Sraf int i;
1832248Sraf uint64_t temp;
1842248Sraf uint64_t currentp;
1852248Sraf uint64_t nextp;
1862248Sraf
1872248Sraf /*
1882248Sraf * We only need to initialize the non-zero fields. The use of
1892248Sraf * ftruncate() on the message queue file assures that the
190*7175Sraf * pages will be zero-filled.
1912248Sraf */
192*7175Sraf (void) mutex_init(&mqhp->mq_exclusive,
193*7175Sraf USYNC_PROCESS | LOCK_ROBUST, NULL);
1942248Sraf (void) sem_init(&mqhp->mq_rblocked, 1, 0);
1952248Sraf (void) sem_init(&mqhp->mq_notempty, 1, 0);
1962248Sraf (void) sem_init(&mqhp->mq_spawner, 1, 0);
1972248Sraf (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
1982248Sraf
1992248Sraf mqhp->mq_maxsz = msgsize;
2002248Sraf mqhp->mq_maxmsg = maxmsg;
2012248Sraf
2022248Sraf /*
2032248Sraf * As of this writing (1997), there are 32 message queue priorities.
2042248Sraf * If this is to change, then the size of the mq_mask will
2052248Sraf * also have to change. If DEBUG is defined, assert that
2062248Sraf * _MQ_PRIO_MAX hasn't changed.
2072248Sraf */
2082248Sraf mqhp->mq_maxprio = _MQ_PRIO_MAX;
2092248Sraf #if defined(DEBUG)
2102248Sraf /* LINTED always true */
2112248Sraf MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
2122248Sraf #endif
2132248Sraf
2142248Sraf /*
2152248Sraf * Since the message queue can be mapped into different
2162248Sraf * virtual address ranges by different processes, we don't
2172248Sraf * keep track of pointers, only offsets into the shared region.
2182248Sraf */
2192248Sraf mqhp->mq_headpp = sizeof (mqhdr_t);
2202248Sraf mqhp->mq_tailpp = mqhp->mq_headpp +
2215891Sraf mqhp->mq_maxprio * sizeof (uint64_t);
2222248Sraf mqhp->mq_freep = mqhp->mq_tailpp +
2235891Sraf mqhp->mq_maxprio * sizeof (uint64_t);
2242248Sraf
2252248Sraf currentp = mqhp->mq_freep;
2262248Sraf MQ_PTR(mqhp, currentp)->msg_next = 0;
2272248Sraf
2282248Sraf temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
2292248Sraf for (i = 1; i < mqhp->mq_maxmsg; i++) {
2302248Sraf nextp = currentp + sizeof (msghdr_t) + temp;
2312248Sraf MQ_PTR(mqhp, currentp)->msg_next = nextp;
2322248Sraf MQ_PTR(mqhp, nextp)->msg_next = 0;
2332248Sraf currentp = nextp;
2342248Sraf }
2352248Sraf }
2362248Sraf
2372248Sraf static size_t
mq_getmsg(mqhdr_t * mqhp,char * msgp,uint_t * msg_prio)2382248Sraf mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
2392248Sraf {
2402248Sraf uint64_t currentp;
2412248Sraf msghdr_t *curbuf;
2422248Sraf uint64_t *headpp;
2432248Sraf uint64_t *tailpp;
2442248Sraf
2452248Sraf MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
2462248Sraf
2472248Sraf /*
2482248Sraf * Get the head and tail pointers for the queue of maximum
2492248Sraf * priority. We shouldn't be here unless there is a message for
2502248Sraf * us, so it's fair to assert that both the head and tail
2512248Sraf * pointers are non-NULL.
2522248Sraf */
2532248Sraf headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
2542248Sraf tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
2552248Sraf
2562248Sraf if (msg_prio != NULL)
2572248Sraf *msg_prio = mqhp->mq_curmaxprio;
2582248Sraf
2592248Sraf currentp = *headpp;
2602248Sraf MQ_ASSERT_PTR(mqhp, currentp);
2612248Sraf curbuf = MQ_PTR(mqhp, currentp);
2622248Sraf
2632248Sraf if ((*headpp = curbuf->msg_next) == NULL) {
2642248Sraf /*
2652248Sraf * We just nuked the last message in this priority's queue.
2662248Sraf * Twiddle this priority's bit, and then find the next bit
2672248Sraf * tipped.
2682248Sraf */
2692248Sraf uint_t prio = mqhp->mq_curmaxprio;
2702248Sraf
2712248Sraf mqhp->mq_mask &= ~(1u << prio);
2722248Sraf
2732248Sraf for (; prio != 0; prio--)
2742248Sraf if (mqhp->mq_mask & (1u << prio))
2752248Sraf break;
2762248Sraf mqhp->mq_curmaxprio = prio;
2772248Sraf
2782248Sraf *tailpp = NULL;
2792248Sraf }
2802248Sraf
2812248Sraf /*
2822248Sraf * Copy the message, and put the buffer back on the free list.
2832248Sraf */
2842248Sraf (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
2852248Sraf curbuf->msg_next = mqhp->mq_freep;
2862248Sraf mqhp->mq_freep = currentp;
2872248Sraf
2882248Sraf return (curbuf->msg_len);
2892248Sraf }
2902248Sraf
2912248Sraf
2922248Sraf static void
mq_putmsg(mqhdr_t * mqhp,const char * msgp,ssize_t len,uint_t prio)2932248Sraf mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
2942248Sraf {
2952248Sraf uint64_t currentp;
2962248Sraf msghdr_t *curbuf;
2972248Sraf uint64_t *headpp;
2982248Sraf uint64_t *tailpp;
2992248Sraf
3002248Sraf MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
3012248Sraf
3022248Sraf /*
3032248Sraf * Grab a free message block, and link it in. We shouldn't
3042248Sraf * be here unless there is room in the queue for us; it's
3052248Sraf * fair to assert that the free pointer is non-NULL.
3062248Sraf */
3072248Sraf currentp = mqhp->mq_freep;
3082248Sraf MQ_ASSERT_PTR(mqhp, currentp);
3092248Sraf curbuf = MQ_PTR(mqhp, currentp);
3102248Sraf
3112248Sraf /*
3122248Sraf * Remove a message from the free list, and copy in the new contents.
3132248Sraf */
3142248Sraf mqhp->mq_freep = curbuf->msg_next;
3152248Sraf curbuf->msg_next = NULL;
3162248Sraf (void) memcpy((char *)&curbuf[1], msgp, len);
3172248Sraf curbuf->msg_len = len;
3182248Sraf
3192248Sraf headpp = HEAD_PTR(mqhp, prio);
3202248Sraf tailpp = TAIL_PTR(mqhp, prio);
3212248Sraf
3222248Sraf if (*tailpp == 0) {
3232248Sraf /*
3242248Sraf * This is the first message on this queue. Set the
3252248Sraf * head and tail pointers, and tip the appropriate bit
3262248Sraf * in the priority mask.
3272248Sraf */
3282248Sraf *headpp = currentp;
3292248Sraf *tailpp = currentp;
3302248Sraf mqhp->mq_mask |= (1u << prio);
3312248Sraf if (prio > mqhp->mq_curmaxprio)
3322248Sraf mqhp->mq_curmaxprio = prio;
3332248Sraf } else {
3342248Sraf MQ_ASSERT_PTR(mqhp, *tailpp);
3352248Sraf MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
3362248Sraf *tailpp = currentp;
3372248Sraf }
3382248Sraf }
3392248Sraf
340*7175Sraf /*
341*7175Sraf * Send a notification and also delete the registration.
342*7175Sraf */
343*7175Sraf static void
do_notify(mqhdr_t * mqhp)344*7175Sraf do_notify(mqhdr_t *mqhp)
345*7175Sraf {
346*7175Sraf (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
347*7175Sraf if (mqhp->mq_ntype == SIGEV_THREAD ||
348*7175Sraf mqhp->mq_ntype == SIGEV_PORT)
349*7175Sraf (void) sem_post(&mqhp->mq_spawner);
350*7175Sraf mqhp->mq_ntype = 0;
351*7175Sraf mqhp->mq_des = 0;
352*7175Sraf }
353*7175Sraf
354*7175Sraf /*
355*7175Sraf * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
356*7175Sraf * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
357*7175Sraf * they fail with errno == EBADMSG. Trigger any registered notification.
358*7175Sraf */
359*7175Sraf static void
owner_dead(mqdes_t * mqdp,int error)360*7175Sraf owner_dead(mqdes_t *mqdp, int error)
361*7175Sraf {
362*7175Sraf mqhdr_t *mqhp = mqdp->mqd_mq;
363*7175Sraf
364*7175Sraf mqdp->mqd_ownerdead = 1;
365*7175Sraf (void) sem_post(&mqhp->mq_notfull);
366*7175Sraf (void) sem_post(&mqhp->mq_notempty);
367*7175Sraf if (error == EOWNERDEAD) {
368*7175Sraf if (mqhp->mq_sigid.sn_pid != 0)
369*7175Sraf do_notify(mqhp);
370*7175Sraf (void) mutex_unlock(&mqhp->mq_exclusive);
371*7175Sraf }
372*7175Sraf errno = EBADMSG;
373*7175Sraf }
374*7175Sraf
3752248Sraf mqd_t
mq_open(const char * path,int oflag,...)3766812Sraf mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
3772248Sraf {
3782248Sraf va_list ap;
379*7175Sraf mode_t mode = 0;
380*7175Sraf struct mq_attr *attr = NULL;
3812248Sraf int fd;
3822248Sraf int err;
3832248Sraf int cr_flag = 0;
3842248Sraf int locked = 0;
3852248Sraf uint64_t total_size;
3862248Sraf size_t msgsize;
3872248Sraf ssize_t maxmsg;
3882248Sraf uint64_t temp;
3892248Sraf void *ptr;
3902248Sraf mqdes_t *mqdp;
3912248Sraf mqhdr_t *mqhp;
3922248Sraf struct mq_dn *mqdnp;
3932248Sraf
3942248Sraf if (__pos4obj_check(path) == -1)
3952248Sraf return ((mqd_t)-1);
3962248Sraf
3972248Sraf /* acquire MSGQ lock to have atomic operation */
3982248Sraf if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
3992248Sraf goto out;
4002248Sraf locked = 1;
4012248Sraf
4022248Sraf va_start(ap, oflag);
4032248Sraf /* filter oflag to have READ/WRITE/CREATE modes only */
4042248Sraf oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
4052248Sraf if ((oflag & O_CREAT) != 0) {
4062248Sraf mode = va_arg(ap, mode_t);
4072248Sraf attr = va_arg(ap, struct mq_attr *);
4082248Sraf }
4092248Sraf va_end(ap);
4102248Sraf
4112248Sraf if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
4122248Sraf mode, &cr_flag)) < 0)
4132248Sraf goto out;
4142248Sraf
4152248Sraf /* closing permission file */
4162248Sraf (void) __close_nc(fd);
4172248Sraf
4182248Sraf /* Try to open/create data file */
4192248Sraf if (cr_flag) {
4202248Sraf cr_flag = PFILE_CREATE;
4212248Sraf if (attr == NULL) {
4222248Sraf maxmsg = MQ_MAXMSG;
4232248Sraf msgsize = MQ_MAXSIZE;
4242248Sraf } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
4252248Sraf errno = EINVAL;
4262248Sraf goto out;
4272248Sraf } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
4282248Sraf errno = ENOSPC;
4292248Sraf goto out;
4302248Sraf } else {
4312248Sraf maxmsg = attr->mq_maxmsg;
4322248Sraf msgsize = attr->mq_msgsize;
4332248Sraf }
4342248Sraf
4352248Sraf /* adjust for message size at word boundary */
4362248Sraf temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
4372248Sraf
4382248Sraf total_size = sizeof (mqhdr_t) +
4395891Sraf maxmsg * (temp + sizeof (msghdr_t)) +
4405891Sraf 2 * _MQ_PRIO_MAX * sizeof (uint64_t);
4412248Sraf
4422248Sraf if (total_size > SSIZE_MAX) {
4432248Sraf errno = ENOSPC;
4442248Sraf goto out;
4452248Sraf }
4462248Sraf
4472248Sraf /*
4482248Sraf * data file is opened with read/write to those
4492248Sraf * who have read or write permission
4502248Sraf */
4512248Sraf mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
4522248Sraf if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
4532248Sraf (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
4542248Sraf goto out;
4552248Sraf
4562248Sraf cr_flag |= DFILE_CREATE | DFILE_OPEN;
4572248Sraf
4582248Sraf /* force permissions to avoid umask effect */
4592248Sraf if (fchmod(fd, mode) < 0)
4602248Sraf goto out;
4612248Sraf
4622248Sraf if (ftruncate64(fd, (off64_t)total_size) < 0)
4632248Sraf goto out;
4642248Sraf } else {
4652248Sraf if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
4662248Sraf O_RDWR, 0666, &err)) < 0)
4672248Sraf goto out;
4682248Sraf cr_flag = DFILE_OPEN;
4692248Sraf
4702248Sraf /* Message queue has not been initialized yet */
4712248Sraf if (read(fd, &total_size, sizeof (total_size)) !=
4722248Sraf sizeof (total_size) || total_size == 0) {
4732248Sraf errno = ENOENT;
4742248Sraf goto out;
4752248Sraf }
4762248Sraf
4772248Sraf /* Message queue too big for this process to handle */
4782248Sraf if (total_size > SSIZE_MAX) {
4792248Sraf errno = EFBIG;
4802248Sraf goto out;
4812248Sraf }
4822248Sraf }
4832248Sraf
4842248Sraf if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
4852248Sraf errno = ENOMEM;
4862248Sraf goto out;
4872248Sraf }
4882248Sraf cr_flag |= ALLOC_MEM;
4892248Sraf
4902248Sraf if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
4912248Sraf MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
4922248Sraf goto out;
4932248Sraf mqhp = ptr;
4942248Sraf cr_flag |= DFILE_MMAP;
4952248Sraf
4962248Sraf /* closing data file */
4972248Sraf (void) __close_nc(fd);
4982248Sraf cr_flag &= ~DFILE_OPEN;
4992248Sraf
5002248Sraf /*
5012248Sraf * create, unlink, size, mmap, and close description file
5022248Sraf * all for a flag word in anonymous shared memory
5032248Sraf */
5042248Sraf if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
5052248Sraf 0666, &err)) < 0)
5062248Sraf goto out;
5072248Sraf cr_flag |= DFILE_OPEN;
5082248Sraf (void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
5092248Sraf if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
5102248Sraf goto out;
5112248Sraf
5122248Sraf if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
5132248Sraf PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
5142248Sraf goto out;
5152248Sraf mqdnp = ptr;
5162248Sraf cr_flag |= MQDNP_MMAP;
5172248Sraf
5182248Sraf (void) __close_nc(fd);
5192248Sraf cr_flag &= ~DFILE_OPEN;
5202248Sraf
5212248Sraf /*
5222248Sraf * we follow the same strategy as filesystem open() routine,
5232248Sraf * where fcntl.h flags are changed to flags defined in file.h.
5242248Sraf */
5252248Sraf mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
5262248Sraf mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
5272248Sraf
5282248Sraf /* new message queue requires initialization */
5292248Sraf if ((cr_flag & DFILE_CREATE) != 0) {
5302248Sraf /* message queue header has to be initialized */
5312248Sraf mq_init(mqhp, msgsize, maxmsg);
5322248Sraf mqhp->mq_totsize = total_size;
5332248Sraf }
5342248Sraf mqdp->mqd_mq = mqhp;
5352248Sraf mqdp->mqd_mqdn = mqdnp;
5362248Sraf mqdp->mqd_magic = MQ_MAGIC;
5372248Sraf mqdp->mqd_tcd = NULL;
538*7175Sraf mqdp->mqd_ownerdead = 0;
5392248Sraf if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
5402248Sraf lmutex_lock(&mq_list_lock);
5412248Sraf mqdp->mqd_next = mq_list;
5422248Sraf mqdp->mqd_prev = NULL;
5432248Sraf if (mq_list)
5442248Sraf mq_list->mqd_prev = mqdp;
5452248Sraf mq_list = mqdp;
5462248Sraf lmutex_unlock(&mq_list_lock);
5472248Sraf return ((mqd_t)mqdp);
5482248Sraf }
5492248Sraf
5502248Sraf locked = 0; /* fall into the error case */
5512248Sraf out:
5522248Sraf err = errno;
5532248Sraf if ((cr_flag & DFILE_OPEN) != 0)
5542248Sraf (void) __close_nc(fd);
5552248Sraf if ((cr_flag & DFILE_CREATE) != 0)
5562248Sraf (void) __pos4obj_unlink(path, MQ_DATA_TYPE);
5572248Sraf if ((cr_flag & PFILE_CREATE) != 0)
5582248Sraf (void) __pos4obj_unlink(path, MQ_PERM_TYPE);
5592248Sraf if ((cr_flag & ALLOC_MEM) != 0)
5602248Sraf free((void *)mqdp);
5612248Sraf if ((cr_flag & DFILE_MMAP) != 0)
5622248Sraf (void) munmap((caddr_t)mqhp, (size_t)total_size);
5632248Sraf if ((cr_flag & MQDNP_MMAP) != 0)
5642248Sraf (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
5652248Sraf if (locked)
5662248Sraf (void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
5672248Sraf errno = err;
5682248Sraf return ((mqd_t)-1);
5692248Sraf }
5702248Sraf
5712248Sraf static void
mq_close_cleanup(mqdes_t * mqdp)5722248Sraf mq_close_cleanup(mqdes_t *mqdp)
5732248Sraf {
5742248Sraf mqhdr_t *mqhp = mqdp->mqd_mq;
5752248Sraf struct mq_dn *mqdnp = mqdp->mqd_mqdn;
5762248Sraf
5772248Sraf /* invalidate the descriptor before freeing it */
5782248Sraf mqdp->mqd_magic = 0;
579*7175Sraf if (!mqdp->mqd_ownerdead)
580*7175Sraf (void) mutex_unlock(&mqhp->mq_exclusive);
5812248Sraf
5822248Sraf lmutex_lock(&mq_list_lock);
5832248Sraf if (mqdp->mqd_next)
5842248Sraf mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
5852248Sraf if (mqdp->mqd_prev)
5862248Sraf mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
5872248Sraf if (mq_list == mqdp)
5882248Sraf mq_list = mqdp->mqd_next;
5892248Sraf lmutex_unlock(&mq_list_lock);
5902248Sraf
5912248Sraf free(mqdp);
5922248Sraf (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
5932248Sraf (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
5942248Sraf }
5952248Sraf
5962248Sraf int
mq_close(mqd_t mqdes)5976812Sraf mq_close(mqd_t mqdes)
5982248Sraf {
5992248Sraf mqdes_t *mqdp = (mqdes_t *)mqdes;
6002248Sraf mqhdr_t *mqhp;
6012248Sraf thread_communication_data_t *tcdp;
602*7175Sraf int error;
6032248Sraf
6042248Sraf if (!mq_is_valid(mqdp)) {
6052248Sraf errno = EBADF;
6062248Sraf return (-1);
6072248Sraf }
6082248Sraf
6092248Sraf mqhp = mqdp->mqd_mq;
610*7175Sraf if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
611*7175Sraf mqdp->mqd_ownerdead = 1;
612*7175Sraf if (error == EOWNERDEAD)
613*7175Sraf (void) mutex_unlock(&mqhp->mq_exclusive);
614*7175Sraf /* carry on regardless, without holding mq_exclusive */
615*7175Sraf }
6162248Sraf
6172248Sraf if (mqhp->mq_des == (uintptr_t)mqdp &&
6182248Sraf mqhp->mq_sigid.sn_pid == getpid()) {
6192248Sraf /* notification is set for this descriptor, remove it */
6202248Sraf (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
6212248Sraf mqhp->mq_ntype = 0;
6222248Sraf mqhp->mq_des = 0;
6232248Sraf }
6242248Sraf
6252248Sraf pthread_cleanup_push(mq_close_cleanup, mqdp);
6262248Sraf if ((tcdp = mqdp->mqd_tcd) != NULL) {
6272248Sraf mqdp->mqd_tcd = NULL;
6282248Sraf del_sigev_mq(tcdp); /* possible cancellation point */
6292248Sraf }
6302248Sraf pthread_cleanup_pop(1); /* finish in the cleanup handler */
6312248Sraf
6322248Sraf return (0);
6332248Sraf }
6342248Sraf
6352248Sraf int
mq_unlink(const char * path)6366812Sraf mq_unlink(const char *path)
6372248Sraf {
6382248Sraf int err;
6392248Sraf
6402248Sraf if (__pos4obj_check(path) < 0)
6412248Sraf return (-1);
6422248Sraf
6432248Sraf if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
6442248Sraf return (-1);
6452248Sraf }
6462248Sraf
6472248Sraf err = __pos4obj_unlink(path, MQ_PERM_TYPE);
6482248Sraf
6492248Sraf if (err == 0 || (err == -1 && errno == EEXIST)) {
6502248Sraf errno = 0;
6512248Sraf err = __pos4obj_unlink(path, MQ_DATA_TYPE);
6522248Sraf }
6532248Sraf
6542248Sraf if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
6552248Sraf return (-1);
6562248Sraf
6572248Sraf return (err);
6582248Sraf
6592248Sraf }
6602248Sraf
6612248Sraf static int
__mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * timeout,int abs_rel)6622248Sraf __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
6632248Sraf uint_t msg_prio, const timespec_t *timeout, int abs_rel)
6642248Sraf {
6652248Sraf mqdes_t *mqdp = (mqdes_t *)mqdes;
6662248Sraf mqhdr_t *mqhp;
6672248Sraf int err;
6682248Sraf int notify = 0;
6692248Sraf
6702248Sraf /*
6712248Sraf * sem_*wait() does cancellation, if called.
6722248Sraf * pthread_testcancel() ensures that cancellation takes place if
6732248Sraf * there is a cancellation pending when mq_*send() is called.
6742248Sraf */
6752248Sraf pthread_testcancel();
6762248Sraf
6772248Sraf if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
6782248Sraf errno = EBADF;
6792248Sraf return (-1);
6802248Sraf }
6812248Sraf
6822248Sraf mqhp = mqdp->mqd_mq;
6832248Sraf
6842248Sraf if (msg_prio >= mqhp->mq_maxprio) {
6852248Sraf errno = EINVAL;
6862248Sraf return (-1);
6872248Sraf }
6882248Sraf if (msg_len > mqhp->mq_maxsz) {
6892248Sraf errno = EMSGSIZE;
6902248Sraf return (-1);
6912248Sraf }
6922248Sraf
6932248Sraf if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
6942248Sraf err = sem_trywait(&mqhp->mq_notfull);
6952248Sraf else {
6962248Sraf /*
6972248Sraf * We might get cancelled here...
6982248Sraf */
6992248Sraf if (timeout == NULL)
7002248Sraf err = sem_wait(&mqhp->mq_notfull);
7012248Sraf else if (abs_rel == ABS_TIME)
7022248Sraf err = sem_timedwait(&mqhp->mq_notfull, timeout);
7032248Sraf else
7042248Sraf err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
7052248Sraf }
7062248Sraf if (err == -1) {
7072248Sraf /*
7082248Sraf * errno has been set to EAGAIN / EINTR / ETIMEDOUT
7092248Sraf * by sem_*wait(), so we can just return.
7102248Sraf */
7112248Sraf return (-1);
7122248Sraf }
7132248Sraf
7142248Sraf /*
7152248Sraf * By the time we're here, we know that we've got the capacity
7162248Sraf * to add to the queue...now acquire the exclusive lock.
7172248Sraf */
718*7175Sraf if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
719*7175Sraf owner_dead(mqdp, err);
720*7175Sraf return (-1);
721*7175Sraf }
7222248Sraf
7232248Sraf /*
7242248Sraf * Now determine if we want to kick the notification. POSIX
7252248Sraf * requires that if a process has registered for notification,
7262248Sraf * we must kick it when the queue makes an empty to non-empty
7272248Sraf * transition, and there are no blocked receivers. Note that
7282248Sraf * this mechanism does _not_ guarantee that the kicked process
7292248Sraf * will be able to receive a message without blocking;
7302248Sraf * another receiver could intervene in the meantime. Thus,
7312248Sraf * the notification mechanism is inherently racy; all we can
7322248Sraf * do is hope to minimize the window as much as possible.
7332248Sraf * In general, we want to avoid kicking the notification when
7342248Sraf * there are clearly receivers blocked. We'll determine if
7352248Sraf * we want to kick the notification before the mq_putmsg(),
7362248Sraf * but the actual signotify() won't be done until the message
7372248Sraf * is on the queue.
7382248Sraf */
7392248Sraf if (mqhp->mq_sigid.sn_pid != 0) {
7402248Sraf int nmessages, nblocked;
7412248Sraf
7422248Sraf (void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
7432248Sraf (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
7442248Sraf
7452248Sraf if (nmessages == 0 && nblocked == 0)
7462248Sraf notify = 1;
7472248Sraf }
7482248Sraf
7492248Sraf mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
7502248Sraf (void) sem_post(&mqhp->mq_notempty);
7512248Sraf
7522248Sraf if (notify) {
7532248Sraf /* notify and also delete the registration */
754*7175Sraf do_notify(mqhp);
7552248Sraf }
7562248Sraf
7572248Sraf MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
7582248Sraf (void) mutex_unlock(&mqhp->mq_exclusive);
7592248Sraf
7602248Sraf return (0);
7612248Sraf }
7622248Sraf
7632248Sraf int
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio)7646812Sraf mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
7652248Sraf {
7662248Sraf return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
7675891Sraf NULL, ABS_TIME));
7682248Sraf }
7692248Sraf
7702248Sraf int
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * abs_timeout)7716812Sraf mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
7722248Sraf uint_t msg_prio, const timespec_t *abs_timeout)
7732248Sraf {
7742248Sraf return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
7755891Sraf abs_timeout, ABS_TIME));
7762248Sraf }
7772248Sraf
7782248Sraf int
mq_reltimedsend_np(mqd_t mqdes,const char * msg_ptr,size_t msg_len,uint_t msg_prio,const timespec_t * rel_timeout)7796812Sraf mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
7802248Sraf uint_t msg_prio, const timespec_t *rel_timeout)
7812248Sraf {
7822248Sraf return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
7835891Sraf rel_timeout, REL_TIME));
7842248Sraf }
7852248Sraf
7862248Sraf static void
decrement_rblocked(mqhdr_t * mqhp)7872248Sraf decrement_rblocked(mqhdr_t *mqhp)
7882248Sraf {
7895891Sraf int cancel_state;
7902248Sraf
7915891Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
7922248Sraf while (sem_wait(&mqhp->mq_rblocked) == -1)
7932248Sraf continue;
7945891Sraf (void) pthread_setcancelstate(cancel_state, NULL);
7952248Sraf }
7962248Sraf
7972248Sraf static ssize_t
__mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * timeout,int abs_rel)7982248Sraf __mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
7992248Sraf uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
8002248Sraf {
8012248Sraf mqdes_t *mqdp = (mqdes_t *)mqdes;
8022248Sraf mqhdr_t *mqhp;
8032248Sraf ssize_t msg_size;
8042248Sraf int err;
8052248Sraf
8062248Sraf /*
8072248Sraf * sem_*wait() does cancellation, if called.
8082248Sraf * pthread_testcancel() ensures that cancellation takes place if
8092248Sraf * there is a cancellation pending when mq_*receive() is called.
8102248Sraf */
8112248Sraf pthread_testcancel();
8122248Sraf
8132248Sraf if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
8142248Sraf errno = EBADF;
8152248Sraf return (ssize_t)(-1);
8162248Sraf }
8172248Sraf
8182248Sraf mqhp = mqdp->mqd_mq;
8192248Sraf
8202248Sraf if (msg_len < mqhp->mq_maxsz) {
8212248Sraf errno = EMSGSIZE;
8222248Sraf return (ssize_t)(-1);
8232248Sraf }
8242248Sraf
8252248Sraf /*
8262248Sraf * The semaphoring scheme for mq_[timed]receive is a little hairy
8272248Sraf * thanks to POSIX.1b's arcane notification mechanism. First,
8282248Sraf * we try to take the common case and do a sem_trywait().
8292248Sraf * If that doesn't work, and O_NONBLOCK hasn't been set,
8302248Sraf * then note that we're going to sleep by incrementing the rblocked
8312248Sraf * semaphore. We decrement that semaphore after waking up.
8322248Sraf */
8332248Sraf if (sem_trywait(&mqhp->mq_notempty) == -1) {
8342248Sraf if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
8352248Sraf /*
8362248Sraf * errno has been set to EAGAIN or EINTR by
8372248Sraf * sem_trywait(), so we can just return.
8382248Sraf */
8392248Sraf return (-1);
8402248Sraf }
8412248Sraf /*
8422248Sraf * If we're here, then we're probably going to block...
8432248Sraf * increment the rblocked semaphore. If we get
8442248Sraf * cancelled, decrement_rblocked() will decrement it.
8452248Sraf */
8462248Sraf (void) sem_post(&mqhp->mq_rblocked);
8472248Sraf
8482248Sraf pthread_cleanup_push(decrement_rblocked, mqhp);
8492248Sraf if (timeout == NULL)
8502248Sraf err = sem_wait(&mqhp->mq_notempty);
8512248Sraf else if (abs_rel == ABS_TIME)
8522248Sraf err = sem_timedwait(&mqhp->mq_notempty, timeout);
8532248Sraf else
8542248Sraf err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
8552248Sraf pthread_cleanup_pop(1);
8562248Sraf
8572248Sraf if (err == -1) {
8582248Sraf /*
8592248Sraf * We took a signal or timeout while waiting
8602248Sraf * on mq_notempty...
8612248Sraf */
8622248Sraf return (-1);
8632248Sraf }
8642248Sraf }
8652248Sraf
866*7175Sraf if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
867*7175Sraf owner_dead(mqdp, err);
868*7175Sraf return (-1);
869*7175Sraf }
8702248Sraf msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
8712248Sraf (void) sem_post(&mqhp->mq_notfull);
8722248Sraf MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
8732248Sraf (void) mutex_unlock(&mqhp->mq_exclusive);
8742248Sraf
8752248Sraf return (msg_size);
8762248Sraf }
8772248Sraf
8782248Sraf ssize_t
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio)8796812Sraf mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
8802248Sraf {
8812248Sraf return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
8825891Sraf NULL, ABS_TIME));
8832248Sraf }
8842248Sraf
8852248Sraf ssize_t
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * abs_timeout)8866812Sraf mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
8872248Sraf uint_t *msg_prio, const timespec_t *abs_timeout)
8882248Sraf {
8892248Sraf return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
8905891Sraf abs_timeout, ABS_TIME));
8912248Sraf }
8922248Sraf
8932248Sraf ssize_t
mq_reltimedreceive_np(mqd_t mqdes,char * msg_ptr,size_t msg_len,uint_t * msg_prio,const timespec_t * rel_timeout)8946812Sraf mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
8952248Sraf uint_t *msg_prio, const timespec_t *rel_timeout)
8962248Sraf {
8972248Sraf return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
8985891Sraf rel_timeout, REL_TIME));
8992248Sraf }
9002248Sraf
9012248Sraf /*
9026812Sraf * Only used below, in mq_notify().
9032248Sraf * We already have a spawner thread.
9042248Sraf * Verify that the attributes match; cancel it if necessary.
9052248Sraf */
9062248Sraf static int
cancel_if_necessary(thread_communication_data_t * tcdp,const struct sigevent * sigevp)9072248Sraf cancel_if_necessary(thread_communication_data_t *tcdp,
9082248Sraf const struct sigevent *sigevp)
9092248Sraf {
9106812Sraf int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
9115891Sraf sigevp->sigev_notify_attributes);
9122248Sraf
9132248Sraf if (do_cancel) {
9142248Sraf /*
9152248Sraf * Attributes don't match, cancel the spawner thread.
9162248Sraf */
9172248Sraf (void) pthread_cancel(tcdp->tcd_server_id);
9182248Sraf } else {
9192248Sraf /*
9202248Sraf * Reuse the existing spawner thread with possibly
9212248Sraf * changed notification function and value.
9222248Sraf */
9232248Sraf tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
9242248Sraf tcdp->tcd_notif.sigev_signo = 0;
9252248Sraf tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
9262248Sraf tcdp->tcd_notif.sigev_notify_function =
9275891Sraf sigevp->sigev_notify_function;
9282248Sraf }
9292248Sraf
9302248Sraf return (do_cancel);
9312248Sraf }
9322248Sraf
9332248Sraf int
mq_notify(mqd_t mqdes,const struct sigevent * sigevp)9346812Sraf mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
9352248Sraf {
9362248Sraf mqdes_t *mqdp = (mqdes_t *)mqdes;
9372248Sraf mqhdr_t *mqhp;
9382248Sraf thread_communication_data_t *tcdp;
9392248Sraf siginfo_t mq_siginfo;
9402248Sraf struct sigevent sigevent;
9412248Sraf struct stat64 statb;
9422248Sraf port_notify_t *pn;
9432248Sraf void *userval;
9442248Sraf int rval = -1;
9452248Sraf int ntype;
9462248Sraf int port;
947*7175Sraf int error;
9482248Sraf
9492248Sraf if (!mq_is_valid(mqdp)) {
9502248Sraf errno = EBADF;
9512248Sraf return (-1);
9522248Sraf }
9532248Sraf
9542248Sraf mqhp = mqdp->mqd_mq;
9552248Sraf
956*7175Sraf if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
957*7175Sraf mqdp->mqd_ownerdead = 1;
958*7175Sraf sigevp = NULL;
959*7175Sraf if (error == EOWNERDEAD)
960*7175Sraf (void) mutex_unlock(&mqhp->mq_exclusive);
961*7175Sraf /* carry on regardless, without holding mq_exclusive */
962*7175Sraf }
9632248Sraf
9642248Sraf if (sigevp == NULL) { /* remove notification */
9652248Sraf if (mqhp->mq_des == (uintptr_t)mqdp &&
9662248Sraf mqhp->mq_sigid.sn_pid == getpid()) {
9672248Sraf /* notification is set for this descriptor, remove it */
9682248Sraf (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
9692248Sraf if ((tcdp = mqdp->mqd_tcd) != NULL) {
9702248Sraf sig_mutex_lock(&tcdp->tcd_lock);
9712248Sraf if (tcdp->tcd_msg_enabled) {
9722248Sraf /* cancel the spawner thread */
9732248Sraf tcdp = mqdp->mqd_tcd;
9742248Sraf mqdp->mqd_tcd = NULL;
9752248Sraf (void) pthread_cancel(
9762248Sraf tcdp->tcd_server_id);
9772248Sraf }
9782248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
9792248Sraf }
9802248Sraf mqhp->mq_ntype = 0;
9812248Sraf mqhp->mq_des = 0;
9822248Sraf } else {
9832248Sraf /* notification is not set for this descriptor */
9842248Sraf errno = EBUSY;
9852248Sraf goto bad;
9862248Sraf }
9872248Sraf } else { /* register notification with this process */
9882248Sraf switch (ntype = sigevp->sigev_notify) {
9892248Sraf case SIGEV_THREAD:
9902248Sraf userval = sigevp->sigev_value.sival_ptr;
9912248Sraf port = -1;
9922248Sraf break;
9932248Sraf case SIGEV_PORT:
9942248Sraf pn = sigevp->sigev_value.sival_ptr;
9952248Sraf userval = pn->portnfy_user;
9962248Sraf port = pn->portnfy_port;
9972248Sraf if (fstat64(port, &statb) != 0 ||
9982248Sraf !S_ISPORT(statb.st_mode)) {
9992248Sraf errno = EBADF;
10002248Sraf goto bad;
10012248Sraf }
10022248Sraf (void) memset(&sigevent, 0, sizeof (sigevent));
10032248Sraf sigevent.sigev_notify = SIGEV_PORT;
10042248Sraf sigevp = &sigevent;
10052248Sraf break;
10062248Sraf }
10072248Sraf switch (ntype) {
10082248Sraf case SIGEV_NONE:
10092248Sraf mq_siginfo.si_signo = 0;
10102248Sraf mq_siginfo.si_code = SI_MESGQ;
10112248Sraf break;
10122248Sraf case SIGEV_SIGNAL:
10132248Sraf mq_siginfo.si_signo = sigevp->sigev_signo;
10142248Sraf mq_siginfo.si_value = sigevp->sigev_value;
10152248Sraf mq_siginfo.si_code = SI_MESGQ;
10162248Sraf break;
10172248Sraf case SIGEV_THREAD:
10182248Sraf if ((tcdp = mqdp->mqd_tcd) != NULL &&
10192248Sraf cancel_if_necessary(tcdp, sigevp))
10202248Sraf mqdp->mqd_tcd = NULL;
10212248Sraf /* FALLTHROUGH */
10222248Sraf case SIGEV_PORT:
10232248Sraf if ((tcdp = mqdp->mqd_tcd) == NULL) {
10242248Sraf /* we must create a spawner thread */
10252248Sraf tcdp = setup_sigev_handler(sigevp, MQ);
10262248Sraf if (tcdp == NULL) {
10272248Sraf errno = EBADF;
10282248Sraf goto bad;
10292248Sraf }
10302248Sraf tcdp->tcd_msg_enabled = 0;
10312248Sraf tcdp->tcd_msg_closing = 0;
10322248Sraf tcdp->tcd_msg_avail = &mqhp->mq_spawner;
10332248Sraf if (launch_spawner(tcdp) != 0) {
10342248Sraf free_sigev_handler(tcdp);
10352248Sraf goto bad;
10362248Sraf }
10372248Sraf mqdp->mqd_tcd = tcdp;
10382248Sraf }
10392248Sraf mq_siginfo.si_signo = 0;
10402248Sraf mq_siginfo.si_code = SI_MESGQ;
10412248Sraf break;
10422248Sraf default:
10432248Sraf errno = EINVAL;
10442248Sraf goto bad;
10452248Sraf }
10462248Sraf
10472248Sraf /* register notification */
10482248Sraf if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
10492248Sraf goto bad;
10502248Sraf mqhp->mq_ntype = ntype;
10512248Sraf mqhp->mq_des = (uintptr_t)mqdp;
10522248Sraf switch (ntype) {
10532248Sraf case SIGEV_THREAD:
10542248Sraf case SIGEV_PORT:
10552248Sraf tcdp->tcd_port = port;
10562248Sraf tcdp->tcd_msg_object = mqdp;
10572248Sraf tcdp->tcd_msg_userval = userval;
10582248Sraf sig_mutex_lock(&tcdp->tcd_lock);
10592248Sraf tcdp->tcd_msg_enabled = ntype;
10602248Sraf sig_mutex_unlock(&tcdp->tcd_lock);
10612248Sraf (void) cond_broadcast(&tcdp->tcd_cv);
10622248Sraf break;
10632248Sraf }
10642248Sraf }
10652248Sraf
10662248Sraf rval = 0; /* success */
10672248Sraf bad:
1068*7175Sraf if (error == 0) {
1069*7175Sraf (void) mutex_unlock(&mqhp->mq_exclusive);
1070*7175Sraf } else {
1071*7175Sraf errno = EBADMSG;
1072*7175Sraf rval = -1;
1073*7175Sraf }
10742248Sraf return (rval);
10752248Sraf }
10762248Sraf
10772248Sraf int
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)10786812Sraf mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
10792248Sraf {
10802248Sraf mqdes_t *mqdp = (mqdes_t *)mqdes;
10812248Sraf mqhdr_t *mqhp;
10822248Sraf uint_t flag = 0;
10832248Sraf
10842248Sraf if (!mq_is_valid(mqdp)) {
10852248Sraf errno = EBADF;
10862248Sraf return (-1);
10872248Sraf }
10882248Sraf
10892248Sraf /* store current attributes */
10902248Sraf if (omqstat != NULL) {
10912248Sraf int count;
10922248Sraf
10932248Sraf mqhp = mqdp->mqd_mq;
10942248Sraf omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
10952248Sraf omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
10962248Sraf omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
10972248Sraf (void) sem_getvalue(&mqhp->mq_notempty, &count);
10982248Sraf omqstat->mq_curmsgs = count;
10992248Sraf }
11002248Sraf
11012248Sraf /* set description attributes */
11022248Sraf if ((mqstat->mq_flags & O_NONBLOCK) != 0)
11032248Sraf flag = FNONBLOCK;
11042248Sraf mqdp->mqd_mqdn->mqdn_flags = flag;
11052248Sraf
11062248Sraf return (0);
11072248Sraf }
11082248Sraf
11092248Sraf int
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)11106812Sraf mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
11112248Sraf {
11122248Sraf mqdes_t *mqdp = (mqdes_t *)mqdes;
11132248Sraf mqhdr_t *mqhp;
11142248Sraf int count;
11152248Sraf
11162248Sraf if (!mq_is_valid(mqdp)) {
11172248Sraf errno = EBADF;
11182248Sraf return (-1);
11192248Sraf }
11202248Sraf
11212248Sraf mqhp = mqdp->mqd_mq;
11222248Sraf
11232248Sraf mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
11242248Sraf mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
11252248Sraf mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
11262248Sraf (void) sem_getvalue(&mqhp->mq_notempty, &count);
11272248Sraf mqstat->mq_curmsgs = count;
11282248Sraf return (0);
11292248Sraf }
11302248Sraf
11312248Sraf /*
11322248Sraf * Cleanup after fork1() in the child process.
11332248Sraf */
11342248Sraf void
postfork1_child_sigev_mq(void)11352248Sraf postfork1_child_sigev_mq(void)
11362248Sraf {
11372248Sraf thread_communication_data_t *tcdp;
11382248Sraf mqdes_t *mqdp;
11392248Sraf
11402248Sraf for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
11412248Sraf if ((tcdp = mqdp->mqd_tcd) != NULL) {
11422248Sraf mqdp->mqd_tcd = NULL;
11432248Sraf tcd_teardown(tcdp);
11442248Sraf }
11452248Sraf }
11462248Sraf }
1147