xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 7556:55f6926392fe)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #ifdef HAVE_SYS_ASYNCH_H
32 #include <sys/asynch.h>
33 #endif
34 #include <sys/ipc.h>
35 #include <sys/sem.h>
36 #include <sys/errno.h>
37 #include <sys/time.h>
38 #include <inttypes.h>
39 #include <fcntl.h>
40 #include <math.h>
41 
42 #ifdef HAVE_UTILITY_H
43 #include <utility.h>
44 #endif /* HAVE_UTILITY_H */
45 
46 #ifdef HAVE_AIO
47 #include <aio.h>
48 #endif /* HAVE_AIO */
49 
50 #ifdef HAVE_LIBAIO_H
51 #include <libaio.h>
52 #endif /* HAVE_LIBAIO_H */
53 
54 #ifdef HAVE_SYS_ASYNC_H
55 #include <sys/asynch.h>
56 #endif /* HAVE_SYS_ASYNC_H */
57 
58 #ifdef HAVE_AIO_H
59 #include <aio.h>
60 #endif /* HAVE_AIO_H */
61 
62 #ifndef HAVE_UINT_T
63 #define	uint_t unsigned int
64 #endif /* HAVE_UINT_T */
65 
66 #ifndef HAVE_AIOCB64_T
67 #define	aiocb64 aiocb
68 #endif /* HAVE_AIOCB64_T */
69 
70 #ifndef HAVE_SYSV_SEM
71 #include <semaphore.h>
72 #endif /* HAVE_SYSV_SEM */
73 
74 #include "filebench.h"
75 #include "flowop.h"
76 #include "fileset.h"
77 #include "fb_random.h"
78 
79 /*
80  * These routines implement the flowops from the f language. Each
81  * flowop has a name such as "read", and a set of function pointers
82  * to call for initialization, execution and destruction of the flowop.
83  * The table flowoplib_funcs[] contains a flowoplib struct for each
84  * implemented flowop. Most flowops use a generic initialization function
85  * and all currently use a generic destruction function. All flowop
86  * functions referenced from the table are in this file, though, of
87  * course, they often call functions from other files.
88  *
89  * The flowop_init() routine uses the flowoplib_funcs[] table to
90  * create an initial set of "instance 0" flowops, one for each type of
91  * flowop, from which all other flowops are derived. These "instance 0"
92  * flowops are initialized with information from the table including
93  * pointers for their fo_init, fo_func and fo_destroy functions. When
94  * a flowop definition is encountered in an f language script, the
95  * "type" of flowop, such as "read" is used to search for the
96  * "instance 0" flowop named "read", then a new flowop is allocated
97  * which inherits its function pointers and other initial properties
98  * from the instance 0 flowop, and is given a new name as specified
99  * by the "name=" attribute.
100  */
101 
102 static int flowoplib_init_generic(flowop_t *flowop);
103 static void flowoplib_destruct_generic(flowop_t *flowop);
104 static void flowoplib_destruct_noop(flowop_t *flowop);
105 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
106 static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
107 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
108 #ifdef HAVE_AIO
109 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
110 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
111 #endif
112 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
113 static int flowoplib_block_init(flowop_t *flowop);
114 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
115 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
116 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
117 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
118 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
119 static int flowoplib_sempost_init(flowop_t *flowop);
120 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
121 static int flowoplib_semblock_init(flowop_t *flowop);
122 static void flowoplib_semblock_destruct(flowop_t *flowop);
123 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
124 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
125 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
126 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
127 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
128 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
129 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
130 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
131 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
132 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
133 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
134 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
135 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
136 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
137 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
138 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
139 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
140 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
141 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
142 static int flowoplib_testrandvar_init(flowop_t *flowop);
143 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
144 
/*
 * One row of the flowoplib_funcs[] table: the type, attributes and name
 * of a built-in flowop together with the routines that implement it.
 * The member order matters because the table is initialized
 * positionally.
 */
struct flowoplib {
	int	fl_type;		/* FLOW_TYPE_xxx value */
	int	fl_attrs;		/* FLOW_ATTR_xxx flags, or 0 */
	char	*fl_name;		/* flowop name, such as "read" */
	int	(*fl_init)();		/* copied to fo_init */
	int	(*fl_func)();		/* copied to fo_func */
	void	(*fl_destruct)();	/* copied to fo_destruct */
};

typedef struct flowoplib flowoplib_t;
153 
154 static flowoplib_t flowoplib_funcs[] = {
155 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
156 	flowoplib_write, flowoplib_destruct_generic,
157 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
158 	flowoplib_read, flowoplib_destruct_generic,
159 #ifdef HAVE_AIO
160 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
161 	flowoplib_aiowrite, flowoplib_destruct_generic,
162 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
163 	flowoplib_aiowait, flowoplib_destruct_generic,
164 #endif
165 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
166 	flowoplib_block, flowoplib_destruct_generic,
167 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
168 	flowoplib_wakeup, flowoplib_destruct_generic,
169 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
170 	flowoplib_semblock, flowoplib_semblock_destruct,
171 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
172 	flowoplib_sempost, flowoplib_destruct_noop,
173 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
174 	flowoplib_hog, flowoplib_destruct_generic,
175 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
176 	flowoplib_delay, flowoplib_destruct_generic,
177 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
178 	flowoplib_eventlimit, flowoplib_destruct_generic,
179 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
180 	flowoplib_bwlimit, flowoplib_destruct_generic,
181 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
182 	flowoplib_iopslimit, flowoplib_destruct_generic,
183 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
184 	flowoplib_opslimit, flowoplib_destruct_generic,
185 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
186 	flowoplib_finishoncount, flowoplib_destruct_generic,
187 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
188 	flowoplib_finishonbytes, flowoplib_destruct_generic,
189 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
190 	flowoplib_openfile, flowoplib_destruct_generic,
191 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
192 	flowoplib_createfile, flowoplib_destruct_generic,
193 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
194 	flowoplib_closefile, flowoplib_destruct_generic,
195 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
196 	flowoplib_fsync, flowoplib_destruct_generic,
197 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
198 	flowoplib_fsyncset, flowoplib_destruct_generic,
199 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
200 	flowoplib_statfile, flowoplib_destruct_generic,
201 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
202 	flowoplib_readwholefile, flowoplib_destruct_generic,
203 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
204 	flowoplib_appendfile, flowoplib_destruct_generic,
205 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
206 	flowoplib_appendfilerand, flowoplib_destruct_generic,
207 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
208 	flowoplib_deletefile, flowoplib_destruct_generic,
209 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
210 	flowoplib_writewholefile, flowoplib_destruct_generic,
211 	FLOW_TYPE_OTHER, 0, "print", flowoplib_init_generic,
212 	flowoplib_print, flowoplib_destruct_generic,
213 	/* routine to calculate mean and stddev for output from a randvar */
214 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
215 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
216 };
217 
218 /*
219  * Loops through the master list of flowops defined in this
220  * module, and creates and initializes a flowop for each one
221  * by calling flowop_define. As a side effect of calling
222  * flowop define, the created flowops are placed on the
223  * master flowop list. All created flowops are set to
224  * instance "0".
225  */
226 void
227 flowoplib_init()
228 {
229 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
230 	int i;
231 
232 	for (i = 0; i < nops; i++) {
233 		flowop_t *flowop;
234 		flowoplib_t *fl;
235 
236 		fl = &flowoplib_funcs[i];
237 
238 		if ((flowop = flowop_define(NULL,
239 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
240 			filebench_log(LOG_ERROR,
241 			    "failed to create flowop %s\n",
242 			    fl->fl_name);
243 			filebench_shutdown(1);
244 		}
245 
246 		flowop->fo_func = fl->fl_func;
247 		flowop->fo_init = fl->fl_init;
248 		flowop->fo_destruct = fl->fl_destruct;
249 		flowop->fo_attrs = fl->fl_attrs;
250 	}
251 }
252 
253 static int
254 flowoplib_init_generic(flowop_t *flowop)
255 {
256 	(void) ipc_mutex_unlock(&flowop->fo_lock);
257 	return (FILEBENCH_OK);
258 }
259 
260 static void
261 flowoplib_destruct_generic(flowop_t *flowop)
262 {
263 	char *buf;
264 
265 	/* release any local resources held by the flowop */
266 	(void) ipc_mutex_lock(&flowop->fo_lock);
267 	buf = flowop->fo_buf;
268 	flowop->fo_buf = NULL;
269 	(void) ipc_mutex_unlock(&flowop->fo_lock);
270 
271 	if (buf)
272 		free(buf);
273 }
274 
275 /*
276  * Special total noop destruct
277  */
278 /* ARGSUSED */
279 static void
280 flowoplib_destruct_noop(flowop_t *flowop)
281 {
282 }
283 
284 /*
285  * Generates a file attribute from flags in the supplied flowop.
286  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
287  */
288 static int
289 flowoplib_fileattrs(flowop_t *flowop)
290 {
291 	int attrs = 0;
292 
293 	if (avd_get_bool(flowop->fo_directio))
294 		attrs |= FLOW_ATTR_DIRECTIO;
295 
296 	if (avd_get_bool(flowop->fo_dsync))
297 		attrs |= FLOW_ATTR_DSYNC;
298 
299 	return (attrs);
300 }
301 
302 /*
303  * Searches for a file descriptor. Tries the flowop's
304  * fo_fdnumber first and returns with it if it has been
305  * explicitly set (greater than 0). It next checks to
306  * see if a rotating file descriptor policy is in effect,
307  * and if not returns the fdnumber regardless of what
308  * it is. (note that if it is 0, it just selects to the
309  * default file descriptor in the threadflow's tf_fd
310  * array). If the rotating fd policy is in effect, it
311  * cycles from the end of the tf_fd array to one location
312  * beyond the maximum needed by the number of entries in
313  * the associated fileset on each invocation, then starts
314  * over from the end.
315  *
316  * The routine returns an index into the threadflow's
317  * tf_fd table where the actual file descriptor will be
318  * found. Note: the calling routine must not call this
319  * routine if the flowop does not have a fileset, and the
320  * flowop's fo_fdnumber is zero and fo_rotatefd is
321  * asserted, or an addressing fault may occur.
322  */
323 static int
324 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
325 {
326 	fbint_t	entries;
327 	int fdnumber = flowop->fo_fdnumber;
328 
329 	/* If the script sets the fd explicitly */
330 	if (fdnumber > 0)
331 		return (fdnumber);
332 
333 	/* If the flowop defaults to persistent fd */
334 	if (!avd_get_bool(flowop->fo_rotatefd))
335 		return (fdnumber);
336 
337 	if (flowop->fo_fileset == NULL) {
338 		filebench_log(LOG_ERROR, "flowop NULL file");
339 		return (FILEBENCH_ERROR);
340 	}
341 
342 	entries = flowop->fo_fileset->fs_constentries;
343 
344 	/* Rotate the fd on each flowop invocation */
345 	if (entries > (THREADFLOW_MAXFD / 2)) {
346 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
347 		    " (too many files : %llu",
348 		    flowop->fo_name, (u_longlong_t)entries);
349 		return (FILEBENCH_ERROR);
350 	}
351 
352 	/* First time around */
353 	if (threadflow->tf_fdrotor == 0)
354 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
355 
356 	/* One fd for every file in the set */
357 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
358 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
359 
360 
361 	threadflow->tf_fdrotor--;
362 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
363 	    threadflow->tf_fdrotor);
364 	return (threadflow->tf_fdrotor);
365 }
366 
367 /*
368  * Determines the file descriptor to use, and attempts to open
369  * the file if it is not already open. Also determines the wss
370  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
371  * if flowop_openfile_common couldn't obtain an appropriate file
372  * from a the fileset, and FILEBENCH_OK otherwise.
373  */
374 static int
375 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
376     fbint_t *wssp, int *filedescp)
377 {
378 	int fd = flowoplib_fdnum(threadflow, flowop);
379 
380 	if (fd == -1)
381 		return (FILEBENCH_ERROR);
382 
383 	if (threadflow->tf_fd[fd] == 0) {
384 		int ret;
385 
386 		if ((ret = flowoplib_openfile_common(
387 		    threadflow, flowop, fd)) != FILEBENCH_OK)
388 			return (ret);
389 
390 		if (threadflow->tf_fse[fd]) {
391 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
392 			    threadflow->tf_fse[fd]->fse_path);
393 		} else {
394 			filebench_log(LOG_DEBUG_IMPL,
395 			    "opened device %s/%s",
396 			    avd_get_str(flowop->fo_fileset->fs_path),
397 			    avd_get_str(flowop->fo_fileset->fs_name));
398 		}
399 	}
400 
401 	*filedescp = threadflow->tf_fd[fd];
402 
403 	if ((*wssp = flowop->fo_constwss) == 0) {
404 		if (threadflow->tf_fse[fd])
405 			*wssp = threadflow->tf_fse[fd]->fse_size;
406 		else
407 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
408 	}
409 
410 	return (FILEBENCH_OK);
411 }
412 
413 /*
414  * Determines the io buffer or random offset into tf_mem for
415  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
416  */
417 static int
418 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
419     caddr_t *iobufp, fbint_t iosize)
420 {
421 	long memsize;
422 	size_t memoffset;
423 
424 	if (iosize == 0) {
425 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
426 		    flowop->fo_name);
427 		return (FILEBENCH_ERROR);
428 	}
429 
430 	if ((memsize = threadflow->tf_constmemsize) != 0) {
431 
432 		/* use tf_mem for I/O with random offset */
433 		if (filebench_randomno(&memoffset,
434 		    memsize, iosize, NULL) == -1) {
435 			filebench_log(LOG_ERROR,
436 			    "tf_memsize smaller than IO size for thread %s",
437 			    flowop->fo_name);
438 			return (FILEBENCH_ERROR);
439 		}
440 		*iobufp = threadflow->tf_mem + memoffset;
441 
442 	} else {
443 		/* use private I/O buffer */
444 		if ((flowop->fo_buf != NULL) &&
445 		    (flowop->fo_buf_size < iosize)) {
446 			/* too small, so free up and re-allocate */
447 			free(flowop->fo_buf);
448 			flowop->fo_buf = NULL;
449 		}
450 
451 		/*
452 		 * Allocate memory for the  buffer. The memory is freed
453 		 * by flowop_destruct_generic() or by this routine if more
454 		 * memory is needed for the buffer.
455 		 */
456 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
457 		    = (char *)malloc(iosize)) == NULL))
458 			return (FILEBENCH_ERROR);
459 
460 		flowop->fo_buf_size = iosize;
461 		*iobufp = flowop->fo_buf;
462 	}
463 	return (FILEBENCH_OK);
464 }
465 
466 /*
467  * Determines the file descriptor to use, opens it if necessary, the
468  * io buffer or random offset into tf_mem for IO operation and the wss
469  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
470  */
471 static int
472 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
473     fbint_t *wssp, caddr_t *iobufp, int *filedescp, fbint_t iosize)
474 {
475 	int ret;
476 
477 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
478 	    FILEBENCH_OK)
479 		return (ret);
480 
481 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
482 	    FILEBENCH_OK)
483 		return (ret);
484 
485 	return (FILEBENCH_OK);
486 }
487 
488 /*
489  * Emulate posix read / pread. If the flowop has a fileset,
490  * a file descriptor number index is fetched, otherwise a
491  * supplied fileobj file is used. In either case the specified
492  * file will be opened if not already open. If the flowop has
493  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
494  * returned.
495  *
496  * The actual read is done to a random offset in the
497  * threadflow's thread memory (tf_mem), with a size set by
498  * fo_iosize and at either a random disk offset within the
499  * working set size, or at the next sequential location. If
500  * any errors are encountered, FILEBENCH_ERROR is returned,
501  * if no appropriate file can be obtained from the fileset then
502  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
503  */
504 static int
505 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
506 {
507 	caddr_t iobuf;
508 	fbint_t wss;
509 	fbint_t iosize;
510 	int filedesc;
511 	int ret;
512 
513 
514 	iosize = avd_get_int(flowop->fo_iosize);
515 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
516 	    &filedesc, iosize)) != FILEBENCH_OK)
517 		return (ret);
518 
519 	if (avd_get_bool(flowop->fo_random)) {
520 		uint64_t fileoffset;
521 
522 		if (filebench_randomno64(&fileoffset,
523 		    wss, iosize, NULL) == -1) {
524 			filebench_log(LOG_ERROR,
525 			    "file size smaller than IO size for thread %s",
526 			    flowop->fo_name);
527 			return (FILEBENCH_ERROR);
528 		}
529 
530 		(void) flowop_beginop(threadflow, flowop);
531 		if ((ret = pread64(filedesc, iobuf,
532 		    iosize, (off64_t)fileoffset)) == -1) {
533 			(void) flowop_endop(threadflow, flowop, 0);
534 			filebench_log(LOG_ERROR,
535 			    "read file %s failed, offset %llu "
536 			    "io buffer %zd: %s",
537 			    avd_get_str(flowop->fo_fileset->fs_name),
538 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
539 			flowop_endop(threadflow, flowop, 0);
540 			return (FILEBENCH_ERROR);
541 		}
542 		(void) flowop_endop(threadflow, flowop, ret);
543 
544 		if ((ret == 0))
545 			(void) lseek64(filedesc, 0, SEEK_SET);
546 
547 	} else {
548 		(void) flowop_beginop(threadflow, flowop);
549 		if ((ret = read(filedesc, iobuf, iosize)) == -1) {
550 			(void) flowop_endop(threadflow, flowop, 0);
551 			filebench_log(LOG_ERROR,
552 			    "read file %s failed, io buffer %zd: %s",
553 			    avd_get_str(flowop->fo_fileset->fs_name),
554 			    iobuf, strerror(errno));
555 			(void) flowop_endop(threadflow, flowop, 0);
556 			return (FILEBENCH_ERROR);
557 		}
558 		(void) flowop_endop(threadflow, flowop, ret);
559 
560 		if ((ret == 0))
561 			(void) lseek64(filedesc, 0, SEEK_SET);
562 	}
563 
564 	return (FILEBENCH_OK);
565 }
566 
567 #ifdef HAVE_AIO
568 
569 /*
570  * Asynchronous write section. An Asynchronous IO element
571  * (aiolist_t) is used to associate the asynchronous write request with
572  * its subsequent completion. This element includes a aiocb64 struct
573  * that is used by posix aio_xxx calls to track the asynchronous writes.
574  * The flowops aiowrite and aiowait result in calls to these posix
575  * aio_xxx system routines to do the actual asynchronous write IO
576  * operations.
577  */
578 
579 
580 /*
581  * Allocates an asynchronous I/O list (aio, of type
582  * aiolist_t) element. Adds it to the flowop thread's
583  * threadflow aio list. Returns a pointer to the element.
584  */
585 static aiolist_t *
586 aio_allocate(flowop_t *flowop)
587 {
588 	aiolist_t *aiolist;
589 
590 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
591 		filebench_log(LOG_ERROR, "malloc aiolist failed");
592 		filebench_shutdown(1);
593 	}
594 
595 	/* Add to list */
596 	if (flowop->fo_thread->tf_aiolist == NULL) {
597 		flowop->fo_thread->tf_aiolist = aiolist;
598 		aiolist->al_next = NULL;
599 	} else {
600 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
601 		flowop->fo_thread->tf_aiolist = aiolist;
602 	}
603 	return (aiolist);
604 }
605 
606 /*
607  * Searches for the aiolist element that has a matching
608  * completion block, aiocb. If none found returns FILEBENCH_ERROR. If
609  * found, removes the aiolist element from flowop thread's
610  * list and returns FILEBENCH_OK.
611  */
612 static int
613 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
614 {
615 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
616 	aiolist_t *previous = NULL;
617 	aiolist_t *match = NULL;
618 
619 	if (aiocb == NULL) {
620 		filebench_log(LOG_ERROR, "null aiocb deallocate");
621 		return (FILEBENCH_OK);
622 	}
623 
624 	while (aiolist) {
625 		if (aiocb == &(aiolist->al_aiocb)) {
626 			match = aiolist;
627 			break;
628 		}
629 		previous = aiolist;
630 		aiolist = aiolist->al_next;
631 	}
632 
633 	if (match == NULL)
634 		return (FILEBENCH_ERROR);
635 
636 	/* Remove from the list */
637 	if (previous)
638 		previous->al_next = match->al_next;
639 	else
640 		flowop->fo_thread->tf_aiolist = match->al_next;
641 
642 	return (FILEBENCH_OK);
643 }
644 
645 /*
646  * Emulate posix aiowrite(). Determines which file to use,
647  * either one file of a fileset, or the file associated
648  * with a fileobj, allocates and fills an aiolist_t element
649  * for the write, and issues the asynchronous write. This
650  * operation is only valid for random IO, and returns an
651  * error if the flowop is set for sequential IO. Returns
652  * FILEBENCH_OK on success, FILEBENCH_NORSC if iosetup can't
653  * obtain a file to open, and FILEBENCH_ERROR on any
654  * encountered error.
655  */
656 static int
657 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
658 {
659 	caddr_t iobuf;
660 	fbint_t wss;
661 	fbint_t iosize;
662 	int filedesc;
663 	int ret;
664 
665 	iosize = avd_get_int(flowop->fo_iosize);
666 
667 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
668 	    &filedesc, iosize)) != FILEBENCH_OK)
669 		return (ret);
670 
671 	if (avd_get_bool(flowop->fo_random)) {
672 		uint64_t fileoffset;
673 		struct aiocb64 *aiocb;
674 		aiolist_t *aiolist;
675 
676 		if (filebench_randomno64(&fileoffset,
677 		    wss, iosize, NULL) == -1) {
678 			filebench_log(LOG_ERROR,
679 			    "file size smaller than IO size for thread %s",
680 			    flowop->fo_name);
681 			return (FILEBENCH_ERROR);
682 		}
683 
684 		aiolist = aio_allocate(flowop);
685 		aiolist->al_type = AL_WRITE;
686 		aiocb = &aiolist->al_aiocb;
687 
688 		aiocb->aio_fildes = filedesc;
689 		aiocb->aio_buf = iobuf;
690 		aiocb->aio_nbytes = (size_t)iosize;
691 		aiocb->aio_offset = (off64_t)fileoffset;
692 		aiocb->aio_reqprio = 0;
693 
694 		filebench_log(LOG_DEBUG_IMPL,
695 		    "aio fd=%d, bytes=%llu, offset=%llu",
696 		    filedesc, (u_longlong_t)iosize, (u_longlong_t)fileoffset);
697 
698 		flowop_beginop(threadflow, flowop);
699 		if (aio_write64(aiocb) < 0) {
700 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
701 			    strerror(errno));
702 			filebench_shutdown(1);
703 		}
704 		flowop_endop(threadflow, flowop, iosize);
705 	} else {
706 		return (FILEBENCH_ERROR);
707 	}
708 
709 	return (FILEBENCH_OK);
710 }
711 
712 
713 
714 #define	MAXREAP 4096
715 
716 /*
717  * Emulate posix aiowait(). Waits for the completion of half the
718  * outstanding asynchronous IOs, or a single IO, which ever is
719  * larger. The routine will return after a sufficient number of
720  * completed calls issued by any thread in the procflow have
721  * completed, or a 1 second timout elapses. All completed
722  * IO operations are deleted from the thread's aiolist.
723  */
724 static int
725 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
726 {
727 	struct aiocb64 **worklist;
728 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
729 	int uncompleted = 0;
730 
731 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
732 
733 	/* Count the list of pending aios */
734 	while (aio) {
735 		uncompleted++;
736 		aio = aio->al_next;
737 	}
738 
739 	do {
740 		uint_t ncompleted = 0;
741 		uint_t todo;
742 		struct timespec timeout;
743 		int inprogress;
744 		int i;
745 
746 		/* Wait for half of the outstanding requests */
747 		timeout.tv_sec = 1;
748 		timeout.tv_nsec = 0;
749 
750 		if (uncompleted > MAXREAP)
751 			todo = MAXREAP;
752 		else
753 			todo = uncompleted / 2;
754 
755 		if (todo == 0)
756 			todo = 1;
757 
758 		flowop_beginop(threadflow, flowop);
759 
760 #ifdef HAVE_AIOWAITN
761 		if ((aio_waitn64((struct aiocb64 **)worklist,
762 		    MAXREAP, &todo, &timeout) == -1) &&
763 		    errno && (errno != ETIME)) {
764 			filebench_log(LOG_ERROR,
765 			    "aiowait failed: %s, outstanding = %d, "
766 			    "ncompleted = %d ",
767 			    strerror(errno), uncompleted, todo);
768 		}
769 
770 		ncompleted = todo;
771 		/* Take the  completed I/Os from the list */
772 		inprogress = 0;
773 		for (i = 0; i < ncompleted; i++) {
774 			if ((aio_return64(worklist[i]) == -1) &&
775 			    (errno == EINPROGRESS)) {
776 				inprogress++;
777 				continue;
778 			}
779 			if (aio_deallocate(flowop, worklist[i]) < 0) {
780 				filebench_log(LOG_ERROR, "Could not remove "
781 				    "aio from list ");
782 				flowop_endop(threadflow, flowop, 0);
783 				return (FILEBENCH_ERROR);
784 			}
785 		}
786 
787 		uncompleted -= ncompleted;
788 		uncompleted += inprogress;
789 
790 #else
791 
792 		for (ncompleted = 0, inprogress = 0,
793 		    aio = flowop->fo_thread->tf_aiolist;
794 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
795 			int result = aio_error64(&aio->al_aiocb);
796 
797 			if (result == EINPROGRESS) {
798 				inprogress++;
799 				continue;
800 			}
801 
802 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
803 				filebench_log(LOG_ERROR, "aio failed: %s",
804 				    strerror(result));
805 				continue;
806 			}
807 
808 			ncompleted++;
809 
810 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
811 				filebench_log(LOG_ERROR, "Could not remove aio "
812 				    "from list ");
813 				flowop_endop(threadflow, flowop, 0);
814 				return (FILEBENCH_ERROR);
815 			}
816 		}
817 
818 		uncompleted -= ncompleted;
819 
820 #endif
821 		filebench_log(LOG_DEBUG_SCRIPT,
822 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
823 		    ncompleted, uncompleted, inprogress);
824 
825 	} while (uncompleted > MAXREAP);
826 
827 	flowop_endop(threadflow, flowop, 0);
828 
829 	free(worklist);
830 
831 	return (FILEBENCH_OK);
832 }
833 
834 #endif /* HAVE_AIO */
835 
836 /*
837  * Initializes a "flowop_block" flowop. Specifically, it
838  * initializes the flowop's fo_cv and unlocks the fo_lock.
839  */
840 static int
841 flowoplib_block_init(flowop_t *flowop)
842 {
843 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
844 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
845 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
846 	(void) ipc_mutex_unlock(&flowop->fo_lock);
847 
848 	return (FILEBENCH_OK);
849 }
850 
851 /*
852  * Blocks the threadflow until woken up by flowoplib_wakeup.
853  * The routine blocks on the flowop's fo_cv condition variable.
854  */
855 static int
856 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
857 {
858 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
859 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
860 	(void) ipc_mutex_lock(&flowop->fo_lock);
861 
862 	flowop_beginop(threadflow, flowop);
863 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
864 	flowop_endop(threadflow, flowop, 0);
865 
866 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
867 	    flowop->fo_name, flowop->fo_instance);
868 
869 	(void) ipc_mutex_unlock(&flowop->fo_lock);
870 
871 	return (FILEBENCH_OK);
872 }
873 
874 /*
875  * Wakes up one or more target blocking flowops.
876  * Sends broadcasts on the fo_cv condition variables of all
877  * flowops on the target list, except those that are
878  * FLOW_MASTER flowops. The target list consists of all
879  * flowops whose name matches this flowop's "fo_targetname"
880  * attribute. The target list is generated on the first
881  * invocation, and the run will be shutdown if no targets
882  * are found. Otherwise the routine always returns FILEBENCH_OK.
883  */
884 static int
885 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
886 {
887 	flowop_t *target;
888 
889 	/* if this is the first wakeup, create the wakeup list */
890 	if (flowop->fo_targets == NULL) {
891 		flowop_t *result = flowop_find(flowop->fo_targetname);
892 
893 		flowop->fo_targets = result;
894 		if (result == NULL) {
895 			filebench_log(LOG_ERROR,
896 			    "wakeup: could not find op %s for thread %s",
897 			    flowop->fo_targetname,
898 			    threadflow->tf_name);
899 			filebench_shutdown(1);
900 		}
901 		while (result) {
902 			result->fo_targetnext =
903 			    result->fo_resultnext;
904 			result = result->fo_resultnext;
905 		}
906 	}
907 
908 	target = flowop->fo_targets;
909 
910 	/* wakeup the targets */
911 	while (target) {
912 		if (target->fo_instance == FLOW_MASTER) {
913 			target = target->fo_targetnext;
914 			continue;
915 		}
916 		filebench_log(LOG_DEBUG_IMPL,
917 		    "wakeup flow %s-%d at address %zx",
918 		    target->fo_name,
919 		    target->fo_instance,
920 		    &target->fo_cv);
921 
922 		flowop_beginop(threadflow, flowop);
923 		(void) ipc_mutex_lock(&target->fo_lock);
924 		(void) pthread_cond_broadcast(&target->fo_cv);
925 		(void) ipc_mutex_unlock(&target->fo_lock);
926 		flowop_endop(threadflow, flowop, 0);
927 
928 		target = target->fo_targetnext;
929 	}
930 
931 	return (FILEBENCH_OK);
932 }
933 
934 /*
935  * "think time" routines. the "hog" routine consumes cpu cycles as
936  * it "thinks", while the "delay" flowop simply calls sleep() to delay
937  * for a given number of seconds without consuming cpu cycles.
938  */
939 
940 
941 /*
942  * Consumes CPU cycles and memory bandwidth by looping for
943  * flowop->fo_value times. With each loop sets memory location
944  * threadflow->tf_mem to 1.
945  */
946 static int
947 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
948 {
949 	uint64_t value = avd_get_int(flowop->fo_value);
950 	int i;
951 
952 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
953 	flowop_beginop(threadflow, flowop);
954 	if (threadflow->tf_mem != NULL) {
955 		for (i = 0; i < value; i++)
956 			*(threadflow->tf_mem) = 1;
957 	}
958 	flowop_endop(threadflow, flowop, 0);
959 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
960 	return (FILEBENCH_OK);
961 }
962 
963 
964 /*
965  * Delays for fo_value seconds.
966  */
967 static int
968 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
969 {
970 	int value = avd_get_int(flowop->fo_value);
971 
972 	flowop_beginop(threadflow, flowop);
973 	(void) sleep(value);
974 	flowop_endop(threadflow, flowop, 0);
975 	return (FILEBENCH_OK);
976 }
977 
/*
 * Rate limiting routines. This is the event consuming half of the
 * event system. Each of the four following routines will limit the rate
 * to one unit of either calls, issued I/O operations, issued filebench
 * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
 * consumer, and further divided among different consumers if more than
 * one has been defined. There is no mechanism to enforce equal sharing
 * of events.
 */
988 
989 /*
990  * Completes one invocation per posted event. If eventgen_q
991  * has an event count greater than zero, one will be removed
992  * (count decremented), otherwise the calling thread will
993  * block until another event has been posted. Always returns 0
994  */
995 static int
996 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
997 {
998 	/* Immediately bail if not set/enabled */
999 	if (filebench_shm->shm_eventgen_hz == 0)
1000 		return (FILEBENCH_OK);
1001 
1002 	if (flowop->fo_initted == 0) {
1003 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1004 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1005 		flowop->fo_initted = 1;
1006 	}
1007 
1008 	flowop_beginop(threadflow, flowop);
1009 	while (filebench_shm->shm_eventgen_hz) {
1010 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1011 		if (filebench_shm->shm_eventgen_q > 0) {
1012 			filebench_shm->shm_eventgen_q--;
1013 			(void) ipc_mutex_unlock(
1014 			    &filebench_shm->shm_eventgen_lock);
1015 			break;
1016 		}
1017 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1018 		    &filebench_shm->shm_eventgen_lock);
1019 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1020 	}
1021 	flowop_endop(threadflow, flowop, 0);
1022 	return (FILEBENCH_OK);
1023 }
1024 
1025 static int
1026 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
1027 {
1028 	if (flowop->fo_targetname[0] != '\0') {
1029 
1030 		/* Try to use statistics from specific flowop */
1031 		flowop->fo_targets =
1032 		    flowop_find_from_list(flowop->fo_targetname,
1033 		    threadflow->tf_thrd_fops);
1034 		if (flowop->fo_targets == NULL) {
1035 			filebench_log(LOG_ERROR,
1036 			    "limit target: could not find flowop %s",
1037 			    flowop->fo_targetname);
1038 			filebench_shutdown(1);
1039 			return (FILEBENCH_ERROR);
1040 		}
1041 	} else {
1042 		/* use total workload statistics */
1043 		flowop->fo_targets = NULL;
1044 	}
1045 	return (FILEBENCH_OK);
1046 }
1047 
1048 /*
1049  * Blocks the calling thread if the number of issued I/O
1050  * operations exceeds the number of posted events, thus
1051  * limiting the average I/O operation rate to the rate
1052  * specified by eventgen_hz. Always returns FILEBENCH_OK.
1053  */
1054 static int
1055 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
1056 {
1057 	uint64_t iops;
1058 	uint64_t delta;
1059 	uint64_t events;
1060 
1061 	/* Immediately bail if not set/enabled */
1062 	if (filebench_shm->shm_eventgen_hz == 0)
1063 		return (FILEBENCH_OK);
1064 
1065 	if (flowop->fo_initted == 0) {
1066 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1067 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1068 		flowop->fo_initted = 1;
1069 
1070 		if (flowoplib_event_find_target(threadflow, flowop)
1071 		    == FILEBENCH_ERROR)
1072 			return (FILEBENCH_ERROR);
1073 
1074 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
1075 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1076 			filebench_log(LOG_ERROR,
1077 			    "WARNING: Flowop %s does no IO",
1078 			    flowop->fo_targets->fo_name);
1079 			filebench_shutdown(1);
1080 			return (FILEBENCH_ERROR);
1081 		}
1082 	}
1083 
1084 	if (flowop->fo_targets) {
1085 		/*
1086 		 * Note that fs_count is already the sum of fs_rcount
1087 		 * and fs_wcount if looking at a single flowop.
1088 		 */
1089 		iops = flowop->fo_targets->fo_stats.fs_count;
1090 	} else {
1091 		(void) ipc_mutex_lock(&controlstats_lock);
1092 		iops = (controlstats.fs_rcount +
1093 		    controlstats.fs_wcount);
1094 		(void) ipc_mutex_unlock(&controlstats_lock);
1095 	}
1096 
1097 	/* Is this the first time around */
1098 	if (flowop->fo_tputlast == 0) {
1099 		flowop->fo_tputlast = iops;
1100 		return (FILEBENCH_OK);
1101 	}
1102 
1103 	delta = iops - flowop->fo_tputlast;
1104 	flowop->fo_tputbucket -= delta;
1105 	flowop->fo_tputlast = iops;
1106 
1107 	/* No need to block if the q isn't empty */
1108 	if (flowop->fo_tputbucket >= 0LL) {
1109 		flowop_endop(threadflow, flowop, 0);
1110 		return (FILEBENCH_OK);
1111 	}
1112 
1113 	iops = flowop->fo_tputbucket * -1;
1114 	events = iops;
1115 
1116 	flowop_beginop(threadflow, flowop);
1117 	while (filebench_shm->shm_eventgen_hz) {
1118 
1119 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1120 		if (filebench_shm->shm_eventgen_q >= events) {
1121 			filebench_shm->shm_eventgen_q -= events;
1122 			(void) ipc_mutex_unlock(
1123 			    &filebench_shm->shm_eventgen_lock);
1124 			flowop->fo_tputbucket += events;
1125 			break;
1126 		}
1127 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1128 		    &filebench_shm->shm_eventgen_lock);
1129 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1130 	}
1131 	flowop_endop(threadflow, flowop, 0);
1132 
1133 	return (FILEBENCH_OK);
1134 }
1135 
1136 /*
1137  * Blocks the calling thread if the number of issued filebench
1138  * operations exceeds the number of posted events, thus limiting
1139  * the average filebench operation rate to the rate specified by
1140  * eventgen_hz. Always returns FILEBENCH_OK.
1141  */
1142 static int
1143 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
1144 {
1145 	uint64_t ops;
1146 	uint64_t delta;
1147 	uint64_t events;
1148 
1149 	/* Immediately bail if not set/enabled */
1150 	if (filebench_shm->shm_eventgen_hz == 0)
1151 		return (FILEBENCH_OK);
1152 
1153 	if (flowop->fo_initted == 0) {
1154 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1155 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1156 		flowop->fo_initted = 1;
1157 
1158 		if (flowoplib_event_find_target(threadflow, flowop)
1159 		    == FILEBENCH_ERROR)
1160 			return (FILEBENCH_ERROR);
1161 	}
1162 
1163 	if (flowop->fo_targets) {
1164 		ops = flowop->fo_targets->fo_stats.fs_count;
1165 	} else {
1166 		(void) ipc_mutex_lock(&controlstats_lock);
1167 		ops = controlstats.fs_count;
1168 		(void) ipc_mutex_unlock(&controlstats_lock);
1169 	}
1170 
1171 	/* Is this the first time around */
1172 	if (flowop->fo_tputlast == 0) {
1173 		flowop->fo_tputlast = ops;
1174 		return (FILEBENCH_OK);
1175 	}
1176 
1177 	delta = ops - flowop->fo_tputlast;
1178 	flowop->fo_tputbucket -= delta;
1179 	flowop->fo_tputlast = ops;
1180 
1181 	/* No need to block if the q isn't empty */
1182 	if (flowop->fo_tputbucket >= 0LL) {
1183 		flowop_endop(threadflow, flowop, 0);
1184 		return (FILEBENCH_OK);
1185 	}
1186 
1187 	ops = flowop->fo_tputbucket * -1;
1188 	events = ops;
1189 
1190 	flowop_beginop(threadflow, flowop);
1191 	while (filebench_shm->shm_eventgen_hz) {
1192 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1193 		if (filebench_shm->shm_eventgen_q >= events) {
1194 			filebench_shm->shm_eventgen_q -= events;
1195 			(void) ipc_mutex_unlock(
1196 			    &filebench_shm->shm_eventgen_lock);
1197 			flowop->fo_tputbucket += events;
1198 			break;
1199 		}
1200 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1201 		    &filebench_shm->shm_eventgen_lock);
1202 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1203 	}
1204 	flowop_endop(threadflow, flowop, 0);
1205 
1206 	return (FILEBENCH_OK);
1207 }
1208 
1209 
1210 /*
1211  * Blocks the calling thread if the number of bytes of I/O
1212  * issued exceeds one megabyte times the number of posted
1213  * events, thus limiting the average I/O byte rate to one
1214  * megabyte times the event rate as set by eventgen_hz.
1215  * Always retuns FILEBENCH_OK.
1216  */
1217 static int
1218 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
1219 {
1220 	uint64_t bytes;
1221 	uint64_t delta;
1222 	uint64_t events;
1223 
1224 	/* Immediately bail if not set/enabled */
1225 	if (filebench_shm->shm_eventgen_hz == 0)
1226 		return (FILEBENCH_OK);
1227 
1228 	if (flowop->fo_initted == 0) {
1229 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1230 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1231 		flowop->fo_initted = 1;
1232 
1233 		if (flowoplib_event_find_target(threadflow, flowop)
1234 		    == FILEBENCH_ERROR)
1235 			return (FILEBENCH_ERROR);
1236 
1237 		if ((flowop->fo_targets) &&
1238 		    ((flowop->fo_targets->fo_attrs &
1239 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1240 			filebench_log(LOG_ERROR,
1241 			    "WARNING: Flowop %s does no Reads or Writes",
1242 			    flowop->fo_targets->fo_name);
1243 			filebench_shutdown(1);
1244 			return (FILEBENCH_ERROR);
1245 		}
1246 	}
1247 
1248 	if (flowop->fo_targets) {
1249 		/*
1250 		 * Note that fs_bytes is already the sum of fs_rbytes
1251 		 * and fs_wbytes if looking at a single flowop.
1252 		 */
1253 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
1254 	} else {
1255 		(void) ipc_mutex_lock(&controlstats_lock);
1256 		bytes = (controlstats.fs_rbytes +
1257 		    controlstats.fs_wbytes);
1258 		(void) ipc_mutex_unlock(&controlstats_lock);
1259 	}
1260 
1261 	/* Is this the first time around? */
1262 	if (flowop->fo_tputlast == 0) {
1263 		flowop->fo_tputlast = bytes;
1264 		return (FILEBENCH_OK);
1265 	}
1266 
1267 	delta = bytes - flowop->fo_tputlast;
1268 	flowop->fo_tputbucket -= delta;
1269 	flowop->fo_tputlast = bytes;
1270 
1271 	/* No need to block if the q isn't empty */
1272 	if (flowop->fo_tputbucket >= 0LL) {
1273 		flowop_endop(threadflow, flowop, 0);
1274 		return (FILEBENCH_OK);
1275 	}
1276 
1277 	bytes = flowop->fo_tputbucket * -1;
1278 	events = (bytes / MB) + 1;
1279 
1280 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1281 	    (u_longlong_t)bytes, (u_longlong_t)events);
1282 
1283 	flowop_beginop(threadflow, flowop);
1284 	while (filebench_shm->shm_eventgen_hz) {
1285 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1286 		if (filebench_shm->shm_eventgen_q >= events) {
1287 			filebench_shm->shm_eventgen_q -= events;
1288 			(void) ipc_mutex_unlock(
1289 			    &filebench_shm->shm_eventgen_lock);
1290 			flowop->fo_tputbucket += (events * MB);
1291 			break;
1292 		}
1293 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1294 		    &filebench_shm->shm_eventgen_lock);
1295 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1296 	}
1297 	flowop_endop(threadflow, flowop, 0);
1298 
1299 	return (FILEBENCH_OK);
1300 }
1301 
1302 /*
1303  * These flowops terminate a benchmark run when either the specified
1304  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1305  * number of I/O operations (flowoplib_finishoncount) have been generated.
1306  */
1307 
1308 
1309 /*
1310  * Stop filebench run when specified number of I/O bytes have been
1311  * transferred. Compares controlstats.fs_bytes with flowop->value,
1312  * and if greater returns 1, stopping the run, if not, returns 0
1313  * to continue running.
1314  */
1315 static int
1316 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1317 {
1318 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
1319 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1320 						    /* Uses constant value */
1321 
1322 	if (flowop->fo_initted == 0) {
1323 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1324 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1325 		flowop->fo_initted = 1;
1326 
1327 		if (flowoplib_event_find_target(threadflow, flowop)
1328 		    == FILEBENCH_ERROR)
1329 			return (FILEBENCH_ERROR);
1330 
1331 		if ((flowop->fo_targets) &&
1332 		    ((flowop->fo_targets->fo_attrs &
1333 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1334 			filebench_log(LOG_ERROR,
1335 			    "WARNING: Flowop %s does no Reads or Writes",
1336 			    flowop->fo_targets->fo_name);
1337 			filebench_shutdown(1);
1338 			return (FILEBENCH_ERROR);
1339 		}
1340 	}
1341 
1342 	if (flowop->fo_targets) {
1343 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1344 	} else {
1345 		(void) ipc_mutex_lock(&controlstats_lock);
1346 		bytes_io = controlstats.fs_bytes;
1347 		(void) ipc_mutex_unlock(&controlstats_lock);
1348 	}
1349 
1350 	flowop_beginop(threadflow, flowop);
1351 	if (bytes_io > byte_lim) {
1352 		flowop_endop(threadflow, flowop, 0);
1353 		return (FILEBENCH_DONE);
1354 	}
1355 	flowop_endop(threadflow, flowop, 0);
1356 
1357 	return (FILEBENCH_OK);
1358 }
1359 
1360 /*
1361  * Stop filebench run when specified number of I/O operations have
1362  * been performed. Compares controlstats.fs_count with *flowop->value,
1363  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1364  * to continue running.
1365  */
1366 static int
1367 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1368 {
1369 	uint64_t ops;
1370 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1371 
1372 	if (flowop->fo_initted == 0) {
1373 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1374 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1375 		flowop->fo_initted = 1;
1376 
1377 		if (flowoplib_event_find_target(threadflow, flowop)
1378 		    == FILEBENCH_ERROR)
1379 			return (FILEBENCH_ERROR);
1380 	}
1381 
1382 	if (flowop->fo_targets) {
1383 		ops = flowop->fo_targets->fo_stats.fs_count;
1384 	} else {
1385 		(void) ipc_mutex_lock(&controlstats_lock);
1386 		ops = controlstats.fs_count;
1387 		(void) ipc_mutex_unlock(&controlstats_lock);
1388 	}
1389 
1390 	flowop_beginop(threadflow, flowop);
1391 	if (ops >= count) {
1392 		flowop_endop(threadflow, flowop, 0);
1393 		return (FILEBENCH_DONE);
1394 	}
1395 	flowop_endop(threadflow, flowop, 0);
1396 
1397 	return (FILEBENCH_OK);
1398 }
1399 
1400 /*
1401  * Semaphore synchronization using either System V semaphores or
1402  * posix semaphores. If System V semaphores are available, they will be
1403  * used, otherwise posix semaphores will be used.
1404  */
1405 
1406 
1407 /*
1408  * Initializes the filebench "block on semaphore" flowop.
1409  * If System V semaphores are implemented, the routine
1410  * initializes the System V semaphore subsystem if it hasn't
1411  * already been initialized, also allocates a pair of semids
1412  * and initializes the highwater System V semaphore.
1413  * If no System V semaphores, then does nothing special.
1414  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1415  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1416  * on success.
1417  */
1418 static int
1419 flowoplib_semblock_init(flowop_t *flowop)
1420 {
1421 
1422 #ifdef HAVE_SYSV_SEM
1423 	int sys_semid;
1424 	struct sembuf sbuf[2];
1425 	int highwater;
1426 
1427 	ipc_seminit();
1428 
1429 	flowop->fo_semid_lw = ipc_semidalloc();
1430 	flowop->fo_semid_hw = ipc_semidalloc();
1431 
1432 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1433 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1434 
1435 	sys_semid = filebench_shm->shm_sys_semid;
1436 
1437 	if ((highwater = flowop->fo_semid_hw) == 0)
1438 		highwater = flowop->fo_constvalue; /* use constant value */
1439 
1440 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1441 
1442 	sbuf[0].sem_num = (short)highwater;
1443 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1444 	sbuf[0].sem_flg = 0;
1445 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1446 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1447 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1448 		return (FILEBENCH_ERROR);
1449 	}
1450 #else
1451 	filebench_log(LOG_DEBUG_IMPL,
1452 	    "flow %s-%d semblock init with posix semaphore",
1453 	    flowop->fo_name, flowop->fo_instance);
1454 
1455 	sem_init(&flowop->fo_sem, 1, 0);
1456 #endif	/* HAVE_SYSV_SEM */
1457 
1458 	if (!(avd_get_bool(flowop->fo_blocking)))
1459 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1460 
1461 	return (FILEBENCH_OK);
1462 }
1463 
1464 /*
1465  * Releases the semids for the System V semaphore allocated
1466  * to this flowop. If not using System V semaphores, then
1467  * it is effectively just a no-op.
1468  */
1469 static void
1470 flowoplib_semblock_destruct(flowop_t *flowop)
1471 {
1472 #ifdef HAVE_SYSV_SEM
1473 	ipc_semidfree(flowop->fo_semid_lw);
1474 	ipc_semidfree(flowop->fo_semid_hw);
1475 #else
1476 	sem_destroy(&flowop->fo_sem);
1477 #endif /* HAVE_SYSV_SEM */
1478 }
1479 
1480 /*
1481  * Attempts to pass a System V or posix semaphore as appropriate,
1482  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1483  * semphores is not available or cannot be acquired, or if the initial
1484  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1485  */
1486 static int
1487 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1488 {
1489 
1490 #ifdef HAVE_SYSV_SEM
1491 	struct sembuf sbuf[2];
1492 	int value = avd_get_int(flowop->fo_value);
1493 	int sys_semid;
1494 	struct timespec timeout;
1495 
1496 	sys_semid = filebench_shm->shm_sys_semid;
1497 
1498 	filebench_log(LOG_DEBUG_IMPL,
1499 	    "flow %s-%d sem blocking on id %x num %x value %d",
1500 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1501 	    flowop->fo_semid_hw, value);
1502 
1503 	/* Post, decrement the increment the hw queue */
1504 	sbuf[0].sem_num = flowop->fo_semid_hw;
1505 	sbuf[0].sem_op = (short)value;
1506 	sbuf[0].sem_flg = 0;
1507 	sbuf[1].sem_num = flowop->fo_semid_lw;
1508 	sbuf[1].sem_op = value * -1;
1509 	sbuf[1].sem_flg = 0;
1510 	timeout.tv_sec = 600;
1511 	timeout.tv_nsec = 0;
1512 
1513 	if (avd_get_bool(flowop->fo_blocking))
1514 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1515 
1516 	flowop_beginop(threadflow, flowop);
1517 
1518 #ifdef HAVE_SEMTIMEDOP
1519 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1520 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1521 #else
1522 	(void) semop(sys_semid, &sbuf[0], 1);
1523 	(void) semop(sys_semid, &sbuf[1], 1);
1524 #endif /* HAVE_SEMTIMEDOP */
1525 
1526 	if (avd_get_bool(flowop->fo_blocking))
1527 		(void) ipc_mutex_lock(&flowop->fo_lock);
1528 
1529 	flowop_endop(threadflow, flowop, 0);
1530 
1531 #else
1532 	int value = avd_get_int(flowop->fo_value);
1533 	int i;
1534 
1535 	filebench_log(LOG_DEBUG_IMPL,
1536 	    "flow %s-%d sem blocking on posix semaphore",
1537 	    flowop->fo_name, flowop->fo_instance);
1538 
1539 	/* Decrement sem by value */
1540 	for (i = 0; i < value; i++) {
1541 		if (sem_wait(&flowop->fo_sem) == -1) {
1542 			filebench_log(LOG_ERROR, "semop wait failed");
1543 			return (FILEBENCH_ERROR);
1544 		}
1545 	}
1546 
1547 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1548 	    flowop->fo_name, flowop->fo_instance);
1549 #endif /* HAVE_SYSV_SEM */
1550 
1551 	return (FILEBENCH_OK);
1552 }
1553 
1554 /*
1555  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1556  */
1557 /* ARGSUSED */
1558 static int
1559 flowoplib_sempost_init(flowop_t *flowop)
1560 {
1561 #ifdef HAVE_SYSV_SEM
1562 	ipc_seminit();
1563 #endif /* HAVE_SYSV_SEM */
1564 	return (FILEBENCH_OK);
1565 }
1566 
1567 /*
1568  * Post to a System V or posix semaphore as appropriate.
1569  * On the first call for a given flowop instance, this routine
1570  * will use the fo_targetname attribute to locate all semblock
1571  * flowops that are expecting posts from this flowop. All
1572  * target flowops on this list will have a post operation done
1573  * to their semaphores on each call.
1574  */
1575 static int
1576 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1577 {
1578 	flowop_t *target;
1579 
1580 	filebench_log(LOG_DEBUG_IMPL,
1581 	    "sempost flow %s-%d",
1582 	    flowop->fo_name,
1583 	    flowop->fo_instance);
1584 
1585 	/* if this is the first post, create the post list */
1586 	if (flowop->fo_targets == NULL) {
1587 		flowop_t *result = flowop_find(flowop->fo_targetname);
1588 
1589 		flowop->fo_targets = result;
1590 
1591 		if (result == NULL) {
1592 			filebench_log(LOG_ERROR,
1593 			    "sempost: could not find op %s for thread %s",
1594 			    flowop->fo_targetname,
1595 			    threadflow->tf_name);
1596 			filebench_shutdown(1);
1597 		}
1598 
1599 		while (result) {
1600 			result->fo_targetnext =
1601 			    result->fo_resultnext;
1602 			result = result->fo_resultnext;
1603 		}
1604 	}
1605 
1606 	target = flowop->fo_targets;
1607 
1608 	flowop_beginop(threadflow, flowop);
1609 	/* post to the targets */
1610 	while (target) {
1611 #ifdef HAVE_SYSV_SEM
1612 		struct sembuf sbuf[2];
1613 		int sys_semid;
1614 		int blocking;
1615 #else
1616 		int i;
1617 #endif /* HAVE_SYSV_SEM */
1618 		struct timespec timeout;
1619 		int value = (int)avd_get_int(flowop->fo_value);
1620 
1621 		if (target->fo_instance == FLOW_MASTER) {
1622 			target = target->fo_targetnext;
1623 			continue;
1624 		}
1625 
1626 #ifdef HAVE_SYSV_SEM
1627 
1628 		filebench_log(LOG_DEBUG_IMPL,
1629 		    "sempost flow %s-%d num %x",
1630 		    target->fo_name,
1631 		    target->fo_instance,
1632 		    target->fo_semid_lw);
1633 
1634 		sys_semid = filebench_shm->shm_sys_semid;
1635 		sbuf[0].sem_num = target->fo_semid_lw;
1636 		sbuf[0].sem_op = (short)value;
1637 		sbuf[0].sem_flg = 0;
1638 		sbuf[1].sem_num = target->fo_semid_hw;
1639 		sbuf[1].sem_op = value * -1;
1640 		sbuf[1].sem_flg = 0;
1641 		timeout.tv_sec = 600;
1642 		timeout.tv_nsec = 0;
1643 
1644 		if (avd_get_bool(flowop->fo_blocking))
1645 			blocking = 1;
1646 		else
1647 			blocking = 0;
1648 
1649 #ifdef HAVE_SEMTIMEDOP
1650 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1651 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1652 #else
1653 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1654 		    (errno && (errno != EAGAIN))) {
1655 #endif /* HAVE_SEMTIMEDOP */
1656 			filebench_log(LOG_ERROR, "semop post failed: %s",
1657 			    strerror(errno));
1658 			return (FILEBENCH_ERROR);
1659 		}
1660 
1661 		filebench_log(LOG_DEBUG_IMPL,
1662 		    "flow %s-%d finished posting",
1663 		    target->fo_name, target->fo_instance);
1664 #else
1665 		filebench_log(LOG_DEBUG_IMPL,
1666 		    "sempost flow %s-%d to posix semaphore",
1667 		    target->fo_name,
1668 		    target->fo_instance);
1669 
1670 		/* Increment sem by value */
1671 		for (i = 0; i < value; i++) {
1672 			if (sem_post(&target->fo_sem) == -1) {
1673 				filebench_log(LOG_ERROR, "semop post failed");
1674 				return (FILEBENCH_ERROR);
1675 			}
1676 		}
1677 
1678 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1679 		    target->fo_name, target->fo_instance);
1680 #endif /* HAVE_SYSV_SEM */
1681 
1682 		target = target->fo_targetnext;
1683 	}
1684 	flowop_endop(threadflow, flowop, 0);
1685 
1686 	return (FILEBENCH_OK);
1687 }
1688 
1689 
/*
 * Section for exercising create / open / close / delete operations
 * on files within a fileset. For proper operation, the flowop attribute
 * "fd", which sets the fo_fdnumber field in the flowop, must be used
 * so that the same file is opened and later closed. "fd" is an index
 * into a pair of arrays maintained by threadflows, one of which
 * contains the operating system assigned file descriptors and the other
 * a pointer to the filesetentry whose file the file descriptor
 * references. An openfile flowop defined without fd being set will use
 * the default (0) fd or, if specified, rotate through fd indices, but
 * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile picks an arbitrary file to delete, regardless
 * of fd attribute.
 */
1704 
/*
 * XXX Making file selection more consistent among the flowops might be good
 */
1708 
1709 
1710 /*
1711  * Emulates (and actually does) file open. Obtains a file descriptor
1712  * index, then calls flowoplib_openfile_common() to open. Returns
1713  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1714  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1715  * FILEBENCH_NORSC, FILEBENCH_OK).
1716  */
1717 static int
1718 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1719 {
1720 	int fd = flowoplib_fdnum(threadflow, flowop);
1721 
1722 	if (fd == -1)
1723 		return (FILEBENCH_ERROR);
1724 
1725 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1726 }
1727 
1728 /*
1729  * Common file opening code for filesets. Uses the supplied
1730  * file descriptor index to determine the tf_fd entry to use.
1731  * If the entry is empty (0) and the fileset exists, fileset
1732  * pick is called to select a fileset entry to use. The file
1733  * specified in the filesetentry is opened, and the returned
1734  * operating system file descriptor and a pointer to the
1735  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1736  * respectively. Returns FILEBENCH_ERROR on error,
1737  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1738  * and FILEBENCH_OK on success.
1739  */
1740 static int
1741 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1742 {
1743 	filesetentry_t *file;
1744 	char *fileset_name;
1745 	int tid = 0;
1746 
1747 	if (flowop->fo_fileset == NULL) {
1748 		filebench_log(LOG_ERROR, "flowop NULL file");
1749 		return (FILEBENCH_ERROR);
1750 	}
1751 
1752 	if ((fileset_name =
1753 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1754 		filebench_log(LOG_ERROR,
1755 		    "flowop %s: fileset has no name", flowop->fo_name);
1756 		return (FILEBENCH_ERROR);
1757 	}
1758 
1759 	/*
1760 	 * If the flowop doesn't default to persistent fd
1761 	 * then get unique thread ID for use by fileset_pick
1762 	 */
1763 	if (avd_get_bool(flowop->fo_rotatefd))
1764 		tid = threadflow->tf_utid;
1765 
1766 	if (threadflow->tf_fd[fd] != 0) {
1767 		filebench_log(LOG_ERROR,
1768 		    "flowop %s attempted to open without closing on fd %d",
1769 		    flowop->fo_name, fd);
1770 		return (FILEBENCH_ERROR);
1771 	}
1772 
1773 #ifdef HAVE_RAW_SUPPORT
1774 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1775 		int open_attrs = 0;
1776 		char name[MAXPATHLEN];
1777 
1778 		(void) strcpy(name,
1779 		    avd_get_str(flowop->fo_fileset->fs_path));
1780 		(void) strcat(name, "/");
1781 		(void) strcat(name, fileset_name);
1782 
1783 		if (avd_get_bool(flowop->fo_dsync)) {
1784 #ifdef sun
1785 			open_attrs |= O_DSYNC;
1786 #else
1787 			open_attrs |= O_FSYNC;
1788 #endif
1789 		}
1790 
1791 		filebench_log(LOG_DEBUG_SCRIPT,
1792 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1793 
1794 		threadflow->tf_fd[fd] = open64(name,
1795 		    O_RDWR | open_attrs, 0666);
1796 
1797 		if (threadflow->tf_fd[fd] < 0) {
1798 			filebench_log(LOG_ERROR,
1799 			    "Failed to open raw device %s: %s",
1800 			    name, strerror(errno));
1801 			return (FILEBENCH_ERROR);
1802 		}
1803 
1804 		/* if running on Solaris, use un-buffered io */
1805 #ifdef sun
1806 		(void) directio(threadflow->tf_fd[fd], DIRECTIO_ON);
1807 #endif
1808 
1809 		threadflow->tf_fse[fd] = NULL;
1810 
1811 		return (FILEBENCH_OK);
1812 	}
1813 #endif /* HAVE_RAW_SUPPORT */
1814 
1815 	if ((file = fileset_pick(flowop->fo_fileset,
1816 	    FILESET_PICKEXISTS, tid)) == NULL) {
1817 		filebench_log(LOG_DEBUG_SCRIPT,
1818 		    "flowop %s failed to pick file from %s on fd %d",
1819 		    flowop->fo_name, fileset_name, fd);
1820 		return (FILEBENCH_NORSC);
1821 	}
1822 
1823 	threadflow->tf_fse[fd] = file;
1824 
1825 	flowop_beginop(threadflow, flowop);
1826 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1827 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1828 	flowop_endop(threadflow, flowop, 0);
1829 
1830 	if (threadflow->tf_fd[fd] < 0) {
1831 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1832 		    flowop->fo_name, file->fse_path);
1833 		return (FILEBENCH_ERROR);
1834 	}
1835 
1836 	filebench_log(LOG_DEBUG_SCRIPT,
1837 	    "flowop %s: opened %s fd[%d] = %d",
1838 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1839 
1840 	return (FILEBENCH_OK);
1841 }
1842 
1843 /*
1844  * Emulate create of a file. Uses the flowop's fdnumber to select
1845  * tf_fd and tf_fse array locations to put the created file's file
1846  * descriptor and filesetentry respectively. Uses fileset_pick()
1847  * to select a specific filesetentry whose file does not currently
1848  * exist for the file create operation. Then calls
1849  * fileset_openfile() with the O_CREATE flag set to create the
1850  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1851  * already in use, the flowop has no associated fileset, or
1852  * the create call fails. Returns 1 if a filesetentry with a
1853  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1854  */
1855 static int
1856 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1857 {
1858 	filesetentry_t *file;
1859 	int fd = flowop->fo_fdnumber;
1860 
1861 	if (threadflow->tf_fd[fd] != 0) {
1862 		filebench_log(LOG_ERROR,
1863 		    "flowop %s attempted to create without closing on fd %d",
1864 		    flowop->fo_name, fd);
1865 		return (FILEBENCH_ERROR);
1866 	}
1867 
1868 	if (flowop->fo_fileset == NULL) {
1869 		filebench_log(LOG_ERROR, "flowop NULL file");
1870 		return (FILEBENCH_ERROR);
1871 	}
1872 
1873 #ifdef HAVE_RAW_SUPPORT
1874 	/* can't be used with raw devices */
1875 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1876 		filebench_log(LOG_ERROR,
1877 		    "flowop %s attempted to a createfile on RAW device",
1878 		    flowop->fo_name);
1879 		return (FILEBENCH_ERROR);
1880 	}
1881 #endif /* HAVE_RAW_SUPPORT */
1882 
1883 	if ((file = fileset_pick(flowop->fo_fileset,
1884 	    FILESET_PICKNOEXIST, 0)) == NULL) {
1885 		filebench_log(LOG_DEBUG_SCRIPT,
1886 		    "flowop %s failed to pick file from fileset %s",
1887 		    flowop->fo_name,
1888 		    avd_get_str(flowop->fo_fileset->fs_name));
1889 		return (FILEBENCH_NORSC);
1890 	}
1891 
1892 	threadflow->tf_fse[fd] = file;
1893 
1894 	flowop_beginop(threadflow, flowop);
1895 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1896 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1897 	flowop_endop(threadflow, flowop, 0);
1898 
1899 	if (threadflow->tf_fd[fd] < 0) {
1900 		filebench_log(LOG_ERROR, "failed to create file %s",
1901 		    flowop->fo_name);
1902 		return (FILEBENCH_ERROR);
1903 	}
1904 
1905 	filebench_log(LOG_DEBUG_SCRIPT,
1906 	    "flowop %s: created %s fd[%d] = %d",
1907 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1908 
1909 	return (FILEBENCH_OK);
1910 }
1911 
1912 /*
1913  * Emulates delete of a file. If a valid fd is provided, it uses the
1914  * filesetentry stored at that fd location to select the file to be
1915  * deleted, otherwise it picks an arbitrary filesetentry
1916  * whose file exists. It then uses unlink() to delete it and Clears
1917  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1918  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1919  * filesetentry cannot be found, and FILEBENCH_OK on success.
1920  */
1921 static int
1922 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1923 {
1924 	filesetentry_t *file;
1925 	fileset_t *fileset;
1926 	char path[MAXPATHLEN];
1927 	char *pathtmp;
1928 	int fd = flowop->fo_fdnumber;
1929 
1930 	/* if fd specified, use it to access file */
1931 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1932 
1933 		/* check whether file still open */
1934 		if (threadflow->tf_fd[fd] > 0) {
1935 			filebench_log(LOG_DEBUG_SCRIPT,
1936 			    "flowop %s deleting still open file at fd = %d",
1937 			    flowop->fo_name, fd);
1938 		}
1939 
1940 		/* indicate that the file will be deleted */
1941 		threadflow->tf_fse[fd] = NULL;
1942 
1943 		/* if here, we still have a valid file pointer */
1944 		fileset = file->fse_fileset;
1945 	} else {
1946 		/* Otherwise, pick arbitrary file */
1947 		file = NULL;
1948 		fileset = flowop->fo_fileset;
1949 	}
1950 
1951 
1952 	if (fileset == NULL) {
1953 		filebench_log(LOG_ERROR, "flowop NULL file");
1954 		return (FILEBENCH_ERROR);
1955 	}
1956 
1957 #ifdef HAVE_RAW_SUPPORT
1958 	/* can't be used with raw devices */
1959 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1960 		filebench_log(LOG_ERROR,
1961 		    "flowop %s attempted a deletefile on RAW device",
1962 		    flowop->fo_name);
1963 		return (FILEBENCH_ERROR);
1964 	}
1965 #endif /* HAVE_RAW_SUPPORT */
1966 
1967 	if (file == NULL) {
1968 		/* pick arbitrary, existing (allocated) file */
1969 		if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0))
1970 		    == NULL) {
1971 			filebench_log(LOG_DEBUG_SCRIPT,
1972 			    "flowop %s failed to pick file", flowop->fo_name);
1973 			return (FILEBENCH_NORSC);
1974 		}
1975 	} else {
1976 		/* delete specific file. wait for it to be non-busy */
1977 		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1978 		while (file->fse_flags & FSE_BUSY) {
1979 			file->fse_flags |= FSE_THRD_WAITNG;
1980 			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1981 			    &fileset->fs_pick_lock);
1982 		}
1983 
1984 		/* File now available, grab it for deletion */
1985 		file->fse_flags |= FSE_BUSY;
1986 		fileset->fs_idle_files--;
1987 		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1988 	}
1989 
1990 	*path = 0;
1991 	(void) strcpy(path, avd_get_str(fileset->fs_path));
1992 	(void) strcat(path, "/");
1993 	(void) strcat(path, avd_get_str(fileset->fs_name));
1994 	pathtmp = fileset_resolvepath(file);
1995 	(void) strcat(path, pathtmp);
1996 	free(pathtmp);
1997 
1998 	/* delete the selected file */
1999 	flowop_beginop(threadflow, flowop);
2000 	(void) unlink(path);
2001 	flowop_endop(threadflow, flowop, 0);
2002 
2003 	/* indicate that it is no longer busy and no longer exists */
2004 	fileset_unbusy(file, TRUE, FALSE);
2005 
2006 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
2007 
2008 	return (FILEBENCH_OK);
2009 }
2010 
2011 /*
2012  * Emulates fsync of a file. Obtains the file descriptor index
2013  * from the flowop, obtains the actual file descriptor from
2014  * the threadflow's table, checks to be sure it is still an
2015  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
2016  * if the file no longer is open, FILEBENCH_OK otherwise.
2017  */
2018 static int
2019 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
2020 {
2021 	filesetentry_t *file;
2022 	int fd = flowop->fo_fdnumber;
2023 
2024 	if (threadflow->tf_fd[fd] == 0) {
2025 		filebench_log(LOG_ERROR,
2026 		    "flowop %s attempted to fsync a closed fd %d",
2027 		    flowop->fo_name, fd);
2028 		return (FILEBENCH_ERROR);
2029 	}
2030 
2031 	file = threadflow->tf_fse[fd];
2032 
2033 	if ((file == NULL) ||
2034 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
2035 		filebench_log(LOG_ERROR,
2036 		    "flowop %s attempted to a fsync a RAW device",
2037 		    flowop->fo_name);
2038 		return (FILEBENCH_ERROR);
2039 	}
2040 
2041 	/* Measure time to fsync */
2042 	flowop_beginop(threadflow, flowop);
2043 	(void) fsync(threadflow->tf_fd[fd]);
2044 	flowop_endop(threadflow, flowop, 0);
2045 
2046 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
2047 
2048 	return (FILEBENCH_OK);
2049 }
2050 
2051 /*
2052  * Emulate fsync of an entire fileset. Search through the
2053  * threadflow's file descriptor array, doing fsync() on each
2054  * open file that belongs to the flowop's fileset. Always
2055  * returns FILEBENCH_OK.
2056  */
2057 static int
2058 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
2059 {
2060 	int fd;
2061 
2062 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
2063 		filesetentry_t *file;
2064 
2065 		/* Match the file set to fsync */
2066 		if ((threadflow->tf_fse[fd] == NULL) ||
2067 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
2068 			continue;
2069 
2070 		/* Measure time to fsync */
2071 		flowop_beginop(threadflow, flowop);
2072 		(void) fsync(threadflow->tf_fd[fd]);
2073 		flowop_endop(threadflow, flowop, 0);
2074 
2075 		file = threadflow->tf_fse[fd];
2076 
2077 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
2078 		    file->fse_path);
2079 	}
2080 
2081 	return (FILEBENCH_OK);
2082 }
2083 
2084 /*
2085  * Emulate close of a file.  Obtains the file descriptor index
2086  * from the flowop, obtains the actual file descriptor from the
2087  * threadflow's table, checks to be sure it is still an open
2088  * file, then does a close operation on it. Then sets the
2089  * threadflow file descriptor table entry to 0, and the file set
2090  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
2091  * FILEBENCH_OK otherwise.
2092  */
2093 static int
2094 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
2095 {
2096 	filesetentry_t *file;
2097 	int fd = flowop->fo_fdnumber;
2098 
2099 	if (threadflow->tf_fd[fd] == 0) {
2100 		filebench_log(LOG_ERROR,
2101 		    "flowop %s attempted to close an already closed fd %d",
2102 		    flowop->fo_name, fd);
2103 		return (FILEBENCH_ERROR);
2104 	}
2105 
2106 	/* Measure time to close */
2107 	flowop_beginop(threadflow, flowop);
2108 	(void) close(threadflow->tf_fd[fd]);
2109 	flowop_endop(threadflow, flowop, 0);
2110 
2111 	file = threadflow->tf_fse[fd];
2112 
2113 	threadflow->tf_fd[fd] = 0;
2114 
2115 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
2116 
2117 	return (FILEBENCH_OK);
2118 }
2119 
2120 /*
2121  * Emulate stat of a file. Picks an arbitrary filesetentry with
2122  * an existing file from the flowop's fileset, then performs a
2123  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2124  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2125  * cannot be found, and FILEBENCH_OK on success.
2126  */
2127 static int
2128 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2129 {
2130 	filesetentry_t *file;
2131 	fileset_t *fileset;
2132 	struct stat statbuf;
2133 	int fd = flowop->fo_fdnumber;
2134 
2135 	/* if fd specified and the file is open, use it to access file */
2136 	if ((fd > 0) && ((threadflow->tf_fd[fd]) > 0)) {
2137 
2138 		/* check whether file handle still valid */
2139 		if ((file = threadflow->tf_fse[fd]) == NULL) {
2140 			filebench_log(LOG_DEBUG_SCRIPT,
2141 			    "flowop %s trying to stat NULL file at fd = %d",
2142 			    flowop->fo_name, fd);
2143 			return (FILEBENCH_ERROR);
2144 		}
2145 
2146 		/* if here, we still have a valid file pointer */
2147 		fileset = file->fse_fileset;
2148 	} else {
2149 		/* Otherwise, pick arbitrary file */
2150 		file = NULL;
2151 		fileset = flowop->fo_fileset;
2152 	}
2153 
2154 
2155 	if (fileset == NULL) {
2156 		filebench_log(LOG_ERROR,
2157 		    "statfile with no fileset specified");
2158 		return (FILEBENCH_ERROR);
2159 	}
2160 
2161 #ifdef HAVE_RAW_SUPPORT
2162 	/* can't be used with raw devices */
2163 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2164 		filebench_log(LOG_ERROR,
2165 		    "flowop %s attempted do a statfile on a RAW device",
2166 		    flowop->fo_name);
2167 		return (FILEBENCH_ERROR);
2168 	}
2169 #endif /* HAVE_RAW_SUPPORT */
2170 
2171 	if (file == NULL) {
2172 		char path[MAXPATHLEN];
2173 		char *pathtmp;
2174 
2175 		/* pick arbitrary, existing (allocated) file */
2176 		if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0))
2177 		    == NULL) {
2178 			filebench_log(LOG_DEBUG_SCRIPT,
2179 			    "Statfile flowop %s failed to pick file",
2180 			    flowop->fo_name);
2181 			return (FILEBENCH_NORSC);
2182 		}
2183 
2184 		/* resolve path and do a stat on file */
2185 		*path = 0;
2186 		(void) strcpy(path, avd_get_str(fileset->fs_path));
2187 		(void) strcat(path, "/");
2188 		(void) strcat(path, avd_get_str(fileset->fs_name));
2189 		pathtmp = fileset_resolvepath(file);
2190 		(void) strcat(path, pathtmp);
2191 		free(pathtmp);
2192 
2193 		/* stat the file */
2194 		flowop_beginop(threadflow, flowop);
2195 		if (stat(path, &statbuf) == -1)
2196 			filebench_log(LOG_ERROR,
2197 			    "statfile flowop %s failed", flowop->fo_name);
2198 		flowop_endop(threadflow, flowop, 0);
2199 
2200 		fileset_unbusy(file, FALSE, FALSE);
2201 	} else {
2202 		/* stat specific file */
2203 		flowop_beginop(threadflow, flowop);
2204 		if (fstat(threadflow->tf_fd[fd], &statbuf) == -1)
2205 			filebench_log(LOG_ERROR,
2206 			    "statfile flowop %s failed", flowop->fo_name);
2207 		flowop_endop(threadflow, flowop, 0);
2208 
2209 	}
2210 
2211 	return (FILEBENCH_OK);
2212 }
2213 
2214 
2215 /*
2216  * Additional reads and writes. Read and write whole files, write
2217  * and append to files. Some of these work with both fileobjs and
2218  * filesets, others only with filesets. The flowoplib_write routine
2219  * writes from thread memory, while the others read or write using
2220  * fo_buf memory. Note that both flowoplib_read() and
2221  * flowoplib_aiowrite() use thread memory as well.
2222  */
2223 
2224 
2225 /*
2226  * Emulate a read of a whole file. The file must be open with
2227  * file descriptor and filesetentry stored at the locations indexed
2228  * by the flowop's fdnumber. It then seeks to the beginning of the
2229  * associated file, and reads fs_iosize bytes at a time until the end
2230  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2231  * out of files, and FILEBENCH_OK on success.
2232  */
2233 static int
2234 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2235 {
2236 	caddr_t iobuf;
2237 	off64_t bytes = 0;
2238 	int filedesc;
2239 	uint64_t wss;
2240 	fbint_t iosize;
2241 	int ret;
2242 	char zerordbuf;
2243 
2244 	/* get the file to use */
2245 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2246 	    &filedesc)) != FILEBENCH_OK)
2247 		return (ret);
2248 
2249 	/* an I/O size of zero means read entire working set with one I/O */
2250 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2251 		iosize = wss;
2252 
2253 	/*
2254 	 * The file may actually be 0 bytes long, in which case skip
2255 	 * the buffer set up call (which would fail) and substitute
2256 	 * a small buffer, which won't really be used.
2257 	 */
2258 	if (iosize == 0) {
2259 		iobuf = (caddr_t)&zerordbuf;
2260 		filebench_log(LOG_DEBUG_SCRIPT,
2261 		    "flowop %s read zero length file", flowop->fo_name);
2262 	} else {
2263 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2264 		    iosize) != 0)
2265 			return (FILEBENCH_ERROR);
2266 	}
2267 
2268 	/* Measure time to read bytes */
2269 	flowop_beginop(threadflow, flowop);
2270 	(void) lseek64(filedesc, 0, SEEK_SET);
2271 	while ((ret = read(filedesc, iobuf, iosize)) > 0)
2272 		bytes += ret;
2273 
2274 	flowop_endop(threadflow, flowop, bytes);
2275 
2276 	if (ret < 0) {
2277 		filebench_log(LOG_ERROR,
2278 		    "readwhole fail Failed to read whole file: %s",
2279 		    strerror(errno));
2280 		return (FILEBENCH_ERROR);
2281 	}
2282 
2283 	return (FILEBENCH_OK);
2284 }
2285 
2286 /*
2287  * Emulate a write to a file of size fo_iosize.  Will write
2288  * to a file from a fileset if the flowop's fo_fileset field
2289  * specifies one or its fdnumber is non zero. Otherwise it
2290  * will write to a fileobj file, if one exists. If the file
2291  * is not currently open, the routine will attempt to open
2292  * it. The flowop's fo_wss parameter will be used to set the
2293  * maximum file size if it is non-zero, otherwise the
2294  * filesetentry's  fse_size will be used. A random memory
2295  * buffer offset is calculated, and, if fo_random is TRUE,
2296  * a random file offset is used for the write. Otherwise the
2297  * write is to the next sequential location. Returns
2298  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2299  * obtain a file, or FILEBENCH_OK on success.
2300  */
2301 static int
2302 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2303 {
2304 	caddr_t iobuf;
2305 	fbint_t wss;
2306 	fbint_t iosize;
2307 	int filedesc;
2308 	int ret;
2309 
2310 	iosize = avd_get_int(flowop->fo_iosize);
2311 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2312 	    &filedesc, iosize)) != FILEBENCH_OK)
2313 		return (ret);
2314 
2315 	if (avd_get_bool(flowop->fo_random)) {
2316 		uint64_t fileoffset;
2317 
2318 		if (filebench_randomno64(&fileoffset,
2319 		    wss, iosize, NULL) == -1) {
2320 			filebench_log(LOG_ERROR,
2321 			    "file size smaller than IO size for thread %s",
2322 			    flowop->fo_name);
2323 			return (FILEBENCH_ERROR);
2324 		}
2325 		flowop_beginop(threadflow, flowop);
2326 		if (pwrite64(filedesc, iobuf,
2327 		    iosize, (off64_t)fileoffset) == -1) {
2328 			filebench_log(LOG_ERROR, "write failed, "
2329 			    "offset %llu io buffer %zd: %s",
2330 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2331 			flowop_endop(threadflow, flowop, 0);
2332 			return (FILEBENCH_ERROR);
2333 		}
2334 		flowop_endop(threadflow, flowop, iosize);
2335 	} else {
2336 		flowop_beginop(threadflow, flowop);
2337 		if (write(filedesc, iobuf, iosize) == -1) {
2338 			filebench_log(LOG_ERROR,
2339 			    "write failed, io buffer %zd: %s",
2340 			    iobuf, strerror(errno));
2341 			flowop_endop(threadflow, flowop, 0);
2342 			return (FILEBENCH_ERROR);
2343 		}
2344 		flowop_endop(threadflow, flowop, iosize);
2345 	}
2346 
2347 	return (FILEBENCH_OK);
2348 }
2349 
2350 /*
2351  * Emulate a write of a whole file.  The size of the file
2352  * is taken from a filesetentry identified by fo_srcfdnumber or
2353  * from the working set size, while the file descriptor used is
2354  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2355  * length length until full file has been written. Returns FILEBENCH_ERROR on
2356  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2357  */
2358 static int
2359 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2360 {
2361 	caddr_t iobuf;
2362 	filesetentry_t *file;
2363 	int wsize;
2364 	off64_t seek;
2365 	off64_t bytes = 0;
2366 	uint64_t wss;
2367 	fbint_t iosize;
2368 	int filedesc;
2369 	int srcfd = flowop->fo_srcfdnumber;
2370 	int ret;
2371 	char zerowrtbuf;
2372 
2373 	/* get the file to use */
2374 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2375 	    &filedesc)) != FILEBENCH_OK)
2376 		return (ret);
2377 
2378 	/* an I/O size of zero means write entire working set with one I/O */
2379 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2380 		iosize = wss;
2381 
2382 	/*
2383 	 * The file may actually be 0 bytes long, in which case skip
2384 	 * the buffer set up call (which would fail) and substitute
2385 	 * a small buffer, which won't really be used.
2386 	 */
2387 	if (iosize == 0) {
2388 		iobuf = (caddr_t)&zerowrtbuf;
2389 		filebench_log(LOG_DEBUG_SCRIPT,
2390 		    "flowop %s wrote zero length file", flowop->fo_name);
2391 	} else {
2392 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2393 		    iosize) != 0)
2394 			return (FILEBENCH_ERROR);
2395 	}
2396 
2397 	file = threadflow->tf_fse[srcfd];
2398 	if ((srcfd != 0) && (file == NULL)) {
2399 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2400 		    flowop->fo_name);
2401 		return (FILEBENCH_ERROR);
2402 	}
2403 
2404 	if (file)
2405 		wss = file->fse_size;
2406 
2407 	wsize = (int)MIN(wss, iosize);
2408 
2409 	/* Measure time to write bytes */
2410 	flowop_beginop(threadflow, flowop);
2411 	for (seek = 0; seek < wss; seek += wsize) {
2412 		ret = write(filedesc, iobuf, wsize);
2413 		if (ret != wsize) {
2414 			filebench_log(LOG_ERROR,
2415 			    "Failed to write %d bytes on fd %d: %s",
2416 			    wsize, filedesc, strerror(errno));
2417 			flowop_endop(threadflow, flowop, 0);
2418 			return (FILEBENCH_ERROR);
2419 		}
2420 		wsize = (int)MIN(wss - seek, iosize);
2421 		bytes += ret;
2422 	}
2423 	flowop_endop(threadflow, flowop, bytes);
2424 
2425 	return (FILEBENCH_OK);
2426 }
2427 
2428 
2429 /*
2430  * Emulate a fixed size append to a file. Will append data to
2431  * a file chosen from a fileset if the flowop's fo_fileset
2432  * field specifies one or if its fdnumber is non zero.
2433  * Otherwise it will write to a fileobj file, if one exists.
2434  * The flowop's fo_wss parameter will be used to set the
2435  * maximum file size if it is non-zero, otherwise the
2436  * filesetentry's fse_size will be used. A random memory
2437  * buffer offset is calculated, then a logical seek to the
2438  * end of file is done followed by a write of fo_iosize
2439  * bytes. Writes are actually done from fo_buf, rather than
2440  * tf_mem as is done with flowoplib_write(), and no check
2441  * is made to see if fo_iosize exceeds the size of fo_buf.
2442  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2443  * files in the fileset, FILEBENCH_OK on success.
2444  */
2445 static int
2446 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2447 {
2448 	caddr_t iobuf;
2449 	int filedesc;
2450 	fbint_t wss;
2451 	fbint_t iosize;
2452 	int ret;
2453 
2454 	iosize = avd_get_int(flowop->fo_iosize);
2455 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2456 	    &filedesc, iosize)) != FILEBENCH_OK)
2457 		return (ret);
2458 
2459 	/* XXX wss is not being used */
2460 
2461 	/* Measure time to write bytes */
2462 	flowop_beginop(threadflow, flowop);
2463 	(void) lseek64(filedesc, 0, SEEK_END);
2464 	ret = write(filedesc, iobuf, iosize);
2465 	if (ret != iosize) {
2466 		filebench_log(LOG_ERROR,
2467 		    "Failed to write %llu bytes on fd %d: %s",
2468 		    (u_longlong_t)iosize, filedesc, strerror(errno));
2469 		flowop_endop(threadflow, flowop, ret);
2470 		return (FILEBENCH_ERROR);
2471 	}
2472 	flowop_endop(threadflow, flowop, ret);
2473 
2474 	return (FILEBENCH_OK);
2475 }
2476 
2477 /*
2478  * Emulate a random size append to a file. Will append data
2479  * to a file chosen from a fileset if the flowop's fo_fileset
2480  * field specifies one or if its fdnumber is non zero. Otherwise
2481  * it will write to a fileobj file, if one exists. The flowop's
2482  * fo_wss parameter will be used to set the maximum file size
2483  * if it is non-zero, otherwise the filesetentry's fse_size
2484  * will be used.  A random transfer size (but at most fo_iosize
2485  * bytes) and a random memory offset are calculated. A logical
2486  * seek to the end of file is done, then writes of up to
2487  * FILE_ALLOC_BLOCK in size are done until the full transfer
2488  * size has been written. Writes are actually done from fo_buf,
2489  * rather than tf_mem as is done with flowoplib_write().
2490  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2491  * files in the fileset, FILEBENCH_OK on success.
2492  */
2493 static int
2494 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2495 {
2496 	caddr_t iobuf;
2497 	uint64_t appendsize;
2498 	int filedesc;
2499 	fbint_t wss;
2500 	fbint_t iosize;
2501 	int ret = 0;
2502 
2503 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2504 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2505 		    flowop->fo_name);
2506 		return (FILEBENCH_ERROR);
2507 	}
2508 
2509 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2510 		return (FILEBENCH_ERROR);
2511 
2512 	/* skip if attempting zero length append */
2513 	if (appendsize == 0) {
2514 		flowop_beginop(threadflow, flowop);
2515 		flowop_endop(threadflow, flowop, 0LL);
2516 		return (FILEBENCH_OK);
2517 	}
2518 
2519 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2520 	    &filedesc, appendsize)) != FILEBENCH_OK)
2521 		return (ret);
2522 
2523 	/* XXX wss is not being used */
2524 
2525 	/* Measure time to write bytes */
2526 	flowop_beginop(threadflow, flowop);
2527 
2528 	(void) lseek64(filedesc, 0, SEEK_END);
2529 	ret = write(filedesc, iobuf, appendsize);
2530 	if (ret != appendsize) {
2531 		filebench_log(LOG_ERROR,
2532 		    "Failed to write %llu bytes on fd %d: %s",
2533 		    (u_longlong_t)appendsize, filedesc, strerror(errno));
2534 		flowop_endop(threadflow, flowop, 0);
2535 		return (FILEBENCH_ERROR);
2536 	}
2537 
2538 	flowop_endop(threadflow, flowop, appendsize);
2539 
2540 	return (FILEBENCH_OK);
2541 }
2542 
/*
 * Running totals kept by the testrandvar flowop; one instance is
 * attached to the flowop through its fo_private pointer.
 */
typedef struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated so far */
	double val_sum;		/* sum of the accumulated values */
	double sqr_sum;		/* sum of the squares of the values */
} testrandvar_priv_t;
2548 
2549 /*
2550  * flowop to calculate various statistics from the number stream
2551  * produced by a random variable. This allows verification that the
2552  * random distribution used to define the random variable is producing
2553  * the expected distribution of random numbers.
2554  */
2555 /* ARGSUSED */
2556 static int
2557 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2558 {
2559 	testrandvar_priv_t	*mystats;
2560 	double			value;
2561 
2562 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2563 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2564 		filebench_shutdown(1);
2565 		return (-1);
2566 	}
2567 
2568 	value = avd_get_dbl(flowop->fo_value);
2569 
2570 	mystats->sample_count++;
2571 	mystats->val_sum += value;
2572 	mystats->sqr_sum += (value * value);
2573 
2574 	return (0);
2575 }
2576 
2577 /*
2578  * Initialize the private data area used to accumulate the statistics
2579  */
2580 static int
2581 flowoplib_testrandvar_init(flowop_t *flowop)
2582 {
2583 	testrandvar_priv_t	*mystats;
2584 
2585 	if ((mystats = (testrandvar_priv_t *)
2586 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2587 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2588 		filebench_shutdown(1);
2589 		return (-1);
2590 	}
2591 
2592 	mystats->sample_count = 0;
2593 	mystats->val_sum = 0;
2594 	mystats->sqr_sum = 0;
2595 	flowop->fo_private = (void *)mystats;
2596 
2597 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2598 	return (0);
2599 }
2600 
2601 /*
2602  * Print out the accumulated statistics, and free the private storage
2603  */
2604 static void
2605 flowoplib_testrandvar_destruct(flowop_t *flowop)
2606 {
2607 	testrandvar_priv_t	*mystats;
2608 	double mean, std_dev, dbl_count;
2609 
2610 	(void) ipc_mutex_lock(&flowop->fo_lock);
2611 	if ((mystats = (testrandvar_priv_t *)
2612 	    flowop->fo_private) == NULL) {
2613 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2614 		return;
2615 	}
2616 
2617 	flowop->fo_private = NULL;
2618 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2619 
2620 	dbl_count = (double)mystats->sample_count;
2621 	mean = mystats->val_sum / dbl_count;
2622 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2623 
2624 	filebench_log(LOG_VERBOSE,
2625 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2626 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2627 	free(mystats);
2628 }
2629 
2630 /*
2631  * prints message to the console from within a thread
2632  */
2633 static int
2634 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2635 {
2636 	procflow_t *procflow;
2637 
2638 	procflow = threadflow->tf_process;
2639 	filebench_log(LOG_INFO,
2640 	    "Message from process (%s,%d), thread (%s,%d): %s",
2641 	    procflow->pf_name, procflow->pf_instance,
2642 	    threadflow->tf_name, threadflow->tf_instance,
2643 	    avd_get_str(flowop->fo_value));
2644 
2645 	return (FILEBENCH_OK);
2646 }
2647 
/*
 * Prints usage information for flowop operations to stderr.
 * The text is kept in a table, one entry per flowop description
 * or separator, and written out in order. Optional attributes are
 * shown in square brackets.
 */
void
flowoplib_usage(void)
{
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n"
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>\n",
		"\n",
		"flowop deletefile name=<name>\n"
		"                       [,fileset=<fname>]\n"
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n"
		"                       [,fileset=<fname>]\n"
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n"
		"                       filename|fileset=<fname>,\n"
		"                       iosize=<size>\n"
		"                       [,directio]\n"
		"                       [,dsync]\n"
		"                       [,iters=<count>]\n"
		"                       [,random]\n"
		"                       [,opennext]\n"
		"                       [,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n"
		"                       filename|fileset=<fname>,\n"
		"                       iosize=<size>\n"
		"                       [,dsync]\n"
		"                       [,iters=<count>]\n"
		"                       [,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n"
		"                       filename|fileset=<fname>,\n"
		"                       iosize=<size>\n"
		"                       [,dsync]\n"
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n"
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value=<decrement-to-receive>,\n"
		"                       highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	size_t i;

	for (i = 0; i < sizeof (usage_text) / sizeof (usage_text[0]); i++)
		(void) fputs(usage_text[i], stderr);
}
2737