xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 9326:475779da8c08)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #ifdef HAVE_SYS_ASYNCH_H
32 #include <sys/asynch.h>
33 #endif
34 #include <stddef.h>
35 #include <sys/ipc.h>
36 #include <sys/sem.h>
37 #include <sys/errno.h>
38 #include <sys/time.h>
39 #include <inttypes.h>
40 #include <fcntl.h>
41 #include <math.h>
42 #include <dirent.h>
43 
44 #ifdef HAVE_UTILITY_H
45 #include <utility.h>
46 #endif /* HAVE_UTILITY_H */
47 
48 #ifdef HAVE_SYS_ASYNC_H
49 #include <sys/asynch.h>
50 #endif /* HAVE_SYS_ASYNC_H */
51 
52 #ifndef HAVE_UINT_T
53 #define	uint_t unsigned int
54 #endif /* HAVE_UINT_T */
55 
56 #ifndef HAVE_SYSV_SEM
57 #include <semaphore.h>
58 #endif /* HAVE_SYSV_SEM */
59 
60 #include "filebench.h"
61 #include "flowop.h"
62 #include "fileset.h"
63 #include "fb_random.h"
64 #include "utils.h"
65 #include "fsplug.h"
66 
/*
 * These routines implement the flowops from the f language. Each
 * flowop has a name such as "read", and a set of function pointers
 * to call for initialization, execution and destruction of the flowop.
 * The table flowoplib_funcs[] contains a flowoplib struct for each
 * implemented flowop. Most flowops use a generic initialization function
 * and most use a generic destruction function. All flowop
 * functions referenced from the table are in this file, though, of
 * course, they often call functions from other files.
 *
 * The flowop_init() routine uses the flowoplib_funcs[] table to
 * create an initial set of "instance 0" flowops, one for each type of
 * flowop, from which all other flowops are derived. These "instance 0"
 * flowops are initialized with information from the table including
 * pointers for their fo_init, fo_func and fo_destroy functions. When
 * a flowop definition is encountered in an f language script, the
 * "type" of flowop, such as "read", is used to search for the
 * "instance 0" flowop named "read"; a new flowop is then allocated
 * which inherits its function pointers and other initial properties
 * from the instance 0 flowop, and is given a new name as specified
 * by the "name=" attribute.
 */
89 
90 static void flowoplib_destruct_noop(flowop_t *flowop);
91 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
92 static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
93 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
94 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
95 static int flowoplib_block_init(flowop_t *flowop);
96 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
97 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
98 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
99 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
100 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
101 static int flowoplib_sempost_init(flowop_t *flowop);
102 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
103 static int flowoplib_semblock_init(flowop_t *flowop);
104 static void flowoplib_semblock_destruct(flowop_t *flowop);
105 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
106 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
107 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
108 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
109 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
110 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
111 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
112 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
113 static int flowoplib_makedir(threadflow_t *, flowop_t *flowop);
114 static int flowoplib_removedir(threadflow_t *, flowop_t *flowop);
115 static int flowoplib_listdir(threadflow_t *, flowop_t *flowop);
116 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
117 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
118 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
119 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
120 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
121 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
122 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
123 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
124 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
125 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
126 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
127 static int flowoplib_testrandvar_init(flowop_t *flowop);
128 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
129 
130 static flowop_proto_t flowoplib_funcs[] = {
131 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowop_init_generic,
132 	flowoplib_write, flowop_destruct_generic,
133 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowop_init_generic,
134 	flowoplib_read, flowop_destruct_generic,
135 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
136 	flowoplib_block, flowop_destruct_generic,
137 	FLOW_TYPE_SYNC, 0, "wakeup", flowop_init_generic,
138 	flowoplib_wakeup, flowop_destruct_generic,
139 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
140 	flowoplib_semblock, flowoplib_semblock_destruct,
141 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
142 	flowoplib_sempost, flowoplib_destruct_noop,
143 	FLOW_TYPE_OTHER, 0, "hog", flowop_init_generic,
144 	flowoplib_hog, flowop_destruct_generic,
145 	FLOW_TYPE_OTHER, 0, "delay", flowop_init_generic,
146 	flowoplib_delay, flowop_destruct_generic,
147 	FLOW_TYPE_OTHER, 0, "eventlimit", flowop_init_generic,
148 	flowoplib_eventlimit, flowop_destruct_generic,
149 	FLOW_TYPE_OTHER, 0, "bwlimit", flowop_init_generic,
150 	flowoplib_bwlimit, flowop_destruct_generic,
151 	FLOW_TYPE_OTHER, 0, "iopslimit", flowop_init_generic,
152 	flowoplib_iopslimit, flowop_destruct_generic,
153 	FLOW_TYPE_OTHER, 0, "opslimit", flowop_init_generic,
154 	flowoplib_opslimit, flowop_destruct_generic,
155 	FLOW_TYPE_OTHER, 0, "finishoncount", flowop_init_generic,
156 	flowoplib_finishoncount, flowop_destruct_generic,
157 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowop_init_generic,
158 	flowoplib_finishonbytes, flowop_destruct_generic,
159 	FLOW_TYPE_IO, 0, "openfile", flowop_init_generic,
160 	flowoplib_openfile, flowop_destruct_generic,
161 	FLOW_TYPE_IO, 0, "createfile", flowop_init_generic,
162 	flowoplib_createfile, flowop_destruct_generic,
163 	FLOW_TYPE_IO, 0, "closefile", flowop_init_generic,
164 	flowoplib_closefile, flowop_destruct_generic,
165 	FLOW_TYPE_IO, 0, "makedir", flowop_init_generic,
166 	flowoplib_makedir, flowop_destruct_generic,
167 	FLOW_TYPE_IO, 0, "removedir", flowop_init_generic,
168 	flowoplib_removedir, flowop_destruct_generic,
169 	FLOW_TYPE_IO, 0, "listdir", flowop_init_generic,
170 	flowoplib_listdir, flowop_destruct_generic,
171 	FLOW_TYPE_IO, 0, "fsync", flowop_init_generic,
172 	flowoplib_fsync, flowop_destruct_generic,
173 	FLOW_TYPE_IO, 0, "fsyncset", flowop_init_generic,
174 	flowoplib_fsyncset, flowop_destruct_generic,
175 	FLOW_TYPE_IO, 0, "statfile", flowop_init_generic,
176 	flowoplib_statfile, flowop_destruct_generic,
177 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowop_init_generic,
178 	flowoplib_readwholefile, flowop_destruct_generic,
179 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowop_init_generic,
180 	flowoplib_appendfile, flowop_destruct_generic,
181 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowop_init_generic,
182 	flowoplib_appendfilerand, flowop_destruct_generic,
183 	FLOW_TYPE_IO, 0, "deletefile", flowop_init_generic,
184 	flowoplib_deletefile, flowop_destruct_generic,
185 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowop_init_generic,
186 	flowoplib_writewholefile, flowop_destruct_generic,
187 	FLOW_TYPE_OTHER, 0, "print", flowop_init_generic,
188 	flowoplib_print, flowop_destruct_generic,
189 	/* routine to calculate mean and stddev for output from a randvar */
190 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
191 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
192 };
193 
194 /*
195  * Loops through the list of flowops defined in this
196  * module, and creates and initializes a flowop for each one
197  * by calling flowop_flow_init. As a side effect of calling
198  * flowop_flow_init, the created flowops are placed on the
199  * master flowop list. All created flowops are set to
200  * instance "0".
201  */
202 void
203 flowoplib_flowinit()
204 {
205 	int nops = sizeof (flowoplib_funcs) / sizeof (flowop_proto_t);
206 
207 	flowop_flow_init(flowoplib_funcs, nops);
208 }
209 
210 /*
211  * Special total noop destruct
212  */
213 /* ARGSUSED */
214 static void
215 flowoplib_destruct_noop(flowop_t *flowop)
216 {
217 }
218 
219 /*
220  * Generates a file attribute from flags in the supplied flowop.
221  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
222  */
223 static int
224 flowoplib_fileattrs(flowop_t *flowop)
225 {
226 	int attrs = 0;
227 
228 	if (avd_get_bool(flowop->fo_directio))
229 		attrs |= FLOW_ATTR_DIRECTIO;
230 
231 	if (avd_get_bool(flowop->fo_dsync))
232 		attrs |= FLOW_ATTR_DSYNC;
233 
234 	return (attrs);
235 }
236 
237 /*
238  * Obtain a filesetentry for a file. Result placed where filep points.
239  * Supply with a flowop and a flag to indicate whether an existent or
240  * non-existent file is required. Returns FILEBENCH_NORSC if all out
241  * of the appropriate type of directories, FILEBENCH_ERROR if the
242  * flowop does not point to a fileset, and FILEBENCH_OK otherwise.
243  */
244 static int
245 flowoplib_pickfile(filesetentry_t **filep, flowop_t *flowop, int flags, int tid)
246 {
247 	fileset_t	*fileset;
248 	int		fileindex;
249 
250 	if ((fileset = flowop->fo_fileset) == NULL) {
251 		filebench_log(LOG_ERROR, "flowop NO fileset");
252 		return (FILEBENCH_ERROR);
253 	}
254 
255 	if (flowop->fo_fileindex) {
256 		fileindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
257 		    ((double)(fileset->fs_constentries / 2)));
258 		fileindex = fileindex % fileset->fs_constentries;
259 		flags |= FILESET_PICKBYINDEX;
260 	} else {
261 		fileindex = 0;
262 	}
263 
264 	if ((*filep = fileset_pick(fileset, FILESET_PICKFILE | flags,
265 	    tid, fileindex)) == NULL) {
266 		filebench_log(LOG_DEBUG_SCRIPT,
267 		    "flowop %s failed to pick file from fileset %s",
268 		    flowop->fo_name,
269 		    avd_get_str(fileset->fs_name));
270 		return (FILEBENCH_NORSC);
271 	}
272 
273 	return (FILEBENCH_OK);
274 }
275 
276 /*
277  * Obtain a filesetentry for a leaf directory. Result placed where dirp
278  * points. Supply with flowop and a flag to indicate whether an existent
279  * or non-existent leaf directory is required. Returns FILEBENCH_NORSC
280  * if all out of the appropriate type of directories, FILEBENCH_ERROR
281  * if the flowop does not point to a fileset, and FILEBENCH_OK otherwise.
282  */
283 static int
284 flowoplib_pickleafdir(filesetentry_t **dirp, flowop_t *flowop, int flags)
285 {
286 	fileset_t	*fileset;
287 	int		dirindex;
288 
289 	if ((fileset = flowop->fo_fileset) == NULL) {
290 		filebench_log(LOG_ERROR, "flowop NO fileset");
291 		return (FILEBENCH_ERROR);
292 	}
293 
294 	if (flowop->fo_fileindex) {
295 		dirindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
296 		    ((double)(fileset->fs_constleafdirs / 2)));
297 		dirindex = dirindex % fileset->fs_constleafdirs;
298 		flags |= FILESET_PICKBYINDEX;
299 	} else {
300 		dirindex = 0;
301 	}
302 
303 	if ((*dirp = fileset_pick(fileset,
304 	    FILESET_PICKLEAFDIR | flags, 0, dirindex)) == NULL) {
305 		filebench_log(LOG_DEBUG_SCRIPT,
306 		    "flowop %s failed to pick directory from fileset %s",
307 		    flowop->fo_name,
308 		    avd_get_str(fileset->fs_name));
309 		return (FILEBENCH_NORSC);
310 	}
311 
312 	return (FILEBENCH_OK);
313 }
314 
315 /*
316  * Searches for a file descriptor. Tries the flowop's
317  * fo_fdnumber first and returns with it if it has been
318  * explicitly set (greater than 0). It next checks to
319  * see if a rotating file descriptor policy is in effect,
320  * and if not returns the fdnumber regardless of what
321  * it is. (note that if it is 0, it just selects to the
322  * default file descriptor in the threadflow's tf_fd
323  * array). If the rotating fd policy is in effect, it
324  * cycles from the end of the tf_fd array to one location
325  * beyond the maximum needed by the number of entries in
326  * the associated fileset on each invocation, then starts
327  * over from the end.
328  *
329  * The routine returns an index into the threadflow's
330  * tf_fd table where the actual file descriptor will be
331  * found. Note: the calling routine must not call this
332  * routine if the flowop does not have a fileset, and the
333  * flowop's fo_fdnumber is zero and fo_rotatefd is
334  * asserted, or an addressing fault may occur.
335  */
336 static int
337 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
338 {
339 	fbint_t	entries;
340 	int fdnumber = flowop->fo_fdnumber;
341 
342 	/* If the script sets the fd explicitly */
343 	if (fdnumber > 0)
344 		return (fdnumber);
345 
346 	/* If the flowop defaults to persistent fd */
347 	if (!avd_get_bool(flowop->fo_rotatefd))
348 		return (fdnumber);
349 
350 	if (flowop->fo_fileset == NULL) {
351 		filebench_log(LOG_ERROR, "flowop NULL file");
352 		return (FILEBENCH_ERROR);
353 	}
354 
355 	entries = flowop->fo_fileset->fs_constentries;
356 
357 	/* Rotate the fd on each flowop invocation */
358 	if (entries > (THREADFLOW_MAXFD / 2)) {
359 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
360 		    " (too many files : %llu",
361 		    flowop->fo_name, (u_longlong_t)entries);
362 		return (FILEBENCH_ERROR);
363 	}
364 
365 	/* First time around */
366 	if (threadflow->tf_fdrotor == 0)
367 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
368 
369 	/* One fd for every file in the set */
370 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
371 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
372 
373 
374 	threadflow->tf_fdrotor--;
375 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
376 	    threadflow->tf_fdrotor);
377 	return (threadflow->tf_fdrotor);
378 }
379 
380 /*
381  * Determines the file descriptor to use, and attempts to open
382  * the file if it is not already open. Also determines the wss
383  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
384  * if flowop_openfile_common couldn't obtain an appropriate file
385  * from a the fileset, and FILEBENCH_OK otherwise.
386  */
387 static int
388 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
389     fbint_t *wssp, fb_fdesc_t **fdescp)
390 {
391 	int fd = flowoplib_fdnum(threadflow, flowop);
392 
393 	if (fd == -1)
394 		return (FILEBENCH_ERROR);
395 
396 	/* check for conflicting fdnumber and file name */
397 	if ((fd > 0) && (threadflow->tf_fse[fd] != NULL)) {
398 		char *fd_based_name;
399 
400 		fd_based_name =
401 		    avd_get_str(threadflow->tf_fse[fd]->fse_fileset->fs_name);
402 
403 		if (flowop->fo_filename != NULL) {
404 			char *fo_based_name;
405 
406 			fo_based_name = avd_get_str(flowop->fo_filename);
407 			if (strcmp(fd_based_name, fo_based_name) != 0) {
408 				filebench_log(LOG_ERROR, "Name of fd refer"
409 				    "enced fileset name (%s) CONFLICTS with"
410 				    " flowop supplied fileset name (%s)",
411 				    fd_based_name, fo_based_name);
412 				filebench_shutdown(1);
413 				return (FILEBENCH_ERROR);
414 			}
415 		}
416 	}
417 
418 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
419 		int ret;
420 
421 		if ((ret = flowoplib_openfile_common(
422 		    threadflow, flowop, fd)) != FILEBENCH_OK)
423 			return (ret);
424 
425 		if (threadflow->tf_fse[fd]) {
426 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
427 			    threadflow->tf_fse[fd]->fse_path);
428 		} else {
429 			filebench_log(LOG_DEBUG_IMPL,
430 			    "opened device %s/%s",
431 			    avd_get_str(flowop->fo_fileset->fs_path),
432 			    avd_get_str(flowop->fo_fileset->fs_name));
433 		}
434 	}
435 
436 	*fdescp = &(threadflow->tf_fd[fd]);
437 
438 	if ((*wssp = flowop->fo_constwss) == 0) {
439 		if (threadflow->tf_fse[fd])
440 			*wssp = threadflow->tf_fse[fd]->fse_size;
441 		else
442 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
443 	}
444 
445 	return (FILEBENCH_OK);
446 }
447 
448 /*
449  * Determines the io buffer or random offset into tf_mem for
450  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
451  */
452 static int
453 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
454     caddr_t *iobufp, fbint_t iosize)
455 {
456 	long memsize;
457 	size_t memoffset;
458 
459 	if (iosize == 0) {
460 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
461 		    flowop->fo_name);
462 		return (FILEBENCH_ERROR);
463 	}
464 
465 	if ((memsize = threadflow->tf_constmemsize) != 0) {
466 
467 		/* use tf_mem for I/O with random offset */
468 		if (filebench_randomno(&memoffset,
469 		    memsize, iosize, NULL) == -1) {
470 			filebench_log(LOG_ERROR,
471 			    "tf_memsize smaller than IO size for thread %s",
472 			    flowop->fo_name);
473 			return (FILEBENCH_ERROR);
474 		}
475 		*iobufp = threadflow->tf_mem + memoffset;
476 
477 	} else {
478 		/* use private I/O buffer */
479 		if ((flowop->fo_buf != NULL) &&
480 		    (flowop->fo_buf_size < iosize)) {
481 			/* too small, so free up and re-allocate */
482 			free(flowop->fo_buf);
483 			flowop->fo_buf = NULL;
484 		}
485 
486 		/*
487 		 * Allocate memory for the  buffer. The memory is freed
488 		 * by flowop_destruct_generic() or by this routine if more
489 		 * memory is needed for the buffer.
490 		 */
491 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
492 		    = (char *)malloc(iosize)) == NULL))
493 			return (FILEBENCH_ERROR);
494 
495 		flowop->fo_buf_size = iosize;
496 		*iobufp = flowop->fo_buf;
497 	}
498 	return (FILEBENCH_OK);
499 }
500 
501 /*
502  * Determines the file descriptor to use, opens it if necessary, the
503  * io buffer or random offset into tf_mem for IO operation and the wss
504  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
505  */
506 int
507 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
508     fbint_t *wssp, caddr_t *iobufp, fb_fdesc_t **filedescp, fbint_t iosize)
509 {
510 	int ret;
511 
512 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
513 	    FILEBENCH_OK)
514 		return (ret);
515 
516 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
517 	    FILEBENCH_OK)
518 		return (ret);
519 
520 	return (FILEBENCH_OK);
521 }
522 
523 /*
524  * Emulate posix read / pread. If the flowop has a fileset,
525  * a file descriptor number index is fetched, otherwise a
526  * supplied fileobj file is used. In either case the specified
527  * file will be opened if not already open. If the flowop has
528  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
529  * returned.
530  *
531  * The actual read is done to a random offset in the
532  * threadflow's thread memory (tf_mem), with a size set by
533  * fo_iosize and at either a random disk offset within the
534  * working set size, or at the next sequential location. If
535  * any errors are encountered, FILEBENCH_ERROR is returned,
536  * if no appropriate file can be obtained from the fileset then
537  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
538  */
539 static int
540 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
541 {
542 	caddr_t iobuf;
543 	fbint_t wss;
544 	fbint_t iosize;
545 	fb_fdesc_t *fdesc;
546 	int ret;
547 
548 
549 	iosize = avd_get_int(flowop->fo_iosize);
550 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
551 	    &fdesc, iosize)) != FILEBENCH_OK)
552 		return (ret);
553 
554 	if (avd_get_bool(flowop->fo_random)) {
555 		uint64_t fileoffset;
556 
557 		if (filebench_randomno64(&fileoffset,
558 		    wss, iosize, NULL) == -1) {
559 			filebench_log(LOG_ERROR,
560 			    "file size smaller than IO size for thread %s",
561 			    flowop->fo_name);
562 			return (FILEBENCH_ERROR);
563 		}
564 
565 		(void) flowop_beginop(threadflow, flowop);
566 		if ((ret = FB_PREAD(fdesc, iobuf,
567 		    iosize, (off64_t)fileoffset)) == -1) {
568 			(void) flowop_endop(threadflow, flowop, 0);
569 			filebench_log(LOG_ERROR,
570 			    "read file %s failed, offset %llu "
571 			    "io buffer %zd: %s",
572 			    avd_get_str(flowop->fo_fileset->fs_name),
573 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
574 			flowop_endop(threadflow, flowop, 0);
575 			return (FILEBENCH_ERROR);
576 		}
577 		(void) flowop_endop(threadflow, flowop, ret);
578 
579 		if ((ret == 0))
580 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
581 
582 	} else {
583 		(void) flowop_beginop(threadflow, flowop);
584 		if ((ret = FB_READ(fdesc, iobuf, iosize)) == -1) {
585 			(void) flowop_endop(threadflow, flowop, 0);
586 			filebench_log(LOG_ERROR,
587 			    "read file %s failed, io buffer %zd: %s",
588 			    avd_get_str(flowop->fo_fileset->fs_name),
589 			    iobuf, strerror(errno));
590 			(void) flowop_endop(threadflow, flowop, 0);
591 			return (FILEBENCH_ERROR);
592 		}
593 		(void) flowop_endop(threadflow, flowop, ret);
594 
595 		if ((ret == 0))
596 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
597 	}
598 
599 	return (FILEBENCH_OK);
600 }
601 
602 /*
603  * Initializes a "flowop_block" flowop. Specifically, it
604  * initializes the flowop's fo_cv and unlocks the fo_lock.
605  */
606 static int
607 flowoplib_block_init(flowop_t *flowop)
608 {
609 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
610 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
611 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
612 	(void) ipc_mutex_unlock(&flowop->fo_lock);
613 
614 	return (FILEBENCH_OK);
615 }
616 
617 /*
618  * Blocks the threadflow until woken up by flowoplib_wakeup.
619  * The routine blocks on the flowop's fo_cv condition variable.
620  */
621 static int
622 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
623 {
624 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
625 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
626 	(void) ipc_mutex_lock(&flowop->fo_lock);
627 
628 	flowop_beginop(threadflow, flowop);
629 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
630 	flowop_endop(threadflow, flowop, 0);
631 
632 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
633 	    flowop->fo_name, flowop->fo_instance);
634 
635 	(void) ipc_mutex_unlock(&flowop->fo_lock);
636 
637 	return (FILEBENCH_OK);
638 }
639 
640 /*
641  * Wakes up one or more target blocking flowops.
642  * Sends broadcasts on the fo_cv condition variables of all
643  * flowops on the target list, except those that are
644  * FLOW_MASTER flowops. The target list consists of all
645  * flowops whose name matches this flowop's "fo_targetname"
646  * attribute. The target list is generated on the first
647  * invocation, and the run will be shutdown if no targets
648  * are found. Otherwise the routine always returns FILEBENCH_OK.
649  */
650 static int
651 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
652 {
653 	flowop_t *target;
654 
655 	/* if this is the first wakeup, create the wakeup list */
656 	if (flowop->fo_targets == NULL) {
657 		flowop_t *result = flowop_find(flowop->fo_targetname);
658 
659 		flowop->fo_targets = result;
660 		if (result == NULL) {
661 			filebench_log(LOG_ERROR,
662 			    "wakeup: could not find op %s for thread %s",
663 			    flowop->fo_targetname,
664 			    threadflow->tf_name);
665 			filebench_shutdown(1);
666 		}
667 		while (result) {
668 			result->fo_targetnext =
669 			    result->fo_resultnext;
670 			result = result->fo_resultnext;
671 		}
672 	}
673 
674 	target = flowop->fo_targets;
675 
676 	/* wakeup the targets */
677 	while (target) {
678 		if (target->fo_instance == FLOW_MASTER) {
679 			target = target->fo_targetnext;
680 			continue;
681 		}
682 		filebench_log(LOG_DEBUG_IMPL,
683 		    "wakeup flow %s-%d at address %zx",
684 		    target->fo_name,
685 		    target->fo_instance,
686 		    &target->fo_cv);
687 
688 		flowop_beginop(threadflow, flowop);
689 		(void) ipc_mutex_lock(&target->fo_lock);
690 		(void) pthread_cond_broadcast(&target->fo_cv);
691 		(void) ipc_mutex_unlock(&target->fo_lock);
692 		flowop_endop(threadflow, flowop, 0);
693 
694 		target = target->fo_targetnext;
695 	}
696 
697 	return (FILEBENCH_OK);
698 }
699 
700 /*
701  * "think time" routines. the "hog" routine consumes cpu cycles as
702  * it "thinks", while the "delay" flowop simply calls sleep() to delay
703  * for a given number of seconds without consuming cpu cycles.
704  */
705 
706 
707 /*
708  * Consumes CPU cycles and memory bandwidth by looping for
709  * flowop->fo_value times. With each loop sets memory location
710  * threadflow->tf_mem to 1.
711  */
712 static int
713 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
714 {
715 	uint64_t value = avd_get_int(flowop->fo_value);
716 	int i;
717 
718 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
719 	flowop_beginop(threadflow, flowop);
720 	if (threadflow->tf_mem != NULL) {
721 		for (i = 0; i < value; i++)
722 			*(threadflow->tf_mem) = 1;
723 	}
724 	flowop_endop(threadflow, flowop, 0);
725 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
726 	return (FILEBENCH_OK);
727 }
728 
729 
730 /*
731  * Delays for fo_value seconds.
732  */
733 static int
734 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
735 {
736 	int value = avd_get_int(flowop->fo_value);
737 
738 	flowop_beginop(threadflow, flowop);
739 	(void) sleep(value);
740 	flowop_endop(threadflow, flowop, 0);
741 	return (FILEBENCH_OK);
742 }
743 
/*
 * Rate limiting routines. This is the event consuming half of the
 * event system. Each of the four following routines limits the rate
 * of one kind of unit: calls, issued I/O operations, issued filebench
 * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
 * consumer, and further divided among different consumers if more than
 * one has been defined. There is no mechanism to enforce equal sharing
 * of events.
 */
754 
755 /*
756  * Completes one invocation per posted event. If eventgen_q
757  * has an event count greater than zero, one will be removed
758  * (count decremented), otherwise the calling thread will
759  * block until another event has been posted. Always returns 0
760  */
761 static int
762 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
763 {
764 	/* Immediately bail if not set/enabled */
765 	if (filebench_shm->shm_eventgen_hz == NULL)
766 		return (FILEBENCH_OK);
767 
768 	if (flowop->fo_initted == 0) {
769 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
770 		    flowop, threadflow->tf_name, threadflow->tf_instance);
771 		flowop->fo_initted = 1;
772 	}
773 
774 	flowop_beginop(threadflow, flowop);
775 	while (filebench_shm->shm_eventgen_hz != NULL) {
776 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
777 		if (filebench_shm->shm_eventgen_q > 0) {
778 			filebench_shm->shm_eventgen_q--;
779 			(void) ipc_mutex_unlock(
780 			    &filebench_shm->shm_eventgen_lock);
781 			break;
782 		}
783 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
784 		    &filebench_shm->shm_eventgen_lock);
785 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
786 	}
787 	flowop_endop(threadflow, flowop, 0);
788 	return (FILEBENCH_OK);
789 }
790 
791 static int
792 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
793 {
794 	if (flowop->fo_targetname[0] != '\0') {
795 
796 		/* Try to use statistics from specific flowop */
797 		flowop->fo_targets =
798 		    flowop_find_from_list(flowop->fo_targetname,
799 		    threadflow->tf_thrd_fops);
800 		if (flowop->fo_targets == NULL) {
801 			filebench_log(LOG_ERROR,
802 			    "limit target: could not find flowop %s",
803 			    flowop->fo_targetname);
804 			filebench_shutdown(1);
805 			return (FILEBENCH_ERROR);
806 		}
807 	} else {
808 		/* use total workload statistics */
809 		flowop->fo_targets = NULL;
810 	}
811 	return (FILEBENCH_OK);
812 }
813 
814 /*
815  * Blocks the calling thread if the number of issued I/O
816  * operations exceeds the number of posted events, thus
817  * limiting the average I/O operation rate to the rate
818  * specified by eventgen_hz. Always returns FILEBENCH_OK.
819  */
820 static int
821 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
822 {
823 	uint64_t iops;
824 	uint64_t delta;
825 	uint64_t events;
826 
827 	/* Immediately bail if not set/enabled */
828 	if (filebench_shm->shm_eventgen_hz == NULL)
829 		return (FILEBENCH_OK);
830 
831 	if (flowop->fo_initted == 0) {
832 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
833 		    flowop, threadflow->tf_name, threadflow->tf_instance);
834 		flowop->fo_initted = 1;
835 
836 		if (flowoplib_event_find_target(threadflow, flowop)
837 		    == FILEBENCH_ERROR)
838 			return (FILEBENCH_ERROR);
839 
840 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
841 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
842 			filebench_log(LOG_ERROR,
843 			    "WARNING: Flowop %s does no IO",
844 			    flowop->fo_targets->fo_name);
845 			filebench_shutdown(1);
846 			return (FILEBENCH_ERROR);
847 		}
848 	}
849 
850 	if (flowop->fo_targets) {
851 		/*
852 		 * Note that fs_count is already the sum of fs_rcount
853 		 * and fs_wcount if looking at a single flowop.
854 		 */
855 		iops = flowop->fo_targets->fo_stats.fs_count;
856 	} else {
857 		(void) ipc_mutex_lock(&controlstats_lock);
858 		iops = (controlstats.fs_rcount +
859 		    controlstats.fs_wcount);
860 		(void) ipc_mutex_unlock(&controlstats_lock);
861 	}
862 
863 	/* Is this the first time around */
864 	if (flowop->fo_tputlast == 0) {
865 		flowop->fo_tputlast = iops;
866 		return (FILEBENCH_OK);
867 	}
868 
869 	delta = iops - flowop->fo_tputlast;
870 	flowop->fo_tputbucket -= delta;
871 	flowop->fo_tputlast = iops;
872 
873 	/* No need to block if the q isn't empty */
874 	if (flowop->fo_tputbucket >= 0LL) {
875 		flowop_endop(threadflow, flowop, 0);
876 		return (FILEBENCH_OK);
877 	}
878 
879 	iops = flowop->fo_tputbucket * -1;
880 	events = iops;
881 
882 	flowop_beginop(threadflow, flowop);
883 	while (filebench_shm->shm_eventgen_hz != NULL) {
884 
885 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
886 		if (filebench_shm->shm_eventgen_q >= events) {
887 			filebench_shm->shm_eventgen_q -= events;
888 			(void) ipc_mutex_unlock(
889 			    &filebench_shm->shm_eventgen_lock);
890 			flowop->fo_tputbucket += events;
891 			break;
892 		}
893 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
894 		    &filebench_shm->shm_eventgen_lock);
895 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
896 	}
897 	flowop_endop(threadflow, flowop, 0);
898 
899 	return (FILEBENCH_OK);
900 }
901 
902 /*
903  * Blocks the calling thread if the number of issued filebench
904  * operations exceeds the number of posted events, thus limiting
905  * the average filebench operation rate to the rate specified by
906  * eventgen_hz. Always returns FILEBENCH_OK.
907  */
908 static int
909 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
910 {
911 	uint64_t ops;
912 	uint64_t delta;
913 	uint64_t events;
914 
915 	/* Immediately bail if not set/enabled */
916 	if (filebench_shm->shm_eventgen_hz == NULL)
917 		return (FILEBENCH_OK);
918 
919 	if (flowop->fo_initted == 0) {
920 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
921 		    flowop, threadflow->tf_name, threadflow->tf_instance);
922 		flowop->fo_initted = 1;
923 
924 		if (flowoplib_event_find_target(threadflow, flowop)
925 		    == FILEBENCH_ERROR)
926 			return (FILEBENCH_ERROR);
927 	}
928 
929 	if (flowop->fo_targets) {
930 		ops = flowop->fo_targets->fo_stats.fs_count;
931 	} else {
932 		(void) ipc_mutex_lock(&controlstats_lock);
933 		ops = controlstats.fs_count;
934 		(void) ipc_mutex_unlock(&controlstats_lock);
935 	}
936 
937 	/* Is this the first time around */
938 	if (flowop->fo_tputlast == 0) {
939 		flowop->fo_tputlast = ops;
940 		return (FILEBENCH_OK);
941 	}
942 
943 	delta = ops - flowop->fo_tputlast;
944 	flowop->fo_tputbucket -= delta;
945 	flowop->fo_tputlast = ops;
946 
947 	/* No need to block if the q isn't empty */
948 	if (flowop->fo_tputbucket >= 0LL) {
949 		flowop_endop(threadflow, flowop, 0);
950 		return (FILEBENCH_OK);
951 	}
952 
953 	ops = flowop->fo_tputbucket * -1;
954 	events = ops;
955 
956 	flowop_beginop(threadflow, flowop);
957 	while (filebench_shm->shm_eventgen_hz != NULL) {
958 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
959 		if (filebench_shm->shm_eventgen_q >= events) {
960 			filebench_shm->shm_eventgen_q -= events;
961 			(void) ipc_mutex_unlock(
962 			    &filebench_shm->shm_eventgen_lock);
963 			flowop->fo_tputbucket += events;
964 			break;
965 		}
966 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
967 		    &filebench_shm->shm_eventgen_lock);
968 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
969 	}
970 	flowop_endop(threadflow, flowop, 0);
971 
972 	return (FILEBENCH_OK);
973 }
974 
975 
976 /*
977  * Blocks the calling thread if the number of bytes of I/O
978  * issued exceeds one megabyte times the number of posted
979  * events, thus limiting the average I/O byte rate to one
980  * megabyte times the event rate as set by eventgen_hz.
981  * Always retuns FILEBENCH_OK.
982  */
983 static int
984 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
985 {
986 	uint64_t bytes;
987 	uint64_t delta;
988 	uint64_t events;
989 
990 	/* Immediately bail if not set/enabled */
991 	if (filebench_shm->shm_eventgen_hz == NULL)
992 		return (FILEBENCH_OK);
993 
994 	if (flowop->fo_initted == 0) {
995 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
996 		    flowop, threadflow->tf_name, threadflow->tf_instance);
997 		flowop->fo_initted = 1;
998 
999 		if (flowoplib_event_find_target(threadflow, flowop)
1000 		    == FILEBENCH_ERROR)
1001 			return (FILEBENCH_ERROR);
1002 
1003 		if ((flowop->fo_targets) &&
1004 		    ((flowop->fo_targets->fo_attrs &
1005 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1006 			filebench_log(LOG_ERROR,
1007 			    "WARNING: Flowop %s does no Reads or Writes",
1008 			    flowop->fo_targets->fo_name);
1009 			filebench_shutdown(1);
1010 			return (FILEBENCH_ERROR);
1011 		}
1012 	}
1013 
1014 	if (flowop->fo_targets) {
1015 		/*
1016 		 * Note that fs_bytes is already the sum of fs_rbytes
1017 		 * and fs_wbytes if looking at a single flowop.
1018 		 */
1019 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
1020 	} else {
1021 		(void) ipc_mutex_lock(&controlstats_lock);
1022 		bytes = (controlstats.fs_rbytes +
1023 		    controlstats.fs_wbytes);
1024 		(void) ipc_mutex_unlock(&controlstats_lock);
1025 	}
1026 
1027 	/* Is this the first time around? */
1028 	if (flowop->fo_tputlast == 0) {
1029 		flowop->fo_tputlast = bytes;
1030 		return (FILEBENCH_OK);
1031 	}
1032 
1033 	delta = bytes - flowop->fo_tputlast;
1034 	flowop->fo_tputbucket -= delta;
1035 	flowop->fo_tputlast = bytes;
1036 
1037 	/* No need to block if the q isn't empty */
1038 	if (flowop->fo_tputbucket >= 0LL) {
1039 		flowop_endop(threadflow, flowop, 0);
1040 		return (FILEBENCH_OK);
1041 	}
1042 
1043 	bytes = flowop->fo_tputbucket * -1;
1044 	events = (bytes / MB) + 1;
1045 
1046 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1047 	    (u_longlong_t)bytes, (u_longlong_t)events);
1048 
1049 	flowop_beginop(threadflow, flowop);
1050 	while (filebench_shm->shm_eventgen_hz != NULL) {
1051 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1052 		if (filebench_shm->shm_eventgen_q >= events) {
1053 			filebench_shm->shm_eventgen_q -= events;
1054 			(void) ipc_mutex_unlock(
1055 			    &filebench_shm->shm_eventgen_lock);
1056 			flowop->fo_tputbucket += (events * MB);
1057 			break;
1058 		}
1059 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1060 		    &filebench_shm->shm_eventgen_lock);
1061 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1062 	}
1063 	flowop_endop(threadflow, flowop, 0);
1064 
1065 	return (FILEBENCH_OK);
1066 }
1067 
1068 /*
1069  * These flowops terminate a benchmark run when either the specified
1070  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1071  * number of I/O operations (flowoplib_finishoncount) have been generated.
1072  */
1073 
1074 
1075 /*
1076  * Stop filebench run when specified number of I/O bytes have been
1077  * transferred. Compares controlstats.fs_bytes with flowop->value,
1078  * and if greater returns 1, stopping the run, if not, returns 0
1079  * to continue running.
1080  */
1081 static int
1082 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1083 {
1084 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
1085 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1086 						    /* Uses constant value */
1087 
1088 	if (flowop->fo_initted == 0) {
1089 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1090 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1091 		flowop->fo_initted = 1;
1092 
1093 		if (flowoplib_event_find_target(threadflow, flowop)
1094 		    == FILEBENCH_ERROR)
1095 			return (FILEBENCH_ERROR);
1096 
1097 		if ((flowop->fo_targets) &&
1098 		    ((flowop->fo_targets->fo_attrs &
1099 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1100 			filebench_log(LOG_ERROR,
1101 			    "WARNING: Flowop %s does no Reads or Writes",
1102 			    flowop->fo_targets->fo_name);
1103 			filebench_shutdown(1);
1104 			return (FILEBENCH_ERROR);
1105 		}
1106 	}
1107 
1108 	if (flowop->fo_targets) {
1109 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1110 	} else {
1111 		(void) ipc_mutex_lock(&controlstats_lock);
1112 		bytes_io = controlstats.fs_bytes;
1113 		(void) ipc_mutex_unlock(&controlstats_lock);
1114 	}
1115 
1116 	flowop_beginop(threadflow, flowop);
1117 	if (bytes_io > byte_lim) {
1118 		flowop_endop(threadflow, flowop, 0);
1119 		return (FILEBENCH_DONE);
1120 	}
1121 	flowop_endop(threadflow, flowop, 0);
1122 
1123 	return (FILEBENCH_OK);
1124 }
1125 
1126 /*
1127  * Stop filebench run when specified number of I/O operations have
1128  * been performed. Compares controlstats.fs_count with *flowop->value,
1129  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1130  * to continue running.
1131  */
1132 static int
1133 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1134 {
1135 	uint64_t ops;
1136 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1137 
1138 	if (flowop->fo_initted == 0) {
1139 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1140 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1141 		flowop->fo_initted = 1;
1142 
1143 		if (flowoplib_event_find_target(threadflow, flowop)
1144 		    == FILEBENCH_ERROR)
1145 			return (FILEBENCH_ERROR);
1146 	}
1147 
1148 	if (flowop->fo_targets) {
1149 		ops = flowop->fo_targets->fo_stats.fs_count;
1150 	} else {
1151 		(void) ipc_mutex_lock(&controlstats_lock);
1152 		ops = controlstats.fs_count;
1153 		(void) ipc_mutex_unlock(&controlstats_lock);
1154 	}
1155 
1156 	flowop_beginop(threadflow, flowop);
1157 	if (ops >= count) {
1158 		flowop_endop(threadflow, flowop, 0);
1159 		return (FILEBENCH_DONE);
1160 	}
1161 	flowop_endop(threadflow, flowop, 0);
1162 
1163 	return (FILEBENCH_OK);
1164 }
1165 
1166 /*
1167  * Semaphore synchronization using either System V semaphores or
1168  * posix semaphores. If System V semaphores are available, they will be
1169  * used, otherwise posix semaphores will be used.
1170  */
1171 
1172 
1173 /*
1174  * Initializes the filebench "block on semaphore" flowop.
1175  * If System V semaphores are implemented, the routine
1176  * initializes the System V semaphore subsystem if it hasn't
1177  * already been initialized, also allocates a pair of semids
1178  * and initializes the highwater System V semaphore.
1179  * If no System V semaphores, then does nothing special.
1180  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1181  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1182  * on success.
1183  */
1184 static int
1185 flowoplib_semblock_init(flowop_t *flowop)
1186 {
1187 
1188 #ifdef HAVE_SYSV_SEM
1189 	int sys_semid;
1190 	struct sembuf sbuf[2];
1191 	int highwater;
1192 
1193 	ipc_seminit();
1194 
1195 	flowop->fo_semid_lw = ipc_semidalloc();
1196 	flowop->fo_semid_hw = ipc_semidalloc();
1197 
1198 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1199 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1200 
1201 	sys_semid = filebench_shm->shm_sys_semid;
1202 
1203 	if ((highwater = flowop->fo_semid_hw) == 0)
1204 		highwater = flowop->fo_constvalue; /* use constant value */
1205 
1206 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1207 
1208 	sbuf[0].sem_num = (short)highwater;
1209 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1210 	sbuf[0].sem_flg = 0;
1211 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1212 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1213 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1214 		return (FILEBENCH_ERROR);
1215 	}
1216 #else
1217 	filebench_log(LOG_DEBUG_IMPL,
1218 	    "flow %s-%d semblock init with posix semaphore",
1219 	    flowop->fo_name, flowop->fo_instance);
1220 
1221 	sem_init(&flowop->fo_sem, 1, 0);
1222 #endif	/* HAVE_SYSV_SEM */
1223 
1224 	if (!(avd_get_bool(flowop->fo_blocking)))
1225 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1226 
1227 	return (FILEBENCH_OK);
1228 }
1229 
1230 /*
1231  * Releases the semids for the System V semaphore allocated
1232  * to this flowop. If not using System V semaphores, then
1233  * it is effectively just a no-op.
1234  */
1235 static void
1236 flowoplib_semblock_destruct(flowop_t *flowop)
1237 {
1238 #ifdef HAVE_SYSV_SEM
1239 	ipc_semidfree(flowop->fo_semid_lw);
1240 	ipc_semidfree(flowop->fo_semid_hw);
1241 #else
1242 	sem_destroy(&flowop->fo_sem);
1243 #endif /* HAVE_SYSV_SEM */
1244 }
1245 
/*
 * Attempts to pass a System V or posix semaphore as appropriate,
 * and blocks if necessary.
 *
 * System V build: posts fo_value to the flowop's high-water
 * semaphore, then decrements the low-water semaphore by fo_value,
 * as two separate operations (each with a 600 second timeout when
 * semtimedop() is available). The results of those calls are
 * discarded, so this path always returns FILEBENCH_OK. If the
 * flowop is marked blocking, fo_lock is dropped around the
 * semaphore operations and re-acquired afterwards.
 *
 * Posix build: calls sem_wait() fo_value times. Returns
 * FILEBENCH_ERROR if any sem_wait() fails, FILEBENCH_OK otherwise.
 * Note that this path neither touches fo_lock nor calls
 * flowop_beginop()/flowop_endop().
 */
static int
flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
{

#ifdef HAVE_SYSV_SEM
	struct sembuf sbuf[2];
	int value = avd_get_int(flowop->fo_value);
	int sys_semid;
	struct timespec timeout;

	sys_semid = filebench_shm->shm_sys_semid;

	filebench_log(LOG_DEBUG_IMPL,
	    "flow %s-%d sem blocking on id %x num %x value %d",
	    flowop->fo_name, flowop->fo_instance, sys_semid,
	    flowop->fo_semid_hw, value);

	/*
	 * sbuf[0] posts (increments) the high-water semaphore by value;
	 * sbuf[1] decrements the low-water semaphore by value.
	 */
	sbuf[0].sem_num = flowop->fo_semid_hw;
	sbuf[0].sem_op = (short)value;
	sbuf[0].sem_flg = 0;
	sbuf[1].sem_num = flowop->fo_semid_lw;
	sbuf[1].sem_op = value * -1;
	sbuf[1].sem_flg = 0;
	/* only consumed by the semtimedop() variant below */
	timeout.tv_sec = 600;
	timeout.tv_nsec = 0;

	/* release the flowop lock while (possibly) blocked */
	if (avd_get_bool(flowop->fo_blocking))
		(void) ipc_mutex_unlock(&flowop->fo_lock);

	flowop_beginop(threadflow, flowop);

	/* failures and timeouts of either operation are ignored */
#ifdef HAVE_SEMTIMEDOP
	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
#else
	(void) semop(sys_semid, &sbuf[0], 1);
	(void) semop(sys_semid, &sbuf[1], 1);
#endif /* HAVE_SEMTIMEDOP */

	/* take the flowop lock back before returning to the caller */
	if (avd_get_bool(flowop->fo_blocking))
		(void) ipc_mutex_lock(&flowop->fo_lock);

	flowop_endop(threadflow, flowop, 0);

#else
	int value = avd_get_int(flowop->fo_value);
	int i;

	filebench_log(LOG_DEBUG_IMPL,
	    "flow %s-%d sem blocking on posix semaphore",
	    flowop->fo_name, flowop->fo_instance);

	/* Decrement sem by value, one sem_wait() per unit */
	for (i = 0; i < value; i++) {
		if (sem_wait(&flowop->fo_sem) == -1) {
			filebench_log(LOG_ERROR, "semop wait failed");
			return (FILEBENCH_ERROR);
		}
	}

	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
	    flowop->fo_name, flowop->fo_instance);
#endif /* HAVE_SYSV_SEM */

	return (FILEBENCH_OK);
}
1319 
1320 /*
1321  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1322  */
1323 /* ARGSUSED */
1324 static int
1325 flowoplib_sempost_init(flowop_t *flowop)
1326 {
1327 #ifdef HAVE_SYSV_SEM
1328 	ipc_seminit();
1329 #endif /* HAVE_SYSV_SEM */
1330 	return (FILEBENCH_OK);
1331 }
1332 
/*
 * Post to a System V or posix semaphore as appropriate.
 * While the flowop has no target list (fo_targets is NULL), this
 * routine uses the fo_targetname attribute to look up the flowops
 * that are expecting posts from this flowop and chains them
 * through fo_targetnext. If none is found, an error is logged and
 * filebench_shutdown() is called, but the routine still returns
 * FILEBENCH_OK. All target flowops on the list, other than
 * FLOW_MASTER instances, have a post operation done to their
 * semaphores on each call.
 *
 * Returns FILEBENCH_ERROR if a post fails (System V: any errno
 * other than EAGAIN), FILEBENCH_OK otherwise. Note that the error
 * returns leave the loop without calling flowop_endop().
 */
static int
flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
{
	flowop_t *target;

	filebench_log(LOG_DEBUG_IMPL,
	    "sempost flow %s-%d",
	    flowop->fo_name,
	    flowop->fo_instance);

	/* if this is the first post, create the post list */
	if (flowop->fo_targets == NULL) {
		flowop_t *result = flowop_find(flowop->fo_targetname);

		flowop->fo_targets = result;

		if (result == NULL) {
			filebench_log(LOG_ERROR,
			    "sempost: could not find op %s for thread %s",
			    flowop->fo_targetname,
			    threadflow->tf_name);
			filebench_shutdown(1);
		}

		/* copy the search result chain into the target chain */
		while (result) {
			result->fo_targetnext =
			    result->fo_resultnext;
			result = result->fo_resultnext;
		}
	}

	target = flowop->fo_targets;

	flowop_beginop(threadflow, flowop);
	/* post to the targets */
	while (target) {
#ifdef HAVE_SYSV_SEM
		struct sembuf sbuf[2];
		int sys_semid;
		int blocking;
#else
		int i;
#endif /* HAVE_SYSV_SEM */
		/* timeout is only used by the System V semtimedop() path */
		struct timespec timeout;
		int value = (int)avd_get_int(flowop->fo_value);

		/* master (template) instances are skipped */
		if (target->fo_instance == FLOW_MASTER) {
			target = target->fo_targetnext;
			continue;
		}

#ifdef HAVE_SYSV_SEM

		filebench_log(LOG_DEBUG_IMPL,
		    "sempost flow %s-%d num %x",
		    target->fo_name,
		    target->fo_instance,
		    target->fo_semid_lw);

		/*
		 * sbuf[0] posts value to the target's low-water semaphore;
		 * sbuf[1] decrements its high-water semaphore by value.
		 */
		sys_semid = filebench_shm->shm_sys_semid;
		sbuf[0].sem_num = target->fo_semid_lw;
		sbuf[0].sem_op = (short)value;
		sbuf[0].sem_flg = 0;
		sbuf[1].sem_num = target->fo_semid_hw;
		sbuf[1].sem_op = value * -1;
		sbuf[1].sem_flg = 0;
		timeout.tv_sec = 600;
		timeout.tv_nsec = 0;

		if (avd_get_bool(flowop->fo_blocking))
			blocking = 1;
		else
			blocking = 0;

		/*
		 * The operation count is blocking + 1: a blocking post
		 * submits both sbuf entries in one call, a non-blocking
		 * post submits only sbuf[0]. EAGAIN is not treated as a
		 * failure.
		 */
#ifdef HAVE_SEMTIMEDOP
		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
#else
		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
		    (errno && (errno != EAGAIN))) {
#endif /* HAVE_SEMTIMEDOP */
			filebench_log(LOG_ERROR, "semop post failed: %s",
			    strerror(errno));
			return (FILEBENCH_ERROR);
		}

		filebench_log(LOG_DEBUG_IMPL,
		    "flow %s-%d finished posting",
		    target->fo_name, target->fo_instance);
#else
		filebench_log(LOG_DEBUG_IMPL,
		    "sempost flow %s-%d to posix semaphore",
		    target->fo_name,
		    target->fo_instance);

		/* Increment sem by value, one sem_post() per unit */
		for (i = 0; i < value; i++) {
			if (sem_post(&target->fo_sem) == -1) {
				filebench_log(LOG_ERROR, "semop post failed");
				return (FILEBENCH_ERROR);
			}
		}

		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
		    target->fo_name, target->fo_instance);
#endif /* HAVE_SYSV_SEM */

		target = target->fo_targetnext;
	}
	flowop_endop(threadflow, flowop, 0);

	return (FILEBENCH_OK);
}
1454 
1455 
1456 /*
1457  * Section for exercising create / open / close / delete operations
1458  * on files within a fileset. For proper operation, the flowop attribute
1459  * "fd", which sets the fo_fdnumber field in the flowop, must be used
1460  * so that the same file is opened and later closed. "fd" is an index
1461  * into a pair of arrays maintained by threadflows, one of which
1462  * contains the operating system assigned file descriptors and the other
1463  * a pointer to the filesetentry whose file the file descriptor
1464  * references. An openfile flowop defined without fd being set will use
1465  * the default (0) fd or, if specified, rotate through fd indices, but
1466  * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile uses the file recorded at a specified (non-zero)
 * fd if there is one, and otherwise picks an arbitrary file to delete.
1469  */
1470 
1471 /*
1472  * Emulates (and actually does) file open. Obtains a file descriptor
1473  * index, then calls flowoplib_openfile_common() to open. Returns
1474  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1475  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1476  * FILEBENCH_NORSC, FILEBENCH_OK).
1477  */
1478 static int
1479 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1480 {
1481 	int fd = flowoplib_fdnum(threadflow, flowop);
1482 
1483 	if (fd == -1)
1484 		return (FILEBENCH_ERROR);
1485 
1486 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1487 }
1488 
1489 /*
1490  * Common file opening code for filesets. Uses the supplied
1491  * file descriptor index to determine the tf_fd entry to use.
1492  * If the entry is empty (0) and the fileset exists, fileset
1493  * pick is called to select a fileset entry to use. The file
1494  * specified in the filesetentry is opened, and the returned
1495  * operating system file descriptor and a pointer to the
1496  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1497  * respectively. Returns FILEBENCH_ERROR on error,
1498  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1499  * and FILEBENCH_OK on success.
1500  */
1501 static int
1502 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1503 {
1504 	filesetentry_t *file;
1505 	char *fileset_name;
1506 	int tid = 0;
1507 	int openflag = 0;
1508 	int err;
1509 
1510 	if (flowop->fo_fileset == NULL) {
1511 		filebench_log(LOG_ERROR, "flowop NULL file");
1512 		return (FILEBENCH_ERROR);
1513 	}
1514 
1515 	if ((fileset_name =
1516 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1517 		filebench_log(LOG_ERROR,
1518 		    "flowop %s: fileset has no name", flowop->fo_name);
1519 		return (FILEBENCH_ERROR);
1520 	}
1521 
1522 	/*
1523 	 * set the open flag for read only or read/write, as appropriate.
1524 	 */
1525 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE)
1526 		openflag = O_RDONLY;
1527 	else
1528 		openflag = O_RDWR;
1529 
1530 	/*
1531 	 * If the flowop doesn't default to persistent fd
1532 	 * then get unique thread ID for use by fileset_pick
1533 	 */
1534 	if (avd_get_bool(flowop->fo_rotatefd))
1535 		tid = threadflow->tf_utid;
1536 
1537 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1538 		filebench_log(LOG_ERROR,
1539 		    "flowop %s attempted to open without closing on fd %d",
1540 		    flowop->fo_name, fd);
1541 		return (FILEBENCH_ERROR);
1542 	}
1543 
1544 #ifdef HAVE_RAW_SUPPORT
1545 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1546 		int open_attrs = 0;
1547 		char name[MAXPATHLEN];
1548 
1549 		(void) fb_strlcpy(name,
1550 		    avd_get_str(flowop->fo_fileset->fs_path), MAXPATHLEN);
1551 		(void) fb_strlcat(name, "/", MAXPATHLEN);
1552 		(void) fb_strlcat(name, fileset_name, MAXPATHLEN);
1553 
1554 		if (avd_get_bool(flowop->fo_dsync)) {
1555 #ifdef sun
1556 			open_attrs |= O_DSYNC;
1557 #else
1558 			open_attrs |= O_FSYNC;
1559 #endif
1560 		}
1561 
1562 		filebench_log(LOG_DEBUG_SCRIPT,
1563 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1564 
1565 		if (FB_OPEN(&(threadflow->tf_fd[fd]), name,
1566 		    openflag | open_attrs, 0666) == FILEBENCH_ERROR) {
1567 			filebench_log(LOG_ERROR,
1568 			    "Failed to open raw device %s: %s",
1569 			    name, strerror(errno));
1570 			return (FILEBENCH_ERROR);
1571 		}
1572 
1573 		/* if running on Solaris, use un-buffered io */
1574 #ifdef sun
1575 		(void) directio(threadflow->tf_fd[fd].fd_num, DIRECTIO_ON);
1576 #endif
1577 
1578 		threadflow->tf_fse[fd] = NULL;
1579 
1580 		return (FILEBENCH_OK);
1581 	}
1582 #endif /* HAVE_RAW_SUPPORT */
1583 
1584 	if ((err = flowoplib_pickfile(&file, flowop,
1585 	    FILESET_PICKEXISTS, tid)) != FILEBENCH_OK) {
1586 		filebench_log(LOG_DEBUG_SCRIPT,
1587 		    "flowop %s failed to pick file from %s on fd %d",
1588 		    flowop->fo_name, fileset_name, fd);
1589 		return (err);
1590 	}
1591 
1592 	threadflow->tf_fse[fd] = file;
1593 
1594 	flowop_beginop(threadflow, flowop);
1595 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1596 	    file, openflag, 0666, flowoplib_fileattrs(flowop));
1597 	flowop_endop(threadflow, flowop, 0);
1598 
1599 	if (err == FILEBENCH_ERROR) {
1600 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1601 		    flowop->fo_name, file->fse_path);
1602 		return (FILEBENCH_ERROR);
1603 	}
1604 
1605 	filebench_log(LOG_DEBUG_SCRIPT,
1606 	    "flowop %s: opened %s fd[%d] = %d",
1607 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1608 
1609 	return (FILEBENCH_OK);
1610 }
1611 
1612 /*
1613  * Emulate create of a file. Uses the flowop's fdnumber to select
1614  * tf_fd and tf_fse array locations to put the created file's file
1615  * descriptor and filesetentry respectively. Uses flowoplib_pickfile()
1616  * to select a specific filesetentry whose file does not currently
1617  * exist for the file create operation. Then calls
1618  * fileset_openfile() with the O_CREATE flag set to create the
1619  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1620  * already in use, the flowop has no associated fileset, or
1621  * the create call fails. Returns 1 if a filesetentry with a
1622  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1623  */
1624 static int
1625 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1626 {
1627 	filesetentry_t *file;
1628 	int fd = flowop->fo_fdnumber;
1629 	int err;
1630 
1631 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1632 		filebench_log(LOG_ERROR,
1633 		    "flowop %s attempted to create without closing on fd %d",
1634 		    flowop->fo_name, fd);
1635 		return (FILEBENCH_ERROR);
1636 	}
1637 
1638 	if (flowop->fo_fileset == NULL) {
1639 		filebench_log(LOG_ERROR, "flowop NULL file");
1640 		return (FILEBENCH_ERROR);
1641 	}
1642 
1643 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE) {
1644 		filebench_log(LOG_ERROR, "Can not CREATE the READONLY file %s",
1645 		    avd_get_str(flowop->fo_fileset->fs_name));
1646 		return (FILEBENCH_ERROR);
1647 	}
1648 
1649 
1650 #ifdef HAVE_RAW_SUPPORT
1651 	/* can't be used with raw devices */
1652 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1653 		filebench_log(LOG_ERROR,
1654 		    "flowop %s attempted to a createfile on RAW device",
1655 		    flowop->fo_name);
1656 		return (FILEBENCH_ERROR);
1657 	}
1658 #endif /* HAVE_RAW_SUPPORT */
1659 
1660 	if ((err = flowoplib_pickfile(&file, flowop,
1661 	    FILESET_PICKNOEXIST, 0)) != FILEBENCH_OK) {
1662 		filebench_log(LOG_DEBUG_SCRIPT,
1663 		    "flowop %s failed to pick file from fileset %s",
1664 		    flowop->fo_name,
1665 		    avd_get_str(flowop->fo_fileset->fs_name));
1666 		return (err);
1667 	}
1668 
1669 	threadflow->tf_fse[fd] = file;
1670 
1671 	flowop_beginop(threadflow, flowop);
1672 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1673 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1674 	flowop_endop(threadflow, flowop, 0);
1675 
1676 	if (err == FILEBENCH_ERROR) {
1677 		filebench_log(LOG_ERROR, "failed to create file %s",
1678 		    flowop->fo_name);
1679 		return (FILEBENCH_ERROR);
1680 	}
1681 
1682 	filebench_log(LOG_DEBUG_SCRIPT,
1683 	    "flowop %s: created %s fd[%d] = %d",
1684 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1685 
1686 	return (FILEBENCH_OK);
1687 }
1688 
/*
 * Emulates delete of a file. If a non-zero fd is specified and a
 * filesetentry is recorded at that fd location, that entry selects
 * the file to be deleted (and the tf_fse slot is cleared);
 * otherwise it picks an arbitrary filesetentry whose file exists.
 * A file that is still open elsewhere is left alone and
 * FILEBENCH_OK is returned. Otherwise the file is removed with
 * FB_UNLINK() and the filesetentry is released through
 * fileset_unbusy() with its "exists" argument set, which per the
 * comment at that call marks it as no longer existing.
 * Returns FILEBENCH_ERROR if no fileset is available or the fileset
 * is a raw device, the status from flowoplib_pickfile() (for example
 * FILEBENCH_NORSC) if an appropriate filesetentry cannot be found,
 * and FILEBENCH_OK otherwise.
 */
static int
flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
{
	filesetentry_t *file;
	fileset_t *fileset;
	char path[MAXPATHLEN];
	char *pathtmp;
	int fd = flowop->fo_fdnumber;

	/* if fd specified, use it to access file */
	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {

		/* indicate that the file will be deleted */
		threadflow->tf_fse[fd] = NULL;

		/* if here, we still have a valid file pointer */
		fileset = file->fse_fileset;
	} else {

		/* Otherwise, pick arbitrary file */
		file = NULL;
		fileset = flowop->fo_fileset;
	}


	if (fileset == NULL) {
		filebench_log(LOG_ERROR, "flowop NULL file");
		return (FILEBENCH_ERROR);
	}

#ifdef HAVE_RAW_SUPPORT
	/* can't be used with raw devices */
	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
		filebench_log(LOG_ERROR,
		    "flowop %s attempted a deletefile on RAW device",
		    flowop->fo_name);
		return (FILEBENCH_ERROR);
	}
#endif /* HAVE_RAW_SUPPORT */

	if (file == NULL) {
		int err;

		/* pick arbitrary, existing (allocated) file */
		if ((err = flowoplib_pickfile(&file, flowop,
		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
			filebench_log(LOG_DEBUG_SCRIPT,
			    "flowop %s failed to pick file", flowop->fo_name);
			return (err);
		}
	} else {
		/* delete specific file. wait for it to be non-busy */
		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
		while (file->fse_flags & FSE_BUSY) {
			/* flag that a thread is waiting on this entry */
			file->fse_flags |= FSE_THRD_WAITNG;
			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
			    &fileset->fs_pick_lock);
		}

		/* File now available, grab it for deletion */
		file->fse_flags |= FSE_BUSY;
		fileset->fs_idle_files--;
		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
	}

	/* don't delete if anyone (other than me) has file open */
	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
		/* this thread holds one open itself, so allow a count of 1 */
		if (file->fse_open_cnt > 1) {
			filebench_log(LOG_DEBUG_SCRIPT,
			    "flowop %s can't delete file opened by other"
			    " threads at fd = %d", flowop->fo_name, fd);
			fileset_unbusy(file, FALSE, FALSE, 0);
			return (FILEBENCH_OK);
		} else {
			filebench_log(LOG_DEBUG_SCRIPT,
			    "flowop %s deleting still open file at fd = %d",
			    flowop->fo_name, fd);
		}
	} else if (file->fse_open_cnt > 0) {
		/* this thread has no open of its own: any open blocks it */
		filebench_log(LOG_DEBUG_SCRIPT,
		    "flowop %s can't delete file opened by other"
		    " threads at fd = %d, open count = %d",
		    flowop->fo_name, fd, file->fse_open_cnt);
		fileset_unbusy(file, FALSE, FALSE, 0);
		return (FILEBENCH_OK);
	}

	/* build "<fs_path>/<fs_name><resolved entry path>" */
	(void) fb_strlcpy(path, avd_get_str(fileset->fs_path), MAXPATHLEN);
	(void) fb_strlcat(path, "/", MAXPATHLEN);
	(void) fb_strlcat(path, avd_get_str(fileset->fs_name), MAXPATHLEN);
	pathtmp = fileset_resolvepath(file);
	(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
	free(pathtmp);

	/* delete the selected file; the unlink result is not checked */
	flowop_beginop(threadflow, flowop);
	(void) FB_UNLINK(path);
	flowop_endop(threadflow, flowop, 0);

	/* indicate that it is no longer busy and no longer exists */
	fileset_unbusy(file, TRUE, FALSE, -file->fse_open_cnt);

	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);

	return (FILEBENCH_OK);
}
1804 
1805 /*
1806  * Emulates fsync of a file. Obtains the file descriptor index
1807  * from the flowop, obtains the actual file descriptor from
1808  * the threadflow's table, checks to be sure it is still an
1809  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
1810  * if the file no longer is open, FILEBENCH_OK otherwise.
1811  */
1812 static int
1813 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1814 {
1815 	filesetentry_t *file;
1816 	int fd = flowop->fo_fdnumber;
1817 
1818 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1819 		filebench_log(LOG_ERROR,
1820 		    "flowop %s attempted to fsync a closed fd %d",
1821 		    flowop->fo_name, fd);
1822 		return (FILEBENCH_ERROR);
1823 	}
1824 
1825 	file = threadflow->tf_fse[fd];
1826 
1827 	if ((file == NULL) ||
1828 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1829 		filebench_log(LOG_ERROR,
1830 		    "flowop %s attempted to a fsync a RAW device",
1831 		    flowop->fo_name);
1832 		return (FILEBENCH_ERROR);
1833 	}
1834 
1835 	/* Measure time to fsync */
1836 	flowop_beginop(threadflow, flowop);
1837 	(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1838 	flowop_endop(threadflow, flowop, 0);
1839 
1840 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1841 
1842 	return (FILEBENCH_OK);
1843 }
1844 
1845 /*
1846  * Emulate fsync of an entire fileset. Search through the
1847  * threadflow's file descriptor array, doing fsync() on each
1848  * open file that belongs to the flowop's fileset. Always
1849  * returns FILEBENCH_OK.
1850  */
1851 static int
1852 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1853 {
1854 	int fd;
1855 
1856 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1857 		filesetentry_t *file;
1858 
1859 		/* Match the file set to fsync */
1860 		if ((threadflow->tf_fse[fd] == NULL) ||
1861 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1862 			continue;
1863 
1864 		/* Measure time to fsync */
1865 		flowop_beginop(threadflow, flowop);
1866 		(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1867 		flowop_endop(threadflow, flowop, 0);
1868 
1869 		file = threadflow->tf_fse[fd];
1870 
1871 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1872 		    file->fse_path);
1873 	}
1874 
1875 	return (FILEBENCH_OK);
1876 }
1877 
1878 /*
1879  * Emulate close of a file.  Obtains the file descriptor index
1880  * from the flowop, obtains the actual file descriptor from the
1881  * threadflow's table, checks to be sure it is still an open
1882  * file, then does a close operation on it. Then sets the
1883  * threadflow file descriptor table entry to 0, and the file set
1884  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
1885  * FILEBENCH_OK otherwise.
1886  */
1887 static int
1888 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1889 {
1890 	filesetentry_t *file;
1891 	fileset_t *fileset;
1892 	int fd = flowop->fo_fdnumber;
1893 
1894 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1895 		filebench_log(LOG_ERROR,
1896 		    "flowop %s attempted to close an already closed fd %d",
1897 		    flowop->fo_name, fd);
1898 		return (FILEBENCH_ERROR);
1899 	}
1900 
1901 	file = threadflow->tf_fse[fd];
1902 	fileset = file->fse_fileset;
1903 
1904 	/* Wait for it to be non-busy */
1905 	(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1906 	while (file->fse_flags & FSE_BUSY) {
1907 		file->fse_flags |= FSE_THRD_WAITNG;
1908 		(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1909 		    &fileset->fs_pick_lock);
1910 	}
1911 
1912 	/* File now available, grab it for closing */
1913 	file->fse_flags |= FSE_BUSY;
1914 
1915 	/* if last open, set declare idle */
1916 	if (file->fse_open_cnt == 1)
1917 		fileset->fs_idle_files--;
1918 
1919 	(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1920 
1921 	/* Measure time to close */
1922 	flowop_beginop(threadflow, flowop);
1923 	(void) FB_CLOSE(&threadflow->tf_fd[fd]);
1924 	flowop_endop(threadflow, flowop, 0);
1925 
1926 	fileset_unbusy(file, FALSE, FALSE, -1);
1927 
1928 	threadflow->tf_fd[fd].fd_ptr = NULL;
1929 
1930 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1931 
1932 	return (FILEBENCH_OK);
1933 }
1934 
1935 /*
1936  * Obtain the full pathname of the directory described by the filesetentry
1937  * indicated by "dir", and copy it into the character array pointed to by
1938  * path. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
1939  */
1940 static int
1941 flowoplib_getdirpath(filesetentry_t *dir, char *path)
1942 {
1943 	char		*fileset_path;
1944 	char		*fileset_name;
1945 	char		*part_path;
1946 
1947 	if ((fileset_path = avd_get_str(dir->fse_fileset->fs_path)) == NULL) {
1948 		filebench_log(LOG_ERROR, "Fileset path not set");
1949 		return (FILEBENCH_ERROR);
1950 	}
1951 
1952 	if ((fileset_name = avd_get_str(dir->fse_fileset->fs_name)) == NULL) {
1953 		filebench_log(LOG_ERROR, "Fileset name not set");
1954 		return (FILEBENCH_ERROR);
1955 	}
1956 
1957 	(void) fb_strlcpy(path, fileset_path, MAXPATHLEN);
1958 	(void) fb_strlcat(path, "/", MAXPATHLEN);
1959 	(void) fb_strlcat(path, fileset_name, MAXPATHLEN);
1960 
1961 	if ((part_path = fileset_resolvepath(dir)) == NULL)
1962 		return (FILEBENCH_ERROR);
1963 
1964 	(void) fb_strlcat(path, part_path, MAXPATHLEN);
1965 	free(part_path);
1966 
1967 	return (FILEBENCH_OK);
1968 }
1969 
1970 /*
1971  * Use mkdir to create a directory.  Obtains the fileset name from the
1972  * flowop, selects a non-existent leaf directory and obtains its full
1973  * path, then uses mkdir to create it on the storage subsystem (make it
1974  * existent). Returns FILEBENCH_NORSC is there are no more non-existent
1975  * directories in the fileset, FILEBENCH_ERROR on other errors, and
1976  * FILEBENCH_OK on success.
1977  */
1978 static int
1979 flowoplib_makedir(threadflow_t *threadflow, flowop_t *flowop)
1980 {
1981 	filesetentry_t	*dir;
1982 	int		ret;
1983 	char		full_path[MAXPATHLEN];
1984 
1985 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
1986 	    FILESET_PICKNOEXIST)) != FILEBENCH_OK)
1987 		return (ret);
1988 
1989 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
1990 		return (ret);
1991 
1992 	flowop_beginop(threadflow, flowop);
1993 	(void) FB_MKDIR(full_path, 0755);
1994 	flowop_endop(threadflow, flowop, 0);
1995 
1996 	/* indicate that it is no longer busy and now exists */
1997 	fileset_unbusy(dir, TRUE, TRUE, 0);
1998 
1999 	return (FILEBENCH_OK);
2000 }
2001 
2002 /*
2003  * Use rmdir to delete a directory.  Obtains the fileset name from the
2004  * flowop, selects an existent leaf directory and obtains its full path,
2005  * then uses rmdir to remove it from the storage subsystem (make it
2006  * non-existent). Returns FILEBENCH_NORSC is there are no more existent
2007  * directories in the fileset, FILEBENCH_ERROR on other errors, and
2008  * FILEBENCH_OK on success.
2009  */
2010 static int
2011 flowoplib_removedir(threadflow_t *threadflow, flowop_t *flowop)
2012 {
2013 	filesetentry_t *dir;
2014 	int		ret;
2015 	char		full_path[MAXPATHLEN];
2016 
2017 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
2018 	    FILESET_PICKEXISTS)) != FILEBENCH_OK)
2019 		return (ret);
2020 
2021 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2022 		return (ret);
2023 
2024 	flowop_beginop(threadflow, flowop);
2025 	(void) FB_RMDIR(full_path);
2026 	flowop_endop(threadflow, flowop, 0);
2027 
2028 	/* indicate that it is no longer busy and no longer exists */
2029 	fileset_unbusy(dir, TRUE, FALSE, 0);
2030 
2031 	return (FILEBENCH_OK);
2032 }
2033 
2034 /*
2035  * Use opendir(), multiple readdir() calls, and closedir() to list the
2036  * contents of a directory.  Obtains the fileset name from the
2037  * flowop, selects a normal subdirectory (which always exist) and obtains
2038  * its full path, then uses opendir() to get a DIR handle to it from the
2039  * file system, a readdir() loop to access each directory entry, and
2040  * finally cleans up with a closedir(). The latency reported is the total
2041  * for all this activity, and it also reports the total number of bytes
2042  * in the entries as the amount "read". Returns FILEBENCH_ERROR on errors,
2043  * and FILEBENCH_OK on success.
2044  */
2045 static int
2046 flowoplib_listdir(threadflow_t *threadflow, flowop_t *flowop)
2047 {
2048 	fileset_t	*fileset;
2049 	filesetentry_t	*dir;
2050 	DIR		*dir_handle;
2051 	struct dirent	*direntp;
2052 	int		dir_bytes = 0;
2053 	int		ret;
2054 	char		full_path[MAXPATHLEN];
2055 
2056 	if ((fileset = flowop->fo_fileset) == NULL) {
2057 		filebench_log(LOG_ERROR, "flowop NO fileset");
2058 		return (FILEBENCH_ERROR);
2059 	}
2060 
2061 	if ((dir = fileset_pick(fileset, FILESET_PICKDIR, 0, 0)) == NULL) {
2062 		filebench_log(LOG_DEBUG_SCRIPT,
2063 		    "flowop %s failed to pick directory from fileset %s",
2064 		    flowop->fo_name,
2065 		    avd_get_str(fileset->fs_name));
2066 		return (FILEBENCH_ERROR);
2067 	}
2068 
2069 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2070 		return (ret);
2071 
2072 	flowop_beginop(threadflow, flowop);
2073 
2074 	/* open the directory */
2075 	if ((dir_handle = FB_OPENDIR(full_path)) == NULL) {
2076 		filebench_log(LOG_ERROR,
2077 		    "flowop %s failed to open directory in fileset %s\n",
2078 		    flowop->fo_name, avd_get_str(fileset->fs_name));
2079 		return (FILEBENCH_ERROR);
2080 	}
2081 
2082 	/* read through the directory entries */
2083 	while ((direntp = FB_READDIR(dir_handle)) != NULL) {
2084 		dir_bytes += (strlen(direntp->d_name) +
2085 		    sizeof (struct dirent) - 1);
2086 	}
2087 
2088 	/* close the directory */
2089 	(void) FB_CLOSEDIR(dir_handle);
2090 
2091 	flowop_endop(threadflow, flowop, dir_bytes);
2092 
2093 	/* indicate that it is no longer busy */
2094 	fileset_unbusy(dir, FALSE, FALSE, 0);
2095 
2096 	return (FILEBENCH_OK);
2097 }
2098 
2099 /*
2100  * Emulate stat of a file. Picks an arbitrary filesetentry with
2101  * an existing file from the flowop's fileset, then performs a
2102  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2103  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2104  * cannot be found, and FILEBENCH_OK on success.
2105  */
2106 static int
2107 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2108 {
2109 	filesetentry_t *file;
2110 	fileset_t *fileset;
2111 	struct stat64 statbuf;
2112 	int fd = flowop->fo_fdnumber;
2113 
2114 	/* if fd specified and the file is open, use it to access file */
2115 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
2116 
2117 		/* check whether file handle still valid */
2118 		if ((file = threadflow->tf_fse[fd]) == NULL) {
2119 			filebench_log(LOG_DEBUG_SCRIPT,
2120 			    "flowop %s trying to stat NULL file at fd = %d",
2121 			    flowop->fo_name, fd);
2122 			return (FILEBENCH_ERROR);
2123 		}
2124 
2125 		/* if here, we still have a valid file pointer */
2126 		fileset = file->fse_fileset;
2127 	} else {
2128 		/* Otherwise, pick arbitrary file */
2129 		file = NULL;
2130 		fileset = flowop->fo_fileset;
2131 	}
2132 
2133 	if (fileset == NULL) {
2134 		filebench_log(LOG_ERROR,
2135 		    "statfile with no fileset specified");
2136 		return (FILEBENCH_ERROR);
2137 	}
2138 
2139 #ifdef HAVE_RAW_SUPPORT
2140 	/* can't be used with raw devices */
2141 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2142 		filebench_log(LOG_ERROR,
2143 		    "flowop %s attempted do a statfile on a RAW device",
2144 		    flowop->fo_name);
2145 		return (FILEBENCH_ERROR);
2146 	}
2147 #endif /* HAVE_RAW_SUPPORT */
2148 
2149 	if (file == NULL) {
2150 		char path[MAXPATHLEN];
2151 		char *pathtmp;
2152 		int err;
2153 
2154 		/* pick arbitrary, existing (allocated) file */
2155 		if ((err = flowoplib_pickfile(&file, flowop,
2156 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
2157 			filebench_log(LOG_DEBUG_SCRIPT,
2158 			    "Statfile flowop %s failed to pick file",
2159 			    flowop->fo_name);
2160 			return (err);
2161 		}
2162 
2163 		/* resolve path and do a stat on file */
2164 		(void) fb_strlcpy(path, avd_get_str(fileset->fs_path),
2165 		    MAXPATHLEN);
2166 		(void) fb_strlcat(path, "/", MAXPATHLEN);
2167 		(void) fb_strlcat(path, avd_get_str(fileset->fs_name),
2168 		    MAXPATHLEN);
2169 		pathtmp = fileset_resolvepath(file);
2170 		(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
2171 		free(pathtmp);
2172 
2173 		/* stat the file */
2174 		flowop_beginop(threadflow, flowop);
2175 		if (FB_STAT(path, &statbuf) == -1)
2176 			filebench_log(LOG_ERROR,
2177 			    "statfile flowop %s failed", flowop->fo_name);
2178 		flowop_endop(threadflow, flowop, 0);
2179 
2180 		fileset_unbusy(file, FALSE, FALSE, 0);
2181 	} else {
2182 		/* stat specific file */
2183 		flowop_beginop(threadflow, flowop);
2184 		if (FB_FSTAT(&threadflow->tf_fd[fd], &statbuf) == -1)
2185 			filebench_log(LOG_ERROR,
2186 			    "statfile flowop %s failed", flowop->fo_name);
2187 		flowop_endop(threadflow, flowop, 0);
2188 
2189 	}
2190 
2191 	return (FILEBENCH_OK);
2192 }
2193 
2194 
2195 /*
2196  * Additional reads and writes. Read and write whole files, write
2197  * and append to files. Some of these work with both fileobjs and
2198  * filesets, others only with filesets. The flowoplib_write routine
2199  * writes from thread memory, while the others read or write using
2200  * fo_buf memory. Note that both flowoplib_read() and
2201  * flowoplib_aiowrite() use thread memory as well.
2202  */
2203 
2204 
2205 /*
2206  * Emulate a read of a whole file. The file must be open with
2207  * file descriptor and filesetentry stored at the locations indexed
2208  * by the flowop's fdnumber. It then seeks to the beginning of the
2209  * associated file, and reads fs_iosize bytes at a time until the end
2210  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2211  * out of files, and FILEBENCH_OK on success.
2212  */
2213 static int
2214 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2215 {
2216 	caddr_t iobuf;
2217 	off64_t bytes = 0;
2218 	fb_fdesc_t *fdesc;
2219 	uint64_t wss;
2220 	fbint_t iosize;
2221 	int ret;
2222 	char zerordbuf;
2223 
2224 	/* get the file to use */
2225 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2226 	    &fdesc)) != FILEBENCH_OK)
2227 		return (ret);
2228 
2229 	/* an I/O size of zero means read entire working set with one I/O */
2230 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2231 		iosize = wss;
2232 
2233 	/*
2234 	 * The file may actually be 0 bytes long, in which case skip
2235 	 * the buffer set up call (which would fail) and substitute
2236 	 * a small buffer, which won't really be used.
2237 	 */
2238 	if (iosize == 0) {
2239 		iobuf = (caddr_t)&zerordbuf;
2240 		filebench_log(LOG_DEBUG_SCRIPT,
2241 		    "flowop %s read zero length file", flowop->fo_name);
2242 	} else {
2243 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2244 		    iosize) != 0)
2245 			return (FILEBENCH_ERROR);
2246 	}
2247 
2248 	/* Measure time to read bytes */
2249 	flowop_beginop(threadflow, flowop);
2250 	(void) FB_LSEEK(fdesc, 0, SEEK_SET);
2251 	while ((ret = FB_READ(fdesc, iobuf, iosize)) > 0)
2252 		bytes += ret;
2253 
2254 	flowop_endop(threadflow, flowop, bytes);
2255 
2256 	if (ret < 0) {
2257 		filebench_log(LOG_ERROR,
2258 		    "readwhole fail Failed to read whole file: %s",
2259 		    strerror(errno));
2260 		return (FILEBENCH_ERROR);
2261 	}
2262 
2263 	return (FILEBENCH_OK);
2264 }
2265 
2266 /*
2267  * Emulate a write to a file of size fo_iosize.  Will write
2268  * to a file from a fileset if the flowop's fo_fileset field
2269  * specifies one or its fdnumber is non zero. Otherwise it
2270  * will write to a fileobj file, if one exists. If the file
2271  * is not currently open, the routine will attempt to open
2272  * it. The flowop's fo_wss parameter will be used to set the
2273  * maximum file size if it is non-zero, otherwise the
2274  * filesetentry's  fse_size will be used. A random memory
2275  * buffer offset is calculated, and, if fo_random is TRUE,
2276  * a random file offset is used for the write. Otherwise the
2277  * write is to the next sequential location. Returns
2278  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2279  * obtain a file, or FILEBENCH_OK on success.
2280  */
2281 static int
2282 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2283 {
2284 	caddr_t iobuf;
2285 	fbint_t wss;
2286 	fbint_t iosize;
2287 	fb_fdesc_t *fdesc;
2288 	int ret;
2289 
2290 	iosize = avd_get_int(flowop->fo_iosize);
2291 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2292 	    &fdesc, iosize)) != FILEBENCH_OK)
2293 		return (ret);
2294 
2295 	if (avd_get_bool(flowop->fo_random)) {
2296 		uint64_t fileoffset;
2297 
2298 		if (filebench_randomno64(&fileoffset,
2299 		    wss, iosize, NULL) == -1) {
2300 			filebench_log(LOG_ERROR,
2301 			    "file size smaller than IO size for thread %s",
2302 			    flowop->fo_name);
2303 			return (FILEBENCH_ERROR);
2304 		}
2305 		flowop_beginop(threadflow, flowop);
2306 		if (FB_PWRITE(fdesc, iobuf,
2307 		    iosize, (off64_t)fileoffset) == -1) {
2308 			filebench_log(LOG_ERROR, "write failed, "
2309 			    "offset %llu io buffer %zd: %s",
2310 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2311 			flowop_endop(threadflow, flowop, 0);
2312 			return (FILEBENCH_ERROR);
2313 		}
2314 		flowop_endop(threadflow, flowop, iosize);
2315 	} else {
2316 		flowop_beginop(threadflow, flowop);
2317 		if (FB_WRITE(fdesc, iobuf, iosize) == -1) {
2318 			filebench_log(LOG_ERROR,
2319 			    "write failed, io buffer %zd: %s",
2320 			    iobuf, strerror(errno));
2321 			flowop_endop(threadflow, flowop, 0);
2322 			return (FILEBENCH_ERROR);
2323 		}
2324 		flowop_endop(threadflow, flowop, iosize);
2325 	}
2326 
2327 	return (FILEBENCH_OK);
2328 }
2329 
2330 /*
2331  * Emulate a write of a whole file.  The size of the file
2332  * is taken from a filesetentry identified by fo_srcfdnumber or
2333  * from the working set size, while the file descriptor used is
2334  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2335  * length length until full file has been written. Returns FILEBENCH_ERROR on
2336  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2337  */
2338 static int
2339 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2340 {
2341 	caddr_t iobuf;
2342 	filesetentry_t *file;
2343 	int wsize;
2344 	off64_t seek;
2345 	off64_t bytes = 0;
2346 	uint64_t wss;
2347 	fbint_t iosize;
2348 	fb_fdesc_t *fdesc;
2349 	int srcfd = flowop->fo_srcfdnumber;
2350 	int ret;
2351 	char zerowrtbuf;
2352 
2353 	/* get the file to use */
2354 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2355 	    &fdesc)) != FILEBENCH_OK)
2356 		return (ret);
2357 
2358 	/* an I/O size of zero means write entire working set with one I/O */
2359 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2360 		iosize = wss;
2361 
2362 	/*
2363 	 * The file may actually be 0 bytes long, in which case skip
2364 	 * the buffer set up call (which would fail) and substitute
2365 	 * a small buffer, which won't really be used.
2366 	 */
2367 	if (iosize == 0) {
2368 		iobuf = (caddr_t)&zerowrtbuf;
2369 		filebench_log(LOG_DEBUG_SCRIPT,
2370 		    "flowop %s wrote zero length file", flowop->fo_name);
2371 	} else {
2372 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2373 		    iosize) != 0)
2374 			return (FILEBENCH_ERROR);
2375 	}
2376 
2377 	file = threadflow->tf_fse[srcfd];
2378 	if ((srcfd != 0) && (file == NULL)) {
2379 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2380 		    flowop->fo_name);
2381 		return (FILEBENCH_ERROR);
2382 	}
2383 
2384 	if (file)
2385 		wss = file->fse_size;
2386 
2387 	wsize = (int)MIN(wss, iosize);
2388 
2389 	/* Measure time to write bytes */
2390 	flowop_beginop(threadflow, flowop);
2391 	for (seek = 0; seek < wss; seek += wsize) {
2392 		ret = FB_WRITE(fdesc, iobuf, wsize);
2393 		if (ret != wsize) {
2394 			filebench_log(LOG_ERROR,
2395 			    "Failed to write %d bytes on fd %d: %s",
2396 			    wsize, fdesc->fd_num, strerror(errno));
2397 			flowop_endop(threadflow, flowop, 0);
2398 			return (FILEBENCH_ERROR);
2399 		}
2400 		wsize = (int)MIN(wss - seek, iosize);
2401 		bytes += ret;
2402 	}
2403 	flowop_endop(threadflow, flowop, bytes);
2404 
2405 	return (FILEBENCH_OK);
2406 }
2407 
2408 
2409 /*
2410  * Emulate a fixed size append to a file. Will append data to
2411  * a file chosen from a fileset if the flowop's fo_fileset
2412  * field specifies one or if its fdnumber is non zero.
2413  * Otherwise it will write to a fileobj file, if one exists.
2414  * The flowop's fo_wss parameter will be used to set the
2415  * maximum file size if it is non-zero, otherwise the
2416  * filesetentry's fse_size will be used. A random memory
2417  * buffer offset is calculated, then a logical seek to the
2418  * end of file is done followed by a write of fo_iosize
2419  * bytes. Writes are actually done from fo_buf, rather than
2420  * tf_mem as is done with flowoplib_write(), and no check
2421  * is made to see if fo_iosize exceeds the size of fo_buf.
2422  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2423  * files in the fileset, FILEBENCH_OK on success.
2424  */
2425 static int
2426 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2427 {
2428 	caddr_t iobuf;
2429 	fb_fdesc_t *fdesc;
2430 	fbint_t wss;
2431 	fbint_t iosize;
2432 	int ret;
2433 
2434 	iosize = avd_get_int(flowop->fo_iosize);
2435 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2436 	    &fdesc, iosize)) != FILEBENCH_OK)
2437 		return (ret);
2438 
2439 	/* XXX wss is not being used */
2440 
2441 	/* Measure time to write bytes */
2442 	flowop_beginop(threadflow, flowop);
2443 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2444 	ret = FB_WRITE(fdesc, iobuf, iosize);
2445 	if (ret != iosize) {
2446 		filebench_log(LOG_ERROR,
2447 		    "Failed to write %llu bytes on fd %d: %s",
2448 		    (u_longlong_t)iosize, fdesc->fd_num, strerror(errno));
2449 		flowop_endop(threadflow, flowop, ret);
2450 		return (FILEBENCH_ERROR);
2451 	}
2452 	flowop_endop(threadflow, flowop, ret);
2453 
2454 	return (FILEBENCH_OK);
2455 }
2456 
2457 /*
2458  * Emulate a random size append to a file. Will append data
2459  * to a file chosen from a fileset if the flowop's fo_fileset
2460  * field specifies one or if its fdnumber is non zero. Otherwise
2461  * it will write to a fileobj file, if one exists. The flowop's
2462  * fo_wss parameter will be used to set the maximum file size
2463  * if it is non-zero, otherwise the filesetentry's fse_size
2464  * will be used.  A random transfer size (but at most fo_iosize
2465  * bytes) and a random memory offset are calculated. A logical
2466  * seek to the end of file is done, then writes of up to
2467  * FILE_ALLOC_BLOCK in size are done until the full transfer
2468  * size has been written. Writes are actually done from fo_buf,
2469  * rather than tf_mem as is done with flowoplib_write().
2470  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2471  * files in the fileset, FILEBENCH_OK on success.
2472  */
2473 static int
2474 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2475 {
2476 	caddr_t iobuf;
2477 	uint64_t appendsize;
2478 	fb_fdesc_t *fdesc;
2479 	fbint_t wss;
2480 	fbint_t iosize;
2481 	int ret = 0;
2482 
2483 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2484 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2485 		    flowop->fo_name);
2486 		return (FILEBENCH_ERROR);
2487 	}
2488 
2489 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2490 		return (FILEBENCH_ERROR);
2491 
2492 	/* skip if attempting zero length append */
2493 	if (appendsize == 0) {
2494 		flowop_beginop(threadflow, flowop);
2495 		flowop_endop(threadflow, flowop, 0LL);
2496 		return (FILEBENCH_OK);
2497 	}
2498 
2499 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2500 	    &fdesc, appendsize)) != FILEBENCH_OK)
2501 		return (ret);
2502 
2503 	/* XXX wss is not being used */
2504 
2505 	/* Measure time to write bytes */
2506 	flowop_beginop(threadflow, flowop);
2507 
2508 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2509 	ret = FB_WRITE(fdesc, iobuf, appendsize);
2510 	if (ret != appendsize) {
2511 		filebench_log(LOG_ERROR,
2512 		    "Failed to write %llu bytes on fd %d: %s",
2513 		    (u_longlong_t)appendsize, fdesc->fd_num, strerror(errno));
2514 		flowop_endop(threadflow, flowop, 0);
2515 		return (FILEBENCH_ERROR);
2516 	}
2517 
2518 	flowop_endop(threadflow, flowop, appendsize);
2519 
2520 	return (FILEBENCH_OK);
2521 }
2522 
/*
 * Private per-flowop accumulator used by the testrandvar flowop.
 */
typedef struct testrandvar_priv {
	uint64_t sample_count;		/* number of values accumulated */
	double val_sum, sqr_sum;	/* sum of values, sum of squares */
} testrandvar_priv_t;
2528 
2529 /*
2530  * flowop to calculate various statistics from the number stream
2531  * produced by a random variable. This allows verification that the
2532  * random distribution used to define the random variable is producing
2533  * the expected distribution of random numbers.
2534  */
2535 /* ARGSUSED */
2536 static int
2537 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2538 {
2539 	testrandvar_priv_t	*mystats;
2540 	double			value;
2541 
2542 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2543 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2544 		filebench_shutdown(1);
2545 		return (-1);
2546 	}
2547 
2548 	value = avd_get_dbl(flowop->fo_value);
2549 
2550 	mystats->sample_count++;
2551 	mystats->val_sum += value;
2552 	mystats->sqr_sum += (value * value);
2553 
2554 	return (0);
2555 }
2556 
2557 /*
2558  * Initialize the private data area used to accumulate the statistics
2559  */
2560 static int
2561 flowoplib_testrandvar_init(flowop_t *flowop)
2562 {
2563 	testrandvar_priv_t	*mystats;
2564 
2565 	if ((mystats = (testrandvar_priv_t *)
2566 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2567 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2568 		filebench_shutdown(1);
2569 		return (-1);
2570 	}
2571 
2572 	mystats->sample_count = 0;
2573 	mystats->val_sum = 0;
2574 	mystats->sqr_sum = 0;
2575 	flowop->fo_private = (void *)mystats;
2576 
2577 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2578 	return (0);
2579 }
2580 
2581 /*
2582  * Print out the accumulated statistics, and free the private storage
2583  */
2584 static void
2585 flowoplib_testrandvar_destruct(flowop_t *flowop)
2586 {
2587 	testrandvar_priv_t	*mystats;
2588 	double mean, std_dev, dbl_count;
2589 
2590 	(void) ipc_mutex_lock(&flowop->fo_lock);
2591 	if ((mystats = (testrandvar_priv_t *)
2592 	    flowop->fo_private) == NULL) {
2593 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2594 		return;
2595 	}
2596 
2597 	flowop->fo_private = NULL;
2598 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2599 
2600 	dbl_count = (double)mystats->sample_count;
2601 	mean = mystats->val_sum / dbl_count;
2602 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2603 
2604 	filebench_log(LOG_VERBOSE,
2605 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2606 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2607 	free(mystats);
2608 }
2609 
2610 /*
2611  * prints message to the console from within a thread
2612  */
2613 static int
2614 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2615 {
2616 	procflow_t *procflow;
2617 
2618 	procflow = threadflow->tf_process;
2619 	filebench_log(LOG_INFO,
2620 	    "Message from process (%s,%d), thread (%s,%d): %s",
2621 	    procflow->pf_name, procflow->pf_instance,
2622 	    threadflow->tf_name, threadflow->tf_instance,
2623 	    avd_get_str(flowop->fo_value));
2624 
2625 	return (FILEBENCH_OK);
2626 }
2627 
/*
 * Prints usage information for flowop operations. The text is kept
 * as a table of output lines which are written to stderr in order.
 */
void
flowoplib_usage()
{
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop deletefile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>]\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,directio]\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,random]\n",
		"                       [,opennext]\n",
		"                       [,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n",
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value=<decrement-to-receive>,\n",
		"                       highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>,\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	size_t idx;

	/* none of the lines contain conversions, so write them verbatim */
	for (idx = 0; idx < sizeof (usage_text) / sizeof (usage_text[0]);
	    idx++)
		(void) fputs(usage_text[idx], stderr);
}
2717