xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 6391:f317d2de8920)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #ifdef HAVE_SYS_ASYNCH_H
32 #include <sys/asynch.h>
33 #endif
34 #include <sys/ipc.h>
35 #include <sys/sem.h>
36 #include <sys/errno.h>
37 #include <sys/time.h>
38 #include <inttypes.h>
39 #include <fcntl.h>
40 #include <math.h>
41 
42 #ifdef HAVE_UTILITY_H
43 #include <utility.h>
44 #endif /* HAVE_UTILITY_H */
45 
46 #ifdef HAVE_AIO
47 #include <aio.h>
48 #endif /* HAVE_AIO */
49 
50 #ifdef HAVE_LIBAIO_H
51 #include <libaio.h>
52 #endif /* HAVE_LIBAIO_H */
53 
54 #ifdef HAVE_SYS_ASYNC_H
55 #include <sys/asynch.h>
56 #endif /* HAVE_SYS_ASYNC_H */
57 
58 #ifdef HAVE_AIO_H
59 #include <aio.h>
60 #endif /* HAVE_AIO_H */
61 
62 #ifndef HAVE_UINT_T
63 #define	uint_t unsigned int
64 #endif /* HAVE_UINT_T */
65 
66 #ifndef HAVE_AIOCB64_T
67 #define	aiocb64 aiocb
68 #endif /* HAVE_AIOCB64_T */
69 
70 #ifndef HAVE_SYSV_SEM
71 #include <semaphore.h>
72 #endif /* HAVE_SYSV_SEM */
73 
74 #include "filebench.h"
75 #include "flowop.h"
76 #include "fileset.h"
77 #include "fb_random.h"
78 
79 /*
80  * These routines implement the flowops from the f language. Each
81  * flowop has a name such as "read", and a set of function pointers
82  * to call for initialization, execution and destruction of the flowop.
83  * The table flowoplib_funcs[] contains a flowoplib struct for each
84  * implemented flowop. Most flowops use a generic initialization function
85  * and all currently use a generic destruction function. All flowop
86  * functions referenced from the table are in this file, though, of
87  * course, they often call functions from other files.
88  *
89  * The flowop_init() routine uses the flowoplib_funcs[] table to
90  * create an initial set of "instance 0" flowops, one for each type of
91  * flowop, from which all other flowops are derived. These "instance 0"
92  * flowops are initialized with information from the table including
93  * pointers for their fo_init, fo_func and fo_destroy functions. When
94  * a flowop definition is encountered in an f language script, the
95  * "type" of flowop, such as "read" is used to search for the
96  * "instance 0" flowop named "read", then a new flowop is allocated
97  * which inherits its function pointers and other initial properties
98  * from the instance 0 flowop, and is given a new name as specified
99  * by the "name=" attribute.
100  */
101 
102 static int flowoplib_init_generic(flowop_t *flowop);
103 static void flowoplib_destruct_generic(flowop_t *flowop);
104 static void flowoplib_destruct_noop(flowop_t *flowop);
105 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
106 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
107 #ifdef HAVE_AIO
108 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
109 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
110 #endif
111 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
112 static int flowoplib_block_init(flowop_t *flowop);
113 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
114 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
115 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
116 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
117 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
118 static int flowoplib_sempost_init(flowop_t *flowop);
119 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
120 static int flowoplib_semblock_init(flowop_t *flowop);
121 static void flowoplib_semblock_destruct(flowop_t *flowop);
122 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
123 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
124 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
125 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
126 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
127 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
128 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
129 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
130 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
131 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
132 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
133 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
134 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
135 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
136 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
137 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
138 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
139 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
140 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
141 static int flowoplib_testrandvar_init(flowop_t *flowop);
142 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
143 
144 typedef struct flowoplib {
145 	int	fl_type;
146 	int	fl_attrs;
147 	char	*fl_name;
148 	int	(*fl_init)();
149 	int	(*fl_func)();
150 	void	(*fl_destruct)();
151 } flowoplib_t;
152 
153 static flowoplib_t flowoplib_funcs[] = {
154 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
155 	flowoplib_write, flowoplib_destruct_generic,
156 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
157 	flowoplib_read, flowoplib_destruct_generic,
158 #ifdef HAVE_AIO
159 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
160 	flowoplib_aiowrite, flowoplib_destruct_generic,
161 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
162 	flowoplib_aiowait, flowoplib_destruct_generic,
163 #endif
164 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
165 	flowoplib_block, flowoplib_destruct_generic,
166 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
167 	flowoplib_wakeup, flowoplib_destruct_generic,
168 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
169 	flowoplib_semblock, flowoplib_semblock_destruct,
170 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
171 	flowoplib_sempost, flowoplib_destruct_noop,
172 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
173 	flowoplib_hog, flowoplib_destruct_generic,
174 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
175 	flowoplib_delay, flowoplib_destruct_generic,
176 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
177 	flowoplib_eventlimit, flowoplib_destruct_generic,
178 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
179 	flowoplib_bwlimit, flowoplib_destruct_generic,
180 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
181 	flowoplib_iopslimit, flowoplib_destruct_generic,
182 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
183 	flowoplib_opslimit, flowoplib_destruct_generic,
184 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
185 	flowoplib_finishoncount, flowoplib_destruct_generic,
186 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
187 	flowoplib_finishonbytes, flowoplib_destruct_generic,
188 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
189 	flowoplib_openfile, flowoplib_destruct_generic,
190 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
191 	flowoplib_createfile, flowoplib_destruct_generic,
192 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
193 	flowoplib_closefile, flowoplib_destruct_generic,
194 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
195 	flowoplib_fsync, flowoplib_destruct_generic,
196 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
197 	flowoplib_fsyncset, flowoplib_destruct_generic,
198 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
199 	flowoplib_statfile, flowoplib_destruct_generic,
200 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
201 	flowoplib_readwholefile, flowoplib_destruct_generic,
202 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
203 	flowoplib_appendfile, flowoplib_destruct_generic,
204 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
205 	flowoplib_appendfilerand, flowoplib_destruct_generic,
206 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
207 	flowoplib_deletefile, flowoplib_destruct_generic,
208 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
209 	flowoplib_writewholefile, flowoplib_destruct_generic,
210 	/* routine to calculate mean and stddev for output from a randvar */
211 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
212 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
213 };
214 
215 /*
216  * Loops through the master list of flowops defined in this
217  * module, and creates and initializes a flowop for each one
218  * by calling flowop_define. As a side effect of calling
219  * flowop define, the created flowops are placed on the
220  * master flowop list. All created flowops are set to
221  * instance "0".
222  */
223 void
224 flowoplib_init()
225 {
226 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
227 	int i;
228 
229 	for (i = 0; i < nops; i++) {
230 		flowop_t *flowop;
231 		flowoplib_t *fl;
232 
233 		fl = &flowoplib_funcs[i];
234 
235 		if ((flowop = flowop_define(NULL,
236 		    fl->fl_name, NULL, 0, fl->fl_type)) == 0) {
237 			filebench_log(LOG_ERROR,
238 			    "failed to create flowop %s\n",
239 			    fl->fl_name);
240 			filebench_shutdown(1);
241 		}
242 
243 		flowop->fo_func = fl->fl_func;
244 		flowop->fo_init = fl->fl_init;
245 		flowop->fo_destruct = fl->fl_destruct;
246 		flowop->fo_attrs = fl->fl_attrs;
247 	}
248 }
249 
250 static int
251 flowoplib_init_generic(flowop_t *flowop)
252 {
253 	(void) ipc_mutex_unlock(&flowop->fo_lock);
254 	return (FILEBENCH_OK);
255 }
256 
257 static void
258 flowoplib_destruct_generic(flowop_t *flowop)
259 {
260 	char *buf;
261 
262 	/* release any local resources held by the flowop */
263 	(void) ipc_mutex_lock(&flowop->fo_lock);
264 	buf = flowop->fo_buf;
265 	flowop->fo_buf = NULL;
266 	(void) ipc_mutex_unlock(&flowop->fo_lock);
267 
268 	if (buf)
269 		free(buf);
270 }
271 
272 /*
273  * Special total noop destruct
274  */
275 /* ARGSUSED */
276 static void
277 flowoplib_destruct_noop(flowop_t *flowop)
278 {
279 }
280 
281 /*
282  * Generates a file attribute from flags in the supplied flowop.
283  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
284  */
285 static int
286 flowoplib_fileattrs(flowop_t *flowop)
287 {
288 	int attrs = 0;
289 
290 	if (avd_get_bool(flowop->fo_directio))
291 		attrs |= FLOW_ATTR_DIRECTIO;
292 
293 	if (avd_get_bool(flowop->fo_dsync))
294 		attrs |= FLOW_ATTR_DSYNC;
295 
296 	return (attrs);
297 }
298 
299 /*
300  * Searches for a file descriptor. Tries the flowop's
301  * fo_fdnumber first and returns with it if it has been
302  * explicitly set (greater than 0). It next checks to
303  * see if a rotating file descriptor policy is in effect,
304  * and if not returns the fdnumber regardless of what
305  * it is. (note that if it is 0, it just selects to the
306  * default file descriptor in the threadflow's tf_fd
307  * array). If the rotating fd policy is in effect, it
308  * cycles from the end of the tf_fd array to one location
309  * beyond the maximum needed by the number of entries in
310  * the associated fileset on each invocation, then starts
311  * over from the end.
312  *
313  * The routine returns an index into the threadflow's
314  * tf_fd table where the actual file descriptor will be
315  * found. Note: the calling routine must not call this
316  * routine if the flowop does not have a fileset, and the
317  * flowop's fo_fdnumber is zero and fo_rotatefd is
318  * asserted, or an addressing fault may occur.
319  */
320 static int
321 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
322 {
323 	fbint_t	entries;
324 	int fdnumber = flowop->fo_fdnumber;
325 
326 	/* If the script sets the fd explicitly */
327 	if (fdnumber > 0)
328 		return (fdnumber);
329 
330 	/* If the flowop defaults to persistent fd */
331 	if (!avd_get_bool(flowop->fo_rotatefd))
332 		return (fdnumber);
333 
334 	if (flowop->fo_fileset == NULL) {
335 		filebench_log(LOG_ERROR, "flowop NULL file");
336 		return (FILEBENCH_ERROR);
337 	}
338 
339 	entries = flowop->fo_fileset->fs_constentries;
340 
341 	/* Rotate the fd on each flowop invocation */
342 	if (entries > (THREADFLOW_MAXFD / 2)) {
343 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
344 		    " (too many files : %llu",
345 		    flowop->fo_name, (u_longlong_t)entries);
346 		return (FILEBENCH_ERROR);
347 	}
348 
349 	/* First time around */
350 	if (threadflow->tf_fdrotor == 0)
351 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
352 
353 	/* One fd for every file in the set */
354 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
355 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
356 
357 
358 	threadflow->tf_fdrotor--;
359 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
360 	    threadflow->tf_fdrotor);
361 	return (threadflow->tf_fdrotor);
362 }
363 
364 /*
365  * Determines the file descriptor to use, and attempts to open
366  * the file if it is not already open. Also determines the wss
367  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
368  * if flowop_openfile_common couldn't obtain an appropriate file
369  * from a the fileset, and FILEBENCH_OK otherwise.
370  */
371 static int
372 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
373     fbint_t *wssp, int *filedescp)
374 {
375 	int fd = flowoplib_fdnum(threadflow, flowop);
376 
377 	if (fd == -1)
378 		return (FILEBENCH_ERROR);
379 
380 	if (threadflow->tf_fd[fd] == 0) {
381 		int ret;
382 
383 		if ((ret = flowoplib_openfile_common(
384 		    threadflow, flowop, fd)) != FILEBENCH_OK)
385 			return (ret);
386 
387 		if (threadflow->tf_fse[fd]) {
388 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
389 			    threadflow->tf_fse[fd]->fse_path);
390 		} else {
391 			filebench_log(LOG_DEBUG_IMPL,
392 			    "opened device %s/%s",
393 			    avd_get_str(flowop->fo_fileset->fs_path),
394 			    avd_get_str(flowop->fo_fileset->fs_name));
395 		}
396 	}
397 
398 	*filedescp = threadflow->tf_fd[fd];
399 
400 	if ((*wssp = flowop->fo_constwss) == 0) {
401 		if (threadflow->tf_fse[fd])
402 			*wssp = threadflow->tf_fse[fd]->fse_size;
403 		else
404 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
405 	}
406 
407 	return (FILEBENCH_OK);
408 }
409 
410 /*
411  * Determines the io buffer or random offset into tf_mem for
412  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
413  */
414 static int
415 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
416     caddr_t *iobufp, fbint_t iosize)
417 {
418 	long memsize;
419 	size_t memoffset;
420 
421 	if (iosize == 0) {
422 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
423 		    flowop->fo_name);
424 		return (FILEBENCH_ERROR);
425 	}
426 
427 	if ((memsize = threadflow->tf_constmemsize) != 0) {
428 
429 		/* use tf_mem for I/O with random offset */
430 		if (filebench_randomno(&memoffset,
431 		    memsize, iosize, NULL) == -1) {
432 			filebench_log(LOG_ERROR,
433 			    "tf_memsize smaller than IO size for thread %s",
434 			    flowop->fo_name);
435 			return (FILEBENCH_ERROR);
436 		}
437 		*iobufp = threadflow->tf_mem + memoffset;
438 
439 	} else {
440 		/* use private I/O buffer */
441 		if ((flowop->fo_buf != NULL) &&
442 		    (flowop->fo_buf_size < iosize)) {
443 			/* too small, so free up and re-allocate */
444 			free(flowop->fo_buf);
445 			flowop->fo_buf = NULL;
446 		}
447 
448 		/*
449 		 * Allocate memory for the  buffer. The memory is freed
450 		 * by flowop_destruct_generic() or by this routine if more
451 		 * memory is needed for the buffer.
452 		 */
453 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
454 		    = (char *)malloc(iosize)) == NULL))
455 			return (FILEBENCH_ERROR);
456 
457 		flowop->fo_buf_size = iosize;
458 		*iobufp = flowop->fo_buf;
459 	}
460 	return (FILEBENCH_OK);
461 }
462 
463 /*
464  * Determines the file descriptor to use, opens it if necessary, the
465  * io buffer or random offset into tf_mem for IO operation and the wss
466  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
467  */
468 static int
469 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
470     fbint_t *wssp, caddr_t *iobufp, int *filedescp, fbint_t iosize)
471 {
472 	int ret;
473 
474 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
475 	    FILEBENCH_OK)
476 		return (ret);
477 
478 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
479 	    FILEBENCH_OK)
480 		return (ret);
481 
482 	return (FILEBENCH_OK);
483 }
484 
485 /*
486  * Emulate posix read / pread. If the flowop has a fileset,
487  * a file descriptor number index is fetched, otherwise a
488  * supplied fileobj file is used. In either case the specified
489  * file will be opened if not already open. If the flowop has
490  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
491  * returned.
492  *
493  * The actual read is done to a random offset in the
494  * threadflow's thread memory (tf_mem), with a size set by
495  * fo_iosize and at either a random disk offset within the
496  * working set size, or at the next sequential location. If
497  * any errors are encountered, FILEBENCH_ERROR is returned,
498  * if no appropriate file can be obtained from the fileset then
499  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
500  */
501 static int
502 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
503 {
504 	caddr_t iobuf;
505 	fbint_t wss;
506 	fbint_t iosize;
507 	int filedesc;
508 	int ret;
509 
510 
511 	iosize = avd_get_int(flowop->fo_iosize);
512 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
513 	    &filedesc, iosize)) != FILEBENCH_OK)
514 		return (ret);
515 
516 	if (avd_get_bool(flowop->fo_random)) {
517 		uint64_t fileoffset;
518 
519 		if (filebench_randomno64(&fileoffset,
520 		    wss, iosize, NULL) == -1) {
521 			filebench_log(LOG_ERROR,
522 			    "file size smaller than IO size for thread %s",
523 			    flowop->fo_name);
524 			return (FILEBENCH_ERROR);
525 		}
526 
527 		(void) flowop_beginop(threadflow, flowop);
528 		if ((ret = pread64(filedesc, iobuf,
529 		    iosize, (off64_t)fileoffset)) == -1) {
530 			(void) flowop_endop(threadflow, flowop, 0);
531 			filebench_log(LOG_ERROR,
532 			    "read file %s failed, offset %llu "
533 			    "io buffer %zd: %s",
534 			    avd_get_str(flowop->fo_fileset->fs_name),
535 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
536 			flowop_endop(threadflow, flowop, 0);
537 			return (FILEBENCH_ERROR);
538 		}
539 		(void) flowop_endop(threadflow, flowop, ret);
540 
541 		if ((ret == 0))
542 			(void) lseek64(filedesc, 0, SEEK_SET);
543 
544 	} else {
545 		(void) flowop_beginop(threadflow, flowop);
546 		if ((ret = read(filedesc, iobuf, iosize)) == -1) {
547 			(void) flowop_endop(threadflow, flowop, 0);
548 			filebench_log(LOG_ERROR,
549 			    "read file %s failed, io buffer %zd: %s",
550 			    avd_get_str(flowop->fo_fileset->fs_name),
551 			    iobuf, strerror(errno));
552 			(void) flowop_endop(threadflow, flowop, 0);
553 			return (FILEBENCH_ERROR);
554 		}
555 		(void) flowop_endop(threadflow, flowop, ret);
556 
557 		if ((ret == 0))
558 			(void) lseek64(filedesc, 0, SEEK_SET);
559 	}
560 
561 	return (FILEBENCH_OK);
562 }
563 
564 #ifdef HAVE_AIO
565 
566 /*
567  * Asynchronous write section. An Asynchronous IO element
568  * (aiolist_t) is used to associate the asynchronous write request with
569  * its subsequent completion. This element includes a aiocb64 struct
570  * that is used by posix aio_xxx calls to track the asynchronous writes.
571  * The flowops aiowrite and aiowait result in calls to these posix
572  * aio_xxx system routines to do the actual asynchronous write IO
573  * operations.
574  */
575 
576 
577 /*
578  * Allocates an asynchronous I/O list (aio, of type
579  * aiolist_t) element. Adds it to the flowop thread's
580  * threadflow aio list. Returns a pointer to the element.
581  */
582 static aiolist_t *
583 aio_allocate(flowop_t *flowop)
584 {
585 	aiolist_t *aiolist;
586 
587 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
588 		filebench_log(LOG_ERROR, "malloc aiolist failed");
589 		filebench_shutdown(1);
590 	}
591 
592 	/* Add to list */
593 	if (flowop->fo_thread->tf_aiolist == NULL) {
594 		flowop->fo_thread->tf_aiolist = aiolist;
595 		aiolist->al_next = NULL;
596 	} else {
597 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
598 		flowop->fo_thread->tf_aiolist = aiolist;
599 	}
600 	return (aiolist);
601 }
602 
603 /*
604  * Searches for the aiolist element that has a matching
605  * completion block, aiocb. If none found returns FILEBENCH_ERROR. If
606  * found, removes the aiolist element from flowop thread's
607  * list and returns FILEBENCH_OK.
608  */
609 static int
610 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
611 {
612 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
613 	aiolist_t *previous = NULL;
614 	aiolist_t *match = NULL;
615 
616 	if (aiocb == NULL) {
617 		filebench_log(LOG_ERROR, "null aiocb deallocate");
618 		return (FILEBENCH_OK);
619 	}
620 
621 	while (aiolist) {
622 		if (aiocb == &(aiolist->al_aiocb)) {
623 			match = aiolist;
624 			break;
625 		}
626 		previous = aiolist;
627 		aiolist = aiolist->al_next;
628 	}
629 
630 	if (match == NULL)
631 		return (FILEBENCH_ERROR);
632 
633 	/* Remove from the list */
634 	if (previous)
635 		previous->al_next = match->al_next;
636 	else
637 		flowop->fo_thread->tf_aiolist = match->al_next;
638 
639 	return (FILEBENCH_OK);
640 }
641 
642 /*
643  * Emulate posix aiowrite(). Determines which file to use,
644  * either one file of a fileset, or the file associated
645  * with a fileobj, allocates and fills an aiolist_t element
646  * for the write, and issues the asynchronous write. This
647  * operation is only valid for random IO, and returns an
648  * error if the flowop is set for sequential IO. Returns
649  * FILEBENCH_OK on success, FILEBENCH_NORSC if iosetup can't
650  * obtain a file to open, and FILEBENCH_ERROR on any
651  * encountered error.
652  */
653 static int
654 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
655 {
656 	caddr_t iobuf;
657 	fbint_t wss;
658 	fbint_t iosize;
659 	int filedesc;
660 	int ret;
661 
662 	iosize = avd_get_int(flowop->fo_iosize);
663 
664 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
665 	    &filedesc, iosize)) != FILEBENCH_OK)
666 		return (ret);
667 
668 	if (avd_get_bool(flowop->fo_random)) {
669 		uint64_t fileoffset;
670 		struct aiocb64 *aiocb;
671 		aiolist_t *aiolist;
672 
673 		if (filebench_randomno64(&fileoffset,
674 		    wss, iosize, NULL) == -1) {
675 			filebench_log(LOG_ERROR,
676 			    "file size smaller than IO size for thread %s",
677 			    flowop->fo_name);
678 			return (FILEBENCH_ERROR);
679 		}
680 
681 		aiolist = aio_allocate(flowop);
682 		aiolist->al_type = AL_WRITE;
683 		aiocb = &aiolist->al_aiocb;
684 
685 		aiocb->aio_fildes = filedesc;
686 		aiocb->aio_buf = iobuf;
687 		aiocb->aio_nbytes = (size_t)iosize;
688 		aiocb->aio_offset = (off64_t)fileoffset;
689 		aiocb->aio_reqprio = 0;
690 
691 		filebench_log(LOG_DEBUG_IMPL,
692 		    "aio fd=%d, bytes=%llu, offset=%llu",
693 		    filedesc, (u_longlong_t)iosize, (u_longlong_t)fileoffset);
694 
695 		flowop_beginop(threadflow, flowop);
696 		if (aio_write64(aiocb) < 0) {
697 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
698 			    strerror(errno));
699 			filebench_shutdown(1);
700 		}
701 		flowop_endop(threadflow, flowop, iosize);
702 	} else {
703 		return (FILEBENCH_ERROR);
704 	}
705 
706 	return (FILEBENCH_OK);
707 }
708 
709 
710 
711 #define	MAXREAP 4096
712 
713 /*
714  * Emulate posix aiowait(). Waits for the completion of half the
715  * outstanding asynchronous IOs, or a single IO, which ever is
716  * larger. The routine will return after a sufficient number of
717  * completed calls issued by any thread in the procflow have
718  * completed, or a 1 second timout elapses. All completed
719  * IO operations are deleted from the thread's aiolist.
720  */
721 static int
722 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
723 {
724 	struct aiocb64 **worklist;
725 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
726 	int uncompleted = 0;
727 
728 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
729 
730 	/* Count the list of pending aios */
731 	while (aio) {
732 		uncompleted++;
733 		aio = aio->al_next;
734 	}
735 
736 	do {
737 		uint_t ncompleted = 0;
738 		uint_t todo;
739 		struct timespec timeout;
740 		int inprogress;
741 		int i;
742 
743 		/* Wait for half of the outstanding requests */
744 		timeout.tv_sec = 1;
745 		timeout.tv_nsec = 0;
746 
747 		if (uncompleted > MAXREAP)
748 			todo = MAXREAP;
749 		else
750 			todo = uncompleted / 2;
751 
752 		if (todo == 0)
753 			todo = 1;
754 
755 		flowop_beginop(threadflow, flowop);
756 
757 #ifdef HAVE_AIOWAITN
758 		if ((aio_waitn64((struct aiocb64 **)worklist,
759 		    MAXREAP, &todo, &timeout) == -1) &&
760 		    errno && (errno != ETIME)) {
761 			filebench_log(LOG_ERROR,
762 			    "aiowait failed: %s, outstanding = %d, "
763 			    "ncompleted = %d ",
764 			    strerror(errno), uncompleted, todo);
765 		}
766 
767 		ncompleted = todo;
768 		/* Take the  completed I/Os from the list */
769 		inprogress = 0;
770 		for (i = 0; i < ncompleted; i++) {
771 			if ((aio_return64(worklist[i]) == -1) &&
772 			    (errno == EINPROGRESS)) {
773 				inprogress++;
774 				continue;
775 			}
776 			if (aio_deallocate(flowop, worklist[i]) < 0) {
777 				filebench_log(LOG_ERROR, "Could not remove "
778 				    "aio from list ");
779 				flowop_endop(threadflow, flowop, 0);
780 				return (FILEBENCH_ERROR);
781 			}
782 		}
783 
784 		uncompleted -= ncompleted;
785 		uncompleted += inprogress;
786 
787 #else
788 
789 		for (ncompleted = 0, inprogress = 0,
790 		    aio = flowop->fo_thread->tf_aiolist;
791 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
792 
793 			result = aio_error64(&aio->al_aiocb);
794 
795 			if (result == EINPROGRESS) {
796 				inprogress++;
797 				continue;
798 			}
799 
800 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
801 				filebench_log(LOG_ERROR, "aio failed: %s",
802 				    strerror(result));
803 				continue;
804 			}
805 
806 			ncompleted++;
807 
808 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
809 				filebench_log(LOG_ERROR, "Could not remove aio "
810 				    "from list ");
811 				flowop_endop(threadflow, flowop, 0);
812 				return (FILEBENCH_ERROR);
813 			}
814 		}
815 
816 		uncompleted -= ncompleted;
817 
818 #endif
819 		filebench_log(LOG_DEBUG_SCRIPT,
820 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
821 		    ncompleted, uncompleted, inprogress);
822 
823 	} while (uncompleted > MAXREAP);
824 
825 	flowop_endop(threadflow, flowop, 0);
826 
827 	free(worklist);
828 
829 	return (FILEBENCH_OK);
830 }
831 
832 #endif /* HAVE_AIO */
833 
834 /*
835  * Initializes a "flowop_block" flowop. Specifically, it
836  * initializes the flowop's fo_cv and unlocks the fo_lock.
837  */
838 static int
839 flowoplib_block_init(flowop_t *flowop)
840 {
841 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
842 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
843 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
844 	(void) ipc_mutex_unlock(&flowop->fo_lock);
845 
846 	return (FILEBENCH_OK);
847 }
848 
849 /*
850  * Blocks the threadflow until woken up by flowoplib_wakeup.
851  * The routine blocks on the flowop's fo_cv condition variable.
852  */
853 static int
854 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
855 {
856 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
857 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
858 	(void) ipc_mutex_lock(&flowop->fo_lock);
859 
860 	flowop_beginop(threadflow, flowop);
861 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
862 	flowop_endop(threadflow, flowop, 0);
863 
864 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
865 	    flowop->fo_name, flowop->fo_instance);
866 
867 	(void) ipc_mutex_unlock(&flowop->fo_lock);
868 
869 	return (FILEBENCH_OK);
870 }
871 
872 /*
873  * Wakes up one or more target blocking flowops.
874  * Sends broadcasts on the fo_cv condition variables of all
875  * flowops on the target list, except those that are
876  * FLOW_MASTER flowops. The target list consists of all
877  * flowops whose name matches this flowop's "fo_targetname"
878  * attribute. The target list is generated on the first
879  * invocation, and the run will be shutdown if no targets
880  * are found. Otherwise the routine always returns FILEBENCH_OK.
881  */
882 static int
883 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
884 {
885 	flowop_t *target;
886 
887 	/* if this is the first wakeup, create the wakeup list */
888 	if (flowop->fo_targets == NULL) {
889 		flowop_t *result = flowop_find(flowop->fo_targetname);
890 
891 		flowop->fo_targets = result;
892 		if (result == NULL) {
893 			filebench_log(LOG_ERROR,
894 			    "wakeup: could not find op %s for thread %s",
895 			    flowop->fo_targetname,
896 			    threadflow->tf_name);
897 			filebench_shutdown(1);
898 		}
899 		while (result) {
900 			result->fo_targetnext =
901 			    result->fo_resultnext;
902 			result = result->fo_resultnext;
903 		}
904 	}
905 
906 	target = flowop->fo_targets;
907 
908 	/* wakeup the targets */
909 	while (target) {
910 		if (target->fo_instance == FLOW_MASTER) {
911 			target = target->fo_targetnext;
912 			continue;
913 		}
914 		filebench_log(LOG_DEBUG_IMPL,
915 		    "wakeup flow %s-%d at address %zx",
916 		    target->fo_name,
917 		    target->fo_instance,
918 		    &target->fo_cv);
919 
920 		flowop_beginop(threadflow, flowop);
921 		(void) ipc_mutex_lock(&target->fo_lock);
922 		(void) pthread_cond_broadcast(&target->fo_cv);
923 		(void) ipc_mutex_unlock(&target->fo_lock);
924 		flowop_endop(threadflow, flowop, 0);
925 
926 		target = target->fo_targetnext;
927 	}
928 
929 	return (FILEBENCH_OK);
930 }
931 
932 /*
933  * "think time" routines. the "hog" routine consumes cpu cycles as
934  * it "thinks", while the "delay" flowop simply calls sleep() to delay
935  * for a given number of seconds without consuming cpu cycles.
936  */
937 
938 
939 /*
940  * Consumes CPU cycles and memory bandwidth by looping for
941  * flowop->fo_value times. With each loop sets memory location
942  * threadflow->tf_mem to 1.
943  */
944 static int
945 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
946 {
947 	uint64_t value = avd_get_int(flowop->fo_value);
948 	int i;
949 
950 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
951 	flowop_beginop(threadflow, flowop);
952 	if (threadflow->tf_mem != NULL) {
953 		for (i = 0; i < value; i++)
954 			*(threadflow->tf_mem) = 1;
955 	}
956 	flowop_endop(threadflow, flowop, 0);
957 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
958 	return (FILEBENCH_OK);
959 }
960 
961 
962 /*
963  * Delays for fo_value seconds.
964  */
965 static int
966 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
967 {
968 	int value = avd_get_int(flowop->fo_value);
969 
970 	flowop_beginop(threadflow, flowop);
971 	(void) sleep(value);
972 	flowop_endop(threadflow, flowop, 0);
973 	return (FILEBENCH_OK);
974 }
975 
/*
 * Rate limiting routines. This is the event consuming half of the
 * event system. Each of the four following routines will limit the rate
 * to one unit of either calls, issued I/O operations, issued filebench
 * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
 * consumer, and further divided among different consumers if more than
 * one has been defined. There is no mechanism to enforce equal sharing
 * of events.
 */
986 
987 /*
988  * Completes one invocation per posted event. If eventgen_q
989  * has an event count greater than zero, one will be removed
990  * (count decremented), otherwise the calling thread will
991  * block until another event has been posted. Always returns 0
992  */
993 static int
994 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
995 {
996 	/* Immediately bail if not set/enabled */
997 	if (filebench_shm->shm_eventgen_hz == 0)
998 		return (FILEBENCH_OK);
999 
1000 	if (flowop->fo_initted == 0) {
1001 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1002 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1003 		flowop->fo_initted = 1;
1004 	}
1005 
1006 	flowop_beginop(threadflow, flowop);
1007 	while (filebench_shm->shm_eventgen_hz) {
1008 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1009 		if (filebench_shm->shm_eventgen_q > 0) {
1010 			filebench_shm->shm_eventgen_q--;
1011 			(void) ipc_mutex_unlock(
1012 			    &filebench_shm->shm_eventgen_lock);
1013 			break;
1014 		}
1015 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1016 		    &filebench_shm->shm_eventgen_lock);
1017 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1018 	}
1019 	flowop_endop(threadflow, flowop, 0);
1020 	return (FILEBENCH_OK);
1021 }
1022 
1023 /*
1024  * Blocks the calling thread if the number of issued I/O
1025  * operations exceeds the number of posted events, thus
1026  * limiting the average I/O operation rate to the rate
1027  * specified by eventgen_hz. Always returns FILEBENCH_OK.
1028  */
1029 static int
1030 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
1031 {
1032 	uint64_t iops;
1033 	uint64_t delta;
1034 	uint64_t events;
1035 
1036 	/* Immediately bail if not set/enabled */
1037 	if (filebench_shm->shm_eventgen_hz == 0)
1038 		return (FILEBENCH_OK);
1039 
1040 	if (flowop->fo_initted == 0) {
1041 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1042 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1043 		flowop->fo_initted = 1;
1044 	}
1045 
1046 	(void) ipc_mutex_lock(&controlstats_lock);
1047 	iops = (controlstats.fs_rcount +
1048 	    controlstats.fs_wcount);
1049 	(void) ipc_mutex_unlock(&controlstats_lock);
1050 
1051 	/* Is this the first time around */
1052 	if (flowop->fo_tputlast == 0) {
1053 		flowop->fo_tputlast = iops;
1054 		return (FILEBENCH_OK);
1055 	}
1056 
1057 	delta = iops - flowop->fo_tputlast;
1058 	flowop->fo_tputbucket -= delta;
1059 	flowop->fo_tputlast = iops;
1060 
1061 	/* No need to block if the q isn't empty */
1062 	if (flowop->fo_tputbucket >= 0LL) {
1063 		flowop_endop(threadflow, flowop, 0);
1064 		return (FILEBENCH_OK);
1065 	}
1066 
1067 	iops = flowop->fo_tputbucket * -1;
1068 	events = iops;
1069 
1070 	flowop_beginop(threadflow, flowop);
1071 	while (filebench_shm->shm_eventgen_hz) {
1072 
1073 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1074 		if (filebench_shm->shm_eventgen_q >= events) {
1075 			filebench_shm->shm_eventgen_q -= events;
1076 			(void) ipc_mutex_unlock(
1077 			    &filebench_shm->shm_eventgen_lock);
1078 			flowop->fo_tputbucket += events;
1079 			break;
1080 		}
1081 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1082 		    &filebench_shm->shm_eventgen_lock);
1083 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1084 	}
1085 	flowop_endop(threadflow, flowop, 0);
1086 
1087 	return (FILEBENCH_OK);
1088 }
1089 
1090 /*
1091  * Blocks the calling thread if the number of issued filebench
1092  * operations exceeds the number of posted events, thus limiting
1093  * the average filebench operation rate to the rate specified by
1094  * eventgen_hz. Always returns FILEBENCH_OK.
1095  */
1096 static int
1097 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
1098 {
1099 	uint64_t ops;
1100 	uint64_t delta;
1101 	uint64_t events;
1102 
1103 	/* Immediately bail if not set/enabled */
1104 	if (filebench_shm->shm_eventgen_hz == 0)
1105 		return (FILEBENCH_OK);
1106 
1107 	if (flowop->fo_initted == 0) {
1108 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1109 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1110 		flowop->fo_initted = 1;
1111 	}
1112 
1113 	(void) ipc_mutex_lock(&controlstats_lock);
1114 	ops = controlstats.fs_count;
1115 	(void) ipc_mutex_unlock(&controlstats_lock);
1116 
1117 	/* Is this the first time around */
1118 	if (flowop->fo_tputlast == 0) {
1119 		flowop->fo_tputlast = ops;
1120 		return (FILEBENCH_OK);
1121 	}
1122 
1123 	delta = ops - flowop->fo_tputlast;
1124 	flowop->fo_tputbucket -= delta;
1125 	flowop->fo_tputlast = ops;
1126 
1127 	/* No need to block if the q isn't empty */
1128 	if (flowop->fo_tputbucket >= 0LL) {
1129 		flowop_endop(threadflow, flowop, 0);
1130 		return (FILEBENCH_OK);
1131 	}
1132 
1133 	ops = flowop->fo_tputbucket * -1;
1134 	events = ops;
1135 
1136 	flowop_beginop(threadflow, flowop);
1137 	while (filebench_shm->shm_eventgen_hz) {
1138 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1139 		if (filebench_shm->shm_eventgen_q >= events) {
1140 			filebench_shm->shm_eventgen_q -= events;
1141 			(void) ipc_mutex_unlock(
1142 			    &filebench_shm->shm_eventgen_lock);
1143 			flowop->fo_tputbucket += events;
1144 			break;
1145 		}
1146 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1147 		    &filebench_shm->shm_eventgen_lock);
1148 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1149 	}
1150 	flowop_endop(threadflow, flowop, 0);
1151 
1152 	return (FILEBENCH_OK);
1153 }
1154 
1155 
1156 /*
1157  * Blocks the calling thread if the number of bytes of I/O
1158  * issued exceeds one megabyte times the number of posted
1159  * events, thus limiting the average I/O byte rate to one
1160  * megabyte times the event rate as set by eventgen_hz.
1161  * Always retuns FILEBENCH_OK.
1162  */
1163 static int
1164 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
1165 {
1166 	uint64_t bytes;
1167 	uint64_t delta;
1168 	uint64_t events;
1169 
1170 	/* Immediately bail if not set/enabled */
1171 	if (filebench_shm->shm_eventgen_hz == 0)
1172 		return (FILEBENCH_OK);
1173 
1174 	if (flowop->fo_initted == 0) {
1175 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1176 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1177 		flowop->fo_initted = 1;
1178 	}
1179 
1180 	(void) ipc_mutex_lock(&controlstats_lock);
1181 	bytes = (controlstats.fs_rbytes +
1182 	    controlstats.fs_wbytes);
1183 	(void) ipc_mutex_unlock(&controlstats_lock);
1184 
1185 	/* Is this the first time around */
1186 	if (flowop->fo_tputlast == 0) {
1187 		flowop->fo_tputlast = bytes;
1188 		return (FILEBENCH_OK);
1189 	}
1190 
1191 	delta = bytes - flowop->fo_tputlast;
1192 	flowop->fo_tputbucket -= delta;
1193 	flowop->fo_tputlast = bytes;
1194 
1195 	/* No need to block if the q isn't empty */
1196 	if (flowop->fo_tputbucket >= 0LL) {
1197 		flowop_endop(threadflow, flowop, 0);
1198 		return (FILEBENCH_OK);
1199 	}
1200 
1201 	bytes = flowop->fo_tputbucket * -1;
1202 	events = (bytes / MB) + 1;
1203 
1204 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1205 	    (u_longlong_t)bytes, (u_longlong_t)events);
1206 
1207 	flowop_beginop(threadflow, flowop);
1208 	while (filebench_shm->shm_eventgen_hz) {
1209 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1210 		if (filebench_shm->shm_eventgen_q >= events) {
1211 			filebench_shm->shm_eventgen_q -= events;
1212 			(void) ipc_mutex_unlock(
1213 			    &filebench_shm->shm_eventgen_lock);
1214 			flowop->fo_tputbucket += (events * MB);
1215 			break;
1216 		}
1217 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1218 		    &filebench_shm->shm_eventgen_lock);
1219 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1220 	}
1221 	flowop_endop(threadflow, flowop, 0);
1222 
1223 	return (FILEBENCH_OK);
1224 }
1225 
1226 /*
1227  * These flowops terminate a benchmark run when either the specified
1228  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1229  * number of I/O operations (flowoplib_finishoncount) have been generated.
1230  */
1231 
1232 
1233 /*
1234  * Stop filebench run when specified number of I/O bytes have been
1235  * transferred. Compares controlstats.fs_bytes with flowop->value,
1236  * and if greater returns 1, stopping the run, if not, returns 0
1237  * to continue running.
1238  */
1239 static int
1240 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1241 {
1242 	uint64_t b;
1243 	uint64_t bytes = flowop->fo_constvalue; /* use constant value */
1244 
1245 	(void) ipc_mutex_lock(&controlstats_lock);
1246 	b = controlstats.fs_bytes;
1247 	(void) ipc_mutex_unlock(&controlstats_lock);
1248 
1249 	flowop_beginop(threadflow, flowop);
1250 	if (b > bytes) {
1251 		flowop_endop(threadflow, flowop, 0);
1252 		return (FILEBENCH_DONE);
1253 	}
1254 	flowop_endop(threadflow, flowop, 0);
1255 
1256 	return (FILEBENCH_OK);
1257 }
1258 
1259 /*
1260  * Stop filebench run when specified number of I/O operations have
1261  * been performed. Compares controlstats.fs_count with *flowop->value,
1262  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1263  * to continue running.
1264  */
1265 static int
1266 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1267 {
1268 	uint64_t ops;
1269 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1270 
1271 	(void) ipc_mutex_lock(&controlstats_lock);
1272 	ops = controlstats.fs_count;
1273 	(void) ipc_mutex_unlock(&controlstats_lock);
1274 
1275 	flowop_beginop(threadflow, flowop);
1276 	if (ops >= count) {
1277 		flowop_endop(threadflow, flowop, 0);
1278 		return (FILEBENCH_DONE);
1279 	}
1280 	flowop_endop(threadflow, flowop, 0);
1281 
1282 	return (FILEBENCH_OK);
1283 }
1284 
1285 /*
1286  * Semaphore synchronization using either System V semaphores or
1287  * posix semaphores. If System V semaphores are available, they will be
1288  * used, otherwise posix semaphores will be used.
1289  */
1290 
1291 
1292 /*
1293  * Initializes the filebench "block on semaphore" flowop.
1294  * If System V semaphores are implemented, the routine
1295  * initializes the System V semaphore subsystem if it hasn't
1296  * already been initialized, also allocates a pair of semids
1297  * and initializes the highwater System V semaphore.
1298  * If no System V semaphores, then does nothing special.
1299  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1300  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1301  * on success.
1302  */
1303 static int
1304 flowoplib_semblock_init(flowop_t *flowop)
1305 {
1306 
1307 #ifdef HAVE_SYSV_SEM
1308 	int sys_semid;
1309 	struct sembuf sbuf[2];
1310 	int highwater;
1311 
1312 	ipc_seminit();
1313 
1314 	flowop->fo_semid_lw = ipc_semidalloc();
1315 	flowop->fo_semid_hw = ipc_semidalloc();
1316 
1317 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1318 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1319 
1320 	sys_semid = filebench_shm->shm_sys_semid;
1321 
1322 	if ((highwater = flowop->fo_semid_hw) == 0)
1323 		highwater = flowop->fo_constvalue; /* use constant value */
1324 
1325 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1326 
1327 	sbuf[0].sem_num = (short)highwater;
1328 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1329 	sbuf[0].sem_flg = 0;
1330 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1331 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1332 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1333 		return (FILEBENCH_ERROR);
1334 	}
1335 #else
1336 	filebench_log(LOG_DEBUG_IMPL,
1337 	    "flow %s-%d semblock init with posix semaphore",
1338 	    flowop->fo_name, flowop->fo_instance);
1339 
1340 	sem_init(&flowop->fo_sem, 1, 0);
1341 #endif	/* HAVE_SYSV_SEM */
1342 
1343 	if (!(avd_get_bool(flowop->fo_blocking)))
1344 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1345 
1346 	return (FILEBENCH_OK);
1347 }
1348 
1349 /*
1350  * Releases the semids for the System V semaphore allocated
1351  * to this flowop. If not using System V semaphores, then
1352  * it is effectively just a no-op.
1353  */
1354 static void
1355 flowoplib_semblock_destruct(flowop_t *flowop)
1356 {
1357 #ifdef HAVE_SYSV_SEM
1358 	ipc_semidfree(flowop->fo_semid_lw);
1359 	ipc_semidfree(flowop->fo_semid_hw);
1360 	(void) semctl(filebench_shm->shm_sys_semid, 0, IPC_RMID);
1361 	filebench_shm->shm_sys_semid = -1;
1362 #else
1363 	sem_destroy(&flowop->fo_sem);
1364 #endif /* HAVE_SYSV_SEM */
1365 }
1366 
1367 /*
1368  * Attempts to pass a System V or posix semaphore as appropriate,
1369  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1370  * semphores is not available or cannot be acquired, or if the initial
1371  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1372  */
1373 static int
1374 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1375 {
1376 
1377 #ifdef HAVE_SYSV_SEM
1378 	struct sembuf sbuf[2];
1379 	int value = avd_get_int(flowop->fo_value);
1380 	int sys_semid;
1381 	struct timespec timeout;
1382 
1383 	sys_semid = filebench_shm->shm_sys_semid;
1384 
1385 	filebench_log(LOG_DEBUG_IMPL,
1386 	    "flow %s-%d sem blocking on id %x num %x value %d",
1387 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1388 	    flowop->fo_semid_hw, value);
1389 
1390 	/* Post, decrement the increment the hw queue */
1391 	sbuf[0].sem_num = flowop->fo_semid_hw;
1392 	sbuf[0].sem_op = (short)value;
1393 	sbuf[0].sem_flg = 0;
1394 	sbuf[1].sem_num = flowop->fo_semid_lw;
1395 	sbuf[1].sem_op = value * -1;
1396 	sbuf[1].sem_flg = 0;
1397 	timeout.tv_sec = 600;
1398 	timeout.tv_nsec = 0;
1399 
1400 	if (avd_get_bool(flowop->fo_blocking))
1401 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1402 
1403 	flowop_beginop(threadflow, flowop);
1404 
1405 #ifdef HAVE_SEMTIMEDOP
1406 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1407 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1408 #else
1409 	(void) semop(sys_semid, &sbuf[0], 1);
1410 	(void) semop(sys_semid, &sbuf[1], 1);
1411 #endif /* HAVE_SEMTIMEDOP */
1412 
1413 	if (avd_get_bool(flowop->fo_blocking))
1414 		(void) ipc_mutex_lock(&flowop->fo_lock);
1415 
1416 	flowop_endop(threadflow, flowop, 0);
1417 
1418 #else
1419 	int value = avd_get_int(flowop->fo_value);
1420 	int i;
1421 
1422 	filebench_log(LOG_DEBUG_IMPL,
1423 	    "flow %s-%d sem blocking on posix semaphore",
1424 	    flowop->fo_name, flowop->fo_instance);
1425 
1426 	/* Decrement sem by value */
1427 	for (i = 0; i < value; i++) {
1428 		if (sem_wait(&flowop->fo_sem) == -1) {
1429 			filebench_log(LOG_ERROR, "semop wait failed");
1430 			return (FILEBENCH_ERROR);
1431 		}
1432 	}
1433 
1434 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1435 	    flowop->fo_name, flowop->fo_instance);
1436 #endif /* HAVE_SYSV_SEM */
1437 
1438 	return (FILEBENCH_OK);
1439 }
1440 
1441 /*
1442  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1443  */
1444 /* ARGSUSED */
1445 static int
1446 flowoplib_sempost_init(flowop_t *flowop)
1447 {
1448 #ifdef HAVE_SYSV_SEM
1449 	ipc_seminit();
1450 #endif /* HAVE_SYSV_SEM */
1451 	return (FILEBENCH_OK);
1452 }
1453 
1454 /*
1455  * Post to a System V or posix semaphore as appropriate.
1456  * On the first call for a given flowop instance, this routine
1457  * will use the fo_targetname attribute to locate all semblock
1458  * flowops that are expecting posts from this flowop. All
1459  * target flowops on this list will have a post operation done
1460  * to their semaphores on each call.
1461  */
1462 static int
1463 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1464 {
1465 	flowop_t *target;
1466 
1467 	filebench_log(LOG_DEBUG_IMPL,
1468 	    "sempost flow %s-%d",
1469 	    flowop->fo_name,
1470 	    flowop->fo_instance);
1471 
1472 	/* if this is the first post, create the post list */
1473 	if (flowop->fo_targets == NULL) {
1474 		flowop_t *result = flowop_find(flowop->fo_targetname);
1475 
1476 		flowop->fo_targets = result;
1477 
1478 		if (result == NULL) {
1479 			filebench_log(LOG_ERROR,
1480 			    "sempost: could not find op %s for thread %s",
1481 			    flowop->fo_targetname,
1482 			    threadflow->tf_name);
1483 			filebench_shutdown(1);
1484 		}
1485 
1486 		while (result) {
1487 			result->fo_targetnext =
1488 			    result->fo_resultnext;
1489 			result = result->fo_resultnext;
1490 		}
1491 	}
1492 
1493 	target = flowop->fo_targets;
1494 
1495 	flowop_beginop(threadflow, flowop);
1496 	/* post to the targets */
1497 	while (target) {
1498 #ifdef HAVE_SYSV_SEM
1499 		struct sembuf sbuf[2];
1500 		int sys_semid;
1501 		int blocking;
1502 #else
1503 		int i;
1504 #endif /* HAVE_SYSV_SEM */
1505 		struct timespec timeout;
1506 		int value = avd_get_int(flowop->fo_value);
1507 
1508 		if (target->fo_instance == FLOW_MASTER) {
1509 			target = target->fo_targetnext;
1510 			continue;
1511 		}
1512 
1513 #ifdef HAVE_SYSV_SEM
1514 
1515 		filebench_log(LOG_DEBUG_IMPL,
1516 		    "sempost flow %s-%d num %x",
1517 		    target->fo_name,
1518 		    target->fo_instance,
1519 		    target->fo_semid_lw);
1520 
1521 		sys_semid = filebench_shm->shm_sys_semid;
1522 		sbuf[0].sem_num = target->fo_semid_lw;
1523 		sbuf[0].sem_op = (short)value;
1524 		sbuf[0].sem_flg = 0;
1525 		sbuf[1].sem_num = target->fo_semid_hw;
1526 		sbuf[1].sem_op = value * -1;
1527 		sbuf[1].sem_flg = 0;
1528 		timeout.tv_sec = 600;
1529 		timeout.tv_nsec = 0;
1530 
1531 		if (avd_get_bool(flowop->fo_blocking))
1532 			blocking = 1;
1533 		else
1534 			blocking = 0;
1535 
1536 #ifdef HAVE_SEMTIMEDOP
1537 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1538 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1539 #else
1540 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1541 		    (errno && (errno != EAGAIN))) {
1542 #endif /* HAVE_SEMTIMEDOP */
1543 			filebench_log(LOG_ERROR, "semop post failed: %s",
1544 			    strerror(errno));
1545 			return (FILEBENCH_ERROR);
1546 		}
1547 
1548 		filebench_log(LOG_DEBUG_IMPL,
1549 		    "flow %s-%d finished posting",
1550 		    target->fo_name, target->fo_instance);
1551 #else
1552 		filebench_log(LOG_DEBUG_IMPL,
1553 		    "sempost flow %s-%d to posix semaphore",
1554 		    target->fo_name,
1555 		    target->fo_instance);
1556 
1557 		/* Increment sem by value */
1558 		for (i = 0; i < value; i++) {
1559 			if (sem_post(&target->fo_sem) == -1) {
1560 				filebench_log(LOG_ERROR, "semop post failed");
1561 				return (FILEBENCH_ERROR);
1562 			}
1563 		}
1564 
1565 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1566 		    target->fo_name, target->fo_instance);
1567 #endif /* HAVE_SYSV_SEM */
1568 
1569 		target = target->fo_targetnext;
1570 	}
1571 	flowop_endop(threadflow, flowop, 0);
1572 
1573 	return (FILEBENCH_OK);
1574 }
1575 
1576 
/*
 * Section for exercising create / open / close / delete operations
 * on files within a fileset. For proper operation, the flowop attribute
 * "fd", which sets the fo_fdnumber field in the flowop, must be used
 * so that the same file is opened and later closed. "fd" is an index
 * into a pair of arrays maintained by threadflows, one of which
 * contains the operating system assigned file descriptors and the other
 * a pointer to the filesetentry whose file the file descriptor
 * references. An openfile flowop defined without fd being set will use
 * the default (0) fd or, if specified, rotate through fd indices, but
 * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile picks an arbitrary file to delete, regardless
 * of fd attribute.
 */
1591 
/*
 * XXX Making file selection more consistent among the flowops might be good
 */
1595 
1596 
1597 /*
1598  * Emulates (and actually does) file open. Obtains a file descriptor
1599  * index, then calls flowoplib_openfile_common() to open. Returns
1600  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1601  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1602  * FILEBENCH_NORSC, FILEBENCH_OK).
1603  */
1604 static int
1605 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1606 {
1607 	int fd = flowoplib_fdnum(threadflow, flowop);
1608 
1609 	if (fd == -1)
1610 		return (FILEBENCH_ERROR);
1611 
1612 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1613 }
1614 
1615 /*
1616  * Common file opening code for filesets. Uses the supplied
1617  * file descriptor index to determine the tf_fd entry to use.
1618  * If the entry is empty (0) and the fileset exists, fileset
1619  * pick is called to select a fileset entry to use. The file
1620  * specified in the filesetentry is opened, and the returned
1621  * operating system file descriptor and a pointer to the
1622  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1623  * respectively. Returns FILEBENCH_ERROR on error,
1624  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1625  * and FILEBENCH_OK on success.
1626  */
1627 static int
1628 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1629 {
1630 	filesetentry_t *file;
1631 	char *fileset_name;
1632 	int tid = 0;
1633 
1634 	if (flowop->fo_fileset == NULL) {
1635 		filebench_log(LOG_ERROR, "flowop NULL file");
1636 		return (FILEBENCH_ERROR);
1637 	}
1638 
1639 	if ((fileset_name =
1640 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1641 		filebench_log(LOG_ERROR,
1642 		    "flowop %s: fileset has no name", flowop->fo_name);
1643 		return (FILEBENCH_ERROR);
1644 	}
1645 
1646 	/*
1647 	 * If the flowop doesn't default to persistent fd
1648 	 * then get unique thread ID for use by fileset_pick
1649 	 */
1650 	if (avd_get_bool(flowop->fo_rotatefd))
1651 		tid = threadflow->tf_utid;
1652 
1653 	if (threadflow->tf_fd[fd] != 0) {
1654 		filebench_log(LOG_ERROR,
1655 		    "flowop %s attempted to open without closing on fd %d",
1656 		    flowop->fo_name, fd);
1657 		return (FILEBENCH_ERROR);
1658 	}
1659 
1660 #ifdef HAVE_RAW_SUPPORT
1661 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1662 		int open_attrs = 0;
1663 		char name[MAXPATHLEN];
1664 
1665 		(void) strcpy(name,
1666 		    avd_get_str(flowop->fo_fileset->fs_path));
1667 		(void) strcat(name, "/");
1668 		(void) strcat(name, fileset_name);
1669 
1670 		if (avd_get_bool(flowop->fo_dsync)) {
1671 #ifdef sun
1672 			open_attrs |= O_DSYNC;
1673 #else
1674 			open_attrs |= O_FSYNC;
1675 #endif
1676 		}
1677 
1678 		filebench_log(LOG_DEBUG_SCRIPT,
1679 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1680 
1681 		threadflow->tf_fd[fd] = open64(name,
1682 		    O_RDWR | open_attrs, 0666);
1683 
1684 		if (threadflow->tf_fd[fd] < 0) {
1685 			filebench_log(LOG_ERROR,
1686 			    "Failed to open raw device %s: %s",
1687 			    name, strerror(errno));
1688 			return (FILEBENCH_ERROR);
1689 		}
1690 
1691 		/* if running on Solaris, use un-buffered io */
1692 #ifdef sun
1693 		(void) directio(threadflow->tf_fd[fd], DIRECTIO_ON);
1694 #endif
1695 
1696 		threadflow->tf_fse[fd] = NULL;
1697 
1698 		return (FILEBENCH_OK);
1699 	}
1700 #endif /* HAVE_RAW_SUPPORT */
1701 
1702 	if ((file = fileset_pick(flowop->fo_fileset,
1703 	    FILESET_PICKEXISTS, tid)) == NULL) {
1704 		filebench_log(LOG_DEBUG_SCRIPT,
1705 		    "flowop %s failed to pick file from %s on fd %d",
1706 		    flowop->fo_name, fileset_name, fd);
1707 		return (FILEBENCH_NORSC);
1708 	}
1709 
1710 	threadflow->tf_fse[fd] = file;
1711 
1712 	flowop_beginop(threadflow, flowop);
1713 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1714 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1715 	flowop_endop(threadflow, flowop, 0);
1716 
1717 	if (threadflow->tf_fd[fd] < 0) {
1718 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1719 		    flowop->fo_name, file->fse_path);
1720 		return (FILEBENCH_ERROR);
1721 	}
1722 
1723 	filebench_log(LOG_DEBUG_SCRIPT,
1724 	    "flowop %s: opened %s fd[%d] = %d",
1725 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1726 
1727 	return (FILEBENCH_OK);
1728 }
1729 
1730 /*
1731  * Emulate create of a file. Uses the flowop's fdnumber to select
1732  * tf_fd and tf_fse array locations to put the created file's file
1733  * descriptor and filesetentry respectively. Uses fileset_pick()
1734  * to select a specific filesetentry whose file does not currently
1735  * exist for the file create operation. Then calls
1736  * fileset_openfile() with the O_CREATE flag set to create the
1737  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1738  * already in use, the flowop has no associated fileset, or
1739  * the create call fails. Returns 1 if a filesetentry with a
1740  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1741  */
1742 static int
1743 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1744 {
1745 	filesetentry_t *file;
1746 	int fd = flowop->fo_fdnumber;
1747 
1748 	if (threadflow->tf_fd[fd] != 0) {
1749 		filebench_log(LOG_ERROR,
1750 		    "flowop %s attempted to create without closing on fd %d",
1751 		    flowop->fo_name, fd);
1752 		return (FILEBENCH_ERROR);
1753 	}
1754 
1755 	if (flowop->fo_fileset == NULL) {
1756 		filebench_log(LOG_ERROR, "flowop NULL file");
1757 		return (FILEBENCH_ERROR);
1758 	}
1759 
1760 #ifdef HAVE_RAW_SUPPORT
1761 	/* can't be used with raw devices */
1762 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1763 		filebench_log(LOG_ERROR,
1764 		    "flowop %s attempted to a createfile on RAW device",
1765 		    flowop->fo_name);
1766 		return (FILEBENCH_ERROR);
1767 	}
1768 #endif /* HAVE_RAW_SUPPORT */
1769 
1770 	if ((file = fileset_pick(flowop->fo_fileset,
1771 	    FILESET_PICKNOEXIST, 0)) == NULL) {
1772 		filebench_log(LOG_DEBUG_SCRIPT,
1773 		    "flowop %s failed to pick file from fileset %s",
1774 		    flowop->fo_name,
1775 		    avd_get_str(flowop->fo_fileset->fs_name));
1776 		return (FILEBENCH_NORSC);
1777 	}
1778 
1779 	threadflow->tf_fse[fd] = file;
1780 
1781 	flowop_beginop(threadflow, flowop);
1782 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1783 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1784 	flowop_endop(threadflow, flowop, 0);
1785 
1786 	if (threadflow->tf_fd[fd] < 0) {
1787 		filebench_log(LOG_ERROR, "failed to create file %s",
1788 		    flowop->fo_name);
1789 		return (FILEBENCH_ERROR);
1790 	}
1791 
1792 	filebench_log(LOG_DEBUG_SCRIPT,
1793 	    "flowop %s: created %s fd[%d] = %d",
1794 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1795 
1796 	return (FILEBENCH_OK);
1797 }
1798 
1799 /*
1800  * Emulates delete of a file. If a valid fd is provided, it uses the
1801  * filesetentry stored at that fd location to select the file to be
1802  * deleted, otherwise it picks an arbitrary filesetentry
1803  * whose file exists. It then uses unlink() to delete it and Clears
1804  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1805  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1806  * filesetentry cannot be found, and FILEBENCH_OK on success.
1807  */
1808 static int
1809 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1810 {
1811 	filesetentry_t *file;
1812 	fileset_t *fileset;
1813 	char path[MAXPATHLEN];
1814 	char *pathtmp;
1815 	int fd = flowop->fo_fdnumber;
1816 
1817 	/* if fd specified, use it to access file */
1818 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1819 
1820 		/* check whether file still open */
1821 		if (threadflow->tf_fd[fd] > 0) {
1822 			filebench_log(LOG_DEBUG_SCRIPT,
1823 			    "flowop %s deleting still open file at fd = %d",
1824 			    flowop->fo_name, fd);
1825 		}
1826 
1827 		/* indicate that the file will be deleted */
1828 		threadflow->tf_fse[fd] = NULL;
1829 
1830 		/* if here, we still have a valid file pointer */
1831 		fileset = file->fse_fileset;
1832 	} else {
1833 		/* Otherwise, pick arbitrary file */
1834 		file = NULL;
1835 		fileset = flowop->fo_fileset;
1836 	}
1837 
1838 
1839 	if (fileset == NULL) {
1840 		filebench_log(LOG_ERROR, "flowop NULL file");
1841 		return (FILEBENCH_ERROR);
1842 	}
1843 
1844 #ifdef HAVE_RAW_SUPPORT
1845 	/* can't be used with raw devices */
1846 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1847 		filebench_log(LOG_ERROR,
1848 		    "flowop %s attempted a deletefile on RAW device",
1849 		    flowop->fo_name);
1850 		return (FILEBENCH_ERROR);
1851 	}
1852 #endif /* HAVE_RAW_SUPPORT */
1853 
1854 	if (file == NULL) {
1855 		if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0))
1856 		    == NULL) {
1857 			filebench_log(LOG_DEBUG_SCRIPT,
1858 			    "flowop %s failed to pick file", flowop->fo_name);
1859 			return (FILEBENCH_NORSC);
1860 		}
1861 	} else {
1862 		(void) ipc_mutex_lock(&file->fse_lock);
1863 	}
1864 
1865 	*path = 0;
1866 	(void) strcpy(path, avd_get_str(fileset->fs_path));
1867 	(void) strcat(path, "/");
1868 	(void) strcat(path, avd_get_str(fileset->fs_name));
1869 	pathtmp = fileset_resolvepath(file);
1870 	(void) strcat(path, pathtmp);
1871 	free(pathtmp);
1872 
1873 	flowop_beginop(threadflow, flowop);
1874 	(void) unlink(path);
1875 	flowop_endop(threadflow, flowop, 0);
1876 	file->fse_flags &= ~FSE_EXISTS;
1877 	(void) ipc_mutex_unlock(&file->fse_lock);
1878 
1879 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1880 
1881 	return (FILEBENCH_OK);
1882 }
1883 
1884 /*
1885  * Emulates fsync of a file. Obtains the file descriptor index
1886  * from the flowop, obtains the actual file descriptor from
1887  * the threadflow's table, checks to be sure it is still an
1888  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
1889  * if the file no longer is open, FILEBENCH_OK otherwise.
1890  */
1891 static int
1892 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1893 {
1894 	filesetentry_t *file;
1895 	int fd = flowop->fo_fdnumber;
1896 
1897 	if (threadflow->tf_fd[fd] == 0) {
1898 		filebench_log(LOG_ERROR,
1899 		    "flowop %s attempted to fsync a closed fd %d",
1900 		    flowop->fo_name, fd);
1901 		return (FILEBENCH_ERROR);
1902 	}
1903 
1904 	file = threadflow->tf_fse[fd];
1905 
1906 	if ((file == NULL) ||
1907 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1908 		filebench_log(LOG_ERROR,
1909 		    "flowop %s attempted to a fsync a RAW device",
1910 		    flowop->fo_name);
1911 		return (FILEBENCH_ERROR);
1912 	}
1913 
1914 	/* Measure time to fsync */
1915 	flowop_beginop(threadflow, flowop);
1916 	(void) fsync(threadflow->tf_fd[fd]);
1917 	flowop_endop(threadflow, flowop, 0);
1918 
1919 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1920 
1921 	return (FILEBENCH_OK);
1922 }
1923 
1924 /*
1925  * Emulate fsync of an entire fileset. Search through the
1926  * threadflow's file descriptor array, doing fsync() on each
1927  * open file that belongs to the flowop's fileset. Always
1928  * returns FILEBENCH_OK.
1929  */
1930 static int
1931 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1932 {
1933 	int fd;
1934 
1935 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1936 		filesetentry_t *file;
1937 
1938 		/* Match the file set to fsync */
1939 		if ((threadflow->tf_fse[fd] == NULL) ||
1940 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1941 			continue;
1942 
1943 		/* Measure time to fsync */
1944 		flowop_beginop(threadflow, flowop);
1945 		(void) fsync(threadflow->tf_fd[fd]);
1946 		flowop_endop(threadflow, flowop, 0);
1947 
1948 		file = threadflow->tf_fse[fd];
1949 
1950 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1951 		    file->fse_path);
1952 	}
1953 
1954 	return (FILEBENCH_OK);
1955 }
1956 
1957 /*
1958  * Emulate close of a file.  Obtains the file descriptor index
1959  * from the flowop, obtains the actual file descriptor from the
1960  * threadflow's table, checks to be sure it is still an open
1961  * file, then does a close operation on it. Then sets the
1962  * threadflow file descriptor table entry to 0, and the file set
1963  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
1964  * FILEBENCH_OK otherwise.
1965  */
1966 static int
1967 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1968 {
1969 	filesetentry_t *file;
1970 	int fd = flowop->fo_fdnumber;
1971 
1972 	if (threadflow->tf_fd[fd] == 0) {
1973 		filebench_log(LOG_ERROR,
1974 		    "flowop %s attempted to close an already closed fd %d",
1975 		    flowop->fo_name, fd);
1976 		return (FILEBENCH_ERROR);
1977 	}
1978 
1979 	/* Measure time to close */
1980 	flowop_beginop(threadflow, flowop);
1981 	(void) close(threadflow->tf_fd[fd]);
1982 	flowop_endop(threadflow, flowop, 0);
1983 
1984 	file = threadflow->tf_fse[fd];
1985 
1986 	threadflow->tf_fd[fd] = 0;
1987 
1988 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1989 
1990 	return (FILEBENCH_OK);
1991 }
1992 
1993 /*
1994  * Emulate stat of a file. Picks an arbitrary filesetentry with
1995  * an existing file from the flowop's fileset, then performs a
1996  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
1997  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
1998  * cannot be found, and FILEBENCH_OK on success.
1999  */
2000 static int
2001 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2002 {
2003 	filesetentry_t *file;
2004 	fileset_t *fileset;
2005 	char path[MAXPATHLEN];
2006 	char *pathtmp;
2007 
2008 	if ((fileset = flowop->fo_fileset) == NULL) {
2009 		filebench_log(LOG_ERROR, "flowop NULL file");
2010 		return (FILEBENCH_ERROR);
2011 	}
2012 
2013 	if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0)) == NULL) {
2014 		filebench_log(LOG_DEBUG_SCRIPT,
2015 		    "flowop %s failed to pick file",
2016 		    flowop->fo_name);
2017 		return (FILEBENCH_NORSC);
2018 	}
2019 
2020 	*path = 0;
2021 	(void) strcpy(path, avd_get_str(fileset->fs_path));
2022 	(void) strcat(path, "/");
2023 	(void) strcat(path, avd_get_str(fileset->fs_name));
2024 	pathtmp = fileset_resolvepath(file);
2025 	(void) strcat(path, pathtmp);
2026 	free(pathtmp);
2027 
2028 	flowop_beginop(threadflow, flowop);
2029 	flowop_endop(threadflow, flowop, 0);
2030 
2031 	(void) ipc_mutex_unlock(&file->fse_lock);
2032 
2033 	return (FILEBENCH_OK);
2034 }
2035 
2036 
2037 /*
2038  * Additional reads and writes. Read and write whole files, write
2039  * and append to files. Some of these work with both fileobjs and
2040  * filesets, others only with filesets. The flowoplib_write routine
2041  * writes from thread memory, while the others read or write using
2042  * fo_buf memory. Note that both flowoplib_read() and
2043  * flowoplib_aiowrite() use thread memory as well.
2044  */
2045 
2046 
2047 /*
2048  * Emulate a read of a whole file. The file must be open with
2049  * file descriptor and filesetentry stored at the locations indexed
2050  * by the flowop's fdnumber. It then seeks to the beginning of the
2051  * associated file, and reads fs_iosize bytes at a time until the end
2052  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2053  * out of files, and FILEBENCH_OK on success.
2054  */
2055 static int
2056 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2057 {
2058 	caddr_t iobuf;
2059 	off64_t bytes = 0;
2060 	int filedesc;
2061 	uint64_t wss;
2062 	fbint_t iosize;
2063 	int ret;
2064 	char zerordbuf;
2065 
2066 	/* get the file to use */
2067 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2068 	    &filedesc)) != FILEBENCH_OK)
2069 		return (ret);
2070 
2071 	/* an I/O size of zero means read entire working set with one I/O */
2072 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2073 		iosize = wss;
2074 
2075 	/*
2076 	 * The file may actually be 0 bytes long, in which case skip
2077 	 * the buffer set up call (which would fail) and substitute
2078 	 * a small buffer, which won't really be used.
2079 	 */
2080 	if (iosize == 0) {
2081 		iobuf = (caddr_t)&zerordbuf;
2082 		filebench_log(LOG_DEBUG_SCRIPT,
2083 		    "flowop %s read zero length file", flowop->fo_name);
2084 	} else {
2085 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2086 		    iosize) != 0)
2087 			return (FILEBENCH_ERROR);
2088 	}
2089 
2090 	/* Measure time to read bytes */
2091 	flowop_beginop(threadflow, flowop);
2092 	(void) lseek64(filedesc, 0, SEEK_SET);
2093 	while ((ret = read(filedesc, iobuf, iosize)) > 0)
2094 		bytes += ret;
2095 
2096 	flowop_endop(threadflow, flowop, bytes);
2097 
2098 	if (ret < 0) {
2099 		filebench_log(LOG_ERROR,
2100 		    "readwhole fail Failed to read whole file: %s",
2101 		    strerror(errno));
2102 		return (FILEBENCH_ERROR);
2103 	}
2104 
2105 	return (FILEBENCH_OK);
2106 }
2107 
2108 /*
2109  * Emulate a write to a file of size fo_iosize.  Will write
2110  * to a file from a fileset if the flowop's fo_fileset field
2111  * specifies one or its fdnumber is non zero. Otherwise it
2112  * will write to a fileobj file, if one exists. If the file
2113  * is not currently open, the routine will attempt to open
2114  * it. The flowop's fo_wss parameter will be used to set the
2115  * maximum file size if it is non-zero, otherwise the
2116  * filesetentry's  fse_size will be used. A random memory
2117  * buffer offset is calculated, and, if fo_random is TRUE,
2118  * a random file offset is used for the write. Otherwise the
2119  * write is to the next sequential location. Returns
2120  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2121  * obtain a file, or FILEBENCH_OK on success.
2122  */
2123 static int
2124 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2125 {
2126 	caddr_t iobuf;
2127 	fbint_t wss;
2128 	fbint_t iosize;
2129 	int filedesc;
2130 	int ret;
2131 
2132 	iosize = avd_get_int(flowop->fo_iosize);
2133 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2134 	    &filedesc, iosize)) != FILEBENCH_OK)
2135 		return (ret);
2136 
2137 	if (avd_get_bool(flowop->fo_random)) {
2138 		uint64_t fileoffset;
2139 
2140 		if (filebench_randomno64(&fileoffset,
2141 		    wss, iosize, NULL) == -1) {
2142 			filebench_log(LOG_ERROR,
2143 			    "file size smaller than IO size for thread %s",
2144 			    flowop->fo_name);
2145 			return (FILEBENCH_ERROR);
2146 		}
2147 		flowop_beginop(threadflow, flowop);
2148 		if (pwrite64(filedesc, iobuf,
2149 		    iosize, (off64_t)fileoffset) == -1) {
2150 			filebench_log(LOG_ERROR, "write failed, "
2151 			    "offset %llu io buffer %zd: %s",
2152 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2153 			flowop_endop(threadflow, flowop, 0);
2154 			return (FILEBENCH_ERROR);
2155 		}
2156 		flowop_endop(threadflow, flowop, iosize);
2157 	} else {
2158 		flowop_beginop(threadflow, flowop);
2159 		if (write(filedesc, iobuf, iosize) == -1) {
2160 			filebench_log(LOG_ERROR,
2161 			    "write failed, io buffer %zd: %s",
2162 			    iobuf, strerror(errno));
2163 			flowop_endop(threadflow, flowop, 0);
2164 			return (FILEBENCH_ERROR);
2165 		}
2166 		flowop_endop(threadflow, flowop, iosize);
2167 	}
2168 
2169 	return (FILEBENCH_OK);
2170 }
2171 
2172 /*
2173  * Emulate a write of a whole file.  The size of the file
2174  * is taken from a filesetentry identified by fo_srcfdnumber or
2175  * from the working set size, while the file descriptor used is
2176  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2177  * length length until full file has been written. Returns FILEBENCH_ERROR on
2178  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2179  */
2180 static int
2181 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2182 {
2183 	caddr_t iobuf;
2184 	filesetentry_t *file;
2185 	int wsize;
2186 	off64_t seek;
2187 	off64_t bytes = 0;
2188 	uint64_t wss;
2189 	fbint_t iosize;
2190 	int filedesc;
2191 	int srcfd = flowop->fo_srcfdnumber;
2192 	int ret;
2193 	char zerowrtbuf;
2194 
2195 	/* get the file to use */
2196 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2197 	    &filedesc)) != FILEBENCH_OK)
2198 		return (ret);
2199 
2200 	/* an I/O size of zero means write entire working set with one I/O */
2201 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2202 		iosize = wss;
2203 
2204 	/*
2205 	 * The file may actually be 0 bytes long, in which case skip
2206 	 * the buffer set up call (which would fail) and substitute
2207 	 * a small buffer, which won't really be used.
2208 	 */
2209 	if (iosize == 0) {
2210 		iobuf = (caddr_t)&zerowrtbuf;
2211 		filebench_log(LOG_DEBUG_SCRIPT,
2212 		    "flowop %s wrote zero length file", flowop->fo_name);
2213 	} else {
2214 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2215 		    iosize) != 0)
2216 			return (FILEBENCH_ERROR);
2217 	}
2218 
2219 	file = threadflow->tf_fse[srcfd];
2220 	if ((srcfd != 0) && (file == NULL)) {
2221 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2222 		    flowop->fo_name);
2223 		return (FILEBENCH_ERROR);
2224 	}
2225 
2226 	if (file)
2227 		wss = file->fse_size;
2228 
2229 	wsize = (int)MIN(wss, iosize);
2230 
2231 	/* Measure time to write bytes */
2232 	flowop_beginop(threadflow, flowop);
2233 	for (seek = 0; seek < wss; seek += wsize) {
2234 		ret = write(filedesc, iobuf, wsize);
2235 		if (ret != wsize) {
2236 			filebench_log(LOG_ERROR,
2237 			    "Failed to write %d bytes on fd %d: %s",
2238 			    wsize, filedesc, strerror(errno));
2239 			flowop_endop(threadflow, flowop, 0);
2240 			return (FILEBENCH_ERROR);
2241 		}
2242 		wsize = (int)MIN(wss - seek, iosize);
2243 		bytes += ret;
2244 	}
2245 	flowop_endop(threadflow, flowop, bytes);
2246 
2247 	return (FILEBENCH_OK);
2248 }
2249 
2250 
2251 /*
2252  * Emulate a fixed size append to a file. Will append data to
2253  * a file chosen from a fileset if the flowop's fo_fileset
2254  * field specifies one or if its fdnumber is non zero.
2255  * Otherwise it will write to a fileobj file, if one exists.
2256  * The flowop's fo_wss parameter will be used to set the
2257  * maximum file size if it is non-zero, otherwise the
2258  * filesetentry's fse_size will be used. A random memory
2259  * buffer offset is calculated, then a logical seek to the
2260  * end of file is done followed by a write of fo_iosize
2261  * bytes. Writes are actually done from fo_buf, rather than
2262  * tf_mem as is done with flowoplib_write(), and no check
2263  * is made to see if fo_iosize exceeds the size of fo_buf.
2264  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2265  * files in the fileset, FILEBENCH_OK on success.
2266  */
2267 static int
2268 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2269 {
2270 	caddr_t iobuf;
2271 	int filedesc;
2272 	fbint_t wss;
2273 	fbint_t iosize;
2274 	int ret;
2275 
2276 	iosize = avd_get_int(flowop->fo_iosize);
2277 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2278 	    &filedesc, iosize)) != FILEBENCH_OK)
2279 		return (ret);
2280 
2281 	/* XXX wss is not being used */
2282 
2283 	/* Measure time to write bytes */
2284 	flowop_beginop(threadflow, flowop);
2285 	(void) lseek64(filedesc, 0, SEEK_END);
2286 	ret = write(filedesc, iobuf, iosize);
2287 	if (ret != iosize) {
2288 		filebench_log(LOG_ERROR,
2289 		    "Failed to write %llu bytes on fd %d: %s",
2290 		    (u_longlong_t)iosize, filedesc, strerror(errno));
2291 		flowop_endop(threadflow, flowop, ret);
2292 		return (FILEBENCH_ERROR);
2293 	}
2294 	flowop_endop(threadflow, flowop, ret);
2295 
2296 	return (FILEBENCH_OK);
2297 }
2298 
2299 /*
2300  * Emulate a random size append to a file. Will append data
2301  * to a file chosen from a fileset if the flowop's fo_fileset
2302  * field specifies one or if its fdnumber is non zero. Otherwise
2303  * it will write to a fileobj file, if one exists. The flowop's
2304  * fo_wss parameter will be used to set the maximum file size
2305  * if it is non-zero, otherwise the filesetentry's fse_size
2306  * will be used.  A random transfer size (but at most fo_iosize
2307  * bytes) and a random memory offset are calculated. A logical
2308  * seek to the end of file is done, then writes of up to
2309  * FILE_ALLOC_BLOCK in size are done until the full transfer
2310  * size has been written. Writes are actually done from fo_buf,
2311  * rather than tf_mem as is done with flowoplib_write().
2312  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2313  * files in the fileset, FILEBENCH_OK on success.
2314  */
2315 static int
2316 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2317 {
2318 	caddr_t iobuf;
2319 	uint64_t appendsize;
2320 	int filedesc;
2321 	fbint_t wss;
2322 	fbint_t iosize;
2323 	int ret = 0;
2324 
2325 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2326 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2327 		    flowop->fo_name);
2328 		return (FILEBENCH_ERROR);
2329 	}
2330 
2331 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2332 		return (FILEBENCH_ERROR);
2333 
2334 	/* skip if attempting zero length append */
2335 	if (appendsize == 0) {
2336 		flowop_beginop(threadflow, flowop);
2337 		flowop_endop(threadflow, flowop, 0LL);
2338 		return (FILEBENCH_OK);
2339 	}
2340 
2341 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2342 	    &filedesc, appendsize)) != FILEBENCH_OK)
2343 		return (ret);
2344 
2345 	/* XXX wss is not being used */
2346 
2347 	/* Measure time to write bytes */
2348 	flowop_beginop(threadflow, flowop);
2349 
2350 	(void) lseek64(filedesc, 0, SEEK_END);
2351 	ret = write(filedesc, iobuf, appendsize);
2352 	if (ret != appendsize) {
2353 		filebench_log(LOG_ERROR,
2354 		    "Failed to write %llu bytes on fd %d: %s",
2355 		    (u_longlong_t)appendsize, filedesc, strerror(errno));
2356 		flowop_endop(threadflow, flowop, 0);
2357 		return (FILEBENCH_ERROR);
2358 	}
2359 
2360 	flowop_endop(threadflow, flowop, appendsize);
2361 
2362 	return (FILEBENCH_OK);
2363 }
2364 
/*
 * Private accumulator for the testrandvar flowop. One instance is
 * hung off the flowop's fo_private pointer and updated every time
 * the flowop runs.
 */
typedef struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated so far */
	double val_sum;		/* running sum of the values */
	double sqr_sum;		/* running sum of the squares of the values */
} testrandvar_priv_t;
2370 
2371 /*
2372  * flowop to calculate various statistics from the number stream
2373  * produced by a random variable. This allows verification that the
2374  * random distribution used to define the random variable is producing
2375  * the expected distribution of random numbers.
2376  */
2377 /* ARGSUSED */
2378 static int
2379 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2380 {
2381 	testrandvar_priv_t	*mystats;
2382 	double			value;
2383 
2384 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2385 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2386 		filebench_shutdown(1);
2387 		return (-1);
2388 	}
2389 
2390 	value = avd_get_dbl(flowop->fo_value);
2391 
2392 	mystats->sample_count++;
2393 	mystats->val_sum += value;
2394 	mystats->sqr_sum += (value * value);
2395 
2396 	return (0);
2397 }
2398 
2399 /*
2400  * Initialize the private data area used to accumulate the statistics
2401  */
2402 static int
2403 flowoplib_testrandvar_init(flowop_t *flowop)
2404 {
2405 	testrandvar_priv_t	*mystats;
2406 
2407 	if ((mystats = (testrandvar_priv_t *)
2408 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2409 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2410 		filebench_shutdown(1);
2411 		return (-1);
2412 	}
2413 
2414 	mystats->sample_count = 0;
2415 	mystats->val_sum = 0;
2416 	mystats->sqr_sum = 0;
2417 	flowop->fo_private = (void *)mystats;
2418 
2419 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2420 	return (0);
2421 }
2422 
2423 /*
2424  * Print out the accumulated statistics, and free the private storage
2425  */
2426 static void
2427 flowoplib_testrandvar_destruct(flowop_t *flowop)
2428 {
2429 	testrandvar_priv_t	*mystats;
2430 	double mean, std_dev, dbl_count;
2431 
2432 	(void) ipc_mutex_lock(&flowop->fo_lock);
2433 	if ((mystats = (testrandvar_priv_t *)
2434 	    flowop->fo_private) == NULL) {
2435 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2436 		return;
2437 	}
2438 
2439 	flowop->fo_private = NULL;
2440 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2441 
2442 	dbl_count = (double)mystats->sample_count;
2443 	mean = mystats->val_sum / dbl_count;
2444 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2445 
2446 	filebench_log(LOG_VERBOSE,
2447 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2448 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2449 	free(mystats);
2450 }
2451 
/* Leading blanks used to line up continuation lines of the usage text */
#define	FLOWOPLIB_USAGE_PAD	"                       "

/*
 * Prints usage information for flowop operations. The text is kept
 * in a NULL terminated table, one entry per output line, and written
 * to stderr in order.
 */
void
flowoplib_usage()
{
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		FLOWOPLIB_USAGE_PAD "[,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop deletefile name=<name>\n",
		FLOWOPLIB_USAGE_PAD "[,fileset=<fname>]\n",
		FLOWOPLIB_USAGE_PAD "[,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		FLOWOPLIB_USAGE_PAD "[,fileset=<fname>]\n",
		FLOWOPLIB_USAGE_PAD "[,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>]\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n",
		FLOWOPLIB_USAGE_PAD "filename|fileset=<fname>,\n",
		FLOWOPLIB_USAGE_PAD "iosize=<size>\n",
		FLOWOPLIB_USAGE_PAD "[,directio]\n",
		FLOWOPLIB_USAGE_PAD "[,dsync]\n",
		FLOWOPLIB_USAGE_PAD "[,iters=<count>]\n",
		FLOWOPLIB_USAGE_PAD "[,random]\n",
		FLOWOPLIB_USAGE_PAD "[,opennext]\n",
		FLOWOPLIB_USAGE_PAD "[,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n",
		FLOWOPLIB_USAGE_PAD "filename|fileset=<fname>,\n",
		FLOWOPLIB_USAGE_PAD "iosize=<size>\n",
		FLOWOPLIB_USAGE_PAD "[,dsync]\n",
		FLOWOPLIB_USAGE_PAD "[,iters=<count>]\n",
		FLOWOPLIB_USAGE_PAD "[,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n",
		FLOWOPLIB_USAGE_PAD "filename|fileset=<fname>,\n",
		FLOWOPLIB_USAGE_PAD "iosize=<size>\n",
		FLOWOPLIB_USAGE_PAD "[,dsync]\n",
		FLOWOPLIB_USAGE_PAD "[,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n",
		FLOWOPLIB_USAGE_PAD "value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value=<decrement-to-receive>,\n",
		FLOWOPLIB_USAGE_PAD "highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>,\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n",
		NULL
	};
	const char *const *linep;

	/* emit the table verbatim, stopping at the NULL sentinel */
	for (linep = usage_text; *linep != NULL; linep++)
		(void) fputs(*linep, stderr);
}
2541