/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "synonyms.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* doubly linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see asyncio.h defines for */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}
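
/*
 * Illustrative sketch only (not compiled): one plausible shape for the
 * KAIO_SUPPORTED()/SET_KAIO_NOT_SUPPORTED()/CLEAR_KAIO_SUPPORTED()
 * macros used below, treating the _kaio_supported[] words mapped above
 * as one bit per file descriptor.  The authoritative definitions live
 * in "asyncio.h"; the EXAMPLE_* names and the encoding are assumptions.
 */
#if 0
#define	EXAMPLE_KAIO_WORD(fd)	((fd) / 32)
#define	EXAMPLE_KAIO_BIT(fd)	(1U << ((fd) % 32))

/* a file is presumed supported until proven otherwise; the bit */
/* records "kaio not supported for this fd" */
#define	EXAMPLE_KAIO_SUPPORTED(fd) \
	((_kaio_supported[EXAMPLE_KAIO_WORD(fd)] & EXAMPLE_KAIO_BIT(fd)) == 0)
#define	EXAMPLE_SET_KAIO_NOT_SUPPORTED(fd) \
	(_kaio_supported[EXAMPLE_KAIO_WORD(fd)] |= EXAMPLE_KAIO_BIT(fd))
#define	EXAMPLE_CLEAR_KAIO_SUPPORTED(fd) \
	(_kaio_supported[EXAMPLE_KAIO_WORD(fd)] &= ~EXAMPLE_KAIO_BIT(fd))
#endif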

/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants are initialized, such as the maximum number of workers
 * that the subsystem can create and the minimum number of workers
 * permitted before imposing some restrictions.  Also, some
 * workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * And later check whether at least one worker was created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}

/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */
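
/*
 * Illustrative sketch only (not compiled): how an application typically
 * drives the Solaris-native interfaces implemented here -- issue a read
 * with aioread(3AIO), then reap its aio_result_t with aiowait(3AIO).
 * The helper name example_read_async() is hypothetical and error
 * handling is abbreviated.
 */
#if 0
#include <sys/types.h>
#include <sys/asynch.h>
#include <unistd.h>
#include <errno.h>

static ssize_t
example_read_async(int fd, char *buf, int bufsz)
{
	aio_result_t result;
	aio_result_t *donep;

	result.aio_return = AIO_INPROGRESS;	/* see _aiorw()'s poll check */
	if (aioread(fd, buf, bufsz, 0, SEEK_SET, &result) == -1)
		return (-1);
	/* with only one request outstanding, the first completion is ours */
	donep = aiowait(NULL);	/* NULL timeval: block indefinitely */
	if (donep == (aio_result_t *)-1)
		return (-1);
	errno = donep->aio_errno;
	return (donep->aio_return);
}
#endif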

int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
	aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat64 stat64;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat64(fd, &stat64) == -1)
			error = -1;
		else
			loffset = offset + stat64.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be
	 * able to choose the appropriate 32/64 bit function.  All other
	 * functions only require the difference between READ and WRITE
	 * (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		sig_mutex_unlock(&__aio_mutex);
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			return (0);
		}
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32 and 64 bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}

int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}

/* ARGSUSED */
static void
_aiowait_cleanup(void *arg)
{
	sig_mutex_lock(&__aio_mutex);
	_aiowait_flag--;
	sig_mutex_unlock(&__aio_mutex);
}

/*
 * This must be asynch-safe and cancel-safe.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			pthread_cleanup_push(_aiowait_cleanup, NULL);
			_cancel_prologue();
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			_cancel_epilogue();
			pthread_cleanup_pop(0);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}

/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */

int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}
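
/*
 * Illustrative sketch only (not compiled): the intended calling pattern
 * for _aio_get_timedelta() -- compute an absolute deadline once, then
 * recompute the relative wait before each retried timed wait.  The
 * helper name example_wait_deadline() is hypothetical; the caller is
 * assumed to hold mp, as cond_reltimedwait(3C) requires.
 */
#if 0
static int
example_wait_deadline(cond_t *cv, mutex_t *mp, const timespec_t *rel)
{
	struct timeval now;
	timespec_t end;
	timespec_t wait;

	/* absolute deadline = current time + relative timeout */
	(void) gettimeofday(&now, NULL);
	end.tv_sec = now.tv_sec + rel->tv_sec;
	end.tv_nsec = now.tv_usec * 1000 + rel->tv_nsec;
	if (end.tv_nsec >= NANOSEC) {
		end.tv_nsec -= NANOSEC;
		end.tv_sec++;
	}
	/* retry until the wait succeeds or the deadline passes */
	while (_aio_get_timedelta(&end, &wait) == 0) {
		if (cond_reltimedwait(cv, mp, &wait) == 0)
			return (0);
	}
	return (-1);	/* timer expired */
}
#endif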

/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * aio's and return.  Those aio's in question will have either noticed the
 * cancellation notice before, during, or after initiating io.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}
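
/*
 * Illustrative sketch only (not compiled): how a caller such as the
 * close() path might interpret aiocancel_all()'s return values.  The
 * helper name example_drain_fd() is hypothetical.
 */
#if 0
static void
example_drain_fd(int fd)
{
	switch (aiocancel_all(fd)) {
	case AIO_CANCELED:
		/* every matching request was successfully canceled */
		break;
	case AIO_ALLDONE:
		/* nothing was outstanding for this file descriptor */
		break;
	case AIO_NOTCANCELED:
		/* some requests were already in flight; wait for them */
		/* (e.g., via aiowait()) before reusing the descriptor */
		break;
	}
}
#endif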

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * Caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}
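
/*
 * Illustrative sketch only (not compiled): the jump-based cancellation
 * protocol detailed in the block comment that follows, reduced to its
 * skeleton.  Names prefixed with "example_" are hypothetical; the real
 * protocol is implemented by _aio_do_request(), aiosigcancelhndlr(),
 * and the sigon()/sigoff() calls that gate SIGAIOCANCEL delivery.
 */
#if 0
static sigjmp_buf example_jmp_buf;
static volatile int example_cancel_ok;	/* analogous to work_cancel_flg */

static void
example_cancel_handler(int sig)
{
	if (example_cancel_ok)		/* only while cancellable */
		siglongjmp(example_jmp_buf, 1);
	/* otherwise fall through; the worker checks the CANCELED state */
}

static void
example_worker_loop(void)
{
	if (sigsetjmp(example_jmp_buf, 0) != 0) {
		/* resumed here: the in-progress request was cancelled */
	}
	for (;;) {
		example_cancel_ok = 1;	/* window where siglongjmp is safe */
		/* ... blocking read()/write() happens here ... */
		example_cancel_ok = 0;	/* must be off before taking locks */
		/* ... take the queue lock, finish or discard the request ... */
	}
}
#endif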

/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is being done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code
 *	where it wishes to continue working in the case that a SIGAIOCANCEL
 *	signal is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which again will use the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because the "work_cancel_flg" is only manipulated by this worker
 *	thread and it can only run on one CPU at a given time, it is not
 *	necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and returns
 *	  from the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if the "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
 *	an eventually required siglongjmp(), freeing the work_qlock1 and
 *	avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;	/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);

top:
		/* consume any deferred SIGAIOCANCEL signal here */
		sigon(self);
		sigoff(self);

		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				goto top;
		}
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
		case AIOREAD:
		case AIOAREAD:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				aio_panic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			aio_panic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}
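
/*
 * Note on the ESPIPE fallback in _aio_do_request() above: pread()/pwrite()
 * fail with ESPIPE on non-seekable objects (pipes, sockets, FIFOs), so
 * each case retries with plain read()/write(), which ignore the file
 * offset.  Any other errno is reported to the application through
 * _aio_set_result().
 */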
11824502Spraks */ 11834502Spraks notify = 0; 11844502Spraks if (_aio_outstand_cnt == 0 && _aiowait_flag) { 11854502Spraks notify = 1; 11864502Spraks } 11872248Sraf sig_mutex_unlock(&__aio_mutex); 11884502Spraks if (notify) { 11894502Spraks (void) _kaio(AIONOTIFY); 11904502Spraks } 11912248Sraf } else { 11922248Sraf if (reqp->req_state == AIO_REQ_INPROGRESS) 11932248Sraf reqp->req_state = AIO_REQ_DONE; 11942248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 11952248Sraf _aiodone(reqp, retval, error); 11962248Sraf } 11972248Sraf } 11982248Sraf } 11992248Sraf 12002248Sraf void 12012248Sraf _aio_req_mark_done(aio_req_t *reqp) 12022248Sraf { 12032248Sraf #if !defined(_LP64) 12042248Sraf if (reqp->req_largefile) 12052248Sraf ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 12062248Sraf else 12072248Sraf #endif 12082248Sraf ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 12092248Sraf } 12102248Sraf 12112248Sraf /* 12122248Sraf * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 12132248Sraf * hopefully to consume one of our queued signals. 12142248Sraf */ 12152248Sraf static void 12162248Sraf _aio_delay(int ticks) 12172248Sraf { 12182248Sraf (void) usleep(ticks * (MICROSEC / hz)); 12192248Sraf } 12202248Sraf 12212248Sraf /* 12222248Sraf * Actually send the notifications. 12232248Sraf * We could block indefinitely here if the application 12242248Sraf * is not listening for the signal or port notifications. 12252248Sraf */ 12262248Sraf static void 12272248Sraf send_notification(notif_param_t *npp) 12282248Sraf { 12292248Sraf extern int __sigqueue(pid_t pid, int signo, 12304502Spraks /* const union sigval */ void *value, int si_code, int block); 12312248Sraf 12322248Sraf if (npp->np_signo) 12332248Sraf (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 12342248Sraf SI_ASYNCIO, 1); 12352248Sraf else if (npp->np_port >= 0) 12362248Sraf (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 12372248Sraf npp->np_event, npp->np_object, npp->np_user); 12382248Sraf 12392248Sraf if (npp->np_lio_signo) 12402248Sraf (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 12412248Sraf SI_ASYNCIO, 1); 12422248Sraf else if (npp->np_lio_port >= 0) 12432248Sraf (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 12442248Sraf npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 12452248Sraf } 12462248Sraf 12472248Sraf /* 12482248Sraf * Asynchronous notification worker. 12492248Sraf */ 12502248Sraf void * 12512248Sraf _aio_do_notify(void *arg) 12522248Sraf { 12532248Sraf aio_worker_t *aiowp = (aio_worker_t *)arg; 12542248Sraf aio_req_t *reqp; 12552248Sraf 12562248Sraf /* 12572248Sraf * This isn't really necessary. All signals are blocked. 12582248Sraf */ 12592248Sraf if (pthread_setspecific(_aio_key, aiowp) != 0) 12602248Sraf aio_panic("_aio_do_notify, pthread_setspecific()"); 12612248Sraf 12622248Sraf /* 12632248Sraf * Notifications are never cancelled. 12642248Sraf * All signals remain blocked, forever. 

/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {	/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}

/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
int
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never cancelled via pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (error);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}
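
/*
 * Illustrative sketch only (not compiled): the consumer side of the
 * ordering contract documented for _aio_set_result() below.  A POSIX
 * poller keys off aio_errno leaving EINPROGRESS, so the producer must
 * store aio_return first; a Solaris-native poller keys off aio_return
 * leaving AIO_INPROGRESS, so the stores are reversed.  The helper name
 * example_poll_posix_result() is hypothetical.
 */
#if 0
static ssize_t
example_poll_posix_result(aio_result_t *resultp)
{
	while (resultp->aio_errno == EINPROGRESS)
		;	/* spin; a real caller would block instead */
	membar_consumer();	/* pairs with membar_producer() below */
	return (resultp->aio_return);
}
#endif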
/*
 * Set the return and errno values for the application's use.
 *
 * For the POSIX interfaces, we must set the return value first followed
 * by the errno value because the POSIX interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}

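/*
 * Editor's note: the store ordering enforced above implies a matching
 * load order on the reader's side.  A sketch for the POSIX case: poll
 * aio_errno first (it is written last by the worker) and read
 * aio_return only after a membar_consumer().  This is illustrative
 * only; it is not the actual aio_error()/aio_return() implementation.
 */
#if 0
static int
sketch_poll_posix(aio_result_t *resultp, ssize_t *retvalp)
{
	int error = resultp->aio_errno;	/* written last by the worker */

	if (error == EINPROGRESS)
		return (EINPROGRESS);	/* not yet complete */
	membar_consumer();
	*retvalp = resultp->aio_return;	/* now guaranteed valid */
	return (error);
}
#endif
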
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker is available and we have already
			 * created _max_workers; keep going through the
			 * list slowly until we get a lock.
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("_aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put the request onto the worker's work queue.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken the worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);	/* reenable SIGIO */
}

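/*
 * Editor's note: the load-balancing policy above -- prefer an idle
 * worker, else any worker whose queue lock can be taken without
 * blocking, else grow the pool -- reduces to the following sketch over
 * a circular list.  sketch_worker_t and sketch_pick_worker() are
 * invented names; the real code interleaves pool growth and signal
 * deferral, which the sketch omits.
 */
#if 0
typedef struct sketch_worker {
	struct sketch_worker	*forw;	/* circular link */
	mutex_t			lock;
	int			idle;
} sketch_worker_t;

static sketch_worker_t *
sketch_pick_worker(sketch_worker_t *first)
{
	sketch_worker_t *wp = first;

	/* first pass: an idle worker is the cheapest target */
	do {
		if (mutex_trylock(&wp->lock) == 0) {
			if (wp->idle)
				return (wp);	/* returns with lock held */
			(void) mutex_unlock(&wp->lock);
		}
	} while ((wp = wp->forw) != first);

	/* second pass: settle for any uncontended queue */
	do {
		if (mutex_trylock(&wp->lock) == 0)
			return (wp);		/* returns with lock held */
	} while ((wp = wp->forw) != first);

	return (NULL);		/* all contended: caller grows the pool */
}
#endif
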
/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-POSIX requests are left on the queue
		 * to eventually be placed on the done queue.
		 */
		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}
		} else {
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}

static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}

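/*
 * Editor's note: _aio_req_del() walks the queue through a
 * pointer-to-pointer ("last"), which removes a node from a singly
 * linked list without a special case for the head.  Reduced to its
 * essentials (sketch only; sketch_node_t is an invented type):
 */
#if 0
typedef struct sketch_node {
	struct sketch_node *next;
} sketch_node_t;

static void
sketch_unlink(sketch_node_t **headp, sketch_node_t *doomed)
{
	sketch_node_t **last = headp;
	sketch_node_t *next;

	while ((next = *last) != NULL) {
		if (next == doomed) {
			*last = next->next;	/* splice out, head or not */
			return;
		}
		last = &next->next;
	}
}
#endif
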
static void
_aio_enq_doneq(aio_req_t *reqp)
{
	if (_aio_doneq == NULL) {
		_aio_doneq = reqp;
		reqp->req_next = reqp->req_prev = reqp;
	} else {
		reqp->req_next = _aio_doneq;
		reqp->req_prev = _aio_doneq->req_prev;
		_aio_doneq->req_prev->req_next = reqp;
		_aio_doneq->req_prev = reqp;
	}
	reqp->req_state = AIO_REQ_DONEQ;
	_aio_doneq_cnt++;
}

/*
 * The caller must hold __aio_mutex.
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
	if (reqp != NULL && reqp->req_state != AIO_REQ_DONEQ)
		return (NULL);

	if (reqp != NULL) {
		/* request in done queue */
		if (_aio_doneq == reqp)
			_aio_doneq = reqp->req_next;
		if (_aio_doneq == reqp) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			aio_req_t *tmp = reqp->req_next;
			reqp->req_prev->req_next = tmp;
			tmp->req_prev = reqp->req_prev;
		}
	} else if ((reqp = _aio_doneq) != NULL) {
		if (reqp == reqp->req_next) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
			_aio_doneq->req_prev = reqp->req_prev;
		}
	}
	if (reqp != NULL) {
		_aio_doneq_cnt--;
		reqp->req_next = reqp->req_prev = reqp;
		reqp->req_state = AIO_REQ_DONE;
	}
	return (reqp);
}

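/*
 * Editor's note: _aio_req_remove() serves two callers.  Passed a
 * specific request it unlinks exactly that entry; passed NULL it
 * dequeues the oldest done request, returning NULL only when the done
 * queue is empty.  A hypothetical drain loop (sketch, not library
 * code) illustrating the NULL-argument form:
 */
#if 0
static void
sketch_drain_doneq(void)
{
	aio_req_t *reqp;

	sig_mutex_lock(&__aio_mutex);
	while ((reqp = _aio_req_remove(NULL)) != NULL) {
		/* reqp is now unlinked and in state AIO_REQ_DONE */
		_aio_req_free(reqp);
	}
	sig_mutex_unlock(&__aio_mutex);
}
#endif
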
/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next;

	hashp = _aio_hash + AIOHASH(resultp);
	lmutex_lock(&hashp->hash_lock);
	prev = &hashp->hash_ptr;
	while ((next = *prev) != NULL) {
		if (resultp == next->req_resultp) {
			lmutex_unlock(&hashp->hash_lock);
			return (-1);
		}
		prev = &next->req_link;
	}
	*prev = reqp;
	ASSERT(reqp->req_link == NULL);
	lmutex_unlock(&hashp->hash_lock);
	return (0);
}

/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * Find an entry in the hash table.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

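/*
 * Editor's note: the three hash routines above share one pattern:
 * index a bucket with AIOHASH(resultp), take that bucket's lock, and
 * walk the req_link chain comparing req_resultp pointers.  A
 * hypothetical sketch of how a polling interface might translate a
 * user's aio_result_t into the library's request state; this is not
 * the actual aio_error() implementation, and it ignores the race
 * between the lookup and the subsequent use of reqp:
 */
#if 0
static int
sketch_request_state(aio_result_t *resultp)
{
	aio_req_t *reqp;

	if ((reqp = _aio_hash_find(resultp)) == NULL)
		return (-1);		/* unknown or already reaped */
	return (reqp->req_state);	/* e.g. AIO_REQ_INPROGRESS */
}
#endif
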
/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */
	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}

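/*
 * Editor's note: for orientation, a caller such as aio_read() funnels
 * into _aio_rw() roughly as sketched below, passing AIO_KAIO so kernel
 * aio is attempted first and AIO_NO_DUPS so a result pointer cannot be
 * queued twice.  This is a sketch of the expected call shape only, not
 * the verbatim aio_read() source; the validation shown is assumed.
 */
#if 0
int
sketch_aio_read(aiocb_t *aiocbp)
{
	if (aiocbp == NULL || aiocbp->aio_reqprio != 0) {
		errno = EINVAL;
		return (-1);
	}
	return (_aio_rw(aiocbp, NULL, &__nextworker_rw, AIOAREAD,
	    (AIO_KAIO | AIO_NO_DUPS)));
}
#endif
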
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */
	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */
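
/*
 * Editor's note: end to end, the machinery in this file sits behind
 * the standard POSIX AIO calls.  An application-level sketch follows
 * (a separate program, not part of libc; error handling abbreviated,
 * and polling with aio_error() stands in for aio_suspend()):
 */
#if 0
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(int argc, char **argv)
{
	static char buf[4096];
	struct aiocb cb;
	int fd, error;

	if (argc != 2 || (fd = open(argv[1], O_RDONLY)) == -1)
		return (1);

	(void) memset(&cb, 0, sizeof (cb));
	cb.aio_fildes = fd;
	cb.aio_buf = buf;
	cb.aio_nbytes = sizeof (buf);
	cb.aio_offset = 0;
	cb.aio_sigevent.sigev_notify = SIGEV_NONE;

	if (aio_read(&cb) == -1)	/* enqueue; EINPROGRESS until done */
		return (1);
	while ((error = aio_error(&cb)) == EINPROGRESS)
		(void) usleep(1000);	/* poll; real code would suspend */
	if (error == 0)
		(void) printf("read %zd bytes\n", aio_return(&cb));
	(void) close(fd);
	return (error != 0);
}
#endif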