12248Sraf /* 22248Sraf * CDDL HEADER START 32248Sraf * 42248Sraf * The contents of this file are subject to the terms of the 52248Sraf * Common Development and Distribution License (the "License"). 62248Sraf * You may not use this file except in compliance with the License. 72248Sraf * 82248Sraf * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 92248Sraf * or http://www.opensolaris.org/os/licensing. 102248Sraf * See the License for the specific language governing permissions 112248Sraf * and limitations under the License. 122248Sraf * 132248Sraf * When distributing Covered Code, include this CDDL HEADER in each 142248Sraf * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 152248Sraf * If applicable, add the following below this CDDL HEADER, with the 162248Sraf * fields enclosed by brackets "[]" replaced with your own identifying 172248Sraf * information: Portions Copyright [yyyy] [name of copyright owner] 182248Sraf * 192248Sraf * CDDL HEADER END 202248Sraf */ 212248Sraf 222248Sraf /* 235891Sraf * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 242248Sraf * Use is subject to license terms. 252248Sraf */ 262248Sraf 272248Sraf #pragma ident "%Z%%M% %I% %E% SMI" 282248Sraf 29*6812Sraf #include "lint.h" 302248Sraf #include "thr_uberdata.h" 312248Sraf #include "asyncio.h" 322248Sraf #include <atomic.h> 332248Sraf #include <sys/param.h> 342248Sraf #include <sys/file.h> 352248Sraf #include <sys/port.h> 362248Sraf 372248Sraf static int _aio_hash_insert(aio_result_t *, aio_req_t *); 382248Sraf static aio_req_t *_aio_req_get(aio_worker_t *); 392248Sraf static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 402248Sraf static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 412248Sraf static void _aio_work_done(aio_worker_t *); 422248Sraf static void _aio_enq_doneq(aio_req_t *); 432248Sraf 442248Sraf extern void _aio_lio_free(aio_lio_t *); 452248Sraf 462248Sraf extern int __fdsync(int, int); 475937Sraf extern int __fcntl(int, int, ...); 482248Sraf extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 492248Sraf 502248Sraf static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 512248Sraf static void _aiodone(aio_req_t *, ssize_t, int); 522248Sraf static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 532248Sraf static void _aio_finish_request(aio_worker_t *, ssize_t, int); 542248Sraf 552248Sraf /* 562248Sraf * switch for kernel async I/O 572248Sraf */ 582248Sraf int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */ 592248Sraf 602248Sraf /* 612248Sraf * Key for thread-specific data 622248Sraf */ 632248Sraf pthread_key_t _aio_key; 642248Sraf 652248Sraf /* 662248Sraf * Array for determining whether or not a file supports kaio. 672248Sraf * Initialized in _kaio_init(). 682248Sraf */ 692248Sraf uint32_t *_kaio_supported = NULL; 702248Sraf 712248Sraf /* 722248Sraf * workers for read/write requests 732248Sraf * (__aio_mutex lock protects circular linked list of workers) 742248Sraf */ 752248Sraf aio_worker_t *__workers_rw; /* circular list of AIO workers */ 762248Sraf aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 772248Sraf int __rw_workerscnt; /* number of read/write workers */ 782248Sraf 792248Sraf /* 802248Sraf * worker for notification requests. 812248Sraf */ 822248Sraf aio_worker_t *__workers_no; /* circular list of AIO workers */ 832248Sraf aio_worker_t *__nextworker_no; /* next worker in list of workers */ 842248Sraf int __no_workerscnt; /* number of write workers */ 852248Sraf 862248Sraf aio_req_t *_aio_done_tail; /* list of done requests */ 872248Sraf aio_req_t *_aio_done_head; 882248Sraf 892248Sraf mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ 902248Sraf cond_t __aio_initcv = DEFAULTCV; 912248Sraf int __aio_initbusy = 0; 922248Sraf 932248Sraf mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ 942248Sraf cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ 952248Sraf 962248Sraf pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ 972248Sraf int _sigio_enabled = 0; /* when set, send SIGIO signal */ 982248Sraf 992248Sraf aio_hash_t *_aio_hash; 1002248Sraf 1012248Sraf aio_req_t *_aio_doneq; /* double linked done queue list */ 1022248Sraf 1032248Sraf int _aio_donecnt = 0; 1042248Sraf int _aio_waitncnt = 0; /* # of requests for aio_waitn */ 1052248Sraf int _aio_doneq_cnt = 0; 1062248Sraf int _aio_outstand_cnt = 0; /* # of outstanding requests */ 1072248Sraf int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ 1082248Sraf int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */ 1092248Sraf int _aio_kernel_suspend = 0; /* active kernel kaio calls */ 1102248Sraf int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ 1112248Sraf 1122248Sraf int _max_workers = 256; /* max number of workers permitted */ 1132248Sraf int _min_workers = 4; /* min number of workers */ 1142248Sraf int _minworkload = 2; /* min number of request in q */ 1152248Sraf int _aio_worker_cnt = 0; /* number of workers to do requests */ 1162248Sraf int __uaio_ok = 0; /* AIO has been enabled */ 1172248Sraf sigset_t _worker_set; /* worker's signal mask */ 1182248Sraf 1192248Sraf int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ 1202248Sraf int _aio_flags = 0; /* see asyncio.h defines for */ 1212248Sraf 1222248Sraf aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ 1232248Sraf 1242248Sraf int hz; /* clock ticks per second */ 1252248Sraf 1262248Sraf static int 1272248Sraf _kaio_supported_init(void) 1282248Sraf { 1292248Sraf void *ptr; 1302248Sraf size_t size; 1312248Sraf 1322248Sraf if (_kaio_supported != NULL) /* already initialized */ 1332248Sraf return (0); 1342248Sraf 1352248Sraf size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); 1362248Sraf ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, 1372248Sraf MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 1382248Sraf if (ptr == MAP_FAILED) 1392248Sraf return (-1); 1402248Sraf _kaio_supported = ptr; 1412248Sraf return (0); 1422248Sraf } 1432248Sraf 1442248Sraf /* 1452248Sraf * The aio subsystem is initialized when an AIO request is made. 1462248Sraf * Constants are initialized like the max number of workers that 1472248Sraf * the subsystem can create, and the minimum number of workers 1482248Sraf * permitted before imposing some restrictions. Also, some 1492248Sraf * workers are created. 1502248Sraf */ 1512248Sraf int 1522248Sraf __uaio_init(void) 1532248Sraf { 1542248Sraf int ret = -1; 1552248Sraf int i; 1565891Sraf int cancel_state; 1572248Sraf 1582248Sraf lmutex_lock(&__aio_initlock); 1595891Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 1602248Sraf while (__aio_initbusy) 1615891Sraf (void) cond_wait(&__aio_initcv, &__aio_initlock); 1625891Sraf (void) pthread_setcancelstate(cancel_state, NULL); 1632248Sraf if (__uaio_ok) { /* already initialized */ 1642248Sraf lmutex_unlock(&__aio_initlock); 1652248Sraf return (0); 1662248Sraf } 1672248Sraf __aio_initbusy = 1; 1682248Sraf lmutex_unlock(&__aio_initlock); 1692248Sraf 1702248Sraf hz = (int)sysconf(_SC_CLK_TCK); 1712248Sraf __pid = getpid(); 1722248Sraf 1732248Sraf setup_cancelsig(SIGAIOCANCEL); 1742248Sraf 1752248Sraf if (_kaio_supported_init() != 0) 1762248Sraf goto out; 1772248Sraf 1782248Sraf /* 1792248Sraf * Allocate and initialize the hash table. 1803344Ssp92102 * Do this only once, even if __uaio_init() is called twice. 1812248Sraf */ 1823344Ssp92102 if (_aio_hash == NULL) { 1833344Ssp92102 /* LINTED pointer cast */ 1843344Ssp92102 _aio_hash = (aio_hash_t *)mmap(NULL, 1853344Ssp92102 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, 1863344Ssp92102 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 1873344Ssp92102 if ((void *)_aio_hash == MAP_FAILED) { 1883344Ssp92102 _aio_hash = NULL; 1893344Ssp92102 goto out; 1903344Ssp92102 } 1913344Ssp92102 for (i = 0; i < HASHSZ; i++) 1923344Ssp92102 (void) mutex_init(&_aio_hash[i].hash_lock, 1933344Ssp92102 USYNC_THREAD, NULL); 1942248Sraf } 1952248Sraf 1962248Sraf /* 1972248Sraf * Initialize worker's signal mask to only catch SIGAIOCANCEL. 1982248Sraf */ 1992248Sraf (void) sigfillset(&_worker_set); 2002248Sraf (void) sigdelset(&_worker_set, SIGAIOCANCEL); 2012248Sraf 2022248Sraf /* 2033344Ssp92102 * Create one worker to send asynchronous notifications. 2043344Ssp92102 * Do this only once, even if __uaio_init() is called twice. 2053344Ssp92102 */ 2063344Ssp92102 if (__no_workerscnt == 0 && 2073344Ssp92102 (_aio_create_worker(NULL, AIONOTIFY) != 0)) { 2083344Ssp92102 errno = EAGAIN; 2093344Ssp92102 goto out; 2103344Ssp92102 } 2113344Ssp92102 2123344Ssp92102 /* 2132248Sraf * Create the minimum number of read/write workers. 2143344Ssp92102 * And later check whether atleast one worker is created; 2153344Ssp92102 * lwp_create() calls could fail because of segkp exhaustion. 2162248Sraf */ 2172248Sraf for (i = 0; i < _min_workers; i++) 2182248Sraf (void) _aio_create_worker(NULL, AIOREAD); 2193344Ssp92102 if (__rw_workerscnt == 0) { 2203344Ssp92102 errno = EAGAIN; 2213344Ssp92102 goto out; 2223344Ssp92102 } 2232248Sraf 2242248Sraf ret = 0; 2252248Sraf out: 2262248Sraf lmutex_lock(&__aio_initlock); 2272248Sraf if (ret == 0) 2282248Sraf __uaio_ok = 1; 2292248Sraf __aio_initbusy = 0; 2302248Sraf (void) cond_broadcast(&__aio_initcv); 2312248Sraf lmutex_unlock(&__aio_initlock); 2322248Sraf return (ret); 2332248Sraf } 2342248Sraf 2352248Sraf /* 2362248Sraf * Called from close() before actually performing the real _close(). 2372248Sraf */ 2382248Sraf void 2392248Sraf _aio_close(int fd) 2402248Sraf { 2412248Sraf if (fd < 0) /* avoid cancelling everything */ 2422248Sraf return; 2432248Sraf /* 2442248Sraf * Cancel all outstanding aio requests for this file descriptor. 2452248Sraf */ 2462248Sraf if (__uaio_ok) 2472248Sraf (void) aiocancel_all(fd); 2482248Sraf /* 2492248Sraf * If we have allocated the bit array, clear the bit for this file. 2502248Sraf * The next open may re-use this file descriptor and the new file 2512248Sraf * may have different kaio() behaviour. 2522248Sraf */ 2532248Sraf if (_kaio_supported != NULL) 2542248Sraf CLEAR_KAIO_SUPPORTED(fd); 2552248Sraf } 2562248Sraf 2572248Sraf /* 2582248Sraf * special kaio cleanup thread sits in a loop in the 2592248Sraf * kernel waiting for pending kaio requests to complete. 2602248Sraf */ 2612248Sraf void * 2622248Sraf _kaio_cleanup_thread(void *arg) 2632248Sraf { 2642248Sraf if (pthread_setspecific(_aio_key, arg) != 0) 2652248Sraf aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); 2662248Sraf (void) _kaio(AIOSTART); 2672248Sraf return (arg); 2682248Sraf } 2692248Sraf 2702248Sraf /* 2712248Sraf * initialize kaio. 2722248Sraf */ 2732248Sraf void 2742248Sraf _kaio_init() 2752248Sraf { 2762248Sraf int error; 2772248Sraf sigset_t oset; 2785891Sraf int cancel_state; 2792248Sraf 2802248Sraf lmutex_lock(&__aio_initlock); 2815891Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 2822248Sraf while (__aio_initbusy) 2835891Sraf (void) cond_wait(&__aio_initcv, &__aio_initlock); 2845891Sraf (void) pthread_setcancelstate(cancel_state, NULL); 2852248Sraf if (_kaio_ok) { /* already initialized */ 2862248Sraf lmutex_unlock(&__aio_initlock); 2872248Sraf return; 2882248Sraf } 2892248Sraf __aio_initbusy = 1; 2902248Sraf lmutex_unlock(&__aio_initlock); 2912248Sraf 2922248Sraf if (_kaio_supported_init() != 0) 2932248Sraf error = ENOMEM; 2942248Sraf else if ((_kaiowp = _aio_worker_alloc()) == NULL) 2952248Sraf error = ENOMEM; 2962248Sraf else if ((error = (int)_kaio(AIOINIT)) == 0) { 2972248Sraf (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 2982248Sraf error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, 2992248Sraf _kaiowp, THR_DAEMON, &_kaiowp->work_tid); 3002248Sraf (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 3012248Sraf } 3022248Sraf if (error && _kaiowp != NULL) { 3032248Sraf _aio_worker_free(_kaiowp); 3042248Sraf _kaiowp = NULL; 3052248Sraf } 3062248Sraf 3072248Sraf lmutex_lock(&__aio_initlock); 3082248Sraf if (error) 3092248Sraf _kaio_ok = -1; 3102248Sraf else 3112248Sraf _kaio_ok = 1; 3122248Sraf __aio_initbusy = 0; 3132248Sraf (void) cond_broadcast(&__aio_initcv); 3142248Sraf lmutex_unlock(&__aio_initlock); 3152248Sraf } 3162248Sraf 3172248Sraf int 3182248Sraf aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 3192248Sraf aio_result_t *resultp) 3202248Sraf { 3212248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); 3222248Sraf } 3232248Sraf 3242248Sraf int 3252248Sraf aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 3262248Sraf aio_result_t *resultp) 3272248Sraf { 3282248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); 3292248Sraf } 3302248Sraf 3312248Sraf #if !defined(_LP64) 3322248Sraf int 3332248Sraf aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 3342248Sraf aio_result_t *resultp) 3352248Sraf { 3362248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); 3372248Sraf } 3382248Sraf 3392248Sraf int 3402248Sraf aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 3412248Sraf aio_result_t *resultp) 3422248Sraf { 3432248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); 3442248Sraf } 3452248Sraf #endif /* !defined(_LP64) */ 3462248Sraf 3472248Sraf int 3482248Sraf _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, 3492248Sraf aio_result_t *resultp, int mode) 3502248Sraf { 3512248Sraf aio_req_t *reqp; 3522248Sraf aio_args_t *ap; 3532248Sraf offset_t loffset; 3545535Spraks struct stat64 stat64; 3552248Sraf int error = 0; 3562248Sraf int kerr; 3572248Sraf int umode; 3582248Sraf 3592248Sraf switch (whence) { 3602248Sraf 3612248Sraf case SEEK_SET: 3622248Sraf loffset = offset; 3632248Sraf break; 3642248Sraf case SEEK_CUR: 3652248Sraf if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) 3662248Sraf error = -1; 3672248Sraf else 3682248Sraf loffset += offset; 3692248Sraf break; 3702248Sraf case SEEK_END: 3715535Spraks if (fstat64(fd, &stat64) == -1) 3722248Sraf error = -1; 3732248Sraf else 3745535Spraks loffset = offset + stat64.st_size; 3752248Sraf break; 3762248Sraf default: 3772248Sraf errno = EINVAL; 3782248Sraf error = -1; 3792248Sraf } 3802248Sraf 3812248Sraf if (error) 3822248Sraf return (error); 3832248Sraf 3842248Sraf /* initialize kaio */ 3852248Sraf if (!_kaio_ok) 3862248Sraf _kaio_init(); 3872248Sraf 3882248Sraf /* 3892248Sraf * _aio_do_request() needs the original request code (mode) to be able 3902248Sraf * to choose the appropiate 32/64 bit function. All other functions 3912248Sraf * only require the difference between READ and WRITE (umode). 3922248Sraf */ 3932248Sraf if (mode == AIOAREAD64 || mode == AIOAWRITE64) 3942248Sraf umode = mode - AIOAREAD64; 3952248Sraf else 3962248Sraf umode = mode; 3972248Sraf 3982248Sraf /* 3992248Sraf * Try kernel aio first. 4002248Sraf * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. 4012248Sraf */ 4022248Sraf if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { 4032248Sraf resultp->aio_errno = 0; 4042248Sraf sig_mutex_lock(&__aio_mutex); 4052248Sraf _kaio_outstand_cnt++; 4065535Spraks sig_mutex_unlock(&__aio_mutex); 4072248Sraf kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? 4082248Sraf (umode | AIO_POLL_BIT) : umode), 4092248Sraf fd, buf, bufsz, loffset, resultp); 4102248Sraf if (kerr == 0) { 4112248Sraf return (0); 4122248Sraf } 4135535Spraks sig_mutex_lock(&__aio_mutex); 4142248Sraf _kaio_outstand_cnt--; 4152248Sraf sig_mutex_unlock(&__aio_mutex); 4162248Sraf if (errno != ENOTSUP && errno != EBADFD) 4172248Sraf return (-1); 4182248Sraf if (errno == EBADFD) 4192248Sraf SET_KAIO_NOT_SUPPORTED(fd); 4202248Sraf } 4212248Sraf 4222248Sraf if (!__uaio_ok && __uaio_init() == -1) 4232248Sraf return (-1); 4242248Sraf 4252248Sraf if ((reqp = _aio_req_alloc()) == NULL) { 4262248Sraf errno = EAGAIN; 4272248Sraf return (-1); 4282248Sraf } 4292248Sraf 4302248Sraf /* 4312248Sraf * _aio_do_request() checks reqp->req_op to differentiate 4322248Sraf * between 32 and 64 bit access. 4332248Sraf */ 4342248Sraf reqp->req_op = mode; 4352248Sraf reqp->req_resultp = resultp; 4362248Sraf ap = &reqp->req_args; 4372248Sraf ap->fd = fd; 4382248Sraf ap->buf = buf; 4392248Sraf ap->bufsz = bufsz; 4402248Sraf ap->offset = loffset; 4412248Sraf 4422248Sraf if (_aio_hash_insert(resultp, reqp) != 0) { 4432248Sraf _aio_req_free(reqp); 4442248Sraf errno = EINVAL; 4452248Sraf return (-1); 4462248Sraf } 4472248Sraf /* 4482248Sraf * _aio_req_add() only needs the difference between READ and 4492248Sraf * WRITE to choose the right worker queue. 4502248Sraf */ 4512248Sraf _aio_req_add(reqp, &__nextworker_rw, umode); 4522248Sraf return (0); 4532248Sraf } 4542248Sraf 4552248Sraf int 4562248Sraf aiocancel(aio_result_t *resultp) 4572248Sraf { 4582248Sraf aio_req_t *reqp; 4592248Sraf aio_worker_t *aiowp; 4602248Sraf int ret; 4612248Sraf int done = 0; 4622248Sraf int canceled = 0; 4632248Sraf 4642248Sraf if (!__uaio_ok) { 4652248Sraf errno = EINVAL; 4662248Sraf return (-1); 4672248Sraf } 4682248Sraf 4692248Sraf sig_mutex_lock(&__aio_mutex); 4702248Sraf reqp = _aio_hash_find(resultp); 4712248Sraf if (reqp == NULL) { 4722248Sraf if (_aio_outstand_cnt == _aio_req_done_cnt) 4732248Sraf errno = EINVAL; 4742248Sraf else 4752248Sraf errno = EACCES; 4762248Sraf ret = -1; 4772248Sraf } else { 4782248Sraf aiowp = reqp->req_worker; 4792248Sraf sig_mutex_lock(&aiowp->work_qlock1); 4802248Sraf (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 4812248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 4822248Sraf 4832248Sraf if (canceled) { 4842248Sraf ret = 0; 4852248Sraf } else { 4862248Sraf if (_aio_outstand_cnt == 0 || 4872248Sraf _aio_outstand_cnt == _aio_req_done_cnt) 4882248Sraf errno = EINVAL; 4892248Sraf else 4902248Sraf errno = EACCES; 4912248Sraf ret = -1; 4922248Sraf } 4932248Sraf } 4942248Sraf sig_mutex_unlock(&__aio_mutex); 4952248Sraf return (ret); 4962248Sraf } 4972248Sraf 4985891Sraf /* ARGSUSED */ 4995891Sraf static void 5005891Sraf _aiowait_cleanup(void *arg) 5015891Sraf { 5025891Sraf sig_mutex_lock(&__aio_mutex); 5035891Sraf _aiowait_flag--; 5045891Sraf sig_mutex_unlock(&__aio_mutex); 5055891Sraf } 5065891Sraf 5072248Sraf /* 5085891Sraf * This must be asynch safe and cancel safe 5092248Sraf */ 5102248Sraf aio_result_t * 5112248Sraf aiowait(struct timeval *uwait) 5122248Sraf { 5132248Sraf aio_result_t *uresultp; 5142248Sraf aio_result_t *kresultp; 5152248Sraf aio_result_t *resultp; 5162248Sraf int dontblock; 5172248Sraf int timedwait = 0; 5182248Sraf int kaio_errno = 0; 5192248Sraf struct timeval twait; 5202248Sraf struct timeval *wait = NULL; 5212248Sraf hrtime_t hrtend; 5222248Sraf hrtime_t hres; 5232248Sraf 5242248Sraf if (uwait) { 5252248Sraf /* 5262248Sraf * Check for a valid specified wait time. 5272248Sraf * If it is invalid, fail the call right away. 5282248Sraf */ 5292248Sraf if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 5302248Sraf uwait->tv_usec >= MICROSEC) { 5312248Sraf errno = EINVAL; 5322248Sraf return ((aio_result_t *)-1); 5332248Sraf } 5342248Sraf 5352248Sraf if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 5362248Sraf hrtend = gethrtime() + 5374502Spraks (hrtime_t)uwait->tv_sec * NANOSEC + 5384502Spraks (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 5392248Sraf twait = *uwait; 5402248Sraf wait = &twait; 5412248Sraf timedwait++; 5422248Sraf } else { 5432248Sraf /* polling */ 5442248Sraf sig_mutex_lock(&__aio_mutex); 5452248Sraf if (_kaio_outstand_cnt == 0) { 5462248Sraf kresultp = (aio_result_t *)-1; 5472248Sraf } else { 5482248Sraf kresultp = (aio_result_t *)_kaio(AIOWAIT, 5492248Sraf (struct timeval *)-1, 1); 5502248Sraf if (kresultp != (aio_result_t *)-1 && 5512248Sraf kresultp != NULL && 5522248Sraf kresultp != (aio_result_t *)1) { 5532248Sraf _kaio_outstand_cnt--; 5542248Sraf sig_mutex_unlock(&__aio_mutex); 5552248Sraf return (kresultp); 5562248Sraf } 5572248Sraf } 5582248Sraf uresultp = _aio_req_done(); 5592248Sraf sig_mutex_unlock(&__aio_mutex); 5602248Sraf if (uresultp != NULL && 5612248Sraf uresultp != (aio_result_t *)-1) { 5622248Sraf return (uresultp); 5632248Sraf } 5642248Sraf if (uresultp == (aio_result_t *)-1 && 5652248Sraf kresultp == (aio_result_t *)-1) { 5662248Sraf errno = EINVAL; 5672248Sraf return ((aio_result_t *)-1); 5682248Sraf } else { 5692248Sraf return (NULL); 5702248Sraf } 5712248Sraf } 5722248Sraf } 5732248Sraf 5742248Sraf for (;;) { 5752248Sraf sig_mutex_lock(&__aio_mutex); 5762248Sraf uresultp = _aio_req_done(); 5772248Sraf if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 5782248Sraf sig_mutex_unlock(&__aio_mutex); 5792248Sraf resultp = uresultp; 5802248Sraf break; 5812248Sraf } 5822248Sraf _aiowait_flag++; 5832248Sraf dontblock = (uresultp == (aio_result_t *)-1); 5842248Sraf if (dontblock && _kaio_outstand_cnt == 0) { 5852248Sraf kresultp = (aio_result_t *)-1; 5862248Sraf kaio_errno = EINVAL; 5872248Sraf } else { 5882248Sraf sig_mutex_unlock(&__aio_mutex); 5895891Sraf pthread_cleanup_push(_aiowait_cleanup, NULL); 5905891Sraf _cancel_prologue(); 5912248Sraf kresultp = (aio_result_t *)_kaio(AIOWAIT, 5922248Sraf wait, dontblock); 5935891Sraf _cancel_epilogue(); 5945891Sraf pthread_cleanup_pop(0); 5952248Sraf sig_mutex_lock(&__aio_mutex); 5962248Sraf kaio_errno = errno; 5972248Sraf } 5982248Sraf _aiowait_flag--; 5992248Sraf sig_mutex_unlock(&__aio_mutex); 6002248Sraf if (kresultp == (aio_result_t *)1) { 6012248Sraf /* aiowait() awakened by an aionotify() */ 6022248Sraf continue; 6032248Sraf } else if (kresultp != NULL && 6042248Sraf kresultp != (aio_result_t *)-1) { 6052248Sraf resultp = kresultp; 6062248Sraf sig_mutex_lock(&__aio_mutex); 6072248Sraf _kaio_outstand_cnt--; 6082248Sraf sig_mutex_unlock(&__aio_mutex); 6092248Sraf break; 6102248Sraf } else if (kresultp == (aio_result_t *)-1 && 6112248Sraf kaio_errno == EINVAL && 6122248Sraf uresultp == (aio_result_t *)-1) { 6132248Sraf errno = kaio_errno; 6142248Sraf resultp = (aio_result_t *)-1; 6152248Sraf break; 6162248Sraf } else if (kresultp == (aio_result_t *)-1 && 6172248Sraf kaio_errno == EINTR) { 6182248Sraf errno = kaio_errno; 6192248Sraf resultp = (aio_result_t *)-1; 6202248Sraf break; 6212248Sraf } else if (timedwait) { 6222248Sraf hres = hrtend - gethrtime(); 6232248Sraf if (hres <= 0) { 6242248Sraf /* time is up; return */ 6252248Sraf resultp = NULL; 6262248Sraf break; 6272248Sraf } else { 6282248Sraf /* 6292248Sraf * Some time left. Round up the remaining time 6302248Sraf * in nanoseconds to microsec. Retry the call. 6312248Sraf */ 6322248Sraf hres += (NANOSEC / MICROSEC) - 1; 6332248Sraf wait->tv_sec = hres / NANOSEC; 6342248Sraf wait->tv_usec = 6354502Spraks (hres % NANOSEC) / (NANOSEC / MICROSEC); 6362248Sraf } 6372248Sraf } else { 6382248Sraf ASSERT(kresultp == NULL && uresultp == NULL); 6392248Sraf resultp = NULL; 6402248Sraf continue; 6412248Sraf } 6422248Sraf } 6432248Sraf return (resultp); 6442248Sraf } 6452248Sraf 6462248Sraf /* 6472248Sraf * _aio_get_timedelta calculates the remaining time and stores the result 6482248Sraf * into timespec_t *wait. 6492248Sraf */ 6502248Sraf 6512248Sraf int 6522248Sraf _aio_get_timedelta(timespec_t *end, timespec_t *wait) 6532248Sraf { 6542248Sraf int ret = 0; 6552248Sraf struct timeval cur; 6562248Sraf timespec_t curtime; 6572248Sraf 6582248Sraf (void) gettimeofday(&cur, NULL); 6592248Sraf curtime.tv_sec = cur.tv_sec; 6602248Sraf curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ 6612248Sraf 6622248Sraf if (end->tv_sec >= curtime.tv_sec) { 6632248Sraf wait->tv_sec = end->tv_sec - curtime.tv_sec; 6642248Sraf if (end->tv_nsec >= curtime.tv_nsec) { 6652248Sraf wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; 6662248Sraf if (wait->tv_sec == 0 && wait->tv_nsec == 0) 6672248Sraf ret = -1; /* timer expired */ 6682248Sraf } else { 6692248Sraf if (end->tv_sec > curtime.tv_sec) { 6702248Sraf wait->tv_sec -= 1; 6712248Sraf wait->tv_nsec = NANOSEC - 6722248Sraf (curtime.tv_nsec - end->tv_nsec); 6732248Sraf } else { 6742248Sraf ret = -1; /* timer expired */ 6752248Sraf } 6762248Sraf } 6772248Sraf } else { 6782248Sraf ret = -1; 6792248Sraf } 6802248Sraf return (ret); 6812248Sraf } 6822248Sraf 6832248Sraf /* 6842248Sraf * If closing by file descriptor: we will simply cancel all the outstanding 6852248Sraf * aio`s and return. Those aio's in question will have either noticed the 6862248Sraf * cancellation notice before, during, or after initiating io. 6872248Sraf */ 6882248Sraf int 6892248Sraf aiocancel_all(int fd) 6902248Sraf { 6912248Sraf aio_req_t *reqp; 6922248Sraf aio_req_t **reqpp; 6932248Sraf aio_worker_t *first; 6942248Sraf aio_worker_t *next; 6952248Sraf int canceled = 0; 6962248Sraf int done = 0; 6972248Sraf int cancelall = 0; 6982248Sraf 6992248Sraf sig_mutex_lock(&__aio_mutex); 7002248Sraf 7012248Sraf if (_aio_outstand_cnt == 0) { 7022248Sraf sig_mutex_unlock(&__aio_mutex); 7032248Sraf return (AIO_ALLDONE); 7042248Sraf } 7052248Sraf 7062248Sraf /* 7072248Sraf * Cancel requests from the read/write workers' queues. 7082248Sraf */ 7092248Sraf first = __nextworker_rw; 7102248Sraf next = first; 7112248Sraf do { 7122248Sraf _aio_cancel_work(next, fd, &canceled, &done); 7132248Sraf } while ((next = next->work_forw) != first); 7142248Sraf 7152248Sraf /* 7162248Sraf * finally, check if there are requests on the done queue that 7172248Sraf * should be canceled. 7182248Sraf */ 7192248Sraf if (fd < 0) 7202248Sraf cancelall = 1; 7212248Sraf reqpp = &_aio_done_tail; 7222248Sraf while ((reqp = *reqpp) != NULL) { 7232248Sraf if (cancelall || reqp->req_args.fd == fd) { 7242248Sraf *reqpp = reqp->req_next; 7252248Sraf _aio_donecnt--; 7262248Sraf (void) _aio_hash_del(reqp->req_resultp); 7272248Sraf _aio_req_free(reqp); 7282248Sraf } else 7292248Sraf reqpp = &reqp->req_next; 7302248Sraf } 7312248Sraf if (cancelall) { 7322248Sraf ASSERT(_aio_donecnt == 0); 7332248Sraf _aio_done_head = NULL; 7342248Sraf } 7352248Sraf sig_mutex_unlock(&__aio_mutex); 7362248Sraf 7372248Sraf if (canceled && done == 0) 7382248Sraf return (AIO_CANCELED); 7392248Sraf else if (done && canceled == 0) 7402248Sraf return (AIO_ALLDONE); 7412248Sraf else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) 7422248Sraf return ((int)_kaio(AIOCANCEL, fd, NULL)); 7432248Sraf return (AIO_NOTCANCELED); 7442248Sraf } 7452248Sraf 7462248Sraf /* 7472248Sraf * Cancel requests from a given work queue. If the file descriptor 7482248Sraf * parameter, fd, is non-negative, then only cancel those requests 7492248Sraf * in this queue that are to this file descriptor. If the fd 7502248Sraf * parameter is -1, then cancel all requests. 7512248Sraf */ 7522248Sraf static void 7532248Sraf _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) 7542248Sraf { 7552248Sraf aio_req_t *reqp; 7562248Sraf 7572248Sraf sig_mutex_lock(&aiowp->work_qlock1); 7582248Sraf /* 7592248Sraf * cancel queued requests first. 7602248Sraf */ 7612248Sraf reqp = aiowp->work_tail1; 7622248Sraf while (reqp != NULL) { 7632248Sraf if (fd < 0 || reqp->req_args.fd == fd) { 7642248Sraf if (_aio_cancel_req(aiowp, reqp, canceled, done)) { 7652248Sraf /* 7662248Sraf * Callers locks were dropped. 7672248Sraf * reqp is invalid; start traversing 7682248Sraf * the list from the beginning again. 7692248Sraf */ 7702248Sraf reqp = aiowp->work_tail1; 7712248Sraf continue; 7722248Sraf } 7732248Sraf } 7742248Sraf reqp = reqp->req_next; 7752248Sraf } 7762248Sraf /* 7772248Sraf * Since the queued requests have been canceled, there can 7782248Sraf * only be one inprogress request that should be canceled. 7792248Sraf */ 7802248Sraf if ((reqp = aiowp->work_req) != NULL && 7812248Sraf (fd < 0 || reqp->req_args.fd == fd)) 7822248Sraf (void) _aio_cancel_req(aiowp, reqp, canceled, done); 7832248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 7842248Sraf } 7852248Sraf 7862248Sraf /* 7872248Sraf * Cancel a request. Return 1 if the callers locks were temporarily 7882248Sraf * dropped, otherwise return 0. 7892248Sraf */ 7902248Sraf int 7912248Sraf _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) 7922248Sraf { 7932248Sraf int ostate = reqp->req_state; 7942248Sraf 7952248Sraf ASSERT(MUTEX_HELD(&__aio_mutex)); 7962248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 7972248Sraf if (ostate == AIO_REQ_CANCELED) 7982248Sraf return (0); 7992248Sraf if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { 8002248Sraf (*done)++; 8012248Sraf return (0); 8022248Sraf } 8032248Sraf if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { 8042248Sraf ASSERT(POSIX_AIO(reqp)); 8052248Sraf /* Cancel the queued aio_fsync() request */ 8062248Sraf if (!reqp->req_head->lio_canned) { 8072248Sraf reqp->req_head->lio_canned = 1; 8082248Sraf _aio_outstand_cnt--; 8092248Sraf (*canceled)++; 8102248Sraf } 8112248Sraf return (0); 8122248Sraf } 8132248Sraf reqp->req_state = AIO_REQ_CANCELED; 8142248Sraf _aio_req_del(aiowp, reqp, ostate); 8152248Sraf (void) _aio_hash_del(reqp->req_resultp); 8162248Sraf (*canceled)++; 8172248Sraf if (reqp == aiowp->work_req) { 8182248Sraf ASSERT(ostate == AIO_REQ_INPROGRESS); 8192248Sraf /* 8202248Sraf * Set the result values now, before _aiodone() is called. 8212248Sraf * We do this because the application can expect aio_return 8222248Sraf * and aio_errno to be set to -1 and ECANCELED, respectively, 8232248Sraf * immediately after a successful return from aiocancel() 8242248Sraf * or aio_cancel(). 8252248Sraf */ 8262248Sraf _aio_set_result(reqp, -1, ECANCELED); 8272248Sraf (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); 8282248Sraf return (0); 8292248Sraf } 8302248Sraf if (!POSIX_AIO(reqp)) { 8312248Sraf _aio_outstand_cnt--; 8322248Sraf _aio_set_result(reqp, -1, ECANCELED); 8332248Sraf return (0); 8342248Sraf } 8352248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 8362248Sraf sig_mutex_unlock(&__aio_mutex); 8372248Sraf _aiodone(reqp, -1, ECANCELED); 8382248Sraf sig_mutex_lock(&__aio_mutex); 8392248Sraf sig_mutex_lock(&aiowp->work_qlock1); 8402248Sraf return (1); 8412248Sraf } 8422248Sraf 8432248Sraf int 8442248Sraf _aio_create_worker(aio_req_t *reqp, int mode) 8452248Sraf { 8462248Sraf aio_worker_t *aiowp, **workers, **nextworker; 8472248Sraf int *aio_workerscnt; 8482248Sraf void *(*func)(void *); 8492248Sraf sigset_t oset; 8502248Sraf int error; 8512248Sraf 8522248Sraf /* 8532248Sraf * Put the new worker thread in the right queue. 8542248Sraf */ 8552248Sraf switch (mode) { 8562248Sraf case AIOREAD: 8572248Sraf case AIOWRITE: 8582248Sraf case AIOAREAD: 8592248Sraf case AIOAWRITE: 8602248Sraf #if !defined(_LP64) 8612248Sraf case AIOAREAD64: 8622248Sraf case AIOAWRITE64: 8632248Sraf #endif 8642248Sraf workers = &__workers_rw; 8652248Sraf nextworker = &__nextworker_rw; 8662248Sraf aio_workerscnt = &__rw_workerscnt; 8672248Sraf func = _aio_do_request; 8682248Sraf break; 8692248Sraf case AIONOTIFY: 8702248Sraf workers = &__workers_no; 8712248Sraf nextworker = &__nextworker_no; 8722248Sraf func = _aio_do_notify; 8732248Sraf aio_workerscnt = &__no_workerscnt; 8742248Sraf break; 8752248Sraf default: 8762248Sraf aio_panic("_aio_create_worker: invalid mode"); 8772248Sraf break; 8782248Sraf } 8792248Sraf 8802248Sraf if ((aiowp = _aio_worker_alloc()) == NULL) 8812248Sraf return (-1); 8822248Sraf 8832248Sraf if (reqp) { 8842248Sraf reqp->req_state = AIO_REQ_QUEUED; 8852248Sraf reqp->req_worker = aiowp; 8862248Sraf aiowp->work_head1 = reqp; 8872248Sraf aiowp->work_tail1 = reqp; 8882248Sraf aiowp->work_next1 = reqp; 8892248Sraf aiowp->work_count1 = 1; 8902248Sraf aiowp->work_minload1 = 1; 8912248Sraf } 8922248Sraf 8932248Sraf (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 8942248Sraf error = thr_create(NULL, AIOSTKSIZE, func, aiowp, 8954502Spraks THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid); 8962248Sraf (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 8972248Sraf if (error) { 8982248Sraf if (reqp) { 8992248Sraf reqp->req_state = 0; 9002248Sraf reqp->req_worker = NULL; 9012248Sraf } 9022248Sraf _aio_worker_free(aiowp); 9032248Sraf return (-1); 9042248Sraf } 9052248Sraf 9062248Sraf lmutex_lock(&__aio_mutex); 9072248Sraf (*aio_workerscnt)++; 9082248Sraf if (*workers == NULL) { 9092248Sraf aiowp->work_forw = aiowp; 9102248Sraf aiowp->work_backw = aiowp; 9112248Sraf *nextworker = aiowp; 9122248Sraf *workers = aiowp; 9132248Sraf } else { 9142248Sraf aiowp->work_backw = (*workers)->work_backw; 9152248Sraf aiowp->work_forw = (*workers); 9162248Sraf (*workers)->work_backw->work_forw = aiowp; 9172248Sraf (*workers)->work_backw = aiowp; 9182248Sraf } 9192248Sraf _aio_worker_cnt++; 9202248Sraf lmutex_unlock(&__aio_mutex); 9212248Sraf 9222248Sraf (void) thr_continue(aiowp->work_tid); 9232248Sraf 9242248Sraf return (0); 9252248Sraf } 9262248Sraf 9272248Sraf /* 9282248Sraf * This is the worker's main routine. 9292248Sraf * The task of this function is to execute all queued requests; 9302248Sraf * once the last pending request is executed this function will block 9312248Sraf * in _aio_idle(). A new incoming request must wakeup this thread to 9322248Sraf * restart the work. 9332248Sraf * Every worker has an own work queue. The queue lock is required 9342248Sraf * to synchronize the addition of new requests for this worker or 9352248Sraf * cancellation of pending/running requests. 9362248Sraf * 9372248Sraf * Cancellation scenarios: 9382248Sraf * The cancellation of a request is being done asynchronously using 9392248Sraf * _aio_cancel_req() from another thread context. 9402248Sraf * A queued request can be cancelled in different manners : 9412248Sraf * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED): 9422248Sraf * - lock the queue -> remove the request -> unlock the queue 9432248Sraf * - this function/thread does not detect this cancellation process 9442248Sraf * b) request is in progress (AIO_REQ_INPROGRESS) : 9452248Sraf * - this function first allow the cancellation of the running 9462248Sraf * request with the flag "work_cancel_flg=1" 9472248Sraf * see _aio_req_get() -> _aio_cancel_on() 9482248Sraf * During this phase, it is allowed to interrupt the worker 9492248Sraf * thread running the request (this thread) using the SIGAIOCANCEL 9502248Sraf * signal. 9512248Sraf * Once this thread returns from the kernel (because the request 9522248Sraf * is just done), then it must disable a possible cancellation 9532248Sraf * and proceed to finish the request. To disable the cancellation 9542248Sraf * this thread must use _aio_cancel_off() to set "work_cancel_flg=0". 9552248Sraf * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ): 9562248Sraf * same procedure as in a) 9572248Sraf * 9582248Sraf * To b) 9592248Sraf * This thread uses sigsetjmp() to define the position in the code, where 9602248Sraf * it wish to continue working in the case that a SIGAIOCANCEL signal 9612248Sraf * is detected. 9622248Sraf * Normally this thread should get the cancellation signal during the 9632248Sraf * kernel phase (reading or writing). In that case the signal handler 9642248Sraf * aiosigcancelhndlr() is activated using the worker thread context, 9652248Sraf * which again will use the siglongjmp() function to break the standard 9662248Sraf * code flow and jump to the "sigsetjmp" position, provided that 9672248Sraf * "work_cancel_flg" is set to "1". 9682248Sraf * Because the "work_cancel_flg" is only manipulated by this worker 9692248Sraf * thread and it can only run on one CPU at a given time, it is not 9702248Sraf * necessary to protect that flag with the queue lock. 9712248Sraf * Returning from the kernel (read or write system call) we must 9722248Sraf * first disable the use of the SIGAIOCANCEL signal and accordingly 9732248Sraf * the use of the siglongjmp() function to prevent a possible deadlock: 9742248Sraf * - It can happens that this worker thread returns from the kernel and 9752248Sraf * blocks in "work_qlock1", 9762248Sraf * - then a second thread cancels the apparently "in progress" request 9772248Sraf * and sends the SIGAIOCANCEL signal to the worker thread, 9782248Sraf * - the worker thread gets assigned the "work_qlock1" and will returns 9792248Sraf * from the kernel, 9802248Sraf * - the kernel detects the pending signal and activates the signal 9812248Sraf * handler instead, 9822248Sraf * - if the "work_cancel_flg" is still set then the signal handler 9832248Sraf * should use siglongjmp() to cancel the "in progress" request and 9842248Sraf * it would try to acquire the same work_qlock1 in _aio_req_get() 9852248Sraf * for a second time => deadlock. 9862248Sraf * To avoid that situation we disable the cancellation of the request 9872248Sraf * in progress BEFORE we try to acquire the work_qlock1. 9882248Sraf * In that case the signal handler will not call siglongjmp() and the 9892248Sraf * worker thread will continue running the standard code flow. 9902248Sraf * Then this thread must check the AIO_REQ_CANCELED flag to emulate 9912248Sraf * an eventually required siglongjmp() freeing the work_qlock1 and 9922248Sraf * avoiding a deadlock. 9932248Sraf */ 9942248Sraf void * 9952248Sraf _aio_do_request(void *arglist) 9962248Sraf { 9972248Sraf aio_worker_t *aiowp = (aio_worker_t *)arglist; 9982248Sraf ulwp_t *self = curthread; 9992248Sraf struct aio_args *arg; 10002248Sraf aio_req_t *reqp; /* current AIO request */ 10012248Sraf ssize_t retval; 10025937Sraf int append; 10032248Sraf int error; 10042248Sraf 10052248Sraf if (pthread_setspecific(_aio_key, aiowp) != 0) 10062248Sraf aio_panic("_aio_do_request, pthread_setspecific()"); 10072248Sraf (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL); 10082248Sraf ASSERT(aiowp->work_req == NULL); 10092248Sraf 10102248Sraf /* 10112248Sraf * We resume here when an operation is cancelled. 10122248Sraf * On first entry, aiowp->work_req == NULL, so all 10132248Sraf * we do is block SIGAIOCANCEL. 10142248Sraf */ 10152248Sraf (void) sigsetjmp(aiowp->work_jmp_buf, 0); 10162248Sraf ASSERT(self->ul_sigdefer == 0); 10172248Sraf 10182248Sraf sigoff(self); /* block SIGAIOCANCEL */ 10192248Sraf if (aiowp->work_req != NULL) 10202248Sraf _aio_finish_request(aiowp, -1, ECANCELED); 10212248Sraf 10222248Sraf for (;;) { 10232248Sraf /* 10242248Sraf * Put completed requests on aio_done_list. This has 10252248Sraf * to be done as part of the main loop to ensure that 10262248Sraf * we don't artificially starve any aiowait'ers. 10272248Sraf */ 10282248Sraf if (aiowp->work_done1) 10292248Sraf _aio_work_done(aiowp); 10302248Sraf 10312248Sraf top: 10322248Sraf /* consume any deferred SIGAIOCANCEL signal here */ 10332248Sraf sigon(self); 10342248Sraf sigoff(self); 10352248Sraf 10362248Sraf while ((reqp = _aio_req_get(aiowp)) == NULL) { 10372248Sraf if (_aio_idle(aiowp) != 0) 10382248Sraf goto top; 10392248Sraf } 10402248Sraf arg = &reqp->req_args; 10412248Sraf ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 10422248Sraf reqp->req_state == AIO_REQ_CANCELED); 10432248Sraf error = 0; 10442248Sraf 10452248Sraf switch (reqp->req_op) { 10462248Sraf case AIOREAD: 10472248Sraf case AIOAREAD: 10482248Sraf sigon(self); /* unblock SIGAIOCANCEL */ 10492248Sraf retval = pread(arg->fd, arg->buf, 10502248Sraf arg->bufsz, arg->offset); 10512248Sraf if (retval == -1) { 10522248Sraf if (errno == ESPIPE) { 10532248Sraf retval = read(arg->fd, 10542248Sraf arg->buf, arg->bufsz); 10552248Sraf if (retval == -1) 10562248Sraf error = errno; 10572248Sraf } else { 10582248Sraf error = errno; 10592248Sraf } 10602248Sraf } 10612248Sraf sigoff(self); /* block SIGAIOCANCEL */ 10622248Sraf break; 10632248Sraf case AIOWRITE: 10642248Sraf case AIOAWRITE: 10655937Sraf /* 10665937Sraf * The SUSv3 POSIX spec for aio_write() states: 10675937Sraf * If O_APPEND is set for the file descriptor, 10685937Sraf * write operations append to the file in the 10695937Sraf * same order as the calls were made. 10705937Sraf * but, somewhat inconsistently, it requires pwrite() 10715937Sraf * to ignore the O_APPEND setting. So we have to use 10725937Sraf * fcntl() to get the open modes and call write() for 10735937Sraf * the O_APPEND case. 10745937Sraf */ 10755937Sraf append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 10762248Sraf sigon(self); /* unblock SIGAIOCANCEL */ 10775937Sraf retval = append? 10785937Sraf write(arg->fd, arg->buf, arg->bufsz) : 10795937Sraf pwrite(arg->fd, arg->buf, arg->bufsz, 10805937Sraf arg->offset); 10812248Sraf if (retval == -1) { 10822248Sraf if (errno == ESPIPE) { 10832248Sraf retval = write(arg->fd, 10842248Sraf arg->buf, arg->bufsz); 10852248Sraf if (retval == -1) 10862248Sraf error = errno; 10872248Sraf } else { 10882248Sraf error = errno; 10892248Sraf } 10902248Sraf } 10912248Sraf sigoff(self); /* block SIGAIOCANCEL */ 10922248Sraf break; 10932248Sraf #if !defined(_LP64) 10942248Sraf case AIOAREAD64: 10952248Sraf sigon(self); /* unblock SIGAIOCANCEL */ 10962248Sraf retval = pread64(arg->fd, arg->buf, 10972248Sraf arg->bufsz, arg->offset); 10982248Sraf if (retval == -1) { 10992248Sraf if (errno == ESPIPE) { 11002248Sraf retval = read(arg->fd, 11012248Sraf arg->buf, arg->bufsz); 11022248Sraf if (retval == -1) 11032248Sraf error = errno; 11042248Sraf } else { 11052248Sraf error = errno; 11062248Sraf } 11072248Sraf } 11082248Sraf sigoff(self); /* block SIGAIOCANCEL */ 11092248Sraf break; 11102248Sraf case AIOAWRITE64: 11115937Sraf /* 11125937Sraf * The SUSv3 POSIX spec for aio_write() states: 11135937Sraf * If O_APPEND is set for the file descriptor, 11145937Sraf * write operations append to the file in the 11155937Sraf * same order as the calls were made. 11165937Sraf * but, somewhat inconsistently, it requires pwrite() 11175937Sraf * to ignore the O_APPEND setting. So we have to use 11185937Sraf * fcntl() to get the open modes and call write() for 11195937Sraf * the O_APPEND case. 11205937Sraf */ 11215937Sraf append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 11222248Sraf sigon(self); /* unblock SIGAIOCANCEL */ 11235937Sraf retval = append? 11245937Sraf write(arg->fd, arg->buf, arg->bufsz) : 11255937Sraf pwrite64(arg->fd, arg->buf, arg->bufsz, 11265937Sraf arg->offset); 11272248Sraf if (retval == -1) { 11282248Sraf if (errno == ESPIPE) { 11292248Sraf retval = write(arg->fd, 11302248Sraf arg->buf, arg->bufsz); 11312248Sraf if (retval == -1) 11322248Sraf error = errno; 11332248Sraf } else { 11342248Sraf error = errno; 11352248Sraf } 11362248Sraf } 11372248Sraf sigoff(self); /* block SIGAIOCANCEL */ 11382248Sraf break; 11392248Sraf #endif /* !defined(_LP64) */ 11402248Sraf case AIOFSYNC: 11412248Sraf if (_aio_fsync_del(aiowp, reqp)) 11422248Sraf goto top; 11432248Sraf ASSERT(reqp->req_head == NULL); 11442248Sraf /* 11452248Sraf * All writes for this fsync request are now 11462248Sraf * acknowledged. Now make these writes visible 11472248Sraf * and put the final request into the hash table. 11482248Sraf */ 11492248Sraf if (reqp->req_state == AIO_REQ_CANCELED) { 11502248Sraf /* EMPTY */; 11512248Sraf } else if (arg->offset == O_SYNC) { 11522248Sraf if ((retval = __fdsync(arg->fd, FSYNC)) == -1) 11532248Sraf error = errno; 11542248Sraf } else { 11552248Sraf if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) 11562248Sraf error = errno; 11572248Sraf } 11582248Sraf if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 11592248Sraf aio_panic("_aio_do_request(): AIOFSYNC: " 11602248Sraf "request already in hash table"); 11612248Sraf break; 11622248Sraf default: 11632248Sraf aio_panic("_aio_do_request, bad op"); 11642248Sraf } 11652248Sraf 11662248Sraf _aio_finish_request(aiowp, retval, error); 11672248Sraf } 11682248Sraf /* NOTREACHED */ 11692248Sraf return (NULL); 11702248Sraf } 11712248Sraf 11722248Sraf /* 11732248Sraf * Perform the tail processing for _aio_do_request(). 11742248Sraf * The in-progress request may or may not have been cancelled. 11752248Sraf */ 11762248Sraf static void 11772248Sraf _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 11782248Sraf { 11792248Sraf aio_req_t *reqp; 11802248Sraf 11812248Sraf sig_mutex_lock(&aiowp->work_qlock1); 11822248Sraf if ((reqp = aiowp->work_req) == NULL) 11832248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 11842248Sraf else { 11852248Sraf aiowp->work_req = NULL; 11862248Sraf if (reqp->req_state == AIO_REQ_CANCELED) { 11872248Sraf retval = -1; 11882248Sraf error = ECANCELED; 11892248Sraf } 11902248Sraf if (!POSIX_AIO(reqp)) { 11914502Spraks int notify; 11922248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 11932248Sraf sig_mutex_lock(&__aio_mutex); 11942248Sraf if (reqp->req_state == AIO_REQ_INPROGRESS) 11952248Sraf reqp->req_state = AIO_REQ_DONE; 11964502Spraks /* 11974502Spraks * If it was canceled, this request will not be 11984502Spraks * added to done list. Just free it. 11994502Spraks */ 12004502Spraks if (error == ECANCELED) { 12012248Sraf _aio_outstand_cnt--; 12024502Spraks _aio_req_free(reqp); 12034502Spraks } else { 12044502Spraks _aio_set_result(reqp, retval, error); 12054502Spraks _aio_req_done_cnt++; 12064502Spraks } 12074502Spraks /* 12084502Spraks * Notify any thread that may have blocked 12094502Spraks * because it saw an outstanding request. 12104502Spraks */ 12114502Spraks notify = 0; 12124502Spraks if (_aio_outstand_cnt == 0 && _aiowait_flag) { 12134502Spraks notify = 1; 12144502Spraks } 12152248Sraf sig_mutex_unlock(&__aio_mutex); 12164502Spraks if (notify) { 12174502Spraks (void) _kaio(AIONOTIFY); 12184502Spraks } 12192248Sraf } else { 12202248Sraf if (reqp->req_state == AIO_REQ_INPROGRESS) 12212248Sraf reqp->req_state = AIO_REQ_DONE; 12222248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 12232248Sraf _aiodone(reqp, retval, error); 12242248Sraf } 12252248Sraf } 12262248Sraf } 12272248Sraf 12282248Sraf void 12292248Sraf _aio_req_mark_done(aio_req_t *reqp) 12302248Sraf { 12312248Sraf #if !defined(_LP64) 12322248Sraf if (reqp->req_largefile) 12332248Sraf ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 12342248Sraf else 12352248Sraf #endif 12362248Sraf ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 12372248Sraf } 12382248Sraf 12392248Sraf /* 12402248Sraf * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 12412248Sraf * hopefully to consume one of our queued signals. 12422248Sraf */ 12432248Sraf static void 12442248Sraf _aio_delay(int ticks) 12452248Sraf { 12462248Sraf (void) usleep(ticks * (MICROSEC / hz)); 12472248Sraf } 12482248Sraf 12492248Sraf /* 12502248Sraf * Actually send the notifications. 12512248Sraf * We could block indefinitely here if the application 12522248Sraf * is not listening for the signal or port notifications. 12532248Sraf */ 12542248Sraf static void 12552248Sraf send_notification(notif_param_t *npp) 12562248Sraf { 12572248Sraf extern int __sigqueue(pid_t pid, int signo, 12584502Spraks /* const union sigval */ void *value, int si_code, int block); 12592248Sraf 12602248Sraf if (npp->np_signo) 12612248Sraf (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 12622248Sraf SI_ASYNCIO, 1); 12632248Sraf else if (npp->np_port >= 0) 12642248Sraf (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 12652248Sraf npp->np_event, npp->np_object, npp->np_user); 12662248Sraf 12672248Sraf if (npp->np_lio_signo) 12682248Sraf (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 12692248Sraf SI_ASYNCIO, 1); 12702248Sraf else if (npp->np_lio_port >= 0) 12712248Sraf (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 12722248Sraf npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 12732248Sraf } 12742248Sraf 12752248Sraf /* 12762248Sraf * Asynchronous notification worker. 12772248Sraf */ 12782248Sraf void * 12792248Sraf _aio_do_notify(void *arg) 12802248Sraf { 12812248Sraf aio_worker_t *aiowp = (aio_worker_t *)arg; 12822248Sraf aio_req_t *reqp; 12832248Sraf 12842248Sraf /* 12852248Sraf * This isn't really necessary. All signals are blocked. 12862248Sraf */ 12872248Sraf if (pthread_setspecific(_aio_key, aiowp) != 0) 12882248Sraf aio_panic("_aio_do_notify, pthread_setspecific()"); 12892248Sraf 12902248Sraf /* 12912248Sraf * Notifications are never cancelled. 12922248Sraf * All signals remain blocked, forever. 12932248Sraf */ 12942248Sraf for (;;) { 12952248Sraf while ((reqp = _aio_req_get(aiowp)) == NULL) { 12962248Sraf if (_aio_idle(aiowp) != 0) 12972248Sraf aio_panic("_aio_do_notify: _aio_idle() failed"); 12982248Sraf } 12992248Sraf send_notification(&reqp->req_notify); 13002248Sraf _aio_req_free(reqp); 13012248Sraf } 13022248Sraf 13032248Sraf /* NOTREACHED */ 13042248Sraf return (NULL); 13052248Sraf } 13062248Sraf 13072248Sraf /* 13082248Sraf * Do the completion semantics for a request that was either canceled 13092248Sraf * by _aio_cancel_req() or was completed by _aio_do_request(). 13102248Sraf */ 13112248Sraf static void 13122248Sraf _aiodone(aio_req_t *reqp, ssize_t retval, int error) 13132248Sraf { 13142248Sraf aio_result_t *resultp = reqp->req_resultp; 13152248Sraf int notify = 0; 13162248Sraf aio_lio_t *head; 13172248Sraf int sigev_none; 13182248Sraf int sigev_signal; 13192248Sraf int sigev_thread; 13202248Sraf int sigev_port; 13212248Sraf notif_param_t np; 13222248Sraf 13232248Sraf /* 13242248Sraf * We call _aiodone() only for Posix I/O. 13252248Sraf */ 13262248Sraf ASSERT(POSIX_AIO(reqp)); 13272248Sraf 13282248Sraf sigev_none = 0; 13292248Sraf sigev_signal = 0; 13302248Sraf sigev_thread = 0; 13312248Sraf sigev_port = 0; 13322248Sraf np.np_signo = 0; 13332248Sraf np.np_port = -1; 13342248Sraf np.np_lio_signo = 0; 13352248Sraf np.np_lio_port = -1; 13362248Sraf 13372248Sraf switch (reqp->req_sigevent.sigev_notify) { 13382248Sraf case SIGEV_NONE: 13392248Sraf sigev_none = 1; 13402248Sraf break; 13412248Sraf case SIGEV_SIGNAL: 13422248Sraf sigev_signal = 1; 13432248Sraf break; 13442248Sraf case SIGEV_THREAD: 13452248Sraf sigev_thread = 1; 13462248Sraf break; 13472248Sraf case SIGEV_PORT: 13482248Sraf sigev_port = 1; 13492248Sraf break; 13502248Sraf default: 13512248Sraf aio_panic("_aiodone: improper sigev_notify"); 13522248Sraf break; 13532248Sraf } 13542248Sraf 13552248Sraf /* 13562248Sraf * Figure out the notification parameters while holding __aio_mutex. 13572248Sraf * Actually perform the notifications after dropping __aio_mutex. 13582248Sraf * This allows us to sleep for a long time (if the notifications 13592248Sraf * incur delays) without impeding other async I/O operations. 13602248Sraf */ 13612248Sraf 13622248Sraf sig_mutex_lock(&__aio_mutex); 13632248Sraf 13642248Sraf if (sigev_signal) { 13652248Sraf if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) 13662248Sraf notify = 1; 13672248Sraf np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 13682248Sraf } else if (sigev_thread | sigev_port) { 13692248Sraf if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) 13702248Sraf notify = 1; 13712248Sraf np.np_event = reqp->req_op; 13722248Sraf if (np.np_event == AIOFSYNC && reqp->req_largefile) 13732248Sraf np.np_event = AIOFSYNC64; 13742248Sraf np.np_object = (uintptr_t)reqp->req_aiocbp; 13752248Sraf np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 13762248Sraf } 13772248Sraf 13782248Sraf if (resultp->aio_errno == EINPROGRESS) 13792248Sraf _aio_set_result(reqp, retval, error); 13802248Sraf 13812248Sraf _aio_outstand_cnt--; 13822248Sraf 13832248Sraf head = reqp->req_head; 13842248Sraf reqp->req_head = NULL; 13852248Sraf 13862248Sraf if (sigev_none) { 13872248Sraf _aio_enq_doneq(reqp); 13882248Sraf reqp = NULL; 13892248Sraf } else { 13902248Sraf (void) _aio_hash_del(resultp); 13912248Sraf _aio_req_mark_done(reqp); 13922248Sraf } 13932248Sraf 13942248Sraf _aio_waitn_wakeup(); 13952248Sraf 13962248Sraf /* 13972248Sraf * __aio_waitn() sets AIO_WAIT_INPROGRESS and 13982248Sraf * __aio_suspend() increments "_aio_kernel_suspend" 13992248Sraf * when they are waiting in the kernel for completed I/Os. 14002248Sraf * 14012248Sraf * _kaio(AIONOTIFY) awakes the corresponding function 14022248Sraf * in the kernel; then the corresponding __aio_waitn() or 14032248Sraf * __aio_suspend() function could reap the recently 14042248Sraf * completed I/Os (_aiodone()). 14052248Sraf */ 14062248Sraf if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) 14072248Sraf (void) _kaio(AIONOTIFY); 14082248Sraf 14092248Sraf sig_mutex_unlock(&__aio_mutex); 14102248Sraf 14112248Sraf if (head != NULL) { 14122248Sraf /* 14132248Sraf * If all the lio requests have completed, 14142248Sraf * prepare to notify the waiting thread. 14152248Sraf */ 14162248Sraf sig_mutex_lock(&head->lio_mutex); 14172248Sraf ASSERT(head->lio_refcnt == head->lio_nent); 14182248Sraf if (head->lio_refcnt == 1) { 14192248Sraf int waiting = 0; 14202248Sraf if (head->lio_mode == LIO_WAIT) { 14212248Sraf if ((waiting = head->lio_waiting) != 0) 14222248Sraf (void) cond_signal(&head->lio_cond_cv); 14232248Sraf } else if (head->lio_port < 0) { /* none or signal */ 14242248Sraf if ((np.np_lio_signo = head->lio_signo) != 0) 14252248Sraf notify = 1; 14262248Sraf np.np_lio_user = head->lio_sigval.sival_ptr; 14272248Sraf } else { /* thread or port */ 14282248Sraf notify = 1; 14292248Sraf np.np_lio_port = head->lio_port; 14302248Sraf np.np_lio_event = head->lio_event; 14312248Sraf np.np_lio_object = 14322248Sraf (uintptr_t)head->lio_sigevent; 14332248Sraf np.np_lio_user = head->lio_sigval.sival_ptr; 14342248Sraf } 14352248Sraf head->lio_nent = head->lio_refcnt = 0; 14362248Sraf sig_mutex_unlock(&head->lio_mutex); 14372248Sraf if (waiting == 0) 14382248Sraf _aio_lio_free(head); 14392248Sraf } else { 14402248Sraf head->lio_nent--; 14412248Sraf head->lio_refcnt--; 14422248Sraf sig_mutex_unlock(&head->lio_mutex); 14432248Sraf } 14442248Sraf } 14452248Sraf 14462248Sraf /* 14472248Sraf * The request is completed; now perform the notifications. 14482248Sraf */ 14492248Sraf if (notify) { 14502248Sraf if (reqp != NULL) { 14512248Sraf /* 14522248Sraf * We usually put the request on the notification 14532248Sraf * queue because we don't want to block and delay 14542248Sraf * other operations behind us in the work queue. 14552248Sraf * Also we must never block on a cancel notification 14562248Sraf * because we are being called from an application 14572248Sraf * thread in this case and that could lead to deadlock 14582248Sraf * if no other thread is receiving notificatins. 14592248Sraf */ 14602248Sraf reqp->req_notify = np; 14612248Sraf reqp->req_op = AIONOTIFY; 14622248Sraf _aio_req_add(reqp, &__workers_no, AIONOTIFY); 14632248Sraf reqp = NULL; 14642248Sraf } else { 14652248Sraf /* 14662248Sraf * We already put the request on the done queue, 14672248Sraf * so we can't queue it to the notification queue. 14682248Sraf * Just do the notification directly. 14692248Sraf */ 14702248Sraf send_notification(&np); 14712248Sraf } 14722248Sraf } 14732248Sraf 14742248Sraf if (reqp != NULL) 14752248Sraf _aio_req_free(reqp); 14762248Sraf } 14772248Sraf 14782248Sraf /* 14792248Sraf * Delete fsync requests from list head until there is 14802248Sraf * only one left. Return 0 when there is only one, 14812248Sraf * otherwise return a non-zero value. 14822248Sraf */ 14832248Sraf static int 14842248Sraf _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 14852248Sraf { 14862248Sraf aio_lio_t *head = reqp->req_head; 14872248Sraf int rval = 0; 14882248Sraf 14892248Sraf ASSERT(reqp == aiowp->work_req); 14902248Sraf sig_mutex_lock(&aiowp->work_qlock1); 14912248Sraf sig_mutex_lock(&head->lio_mutex); 14922248Sraf if (head->lio_refcnt > 1) { 14932248Sraf head->lio_refcnt--; 14942248Sraf head->lio_nent--; 14952248Sraf aiowp->work_req = NULL; 14962248Sraf sig_mutex_unlock(&head->lio_mutex); 14972248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 14982248Sraf sig_mutex_lock(&__aio_mutex); 14992248Sraf _aio_outstand_cnt--; 15002248Sraf _aio_waitn_wakeup(); 15012248Sraf sig_mutex_unlock(&__aio_mutex); 15022248Sraf _aio_req_free(reqp); 15032248Sraf return (1); 15042248Sraf } 15052248Sraf ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 15062248Sraf reqp->req_head = NULL; 15072248Sraf if (head->lio_canned) 15082248Sraf reqp->req_state = AIO_REQ_CANCELED; 15092248Sraf if (head->lio_mode == LIO_DESTROY) { 15102248Sraf aiowp->work_req = NULL; 15112248Sraf rval = 1; 15122248Sraf } 15132248Sraf sig_mutex_unlock(&head->lio_mutex); 15142248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 15152248Sraf head->lio_refcnt--; 15162248Sraf head->lio_nent--; 15172248Sraf _aio_lio_free(head); 15182248Sraf if (rval != 0) 15192248Sraf _aio_req_free(reqp); 15202248Sraf return (rval); 15212248Sraf } 15222248Sraf 15232248Sraf /* 15242248Sraf * A worker is set idle when its work queue is empty. 15252248Sraf * The worker checks again that it has no more work 15262248Sraf * and then goes to sleep waiting for more work. 15272248Sraf */ 15282248Sraf int 15292248Sraf _aio_idle(aio_worker_t *aiowp) 15302248Sraf { 15312248Sraf int error = 0; 15322248Sraf 15332248Sraf sig_mutex_lock(&aiowp->work_qlock1); 15342248Sraf if (aiowp->work_count1 == 0) { 15352248Sraf ASSERT(aiowp->work_minload1 == 0); 15362248Sraf aiowp->work_idleflg = 1; 15372248Sraf /* 15382248Sraf * A cancellation handler is not needed here. 15392248Sraf * aio worker threads are never cancelled via pthread_cancel(). 15402248Sraf */ 15412248Sraf error = sig_cond_wait(&aiowp->work_idle_cv, 15422248Sraf &aiowp->work_qlock1); 15432248Sraf /* 15442248Sraf * The idle flag is normally cleared before worker is awakened 15452248Sraf * by aio_req_add(). On error (EINTR), we clear it ourself. 15462248Sraf */ 15472248Sraf if (error) 15482248Sraf aiowp->work_idleflg = 0; 15492248Sraf } 15502248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 15512248Sraf return (error); 15522248Sraf } 15532248Sraf 15542248Sraf /* 15552248Sraf * A worker's completed AIO requests are placed onto a global 15562248Sraf * done queue. The application is only sent a SIGIO signal if 15572248Sraf * the process has a handler enabled and it is not waiting via 15582248Sraf * aiowait(). 15592248Sraf */ 15602248Sraf static void 15612248Sraf _aio_work_done(aio_worker_t *aiowp) 15622248Sraf { 15632248Sraf aio_req_t *reqp; 15642248Sraf 15652248Sraf sig_mutex_lock(&aiowp->work_qlock1); 15662248Sraf reqp = aiowp->work_prev1; 15672248Sraf reqp->req_next = NULL; 15682248Sraf aiowp->work_done1 = 0; 15692248Sraf aiowp->work_tail1 = aiowp->work_next1; 15702248Sraf if (aiowp->work_tail1 == NULL) 15712248Sraf aiowp->work_head1 = NULL; 15722248Sraf aiowp->work_prev1 = NULL; 15732248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 15742248Sraf sig_mutex_lock(&__aio_mutex); 15752248Sraf _aio_donecnt++; 15762248Sraf _aio_outstand_cnt--; 15772248Sraf _aio_req_done_cnt--; 15782248Sraf ASSERT(_aio_donecnt > 0 && 15792248Sraf _aio_outstand_cnt >= 0 && 15802248Sraf _aio_req_done_cnt >= 0); 15812248Sraf ASSERT(reqp != NULL); 15822248Sraf 15832248Sraf if (_aio_done_tail == NULL) { 15842248Sraf _aio_done_head = _aio_done_tail = reqp; 15852248Sraf } else { 15862248Sraf _aio_done_head->req_next = reqp; 15872248Sraf _aio_done_head = reqp; 15882248Sraf } 15892248Sraf 15902248Sraf if (_aiowait_flag) { 15912248Sraf sig_mutex_unlock(&__aio_mutex); 15922248Sraf (void) _kaio(AIONOTIFY); 15932248Sraf } else { 15942248Sraf sig_mutex_unlock(&__aio_mutex); 15952248Sraf if (_sigio_enabled) 15962248Sraf (void) kill(__pid, SIGIO); 15972248Sraf } 15982248Sraf } 15992248Sraf 16002248Sraf /* 16012248Sraf * The done queue consists of AIO requests that are in either the 16022248Sraf * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 16032248Sraf * are discarded. If the done queue is empty then NULL is returned. 16042248Sraf * Otherwise the address of a done aio_result_t is returned. 16052248Sraf */ 16062248Sraf aio_result_t * 16072248Sraf _aio_req_done(void) 16082248Sraf { 16092248Sraf aio_req_t *reqp; 16102248Sraf aio_result_t *resultp; 16112248Sraf 16122248Sraf ASSERT(MUTEX_HELD(&__aio_mutex)); 16132248Sraf 16142248Sraf if ((reqp = _aio_done_tail) != NULL) { 16152248Sraf if ((_aio_done_tail = reqp->req_next) == NULL) 16162248Sraf _aio_done_head = NULL; 16172248Sraf ASSERT(_aio_donecnt > 0); 16182248Sraf _aio_donecnt--; 16192248Sraf (void) _aio_hash_del(reqp->req_resultp); 16202248Sraf resultp = reqp->req_resultp; 16212248Sraf ASSERT(reqp->req_state == AIO_REQ_DONE); 16222248Sraf _aio_req_free(reqp); 16232248Sraf return (resultp); 16242248Sraf } 16252248Sraf /* is queue empty? */ 16262248Sraf if (reqp == NULL && _aio_outstand_cnt == 0) { 16272248Sraf return ((aio_result_t *)-1); 16282248Sraf } 16292248Sraf return (NULL); 16302248Sraf } 16312248Sraf 16322248Sraf /* 16332248Sraf * Set the return and errno values for the application's use. 16342248Sraf * 16352248Sraf * For the Posix interfaces, we must set the return value first followed 16362248Sraf * by the errno value because the Posix interfaces allow for a change 16372248Sraf * in the errno value from EINPROGRESS to something else to signal 16382248Sraf * the completion of the asynchronous request. 16392248Sraf * 16402248Sraf * The opposite is true for the Solaris interfaces. These allow for 16412248Sraf * a change in the return value from AIO_INPROGRESS to something else 16422248Sraf * to signal the completion of the asynchronous request. 16432248Sraf */ 16442248Sraf void 16452248Sraf _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 16462248Sraf { 16472248Sraf aio_result_t *resultp = reqp->req_resultp; 16482248Sraf 16492248Sraf if (POSIX_AIO(reqp)) { 16502248Sraf resultp->aio_return = retval; 16512248Sraf membar_producer(); 16522248Sraf resultp->aio_errno = error; 16532248Sraf } else { 16542248Sraf resultp->aio_errno = error; 16552248Sraf membar_producer(); 16562248Sraf resultp->aio_return = retval; 16572248Sraf } 16582248Sraf } 16592248Sraf 16602248Sraf /* 16612248Sraf * Add an AIO request onto the next work queue. 16622248Sraf * A circular list of workers is used to choose the next worker. 16632248Sraf */ 16642248Sraf void 16652248Sraf _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 16662248Sraf { 16672248Sraf ulwp_t *self = curthread; 16682248Sraf aio_worker_t *aiowp; 16692248Sraf aio_worker_t *first; 16702248Sraf int load_bal_flg = 1; 16712248Sraf int found; 16722248Sraf 16732248Sraf ASSERT(reqp->req_state != AIO_REQ_DONEQ); 16742248Sraf reqp->req_next = NULL; 16752248Sraf /* 16762248Sraf * Try to acquire the next worker's work queue. If it is locked, 16772248Sraf * then search the list of workers until a queue is found unlocked, 16782248Sraf * or until the list is completely traversed at which point another 16792248Sraf * worker will be created. 16802248Sraf */ 16812248Sraf sigoff(self); /* defer SIGIO */ 16822248Sraf sig_mutex_lock(&__aio_mutex); 16832248Sraf first = aiowp = *nextworker; 16842248Sraf if (mode != AIONOTIFY) 16852248Sraf _aio_outstand_cnt++; 16862248Sraf sig_mutex_unlock(&__aio_mutex); 16872248Sraf 16882248Sraf switch (mode) { 16892248Sraf case AIOREAD: 16902248Sraf case AIOWRITE: 16912248Sraf case AIOAREAD: 16922248Sraf case AIOAWRITE: 16932248Sraf #if !defined(_LP64) 16942248Sraf case AIOAREAD64: 16952248Sraf case AIOAWRITE64: 16962248Sraf #endif 16972248Sraf /* try to find an idle worker */ 16982248Sraf found = 0; 16992248Sraf do { 17002248Sraf if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 17012248Sraf if (aiowp->work_idleflg) { 17022248Sraf found = 1; 17032248Sraf break; 17042248Sraf } 17052248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 17062248Sraf } 17072248Sraf } while ((aiowp = aiowp->work_forw) != first); 17082248Sraf 17092248Sraf if (found) { 17102248Sraf aiowp->work_minload1++; 17112248Sraf break; 17122248Sraf } 17132248Sraf 17142248Sraf /* try to acquire some worker's queue lock */ 17152248Sraf do { 17162248Sraf if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 17172248Sraf found = 1; 17182248Sraf break; 17192248Sraf } 17202248Sraf } while ((aiowp = aiowp->work_forw) != first); 17212248Sraf 17222248Sraf /* 17232248Sraf * Create more workers when the workers appear overloaded. 17242248Sraf * Either all the workers are busy draining their queues 17252248Sraf * or no worker's queue lock could be acquired. 17262248Sraf */ 17272248Sraf if (!found) { 17282248Sraf if (_aio_worker_cnt < _max_workers) { 17292248Sraf if (_aio_create_worker(reqp, mode)) 17302248Sraf aio_panic("_aio_req_add: add worker"); 17312248Sraf sigon(self); /* reenable SIGIO */ 17322248Sraf return; 17332248Sraf } 17342248Sraf 17352248Sraf /* 17362248Sraf * No worker available and we have created 17372248Sraf * _max_workers, keep going through the 17382248Sraf * list slowly until we get a lock 17392248Sraf */ 17402248Sraf while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 17412248Sraf /* 17422248Sraf * give someone else a chance 17432248Sraf */ 17442248Sraf _aio_delay(1); 17452248Sraf aiowp = aiowp->work_forw; 17462248Sraf } 17472248Sraf } 17482248Sraf 17492248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 17502248Sraf if (_aio_worker_cnt < _max_workers && 17512248Sraf aiowp->work_minload1 >= _minworkload) { 17522248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 17532248Sraf sig_mutex_lock(&__aio_mutex); 17542248Sraf *nextworker = aiowp->work_forw; 17552248Sraf sig_mutex_unlock(&__aio_mutex); 17562248Sraf if (_aio_create_worker(reqp, mode)) 17572248Sraf aio_panic("aio_req_add: add worker"); 17582248Sraf sigon(self); /* reenable SIGIO */ 17592248Sraf return; 17602248Sraf } 17612248Sraf aiowp->work_minload1++; 17622248Sraf break; 17632248Sraf case AIOFSYNC: 17642248Sraf case AIONOTIFY: 17652248Sraf load_bal_flg = 0; 17662248Sraf sig_mutex_lock(&aiowp->work_qlock1); 17672248Sraf break; 17682248Sraf default: 17692248Sraf aio_panic("_aio_req_add: invalid mode"); 17702248Sraf break; 17712248Sraf } 17722248Sraf /* 17732248Sraf * Put request onto worker's work queue. 17742248Sraf */ 17752248Sraf if (aiowp->work_tail1 == NULL) { 17762248Sraf ASSERT(aiowp->work_count1 == 0); 17772248Sraf aiowp->work_tail1 = reqp; 17782248Sraf aiowp->work_next1 = reqp; 17792248Sraf } else { 17802248Sraf aiowp->work_head1->req_next = reqp; 17812248Sraf if (aiowp->work_next1 == NULL) 17822248Sraf aiowp->work_next1 = reqp; 17832248Sraf } 17842248Sraf reqp->req_state = AIO_REQ_QUEUED; 17852248Sraf reqp->req_worker = aiowp; 17862248Sraf aiowp->work_head1 = reqp; 17872248Sraf /* 17882248Sraf * Awaken worker if it is not currently active. 17892248Sraf */ 17902248Sraf if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 17912248Sraf aiowp->work_idleflg = 0; 17922248Sraf (void) cond_signal(&aiowp->work_idle_cv); 17932248Sraf } 17942248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 17952248Sraf 17962248Sraf if (load_bal_flg) { 17972248Sraf sig_mutex_lock(&__aio_mutex); 17982248Sraf *nextworker = aiowp->work_forw; 17992248Sraf sig_mutex_unlock(&__aio_mutex); 18002248Sraf } 18012248Sraf sigon(self); /* reenable SIGIO */ 18022248Sraf } 18032248Sraf 18042248Sraf /* 18052248Sraf * Get an AIO request for a specified worker. 18062248Sraf * If the work queue is empty, return NULL. 18072248Sraf */ 18082248Sraf aio_req_t * 18092248Sraf _aio_req_get(aio_worker_t *aiowp) 18102248Sraf { 18112248Sraf aio_req_t *reqp; 18122248Sraf 18132248Sraf sig_mutex_lock(&aiowp->work_qlock1); 18142248Sraf if ((reqp = aiowp->work_next1) != NULL) { 18152248Sraf /* 18162248Sraf * Remove a POSIX request from the queue; the 18172248Sraf * request queue is a singularly linked list 18182248Sraf * with a previous pointer. The request is 18192248Sraf * removed by updating the previous pointer. 18202248Sraf * 18212248Sraf * Non-posix requests are left on the queue 18222248Sraf * to eventually be placed on the done queue. 18232248Sraf */ 18242248Sraf 18252248Sraf if (POSIX_AIO(reqp)) { 18262248Sraf if (aiowp->work_prev1 == NULL) { 18272248Sraf aiowp->work_tail1 = reqp->req_next; 18282248Sraf if (aiowp->work_tail1 == NULL) 18292248Sraf aiowp->work_head1 = NULL; 18302248Sraf } else { 18312248Sraf aiowp->work_prev1->req_next = reqp->req_next; 18322248Sraf if (aiowp->work_head1 == reqp) 18332248Sraf aiowp->work_head1 = reqp->req_next; 18342248Sraf } 18352248Sraf 18362248Sraf } else { 18372248Sraf aiowp->work_prev1 = reqp; 18382248Sraf ASSERT(aiowp->work_done1 >= 0); 18392248Sraf aiowp->work_done1++; 18402248Sraf } 18412248Sraf ASSERT(reqp != reqp->req_next); 18422248Sraf aiowp->work_next1 = reqp->req_next; 18432248Sraf ASSERT(aiowp->work_count1 >= 1); 18442248Sraf aiowp->work_count1--; 18452248Sraf switch (reqp->req_op) { 18462248Sraf case AIOREAD: 18472248Sraf case AIOWRITE: 18482248Sraf case AIOAREAD: 18492248Sraf case AIOAWRITE: 18502248Sraf #if !defined(_LP64) 18512248Sraf case AIOAREAD64: 18522248Sraf case AIOAWRITE64: 18532248Sraf #endif 18542248Sraf ASSERT(aiowp->work_minload1 > 0); 18552248Sraf aiowp->work_minload1--; 18562248Sraf break; 18572248Sraf } 18582248Sraf reqp->req_state = AIO_REQ_INPROGRESS; 18592248Sraf } 18602248Sraf aiowp->work_req = reqp; 18612248Sraf ASSERT(reqp != NULL || aiowp->work_count1 == 0); 18622248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 18632248Sraf return (reqp); 18642248Sraf } 18652248Sraf 18662248Sraf static void 18672248Sraf _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 18682248Sraf { 18692248Sraf aio_req_t **last; 18702248Sraf aio_req_t *lastrp; 18712248Sraf aio_req_t *next; 18722248Sraf 18732248Sraf ASSERT(aiowp != NULL); 18742248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 18752248Sraf if (POSIX_AIO(reqp)) { 18762248Sraf if (ostate != AIO_REQ_QUEUED) 18772248Sraf return; 18782248Sraf } 18792248Sraf last = &aiowp->work_tail1; 18802248Sraf lastrp = aiowp->work_tail1; 18812248Sraf ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 18822248Sraf while ((next = *last) != NULL) { 18832248Sraf if (next == reqp) { 18842248Sraf *last = next->req_next; 18852248Sraf if (aiowp->work_next1 == next) 18862248Sraf aiowp->work_next1 = next->req_next; 18872248Sraf 18882248Sraf if ((next->req_next != NULL) || 18892248Sraf (aiowp->work_done1 == 0)) { 18902248Sraf if (aiowp->work_head1 == next) 18912248Sraf aiowp->work_head1 = next->req_next; 18922248Sraf if (aiowp->work_prev1 == next) 18932248Sraf aiowp->work_prev1 = next->req_next; 18942248Sraf } else { 18952248Sraf if (aiowp->work_head1 == next) 18962248Sraf aiowp->work_head1 = lastrp; 18972248Sraf if (aiowp->work_prev1 == next) 18982248Sraf aiowp->work_prev1 = lastrp; 18992248Sraf } 19002248Sraf 19012248Sraf if (ostate == AIO_REQ_QUEUED) { 19022248Sraf ASSERT(aiowp->work_count1 >= 1); 19032248Sraf aiowp->work_count1--; 19042248Sraf ASSERT(aiowp->work_minload1 >= 1); 19052248Sraf aiowp->work_minload1--; 19062248Sraf } else { 19072248Sraf ASSERT(ostate == AIO_REQ_INPROGRESS && 19082248Sraf !POSIX_AIO(reqp)); 19092248Sraf aiowp->work_done1--; 19102248Sraf } 19112248Sraf return; 19122248Sraf } 19132248Sraf last = &next->req_next; 19142248Sraf lastrp = next; 19152248Sraf } 19162248Sraf /* NOTREACHED */ 19172248Sraf } 19182248Sraf 19192248Sraf static void 19202248Sraf _aio_enq_doneq(aio_req_t *reqp) 19212248Sraf { 19222248Sraf if (_aio_doneq == NULL) { 19232248Sraf _aio_doneq = reqp; 19242248Sraf reqp->req_next = reqp->req_prev = reqp; 19252248Sraf } else { 19262248Sraf reqp->req_next = _aio_doneq; 19272248Sraf reqp->req_prev = _aio_doneq->req_prev; 19282248Sraf _aio_doneq->req_prev->req_next = reqp; 19292248Sraf _aio_doneq->req_prev = reqp; 19302248Sraf } 19312248Sraf reqp->req_state = AIO_REQ_DONEQ; 19322248Sraf _aio_doneq_cnt++; 19332248Sraf } 19342248Sraf 19352248Sraf /* 19362248Sraf * caller owns the _aio_mutex 19372248Sraf */ 19382248Sraf aio_req_t * 19392248Sraf _aio_req_remove(aio_req_t *reqp) 19402248Sraf { 19412248Sraf if (reqp && reqp->req_state != AIO_REQ_DONEQ) 19422248Sraf return (NULL); 19432248Sraf 19442248Sraf if (reqp) { 19452248Sraf /* request in done queue */ 19462248Sraf if (_aio_doneq == reqp) 19472248Sraf _aio_doneq = reqp->req_next; 19482248Sraf if (_aio_doneq == reqp) { 19492248Sraf /* only one request on queue */ 19502248Sraf _aio_doneq = NULL; 19512248Sraf } else { 19522248Sraf aio_req_t *tmp = reqp->req_next; 19532248Sraf reqp->req_prev->req_next = tmp; 19542248Sraf tmp->req_prev = reqp->req_prev; 19552248Sraf } 19562248Sraf } else if ((reqp = _aio_doneq) != NULL) { 19572248Sraf if (reqp == reqp->req_next) { 19582248Sraf /* only one request on queue */ 19592248Sraf _aio_doneq = NULL; 19602248Sraf } else { 19612248Sraf reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 19622248Sraf _aio_doneq->req_prev = reqp->req_prev; 19632248Sraf } 19642248Sraf } 19652248Sraf if (reqp) { 19662248Sraf _aio_doneq_cnt--; 19672248Sraf reqp->req_next = reqp->req_prev = reqp; 19682248Sraf reqp->req_state = AIO_REQ_DONE; 19692248Sraf } 19702248Sraf return (reqp); 19712248Sraf } 19722248Sraf 19732248Sraf /* 19742248Sraf * An AIO request is identified by an aio_result_t pointer. The library 19752248Sraf * maps this aio_result_t pointer to its internal representation using a 19762248Sraf * hash table. This function adds an aio_result_t pointer to the hash table. 19772248Sraf */ 19782248Sraf static int 19792248Sraf _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 19802248Sraf { 19812248Sraf aio_hash_t *hashp; 19822248Sraf aio_req_t **prev; 19832248Sraf aio_req_t *next; 19842248Sraf 19852248Sraf hashp = _aio_hash + AIOHASH(resultp); 19862248Sraf lmutex_lock(&hashp->hash_lock); 19872248Sraf prev = &hashp->hash_ptr; 19882248Sraf while ((next = *prev) != NULL) { 19892248Sraf if (resultp == next->req_resultp) { 19902248Sraf lmutex_unlock(&hashp->hash_lock); 19912248Sraf return (-1); 19922248Sraf } 19932248Sraf prev = &next->req_link; 19942248Sraf } 19952248Sraf *prev = reqp; 19962248Sraf ASSERT(reqp->req_link == NULL); 19972248Sraf lmutex_unlock(&hashp->hash_lock); 19982248Sraf return (0); 19992248Sraf } 20002248Sraf 20012248Sraf /* 20022248Sraf * Remove an entry from the hash table. 20032248Sraf */ 20042248Sraf aio_req_t * 20052248Sraf _aio_hash_del(aio_result_t *resultp) 20062248Sraf { 20072248Sraf aio_hash_t *hashp; 20082248Sraf aio_req_t **prev; 20092248Sraf aio_req_t *next = NULL; 20102248Sraf 20112248Sraf if (_aio_hash != NULL) { 20122248Sraf hashp = _aio_hash + AIOHASH(resultp); 20132248Sraf lmutex_lock(&hashp->hash_lock); 20142248Sraf prev = &hashp->hash_ptr; 20152248Sraf while ((next = *prev) != NULL) { 20162248Sraf if (resultp == next->req_resultp) { 20172248Sraf *prev = next->req_link; 20182248Sraf next->req_link = NULL; 20192248Sraf break; 20202248Sraf } 20212248Sraf prev = &next->req_link; 20222248Sraf } 20232248Sraf lmutex_unlock(&hashp->hash_lock); 20242248Sraf } 20252248Sraf return (next); 20262248Sraf } 20272248Sraf 20282248Sraf /* 20292248Sraf * find an entry in the hash table 20302248Sraf */ 20312248Sraf aio_req_t * 20322248Sraf _aio_hash_find(aio_result_t *resultp) 20332248Sraf { 20342248Sraf aio_hash_t *hashp; 20352248Sraf aio_req_t **prev; 20362248Sraf aio_req_t *next = NULL; 20372248Sraf 20382248Sraf if (_aio_hash != NULL) { 20392248Sraf hashp = _aio_hash + AIOHASH(resultp); 20402248Sraf lmutex_lock(&hashp->hash_lock); 20412248Sraf prev = &hashp->hash_ptr; 20422248Sraf while ((next = *prev) != NULL) { 20432248Sraf if (resultp == next->req_resultp) 20442248Sraf break; 20452248Sraf prev = &next->req_link; 20462248Sraf } 20472248Sraf lmutex_unlock(&hashp->hash_lock); 20482248Sraf } 20492248Sraf return (next); 20502248Sraf } 20512248Sraf 20522248Sraf /* 20532248Sraf * AIO interface for POSIX 20542248Sraf */ 20552248Sraf int 20562248Sraf _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 20572248Sraf int mode, int flg) 20582248Sraf { 20592248Sraf aio_req_t *reqp; 20602248Sraf aio_args_t *ap; 20612248Sraf int kerr; 20622248Sraf 20632248Sraf if (aiocbp == NULL) { 20642248Sraf errno = EINVAL; 20652248Sraf return (-1); 20662248Sraf } 20672248Sraf 20682248Sraf /* initialize kaio */ 20692248Sraf if (!_kaio_ok) 20702248Sraf _kaio_init(); 20712248Sraf 20722248Sraf aiocbp->aio_state = NOCHECK; 20732248Sraf 20742248Sraf /* 20752248Sraf * If we have been called because a list I/O 20762248Sraf * kaio() failed, we dont want to repeat the 20772248Sraf * system call 20782248Sraf */ 20792248Sraf 20802248Sraf if (flg & AIO_KAIO) { 20812248Sraf /* 20822248Sraf * Try kernel aio first. 20832248Sraf * If errno is ENOTSUP/EBADFD, 20842248Sraf * fall back to the thread implementation. 20852248Sraf */ 20862248Sraf if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 20872248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 20882248Sraf aiocbp->aio_state = CHECK; 20892248Sraf kerr = (int)_kaio(mode, aiocbp); 20902248Sraf if (kerr == 0) 20912248Sraf return (0); 20922248Sraf if (errno != ENOTSUP && errno != EBADFD) { 20932248Sraf aiocbp->aio_resultp.aio_errno = errno; 20942248Sraf aiocbp->aio_resultp.aio_return = -1; 20952248Sraf aiocbp->aio_state = NOCHECK; 20962248Sraf return (-1); 20972248Sraf } 20982248Sraf if (errno == EBADFD) 20992248Sraf SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 21002248Sraf } 21012248Sraf } 21022248Sraf 21032248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 21042248Sraf aiocbp->aio_state = USERAIO; 21052248Sraf 21062248Sraf if (!__uaio_ok && __uaio_init() == -1) 21072248Sraf return (-1); 21082248Sraf 21092248Sraf if ((reqp = _aio_req_alloc()) == NULL) { 21102248Sraf errno = EAGAIN; 21112248Sraf return (-1); 21122248Sraf } 21132248Sraf 21142248Sraf /* 21152248Sraf * If an LIO request, add the list head to the aio request 21162248Sraf */ 21172248Sraf reqp->req_head = lio_head; 21182248Sraf reqp->req_type = AIO_POSIX_REQ; 21192248Sraf reqp->req_op = mode; 21202248Sraf reqp->req_largefile = 0; 21212248Sraf 21222248Sraf if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 21232248Sraf reqp->req_sigevent.sigev_notify = SIGEV_NONE; 21242248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 21252248Sraf reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 21262248Sraf reqp->req_sigevent.sigev_signo = 21272248Sraf aiocbp->aio_sigevent.sigev_signo; 21282248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 21292248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 21302248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 21312248Sraf port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 21322248Sraf reqp->req_sigevent.sigev_notify = SIGEV_PORT; 21332248Sraf /* 21342248Sraf * Reuse the sigevent structure to contain the port number 21352248Sraf * and the user value. Same for SIGEV_THREAD, below. 21362248Sraf */ 21372248Sraf reqp->req_sigevent.sigev_signo = 21382248Sraf pn->portnfy_port; 21392248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 21402248Sraf pn->portnfy_user; 21412248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 21422248Sraf reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 21432248Sraf /* 21442248Sraf * The sigevent structure contains the port number 21452248Sraf * and the user value. Same for SIGEV_PORT, above. 21462248Sraf */ 21472248Sraf reqp->req_sigevent.sigev_signo = 21482248Sraf aiocbp->aio_sigevent.sigev_signo; 21492248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 21502248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 21512248Sraf } 21522248Sraf 21532248Sraf reqp->req_resultp = &aiocbp->aio_resultp; 21542248Sraf reqp->req_aiocbp = aiocbp; 21552248Sraf ap = &reqp->req_args; 21562248Sraf ap->fd = aiocbp->aio_fildes; 21572248Sraf ap->buf = (caddr_t)aiocbp->aio_buf; 21582248Sraf ap->bufsz = aiocbp->aio_nbytes; 21592248Sraf ap->offset = aiocbp->aio_offset; 21602248Sraf 21612248Sraf if ((flg & AIO_NO_DUPS) && 21622248Sraf _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 21632248Sraf aio_panic("_aio_rw(): request already in hash table"); 21642248Sraf _aio_req_free(reqp); 21652248Sraf errno = EINVAL; 21662248Sraf return (-1); 21672248Sraf } 21682248Sraf _aio_req_add(reqp, nextworker, mode); 21692248Sraf return (0); 21702248Sraf } 21712248Sraf 21722248Sraf #if !defined(_LP64) 21732248Sraf /* 21742248Sraf * 64-bit AIO interface for POSIX 21752248Sraf */ 21762248Sraf int 21772248Sraf _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 21782248Sraf int mode, int flg) 21792248Sraf { 21802248Sraf aio_req_t *reqp; 21812248Sraf aio_args_t *ap; 21822248Sraf int kerr; 21832248Sraf 21842248Sraf if (aiocbp == NULL) { 21852248Sraf errno = EINVAL; 21862248Sraf return (-1); 21872248Sraf } 21882248Sraf 21892248Sraf /* initialize kaio */ 21902248Sraf if (!_kaio_ok) 21912248Sraf _kaio_init(); 21922248Sraf 21932248Sraf aiocbp->aio_state = NOCHECK; 21942248Sraf 21952248Sraf /* 21962248Sraf * If we have been called because a list I/O 21972248Sraf * kaio() failed, we dont want to repeat the 21982248Sraf * system call 21992248Sraf */ 22002248Sraf 22012248Sraf if (flg & AIO_KAIO) { 22022248Sraf /* 22032248Sraf * Try kernel aio first. 22042248Sraf * If errno is ENOTSUP/EBADFD, 22052248Sraf * fall back to the thread implementation. 22062248Sraf */ 22072248Sraf if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 22082248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 22092248Sraf aiocbp->aio_state = CHECK; 22102248Sraf kerr = (int)_kaio(mode, aiocbp); 22112248Sraf if (kerr == 0) 22122248Sraf return (0); 22132248Sraf if (errno != ENOTSUP && errno != EBADFD) { 22142248Sraf aiocbp->aio_resultp.aio_errno = errno; 22152248Sraf aiocbp->aio_resultp.aio_return = -1; 22162248Sraf aiocbp->aio_state = NOCHECK; 22172248Sraf return (-1); 22182248Sraf } 22192248Sraf if (errno == EBADFD) 22202248Sraf SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 22212248Sraf } 22222248Sraf } 22232248Sraf 22242248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 22252248Sraf aiocbp->aio_state = USERAIO; 22262248Sraf 22272248Sraf if (!__uaio_ok && __uaio_init() == -1) 22282248Sraf return (-1); 22292248Sraf 22302248Sraf if ((reqp = _aio_req_alloc()) == NULL) { 22312248Sraf errno = EAGAIN; 22322248Sraf return (-1); 22332248Sraf } 22342248Sraf 22352248Sraf /* 22362248Sraf * If an LIO request, add the list head to the aio request 22372248Sraf */ 22382248Sraf reqp->req_head = lio_head; 22392248Sraf reqp->req_type = AIO_POSIX_REQ; 22402248Sraf reqp->req_op = mode; 22412248Sraf reqp->req_largefile = 1; 22422248Sraf 22432248Sraf if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 22442248Sraf reqp->req_sigevent.sigev_notify = SIGEV_NONE; 22452248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 22462248Sraf reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 22472248Sraf reqp->req_sigevent.sigev_signo = 22482248Sraf aiocbp->aio_sigevent.sigev_signo; 22492248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 22502248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 22512248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 22522248Sraf port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 22532248Sraf reqp->req_sigevent.sigev_notify = SIGEV_PORT; 22542248Sraf reqp->req_sigevent.sigev_signo = 22552248Sraf pn->portnfy_port; 22562248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 22572248Sraf pn->portnfy_user; 22582248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 22592248Sraf reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 22602248Sraf reqp->req_sigevent.sigev_signo = 22612248Sraf aiocbp->aio_sigevent.sigev_signo; 22622248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 22632248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 22642248Sraf } 22652248Sraf 22662248Sraf reqp->req_resultp = &aiocbp->aio_resultp; 22672248Sraf reqp->req_aiocbp = aiocbp; 22682248Sraf ap = &reqp->req_args; 22692248Sraf ap->fd = aiocbp->aio_fildes; 22702248Sraf ap->buf = (caddr_t)aiocbp->aio_buf; 22712248Sraf ap->bufsz = aiocbp->aio_nbytes; 22722248Sraf ap->offset = aiocbp->aio_offset; 22732248Sraf 22742248Sraf if ((flg & AIO_NO_DUPS) && 22752248Sraf _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 22762248Sraf aio_panic("_aio_rw64(): request already in hash table"); 22772248Sraf _aio_req_free(reqp); 22782248Sraf errno = EINVAL; 22792248Sraf return (-1); 22802248Sraf } 22812248Sraf _aio_req_add(reqp, nextworker, mode); 22822248Sraf return (0); 22832248Sraf } 22842248Sraf #endif /* !defined(_LP64) */ 2285