/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "synonyms.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* double linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in a queue */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}

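/*
 * The _kaio_supported array is consulted through the KAIO_SUPPORTED(),
 * SET_KAIO_NOT_SUPPORTED() and CLEAR_KAIO_SUPPORTED() macros used later
 * in this file (see _aiorw() and _aio_close()); the macros themselves
 * are not defined here and presumably come from asyncio.h.
 */
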
/*
 * The aio subsystem is initialized when the first AIO request is made.
 * Constants, such as the maximum number of workers the subsystem can
 * create and the minimum number of workers permitted, are established,
 * and some initial workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 */
	/* LINTED pointer cast */
	_aio_hash = (aio_hash_t *)mmap(NULL,
	    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if ((void *)_aio_hash == MAP_FAILED) {
		_aio_hash = NULL;
		goto out;
	}
	for (i = 0; i < HASHSZ; i++)
		(void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL);

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create the minimum number of read/write workers.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);

	/*
	 * Create one worker to send asynchronous notifications.
	 */
	(void) _aio_create_worker(NULL, AIONOTIFY);

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}

/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * The special kaio cleanup thread sits in a loop in the
 * kernel, waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
	aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */

int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
	aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat stat;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat(fd, &stat) == -1)
			error = -1;
		else
			loffset = offset + stat.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be able
	 * to choose the appropriate 32/64 bit function.  All other functions
	 * only require the difference between READ and WRITE (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			sig_mutex_unlock(&__aio_mutex);
			return (0);
		}
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32 and 64 bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}
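
/*
 * Usage sketch (illustration only, not part of the library code; fd is
 * assumed to be an open descriptor, buf a caller-supplied buffer and
 * bufsz its size).  aiowait(NULL) blocks until some request completes
 * and returns the address of its aio_result_t; on success aio_errno is
 * zero and aio_return holds the transfer count:
 *
 *	aio_result_t res;
 *	aio_result_t *donep;
 *
 *	if (aioread(fd, buf, bufsz, 0, SEEK_SET, &res) == 0) {
 *		donep = aiowait(NULL);
 *		if (donep == &res && res.aio_errno == 0)
 *			... res.aio_return bytes were read ...
 *	}
 */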

int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}

/*
 * This must be async-signal-safe.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

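	/*
	 * Three cases for uwait, handled below: a NULL pointer means block
	 * until some request completes (or fail if there is nothing to wait
	 * for); a zero-valued timeval means poll once and return
	 * immediately; a positive timeval bounds how long we may wait.
	 */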
	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}

/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */

int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}

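/*
 * A worked example of the computation above (illustration only):
 * with end = { 10, 200000000 } and a current time of { 9, 800000000 },
 * the remaining time stored in *wait is { 0, 400000000 }; once the
 * current time passes 'end', _aio_get_timedelta() returns -1.
 */
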
/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * aio's and return.  The aio's in question will have noticed the
 * cancellation either before, during, or after initiating I/O.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * Caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}

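/*
 * Note that the new worker is created suspended, linked into the circular
 * worker list while __aio_mutex is held, and only then released via
 * thr_continue(), so it is already visible on the list (and counted)
 * by the time it starts running.
 */
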
/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * the cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different ways:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code
 *	where it wishes to continue working in the case that a SIGAIOCANCEL
 *	signal is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which in turn uses the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because "work_cancel_flg" is only manipulated by this worker
 *	thread, and the thread can only run on one CPU at a given time,
 *	it is not necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks on "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread acquires "work_qlock1" and returns
 *	  from the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
 *	a possibly required siglongjmp(), freeing the work_qlock1 and
 *	avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;		/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);

top:
		/* consume any deferred SIGAIOCANCEL signal here */
		sigon(self);
		sigoff(self);

		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				goto top;
		}
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
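		/*
		 * For the read/write cases below, pread()/pwrite() fail with
		 * ESPIPE when the descriptor refers to an object that is not
		 * capable of seeking (a pipe, socket or FIFO); in that case
		 * we fall back to plain read()/write() and the supplied
		 * offset is ignored.
		 */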
		case AIOREAD:
		case AIOAREAD:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				aio_panic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			aio_panic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}

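/*
 * _aio_finish_request() is reached from two places in _aio_do_request():
 * from the sigsetjmp() resume path, with (-1, ECANCELED), when a request
 * was interrupted by SIGAIOCANCEL, and from the bottom of the main loop
 * with the request's actual return value and error.
 */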
/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			_aio_req_done_cnt++;
			_aio_set_result(reqp, retval, error);
			if (error == ECANCELED)
				_aio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}

void
_aio_req_mark_done(aio_req_t *reqp)
{
#if !defined(_LP64)
	if (reqp->req_largefile)
		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
	else
#endif
		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
}

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
static void
_aio_delay(int ticks)
{
	(void) usleep(ticks * (MICROSEC / hz));
}

/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
static void
send_notification(notif_param_t *npp)
{
	extern int __sigqueue(pid_t pid, int signo,
	    /* const union sigval */ void *value, int si_code, int block);

	if (npp->np_signo)
		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_port >= 0)
		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
		    npp->np_event, npp->np_object, npp->np_user);

	if (npp->np_lio_signo)
		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_lio_port >= 0)
		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
}

/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {	/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}

/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
int
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never cancelled via pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by _aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (error);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}

/*
 * Set the return and errno values for the application's use.
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}

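/*
 * A sketch of the consumer side of the ordering above (illustration only,
 * not code from this file): a Posix-style poller reads aio_errno first
 * and only trusts aio_return once aio_errno has left EINPROGRESS,
 * pairing the membar_producer() above with a membar_consumer():
 *
 *	if (resultp->aio_errno != EINPROGRESS) {
 *		membar_consumer();
 *		retval = resultp->aio_return;
 *	}
 */
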
1552*2248Sraf * 1553*2248Sraf * For the Posix interfaces, we must set the return value first followed 1554*2248Sraf * by the errno value because the Posix interfaces allow for a change 1555*2248Sraf * in the errno value from EINPROGRESS to something else to signal 1556*2248Sraf * the completion of the asynchronous request. 1557*2248Sraf * 1558*2248Sraf * The opposite is true for the Solaris interfaces. These allow for 1559*2248Sraf * a change in the return value from AIO_INPROGRESS to something else 1560*2248Sraf * to signal the completion of the asynchronous request. 1561*2248Sraf */ 1562*2248Sraf void 1563*2248Sraf _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 1564*2248Sraf { 1565*2248Sraf aio_result_t *resultp = reqp->req_resultp; 1566*2248Sraf 1567*2248Sraf if (POSIX_AIO(reqp)) { 1568*2248Sraf resultp->aio_return = retval; 1569*2248Sraf membar_producer(); 1570*2248Sraf resultp->aio_errno = error; 1571*2248Sraf } else { 1572*2248Sraf resultp->aio_errno = error; 1573*2248Sraf membar_producer(); 1574*2248Sraf resultp->aio_return = retval; 1575*2248Sraf } 1576*2248Sraf } 1577*2248Sraf 1578*2248Sraf /* 1579*2248Sraf * Add an AIO request onto the next work queue. 1580*2248Sraf * A circular list of workers is used to choose the next worker. 1581*2248Sraf */ 1582*2248Sraf void 1583*2248Sraf _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 1584*2248Sraf { 1585*2248Sraf ulwp_t *self = curthread; 1586*2248Sraf aio_worker_t *aiowp; 1587*2248Sraf aio_worker_t *first; 1588*2248Sraf int load_bal_flg = 1; 1589*2248Sraf int found; 1590*2248Sraf 1591*2248Sraf ASSERT(reqp->req_state != AIO_REQ_DONEQ); 1592*2248Sraf reqp->req_next = NULL; 1593*2248Sraf /* 1594*2248Sraf * Try to acquire the next worker's work queue. If it is locked, 1595*2248Sraf * then search the list of workers until a queue is found unlocked, 1596*2248Sraf * or until the list is completely traversed at which point another 1597*2248Sraf * worker will be created. 1598*2248Sraf */ 1599*2248Sraf sigoff(self); /* defer SIGIO */ 1600*2248Sraf sig_mutex_lock(&__aio_mutex); 1601*2248Sraf first = aiowp = *nextworker; 1602*2248Sraf if (mode != AIONOTIFY) 1603*2248Sraf _aio_outstand_cnt++; 1604*2248Sraf sig_mutex_unlock(&__aio_mutex); 1605*2248Sraf 1606*2248Sraf switch (mode) { 1607*2248Sraf case AIOREAD: 1608*2248Sraf case AIOWRITE: 1609*2248Sraf case AIOAREAD: 1610*2248Sraf case AIOAWRITE: 1611*2248Sraf #if !defined(_LP64) 1612*2248Sraf case AIOAREAD64: 1613*2248Sraf case AIOAWRITE64: 1614*2248Sraf #endif 1615*2248Sraf /* try to find an idle worker */ 1616*2248Sraf found = 0; 1617*2248Sraf do { 1618*2248Sraf if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1619*2248Sraf if (aiowp->work_idleflg) { 1620*2248Sraf found = 1; 1621*2248Sraf break; 1622*2248Sraf } 1623*2248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 1624*2248Sraf } 1625*2248Sraf } while ((aiowp = aiowp->work_forw) != first); 1626*2248Sraf 1627*2248Sraf if (found) { 1628*2248Sraf aiowp->work_minload1++; 1629*2248Sraf break; 1630*2248Sraf } 1631*2248Sraf 1632*2248Sraf /* try to acquire some worker's queue lock */ 1633*2248Sraf do { 1634*2248Sraf if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1635*2248Sraf found = 1; 1636*2248Sraf break; 1637*2248Sraf } 1638*2248Sraf } while ((aiowp = aiowp->work_forw) != first); 1639*2248Sraf 1640*2248Sraf /* 1641*2248Sraf * Create more workers when the workers appear overloaded. 1642*2248Sraf * Either all the workers are busy draining their queues 1643*2248Sraf * or no worker's queue lock could be acquired. 
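 *
 * Roughly, the order of preference worked out above and below is:
 * an idle worker first, then any worker whose queue lock can be
 * taken without blocking, then a newly created worker while
 * _aio_worker_cnt is still below _max_workers, and finally a slow
 * walk of the list (with _aio_delay) until some queue lock frees up.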
1644*2248Sraf */ 1645*2248Sraf if (!found) { 1646*2248Sraf if (_aio_worker_cnt < _max_workers) { 1647*2248Sraf if (_aio_create_worker(reqp, mode)) 1648*2248Sraf aio_panic("_aio_req_add: add worker"); 1649*2248Sraf sigon(self); /* reenable SIGIO */ 1650*2248Sraf return; 1651*2248Sraf } 1652*2248Sraf 1653*2248Sraf /* 1654*2248Sraf * No worker available and we have created 1655*2248Sraf * _max_workers; keep going through the 1656*2248Sraf * list slowly until we get a lock. 1657*2248Sraf */ 1658*2248Sraf while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 1659*2248Sraf /* 1660*2248Sraf * give someone else a chance 1661*2248Sraf */ 1662*2248Sraf _aio_delay(1); 1663*2248Sraf aiowp = aiowp->work_forw; 1664*2248Sraf } 1665*2248Sraf } 1666*2248Sraf 1667*2248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1668*2248Sraf if (_aio_worker_cnt < _max_workers && 1669*2248Sraf aiowp->work_minload1 >= _minworkload) { 1670*2248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 1671*2248Sraf sig_mutex_lock(&__aio_mutex); 1672*2248Sraf *nextworker = aiowp->work_forw; 1673*2248Sraf sig_mutex_unlock(&__aio_mutex); 1674*2248Sraf if (_aio_create_worker(reqp, mode)) 1675*2248Sraf aio_panic("_aio_req_add: add worker"); 1676*2248Sraf sigon(self); /* reenable SIGIO */ 1677*2248Sraf return; 1678*2248Sraf } 1679*2248Sraf aiowp->work_minload1++; 1680*2248Sraf break; 1681*2248Sraf case AIOFSYNC: 1682*2248Sraf case AIONOTIFY: 1683*2248Sraf load_bal_flg = 0; 1684*2248Sraf sig_mutex_lock(&aiowp->work_qlock1); 1685*2248Sraf break; 1686*2248Sraf default: 1687*2248Sraf aio_panic("_aio_req_add: invalid mode"); 1688*2248Sraf break; 1689*2248Sraf } 1690*2248Sraf /* 1691*2248Sraf * Put request onto worker's work queue. 1692*2248Sraf */ 1693*2248Sraf if (aiowp->work_tail1 == NULL) { 1694*2248Sraf ASSERT(aiowp->work_count1 == 0); 1695*2248Sraf aiowp->work_tail1 = reqp; 1696*2248Sraf aiowp->work_next1 = reqp; 1697*2248Sraf } else { 1698*2248Sraf aiowp->work_head1->req_next = reqp; 1699*2248Sraf if (aiowp->work_next1 == NULL) 1700*2248Sraf aiowp->work_next1 = reqp; 1701*2248Sraf } 1702*2248Sraf reqp->req_state = AIO_REQ_QUEUED; 1703*2248Sraf reqp->req_worker = aiowp; 1704*2248Sraf aiowp->work_head1 = reqp; 1705*2248Sraf /* 1706*2248Sraf * Awaken worker if it is not currently active. 1707*2248Sraf */ 1708*2248Sraf if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 1709*2248Sraf aiowp->work_idleflg = 0; 1710*2248Sraf (void) cond_signal(&aiowp->work_idle_cv); 1711*2248Sraf } 1712*2248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 1713*2248Sraf 1714*2248Sraf if (load_bal_flg) { 1715*2248Sraf sig_mutex_lock(&__aio_mutex); 1716*2248Sraf *nextworker = aiowp->work_forw; 1717*2248Sraf sig_mutex_unlock(&__aio_mutex); 1718*2248Sraf } 1719*2248Sraf sigon(self); /* reenable SIGIO */ 1720*2248Sraf } 1721*2248Sraf 1722*2248Sraf /* 1723*2248Sraf * Get an AIO request for a specified worker. 1724*2248Sraf * If the work queue is empty, return NULL. 1725*2248Sraf */ 1726*2248Sraf aio_req_t * 1727*2248Sraf _aio_req_get(aio_worker_t *aiowp) 1728*2248Sraf { 1729*2248Sraf aio_req_t *reqp; 1730*2248Sraf 1731*2248Sraf sig_mutex_lock(&aiowp->work_qlock1); 1732*2248Sraf if ((reqp = aiowp->work_next1) != NULL) { 1733*2248Sraf /* 1734*2248Sraf * Remove a POSIX request from the queue; the 1735*2248Sraf * request queue is a singly linked list 1736*2248Sraf * with a previous pointer. The request is 1737*2248Sraf * removed by updating the previous pointer.
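 *
 * Illustrative summary of the queue pointers:
 *   work_tail1 - oldest request still linked on the queue
 *   work_head1 - newest request; new requests are linked after it
 *   work_next1 - the next request this worker will pick up
 *   work_prev1 - the last non-POSIX request handed to the worker,
 *                left linked so it can later move to the done list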
1738*2248Sraf * 1739*2248Sraf * Non-posix requests are left on the queue 1740*2248Sraf * to eventually be placed on the done queue. 1741*2248Sraf */ 1742*2248Sraf 1743*2248Sraf if (POSIX_AIO(reqp)) { 1744*2248Sraf if (aiowp->work_prev1 == NULL) { 1745*2248Sraf aiowp->work_tail1 = reqp->req_next; 1746*2248Sraf if (aiowp->work_tail1 == NULL) 1747*2248Sraf aiowp->work_head1 = NULL; 1748*2248Sraf } else { 1749*2248Sraf aiowp->work_prev1->req_next = reqp->req_next; 1750*2248Sraf if (aiowp->work_head1 == reqp) 1751*2248Sraf aiowp->work_head1 = reqp->req_next; 1752*2248Sraf } 1753*2248Sraf 1754*2248Sraf } else { 1755*2248Sraf aiowp->work_prev1 = reqp; 1756*2248Sraf ASSERT(aiowp->work_done1 >= 0); 1757*2248Sraf aiowp->work_done1++; 1758*2248Sraf } 1759*2248Sraf ASSERT(reqp != reqp->req_next); 1760*2248Sraf aiowp->work_next1 = reqp->req_next; 1761*2248Sraf ASSERT(aiowp->work_count1 >= 1); 1762*2248Sraf aiowp->work_count1--; 1763*2248Sraf switch (reqp->req_op) { 1764*2248Sraf case AIOREAD: 1765*2248Sraf case AIOWRITE: 1766*2248Sraf case AIOAREAD: 1767*2248Sraf case AIOAWRITE: 1768*2248Sraf #if !defined(_LP64) 1769*2248Sraf case AIOAREAD64: 1770*2248Sraf case AIOAWRITE64: 1771*2248Sraf #endif 1772*2248Sraf ASSERT(aiowp->work_minload1 > 0); 1773*2248Sraf aiowp->work_minload1--; 1774*2248Sraf break; 1775*2248Sraf } 1776*2248Sraf reqp->req_state = AIO_REQ_INPROGRESS; 1777*2248Sraf } 1778*2248Sraf aiowp->work_req = reqp; 1779*2248Sraf ASSERT(reqp != NULL || aiowp->work_count1 == 0); 1780*2248Sraf sig_mutex_unlock(&aiowp->work_qlock1); 1781*2248Sraf return (reqp); 1782*2248Sraf } 1783*2248Sraf 1784*2248Sraf static void 1785*2248Sraf _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 1786*2248Sraf { 1787*2248Sraf aio_req_t **last; 1788*2248Sraf aio_req_t *lastrp; 1789*2248Sraf aio_req_t *next; 1790*2248Sraf 1791*2248Sraf ASSERT(aiowp != NULL); 1792*2248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1793*2248Sraf if (POSIX_AIO(reqp)) { 1794*2248Sraf if (ostate != AIO_REQ_QUEUED) 1795*2248Sraf return; 1796*2248Sraf } 1797*2248Sraf last = &aiowp->work_tail1; 1798*2248Sraf lastrp = aiowp->work_tail1; 1799*2248Sraf ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 1800*2248Sraf while ((next = *last) != NULL) { 1801*2248Sraf if (next == reqp) { 1802*2248Sraf *last = next->req_next; 1803*2248Sraf if (aiowp->work_next1 == next) 1804*2248Sraf aiowp->work_next1 = next->req_next; 1805*2248Sraf 1806*2248Sraf if ((next->req_next != NULL) || 1807*2248Sraf (aiowp->work_done1 == 0)) { 1808*2248Sraf if (aiowp->work_head1 == next) 1809*2248Sraf aiowp->work_head1 = next->req_next; 1810*2248Sraf if (aiowp->work_prev1 == next) 1811*2248Sraf aiowp->work_prev1 = next->req_next; 1812*2248Sraf } else { 1813*2248Sraf if (aiowp->work_head1 == next) 1814*2248Sraf aiowp->work_head1 = lastrp; 1815*2248Sraf if (aiowp->work_prev1 == next) 1816*2248Sraf aiowp->work_prev1 = lastrp; 1817*2248Sraf } 1818*2248Sraf 1819*2248Sraf if (ostate == AIO_REQ_QUEUED) { 1820*2248Sraf ASSERT(aiowp->work_count1 >= 1); 1821*2248Sraf aiowp->work_count1--; 1822*2248Sraf ASSERT(aiowp->work_minload1 >= 1); 1823*2248Sraf aiowp->work_minload1--; 1824*2248Sraf } else { 1825*2248Sraf ASSERT(ostate == AIO_REQ_INPROGRESS && 1826*2248Sraf !POSIX_AIO(reqp)); 1827*2248Sraf aiowp->work_done1--; 1828*2248Sraf } 1829*2248Sraf return; 1830*2248Sraf } 1831*2248Sraf last = &next->req_next; 1832*2248Sraf lastrp = next; 1833*2248Sraf } 1834*2248Sraf /* NOTREACHED */ 1835*2248Sraf } 1836*2248Sraf 1837*2248Sraf static void 1838*2248Sraf 
_aio_enq_doneq(aio_req_t *reqp) 1839*2248Sraf { 1840*2248Sraf if (_aio_doneq == NULL) { 1841*2248Sraf _aio_doneq = reqp; 1842*2248Sraf reqp->req_next = reqp->req_prev = reqp; 1843*2248Sraf } else { 1844*2248Sraf reqp->req_next = _aio_doneq; 1845*2248Sraf reqp->req_prev = _aio_doneq->req_prev; 1846*2248Sraf _aio_doneq->req_prev->req_next = reqp; 1847*2248Sraf _aio_doneq->req_prev = reqp; 1848*2248Sraf } 1849*2248Sraf reqp->req_state = AIO_REQ_DONEQ; 1850*2248Sraf _aio_doneq_cnt++; 1851*2248Sraf } 1852*2248Sraf 1853*2248Sraf /* 1854*2248Sraf * caller owns the _aio_mutex 1855*2248Sraf */ 1856*2248Sraf aio_req_t * 1857*2248Sraf _aio_req_remove(aio_req_t *reqp) 1858*2248Sraf { 1859*2248Sraf if (reqp && reqp->req_state != AIO_REQ_DONEQ) 1860*2248Sraf return (NULL); 1861*2248Sraf 1862*2248Sraf if (reqp) { 1863*2248Sraf /* request in done queue */ 1864*2248Sraf if (_aio_doneq == reqp) 1865*2248Sraf _aio_doneq = reqp->req_next; 1866*2248Sraf if (_aio_doneq == reqp) { 1867*2248Sraf /* only one request on queue */ 1868*2248Sraf _aio_doneq = NULL; 1869*2248Sraf } else { 1870*2248Sraf aio_req_t *tmp = reqp->req_next; 1871*2248Sraf reqp->req_prev->req_next = tmp; 1872*2248Sraf tmp->req_prev = reqp->req_prev; 1873*2248Sraf } 1874*2248Sraf } else if ((reqp = _aio_doneq) != NULL) { 1875*2248Sraf if (reqp == reqp->req_next) { 1876*2248Sraf /* only one request on queue */ 1877*2248Sraf _aio_doneq = NULL; 1878*2248Sraf } else { 1879*2248Sraf reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 1880*2248Sraf _aio_doneq->req_prev = reqp->req_prev; 1881*2248Sraf } 1882*2248Sraf } 1883*2248Sraf if (reqp) { 1884*2248Sraf _aio_doneq_cnt--; 1885*2248Sraf reqp->req_next = reqp->req_prev = reqp; 1886*2248Sraf reqp->req_state = AIO_REQ_DONE; 1887*2248Sraf } 1888*2248Sraf return (reqp); 1889*2248Sraf } 1890*2248Sraf 1891*2248Sraf /* 1892*2248Sraf * An AIO request is identified by an aio_result_t pointer. The library 1893*2248Sraf * maps this aio_result_t pointer to its internal representation using a 1894*2248Sraf * hash table. This function adds an aio_result_t pointer to the hash table. 1895*2248Sraf */ 1896*2248Sraf static int 1897*2248Sraf _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 1898*2248Sraf { 1899*2248Sraf aio_hash_t *hashp; 1900*2248Sraf aio_req_t **prev; 1901*2248Sraf aio_req_t *next; 1902*2248Sraf 1903*2248Sraf hashp = _aio_hash + AIOHASH(resultp); 1904*2248Sraf lmutex_lock(&hashp->hash_lock); 1905*2248Sraf prev = &hashp->hash_ptr; 1906*2248Sraf while ((next = *prev) != NULL) { 1907*2248Sraf if (resultp == next->req_resultp) { 1908*2248Sraf lmutex_unlock(&hashp->hash_lock); 1909*2248Sraf return (-1); 1910*2248Sraf } 1911*2248Sraf prev = &next->req_link; 1912*2248Sraf } 1913*2248Sraf *prev = reqp; 1914*2248Sraf ASSERT(reqp->req_link == NULL); 1915*2248Sraf lmutex_unlock(&hashp->hash_lock); 1916*2248Sraf return (0); 1917*2248Sraf } 1918*2248Sraf 1919*2248Sraf /* 1920*2248Sraf * Remove an entry from the hash table. 
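 * Returns the request that was removed, or NULL if the hash table
 * has not been initialized or no entry matches the given
 * aio_result_t pointer.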
1921*2248Sraf */ 1922*2248Sraf aio_req_t * 1923*2248Sraf _aio_hash_del(aio_result_t *resultp) 1924*2248Sraf { 1925*2248Sraf aio_hash_t *hashp; 1926*2248Sraf aio_req_t **prev; 1927*2248Sraf aio_req_t *next = NULL; 1928*2248Sraf 1929*2248Sraf if (_aio_hash != NULL) { 1930*2248Sraf hashp = _aio_hash + AIOHASH(resultp); 1931*2248Sraf lmutex_lock(&hashp->hash_lock); 1932*2248Sraf prev = &hashp->hash_ptr; 1933*2248Sraf while ((next = *prev) != NULL) { 1934*2248Sraf if (resultp == next->req_resultp) { 1935*2248Sraf *prev = next->req_link; 1936*2248Sraf next->req_link = NULL; 1937*2248Sraf break; 1938*2248Sraf } 1939*2248Sraf prev = &next->req_link; 1940*2248Sraf } 1941*2248Sraf lmutex_unlock(&hashp->hash_lock); 1942*2248Sraf } 1943*2248Sraf return (next); 1944*2248Sraf } 1945*2248Sraf 1946*2248Sraf /* 1947*2248Sraf * find an entry in the hash table 1948*2248Sraf */ 1949*2248Sraf aio_req_t * 1950*2248Sraf _aio_hash_find(aio_result_t *resultp) 1951*2248Sraf { 1952*2248Sraf aio_hash_t *hashp; 1953*2248Sraf aio_req_t **prev; 1954*2248Sraf aio_req_t *next = NULL; 1955*2248Sraf 1956*2248Sraf if (_aio_hash != NULL) { 1957*2248Sraf hashp = _aio_hash + AIOHASH(resultp); 1958*2248Sraf lmutex_lock(&hashp->hash_lock); 1959*2248Sraf prev = &hashp->hash_ptr; 1960*2248Sraf while ((next = *prev) != NULL) { 1961*2248Sraf if (resultp == next->req_resultp) 1962*2248Sraf break; 1963*2248Sraf prev = &next->req_link; 1964*2248Sraf } 1965*2248Sraf lmutex_unlock(&hashp->hash_lock); 1966*2248Sraf } 1967*2248Sraf return (next); 1968*2248Sraf } 1969*2248Sraf 1970*2248Sraf /* 1971*2248Sraf * AIO interface for POSIX 1972*2248Sraf */ 1973*2248Sraf int 1974*2248Sraf _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 1975*2248Sraf int mode, int flg) 1976*2248Sraf { 1977*2248Sraf aio_req_t *reqp; 1978*2248Sraf aio_args_t *ap; 1979*2248Sraf int kerr; 1980*2248Sraf 1981*2248Sraf if (aiocbp == NULL) { 1982*2248Sraf errno = EINVAL; 1983*2248Sraf return (-1); 1984*2248Sraf } 1985*2248Sraf 1986*2248Sraf /* initialize kaio */ 1987*2248Sraf if (!_kaio_ok) 1988*2248Sraf _kaio_init(); 1989*2248Sraf 1990*2248Sraf aiocbp->aio_state = NOCHECK; 1991*2248Sraf 1992*2248Sraf /* 1993*2248Sraf * If we have been called because a list I/O 1994*2248Sraf * kaio() failed, we don't want to repeat the 1995*2248Sraf * system call. 1996*2248Sraf */ 1997*2248Sraf 1998*2248Sraf if (flg & AIO_KAIO) { 1999*2248Sraf /* 2000*2248Sraf * Try kernel aio first. 2001*2248Sraf * If errno is ENOTSUP/EBADFD, 2002*2248Sraf * fall back to the thread implementation.
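 * An EBADFD failure also marks the descriptor in _kaio_supported
 * so that later requests on it skip the kernel attempt entirely.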
2003*2248Sraf */ 2004*2248Sraf if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2005*2248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2006*2248Sraf aiocbp->aio_state = CHECK; 2007*2248Sraf kerr = (int)_kaio(mode, aiocbp); 2008*2248Sraf if (kerr == 0) 2009*2248Sraf return (0); 2010*2248Sraf if (errno != ENOTSUP && errno != EBADFD) { 2011*2248Sraf aiocbp->aio_resultp.aio_errno = errno; 2012*2248Sraf aiocbp->aio_resultp.aio_return = -1; 2013*2248Sraf aiocbp->aio_state = NOCHECK; 2014*2248Sraf return (-1); 2015*2248Sraf } 2016*2248Sraf if (errno == EBADFD) 2017*2248Sraf SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2018*2248Sraf } 2019*2248Sraf } 2020*2248Sraf 2021*2248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2022*2248Sraf aiocbp->aio_state = USERAIO; 2023*2248Sraf 2024*2248Sraf if (!__uaio_ok && __uaio_init() == -1) 2025*2248Sraf return (-1); 2026*2248Sraf 2027*2248Sraf if ((reqp = _aio_req_alloc()) == NULL) { 2028*2248Sraf errno = EAGAIN; 2029*2248Sraf return (-1); 2030*2248Sraf } 2031*2248Sraf 2032*2248Sraf /* 2033*2248Sraf * If an LIO request, add the list head to the aio request 2034*2248Sraf */ 2035*2248Sraf reqp->req_head = lio_head; 2036*2248Sraf reqp->req_type = AIO_POSIX_REQ; 2037*2248Sraf reqp->req_op = mode; 2038*2248Sraf reqp->req_largefile = 0; 2039*2248Sraf 2040*2248Sraf if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2041*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2042*2248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2043*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2044*2248Sraf reqp->req_sigevent.sigev_signo = 2045*2248Sraf aiocbp->aio_sigevent.sigev_signo; 2046*2248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 2047*2248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 2048*2248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2049*2248Sraf port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2050*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2051*2248Sraf /* 2052*2248Sraf * Reuse the sigevent structure to contain the port number 2053*2248Sraf * and the user value. Same for SIGEV_THREAD, below. 2054*2248Sraf */ 2055*2248Sraf reqp->req_sigevent.sigev_signo = 2056*2248Sraf pn->portnfy_port; 2057*2248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 2058*2248Sraf pn->portnfy_user; 2059*2248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2060*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2061*2248Sraf /* 2062*2248Sraf * The sigevent structure contains the port number 2063*2248Sraf * and the user value. Same for SIGEV_PORT, above. 
2064*2248Sraf */ 2065*2248Sraf reqp->req_sigevent.sigev_signo = 2066*2248Sraf aiocbp->aio_sigevent.sigev_signo; 2067*2248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 2068*2248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 2069*2248Sraf } 2070*2248Sraf 2071*2248Sraf reqp->req_resultp = &aiocbp->aio_resultp; 2072*2248Sraf reqp->req_aiocbp = aiocbp; 2073*2248Sraf ap = &reqp->req_args; 2074*2248Sraf ap->fd = aiocbp->aio_fildes; 2075*2248Sraf ap->buf = (caddr_t)aiocbp->aio_buf; 2076*2248Sraf ap->bufsz = aiocbp->aio_nbytes; 2077*2248Sraf ap->offset = aiocbp->aio_offset; 2078*2248Sraf 2079*2248Sraf if ((flg & AIO_NO_DUPS) && 2080*2248Sraf _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2081*2248Sraf aio_panic("_aio_rw(): request already in hash table"); 2082*2248Sraf _aio_req_free(reqp); 2083*2248Sraf errno = EINVAL; 2084*2248Sraf return (-1); 2085*2248Sraf } 2086*2248Sraf _aio_req_add(reqp, nextworker, mode); 2087*2248Sraf return (0); 2088*2248Sraf } 2089*2248Sraf 2090*2248Sraf #if !defined(_LP64) 2091*2248Sraf /* 2092*2248Sraf * 64-bit AIO interface for POSIX 2093*2248Sraf */ 2094*2248Sraf int 2095*2248Sraf _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2096*2248Sraf int mode, int flg) 2097*2248Sraf { 2098*2248Sraf aio_req_t *reqp; 2099*2248Sraf aio_args_t *ap; 2100*2248Sraf int kerr; 2101*2248Sraf 2102*2248Sraf if (aiocbp == NULL) { 2103*2248Sraf errno = EINVAL; 2104*2248Sraf return (-1); 2105*2248Sraf } 2106*2248Sraf 2107*2248Sraf /* initialize kaio */ 2108*2248Sraf if (!_kaio_ok) 2109*2248Sraf _kaio_init(); 2110*2248Sraf 2111*2248Sraf aiocbp->aio_state = NOCHECK; 2112*2248Sraf 2113*2248Sraf /* 2114*2248Sraf * If we have been called because a list I/O 2115*2248Sraf * kaio() failed, we don't want to repeat the 2116*2248Sraf * system call. 2117*2248Sraf */ 2118*2248Sraf 2119*2248Sraf if (flg & AIO_KAIO) { 2120*2248Sraf /* 2121*2248Sraf * Try kernel aio first. 2122*2248Sraf * If errno is ENOTSUP/EBADFD, 2123*2248Sraf * fall back to the thread implementation.
2124*2248Sraf */ 2125*2248Sraf if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2126*2248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2127*2248Sraf aiocbp->aio_state = CHECK; 2128*2248Sraf kerr = (int)_kaio(mode, aiocbp); 2129*2248Sraf if (kerr == 0) 2130*2248Sraf return (0); 2131*2248Sraf if (errno != ENOTSUP && errno != EBADFD) { 2132*2248Sraf aiocbp->aio_resultp.aio_errno = errno; 2133*2248Sraf aiocbp->aio_resultp.aio_return = -1; 2134*2248Sraf aiocbp->aio_state = NOCHECK; 2135*2248Sraf return (-1); 2136*2248Sraf } 2137*2248Sraf if (errno == EBADFD) 2138*2248Sraf SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2139*2248Sraf } 2140*2248Sraf } 2141*2248Sraf 2142*2248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2143*2248Sraf aiocbp->aio_state = USERAIO; 2144*2248Sraf 2145*2248Sraf if (!__uaio_ok && __uaio_init() == -1) 2146*2248Sraf return (-1); 2147*2248Sraf 2148*2248Sraf if ((reqp = _aio_req_alloc()) == NULL) { 2149*2248Sraf errno = EAGAIN; 2150*2248Sraf return (-1); 2151*2248Sraf } 2152*2248Sraf 2153*2248Sraf /* 2154*2248Sraf * If an LIO request, add the list head to the aio request 2155*2248Sraf */ 2156*2248Sraf reqp->req_head = lio_head; 2157*2248Sraf reqp->req_type = AIO_POSIX_REQ; 2158*2248Sraf reqp->req_op = mode; 2159*2248Sraf reqp->req_largefile = 1; 2160*2248Sraf 2161*2248Sraf if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2162*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2163*2248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2164*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2165*2248Sraf reqp->req_sigevent.sigev_signo = 2166*2248Sraf aiocbp->aio_sigevent.sigev_signo; 2167*2248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 2168*2248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 2169*2248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2170*2248Sraf port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2171*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2172*2248Sraf reqp->req_sigevent.sigev_signo = 2173*2248Sraf pn->portnfy_port; 2174*2248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 2175*2248Sraf pn->portnfy_user; 2176*2248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2177*2248Sraf reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2178*2248Sraf reqp->req_sigevent.sigev_signo = 2179*2248Sraf aiocbp->aio_sigevent.sigev_signo; 2180*2248Sraf reqp->req_sigevent.sigev_value.sival_ptr = 2181*2248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr; 2182*2248Sraf } 2183*2248Sraf 2184*2248Sraf reqp->req_resultp = &aiocbp->aio_resultp; 2185*2248Sraf reqp->req_aiocbp = aiocbp; 2186*2248Sraf ap = &reqp->req_args; 2187*2248Sraf ap->fd = aiocbp->aio_fildes; 2188*2248Sraf ap->buf = (caddr_t)aiocbp->aio_buf; 2189*2248Sraf ap->bufsz = aiocbp->aio_nbytes; 2190*2248Sraf ap->offset = aiocbp->aio_offset; 2191*2248Sraf 2192*2248Sraf if ((flg & AIO_NO_DUPS) && 2193*2248Sraf _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2194*2248Sraf aio_panic("_aio_rw64(): request already in hash table"); 2195*2248Sraf _aio_req_free(reqp); 2196*2248Sraf errno = EINVAL; 2197*2248Sraf return (-1); 2198*2248Sraf } 2199*2248Sraf _aio_req_add(reqp, nextworker, mode); 2200*2248Sraf return (0); 2201*2248Sraf } 2202*2248Sraf #endif /* !defined(_LP64) */ 2203
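
/*
 * Illustrative usage sketch (a hedged example, not part of the library
 * proper): a minimal application-level caller of the POSIX path served
 * by _aio_rw() above.  aio_read() leaves aio_errno at EINPROGRESS, and
 * _aio_set_result() stores the return value before the final errno
 * value, so polling aio_error() until it leaves EINPROGRESS is safe.
 * The function name and path argument below are hypothetical.
 *
 *	#include <aio.h>
 *	#include <errno.h>
 *	#include <fcntl.h>
 *	#include <signal.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	static char buf[4096];
 *
 *	int
 *	example_aio_read(const char *path)
 *	{
 *		struct aiocb cb;
 *		int fd;
 *
 *		if ((fd = open(path, O_RDONLY)) == -1)
 *			return (-1);
 *		(void) memset(&cb, 0, sizeof (cb));
 *		cb.aio_fildes = fd;
 *		cb.aio_buf = buf;
 *		cb.aio_nbytes = sizeof (buf);
 *		cb.aio_offset = 0;
 *		cb.aio_sigevent.sigev_notify = SIGEV_NONE;
 *		if (aio_read(&cb) != 0) {
 *			(void) close(fd);
 *			return (-1);
 *		}
 *		while (aio_error(&cb) == EINPROGRESS)
 *			(void) usleep(1000);
 *		(void) close(fd);
 *		return ((int)aio_return(&cb));
 *	}
 *
 * A real program would normally use aio_suspend() or a sigevent
 * notification instead of polling in a loop as shown here.
 */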