xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 6613:38664cf1a8a1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #pragma ident	"%Z%%M%	%I%	%E% SMI"
29 
30 #include "config.h"
31 
32 #include <sys/types.h>
33 #ifdef HAVE_SYS_ASYNCH_H
34 #include <sys/asynch.h>
35 #endif
36 #include <sys/ipc.h>
37 #include <sys/sem.h>
38 #include <sys/errno.h>
39 #include <sys/time.h>
40 #include <inttypes.h>
41 #include <fcntl.h>
42 #include <math.h>
43 
44 #ifdef HAVE_UTILITY_H
45 #include <utility.h>
46 #endif /* HAVE_UTILITY_H */
47 
48 #ifdef HAVE_AIO
49 #include <aio.h>
50 #endif /* HAVE_AIO */
51 
52 #ifdef HAVE_LIBAIO_H
53 #include <libaio.h>
54 #endif /* HAVE_LIBAIO_H */
55 
56 #ifdef HAVE_SYS_ASYNC_H
57 #include <sys/asynch.h>
58 #endif /* HAVE_SYS_ASYNC_H */
59 
60 #ifdef HAVE_AIO_H
61 #include <aio.h>
62 #endif /* HAVE_AIO_H */
63 
64 #ifndef HAVE_UINT_T
65 #define	uint_t unsigned int
66 #endif /* HAVE_UINT_T */
67 
68 #ifndef HAVE_AIOCB64_T
69 #define	aiocb64 aiocb
70 #endif /* HAVE_AIOCB64_T */
71 
72 #ifndef HAVE_SYSV_SEM
73 #include <semaphore.h>
74 #endif /* HAVE_SYSV_SEM */
75 
76 #include "filebench.h"
77 #include "flowop.h"
78 #include "fileset.h"
79 #include "fb_random.h"
80 
81 /*
82  * These routines implement the flowops from the f language. Each
 * flowop has a name such as "read", and a set of function pointers
84  * to call for initialization, execution and destruction of the flowop.
85  * The table flowoplib_funcs[] contains a flowoplib struct for each
86  * implemented flowop. Most flowops use a generic initialization function
87  * and all currently use a generic destruction function. All flowop
88  * functions referenced from the table are in this file, though, of
89  * course, they often call functions from other files.
90  *
91  * The flowop_init() routine uses the flowoplib_funcs[] table to
92  * create an initial set of "instance 0" flowops, one for each type of
93  * flowop, from which all other flowops are derived. These "instance 0"
94  * flowops are initialized with information from the table including
95  * pointers for their fo_init, fo_func and fo_destroy functions. When
96  * a flowop definition is encountered in an f language script, the
97  * "type" of flowop, such as "read" is used to search for the
98  * "instance 0" flowop named "read", then a new flowop is allocated
99  * which inherits its function pointers and other initial properties
100  * from the instance 0 flowop, and is given a new name as specified
101  * by the "name=" attribute.
102  */
103 
104 static int flowoplib_init_generic(flowop_t *flowop);
105 static void flowoplib_destruct_generic(flowop_t *flowop);
106 static void flowoplib_destruct_noop(flowop_t *flowop);
107 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
108 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
109 #ifdef HAVE_AIO
110 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
111 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
112 #endif
113 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
114 static int flowoplib_block_init(flowop_t *flowop);
115 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
116 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
117 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
118 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
119 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
120 static int flowoplib_sempost_init(flowop_t *flowop);
121 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
122 static int flowoplib_semblock_init(flowop_t *flowop);
123 static void flowoplib_semblock_destruct(flowop_t *flowop);
124 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
125 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
126 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
127 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
128 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
129 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
130 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
131 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
132 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
133 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
134 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
135 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
136 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
137 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
138 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
139 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
140 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
141 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
142 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
143 static int flowoplib_testrandvar_init(flowop_t *flowop);
144 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
145 
/*
 * One row of the flowoplib_funcs[] table: the type and attribute flags
 * of a flowop, the name it is defined under, and the three handlers
 * that flowoplib_init() copies into the defined flowop.
 */
typedef struct flowoplib {
	int	fl_type;		/* copied to flowop_define() as type */
	int	fl_attrs;		/* copied to the flowop's fo_attrs */
	char	*fl_name;		/* flowop name, such as "read" */
	int	(*fl_init)();		/* becomes the flowop's fo_init */
	int	(*fl_func)();		/* becomes the flowop's fo_func */
	void	(*fl_destruct)();	/* becomes the flowop's fo_destruct */
} flowoplib_t;
154 
155 static flowoplib_t flowoplib_funcs[] = {
156 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
157 	flowoplib_write, flowoplib_destruct_generic,
158 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
159 	flowoplib_read, flowoplib_destruct_generic,
160 #ifdef HAVE_AIO
161 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
162 	flowoplib_aiowrite, flowoplib_destruct_generic,
163 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
164 	flowoplib_aiowait, flowoplib_destruct_generic,
165 #endif
166 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
167 	flowoplib_block, flowoplib_destruct_generic,
168 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
169 	flowoplib_wakeup, flowoplib_destruct_generic,
170 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
171 	flowoplib_semblock, flowoplib_semblock_destruct,
172 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
173 	flowoplib_sempost, flowoplib_destruct_noop,
174 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
175 	flowoplib_hog, flowoplib_destruct_generic,
176 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
177 	flowoplib_delay, flowoplib_destruct_generic,
178 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
179 	flowoplib_eventlimit, flowoplib_destruct_generic,
180 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
181 	flowoplib_bwlimit, flowoplib_destruct_generic,
182 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
183 	flowoplib_iopslimit, flowoplib_destruct_generic,
184 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
185 	flowoplib_opslimit, flowoplib_destruct_generic,
186 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
187 	flowoplib_finishoncount, flowoplib_destruct_generic,
188 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
189 	flowoplib_finishonbytes, flowoplib_destruct_generic,
190 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
191 	flowoplib_openfile, flowoplib_destruct_generic,
192 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
193 	flowoplib_createfile, flowoplib_destruct_generic,
194 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
195 	flowoplib_closefile, flowoplib_destruct_generic,
196 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
197 	flowoplib_fsync, flowoplib_destruct_generic,
198 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
199 	flowoplib_fsyncset, flowoplib_destruct_generic,
200 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
201 	flowoplib_statfile, flowoplib_destruct_generic,
202 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
203 	flowoplib_readwholefile, flowoplib_destruct_generic,
204 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
205 	flowoplib_appendfile, flowoplib_destruct_generic,
206 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
207 	flowoplib_appendfilerand, flowoplib_destruct_generic,
208 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
209 	flowoplib_deletefile, flowoplib_destruct_generic,
210 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
211 	flowoplib_writewholefile, flowoplib_destruct_generic,
212 	/* routine to calculate mean and stddev for output from a randvar */
213 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
214 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
215 };
216 
217 /*
218  * Loops through the master list of flowops defined in this
219  * module, and creates and initializes a flowop for each one
220  * by calling flowop_define. As a side effect of calling
221  * flowop define, the created flowops are placed on the
222  * master flowop list. All created flowops are set to
223  * instance "0".
224  */
225 void
226 flowoplib_init()
227 {
228 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
229 	int i;
230 
231 	for (i = 0; i < nops; i++) {
232 		flowop_t *flowop;
233 		flowoplib_t *fl;
234 
235 		fl = &flowoplib_funcs[i];
236 
237 		if ((flowop = flowop_define(NULL,
238 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
239 			filebench_log(LOG_ERROR,
240 			    "failed to create flowop %s\n",
241 			    fl->fl_name);
242 			filebench_shutdown(1);
243 		}
244 
245 		flowop->fo_func = fl->fl_func;
246 		flowop->fo_init = fl->fl_init;
247 		flowop->fo_destruct = fl->fl_destruct;
248 		flowop->fo_attrs = fl->fl_attrs;
249 	}
250 }
251 
252 static int
253 flowoplib_init_generic(flowop_t *flowop)
254 {
255 	(void) ipc_mutex_unlock(&flowop->fo_lock);
256 	return (FILEBENCH_OK);
257 }
258 
259 static void
260 flowoplib_destruct_generic(flowop_t *flowop)
261 {
262 	char *buf;
263 
264 	/* release any local resources held by the flowop */
265 	(void) ipc_mutex_lock(&flowop->fo_lock);
266 	buf = flowop->fo_buf;
267 	flowop->fo_buf = NULL;
268 	(void) ipc_mutex_unlock(&flowop->fo_lock);
269 
270 	if (buf)
271 		free(buf);
272 }
273 
274 /*
275  * Special total noop destruct
276  */
277 /* ARGSUSED */
278 static void
279 flowoplib_destruct_noop(flowop_t *flowop)
280 {
281 }
282 
283 /*
284  * Generates a file attribute from flags in the supplied flowop.
285  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
286  */
287 static int
288 flowoplib_fileattrs(flowop_t *flowop)
289 {
290 	int attrs = 0;
291 
292 	if (avd_get_bool(flowop->fo_directio))
293 		attrs |= FLOW_ATTR_DIRECTIO;
294 
295 	if (avd_get_bool(flowop->fo_dsync))
296 		attrs |= FLOW_ATTR_DSYNC;
297 
298 	return (attrs);
299 }
300 
301 /*
302  * Searches for a file descriptor. Tries the flowop's
303  * fo_fdnumber first and returns with it if it has been
304  * explicitly set (greater than 0). It next checks to
305  * see if a rotating file descriptor policy is in effect,
306  * and if not returns the fdnumber regardless of what
307  * it is. (note that if it is 0, it just selects to the
308  * default file descriptor in the threadflow's tf_fd
309  * array). If the rotating fd policy is in effect, it
310  * cycles from the end of the tf_fd array to one location
311  * beyond the maximum needed by the number of entries in
312  * the associated fileset on each invocation, then starts
313  * over from the end.
314  *
315  * The routine returns an index into the threadflow's
316  * tf_fd table where the actual file descriptor will be
317  * found. Note: the calling routine must not call this
318  * routine if the flowop does not have a fileset, and the
319  * flowop's fo_fdnumber is zero and fo_rotatefd is
320  * asserted, or an addressing fault may occur.
321  */
322 static int
323 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
324 {
325 	fbint_t	entries;
326 	int fdnumber = flowop->fo_fdnumber;
327 
328 	/* If the script sets the fd explicitly */
329 	if (fdnumber > 0)
330 		return (fdnumber);
331 
332 	/* If the flowop defaults to persistent fd */
333 	if (!avd_get_bool(flowop->fo_rotatefd))
334 		return (fdnumber);
335 
336 	if (flowop->fo_fileset == NULL) {
337 		filebench_log(LOG_ERROR, "flowop NULL file");
338 		return (FILEBENCH_ERROR);
339 	}
340 
341 	entries = flowop->fo_fileset->fs_constentries;
342 
343 	/* Rotate the fd on each flowop invocation */
344 	if (entries > (THREADFLOW_MAXFD / 2)) {
345 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
346 		    " (too many files : %llu",
347 		    flowop->fo_name, (u_longlong_t)entries);
348 		return (FILEBENCH_ERROR);
349 	}
350 
351 	/* First time around */
352 	if (threadflow->tf_fdrotor == 0)
353 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
354 
355 	/* One fd for every file in the set */
356 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
357 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
358 
359 
360 	threadflow->tf_fdrotor--;
361 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
362 	    threadflow->tf_fdrotor);
363 	return (threadflow->tf_fdrotor);
364 }
365 
366 /*
367  * Determines the file descriptor to use, and attempts to open
368  * the file if it is not already open. Also determines the wss
369  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
370  * if flowop_openfile_common couldn't obtain an appropriate file
371  * from a the fileset, and FILEBENCH_OK otherwise.
372  */
373 static int
374 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
375     fbint_t *wssp, int *filedescp)
376 {
377 	int fd = flowoplib_fdnum(threadflow, flowop);
378 
379 	if (fd == -1)
380 		return (FILEBENCH_ERROR);
381 
382 	if (threadflow->tf_fd[fd] == 0) {
383 		int ret;
384 
385 		if ((ret = flowoplib_openfile_common(
386 		    threadflow, flowop, fd)) != FILEBENCH_OK)
387 			return (ret);
388 
389 		if (threadflow->tf_fse[fd]) {
390 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
391 			    threadflow->tf_fse[fd]->fse_path);
392 		} else {
393 			filebench_log(LOG_DEBUG_IMPL,
394 			    "opened device %s/%s",
395 			    avd_get_str(flowop->fo_fileset->fs_path),
396 			    avd_get_str(flowop->fo_fileset->fs_name));
397 		}
398 	}
399 
400 	*filedescp = threadflow->tf_fd[fd];
401 
402 	if ((*wssp = flowop->fo_constwss) == 0) {
403 		if (threadflow->tf_fse[fd])
404 			*wssp = threadflow->tf_fse[fd]->fse_size;
405 		else
406 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
407 	}
408 
409 	return (FILEBENCH_OK);
410 }
411 
412 /*
413  * Determines the io buffer or random offset into tf_mem for
414  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
415  */
416 static int
417 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
418     caddr_t *iobufp, fbint_t iosize)
419 {
420 	long memsize;
421 	size_t memoffset;
422 
423 	if (iosize == 0) {
424 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
425 		    flowop->fo_name);
426 		return (FILEBENCH_ERROR);
427 	}
428 
429 	if ((memsize = threadflow->tf_constmemsize) != 0) {
430 
431 		/* use tf_mem for I/O with random offset */
432 		if (filebench_randomno(&memoffset,
433 		    memsize, iosize, NULL) == -1) {
434 			filebench_log(LOG_ERROR,
435 			    "tf_memsize smaller than IO size for thread %s",
436 			    flowop->fo_name);
437 			return (FILEBENCH_ERROR);
438 		}
439 		*iobufp = threadflow->tf_mem + memoffset;
440 
441 	} else {
442 		/* use private I/O buffer */
443 		if ((flowop->fo_buf != NULL) &&
444 		    (flowop->fo_buf_size < iosize)) {
445 			/* too small, so free up and re-allocate */
446 			free(flowop->fo_buf);
447 			flowop->fo_buf = NULL;
448 		}
449 
450 		/*
451 		 * Allocate memory for the  buffer. The memory is freed
452 		 * by flowop_destruct_generic() or by this routine if more
453 		 * memory is needed for the buffer.
454 		 */
455 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
456 		    = (char *)malloc(iosize)) == NULL))
457 			return (FILEBENCH_ERROR);
458 
459 		flowop->fo_buf_size = iosize;
460 		*iobufp = flowop->fo_buf;
461 	}
462 	return (FILEBENCH_OK);
463 }
464 
465 /*
466  * Determines the file descriptor to use, opens it if necessary, the
467  * io buffer or random offset into tf_mem for IO operation and the wss
468  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
469  */
470 static int
471 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
472     fbint_t *wssp, caddr_t *iobufp, int *filedescp, fbint_t iosize)
473 {
474 	int ret;
475 
476 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
477 	    FILEBENCH_OK)
478 		return (ret);
479 
480 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
481 	    FILEBENCH_OK)
482 		return (ret);
483 
484 	return (FILEBENCH_OK);
485 }
486 
487 /*
488  * Emulate posix read / pread. If the flowop has a fileset,
489  * a file descriptor number index is fetched, otherwise a
490  * supplied fileobj file is used. In either case the specified
491  * file will be opened if not already open. If the flowop has
492  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
493  * returned.
494  *
495  * The actual read is done to a random offset in the
496  * threadflow's thread memory (tf_mem), with a size set by
497  * fo_iosize and at either a random disk offset within the
498  * working set size, or at the next sequential location. If
499  * any errors are encountered, FILEBENCH_ERROR is returned,
500  * if no appropriate file can be obtained from the fileset then
501  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
502  */
503 static int
504 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
505 {
506 	caddr_t iobuf;
507 	fbint_t wss;
508 	fbint_t iosize;
509 	int filedesc;
510 	int ret;
511 
512 
513 	iosize = avd_get_int(flowop->fo_iosize);
514 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
515 	    &filedesc, iosize)) != FILEBENCH_OK)
516 		return (ret);
517 
518 	if (avd_get_bool(flowop->fo_random)) {
519 		uint64_t fileoffset;
520 
521 		if (filebench_randomno64(&fileoffset,
522 		    wss, iosize, NULL) == -1) {
523 			filebench_log(LOG_ERROR,
524 			    "file size smaller than IO size for thread %s",
525 			    flowop->fo_name);
526 			return (FILEBENCH_ERROR);
527 		}
528 
529 		(void) flowop_beginop(threadflow, flowop);
530 		if ((ret = pread64(filedesc, iobuf,
531 		    iosize, (off64_t)fileoffset)) == -1) {
532 			(void) flowop_endop(threadflow, flowop, 0);
533 			filebench_log(LOG_ERROR,
534 			    "read file %s failed, offset %llu "
535 			    "io buffer %zd: %s",
536 			    avd_get_str(flowop->fo_fileset->fs_name),
537 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
538 			flowop_endop(threadflow, flowop, 0);
539 			return (FILEBENCH_ERROR);
540 		}
541 		(void) flowop_endop(threadflow, flowop, ret);
542 
543 		if ((ret == 0))
544 			(void) lseek64(filedesc, 0, SEEK_SET);
545 
546 	} else {
547 		(void) flowop_beginop(threadflow, flowop);
548 		if ((ret = read(filedesc, iobuf, iosize)) == -1) {
549 			(void) flowop_endop(threadflow, flowop, 0);
550 			filebench_log(LOG_ERROR,
551 			    "read file %s failed, io buffer %zd: %s",
552 			    avd_get_str(flowop->fo_fileset->fs_name),
553 			    iobuf, strerror(errno));
554 			(void) flowop_endop(threadflow, flowop, 0);
555 			return (FILEBENCH_ERROR);
556 		}
557 		(void) flowop_endop(threadflow, flowop, ret);
558 
559 		if ((ret == 0))
560 			(void) lseek64(filedesc, 0, SEEK_SET);
561 	}
562 
563 	return (FILEBENCH_OK);
564 }
565 
566 #ifdef HAVE_AIO
567 
568 /*
569  * Asynchronous write section. An Asynchronous IO element
570  * (aiolist_t) is used to associate the asynchronous write request with
571  * its subsequent completion. This element includes a aiocb64 struct
572  * that is used by posix aio_xxx calls to track the asynchronous writes.
573  * The flowops aiowrite and aiowait result in calls to these posix
574  * aio_xxx system routines to do the actual asynchronous write IO
575  * operations.
576  */
577 
578 
579 /*
580  * Allocates an asynchronous I/O list (aio, of type
581  * aiolist_t) element. Adds it to the flowop thread's
582  * threadflow aio list. Returns a pointer to the element.
583  */
584 static aiolist_t *
585 aio_allocate(flowop_t *flowop)
586 {
587 	aiolist_t *aiolist;
588 
589 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
590 		filebench_log(LOG_ERROR, "malloc aiolist failed");
591 		filebench_shutdown(1);
592 	}
593 
594 	/* Add to list */
595 	if (flowop->fo_thread->tf_aiolist == NULL) {
596 		flowop->fo_thread->tf_aiolist = aiolist;
597 		aiolist->al_next = NULL;
598 	} else {
599 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
600 		flowop->fo_thread->tf_aiolist = aiolist;
601 	}
602 	return (aiolist);
603 }
604 
605 /*
606  * Searches for the aiolist element that has a matching
607  * completion block, aiocb. If none found returns FILEBENCH_ERROR. If
608  * found, removes the aiolist element from flowop thread's
609  * list and returns FILEBENCH_OK.
610  */
611 static int
612 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
613 {
614 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
615 	aiolist_t *previous = NULL;
616 	aiolist_t *match = NULL;
617 
618 	if (aiocb == NULL) {
619 		filebench_log(LOG_ERROR, "null aiocb deallocate");
620 		return (FILEBENCH_OK);
621 	}
622 
623 	while (aiolist) {
624 		if (aiocb == &(aiolist->al_aiocb)) {
625 			match = aiolist;
626 			break;
627 		}
628 		previous = aiolist;
629 		aiolist = aiolist->al_next;
630 	}
631 
632 	if (match == NULL)
633 		return (FILEBENCH_ERROR);
634 
635 	/* Remove from the list */
636 	if (previous)
637 		previous->al_next = match->al_next;
638 	else
639 		flowop->fo_thread->tf_aiolist = match->al_next;
640 
641 	return (FILEBENCH_OK);
642 }
643 
644 /*
645  * Emulate posix aiowrite(). Determines which file to use,
646  * either one file of a fileset, or the file associated
647  * with a fileobj, allocates and fills an aiolist_t element
648  * for the write, and issues the asynchronous write. This
649  * operation is only valid for random IO, and returns an
650  * error if the flowop is set for sequential IO. Returns
651  * FILEBENCH_OK on success, FILEBENCH_NORSC if iosetup can't
652  * obtain a file to open, and FILEBENCH_ERROR on any
653  * encountered error.
654  */
655 static int
656 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
657 {
658 	caddr_t iobuf;
659 	fbint_t wss;
660 	fbint_t iosize;
661 	int filedesc;
662 	int ret;
663 
664 	iosize = avd_get_int(flowop->fo_iosize);
665 
666 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
667 	    &filedesc, iosize)) != FILEBENCH_OK)
668 		return (ret);
669 
670 	if (avd_get_bool(flowop->fo_random)) {
671 		uint64_t fileoffset;
672 		struct aiocb64 *aiocb;
673 		aiolist_t *aiolist;
674 
675 		if (filebench_randomno64(&fileoffset,
676 		    wss, iosize, NULL) == -1) {
677 			filebench_log(LOG_ERROR,
678 			    "file size smaller than IO size for thread %s",
679 			    flowop->fo_name);
680 			return (FILEBENCH_ERROR);
681 		}
682 
683 		aiolist = aio_allocate(flowop);
684 		aiolist->al_type = AL_WRITE;
685 		aiocb = &aiolist->al_aiocb;
686 
687 		aiocb->aio_fildes = filedesc;
688 		aiocb->aio_buf = iobuf;
689 		aiocb->aio_nbytes = (size_t)iosize;
690 		aiocb->aio_offset = (off64_t)fileoffset;
691 		aiocb->aio_reqprio = 0;
692 
693 		filebench_log(LOG_DEBUG_IMPL,
694 		    "aio fd=%d, bytes=%llu, offset=%llu",
695 		    filedesc, (u_longlong_t)iosize, (u_longlong_t)fileoffset);
696 
697 		flowop_beginop(threadflow, flowop);
698 		if (aio_write64(aiocb) < 0) {
699 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
700 			    strerror(errno));
701 			filebench_shutdown(1);
702 		}
703 		flowop_endop(threadflow, flowop, iosize);
704 	} else {
705 		return (FILEBENCH_ERROR);
706 	}
707 
708 	return (FILEBENCH_OK);
709 }
710 
711 
712 
713 #define	MAXREAP 4096
714 
715 /*
716  * Emulate posix aiowait(). Waits for the completion of half the
717  * outstanding asynchronous IOs, or a single IO, which ever is
718  * larger. The routine will return after a sufficient number of
719  * completed calls issued by any thread in the procflow have
720  * completed, or a 1 second timout elapses. All completed
721  * IO operations are deleted from the thread's aiolist.
722  */
723 static int
724 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
725 {
726 	struct aiocb64 **worklist;
727 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
728 	int uncompleted = 0;
729 
730 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
731 
732 	/* Count the list of pending aios */
733 	while (aio) {
734 		uncompleted++;
735 		aio = aio->al_next;
736 	}
737 
738 	do {
739 		uint_t ncompleted = 0;
740 		uint_t todo;
741 		struct timespec timeout;
742 		int inprogress;
743 		int i;
744 
745 		/* Wait for half of the outstanding requests */
746 		timeout.tv_sec = 1;
747 		timeout.tv_nsec = 0;
748 
749 		if (uncompleted > MAXREAP)
750 			todo = MAXREAP;
751 		else
752 			todo = uncompleted / 2;
753 
754 		if (todo == 0)
755 			todo = 1;
756 
757 		flowop_beginop(threadflow, flowop);
758 
759 #ifdef HAVE_AIOWAITN
760 		if ((aio_waitn64((struct aiocb64 **)worklist,
761 		    MAXREAP, &todo, &timeout) == -1) &&
762 		    errno && (errno != ETIME)) {
763 			filebench_log(LOG_ERROR,
764 			    "aiowait failed: %s, outstanding = %d, "
765 			    "ncompleted = %d ",
766 			    strerror(errno), uncompleted, todo);
767 		}
768 
769 		ncompleted = todo;
770 		/* Take the  completed I/Os from the list */
771 		inprogress = 0;
772 		for (i = 0; i < ncompleted; i++) {
773 			if ((aio_return64(worklist[i]) == -1) &&
774 			    (errno == EINPROGRESS)) {
775 				inprogress++;
776 				continue;
777 			}
778 			if (aio_deallocate(flowop, worklist[i]) < 0) {
779 				filebench_log(LOG_ERROR, "Could not remove "
780 				    "aio from list ");
781 				flowop_endop(threadflow, flowop, 0);
782 				return (FILEBENCH_ERROR);
783 			}
784 		}
785 
786 		uncompleted -= ncompleted;
787 		uncompleted += inprogress;
788 
789 #else
790 
791 		for (ncompleted = 0, inprogress = 0,
792 		    aio = flowop->fo_thread->tf_aiolist;
793 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
794 			int result = aio_error64(&aio->al_aiocb);
795 
796 			if (result == EINPROGRESS) {
797 				inprogress++;
798 				continue;
799 			}
800 
801 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
802 				filebench_log(LOG_ERROR, "aio failed: %s",
803 				    strerror(result));
804 				continue;
805 			}
806 
807 			ncompleted++;
808 
809 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
810 				filebench_log(LOG_ERROR, "Could not remove aio "
811 				    "from list ");
812 				flowop_endop(threadflow, flowop, 0);
813 				return (FILEBENCH_ERROR);
814 			}
815 		}
816 
817 		uncompleted -= ncompleted;
818 
819 #endif
820 		filebench_log(LOG_DEBUG_SCRIPT,
821 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
822 		    ncompleted, uncompleted, inprogress);
823 
824 	} while (uncompleted > MAXREAP);
825 
826 	flowop_endop(threadflow, flowop, 0);
827 
828 	free(worklist);
829 
830 	return (FILEBENCH_OK);
831 }
832 
833 #endif /* HAVE_AIO */
834 
835 /*
836  * Initializes a "flowop_block" flowop. Specifically, it
837  * initializes the flowop's fo_cv and unlocks the fo_lock.
838  */
839 static int
840 flowoplib_block_init(flowop_t *flowop)
841 {
842 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
843 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
844 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
845 	(void) ipc_mutex_unlock(&flowop->fo_lock);
846 
847 	return (FILEBENCH_OK);
848 }
849 
850 /*
851  * Blocks the threadflow until woken up by flowoplib_wakeup.
852  * The routine blocks on the flowop's fo_cv condition variable.
853  */
854 static int
855 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
856 {
857 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
858 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
859 	(void) ipc_mutex_lock(&flowop->fo_lock);
860 
861 	flowop_beginop(threadflow, flowop);
862 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
863 	flowop_endop(threadflow, flowop, 0);
864 
865 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
866 	    flowop->fo_name, flowop->fo_instance);
867 
868 	(void) ipc_mutex_unlock(&flowop->fo_lock);
869 
870 	return (FILEBENCH_OK);
871 }
872 
873 /*
874  * Wakes up one or more target blocking flowops.
875  * Sends broadcasts on the fo_cv condition variables of all
876  * flowops on the target list, except those that are
877  * FLOW_MASTER flowops. The target list consists of all
878  * flowops whose name matches this flowop's "fo_targetname"
879  * attribute. The target list is generated on the first
880  * invocation, and the run will be shutdown if no targets
881  * are found. Otherwise the routine always returns FILEBENCH_OK.
882  */
883 static int
884 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
885 {
886 	flowop_t *target;
887 
888 	/* if this is the first wakeup, create the wakeup list */
889 	if (flowop->fo_targets == NULL) {
890 		flowop_t *result = flowop_find(flowop->fo_targetname);
891 
892 		flowop->fo_targets = result;
893 		if (result == NULL) {
894 			filebench_log(LOG_ERROR,
895 			    "wakeup: could not find op %s for thread %s",
896 			    flowop->fo_targetname,
897 			    threadflow->tf_name);
898 			filebench_shutdown(1);
899 		}
900 		while (result) {
901 			result->fo_targetnext =
902 			    result->fo_resultnext;
903 			result = result->fo_resultnext;
904 		}
905 	}
906 
907 	target = flowop->fo_targets;
908 
909 	/* wakeup the targets */
910 	while (target) {
911 		if (target->fo_instance == FLOW_MASTER) {
912 			target = target->fo_targetnext;
913 			continue;
914 		}
915 		filebench_log(LOG_DEBUG_IMPL,
916 		    "wakeup flow %s-%d at address %zx",
917 		    target->fo_name,
918 		    target->fo_instance,
919 		    &target->fo_cv);
920 
921 		flowop_beginop(threadflow, flowop);
922 		(void) ipc_mutex_lock(&target->fo_lock);
923 		(void) pthread_cond_broadcast(&target->fo_cv);
924 		(void) ipc_mutex_unlock(&target->fo_lock);
925 		flowop_endop(threadflow, flowop, 0);
926 
927 		target = target->fo_targetnext;
928 	}
929 
930 	return (FILEBENCH_OK);
931 }
932 
933 /*
934  * "think time" routines. the "hog" routine consumes cpu cycles as
935  * it "thinks", while the "delay" flowop simply calls sleep() to delay
936  * for a given number of seconds without consuming cpu cycles.
937  */
938 
939 
940 /*
941  * Consumes CPU cycles and memory bandwidth by looping for
942  * flowop->fo_value times. With each loop sets memory location
943  * threadflow->tf_mem to 1.
944  */
945 static int
946 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
947 {
948 	uint64_t value = avd_get_int(flowop->fo_value);
949 	int i;
950 
951 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
952 	flowop_beginop(threadflow, flowop);
953 	if (threadflow->tf_mem != NULL) {
954 		for (i = 0; i < value; i++)
955 			*(threadflow->tf_mem) = 1;
956 	}
957 	flowop_endop(threadflow, flowop, 0);
958 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
959 	return (FILEBENCH_OK);
960 }
961 
962 
963 /*
964  * Delays for fo_value seconds.
965  */
966 static int
967 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
968 {
969 	int value = avd_get_int(flowop->fo_value);
970 
971 	flowop_beginop(threadflow, flowop);
972 	(void) sleep(value);
973 	flowop_endop(threadflow, flowop, 0);
974 	return (FILEBENCH_OK);
975 }
976 
977 /*
978  * Rate limiting routines. This is the event consuming half of the
979  * event system. Each of the four following routines will limit the rate
980  * to one unit of either calls, issued I/O operations, issued filebench
981  * operations, or I/O bandwidth. Since there is only one event generator,
982  * the events will be divided among multiple instances of an event
983  * consumer, and further divided among different consumers if more than
984  * one has been defined. There is no mechanism to enforce equal sharing
985  * of events.
986  */
987 
988 /*
989  * Completes one invocation per posted event. If eventgen_q
990  * has an event count greater than zero, one will be removed
991  * (count decremented), otherwise the calling thread will
992  * block until another event has been posted. Always returns 0
993  */
994 static int
995 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
996 {
997 	/* Immediately bail if not set/enabled */
998 	if (filebench_shm->shm_eventgen_hz == 0)
999 		return (FILEBENCH_OK);
1000 
1001 	if (flowop->fo_initted == 0) {
1002 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1003 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1004 		flowop->fo_initted = 1;
1005 	}
1006 
1007 	flowop_beginop(threadflow, flowop);
1008 	while (filebench_shm->shm_eventgen_hz) {
1009 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1010 		if (filebench_shm->shm_eventgen_q > 0) {
1011 			filebench_shm->shm_eventgen_q--;
1012 			(void) ipc_mutex_unlock(
1013 			    &filebench_shm->shm_eventgen_lock);
1014 			break;
1015 		}
1016 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1017 		    &filebench_shm->shm_eventgen_lock);
1018 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1019 	}
1020 	flowop_endop(threadflow, flowop, 0);
1021 	return (FILEBENCH_OK);
1022 }
1023 
1024 /*
1025  * Blocks the calling thread if the number of issued I/O
1026  * operations exceeds the number of posted events, thus
1027  * limiting the average I/O operation rate to the rate
1028  * specified by eventgen_hz. Always returns FILEBENCH_OK.
1029  */
1030 static int
1031 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
1032 {
1033 	uint64_t iops;
1034 	uint64_t delta;
1035 	uint64_t events;
1036 
1037 	/* Immediately bail if not set/enabled */
1038 	if (filebench_shm->shm_eventgen_hz == 0)
1039 		return (FILEBENCH_OK);
1040 
1041 	if (flowop->fo_initted == 0) {
1042 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1043 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1044 		flowop->fo_initted = 1;
1045 	}
1046 
1047 	(void) ipc_mutex_lock(&controlstats_lock);
1048 	iops = (controlstats.fs_rcount +
1049 	    controlstats.fs_wcount);
1050 	(void) ipc_mutex_unlock(&controlstats_lock);
1051 
1052 	/* Is this the first time around */
1053 	if (flowop->fo_tputlast == 0) {
1054 		flowop->fo_tputlast = iops;
1055 		return (FILEBENCH_OK);
1056 	}
1057 
1058 	delta = iops - flowop->fo_tputlast;
1059 	flowop->fo_tputbucket -= delta;
1060 	flowop->fo_tputlast = iops;
1061 
1062 	/* No need to block if the q isn't empty */
1063 	if (flowop->fo_tputbucket >= 0LL) {
1064 		flowop_endop(threadflow, flowop, 0);
1065 		return (FILEBENCH_OK);
1066 	}
1067 
1068 	iops = flowop->fo_tputbucket * -1;
1069 	events = iops;
1070 
1071 	flowop_beginop(threadflow, flowop);
1072 	while (filebench_shm->shm_eventgen_hz) {
1073 
1074 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1075 		if (filebench_shm->shm_eventgen_q >= events) {
1076 			filebench_shm->shm_eventgen_q -= events;
1077 			(void) ipc_mutex_unlock(
1078 			    &filebench_shm->shm_eventgen_lock);
1079 			flowop->fo_tputbucket += events;
1080 			break;
1081 		}
1082 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1083 		    &filebench_shm->shm_eventgen_lock);
1084 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1085 	}
1086 	flowop_endop(threadflow, flowop, 0);
1087 
1088 	return (FILEBENCH_OK);
1089 }
1090 
1091 /*
1092  * Blocks the calling thread if the number of issued filebench
1093  * operations exceeds the number of posted events, thus limiting
1094  * the average filebench operation rate to the rate specified by
1095  * eventgen_hz. Always returns FILEBENCH_OK.
1096  */
1097 static int
1098 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
1099 {
1100 	uint64_t ops;
1101 	uint64_t delta;
1102 	uint64_t events;
1103 
1104 	/* Immediately bail if not set/enabled */
1105 	if (filebench_shm->shm_eventgen_hz == 0)
1106 		return (FILEBENCH_OK);
1107 
1108 	if (flowop->fo_initted == 0) {
1109 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1110 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1111 		flowop->fo_initted = 1;
1112 	}
1113 
1114 	(void) ipc_mutex_lock(&controlstats_lock);
1115 	ops = controlstats.fs_count;
1116 	(void) ipc_mutex_unlock(&controlstats_lock);
1117 
1118 	/* Is this the first time around */
1119 	if (flowop->fo_tputlast == 0) {
1120 		flowop->fo_tputlast = ops;
1121 		return (FILEBENCH_OK);
1122 	}
1123 
1124 	delta = ops - flowop->fo_tputlast;
1125 	flowop->fo_tputbucket -= delta;
1126 	flowop->fo_tputlast = ops;
1127 
1128 	/* No need to block if the q isn't empty */
1129 	if (flowop->fo_tputbucket >= 0LL) {
1130 		flowop_endop(threadflow, flowop, 0);
1131 		return (FILEBENCH_OK);
1132 	}
1133 
1134 	ops = flowop->fo_tputbucket * -1;
1135 	events = ops;
1136 
1137 	flowop_beginop(threadflow, flowop);
1138 	while (filebench_shm->shm_eventgen_hz) {
1139 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1140 		if (filebench_shm->shm_eventgen_q >= events) {
1141 			filebench_shm->shm_eventgen_q -= events;
1142 			(void) ipc_mutex_unlock(
1143 			    &filebench_shm->shm_eventgen_lock);
1144 			flowop->fo_tputbucket += events;
1145 			break;
1146 		}
1147 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1148 		    &filebench_shm->shm_eventgen_lock);
1149 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1150 	}
1151 	flowop_endop(threadflow, flowop, 0);
1152 
1153 	return (FILEBENCH_OK);
1154 }
1155 
1156 
1157 /*
1158  * Blocks the calling thread if the number of bytes of I/O
1159  * issued exceeds one megabyte times the number of posted
1160  * events, thus limiting the average I/O byte rate to one
1161  * megabyte times the event rate as set by eventgen_hz.
1162  * Always retuns FILEBENCH_OK.
1163  */
1164 static int
1165 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
1166 {
1167 	uint64_t bytes;
1168 	uint64_t delta;
1169 	uint64_t events;
1170 
1171 	/* Immediately bail if not set/enabled */
1172 	if (filebench_shm->shm_eventgen_hz == 0)
1173 		return (FILEBENCH_OK);
1174 
1175 	if (flowop->fo_initted == 0) {
1176 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1177 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1178 		flowop->fo_initted = 1;
1179 	}
1180 
1181 	(void) ipc_mutex_lock(&controlstats_lock);
1182 	bytes = (controlstats.fs_rbytes +
1183 	    controlstats.fs_wbytes);
1184 	(void) ipc_mutex_unlock(&controlstats_lock);
1185 
1186 	/* Is this the first time around */
1187 	if (flowop->fo_tputlast == 0) {
1188 		flowop->fo_tputlast = bytes;
1189 		return (FILEBENCH_OK);
1190 	}
1191 
1192 	delta = bytes - flowop->fo_tputlast;
1193 	flowop->fo_tputbucket -= delta;
1194 	flowop->fo_tputlast = bytes;
1195 
1196 	/* No need to block if the q isn't empty */
1197 	if (flowop->fo_tputbucket >= 0LL) {
1198 		flowop_endop(threadflow, flowop, 0);
1199 		return (FILEBENCH_OK);
1200 	}
1201 
1202 	bytes = flowop->fo_tputbucket * -1;
1203 	events = (bytes / MB) + 1;
1204 
1205 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1206 	    (u_longlong_t)bytes, (u_longlong_t)events);
1207 
1208 	flowop_beginop(threadflow, flowop);
1209 	while (filebench_shm->shm_eventgen_hz) {
1210 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1211 		if (filebench_shm->shm_eventgen_q >= events) {
1212 			filebench_shm->shm_eventgen_q -= events;
1213 			(void) ipc_mutex_unlock(
1214 			    &filebench_shm->shm_eventgen_lock);
1215 			flowop->fo_tputbucket += (events * MB);
1216 			break;
1217 		}
1218 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1219 		    &filebench_shm->shm_eventgen_lock);
1220 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1221 	}
1222 	flowop_endop(threadflow, flowop, 0);
1223 
1224 	return (FILEBENCH_OK);
1225 }
1226 
1227 /*
1228  * These flowops terminate a benchmark run when either the specified
1229  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1230  * number of I/O operations (flowoplib_finishoncount) have been generated.
1231  */
1232 
1233 
1234 /*
1235  * Stop filebench run when specified number of I/O bytes have been
1236  * transferred. Compares controlstats.fs_bytes with flowop->value,
1237  * and if greater returns 1, stopping the run, if not, returns 0
1238  * to continue running.
1239  */
1240 static int
1241 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1242 {
1243 	uint64_t b;
1244 	uint64_t bytes = flowop->fo_constvalue; /* use constant value */
1245 
1246 	(void) ipc_mutex_lock(&controlstats_lock);
1247 	b = controlstats.fs_bytes;
1248 	(void) ipc_mutex_unlock(&controlstats_lock);
1249 
1250 	flowop_beginop(threadflow, flowop);
1251 	if (b > bytes) {
1252 		flowop_endop(threadflow, flowop, 0);
1253 		return (FILEBENCH_DONE);
1254 	}
1255 	flowop_endop(threadflow, flowop, 0);
1256 
1257 	return (FILEBENCH_OK);
1258 }
1259 
1260 /*
1261  * Stop filebench run when specified number of I/O operations have
1262  * been performed. Compares controlstats.fs_count with *flowop->value,
1263  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1264  * to continue running.
1265  */
1266 static int
1267 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1268 {
1269 	uint64_t ops;
1270 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1271 
1272 	(void) ipc_mutex_lock(&controlstats_lock);
1273 	ops = controlstats.fs_count;
1274 	(void) ipc_mutex_unlock(&controlstats_lock);
1275 
1276 	flowop_beginop(threadflow, flowop);
1277 	if (ops >= count) {
1278 		flowop_endop(threadflow, flowop, 0);
1279 		return (FILEBENCH_DONE);
1280 	}
1281 	flowop_endop(threadflow, flowop, 0);
1282 
1283 	return (FILEBENCH_OK);
1284 }
1285 
1286 /*
1287  * Semaphore synchronization using either System V semaphores or
1288  * posix semaphores. If System V semaphores are available, they will be
1289  * used, otherwise posix semaphores will be used.
1290  */
1291 
1292 
1293 /*
1294  * Initializes the filebench "block on semaphore" flowop.
1295  * If System V semaphores are implemented, the routine
1296  * initializes the System V semaphore subsystem if it hasn't
1297  * already been initialized, also allocates a pair of semids
1298  * and initializes the highwater System V semaphore.
1299  * If no System V semaphores, then does nothing special.
1300  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1301  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1302  * on success.
1303  */
1304 static int
1305 flowoplib_semblock_init(flowop_t *flowop)
1306 {
1307 
1308 #ifdef HAVE_SYSV_SEM
1309 	int sys_semid;
1310 	struct sembuf sbuf[2];
1311 	int highwater;
1312 
1313 	ipc_seminit();
1314 
1315 	flowop->fo_semid_lw = ipc_semidalloc();
1316 	flowop->fo_semid_hw = ipc_semidalloc();
1317 
1318 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1319 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1320 
1321 	sys_semid = filebench_shm->shm_sys_semid;
1322 
1323 	if ((highwater = flowop->fo_semid_hw) == 0)
1324 		highwater = flowop->fo_constvalue; /* use constant value */
1325 
1326 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1327 
1328 	sbuf[0].sem_num = (short)highwater;
1329 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1330 	sbuf[0].sem_flg = 0;
1331 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1332 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1333 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1334 		return (FILEBENCH_ERROR);
1335 	}
1336 #else
1337 	filebench_log(LOG_DEBUG_IMPL,
1338 	    "flow %s-%d semblock init with posix semaphore",
1339 	    flowop->fo_name, flowop->fo_instance);
1340 
1341 	sem_init(&flowop->fo_sem, 1, 0);
1342 #endif	/* HAVE_SYSV_SEM */
1343 
1344 	if (!(avd_get_bool(flowop->fo_blocking)))
1345 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1346 
1347 	return (FILEBENCH_OK);
1348 }
1349 
1350 /*
1351  * Releases the semids for the System V semaphore allocated
1352  * to this flowop. If not using System V semaphores, then
1353  * it is effectively just a no-op.
1354  */
1355 static void
1356 flowoplib_semblock_destruct(flowop_t *flowop)
1357 {
1358 #ifdef HAVE_SYSV_SEM
1359 	ipc_semidfree(flowop->fo_semid_lw);
1360 	ipc_semidfree(flowop->fo_semid_hw);
1361 	(void) semctl(filebench_shm->shm_sys_semid, 0, IPC_RMID);
1362 	filebench_shm->shm_sys_semid = -1;
1363 #else
1364 	sem_destroy(&flowop->fo_sem);
1365 #endif /* HAVE_SYSV_SEM */
1366 }
1367 
1368 /*
1369  * Attempts to pass a System V or posix semaphore as appropriate,
1370  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1371  * semphores is not available or cannot be acquired, or if the initial
1372  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1373  */
1374 static int
1375 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1376 {
1377 
1378 #ifdef HAVE_SYSV_SEM
1379 	struct sembuf sbuf[2];
1380 	int value = avd_get_int(flowop->fo_value);
1381 	int sys_semid;
1382 	struct timespec timeout;
1383 
1384 	sys_semid = filebench_shm->shm_sys_semid;
1385 
1386 	filebench_log(LOG_DEBUG_IMPL,
1387 	    "flow %s-%d sem blocking on id %x num %x value %d",
1388 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1389 	    flowop->fo_semid_hw, value);
1390 
1391 	/* Post, decrement the increment the hw queue */
1392 	sbuf[0].sem_num = flowop->fo_semid_hw;
1393 	sbuf[0].sem_op = (short)value;
1394 	sbuf[0].sem_flg = 0;
1395 	sbuf[1].sem_num = flowop->fo_semid_lw;
1396 	sbuf[1].sem_op = value * -1;
1397 	sbuf[1].sem_flg = 0;
1398 	timeout.tv_sec = 600;
1399 	timeout.tv_nsec = 0;
1400 
1401 	if (avd_get_bool(flowop->fo_blocking))
1402 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1403 
1404 	flowop_beginop(threadflow, flowop);
1405 
1406 #ifdef HAVE_SEMTIMEDOP
1407 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1408 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1409 #else
1410 	(void) semop(sys_semid, &sbuf[0], 1);
1411 	(void) semop(sys_semid, &sbuf[1], 1);
1412 #endif /* HAVE_SEMTIMEDOP */
1413 
1414 	if (avd_get_bool(flowop->fo_blocking))
1415 		(void) ipc_mutex_lock(&flowop->fo_lock);
1416 
1417 	flowop_endop(threadflow, flowop, 0);
1418 
1419 #else
1420 	int value = avd_get_int(flowop->fo_value);
1421 	int i;
1422 
1423 	filebench_log(LOG_DEBUG_IMPL,
1424 	    "flow %s-%d sem blocking on posix semaphore",
1425 	    flowop->fo_name, flowop->fo_instance);
1426 
1427 	/* Decrement sem by value */
1428 	for (i = 0; i < value; i++) {
1429 		if (sem_wait(&flowop->fo_sem) == -1) {
1430 			filebench_log(LOG_ERROR, "semop wait failed");
1431 			return (FILEBENCH_ERROR);
1432 		}
1433 	}
1434 
1435 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1436 	    flowop->fo_name, flowop->fo_instance);
1437 #endif /* HAVE_SYSV_SEM */
1438 
1439 	return (FILEBENCH_OK);
1440 }
1441 
1442 /*
1443  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1444  */
1445 /* ARGSUSED */
1446 static int
1447 flowoplib_sempost_init(flowop_t *flowop)
1448 {
1449 #ifdef HAVE_SYSV_SEM
1450 	ipc_seminit();
1451 #endif /* HAVE_SYSV_SEM */
1452 	return (FILEBENCH_OK);
1453 }
1454 
1455 /*
1456  * Post to a System V or posix semaphore as appropriate.
1457  * On the first call for a given flowop instance, this routine
1458  * will use the fo_targetname attribute to locate all semblock
1459  * flowops that are expecting posts from this flowop. All
1460  * target flowops on this list will have a post operation done
1461  * to their semaphores on each call.
1462  */
1463 static int
1464 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1465 {
1466 	flowop_t *target;
1467 
1468 	filebench_log(LOG_DEBUG_IMPL,
1469 	    "sempost flow %s-%d",
1470 	    flowop->fo_name,
1471 	    flowop->fo_instance);
1472 
1473 	/* if this is the first post, create the post list */
1474 	if (flowop->fo_targets == NULL) {
1475 		flowop_t *result = flowop_find(flowop->fo_targetname);
1476 
1477 		flowop->fo_targets = result;
1478 
1479 		if (result == NULL) {
1480 			filebench_log(LOG_ERROR,
1481 			    "sempost: could not find op %s for thread %s",
1482 			    flowop->fo_targetname,
1483 			    threadflow->tf_name);
1484 			filebench_shutdown(1);
1485 		}
1486 
1487 		while (result) {
1488 			result->fo_targetnext =
1489 			    result->fo_resultnext;
1490 			result = result->fo_resultnext;
1491 		}
1492 	}
1493 
1494 	target = flowop->fo_targets;
1495 
1496 	flowop_beginop(threadflow, flowop);
1497 	/* post to the targets */
1498 	while (target) {
1499 #ifdef HAVE_SYSV_SEM
1500 		struct sembuf sbuf[2];
1501 		int sys_semid;
1502 		int blocking;
1503 #else
1504 		int i;
1505 #endif /* HAVE_SYSV_SEM */
1506 		struct timespec timeout;
1507 		int value = (int)avd_get_int(flowop->fo_value);
1508 
1509 		if (target->fo_instance == FLOW_MASTER) {
1510 			target = target->fo_targetnext;
1511 			continue;
1512 		}
1513 
1514 #ifdef HAVE_SYSV_SEM
1515 
1516 		filebench_log(LOG_DEBUG_IMPL,
1517 		    "sempost flow %s-%d num %x",
1518 		    target->fo_name,
1519 		    target->fo_instance,
1520 		    target->fo_semid_lw);
1521 
1522 		sys_semid = filebench_shm->shm_sys_semid;
1523 		sbuf[0].sem_num = target->fo_semid_lw;
1524 		sbuf[0].sem_op = (short)value;
1525 		sbuf[0].sem_flg = 0;
1526 		sbuf[1].sem_num = target->fo_semid_hw;
1527 		sbuf[1].sem_op = value * -1;
1528 		sbuf[1].sem_flg = 0;
1529 		timeout.tv_sec = 600;
1530 		timeout.tv_nsec = 0;
1531 
1532 		if (avd_get_bool(flowop->fo_blocking))
1533 			blocking = 1;
1534 		else
1535 			blocking = 0;
1536 
1537 #ifdef HAVE_SEMTIMEDOP
1538 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1539 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1540 #else
1541 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1542 		    (errno && (errno != EAGAIN))) {
1543 #endif /* HAVE_SEMTIMEDOP */
1544 			filebench_log(LOG_ERROR, "semop post failed: %s",
1545 			    strerror(errno));
1546 			return (FILEBENCH_ERROR);
1547 		}
1548 
1549 		filebench_log(LOG_DEBUG_IMPL,
1550 		    "flow %s-%d finished posting",
1551 		    target->fo_name, target->fo_instance);
1552 #else
1553 		filebench_log(LOG_DEBUG_IMPL,
1554 		    "sempost flow %s-%d to posix semaphore",
1555 		    target->fo_name,
1556 		    target->fo_instance);
1557 
1558 		/* Increment sem by value */
1559 		for (i = 0; i < value; i++) {
1560 			if (sem_post(&target->fo_sem) == -1) {
1561 				filebench_log(LOG_ERROR, "semop post failed");
1562 				return (FILEBENCH_ERROR);
1563 			}
1564 		}
1565 
1566 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1567 		    target->fo_name, target->fo_instance);
1568 #endif /* HAVE_SYSV_SEM */
1569 
1570 		target = target->fo_targetnext;
1571 	}
1572 	flowop_endop(threadflow, flowop, 0);
1573 
1574 	return (FILEBENCH_OK);
1575 }
1576 
1577 
1578 /*
1579  * Section for exercising create / open / close / delete operations
1580  * on files within a fileset. For proper operation, the flowop attribute
1581  * "fd", which sets the fo_fdnumber field in the flowop, must be used
1582  * so that the same file is opened and later closed. "fd" is an index
1583  * into a pair of arrays maintained by threadflows, one of which
1584  * contains the operating system assigned file descriptors and the other
1585  * a pointer to the filesetentry whose file the file descriptor
1586  * references. An openfile flowop defined without fd being set will use
1587  * the default (0) fd or, if specified, rotate through fd indices, but
1588  * createfile and closefile must use the default or a specified fd.
1589  * Meanwhile deletefile picks an arbitrary file to delete, regardless
1590  * of fd attribute.
1591  */
1592 
1593 /*
1594  * XXX Making file selection more consistent among the flowops might be good
1595  */
1596 
1597 
1598 /*
1599  * Emulates (and actually does) file open. Obtains a file descriptor
1600  * index, then calls flowoplib_openfile_common() to open. Returns
1601  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1602  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1603  * FILEBENCH_NORSC, FILEBENCH_OK).
1604  */
1605 static int
1606 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1607 {
1608 	int fd = flowoplib_fdnum(threadflow, flowop);
1609 
1610 	if (fd == -1)
1611 		return (FILEBENCH_ERROR);
1612 
1613 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1614 }
1615 
1616 /*
1617  * Common file opening code for filesets. Uses the supplied
1618  * file descriptor index to determine the tf_fd entry to use.
1619  * If the entry is empty (0) and the fileset exists, fileset
1620  * pick is called to select a fileset entry to use. The file
1621  * specified in the filesetentry is opened, and the returned
1622  * operating system file descriptor and a pointer to the
1623  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1624  * respectively. Returns FILEBENCH_ERROR on error,
1625  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1626  * and FILEBENCH_OK on success.
1627  */
1628 static int
1629 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1630 {
1631 	filesetentry_t *file;
1632 	char *fileset_name;
1633 	int tid = 0;
1634 
1635 	if (flowop->fo_fileset == NULL) {
1636 		filebench_log(LOG_ERROR, "flowop NULL file");
1637 		return (FILEBENCH_ERROR);
1638 	}
1639 
1640 	if ((fileset_name =
1641 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1642 		filebench_log(LOG_ERROR,
1643 		    "flowop %s: fileset has no name", flowop->fo_name);
1644 		return (FILEBENCH_ERROR);
1645 	}
1646 
1647 	/*
1648 	 * If the flowop doesn't default to persistent fd
1649 	 * then get unique thread ID for use by fileset_pick
1650 	 */
1651 	if (avd_get_bool(flowop->fo_rotatefd))
1652 		tid = threadflow->tf_utid;
1653 
1654 	if (threadflow->tf_fd[fd] != 0) {
1655 		filebench_log(LOG_ERROR,
1656 		    "flowop %s attempted to open without closing on fd %d",
1657 		    flowop->fo_name, fd);
1658 		return (FILEBENCH_ERROR);
1659 	}
1660 
1661 #ifdef HAVE_RAW_SUPPORT
1662 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1663 		int open_attrs = 0;
1664 		char name[MAXPATHLEN];
1665 
1666 		(void) strcpy(name,
1667 		    avd_get_str(flowop->fo_fileset->fs_path));
1668 		(void) strcat(name, "/");
1669 		(void) strcat(name, fileset_name);
1670 
1671 		if (avd_get_bool(flowop->fo_dsync)) {
1672 #ifdef sun
1673 			open_attrs |= O_DSYNC;
1674 #else
1675 			open_attrs |= O_FSYNC;
1676 #endif
1677 		}
1678 
1679 		filebench_log(LOG_DEBUG_SCRIPT,
1680 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1681 
1682 		threadflow->tf_fd[fd] = open64(name,
1683 		    O_RDWR | open_attrs, 0666);
1684 
1685 		if (threadflow->tf_fd[fd] < 0) {
1686 			filebench_log(LOG_ERROR,
1687 			    "Failed to open raw device %s: %s",
1688 			    name, strerror(errno));
1689 			return (FILEBENCH_ERROR);
1690 		}
1691 
1692 		/* if running on Solaris, use un-buffered io */
1693 #ifdef sun
1694 		(void) directio(threadflow->tf_fd[fd], DIRECTIO_ON);
1695 #endif
1696 
1697 		threadflow->tf_fse[fd] = NULL;
1698 
1699 		return (FILEBENCH_OK);
1700 	}
1701 #endif /* HAVE_RAW_SUPPORT */
1702 
1703 	if ((file = fileset_pick(flowop->fo_fileset,
1704 	    FILESET_PICKEXISTS, tid)) == NULL) {
1705 		filebench_log(LOG_DEBUG_SCRIPT,
1706 		    "flowop %s failed to pick file from %s on fd %d",
1707 		    flowop->fo_name, fileset_name, fd);
1708 		return (FILEBENCH_NORSC);
1709 	}
1710 
1711 	threadflow->tf_fse[fd] = file;
1712 
1713 	flowop_beginop(threadflow, flowop);
1714 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1715 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1716 	flowop_endop(threadflow, flowop, 0);
1717 
1718 	if (threadflow->tf_fd[fd] < 0) {
1719 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1720 		    flowop->fo_name, file->fse_path);
1721 		return (FILEBENCH_ERROR);
1722 	}
1723 
1724 	filebench_log(LOG_DEBUG_SCRIPT,
1725 	    "flowop %s: opened %s fd[%d] = %d",
1726 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1727 
1728 	return (FILEBENCH_OK);
1729 }
1730 
1731 /*
1732  * Emulate create of a file. Uses the flowop's fdnumber to select
1733  * tf_fd and tf_fse array locations to put the created file's file
1734  * descriptor and filesetentry respectively. Uses fileset_pick()
1735  * to select a specific filesetentry whose file does not currently
1736  * exist for the file create operation. Then calls
1737  * fileset_openfile() with the O_CREATE flag set to create the
1738  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1739  * already in use, the flowop has no associated fileset, or
1740  * the create call fails. Returns 1 if a filesetentry with a
1741  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1742  */
1743 static int
1744 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1745 {
1746 	filesetentry_t *file;
1747 	int fd = flowop->fo_fdnumber;
1748 
1749 	if (threadflow->tf_fd[fd] != 0) {
1750 		filebench_log(LOG_ERROR,
1751 		    "flowop %s attempted to create without closing on fd %d",
1752 		    flowop->fo_name, fd);
1753 		return (FILEBENCH_ERROR);
1754 	}
1755 
1756 	if (flowop->fo_fileset == NULL) {
1757 		filebench_log(LOG_ERROR, "flowop NULL file");
1758 		return (FILEBENCH_ERROR);
1759 	}
1760 
1761 #ifdef HAVE_RAW_SUPPORT
1762 	/* can't be used with raw devices */
1763 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1764 		filebench_log(LOG_ERROR,
1765 		    "flowop %s attempted to a createfile on RAW device",
1766 		    flowop->fo_name);
1767 		return (FILEBENCH_ERROR);
1768 	}
1769 #endif /* HAVE_RAW_SUPPORT */
1770 
1771 	if ((file = fileset_pick(flowop->fo_fileset,
1772 	    FILESET_PICKNOEXIST, 0)) == NULL) {
1773 		filebench_log(LOG_DEBUG_SCRIPT,
1774 		    "flowop %s failed to pick file from fileset %s",
1775 		    flowop->fo_name,
1776 		    avd_get_str(flowop->fo_fileset->fs_name));
1777 		return (FILEBENCH_NORSC);
1778 	}
1779 
1780 	threadflow->tf_fse[fd] = file;
1781 
1782 	flowop_beginop(threadflow, flowop);
1783 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1784 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1785 	flowop_endop(threadflow, flowop, 0);
1786 
1787 	if (threadflow->tf_fd[fd] < 0) {
1788 		filebench_log(LOG_ERROR, "failed to create file %s",
1789 		    flowop->fo_name);
1790 		return (FILEBENCH_ERROR);
1791 	}
1792 
1793 	filebench_log(LOG_DEBUG_SCRIPT,
1794 	    "flowop %s: created %s fd[%d] = %d",
1795 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1796 
1797 	return (FILEBENCH_OK);
1798 }
1799 
1800 /*
1801  * Emulates delete of a file. If a valid fd is provided, it uses the
1802  * filesetentry stored at that fd location to select the file to be
1803  * deleted, otherwise it picks an arbitrary filesetentry
1804  * whose file exists. It then uses unlink() to delete it and Clears
1805  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1806  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1807  * filesetentry cannot be found, and FILEBENCH_OK on success.
1808  */
1809 static int
1810 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1811 {
1812 	filesetentry_t *file;
1813 	fileset_t *fileset;
1814 	char path[MAXPATHLEN];
1815 	char *pathtmp;
1816 	int fd = flowop->fo_fdnumber;
1817 
1818 	/* if fd specified, use it to access file */
1819 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1820 
1821 		/* check whether file still open */
1822 		if (threadflow->tf_fd[fd] > 0) {
1823 			filebench_log(LOG_DEBUG_SCRIPT,
1824 			    "flowop %s deleting still open file at fd = %d",
1825 			    flowop->fo_name, fd);
1826 		}
1827 
1828 		/* indicate that the file will be deleted */
1829 		threadflow->tf_fse[fd] = NULL;
1830 
1831 		/* if here, we still have a valid file pointer */
1832 		fileset = file->fse_fileset;
1833 	} else {
1834 		/* Otherwise, pick arbitrary file */
1835 		file = NULL;
1836 		fileset = flowop->fo_fileset;
1837 	}
1838 
1839 
1840 	if (fileset == NULL) {
1841 		filebench_log(LOG_ERROR, "flowop NULL file");
1842 		return (FILEBENCH_ERROR);
1843 	}
1844 
1845 #ifdef HAVE_RAW_SUPPORT
1846 	/* can't be used with raw devices */
1847 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1848 		filebench_log(LOG_ERROR,
1849 		    "flowop %s attempted a deletefile on RAW device",
1850 		    flowop->fo_name);
1851 		return (FILEBENCH_ERROR);
1852 	}
1853 #endif /* HAVE_RAW_SUPPORT */
1854 
1855 	if (file == NULL) {
1856 		if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0))
1857 		    == NULL) {
1858 			filebench_log(LOG_DEBUG_SCRIPT,
1859 			    "flowop %s failed to pick file", flowop->fo_name);
1860 			return (FILEBENCH_NORSC);
1861 		}
1862 	} else {
1863 		(void) ipc_mutex_lock(&file->fse_lock);
1864 	}
1865 
1866 	*path = 0;
1867 	(void) strcpy(path, avd_get_str(fileset->fs_path));
1868 	(void) strcat(path, "/");
1869 	(void) strcat(path, avd_get_str(fileset->fs_name));
1870 	pathtmp = fileset_resolvepath(file);
1871 	(void) strcat(path, pathtmp);
1872 	free(pathtmp);
1873 
1874 	flowop_beginop(threadflow, flowop);
1875 	(void) unlink(path);
1876 	flowop_endop(threadflow, flowop, 0);
1877 	file->fse_flags &= ~FSE_EXISTS;
1878 	(void) ipc_mutex_unlock(&file->fse_lock);
1879 
1880 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1881 
1882 	return (FILEBENCH_OK);
1883 }
1884 
1885 /*
1886  * Emulates fsync of a file. Obtains the file descriptor index
1887  * from the flowop, obtains the actual file descriptor from
1888  * the threadflow's table, checks to be sure it is still an
1889  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
1890  * if the file no longer is open, FILEBENCH_OK otherwise.
1891  */
1892 static int
1893 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1894 {
1895 	filesetentry_t *file;
1896 	int fd = flowop->fo_fdnumber;
1897 
1898 	if (threadflow->tf_fd[fd] == 0) {
1899 		filebench_log(LOG_ERROR,
1900 		    "flowop %s attempted to fsync a closed fd %d",
1901 		    flowop->fo_name, fd);
1902 		return (FILEBENCH_ERROR);
1903 	}
1904 
1905 	file = threadflow->tf_fse[fd];
1906 
1907 	if ((file == NULL) ||
1908 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1909 		filebench_log(LOG_ERROR,
1910 		    "flowop %s attempted to a fsync a RAW device",
1911 		    flowop->fo_name);
1912 		return (FILEBENCH_ERROR);
1913 	}
1914 
1915 	/* Measure time to fsync */
1916 	flowop_beginop(threadflow, flowop);
1917 	(void) fsync(threadflow->tf_fd[fd]);
1918 	flowop_endop(threadflow, flowop, 0);
1919 
1920 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1921 
1922 	return (FILEBENCH_OK);
1923 }
1924 
1925 /*
1926  * Emulate fsync of an entire fileset. Search through the
1927  * threadflow's file descriptor array, doing fsync() on each
1928  * open file that belongs to the flowop's fileset. Always
1929  * returns FILEBENCH_OK.
1930  */
1931 static int
1932 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1933 {
1934 	int fd;
1935 
1936 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1937 		filesetentry_t *file;
1938 
1939 		/* Match the file set to fsync */
1940 		if ((threadflow->tf_fse[fd] == NULL) ||
1941 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1942 			continue;
1943 
1944 		/* Measure time to fsync */
1945 		flowop_beginop(threadflow, flowop);
1946 		(void) fsync(threadflow->tf_fd[fd]);
1947 		flowop_endop(threadflow, flowop, 0);
1948 
1949 		file = threadflow->tf_fse[fd];
1950 
1951 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1952 		    file->fse_path);
1953 	}
1954 
1955 	return (FILEBENCH_OK);
1956 }
1957 
1958 /*
1959  * Emulate close of a file.  Obtains the file descriptor index
1960  * from the flowop, obtains the actual file descriptor from the
1961  * threadflow's table, checks to be sure it is still an open
1962  * file, then does a close operation on it. Then sets the
1963  * threadflow file descriptor table entry to 0, and the file set
1964  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
1965  * FILEBENCH_OK otherwise.
1966  */
1967 static int
1968 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1969 {
1970 	filesetentry_t *file;
1971 	int fd = flowop->fo_fdnumber;
1972 
1973 	if (threadflow->tf_fd[fd] == 0) {
1974 		filebench_log(LOG_ERROR,
1975 		    "flowop %s attempted to close an already closed fd %d",
1976 		    flowop->fo_name, fd);
1977 		return (FILEBENCH_ERROR);
1978 	}
1979 
1980 	/* Measure time to close */
1981 	flowop_beginop(threadflow, flowop);
1982 	(void) close(threadflow->tf_fd[fd]);
1983 	flowop_endop(threadflow, flowop, 0);
1984 
1985 	file = threadflow->tf_fse[fd];
1986 
1987 	threadflow->tf_fd[fd] = 0;
1988 
1989 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1990 
1991 	return (FILEBENCH_OK);
1992 }
1993 
1994 /*
1995  * Emulate stat of a file. Picks an arbitrary filesetentry with
1996  * an existing file from the flowop's fileset, then performs a
1997  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
1998  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
1999  * cannot be found, and FILEBENCH_OK on success.
2000  */
2001 static int
2002 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2003 {
2004 	filesetentry_t *file;
2005 	fileset_t *fileset;
2006 	char path[MAXPATHLEN];
2007 	char *pathtmp;
2008 
2009 	if ((fileset = flowop->fo_fileset) == NULL) {
2010 		filebench_log(LOG_ERROR, "flowop NULL file");
2011 		return (FILEBENCH_ERROR);
2012 	}
2013 
2014 	if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0)) == NULL) {
2015 		filebench_log(LOG_DEBUG_SCRIPT,
2016 		    "flowop %s failed to pick file",
2017 		    flowop->fo_name);
2018 		return (FILEBENCH_NORSC);
2019 	}
2020 
2021 	*path = 0;
2022 	(void) strcpy(path, avd_get_str(fileset->fs_path));
2023 	(void) strcat(path, "/");
2024 	(void) strcat(path, avd_get_str(fileset->fs_name));
2025 	pathtmp = fileset_resolvepath(file);
2026 	(void) strcat(path, pathtmp);
2027 	free(pathtmp);
2028 
2029 	flowop_beginop(threadflow, flowop);
2030 	flowop_endop(threadflow, flowop, 0);
2031 
2032 	(void) ipc_mutex_unlock(&file->fse_lock);
2033 
2034 	return (FILEBENCH_OK);
2035 }
2036 
2037 
2038 /*
2039  * Additional reads and writes. Read and write whole files, write
2040  * and append to files. Some of these work with both fileobjs and
2041  * filesets, others only with filesets. The flowoplib_write routine
2042  * writes from thread memory, while the others read or write using
2043  * fo_buf memory. Note that both flowoplib_read() and
2044  * flowoplib_aiowrite() use thread memory as well.
2045  */
2046 
2047 
2048 /*
2049  * Emulate a read of a whole file. The file must be open with
2050  * file descriptor and filesetentry stored at the locations indexed
2051  * by the flowop's fdnumber. It then seeks to the beginning of the
2052  * associated file, and reads fs_iosize bytes at a time until the end
2053  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2054  * out of files, and FILEBENCH_OK on success.
2055  */
2056 static int
2057 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2058 {
2059 	caddr_t iobuf;
2060 	off64_t bytes = 0;
2061 	int filedesc;
2062 	uint64_t wss;
2063 	fbint_t iosize;
2064 	int ret;
2065 	char zerordbuf;
2066 
2067 	/* get the file to use */
2068 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2069 	    &filedesc)) != FILEBENCH_OK)
2070 		return (ret);
2071 
2072 	/* an I/O size of zero means read entire working set with one I/O */
2073 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2074 		iosize = wss;
2075 
2076 	/*
2077 	 * The file may actually be 0 bytes long, in which case skip
2078 	 * the buffer set up call (which would fail) and substitute
2079 	 * a small buffer, which won't really be used.
2080 	 */
2081 	if (iosize == 0) {
2082 		iobuf = (caddr_t)&zerordbuf;
2083 		filebench_log(LOG_DEBUG_SCRIPT,
2084 		    "flowop %s read zero length file", flowop->fo_name);
2085 	} else {
2086 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2087 		    iosize) != 0)
2088 			return (FILEBENCH_ERROR);
2089 	}
2090 
2091 	/* Measure time to read bytes */
2092 	flowop_beginop(threadflow, flowop);
2093 	(void) lseek64(filedesc, 0, SEEK_SET);
2094 	while ((ret = read(filedesc, iobuf, iosize)) > 0)
2095 		bytes += ret;
2096 
2097 	flowop_endop(threadflow, flowop, bytes);
2098 
2099 	if (ret < 0) {
2100 		filebench_log(LOG_ERROR,
2101 		    "readwhole fail Failed to read whole file: %s",
2102 		    strerror(errno));
2103 		return (FILEBENCH_ERROR);
2104 	}
2105 
2106 	return (FILEBENCH_OK);
2107 }
2108 
2109 /*
2110  * Emulate a write to a file of size fo_iosize.  Will write
2111  * to a file from a fileset if the flowop's fo_fileset field
2112  * specifies one or its fdnumber is non zero. Otherwise it
2113  * will write to a fileobj file, if one exists. If the file
2114  * is not currently open, the routine will attempt to open
2115  * it. The flowop's fo_wss parameter will be used to set the
2116  * maximum file size if it is non-zero, otherwise the
2117  * filesetentry's  fse_size will be used. A random memory
2118  * buffer offset is calculated, and, if fo_random is TRUE,
2119  * a random file offset is used for the write. Otherwise the
2120  * write is to the next sequential location. Returns
2121  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2122  * obtain a file, or FILEBENCH_OK on success.
2123  */
2124 static int
2125 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2126 {
2127 	caddr_t iobuf;
2128 	fbint_t wss;
2129 	fbint_t iosize;
2130 	int filedesc;
2131 	int ret;
2132 
2133 	iosize = avd_get_int(flowop->fo_iosize);
2134 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2135 	    &filedesc, iosize)) != FILEBENCH_OK)
2136 		return (ret);
2137 
2138 	if (avd_get_bool(flowop->fo_random)) {
2139 		uint64_t fileoffset;
2140 
2141 		if (filebench_randomno64(&fileoffset,
2142 		    wss, iosize, NULL) == -1) {
2143 			filebench_log(LOG_ERROR,
2144 			    "file size smaller than IO size for thread %s",
2145 			    flowop->fo_name);
2146 			return (FILEBENCH_ERROR);
2147 		}
2148 		flowop_beginop(threadflow, flowop);
2149 		if (pwrite64(filedesc, iobuf,
2150 		    iosize, (off64_t)fileoffset) == -1) {
2151 			filebench_log(LOG_ERROR, "write failed, "
2152 			    "offset %llu io buffer %zd: %s",
2153 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2154 			flowop_endop(threadflow, flowop, 0);
2155 			return (FILEBENCH_ERROR);
2156 		}
2157 		flowop_endop(threadflow, flowop, iosize);
2158 	} else {
2159 		flowop_beginop(threadflow, flowop);
2160 		if (write(filedesc, iobuf, iosize) == -1) {
2161 			filebench_log(LOG_ERROR,
2162 			    "write failed, io buffer %zd: %s",
2163 			    iobuf, strerror(errno));
2164 			flowop_endop(threadflow, flowop, 0);
2165 			return (FILEBENCH_ERROR);
2166 		}
2167 		flowop_endop(threadflow, flowop, iosize);
2168 	}
2169 
2170 	return (FILEBENCH_OK);
2171 }
2172 
2173 /*
2174  * Emulate a write of a whole file.  The size of the file
2175  * is taken from a filesetentry identified by fo_srcfdnumber or
2176  * from the working set size, while the file descriptor used is
2177  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2178  * length length until full file has been written. Returns FILEBENCH_ERROR on
2179  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2180  */
2181 static int
2182 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2183 {
2184 	caddr_t iobuf;
2185 	filesetentry_t *file;
2186 	int wsize;
2187 	off64_t seek;
2188 	off64_t bytes = 0;
2189 	uint64_t wss;
2190 	fbint_t iosize;
2191 	int filedesc;
2192 	int srcfd = flowop->fo_srcfdnumber;
2193 	int ret;
2194 	char zerowrtbuf;
2195 
2196 	/* get the file to use */
2197 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2198 	    &filedesc)) != FILEBENCH_OK)
2199 		return (ret);
2200 
2201 	/* an I/O size of zero means write entire working set with one I/O */
2202 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2203 		iosize = wss;
2204 
2205 	/*
2206 	 * The file may actually be 0 bytes long, in which case skip
2207 	 * the buffer set up call (which would fail) and substitute
2208 	 * a small buffer, which won't really be used.
2209 	 */
2210 	if (iosize == 0) {
2211 		iobuf = (caddr_t)&zerowrtbuf;
2212 		filebench_log(LOG_DEBUG_SCRIPT,
2213 		    "flowop %s wrote zero length file", flowop->fo_name);
2214 	} else {
2215 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2216 		    iosize) != 0)
2217 			return (FILEBENCH_ERROR);
2218 	}
2219 
2220 	file = threadflow->tf_fse[srcfd];
2221 	if ((srcfd != 0) && (file == NULL)) {
2222 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2223 		    flowop->fo_name);
2224 		return (FILEBENCH_ERROR);
2225 	}
2226 
2227 	if (file)
2228 		wss = file->fse_size;
2229 
2230 	wsize = (int)MIN(wss, iosize);
2231 
2232 	/* Measure time to write bytes */
2233 	flowop_beginop(threadflow, flowop);
2234 	for (seek = 0; seek < wss; seek += wsize) {
2235 		ret = write(filedesc, iobuf, wsize);
2236 		if (ret != wsize) {
2237 			filebench_log(LOG_ERROR,
2238 			    "Failed to write %d bytes on fd %d: %s",
2239 			    wsize, filedesc, strerror(errno));
2240 			flowop_endop(threadflow, flowop, 0);
2241 			return (FILEBENCH_ERROR);
2242 		}
2243 		wsize = (int)MIN(wss - seek, iosize);
2244 		bytes += ret;
2245 	}
2246 	flowop_endop(threadflow, flowop, bytes);
2247 
2248 	return (FILEBENCH_OK);
2249 }
2250 
2251 
2252 /*
2253  * Emulate a fixed size append to a file. Will append data to
2254  * a file chosen from a fileset if the flowop's fo_fileset
2255  * field specifies one or if its fdnumber is non zero.
2256  * Otherwise it will write to a fileobj file, if one exists.
2257  * The flowop's fo_wss parameter will be used to set the
2258  * maximum file size if it is non-zero, otherwise the
2259  * filesetentry's fse_size will be used. A random memory
2260  * buffer offset is calculated, then a logical seek to the
2261  * end of file is done followed by a write of fo_iosize
2262  * bytes. Writes are actually done from fo_buf, rather than
2263  * tf_mem as is done with flowoplib_write(), and no check
2264  * is made to see if fo_iosize exceeds the size of fo_buf.
2265  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2266  * files in the fileset, FILEBENCH_OK on success.
2267  */
2268 static int
2269 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2270 {
2271 	caddr_t iobuf;
2272 	int filedesc;
2273 	fbint_t wss;
2274 	fbint_t iosize;
2275 	int ret;
2276 
2277 	iosize = avd_get_int(flowop->fo_iosize);
2278 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2279 	    &filedesc, iosize)) != FILEBENCH_OK)
2280 		return (ret);
2281 
2282 	/* XXX wss is not being used */
2283 
2284 	/* Measure time to write bytes */
2285 	flowop_beginop(threadflow, flowop);
2286 	(void) lseek64(filedesc, 0, SEEK_END);
2287 	ret = write(filedesc, iobuf, iosize);
2288 	if (ret != iosize) {
2289 		filebench_log(LOG_ERROR,
2290 		    "Failed to write %llu bytes on fd %d: %s",
2291 		    (u_longlong_t)iosize, filedesc, strerror(errno));
2292 		flowop_endop(threadflow, flowop, ret);
2293 		return (FILEBENCH_ERROR);
2294 	}
2295 	flowop_endop(threadflow, flowop, ret);
2296 
2297 	return (FILEBENCH_OK);
2298 }
2299 
2300 /*
2301  * Emulate a random size append to a file. Will append data
2302  * to a file chosen from a fileset if the flowop's fo_fileset
2303  * field specifies one or if its fdnumber is non zero. Otherwise
2304  * it will write to a fileobj file, if one exists. The flowop's
2305  * fo_wss parameter will be used to set the maximum file size
2306  * if it is non-zero, otherwise the filesetentry's fse_size
2307  * will be used.  A random transfer size (but at most fo_iosize
2308  * bytes) and a random memory offset are calculated. A logical
2309  * seek to the end of file is done, then writes of up to
2310  * FILE_ALLOC_BLOCK in size are done until the full transfer
2311  * size has been written. Writes are actually done from fo_buf,
2312  * rather than tf_mem as is done with flowoplib_write().
2313  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2314  * files in the fileset, FILEBENCH_OK on success.
2315  */
2316 static int
2317 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2318 {
2319 	caddr_t iobuf;
2320 	uint64_t appendsize;
2321 	int filedesc;
2322 	fbint_t wss;
2323 	fbint_t iosize;
2324 	int ret = 0;
2325 
2326 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2327 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2328 		    flowop->fo_name);
2329 		return (FILEBENCH_ERROR);
2330 	}
2331 
2332 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2333 		return (FILEBENCH_ERROR);
2334 
2335 	/* skip if attempting zero length append */
2336 	if (appendsize == 0) {
2337 		flowop_beginop(threadflow, flowop);
2338 		flowop_endop(threadflow, flowop, 0LL);
2339 		return (FILEBENCH_OK);
2340 	}
2341 
2342 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2343 	    &filedesc, appendsize)) != FILEBENCH_OK)
2344 		return (ret);
2345 
2346 	/* XXX wss is not being used */
2347 
2348 	/* Measure time to write bytes */
2349 	flowop_beginop(threadflow, flowop);
2350 
2351 	(void) lseek64(filedesc, 0, SEEK_END);
2352 	ret = write(filedesc, iobuf, appendsize);
2353 	if (ret != appendsize) {
2354 		filebench_log(LOG_ERROR,
2355 		    "Failed to write %llu bytes on fd %d: %s",
2356 		    (u_longlong_t)appendsize, filedesc, strerror(errno));
2357 		flowop_endop(threadflow, flowop, 0);
2358 		return (FILEBENCH_ERROR);
2359 	}
2360 
2361 	flowop_endop(threadflow, flowop, appendsize);
2362 
2363 	return (FILEBENCH_OK);
2364 }
2365 
/*
 * Running totals kept in a testrandvar flowop's private data area.
 */
struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated so far */
	double val_sum;		/* sum of the accumulated values */
	double sqr_sum;		/* sum of the squares of the values */
};

typedef struct testrandvar_priv testrandvar_priv_t;
2371 
2372 /*
2373  * flowop to calculate various statistics from the number stream
2374  * produced by a random variable. This allows verification that the
2375  * random distribution used to define the random variable is producing
2376  * the expected distribution of random numbers.
2377  */
2378 /* ARGSUSED */
2379 static int
2380 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2381 {
2382 	testrandvar_priv_t	*mystats;
2383 	double			value;
2384 
2385 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2386 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2387 		filebench_shutdown(1);
2388 		return (-1);
2389 	}
2390 
2391 	value = avd_get_dbl(flowop->fo_value);
2392 
2393 	mystats->sample_count++;
2394 	mystats->val_sum += value;
2395 	mystats->sqr_sum += (value * value);
2396 
2397 	return (0);
2398 }
2399 
2400 /*
2401  * Initialize the private data area used to accumulate the statistics
2402  */
2403 static int
2404 flowoplib_testrandvar_init(flowop_t *flowop)
2405 {
2406 	testrandvar_priv_t	*mystats;
2407 
2408 	if ((mystats = (testrandvar_priv_t *)
2409 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2410 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2411 		filebench_shutdown(1);
2412 		return (-1);
2413 	}
2414 
2415 	mystats->sample_count = 0;
2416 	mystats->val_sum = 0;
2417 	mystats->sqr_sum = 0;
2418 	flowop->fo_private = (void *)mystats;
2419 
2420 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2421 	return (0);
2422 }
2423 
2424 /*
2425  * Print out the accumulated statistics, and free the private storage
2426  */
2427 static void
2428 flowoplib_testrandvar_destruct(flowop_t *flowop)
2429 {
2430 	testrandvar_priv_t	*mystats;
2431 	double mean, std_dev, dbl_count;
2432 
2433 	(void) ipc_mutex_lock(&flowop->fo_lock);
2434 	if ((mystats = (testrandvar_priv_t *)
2435 	    flowop->fo_private) == NULL) {
2436 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2437 		return;
2438 	}
2439 
2440 	flowop->fo_private = NULL;
2441 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2442 
2443 	dbl_count = (double)mystats->sample_count;
2444 	mean = mystats->val_sum / dbl_count;
2445 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2446 
2447 	filebench_log(LOG_VERBOSE,
2448 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2449 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2450 	free(mystats);
2451 }
2452 
/*
 * Prints usage information for flowop operations. The text is kept
 * in a table, one entry per output line, and written to stderr in
 * table order.
 */
void
flowoplib_usage()
{
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop deletefile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>]\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,directio]\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,random]\n",
		"                       [,opennext]\n",
		"                       [,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n",
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value="
		    "<decrement-to-receive>,\n",
		"                       highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>,\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	size_t idx;

	for (idx = 0; idx < sizeof (usage_text) / sizeof (usage_text[0]);
	    idx++)
		(void) fputs(usage_text[idx], stderr);
}
2542