xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 8404:b96b8ad1c3e9)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #ifdef HAVE_SYS_ASYNCH_H
32 #include <sys/asynch.h>
33 #endif
34 #include <sys/ipc.h>
35 #include <sys/sem.h>
36 #include <sys/errno.h>
37 #include <sys/time.h>
38 #include <inttypes.h>
39 #include <fcntl.h>
40 #include <math.h>
41 #include <dirent.h>
42 
43 #ifdef HAVE_UTILITY_H
44 #include <utility.h>
45 #endif /* HAVE_UTILITY_H */
46 
47 #ifdef HAVE_AIO
48 #include <aio.h>
49 #endif /* HAVE_AIO */
50 
51 #ifdef HAVE_LIBAIO_H
52 #include <libaio.h>
53 #endif /* HAVE_LIBAIO_H */
54 
55 #ifdef HAVE_SYS_ASYNC_H
56 #include <sys/asynch.h>
57 #endif /* HAVE_SYS_ASYNC_H */
58 
59 #ifdef HAVE_AIO_H
60 #include <aio.h>
61 #endif /* HAVE_AIO_H */
62 
63 #ifndef HAVE_UINT_T
64 #define	uint_t unsigned int
65 #endif /* HAVE_UINT_T */
66 
67 #ifndef HAVE_AIOCB64_T
68 #define	aiocb64 aiocb
69 #endif /* HAVE_AIOCB64_T */
70 
71 #ifndef HAVE_SYSV_SEM
72 #include <semaphore.h>
73 #endif /* HAVE_SYSV_SEM */
74 
75 #include "filebench.h"
76 #include "flowop.h"
77 #include "fileset.h"
78 #include "fb_random.h"
79 #include "utils.h"
80 /*
81  * These routines implement the flowops from the f language. Each
 * flowop has a name such as "read", and a set of function pointers
83  * to call for initialization, execution and destruction of the flowop.
84  * The table flowoplib_funcs[] contains a flowoplib struct for each
85  * implemented flowop. Most flowops use a generic initialization function
86  * and all currently use a generic destruction function. All flowop
87  * functions referenced from the table are in this file, though, of
88  * course, they often call functions from other files.
89  *
90  * The flowop_init() routine uses the flowoplib_funcs[] table to
91  * create an initial set of "instance 0" flowops, one for each type of
92  * flowop, from which all other flowops are derived. These "instance 0"
93  * flowops are initialized with information from the table including
94  * pointers for their fo_init, fo_func and fo_destroy functions. When
95  * a flowop definition is encountered in an f language script, the
96  * "type" of flowop, such as "read" is used to search for the
97  * "instance 0" flowop named "read", then a new flowop is allocated
98  * which inherits its function pointers and other initial properties
99  * from the instance 0 flowop, and is given a new name as specified
100  * by the "name=" attribute.
101  */
102 
/*
 * Forward declarations for routines defined later in this file. They
 * are declared here so that the flowoplib_funcs[] table below, and
 * helpers that appear before the definitions, can reference them.
 */

/* generic init / destruct routines */
static int flowoplib_init_generic(flowop_t *flowop);
static void flowoplib_destruct_generic(flowop_t *flowop);
static void flowoplib_destruct_noop(flowop_t *flowop);
static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
#ifdef HAVE_AIO
static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
#endif
static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_block_init(flowop_t *flowop);
static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_sempost_init(flowop_t *flowop);
static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_semblock_init(flowop_t *flowop);
static void flowoplib_semblock_destruct(flowop_t *flowop);
static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
static int flowoplib_makedir(threadflow_t *, flowop_t *flowop);
static int flowoplib_removedir(threadflow_t *, flowop_t *flowop);
static int flowoplib_listdir(threadflow_t *, flowop_t *flowop);
static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
/* randvar self-test flowop */
static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
static int flowoplib_testrandvar_init(flowop_t *flowop);
static void flowoplib_testrandvar_destruct(flowop_t *flowop);
148 
149 typedef struct flowoplib {
150 	int	fl_type;
151 	int	fl_attrs;
152 	char	*fl_name;
153 	int	(*fl_init)();
154 	int	(*fl_func)();
155 	void	(*fl_destruct)();
156 } flowoplib_t;
157 
158 static flowoplib_t flowoplib_funcs[] = {
159 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
160 	flowoplib_write, flowoplib_destruct_generic,
161 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
162 	flowoplib_read, flowoplib_destruct_generic,
163 #ifdef HAVE_AIO
164 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
165 	flowoplib_aiowrite, flowoplib_destruct_generic,
166 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
167 	flowoplib_aiowait, flowoplib_destruct_generic,
168 #endif
169 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
170 	flowoplib_block, flowoplib_destruct_generic,
171 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
172 	flowoplib_wakeup, flowoplib_destruct_generic,
173 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
174 	flowoplib_semblock, flowoplib_semblock_destruct,
175 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
176 	flowoplib_sempost, flowoplib_destruct_noop,
177 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
178 	flowoplib_hog, flowoplib_destruct_generic,
179 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
180 	flowoplib_delay, flowoplib_destruct_generic,
181 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
182 	flowoplib_eventlimit, flowoplib_destruct_generic,
183 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
184 	flowoplib_bwlimit, flowoplib_destruct_generic,
185 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
186 	flowoplib_iopslimit, flowoplib_destruct_generic,
187 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
188 	flowoplib_opslimit, flowoplib_destruct_generic,
189 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
190 	flowoplib_finishoncount, flowoplib_destruct_generic,
191 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
192 	flowoplib_finishonbytes, flowoplib_destruct_generic,
193 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
194 	flowoplib_openfile, flowoplib_destruct_generic,
195 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
196 	flowoplib_createfile, flowoplib_destruct_generic,
197 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
198 	flowoplib_closefile, flowoplib_destruct_generic,
199 	FLOW_TYPE_IO, 0, "makedir", flowoplib_init_generic,
200 	flowoplib_makedir, flowoplib_destruct_generic,
201 	FLOW_TYPE_IO, 0, "removedir", flowoplib_init_generic,
202 	flowoplib_removedir, flowoplib_destruct_generic,
203 	FLOW_TYPE_IO, 0, "listdir", flowoplib_init_generic,
204 	flowoplib_listdir, flowoplib_destruct_generic,
205 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
206 	flowoplib_fsync, flowoplib_destruct_generic,
207 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
208 	flowoplib_fsyncset, flowoplib_destruct_generic,
209 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
210 	flowoplib_statfile, flowoplib_destruct_generic,
211 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
212 	flowoplib_readwholefile, flowoplib_destruct_generic,
213 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
214 	flowoplib_appendfile, flowoplib_destruct_generic,
215 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
216 	flowoplib_appendfilerand, flowoplib_destruct_generic,
217 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
218 	flowoplib_deletefile, flowoplib_destruct_generic,
219 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
220 	flowoplib_writewholefile, flowoplib_destruct_generic,
221 	FLOW_TYPE_OTHER, 0, "print", flowoplib_init_generic,
222 	flowoplib_print, flowoplib_destruct_generic,
223 	/* routine to calculate mean and stddev for output from a randvar */
224 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
225 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
226 };
227 
228 /*
229  * Loops through the master list of flowops defined in this
230  * module, and creates and initializes a flowop for each one
231  * by calling flowop_define. As a side effect of calling
232  * flowop define, the created flowops are placed on the
233  * master flowop list. All created flowops are set to
234  * instance "0".
235  */
236 void
237 flowoplib_init()
238 {
239 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
240 	int i;
241 
242 	for (i = 0; i < nops; i++) {
243 		flowop_t *flowop;
244 		flowoplib_t *fl;
245 
246 		fl = &flowoplib_funcs[i];
247 
248 		if ((flowop = flowop_define(NULL,
249 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
250 			filebench_log(LOG_ERROR,
251 			    "failed to create flowop %s\n",
252 			    fl->fl_name);
253 			filebench_shutdown(1);
254 		}
255 
256 		flowop->fo_func = fl->fl_func;
257 		flowop->fo_init = fl->fl_init;
258 		flowop->fo_destruct = fl->fl_destruct;
259 		flowop->fo_attrs = fl->fl_attrs;
260 	}
261 }
262 
263 static int
264 flowoplib_init_generic(flowop_t *flowop)
265 {
266 	(void) ipc_mutex_unlock(&flowop->fo_lock);
267 	return (FILEBENCH_OK);
268 }
269 
270 static void
271 flowoplib_destruct_generic(flowop_t *flowop)
272 {
273 	char *buf;
274 
275 	/* release any local resources held by the flowop */
276 	(void) ipc_mutex_lock(&flowop->fo_lock);
277 	buf = flowop->fo_buf;
278 	flowop->fo_buf = NULL;
279 	(void) ipc_mutex_unlock(&flowop->fo_lock);
280 
281 	if (buf)
282 		free(buf);
283 }
284 
285 /*
286  * Special total noop destruct
287  */
288 /* ARGSUSED */
289 static void
290 flowoplib_destruct_noop(flowop_t *flowop)
291 {
292 }
293 
294 /*
295  * Generates a file attribute from flags in the supplied flowop.
296  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
297  */
298 static int
299 flowoplib_fileattrs(flowop_t *flowop)
300 {
301 	int attrs = 0;
302 
303 	if (avd_get_bool(flowop->fo_directio))
304 		attrs |= FLOW_ATTR_DIRECTIO;
305 
306 	if (avd_get_bool(flowop->fo_dsync))
307 		attrs |= FLOW_ATTR_DSYNC;
308 
309 	return (attrs);
310 }
311 
312 /*
313  * Obtain a filesetentry for a file. Result placed where filep points.
314  * Supply with a flowop and a flag to indicate whether an existent or
315  * non-existent file is required. Returns FILEBENCH_NORSC if all out
316  * of the appropriate type of directories, FILEBENCH_ERROR if the
317  * flowop does not point to a fileset, and FILEBENCH_OK otherwise.
318  */
319 static int
320 flowoplib_pickfile(filesetentry_t **filep, flowop_t *flowop, int flags, int tid)
321 {
322 	fileset_t	*fileset;
323 	int		fileindex;
324 
325 	if ((fileset = flowop->fo_fileset) == NULL) {
326 		filebench_log(LOG_ERROR, "flowop NO fileset");
327 		return (FILEBENCH_ERROR);
328 	}
329 
330 	if (flowop->fo_fileindex) {
331 		fileindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
332 		    ((double)(fileset->fs_constentries / 2)));
333 		fileindex = fileindex % fileset->fs_constentries;
334 		flags |= FILESET_PICKBYINDEX;
335 	} else {
336 		fileindex = 0;
337 	}
338 
339 	if ((*filep = fileset_pick(fileset, FILESET_PICKFILE | flags,
340 	    tid, fileindex)) == NULL) {
341 		filebench_log(LOG_DEBUG_SCRIPT,
342 		    "flowop %s failed to pick file from fileset %s",
343 		    flowop->fo_name,
344 		    avd_get_str(fileset->fs_name));
345 		return (FILEBENCH_NORSC);
346 	}
347 
348 	return (FILEBENCH_OK);
349 }
350 
351 /*
352  * Obtain a filesetentry for a leaf directory. Result placed where dirp
353  * points. Supply with flowop and a flag to indicate whether an existent
354  * or non-existent leaf directory is required. Returns FILEBENCH_NORSC
355  * if all out of the appropriate type of directories, FILEBENCH_ERROR
356  * if the flowop does not point to a fileset, and FILEBENCH_OK otherwise.
357  */
358 static int
359 flowoplib_pickleafdir(filesetentry_t **dirp, flowop_t *flowop, int flags)
360 {
361 	fileset_t	*fileset;
362 	int		dirindex;
363 
364 	if ((fileset = flowop->fo_fileset) == NULL) {
365 		filebench_log(LOG_ERROR, "flowop NO fileset");
366 		return (FILEBENCH_ERROR);
367 	}
368 
369 	if (flowop->fo_fileindex) {
370 		dirindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
371 		    ((double)(fileset->fs_constleafdirs / 2)));
372 		dirindex = dirindex % fileset->fs_constleafdirs;
373 		flags |= FILESET_PICKBYINDEX;
374 	} else {
375 		dirindex = 0;
376 	}
377 
378 	if ((*dirp = fileset_pick(fileset,
379 	    FILESET_PICKLEAFDIR | flags, 0, dirindex)) == NULL) {
380 		filebench_log(LOG_DEBUG_SCRIPT,
381 		    "flowop %s failed to pick directory from fileset %s",
382 		    flowop->fo_name,
383 		    avd_get_str(fileset->fs_name));
384 		return (FILEBENCH_NORSC);
385 	}
386 
387 	return (FILEBENCH_OK);
388 }
389 
390 /*
391  * Searches for a file descriptor. Tries the flowop's
392  * fo_fdnumber first and returns with it if it has been
393  * explicitly set (greater than 0). It next checks to
394  * see if a rotating file descriptor policy is in effect,
395  * and if not returns the fdnumber regardless of what
396  * it is. (note that if it is 0, it just selects to the
397  * default file descriptor in the threadflow's tf_fd
398  * array). If the rotating fd policy is in effect, it
399  * cycles from the end of the tf_fd array to one location
400  * beyond the maximum needed by the number of entries in
401  * the associated fileset on each invocation, then starts
402  * over from the end.
403  *
404  * The routine returns an index into the threadflow's
405  * tf_fd table where the actual file descriptor will be
406  * found. Note: the calling routine must not call this
407  * routine if the flowop does not have a fileset, and the
408  * flowop's fo_fdnumber is zero and fo_rotatefd is
409  * asserted, or an addressing fault may occur.
410  */
411 static int
412 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
413 {
414 	fbint_t	entries;
415 	int fdnumber = flowop->fo_fdnumber;
416 
417 	/* If the script sets the fd explicitly */
418 	if (fdnumber > 0)
419 		return (fdnumber);
420 
421 	/* If the flowop defaults to persistent fd */
422 	if (!avd_get_bool(flowop->fo_rotatefd))
423 		return (fdnumber);
424 
425 	if (flowop->fo_fileset == NULL) {
426 		filebench_log(LOG_ERROR, "flowop NULL file");
427 		return (FILEBENCH_ERROR);
428 	}
429 
430 	entries = flowop->fo_fileset->fs_constentries;
431 
432 	/* Rotate the fd on each flowop invocation */
433 	if (entries > (THREADFLOW_MAXFD / 2)) {
434 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
435 		    " (too many files : %llu",
436 		    flowop->fo_name, (u_longlong_t)entries);
437 		return (FILEBENCH_ERROR);
438 	}
439 
440 	/* First time around */
441 	if (threadflow->tf_fdrotor == 0)
442 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
443 
444 	/* One fd for every file in the set */
445 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
446 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
447 
448 
449 	threadflow->tf_fdrotor--;
450 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
451 	    threadflow->tf_fdrotor);
452 	return (threadflow->tf_fdrotor);
453 }
454 
455 /*
456  * Determines the file descriptor to use, and attempts to open
457  * the file if it is not already open. Also determines the wss
458  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
459  * if flowop_openfile_common couldn't obtain an appropriate file
460  * from a the fileset, and FILEBENCH_OK otherwise.
461  */
462 static int
463 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
464     fbint_t *wssp, int *filedescp)
465 {
466 	int fd = flowoplib_fdnum(threadflow, flowop);
467 
468 	if (fd == -1)
469 		return (FILEBENCH_ERROR);
470 
471 	if (threadflow->tf_fd[fd] == 0) {
472 		int ret;
473 
474 		if ((ret = flowoplib_openfile_common(
475 		    threadflow, flowop, fd)) != FILEBENCH_OK)
476 			return (ret);
477 
478 		if (threadflow->tf_fse[fd]) {
479 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
480 			    threadflow->tf_fse[fd]->fse_path);
481 		} else {
482 			filebench_log(LOG_DEBUG_IMPL,
483 			    "opened device %s/%s",
484 			    avd_get_str(flowop->fo_fileset->fs_path),
485 			    avd_get_str(flowop->fo_fileset->fs_name));
486 		}
487 	}
488 
489 	*filedescp = threadflow->tf_fd[fd];
490 
491 	if ((*wssp = flowop->fo_constwss) == 0) {
492 		if (threadflow->tf_fse[fd])
493 			*wssp = threadflow->tf_fse[fd]->fse_size;
494 		else
495 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
496 	}
497 
498 	return (FILEBENCH_OK);
499 }
500 
501 /*
502  * Determines the io buffer or random offset into tf_mem for
503  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
504  */
505 static int
506 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
507     caddr_t *iobufp, fbint_t iosize)
508 {
509 	long memsize;
510 	size_t memoffset;
511 
512 	if (iosize == 0) {
513 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
514 		    flowop->fo_name);
515 		return (FILEBENCH_ERROR);
516 	}
517 
518 	if ((memsize = threadflow->tf_constmemsize) != 0) {
519 
520 		/* use tf_mem for I/O with random offset */
521 		if (filebench_randomno(&memoffset,
522 		    memsize, iosize, NULL) == -1) {
523 			filebench_log(LOG_ERROR,
524 			    "tf_memsize smaller than IO size for thread %s",
525 			    flowop->fo_name);
526 			return (FILEBENCH_ERROR);
527 		}
528 		*iobufp = threadflow->tf_mem + memoffset;
529 
530 	} else {
531 		/* use private I/O buffer */
532 		if ((flowop->fo_buf != NULL) &&
533 		    (flowop->fo_buf_size < iosize)) {
534 			/* too small, so free up and re-allocate */
535 			free(flowop->fo_buf);
536 			flowop->fo_buf = NULL;
537 		}
538 
539 		/*
540 		 * Allocate memory for the  buffer. The memory is freed
541 		 * by flowop_destruct_generic() or by this routine if more
542 		 * memory is needed for the buffer.
543 		 */
544 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
545 		    = (char *)malloc(iosize)) == NULL))
546 			return (FILEBENCH_ERROR);
547 
548 		flowop->fo_buf_size = iosize;
549 		*iobufp = flowop->fo_buf;
550 	}
551 	return (FILEBENCH_OK);
552 }
553 
554 /*
555  * Determines the file descriptor to use, opens it if necessary, the
556  * io buffer or random offset into tf_mem for IO operation and the wss
557  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
558  */
559 static int
560 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
561     fbint_t *wssp, caddr_t *iobufp, int *filedescp, fbint_t iosize)
562 {
563 	int ret;
564 
565 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
566 	    FILEBENCH_OK)
567 		return (ret);
568 
569 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
570 	    FILEBENCH_OK)
571 		return (ret);
572 
573 	return (FILEBENCH_OK);
574 }
575 
576 /*
577  * Emulate posix read / pread. If the flowop has a fileset,
578  * a file descriptor number index is fetched, otherwise a
579  * supplied fileobj file is used. In either case the specified
580  * file will be opened if not already open. If the flowop has
581  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
582  * returned.
583  *
584  * The actual read is done to a random offset in the
585  * threadflow's thread memory (tf_mem), with a size set by
586  * fo_iosize and at either a random disk offset within the
587  * working set size, or at the next sequential location. If
588  * any errors are encountered, FILEBENCH_ERROR is returned,
589  * if no appropriate file can be obtained from the fileset then
590  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
591  */
592 static int
593 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
594 {
595 	caddr_t iobuf;
596 	fbint_t wss;
597 	fbint_t iosize;
598 	int filedesc;
599 	int ret;
600 
601 
602 	iosize = avd_get_int(flowop->fo_iosize);
603 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
604 	    &filedesc, iosize)) != FILEBENCH_OK)
605 		return (ret);
606 
607 	if (avd_get_bool(flowop->fo_random)) {
608 		uint64_t fileoffset;
609 
610 		if (filebench_randomno64(&fileoffset,
611 		    wss, iosize, NULL) == -1) {
612 			filebench_log(LOG_ERROR,
613 			    "file size smaller than IO size for thread %s",
614 			    flowop->fo_name);
615 			return (FILEBENCH_ERROR);
616 		}
617 
618 		(void) flowop_beginop(threadflow, flowop);
619 		if ((ret = pread64(filedesc, iobuf,
620 		    iosize, (off64_t)fileoffset)) == -1) {
621 			(void) flowop_endop(threadflow, flowop, 0);
622 			filebench_log(LOG_ERROR,
623 			    "read file %s failed, offset %llu "
624 			    "io buffer %zd: %s",
625 			    avd_get_str(flowop->fo_fileset->fs_name),
626 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
627 			flowop_endop(threadflow, flowop, 0);
628 			return (FILEBENCH_ERROR);
629 		}
630 		(void) flowop_endop(threadflow, flowop, ret);
631 
632 		if ((ret == 0))
633 			(void) lseek64(filedesc, 0, SEEK_SET);
634 
635 	} else {
636 		(void) flowop_beginop(threadflow, flowop);
637 		if ((ret = read(filedesc, iobuf, iosize)) == -1) {
638 			(void) flowop_endop(threadflow, flowop, 0);
639 			filebench_log(LOG_ERROR,
640 			    "read file %s failed, io buffer %zd: %s",
641 			    avd_get_str(flowop->fo_fileset->fs_name),
642 			    iobuf, strerror(errno));
643 			(void) flowop_endop(threadflow, flowop, 0);
644 			return (FILEBENCH_ERROR);
645 		}
646 		(void) flowop_endop(threadflow, flowop, ret);
647 
648 		if ((ret == 0))
649 			(void) lseek64(filedesc, 0, SEEK_SET);
650 	}
651 
652 	return (FILEBENCH_OK);
653 }
654 
655 #ifdef HAVE_AIO
656 
657 /*
658  * Asynchronous write section. An Asynchronous IO element
659  * (aiolist_t) is used to associate the asynchronous write request with
660  * its subsequent completion. This element includes a aiocb64 struct
661  * that is used by posix aio_xxx calls to track the asynchronous writes.
662  * The flowops aiowrite and aiowait result in calls to these posix
663  * aio_xxx system routines to do the actual asynchronous write IO
664  * operations.
665  */
666 
667 
668 /*
669  * Allocates an asynchronous I/O list (aio, of type
670  * aiolist_t) element. Adds it to the flowop thread's
671  * threadflow aio list. Returns a pointer to the element.
672  */
673 static aiolist_t *
674 aio_allocate(flowop_t *flowop)
675 {
676 	aiolist_t *aiolist;
677 
678 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
679 		filebench_log(LOG_ERROR, "malloc aiolist failed");
680 		filebench_shutdown(1);
681 	}
682 
683 	/* Add to list */
684 	if (flowop->fo_thread->tf_aiolist == NULL) {
685 		flowop->fo_thread->tf_aiolist = aiolist;
686 		aiolist->al_next = NULL;
687 	} else {
688 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
689 		flowop->fo_thread->tf_aiolist = aiolist;
690 	}
691 	return (aiolist);
692 }
693 
694 /*
695  * Searches for the aiolist element that has a matching
696  * completion block, aiocb. If none found returns FILEBENCH_ERROR. If
697  * found, removes the aiolist element from flowop thread's
698  * list and returns FILEBENCH_OK.
699  */
700 static int
701 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
702 {
703 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
704 	aiolist_t *previous = NULL;
705 	aiolist_t *match = NULL;
706 
707 	if (aiocb == NULL) {
708 		filebench_log(LOG_ERROR, "null aiocb deallocate");
709 		return (FILEBENCH_OK);
710 	}
711 
712 	while (aiolist) {
713 		if (aiocb == &(aiolist->al_aiocb)) {
714 			match = aiolist;
715 			break;
716 		}
717 		previous = aiolist;
718 		aiolist = aiolist->al_next;
719 	}
720 
721 	if (match == NULL)
722 		return (FILEBENCH_ERROR);
723 
724 	/* Remove from the list */
725 	if (previous)
726 		previous->al_next = match->al_next;
727 	else
728 		flowop->fo_thread->tf_aiolist = match->al_next;
729 
730 	return (FILEBENCH_OK);
731 }
732 
733 /*
734  * Emulate posix aiowrite(). Determines which file to use,
735  * either one file of a fileset, or the file associated
736  * with a fileobj, allocates and fills an aiolist_t element
737  * for the write, and issues the asynchronous write. This
738  * operation is only valid for random IO, and returns an
739  * error if the flowop is set for sequential IO. Returns
740  * FILEBENCH_OK on success, FILEBENCH_NORSC if iosetup can't
741  * obtain a file to open, and FILEBENCH_ERROR on any
742  * encountered error.
743  */
744 static int
745 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
746 {
747 	caddr_t iobuf;
748 	fbint_t wss;
749 	fbint_t iosize;
750 	int filedesc;
751 	int ret;
752 
753 	iosize = avd_get_int(flowop->fo_iosize);
754 
755 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
756 	    &filedesc, iosize)) != FILEBENCH_OK)
757 		return (ret);
758 
759 	if (avd_get_bool(flowop->fo_random)) {
760 		uint64_t fileoffset;
761 		struct aiocb64 *aiocb;
762 		aiolist_t *aiolist;
763 
764 		if (filebench_randomno64(&fileoffset,
765 		    wss, iosize, NULL) == -1) {
766 			filebench_log(LOG_ERROR,
767 			    "file size smaller than IO size for thread %s",
768 			    flowop->fo_name);
769 			return (FILEBENCH_ERROR);
770 		}
771 
772 		aiolist = aio_allocate(flowop);
773 		aiolist->al_type = AL_WRITE;
774 		aiocb = &aiolist->al_aiocb;
775 
776 		aiocb->aio_fildes = filedesc;
777 		aiocb->aio_buf = iobuf;
778 		aiocb->aio_nbytes = (size_t)iosize;
779 		aiocb->aio_offset = (off64_t)fileoffset;
780 		aiocb->aio_reqprio = 0;
781 
782 		filebench_log(LOG_DEBUG_IMPL,
783 		    "aio fd=%d, bytes=%llu, offset=%llu",
784 		    filedesc, (u_longlong_t)iosize, (u_longlong_t)fileoffset);
785 
786 		flowop_beginop(threadflow, flowop);
787 		if (aio_write64(aiocb) < 0) {
788 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
789 			    strerror(errno));
790 			filebench_shutdown(1);
791 		}
792 		flowop_endop(threadflow, flowop, iosize);
793 	} else {
794 		return (FILEBENCH_ERROR);
795 	}
796 
797 	return (FILEBENCH_OK);
798 }
799 
800 
801 
802 #define	MAXREAP 4096
803 
804 /*
805  * Emulate posix aiowait(). Waits for the completion of half the
806  * outstanding asynchronous IOs, or a single IO, which ever is
807  * larger. The routine will return after a sufficient number of
808  * completed calls issued by any thread in the procflow have
809  * completed, or a 1 second timout elapses. All completed
810  * IO operations are deleted from the thread's aiolist.
811  */
812 static int
813 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
814 {
815 	struct aiocb64 **worklist;
816 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
817 	int uncompleted = 0;
818 
819 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
820 
821 	/* Count the list of pending aios */
822 	while (aio) {
823 		uncompleted++;
824 		aio = aio->al_next;
825 	}
826 
827 	do {
828 		uint_t ncompleted = 0;
829 		uint_t todo;
830 		struct timespec timeout;
831 		int inprogress;
832 		int i;
833 
834 		/* Wait for half of the outstanding requests */
835 		timeout.tv_sec = 1;
836 		timeout.tv_nsec = 0;
837 
838 		if (uncompleted > MAXREAP)
839 			todo = MAXREAP;
840 		else
841 			todo = uncompleted / 2;
842 
843 		if (todo == 0)
844 			todo = 1;
845 
846 		flowop_beginop(threadflow, flowop);
847 
848 #ifdef HAVE_AIOWAITN
849 		if ((aio_waitn64((struct aiocb64 **)worklist,
850 		    MAXREAP, &todo, &timeout) == -1) &&
851 		    errno && (errno != ETIME)) {
852 			filebench_log(LOG_ERROR,
853 			    "aiowait failed: %s, outstanding = %d, "
854 			    "ncompleted = %d ",
855 			    strerror(errno), uncompleted, todo);
856 		}
857 
858 		ncompleted = todo;
859 		/* Take the  completed I/Os from the list */
860 		inprogress = 0;
861 		for (i = 0; i < ncompleted; i++) {
862 			if ((aio_return64(worklist[i]) == -1) &&
863 			    (errno == EINPROGRESS)) {
864 				inprogress++;
865 				continue;
866 			}
867 			if (aio_deallocate(flowop, worklist[i]) < 0) {
868 				filebench_log(LOG_ERROR, "Could not remove "
869 				    "aio from list ");
870 				flowop_endop(threadflow, flowop, 0);
871 				return (FILEBENCH_ERROR);
872 			}
873 		}
874 
875 		uncompleted -= ncompleted;
876 		uncompleted += inprogress;
877 
878 #else
879 
880 		for (ncompleted = 0, inprogress = 0,
881 		    aio = flowop->fo_thread->tf_aiolist;
882 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
883 			int result = aio_error64(&aio->al_aiocb);
884 
885 			if (result == EINPROGRESS) {
886 				inprogress++;
887 				continue;
888 			}
889 
890 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
891 				filebench_log(LOG_ERROR, "aio failed: %s",
892 				    strerror(result));
893 				continue;
894 			}
895 
896 			ncompleted++;
897 
898 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
899 				filebench_log(LOG_ERROR, "Could not remove aio "
900 				    "from list ");
901 				flowop_endop(threadflow, flowop, 0);
902 				return (FILEBENCH_ERROR);
903 			}
904 		}
905 
906 		uncompleted -= ncompleted;
907 
908 #endif
909 		filebench_log(LOG_DEBUG_SCRIPT,
910 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
911 		    ncompleted, uncompleted, inprogress);
912 
913 	} while (uncompleted > MAXREAP);
914 
915 	flowop_endop(threadflow, flowop, 0);
916 
917 	free(worklist);
918 
919 	return (FILEBENCH_OK);
920 }
921 
922 #endif /* HAVE_AIO */
923 
924 /*
925  * Initializes a "flowop_block" flowop. Specifically, it
926  * initializes the flowop's fo_cv and unlocks the fo_lock.
927  */
928 static int
929 flowoplib_block_init(flowop_t *flowop)
930 {
931 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
932 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
933 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
934 	(void) ipc_mutex_unlock(&flowop->fo_lock);
935 
936 	return (FILEBENCH_OK);
937 }
938 
939 /*
940  * Blocks the threadflow until woken up by flowoplib_wakeup.
941  * The routine blocks on the flowop's fo_cv condition variable.
942  */
943 static int
944 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
945 {
946 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
947 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
948 	(void) ipc_mutex_lock(&flowop->fo_lock);
949 
950 	flowop_beginop(threadflow, flowop);
951 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
952 	flowop_endop(threadflow, flowop, 0);
953 
954 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
955 	    flowop->fo_name, flowop->fo_instance);
956 
957 	(void) ipc_mutex_unlock(&flowop->fo_lock);
958 
959 	return (FILEBENCH_OK);
960 }
961 
962 /*
963  * Wakes up one or more target blocking flowops.
964  * Sends broadcasts on the fo_cv condition variables of all
965  * flowops on the target list, except those that are
966  * FLOW_MASTER flowops. The target list consists of all
967  * flowops whose name matches this flowop's "fo_targetname"
968  * attribute. The target list is generated on the first
969  * invocation, and the run will be shutdown if no targets
970  * are found. Otherwise the routine always returns FILEBENCH_OK.
971  */
972 static int
973 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
974 {
975 	flowop_t *target;
976 
977 	/* if this is the first wakeup, create the wakeup list */
978 	if (flowop->fo_targets == NULL) {
979 		flowop_t *result = flowop_find(flowop->fo_targetname);
980 
981 		flowop->fo_targets = result;
982 		if (result == NULL) {
983 			filebench_log(LOG_ERROR,
984 			    "wakeup: could not find op %s for thread %s",
985 			    flowop->fo_targetname,
986 			    threadflow->tf_name);
987 			filebench_shutdown(1);
988 		}
989 		while (result) {
990 			result->fo_targetnext =
991 			    result->fo_resultnext;
992 			result = result->fo_resultnext;
993 		}
994 	}
995 
996 	target = flowop->fo_targets;
997 
998 	/* wakeup the targets */
999 	while (target) {
1000 		if (target->fo_instance == FLOW_MASTER) {
1001 			target = target->fo_targetnext;
1002 			continue;
1003 		}
1004 		filebench_log(LOG_DEBUG_IMPL,
1005 		    "wakeup flow %s-%d at address %zx",
1006 		    target->fo_name,
1007 		    target->fo_instance,
1008 		    &target->fo_cv);
1009 
1010 		flowop_beginop(threadflow, flowop);
1011 		(void) ipc_mutex_lock(&target->fo_lock);
1012 		(void) pthread_cond_broadcast(&target->fo_cv);
1013 		(void) ipc_mutex_unlock(&target->fo_lock);
1014 		flowop_endop(threadflow, flowop, 0);
1015 
1016 		target = target->fo_targetnext;
1017 	}
1018 
1019 	return (FILEBENCH_OK);
1020 }
1021 
1022 /*
1023  * "think time" routines. the "hog" routine consumes cpu cycles as
1024  * it "thinks", while the "delay" flowop simply calls sleep() to delay
1025  * for a given number of seconds without consuming cpu cycles.
1026  */
1027 
1028 
1029 /*
1030  * Consumes CPU cycles and memory bandwidth by looping for
1031  * flowop->fo_value times. With each loop sets memory location
1032  * threadflow->tf_mem to 1.
1033  */
1034 static int
1035 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
1036 {
1037 	uint64_t value = avd_get_int(flowop->fo_value);
1038 	int i;
1039 
1040 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
1041 	flowop_beginop(threadflow, flowop);
1042 	if (threadflow->tf_mem != NULL) {
1043 		for (i = 0; i < value; i++)
1044 			*(threadflow->tf_mem) = 1;
1045 	}
1046 	flowop_endop(threadflow, flowop, 0);
1047 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
1048 	return (FILEBENCH_OK);
1049 }
1050 
1051 
1052 /*
1053  * Delays for fo_value seconds.
1054  */
1055 static int
1056 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
1057 {
1058 	int value = avd_get_int(flowop->fo_value);
1059 
1060 	flowop_beginop(threadflow, flowop);
1061 	(void) sleep(value);
1062 	flowop_endop(threadflow, flowop, 0);
1063 	return (FILEBENCH_OK);
1064 }
1065 
1066 /*
1067  * Rate limiting routines. This is the event consuming half of the
1068  * event system. Each of the four following routines will limit the rate
1069  * to one unit of either calls, issued I/O operations, issued filebench
1070  * operations, or I/O bandwidth. Since there is only one event generator,
1071  * the events will be divided among multiple instances of an event
1072  * consumer, and further divided among different consumers if more than
1073  * one has been defined. There is no mechanism to enforce equal sharing
1074  * of events.
1075  */
1076 
1077 /*
1078  * Completes one invocation per posted event. If eventgen_q
1079  * has an event count greater than zero, one will be removed
1080  * (count decremented), otherwise the calling thread will
1081  * block until another event has been posted. Always returns 0
1082  */
1083 static int
1084 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
1085 {
1086 	/* Immediately bail if not set/enabled */
1087 	if (filebench_shm->shm_eventgen_hz == NULL)
1088 		return (FILEBENCH_OK);
1089 
1090 	if (flowop->fo_initted == 0) {
1091 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1092 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1093 		flowop->fo_initted = 1;
1094 	}
1095 
1096 	flowop_beginop(threadflow, flowop);
1097 	while (filebench_shm->shm_eventgen_hz != NULL) {
1098 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1099 		if (filebench_shm->shm_eventgen_q > 0) {
1100 			filebench_shm->shm_eventgen_q--;
1101 			(void) ipc_mutex_unlock(
1102 			    &filebench_shm->shm_eventgen_lock);
1103 			break;
1104 		}
1105 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1106 		    &filebench_shm->shm_eventgen_lock);
1107 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1108 	}
1109 	flowop_endop(threadflow, flowop, 0);
1110 	return (FILEBENCH_OK);
1111 }
1112 
1113 static int
1114 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
1115 {
1116 	if (flowop->fo_targetname[0] != '\0') {
1117 
1118 		/* Try to use statistics from specific flowop */
1119 		flowop->fo_targets =
1120 		    flowop_find_from_list(flowop->fo_targetname,
1121 		    threadflow->tf_thrd_fops);
1122 		if (flowop->fo_targets == NULL) {
1123 			filebench_log(LOG_ERROR,
1124 			    "limit target: could not find flowop %s",
1125 			    flowop->fo_targetname);
1126 			filebench_shutdown(1);
1127 			return (FILEBENCH_ERROR);
1128 		}
1129 	} else {
1130 		/* use total workload statistics */
1131 		flowop->fo_targets = NULL;
1132 	}
1133 	return (FILEBENCH_OK);
1134 }
1135 
1136 /*
1137  * Blocks the calling thread if the number of issued I/O
1138  * operations exceeds the number of posted events, thus
1139  * limiting the average I/O operation rate to the rate
1140  * specified by eventgen_hz. Always returns FILEBENCH_OK.
1141  */
1142 static int
1143 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
1144 {
1145 	uint64_t iops;
1146 	uint64_t delta;
1147 	uint64_t events;
1148 
1149 	/* Immediately bail if not set/enabled */
1150 	if (filebench_shm->shm_eventgen_hz == NULL)
1151 		return (FILEBENCH_OK);
1152 
1153 	if (flowop->fo_initted == 0) {
1154 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1155 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1156 		flowop->fo_initted = 1;
1157 
1158 		if (flowoplib_event_find_target(threadflow, flowop)
1159 		    == FILEBENCH_ERROR)
1160 			return (FILEBENCH_ERROR);
1161 
1162 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
1163 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1164 			filebench_log(LOG_ERROR,
1165 			    "WARNING: Flowop %s does no IO",
1166 			    flowop->fo_targets->fo_name);
1167 			filebench_shutdown(1);
1168 			return (FILEBENCH_ERROR);
1169 		}
1170 	}
1171 
1172 	if (flowop->fo_targets) {
1173 		/*
1174 		 * Note that fs_count is already the sum of fs_rcount
1175 		 * and fs_wcount if looking at a single flowop.
1176 		 */
1177 		iops = flowop->fo_targets->fo_stats.fs_count;
1178 	} else {
1179 		(void) ipc_mutex_lock(&controlstats_lock);
1180 		iops = (controlstats.fs_rcount +
1181 		    controlstats.fs_wcount);
1182 		(void) ipc_mutex_unlock(&controlstats_lock);
1183 	}
1184 
1185 	/* Is this the first time around */
1186 	if (flowop->fo_tputlast == 0) {
1187 		flowop->fo_tputlast = iops;
1188 		return (FILEBENCH_OK);
1189 	}
1190 
1191 	delta = iops - flowop->fo_tputlast;
1192 	flowop->fo_tputbucket -= delta;
1193 	flowop->fo_tputlast = iops;
1194 
1195 	/* No need to block if the q isn't empty */
1196 	if (flowop->fo_tputbucket >= 0LL) {
1197 		flowop_endop(threadflow, flowop, 0);
1198 		return (FILEBENCH_OK);
1199 	}
1200 
1201 	iops = flowop->fo_tputbucket * -1;
1202 	events = iops;
1203 
1204 	flowop_beginop(threadflow, flowop);
1205 	while (filebench_shm->shm_eventgen_hz != NULL) {
1206 
1207 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1208 		if (filebench_shm->shm_eventgen_q >= events) {
1209 			filebench_shm->shm_eventgen_q -= events;
1210 			(void) ipc_mutex_unlock(
1211 			    &filebench_shm->shm_eventgen_lock);
1212 			flowop->fo_tputbucket += events;
1213 			break;
1214 		}
1215 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1216 		    &filebench_shm->shm_eventgen_lock);
1217 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1218 	}
1219 	flowop_endop(threadflow, flowop, 0);
1220 
1221 	return (FILEBENCH_OK);
1222 }
1223 
1224 /*
1225  * Blocks the calling thread if the number of issued filebench
1226  * operations exceeds the number of posted events, thus limiting
1227  * the average filebench operation rate to the rate specified by
1228  * eventgen_hz. Always returns FILEBENCH_OK.
1229  */
1230 static int
1231 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
1232 {
1233 	uint64_t ops;
1234 	uint64_t delta;
1235 	uint64_t events;
1236 
1237 	/* Immediately bail if not set/enabled */
1238 	if (filebench_shm->shm_eventgen_hz == NULL)
1239 		return (FILEBENCH_OK);
1240 
1241 	if (flowop->fo_initted == 0) {
1242 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1243 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1244 		flowop->fo_initted = 1;
1245 
1246 		if (flowoplib_event_find_target(threadflow, flowop)
1247 		    == FILEBENCH_ERROR)
1248 			return (FILEBENCH_ERROR);
1249 	}
1250 
1251 	if (flowop->fo_targets) {
1252 		ops = flowop->fo_targets->fo_stats.fs_count;
1253 	} else {
1254 		(void) ipc_mutex_lock(&controlstats_lock);
1255 		ops = controlstats.fs_count;
1256 		(void) ipc_mutex_unlock(&controlstats_lock);
1257 	}
1258 
1259 	/* Is this the first time around */
1260 	if (flowop->fo_tputlast == 0) {
1261 		flowop->fo_tputlast = ops;
1262 		return (FILEBENCH_OK);
1263 	}
1264 
1265 	delta = ops - flowop->fo_tputlast;
1266 	flowop->fo_tputbucket -= delta;
1267 	flowop->fo_tputlast = ops;
1268 
1269 	/* No need to block if the q isn't empty */
1270 	if (flowop->fo_tputbucket >= 0LL) {
1271 		flowop_endop(threadflow, flowop, 0);
1272 		return (FILEBENCH_OK);
1273 	}
1274 
1275 	ops = flowop->fo_tputbucket * -1;
1276 	events = ops;
1277 
1278 	flowop_beginop(threadflow, flowop);
1279 	while (filebench_shm->shm_eventgen_hz != NULL) {
1280 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1281 		if (filebench_shm->shm_eventgen_q >= events) {
1282 			filebench_shm->shm_eventgen_q -= events;
1283 			(void) ipc_mutex_unlock(
1284 			    &filebench_shm->shm_eventgen_lock);
1285 			flowop->fo_tputbucket += events;
1286 			break;
1287 		}
1288 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1289 		    &filebench_shm->shm_eventgen_lock);
1290 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1291 	}
1292 	flowop_endop(threadflow, flowop, 0);
1293 
1294 	return (FILEBENCH_OK);
1295 }
1296 
1297 
1298 /*
1299  * Blocks the calling thread if the number of bytes of I/O
1300  * issued exceeds one megabyte times the number of posted
1301  * events, thus limiting the average I/O byte rate to one
1302  * megabyte times the event rate as set by eventgen_hz.
1303  * Always retuns FILEBENCH_OK.
1304  */
1305 static int
1306 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
1307 {
1308 	uint64_t bytes;
1309 	uint64_t delta;
1310 	uint64_t events;
1311 
1312 	/* Immediately bail if not set/enabled */
1313 	if (filebench_shm->shm_eventgen_hz == NULL)
1314 		return (FILEBENCH_OK);
1315 
1316 	if (flowop->fo_initted == 0) {
1317 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1318 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1319 		flowop->fo_initted = 1;
1320 
1321 		if (flowoplib_event_find_target(threadflow, flowop)
1322 		    == FILEBENCH_ERROR)
1323 			return (FILEBENCH_ERROR);
1324 
1325 		if ((flowop->fo_targets) &&
1326 		    ((flowop->fo_targets->fo_attrs &
1327 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1328 			filebench_log(LOG_ERROR,
1329 			    "WARNING: Flowop %s does no Reads or Writes",
1330 			    flowop->fo_targets->fo_name);
1331 			filebench_shutdown(1);
1332 			return (FILEBENCH_ERROR);
1333 		}
1334 	}
1335 
1336 	if (flowop->fo_targets) {
1337 		/*
1338 		 * Note that fs_bytes is already the sum of fs_rbytes
1339 		 * and fs_wbytes if looking at a single flowop.
1340 		 */
1341 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
1342 	} else {
1343 		(void) ipc_mutex_lock(&controlstats_lock);
1344 		bytes = (controlstats.fs_rbytes +
1345 		    controlstats.fs_wbytes);
1346 		(void) ipc_mutex_unlock(&controlstats_lock);
1347 	}
1348 
1349 	/* Is this the first time around? */
1350 	if (flowop->fo_tputlast == 0) {
1351 		flowop->fo_tputlast = bytes;
1352 		return (FILEBENCH_OK);
1353 	}
1354 
1355 	delta = bytes - flowop->fo_tputlast;
1356 	flowop->fo_tputbucket -= delta;
1357 	flowop->fo_tputlast = bytes;
1358 
1359 	/* No need to block if the q isn't empty */
1360 	if (flowop->fo_tputbucket >= 0LL) {
1361 		flowop_endop(threadflow, flowop, 0);
1362 		return (FILEBENCH_OK);
1363 	}
1364 
1365 	bytes = flowop->fo_tputbucket * -1;
1366 	events = (bytes / MB) + 1;
1367 
1368 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1369 	    (u_longlong_t)bytes, (u_longlong_t)events);
1370 
1371 	flowop_beginop(threadflow, flowop);
1372 	while (filebench_shm->shm_eventgen_hz != NULL) {
1373 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1374 		if (filebench_shm->shm_eventgen_q >= events) {
1375 			filebench_shm->shm_eventgen_q -= events;
1376 			(void) ipc_mutex_unlock(
1377 			    &filebench_shm->shm_eventgen_lock);
1378 			flowop->fo_tputbucket += (events * MB);
1379 			break;
1380 		}
1381 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1382 		    &filebench_shm->shm_eventgen_lock);
1383 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1384 	}
1385 	flowop_endop(threadflow, flowop, 0);
1386 
1387 	return (FILEBENCH_OK);
1388 }
1389 
1390 /*
1391  * These flowops terminate a benchmark run when either the specified
1392  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1393  * number of I/O operations (flowoplib_finishoncount) have been generated.
1394  */
1395 
1396 
1397 /*
1398  * Stop filebench run when specified number of I/O bytes have been
1399  * transferred. Compares controlstats.fs_bytes with flowop->value,
1400  * and if greater returns 1, stopping the run, if not, returns 0
1401  * to continue running.
1402  */
1403 static int
1404 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1405 {
1406 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
1407 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1408 						    /* Uses constant value */
1409 
1410 	if (flowop->fo_initted == 0) {
1411 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1412 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1413 		flowop->fo_initted = 1;
1414 
1415 		if (flowoplib_event_find_target(threadflow, flowop)
1416 		    == FILEBENCH_ERROR)
1417 			return (FILEBENCH_ERROR);
1418 
1419 		if ((flowop->fo_targets) &&
1420 		    ((flowop->fo_targets->fo_attrs &
1421 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1422 			filebench_log(LOG_ERROR,
1423 			    "WARNING: Flowop %s does no Reads or Writes",
1424 			    flowop->fo_targets->fo_name);
1425 			filebench_shutdown(1);
1426 			return (FILEBENCH_ERROR);
1427 		}
1428 	}
1429 
1430 	if (flowop->fo_targets) {
1431 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1432 	} else {
1433 		(void) ipc_mutex_lock(&controlstats_lock);
1434 		bytes_io = controlstats.fs_bytes;
1435 		(void) ipc_mutex_unlock(&controlstats_lock);
1436 	}
1437 
1438 	flowop_beginop(threadflow, flowop);
1439 	if (bytes_io > byte_lim) {
1440 		flowop_endop(threadflow, flowop, 0);
1441 		return (FILEBENCH_DONE);
1442 	}
1443 	flowop_endop(threadflow, flowop, 0);
1444 
1445 	return (FILEBENCH_OK);
1446 }
1447 
1448 /*
1449  * Stop filebench run when specified number of I/O operations have
1450  * been performed. Compares controlstats.fs_count with *flowop->value,
1451  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1452  * to continue running.
1453  */
1454 static int
1455 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1456 {
1457 	uint64_t ops;
1458 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1459 
1460 	if (flowop->fo_initted == 0) {
1461 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1462 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1463 		flowop->fo_initted = 1;
1464 
1465 		if (flowoplib_event_find_target(threadflow, flowop)
1466 		    == FILEBENCH_ERROR)
1467 			return (FILEBENCH_ERROR);
1468 	}
1469 
1470 	if (flowop->fo_targets) {
1471 		ops = flowop->fo_targets->fo_stats.fs_count;
1472 	} else {
1473 		(void) ipc_mutex_lock(&controlstats_lock);
1474 		ops = controlstats.fs_count;
1475 		(void) ipc_mutex_unlock(&controlstats_lock);
1476 	}
1477 
1478 	flowop_beginop(threadflow, flowop);
1479 	if (ops >= count) {
1480 		flowop_endop(threadflow, flowop, 0);
1481 		return (FILEBENCH_DONE);
1482 	}
1483 	flowop_endop(threadflow, flowop, 0);
1484 
1485 	return (FILEBENCH_OK);
1486 }
1487 
1488 /*
1489  * Semaphore synchronization using either System V semaphores or
1490  * posix semaphores. If System V semaphores are available, they will be
1491  * used, otherwise posix semaphores will be used.
1492  */
1493 
1494 
1495 /*
1496  * Initializes the filebench "block on semaphore" flowop.
1497  * If System V semaphores are implemented, the routine
1498  * initializes the System V semaphore subsystem if it hasn't
1499  * already been initialized, also allocates a pair of semids
1500  * and initializes the highwater System V semaphore.
1501  * If no System V semaphores, then does nothing special.
1502  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1503  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1504  * on success.
1505  */
1506 static int
1507 flowoplib_semblock_init(flowop_t *flowop)
1508 {
1509 
1510 #ifdef HAVE_SYSV_SEM
1511 	int sys_semid;
1512 	struct sembuf sbuf[2];
1513 	int highwater;
1514 
1515 	ipc_seminit();
1516 
1517 	flowop->fo_semid_lw = ipc_semidalloc();
1518 	flowop->fo_semid_hw = ipc_semidalloc();
1519 
1520 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1521 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1522 
1523 	sys_semid = filebench_shm->shm_sys_semid;
1524 
1525 	if ((highwater = flowop->fo_semid_hw) == 0)
1526 		highwater = flowop->fo_constvalue; /* use constant value */
1527 
1528 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1529 
1530 	sbuf[0].sem_num = (short)highwater;
1531 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1532 	sbuf[0].sem_flg = 0;
1533 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1534 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1535 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1536 		return (FILEBENCH_ERROR);
1537 	}
1538 #else
1539 	filebench_log(LOG_DEBUG_IMPL,
1540 	    "flow %s-%d semblock init with posix semaphore",
1541 	    flowop->fo_name, flowop->fo_instance);
1542 
1543 	sem_init(&flowop->fo_sem, 1, 0);
1544 #endif	/* HAVE_SYSV_SEM */
1545 
1546 	if (!(avd_get_bool(flowop->fo_blocking)))
1547 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1548 
1549 	return (FILEBENCH_OK);
1550 }
1551 
1552 /*
1553  * Releases the semids for the System V semaphore allocated
1554  * to this flowop. If not using System V semaphores, then
1555  * it is effectively just a no-op.
1556  */
1557 static void
1558 flowoplib_semblock_destruct(flowop_t *flowop)
1559 {
1560 #ifdef HAVE_SYSV_SEM
1561 	ipc_semidfree(flowop->fo_semid_lw);
1562 	ipc_semidfree(flowop->fo_semid_hw);
1563 #else
1564 	sem_destroy(&flowop->fo_sem);
1565 #endif /* HAVE_SYSV_SEM */
1566 }
1567 
1568 /*
1569  * Attempts to pass a System V or posix semaphore as appropriate,
1570  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1571  * semphores is not available or cannot be acquired, or if the initial
1572  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1573  */
1574 static int
1575 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1576 {
1577 
1578 #ifdef HAVE_SYSV_SEM
1579 	struct sembuf sbuf[2];
1580 	int value = avd_get_int(flowop->fo_value);
1581 	int sys_semid;
1582 	struct timespec timeout;
1583 
1584 	sys_semid = filebench_shm->shm_sys_semid;
1585 
1586 	filebench_log(LOG_DEBUG_IMPL,
1587 	    "flow %s-%d sem blocking on id %x num %x value %d",
1588 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1589 	    flowop->fo_semid_hw, value);
1590 
1591 	/* Post, decrement the increment the hw queue */
1592 	sbuf[0].sem_num = flowop->fo_semid_hw;
1593 	sbuf[0].sem_op = (short)value;
1594 	sbuf[0].sem_flg = 0;
1595 	sbuf[1].sem_num = flowop->fo_semid_lw;
1596 	sbuf[1].sem_op = value * -1;
1597 	sbuf[1].sem_flg = 0;
1598 	timeout.tv_sec = 600;
1599 	timeout.tv_nsec = 0;
1600 
1601 	if (avd_get_bool(flowop->fo_blocking))
1602 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1603 
1604 	flowop_beginop(threadflow, flowop);
1605 
1606 #ifdef HAVE_SEMTIMEDOP
1607 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1608 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1609 #else
1610 	(void) semop(sys_semid, &sbuf[0], 1);
1611 	(void) semop(sys_semid, &sbuf[1], 1);
1612 #endif /* HAVE_SEMTIMEDOP */
1613 
1614 	if (avd_get_bool(flowop->fo_blocking))
1615 		(void) ipc_mutex_lock(&flowop->fo_lock);
1616 
1617 	flowop_endop(threadflow, flowop, 0);
1618 
1619 #else
1620 	int value = avd_get_int(flowop->fo_value);
1621 	int i;
1622 
1623 	filebench_log(LOG_DEBUG_IMPL,
1624 	    "flow %s-%d sem blocking on posix semaphore",
1625 	    flowop->fo_name, flowop->fo_instance);
1626 
1627 	/* Decrement sem by value */
1628 	for (i = 0; i < value; i++) {
1629 		if (sem_wait(&flowop->fo_sem) == -1) {
1630 			filebench_log(LOG_ERROR, "semop wait failed");
1631 			return (FILEBENCH_ERROR);
1632 		}
1633 	}
1634 
1635 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1636 	    flowop->fo_name, flowop->fo_instance);
1637 #endif /* HAVE_SYSV_SEM */
1638 
1639 	return (FILEBENCH_OK);
1640 }
1641 
1642 /*
1643  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1644  */
1645 /* ARGSUSED */
1646 static int
1647 flowoplib_sempost_init(flowop_t *flowop)
1648 {
1649 #ifdef HAVE_SYSV_SEM
1650 	ipc_seminit();
1651 #endif /* HAVE_SYSV_SEM */
1652 	return (FILEBENCH_OK);
1653 }
1654 
1655 /*
1656  * Post to a System V or posix semaphore as appropriate.
1657  * On the first call for a given flowop instance, this routine
1658  * will use the fo_targetname attribute to locate all semblock
1659  * flowops that are expecting posts from this flowop. All
1660  * target flowops on this list will have a post operation done
1661  * to their semaphores on each call.
1662  */
1663 static int
1664 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1665 {
1666 	flowop_t *target;
1667 
1668 	filebench_log(LOG_DEBUG_IMPL,
1669 	    "sempost flow %s-%d",
1670 	    flowop->fo_name,
1671 	    flowop->fo_instance);
1672 
1673 	/* if this is the first post, create the post list */
1674 	if (flowop->fo_targets == NULL) {
1675 		flowop_t *result = flowop_find(flowop->fo_targetname);
1676 
1677 		flowop->fo_targets = result;
1678 
1679 		if (result == NULL) {
1680 			filebench_log(LOG_ERROR,
1681 			    "sempost: could not find op %s for thread %s",
1682 			    flowop->fo_targetname,
1683 			    threadflow->tf_name);
1684 			filebench_shutdown(1);
1685 		}
1686 
1687 		while (result) {
1688 			result->fo_targetnext =
1689 			    result->fo_resultnext;
1690 			result = result->fo_resultnext;
1691 		}
1692 	}
1693 
1694 	target = flowop->fo_targets;
1695 
1696 	flowop_beginop(threadflow, flowop);
1697 	/* post to the targets */
1698 	while (target) {
1699 #ifdef HAVE_SYSV_SEM
1700 		struct sembuf sbuf[2];
1701 		int sys_semid;
1702 		int blocking;
1703 #else
1704 		int i;
1705 #endif /* HAVE_SYSV_SEM */
1706 		struct timespec timeout;
1707 		int value = (int)avd_get_int(flowop->fo_value);
1708 
1709 		if (target->fo_instance == FLOW_MASTER) {
1710 			target = target->fo_targetnext;
1711 			continue;
1712 		}
1713 
1714 #ifdef HAVE_SYSV_SEM
1715 
1716 		filebench_log(LOG_DEBUG_IMPL,
1717 		    "sempost flow %s-%d num %x",
1718 		    target->fo_name,
1719 		    target->fo_instance,
1720 		    target->fo_semid_lw);
1721 
1722 		sys_semid = filebench_shm->shm_sys_semid;
1723 		sbuf[0].sem_num = target->fo_semid_lw;
1724 		sbuf[0].sem_op = (short)value;
1725 		sbuf[0].sem_flg = 0;
1726 		sbuf[1].sem_num = target->fo_semid_hw;
1727 		sbuf[1].sem_op = value * -1;
1728 		sbuf[1].sem_flg = 0;
1729 		timeout.tv_sec = 600;
1730 		timeout.tv_nsec = 0;
1731 
1732 		if (avd_get_bool(flowop->fo_blocking))
1733 			blocking = 1;
1734 		else
1735 			blocking = 0;
1736 
1737 #ifdef HAVE_SEMTIMEDOP
1738 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1739 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1740 #else
1741 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1742 		    (errno && (errno != EAGAIN))) {
1743 #endif /* HAVE_SEMTIMEDOP */
1744 			filebench_log(LOG_ERROR, "semop post failed: %s",
1745 			    strerror(errno));
1746 			return (FILEBENCH_ERROR);
1747 		}
1748 
1749 		filebench_log(LOG_DEBUG_IMPL,
1750 		    "flow %s-%d finished posting",
1751 		    target->fo_name, target->fo_instance);
1752 #else
1753 		filebench_log(LOG_DEBUG_IMPL,
1754 		    "sempost flow %s-%d to posix semaphore",
1755 		    target->fo_name,
1756 		    target->fo_instance);
1757 
1758 		/* Increment sem by value */
1759 		for (i = 0; i < value; i++) {
1760 			if (sem_post(&target->fo_sem) == -1) {
1761 				filebench_log(LOG_ERROR, "semop post failed");
1762 				return (FILEBENCH_ERROR);
1763 			}
1764 		}
1765 
1766 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1767 		    target->fo_name, target->fo_instance);
1768 #endif /* HAVE_SYSV_SEM */
1769 
1770 		target = target->fo_targetnext;
1771 	}
1772 	flowop_endop(threadflow, flowop, 0);
1773 
1774 	return (FILEBENCH_OK);
1775 }
1776 
1777 
1778 /*
1779  * Section for exercising create / open / close / delete operations
1780  * on files within a fileset. For proper operation, the flowop attribute
1781  * "fd", which sets the fo_fdnumber field in the flowop, must be used
1782  * so that the same file is opened and later closed. "fd" is an index
1783  * into a pair of arrays maintained by threadflows, one of which
1784  * contains the operating system assigned file descriptors and the other
1785  * a pointer to the filesetentry whose file the file descriptor
1786  * references. An openfile flowop defined without fd being set will use
1787  * the default (0) fd or, if specified, rotate through fd indices, but
1788  * createfile and closefile must use the default or a specified fd.
1789  * Meanwhile deletefile picks an arbitrary file to delete, regardless
1790  * of fd attribute.
1791  */
1792 
1793 /*
1794  * XXX Making file selection more consistent among the flowops might be good
1795  */
1796 
1797 
1798 /*
1799  * Emulates (and actually does) file open. Obtains a file descriptor
1800  * index, then calls flowoplib_openfile_common() to open. Returns
1801  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1802  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1803  * FILEBENCH_NORSC, FILEBENCH_OK).
1804  */
1805 static int
1806 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1807 {
1808 	int fd = flowoplib_fdnum(threadflow, flowop);
1809 
1810 	if (fd == -1)
1811 		return (FILEBENCH_ERROR);
1812 
1813 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1814 }
1815 
1816 /*
1817  * Common file opening code for filesets. Uses the supplied
1818  * file descriptor index to determine the tf_fd entry to use.
1819  * If the entry is empty (0) and the fileset exists, fileset
1820  * pick is called to select a fileset entry to use. The file
1821  * specified in the filesetentry is opened, and the returned
1822  * operating system file descriptor and a pointer to the
1823  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1824  * respectively. Returns FILEBENCH_ERROR on error,
1825  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1826  * and FILEBENCH_OK on success.
1827  */
1828 static int
1829 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1830 {
1831 	filesetentry_t *file;
1832 	char *fileset_name;
1833 	int tid = 0;
1834 	int err;
1835 
1836 	if (flowop->fo_fileset == NULL) {
1837 		filebench_log(LOG_ERROR, "flowop NULL file");
1838 		return (FILEBENCH_ERROR);
1839 	}
1840 
1841 	if ((fileset_name =
1842 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1843 		filebench_log(LOG_ERROR,
1844 		    "flowop %s: fileset has no name", flowop->fo_name);
1845 		return (FILEBENCH_ERROR);
1846 	}
1847 
1848 	/*
1849 	 * If the flowop doesn't default to persistent fd
1850 	 * then get unique thread ID for use by fileset_pick
1851 	 */
1852 	if (avd_get_bool(flowop->fo_rotatefd))
1853 		tid = threadflow->tf_utid;
1854 
1855 	if (threadflow->tf_fd[fd] != 0) {
1856 		filebench_log(LOG_ERROR,
1857 		    "flowop %s attempted to open without closing on fd %d",
1858 		    flowop->fo_name, fd);
1859 		return (FILEBENCH_ERROR);
1860 	}
1861 
1862 #ifdef HAVE_RAW_SUPPORT
1863 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1864 		int open_attrs = 0;
1865 		char name[MAXPATHLEN];
1866 
1867 		(void) fb_strlcpy(name,
1868 		    avd_get_str(flowop->fo_fileset->fs_path), MAXPATHLEN);
1869 		(void) fb_strlcat(name, "/", MAXPATHLEN);
1870 		(void) fb_strlcat(name, fileset_name, MAXPATHLEN);
1871 
1872 		if (avd_get_bool(flowop->fo_dsync)) {
1873 #ifdef sun
1874 			open_attrs |= O_DSYNC;
1875 #else
1876 			open_attrs |= O_FSYNC;
1877 #endif
1878 		}
1879 
1880 		filebench_log(LOG_DEBUG_SCRIPT,
1881 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1882 
1883 		threadflow->tf_fd[fd] = open64(name,
1884 		    O_RDWR | open_attrs, 0666);
1885 
1886 		if (threadflow->tf_fd[fd] < 0) {
1887 			filebench_log(LOG_ERROR,
1888 			    "Failed to open raw device %s: %s",
1889 			    name, strerror(errno));
1890 			return (FILEBENCH_ERROR);
1891 		}
1892 
1893 		/* if running on Solaris, use un-buffered io */
1894 #ifdef sun
1895 		(void) directio(threadflow->tf_fd[fd], DIRECTIO_ON);
1896 #endif
1897 
1898 		threadflow->tf_fse[fd] = NULL;
1899 
1900 		return (FILEBENCH_OK);
1901 	}
1902 #endif /* HAVE_RAW_SUPPORT */
1903 
1904 	if ((err = flowoplib_pickfile(&file, flowop,
1905 	    FILESET_PICKEXISTS, tid)) != FILEBENCH_OK) {
1906 		filebench_log(LOG_DEBUG_SCRIPT,
1907 		    "flowop %s failed to pick file from %s on fd %d",
1908 		    flowop->fo_name, fileset_name, fd);
1909 		return (err);
1910 	}
1911 
1912 	threadflow->tf_fse[fd] = file;
1913 
1914 	flowop_beginop(threadflow, flowop);
1915 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1916 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1917 	flowop_endop(threadflow, flowop, 0);
1918 
1919 	if (threadflow->tf_fd[fd] < 0) {
1920 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1921 		    flowop->fo_name, file->fse_path);
1922 		return (FILEBENCH_ERROR);
1923 	}
1924 
1925 	filebench_log(LOG_DEBUG_SCRIPT,
1926 	    "flowop %s: opened %s fd[%d] = %d",
1927 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1928 
1929 	return (FILEBENCH_OK);
1930 }
1931 
1932 /*
1933  * Emulate create of a file. Uses the flowop's fdnumber to select
1934  * tf_fd and tf_fse array locations to put the created file's file
1935  * descriptor and filesetentry respectively. Uses flowoplib_pickfile()
1936  * to select a specific filesetentry whose file does not currently
1937  * exist for the file create operation. Then calls
1938  * fileset_openfile() with the O_CREATE flag set to create the
1939  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1940  * already in use, the flowop has no associated fileset, or
1941  * the create call fails. Returns 1 if a filesetentry with a
1942  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1943  */
1944 static int
1945 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1946 {
1947 	filesetentry_t *file;
1948 	int fd = flowop->fo_fdnumber;
1949 	int err;
1950 
1951 	if (threadflow->tf_fd[fd] != 0) {
1952 		filebench_log(LOG_ERROR,
1953 		    "flowop %s attempted to create without closing on fd %d",
1954 		    flowop->fo_name, fd);
1955 		return (FILEBENCH_ERROR);
1956 	}
1957 
1958 	if (flowop->fo_fileset == NULL) {
1959 		filebench_log(LOG_ERROR, "flowop NULL file");
1960 		return (FILEBENCH_ERROR);
1961 	}
1962 
1963 #ifdef HAVE_RAW_SUPPORT
1964 	/* can't be used with raw devices */
1965 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1966 		filebench_log(LOG_ERROR,
1967 		    "flowop %s attempted to a createfile on RAW device",
1968 		    flowop->fo_name);
1969 		return (FILEBENCH_ERROR);
1970 	}
1971 #endif /* HAVE_RAW_SUPPORT */
1972 
1973 	if ((err = flowoplib_pickfile(&file, flowop,
1974 	    FILESET_PICKNOEXIST, 0)) != FILEBENCH_OK) {
1975 		filebench_log(LOG_DEBUG_SCRIPT,
1976 		    "flowop %s failed to pick file from fileset %s",
1977 		    flowop->fo_name,
1978 		    avd_get_str(flowop->fo_fileset->fs_name));
1979 		return (err);
1980 	}
1981 
1982 	threadflow->tf_fse[fd] = file;
1983 
1984 	flowop_beginop(threadflow, flowop);
1985 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1986 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1987 	flowop_endop(threadflow, flowop, 0);
1988 
1989 	if (threadflow->tf_fd[fd] < 0) {
1990 		filebench_log(LOG_ERROR, "failed to create file %s",
1991 		    flowop->fo_name);
1992 		return (FILEBENCH_ERROR);
1993 	}
1994 
1995 	filebench_log(LOG_DEBUG_SCRIPT,
1996 	    "flowop %s: created %s fd[%d] = %d",
1997 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1998 
1999 	return (FILEBENCH_OK);
2000 }
2001 
2002 /*
2003  * Emulates delete of a file. If a valid fd is provided, it uses the
2004  * filesetentry stored at that fd location to select the file to be
2005  * deleted, otherwise it picks an arbitrary filesetentry
2006  * whose file exists. It then uses unlink() to delete it and Clears
2007  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
2008  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
2009  * filesetentry cannot be found, and FILEBENCH_OK on success.
2010  */
2011 static int
2012 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
2013 {
2014 	filesetentry_t *file;
2015 	fileset_t *fileset;
2016 	char path[MAXPATHLEN];
2017 	char *pathtmp;
2018 	int fd = flowop->fo_fdnumber;
2019 
2020 	/* if fd specified, use it to access file */
2021 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
2022 
2023 		/* indicate that the file will be deleted */
2024 		threadflow->tf_fse[fd] = NULL;
2025 
2026 		/* if here, we still have a valid file pointer */
2027 		fileset = file->fse_fileset;
2028 	} else {
2029 
2030 		/* Otherwise, pick arbitrary file */
2031 		file = NULL;
2032 		fileset = flowop->fo_fileset;
2033 	}
2034 
2035 
2036 	if (fileset == NULL) {
2037 		filebench_log(LOG_ERROR, "flowop NULL file");
2038 		return (FILEBENCH_ERROR);
2039 	}
2040 
2041 #ifdef HAVE_RAW_SUPPORT
2042 	/* can't be used with raw devices */
2043 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2044 		filebench_log(LOG_ERROR,
2045 		    "flowop %s attempted a deletefile on RAW device",
2046 		    flowop->fo_name);
2047 		return (FILEBENCH_ERROR);
2048 	}
2049 #endif /* HAVE_RAW_SUPPORT */
2050 
2051 	if (file == NULL) {
2052 		int err;
2053 
2054 		/* pick arbitrary, existing (allocated) file */
2055 		if ((err = flowoplib_pickfile(&file, flowop,
2056 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
2057 			filebench_log(LOG_DEBUG_SCRIPT,
2058 			    "flowop %s failed to pick file", flowop->fo_name);
2059 			return (err);
2060 		}
2061 	} else {
2062 		/* delete specific file. wait for it to be non-busy */
2063 		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
2064 		while (file->fse_flags & FSE_BUSY) {
2065 			file->fse_flags |= FSE_THRD_WAITNG;
2066 			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
2067 			    &fileset->fs_pick_lock);
2068 		}
2069 
2070 		/* File now available, grab it for deletion */
2071 		file->fse_flags |= FSE_BUSY;
2072 		fileset->fs_idle_files--;
2073 		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
2074 	}
2075 
2076 	/* don't delete if anyone (other than me) has file open */
2077 	if ((fd > 0) && (threadflow->tf_fd[fd] > 0)) {
2078 		if (file->fse_open_cnt > 1) {
2079 			filebench_log(LOG_DEBUG_SCRIPT,
2080 			    "flowop %s can't delete file opened by other"
2081 			    " threads at fd = %d", flowop->fo_name, fd);
2082 			fileset_unbusy(file, FALSE, FALSE, 0);
2083 			return (FILEBENCH_OK);
2084 		} else {
2085 			filebench_log(LOG_DEBUG_SCRIPT,
2086 			    "flowop %s deleting still open file at fd = %d",
2087 			    flowop->fo_name, fd);
2088 		}
2089 	} else if (file->fse_open_cnt > 0) {
2090 		filebench_log(LOG_DEBUG_SCRIPT,
2091 		    "flowop %s can't delete file opened by other"
2092 		    " threads at fd = %d, open count = %d",
2093 		    flowop->fo_name, fd, file->fse_open_cnt);
2094 		fileset_unbusy(file, FALSE, FALSE, 0);
2095 		return (FILEBENCH_OK);
2096 	}
2097 
2098 	(void) fb_strlcpy(path, avd_get_str(fileset->fs_path), MAXPATHLEN);
2099 	(void) fb_strlcat(path, "/", MAXPATHLEN);
2100 	(void) fb_strlcat(path, avd_get_str(fileset->fs_name), MAXPATHLEN);
2101 	pathtmp = fileset_resolvepath(file);
2102 	(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
2103 	free(pathtmp);
2104 
2105 	/* delete the selected file */
2106 	flowop_beginop(threadflow, flowop);
2107 	(void) unlink(path);
2108 	flowop_endop(threadflow, flowop, 0);
2109 
2110 	/* indicate that it is no longer busy and no longer exists */
2111 	fileset_unbusy(file, TRUE, FALSE, -file->fse_open_cnt);
2112 
2113 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
2114 
2115 	return (FILEBENCH_OK);
2116 }
2117 
2118 /*
2119  * Emulates fsync of a file. Obtains the file descriptor index
2120  * from the flowop, obtains the actual file descriptor from
2121  * the threadflow's table, checks to be sure it is still an
2122  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
2123  * if the file no longer is open, FILEBENCH_OK otherwise.
2124  */
2125 static int
2126 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
2127 {
2128 	filesetentry_t *file;
2129 	int fd = flowop->fo_fdnumber;
2130 
2131 	if (threadflow->tf_fd[fd] == 0) {
2132 		filebench_log(LOG_ERROR,
2133 		    "flowop %s attempted to fsync a closed fd %d",
2134 		    flowop->fo_name, fd);
2135 		return (FILEBENCH_ERROR);
2136 	}
2137 
2138 	file = threadflow->tf_fse[fd];
2139 
2140 	if ((file == NULL) ||
2141 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
2142 		filebench_log(LOG_ERROR,
2143 		    "flowop %s attempted to a fsync a RAW device",
2144 		    flowop->fo_name);
2145 		return (FILEBENCH_ERROR);
2146 	}
2147 
2148 	/* Measure time to fsync */
2149 	flowop_beginop(threadflow, flowop);
2150 	(void) fsync(threadflow->tf_fd[fd]);
2151 	flowop_endop(threadflow, flowop, 0);
2152 
2153 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
2154 
2155 	return (FILEBENCH_OK);
2156 }
2157 
2158 /*
2159  * Emulate fsync of an entire fileset. Search through the
2160  * threadflow's file descriptor array, doing fsync() on each
2161  * open file that belongs to the flowop's fileset. Always
2162  * returns FILEBENCH_OK.
2163  */
2164 static int
2165 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
2166 {
2167 	int fd;
2168 
2169 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
2170 		filesetentry_t *file;
2171 
2172 		/* Match the file set to fsync */
2173 		if ((threadflow->tf_fse[fd] == NULL) ||
2174 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
2175 			continue;
2176 
2177 		/* Measure time to fsync */
2178 		flowop_beginop(threadflow, flowop);
2179 		(void) fsync(threadflow->tf_fd[fd]);
2180 		flowop_endop(threadflow, flowop, 0);
2181 
2182 		file = threadflow->tf_fse[fd];
2183 
2184 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
2185 		    file->fse_path);
2186 	}
2187 
2188 	return (FILEBENCH_OK);
2189 }
2190 
2191 /*
2192  * Emulate close of a file.  Obtains the file descriptor index
2193  * from the flowop, obtains the actual file descriptor from the
2194  * threadflow's table, checks to be sure it is still an open
2195  * file, then does a close operation on it. Then sets the
2196  * threadflow file descriptor table entry to 0, and the file set
2197  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
2198  * FILEBENCH_OK otherwise.
2199  */
2200 static int
2201 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
2202 {
2203 	filesetentry_t *file;
2204 	fileset_t *fileset;
2205 	int fd = flowop->fo_fdnumber;
2206 
2207 	if (threadflow->tf_fd[fd] == 0) {
2208 		filebench_log(LOG_ERROR,
2209 		    "flowop %s attempted to close an already closed fd %d",
2210 		    flowop->fo_name, fd);
2211 		return (FILEBENCH_ERROR);
2212 	}
2213 
2214 	file = threadflow->tf_fse[fd];
2215 	fileset = file->fse_fileset;
2216 
2217 	/* Wait for it to be non-busy */
2218 	(void) ipc_mutex_lock(&fileset->fs_pick_lock);
2219 	while (file->fse_flags & FSE_BUSY) {
2220 		file->fse_flags |= FSE_THRD_WAITNG;
2221 		(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
2222 		    &fileset->fs_pick_lock);
2223 	}
2224 
2225 	/* File now available, grab it for closing */
2226 	file->fse_flags |= FSE_BUSY;
2227 
2228 	/* if last open, set declare idle */
2229 	if (file->fse_open_cnt == 1)
2230 		fileset->fs_idle_files--;
2231 
2232 	(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
2233 
2234 	/* Measure time to close */
2235 	flowop_beginop(threadflow, flowop);
2236 	(void) close(threadflow->tf_fd[fd]);
2237 	flowop_endop(threadflow, flowop, 0);
2238 
2239 	fileset_unbusy(file, FALSE, FALSE, -1);
2240 
2241 	threadflow->tf_fd[fd] = 0;
2242 
2243 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
2244 
2245 	return (FILEBENCH_OK);
2246 }
2247 
2248 /*
2249  * Obtain the full pathname of the directory described by the filesetentry
2250  * indicated by "dir", and copy it into the character array pointed to by
2251  * path. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
2252  */
2253 static int
2254 flowoplib_getdirpath(filesetentry_t *dir, char *path)
2255 {
2256 	char		*fileset_path;
2257 	char		*fileset_name;
2258 	char		*part_path;
2259 
2260 	if ((fileset_path = avd_get_str(dir->fse_fileset->fs_path)) == NULL) {
2261 		filebench_log(LOG_ERROR, "Fileset path not set");
2262 		return (FILEBENCH_ERROR);
2263 	}
2264 
2265 	if ((fileset_name = avd_get_str(dir->fse_fileset->fs_name)) == NULL) {
2266 		filebench_log(LOG_ERROR, "Fileset name not set");
2267 		return (FILEBENCH_ERROR);
2268 	}
2269 
2270 	(void) fb_strlcpy(path, fileset_path, MAXPATHLEN);
2271 	(void) fb_strlcat(path, "/", MAXPATHLEN);
2272 	(void) fb_strlcat(path, fileset_name, MAXPATHLEN);
2273 
2274 	if ((part_path = fileset_resolvepath(dir)) == NULL)
2275 		return (FILEBENCH_ERROR);
2276 
2277 	(void) fb_strlcat(path, part_path, MAXPATHLEN);
2278 	free(part_path);
2279 
2280 	return (FILEBENCH_OK);
2281 }
2282 
2283 /*
2284  * Use mkdir to create a directory.  Obtains the fileset name from the
2285  * flowop, selects a non-existent leaf directory and obtains its full
2286  * path, then uses mkdir to create it on the storage subsystem (make it
2287  * existent). Returns FILEBENCH_NORSC is there are no more non-existent
2288  * directories in the fileset, FILEBENCH_ERROR on other errors, and
2289  * FILEBENCH_OK on success.
2290  */
2291 static int
2292 flowoplib_makedir(threadflow_t *threadflow, flowop_t *flowop)
2293 {
2294 	filesetentry_t	*dir;
2295 	int		ret;
2296 	char		full_path[MAXPATHLEN];
2297 
2298 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
2299 	    FILESET_PICKNOEXIST)) != FILEBENCH_OK)
2300 		return (ret);
2301 
2302 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2303 		return (ret);
2304 
2305 	flowop_beginop(threadflow, flowop);
2306 	(void) mkdir(full_path, 0755);
2307 	flowop_endop(threadflow, flowop, 0);
2308 
2309 	/* indicate that it is no longer busy and now exists */
2310 	fileset_unbusy(dir, TRUE, TRUE, 0);
2311 
2312 	return (FILEBENCH_OK);
2313 }
2314 
2315 /*
2316  * Use rmdir to delete a directory.  Obtains the fileset name from the
2317  * flowop, selects an existent leaf directory and obtains its full path,
2318  * then uses rmdir to remove it from the storage subsystem (make it
2319  * non-existent). Returns FILEBENCH_NORSC is there are no more existent
2320  * directories in the fileset, FILEBENCH_ERROR on other errors, and
2321  * FILEBENCH_OK on success.
2322  */
2323 static int
2324 flowoplib_removedir(threadflow_t *threadflow, flowop_t *flowop)
2325 {
2326 	filesetentry_t *dir;
2327 	int		ret;
2328 	char		full_path[MAXPATHLEN];
2329 
2330 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
2331 	    FILESET_PICKEXISTS)) != FILEBENCH_OK)
2332 		return (ret);
2333 
2334 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2335 		return (ret);
2336 
2337 	flowop_beginop(threadflow, flowop);
2338 	(void) rmdir(full_path);
2339 	flowop_endop(threadflow, flowop, 0);
2340 
2341 	/* indicate that it is no longer busy and no longer exists */
2342 	fileset_unbusy(dir, TRUE, FALSE, 0);
2343 
2344 	return (FILEBENCH_OK);
2345 }
2346 
2347 /*
2348  * Use opendir(), multiple readdir() calls, and closedir() to list the
2349  * contents of a directory.  Obtains the fileset name from the
2350  * flowop, selects a normal subdirectory (which always exist) and obtains
2351  * its full path, then uses opendir() to get a DIR handle to it from the
2352  * file system, a readdir() loop to access each directory entry, and
2353  * finally cleans up with a closedir(). The latency reported is the total
2354  * for all this activity, and it also reports the total number of bytes
2355  * in the entries as the amount "read". Returns FILEBENCH_ERROR on errors,
2356  * and FILEBENCH_OK on success.
2357  */
2358 static int
2359 flowoplib_listdir(threadflow_t *threadflow, flowop_t *flowop)
2360 {
2361 	fileset_t	*fileset;
2362 	filesetentry_t	*dir;
2363 	DIR		*dir_handlep;
2364 	struct dirent	*direntp;
2365 	int		dir_bytes = 0;
2366 	int		ret;
2367 	char		full_path[MAXPATHLEN];
2368 
2369 	if ((fileset = flowop->fo_fileset) == NULL) {
2370 		filebench_log(LOG_ERROR, "flowop NO fileset");
2371 		return (FILEBENCH_ERROR);
2372 	}
2373 
2374 	if ((dir = fileset_pick(fileset, FILESET_PICKDIR, 0, 0)) == NULL) {
2375 		filebench_log(LOG_DEBUG_SCRIPT,
2376 		    "flowop %s failed to pick directory from fileset %s",
2377 		    flowop->fo_name,
2378 		    avd_get_str(fileset->fs_name));
2379 		return (FILEBENCH_ERROR);
2380 	}
2381 
2382 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2383 		return (ret);
2384 
2385 	flowop_beginop(threadflow, flowop);
2386 
2387 	/* open the directory */
2388 	if ((dir_handlep = opendir(full_path)) == NULL) {
2389 		filebench_log(LOG_ERROR,
2390 		    "flowop %s failed to open directory in fileset %s\n",
2391 		    flowop->fo_name, avd_get_str(fileset->fs_name));
2392 		return (FILEBENCH_ERROR);
2393 	}
2394 
2395 	/* read through the directory entries */
2396 	while ((direntp = readdir(dir_handlep)) != NULL) {
2397 		dir_bytes += (strlen(direntp->d_name) +
2398 		    sizeof (struct dirent) - 1);
2399 	}
2400 
2401 	/* close the directory */
2402 	(void) closedir(dir_handlep);
2403 
2404 	flowop_endop(threadflow, flowop, dir_bytes);
2405 
2406 	/* indicate that it is no longer busy */
2407 	fileset_unbusy(dir, FALSE, FALSE, 0);
2408 
2409 	return (FILEBENCH_OK);
2410 }
2411 
2412 /*
2413  * Emulate stat of a file. Picks an arbitrary filesetentry with
2414  * an existing file from the flowop's fileset, then performs a
2415  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2416  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2417  * cannot be found, and FILEBENCH_OK on success.
2418  */
2419 static int
2420 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2421 {
2422 	filesetentry_t *file;
2423 	fileset_t *fileset;
2424 	struct stat statbuf;
2425 	int fd = flowop->fo_fdnumber;
2426 
2427 	/* if fd specified and the file is open, use it to access file */
2428 	if ((fd > 0) && ((threadflow->tf_fd[fd]) > 0)) {
2429 
2430 		/* check whether file handle still valid */
2431 		if ((file = threadflow->tf_fse[fd]) == NULL) {
2432 			filebench_log(LOG_DEBUG_SCRIPT,
2433 			    "flowop %s trying to stat NULL file at fd = %d",
2434 			    flowop->fo_name, fd);
2435 			return (FILEBENCH_ERROR);
2436 		}
2437 
2438 		/* if here, we still have a valid file pointer */
2439 		fileset = file->fse_fileset;
2440 	} else {
2441 		/* Otherwise, pick arbitrary file */
2442 		file = NULL;
2443 		fileset = flowop->fo_fileset;
2444 	}
2445 
2446 	if (fileset == NULL) {
2447 		filebench_log(LOG_ERROR,
2448 		    "statfile with no fileset specified");
2449 		return (FILEBENCH_ERROR);
2450 	}
2451 
2452 #ifdef HAVE_RAW_SUPPORT
2453 	/* can't be used with raw devices */
2454 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2455 		filebench_log(LOG_ERROR,
2456 		    "flowop %s attempted do a statfile on a RAW device",
2457 		    flowop->fo_name);
2458 		return (FILEBENCH_ERROR);
2459 	}
2460 #endif /* HAVE_RAW_SUPPORT */
2461 
2462 	if (file == NULL) {
2463 		char path[MAXPATHLEN];
2464 		char *pathtmp;
2465 		int err;
2466 
2467 		/* pick arbitrary, existing (allocated) file */
2468 		if ((err = flowoplib_pickfile(&file, flowop,
2469 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
2470 			filebench_log(LOG_DEBUG_SCRIPT,
2471 			    "Statfile flowop %s failed to pick file",
2472 			    flowop->fo_name);
2473 			return (err);
2474 		}
2475 
2476 		/* resolve path and do a stat on file */
2477 		(void) fb_strlcpy(path, avd_get_str(fileset->fs_path),
2478 		    MAXPATHLEN);
2479 		(void) fb_strlcat(path, "/", MAXPATHLEN);
2480 		(void) fb_strlcat(path, avd_get_str(fileset->fs_name),
2481 		    MAXPATHLEN);
2482 		pathtmp = fileset_resolvepath(file);
2483 		(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
2484 		free(pathtmp);
2485 
2486 		/* stat the file */
2487 		flowop_beginop(threadflow, flowop);
2488 		if (stat(path, &statbuf) == -1)
2489 			filebench_log(LOG_ERROR,
2490 			    "statfile flowop %s failed", flowop->fo_name);
2491 		flowop_endop(threadflow, flowop, 0);
2492 
2493 		fileset_unbusy(file, FALSE, FALSE, 0);
2494 	} else {
2495 		/* stat specific file */
2496 		flowop_beginop(threadflow, flowop);
2497 		if (fstat(threadflow->tf_fd[fd], &statbuf) == -1)
2498 			filebench_log(LOG_ERROR,
2499 			    "statfile flowop %s failed", flowop->fo_name);
2500 		flowop_endop(threadflow, flowop, 0);
2501 
2502 	}
2503 
2504 	return (FILEBENCH_OK);
2505 }
2506 
2507 
2508 /*
2509  * Additional reads and writes. Read and write whole files, write
2510  * and append to files. Some of these work with both fileobjs and
2511  * filesets, others only with filesets. The flowoplib_write routine
2512  * writes from thread memory, while the others read or write using
2513  * fo_buf memory. Note that both flowoplib_read() and
2514  * flowoplib_aiowrite() use thread memory as well.
2515  */
2516 
2517 
2518 /*
2519  * Emulate a read of a whole file. The file must be open with
2520  * file descriptor and filesetentry stored at the locations indexed
2521  * by the flowop's fdnumber. It then seeks to the beginning of the
2522  * associated file, and reads fs_iosize bytes at a time until the end
2523  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2524  * out of files, and FILEBENCH_OK on success.
2525  */
2526 static int
2527 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2528 {
2529 	caddr_t iobuf;
2530 	off64_t bytes = 0;
2531 	int filedesc;
2532 	uint64_t wss;
2533 	fbint_t iosize;
2534 	int ret;
2535 	char zerordbuf;
2536 
2537 	/* get the file to use */
2538 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2539 	    &filedesc)) != FILEBENCH_OK)
2540 		return (ret);
2541 
2542 	/* an I/O size of zero means read entire working set with one I/O */
2543 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2544 		iosize = wss;
2545 
2546 	/*
2547 	 * The file may actually be 0 bytes long, in which case skip
2548 	 * the buffer set up call (which would fail) and substitute
2549 	 * a small buffer, which won't really be used.
2550 	 */
2551 	if (iosize == 0) {
2552 		iobuf = (caddr_t)&zerordbuf;
2553 		filebench_log(LOG_DEBUG_SCRIPT,
2554 		    "flowop %s read zero length file", flowop->fo_name);
2555 	} else {
2556 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2557 		    iosize) != 0)
2558 			return (FILEBENCH_ERROR);
2559 	}
2560 
2561 	/* Measure time to read bytes */
2562 	flowop_beginop(threadflow, flowop);
2563 	(void) lseek64(filedesc, 0, SEEK_SET);
2564 	while ((ret = read(filedesc, iobuf, iosize)) > 0)
2565 		bytes += ret;
2566 
2567 	flowop_endop(threadflow, flowop, bytes);
2568 
2569 	if (ret < 0) {
2570 		filebench_log(LOG_ERROR,
2571 		    "readwhole fail Failed to read whole file: %s",
2572 		    strerror(errno));
2573 		return (FILEBENCH_ERROR);
2574 	}
2575 
2576 	return (FILEBENCH_OK);
2577 }
2578 
2579 /*
2580  * Emulate a write to a file of size fo_iosize.  Will write
2581  * to a file from a fileset if the flowop's fo_fileset field
2582  * specifies one or its fdnumber is non zero. Otherwise it
2583  * will write to a fileobj file, if one exists. If the file
2584  * is not currently open, the routine will attempt to open
2585  * it. The flowop's fo_wss parameter will be used to set the
2586  * maximum file size if it is non-zero, otherwise the
2587  * filesetentry's  fse_size will be used. A random memory
2588  * buffer offset is calculated, and, if fo_random is TRUE,
2589  * a random file offset is used for the write. Otherwise the
2590  * write is to the next sequential location. Returns
2591  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2592  * obtain a file, or FILEBENCH_OK on success.
2593  */
2594 static int
2595 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2596 {
2597 	caddr_t iobuf;
2598 	fbint_t wss;
2599 	fbint_t iosize;
2600 	int filedesc;
2601 	int ret;
2602 
2603 	iosize = avd_get_int(flowop->fo_iosize);
2604 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2605 	    &filedesc, iosize)) != FILEBENCH_OK)
2606 		return (ret);
2607 
2608 	if (avd_get_bool(flowop->fo_random)) {
2609 		uint64_t fileoffset;
2610 
2611 		if (filebench_randomno64(&fileoffset,
2612 		    wss, iosize, NULL) == -1) {
2613 			filebench_log(LOG_ERROR,
2614 			    "file size smaller than IO size for thread %s",
2615 			    flowop->fo_name);
2616 			return (FILEBENCH_ERROR);
2617 		}
2618 		flowop_beginop(threadflow, flowop);
2619 		if (pwrite64(filedesc, iobuf,
2620 		    iosize, (off64_t)fileoffset) == -1) {
2621 			filebench_log(LOG_ERROR, "write failed, "
2622 			    "offset %llu io buffer %zd: %s",
2623 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2624 			flowop_endop(threadflow, flowop, 0);
2625 			return (FILEBENCH_ERROR);
2626 		}
2627 		flowop_endop(threadflow, flowop, iosize);
2628 	} else {
2629 		flowop_beginop(threadflow, flowop);
2630 		if (write(filedesc, iobuf, iosize) == -1) {
2631 			filebench_log(LOG_ERROR,
2632 			    "write failed, io buffer %zd: %s",
2633 			    iobuf, strerror(errno));
2634 			flowop_endop(threadflow, flowop, 0);
2635 			return (FILEBENCH_ERROR);
2636 		}
2637 		flowop_endop(threadflow, flowop, iosize);
2638 	}
2639 
2640 	return (FILEBENCH_OK);
2641 }
2642 
2643 /*
2644  * Emulate a write of a whole file.  The size of the file
2645  * is taken from a filesetentry identified by fo_srcfdnumber or
2646  * from the working set size, while the file descriptor used is
2647  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2648  * length length until full file has been written. Returns FILEBENCH_ERROR on
2649  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2650  */
2651 static int
2652 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2653 {
2654 	caddr_t iobuf;
2655 	filesetentry_t *file;
2656 	int wsize;
2657 	off64_t seek;
2658 	off64_t bytes = 0;
2659 	uint64_t wss;
2660 	fbint_t iosize;
2661 	int filedesc;
2662 	int srcfd = flowop->fo_srcfdnumber;
2663 	int ret;
2664 	char zerowrtbuf;
2665 
2666 	/* get the file to use */
2667 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2668 	    &filedesc)) != FILEBENCH_OK)
2669 		return (ret);
2670 
2671 	/* an I/O size of zero means write entire working set with one I/O */
2672 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2673 		iosize = wss;
2674 
2675 	/*
2676 	 * The file may actually be 0 bytes long, in which case skip
2677 	 * the buffer set up call (which would fail) and substitute
2678 	 * a small buffer, which won't really be used.
2679 	 */
2680 	if (iosize == 0) {
2681 		iobuf = (caddr_t)&zerowrtbuf;
2682 		filebench_log(LOG_DEBUG_SCRIPT,
2683 		    "flowop %s wrote zero length file", flowop->fo_name);
2684 	} else {
2685 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2686 		    iosize) != 0)
2687 			return (FILEBENCH_ERROR);
2688 	}
2689 
2690 	file = threadflow->tf_fse[srcfd];
2691 	if ((srcfd != 0) && (file == NULL)) {
2692 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2693 		    flowop->fo_name);
2694 		return (FILEBENCH_ERROR);
2695 	}
2696 
2697 	if (file)
2698 		wss = file->fse_size;
2699 
2700 	wsize = (int)MIN(wss, iosize);
2701 
2702 	/* Measure time to write bytes */
2703 	flowop_beginop(threadflow, flowop);
2704 	for (seek = 0; seek < wss; seek += wsize) {
2705 		ret = write(filedesc, iobuf, wsize);
2706 		if (ret != wsize) {
2707 			filebench_log(LOG_ERROR,
2708 			    "Failed to write %d bytes on fd %d: %s",
2709 			    wsize, filedesc, strerror(errno));
2710 			flowop_endop(threadflow, flowop, 0);
2711 			return (FILEBENCH_ERROR);
2712 		}
2713 		wsize = (int)MIN(wss - seek, iosize);
2714 		bytes += ret;
2715 	}
2716 	flowop_endop(threadflow, flowop, bytes);
2717 
2718 	return (FILEBENCH_OK);
2719 }
2720 
2721 
2722 /*
2723  * Emulate a fixed size append to a file. Will append data to
2724  * a file chosen from a fileset if the flowop's fo_fileset
2725  * field specifies one or if its fdnumber is non zero.
2726  * Otherwise it will write to a fileobj file, if one exists.
2727  * The flowop's fo_wss parameter will be used to set the
2728  * maximum file size if it is non-zero, otherwise the
2729  * filesetentry's fse_size will be used. A random memory
2730  * buffer offset is calculated, then a logical seek to the
2731  * end of file is done followed by a write of fo_iosize
2732  * bytes. Writes are actually done from fo_buf, rather than
2733  * tf_mem as is done with flowoplib_write(), and no check
2734  * is made to see if fo_iosize exceeds the size of fo_buf.
2735  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2736  * files in the fileset, FILEBENCH_OK on success.
2737  */
2738 static int
2739 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2740 {
2741 	caddr_t iobuf;
2742 	int filedesc;
2743 	fbint_t wss;
2744 	fbint_t iosize;
2745 	int ret;
2746 
2747 	iosize = avd_get_int(flowop->fo_iosize);
2748 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2749 	    &filedesc, iosize)) != FILEBENCH_OK)
2750 		return (ret);
2751 
2752 	/* XXX wss is not being used */
2753 
2754 	/* Measure time to write bytes */
2755 	flowop_beginop(threadflow, flowop);
2756 	(void) lseek64(filedesc, 0, SEEK_END);
2757 	ret = write(filedesc, iobuf, iosize);
2758 	if (ret != iosize) {
2759 		filebench_log(LOG_ERROR,
2760 		    "Failed to write %llu bytes on fd %d: %s",
2761 		    (u_longlong_t)iosize, filedesc, strerror(errno));
2762 		flowop_endop(threadflow, flowop, ret);
2763 		return (FILEBENCH_ERROR);
2764 	}
2765 	flowop_endop(threadflow, flowop, ret);
2766 
2767 	return (FILEBENCH_OK);
2768 }
2769 
2770 /*
2771  * Emulate a random size append to a file. Will append data
2772  * to a file chosen from a fileset if the flowop's fo_fileset
2773  * field specifies one or if its fdnumber is non zero. Otherwise
2774  * it will write to a fileobj file, if one exists. The flowop's
2775  * fo_wss parameter will be used to set the maximum file size
2776  * if it is non-zero, otherwise the filesetentry's fse_size
2777  * will be used.  A random transfer size (but at most fo_iosize
2778  * bytes) and a random memory offset are calculated. A logical
2779  * seek to the end of file is done, then writes of up to
2780  * FILE_ALLOC_BLOCK in size are done until the full transfer
2781  * size has been written. Writes are actually done from fo_buf,
2782  * rather than tf_mem as is done with flowoplib_write().
2783  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2784  * files in the fileset, FILEBENCH_OK on success.
2785  */
2786 static int
2787 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2788 {
2789 	caddr_t iobuf;
2790 	uint64_t appendsize;
2791 	int filedesc;
2792 	fbint_t wss;
2793 	fbint_t iosize;
2794 	int ret = 0;
2795 
2796 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2797 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2798 		    flowop->fo_name);
2799 		return (FILEBENCH_ERROR);
2800 	}
2801 
2802 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2803 		return (FILEBENCH_ERROR);
2804 
2805 	/* skip if attempting zero length append */
2806 	if (appendsize == 0) {
2807 		flowop_beginop(threadflow, flowop);
2808 		flowop_endop(threadflow, flowop, 0LL);
2809 		return (FILEBENCH_OK);
2810 	}
2811 
2812 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2813 	    &filedesc, appendsize)) != FILEBENCH_OK)
2814 		return (ret);
2815 
2816 	/* XXX wss is not being used */
2817 
2818 	/* Measure time to write bytes */
2819 	flowop_beginop(threadflow, flowop);
2820 
2821 	(void) lseek64(filedesc, 0, SEEK_END);
2822 	ret = write(filedesc, iobuf, appendsize);
2823 	if (ret != appendsize) {
2824 		filebench_log(LOG_ERROR,
2825 		    "Failed to write %llu bytes on fd %d: %s",
2826 		    (u_longlong_t)appendsize, filedesc, strerror(errno));
2827 		flowop_endop(threadflow, flowop, 0);
2828 		return (FILEBENCH_ERROR);
2829 	}
2830 
2831 	flowop_endop(threadflow, flowop, appendsize);
2832 
2833 	return (FILEBENCH_OK);
2834 }
2835 
/*
 * Running totals kept in a testrandvar flowop's private area.
 */
struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated so far */
	double val_sum;		/* sum of the accumulated values */
	double sqr_sum;		/* sum of the squares of those values */
};
typedef struct testrandvar_priv testrandvar_priv_t;
2841 
2842 /*
2843  * flowop to calculate various statistics from the number stream
2844  * produced by a random variable. This allows verification that the
2845  * random distribution used to define the random variable is producing
2846  * the expected distribution of random numbers.
2847  */
2848 /* ARGSUSED */
2849 static int
2850 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2851 {
2852 	testrandvar_priv_t	*mystats;
2853 	double			value;
2854 
2855 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2856 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2857 		filebench_shutdown(1);
2858 		return (-1);
2859 	}
2860 
2861 	value = avd_get_dbl(flowop->fo_value);
2862 
2863 	mystats->sample_count++;
2864 	mystats->val_sum += value;
2865 	mystats->sqr_sum += (value * value);
2866 
2867 	return (0);
2868 }
2869 
2870 /*
2871  * Initialize the private data area used to accumulate the statistics
2872  */
2873 static int
2874 flowoplib_testrandvar_init(flowop_t *flowop)
2875 {
2876 	testrandvar_priv_t	*mystats;
2877 
2878 	if ((mystats = (testrandvar_priv_t *)
2879 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2880 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2881 		filebench_shutdown(1);
2882 		return (-1);
2883 	}
2884 
2885 	mystats->sample_count = 0;
2886 	mystats->val_sum = 0;
2887 	mystats->sqr_sum = 0;
2888 	flowop->fo_private = (void *)mystats;
2889 
2890 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2891 	return (0);
2892 }
2893 
2894 /*
2895  * Print out the accumulated statistics, and free the private storage
2896  */
2897 static void
2898 flowoplib_testrandvar_destruct(flowop_t *flowop)
2899 {
2900 	testrandvar_priv_t	*mystats;
2901 	double mean, std_dev, dbl_count;
2902 
2903 	(void) ipc_mutex_lock(&flowop->fo_lock);
2904 	if ((mystats = (testrandvar_priv_t *)
2905 	    flowop->fo_private) == NULL) {
2906 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2907 		return;
2908 	}
2909 
2910 	flowop->fo_private = NULL;
2911 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2912 
2913 	dbl_count = (double)mystats->sample_count;
2914 	mean = mystats->val_sum / dbl_count;
2915 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2916 
2917 	filebench_log(LOG_VERBOSE,
2918 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2919 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2920 	free(mystats);
2921 }
2922 
2923 /*
2924  * prints message to the console from within a thread
2925  */
2926 static int
2927 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2928 {
2929 	procflow_t *procflow;
2930 
2931 	procflow = threadflow->tf_process;
2932 	filebench_log(LOG_INFO,
2933 	    "Message from process (%s,%d), thread (%s,%d): %s",
2934 	    procflow->pf_name, procflow->pf_instance,
2935 	    threadflow->tf_name, threadflow->tf_instance,
2936 	    avd_get_str(flowop->fo_value));
2937 
2938 	return (FILEBENCH_OK);
2939 }
2940 
/*
 * Write the flowop command syntax summary to stderr. The text is
 * held in a table with one entry per output line and is emitted in
 * table order, unchanged.
 */
void
flowoplib_usage()
{
	static const char *const usage_text[] = {
	    "flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
	    "                       [,fd=<file desc num>]\n",
	    "\n",
	    "flowop closefile name=<name>,fd=<file desc num>]\n",
	    "\n",
	    "flowop deletefile name=<name>\n",
	    "                       [,fileset=<fname>]\n",
	    "                       [,fd=<file desc num>]\n",
	    "\n",
	    "flowop statfile name=<name>\n",
	    "                       [,fileset=<fname>]\n",
	    "                       [,fd=<file desc num>]\n",
	    "\n",
	    "flowop fsync name=<name>,fd=<file desc num>]\n",
	    "\n",
	    "flowop fsyncset name=<name>,fileset=<fname>]\n",
	    "\n",
	    "flowop [write|read|aiowrite] name=<name>, \n",
	    "                       filename|fileset=<fname>,\n",
	    "                       iosize=<size>\n",
	    "                       [,directio]\n",
	    "                       [,dsync]\n",
	    "                       [,iters=<count>]\n",
	    "                       [,random]\n",
	    "                       [,opennext]\n",
	    "                       [,workingset=<size>]\n",
	    "flowop [appendfile|appendfilerand] name=<name>, \n",
	    "                       filename|fileset=<fname>,\n",
	    "                       iosize=<size>\n",
	    "                       [,dsync]\n",
	    "                       [,iters=<count>]\n",
	    "                       [,workingset=<size>]\n",
	    "flowop [readwholefile|writewholefile] name=<name>, \n",
	    "                       filename|fileset=<fname>,\n",
	    "                       iosize=<size>\n",
	    "                       [,dsync]\n",
	    "                       [,iters=<count>]\n",
	    "\n",
	    "flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
	    "\n",
	    "flowop sempost name=<name>,target=<semblock-flowop>,\n",
	    "                       value=<increment-to-post>\n",
	    "\n",
	    "flowop semblock name=<name>,value=<decrement-to-receive>,\n",
	    "                       highwater=<inbound-queue-max>\n",
	    "\n",
	    "flowop block name=<name>\n",
	    "\n",
	    "flowop wakeup name=<name>,target=<block-flowop>,\n",
	    "\n",
	    "flowop hog name=<name>,value=<number-of-mem-ops>\n",
	    "flowop delay name=<name>,value=<number-of-seconds>\n",
	    "\n",
	    "flowop eventlimit name=<name>\n",
	    "flowop bwlimit name=<name>,value=<mb/s>\n",
	    "flowop iopslimit name=<name>,value=<iop/s>\n",
	    "flowop finishoncount name=<name>,value=<ops/s>\n",
	    "flowop finishonbytes name=<name>,value=<bytes>\n",
	    "\n",
	    "\n"
	};
	size_t nlines = sizeof (usage_text) / sizeof (usage_text[0]);
	size_t i;

	/* one stream call per line, in the order listed above */
	for (i = 0; i < nlines; i++)
		(void) fputs(usage_text[i], stderr);
}
3030