/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "synonyms.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;			/* doubly linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 4;			/* min number of workers */
int _minworkload = 2;			/* min number of requests in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */

int _aiowait_flag = 0;			/* when set, aiowait() is in progress */
int _aio_flags = 0;			/* see defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}
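
/*
 * A note on the bit array above: the KAIO_SUPPORTED(fd),
 * SET_KAIO_NOT_SUPPORTED(fd) and CLEAR_KAIO_SUPPORTED(fd) macros used
 * throughout this file index _kaio_supported by file descriptor (they
 * are assumed to come from asyncio.h, included above; the exact bit
 * encoding lives there).  This file relies on only three operations:
 * query an fd, mark an fd as not kaio-capable (done when the kernel
 * returns EBADFD in _aiorw()), and clear that mark when the fd is
 * closed (done in _aio_close()).
 */
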
/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants are initialized like the max number of workers that
 * the subsystem can create, and the minimum number of workers
 * permitted before imposing some restrictions.  Also, some
 * workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * Afterwards, check that at least one worker was created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}

/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */

int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat stat;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat(fd, &stat) == -1)
			error = -1;
		else
			loffset = offset + stat.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be
	 * able to choose the appropriate 32/64-bit function.  All other
	 * functions only require the difference between READ and WRITE
	 * (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			sig_mutex_unlock(&__aio_mutex);
			return (0);
		}
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32- and 64-bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}
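
/*
 * Illustrative caller pattern for the interfaces above (a sketch only;
 * "fd" and "buf" are hypothetical).  The first request lazily triggers
 * _kaio_init() and, if kaio cannot service the request, __uaio_init():
 *
 *	char buf[8192];
 *	aio_result_t res;
 *
 *	if (aioread(fd, buf, sizeof (buf), (off_t)0, SEEK_SET, &res) == 0) {
 *		aio_result_t *donep = aiowait(NULL);
 *		if (donep == &res && res.aio_return != -1) {
 *			... res.aio_return bytes were read ...
 *		}
 *	}
 */
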
int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}

/*
 * This must be asynch safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining
				 * time in nanoseconds to microseconds.
				 * Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
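
/*
 * Worked example of the rounding above: with 1000001 nanoseconds left,
 * hres += 999 yields 1001000, so the retried _kaio(AIOWAIT) call gets
 * tv_sec = 0 and tv_usec = 1001.  The remainder is rounded up to the
 * next microsecond rather than truncated, so the retry never passes a
 * shorter timeout than the time actually remaining.
 */
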
/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */

int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}
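
/*
 * Worked example: with end = { 5, 100000000 } and a current time of
 * { 4, 900000000 }, the borrow path above yields wait = { 0, 200000000 },
 * i.e. the remaining 0.2 second.  If "end" is not in the future, -1 is
 * returned and the contents of *wait are not meaningful.
 */
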
/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * aio's and return.  Those aio's in question will have either noticed the
 * cancellation notice before, during, or after initiating io.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * Cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * The caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}
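
/*
 * The list manipulation above keeps each worker list circular and
 * doubly linked.  Inserting a new worker C "before" the list head A
 * places it at the tail:
 *
 *	A <-> B <-> (back to A)    becomes    A <-> B <-> C <-> (back to A)
 *
 * so new workers always join at the tail while *nextworker sweeps the
 * ring round-robin, independently of insertions.
 */
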
/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is being done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	  same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code
 *	where it wishes to continue working in the case that a SIGAIOCANCEL
 *	signal is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which again will use the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because "work_cancel_flg" is only manipulated by this worker
 *	thread and the thread can only run on one CPU at a given time, it
 *	is not necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and returns
 *	  from the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
 *	an eventually required siglongjmp() freeing the work_qlock1 and
 *	avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;		/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);

top:
		/* consume any deferred SIGAIOCANCEL signal here */
		sigon(self);
		sigoff(self);

		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				goto top;
		}
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
		case AIOREAD:
		case AIOAREAD:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				aio_panic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			aio_panic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			int notify;
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			/*
			 * If it was canceled, this request will not be
			 * added to the done list.  Just free it.
			 */
			if (error == ECANCELED) {
				_aio_outstand_cnt--;
				_aio_req_free(reqp);
			} else {
				_aio_set_result(reqp, retval, error);
				_aio_req_done_cnt++;
			}
			/*
			 * Notify any thread that may have blocked
			 * because it saw an outstanding request.
			 */
			notify = 0;
			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
				notify = 1;
			}
			sig_mutex_unlock(&__aio_mutex);
			if (notify) {
				(void) _kaio(AIONOTIFY);
			}
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}

void
_aio_req_mark_done(aio_req_t *reqp)
{
#if !defined(_LP64)
	if (reqp->req_largefile)
		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
	else
#endif
		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
}

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
static void
_aio_delay(int ticks)
{
	(void) usleep(ticks * (MICROSEC / hz));
}

/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
static void
send_notification(notif_param_t *npp)
{
	extern int __sigqueue(pid_t pid, int signo,
	    /* const union sigval */ void *value, int si_code, int block);

	if (npp->np_signo)
		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_port >= 0)
		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
		    npp->np_event, npp->np_object, npp->np_user);

	if (npp->np_lio_signo)
		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_lio_port >= 0)
		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
}
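
/*
 * For reference, the np_signo/np_user values dispatched above originate
 * in the application's struct sigevent.  A hypothetical POSIX caller
 * that would be notified through the __sigqueue() path looks like:
 *
 *	struct aiocb cb;
 *
 *	(void) memset(&cb, 0, sizeof (cb));
 *	cb.aio_fildes = fd;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof (buf);
 *	cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
 *	cb.aio_sigevent.sigev_signo = SIGUSR1;
 *	cb.aio_sigevent.sigev_value.sival_ptr = &cb;
 *	(void) aio_read(&cb);
 *
 * The signal is queued with an si_code of SI_ASYNCIO, so a handler can
 * distinguish aio completions from other queued signals.
 */
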
/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakens the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else { /* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}
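
/*
 * Summary of the refcount protocol above (the fan-out itself is assumed
 * to be done by aio_fsync() elsewhere in the library): one aio_fsync()
 * call is represented by several queued requests sharing a single
 * aio_lio_t head.  Each read/write worker that reaches its copy drops
 * lio_refcnt/lio_nent and frees the request; only the last worker
 * (lio_refcnt == 1) returns 0 so that _aio_do_request() performs the
 * actual __fdsync(), after every previously queued write on that
 * worker has been retired.
 */
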
/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
int
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never cancelled via pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by _aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (error);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}

/*
 * Set the return and errno values for the application's use.
/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}
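/*
 * Illustrative sketch (assumed caller, not part of this file): the
 * aiowait() path drains completions with _aio_req_done() while
 * holding __aio_mutex, distinguishing "nothing done yet" (NULL)
 * from "nothing outstanding at all" ((aio_result_t *)-1):
 *
 *	sig_mutex_lock(&__aio_mutex);
 *	resultp = _aio_req_done();
 *	sig_mutex_unlock(&__aio_mutex);
 *	if (resultp == (aio_result_t *)-1)
 *		(no requests are pending at all)
 */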
/*
 * Set the return and errno values for the application's use.
 *
 * For the POSIX interfaces, we must set the return value first followed
 * by the errno value because the POSIX interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}
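/*
 * Sketch of the matching consumer-side discipline (for illustration
 * only; the real consumers are aio_error(), aio_return() and their
 * Solaris counterparts): a poller reads the completion word first
 * and the companion word only after a read barrier, mirroring the
 * membar_producer() ordering above.  For a POSIX request:
 *
 *	while (resultp->aio_errno == EINPROGRESS)
 *		(not yet complete);
 *	membar_consumer();
 *	retval = resultp->aio_return;	(now safe to read)
 */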
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until an unlocked queue is found,
	 * or until the list has been completely traversed, at which point
	 * another worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker is available and we have already
			 * created _max_workers; keep going through the
			 * list slowly until we get a lock.
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("_aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put the request onto the worker's work queue.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken the worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);		/* reenable SIGIO */
}
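/*
 * For illustration: the read/write submission paths pass the
 * round-robin pointer for the read/write workers, e.g.
 *
 *	_aio_req_add(reqp, &__nextworker_rw, AIOAREAD);
 *
 * while cancel notifications are queued to the notification workers,
 * as in _aiodone() above:
 *
 *	_aio_req_add(reqp, &__workers_no, AIONOTIFY);
 */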
/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-POSIX requests are left on the queue
		 * to eventually be placed on the done queue.
		 */
		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}
		} else {
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}

static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}
static void
_aio_enq_doneq(aio_req_t *reqp)
{
	if (_aio_doneq == NULL) {
		_aio_doneq = reqp;
		reqp->req_next = reqp->req_prev = reqp;
	} else {
		reqp->req_next = _aio_doneq;
		reqp->req_prev = _aio_doneq->req_prev;
		_aio_doneq->req_prev->req_next = reqp;
		_aio_doneq->req_prev = reqp;
	}
	reqp->req_state = AIO_REQ_DONEQ;
	_aio_doneq_cnt++;
}

/*
 * The caller owns __aio_mutex.
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
		return (NULL);

	if (reqp) {
		/* request in done queue */
		if (_aio_doneq == reqp)
			_aio_doneq = reqp->req_next;
		if (_aio_doneq == reqp) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			aio_req_t *tmp = reqp->req_next;
			reqp->req_prev->req_next = tmp;
			tmp->req_prev = reqp->req_prev;
		}
	} else if ((reqp = _aio_doneq) != NULL) {
		if (reqp == reqp->req_next) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
			_aio_doneq->req_prev = reqp->req_prev;
		}
	}
	if (reqp) {
		_aio_doneq_cnt--;
		reqp->req_next = reqp->req_prev = reqp;
		reqp->req_state = AIO_REQ_DONE;
	}
	return (reqp);
}
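/*
 * Illustration (assumed usage): callers such as aio_waitn() may pass
 * NULL to dequeue the oldest entry on the done queue, or pass a
 * specific request that they have already located:
 *
 *	reqp = _aio_req_remove(NULL);	(oldest done request, if any)
 *	reqp = _aio_req_remove(reqp);	(NULL unless on the done queue)
 */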
/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next;

	hashp = _aio_hash + AIOHASH(resultp);
	lmutex_lock(&hashp->hash_lock);
	prev = &hashp->hash_ptr;
	while ((next = *prev) != NULL) {
		if (resultp == next->req_resultp) {
			lmutex_unlock(&hashp->hash_lock);
			return (-1);
		}
		prev = &next->req_link;
	}
	*prev = reqp;
	ASSERT(reqp->req_link == NULL);
	lmutex_unlock(&hashp->hash_lock);
	return (0);
}

/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * Find an entry in the hash table.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}
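/*
 * Illustrative sketch (assumed caller): status functions map an
 * application's aiocb to the library's internal request through its
 * embedded result area, along the lines of
 *
 *	aio_req_t *reqp = _aio_hash_find(&aiocbp->aio_resultp);
 *	if (reqp == NULL)
 *		(not, or no longer, a known asynchronous request);
 */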
/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */
	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
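/*
 * Illustrative sketch (assumed wrapper, not part of this file): the
 * POSIX entry points are thin wrappers over _aio_rw(); aio_read()
 * would look roughly like
 *
 *	int
 *	aio_read(aiocb_t *aiocbp)
 *	{
 *		(... validate aiocbp ...)
 *		return (_aio_rw(aiocbp, NULL, &__nextworker_rw,
 *		    AIOAREAD, (AIO_KAIO | AIO_NO_DUPS)));
 *	}
 */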
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */
	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */
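/*
 * Note (illustration only): in the 32-bit library the largefile
 * wrappers (aio_read64() and friends) are assumed to funnel into
 * _aio_rw64() the same way, e.g.
 *
 *	return (_aio_rw64(aiocbp, NULL, &__nextworker_rw,
 *	    AIOAREAD64, (AIO_KAIO | AIO_NO_DUPS)));
 */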