xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 8615:fa52c2eca6c5)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #ifdef HAVE_SYS_ASYNCH_H
32 #include <sys/asynch.h>
33 #endif
34 #include <stddef.h>
35 #include <sys/ipc.h>
36 #include <sys/sem.h>
37 #include <sys/errno.h>
38 #include <sys/time.h>
39 #include <inttypes.h>
40 #include <fcntl.h>
41 #include <math.h>
42 #include <dirent.h>
43 
44 #ifdef HAVE_UTILITY_H
45 #include <utility.h>
46 #endif /* HAVE_UTILITY_H */
47 
48 #ifdef HAVE_SYS_ASYNC_H
49 #include <sys/asynch.h>
50 #endif /* HAVE_SYS_ASYNC_H */
51 
52 #ifndef HAVE_UINT_T
53 #define	uint_t unsigned int
54 #endif /* HAVE_UINT_T */
55 
56 #ifndef HAVE_SYSV_SEM
57 #include <semaphore.h>
58 #endif /* HAVE_SYSV_SEM */
59 
60 #include "filebench.h"
61 #include "flowop.h"
62 #include "fileset.h"
63 #include "fb_random.h"
64 #include "utils.h"
65 #include "fsplug.h"
66 
67 /*
68  * These routines implement the flowops from the f language. Each
69  * flowop has a name such as "read", and a set of function pointers
70  * to call for initialization, execution and destruction of the flowop.
71  * The table flowoplib_funcs[] contains a flowoplib struct for each
72  * implemented flowop. Most flowops use a generic initialization function
73  * and all currently use a generic destruction function. All flowop
74  * functions referenced from the table are in this file, though, of
75  * course, they often call functions from other files.
76  *
77  * The flowop_init() routine uses the flowoplib_funcs[] table to
78  * create an initial set of "instance 0" flowops, one for each type of
79  * flowop, from which all other flowops are derived. These "instance 0"
80  * flowops are initialized with information from the table including
81  * pointers for their fo_init, fo_func and fo_destroy functions. When
82  * a flowop definition is encountered in an f language script, the
83  * "type" of flowop, such as "read" is used to search for the
84  * "instance 0" flowop named "read", then a new flowop is allocated
85  * which inherits its function pointers and other initial properties
86  * from the instance 0 flowop, and is given a new name as specified
87  * by the "name=" attribute.
88  */
89 
90 static void flowoplib_destruct_noop(flowop_t *flowop);
91 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
92 static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
93 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
94 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
95 static int flowoplib_block_init(flowop_t *flowop);
96 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
97 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
98 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
99 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
100 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
101 static int flowoplib_sempost_init(flowop_t *flowop);
102 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
103 static int flowoplib_semblock_init(flowop_t *flowop);
104 static void flowoplib_semblock_destruct(flowop_t *flowop);
105 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
106 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
107 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
108 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
109 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
110 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
111 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
112 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
113 static int flowoplib_makedir(threadflow_t *, flowop_t *flowop);
114 static int flowoplib_removedir(threadflow_t *, flowop_t *flowop);
115 static int flowoplib_listdir(threadflow_t *, flowop_t *flowop);
116 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
117 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
118 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
119 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
120 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
121 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
122 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
123 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
124 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
125 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
126 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
127 static int flowoplib_testrandvar_init(flowop_t *flowop);
128 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
129 
130 static flowop_proto_t flowoplib_funcs[] = {
131 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowop_init_generic,
132 	flowoplib_write, flowop_destruct_generic,
133 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowop_init_generic,
134 	flowoplib_read, flowop_destruct_generic,
135 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
136 	flowoplib_block, flowop_destruct_generic,
137 	FLOW_TYPE_SYNC, 0, "wakeup", flowop_init_generic,
138 	flowoplib_wakeup, flowop_destruct_generic,
139 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
140 	flowoplib_semblock, flowoplib_semblock_destruct,
141 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
142 	flowoplib_sempost, flowoplib_destruct_noop,
143 	FLOW_TYPE_OTHER, 0, "hog", flowop_init_generic,
144 	flowoplib_hog, flowop_destruct_generic,
145 	FLOW_TYPE_OTHER, 0, "delay", flowop_init_generic,
146 	flowoplib_delay, flowop_destruct_generic,
147 	FLOW_TYPE_OTHER, 0, "eventlimit", flowop_init_generic,
148 	flowoplib_eventlimit, flowop_destruct_generic,
149 	FLOW_TYPE_OTHER, 0, "bwlimit", flowop_init_generic,
150 	flowoplib_bwlimit, flowop_destruct_generic,
151 	FLOW_TYPE_OTHER, 0, "iopslimit", flowop_init_generic,
152 	flowoplib_iopslimit, flowop_destruct_generic,
153 	FLOW_TYPE_OTHER, 0, "opslimit", flowop_init_generic,
154 	flowoplib_opslimit, flowop_destruct_generic,
155 	FLOW_TYPE_OTHER, 0, "finishoncount", flowop_init_generic,
156 	flowoplib_finishoncount, flowop_destruct_generic,
157 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowop_init_generic,
158 	flowoplib_finishonbytes, flowop_destruct_generic,
159 	FLOW_TYPE_IO, 0, "openfile", flowop_init_generic,
160 	flowoplib_openfile, flowop_destruct_generic,
161 	FLOW_TYPE_IO, 0, "createfile", flowop_init_generic,
162 	flowoplib_createfile, flowop_destruct_generic,
163 	FLOW_TYPE_IO, 0, "closefile", flowop_init_generic,
164 	flowoplib_closefile, flowop_destruct_generic,
165 	FLOW_TYPE_IO, 0, "makedir", flowop_init_generic,
166 	flowoplib_makedir, flowop_destruct_generic,
167 	FLOW_TYPE_IO, 0, "removedir", flowop_init_generic,
168 	flowoplib_removedir, flowop_destruct_generic,
169 	FLOW_TYPE_IO, 0, "listdir", flowop_init_generic,
170 	flowoplib_listdir, flowop_destruct_generic,
171 	FLOW_TYPE_IO, 0, "fsync", flowop_init_generic,
172 	flowoplib_fsync, flowop_destruct_generic,
173 	FLOW_TYPE_IO, 0, "fsyncset", flowop_init_generic,
174 	flowoplib_fsyncset, flowop_destruct_generic,
175 	FLOW_TYPE_IO, 0, "statfile", flowop_init_generic,
176 	flowoplib_statfile, flowop_destruct_generic,
177 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowop_init_generic,
178 	flowoplib_readwholefile, flowop_destruct_generic,
179 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowop_init_generic,
180 	flowoplib_appendfile, flowop_destruct_generic,
181 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowop_init_generic,
182 	flowoplib_appendfilerand, flowop_destruct_generic,
183 	FLOW_TYPE_IO, 0, "deletefile", flowop_init_generic,
184 	flowoplib_deletefile, flowop_destruct_generic,
185 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowop_init_generic,
186 	flowoplib_writewholefile, flowop_destruct_generic,
187 	FLOW_TYPE_OTHER, 0, "print", flowop_init_generic,
188 	flowoplib_print, flowop_destruct_generic,
189 	/* routine to calculate mean and stddev for output from a randvar */
190 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
191 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
192 };
193 
194 /*
195  * Loops through the list of flowops defined in this
196  * module, and creates and initializes a flowop for each one
197  * by calling flowop_flow_init. As a side effect of calling
198  * flowop_flow_init, the created flowops are placed on the
199  * master flowop list. All created flowops are set to
200  * instance "0".
201  */
202 void
203 flowoplib_flowinit()
204 {
205 	int nops = sizeof (flowoplib_funcs) / sizeof (flowop_proto_t);
206 
207 	flowop_flow_init(flowoplib_funcs, nops);
208 }
209 
210 /*
211  * Special total noop destruct
212  */
213 /* ARGSUSED */
214 static void
215 flowoplib_destruct_noop(flowop_t *flowop)
216 {
217 }
218 
219 /*
220  * Generates a file attribute from flags in the supplied flowop.
221  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
222  */
223 static int
224 flowoplib_fileattrs(flowop_t *flowop)
225 {
226 	int attrs = 0;
227 
228 	if (avd_get_bool(flowop->fo_directio))
229 		attrs |= FLOW_ATTR_DIRECTIO;
230 
231 	if (avd_get_bool(flowop->fo_dsync))
232 		attrs |= FLOW_ATTR_DSYNC;
233 
234 	return (attrs);
235 }
236 
237 /*
238  * Obtain a filesetentry for a file. Result placed where filep points.
239  * Supply with a flowop and a flag to indicate whether an existent or
240  * non-existent file is required. Returns FILEBENCH_NORSC if all out
241  * of the appropriate type of directories, FILEBENCH_ERROR if the
242  * flowop does not point to a fileset, and FILEBENCH_OK otherwise.
243  */
244 static int
245 flowoplib_pickfile(filesetentry_t **filep, flowop_t *flowop, int flags, int tid)
246 {
247 	fileset_t	*fileset;
248 	int		fileindex;
249 
250 	if ((fileset = flowop->fo_fileset) == NULL) {
251 		filebench_log(LOG_ERROR, "flowop NO fileset");
252 		return (FILEBENCH_ERROR);
253 	}
254 
255 	if (flowop->fo_fileindex) {
256 		fileindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
257 		    ((double)(fileset->fs_constentries / 2)));
258 		fileindex = fileindex % fileset->fs_constentries;
259 		flags |= FILESET_PICKBYINDEX;
260 	} else {
261 		fileindex = 0;
262 	}
263 
264 	if ((*filep = fileset_pick(fileset, FILESET_PICKFILE | flags,
265 	    tid, fileindex)) == NULL) {
266 		filebench_log(LOG_DEBUG_SCRIPT,
267 		    "flowop %s failed to pick file from fileset %s",
268 		    flowop->fo_name,
269 		    avd_get_str(fileset->fs_name));
270 		return (FILEBENCH_NORSC);
271 	}
272 
273 	return (FILEBENCH_OK);
274 }
275 
276 /*
277  * Obtain a filesetentry for a leaf directory. Result placed where dirp
278  * points. Supply with flowop and a flag to indicate whether an existent
279  * or non-existent leaf directory is required. Returns FILEBENCH_NORSC
280  * if all out of the appropriate type of directories, FILEBENCH_ERROR
281  * if the flowop does not point to a fileset, and FILEBENCH_OK otherwise.
282  */
283 static int
284 flowoplib_pickleafdir(filesetentry_t **dirp, flowop_t *flowop, int flags)
285 {
286 	fileset_t	*fileset;
287 	int		dirindex;
288 
289 	if ((fileset = flowop->fo_fileset) == NULL) {
290 		filebench_log(LOG_ERROR, "flowop NO fileset");
291 		return (FILEBENCH_ERROR);
292 	}
293 
294 	if (flowop->fo_fileindex) {
295 		dirindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
296 		    ((double)(fileset->fs_constleafdirs / 2)));
297 		dirindex = dirindex % fileset->fs_constleafdirs;
298 		flags |= FILESET_PICKBYINDEX;
299 	} else {
300 		dirindex = 0;
301 	}
302 
303 	if ((*dirp = fileset_pick(fileset,
304 	    FILESET_PICKLEAFDIR | flags, 0, dirindex)) == NULL) {
305 		filebench_log(LOG_DEBUG_SCRIPT,
306 		    "flowop %s failed to pick directory from fileset %s",
307 		    flowop->fo_name,
308 		    avd_get_str(fileset->fs_name));
309 		return (FILEBENCH_NORSC);
310 	}
311 
312 	return (FILEBENCH_OK);
313 }
314 
315 /*
316  * Searches for a file descriptor. Tries the flowop's
317  * fo_fdnumber first and returns with it if it has been
318  * explicitly set (greater than 0). It next checks to
319  * see if a rotating file descriptor policy is in effect,
320  * and if not returns the fdnumber regardless of what
321  * it is. (note that if it is 0, it just selects to the
322  * default file descriptor in the threadflow's tf_fd
323  * array). If the rotating fd policy is in effect, it
324  * cycles from the end of the tf_fd array to one location
325  * beyond the maximum needed by the number of entries in
326  * the associated fileset on each invocation, then starts
327  * over from the end.
328  *
329  * The routine returns an index into the threadflow's
330  * tf_fd table where the actual file descriptor will be
331  * found. Note: the calling routine must not call this
332  * routine if the flowop does not have a fileset, and the
333  * flowop's fo_fdnumber is zero and fo_rotatefd is
334  * asserted, or an addressing fault may occur.
335  */
336 static int
337 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
338 {
339 	fbint_t	entries;
340 	int fdnumber = flowop->fo_fdnumber;
341 
342 	/* If the script sets the fd explicitly */
343 	if (fdnumber > 0)
344 		return (fdnumber);
345 
346 	/* If the flowop defaults to persistent fd */
347 	if (!avd_get_bool(flowop->fo_rotatefd))
348 		return (fdnumber);
349 
350 	if (flowop->fo_fileset == NULL) {
351 		filebench_log(LOG_ERROR, "flowop NULL file");
352 		return (FILEBENCH_ERROR);
353 	}
354 
355 	entries = flowop->fo_fileset->fs_constentries;
356 
357 	/* Rotate the fd on each flowop invocation */
358 	if (entries > (THREADFLOW_MAXFD / 2)) {
359 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
360 		    " (too many files : %llu",
361 		    flowop->fo_name, (u_longlong_t)entries);
362 		return (FILEBENCH_ERROR);
363 	}
364 
365 	/* First time around */
366 	if (threadflow->tf_fdrotor == 0)
367 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
368 
369 	/* One fd for every file in the set */
370 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
371 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
372 
373 
374 	threadflow->tf_fdrotor--;
375 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
376 	    threadflow->tf_fdrotor);
377 	return (threadflow->tf_fdrotor);
378 }
379 
380 /*
381  * Determines the file descriptor to use, and attempts to open
382  * the file if it is not already open. Also determines the wss
383  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
384  * if flowop_openfile_common couldn't obtain an appropriate file
385  * from a the fileset, and FILEBENCH_OK otherwise.
386  */
387 static int
388 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
389     fbint_t *wssp, fb_fdesc_t **fdescp)
390 {
391 	int fd = flowoplib_fdnum(threadflow, flowop);
392 
393 	if (fd == -1)
394 		return (FILEBENCH_ERROR);
395 
396 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
397 		int ret;
398 
399 		if ((ret = flowoplib_openfile_common(
400 		    threadflow, flowop, fd)) != FILEBENCH_OK)
401 			return (ret);
402 
403 		if (threadflow->tf_fse[fd]) {
404 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
405 			    threadflow->tf_fse[fd]->fse_path);
406 		} else {
407 			filebench_log(LOG_DEBUG_IMPL,
408 			    "opened device %s/%s",
409 			    avd_get_str(flowop->fo_fileset->fs_path),
410 			    avd_get_str(flowop->fo_fileset->fs_name));
411 		}
412 	}
413 
414 	*fdescp = &(threadflow->tf_fd[fd]);
415 
416 	if ((*wssp = flowop->fo_constwss) == 0) {
417 		if (threadflow->tf_fse[fd])
418 			*wssp = threadflow->tf_fse[fd]->fse_size;
419 		else
420 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
421 	}
422 
423 	return (FILEBENCH_OK);
424 }
425 
426 /*
427  * Determines the io buffer or random offset into tf_mem for
428  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
429  */
430 static int
431 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
432     caddr_t *iobufp, fbint_t iosize)
433 {
434 	long memsize;
435 	size_t memoffset;
436 
437 	if (iosize == 0) {
438 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
439 		    flowop->fo_name);
440 		return (FILEBENCH_ERROR);
441 	}
442 
443 	if ((memsize = threadflow->tf_constmemsize) != 0) {
444 
445 		/* use tf_mem for I/O with random offset */
446 		if (filebench_randomno(&memoffset,
447 		    memsize, iosize, NULL) == -1) {
448 			filebench_log(LOG_ERROR,
449 			    "tf_memsize smaller than IO size for thread %s",
450 			    flowop->fo_name);
451 			return (FILEBENCH_ERROR);
452 		}
453 		*iobufp = threadflow->tf_mem + memoffset;
454 
455 	} else {
456 		/* use private I/O buffer */
457 		if ((flowop->fo_buf != NULL) &&
458 		    (flowop->fo_buf_size < iosize)) {
459 			/* too small, so free up and re-allocate */
460 			free(flowop->fo_buf);
461 			flowop->fo_buf = NULL;
462 		}
463 
464 		/*
465 		 * Allocate memory for the  buffer. The memory is freed
466 		 * by flowop_destruct_generic() or by this routine if more
467 		 * memory is needed for the buffer.
468 		 */
469 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
470 		    = (char *)malloc(iosize)) == NULL))
471 			return (FILEBENCH_ERROR);
472 
473 		flowop->fo_buf_size = iosize;
474 		*iobufp = flowop->fo_buf;
475 	}
476 	return (FILEBENCH_OK);
477 }
478 
479 /*
480  * Determines the file descriptor to use, opens it if necessary, the
481  * io buffer or random offset into tf_mem for IO operation and the wss
482  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
483  */
484 int
485 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
486     fbint_t *wssp, caddr_t *iobufp, fb_fdesc_t **filedescp, fbint_t iosize)
487 {
488 	int ret;
489 
490 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
491 	    FILEBENCH_OK)
492 		return (ret);
493 
494 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
495 	    FILEBENCH_OK)
496 		return (ret);
497 
498 	return (FILEBENCH_OK);
499 }
500 
501 /*
502  * Emulate posix read / pread. If the flowop has a fileset,
503  * a file descriptor number index is fetched, otherwise a
504  * supplied fileobj file is used. In either case the specified
505  * file will be opened if not already open. If the flowop has
506  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
507  * returned.
508  *
509  * The actual read is done to a random offset in the
510  * threadflow's thread memory (tf_mem), with a size set by
511  * fo_iosize and at either a random disk offset within the
512  * working set size, or at the next sequential location. If
513  * any errors are encountered, FILEBENCH_ERROR is returned,
514  * if no appropriate file can be obtained from the fileset then
515  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
516  */
517 static int
518 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
519 {
520 	caddr_t iobuf;
521 	fbint_t wss;
522 	fbint_t iosize;
523 	fb_fdesc_t *fdesc;
524 	int ret;
525 
526 
527 	iosize = avd_get_int(flowop->fo_iosize);
528 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
529 	    &fdesc, iosize)) != FILEBENCH_OK)
530 		return (ret);
531 
532 	if (avd_get_bool(flowop->fo_random)) {
533 		uint64_t fileoffset;
534 
535 		if (filebench_randomno64(&fileoffset,
536 		    wss, iosize, NULL) == -1) {
537 			filebench_log(LOG_ERROR,
538 			    "file size smaller than IO size for thread %s",
539 			    flowop->fo_name);
540 			return (FILEBENCH_ERROR);
541 		}
542 
543 		(void) flowop_beginop(threadflow, flowop);
544 		if ((ret = FB_PREAD(fdesc, iobuf,
545 		    iosize, (off64_t)fileoffset)) == -1) {
546 			(void) flowop_endop(threadflow, flowop, 0);
547 			filebench_log(LOG_ERROR,
548 			    "read file %s failed, offset %llu "
549 			    "io buffer %zd: %s",
550 			    avd_get_str(flowop->fo_fileset->fs_name),
551 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
552 			flowop_endop(threadflow, flowop, 0);
553 			return (FILEBENCH_ERROR);
554 		}
555 		(void) flowop_endop(threadflow, flowop, ret);
556 
557 		if ((ret == 0))
558 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
559 
560 	} else {
561 		(void) flowop_beginop(threadflow, flowop);
562 		if ((ret = FB_READ(fdesc, iobuf, iosize)) == -1) {
563 			(void) flowop_endop(threadflow, flowop, 0);
564 			filebench_log(LOG_ERROR,
565 			    "read file %s failed, io buffer %zd: %s",
566 			    avd_get_str(flowop->fo_fileset->fs_name),
567 			    iobuf, strerror(errno));
568 			(void) flowop_endop(threadflow, flowop, 0);
569 			return (FILEBENCH_ERROR);
570 		}
571 		(void) flowop_endop(threadflow, flowop, ret);
572 
573 		if ((ret == 0))
574 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
575 	}
576 
577 	return (FILEBENCH_OK);
578 }
579 
580 /*
581  * Initializes a "flowop_block" flowop. Specifically, it
582  * initializes the flowop's fo_cv and unlocks the fo_lock.
583  */
584 static int
585 flowoplib_block_init(flowop_t *flowop)
586 {
587 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
588 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
589 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
590 	(void) ipc_mutex_unlock(&flowop->fo_lock);
591 
592 	return (FILEBENCH_OK);
593 }
594 
595 /*
596  * Blocks the threadflow until woken up by flowoplib_wakeup.
597  * The routine blocks on the flowop's fo_cv condition variable.
598  */
599 static int
600 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
601 {
602 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
603 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
604 	(void) ipc_mutex_lock(&flowop->fo_lock);
605 
606 	flowop_beginop(threadflow, flowop);
607 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
608 	flowop_endop(threadflow, flowop, 0);
609 
610 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
611 	    flowop->fo_name, flowop->fo_instance);
612 
613 	(void) ipc_mutex_unlock(&flowop->fo_lock);
614 
615 	return (FILEBENCH_OK);
616 }
617 
618 /*
619  * Wakes up one or more target blocking flowops.
620  * Sends broadcasts on the fo_cv condition variables of all
621  * flowops on the target list, except those that are
622  * FLOW_MASTER flowops. The target list consists of all
623  * flowops whose name matches this flowop's "fo_targetname"
624  * attribute. The target list is generated on the first
625  * invocation, and the run will be shutdown if no targets
626  * are found. Otherwise the routine always returns FILEBENCH_OK.
627  */
628 static int
629 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
630 {
631 	flowop_t *target;
632 
633 	/* if this is the first wakeup, create the wakeup list */
634 	if (flowop->fo_targets == NULL) {
635 		flowop_t *result = flowop_find(flowop->fo_targetname);
636 
637 		flowop->fo_targets = result;
638 		if (result == NULL) {
639 			filebench_log(LOG_ERROR,
640 			    "wakeup: could not find op %s for thread %s",
641 			    flowop->fo_targetname,
642 			    threadflow->tf_name);
643 			filebench_shutdown(1);
644 		}
645 		while (result) {
646 			result->fo_targetnext =
647 			    result->fo_resultnext;
648 			result = result->fo_resultnext;
649 		}
650 	}
651 
652 	target = flowop->fo_targets;
653 
654 	/* wakeup the targets */
655 	while (target) {
656 		if (target->fo_instance == FLOW_MASTER) {
657 			target = target->fo_targetnext;
658 			continue;
659 		}
660 		filebench_log(LOG_DEBUG_IMPL,
661 		    "wakeup flow %s-%d at address %zx",
662 		    target->fo_name,
663 		    target->fo_instance,
664 		    &target->fo_cv);
665 
666 		flowop_beginop(threadflow, flowop);
667 		(void) ipc_mutex_lock(&target->fo_lock);
668 		(void) pthread_cond_broadcast(&target->fo_cv);
669 		(void) ipc_mutex_unlock(&target->fo_lock);
670 		flowop_endop(threadflow, flowop, 0);
671 
672 		target = target->fo_targetnext;
673 	}
674 
675 	return (FILEBENCH_OK);
676 }
677 
678 /*
679  * "think time" routines. the "hog" routine consumes cpu cycles as
680  * it "thinks", while the "delay" flowop simply calls sleep() to delay
681  * for a given number of seconds without consuming cpu cycles.
682  */
683 
684 
685 /*
686  * Consumes CPU cycles and memory bandwidth by looping for
687  * flowop->fo_value times. With each loop sets memory location
688  * threadflow->tf_mem to 1.
689  */
690 static int
691 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
692 {
693 	uint64_t value = avd_get_int(flowop->fo_value);
694 	int i;
695 
696 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
697 	flowop_beginop(threadflow, flowop);
698 	if (threadflow->tf_mem != NULL) {
699 		for (i = 0; i < value; i++)
700 			*(threadflow->tf_mem) = 1;
701 	}
702 	flowop_endop(threadflow, flowop, 0);
703 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
704 	return (FILEBENCH_OK);
705 }
706 
707 
708 /*
709  * Delays for fo_value seconds.
710  */
711 static int
712 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
713 {
714 	int value = avd_get_int(flowop->fo_value);
715 
716 	flowop_beginop(threadflow, flowop);
717 	(void) sleep(value);
718 	flowop_endop(threadflow, flowop, 0);
719 	return (FILEBENCH_OK);
720 }
721 
722 /*
723  * Rate limiting routines. This is the event consuming half of the
724  * event system. Each of the four following routines will limit the rate
725  * to one unit of either calls, issued I/O operations, issued filebench
726  * operations, or I/O bandwidth. Since there is only one event generator,
727  * the events will be divided among multiple instances of an event
728  * consumer, and further divided among different consumers if more than
729  * one has been defined. There is no mechanism to enforce equal sharing
730  * of events.
731  */
732 
733 /*
734  * Completes one invocation per posted event. If eventgen_q
735  * has an event count greater than zero, one will be removed
736  * (count decremented), otherwise the calling thread will
737  * block until another event has been posted. Always returns 0
738  */
739 static int
740 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
741 {
742 	/* Immediately bail if not set/enabled */
743 	if (filebench_shm->shm_eventgen_hz == NULL)
744 		return (FILEBENCH_OK);
745 
746 	if (flowop->fo_initted == 0) {
747 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
748 		    flowop, threadflow->tf_name, threadflow->tf_instance);
749 		flowop->fo_initted = 1;
750 	}
751 
752 	flowop_beginop(threadflow, flowop);
753 	while (filebench_shm->shm_eventgen_hz != NULL) {
754 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
755 		if (filebench_shm->shm_eventgen_q > 0) {
756 			filebench_shm->shm_eventgen_q--;
757 			(void) ipc_mutex_unlock(
758 			    &filebench_shm->shm_eventgen_lock);
759 			break;
760 		}
761 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
762 		    &filebench_shm->shm_eventgen_lock);
763 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
764 	}
765 	flowop_endop(threadflow, flowop, 0);
766 	return (FILEBENCH_OK);
767 }
768 
769 static int
770 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
771 {
772 	if (flowop->fo_targetname[0] != '\0') {
773 
774 		/* Try to use statistics from specific flowop */
775 		flowop->fo_targets =
776 		    flowop_find_from_list(flowop->fo_targetname,
777 		    threadflow->tf_thrd_fops);
778 		if (flowop->fo_targets == NULL) {
779 			filebench_log(LOG_ERROR,
780 			    "limit target: could not find flowop %s",
781 			    flowop->fo_targetname);
782 			filebench_shutdown(1);
783 			return (FILEBENCH_ERROR);
784 		}
785 	} else {
786 		/* use total workload statistics */
787 		flowop->fo_targets = NULL;
788 	}
789 	return (FILEBENCH_OK);
790 }
791 
792 /*
793  * Blocks the calling thread if the number of issued I/O
794  * operations exceeds the number of posted events, thus
795  * limiting the average I/O operation rate to the rate
796  * specified by eventgen_hz. Always returns FILEBENCH_OK.
797  */
798 static int
799 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
800 {
801 	uint64_t iops;
802 	uint64_t delta;
803 	uint64_t events;
804 
805 	/* Immediately bail if not set/enabled */
806 	if (filebench_shm->shm_eventgen_hz == NULL)
807 		return (FILEBENCH_OK);
808 
809 	if (flowop->fo_initted == 0) {
810 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
811 		    flowop, threadflow->tf_name, threadflow->tf_instance);
812 		flowop->fo_initted = 1;
813 
814 		if (flowoplib_event_find_target(threadflow, flowop)
815 		    == FILEBENCH_ERROR)
816 			return (FILEBENCH_ERROR);
817 
818 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
819 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
820 			filebench_log(LOG_ERROR,
821 			    "WARNING: Flowop %s does no IO",
822 			    flowop->fo_targets->fo_name);
823 			filebench_shutdown(1);
824 			return (FILEBENCH_ERROR);
825 		}
826 	}
827 
828 	if (flowop->fo_targets) {
829 		/*
830 		 * Note that fs_count is already the sum of fs_rcount
831 		 * and fs_wcount if looking at a single flowop.
832 		 */
833 		iops = flowop->fo_targets->fo_stats.fs_count;
834 	} else {
835 		(void) ipc_mutex_lock(&controlstats_lock);
836 		iops = (controlstats.fs_rcount +
837 		    controlstats.fs_wcount);
838 		(void) ipc_mutex_unlock(&controlstats_lock);
839 	}
840 
841 	/* Is this the first time around */
842 	if (flowop->fo_tputlast == 0) {
843 		flowop->fo_tputlast = iops;
844 		return (FILEBENCH_OK);
845 	}
846 
847 	delta = iops - flowop->fo_tputlast;
848 	flowop->fo_tputbucket -= delta;
849 	flowop->fo_tputlast = iops;
850 
851 	/* No need to block if the q isn't empty */
852 	if (flowop->fo_tputbucket >= 0LL) {
853 		flowop_endop(threadflow, flowop, 0);
854 		return (FILEBENCH_OK);
855 	}
856 
857 	iops = flowop->fo_tputbucket * -1;
858 	events = iops;
859 
860 	flowop_beginop(threadflow, flowop);
861 	while (filebench_shm->shm_eventgen_hz != NULL) {
862 
863 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
864 		if (filebench_shm->shm_eventgen_q >= events) {
865 			filebench_shm->shm_eventgen_q -= events;
866 			(void) ipc_mutex_unlock(
867 			    &filebench_shm->shm_eventgen_lock);
868 			flowop->fo_tputbucket += events;
869 			break;
870 		}
871 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
872 		    &filebench_shm->shm_eventgen_lock);
873 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
874 	}
875 	flowop_endop(threadflow, flowop, 0);
876 
877 	return (FILEBENCH_OK);
878 }
879 
880 /*
881  * Blocks the calling thread if the number of issued filebench
882  * operations exceeds the number of posted events, thus limiting
883  * the average filebench operation rate to the rate specified by
884  * eventgen_hz. Always returns FILEBENCH_OK.
885  */
886 static int
887 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
888 {
889 	uint64_t ops;
890 	uint64_t delta;
891 	uint64_t events;
892 
893 	/* Immediately bail if not set/enabled */
894 	if (filebench_shm->shm_eventgen_hz == NULL)
895 		return (FILEBENCH_OK);
896 
897 	if (flowop->fo_initted == 0) {
898 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
899 		    flowop, threadflow->tf_name, threadflow->tf_instance);
900 		flowop->fo_initted = 1;
901 
902 		if (flowoplib_event_find_target(threadflow, flowop)
903 		    == FILEBENCH_ERROR)
904 			return (FILEBENCH_ERROR);
905 	}
906 
907 	if (flowop->fo_targets) {
908 		ops = flowop->fo_targets->fo_stats.fs_count;
909 	} else {
910 		(void) ipc_mutex_lock(&controlstats_lock);
911 		ops = controlstats.fs_count;
912 		(void) ipc_mutex_unlock(&controlstats_lock);
913 	}
914 
915 	/* Is this the first time around */
916 	if (flowop->fo_tputlast == 0) {
917 		flowop->fo_tputlast = ops;
918 		return (FILEBENCH_OK);
919 	}
920 
921 	delta = ops - flowop->fo_tputlast;
922 	flowop->fo_tputbucket -= delta;
923 	flowop->fo_tputlast = ops;
924 
925 	/* No need to block if the q isn't empty */
926 	if (flowop->fo_tputbucket >= 0LL) {
927 		flowop_endop(threadflow, flowop, 0);
928 		return (FILEBENCH_OK);
929 	}
930 
931 	ops = flowop->fo_tputbucket * -1;
932 	events = ops;
933 
934 	flowop_beginop(threadflow, flowop);
935 	while (filebench_shm->shm_eventgen_hz != NULL) {
936 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
937 		if (filebench_shm->shm_eventgen_q >= events) {
938 			filebench_shm->shm_eventgen_q -= events;
939 			(void) ipc_mutex_unlock(
940 			    &filebench_shm->shm_eventgen_lock);
941 			flowop->fo_tputbucket += events;
942 			break;
943 		}
944 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
945 		    &filebench_shm->shm_eventgen_lock);
946 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
947 	}
948 	flowop_endop(threadflow, flowop, 0);
949 
950 	return (FILEBENCH_OK);
951 }
952 
953 
954 /*
955  * Blocks the calling thread if the number of bytes of I/O
956  * issued exceeds one megabyte times the number of posted
957  * events, thus limiting the average I/O byte rate to one
958  * megabyte times the event rate as set by eventgen_hz.
959  * Always retuns FILEBENCH_OK.
960  */
961 static int
962 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
963 {
964 	uint64_t bytes;
965 	uint64_t delta;
966 	uint64_t events;
967 
968 	/* Immediately bail if not set/enabled */
969 	if (filebench_shm->shm_eventgen_hz == NULL)
970 		return (FILEBENCH_OK);
971 
972 	if (flowop->fo_initted == 0) {
973 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
974 		    flowop, threadflow->tf_name, threadflow->tf_instance);
975 		flowop->fo_initted = 1;
976 
977 		if (flowoplib_event_find_target(threadflow, flowop)
978 		    == FILEBENCH_ERROR)
979 			return (FILEBENCH_ERROR);
980 
981 		if ((flowop->fo_targets) &&
982 		    ((flowop->fo_targets->fo_attrs &
983 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
984 			filebench_log(LOG_ERROR,
985 			    "WARNING: Flowop %s does no Reads or Writes",
986 			    flowop->fo_targets->fo_name);
987 			filebench_shutdown(1);
988 			return (FILEBENCH_ERROR);
989 		}
990 	}
991 
992 	if (flowop->fo_targets) {
993 		/*
994 		 * Note that fs_bytes is already the sum of fs_rbytes
995 		 * and fs_wbytes if looking at a single flowop.
996 		 */
997 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
998 	} else {
999 		(void) ipc_mutex_lock(&controlstats_lock);
1000 		bytes = (controlstats.fs_rbytes +
1001 		    controlstats.fs_wbytes);
1002 		(void) ipc_mutex_unlock(&controlstats_lock);
1003 	}
1004 
1005 	/* Is this the first time around? */
1006 	if (flowop->fo_tputlast == 0) {
1007 		flowop->fo_tputlast = bytes;
1008 		return (FILEBENCH_OK);
1009 	}
1010 
1011 	delta = bytes - flowop->fo_tputlast;
1012 	flowop->fo_tputbucket -= delta;
1013 	flowop->fo_tputlast = bytes;
1014 
1015 	/* No need to block if the q isn't empty */
1016 	if (flowop->fo_tputbucket >= 0LL) {
1017 		flowop_endop(threadflow, flowop, 0);
1018 		return (FILEBENCH_OK);
1019 	}
1020 
1021 	bytes = flowop->fo_tputbucket * -1;
1022 	events = (bytes / MB) + 1;
1023 
1024 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1025 	    (u_longlong_t)bytes, (u_longlong_t)events);
1026 
1027 	flowop_beginop(threadflow, flowop);
1028 	while (filebench_shm->shm_eventgen_hz != NULL) {
1029 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1030 		if (filebench_shm->shm_eventgen_q >= events) {
1031 			filebench_shm->shm_eventgen_q -= events;
1032 			(void) ipc_mutex_unlock(
1033 			    &filebench_shm->shm_eventgen_lock);
1034 			flowop->fo_tputbucket += (events * MB);
1035 			break;
1036 		}
1037 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1038 		    &filebench_shm->shm_eventgen_lock);
1039 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1040 	}
1041 	flowop_endop(threadflow, flowop, 0);
1042 
1043 	return (FILEBENCH_OK);
1044 }
1045 
1046 /*
1047  * These flowops terminate a benchmark run when either the specified
1048  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1049  * number of I/O operations (flowoplib_finishoncount) have been generated.
1050  */
1051 
1052 
1053 /*
1054  * Stop filebench run when specified number of I/O bytes have been
1055  * transferred. Compares controlstats.fs_bytes with flowop->value,
1056  * and if greater returns 1, stopping the run, if not, returns 0
1057  * to continue running.
1058  */
1059 static int
1060 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1061 {
1062 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
1063 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1064 						    /* Uses constant value */
1065 
1066 	if (flowop->fo_initted == 0) {
1067 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1068 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1069 		flowop->fo_initted = 1;
1070 
1071 		if (flowoplib_event_find_target(threadflow, flowop)
1072 		    == FILEBENCH_ERROR)
1073 			return (FILEBENCH_ERROR);
1074 
1075 		if ((flowop->fo_targets) &&
1076 		    ((flowop->fo_targets->fo_attrs &
1077 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1078 			filebench_log(LOG_ERROR,
1079 			    "WARNING: Flowop %s does no Reads or Writes",
1080 			    flowop->fo_targets->fo_name);
1081 			filebench_shutdown(1);
1082 			return (FILEBENCH_ERROR);
1083 		}
1084 	}
1085 
1086 	if (flowop->fo_targets) {
1087 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1088 	} else {
1089 		(void) ipc_mutex_lock(&controlstats_lock);
1090 		bytes_io = controlstats.fs_bytes;
1091 		(void) ipc_mutex_unlock(&controlstats_lock);
1092 	}
1093 
1094 	flowop_beginop(threadflow, flowop);
1095 	if (bytes_io > byte_lim) {
1096 		flowop_endop(threadflow, flowop, 0);
1097 		return (FILEBENCH_DONE);
1098 	}
1099 	flowop_endop(threadflow, flowop, 0);
1100 
1101 	return (FILEBENCH_OK);
1102 }
1103 
1104 /*
1105  * Stop filebench run when specified number of I/O operations have
1106  * been performed. Compares controlstats.fs_count with *flowop->value,
1107  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1108  * to continue running.
1109  */
1110 static int
1111 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1112 {
1113 	uint64_t ops;
1114 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1115 
1116 	if (flowop->fo_initted == 0) {
1117 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1118 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1119 		flowop->fo_initted = 1;
1120 
1121 		if (flowoplib_event_find_target(threadflow, flowop)
1122 		    == FILEBENCH_ERROR)
1123 			return (FILEBENCH_ERROR);
1124 	}
1125 
1126 	if (flowop->fo_targets) {
1127 		ops = flowop->fo_targets->fo_stats.fs_count;
1128 	} else {
1129 		(void) ipc_mutex_lock(&controlstats_lock);
1130 		ops = controlstats.fs_count;
1131 		(void) ipc_mutex_unlock(&controlstats_lock);
1132 	}
1133 
1134 	flowop_beginop(threadflow, flowop);
1135 	if (ops >= count) {
1136 		flowop_endop(threadflow, flowop, 0);
1137 		return (FILEBENCH_DONE);
1138 	}
1139 	flowop_endop(threadflow, flowop, 0);
1140 
1141 	return (FILEBENCH_OK);
1142 }
1143 
1144 /*
1145  * Semaphore synchronization using either System V semaphores or
1146  * posix semaphores. If System V semaphores are available, they will be
1147  * used, otherwise posix semaphores will be used.
1148  */
1149 
1150 
1151 /*
1152  * Initializes the filebench "block on semaphore" flowop.
1153  * If System V semaphores are implemented, the routine
1154  * initializes the System V semaphore subsystem if it hasn't
1155  * already been initialized, also allocates a pair of semids
1156  * and initializes the highwater System V semaphore.
1157  * If no System V semaphores, then does nothing special.
1158  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1159  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1160  * on success.
1161  */
1162 static int
1163 flowoplib_semblock_init(flowop_t *flowop)
1164 {
1165 
1166 #ifdef HAVE_SYSV_SEM
1167 	int sys_semid;
1168 	struct sembuf sbuf[2];
1169 	int highwater;
1170 
1171 	ipc_seminit();
1172 
1173 	flowop->fo_semid_lw = ipc_semidalloc();
1174 	flowop->fo_semid_hw = ipc_semidalloc();
1175 
1176 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1177 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1178 
1179 	sys_semid = filebench_shm->shm_sys_semid;
1180 
1181 	if ((highwater = flowop->fo_semid_hw) == 0)
1182 		highwater = flowop->fo_constvalue; /* use constant value */
1183 
1184 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1185 
1186 	sbuf[0].sem_num = (short)highwater;
1187 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1188 	sbuf[0].sem_flg = 0;
1189 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1190 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1191 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1192 		return (FILEBENCH_ERROR);
1193 	}
1194 #else
1195 	filebench_log(LOG_DEBUG_IMPL,
1196 	    "flow %s-%d semblock init with posix semaphore",
1197 	    flowop->fo_name, flowop->fo_instance);
1198 
1199 	sem_init(&flowop->fo_sem, 1, 0);
1200 #endif	/* HAVE_SYSV_SEM */
1201 
1202 	if (!(avd_get_bool(flowop->fo_blocking)))
1203 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1204 
1205 	return (FILEBENCH_OK);
1206 }
1207 
1208 /*
1209  * Releases the semids for the System V semaphore allocated
1210  * to this flowop. If not using System V semaphores, then
1211  * it is effectively just a no-op.
1212  */
1213 static void
1214 flowoplib_semblock_destruct(flowop_t *flowop)
1215 {
1216 #ifdef HAVE_SYSV_SEM
1217 	ipc_semidfree(flowop->fo_semid_lw);
1218 	ipc_semidfree(flowop->fo_semid_hw);
1219 #else
1220 	sem_destroy(&flowop->fo_sem);
1221 #endif /* HAVE_SYSV_SEM */
1222 }
1223 
1224 /*
1225  * Attempts to pass a System V or posix semaphore as appropriate,
1226  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1227  * semphores is not available or cannot be acquired, or if the initial
1228  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1229  */
1230 static int
1231 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1232 {
1233 
1234 #ifdef HAVE_SYSV_SEM
1235 	struct sembuf sbuf[2];
1236 	int value = avd_get_int(flowop->fo_value);
1237 	int sys_semid;
1238 	struct timespec timeout;
1239 
1240 	sys_semid = filebench_shm->shm_sys_semid;
1241 
1242 	filebench_log(LOG_DEBUG_IMPL,
1243 	    "flow %s-%d sem blocking on id %x num %x value %d",
1244 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1245 	    flowop->fo_semid_hw, value);
1246 
1247 	/* Post, decrement the increment the hw queue */
1248 	sbuf[0].sem_num = flowop->fo_semid_hw;
1249 	sbuf[0].sem_op = (short)value;
1250 	sbuf[0].sem_flg = 0;
1251 	sbuf[1].sem_num = flowop->fo_semid_lw;
1252 	sbuf[1].sem_op = value * -1;
1253 	sbuf[1].sem_flg = 0;
1254 	timeout.tv_sec = 600;
1255 	timeout.tv_nsec = 0;
1256 
1257 	if (avd_get_bool(flowop->fo_blocking))
1258 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1259 
1260 	flowop_beginop(threadflow, flowop);
1261 
1262 #ifdef HAVE_SEMTIMEDOP
1263 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1264 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1265 #else
1266 	(void) semop(sys_semid, &sbuf[0], 1);
1267 	(void) semop(sys_semid, &sbuf[1], 1);
1268 #endif /* HAVE_SEMTIMEDOP */
1269 
1270 	if (avd_get_bool(flowop->fo_blocking))
1271 		(void) ipc_mutex_lock(&flowop->fo_lock);
1272 
1273 	flowop_endop(threadflow, flowop, 0);
1274 
1275 #else
1276 	int value = avd_get_int(flowop->fo_value);
1277 	int i;
1278 
1279 	filebench_log(LOG_DEBUG_IMPL,
1280 	    "flow %s-%d sem blocking on posix semaphore",
1281 	    flowop->fo_name, flowop->fo_instance);
1282 
1283 	/* Decrement sem by value */
1284 	for (i = 0; i < value; i++) {
1285 		if (sem_wait(&flowop->fo_sem) == -1) {
1286 			filebench_log(LOG_ERROR, "semop wait failed");
1287 			return (FILEBENCH_ERROR);
1288 		}
1289 	}
1290 
1291 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1292 	    flowop->fo_name, flowop->fo_instance);
1293 #endif /* HAVE_SYSV_SEM */
1294 
1295 	return (FILEBENCH_OK);
1296 }
1297 
1298 /*
1299  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1300  */
1301 /* ARGSUSED */
1302 static int
1303 flowoplib_sempost_init(flowop_t *flowop)
1304 {
1305 #ifdef HAVE_SYSV_SEM
1306 	ipc_seminit();
1307 #endif /* HAVE_SYSV_SEM */
1308 	return (FILEBENCH_OK);
1309 }
1310 
1311 /*
1312  * Post to a System V or posix semaphore as appropriate.
1313  * On the first call for a given flowop instance, this routine
1314  * will use the fo_targetname attribute to locate all semblock
1315  * flowops that are expecting posts from this flowop. All
1316  * target flowops on this list will have a post operation done
1317  * to their semaphores on each call.
1318  */
1319 static int
1320 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1321 {
1322 	flowop_t *target;
1323 
1324 	filebench_log(LOG_DEBUG_IMPL,
1325 	    "sempost flow %s-%d",
1326 	    flowop->fo_name,
1327 	    flowop->fo_instance);
1328 
1329 	/* if this is the first post, create the post list */
1330 	if (flowop->fo_targets == NULL) {
1331 		flowop_t *result = flowop_find(flowop->fo_targetname);
1332 
1333 		flowop->fo_targets = result;
1334 
1335 		if (result == NULL) {
1336 			filebench_log(LOG_ERROR,
1337 			    "sempost: could not find op %s for thread %s",
1338 			    flowop->fo_targetname,
1339 			    threadflow->tf_name);
1340 			filebench_shutdown(1);
1341 		}
1342 
1343 		while (result) {
1344 			result->fo_targetnext =
1345 			    result->fo_resultnext;
1346 			result = result->fo_resultnext;
1347 		}
1348 	}
1349 
1350 	target = flowop->fo_targets;
1351 
1352 	flowop_beginop(threadflow, flowop);
1353 	/* post to the targets */
1354 	while (target) {
1355 #ifdef HAVE_SYSV_SEM
1356 		struct sembuf sbuf[2];
1357 		int sys_semid;
1358 		int blocking;
1359 #else
1360 		int i;
1361 #endif /* HAVE_SYSV_SEM */
1362 		struct timespec timeout;
1363 		int value = (int)avd_get_int(flowop->fo_value);
1364 
1365 		if (target->fo_instance == FLOW_MASTER) {
1366 			target = target->fo_targetnext;
1367 			continue;
1368 		}
1369 
1370 #ifdef HAVE_SYSV_SEM
1371 
1372 		filebench_log(LOG_DEBUG_IMPL,
1373 		    "sempost flow %s-%d num %x",
1374 		    target->fo_name,
1375 		    target->fo_instance,
1376 		    target->fo_semid_lw);
1377 
1378 		sys_semid = filebench_shm->shm_sys_semid;
1379 		sbuf[0].sem_num = target->fo_semid_lw;
1380 		sbuf[0].sem_op = (short)value;
1381 		sbuf[0].sem_flg = 0;
1382 		sbuf[1].sem_num = target->fo_semid_hw;
1383 		sbuf[1].sem_op = value * -1;
1384 		sbuf[1].sem_flg = 0;
1385 		timeout.tv_sec = 600;
1386 		timeout.tv_nsec = 0;
1387 
1388 		if (avd_get_bool(flowop->fo_blocking))
1389 			blocking = 1;
1390 		else
1391 			blocking = 0;
1392 
1393 #ifdef HAVE_SEMTIMEDOP
1394 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1395 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1396 #else
1397 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1398 		    (errno && (errno != EAGAIN))) {
1399 #endif /* HAVE_SEMTIMEDOP */
1400 			filebench_log(LOG_ERROR, "semop post failed: %s",
1401 			    strerror(errno));
1402 			return (FILEBENCH_ERROR);
1403 		}
1404 
1405 		filebench_log(LOG_DEBUG_IMPL,
1406 		    "flow %s-%d finished posting",
1407 		    target->fo_name, target->fo_instance);
1408 #else
1409 		filebench_log(LOG_DEBUG_IMPL,
1410 		    "sempost flow %s-%d to posix semaphore",
1411 		    target->fo_name,
1412 		    target->fo_instance);
1413 
1414 		/* Increment sem by value */
1415 		for (i = 0; i < value; i++) {
1416 			if (sem_post(&target->fo_sem) == -1) {
1417 				filebench_log(LOG_ERROR, "semop post failed");
1418 				return (FILEBENCH_ERROR);
1419 			}
1420 		}
1421 
1422 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1423 		    target->fo_name, target->fo_instance);
1424 #endif /* HAVE_SYSV_SEM */
1425 
1426 		target = target->fo_targetnext;
1427 	}
1428 	flowop_endop(threadflow, flowop, 0);
1429 
1430 	return (FILEBENCH_OK);
1431 }
1432 
1433 
1434 /*
1435  * Section for exercising create / open / close / delete operations
1436  * on files within a fileset. For proper operation, the flowop attribute
1437  * "fd", which sets the fo_fdnumber field in the flowop, must be used
1438  * so that the same file is opened and later closed. "fd" is an index
1439  * into a pair of arrays maintained by threadflows, one of which
1440  * contains the operating system assigned file descriptors and the other
1441  * a pointer to the filesetentry whose file the file descriptor
1442  * references. An openfile flowop defined without fd being set will use
1443  * the default (0) fd or, if specified, rotate through fd indices, but
1444  * createfile and closefile must use the default or a specified fd.
1445  * Meanwhile deletefile picks and arbitrary file to delete, regardless
1446  * of fd attribute.
1447  */
1448 
1449 /*
1450  * Emulates (and actually does) file open. Obtains a file descriptor
1451  * index, then calls flowoplib_openfile_common() to open. Returns
1452  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1453  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1454  * FILEBENCH_NORSC, FILEBENCH_OK).
1455  */
1456 static int
1457 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1458 {
1459 	int fd = flowoplib_fdnum(threadflow, flowop);
1460 
1461 	if (fd == -1)
1462 		return (FILEBENCH_ERROR);
1463 
1464 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1465 }
1466 
1467 /*
1468  * Common file opening code for filesets. Uses the supplied
1469  * file descriptor index to determine the tf_fd entry to use.
1470  * If the entry is empty (0) and the fileset exists, fileset
1471  * pick is called to select a fileset entry to use. The file
1472  * specified in the filesetentry is opened, and the returned
1473  * operating system file descriptor and a pointer to the
1474  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1475  * respectively. Returns FILEBENCH_ERROR on error,
1476  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1477  * and FILEBENCH_OK on success.
1478  */
1479 static int
1480 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1481 {
1482 	filesetentry_t *file;
1483 	char *fileset_name;
1484 	int tid = 0;
1485 	int err;
1486 
1487 	if (flowop->fo_fileset == NULL) {
1488 		filebench_log(LOG_ERROR, "flowop NULL file");
1489 		return (FILEBENCH_ERROR);
1490 	}
1491 
1492 	if ((fileset_name =
1493 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1494 		filebench_log(LOG_ERROR,
1495 		    "flowop %s: fileset has no name", flowop->fo_name);
1496 		return (FILEBENCH_ERROR);
1497 	}
1498 
1499 	/*
1500 	 * If the flowop doesn't default to persistent fd
1501 	 * then get unique thread ID for use by fileset_pick
1502 	 */
1503 	if (avd_get_bool(flowop->fo_rotatefd))
1504 		tid = threadflow->tf_utid;
1505 
1506 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1507 		filebench_log(LOG_ERROR,
1508 		    "flowop %s attempted to open without closing on fd %d",
1509 		    flowop->fo_name, fd);
1510 		return (FILEBENCH_ERROR);
1511 	}
1512 
1513 #ifdef HAVE_RAW_SUPPORT
1514 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1515 		int open_attrs = 0;
1516 		char name[MAXPATHLEN];
1517 
1518 		(void) fb_strlcpy(name,
1519 		    avd_get_str(flowop->fo_fileset->fs_path), MAXPATHLEN);
1520 		(void) fb_strlcat(name, "/", MAXPATHLEN);
1521 		(void) fb_strlcat(name, fileset_name, MAXPATHLEN);
1522 
1523 		if (avd_get_bool(flowop->fo_dsync)) {
1524 #ifdef sun
1525 			open_attrs |= O_DSYNC;
1526 #else
1527 			open_attrs |= O_FSYNC;
1528 #endif
1529 		}
1530 
1531 		filebench_log(LOG_DEBUG_SCRIPT,
1532 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1533 
1534 		if (FB_OPEN(&(threadflow->tf_fd[fd]), name,
1535 		    O_RDWR | open_attrs, 0666) == FILEBENCH_ERROR) {
1536 			filebench_log(LOG_ERROR,
1537 			    "Failed to open raw device %s: %s",
1538 			    name, strerror(errno));
1539 			return (FILEBENCH_ERROR);
1540 		}
1541 
1542 		/* if running on Solaris, use un-buffered io */
1543 #ifdef sun
1544 		(void) directio(threadflow->tf_fd[fd].fd_num, DIRECTIO_ON);
1545 #endif
1546 
1547 		threadflow->tf_fse[fd] = NULL;
1548 
1549 		return (FILEBENCH_OK);
1550 	}
1551 #endif /* HAVE_RAW_SUPPORT */
1552 
1553 	if ((err = flowoplib_pickfile(&file, flowop,
1554 	    FILESET_PICKEXISTS, tid)) != FILEBENCH_OK) {
1555 		filebench_log(LOG_DEBUG_SCRIPT,
1556 		    "flowop %s failed to pick file from %s on fd %d",
1557 		    flowop->fo_name, fileset_name, fd);
1558 		return (err);
1559 	}
1560 
1561 	threadflow->tf_fse[fd] = file;
1562 
1563 	flowop_beginop(threadflow, flowop);
1564 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1565 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1566 	flowop_endop(threadflow, flowop, 0);
1567 
1568 	if (err == FILEBENCH_ERROR) {
1569 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1570 		    flowop->fo_name, file->fse_path);
1571 		return (FILEBENCH_ERROR);
1572 	}
1573 
1574 	filebench_log(LOG_DEBUG_SCRIPT,
1575 	    "flowop %s: opened %s fd[%d] = %d",
1576 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1577 
1578 	return (FILEBENCH_OK);
1579 }
1580 
1581 /*
1582  * Emulate create of a file. Uses the flowop's fdnumber to select
1583  * tf_fd and tf_fse array locations to put the created file's file
1584  * descriptor and filesetentry respectively. Uses flowoplib_pickfile()
1585  * to select a specific filesetentry whose file does not currently
1586  * exist for the file create operation. Then calls
1587  * fileset_openfile() with the O_CREATE flag set to create the
1588  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1589  * already in use, the flowop has no associated fileset, or
1590  * the create call fails. Returns 1 if a filesetentry with a
1591  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1592  */
1593 static int
1594 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1595 {
1596 	filesetentry_t *file;
1597 	int fd = flowop->fo_fdnumber;
1598 	int err;
1599 
1600 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1601 		filebench_log(LOG_ERROR,
1602 		    "flowop %s attempted to create without closing on fd %d",
1603 		    flowop->fo_name, fd);
1604 		return (FILEBENCH_ERROR);
1605 	}
1606 
1607 	if (flowop->fo_fileset == NULL) {
1608 		filebench_log(LOG_ERROR, "flowop NULL file");
1609 		return (FILEBENCH_ERROR);
1610 	}
1611 
1612 #ifdef HAVE_RAW_SUPPORT
1613 	/* can't be used with raw devices */
1614 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1615 		filebench_log(LOG_ERROR,
1616 		    "flowop %s attempted to a createfile on RAW device",
1617 		    flowop->fo_name);
1618 		return (FILEBENCH_ERROR);
1619 	}
1620 #endif /* HAVE_RAW_SUPPORT */
1621 
1622 	if ((err = flowoplib_pickfile(&file, flowop,
1623 	    FILESET_PICKNOEXIST, 0)) != FILEBENCH_OK) {
1624 		filebench_log(LOG_DEBUG_SCRIPT,
1625 		    "flowop %s failed to pick file from fileset %s",
1626 		    flowop->fo_name,
1627 		    avd_get_str(flowop->fo_fileset->fs_name));
1628 		return (err);
1629 	}
1630 
1631 	threadflow->tf_fse[fd] = file;
1632 
1633 	flowop_beginop(threadflow, flowop);
1634 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1635 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1636 	flowop_endop(threadflow, flowop, 0);
1637 
1638 	if (err == FILEBENCH_ERROR) {
1639 		filebench_log(LOG_ERROR, "failed to create file %s",
1640 		    flowop->fo_name);
1641 		return (FILEBENCH_ERROR);
1642 	}
1643 
1644 	filebench_log(LOG_DEBUG_SCRIPT,
1645 	    "flowop %s: created %s fd[%d] = %d",
1646 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1647 
1648 	return (FILEBENCH_OK);
1649 }
1650 
1651 /*
1652  * Emulates delete of a file. If a valid fd is provided, it uses the
1653  * filesetentry stored at that fd location to select the file to be
1654  * deleted, otherwise it picks an arbitrary filesetentry
1655  * whose file exists. It then uses unlink() to delete it and Clears
1656  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1657  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1658  * filesetentry cannot be found, and FILEBENCH_OK on success.
1659  */
1660 static int
1661 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1662 {
1663 	filesetentry_t *file;
1664 	fileset_t *fileset;
1665 	char path[MAXPATHLEN];
1666 	char *pathtmp;
1667 	int fd = flowop->fo_fdnumber;
1668 
1669 	/* if fd specified, use it to access file */
1670 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1671 
1672 		/* indicate that the file will be deleted */
1673 		threadflow->tf_fse[fd] = NULL;
1674 
1675 		/* if here, we still have a valid file pointer */
1676 		fileset = file->fse_fileset;
1677 	} else {
1678 
1679 		/* Otherwise, pick arbitrary file */
1680 		file = NULL;
1681 		fileset = flowop->fo_fileset;
1682 	}
1683 
1684 
1685 	if (fileset == NULL) {
1686 		filebench_log(LOG_ERROR, "flowop NULL file");
1687 		return (FILEBENCH_ERROR);
1688 	}
1689 
1690 #ifdef HAVE_RAW_SUPPORT
1691 	/* can't be used with raw devices */
1692 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1693 		filebench_log(LOG_ERROR,
1694 		    "flowop %s attempted a deletefile on RAW device",
1695 		    flowop->fo_name);
1696 		return (FILEBENCH_ERROR);
1697 	}
1698 #endif /* HAVE_RAW_SUPPORT */
1699 
1700 	if (file == NULL) {
1701 		int err;
1702 
1703 		/* pick arbitrary, existing (allocated) file */
1704 		if ((err = flowoplib_pickfile(&file, flowop,
1705 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
1706 			filebench_log(LOG_DEBUG_SCRIPT,
1707 			    "flowop %s failed to pick file", flowop->fo_name);
1708 			return (err);
1709 		}
1710 	} else {
1711 		/* delete specific file. wait for it to be non-busy */
1712 		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1713 		while (file->fse_flags & FSE_BUSY) {
1714 			file->fse_flags |= FSE_THRD_WAITNG;
1715 			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1716 			    &fileset->fs_pick_lock);
1717 		}
1718 
1719 		/* File now available, grab it for deletion */
1720 		file->fse_flags |= FSE_BUSY;
1721 		fileset->fs_idle_files--;
1722 		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1723 	}
1724 
1725 	/* don't delete if anyone (other than me) has file open */
1726 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
1727 		if (file->fse_open_cnt > 1) {
1728 			filebench_log(LOG_DEBUG_SCRIPT,
1729 			    "flowop %s can't delete file opened by other"
1730 			    " threads at fd = %d", flowop->fo_name, fd);
1731 			fileset_unbusy(file, FALSE, FALSE, 0);
1732 			return (FILEBENCH_OK);
1733 		} else {
1734 			filebench_log(LOG_DEBUG_SCRIPT,
1735 			    "flowop %s deleting still open file at fd = %d",
1736 			    flowop->fo_name, fd);
1737 		}
1738 	} else if (file->fse_open_cnt > 0) {
1739 		filebench_log(LOG_DEBUG_SCRIPT,
1740 		    "flowop %s can't delete file opened by other"
1741 		    " threads at fd = %d, open count = %d",
1742 		    flowop->fo_name, fd, file->fse_open_cnt);
1743 		fileset_unbusy(file, FALSE, FALSE, 0);
1744 		return (FILEBENCH_OK);
1745 	}
1746 
1747 	(void) fb_strlcpy(path, avd_get_str(fileset->fs_path), MAXPATHLEN);
1748 	(void) fb_strlcat(path, "/", MAXPATHLEN);
1749 	(void) fb_strlcat(path, avd_get_str(fileset->fs_name), MAXPATHLEN);
1750 	pathtmp = fileset_resolvepath(file);
1751 	(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
1752 	free(pathtmp);
1753 
1754 	/* delete the selected file */
1755 	flowop_beginop(threadflow, flowop);
1756 	(void) FB_UNLINK(path);
1757 	flowop_endop(threadflow, flowop, 0);
1758 
1759 	/* indicate that it is no longer busy and no longer exists */
1760 	fileset_unbusy(file, TRUE, FALSE, -file->fse_open_cnt);
1761 
1762 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1763 
1764 	return (FILEBENCH_OK);
1765 }
1766 
1767 /*
1768  * Emulates fsync of a file. Obtains the file descriptor index
1769  * from the flowop, obtains the actual file descriptor from
1770  * the threadflow's table, checks to be sure it is still an
1771  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
1772  * if the file no longer is open, FILEBENCH_OK otherwise.
1773  */
1774 static int
1775 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1776 {
1777 	filesetentry_t *file;
1778 	int fd = flowop->fo_fdnumber;
1779 
1780 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1781 		filebench_log(LOG_ERROR,
1782 		    "flowop %s attempted to fsync a closed fd %d",
1783 		    flowop->fo_name, fd);
1784 		return (FILEBENCH_ERROR);
1785 	}
1786 
1787 	file = threadflow->tf_fse[fd];
1788 
1789 	if ((file == NULL) ||
1790 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1791 		filebench_log(LOG_ERROR,
1792 		    "flowop %s attempted to a fsync a RAW device",
1793 		    flowop->fo_name);
1794 		return (FILEBENCH_ERROR);
1795 	}
1796 
1797 	/* Measure time to fsync */
1798 	flowop_beginop(threadflow, flowop);
1799 	(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1800 	flowop_endop(threadflow, flowop, 0);
1801 
1802 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1803 
1804 	return (FILEBENCH_OK);
1805 }
1806 
1807 /*
1808  * Emulate fsync of an entire fileset. Search through the
1809  * threadflow's file descriptor array, doing fsync() on each
1810  * open file that belongs to the flowop's fileset. Always
1811  * returns FILEBENCH_OK.
1812  */
1813 static int
1814 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1815 {
1816 	int fd;
1817 
1818 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1819 		filesetentry_t *file;
1820 
1821 		/* Match the file set to fsync */
1822 		if ((threadflow->tf_fse[fd] == NULL) ||
1823 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1824 			continue;
1825 
1826 		/* Measure time to fsync */
1827 		flowop_beginop(threadflow, flowop);
1828 		(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1829 		flowop_endop(threadflow, flowop, 0);
1830 
1831 		file = threadflow->tf_fse[fd];
1832 
1833 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1834 		    file->fse_path);
1835 	}
1836 
1837 	return (FILEBENCH_OK);
1838 }
1839 
1840 /*
1841  * Emulate close of a file.  Obtains the file descriptor index
1842  * from the flowop, obtains the actual file descriptor from the
1843  * threadflow's table, checks to be sure it is still an open
1844  * file, then does a close operation on it. Then sets the
1845  * threadflow file descriptor table entry to 0, and the file set
1846  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
1847  * FILEBENCH_OK otherwise.
1848  */
1849 static int
1850 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1851 {
1852 	filesetentry_t *file;
1853 	fileset_t *fileset;
1854 	int fd = flowop->fo_fdnumber;
1855 
1856 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1857 		filebench_log(LOG_ERROR,
1858 		    "flowop %s attempted to close an already closed fd %d",
1859 		    flowop->fo_name, fd);
1860 		return (FILEBENCH_ERROR);
1861 	}
1862 
1863 	file = threadflow->tf_fse[fd];
1864 	fileset = file->fse_fileset;
1865 
1866 	/* Wait for it to be non-busy */
1867 	(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1868 	while (file->fse_flags & FSE_BUSY) {
1869 		file->fse_flags |= FSE_THRD_WAITNG;
1870 		(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1871 		    &fileset->fs_pick_lock);
1872 	}
1873 
1874 	/* File now available, grab it for closing */
1875 	file->fse_flags |= FSE_BUSY;
1876 
1877 	/* if last open, set declare idle */
1878 	if (file->fse_open_cnt == 1)
1879 		fileset->fs_idle_files--;
1880 
1881 	(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1882 
1883 	/* Measure time to close */
1884 	flowop_beginop(threadflow, flowop);
1885 	(void) FB_CLOSE(&threadflow->tf_fd[fd]);
1886 	flowop_endop(threadflow, flowop, 0);
1887 
1888 	fileset_unbusy(file, FALSE, FALSE, -1);
1889 
1890 	threadflow->tf_fd[fd].fd_ptr = NULL;
1891 
1892 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1893 
1894 	return (FILEBENCH_OK);
1895 }
1896 
1897 /*
1898  * Obtain the full pathname of the directory described by the filesetentry
1899  * indicated by "dir", and copy it into the character array pointed to by
1900  * path. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
1901  */
1902 static int
1903 flowoplib_getdirpath(filesetentry_t *dir, char *path)
1904 {
1905 	char		*fileset_path;
1906 	char		*fileset_name;
1907 	char		*part_path;
1908 
1909 	if ((fileset_path = avd_get_str(dir->fse_fileset->fs_path)) == NULL) {
1910 		filebench_log(LOG_ERROR, "Fileset path not set");
1911 		return (FILEBENCH_ERROR);
1912 	}
1913 
1914 	if ((fileset_name = avd_get_str(dir->fse_fileset->fs_name)) == NULL) {
1915 		filebench_log(LOG_ERROR, "Fileset name not set");
1916 		return (FILEBENCH_ERROR);
1917 	}
1918 
1919 	(void) fb_strlcpy(path, fileset_path, MAXPATHLEN);
1920 	(void) fb_strlcat(path, "/", MAXPATHLEN);
1921 	(void) fb_strlcat(path, fileset_name, MAXPATHLEN);
1922 
1923 	if ((part_path = fileset_resolvepath(dir)) == NULL)
1924 		return (FILEBENCH_ERROR);
1925 
1926 	(void) fb_strlcat(path, part_path, MAXPATHLEN);
1927 	free(part_path);
1928 
1929 	return (FILEBENCH_OK);
1930 }
1931 
1932 /*
1933  * Use mkdir to create a directory.  Obtains the fileset name from the
1934  * flowop, selects a non-existent leaf directory and obtains its full
1935  * path, then uses mkdir to create it on the storage subsystem (make it
1936  * existent). Returns FILEBENCH_NORSC is there are no more non-existent
1937  * directories in the fileset, FILEBENCH_ERROR on other errors, and
1938  * FILEBENCH_OK on success.
1939  */
1940 static int
1941 flowoplib_makedir(threadflow_t *threadflow, flowop_t *flowop)
1942 {
1943 	filesetentry_t	*dir;
1944 	int		ret;
1945 	char		full_path[MAXPATHLEN];
1946 
1947 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
1948 	    FILESET_PICKNOEXIST)) != FILEBENCH_OK)
1949 		return (ret);
1950 
1951 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
1952 		return (ret);
1953 
1954 	flowop_beginop(threadflow, flowop);
1955 	(void) FB_MKDIR(full_path, 0755);
1956 	flowop_endop(threadflow, flowop, 0);
1957 
1958 	/* indicate that it is no longer busy and now exists */
1959 	fileset_unbusy(dir, TRUE, TRUE, 0);
1960 
1961 	return (FILEBENCH_OK);
1962 }
1963 
1964 /*
1965  * Use rmdir to delete a directory.  Obtains the fileset name from the
1966  * flowop, selects an existent leaf directory and obtains its full path,
1967  * then uses rmdir to remove it from the storage subsystem (make it
1968  * non-existent). Returns FILEBENCH_NORSC is there are no more existent
1969  * directories in the fileset, FILEBENCH_ERROR on other errors, and
1970  * FILEBENCH_OK on success.
1971  */
1972 static int
1973 flowoplib_removedir(threadflow_t *threadflow, flowop_t *flowop)
1974 {
1975 	filesetentry_t *dir;
1976 	int		ret;
1977 	char		full_path[MAXPATHLEN];
1978 
1979 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
1980 	    FILESET_PICKEXISTS)) != FILEBENCH_OK)
1981 		return (ret);
1982 
1983 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
1984 		return (ret);
1985 
1986 	flowop_beginop(threadflow, flowop);
1987 	(void) FB_RMDIR(full_path);
1988 	flowop_endop(threadflow, flowop, 0);
1989 
1990 	/* indicate that it is no longer busy and no longer exists */
1991 	fileset_unbusy(dir, TRUE, FALSE, 0);
1992 
1993 	return (FILEBENCH_OK);
1994 }
1995 
1996 /*
1997  * Use opendir(), multiple readdir() calls, and closedir() to list the
1998  * contents of a directory.  Obtains the fileset name from the
1999  * flowop, selects a normal subdirectory (which always exist) and obtains
2000  * its full path, then uses opendir() to get a DIR handle to it from the
2001  * file system, a readdir() loop to access each directory entry, and
2002  * finally cleans up with a closedir(). The latency reported is the total
2003  * for all this activity, and it also reports the total number of bytes
2004  * in the entries as the amount "read". Returns FILEBENCH_ERROR on errors,
2005  * and FILEBENCH_OK on success.
2006  */
2007 static int
2008 flowoplib_listdir(threadflow_t *threadflow, flowop_t *flowop)
2009 {
2010 	fileset_t	*fileset;
2011 	filesetentry_t	*dir;
2012 	DIR		*dir_handle;
2013 	struct dirent	*direntp;
2014 	int		dir_bytes = 0;
2015 	int		ret;
2016 	char		full_path[MAXPATHLEN];
2017 
2018 	if ((fileset = flowop->fo_fileset) == NULL) {
2019 		filebench_log(LOG_ERROR, "flowop NO fileset");
2020 		return (FILEBENCH_ERROR);
2021 	}
2022 
2023 	if ((dir = fileset_pick(fileset, FILESET_PICKDIR, 0, 0)) == NULL) {
2024 		filebench_log(LOG_DEBUG_SCRIPT,
2025 		    "flowop %s failed to pick directory from fileset %s",
2026 		    flowop->fo_name,
2027 		    avd_get_str(fileset->fs_name));
2028 		return (FILEBENCH_ERROR);
2029 	}
2030 
2031 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2032 		return (ret);
2033 
2034 	flowop_beginop(threadflow, flowop);
2035 
2036 	/* open the directory */
2037 	if ((dir_handle = FB_OPENDIR(full_path)) == NULL) {
2038 		filebench_log(LOG_ERROR,
2039 		    "flowop %s failed to open directory in fileset %s\n",
2040 		    flowop->fo_name, avd_get_str(fileset->fs_name));
2041 		return (FILEBENCH_ERROR);
2042 	}
2043 
2044 	/* read through the directory entries */
2045 	while ((direntp = FB_READDIR(dir_handle)) != NULL) {
2046 		dir_bytes += (strlen(direntp->d_name) +
2047 		    sizeof (struct dirent) - 1);
2048 	}
2049 
2050 	/* close the directory */
2051 	(void) FB_CLOSEDIR(dir_handle);
2052 
2053 	flowop_endop(threadflow, flowop, dir_bytes);
2054 
2055 	/* indicate that it is no longer busy */
2056 	fileset_unbusy(dir, FALSE, FALSE, 0);
2057 
2058 	return (FILEBENCH_OK);
2059 }
2060 
2061 /*
2062  * Emulate stat of a file. Picks an arbitrary filesetentry with
2063  * an existing file from the flowop's fileset, then performs a
2064  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2065  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2066  * cannot be found, and FILEBENCH_OK on success.
2067  */
2068 static int
2069 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2070 {
2071 	filesetentry_t *file;
2072 	fileset_t *fileset;
2073 	struct stat64 statbuf;
2074 	int fd = flowop->fo_fdnumber;
2075 
2076 	/* if fd specified and the file is open, use it to access file */
2077 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
2078 
2079 		/* check whether file handle still valid */
2080 		if ((file = threadflow->tf_fse[fd]) == NULL) {
2081 			filebench_log(LOG_DEBUG_SCRIPT,
2082 			    "flowop %s trying to stat NULL file at fd = %d",
2083 			    flowop->fo_name, fd);
2084 			return (FILEBENCH_ERROR);
2085 		}
2086 
2087 		/* if here, we still have a valid file pointer */
2088 		fileset = file->fse_fileset;
2089 	} else {
2090 		/* Otherwise, pick arbitrary file */
2091 		file = NULL;
2092 		fileset = flowop->fo_fileset;
2093 	}
2094 
2095 	if (fileset == NULL) {
2096 		filebench_log(LOG_ERROR,
2097 		    "statfile with no fileset specified");
2098 		return (FILEBENCH_ERROR);
2099 	}
2100 
2101 #ifdef HAVE_RAW_SUPPORT
2102 	/* can't be used with raw devices */
2103 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2104 		filebench_log(LOG_ERROR,
2105 		    "flowop %s attempted do a statfile on a RAW device",
2106 		    flowop->fo_name);
2107 		return (FILEBENCH_ERROR);
2108 	}
2109 #endif /* HAVE_RAW_SUPPORT */
2110 
2111 	if (file == NULL) {
2112 		char path[MAXPATHLEN];
2113 		char *pathtmp;
2114 		int err;
2115 
2116 		/* pick arbitrary, existing (allocated) file */
2117 		if ((err = flowoplib_pickfile(&file, flowop,
2118 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
2119 			filebench_log(LOG_DEBUG_SCRIPT,
2120 			    "Statfile flowop %s failed to pick file",
2121 			    flowop->fo_name);
2122 			return (err);
2123 		}
2124 
2125 		/* resolve path and do a stat on file */
2126 		(void) fb_strlcpy(path, avd_get_str(fileset->fs_path),
2127 		    MAXPATHLEN);
2128 		(void) fb_strlcat(path, "/", MAXPATHLEN);
2129 		(void) fb_strlcat(path, avd_get_str(fileset->fs_name),
2130 		    MAXPATHLEN);
2131 		pathtmp = fileset_resolvepath(file);
2132 		(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
2133 		free(pathtmp);
2134 
2135 		/* stat the file */
2136 		flowop_beginop(threadflow, flowop);
2137 		if (FB_STAT(path, &statbuf) == -1)
2138 			filebench_log(LOG_ERROR,
2139 			    "statfile flowop %s failed", flowop->fo_name);
2140 		flowop_endop(threadflow, flowop, 0);
2141 
2142 		fileset_unbusy(file, FALSE, FALSE, 0);
2143 	} else {
2144 		/* stat specific file */
2145 		flowop_beginop(threadflow, flowop);
2146 		if (FB_FSTAT(&threadflow->tf_fd[fd], &statbuf) == -1)
2147 			filebench_log(LOG_ERROR,
2148 			    "statfile flowop %s failed", flowop->fo_name);
2149 		flowop_endop(threadflow, flowop, 0);
2150 
2151 	}
2152 
2153 	return (FILEBENCH_OK);
2154 }
2155 
2156 
2157 /*
2158  * Additional reads and writes. Read and write whole files, write
2159  * and append to files. Some of these work with both fileobjs and
2160  * filesets, others only with filesets. The flowoplib_write routine
2161  * writes from thread memory, while the others read or write using
2162  * fo_buf memory. Note that both flowoplib_read() and
2163  * flowoplib_aiowrite() use thread memory as well.
2164  */
2165 
2166 
2167 /*
2168  * Emulate a read of a whole file. The file must be open with
2169  * file descriptor and filesetentry stored at the locations indexed
2170  * by the flowop's fdnumber. It then seeks to the beginning of the
2171  * associated file, and reads fs_iosize bytes at a time until the end
2172  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2173  * out of files, and FILEBENCH_OK on success.
2174  */
2175 static int
2176 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2177 {
2178 	caddr_t iobuf;
2179 	off64_t bytes = 0;
2180 	fb_fdesc_t *fdesc;
2181 	uint64_t wss;
2182 	fbint_t iosize;
2183 	int ret;
2184 	char zerordbuf;
2185 
2186 	/* get the file to use */
2187 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2188 	    &fdesc)) != FILEBENCH_OK)
2189 		return (ret);
2190 
2191 	/* an I/O size of zero means read entire working set with one I/O */
2192 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2193 		iosize = wss;
2194 
2195 	/*
2196 	 * The file may actually be 0 bytes long, in which case skip
2197 	 * the buffer set up call (which would fail) and substitute
2198 	 * a small buffer, which won't really be used.
2199 	 */
2200 	if (iosize == 0) {
2201 		iobuf = (caddr_t)&zerordbuf;
2202 		filebench_log(LOG_DEBUG_SCRIPT,
2203 		    "flowop %s read zero length file", flowop->fo_name);
2204 	} else {
2205 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2206 		    iosize) != 0)
2207 			return (FILEBENCH_ERROR);
2208 	}
2209 
2210 	/* Measure time to read bytes */
2211 	flowop_beginop(threadflow, flowop);
2212 	(void) FB_LSEEK(fdesc, 0, SEEK_SET);
2213 	while ((ret = FB_READ(fdesc, iobuf, iosize)) > 0)
2214 		bytes += ret;
2215 
2216 	flowop_endop(threadflow, flowop, bytes);
2217 
2218 	if (ret < 0) {
2219 		filebench_log(LOG_ERROR,
2220 		    "readwhole fail Failed to read whole file: %s",
2221 		    strerror(errno));
2222 		return (FILEBENCH_ERROR);
2223 	}
2224 
2225 	return (FILEBENCH_OK);
2226 }
2227 
2228 /*
2229  * Emulate a write to a file of size fo_iosize.  Will write
2230  * to a file from a fileset if the flowop's fo_fileset field
2231  * specifies one or its fdnumber is non zero. Otherwise it
2232  * will write to a fileobj file, if one exists. If the file
2233  * is not currently open, the routine will attempt to open
2234  * it. The flowop's fo_wss parameter will be used to set the
2235  * maximum file size if it is non-zero, otherwise the
2236  * filesetentry's  fse_size will be used. A random memory
2237  * buffer offset is calculated, and, if fo_random is TRUE,
2238  * a random file offset is used for the write. Otherwise the
2239  * write is to the next sequential location. Returns
2240  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2241  * obtain a file, or FILEBENCH_OK on success.
2242  */
2243 static int
2244 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2245 {
2246 	caddr_t iobuf;
2247 	fbint_t wss;
2248 	fbint_t iosize;
2249 	fb_fdesc_t *fdesc;
2250 	int ret;
2251 
2252 	iosize = avd_get_int(flowop->fo_iosize);
2253 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2254 	    &fdesc, iosize)) != FILEBENCH_OK)
2255 		return (ret);
2256 
2257 	if (avd_get_bool(flowop->fo_random)) {
2258 		uint64_t fileoffset;
2259 
2260 		if (filebench_randomno64(&fileoffset,
2261 		    wss, iosize, NULL) == -1) {
2262 			filebench_log(LOG_ERROR,
2263 			    "file size smaller than IO size for thread %s",
2264 			    flowop->fo_name);
2265 			return (FILEBENCH_ERROR);
2266 		}
2267 		flowop_beginop(threadflow, flowop);
2268 		if (FB_PWRITE(fdesc, iobuf,
2269 		    iosize, (off64_t)fileoffset) == -1) {
2270 			filebench_log(LOG_ERROR, "write failed, "
2271 			    "offset %llu io buffer %zd: %s",
2272 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2273 			flowop_endop(threadflow, flowop, 0);
2274 			return (FILEBENCH_ERROR);
2275 		}
2276 		flowop_endop(threadflow, flowop, iosize);
2277 	} else {
2278 		flowop_beginop(threadflow, flowop);
2279 		if (FB_WRITE(fdesc, iobuf, iosize) == -1) {
2280 			filebench_log(LOG_ERROR,
2281 			    "write failed, io buffer %zd: %s",
2282 			    iobuf, strerror(errno));
2283 			flowop_endop(threadflow, flowop, 0);
2284 			return (FILEBENCH_ERROR);
2285 		}
2286 		flowop_endop(threadflow, flowop, iosize);
2287 	}
2288 
2289 	return (FILEBENCH_OK);
2290 }
2291 
2292 /*
2293  * Emulate a write of a whole file.  The size of the file
2294  * is taken from a filesetentry identified by fo_srcfdnumber or
2295  * from the working set size, while the file descriptor used is
2296  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2297  * length length until full file has been written. Returns FILEBENCH_ERROR on
2298  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2299  */
2300 static int
2301 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2302 {
2303 	caddr_t iobuf;
2304 	filesetentry_t *file;
2305 	int wsize;
2306 	off64_t seek;
2307 	off64_t bytes = 0;
2308 	uint64_t wss;
2309 	fbint_t iosize;
2310 	fb_fdesc_t *fdesc;
2311 	int srcfd = flowop->fo_srcfdnumber;
2312 	int ret;
2313 	char zerowrtbuf;
2314 
2315 	/* get the file to use */
2316 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2317 	    &fdesc)) != FILEBENCH_OK)
2318 		return (ret);
2319 
2320 	/* an I/O size of zero means write entire working set with one I/O */
2321 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2322 		iosize = wss;
2323 
2324 	/*
2325 	 * The file may actually be 0 bytes long, in which case skip
2326 	 * the buffer set up call (which would fail) and substitute
2327 	 * a small buffer, which won't really be used.
2328 	 */
2329 	if (iosize == 0) {
2330 		iobuf = (caddr_t)&zerowrtbuf;
2331 		filebench_log(LOG_DEBUG_SCRIPT,
2332 		    "flowop %s wrote zero length file", flowop->fo_name);
2333 	} else {
2334 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2335 		    iosize) != 0)
2336 			return (FILEBENCH_ERROR);
2337 	}
2338 
2339 	file = threadflow->tf_fse[srcfd];
2340 	if ((srcfd != 0) && (file == NULL)) {
2341 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2342 		    flowop->fo_name);
2343 		return (FILEBENCH_ERROR);
2344 	}
2345 
2346 	if (file)
2347 		wss = file->fse_size;
2348 
2349 	wsize = (int)MIN(wss, iosize);
2350 
2351 	/* Measure time to write bytes */
2352 	flowop_beginop(threadflow, flowop);
2353 	for (seek = 0; seek < wss; seek += wsize) {
2354 		ret = FB_WRITE(fdesc, iobuf, wsize);
2355 		if (ret != wsize) {
2356 			filebench_log(LOG_ERROR,
2357 			    "Failed to write %d bytes on fd %d: %s",
2358 			    wsize, fdesc->fd_num, strerror(errno));
2359 			flowop_endop(threadflow, flowop, 0);
2360 			return (FILEBENCH_ERROR);
2361 		}
2362 		wsize = (int)MIN(wss - seek, iosize);
2363 		bytes += ret;
2364 	}
2365 	flowop_endop(threadflow, flowop, bytes);
2366 
2367 	return (FILEBENCH_OK);
2368 }
2369 
2370 
2371 /*
2372  * Emulate a fixed size append to a file. Will append data to
2373  * a file chosen from a fileset if the flowop's fo_fileset
2374  * field specifies one or if its fdnumber is non zero.
2375  * Otherwise it will write to a fileobj file, if one exists.
2376  * The flowop's fo_wss parameter will be used to set the
2377  * maximum file size if it is non-zero, otherwise the
2378  * filesetentry's fse_size will be used. A random memory
2379  * buffer offset is calculated, then a logical seek to the
2380  * end of file is done followed by a write of fo_iosize
2381  * bytes. Writes are actually done from fo_buf, rather than
2382  * tf_mem as is done with flowoplib_write(), and no check
2383  * is made to see if fo_iosize exceeds the size of fo_buf.
2384  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2385  * files in the fileset, FILEBENCH_OK on success.
2386  */
2387 static int
2388 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2389 {
2390 	caddr_t iobuf;
2391 	fb_fdesc_t *fdesc;
2392 	fbint_t wss;
2393 	fbint_t iosize;
2394 	int ret;
2395 
2396 	iosize = avd_get_int(flowop->fo_iosize);
2397 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2398 	    &fdesc, iosize)) != FILEBENCH_OK)
2399 		return (ret);
2400 
2401 	/* XXX wss is not being used */
2402 
2403 	/* Measure time to write bytes */
2404 	flowop_beginop(threadflow, flowop);
2405 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2406 	ret = FB_WRITE(fdesc, iobuf, iosize);
2407 	if (ret != iosize) {
2408 		filebench_log(LOG_ERROR,
2409 		    "Failed to write %llu bytes on fd %d: %s",
2410 		    (u_longlong_t)iosize, fdesc->fd_num, strerror(errno));
2411 		flowop_endop(threadflow, flowop, ret);
2412 		return (FILEBENCH_ERROR);
2413 	}
2414 	flowop_endop(threadflow, flowop, ret);
2415 
2416 	return (FILEBENCH_OK);
2417 }
2418 
2419 /*
2420  * Emulate a random size append to a file. Will append data
2421  * to a file chosen from a fileset if the flowop's fo_fileset
2422  * field specifies one or if its fdnumber is non zero. Otherwise
2423  * it will write to a fileobj file, if one exists. The flowop's
2424  * fo_wss parameter will be used to set the maximum file size
2425  * if it is non-zero, otherwise the filesetentry's fse_size
2426  * will be used.  A random transfer size (but at most fo_iosize
2427  * bytes) and a random memory offset are calculated. A logical
2428  * seek to the end of file is done, then writes of up to
2429  * FILE_ALLOC_BLOCK in size are done until the full transfer
2430  * size has been written. Writes are actually done from fo_buf,
2431  * rather than tf_mem as is done with flowoplib_write().
2432  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2433  * files in the fileset, FILEBENCH_OK on success.
2434  */
2435 static int
2436 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2437 {
2438 	caddr_t iobuf;
2439 	uint64_t appendsize;
2440 	fb_fdesc_t *fdesc;
2441 	fbint_t wss;
2442 	fbint_t iosize;
2443 	int ret = 0;
2444 
2445 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2446 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2447 		    flowop->fo_name);
2448 		return (FILEBENCH_ERROR);
2449 	}
2450 
2451 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2452 		return (FILEBENCH_ERROR);
2453 
2454 	/* skip if attempting zero length append */
2455 	if (appendsize == 0) {
2456 		flowop_beginop(threadflow, flowop);
2457 		flowop_endop(threadflow, flowop, 0LL);
2458 		return (FILEBENCH_OK);
2459 	}
2460 
2461 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2462 	    &fdesc, appendsize)) != FILEBENCH_OK)
2463 		return (ret);
2464 
2465 	/* XXX wss is not being used */
2466 
2467 	/* Measure time to write bytes */
2468 	flowop_beginop(threadflow, flowop);
2469 
2470 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2471 	ret = FB_WRITE(fdesc, iobuf, appendsize);
2472 	if (ret != appendsize) {
2473 		filebench_log(LOG_ERROR,
2474 		    "Failed to write %llu bytes on fd %d: %s",
2475 		    (u_longlong_t)appendsize, fdesc->fd_num, strerror(errno));
2476 		flowop_endop(threadflow, flowop, 0);
2477 		return (FILEBENCH_ERROR);
2478 	}
2479 
2480 	flowop_endop(threadflow, flowop, appendsize);
2481 
2482 	return (FILEBENCH_OK);
2483 }
2484 
/*
 * Running totals kept in a testrandvar flowop's private data area.
 */
struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated */
	double val_sum;		/* sum of the values */
	double sqr_sum;		/* sum of the squares of the values */
};

typedef struct testrandvar_priv testrandvar_priv_t;
2490 
2491 /*
2492  * flowop to calculate various statistics from the number stream
2493  * produced by a random variable. This allows verification that the
2494  * random distribution used to define the random variable is producing
2495  * the expected distribution of random numbers.
2496  */
2497 /* ARGSUSED */
2498 static int
2499 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2500 {
2501 	testrandvar_priv_t	*mystats;
2502 	double			value;
2503 
2504 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2505 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2506 		filebench_shutdown(1);
2507 		return (-1);
2508 	}
2509 
2510 	value = avd_get_dbl(flowop->fo_value);
2511 
2512 	mystats->sample_count++;
2513 	mystats->val_sum += value;
2514 	mystats->sqr_sum += (value * value);
2515 
2516 	return (0);
2517 }
2518 
2519 /*
2520  * Initialize the private data area used to accumulate the statistics
2521  */
2522 static int
2523 flowoplib_testrandvar_init(flowop_t *flowop)
2524 {
2525 	testrandvar_priv_t	*mystats;
2526 
2527 	if ((mystats = (testrandvar_priv_t *)
2528 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2529 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2530 		filebench_shutdown(1);
2531 		return (-1);
2532 	}
2533 
2534 	mystats->sample_count = 0;
2535 	mystats->val_sum = 0;
2536 	mystats->sqr_sum = 0;
2537 	flowop->fo_private = (void *)mystats;
2538 
2539 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2540 	return (0);
2541 }
2542 
2543 /*
2544  * Print out the accumulated statistics, and free the private storage
2545  */
2546 static void
2547 flowoplib_testrandvar_destruct(flowop_t *flowop)
2548 {
2549 	testrandvar_priv_t	*mystats;
2550 	double mean, std_dev, dbl_count;
2551 
2552 	(void) ipc_mutex_lock(&flowop->fo_lock);
2553 	if ((mystats = (testrandvar_priv_t *)
2554 	    flowop->fo_private) == NULL) {
2555 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2556 		return;
2557 	}
2558 
2559 	flowop->fo_private = NULL;
2560 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2561 
2562 	dbl_count = (double)mystats->sample_count;
2563 	mean = mystats->val_sum / dbl_count;
2564 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2565 
2566 	filebench_log(LOG_VERBOSE,
2567 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2568 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2569 	free(mystats);
2570 }
2571 
2572 /*
2573  * prints message to the console from within a thread
2574  */
2575 static int
2576 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2577 {
2578 	procflow_t *procflow;
2579 
2580 	procflow = threadflow->tf_process;
2581 	filebench_log(LOG_INFO,
2582 	    "Message from process (%s,%d), thread (%s,%d): %s",
2583 	    procflow->pf_name, procflow->pf_instance,
2584 	    threadflow->tf_name, threadflow->tf_instance,
2585 	    avd_get_str(flowop->fo_value));
2586 
2587 	return (FILEBENCH_OK);
2588 }
2589 
/*
 * Writes the usage summary for the flowop commands to stderr,
 * one table entry per output line.
 */
void
flowoplib_usage()
{
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop deletefile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>]\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,directio]\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,random]\n",
		"                       [,opennext]\n",
		"                       [,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n",
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value=<decrement-to-receive>,\n",
		"                       highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>,\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	size_t idx;

	for (idx = 0; idx < sizeof (usage_text) / sizeof (usage_text[0]);
	    idx++)
		(void) fprintf(stderr, "%s", usage_text[idx]);
}
2679