xref: /onnv-gate/usr/src/lib/libc/port/aio/aio.c (revision 2248:4609e8bb25ad)
1*2248Sraf /*
2*2248Sraf  * CDDL HEADER START
3*2248Sraf  *
4*2248Sraf  * The contents of this file are subject to the terms of the
5*2248Sraf  * Common Development and Distribution License (the "License").
6*2248Sraf  * You may not use this file except in compliance with the License.
7*2248Sraf  *
8*2248Sraf  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9*2248Sraf  * or http://www.opensolaris.org/os/licensing.
10*2248Sraf  * See the License for the specific language governing permissions
11*2248Sraf  * and limitations under the License.
12*2248Sraf  *
13*2248Sraf  * When distributing Covered Code, include this CDDL HEADER in each
14*2248Sraf  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15*2248Sraf  * If applicable, add the following below this CDDL HEADER, with the
16*2248Sraf  * fields enclosed by brackets "[]" replaced with your own identifying
17*2248Sraf  * information: Portions Copyright [yyyy] [name of copyright owner]
18*2248Sraf  *
19*2248Sraf  * CDDL HEADER END
20*2248Sraf  */
21*2248Sraf 
22*2248Sraf /*
23*2248Sraf  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24*2248Sraf  * Use is subject to license terms.
25*2248Sraf  */
26*2248Sraf 
27*2248Sraf #pragma ident	"%Z%%M%	%I%	%E% SMI"
28*2248Sraf 
29*2248Sraf #include "synonyms.h"
30*2248Sraf #include "thr_uberdata.h"
31*2248Sraf #include "asyncio.h"
32*2248Sraf #include <atomic.h>
33*2248Sraf #include <sys/param.h>
34*2248Sraf #include <sys/file.h>
35*2248Sraf #include <sys/port.h>
36*2248Sraf 
37*2248Sraf static int _aio_hash_insert(aio_result_t *, aio_req_t *);
38*2248Sraf static aio_req_t *_aio_req_get(aio_worker_t *);
39*2248Sraf static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
40*2248Sraf static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
41*2248Sraf static void _aio_work_done(aio_worker_t *);
42*2248Sraf static void _aio_enq_doneq(aio_req_t *);
43*2248Sraf 
44*2248Sraf extern void _aio_lio_free(aio_lio_t *);
45*2248Sraf 
46*2248Sraf extern int __fdsync(int, int);
47*2248Sraf extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
48*2248Sraf 
49*2248Sraf static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
50*2248Sraf static void _aiodone(aio_req_t *, ssize_t, int);
51*2248Sraf static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
52*2248Sraf static void _aio_finish_request(aio_worker_t *, ssize_t, int);
53*2248Sraf 
54*2248Sraf /*
55*2248Sraf  * switch for kernel async I/O
56*2248Sraf  */
57*2248Sraf int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */
58*2248Sraf 
59*2248Sraf /*
60*2248Sraf  * Key for thread-specific data
61*2248Sraf  */
62*2248Sraf pthread_key_t _aio_key;
63*2248Sraf 
64*2248Sraf /*
65*2248Sraf  * Array for determining whether or not a file supports kaio.
66*2248Sraf  * Initialized in _kaio_init().
67*2248Sraf  */
68*2248Sraf uint32_t *_kaio_supported = NULL;
69*2248Sraf 
70*2248Sraf /*
71*2248Sraf  *  workers for read/write requests
72*2248Sraf  * (__aio_mutex lock protects circular linked list of workers)
73*2248Sraf  */
74*2248Sraf aio_worker_t *__workers_rw;	/* circular list of AIO workers */
75*2248Sraf aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
76*2248Sraf int __rw_workerscnt;		/* number of read/write workers */
77*2248Sraf 
78*2248Sraf /*
79*2248Sraf  * worker for notification requests.
80*2248Sraf  */
81*2248Sraf aio_worker_t *__workers_no;	/* circular list of AIO workers */
82*2248Sraf aio_worker_t *__nextworker_no;	/* next worker in list of workers */
83*2248Sraf int __no_workerscnt;		/* number of notification workers */
84*2248Sraf 
85*2248Sraf aio_req_t *_aio_done_tail;		/* list of done requests */
86*2248Sraf aio_req_t *_aio_done_head;
87*2248Sraf 
88*2248Sraf mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
89*2248Sraf cond_t __aio_initcv = DEFAULTCV;
90*2248Sraf int __aio_initbusy = 0;
91*2248Sraf 
92*2248Sraf mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
93*2248Sraf cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
94*2248Sraf 
95*2248Sraf pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
96*2248Sraf int _sigio_enabled = 0;			/* when set, send SIGIO signal */
97*2248Sraf 
98*2248Sraf aio_hash_t *_aio_hash;
99*2248Sraf 
100*2248Sraf aio_req_t *_aio_doneq;			/* doubly linked done queue list */
101*2248Sraf 
102*2248Sraf int _aio_donecnt = 0;
103*2248Sraf int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
104*2248Sraf int _aio_doneq_cnt = 0;
105*2248Sraf int _aio_outstand_cnt = 0;		/* # of outstanding requests */
106*2248Sraf int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
107*2248Sraf int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
108*2248Sraf int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
109*2248Sraf int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
110*2248Sraf 
111*2248Sraf int _max_workers = 256;			/* max number of workers permitted */
112*2248Sraf int _min_workers = 4;			/* min number of workers */
113*2248Sraf int _minworkload = 2;			/* min number of requests in q */
114*2248Sraf int _aio_worker_cnt = 0;		/* number of workers to do requests */
115*2248Sraf int __uaio_ok = 0;			/* AIO has been enabled */
116*2248Sraf sigset_t _worker_set;			/* worker's signal mask */
117*2248Sraf 
118*2248Sraf int _aiowait_flag = 0;			/* when set, aiowait() is in progress */
119*2248Sraf int _aio_flags = 0;			/* see defines in asyncio.h */
120*2248Sraf 
121*2248Sraf aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */
122*2248Sraf 
123*2248Sraf int hz;					/* clock ticks per second */
124*2248Sraf 
125*2248Sraf static int
126*2248Sraf _kaio_supported_init(void)
127*2248Sraf {
128*2248Sraf 	void *ptr;
129*2248Sraf 	size_t size;
130*2248Sraf 
131*2248Sraf 	if (_kaio_supported != NULL)	/* already initialized */
132*2248Sraf 		return (0);
133*2248Sraf 
134*2248Sraf 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135*2248Sraf 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136*2248Sraf 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137*2248Sraf 	if (ptr == MAP_FAILED)
138*2248Sraf 		return (-1);
139*2248Sraf 	_kaio_supported = ptr;
140*2248Sraf 	return (0);
141*2248Sraf }
142*2248Sraf 
143*2248Sraf /*
144*2248Sraf  * The aio subsystem is initialized lazily, when the first AIO request
145*2248Sraf  * is made.  Constants such as the maximum number of workers that the
146*2248Sraf  * subsystem can create and the minimum number of workers permitted
147*2248Sraf  * before imposing some restrictions are set up, and the initial set
148*2248Sraf  * of workers is created.
149*2248Sraf  */
150*2248Sraf int
151*2248Sraf __uaio_init(void)
152*2248Sraf {
153*2248Sraf 	int ret = -1;
154*2248Sraf 	int i;
155*2248Sraf 
156*2248Sraf 	lmutex_lock(&__aio_initlock);
157*2248Sraf 	while (__aio_initbusy)
158*2248Sraf 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
159*2248Sraf 	if (__uaio_ok) {	/* already initialized */
160*2248Sraf 		lmutex_unlock(&__aio_initlock);
161*2248Sraf 		return (0);
162*2248Sraf 	}
163*2248Sraf 	__aio_initbusy = 1;
164*2248Sraf 	lmutex_unlock(&__aio_initlock);
165*2248Sraf 
166*2248Sraf 	hz = (int)sysconf(_SC_CLK_TCK);
167*2248Sraf 	__pid = getpid();
168*2248Sraf 
169*2248Sraf 	setup_cancelsig(SIGAIOCANCEL);
170*2248Sraf 
171*2248Sraf 	if (_kaio_supported_init() != 0)
172*2248Sraf 		goto out;
173*2248Sraf 
174*2248Sraf 	/*
175*2248Sraf 	 * Allocate and initialize the hash table.
176*2248Sraf 	 */
177*2248Sraf 	/* LINTED pointer cast */
178*2248Sraf 	_aio_hash = (aio_hash_t *)mmap(NULL,
179*2248Sraf 	    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
180*2248Sraf 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
181*2248Sraf 	if ((void *)_aio_hash == MAP_FAILED) {
182*2248Sraf 		_aio_hash = NULL;
183*2248Sraf 		goto out;
184*2248Sraf 	}
185*2248Sraf 	for (i = 0; i < HASHSZ; i++)
186*2248Sraf 		(void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL);
187*2248Sraf 
188*2248Sraf 	/*
189*2248Sraf 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
190*2248Sraf 	 */
191*2248Sraf 	(void) sigfillset(&_worker_set);
192*2248Sraf 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
193*2248Sraf 
194*2248Sraf 	/*
195*2248Sraf 	 * Create the minimum number of read/write workers.
196*2248Sraf 	 */
197*2248Sraf 	for (i = 0; i < _min_workers; i++)
198*2248Sraf 		(void) _aio_create_worker(NULL, AIOREAD);
199*2248Sraf 
200*2248Sraf 	/*
201*2248Sraf 	 * Create one worker to send asynchronous notifications.
202*2248Sraf 	 */
203*2248Sraf 	(void) _aio_create_worker(NULL, AIONOTIFY);
204*2248Sraf 
205*2248Sraf 	ret = 0;
206*2248Sraf out:
207*2248Sraf 	lmutex_lock(&__aio_initlock);
208*2248Sraf 	if (ret == 0)
209*2248Sraf 		__uaio_ok = 1;
210*2248Sraf 	__aio_initbusy = 0;
211*2248Sraf 	(void) cond_broadcast(&__aio_initcv);
212*2248Sraf 	lmutex_unlock(&__aio_initlock);
213*2248Sraf 	return (ret);
214*2248Sraf }
215*2248Sraf 
216*2248Sraf /*
217*2248Sraf  * Called from close() before actually performing the real _close().
218*2248Sraf  */
219*2248Sraf void
220*2248Sraf _aio_close(int fd)
221*2248Sraf {
222*2248Sraf 	if (fd < 0)	/* avoid cancelling everything */
223*2248Sraf 		return;
224*2248Sraf 	/*
225*2248Sraf 	 * Cancel all outstanding aio requests for this file descriptor.
226*2248Sraf 	 */
227*2248Sraf 	if (__uaio_ok)
228*2248Sraf 		(void) aiocancel_all(fd);
229*2248Sraf 	/*
230*2248Sraf 	 * If we have allocated the bit array, clear the bit for this file.
231*2248Sraf 	 * The next open may re-use this file descriptor and the new file
232*2248Sraf 	 * may have different kaio() behaviour.
233*2248Sraf 	 */
234*2248Sraf 	if (_kaio_supported != NULL)
235*2248Sraf 		CLEAR_KAIO_SUPPORTED(fd);
236*2248Sraf }
237*2248Sraf 
238*2248Sraf /*
239*2248Sraf  * The special kaio cleanup thread sits in a loop in the
240*2248Sraf  * kernel waiting for pending kaio requests to complete.
241*2248Sraf  */
242*2248Sraf void *
243*2248Sraf _kaio_cleanup_thread(void *arg)
244*2248Sraf {
245*2248Sraf 	if (pthread_setspecific(_aio_key, arg) != 0)
246*2248Sraf 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
247*2248Sraf 	(void) _kaio(AIOSTART);
248*2248Sraf 	return (arg);
249*2248Sraf }
250*2248Sraf 
251*2248Sraf /*
252*2248Sraf  * initialize kaio.
253*2248Sraf  */
254*2248Sraf void
255*2248Sraf _kaio_init()
256*2248Sraf {
257*2248Sraf 	int error;
258*2248Sraf 	sigset_t oset;
259*2248Sraf 
260*2248Sraf 	lmutex_lock(&__aio_initlock);
261*2248Sraf 	while (__aio_initbusy)
262*2248Sraf 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
263*2248Sraf 	if (_kaio_ok) {		/* already initialized */
264*2248Sraf 		lmutex_unlock(&__aio_initlock);
265*2248Sraf 		return;
266*2248Sraf 	}
267*2248Sraf 	__aio_initbusy = 1;
268*2248Sraf 	lmutex_unlock(&__aio_initlock);
269*2248Sraf 
270*2248Sraf 	if (_kaio_supported_init() != 0)
271*2248Sraf 		error = ENOMEM;
272*2248Sraf 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
273*2248Sraf 		error = ENOMEM;
274*2248Sraf 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
275*2248Sraf 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
276*2248Sraf 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
277*2248Sraf 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
278*2248Sraf 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
279*2248Sraf 	}
280*2248Sraf 	if (error && _kaiowp != NULL) {
281*2248Sraf 		_aio_worker_free(_kaiowp);
282*2248Sraf 		_kaiowp = NULL;
283*2248Sraf 	}
284*2248Sraf 
285*2248Sraf 	lmutex_lock(&__aio_initlock);
286*2248Sraf 	if (error)
287*2248Sraf 		_kaio_ok = -1;
288*2248Sraf 	else
289*2248Sraf 		_kaio_ok = 1;
290*2248Sraf 	__aio_initbusy = 0;
291*2248Sraf 	(void) cond_broadcast(&__aio_initcv);
292*2248Sraf 	lmutex_unlock(&__aio_initlock);
293*2248Sraf }
294*2248Sraf 
295*2248Sraf int
296*2248Sraf aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
297*2248Sraf     aio_result_t *resultp)
298*2248Sraf {
299*2248Sraf 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
300*2248Sraf }
301*2248Sraf 
302*2248Sraf int
303*2248Sraf aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
304*2248Sraf     aio_result_t *resultp)
305*2248Sraf {
306*2248Sraf 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
307*2248Sraf }
308*2248Sraf 
309*2248Sraf #if !defined(_LP64)
310*2248Sraf int
311*2248Sraf aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
312*2248Sraf     aio_result_t *resultp)
313*2248Sraf {
314*2248Sraf 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
315*2248Sraf }
316*2248Sraf 
317*2248Sraf int
318*2248Sraf aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
319*2248Sraf     aio_result_t *resultp)
320*2248Sraf {
321*2248Sraf 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
322*2248Sraf }
323*2248Sraf #endif	/* !defined(_LP64) */
324*2248Sraf 
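/*
 * Usage sketch (illustration only, not part of this file): a caller
 * typically issues a request with aioread()/aiowrite() and reaps it
 * with aiowait().  The names fd, buf and bufsz are hypothetical.
 *
 *	aio_result_t res;
 *
 *	if (aioread(fd, buf, bufsz, 0, SEEK_SET, &res) == 0) {
 *		aio_result_t *donep = aiowait(NULL);
 *		if (donep == &res && res.aio_errno == 0)
 *			... res.aio_return bytes were read into buf ...
 *	}
 */
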
325*2248Sraf int
326*2248Sraf _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
327*2248Sraf     aio_result_t *resultp, int mode)
328*2248Sraf {
329*2248Sraf 	aio_req_t *reqp;
330*2248Sraf 	aio_args_t *ap;
331*2248Sraf 	offset_t loffset;
332*2248Sraf 	struct stat stat;
333*2248Sraf 	int error = 0;
334*2248Sraf 	int kerr;
335*2248Sraf 	int umode;
336*2248Sraf 
337*2248Sraf 	switch (whence) {
338*2248Sraf 
339*2248Sraf 	case SEEK_SET:
340*2248Sraf 		loffset = offset;
341*2248Sraf 		break;
342*2248Sraf 	case SEEK_CUR:
343*2248Sraf 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
344*2248Sraf 			error = -1;
345*2248Sraf 		else
346*2248Sraf 			loffset += offset;
347*2248Sraf 		break;
348*2248Sraf 	case SEEK_END:
349*2248Sraf 		if (fstat(fd, &stat) == -1)
350*2248Sraf 			error = -1;
351*2248Sraf 		else
352*2248Sraf 			loffset = offset + stat.st_size;
353*2248Sraf 		break;
354*2248Sraf 	default:
355*2248Sraf 		errno = EINVAL;
356*2248Sraf 		error = -1;
357*2248Sraf 	}
358*2248Sraf 
359*2248Sraf 	if (error)
360*2248Sraf 		return (error);
361*2248Sraf 
362*2248Sraf 	/* initialize kaio */
363*2248Sraf 	if (!_kaio_ok)
364*2248Sraf 		_kaio_init();
365*2248Sraf 
366*2248Sraf 	/*
367*2248Sraf 	 * _aio_do_request() needs the original request code (mode) to be able
368*2248Sraf 	 * to choose the appropriate 32/64 bit function.  All other functions
369*2248Sraf 	 * only require the difference between READ and WRITE (umode).
370*2248Sraf 	 */
371*2248Sraf 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
372*2248Sraf 		umode = mode - AIOAREAD64;
373*2248Sraf 	else
374*2248Sraf 		umode = mode;
375*2248Sraf 
376*2248Sraf 	/*
377*2248Sraf 	 * Try kernel aio first.
378*2248Sraf 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
379*2248Sraf 	 */
380*2248Sraf 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
381*2248Sraf 		resultp->aio_errno = 0;
382*2248Sraf 		sig_mutex_lock(&__aio_mutex);
383*2248Sraf 		_kaio_outstand_cnt++;
384*2248Sraf 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
385*2248Sraf 		    (umode | AIO_POLL_BIT) : umode),
386*2248Sraf 		    fd, buf, bufsz, loffset, resultp);
387*2248Sraf 		if (kerr == 0) {
388*2248Sraf 			sig_mutex_unlock(&__aio_mutex);
389*2248Sraf 			return (0);
390*2248Sraf 		}
391*2248Sraf 		_kaio_outstand_cnt--;
392*2248Sraf 		sig_mutex_unlock(&__aio_mutex);
393*2248Sraf 		if (errno != ENOTSUP && errno != EBADFD)
394*2248Sraf 			return (-1);
395*2248Sraf 		if (errno == EBADFD)
396*2248Sraf 			SET_KAIO_NOT_SUPPORTED(fd);
397*2248Sraf 	}
398*2248Sraf 
399*2248Sraf 	if (!__uaio_ok && __uaio_init() == -1)
400*2248Sraf 		return (-1);
401*2248Sraf 
402*2248Sraf 	if ((reqp = _aio_req_alloc()) == NULL) {
403*2248Sraf 		errno = EAGAIN;
404*2248Sraf 		return (-1);
405*2248Sraf 	}
406*2248Sraf 
407*2248Sraf 	/*
408*2248Sraf 	 * _aio_do_request() checks reqp->req_op to differentiate
409*2248Sraf 	 * between 32 and 64 bit access.
410*2248Sraf 	 */
411*2248Sraf 	reqp->req_op = mode;
412*2248Sraf 	reqp->req_resultp = resultp;
413*2248Sraf 	ap = &reqp->req_args;
414*2248Sraf 	ap->fd = fd;
415*2248Sraf 	ap->buf = buf;
416*2248Sraf 	ap->bufsz = bufsz;
417*2248Sraf 	ap->offset = loffset;
418*2248Sraf 
419*2248Sraf 	if (_aio_hash_insert(resultp, reqp) != 0) {
420*2248Sraf 		_aio_req_free(reqp);
421*2248Sraf 		errno = EINVAL;
422*2248Sraf 		return (-1);
423*2248Sraf 	}
424*2248Sraf 	/*
425*2248Sraf 	 * _aio_req_add() only needs the difference between READ and
426*2248Sraf 	 * WRITE to choose the right worker queue.
427*2248Sraf 	 */
428*2248Sraf 	_aio_req_add(reqp, &__nextworker_rw, umode);
429*2248Sraf 	return (0);
430*2248Sraf }
431*2248Sraf 
432*2248Sraf int
433*2248Sraf aiocancel(aio_result_t *resultp)
434*2248Sraf {
435*2248Sraf 	aio_req_t *reqp;
436*2248Sraf 	aio_worker_t *aiowp;
437*2248Sraf 	int ret;
438*2248Sraf 	int done = 0;
439*2248Sraf 	int canceled = 0;
440*2248Sraf 
441*2248Sraf 	if (!__uaio_ok) {
442*2248Sraf 		errno = EINVAL;
443*2248Sraf 		return (-1);
444*2248Sraf 	}
445*2248Sraf 
446*2248Sraf 	sig_mutex_lock(&__aio_mutex);
447*2248Sraf 	reqp = _aio_hash_find(resultp);
448*2248Sraf 	if (reqp == NULL) {
449*2248Sraf 		if (_aio_outstand_cnt == _aio_req_done_cnt)
450*2248Sraf 			errno = EINVAL;
451*2248Sraf 		else
452*2248Sraf 			errno = EACCES;
453*2248Sraf 		ret = -1;
454*2248Sraf 	} else {
455*2248Sraf 		aiowp = reqp->req_worker;
456*2248Sraf 		sig_mutex_lock(&aiowp->work_qlock1);
457*2248Sraf 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
458*2248Sraf 		sig_mutex_unlock(&aiowp->work_qlock1);
459*2248Sraf 
460*2248Sraf 		if (canceled) {
461*2248Sraf 			ret = 0;
462*2248Sraf 		} else {
463*2248Sraf 			if (_aio_outstand_cnt == 0 ||
464*2248Sraf 			    _aio_outstand_cnt == _aio_req_done_cnt)
465*2248Sraf 				errno = EINVAL;
466*2248Sraf 			else
467*2248Sraf 				errno = EACCES;
468*2248Sraf 			ret = -1;
469*2248Sraf 		}
470*2248Sraf 	}
471*2248Sraf 	sig_mutex_unlock(&__aio_mutex);
472*2248Sraf 	return (ret);
473*2248Sraf }
474*2248Sraf 
475*2248Sraf /*
476*2248Sraf  * This must be asynch safe
477*2248Sraf  */
478*2248Sraf aio_result_t *
479*2248Sraf aiowait(struct timeval *uwait)
480*2248Sraf {
481*2248Sraf 	aio_result_t *uresultp;
482*2248Sraf 	aio_result_t *kresultp;
483*2248Sraf 	aio_result_t *resultp;
484*2248Sraf 	int dontblock;
485*2248Sraf 	int timedwait = 0;
486*2248Sraf 	int kaio_errno = 0;
487*2248Sraf 	struct timeval twait;
488*2248Sraf 	struct timeval *wait = NULL;
489*2248Sraf 	hrtime_t hrtend;
490*2248Sraf 	hrtime_t hres;
491*2248Sraf 
492*2248Sraf 	if (uwait) {
493*2248Sraf 		/*
494*2248Sraf 		 * Check for a valid specified wait time.
495*2248Sraf 		 * If it is invalid, fail the call right away.
496*2248Sraf 		 */
497*2248Sraf 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
498*2248Sraf 		    uwait->tv_usec >= MICROSEC) {
499*2248Sraf 			errno = EINVAL;
500*2248Sraf 			return ((aio_result_t *)-1);
501*2248Sraf 		}
502*2248Sraf 
503*2248Sraf 		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
504*2248Sraf 			hrtend = gethrtime() +
505*2248Sraf 				(hrtime_t)uwait->tv_sec * NANOSEC +
506*2248Sraf 				(hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
507*2248Sraf 			twait = *uwait;
508*2248Sraf 			wait = &twait;
509*2248Sraf 			timedwait++;
510*2248Sraf 		} else {
511*2248Sraf 			/* polling */
512*2248Sraf 			sig_mutex_lock(&__aio_mutex);
513*2248Sraf 			if (_kaio_outstand_cnt == 0) {
514*2248Sraf 				kresultp = (aio_result_t *)-1;
515*2248Sraf 			} else {
516*2248Sraf 				kresultp = (aio_result_t *)_kaio(AIOWAIT,
517*2248Sraf 				    (struct timeval *)-1, 1);
518*2248Sraf 				if (kresultp != (aio_result_t *)-1 &&
519*2248Sraf 				    kresultp != NULL &&
520*2248Sraf 				    kresultp != (aio_result_t *)1) {
521*2248Sraf 					_kaio_outstand_cnt--;
522*2248Sraf 					sig_mutex_unlock(&__aio_mutex);
523*2248Sraf 					return (kresultp);
524*2248Sraf 				}
525*2248Sraf 			}
526*2248Sraf 			uresultp = _aio_req_done();
527*2248Sraf 			sig_mutex_unlock(&__aio_mutex);
528*2248Sraf 			if (uresultp != NULL &&
529*2248Sraf 			    uresultp != (aio_result_t *)-1) {
530*2248Sraf 				return (uresultp);
531*2248Sraf 			}
532*2248Sraf 			if (uresultp == (aio_result_t *)-1 &&
533*2248Sraf 			    kresultp == (aio_result_t *)-1) {
534*2248Sraf 				errno = EINVAL;
535*2248Sraf 				return ((aio_result_t *)-1);
536*2248Sraf 			} else {
537*2248Sraf 				return (NULL);
538*2248Sraf 			}
539*2248Sraf 		}
540*2248Sraf 	}
541*2248Sraf 
542*2248Sraf 	for (;;) {
543*2248Sraf 		sig_mutex_lock(&__aio_mutex);
544*2248Sraf 		uresultp = _aio_req_done();
545*2248Sraf 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
546*2248Sraf 			sig_mutex_unlock(&__aio_mutex);
547*2248Sraf 			resultp = uresultp;
548*2248Sraf 			break;
549*2248Sraf 		}
550*2248Sraf 		_aiowait_flag++;
551*2248Sraf 		dontblock = (uresultp == (aio_result_t *)-1);
552*2248Sraf 		if (dontblock && _kaio_outstand_cnt == 0) {
553*2248Sraf 			kresultp = (aio_result_t *)-1;
554*2248Sraf 			kaio_errno = EINVAL;
555*2248Sraf 		} else {
556*2248Sraf 			sig_mutex_unlock(&__aio_mutex);
557*2248Sraf 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
558*2248Sraf 			    wait, dontblock);
559*2248Sraf 			sig_mutex_lock(&__aio_mutex);
560*2248Sraf 			kaio_errno = errno;
561*2248Sraf 		}
562*2248Sraf 		_aiowait_flag--;
563*2248Sraf 		sig_mutex_unlock(&__aio_mutex);
564*2248Sraf 		if (kresultp == (aio_result_t *)1) {
565*2248Sraf 			/* aiowait() awakened by an aionotify() */
566*2248Sraf 			continue;
567*2248Sraf 		} else if (kresultp != NULL &&
568*2248Sraf 		    kresultp != (aio_result_t *)-1) {
569*2248Sraf 			resultp = kresultp;
570*2248Sraf 			sig_mutex_lock(&__aio_mutex);
571*2248Sraf 			_kaio_outstand_cnt--;
572*2248Sraf 			sig_mutex_unlock(&__aio_mutex);
573*2248Sraf 			break;
574*2248Sraf 		} else if (kresultp == (aio_result_t *)-1 &&
575*2248Sraf 		    kaio_errno == EINVAL &&
576*2248Sraf 		    uresultp == (aio_result_t *)-1) {
577*2248Sraf 			errno = kaio_errno;
578*2248Sraf 			resultp = (aio_result_t *)-1;
579*2248Sraf 			break;
580*2248Sraf 		} else if (kresultp == (aio_result_t *)-1 &&
581*2248Sraf 		    kaio_errno == EINTR) {
582*2248Sraf 			errno = kaio_errno;
583*2248Sraf 			resultp = (aio_result_t *)-1;
584*2248Sraf 			break;
585*2248Sraf 		} else if (timedwait) {
586*2248Sraf 			hres = hrtend - gethrtime();
587*2248Sraf 			if (hres <= 0) {
588*2248Sraf 				/* time is up; return */
589*2248Sraf 				resultp = NULL;
590*2248Sraf 				break;
591*2248Sraf 			} else {
592*2248Sraf 				/*
593*2248Sraf 				 * Some time left.  Round up the remaining time
594*2248Sraf 				 * in nanoseconds to microsec.  Retry the call.
595*2248Sraf 				 */
596*2248Sraf 				hres += (NANOSEC / MICROSEC) - 1;
597*2248Sraf 				wait->tv_sec = hres / NANOSEC;
598*2248Sraf 				wait->tv_usec =
599*2248Sraf 					(hres % NANOSEC) / (NANOSEC / MICROSEC);
600*2248Sraf 			}
601*2248Sraf 		} else {
602*2248Sraf 			ASSERT(kresultp == NULL && uresultp == NULL);
603*2248Sraf 			resultp = NULL;
604*2248Sraf 			continue;
605*2248Sraf 		}
606*2248Sraf 	}
607*2248Sraf 	return (resultp);
608*2248Sraf }
609*2248Sraf 
610*2248Sraf /*
611*2248Sraf  * _aio_get_timedelta calculates the remaining time and stores the result
612*2248Sraf  * into timespec_t *wait.
613*2248Sraf  */
614*2248Sraf 
615*2248Sraf int
616*2248Sraf _aio_get_timedelta(timespec_t *end, timespec_t *wait)
617*2248Sraf {
618*2248Sraf 	int	ret = 0;
619*2248Sraf 	struct	timeval cur;
620*2248Sraf 	timespec_t curtime;
621*2248Sraf 
622*2248Sraf 	(void) gettimeofday(&cur, NULL);
623*2248Sraf 	curtime.tv_sec = cur.tv_sec;
624*2248Sraf 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
625*2248Sraf 
626*2248Sraf 	if (end->tv_sec >= curtime.tv_sec) {
627*2248Sraf 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
628*2248Sraf 		if (end->tv_nsec >= curtime.tv_nsec) {
629*2248Sraf 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
630*2248Sraf 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
631*2248Sraf 				ret = -1;	/* timer expired */
632*2248Sraf 		} else {
633*2248Sraf 			if (end->tv_sec > curtime.tv_sec) {
634*2248Sraf 				wait->tv_sec -= 1;
635*2248Sraf 				wait->tv_nsec = NANOSEC -
636*2248Sraf 				    (curtime.tv_nsec - end->tv_nsec);
637*2248Sraf 			} else {
638*2248Sraf 				ret = -1;	/* timer expired */
639*2248Sraf 			}
640*2248Sraf 		}
641*2248Sraf 	} else {
642*2248Sraf 		ret = -1;
643*2248Sraf 	}
644*2248Sraf 	return (ret);
645*2248Sraf }
646*2248Sraf 
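/*
 * Worked example (illustrative): with end = {5, 100000000} and a current
 * time of curtime = {3, 800000000}, the code above first sets
 * wait->tv_sec = 2; since end->tv_nsec < curtime.tv_nsec it then borrows
 * a second: wait->tv_sec = 1, wait->tv_nsec = NANOSEC - (800000000 -
 * 100000000) = 300000000.  That is, 1.3 seconds remain and 0 is returned.
 */
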
647*2248Sraf /*
648*2248Sraf  * If closing by file descriptor: we will simply cancel all the outstanding
649*2248Sraf  * aio requests and return.  The aio requests in question will have
650*2248Sraf  * noticed the cancellation either before, during, or after initiating I/O.
651*2248Sraf  */
652*2248Sraf int
653*2248Sraf aiocancel_all(int fd)
654*2248Sraf {
655*2248Sraf 	aio_req_t *reqp;
656*2248Sraf 	aio_req_t **reqpp;
657*2248Sraf 	aio_worker_t *first;
658*2248Sraf 	aio_worker_t *next;
659*2248Sraf 	int canceled = 0;
660*2248Sraf 	int done = 0;
661*2248Sraf 	int cancelall = 0;
662*2248Sraf 
663*2248Sraf 	sig_mutex_lock(&__aio_mutex);
664*2248Sraf 
665*2248Sraf 	if (_aio_outstand_cnt == 0) {
666*2248Sraf 		sig_mutex_unlock(&__aio_mutex);
667*2248Sraf 		return (AIO_ALLDONE);
668*2248Sraf 	}
669*2248Sraf 
670*2248Sraf 	/*
671*2248Sraf 	 * Cancel requests from the read/write workers' queues.
672*2248Sraf 	 */
673*2248Sraf 	first = __nextworker_rw;
674*2248Sraf 	next = first;
675*2248Sraf 	do {
676*2248Sraf 		_aio_cancel_work(next, fd, &canceled, &done);
677*2248Sraf 	} while ((next = next->work_forw) != first);
678*2248Sraf 
679*2248Sraf 	/*
680*2248Sraf 	 * finally, check if there are requests on the done queue that
681*2248Sraf 	 * should be canceled.
682*2248Sraf 	 */
683*2248Sraf 	if (fd < 0)
684*2248Sraf 		cancelall = 1;
685*2248Sraf 	reqpp = &_aio_done_tail;
686*2248Sraf 	while ((reqp = *reqpp) != NULL) {
687*2248Sraf 		if (cancelall || reqp->req_args.fd == fd) {
688*2248Sraf 			*reqpp = reqp->req_next;
689*2248Sraf 			_aio_donecnt--;
690*2248Sraf 			(void) _aio_hash_del(reqp->req_resultp);
691*2248Sraf 			_aio_req_free(reqp);
692*2248Sraf 		} else
693*2248Sraf 			reqpp = &reqp->req_next;
694*2248Sraf 	}
695*2248Sraf 	if (cancelall) {
696*2248Sraf 		ASSERT(_aio_donecnt == 0);
697*2248Sraf 		_aio_done_head = NULL;
698*2248Sraf 	}
699*2248Sraf 	sig_mutex_unlock(&__aio_mutex);
700*2248Sraf 
701*2248Sraf 	if (canceled && done == 0)
702*2248Sraf 		return (AIO_CANCELED);
703*2248Sraf 	else if (done && canceled == 0)
704*2248Sraf 		return (AIO_ALLDONE);
705*2248Sraf 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
706*2248Sraf 		return ((int)_kaio(AIOCANCEL, fd, NULL));
707*2248Sraf 	return (AIO_NOTCANCELED);
708*2248Sraf }
709*2248Sraf 
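/*
 * Call-site sketch (illustration only; fd is hypothetical): the return
 * value tells the caller what happened to the requests for the given
 * file descriptor.
 *
 *	switch (aiocancel_all(fd)) {
 *	case AIO_CANCELED:	requests were canceled, none found done
 *	case AIO_ALLDONE:	nothing pending; everything had completed
 *	case AIO_NOTCANCELED:	mixed outcome; some requests could not
 *				be canceled
 *	}
 */
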
710*2248Sraf /*
711*2248Sraf  * Cancel requests from a given work queue.  If the file descriptor
712*2248Sraf  * parameter, fd, is non-negative, then only cancel those requests
713*2248Sraf  * in this queue that are to this file descriptor.  If the fd
714*2248Sraf  * parameter is -1, then cancel all requests.
715*2248Sraf  */
716*2248Sraf static void
717*2248Sraf _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
718*2248Sraf {
719*2248Sraf 	aio_req_t *reqp;
720*2248Sraf 
721*2248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
722*2248Sraf 	/*
723*2248Sraf 	 * cancel queued requests first.
724*2248Sraf 	 */
725*2248Sraf 	reqp = aiowp->work_tail1;
726*2248Sraf 	while (reqp != NULL) {
727*2248Sraf 		if (fd < 0 || reqp->req_args.fd == fd) {
728*2248Sraf 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
729*2248Sraf 				/*
730*2248Sraf 				 * Caller's locks were dropped.
731*2248Sraf 				 * reqp is invalid; start traversing
732*2248Sraf 				 * the list from the beginning again.
733*2248Sraf 				 */
734*2248Sraf 				reqp = aiowp->work_tail1;
735*2248Sraf 				continue;
736*2248Sraf 			}
737*2248Sraf 		}
738*2248Sraf 		reqp = reqp->req_next;
739*2248Sraf 	}
740*2248Sraf 	/*
741*2248Sraf 	 * Since the queued requests have been canceled, there can
742*2248Sraf 	 * only be one in-progress request that should be canceled.
743*2248Sraf 	 */
744*2248Sraf 	if ((reqp = aiowp->work_req) != NULL &&
745*2248Sraf 	    (fd < 0 || reqp->req_args.fd == fd))
746*2248Sraf 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
747*2248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
748*2248Sraf }
749*2248Sraf 
750*2248Sraf /*
751*2248Sraf  * Cancel a request.  Return 1 if the caller's locks were temporarily
752*2248Sraf  * dropped, otherwise return 0.
753*2248Sraf  */
754*2248Sraf int
755*2248Sraf _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
756*2248Sraf {
757*2248Sraf 	int ostate = reqp->req_state;
758*2248Sraf 
759*2248Sraf 	ASSERT(MUTEX_HELD(&__aio_mutex));
760*2248Sraf 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
761*2248Sraf 	if (ostate == AIO_REQ_CANCELED)
762*2248Sraf 		return (0);
763*2248Sraf 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
764*2248Sraf 		(*done)++;
765*2248Sraf 		return (0);
766*2248Sraf 	}
767*2248Sraf 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
768*2248Sraf 		ASSERT(POSIX_AIO(reqp));
769*2248Sraf 		/* Cancel the queued aio_fsync() request */
770*2248Sraf 		if (!reqp->req_head->lio_canned) {
771*2248Sraf 			reqp->req_head->lio_canned = 1;
772*2248Sraf 			_aio_outstand_cnt--;
773*2248Sraf 			(*canceled)++;
774*2248Sraf 		}
775*2248Sraf 		return (0);
776*2248Sraf 	}
777*2248Sraf 	reqp->req_state = AIO_REQ_CANCELED;
778*2248Sraf 	_aio_req_del(aiowp, reqp, ostate);
779*2248Sraf 	(void) _aio_hash_del(reqp->req_resultp);
780*2248Sraf 	(*canceled)++;
781*2248Sraf 	if (reqp == aiowp->work_req) {
782*2248Sraf 		ASSERT(ostate == AIO_REQ_INPROGRESS);
783*2248Sraf 		/*
784*2248Sraf 		 * Set the result values now, before _aiodone() is called.
785*2248Sraf 		 * We do this because the application can expect aio_return
786*2248Sraf 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
787*2248Sraf 		 * immediately after a successful return from aiocancel()
788*2248Sraf 		 * or aio_cancel().
789*2248Sraf 		 */
790*2248Sraf 		_aio_set_result(reqp, -1, ECANCELED);
791*2248Sraf 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
792*2248Sraf 		return (0);
793*2248Sraf 	}
794*2248Sraf 	if (!POSIX_AIO(reqp)) {
795*2248Sraf 		_aio_outstand_cnt--;
796*2248Sraf 		_aio_set_result(reqp, -1, ECANCELED);
797*2248Sraf 		return (0);
798*2248Sraf 	}
799*2248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
800*2248Sraf 	sig_mutex_unlock(&__aio_mutex);
801*2248Sraf 	_aiodone(reqp, -1, ECANCELED);
802*2248Sraf 	sig_mutex_lock(&__aio_mutex);
803*2248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
804*2248Sraf 	return (1);
805*2248Sraf }
806*2248Sraf 
807*2248Sraf int
808*2248Sraf _aio_create_worker(aio_req_t *reqp, int mode)
809*2248Sraf {
810*2248Sraf 	aio_worker_t *aiowp, **workers, **nextworker;
811*2248Sraf 	int *aio_workerscnt;
812*2248Sraf 	void *(*func)(void *);
813*2248Sraf 	sigset_t oset;
814*2248Sraf 	int error;
815*2248Sraf 
816*2248Sraf 	/*
817*2248Sraf 	 * Put the new worker thread in the right queue.
818*2248Sraf 	 */
819*2248Sraf 	switch (mode) {
820*2248Sraf 	case AIOREAD:
821*2248Sraf 	case AIOWRITE:
822*2248Sraf 	case AIOAREAD:
823*2248Sraf 	case AIOAWRITE:
824*2248Sraf #if !defined(_LP64)
825*2248Sraf 	case AIOAREAD64:
826*2248Sraf 	case AIOAWRITE64:
827*2248Sraf #endif
828*2248Sraf 		workers = &__workers_rw;
829*2248Sraf 		nextworker = &__nextworker_rw;
830*2248Sraf 		aio_workerscnt = &__rw_workerscnt;
831*2248Sraf 		func = _aio_do_request;
832*2248Sraf 		break;
833*2248Sraf 	case AIONOTIFY:
834*2248Sraf 		workers = &__workers_no;
835*2248Sraf 		nextworker = &__nextworker_no;
836*2248Sraf 		func = _aio_do_notify;
837*2248Sraf 		aio_workerscnt = &__no_workerscnt;
838*2248Sraf 		break;
839*2248Sraf 	default:
840*2248Sraf 		aio_panic("_aio_create_worker: invalid mode");
841*2248Sraf 		break;
842*2248Sraf 	}
843*2248Sraf 
844*2248Sraf 	if ((aiowp = _aio_worker_alloc()) == NULL)
845*2248Sraf 		return (-1);
846*2248Sraf 
847*2248Sraf 	if (reqp) {
848*2248Sraf 		reqp->req_state = AIO_REQ_QUEUED;
849*2248Sraf 		reqp->req_worker = aiowp;
850*2248Sraf 		aiowp->work_head1 = reqp;
851*2248Sraf 		aiowp->work_tail1 = reqp;
852*2248Sraf 		aiowp->work_next1 = reqp;
853*2248Sraf 		aiowp->work_count1 = 1;
854*2248Sraf 		aiowp->work_minload1 = 1;
855*2248Sraf 	}
856*2248Sraf 
857*2248Sraf 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
858*2248Sraf 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
859*2248Sraf 		THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
860*2248Sraf 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
861*2248Sraf 	if (error) {
862*2248Sraf 		if (reqp) {
863*2248Sraf 			reqp->req_state = 0;
864*2248Sraf 			reqp->req_worker = NULL;
865*2248Sraf 		}
866*2248Sraf 		_aio_worker_free(aiowp);
867*2248Sraf 		return (-1);
868*2248Sraf 	}
869*2248Sraf 
870*2248Sraf 	lmutex_lock(&__aio_mutex);
871*2248Sraf 	(*aio_workerscnt)++;
872*2248Sraf 	if (*workers == NULL) {
873*2248Sraf 		aiowp->work_forw = aiowp;
874*2248Sraf 		aiowp->work_backw = aiowp;
875*2248Sraf 		*nextworker = aiowp;
876*2248Sraf 		*workers = aiowp;
877*2248Sraf 	} else {
878*2248Sraf 		aiowp->work_backw = (*workers)->work_backw;
879*2248Sraf 		aiowp->work_forw = (*workers);
880*2248Sraf 		(*workers)->work_backw->work_forw = aiowp;
881*2248Sraf 		(*workers)->work_backw = aiowp;
882*2248Sraf 	}
883*2248Sraf 	_aio_worker_cnt++;
884*2248Sraf 	lmutex_unlock(&__aio_mutex);
885*2248Sraf 
886*2248Sraf 	(void) thr_continue(aiowp->work_tid);
887*2248Sraf 
888*2248Sraf 	return (0);
889*2248Sraf }
890*2248Sraf 
891*2248Sraf /*
892*2248Sraf  * This is the worker's main routine.
893*2248Sraf  * The task of this function is to execute all queued requests;
894*2248Sraf  * once the last pending request is executed this function will block
895*2248Sraf  * in _aio_idle().  A new incoming request must wake up this thread to
896*2248Sraf  * restart the work.
897*2248Sraf  * Every worker has its own work queue.  The queue lock is required
898*2248Sraf  * to synchronize the addition of new requests for this worker or
899*2248Sraf  * cancellation of pending/running requests.
900*2248Sraf  *
901*2248Sraf  * Cancellation scenarios:
902*2248Sraf  * The cancellation of a request is being done asynchronously using
903*2248Sraf  * _aio_cancel_req() from another thread context.
904*2248Sraf  * A queued request can be cancelled in different ways:
905*2248Sraf  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
906*2248Sraf  *	- lock the queue -> remove the request -> unlock the queue
907*2248Sraf  *	- this function/thread does not detect this cancellation process
908*2248Sraf  * b) request is in progress (AIO_REQ_INPROGRESS):
909*2248Sraf  *	- this function first allows the cancellation of the running
910*2248Sraf  *	  request with the flag "work_cancel_flg=1"
911*2248Sraf  * 		see _aio_req_get() -> _aio_cancel_on()
912*2248Sraf  *	  During this phase, it is allowed to interrupt the worker
913*2248Sraf  *	  thread running the request (this thread) using the SIGAIOCANCEL
914*2248Sraf  *	  signal.
915*2248Sraf  *	  Once this thread returns from the kernel (because the request
916*2248Sraf  *	  is just done), then it must disable a possible cancellation
917*2248Sraf  *	  and proceed to finish the request.  To disable the cancellation
918*2248Sraf  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
919*2248Sraf  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
920*2248Sraf  *	  same procedure as in a)
921*2248Sraf  *
922*2248Sraf  * To b)
923*2248Sraf  *	This thread uses sigsetjmp() to define the position in the code where
924*2248Sraf  *	it wishes to continue working in the case that a SIGAIOCANCEL signal
925*2248Sraf  *	is detected.
926*2248Sraf  *	Normally this thread should get the cancellation signal during the
927*2248Sraf  *	kernel phase (reading or writing).  In that case the signal handler
928*2248Sraf  *	aiosigcancelhndlr() is activated using the worker thread context,
929*2248Sraf  *	which again will use the siglongjmp() function to break the standard
930*2248Sraf  *	code flow and jump to the "sigsetjmp" position, provided that
931*2248Sraf  *	"work_cancel_flg" is set to "1".
932*2248Sraf  *	Because the "work_cancel_flg" is only manipulated by this worker
933*2248Sraf  *	thread and it can only run on one CPU at a given time, it is not
934*2248Sraf  *	necessary to protect that flag with the queue lock.
935*2248Sraf  *	Returning from the kernel (read or write system call) we must
936*2248Sraf  *	first disable the use of the SIGAIOCANCEL signal and accordingly
937*2248Sraf  *	the use of the siglongjmp() function to prevent a possible deadlock:
938*2248Sraf  *	- It can happen that this worker thread returns from the kernel and
939*2248Sraf  *	  blocks in "work_qlock1",
940*2248Sraf  *	- then a second thread cancels the apparently "in progress" request
941*2248Sraf  *	  and sends the SIGAIOCANCEL signal to the worker thread,
942*2248Sraf  *	- the worker thread gets assigned the "work_qlock1" and will return
943*2248Sraf  *	  from the kernel,
944*2248Sraf  *	- the kernel detects the pending signal and activates the signal
945*2248Sraf  *	  handler instead,
946*2248Sraf  *	- if the "work_cancel_flg" is still set then the signal handler
947*2248Sraf  *	  should use siglongjmp() to cancel the "in progress" request and
948*2248Sraf  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
949*2248Sraf  *	  for a second time => deadlock.
950*2248Sraf  *	To avoid that situation we disable the cancellation of the request
951*2248Sraf  *	in progress BEFORE we try to acquire the work_qlock1.
952*2248Sraf  *	In that case the signal handler will not call siglongjmp() and the
953*2248Sraf  *	worker thread will continue running the standard code flow.
954*2248Sraf  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
955*2248Sraf  *	a siglongjmp() that would otherwise have been required, freeing the
956*2248Sraf  *	work_qlock1 and avoiding a deadlock.
957*2248Sraf  */
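/*
 * Condensed timeline of scenario b) above (illustration only):
 *
 *	worker thread				cancelling thread
 *	-------------				-----------------
 *	sigsetjmp(work_jmp_buf)
 *	unblock SIGAIOCANCEL (sigon)
 *	pread()/pwrite() in the kernel		req_state = AIO_REQ_CANCELED
 *						thr_kill(tid, SIGAIOCANCEL)
 *	signal handler runs; if cancellation
 *	is still enabled it siglongjmp()s back
 *	to the sigsetjmp() above, where the
 *	request is finished with ECANCELED;
 *	otherwise the worker blocks the signal
 *	again (sigoff) and _aio_finish_request()
 *	notices AIO_REQ_CANCELED itself
 */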
958*2248Sraf void *
959*2248Sraf _aio_do_request(void *arglist)
960*2248Sraf {
961*2248Sraf 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
962*2248Sraf 	ulwp_t *self = curthread;
963*2248Sraf 	struct aio_args *arg;
964*2248Sraf 	aio_req_t *reqp;		/* current AIO request */
965*2248Sraf 	ssize_t retval;
966*2248Sraf 	int error;
967*2248Sraf 
968*2248Sraf 	if (pthread_setspecific(_aio_key, aiowp) != 0)
969*2248Sraf 		aio_panic("_aio_do_request, pthread_setspecific()");
970*2248Sraf 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
971*2248Sraf 	ASSERT(aiowp->work_req == NULL);
972*2248Sraf 
973*2248Sraf 	/*
974*2248Sraf 	 * We resume here when an operation is cancelled.
975*2248Sraf 	 * On first entry, aiowp->work_req == NULL, so all
976*2248Sraf 	 * we do is block SIGAIOCANCEL.
977*2248Sraf 	 */
978*2248Sraf 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
979*2248Sraf 	ASSERT(self->ul_sigdefer == 0);
980*2248Sraf 
981*2248Sraf 	sigoff(self);	/* block SIGAIOCANCEL */
982*2248Sraf 	if (aiowp->work_req != NULL)
983*2248Sraf 		_aio_finish_request(aiowp, -1, ECANCELED);
984*2248Sraf 
985*2248Sraf 	for (;;) {
986*2248Sraf 		/*
987*2248Sraf 		 * Put completed requests on aio_done_list.  This has
988*2248Sraf 		 * to be done as part of the main loop to ensure that
989*2248Sraf 		 * we don't artificially starve any aiowait'ers.
990*2248Sraf 		 */
991*2248Sraf 		if (aiowp->work_done1)
992*2248Sraf 			_aio_work_done(aiowp);
993*2248Sraf 
994*2248Sraf top:
995*2248Sraf 		/* consume any deferred SIGAIOCANCEL signal here */
996*2248Sraf 		sigon(self);
997*2248Sraf 		sigoff(self);
998*2248Sraf 
999*2248Sraf 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1000*2248Sraf 			if (_aio_idle(aiowp) != 0)
1001*2248Sraf 				goto top;
1002*2248Sraf 		}
1003*2248Sraf 		arg = &reqp->req_args;
1004*2248Sraf 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1005*2248Sraf 		    reqp->req_state == AIO_REQ_CANCELED);
1006*2248Sraf 		error = 0;
1007*2248Sraf 
1008*2248Sraf 		switch (reqp->req_op) {
1009*2248Sraf 		case AIOREAD:
1010*2248Sraf 		case AIOAREAD:
1011*2248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
1012*2248Sraf 			retval = pread(arg->fd, arg->buf,
1013*2248Sraf 			    arg->bufsz, arg->offset);
1014*2248Sraf 			if (retval == -1) {
1015*2248Sraf 				if (errno == ESPIPE) {
1016*2248Sraf 					retval = read(arg->fd,
1017*2248Sraf 					    arg->buf, arg->bufsz);
1018*2248Sraf 					if (retval == -1)
1019*2248Sraf 						error = errno;
1020*2248Sraf 				} else {
1021*2248Sraf 					error = errno;
1022*2248Sraf 				}
1023*2248Sraf 			}
1024*2248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
1025*2248Sraf 			break;
1026*2248Sraf 		case AIOWRITE:
1027*2248Sraf 		case AIOAWRITE:
1028*2248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
1029*2248Sraf 			retval = pwrite(arg->fd, arg->buf,
1030*2248Sraf 			    arg->bufsz, arg->offset);
1031*2248Sraf 			if (retval == -1) {
1032*2248Sraf 				if (errno == ESPIPE) {
1033*2248Sraf 					retval = write(arg->fd,
1034*2248Sraf 					    arg->buf, arg->bufsz);
1035*2248Sraf 					if (retval == -1)
1036*2248Sraf 						error = errno;
1037*2248Sraf 				} else {
1038*2248Sraf 					error = errno;
1039*2248Sraf 				}
1040*2248Sraf 			}
1041*2248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
1042*2248Sraf 			break;
1043*2248Sraf #if !defined(_LP64)
1044*2248Sraf 		case AIOAREAD64:
1045*2248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
1046*2248Sraf 			retval = pread64(arg->fd, arg->buf,
1047*2248Sraf 			    arg->bufsz, arg->offset);
1048*2248Sraf 			if (retval == -1) {
1049*2248Sraf 				if (errno == ESPIPE) {
1050*2248Sraf 					retval = read(arg->fd,
1051*2248Sraf 					    arg->buf, arg->bufsz);
1052*2248Sraf 					if (retval == -1)
1053*2248Sraf 						error = errno;
1054*2248Sraf 				} else {
1055*2248Sraf 					error = errno;
1056*2248Sraf 				}
1057*2248Sraf 			}
1058*2248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
1059*2248Sraf 			break;
1060*2248Sraf 		case AIOAWRITE64:
1061*2248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
1062*2248Sraf 			retval = pwrite64(arg->fd, arg->buf,
1063*2248Sraf 			    arg->bufsz, arg->offset);
1064*2248Sraf 			if (retval == -1) {
1065*2248Sraf 				if (errno == ESPIPE) {
1066*2248Sraf 					retval = write(arg->fd,
1067*2248Sraf 					    arg->buf, arg->bufsz);
1068*2248Sraf 					if (retval == -1)
1069*2248Sraf 						error = errno;
1070*2248Sraf 				} else {
1071*2248Sraf 					error = errno;
1072*2248Sraf 				}
1073*2248Sraf 			}
1074*2248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
1075*2248Sraf 			break;
1076*2248Sraf #endif	/* !defined(_LP64) */
1077*2248Sraf 		case AIOFSYNC:
1078*2248Sraf 			if (_aio_fsync_del(aiowp, reqp))
1079*2248Sraf 				goto top;
1080*2248Sraf 			ASSERT(reqp->req_head == NULL);
1081*2248Sraf 			/*
1082*2248Sraf 			 * All writes for this fsync request are now
1083*2248Sraf 			 * acknowledged.  Now make these writes visible
1084*2248Sraf 			 * and put the final request into the hash table.
1085*2248Sraf 			 */
1086*2248Sraf 			if (reqp->req_state == AIO_REQ_CANCELED) {
1087*2248Sraf 				/* EMPTY */;
1088*2248Sraf 			} else if (arg->offset == O_SYNC) {
1089*2248Sraf 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1090*2248Sraf 					error = errno;
1091*2248Sraf 			} else {
1092*2248Sraf 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1093*2248Sraf 					error = errno;
1094*2248Sraf 			}
1095*2248Sraf 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1096*2248Sraf 				aio_panic("_aio_do_request(): AIOFSYNC: "
1097*2248Sraf 				    "request already in hash table");
1098*2248Sraf 			break;
1099*2248Sraf 		default:
1100*2248Sraf 			aio_panic("_aio_do_request, bad op");
1101*2248Sraf 		}
1102*2248Sraf 
1103*2248Sraf 		_aio_finish_request(aiowp, retval, error);
1104*2248Sraf 	}
1105*2248Sraf 	/* NOTREACHED */
1106*2248Sraf 	return (NULL);
1107*2248Sraf }
1108*2248Sraf 
1109*2248Sraf /*
1110*2248Sraf  * Perform the tail processing for _aio_do_request().
1111*2248Sraf  * The in-progress request may or may not have been cancelled.
1112*2248Sraf  */
1113*2248Sraf static void
1114*2248Sraf _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1115*2248Sraf {
1116*2248Sraf 	aio_req_t *reqp;
1117*2248Sraf 
1118*2248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
1119*2248Sraf 	if ((reqp = aiowp->work_req) == NULL)
1120*2248Sraf 		sig_mutex_unlock(&aiowp->work_qlock1);
1121*2248Sraf 	else {
1122*2248Sraf 		aiowp->work_req = NULL;
1123*2248Sraf 		if (reqp->req_state == AIO_REQ_CANCELED) {
1124*2248Sraf 			retval = -1;
1125*2248Sraf 			error = ECANCELED;
1126*2248Sraf 		}
1127*2248Sraf 		if (!POSIX_AIO(reqp)) {
1128*2248Sraf 			sig_mutex_unlock(&aiowp->work_qlock1);
1129*2248Sraf 			sig_mutex_lock(&__aio_mutex);
1130*2248Sraf 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1131*2248Sraf 				reqp->req_state = AIO_REQ_DONE;
1132*2248Sraf 			_aio_req_done_cnt++;
1133*2248Sraf 			_aio_set_result(reqp, retval, error);
1134*2248Sraf 			if (error == ECANCELED)
1135*2248Sraf 				_aio_outstand_cnt--;
1136*2248Sraf 			sig_mutex_unlock(&__aio_mutex);
1137*2248Sraf 		} else {
1138*2248Sraf 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1139*2248Sraf 				reqp->req_state = AIO_REQ_DONE;
1140*2248Sraf 			sig_mutex_unlock(&aiowp->work_qlock1);
1141*2248Sraf 			_aiodone(reqp, retval, error);
1142*2248Sraf 		}
1143*2248Sraf 	}
1144*2248Sraf }
1145*2248Sraf 
1146*2248Sraf void
1147*2248Sraf _aio_req_mark_done(aio_req_t *reqp)
1148*2248Sraf {
1149*2248Sraf #if !defined(_LP64)
1150*2248Sraf 	if (reqp->req_largefile)
1151*2248Sraf 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1152*2248Sraf 	else
1153*2248Sraf #endif
1154*2248Sraf 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1155*2248Sraf }
1156*2248Sraf 
1157*2248Sraf /*
1158*2248Sraf  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1159*2248Sraf  * hopefully to consume one of our queued signals.
1160*2248Sraf  */
1161*2248Sraf static void
1162*2248Sraf _aio_delay(int ticks)
1163*2248Sraf {
1164*2248Sraf 	(void) usleep(ticks * (MICROSEC / hz));
1165*2248Sraf }
1166*2248Sraf 
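/*
 * For example (illustrative): with hz = 100, _aio_delay(1) sleeps for
 * about 1 * (MICROSEC / 100) = 10000 microseconds, i.e. one clock tick.
 */
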
1167*2248Sraf /*
1168*2248Sraf  * Actually send the notifications.
1169*2248Sraf  * We could block indefinitely here if the application
1170*2248Sraf  * is not listening for the signal or port notifications.
1171*2248Sraf  */
1172*2248Sraf static void
1173*2248Sraf send_notification(notif_param_t *npp)
1174*2248Sraf {
1175*2248Sraf 	extern int __sigqueue(pid_t pid, int signo,
1176*2248Sraf 		/* const union sigval */ void *value, int si_code, int block);
1177*2248Sraf 
1178*2248Sraf 	if (npp->np_signo)
1179*2248Sraf 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1180*2248Sraf 		    SI_ASYNCIO, 1);
1181*2248Sraf 	else if (npp->np_port >= 0)
1182*2248Sraf 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1183*2248Sraf 		    npp->np_event, npp->np_object, npp->np_user);
1184*2248Sraf 
1185*2248Sraf 	if (npp->np_lio_signo)
1186*2248Sraf 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1187*2248Sraf 		    SI_ASYNCIO, 1);
1188*2248Sraf 	else if (npp->np_lio_port >= 0)
1189*2248Sraf 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1190*2248Sraf 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1191*2248Sraf }
1192*2248Sraf 
1193*2248Sraf /*
1194*2248Sraf  * Asynchronous notification worker.
1195*2248Sraf  */
1196*2248Sraf void *
1197*2248Sraf _aio_do_notify(void *arg)
1198*2248Sraf {
1199*2248Sraf 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1200*2248Sraf 	aio_req_t *reqp;
1201*2248Sraf 
1202*2248Sraf 	/*
1203*2248Sraf 	 * This isn't really necessary.  All signals are blocked.
1204*2248Sraf 	 */
1205*2248Sraf 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1206*2248Sraf 		aio_panic("_aio_do_notify, pthread_setspecific()");
1207*2248Sraf 
1208*2248Sraf 	/*
1209*2248Sraf 	 * Notifications are never cancelled.
1210*2248Sraf 	 * All signals remain blocked, forever.
1211*2248Sraf 	 */
1212*2248Sraf 	for (;;) {
1213*2248Sraf 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1214*2248Sraf 			if (_aio_idle(aiowp) != 0)
1215*2248Sraf 				aio_panic("_aio_do_notify: _aio_idle() failed");
1216*2248Sraf 		}
1217*2248Sraf 		send_notification(&reqp->req_notify);
1218*2248Sraf 		_aio_req_free(reqp);
1219*2248Sraf 	}
1220*2248Sraf 
1221*2248Sraf 	/* NOTREACHED */
1222*2248Sraf 	return (NULL);
1223*2248Sraf }
1224*2248Sraf 
1225*2248Sraf /*
1226*2248Sraf  * Do the completion semantics for a request that was either canceled
1227*2248Sraf  * by _aio_cancel_req() or was completed by _aio_do_request().
1228*2248Sraf  */
1229*2248Sraf static void
1230*2248Sraf _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1231*2248Sraf {
1232*2248Sraf 	aio_result_t *resultp = reqp->req_resultp;
1233*2248Sraf 	int notify = 0;
1234*2248Sraf 	aio_lio_t *head;
1235*2248Sraf 	int sigev_none;
1236*2248Sraf 	int sigev_signal;
1237*2248Sraf 	int sigev_thread;
1238*2248Sraf 	int sigev_port;
1239*2248Sraf 	notif_param_t np;
1240*2248Sraf 
1241*2248Sraf 	/*
1242*2248Sraf 	 * We call _aiodone() only for Posix I/O.
1243*2248Sraf 	 */
1244*2248Sraf 	ASSERT(POSIX_AIO(reqp));
1245*2248Sraf 
1246*2248Sraf 	sigev_none = 0;
1247*2248Sraf 	sigev_signal = 0;
1248*2248Sraf 	sigev_thread = 0;
1249*2248Sraf 	sigev_port = 0;
1250*2248Sraf 	np.np_signo = 0;
1251*2248Sraf 	np.np_port = -1;
1252*2248Sraf 	np.np_lio_signo = 0;
1253*2248Sraf 	np.np_lio_port = -1;
1254*2248Sraf 
1255*2248Sraf 	switch (reqp->req_sigevent.sigev_notify) {
1256*2248Sraf 	case SIGEV_NONE:
1257*2248Sraf 		sigev_none = 1;
1258*2248Sraf 		break;
1259*2248Sraf 	case SIGEV_SIGNAL:
1260*2248Sraf 		sigev_signal = 1;
1261*2248Sraf 		break;
1262*2248Sraf 	case SIGEV_THREAD:
1263*2248Sraf 		sigev_thread = 1;
1264*2248Sraf 		break;
1265*2248Sraf 	case SIGEV_PORT:
1266*2248Sraf 		sigev_port = 1;
1267*2248Sraf 		break;
1268*2248Sraf 	default:
1269*2248Sraf 		aio_panic("_aiodone: improper sigev_notify");
1270*2248Sraf 		break;
1271*2248Sraf 	}
1272*2248Sraf 
1273*2248Sraf 	/*
1274*2248Sraf 	 * Figure out the notification parameters while holding __aio_mutex.
1275*2248Sraf 	 * Actually perform the notifications after dropping __aio_mutex.
1276*2248Sraf 	 * This allows us to sleep for a long time (if the notifications
1277*2248Sraf 	 * incur delays) without impeding other async I/O operations.
1278*2248Sraf 	 */
1279*2248Sraf 
1280*2248Sraf 	sig_mutex_lock(&__aio_mutex);
1281*2248Sraf 
1282*2248Sraf 	if (sigev_signal) {
1283*2248Sraf 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1284*2248Sraf 			notify = 1;
1285*2248Sraf 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1286*2248Sraf 	} else if (sigev_thread | sigev_port) {
1287*2248Sraf 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1288*2248Sraf 			notify = 1;
1289*2248Sraf 		np.np_event = reqp->req_op;
1290*2248Sraf 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1291*2248Sraf 			np.np_event = AIOFSYNC64;
1292*2248Sraf 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1293*2248Sraf 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1294*2248Sraf 	}
1295*2248Sraf 
1296*2248Sraf 	if (resultp->aio_errno == EINPROGRESS)
1297*2248Sraf 		_aio_set_result(reqp, retval, error);
1298*2248Sraf 
1299*2248Sraf 	_aio_outstand_cnt--;
1300*2248Sraf 
1301*2248Sraf 	head = reqp->req_head;
1302*2248Sraf 	reqp->req_head = NULL;
1303*2248Sraf 
1304*2248Sraf 	if (sigev_none) {
1305*2248Sraf 		_aio_enq_doneq(reqp);
1306*2248Sraf 		reqp = NULL;
1307*2248Sraf 	} else {
1308*2248Sraf 		(void) _aio_hash_del(resultp);
1309*2248Sraf 		_aio_req_mark_done(reqp);
1310*2248Sraf 	}
1311*2248Sraf 
1312*2248Sraf 	_aio_waitn_wakeup();
1313*2248Sraf 
1314*2248Sraf 	/*
1315*2248Sraf 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1316*2248Sraf 	 * __aio_suspend() increments "_aio_kernel_suspend"
1317*2248Sraf 	 * when they are waiting in the kernel for completed I/Os.
1318*2248Sraf 	 *
1319*2248Sraf 	 * _kaio(AIONOTIFY) awakes the corresponding function
1320*2248Sraf 	 * in the kernel; then the corresponding __aio_waitn() or
1321*2248Sraf 	 * __aio_suspend() function could reap the recently
1322*2248Sraf 	 * completed I/Os (_aiodone()).
1323*2248Sraf 	 */
1324*2248Sraf 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1325*2248Sraf 		(void) _kaio(AIONOTIFY);
1326*2248Sraf 
1327*2248Sraf 	sig_mutex_unlock(&__aio_mutex);
1328*2248Sraf 
1329*2248Sraf 	if (head != NULL) {
1330*2248Sraf 		/*
1331*2248Sraf 		 * If all the lio requests have completed,
1332*2248Sraf 		 * prepare to notify the waiting thread.
1333*2248Sraf 		 */
1334*2248Sraf 		sig_mutex_lock(&head->lio_mutex);
1335*2248Sraf 		ASSERT(head->lio_refcnt == head->lio_nent);
1336*2248Sraf 		if (head->lio_refcnt == 1) {
1337*2248Sraf 			int waiting = 0;
1338*2248Sraf 			if (head->lio_mode == LIO_WAIT) {
1339*2248Sraf 				if ((waiting = head->lio_waiting) != 0)
1340*2248Sraf 					(void) cond_signal(&head->lio_cond_cv);
1341*2248Sraf 			} else if (head->lio_port < 0) { /* none or signal */
1342*2248Sraf 				if ((np.np_lio_signo = head->lio_signo) != 0)
1343*2248Sraf 					notify = 1;
1344*2248Sraf 				np.np_lio_user = head->lio_sigval.sival_ptr;
1345*2248Sraf 			} else {			/* thread or port */
1346*2248Sraf 				notify = 1;
1347*2248Sraf 				np.np_lio_port = head->lio_port;
1348*2248Sraf 				np.np_lio_event = head->lio_event;
1349*2248Sraf 				np.np_lio_object =
1350*2248Sraf 				    (uintptr_t)head->lio_sigevent;
1351*2248Sraf 				np.np_lio_user = head->lio_sigval.sival_ptr;
1352*2248Sraf 			}
1353*2248Sraf 			head->lio_nent = head->lio_refcnt = 0;
1354*2248Sraf 			sig_mutex_unlock(&head->lio_mutex);
1355*2248Sraf 			if (waiting == 0)
1356*2248Sraf 				_aio_lio_free(head);
1357*2248Sraf 		} else {
1358*2248Sraf 			head->lio_nent--;
1359*2248Sraf 			head->lio_refcnt--;
1360*2248Sraf 			sig_mutex_unlock(&head->lio_mutex);
1361*2248Sraf 		}
1362*2248Sraf 	}
1363*2248Sraf 
1364*2248Sraf 	/*
1365*2248Sraf 	 * The request is completed; now perform the notifications.
1366*2248Sraf 	 */
1367*2248Sraf 	if (notify) {
1368*2248Sraf 		if (reqp != NULL) {
1369*2248Sraf 			/*
1370*2248Sraf 			 * We usually put the request on the notification
1371*2248Sraf 			 * queue because we don't want to block and delay
1372*2248Sraf 			 * other operations behind us in the work queue.
1373*2248Sraf 			 * Also we must never block on a cancel notification
1374*2248Sraf 			 * because we are being called from an application
1375*2248Sraf 			 * thread in this case and that could lead to deadlock
1376*2248Sraf 			 * if no other thread is receiving notifications.
1377*2248Sraf 			 */
1378*2248Sraf 			reqp->req_notify = np;
1379*2248Sraf 			reqp->req_op = AIONOTIFY;
1380*2248Sraf 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1381*2248Sraf 			reqp = NULL;
1382*2248Sraf 		} else {
1383*2248Sraf 			/*
1384*2248Sraf 			 * We already put the request on the done queue,
1385*2248Sraf 			 * so we can't queue it to the notification queue.
1386*2248Sraf 			 * Just do the notification directly.
1387*2248Sraf 			 */
1388*2248Sraf 			send_notification(&np);
1389*2248Sraf 		}
1390*2248Sraf 	}
1391*2248Sraf 
1392*2248Sraf 	if (reqp != NULL)
1393*2248Sraf 		_aio_req_free(reqp);
1394*2248Sraf }
1395*2248Sraf 
1396*2248Sraf /*
1397*2248Sraf  * Delete fsync requests from list head until there is
1398*2248Sraf  * only one left.  Return 0 when there is only one,
1399*2248Sraf  * otherwise return a non-zero value.
1400*2248Sraf  */
1401*2248Sraf static int
1402*2248Sraf _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1403*2248Sraf {
1404*2248Sraf 	aio_lio_t *head = reqp->req_head;
1405*2248Sraf 	int rval = 0;
1406*2248Sraf 
1407*2248Sraf 	ASSERT(reqp == aiowp->work_req);
1408*2248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
1409*2248Sraf 	sig_mutex_lock(&head->lio_mutex);
1410*2248Sraf 	if (head->lio_refcnt > 1) {
1411*2248Sraf 		head->lio_refcnt--;
1412*2248Sraf 		head->lio_nent--;
1413*2248Sraf 		aiowp->work_req = NULL;
1414*2248Sraf 		sig_mutex_unlock(&head->lio_mutex);
1415*2248Sraf 		sig_mutex_unlock(&aiowp->work_qlock1);
1416*2248Sraf 		sig_mutex_lock(&__aio_mutex);
1417*2248Sraf 		_aio_outstand_cnt--;
1418*2248Sraf 		_aio_waitn_wakeup();
1419*2248Sraf 		sig_mutex_unlock(&__aio_mutex);
1420*2248Sraf 		_aio_req_free(reqp);
1421*2248Sraf 		return (1);
1422*2248Sraf 	}
1423*2248Sraf 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1424*2248Sraf 	reqp->req_head = NULL;
1425*2248Sraf 	if (head->lio_canned)
1426*2248Sraf 		reqp->req_state = AIO_REQ_CANCELED;
1427*2248Sraf 	if (head->lio_mode == LIO_DESTROY) {
1428*2248Sraf 		aiowp->work_req = NULL;
1429*2248Sraf 		rval = 1;
1430*2248Sraf 	}
1431*2248Sraf 	sig_mutex_unlock(&head->lio_mutex);
1432*2248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
1433*2248Sraf 	head->lio_refcnt--;
1434*2248Sraf 	head->lio_nent--;
1435*2248Sraf 	_aio_lio_free(head);
1436*2248Sraf 	if (rval != 0)
1437*2248Sraf 		_aio_req_free(reqp);
1438*2248Sraf 	return (rval);
1439*2248Sraf }
1440*2248Sraf 
1441*2248Sraf /*
1442*2248Sraf  * A worker is set idle when its work queue is empty.
1443*2248Sraf  * The worker checks again that it has no more work
1444*2248Sraf  * and then goes to sleep waiting for more work.
1445*2248Sraf  */
1446*2248Sraf int
1447*2248Sraf _aio_idle(aio_worker_t *aiowp)
1448*2248Sraf {
1449*2248Sraf 	int error = 0;
1450*2248Sraf 
1451*2248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
1452*2248Sraf 	if (aiowp->work_count1 == 0) {
1453*2248Sraf 		ASSERT(aiowp->work_minload1 == 0);
1454*2248Sraf 		aiowp->work_idleflg = 1;
1455*2248Sraf 		/*
1456*2248Sraf 		 * A cancellation handler is not needed here.
1457*2248Sraf 		 * aio worker threads are never cancelled via pthread_cancel().
1458*2248Sraf 		 */
1459*2248Sraf 		error = sig_cond_wait(&aiowp->work_idle_cv,
1460*2248Sraf 		    &aiowp->work_qlock1);
1461*2248Sraf 		/*
1462*2248Sraf 		 * The idle flag is normally cleared before the worker is awakened
1463*2248Sraf 		 * by _aio_req_add().  On error (EINTR), we clear it ourselves.
1464*2248Sraf 		 */
1465*2248Sraf 		if (error)
1466*2248Sraf 			aiowp->work_idleflg = 0;
1467*2248Sraf 	}
1468*2248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
1469*2248Sraf 	return (error);
1470*2248Sraf }
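
/*
 * For illustration: a minimal, self-contained sketch of the idle/wakeup
 * handshake used by _aio_idle() and _aio_req_add() -- the waiter sets an
 * idle flag and blocks on a condition variable under the queue lock; the
 * waker enqueues work, clears the flag and signals under the same lock.
 * The names below (exq_t, worker_wait, submit_work) are hypothetical and
 * not part of this library; the sketch also re-checks the flag in a loop,
 * whereas _aio_idle() waits once and handles EINTR by clearing the flag
 * itself.
 */
#if 0
#include <pthread.h>

typedef struct {
	pthread_mutex_t	lock;
	pthread_cond_t	cv;
	int		idle;		/* worker is asleep */
	int		nwork;		/* queued requests */
} exq_t;

static void
worker_wait(exq_t *q)
{
	(void) pthread_mutex_lock(&q->lock);
	if (q->nwork == 0) {
		q->idle = 1;
		while (q->idle)		/* tolerate spurious wakeups */
			(void) pthread_cond_wait(&q->cv, &q->lock);
	}
	(void) pthread_mutex_unlock(&q->lock);
}

static void
submit_work(exq_t *q)
{
	(void) pthread_mutex_lock(&q->lock);
	q->nwork++;
	if (q->idle) {
		q->idle = 0;		/* waker clears the idle flag */
		(void) pthread_cond_signal(&q->cv);
	}
	(void) pthread_mutex_unlock(&q->lock);
}
#endif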
1471*2248Sraf 
1472*2248Sraf /*
1473*2248Sraf  * A worker's completed AIO requests are placed onto a global
1474*2248Sraf  * done queue.  The application is only sent a SIGIO signal if
1475*2248Sraf  * the process has a handler enabled and it is not waiting via
1476*2248Sraf  * aiowait().
1477*2248Sraf  */
1478*2248Sraf static void
1479*2248Sraf _aio_work_done(aio_worker_t *aiowp)
1480*2248Sraf {
1481*2248Sraf 	aio_req_t *reqp;
1482*2248Sraf 
1483*2248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
1484*2248Sraf 	reqp = aiowp->work_prev1;
1485*2248Sraf 	reqp->req_next = NULL;
1486*2248Sraf 	aiowp->work_done1 = 0;
1487*2248Sraf 	aiowp->work_tail1 = aiowp->work_next1;
1488*2248Sraf 	if (aiowp->work_tail1 == NULL)
1489*2248Sraf 		aiowp->work_head1 = NULL;
1490*2248Sraf 	aiowp->work_prev1 = NULL;
1491*2248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
1492*2248Sraf 	sig_mutex_lock(&__aio_mutex);
1493*2248Sraf 	_aio_donecnt++;
1494*2248Sraf 	_aio_outstand_cnt--;
1495*2248Sraf 	_aio_req_done_cnt--;
1496*2248Sraf 	ASSERT(_aio_donecnt > 0 &&
1497*2248Sraf 	    _aio_outstand_cnt >= 0 &&
1498*2248Sraf 	    _aio_req_done_cnt >= 0);
1499*2248Sraf 	ASSERT(reqp != NULL);
1500*2248Sraf 
1501*2248Sraf 	if (_aio_done_tail == NULL) {
1502*2248Sraf 		_aio_done_head = _aio_done_tail = reqp;
1503*2248Sraf 	} else {
1504*2248Sraf 		_aio_done_head->req_next = reqp;
1505*2248Sraf 		_aio_done_head = reqp;
1506*2248Sraf 	}
1507*2248Sraf 
1508*2248Sraf 	if (_aiowait_flag) {
1509*2248Sraf 		sig_mutex_unlock(&__aio_mutex);
1510*2248Sraf 		(void) _kaio(AIONOTIFY);
1511*2248Sraf 	} else {
1512*2248Sraf 		sig_mutex_unlock(&__aio_mutex);
1513*2248Sraf 		if (_sigio_enabled)
1514*2248Sraf 			(void) kill(__pid, SIGIO);
1515*2248Sraf 	}
1516*2248Sraf }
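
/*
 * For illustration: how an application would arrange to receive the SIGIO
 * sent above.  The handler and flag names are hypothetical; the point is
 * simply that a SIGIO handler has been installed, which is the condition
 * the _sigio_enabled test above reflects.
 */
#if 0
#include <signal.h>

static volatile sig_atomic_t aio_pending;

/* ARGSUSED */
static void
sigio_handler(int sig)
{
	aio_pending = 1;	/* collect results outside the handler */
}

static int
install_sigio_handler(void)
{
	struct sigaction sa;

	sa.sa_handler = sigio_handler;
	sa.sa_flags = 0;
	(void) sigemptyset(&sa.sa_mask);
	return (sigaction(SIGIO, &sa, NULL));
}
#endif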
1517*2248Sraf 
1518*2248Sraf /*
1519*2248Sraf  * The done queue consists of AIO requests that are in either the
1520*2248Sraf  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1521*2248Sraf  * are discarded.  NULL is returned for an empty queue ((aio_result_t *)-1
1522*2248Sraf  * if nothing is outstanding); otherwise the address of a done aio_result_t.
1523*2248Sraf  */
1524*2248Sraf aio_result_t *
1525*2248Sraf _aio_req_done(void)
1526*2248Sraf {
1527*2248Sraf 	aio_req_t *reqp;
1528*2248Sraf 	aio_result_t *resultp;
1529*2248Sraf 
1530*2248Sraf 	ASSERT(MUTEX_HELD(&__aio_mutex));
1531*2248Sraf 
1532*2248Sraf 	if ((reqp = _aio_done_tail) != NULL) {
1533*2248Sraf 		if ((_aio_done_tail = reqp->req_next) == NULL)
1534*2248Sraf 			_aio_done_head = NULL;
1535*2248Sraf 		ASSERT(_aio_donecnt > 0);
1536*2248Sraf 		_aio_donecnt--;
1537*2248Sraf 		(void) _aio_hash_del(reqp->req_resultp);
1538*2248Sraf 		resultp = reqp->req_resultp;
1539*2248Sraf 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1540*2248Sraf 		_aio_req_free(reqp);
1541*2248Sraf 		return (resultp);
1542*2248Sraf 	}
1543*2248Sraf 	/* queue is empty; return -1 if nothing remains outstanding */
1544*2248Sraf 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1545*2248Sraf 		return ((aio_result_t *)-1);
1546*2248Sraf 	}
1547*2248Sraf 	return (NULL);
1548*2248Sraf }
1549*2248Sraf 
1550*2248Sraf /*
1551*2248Sraf  * Set the return and errno values for the application's use.
1552*2248Sraf  *
1553*2248Sraf  * For the POSIX interfaces, we must set the return value first, followed
1554*2248Sraf  * by the errno value, because the POSIX interfaces allow for a change
1555*2248Sraf  * in the errno value from EINPROGRESS to something else to signal
1556*2248Sraf  * the completion of the asynchronous request.
1557*2248Sraf  *
1558*2248Sraf  * The opposite is true for the Solaris interfaces.  These allow for
1559*2248Sraf  * a change in the return value from AIO_INPROGRESS to something else
1560*2248Sraf  * to signal the completion of the asynchronous request.
1561*2248Sraf  */
1562*2248Sraf void
1563*2248Sraf _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1564*2248Sraf {
1565*2248Sraf 	aio_result_t *resultp = reqp->req_resultp;
1566*2248Sraf 
1567*2248Sraf 	if (POSIX_AIO(reqp)) {
1568*2248Sraf 		resultp->aio_return = retval;
1569*2248Sraf 		membar_producer();
1570*2248Sraf 		resultp->aio_errno = error;
1571*2248Sraf 	} else {
1572*2248Sraf 		resultp->aio_errno = error;
1573*2248Sraf 		membar_producer();
1574*2248Sraf 		resultp->aio_return = retval;
1575*2248Sraf 	}
1576*2248Sraf }
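
/*
 * The membar_producer() above orders the two stores; a polling reader
 * needs the matching order on the load side.  For illustration, a sketch
 * of the POSIX-style consumer: poll aio_errno, and only after it has
 * changed from EINPROGRESS (with a membar_consumer() in between) read
 * aio_return.  poll_result() is a hypothetical helper, not part of this
 * library.
 */
#if 0
#include <atomic.h>
#include <errno.h>

static int
poll_result(volatile aio_result_t *resultp, ssize_t *retvalp, int *errp)
{
	int error = resultp->aio_errno;

	if (error == EINPROGRESS)
		return (0);		/* not yet complete */
	membar_consumer();		/* pairs with membar_producer() */
	*retvalp = resultp->aio_return;
	*errp = error;
	return (1);			/* *retvalp and *errp are valid */
}
#endif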
1577*2248Sraf 
1578*2248Sraf /*
1579*2248Sraf  * Add an AIO request onto the next work queue.
1580*2248Sraf  * A circular list of workers is used to choose the next worker.
1581*2248Sraf  */
1582*2248Sraf void
1583*2248Sraf _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1584*2248Sraf {
1585*2248Sraf 	ulwp_t *self = curthread;
1586*2248Sraf 	aio_worker_t *aiowp;
1587*2248Sraf 	aio_worker_t *first;
1588*2248Sraf 	int load_bal_flg = 1;
1589*2248Sraf 	int found;
1590*2248Sraf 
1591*2248Sraf 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1592*2248Sraf 	reqp->req_next = NULL;
1593*2248Sraf 	/*
1594*2248Sraf 	 * Try to acquire the next worker's work queue.  If it is locked,
1595*2248Sraf 	 * search the list of workers until an unlocked queue is found,
1596*2248Sraf 	 * or until the list has been completely traversed, at which point
1597*2248Sraf 	 * another worker will be created.
1598*2248Sraf 	 */
1599*2248Sraf 	sigoff(self);		/* defer SIGIO */
1600*2248Sraf 	sig_mutex_lock(&__aio_mutex);
1601*2248Sraf 	first = aiowp = *nextworker;
1602*2248Sraf 	if (mode != AIONOTIFY)
1603*2248Sraf 		_aio_outstand_cnt++;
1604*2248Sraf 	sig_mutex_unlock(&__aio_mutex);
1605*2248Sraf 
1606*2248Sraf 	switch (mode) {
1607*2248Sraf 	case AIOREAD:
1608*2248Sraf 	case AIOWRITE:
1609*2248Sraf 	case AIOAREAD:
1610*2248Sraf 	case AIOAWRITE:
1611*2248Sraf #if !defined(_LP64)
1612*2248Sraf 	case AIOAREAD64:
1613*2248Sraf 	case AIOAWRITE64:
1614*2248Sraf #endif
1615*2248Sraf 		/* try to find an idle worker */
1616*2248Sraf 		found = 0;
1617*2248Sraf 		do {
1618*2248Sraf 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1619*2248Sraf 				if (aiowp->work_idleflg) {
1620*2248Sraf 					found = 1;
1621*2248Sraf 					break;
1622*2248Sraf 				}
1623*2248Sraf 				sig_mutex_unlock(&aiowp->work_qlock1);
1624*2248Sraf 			}
1625*2248Sraf 		} while ((aiowp = aiowp->work_forw) != first);
1626*2248Sraf 
1627*2248Sraf 		if (found) {
1628*2248Sraf 			aiowp->work_minload1++;
1629*2248Sraf 			break;
1630*2248Sraf 		}
1631*2248Sraf 
1632*2248Sraf 		/* try to acquire some worker's queue lock */
1633*2248Sraf 		do {
1634*2248Sraf 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1635*2248Sraf 				found = 1;
1636*2248Sraf 				break;
1637*2248Sraf 			}
1638*2248Sraf 		} while ((aiowp = aiowp->work_forw) != first);
1639*2248Sraf 
1640*2248Sraf 		/*
1641*2248Sraf 		 * Create more workers when the workers appear overloaded.
1642*2248Sraf 		 * Either all the workers are busy draining their queues
1643*2248Sraf 		 * or no worker's queue lock could be acquired.
1644*2248Sraf 		 */
1645*2248Sraf 		if (!found) {
1646*2248Sraf 			if (_aio_worker_cnt < _max_workers) {
1647*2248Sraf 				if (_aio_create_worker(reqp, mode))
1648*2248Sraf 					aio_panic("_aio_req_add: add worker");
1649*2248Sraf 				sigon(self);	/* reenable SIGIO */
1650*2248Sraf 				return;
1651*2248Sraf 			}
1652*2248Sraf 
1653*2248Sraf 			/*
1654*2248Sraf 			 * No worker is available and we have already created
1655*2248Sraf 			 * _max_workers; keep going through the list slowly
1656*2248Sraf 			 * until we get a lock.
1657*2248Sraf 			 */
1658*2248Sraf 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1659*2248Sraf 				/*
1660*2248Sraf 				 * give someone else a chance
1661*2248Sraf 				 */
1662*2248Sraf 				_aio_delay(1);
1663*2248Sraf 				aiowp = aiowp->work_forw;
1664*2248Sraf 			}
1665*2248Sraf 		}
1666*2248Sraf 
1667*2248Sraf 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1668*2248Sraf 		if (_aio_worker_cnt < _max_workers &&
1669*2248Sraf 		    aiowp->work_minload1 >= _minworkload) {
1670*2248Sraf 			sig_mutex_unlock(&aiowp->work_qlock1);
1671*2248Sraf 			sig_mutex_lock(&__aio_mutex);
1672*2248Sraf 			*nextworker = aiowp->work_forw;
1673*2248Sraf 			sig_mutex_unlock(&__aio_mutex);
1674*2248Sraf 			if (_aio_create_worker(reqp, mode))
1675*2248Sraf 				aio_panic("_aio_req_add: add worker");
1676*2248Sraf 			sigon(self);	/* reenable SIGIO */
1677*2248Sraf 			return;
1678*2248Sraf 		}
1679*2248Sraf 		aiowp->work_minload1++;
1680*2248Sraf 		break;
1681*2248Sraf 	case AIOFSYNC:
1682*2248Sraf 	case AIONOTIFY:
1683*2248Sraf 		load_bal_flg = 0;
1684*2248Sraf 		sig_mutex_lock(&aiowp->work_qlock1);
1685*2248Sraf 		break;
1686*2248Sraf 	default:
1687*2248Sraf 		aio_panic("_aio_req_add: invalid mode");
1688*2248Sraf 		break;
1689*2248Sraf 	}
1690*2248Sraf 	/*
1691*2248Sraf 	 * Put request onto worker's work queue.
1692*2248Sraf 	 */
1693*2248Sraf 	if (aiowp->work_tail1 == NULL) {
1694*2248Sraf 		ASSERT(aiowp->work_count1 == 0);
1695*2248Sraf 		aiowp->work_tail1 = reqp;
1696*2248Sraf 		aiowp->work_next1 = reqp;
1697*2248Sraf 	} else {
1698*2248Sraf 		aiowp->work_head1->req_next = reqp;
1699*2248Sraf 		if (aiowp->work_next1 == NULL)
1700*2248Sraf 			aiowp->work_next1 = reqp;
1701*2248Sraf 	}
1702*2248Sraf 	reqp->req_state = AIO_REQ_QUEUED;
1703*2248Sraf 	reqp->req_worker = aiowp;
1704*2248Sraf 	aiowp->work_head1 = reqp;
1705*2248Sraf 	/*
1706*2248Sraf 	 * Awaken worker if it is not currently active.
1707*2248Sraf 	 */
1708*2248Sraf 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1709*2248Sraf 		aiowp->work_idleflg = 0;
1710*2248Sraf 		(void) cond_signal(&aiowp->work_idle_cv);
1711*2248Sraf 	}
1712*2248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
1713*2248Sraf 
1714*2248Sraf 	if (load_bal_flg) {
1715*2248Sraf 		sig_mutex_lock(&__aio_mutex);
1716*2248Sraf 		*nextworker = aiowp->work_forw;
1717*2248Sraf 		sig_mutex_unlock(&__aio_mutex);
1718*2248Sraf 	}
1719*2248Sraf 	sigon(self);	/* reenable SIGIO */
1720*2248Sraf }
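
/*
 * For illustration: the worker-selection policy above, condensed into a
 * single hypothetical helper.  Preference order is (1) an idle worker
 * whose queue lock can be taken without blocking, then (2) any worker
 * whose lock can be taken; if neither is found the caller creates a new
 * worker (when below _max_workers) or retries slowly.  pick_worker() is
 * not part of this library, and the real code also maintains the
 * work_minload1 load-balancing counter.
 */
#if 0
static aio_worker_t *
pick_worker(aio_worker_t *first)
{
	aio_worker_t *aiowp = first;

	/* first pass: look for an idle worker */
	do {
		if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
			if (aiowp->work_idleflg)
				return (aiowp);	/* returns with lock held */
			sig_mutex_unlock(&aiowp->work_qlock1);
		}
	} while ((aiowp = aiowp->work_forw) != first);

	/* second pass: take any queue lock we can get */
	do {
		if (sig_mutex_trylock(&aiowp->work_qlock1) == 0)
			return (aiowp);		/* returns with lock held */
	} while ((aiowp = aiowp->work_forw) != first);

	return (NULL);		/* caller creates a worker or retries */
}
#endif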
1721*2248Sraf 
1722*2248Sraf /*
1723*2248Sraf  * Get an AIO request for a specified worker.
1724*2248Sraf  * If the work queue is empty, return NULL.
1725*2248Sraf  */
1726*2248Sraf aio_req_t *
1727*2248Sraf _aio_req_get(aio_worker_t *aiowp)
1728*2248Sraf {
1729*2248Sraf 	aio_req_t *reqp;
1730*2248Sraf 
1731*2248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
1732*2248Sraf 	if ((reqp = aiowp->work_next1) != NULL) {
1733*2248Sraf 		/*
1734*2248Sraf 		 * Remove a POSIX request from the queue; the
1735*2248Sraf 		 * request queue is a singly linked list
1736*2248Sraf 		 * with a previous pointer.  The request is
1737*2248Sraf 		 * removed by updating the previous pointer.
1738*2248Sraf 		 *
1739*2248Sraf 		 * Non-POSIX requests are left on the queue
1740*2248Sraf 		 * to eventually be placed on the done queue.
1741*2248Sraf 		 */
1742*2248Sraf 
1743*2248Sraf 		if (POSIX_AIO(reqp)) {
1744*2248Sraf 			if (aiowp->work_prev1 == NULL) {
1745*2248Sraf 				aiowp->work_tail1 = reqp->req_next;
1746*2248Sraf 				if (aiowp->work_tail1 == NULL)
1747*2248Sraf 					aiowp->work_head1 = NULL;
1748*2248Sraf 			} else {
1749*2248Sraf 				aiowp->work_prev1->req_next = reqp->req_next;
1750*2248Sraf 				if (aiowp->work_head1 == reqp)
1751*2248Sraf 					aiowp->work_head1 = reqp->req_next;
1752*2248Sraf 			}
1753*2248Sraf 
1754*2248Sraf 		} else {
1755*2248Sraf 			aiowp->work_prev1 = reqp;
1756*2248Sraf 			ASSERT(aiowp->work_done1 >= 0);
1757*2248Sraf 			aiowp->work_done1++;
1758*2248Sraf 		}
1759*2248Sraf 		ASSERT(reqp != reqp->req_next);
1760*2248Sraf 		aiowp->work_next1 = reqp->req_next;
1761*2248Sraf 		ASSERT(aiowp->work_count1 >= 1);
1762*2248Sraf 		aiowp->work_count1--;
1763*2248Sraf 		switch (reqp->req_op) {
1764*2248Sraf 		case AIOREAD:
1765*2248Sraf 		case AIOWRITE:
1766*2248Sraf 		case AIOAREAD:
1767*2248Sraf 		case AIOAWRITE:
1768*2248Sraf #if !defined(_LP64)
1769*2248Sraf 		case AIOAREAD64:
1770*2248Sraf 		case AIOAWRITE64:
1771*2248Sraf #endif
1772*2248Sraf 			ASSERT(aiowp->work_minload1 > 0);
1773*2248Sraf 			aiowp->work_minload1--;
1774*2248Sraf 			break;
1775*2248Sraf 		}
1776*2248Sraf 		reqp->req_state = AIO_REQ_INPROGRESS;
1777*2248Sraf 	}
1778*2248Sraf 	aiowp->work_req = reqp;
1779*2248Sraf 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1780*2248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
1781*2248Sraf 	return (reqp);
1782*2248Sraf }
1783*2248Sraf 
1784*2248Sraf static void
1785*2248Sraf _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1786*2248Sraf {
1787*2248Sraf 	aio_req_t **last;
1788*2248Sraf 	aio_req_t *lastrp;
1789*2248Sraf 	aio_req_t *next;
1790*2248Sraf 
1791*2248Sraf 	ASSERT(aiowp != NULL);
1792*2248Sraf 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1793*2248Sraf 	if (POSIX_AIO(reqp)) {
1794*2248Sraf 		if (ostate != AIO_REQ_QUEUED)
1795*2248Sraf 			return;
1796*2248Sraf 	}
1797*2248Sraf 	last = &aiowp->work_tail1;
1798*2248Sraf 	lastrp = aiowp->work_tail1;
1799*2248Sraf 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1800*2248Sraf 	while ((next = *last) != NULL) {
1801*2248Sraf 		if (next == reqp) {
1802*2248Sraf 			*last = next->req_next;
1803*2248Sraf 			if (aiowp->work_next1 == next)
1804*2248Sraf 				aiowp->work_next1 = next->req_next;
1805*2248Sraf 
1806*2248Sraf 			if ((next->req_next != NULL) ||
1807*2248Sraf 			    (aiowp->work_done1 == 0)) {
1808*2248Sraf 				if (aiowp->work_head1 == next)
1809*2248Sraf 					aiowp->work_head1 = next->req_next;
1810*2248Sraf 				if (aiowp->work_prev1 == next)
1811*2248Sraf 					aiowp->work_prev1 = next->req_next;
1812*2248Sraf 			} else {
1813*2248Sraf 				if (aiowp->work_head1 == next)
1814*2248Sraf 					aiowp->work_head1 = lastrp;
1815*2248Sraf 				if (aiowp->work_prev1 == next)
1816*2248Sraf 					aiowp->work_prev1 = lastrp;
1817*2248Sraf 			}
1818*2248Sraf 
1819*2248Sraf 			if (ostate == AIO_REQ_QUEUED) {
1820*2248Sraf 				ASSERT(aiowp->work_count1 >= 1);
1821*2248Sraf 				aiowp->work_count1--;
1822*2248Sraf 				ASSERT(aiowp->work_minload1 >= 1);
1823*2248Sraf 				aiowp->work_minload1--;
1824*2248Sraf 			} else {
1825*2248Sraf 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1826*2248Sraf 				    !POSIX_AIO(reqp));
1827*2248Sraf 				aiowp->work_done1--;
1828*2248Sraf 			}
1829*2248Sraf 			return;
1830*2248Sraf 		}
1831*2248Sraf 		last = &next->req_next;
1832*2248Sraf 		lastrp = next;
1833*2248Sraf 	}
1834*2248Sraf 	/* NOTREACHED */
1835*2248Sraf }
1836*2248Sraf 
1837*2248Sraf static void
1838*2248Sraf _aio_enq_doneq(aio_req_t *reqp)
1839*2248Sraf {
1840*2248Sraf 	if (_aio_doneq == NULL) {
1841*2248Sraf 		_aio_doneq = reqp;
1842*2248Sraf 		reqp->req_next = reqp->req_prev = reqp;
1843*2248Sraf 	} else {
1844*2248Sraf 		reqp->req_next = _aio_doneq;
1845*2248Sraf 		reqp->req_prev = _aio_doneq->req_prev;
1846*2248Sraf 		_aio_doneq->req_prev->req_next = reqp;
1847*2248Sraf 		_aio_doneq->req_prev = reqp;
1848*2248Sraf 	}
1849*2248Sraf 	reqp->req_state = AIO_REQ_DONEQ;
1850*2248Sraf 	_aio_doneq_cnt++;
1851*2248Sraf }
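
/*
 * _aio_doneq is a circular, doubly linked list: a single element points
 * to itself, and new elements are linked in just ahead of the head, i.e.
 * at the tail.  For illustration, a traversal of such a list; walk_doneq()
 * and its visit() callback are hypothetical and not part of this library.
 */
#if 0
static void
walk_doneq(aio_req_t *head, void (*visit)(aio_req_t *))
{
	aio_req_t *reqp;

	if (head == NULL)
		return;
	reqp = head;
	do {			/* stop when we wrap around to the head */
		visit(reqp);
		reqp = reqp->req_next;
	} while (reqp != head);
}
#endif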
1852*2248Sraf 
1853*2248Sraf /*
1854*2248Sraf  * The caller owns the _aio_mutex.
1855*2248Sraf  */
1856*2248Sraf aio_req_t *
1857*2248Sraf _aio_req_remove(aio_req_t *reqp)
1858*2248Sraf {
1859*2248Sraf 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1860*2248Sraf 		return (NULL);
1861*2248Sraf 
1862*2248Sraf 	if (reqp) {
1863*2248Sraf 		/* request in done queue */
1864*2248Sraf 		if (_aio_doneq == reqp)
1865*2248Sraf 			_aio_doneq = reqp->req_next;
1866*2248Sraf 		if (_aio_doneq == reqp) {
1867*2248Sraf 			/* only one request on queue */
1868*2248Sraf 			_aio_doneq = NULL;
1869*2248Sraf 		} else {
1870*2248Sraf 			aio_req_t *tmp = reqp->req_next;
1871*2248Sraf 			reqp->req_prev->req_next = tmp;
1872*2248Sraf 			tmp->req_prev = reqp->req_prev;
1873*2248Sraf 		}
1874*2248Sraf 	} else if ((reqp = _aio_doneq) != NULL) {
1875*2248Sraf 		if (reqp == reqp->req_next) {
1876*2248Sraf 			/* only one request on queue */
1877*2248Sraf 			_aio_doneq = NULL;
1878*2248Sraf 		} else {
1879*2248Sraf 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
1880*2248Sraf 			_aio_doneq->req_prev = reqp->req_prev;
1881*2248Sraf 		}
1882*2248Sraf 	}
1883*2248Sraf 	if (reqp) {
1884*2248Sraf 		_aio_doneq_cnt--;
1885*2248Sraf 		reqp->req_next = reqp->req_prev = reqp;
1886*2248Sraf 		reqp->req_state = AIO_REQ_DONE;
1887*2248Sraf 	}
1888*2248Sraf 	return (reqp);
1889*2248Sraf }
1890*2248Sraf 
1891*2248Sraf /*
1892*2248Sraf  * An AIO request is identified by an aio_result_t pointer.  The library
1893*2248Sraf  * maps this aio_result_t pointer to its internal representation using a
1894*2248Sraf  * hash table.  This function adds an aio_result_t pointer to the hash table.
1895*2248Sraf  */
1896*2248Sraf static int
1897*2248Sraf _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
1898*2248Sraf {
1899*2248Sraf 	aio_hash_t *hashp;
1900*2248Sraf 	aio_req_t **prev;
1901*2248Sraf 	aio_req_t *next;
1902*2248Sraf 
1903*2248Sraf 	hashp = _aio_hash + AIOHASH(resultp);
1904*2248Sraf 	lmutex_lock(&hashp->hash_lock);
1905*2248Sraf 	prev = &hashp->hash_ptr;
1906*2248Sraf 	while ((next = *prev) != NULL) {
1907*2248Sraf 		if (resultp == next->req_resultp) {
1908*2248Sraf 			lmutex_unlock(&hashp->hash_lock);
1909*2248Sraf 			return (-1);
1910*2248Sraf 		}
1911*2248Sraf 		prev = &next->req_link;
1912*2248Sraf 	}
1913*2248Sraf 	*prev = reqp;
1914*2248Sraf 	ASSERT(reqp->req_link == NULL);
1915*2248Sraf 	lmutex_unlock(&hashp->hash_lock);
1916*2248Sraf 	return (0);
1917*2248Sraf }
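
/*
 * AIOHASH() maps an aio_result_t pointer to a bucket of _aio_hash; its
 * real definition lives in asyncio.h and is not shown here.  For
 * illustration only, one plausible shape of such a pointer hash --
 * EXAMPLE_HASHSZ and the shift amounts are assumptions, not the
 * library's actual values.
 */
#if 0
#include <sys/types.h>

#define	EXAMPLE_HASHSZ	1024	/* assumed power-of-two bucket count */
#define	EXAMPLE_AIOHASH(resultp)					\
	((((uintptr_t)(resultp) >> 3) ^ ((uintptr_t)(resultp) >> 11)) &	\
	(EXAMPLE_HASHSZ - 1))
#endif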
1918*2248Sraf 
1919*2248Sraf /*
1920*2248Sraf  * Remove an entry from the hash table.
1921*2248Sraf  */
1922*2248Sraf aio_req_t *
1923*2248Sraf _aio_hash_del(aio_result_t *resultp)
1924*2248Sraf {
1925*2248Sraf 	aio_hash_t *hashp;
1926*2248Sraf 	aio_req_t **prev;
1927*2248Sraf 	aio_req_t *next = NULL;
1928*2248Sraf 
1929*2248Sraf 	if (_aio_hash != NULL) {
1930*2248Sraf 		hashp = _aio_hash + AIOHASH(resultp);
1931*2248Sraf 		lmutex_lock(&hashp->hash_lock);
1932*2248Sraf 		prev = &hashp->hash_ptr;
1933*2248Sraf 		while ((next = *prev) != NULL) {
1934*2248Sraf 			if (resultp == next->req_resultp) {
1935*2248Sraf 				*prev = next->req_link;
1936*2248Sraf 				next->req_link = NULL;
1937*2248Sraf 				break;
1938*2248Sraf 			}
1939*2248Sraf 			prev = &next->req_link;
1940*2248Sraf 		}
1941*2248Sraf 		lmutex_unlock(&hashp->hash_lock);
1942*2248Sraf 	}
1943*2248Sraf 	return (next);
1944*2248Sraf }
1945*2248Sraf 
1946*2248Sraf /*
1947*2248Sraf  * Find an entry in the hash table.
1948*2248Sraf  */
1949*2248Sraf aio_req_t *
1950*2248Sraf _aio_hash_find(aio_result_t *resultp)
1951*2248Sraf {
1952*2248Sraf 	aio_hash_t *hashp;
1953*2248Sraf 	aio_req_t **prev;
1954*2248Sraf 	aio_req_t *next = NULL;
1955*2248Sraf 
1956*2248Sraf 	if (_aio_hash != NULL) {
1957*2248Sraf 		hashp = _aio_hash + AIOHASH(resultp);
1958*2248Sraf 		lmutex_lock(&hashp->hash_lock);
1959*2248Sraf 		prev = &hashp->hash_ptr;
1960*2248Sraf 		while ((next = *prev) != NULL) {
1961*2248Sraf 			if (resultp == next->req_resultp)
1962*2248Sraf 				break;
1963*2248Sraf 			prev = &next->req_link;
1964*2248Sraf 		}
1965*2248Sraf 		lmutex_unlock(&hashp->hash_lock);
1966*2248Sraf 	}
1967*2248Sraf 	return (next);
1968*2248Sraf }
1969*2248Sraf 
1970*2248Sraf /*
1971*2248Sraf  * AIO interface for POSIX
1972*2248Sraf  */
1973*2248Sraf int
1974*2248Sraf _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
1975*2248Sraf     int mode, int flg)
1976*2248Sraf {
1977*2248Sraf 	aio_req_t *reqp;
1978*2248Sraf 	aio_args_t *ap;
1979*2248Sraf 	int kerr;
1980*2248Sraf 
1981*2248Sraf 	if (aiocbp == NULL) {
1982*2248Sraf 		errno = EINVAL;
1983*2248Sraf 		return (-1);
1984*2248Sraf 	}
1985*2248Sraf 
1986*2248Sraf 	/* initialize kaio */
1987*2248Sraf 	if (!_kaio_ok)
1988*2248Sraf 		_kaio_init();
1989*2248Sraf 
1990*2248Sraf 	aiocbp->aio_state = NOCHECK;
1991*2248Sraf 
1992*2248Sraf 	/*
1993*2248Sraf 	 * If we have been called because a list I/O
1994*2248Sraf 	 * kaio() failed, we dont want to repeat the
1995*2248Sraf 	 * kaio() failed, we don't want to repeat the
1996*2248Sraf 	 * system call.
1997*2248Sraf 
1998*2248Sraf 	if (flg & AIO_KAIO) {
1999*2248Sraf 		/*
2000*2248Sraf 		 * Try kernel aio first.
2001*2248Sraf 		 * If errno is ENOTSUP/EBADFD,
2002*2248Sraf 		 * fall back to the thread implementation.
2003*2248Sraf 		 */
2004*2248Sraf 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2005*2248Sraf 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2006*2248Sraf 			aiocbp->aio_state = CHECK;
2007*2248Sraf 			kerr = (int)_kaio(mode, aiocbp);
2008*2248Sraf 			if (kerr == 0)
2009*2248Sraf 				return (0);
2010*2248Sraf 			if (errno != ENOTSUP && errno != EBADFD) {
2011*2248Sraf 				aiocbp->aio_resultp.aio_errno = errno;
2012*2248Sraf 				aiocbp->aio_resultp.aio_return = -1;
2013*2248Sraf 				aiocbp->aio_state = NOCHECK;
2014*2248Sraf 				return (-1);
2015*2248Sraf 			}
2016*2248Sraf 			if (errno == EBADFD)
2017*2248Sraf 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2018*2248Sraf 		}
2019*2248Sraf 	}
2020*2248Sraf 
2021*2248Sraf 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2022*2248Sraf 	aiocbp->aio_state = USERAIO;
2023*2248Sraf 
2024*2248Sraf 	if (!__uaio_ok && __uaio_init() == -1)
2025*2248Sraf 		return (-1);
2026*2248Sraf 
2027*2248Sraf 	if ((reqp = _aio_req_alloc()) == NULL) {
2028*2248Sraf 		errno = EAGAIN;
2029*2248Sraf 		return (-1);
2030*2248Sraf 	}
2031*2248Sraf 
2032*2248Sraf 	/*
2033*2248Sraf 	 * If an LIO request, add the list head to the aio request
2034*2248Sraf 	 */
2035*2248Sraf 	reqp->req_head = lio_head;
2036*2248Sraf 	reqp->req_type = AIO_POSIX_REQ;
2037*2248Sraf 	reqp->req_op = mode;
2038*2248Sraf 	reqp->req_largefile = 0;
2039*2248Sraf 
2040*2248Sraf 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2041*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2042*2248Sraf 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2043*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2044*2248Sraf 		reqp->req_sigevent.sigev_signo =
2045*2248Sraf 		    aiocbp->aio_sigevent.sigev_signo;
2046*2248Sraf 		reqp->req_sigevent.sigev_value.sival_ptr =
2047*2248Sraf 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2048*2248Sraf 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2049*2248Sraf 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2050*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2051*2248Sraf 		/*
2052*2248Sraf 		 * Reuse the sigevent structure to contain the port number
2053*2248Sraf 		 * and the user value.  Same for SIGEV_THREAD, below.
2054*2248Sraf 		 */
2055*2248Sraf 		reqp->req_sigevent.sigev_signo =
2056*2248Sraf 		    pn->portnfy_port;
2057*2248Sraf 		reqp->req_sigevent.sigev_value.sival_ptr =
2058*2248Sraf 		    pn->portnfy_user;
2059*2248Sraf 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2060*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2061*2248Sraf 		/*
2062*2248Sraf 		 * The sigevent structure contains the port number
2063*2248Sraf 		 * and the user value.  Same for SIGEV_PORT, above.
2064*2248Sraf 		 */
2065*2248Sraf 		reqp->req_sigevent.sigev_signo =
2066*2248Sraf 		    aiocbp->aio_sigevent.sigev_signo;
2067*2248Sraf 		reqp->req_sigevent.sigev_value.sival_ptr =
2068*2248Sraf 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2069*2248Sraf 	}
2070*2248Sraf 
2071*2248Sraf 	reqp->req_resultp = &aiocbp->aio_resultp;
2072*2248Sraf 	reqp->req_aiocbp = aiocbp;
2073*2248Sraf 	ap = &reqp->req_args;
2074*2248Sraf 	ap->fd = aiocbp->aio_fildes;
2075*2248Sraf 	ap->buf = (caddr_t)aiocbp->aio_buf;
2076*2248Sraf 	ap->bufsz = aiocbp->aio_nbytes;
2077*2248Sraf 	ap->offset = aiocbp->aio_offset;
2078*2248Sraf 
2079*2248Sraf 	if ((flg & AIO_NO_DUPS) &&
2080*2248Sraf 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2081*2248Sraf 		aio_panic("_aio_rw(): request already in hash table");
2082*2248Sraf 		_aio_req_free(reqp);
2083*2248Sraf 		errno = EINVAL;
2084*2248Sraf 		return (-1);
2085*2248Sraf 	}
2086*2248Sraf 	_aio_req_add(reqp, nextworker, mode);
2087*2248Sraf 	return (0);
2088*2248Sraf }
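
/*
 * For illustration: the application-level view of the request built by
 * _aio_rw(), using the public POSIX interfaces (aio_read(), aio_error(),
 * aio_return()) that funnel into it.  The file name, buffer size, and
 * the busy-wait are purely illustrative; a real caller would use
 * aio_suspend() or a sigevent notification instead of spinning.
 */
#if 0
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

static ssize_t
read_async(const char *path, char *buf, size_t len)
{
	struct aiocb cb;
	int error;
	ssize_t nread;

	(void) memset(&cb, 0, sizeof (cb));
	cb.aio_fildes = open(path, O_RDONLY);
	if (cb.aio_fildes < 0)
		return (-1);
	cb.aio_buf = buf;
	cb.aio_nbytes = len;
	cb.aio_offset = 0;
	cb.aio_sigevent.sigev_notify = SIGEV_NONE;

	if (aio_read(&cb) != 0) {
		(void) close(cb.aio_fildes);
		return (-1);
	}
	while ((error = aio_error(&cb)) == EINPROGRESS)
		;	/* illustrative busy-wait; use aio_suspend() instead */
	nread = (error == 0) ? aio_return(&cb) : -1;
	(void) close(cb.aio_fildes);
	return (nread);
}
#endif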
2089*2248Sraf 
2090*2248Sraf #if !defined(_LP64)
2091*2248Sraf /*
2092*2248Sraf  * 64-bit AIO interface for POSIX
2093*2248Sraf  */
2094*2248Sraf int
2095*2248Sraf _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2096*2248Sraf     int mode, int flg)
2097*2248Sraf {
2098*2248Sraf 	aio_req_t *reqp;
2099*2248Sraf 	aio_args_t *ap;
2100*2248Sraf 	int kerr;
2101*2248Sraf 
2102*2248Sraf 	if (aiocbp == NULL) {
2103*2248Sraf 		errno = EINVAL;
2104*2248Sraf 		return (-1);
2105*2248Sraf 	}
2106*2248Sraf 
2107*2248Sraf 	/* initialize kaio */
2108*2248Sraf 	if (!_kaio_ok)
2109*2248Sraf 		_kaio_init();
2110*2248Sraf 
2111*2248Sraf 	aiocbp->aio_state = NOCHECK;
2112*2248Sraf 
2113*2248Sraf 	/*
2114*2248Sraf 	 * If we have been called because a list I/O
2115*2248Sraf 	 * kaio() failed, we don't want to repeat the
2116*2248Sraf 	 * system call.
2117*2248Sraf 	 */
2118*2248Sraf 
2119*2248Sraf 	if (flg & AIO_KAIO) {
2120*2248Sraf 		/*
2121*2248Sraf 		 * Try kernel aio first.
2122*2248Sraf 		 * If errno is ENOTSUP/EBADFD,
2123*2248Sraf 		 * fall back to the thread implementation.
2124*2248Sraf 		 */
2125*2248Sraf 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2126*2248Sraf 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2127*2248Sraf 			aiocbp->aio_state = CHECK;
2128*2248Sraf 			kerr = (int)_kaio(mode, aiocbp);
2129*2248Sraf 			if (kerr == 0)
2130*2248Sraf 				return (0);
2131*2248Sraf 			if (errno != ENOTSUP && errno != EBADFD) {
2132*2248Sraf 				aiocbp->aio_resultp.aio_errno = errno;
2133*2248Sraf 				aiocbp->aio_resultp.aio_return = -1;
2134*2248Sraf 				aiocbp->aio_state = NOCHECK;
2135*2248Sraf 				return (-1);
2136*2248Sraf 			}
2137*2248Sraf 			if (errno == EBADFD)
2138*2248Sraf 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2139*2248Sraf 		}
2140*2248Sraf 	}
2141*2248Sraf 
2142*2248Sraf 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2143*2248Sraf 	aiocbp->aio_state = USERAIO;
2144*2248Sraf 
2145*2248Sraf 	if (!__uaio_ok && __uaio_init() == -1)
2146*2248Sraf 		return (-1);
2147*2248Sraf 
2148*2248Sraf 	if ((reqp = _aio_req_alloc()) == NULL) {
2149*2248Sraf 		errno = EAGAIN;
2150*2248Sraf 		return (-1);
2151*2248Sraf 	}
2152*2248Sraf 
2153*2248Sraf 	/*
2154*2248Sraf 	 * If an LIO request, add the list head to the aio request
2155*2248Sraf 	 */
2156*2248Sraf 	reqp->req_head = lio_head;
2157*2248Sraf 	reqp->req_type = AIO_POSIX_REQ;
2158*2248Sraf 	reqp->req_op = mode;
2159*2248Sraf 	reqp->req_largefile = 1;
2160*2248Sraf 
2161*2248Sraf 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2162*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2163*2248Sraf 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2164*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2165*2248Sraf 		reqp->req_sigevent.sigev_signo =
2166*2248Sraf 		    aiocbp->aio_sigevent.sigev_signo;
2167*2248Sraf 		reqp->req_sigevent.sigev_value.sival_ptr =
2168*2248Sraf 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2169*2248Sraf 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2170*2248Sraf 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2171*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2172*2248Sraf 		reqp->req_sigevent.sigev_signo =
2173*2248Sraf 		    pn->portnfy_port;
2174*2248Sraf 		reqp->req_sigevent.sigev_value.sival_ptr =
2175*2248Sraf 		    pn->portnfy_user;
2176*2248Sraf 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2177*2248Sraf 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2178*2248Sraf 		reqp->req_sigevent.sigev_signo =
2179*2248Sraf 		    aiocbp->aio_sigevent.sigev_signo;
2180*2248Sraf 		reqp->req_sigevent.sigev_value.sival_ptr =
2181*2248Sraf 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2182*2248Sraf 	}
2183*2248Sraf 
2184*2248Sraf 	reqp->req_resultp = &aiocbp->aio_resultp;
2185*2248Sraf 	reqp->req_aiocbp = aiocbp;
2186*2248Sraf 	ap = &reqp->req_args;
2187*2248Sraf 	ap->fd = aiocbp->aio_fildes;
2188*2248Sraf 	ap->buf = (caddr_t)aiocbp->aio_buf;
2189*2248Sraf 	ap->bufsz = aiocbp->aio_nbytes;
2190*2248Sraf 	ap->offset = aiocbp->aio_offset;
2191*2248Sraf 
2192*2248Sraf 	if ((flg & AIO_NO_DUPS) &&
2193*2248Sraf 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2194*2248Sraf 		aio_panic("_aio_rw64(): request already in hash table");
2195*2248Sraf 		_aio_req_free(reqp);
2196*2248Sraf 		errno = EINVAL;
2197*2248Sraf 		return (-1);
2198*2248Sraf 	}
2199*2248Sraf 	_aio_req_add(reqp, nextworker, mode);
2200*2248Sraf 	return (0);
2201*2248Sraf }
2202*2248Sraf #endif	/* !defined(_LP64) */
2203