xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 5673:043503f0cca3)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #ifdef HAVE_SYS_ASYNCH_H
32 #include <sys/asynch.h>
33 #endif
34 #include <sys/ipc.h>
35 #include <sys/sem.h>
36 #include <sys/errno.h>
37 #include <sys/time.h>
38 #include <inttypes.h>
39 #include <fcntl.h>
40 
41 #ifdef HAVE_UTILITY_H
42 #include <utility.h>
43 #endif /* HAVE_UTILITY_H */
44 
45 #ifdef HAVE_AIO
46 #include <aio.h>
47 #endif /* HAVE_AIO */
48 
49 #ifdef HAVE_LIBAIO_H
50 #include <libaio.h>
51 #endif /* HAVE_LIBAIO_H */
52 
53 #ifdef HAVE_SYS_ASYNC_H
54 #include <sys/asynch.h>
55 #endif /* HAVE_SYS_ASYNC_H */
56 
57 #ifdef HAVE_AIO_H
58 #include <aio.h>
59 #endif /* HAVE_AIO_H */
60 
61 #ifndef HAVE_UINT_T
62 #define	uint_t unsigned int
63 #endif /* HAVE_UINT_T */
64 
65 #ifndef HAVE_AIOCB64_T
66 #define	aiocb64 aiocb
67 #endif /* HAVE_AIOCB64_T */
68 
69 #ifndef HAVE_SYSV_SEM
70 #include <semaphore.h>
71 #endif /* HAVE_SYSV_SEM */
72 
73 #include "filebench.h"
74 #include "flowop.h"
75 #include "fileset.h"
76 
77 /*
78  * These routines implement the flowops from the f language. Each
 * flowop has a name such as "read", and a set of function pointers
80  * to call for initialization, execution and destruction of the flowop.
81  * The table flowoplib_funcs[] contains a flowoplib struct for each
82  * implemented flowop. Most flowops use a generic initialization function
83  * and all currently use a generic destruction function. All flowop
84  * functions referenced from the table are in this file, though, of
85  * course, they often call functions from other files.
86  *
87  * The flowop_init() routine uses the flowoplib_funcs[] table to
88  * create an initial set of "instance 0" flowops, one for each type of
89  * flowop, from which all other flowops are derived. These "instance 0"
90  * flowops are initialized with information from the table including
91  * pointers for their fo_init, fo_func and fo_destroy functions. When
92  * a flowop definition is encountered in an f language script, the
93  * "type" of flowop, such as "read" is used to search for the
94  * "instance 0" flowop named "read", then a new flowop is allocated
95  * which inherits its function pointers and other initial properties
96  * from the instance 0 flowop, and is given a new name as specified
97  * by the "name=" attribute.
98  */
99 
100 static int flowoplib_init_generic(flowop_t *flowop);
101 static void flowoplib_destruct_generic(flowop_t *flowop);
102 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
103 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
104 #ifdef HAVE_AIO
105 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
106 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
107 #endif
108 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
109 static int flowoplib_block_init(flowop_t *flowop);
110 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
111 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
112 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
113 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
114 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
115 static int flowoplib_sempost_init(flowop_t *flowop);
116 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
117 static int flowoplib_semblock_init(flowop_t *flowop);
118 static void flowoplib_semblock_destruct(flowop_t *flowop);
119 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
120 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
121 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
122 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
123 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
124 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
125 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
126 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
127 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
128 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
129 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
130 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
131 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
132 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
133 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
134 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
135 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
136 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
137 
/*
 * One row of the flowoplib_funcs[] table: the static description of a
 * flowop type from which flowoplib_init() builds the corresponding
 * flowop.
 */
typedef struct flowoplib {
	int	fl_type;		/* type passed to flowop_define() */
	int	fl_attrs;		/* copied to the flowop's fo_attrs */
	char	*fl_name;		/* flowop name, e.g. "read" */
	int	(*fl_init)();		/* copied to the flowop's fo_init */
	int	(*fl_func)();		/* copied to the flowop's fo_func */
	void	(*fl_destruct)();	/* copied to the flowop's fo_destruct */
} flowoplib_t;
146 
147 static flowoplib_t flowoplib_funcs[] = {
148 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
149 	flowoplib_write, flowoplib_destruct_generic,
150 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
151 	flowoplib_read, flowoplib_destruct_generic,
152 #ifdef HAVE_AIO
153 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
154 	flowoplib_aiowrite, flowoplib_destruct_generic,
155 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
156 	flowoplib_aiowait, flowoplib_destruct_generic,
157 #endif
158 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
159 	flowoplib_block, flowoplib_destruct_generic,
160 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
161 	flowoplib_wakeup, flowoplib_destruct_generic,
162 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
163 	flowoplib_semblock, flowoplib_semblock_destruct,
164 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
165 	flowoplib_sempost, flowoplib_destruct_generic,
166 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
167 	flowoplib_hog, flowoplib_destruct_generic,
168 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
169 	flowoplib_delay, flowoplib_destruct_generic,
170 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
171 	flowoplib_eventlimit, flowoplib_destruct_generic,
172 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
173 	flowoplib_bwlimit, flowoplib_destruct_generic,
174 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
175 	flowoplib_iopslimit, flowoplib_destruct_generic,
176 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
177 	flowoplib_opslimit, flowoplib_destruct_generic,
178 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
179 	flowoplib_finishoncount, flowoplib_destruct_generic,
180 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
181 	flowoplib_finishonbytes, flowoplib_destruct_generic,
182 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
183 	flowoplib_openfile, flowoplib_destruct_generic,
184 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
185 	flowoplib_createfile, flowoplib_destruct_generic,
186 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
187 	flowoplib_closefile, flowoplib_destruct_generic,
188 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
189 	flowoplib_fsync, flowoplib_destruct_generic,
190 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
191 	flowoplib_fsyncset, flowoplib_destruct_generic,
192 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
193 	flowoplib_statfile, flowoplib_destruct_generic,
194 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
195 	flowoplib_readwholefile, flowoplib_destruct_generic,
196 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
197 	flowoplib_appendfile, flowoplib_destruct_generic,
198 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
199 	flowoplib_appendfilerand, flowoplib_destruct_generic,
200 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
201 	flowoplib_deletefile, flowoplib_destruct_generic,
202 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
203 	flowoplib_writewholefile, flowoplib_destruct_generic
204 };
205 
206 /*
207  * Loops through the master list of flowops defined in this
208  * module, and creates and initializes a flowop for each one
209  * by calling flowop_define. As a side effect of calling
210  * flowop define, the created flowops are placed on the
211  * master flowop list. All created flowops are set to
212  * instance "0".
213  */
214 void
215 flowoplib_init()
216 {
217 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
218 	int i;
219 
220 	for (i = 0; i < nops; i++) {
221 		flowop_t *flowop;
222 		flowoplib_t *fl;
223 
224 		fl = &flowoplib_funcs[i];
225 
226 		if ((flowop = flowop_define(NULL,
227 		    fl->fl_name, NULL, 0, fl->fl_type)) == 0) {
228 			filebench_log(LOG_ERROR,
229 			    "failed to create flowop %s\n",
230 			    fl->fl_name);
231 			filebench_shutdown(1);
232 		}
233 
234 		flowop->fo_func = fl->fl_func;
235 		flowop->fo_init = fl->fl_init;
236 		flowop->fo_destruct = fl->fl_destruct;
237 		flowop->fo_attrs = fl->fl_attrs;
238 	}
239 }
240 
241 static int
242 flowoplib_init_generic(flowop_t *flowop)
243 {
244 	(void) ipc_mutex_unlock(&flowop->fo_lock);
245 	return (0);
246 }
247 
248 /* ARGSUSED */
249 static void
250 flowoplib_destruct_generic(flowop_t *flowop)
251 {
252 	/* release any resources held by the flowop */
253 	if (flowop->fo_buf)
254 		free(flowop->fo_buf);
255 }
256 
257 /*
258  * Generates a file attribute from flags in the supplied flowop.
259  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
260  */
261 static int
262 flowoplib_fileattrs(flowop_t *flowop)
263 {
264 	int attrs = 0;
265 
266 	if (*flowop->fo_directio)
267 		attrs |= FLOW_ATTR_DIRECTIO;
268 
269 	if (*flowop->fo_dsync)
270 		attrs |= FLOW_ATTR_DSYNC;
271 
272 	return (attrs);
273 }
274 
275 /*
276  * Searches for a file descriptor. Tries the flowop's
277  * fo_fdnumber first and returns with it if it has been
278  * explicitly set (greater than 0). It next checks to
279  * see if a rotating file descriptor policy is in effect,
280  * and if not returns the fdnumber regardless of what
281  * it is. (note that if it is 0, it just selects to the
282  * default file descriptor in the threadflow's tf_fd
283  * array). If the rotating fd policy is in effect, it
284  * cycles from the end of the tf_fd array to one location
285  * beyond the maximum needed by the number of entries in
286  * the associated fileset on each invocation, then starts
287  * over from the end.
288  *
289  * The routine returns an index into the threadflow's
290  * tf_fd table where the actual file descriptor will be
291  * found. Note: the calling routine must not call this
292  * routine if the flowop does not have a fileset, and the
293  * flowop's fo_fdnumber is zero and fo_rotatefd is
294  * asserted, or an addressing fault may occur.
295  */
296 static int
297 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
298 {
299 	/* If the script sets the fd explicitly */
300 	if (flowop->fo_fdnumber > 0)
301 		return (flowop->fo_fdnumber);
302 
303 	/* If the flowop defaults to persistent fd */
304 	if (!integer_isset(flowop->fo_rotatefd))
305 		return (flowop->fo_fdnumber);
306 
307 	/* Rotate the fd on each flowop invocation */
308 	if (*(flowop->fo_fileset->fs_entries) > (THREADFLOW_MAXFD / 2)) {
309 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
310 		    " (too many files : %d", flowop->fo_name,
311 		    *(flowop->fo_fileset->fs_entries));
312 		return (-1);
313 	}
314 
315 	/* First time around */
316 	if (threadflow->tf_fdrotor == 0)
317 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
318 
319 	/* One fd for every file in the set */
320 	if (*(flowop->fo_fileset->fs_entries) ==
321 	    (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
322 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
323 
324 
325 	threadflow->tf_fdrotor--;
326 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
327 	    threadflow->tf_fdrotor);
328 	return (threadflow->tf_fdrotor);
329 }
330 
331 /*
332  * Determines the file descriptor to use, and attempts to open
333  * the file if it is not already open. Also determines the wss
334  * value. Returns -1 on errors, 0 otherwise.
335  */
336 static int
337 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
338     vinteger_t *wssp, int *filedescp)
339 {
340 	int fd = flowoplib_fdnum(threadflow, flowop);
341 
342 	if (fd == -1)
343 		return (-1);
344 
345 	if (threadflow->tf_fd[fd] == 0) {
346 		if (flowoplib_openfile_common(
347 		    threadflow, flowop, fd) == -1)
348 			return (-1);
349 
350 		if (threadflow->tf_fse[fd]) {
351 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
352 			    threadflow->tf_fse[fd]->fse_path);
353 		} else {
354 			filebench_log(LOG_DEBUG_IMPL,
355 			    "opened device %s/%s",
356 			    flowop->fo_fileset->fs_path,
357 			    flowop->fo_fileset->fs_name);
358 		}
359 	}
360 
361 	*filedescp = threadflow->tf_fd[fd];
362 
363 	if (*flowop->fo_wss == 0) {
364 		if (threadflow->tf_fse[fd])
365 			*wssp = threadflow->tf_fse[fd]->fse_size;
366 		else
367 			*wssp = *flowop->fo_fileset->fs_size;
368 	} else {
369 		*wssp = *flowop->fo_wss;
370 	}
371 
372 	return (0);
373 }
374 
375 /*
376  * Determines the io buffer or random offset into tf_mem for
377  * the IO operation. Returns -1 on errors, 0 otherwise.
378  */
379 static int
380 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
381     caddr_t *iobufp, vinteger_t iosize)
382 {
383 	long memsize;
384 	size_t memoffset;
385 
386 	if (iosize == 0) {
387 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
388 		    flowop->fo_name);
389 		return (-1);
390 	}
391 
392 	if ((memsize = *threadflow->tf_memsize) != 0) {
393 
394 		/* use tf_mem for I/O with random offset */
395 		if (filebench_randomno(&memoffset, memsize, iosize) == -1) {
396 			filebench_log(LOG_ERROR,
397 			    "tf_memsize smaller than IO size for thread %s",
398 			    flowop->fo_name);
399 			return (-1);
400 		}
401 		*iobufp = threadflow->tf_mem + memoffset;
402 
403 	} else {
404 		/* use private I/O buffer */
405 		if ((flowop->fo_buf != NULL) &&
406 		    (flowop->fo_buf_size < iosize)) {
407 			free(flowop->fo_buf);
408 			flowop->fo_buf = NULL;
409 		}
410 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
411 		    = (char *)malloc(iosize)) == NULL))
412 				return (-1);
413 
414 		flowop->fo_buf_size = iosize;
415 		*iobufp = flowop->fo_buf;
416 	}
417 	return (0);
418 }
419 
420 /*
421  * Determines the file descriptor to use, opens it if necessary, the
422  * io buffer or random offset into tf_mem for IO operation and the wss
423  * value. Returns -1 on errors, 0 otherwise.
424  */
425 static int
426 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
427     vinteger_t *wssp, caddr_t *iobufp, int *filedescp, vinteger_t iosize)
428 {
429 	if (flowoplib_filesetup(threadflow, flowop, wssp, filedescp) == -1)
430 		return (-1);
431 
432 	if (flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize) == -1)
433 		return (-1);
434 
435 	return (0);
436 }
437 
438 /*
439  * Emulate posix read / pread. If the flowop has a fileset,
440  * a file descriptor number index is fetched, otherwise a
441  * supplied fileobj file is used. In either case the specified
442  * file will be opened if not already open. If the flowop has
443  * neither a fileset or fileobj, an error is logged and -1
444  * returned.
445  *
446  * The actual read is done to a random offset in the
447  * threadflow's thread memory (tf_mem), with a size set by
448  * fo_iosize and at either a random disk offset within the
449  * working set size, or at the next sequential location. If
450  * any errors are encountered, -1 is returned, if successful,
451  * 0 is returned.
452  */
453 static int
454 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
455 {
456 	caddr_t iobuf;
457 	vinteger_t wss;
458 	int filedesc;
459 	int ret;
460 
461 	if (flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
462 	    &filedesc, *flowop->fo_iosize) != 0)
463 		return (-1);
464 
465 	if (*flowop->fo_random) {
466 		uint64_t fileoffset;
467 
468 		if (filebench_randomno64(&fileoffset, wss,
469 		    *flowop->fo_iosize) == -1) {
470 			filebench_log(LOG_ERROR,
471 			    "file size smaller than IO size for thread %s",
472 			    flowop->fo_name);
473 			return (-1);
474 		}
475 
476 		(void) flowop_beginop(threadflow, flowop);
477 		if ((ret = pread64(filedesc, iobuf,
478 		    *flowop->fo_iosize, (off64_t)fileoffset)) == -1) {
479 			(void) flowop_endop(threadflow, flowop, 0);
480 			filebench_log(LOG_ERROR,
481 			    "read file %s failed, offset %lld "
482 			    "io buffer %zd: %s",
483 			    flowop->fo_fileset->fs_name,
484 			    fileoffset, iobuf, strerror(errno));
485 			flowop_endop(threadflow, flowop, 0);
486 			return (-1);
487 		}
488 		(void) flowop_endop(threadflow, flowop, ret);
489 
490 		if ((ret == 0))
491 			(void) lseek64(filedesc, 0, SEEK_SET);
492 
493 	} else {
494 		(void) flowop_beginop(threadflow, flowop);
495 		if ((ret = read(filedesc, iobuf,
496 		    *flowop->fo_iosize)) == -1) {
497 			filebench_log(LOG_ERROR,
498 			    "read file %s failed, io buffer %zd: %s",
499 			    flowop->fo_fileset->fs_name,
500 			    iobuf, strerror(errno));
501 			(void) flowop_endop(threadflow, flowop, 0);
502 			return (-1);
503 		}
504 		(void) flowop_endop(threadflow, flowop, ret);
505 
506 		if ((ret == 0))
507 			(void) lseek64(filedesc, 0, SEEK_SET);
508 	}
509 
510 	return (0);
511 }
512 
513 #ifdef HAVE_AIO
514 
515 /*
516  * Asynchronous write section. An Asynchronous IO element
517  * (aiolist_t) is used to associate the asynchronous write request with
518  * its subsequent completion. This element includes a aiocb64 struct
519  * that is used by posix aio_xxx calls to track the asynchronous writes.
520  * The flowops aiowrite and aiowait result in calls to these posix
521  * aio_xxx system routines to do the actual asynchronous write IO
522  * operations.
523  */
524 
525 
526 /*
527  * Allocates an asynchronous I/O list (aio, of type
528  * aiolist_t) element. Adds it to the flowop thread's
529  * threadflow aio list. Returns a pointer to the element.
530  */
531 static aiolist_t *
532 aio_allocate(flowop_t *flowop)
533 {
534 	aiolist_t *aiolist;
535 
536 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
537 		filebench_log(LOG_ERROR, "malloc aiolist failed");
538 		filebench_shutdown(1);
539 	}
540 
541 	/* Add to list */
542 	if (flowop->fo_thread->tf_aiolist == NULL) {
543 		flowop->fo_thread->tf_aiolist = aiolist;
544 		aiolist->al_next = NULL;
545 	} else {
546 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
547 		flowop->fo_thread->tf_aiolist = aiolist;
548 	}
549 	return (aiolist);
550 }
551 
552 /*
553  * Searches for the aiolist element that has a matching
554  * completion block, aiocb. If none found returns -1. If
555  * found, removes the aiolist element from flowop thread's
556  * list and returns 0.
557  */
558 static int
559 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
560 {
561 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
562 	aiolist_t *previous = NULL;
563 	aiolist_t *match = NULL;
564 
565 	if (aiocb == NULL) {
566 		filebench_log(LOG_ERROR, "null aiocb deallocate");
567 		return (0);
568 	}
569 
570 	while (aiolist) {
571 		if (aiocb == &(aiolist->al_aiocb)) {
572 			match = aiolist;
573 			break;
574 		}
575 		previous = aiolist;
576 		aiolist = aiolist->al_next;
577 	}
578 
579 	if (match == NULL)
580 		return (-1);
581 
582 	/* Remove from the list */
583 	if (previous)
584 		previous->al_next = match->al_next;
585 	else
586 		flowop->fo_thread->tf_aiolist = match->al_next;
587 
588 	return (0);
589 }
590 
591 /*
592  * Emulate posix aiowrite(). Determines which file to use,
593  * either one file of a fileset, or the file associated
594  * with a fileobj, allocates and fills an aiolist_t element
595  * for the write, and issues the asynchronous write. This
596  * operation is only valid for random IO, and returns an
597  * error if the flowop is set for sequential IO. Returns 0
598  * on success, -1 on any encountered error.
599  */
600 static int
601 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
602 {
603 	caddr_t iobuf;
604 	vinteger_t wss;
605 	int filedesc;
606 
607 	if (flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
608 	    &filedesc, *flowop->fo_iosize) != 0)
609 		return (-1);
610 
611 	if (*flowop->fo_random) {
612 		uint64_t fileoffset;
613 		struct aiocb64 *aiocb;
614 		aiolist_t *aiolist;
615 
616 		if (filebench_randomno64(&fileoffset,
617 		    wss, *flowop->fo_iosize) == -1) {
618 			filebench_log(LOG_ERROR,
619 			    "file size smaller than IO size for thread %s",
620 			    flowop->fo_name);
621 			return (-1);
622 		}
623 
624 		aiolist = aio_allocate(flowop);
625 		aiolist->al_type = AL_WRITE;
626 		aiocb = &aiolist->al_aiocb;
627 
628 		aiocb->aio_fildes = filedesc;
629 		aiocb->aio_buf = iobuf;
630 		aiocb->aio_nbytes = *flowop->fo_iosize;
631 		aiocb->aio_offset = (off64_t)fileoffset;
632 		aiocb->aio_reqprio = 0;
633 
634 		filebench_log(LOG_DEBUG_IMPL,
635 		    "aio fd=%d, bytes=%lld, offset=%lld",
636 		    filedesc, *flowop->fo_iosize, fileoffset);
637 
638 		flowop_beginop(threadflow, flowop);
639 		if (aio_write64(aiocb) < 0) {
640 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
641 			    strerror(errno));
642 			filebench_shutdown(1);
643 		}
644 		flowop_endop(threadflow, flowop, *flowop->fo_iosize);
645 	} else {
646 		return (-1);
647 	}
648 
649 	return (0);
650 }
651 
652 
653 
654 #define	MAXREAP 4096
655 
656 /*
657  * Emulate posix aiowait(). Waits for the completion of half the
658  * outstanding asynchronous IOs, or a single IO, which ever is
659  * larger. The routine will return after a sufficient number of
660  * completed calls issued by any thread in the procflow have
661  * completed, or a 1 second timout elapses. All completed
662  * IO operations are deleted from the thread's aiolist.
663  */
664 static int
665 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
666 {
667 	struct aiocb64 **worklist;
668 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
669 	int uncompleted = 0;
670 
671 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
672 
673 	/* Count the list of pending aios */
674 	while (aio) {
675 		uncompleted++;
676 		aio = aio->al_next;
677 	}
678 
679 	do {
680 		uint_t ncompleted = 0;
681 		uint_t todo;
682 		struct timespec timeout;
683 		int inprogress;
684 		int i;
685 
686 		/* Wait for half of the outstanding requests */
687 		timeout.tv_sec = 1;
688 		timeout.tv_nsec = 0;
689 
690 		if (uncompleted > MAXREAP)
691 			todo = MAXREAP;
692 		else
693 			todo = uncompleted / 2;
694 
695 		if (todo == 0)
696 			todo = 1;
697 
698 		flowop_beginop(threadflow, flowop);
699 
700 #ifdef HAVE_AIOWAITN
701 		if ((aio_waitn64((struct aiocb64 **)worklist,
702 		    MAXREAP, &todo, &timeout) == -1) &&
703 		    errno && (errno != ETIME)) {
704 			filebench_log(LOG_ERROR,
705 			    "aiowait failed: %s, outstanding = %d, "
706 			    "ncompleted = %d ",
707 			    strerror(errno), uncompleted, todo);
708 		}
709 
710 		ncompleted = todo;
711 		/* Take the  completed I/Os from the list */
712 		inprogress = 0;
713 		for (i = 0; i < ncompleted; i++) {
714 			if ((aio_return64(worklist[i]) == -1) &&
715 			    (errno == EINPROGRESS)) {
716 				inprogress++;
717 				continue;
718 			}
719 			if (aio_deallocate(flowop, worklist[i]) < 0) {
720 				filebench_log(LOG_ERROR, "Could not remove "
721 				    "aio from list ");
722 				flowop_endop(threadflow, flowop, 0);
723 				return (-1);
724 			}
725 		}
726 
727 		uncompleted -= ncompleted;
728 		uncompleted += inprogress;
729 
730 #else
731 
732 		for (ncompleted = 0, inprogress = 0,
733 		    aio = flowop->fo_thread->tf_aiolist;
734 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
735 
736 			result = aio_error64(&aio->al_aiocb);
737 
738 			if (result == EINPROGRESS) {
739 				inprogress++;
740 				continue;
741 			}
742 
743 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
744 				filebench_log(LOG_ERROR, "aio failed: %s",
745 				    strerror(result));
746 				continue;
747 			}
748 
749 			ncompleted++;
750 
751 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
752 				filebench_log(LOG_ERROR, "Could not remove aio "
753 				    "from list ");
754 				flowop_endop(threadflow, flowop, 0);
755 				return (-1);
756 			}
757 		}
758 
759 		uncompleted -= ncompleted;
760 
761 #endif
762 		filebench_log(LOG_DEBUG_SCRIPT,
763 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
764 		    ncompleted, uncompleted, inprogress);
765 
766 	} while (uncompleted > MAXREAP);
767 
768 	flowop_endop(threadflow, flowop, 0);
769 
770 	free(worklist);
771 
772 	return (0);
773 }
774 
775 #endif /* HAVE_AIO */
776 
777 /*
778  * Initializes a "flowop_block" flowop. Specifically, it
779  * initializes the flowop's fo_cv and unlocks the fo_lock.
780  */
781 static int
782 flowoplib_block_init(flowop_t *flowop)
783 {
784 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
785 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
786 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
787 	(void) ipc_mutex_unlock(&flowop->fo_lock);
788 
789 	return (0);
790 }
791 
792 /*
793  * Blocks the threadflow until woken up by flowoplib_wakeup.
794  * The routine blocks on the flowop's fo_cv condition variable.
795  */
796 static int
797 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
798 {
799 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
800 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
801 	(void) ipc_mutex_lock(&flowop->fo_lock);
802 
803 	flowop_beginop(threadflow, flowop);
804 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
805 	flowop_endop(threadflow, flowop, 0);
806 
807 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
808 	    flowop->fo_name, flowop->fo_instance);
809 
810 	(void) ipc_mutex_unlock(&flowop->fo_lock);
811 
812 	return (0);
813 }
814 
815 /*
816  * Wakes up one or more target blocking flowops.
817  * Sends broadcasts on the fo_cv condition variables of all
818  * flowops on the target list, except those that are
819  * FLOW_MASTER flowops. The target list consists of all
820  * flowops whose name matches this flowop's "fo_targetname"
821  * attribute. The target list is generated on the first
822  * invocation, and the run will be shutdown if no targets
823  * are found. Otherwise the routine always returns 0.
824  */
825 static int
826 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
827 {
828 	flowop_t *target;
829 
830 	/* if this is the first wakeup, create the wakeup list */
831 	if (flowop->fo_targets == NULL) {
832 		flowop_t *result = flowop_find(flowop->fo_targetname);
833 
834 		flowop->fo_targets = result;
835 		if (result == NULL) {
836 			filebench_log(LOG_ERROR,
837 			    "wakeup: could not find op %s for thread %s",
838 			    flowop->fo_targetname,
839 			    threadflow->tf_name);
840 			filebench_shutdown(1);
841 		}
842 		while (result) {
843 			result->fo_targetnext =
844 			    result->fo_resultnext;
845 			result = result->fo_resultnext;
846 		}
847 	}
848 
849 	target = flowop->fo_targets;
850 
851 	/* wakeup the targets */
852 	while (target) {
853 		if (target->fo_instance == FLOW_MASTER) {
854 			target = target->fo_targetnext;
855 			continue;
856 		}
857 		filebench_log(LOG_DEBUG_IMPL,
858 		    "wakeup flow %s-%d at address %zx",
859 		    target->fo_name,
860 		    target->fo_instance,
861 		    &target->fo_cv);
862 
863 		flowop_beginop(threadflow, flowop);
864 		(void) ipc_mutex_lock(&target->fo_lock);
865 		(void) pthread_cond_broadcast(&target->fo_cv);
866 		(void) ipc_mutex_unlock(&target->fo_lock);
867 		flowop_endop(threadflow, flowop, 0);
868 
869 		target = target->fo_targetnext;
870 	}
871 
872 	return (0);
873 }
874 
875 /*
876  * "think time" routines. the "hog" routine consumes cpu cycles as
877  * it "thinks", while the "delay" flowop simply calls sleep() to delay
878  * for a given number of seconds without consuming cpu cycles.
879  */
880 
881 
882 /*
883  * Consumes CPU cycles and memory bandwidth by looping for
884  * flowop->fo_value times. With each loop sets memory location
885  * threadflow->tf_mem to 1.
886  */
887 static int
888 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
889 {
890 	uint64_t value = *flowop->fo_value;
891 	int i;
892 
893 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
894 	flowop_beginop(threadflow, flowop);
895 	if (threadflow->tf_mem != NULL) {
896 		for (i = 0; i < value; i++)
897 			*(threadflow->tf_mem) = 1;
898 	}
899 	flowop_endop(threadflow, flowop, 0);
900 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
901 	return (0);
902 }
903 
904 
905 /*
906  * Delays for fo_value seconds.
907  */
908 static int
909 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
910 {
911 	int value = *flowop->fo_value;
912 
913 	flowop_beginop(threadflow, flowop);
914 	(void) sleep(value);
915 	flowop_endop(threadflow, flowop, 0);
916 	return (0);
917 }
918 
919 /*
920  * Rate limiting routines. This is the event consuming half of the
921  * event system. Each of the four following routines will limit the rate
922  * to one unit of either calls, issued I/O operations, issued filebench
923  * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
925  * consumer, and further divided among different consumers if more than
926  * one has been defined. There is no mechanism to enforce equal sharing
927  * of events.
928  */
929 
930 /*
931  * Completes one invocation per posted event. If eventgen_q
932  * has an event count greater than zero, one will be removed
933  * (count decremented), otherwise the calling thread will
934  * block until another event has been posted. Always returns 0
935  */
936 static int
937 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
938 {
939 	/* Immediately bail if not set/enabled */
940 	if (filebench_shm->eventgen_hz == 0)
941 		return (0);
942 
943 	if (flowop->fo_initted == 0) {
944 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
945 		    flowop, threadflow->tf_name, threadflow->tf_instance);
946 		flowop->fo_initted = 1;
947 	}
948 
949 	flowop_beginop(threadflow, flowop);
950 	while (filebench_shm->eventgen_hz) {
951 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
952 		if (filebench_shm->eventgen_q > 0) {
953 			filebench_shm->eventgen_q--;
954 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
955 			break;
956 		}
957 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
958 		    &filebench_shm->eventgen_lock);
959 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
960 	}
961 	flowop_endop(threadflow, flowop, 0);
962 	return (0);
963 }
964 
965 /*
966  * Blocks the calling thread if the number of issued I/O
967  * operations exceeds the number of posted events, thus
968  * limiting the average I/O operation rate to the rate
969  * specified by eventgen_hz. Always returns 0.
970  */
971 static int
972 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
973 {
974 	uint64_t iops;
975 	uint64_t delta;
976 	uint64_t events;
977 
978 	/* Immediately bail if not set/enabled */
979 	if (filebench_shm->eventgen_hz == 0)
980 		return (0);
981 
982 	if (flowop->fo_initted == 0) {
983 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
984 		    flowop, threadflow->tf_name, threadflow->tf_instance);
985 		flowop->fo_initted = 1;
986 	}
987 
988 	iops = (controlstats.fs_rcount +
989 	    controlstats.fs_wcount);
990 
991 	/* Is this the first time around */
992 	if (flowop->fo_tputlast == 0) {
993 		flowop->fo_tputlast = iops;
994 		return (0);
995 	}
996 
997 	delta = iops - flowop->fo_tputlast;
998 	flowop->fo_tputbucket -= delta;
999 	flowop->fo_tputlast = iops;
1000 
1001 	/* No need to block if the q isn't empty */
1002 	if (flowop->fo_tputbucket >= 0LL) {
1003 		flowop_endop(threadflow, flowop, 0);
1004 		return (0);
1005 	}
1006 
1007 	iops = flowop->fo_tputbucket * -1;
1008 	events = iops;
1009 
1010 	flowop_beginop(threadflow, flowop);
1011 	while (filebench_shm->eventgen_hz) {
1012 
1013 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
1014 		if (filebench_shm->eventgen_q >= events) {
1015 			filebench_shm->eventgen_q -= events;
1016 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1017 			flowop->fo_tputbucket += events;
1018 			break;
1019 		}
1020 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
1021 		    &filebench_shm->eventgen_lock);
1022 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1023 	}
1024 	flowop_endop(threadflow, flowop, 0);
1025 
1026 	return (0);
1027 }
1028 
1029 /*
1030  * Blocks the calling thread if the number of issued filebench
1031  * operations exceeds the number of posted events, thus limiting
1032  * the average filebench operation rate to the rate specified by
1033  * eventgen_hz. Always returns 0.
1034  */
1035 static int
1036 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
1037 {
1038 	uint64_t ops;
1039 	uint64_t delta;
1040 	uint64_t events;
1041 
1042 	/* Immediately bail if not set/enabled */
1043 	if (filebench_shm->eventgen_hz == 0)
1044 		return (0);
1045 
1046 	if (flowop->fo_initted == 0) {
1047 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1048 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1049 		flowop->fo_initted = 1;
1050 	}
1051 
1052 	ops = controlstats.fs_count;
1053 
1054 	/* Is this the first time around */
1055 	if (flowop->fo_tputlast == 0) {
1056 		flowop->fo_tputlast = ops;
1057 		return (0);
1058 	}
1059 
1060 	delta = ops - flowop->fo_tputlast;
1061 	flowop->fo_tputbucket -= delta;
1062 	flowop->fo_tputlast = ops;
1063 
1064 	/* No need to block if the q isn't empty */
1065 	if (flowop->fo_tputbucket >= 0LL) {
1066 		flowop_endop(threadflow, flowop, 0);
1067 		return (0);
1068 	}
1069 
1070 	ops = flowop->fo_tputbucket * -1;
1071 	events = ops;
1072 
1073 	flowop_beginop(threadflow, flowop);
1074 	while (filebench_shm->eventgen_hz) {
1075 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
1076 		if (filebench_shm->eventgen_q >= events) {
1077 			filebench_shm->eventgen_q -= events;
1078 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1079 			flowop->fo_tputbucket += events;
1080 			break;
1081 		}
1082 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
1083 		    &filebench_shm->eventgen_lock);
1084 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1085 	}
1086 	flowop_endop(threadflow, flowop, 0);
1087 
1088 	return (0);
1089 }
1090 
1091 
1092 /*
1093  * Blocks the calling thread if the number of bytes of I/O
1094  * issued exceeds one megabyte times the number of posted
1095  * events, thus limiting the average I/O byte rate to one
1096  * megabyte times the event rate as set by eventgen_hz.
1097  * Always retuns 0.
1098  */
1099 static int
1100 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
1101 {
1102 	uint64_t bytes;
1103 	uint64_t delta;
1104 	uint64_t events;
1105 
1106 	/* Immediately bail if not set/enabled */
1107 	if (filebench_shm->eventgen_hz == 0)
1108 		return (0);
1109 
1110 	if (flowop->fo_initted == 0) {
1111 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1112 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1113 		flowop->fo_initted = 1;
1114 	}
1115 
1116 	bytes = (controlstats.fs_rbytes +
1117 	    controlstats.fs_wbytes);
1118 
1119 	/* Is this the first time around */
1120 	if (flowop->fo_tputlast == 0) {
1121 		flowop->fo_tputlast = bytes;
1122 		return (0);
1123 	}
1124 
1125 	delta = bytes - flowop->fo_tputlast;
1126 	flowop->fo_tputbucket -= delta;
1127 	flowop->fo_tputlast = bytes;
1128 
1129 	/* No need to block if the q isn't empty */
1130 	if (flowop->fo_tputbucket >= 0LL) {
1131 		flowop_endop(threadflow, flowop, 0);
1132 		return (0);
1133 	}
1134 
1135 	bytes = flowop->fo_tputbucket * -1;
1136 	events = (bytes / MB) + 1;
1137 
1138 	filebench_log(LOG_DEBUG_IMPL, "%lld bytes, %lld events",
1139 	    bytes, events);
1140 
1141 	flowop_beginop(threadflow, flowop);
1142 	while (filebench_shm->eventgen_hz) {
1143 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
1144 		if (filebench_shm->eventgen_q >= events) {
1145 			filebench_shm->eventgen_q -= events;
1146 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1147 			flowop->fo_tputbucket += (events * MB);
1148 			break;
1149 		}
1150 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
1151 		    &filebench_shm->eventgen_lock);
1152 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1153 	}
1154 	flowop_endop(threadflow, flowop, 0);
1155 
1156 	return (0);
1157 }
1158 
1159 /*
1160  * These flowops terminate a benchmark run when either the specified
1161  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1162  * number of I/O operations (flowoplib_finishoncount) have been generated.
1163  */
1164 
1165 
1166 /*
1167  * Stop filebench run when specified number of I/O bytes have been
1168  * transferred. Compares controlstats.fs_bytes with *flowop->value,
1169  * and if greater returns 1, stopping the run, if not, returns 0
1170  * to continue running.
1171  */
1172 static int
1173 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1174 {
1175 	uint64_t b;
1176 	uint64_t bytes = *flowop->fo_value;
1177 
1178 	b = controlstats.fs_bytes;
1179 
1180 	flowop_beginop(threadflow, flowop);
1181 	if (b > bytes) {
1182 		flowop_endop(threadflow, flowop, 0);
1183 		return (1);
1184 	}
1185 	flowop_endop(threadflow, flowop, 0);
1186 
1187 	return (0);
1188 }
1189 
1190 /*
1191  * Stop filebench run when specified number of I/O operations have
1192  * been performed. Compares controlstats.fs_count with *flowop->value,
1193  * and if greater returns 1, stopping the run, if not, returns 0 to
1194  * continue running.
1195  */
1196 static int
1197 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1198 {
1199 	uint64_t ops;
1200 	uint64_t count = *flowop->fo_value;
1201 
1202 	ops = controlstats.fs_count;
1203 
1204 	flowop_beginop(threadflow, flowop);
1205 	if (ops > count) {
1206 		flowop_endop(threadflow, flowop, 0);
1207 		return (1);
1208 	}
1209 	flowop_endop(threadflow, flowop, 0);
1210 
1211 	return (0);
1212 }
1213 
1214 /*
1215  * Semaphore synchronization using either System V semaphores or
1216  * posix semaphores. If System V semaphores are available, they will be
1217  * used, otherwise posix semaphores will be used.
1218  */
1219 
1220 
1221 /*
1222  * Initializes the filebench "block on semaphore" flowop.
1223  * If System V semaphores are implemented, the routine
1224  * initializes the System V semaphore subsystem if it hasn't
1225  * already been initialized, also allocates a pair of semids
1226  * and initializes the highwater System V semaphore.
1227  * If no System V semaphores, then does nothing special.
1228  * Returns -1 if it cannot acquire a set of System V semphores
1229  * or if the initial post to the semaphore set fails. Returns 0
1230  * on success.
1231  */
1232 static int
1233 flowoplib_semblock_init(flowop_t *flowop)
1234 {
1235 
1236 #ifdef HAVE_SYSV_SEM
1237 	int semid;
1238 	struct sembuf sbuf[2];
1239 	int highwater;
1240 
1241 	ipc_seminit();
1242 
1243 	flowop->fo_semid_lw = ipc_semidalloc();
1244 	flowop->fo_semid_hw = ipc_semidalloc();
1245 
1246 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1247 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1248 
1249 	/*
1250 	 * Raise the number of the hw queue, causing the posting side to
1251 	 * block if queue is > 2 x blocking value
1252 	 */
1253 	if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) {
1254 		filebench_log(LOG_ERROR, "semblock init lookup %x failed: %s",
1255 		    filebench_shm->semkey,
1256 		    strerror(errno));
1257 		return (-1);
1258 	}
1259 
1260 	if ((highwater = flowop->fo_semid_hw) == 0)
1261 		highwater = *flowop->fo_value;
1262 
1263 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1264 
1265 	sbuf[0].sem_num = (short)highwater;
1266 	sbuf[0].sem_op = *flowop->fo_highwater;
1267 	sbuf[0].sem_flg = 0;
1268 	if ((semop(semid, &sbuf[0], 1) == -1) && errno) {
1269 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1270 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1271 		return (-1);
1272 	}
1273 #else
1274 	filebench_log(LOG_DEBUG_IMPL,
1275 	    "flow %s-%d semblock init with posix semaphore",
1276 	    flowop->fo_name, flowop->fo_instance);
1277 
1278 	sem_init(&flowop->fo_sem, 1, 0);
1279 #endif	/* HAVE_SYSV_SEM */
1280 
1281 	if (!(*flowop->fo_blocking))
1282 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1283 
1284 	return (0);
1285 }
1286 
1287 /*
1288  * Releases the semids for the System V semaphore allocated
1289  * to this flowop. If not using System V semaphores, then
1290  * it is effectively just a no-op. Always returns 0.
1291  */
1292 static void
1293 flowoplib_semblock_destruct(flowop_t *flowop)
1294 {
1295 #ifdef HAVE_SYSV_SEM
1296 	ipc_semidfree(flowop->fo_semid_lw);
1297 	ipc_semidfree(flowop->fo_semid_hw);
1298 #else
1299 	sem_destroy(&flowop->fo_sem);
1300 #endif /* HAVE_SYSV_SEM */
1301 }
1302 
1303 /*
1304  * Attempts to pass a System V or posix semaphore as appropriate,
1305  * and blocks if necessary. Returns -1 if a set of System V
1306  * semphores is not available or cannot be acquired, or if the initial
1307  * post to the semaphore set fails. Returns 0 on success.
1308  */
1309 static int
1310 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1311 {
1312 
1313 #ifdef HAVE_SYSV_SEM
1314 	struct sembuf sbuf[2];
1315 	int value = *flowop->fo_value;
1316 	int semid;
1317 	struct timespec timeout;
1318 
1319 	if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) {
1320 		filebench_log(LOG_ERROR, "lookup semop %x failed: %s",
1321 		    filebench_shm->semkey,
1322 		    strerror(errno));
1323 		return (-1);
1324 	}
1325 
1326 	filebench_log(LOG_DEBUG_IMPL,
1327 	    "flow %s-%d sem blocking on id %x num %x value %d",
1328 	    flowop->fo_name, flowop->fo_instance, semid,
1329 	    flowop->fo_semid_hw, value);
1330 
1331 	/* Post, decrement the increment the hw queue */
1332 	sbuf[0].sem_num = flowop->fo_semid_hw;
1333 	sbuf[0].sem_op = (short)value;
1334 	sbuf[0].sem_flg = 0;
1335 	sbuf[1].sem_num = flowop->fo_semid_lw;
1336 	sbuf[1].sem_op = value * -1;
1337 	sbuf[1].sem_flg = 0;
1338 	timeout.tv_sec = 600;
1339 	timeout.tv_nsec = 0;
1340 
1341 	if (*flowop->fo_blocking)
1342 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1343 
1344 	flowop_beginop(threadflow, flowop);
1345 
1346 #ifdef HAVE_SEMTIMEDOP
1347 	(void) semtimedop(semid, &sbuf[0], 1, &timeout);
1348 	(void) semtimedop(semid, &sbuf[1], 1, &timeout);
1349 #else
1350 	(void) semop(semid, &sbuf[0], 1);
1351 	(void) semop(semid, &sbuf[1], 1);
1352 #endif /* HAVE_SEMTIMEDOP */
1353 
1354 	if (*flowop->fo_blocking)
1355 		(void) ipc_mutex_lock(&flowop->fo_lock);
1356 
1357 	flowop_endop(threadflow, flowop, 0);
1358 
1359 #else
1360 	int value = *flowop->fo_value;
1361 	int i;
1362 
1363 	filebench_log(LOG_DEBUG_IMPL,
1364 	    "flow %s-%d sem blocking on posix semaphore",
1365 	    flowop->fo_name, flowop->fo_instance);
1366 
1367 	/* Decrement sem by value */
1368 	for (i = 0; i < value; i++) {
1369 		if (sem_wait(&flowop->fo_sem) == -1) {
1370 			filebench_log(LOG_ERROR, "semop wait failed");
1371 			return (-1);
1372 		}
1373 	}
1374 
1375 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1376 	    flowop->fo_name, flowop->fo_instance);
1377 #endif /* HAVE_SYSV_SEM */
1378 
1379 	return (0);
1380 }
1381 
1382 /*
1383  * Calls ipc_seminit(), and does so whether System V semaphores
1384  * are available or not. Hence it will cause ipc_seminit to log errors
1385  * if they are not. Always returns 0.
1386  */
1387 /* ARGSUSED */
1388 static int
1389 flowoplib_sempost_init(flowop_t *flowop)
1390 {
1391 #ifdef HAVE_SYSV_SEM
1392 	ipc_seminit();
1393 #endif /* HAVE_SYSV_SEM */
1394 	return (0);
1395 }
1396 
1397 /*
1398  * Post to a System V or posix semaphore as appropriate.
1399  * On the first call for a given flowop instance, this routine
1400  * will use the fo_targetname attribute to locate all semblock
1401  * flowops that are expecting posts from this flowop. All
1402  * target flowops on this list will have a post operation done
1403  * to their semaphores on each call.
1404  */
1405 static int
1406 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1407 {
1408 	flowop_t *target;
1409 
1410 	filebench_log(LOG_DEBUG_IMPL,
1411 	    "sempost flow %s-%d",
1412 	    flowop->fo_name,
1413 	    flowop->fo_instance);
1414 
1415 	/* if this is the first post, create the post list */
1416 	if (flowop->fo_targets == NULL) {
1417 		flowop_t *result = flowop_find(flowop->fo_targetname);
1418 
1419 		flowop->fo_targets = result;
1420 
1421 		if (result == NULL) {
1422 			filebench_log(LOG_ERROR,
1423 			    "sempost: could not find op %s for thread %s",
1424 			    flowop->fo_targetname,
1425 			    threadflow->tf_name);
1426 			filebench_shutdown(1);
1427 		}
1428 
1429 		while (result) {
1430 			result->fo_targetnext =
1431 			    result->fo_resultnext;
1432 			result = result->fo_resultnext;
1433 		}
1434 	}
1435 
1436 	target = flowop->fo_targets;
1437 
1438 	flowop_beginop(threadflow, flowop);
1439 	/* post to the targets */
1440 	while (target) {
1441 #ifdef HAVE_SYSV_SEM
1442 		struct sembuf sbuf[2];
1443 		int semid;
1444 		int blocking;
1445 #else
1446 		int i;
1447 #endif /* HAVE_SYSV_SEM */
1448 		int value = *flowop->fo_value;
1449 		struct timespec timeout;
1450 
1451 		if (target->fo_instance == FLOW_MASTER) {
1452 			target = target->fo_targetnext;
1453 			continue;
1454 		}
1455 
1456 #ifdef HAVE_SYSV_SEM
1457 
1458 		filebench_log(LOG_DEBUG_IMPL,
1459 		    "sempost flow %s-%d num %x",
1460 		    target->fo_name,
1461 		    target->fo_instance,
1462 		    target->fo_semid_lw);
1463 
1464 		if ((semid = semget(filebench_shm->semkey,
1465 		    FILEBENCH_NSEMS, 0)) == -1) {
1466 			filebench_log(LOG_ERROR,
1467 			    "lookup semop %x failed: %s",
1468 			    filebench_shm->semkey,
1469 			    strerror(errno));
1470 			return (-1);
1471 		}
1472 
1473 		sbuf[0].sem_num = target->fo_semid_lw;
1474 		sbuf[0].sem_op = (short)value;
1475 		sbuf[0].sem_flg = 0;
1476 		sbuf[1].sem_num = target->fo_semid_hw;
1477 		sbuf[1].sem_op = value * -1;
1478 		sbuf[1].sem_flg = 0;
1479 		timeout.tv_sec = 600;
1480 		timeout.tv_nsec = 0;
1481 
1482 		if (*flowop->fo_blocking)
1483 			blocking = 1;
1484 		else
1485 			blocking = 0;
1486 
1487 #ifdef HAVE_SEMTIMEDOP
1488 		if ((semtimedop(semid, &sbuf[0], blocking + 1,
1489 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1490 #else
1491 		if ((semop(semid, &sbuf[0], blocking + 1) == -1) &&
1492 		    (errno && (errno != EAGAIN))) {
1493 #endif /* HAVE_SEMTIMEDOP */
1494 			filebench_log(LOG_ERROR, "semop post failed: %s",
1495 			    strerror(errno));
1496 			return (-1);
1497 		}
1498 
1499 		filebench_log(LOG_DEBUG_IMPL,
1500 		    "flow %s-%d finished posting",
1501 		    target->fo_name, target->fo_instance);
1502 #else
1503 		filebench_log(LOG_DEBUG_IMPL,
1504 		    "sempost flow %s-%d to posix semaphore",
1505 		    target->fo_name,
1506 		    target->fo_instance);
1507 
1508 		/* Increment sem by value */
1509 		for (i = 0; i < value; i++) {
1510 			if (sem_post(&target->fo_sem) == -1) {
1511 				filebench_log(LOG_ERROR, "semop post failed");
1512 				return (-1);
1513 			}
1514 		}
1515 
1516 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1517 		    target->fo_name, target->fo_instance);
1518 #endif /* HAVE_SYSV_SEM */
1519 
1520 		target = target->fo_targetnext;
1521 	}
1522 	flowop_endop(threadflow, flowop, 0);
1523 
1524 	return (0);
1525 }
1526 
1527 
/*
 * Section for exercising create / open / close / delete operations
 * on files within a fileset. For proper operation, the flowop attribute
 * "fd", which sets the fo_fdnumber field in the flowop, must be used
 * so that the same file is opened and later closed. "fd" is an index
 * into a pair of arrays maintained by threadflows, one of which
 * contains the operating system assigned file descriptors and the other
 * a pointer to the filesetentry whose file the file descriptor
 * references. An openfile flowop defined without fd being set will use
 * the default (0) fd or, if specified, rotate through fd indices, but
 * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile picks an arbitrary file to delete, regardless
 * of the fd attribute.
 */
1543 /*
1544  * XXX Making file selection more consistent among the flowops might good
1545  */
1546 
1547 
1548 /*
1549  * Emulates (and actually does) file open. Obtains a file descriptor
1550  * index, then calls flowoplib_openfile_common() to open. Returns -1
1551  * if not file descriptor is found or flowoplib_openfile_common
1552  * encounters an error, otherwise 0.
1553  */
1554 static int
1555 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1556 {
1557 	int fd = flowoplib_fdnum(threadflow, flowop);
1558 
1559 	if (fd == -1)
1560 		return (-1);
1561 
1562 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1563 }
1564 
1565 /*
1566  * Common file opening code for filesets. Uses the supplied
1567  * file descriptor index to determine the tf_fd entry to use.
1568  * If the entry is empty (0) and the fileset exists, fileset
1569  * pick is called to select a fileset entry to use. The file
1570  * specified in the filesetentry is opened, and the returned
1571  * operating system file descriptor and a pointer to the
1572  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1573  * respectively. Returns -1 on error, 0 on success.
1574  */
1575 static int
1576 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1577 {
1578 	filesetentry_t *file;
1579 	int tid = 0;
1580 
1581 	/*
1582 	 * If the flowop doesn't default to persistent fd
1583 	 * then get unique thread ID for use by fileset_pick
1584 	 */
1585 	if (integer_isset(flowop->fo_rotatefd))
1586 		tid = threadflow->tf_utid;
1587 
1588 	if (threadflow->tf_fd[fd] != 0) {
1589 		filebench_log(LOG_ERROR,
1590 		    "flowop %s attempted to open without closing on fd %d",
1591 		    flowop->fo_name, fd);
1592 		return (-1);
1593 	}
1594 
1595 	if (flowop->fo_fileset == NULL) {
1596 		filebench_log(LOG_ERROR, "flowop NULL file");
1597 		return (-1);
1598 	}
1599 
1600 #ifdef HAVE_RAW_SUPPORT
1601 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1602 		int open_attrs = 0;
1603 		char name[MAXPATHLEN];
1604 
1605 		(void) strcpy(name, *flowop->fo_fileset->fs_path);
1606 		(void) strcat(name, "/");
1607 		(void) strcat(name, flowop->fo_fileset->fs_name);
1608 
1609 		if (*flowop->fo_dsync) {
1610 #ifdef sun
1611 			open_attrs |= O_DSYNC;
1612 #else
1613 			open_attrs |= O_FSYNC;
1614 #endif
1615 		}
1616 
1617 		filebench_log(LOG_DEBUG_SCRIPT,
1618 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1619 
1620 		threadflow->tf_fd[fd] = open64(name,
1621 		    O_RDWR | open_attrs, 0666);
1622 
1623 		if (threadflow->tf_fd[fd] < 0) {
1624 			filebench_log(LOG_ERROR,
1625 			    "Failed to open raw device %s: %s",
1626 			    name, strerror(errno));
1627 			return (-1);
1628 		}
1629 
1630 		/* if running on Solaris, use un-buffered io */
1631 #ifdef sun
1632 		(void) directio(threadflow->tf_fd[fd], DIRECTIO_ON);
1633 #endif
1634 
1635 		threadflow->tf_fse[fd] = NULL;
1636 
1637 		return (0);
1638 	}
1639 #endif /* HAVE_RAW_SUPPORT */
1640 
1641 	if ((file = fileset_pick(flowop->fo_fileset,
1642 	    FILESET_PICKEXISTS, tid)) == NULL) {
1643 		filebench_log(LOG_ERROR,
1644 		    "flowop %s failed to pick file from %s on fd %d",
1645 		    flowop->fo_name,
1646 		    flowop->fo_fileset->fs_name, fd);
1647 		return (-1);
1648 	}
1649 
1650 	threadflow->tf_fse[fd] = file;
1651 
1652 	flowop_beginop(threadflow, flowop);
1653 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1654 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1655 	flowop_endop(threadflow, flowop, 0);
1656 
1657 	if (threadflow->tf_fd[fd] < 0) {
1658 		filebench_log(LOG_ERROR, "failed to open file %s",
1659 		    flowop->fo_name);
1660 		return (-1);
1661 	}
1662 
1663 	filebench_log(LOG_DEBUG_SCRIPT,
1664 	    "flowop %s: opened %s fd[%d] = %d",
1665 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1666 
1667 	return (0);
1668 }
1669 
1670 /*
1671  * Emulate create of a file. Uses the flowop's fdnumber to select
1672  * tf_fd and tf_fse array locations to put the created file's file
1673  * descriptor and filesetentry respectively. Uses fileset_pick()
1674  * to select a specific filesetentry whose file does not currently
1675  * exist for the file create operation. Then calls
1676  * fileset_openfile() with the O_CREATE flag set to create the
1677  * file. Returns -1 if the array index specified by fdnumber is
1678  * already in use, the flowop has no associated fileset, or
1679  * the create call fails. Returns 1 if a filesetentry with a
1680  * nonexistent file cannot be found. Returns 0 on success.
1681  */
1682 static int
1683 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1684 {
1685 	filesetentry_t *file;
1686 	int fd = flowop->fo_fdnumber;
1687 
1688 	if (threadflow->tf_fd[fd] != 0) {
1689 		filebench_log(LOG_ERROR,
1690 		    "flowop %s attempted to create without closing on fd %d",
1691 		    flowop->fo_name, fd);
1692 		return (-1);
1693 	}
1694 
1695 	if (flowop->fo_fileset == NULL) {
1696 		filebench_log(LOG_ERROR, "flowop NULL file");
1697 		return (-1);
1698 	}
1699 
1700 #ifdef HAVE_RAW_SUPPORT
1701 	/* can't be used with raw devices */
1702 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1703 		filebench_log(LOG_ERROR,
1704 		    "flowop %s attempted to a createfile on RAW device",
1705 		    flowop->fo_name);
1706 		return (-1);
1707 	}
1708 #endif /* HAVE_RAW_SUPPORT */
1709 
1710 	if ((file = fileset_pick(flowop->fo_fileset,
1711 	    FILESET_PICKNOEXIST, 0)) == NULL) {
1712 		filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file",
1713 		    flowop->fo_name);
1714 		return (1);
1715 	}
1716 
1717 	threadflow->tf_fse[fd] = file;
1718 
1719 	flowop_beginop(threadflow, flowop);
1720 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1721 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1722 	flowop_endop(threadflow, flowop, 0);
1723 
1724 	if (threadflow->tf_fd[fd] < 0) {
1725 		filebench_log(LOG_ERROR, "failed to create file %s",
1726 		    flowop->fo_name);
1727 		return (-1);
1728 	}
1729 
1730 	filebench_log(LOG_DEBUG_SCRIPT,
1731 	    "flowop %s: created %s fd[%d] = %d",
1732 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1733 
1734 	return (0);
1735 }
1736 
1737 /*
1738  * Emulates delete of a file. Picks an arbitrary filesetentry
1739  * whose file exists and uses unlink() to delete it. Clears
1740  * the FSE_EXISTS flag for the filesetentry. Returns -1 if the
1741  * flowop has no associated fileset. Returns 1 if an appropriate
1742  * filesetentry cannot be found, and 0 on success.
1743  */
1744 static int
1745 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1746 {
1747 	filesetentry_t *file;
1748 	fileset_t *fileset;
1749 	char path[MAXPATHLEN];
1750 	char *pathtmp;
1751 
1752 	if (flowop->fo_fileset == NULL) {
1753 		filebench_log(LOG_ERROR, "flowop NULL file");
1754 		return (-1);
1755 	}
1756 
1757 	fileset = flowop->fo_fileset;
1758 
1759 #ifdef HAVE_RAW_SUPPORT
1760 	/* can't be used with raw devices */
1761 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1762 		filebench_log(LOG_ERROR,
1763 		    "flowop %s attempted a deletefile on RAW device",
1764 		    flowop->fo_name);
1765 		return (-1);
1766 	}
1767 #endif /* HAVE_RAW_SUPPORT */
1768 
1769 	if ((file = fileset_pick(flowop->fo_fileset,
1770 	    FILESET_PICKEXISTS, 0)) == NULL) {
1771 		filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file",
1772 		    flowop->fo_name);
1773 		return (1);
1774 	}
1775 
1776 	*path = 0;
1777 	(void) strcpy(path, *fileset->fs_path);
1778 	(void) strcat(path, "/");
1779 	(void) strcat(path, fileset->fs_name);
1780 	pathtmp = fileset_resolvepath(file);
1781 	(void) strcat(path, pathtmp);
1782 	free(pathtmp);
1783 
1784 	flowop_beginop(threadflow, flowop);
1785 	(void) unlink(path);
1786 	flowop_endop(threadflow, flowop, 0);
1787 	file->fse_flags &= ~FSE_EXISTS;
1788 	(void) ipc_mutex_unlock(&file->fse_lock);
1789 
1790 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1791 
1792 	return (0);
1793 }
1794 
1795 /*
1796  * Emulates fsync of a file. Obtains the file descriptor index
1797  * from the flowop, obtains the actual file descriptor from
1798  * the threadflow's table, checks to be sure it is still an
1799  * open file, then does an fsync operation on it. Returns -1
1800  * if the file no longer is open, 0 otherwise.
1801  */
1802 static int
1803 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1804 {
1805 	filesetentry_t *file;
1806 	int fd = flowop->fo_fdnumber;
1807 
1808 	if (threadflow->tf_fd[fd] == 0) {
1809 		filebench_log(LOG_ERROR,
1810 		    "flowop %s attempted to fsync a closed fd %d",
1811 		    flowop->fo_name, fd);
1812 		return (-1);
1813 	}
1814 
1815 	file = threadflow->tf_fse[fd];
1816 
1817 	if ((file == NULL) ||
1818 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1819 		filebench_log(LOG_ERROR,
1820 		    "flowop %s attempted to a fsync a RAW device",
1821 		    flowop->fo_name);
1822 		return (-1);
1823 	}
1824 
1825 	/* Measure time to fsync */
1826 	flowop_beginop(threadflow, flowop);
1827 	(void) fsync(threadflow->tf_fd[fd]);
1828 	flowop_endop(threadflow, flowop, 0);
1829 
1830 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1831 
1832 	return (0);
1833 }
1834 
1835 /*
1836  * Emulate fsync of an entire fileset. Search through the
1837  * threadflow's file descriptor array, doing fsync() on each
1838  * open file that belongs to the flowop's fileset. Always
1839  * returns 0.
1840  */
1841 static int
1842 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1843 {
1844 	int fd;
1845 
1846 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1847 		filesetentry_t *file;
1848 
1849 		/* Match the file set to fsync */
1850 		if ((threadflow->tf_fse[fd] == NULL) ||
1851 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1852 			continue;
1853 
1854 		/* Measure time to fsync */
1855 		flowop_beginop(threadflow, flowop);
1856 		(void) fsync(threadflow->tf_fd[fd]);
1857 		flowop_endop(threadflow, flowop, 0);
1858 
1859 		file = threadflow->tf_fse[fd];
1860 
1861 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1862 		    file->fse_path);
1863 	}
1864 
1865 	return (0);
1866 }
1867 
1868 /*
1869  * Emulate close of a file.  Obtains the file descriptor index
1870  * from the flowop, obtains the actual file descriptor from the
1871  * threadflow's table, checks to be sure it is still an open
1872  * file, then does a close operation on it. Then sets the
1873  * threadflow file descriptor table entry to 0, and the file set
1874  * entry pointer to NULL. Returns -1 if the file was not open,
1875  * 0 otherwise.
1876  */
1877 static int
1878 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1879 {
1880 	filesetentry_t *file;
1881 	int fd = flowop->fo_fdnumber;
1882 
1883 	if (threadflow->tf_fd[fd] == 0) {
1884 		filebench_log(LOG_ERROR,
1885 		    "flowop %s attempted to close an already closed fd %d",
1886 		    flowop->fo_name, fd);
1887 		return (-1);
1888 	}
1889 
1890 	/* Measure time to close */
1891 	flowop_beginop(threadflow, flowop);
1892 	(void) close(threadflow->tf_fd[fd]);
1893 	flowop_endop(threadflow, flowop, 0);
1894 
1895 	file = threadflow->tf_fse[fd];
1896 
1897 	threadflow->tf_fd[fd] = 0;
1898 	threadflow->tf_fse[fd] = NULL;
1899 
1900 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1901 
1902 	return (0);
1903 }
1904 
1905 /*
1906  * Emulate stat of a file. Picks an arbitrary filesetentry with
1907  * an existing file from the flowop's fileset, then performs a
1908  * stat() operation on it. Returns -1 if the flowop has no
1909  * associated fileset. Returns 1 if an appropriate filesetentry
1910  * cannot be found, and 0 on success.
1911  */
1912 static int
1913 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
1914 {
1915 	filesetentry_t *file;
1916 	fileset_t *fileset;
1917 	char path[MAXPATHLEN];
1918 	char *pathtmp;
1919 
1920 	if (flowop->fo_fileset == NULL) {
1921 		filebench_log(LOG_ERROR, "flowop NULL file");
1922 		return (-1);
1923 	}
1924 
1925 	fileset = flowop->fo_fileset;
1926 
1927 	if ((file = fileset_pick(flowop->fo_fileset,
1928 	    FILESET_PICKEXISTS, 0)) == NULL) {
1929 		filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file",
1930 		    flowop->fo_name);
1931 		return (1);
1932 	}
1933 
1934 	*path = 0;
1935 	(void) strcpy(path, *fileset->fs_path);
1936 	(void) strcat(path, "/");
1937 	(void) strcat(path, fileset->fs_name);
1938 	pathtmp = fileset_resolvepath(file);
1939 	(void) strcat(path, pathtmp);
1940 	free(pathtmp);
1941 
1942 	flowop_beginop(threadflow, flowop);
1943 	flowop_endop(threadflow, flowop, 0);
1944 
1945 	(void) ipc_mutex_unlock(&file->fse_lock);
1946 
1947 	return (0);
1948 }
1949 
1950 
1951 /*
1952  * Additional reads and writes. Read and write whole files, write
1953  * and append to files. Some of these work with both fileobjs and
1954  * filesets, others only with filesets. The flowoplib_write routine
1955  * writes from thread memory, while the others read or write using
1956  * fo_buf memory. Note that both flowoplib_read() and
1957  * flowoplib_aiowrite() use thread memory as well.
1958  */
1959 
1960 
1961 /*
1962  * Emulate a read of a whole file. The file must be open with
1963  * file descriptor and filesetentry stored at the locations indexed
1964  * by the flowop's fdnumber. It then seeks to the beginning of the
1965  * associated file, and reads fs_iosize bytes at a time until the end
1966  * of the file. Returns -1 on error, 0 on success.
1967  */
1968 static int
1969 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
1970 {
1971 	caddr_t iobuf;
1972 	off64_t bytes = 0;
1973 	int fd = flowop->fo_fdnumber;
1974 	int filedesc;
1975 	int ret;
1976 	uint64_t wss;
1977 	vinteger_t iosize = *flowop->fo_iosize;
1978 
1979 	/* get the file to use */
1980 	if (flowoplib_filesetup(threadflow, flowop, &wss, &filedesc) != 0)
1981 		return (-1);
1982 
1983 	/* an I/O size of zero means read entire working set with one I/O */
1984 	if (iosize == 0)
1985 		iosize = wss;
1986 
1987 	if (flowoplib_iobufsetup(threadflow, flowop, &iobuf, iosize) != 0)
1988 		return (-1);
1989 
1990 	/* Measure time to read bytes */
1991 	flowop_beginop(threadflow, flowop);
1992 	(void) lseek64(filedesc, 0, SEEK_SET);
1993 	while ((ret = read(filedesc, iobuf, iosize)) > 0)
1994 		bytes += ret;
1995 
1996 	flowop_endop(threadflow, flowop, bytes);
1997 
1998 	if (ret < 0) {
1999 		filebench_log(LOG_ERROR,
2000 		    "Failed to read fd %d: %s",
2001 		    fd, strerror(errno));
2002 		return (-1);
2003 	}
2004 
2005 	return (0);
2006 }
2007 
2008 /*
2009  * Emulate a write to a file of size fo_iosize.  Will write
2010  * to a file from a fileset if the flowop's fo_fileset field
2011  * specifies one or its fdnumber is non zero. Otherwise it
2012  * will write to a fileobj file, if one exists. If the file
2013  * is not currently open, the routine will attempt to open
2014  * it. The flowop's fo_wss parameter will be used to set the
2015  * maximum file size if it is non-zero, otherwise the
2016  * filesetentry's  fse_size will be used. A random memory
2017  * buffer offset is calculated, and, if fo_random is TRUE,
2018  * a random file offset is used for the write. Otherwise the
2019  * write is to the next sequential location. Returns 1 on
2020  * errors, 0 on success.
2021  */
2022 static int
2023 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2024 {
2025 	caddr_t iobuf;
2026 	vinteger_t wss;
2027 	int filedesc;
2028 
2029 	if (flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2030 	    &filedesc, *flowop->fo_iosize) != 0)
2031 		return (-1);
2032 
2033 	if (*flowop->fo_random) {
2034 		uint64_t fileoffset;
2035 
2036 		if (filebench_randomno64(&fileoffset,
2037 		    wss, *flowop->fo_iosize) == -1) {
2038 			filebench_log(LOG_ERROR,
2039 			    "file size smaller than IO size for thread %s",
2040 			    flowop->fo_name);
2041 			return (-1);
2042 		}
2043 		flowop_beginop(threadflow, flowop);
2044 		if (pwrite64(filedesc, iobuf,
2045 		    *flowop->fo_iosize, (off64_t)fileoffset) == -1) {
2046 			filebench_log(LOG_ERROR, "write failed, "
2047 			    "offset %lld io buffer %zd: %s",
2048 			    fileoffset, iobuf, strerror(errno));
2049 			flowop_endop(threadflow, flowop, 0);
2050 			return (-1);
2051 		}
2052 		flowop_endop(threadflow, flowop, *flowop->fo_iosize);
2053 	} else {
2054 		flowop_beginop(threadflow, flowop);
2055 		if (write(filedesc, iobuf,
2056 		    *flowop->fo_iosize) == -1) {
2057 			filebench_log(LOG_ERROR,
2058 			    "write failed, io buffer %zd: %s",
2059 			    iobuf, strerror(errno));
2060 			flowop_endop(threadflow, flowop, 0);
2061 			return (-1);
2062 		}
2063 		flowop_endop(threadflow, flowop, *flowop->fo_iosize);
2064 	}
2065 
2066 	return (0);
2067 }
2068 
2069 /*
2070  * Emulate a write of a whole file.  The size of the file
2071  * is taken from a filesetentry identified by fo_srcfdnumber or
2072  * from the working set size, while the file descriptor used is
2073  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2074  * length length until full file has been written. Returns -1 on
2075  * error, 0 on success.
2076  */
2077 static int
2078 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2079 {
2080 	caddr_t iobuf;
2081 	filesetentry_t *file;
2082 	int wsize;
2083 	off64_t seek;
2084 	off64_t bytes = 0;
2085 	uint64_t wss;
2086 	int filedesc;
2087 	int srcfd = flowop->fo_srcfdnumber;
2088 	int ret;
2089 	vinteger_t iosize = *flowop->fo_iosize;
2090 
2091 	/* get the file to use */
2092 	if (flowoplib_filesetup(threadflow, flowop, &wss, &filedesc) != 0)
2093 		return (-1);
2094 
2095 	/* an I/O size of zero means read entire working set with one I/O */
2096 	if (iosize == 0)
2097 		iosize = wss;
2098 
2099 	if (flowoplib_iobufsetup(threadflow, flowop, &iobuf, iosize) != 0)
2100 		return (-1);
2101 
2102 	file = threadflow->tf_fse[srcfd];
2103 	if ((srcfd != 0) && (file == NULL)) {
2104 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2105 		    flowop->fo_name);
2106 		return (-1);
2107 	}
2108 
2109 	if (file)
2110 		wss = file->fse_size;
2111 
2112 	wsize = (int)MIN(wss, iosize);
2113 
2114 	/* Measure time to write bytes */
2115 	flowop_beginop(threadflow, flowop);
2116 	for (seek = 0; seek < wss; seek += wsize) {
2117 		ret = write(filedesc, iobuf, wsize);
2118 		if (ret != wsize) {
2119 			filebench_log(LOG_ERROR,
2120 			    "Failed to write %d bytes on fd %d: %s",
2121 			    wsize, filedesc, strerror(errno));
2122 			flowop_endop(threadflow, flowop, 0);
2123 			return (-1);
2124 		}
2125 		wsize = (int)MIN(wss - seek, iosize);
2126 		bytes += ret;
2127 	}
2128 	flowop_endop(threadflow, flowop, bytes);
2129 
2130 	return (0);
2131 }
2132 
2133 
2134 /*
2135  * Emulate a fixed size append to a file. Will append data to
2136  * a file chosen from a fileset if the flowop's fo_fileset
2137  * field specifies one or if its fdnumber is non zero.
2138  * Otherwise it will write to a fileobj file, if one exists.
2139  * The flowop's fo_wss parameter will be used to set the
2140  * maximum file size if it is non-zero, otherwise the
2141  * filesetentry's fse_size will be used. A random memory
2142  * buffer offset is calculated, then a logical seek to the
2143  * end of file is done followed by a write of fo_iosize
2144  * bytes. Writes are actually done from fo_buf, rather than
2145  * tf_mem as is done with flowoplib_write(), and no check
2146  * is made to see if fo_iosize exceeds the size of fo_buf.
2147  * Returns -1 on error, 0 on success.
2148  */
2149 static int
2150 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2151 {
2152 	caddr_t iobuf;
2153 	int filedesc;
2154 	vinteger_t wss;
2155 	vinteger_t iosize = *flowop->fo_iosize;
2156 	int ret;
2157 
2158 	if (flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2159 	    &filedesc, iosize) != 0)
2160 		return (-1);
2161 
2162 	/* XXX wss is not being used */
2163 
2164 	/* Measure time to write bytes */
2165 	flowop_beginop(threadflow, flowop);
2166 	(void) lseek64(filedesc, 0, SEEK_END);
2167 	ret = write(filedesc, iobuf, iosize);
2168 	if (ret != iosize) {
2169 		filebench_log(LOG_ERROR,
2170 		    "Failed to write %d bytes on fd %d: %s",
2171 		    iosize, filedesc, strerror(errno));
2172 		flowop_endop(threadflow, flowop, 0);
2173 		return (-1);
2174 	}
2175 	flowop_endop(threadflow, flowop, iosize);
2176 
2177 	return (0);
2178 }
2179 
2180 /*
2181  * Emulate a random size append to a file. Will append data
2182  * to a file chosen from a fileset if the flowop's fo_fileset
2183  * field specifies one or if its fdnumber is non zero. Otherwise
2184  * it will write to a fileobj file, if one exists. The flowop's
2185  * fo_wss parameter will be used to set the maximum file size
2186  * if it is non-zero, otherwise the filesetentry's fse_size
2187  * will be used.  A random transfer size (but at most fo_iosize
2188  * bytes) and a random memory offset are calculated. A logical
2189  * seek to the end of file is done, then writes of up to
2190  * FILE_ALLOC_BLOCK in size are done until the full transfer
2191  * size has been written. Writes are actually done from fo_buf,
2192  * rather than tf_mem as is done with flowoplib_write().
2193  * Returns -1 on error, 0 on success.
2194  */
2195 static int
2196 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2197 {
2198 	caddr_t iobuf;
2199 	uint64_t appendsize;
2200 	int filedesc;
2201 	vinteger_t wss;
2202 	int ret = 0;
2203 
2204 	if (filebench_randomno64(&appendsize, *flowop->fo_iosize, 1LL) != 0)
2205 		return (-1);
2206 
2207 	/* skip if attempting zero length append */
2208 	if (appendsize == 0) {
2209 		flowop_beginop(threadflow, flowop);
2210 		flowop_endop(threadflow, flowop, 0LL);
2211 		return (0);
2212 	}
2213 
2214 	if (flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2215 	    &filedesc, appendsize) != 0)
2216 		return (-1);
2217 
2218 	/* XXX wss is not being used */
2219 
2220 	/* Measure time to write bytes */
2221 	flowop_beginop(threadflow, flowop);
2222 
2223 	(void) lseek64(filedesc, 0, SEEK_END);
2224 	ret = write(filedesc, iobuf, appendsize);
2225 	if (ret != appendsize) {
2226 		filebench_log(LOG_ERROR,
2227 		    "Failed to write %d bytes on fd %d: %s",
2228 		    appendsize, filedesc, strerror(errno));
2229 		flowop_endop(threadflow, flowop, 0);
2230 		return (-1);
2231 	}
2232 
2233 	flowop_endop(threadflow, flowop, appendsize);
2234 
2235 	return (0);
2236 }
2237 
2238 
/*
 * Writes the usage summary for all flowop commands to stderr,
 * one line of text at a time.
 */
void
flowoplib_usage()
{
	/* usage text, in output order; each entry ends with its newline */
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop deletefile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>]\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,directio]\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,random]\n",
		"                       [,opennext]\n",
		"                       [,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n",
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value=<decrement-to-receive>,\n",
		"                       highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>,\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	const size_t nlines = sizeof (usage_text) / sizeof (usage_text[0]);
	size_t idx;

	for (idx = 0; idx < nlines; idx++)
		(void) fputs(usage_text[idx], stderr);
}
2328