xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 6701:4213fadfdec4)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #pragma ident	"%Z%%M%	%I%	%E% SMI"
29 
30 #include "config.h"
31 
32 #include <sys/types.h>
33 #ifdef HAVE_SYS_ASYNCH_H
34 #include <sys/asynch.h>
35 #endif
36 #include <sys/ipc.h>
37 #include <sys/sem.h>
38 #include <sys/errno.h>
39 #include <sys/time.h>
40 #include <inttypes.h>
41 #include <fcntl.h>
42 #include <math.h>
43 
44 #ifdef HAVE_UTILITY_H
45 #include <utility.h>
46 #endif /* HAVE_UTILITY_H */
47 
48 #ifdef HAVE_AIO
49 #include <aio.h>
50 #endif /* HAVE_AIO */
51 
52 #ifdef HAVE_LIBAIO_H
53 #include <libaio.h>
54 #endif /* HAVE_LIBAIO_H */
55 
56 #ifdef HAVE_SYS_ASYNC_H
57 #include <sys/asynch.h>
58 #endif /* HAVE_SYS_ASYNC_H */
59 
60 #ifdef HAVE_AIO_H
61 #include <aio.h>
62 #endif /* HAVE_AIO_H */
63 
64 #ifndef HAVE_UINT_T
65 #define	uint_t unsigned int
66 #endif /* HAVE_UINT_T */
67 
68 #ifndef HAVE_AIOCB64_T
69 #define	aiocb64 aiocb
70 #endif /* HAVE_AIOCB64_T */
71 
72 #ifndef HAVE_SYSV_SEM
73 #include <semaphore.h>
74 #endif /* HAVE_SYSV_SEM */
75 
76 #include "filebench.h"
77 #include "flowop.h"
78 #include "fileset.h"
79 #include "fb_random.h"
80 
81 /*
82  * These routines implement the flowops from the f language. Each
 * flowop has a name such as "read", and a set of function pointers
84  * to call for initialization, execution and destruction of the flowop.
85  * The table flowoplib_funcs[] contains a flowoplib struct for each
86  * implemented flowop. Most flowops use a generic initialization function
87  * and all currently use a generic destruction function. All flowop
88  * functions referenced from the table are in this file, though, of
89  * course, they often call functions from other files.
90  *
91  * The flowop_init() routine uses the flowoplib_funcs[] table to
92  * create an initial set of "instance 0" flowops, one for each type of
93  * flowop, from which all other flowops are derived. These "instance 0"
94  * flowops are initialized with information from the table including
95  * pointers for their fo_init, fo_func and fo_destroy functions. When
96  * a flowop definition is encountered in an f language script, the
97  * "type" of flowop, such as "read" is used to search for the
98  * "instance 0" flowop named "read", then a new flowop is allocated
99  * which inherits its function pointers and other initial properties
100  * from the instance 0 flowop, and is given a new name as specified
101  * by the "name=" attribute.
102  */
103 
104 static int flowoplib_init_generic(flowop_t *flowop);
105 static void flowoplib_destruct_generic(flowop_t *flowop);
106 static void flowoplib_destruct_noop(flowop_t *flowop);
107 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
108 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
109 #ifdef HAVE_AIO
110 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
111 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
112 #endif
113 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
114 static int flowoplib_block_init(flowop_t *flowop);
115 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
116 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
117 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
118 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
119 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
120 static int flowoplib_sempost_init(flowop_t *flowop);
121 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
122 static int flowoplib_semblock_init(flowop_t *flowop);
123 static void flowoplib_semblock_destruct(flowop_t *flowop);
124 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
125 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
126 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
127 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
128 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
129 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
130 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
131 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
132 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
133 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
134 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
135 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
136 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
137 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
138 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
139 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
140 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
141 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
142 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
143 static int flowoplib_testrandvar_init(flowop_t *flowop);
144 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
145 
146 typedef struct flowoplib {
147 	int	fl_type;
148 	int	fl_attrs;
149 	char	*fl_name;
150 	int	(*fl_init)();
151 	int	(*fl_func)();
152 	void	(*fl_destruct)();
153 } flowoplib_t;
154 
155 static flowoplib_t flowoplib_funcs[] = {
156 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
157 	flowoplib_write, flowoplib_destruct_generic,
158 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
159 	flowoplib_read, flowoplib_destruct_generic,
160 #ifdef HAVE_AIO
161 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
162 	flowoplib_aiowrite, flowoplib_destruct_generic,
163 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
164 	flowoplib_aiowait, flowoplib_destruct_generic,
165 #endif
166 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
167 	flowoplib_block, flowoplib_destruct_generic,
168 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
169 	flowoplib_wakeup, flowoplib_destruct_generic,
170 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
171 	flowoplib_semblock, flowoplib_semblock_destruct,
172 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
173 	flowoplib_sempost, flowoplib_destruct_noop,
174 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
175 	flowoplib_hog, flowoplib_destruct_generic,
176 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
177 	flowoplib_delay, flowoplib_destruct_generic,
178 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
179 	flowoplib_eventlimit, flowoplib_destruct_generic,
180 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
181 	flowoplib_bwlimit, flowoplib_destruct_generic,
182 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
183 	flowoplib_iopslimit, flowoplib_destruct_generic,
184 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
185 	flowoplib_opslimit, flowoplib_destruct_generic,
186 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
187 	flowoplib_finishoncount, flowoplib_destruct_generic,
188 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
189 	flowoplib_finishonbytes, flowoplib_destruct_generic,
190 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
191 	flowoplib_openfile, flowoplib_destruct_generic,
192 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
193 	flowoplib_createfile, flowoplib_destruct_generic,
194 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
195 	flowoplib_closefile, flowoplib_destruct_generic,
196 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
197 	flowoplib_fsync, flowoplib_destruct_generic,
198 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
199 	flowoplib_fsyncset, flowoplib_destruct_generic,
200 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
201 	flowoplib_statfile, flowoplib_destruct_generic,
202 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
203 	flowoplib_readwholefile, flowoplib_destruct_generic,
204 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
205 	flowoplib_appendfile, flowoplib_destruct_generic,
206 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
207 	flowoplib_appendfilerand, flowoplib_destruct_generic,
208 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
209 	flowoplib_deletefile, flowoplib_destruct_generic,
210 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
211 	flowoplib_writewholefile, flowoplib_destruct_generic,
212 	/* routine to calculate mean and stddev for output from a randvar */
213 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
214 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
215 };
216 
217 /*
218  * Loops through the master list of flowops defined in this
219  * module, and creates and initializes a flowop for each one
220  * by calling flowop_define. As a side effect of calling
221  * flowop define, the created flowops are placed on the
222  * master flowop list. All created flowops are set to
223  * instance "0".
224  */
225 void
226 flowoplib_init()
227 {
228 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
229 	int i;
230 
231 	for (i = 0; i < nops; i++) {
232 		flowop_t *flowop;
233 		flowoplib_t *fl;
234 
235 		fl = &flowoplib_funcs[i];
236 
237 		if ((flowop = flowop_define(NULL,
238 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
239 			filebench_log(LOG_ERROR,
240 			    "failed to create flowop %s\n",
241 			    fl->fl_name);
242 			filebench_shutdown(1);
243 		}
244 
245 		flowop->fo_func = fl->fl_func;
246 		flowop->fo_init = fl->fl_init;
247 		flowop->fo_destruct = fl->fl_destruct;
248 		flowop->fo_attrs = fl->fl_attrs;
249 	}
250 }
251 
252 static int
253 flowoplib_init_generic(flowop_t *flowop)
254 {
255 	(void) ipc_mutex_unlock(&flowop->fo_lock);
256 	return (FILEBENCH_OK);
257 }
258 
259 static void
260 flowoplib_destruct_generic(flowop_t *flowop)
261 {
262 	char *buf;
263 
264 	/* release any local resources held by the flowop */
265 	(void) ipc_mutex_lock(&flowop->fo_lock);
266 	buf = flowop->fo_buf;
267 	flowop->fo_buf = NULL;
268 	(void) ipc_mutex_unlock(&flowop->fo_lock);
269 
270 	if (buf)
271 		free(buf);
272 }
273 
274 /*
275  * Special total noop destruct
276  */
277 /* ARGSUSED */
278 static void
279 flowoplib_destruct_noop(flowop_t *flowop)
280 {
281 }
282 
283 /*
284  * Generates a file attribute from flags in the supplied flowop.
285  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
286  */
287 static int
288 flowoplib_fileattrs(flowop_t *flowop)
289 {
290 	int attrs = 0;
291 
292 	if (avd_get_bool(flowop->fo_directio))
293 		attrs |= FLOW_ATTR_DIRECTIO;
294 
295 	if (avd_get_bool(flowop->fo_dsync))
296 		attrs |= FLOW_ATTR_DSYNC;
297 
298 	return (attrs);
299 }
300 
301 /*
302  * Searches for a file descriptor. Tries the flowop's
303  * fo_fdnumber first and returns with it if it has been
304  * explicitly set (greater than 0). It next checks to
305  * see if a rotating file descriptor policy is in effect,
306  * and if not returns the fdnumber regardless of what
307  * it is. (note that if it is 0, it just selects to the
308  * default file descriptor in the threadflow's tf_fd
309  * array). If the rotating fd policy is in effect, it
310  * cycles from the end of the tf_fd array to one location
311  * beyond the maximum needed by the number of entries in
312  * the associated fileset on each invocation, then starts
313  * over from the end.
314  *
315  * The routine returns an index into the threadflow's
316  * tf_fd table where the actual file descriptor will be
317  * found. Note: the calling routine must not call this
318  * routine if the flowop does not have a fileset, and the
319  * flowop's fo_fdnumber is zero and fo_rotatefd is
320  * asserted, or an addressing fault may occur.
321  */
322 static int
323 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
324 {
325 	fbint_t	entries;
326 	int fdnumber = flowop->fo_fdnumber;
327 
328 	/* If the script sets the fd explicitly */
329 	if (fdnumber > 0)
330 		return (fdnumber);
331 
332 	/* If the flowop defaults to persistent fd */
333 	if (!avd_get_bool(flowop->fo_rotatefd))
334 		return (fdnumber);
335 
336 	if (flowop->fo_fileset == NULL) {
337 		filebench_log(LOG_ERROR, "flowop NULL file");
338 		return (FILEBENCH_ERROR);
339 	}
340 
341 	entries = flowop->fo_fileset->fs_constentries;
342 
343 	/* Rotate the fd on each flowop invocation */
344 	if (entries > (THREADFLOW_MAXFD / 2)) {
345 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
346 		    " (too many files : %llu",
347 		    flowop->fo_name, (u_longlong_t)entries);
348 		return (FILEBENCH_ERROR);
349 	}
350 
351 	/* First time around */
352 	if (threadflow->tf_fdrotor == 0)
353 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
354 
355 	/* One fd for every file in the set */
356 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
357 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
358 
359 
360 	threadflow->tf_fdrotor--;
361 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
362 	    threadflow->tf_fdrotor);
363 	return (threadflow->tf_fdrotor);
364 }
365 
366 /*
367  * Determines the file descriptor to use, and attempts to open
368  * the file if it is not already open. Also determines the wss
369  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
370  * if flowop_openfile_common couldn't obtain an appropriate file
371  * from a the fileset, and FILEBENCH_OK otherwise.
372  */
373 static int
374 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
375     fbint_t *wssp, int *filedescp)
376 {
377 	int fd = flowoplib_fdnum(threadflow, flowop);
378 
379 	if (fd == -1)
380 		return (FILEBENCH_ERROR);
381 
382 	if (threadflow->tf_fd[fd] == 0) {
383 		int ret;
384 
385 		if ((ret = flowoplib_openfile_common(
386 		    threadflow, flowop, fd)) != FILEBENCH_OK)
387 			return (ret);
388 
389 		if (threadflow->tf_fse[fd]) {
390 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
391 			    threadflow->tf_fse[fd]->fse_path);
392 		} else {
393 			filebench_log(LOG_DEBUG_IMPL,
394 			    "opened device %s/%s",
395 			    avd_get_str(flowop->fo_fileset->fs_path),
396 			    avd_get_str(flowop->fo_fileset->fs_name));
397 		}
398 	}
399 
400 	*filedescp = threadflow->tf_fd[fd];
401 
402 	if ((*wssp = flowop->fo_constwss) == 0) {
403 		if (threadflow->tf_fse[fd])
404 			*wssp = threadflow->tf_fse[fd]->fse_size;
405 		else
406 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
407 	}
408 
409 	return (FILEBENCH_OK);
410 }
411 
412 /*
413  * Determines the io buffer or random offset into tf_mem for
414  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
415  */
416 static int
417 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
418     caddr_t *iobufp, fbint_t iosize)
419 {
420 	long memsize;
421 	size_t memoffset;
422 
423 	if (iosize == 0) {
424 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
425 		    flowop->fo_name);
426 		return (FILEBENCH_ERROR);
427 	}
428 
429 	if ((memsize = threadflow->tf_constmemsize) != 0) {
430 
431 		/* use tf_mem for I/O with random offset */
432 		if (filebench_randomno(&memoffset,
433 		    memsize, iosize, NULL) == -1) {
434 			filebench_log(LOG_ERROR,
435 			    "tf_memsize smaller than IO size for thread %s",
436 			    flowop->fo_name);
437 			return (FILEBENCH_ERROR);
438 		}
439 		*iobufp = threadflow->tf_mem + memoffset;
440 
441 	} else {
442 		/* use private I/O buffer */
443 		if ((flowop->fo_buf != NULL) &&
444 		    (flowop->fo_buf_size < iosize)) {
445 			/* too small, so free up and re-allocate */
446 			free(flowop->fo_buf);
447 			flowop->fo_buf = NULL;
448 		}
449 
450 		/*
451 		 * Allocate memory for the  buffer. The memory is freed
452 		 * by flowop_destruct_generic() or by this routine if more
453 		 * memory is needed for the buffer.
454 		 */
455 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
456 		    = (char *)malloc(iosize)) == NULL))
457 			return (FILEBENCH_ERROR);
458 
459 		flowop->fo_buf_size = iosize;
460 		*iobufp = flowop->fo_buf;
461 	}
462 	return (FILEBENCH_OK);
463 }
464 
465 /*
466  * Determines the file descriptor to use, opens it if necessary, the
467  * io buffer or random offset into tf_mem for IO operation and the wss
468  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
469  */
470 static int
471 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
472     fbint_t *wssp, caddr_t *iobufp, int *filedescp, fbint_t iosize)
473 {
474 	int ret;
475 
476 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
477 	    FILEBENCH_OK)
478 		return (ret);
479 
480 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
481 	    FILEBENCH_OK)
482 		return (ret);
483 
484 	return (FILEBENCH_OK);
485 }
486 
487 /*
488  * Emulate posix read / pread. If the flowop has a fileset,
489  * a file descriptor number index is fetched, otherwise a
490  * supplied fileobj file is used. In either case the specified
491  * file will be opened if not already open. If the flowop has
492  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
493  * returned.
494  *
495  * The actual read is done to a random offset in the
496  * threadflow's thread memory (tf_mem), with a size set by
497  * fo_iosize and at either a random disk offset within the
498  * working set size, or at the next sequential location. If
499  * any errors are encountered, FILEBENCH_ERROR is returned,
500  * if no appropriate file can be obtained from the fileset then
501  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
502  */
503 static int
504 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
505 {
506 	caddr_t iobuf;
507 	fbint_t wss;
508 	fbint_t iosize;
509 	int filedesc;
510 	int ret;
511 
512 
513 	iosize = avd_get_int(flowop->fo_iosize);
514 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
515 	    &filedesc, iosize)) != FILEBENCH_OK)
516 		return (ret);
517 
518 	if (avd_get_bool(flowop->fo_random)) {
519 		uint64_t fileoffset;
520 
521 		if (filebench_randomno64(&fileoffset,
522 		    wss, iosize, NULL) == -1) {
523 			filebench_log(LOG_ERROR,
524 			    "file size smaller than IO size for thread %s",
525 			    flowop->fo_name);
526 			return (FILEBENCH_ERROR);
527 		}
528 
529 		(void) flowop_beginop(threadflow, flowop);
530 		if ((ret = pread64(filedesc, iobuf,
531 		    iosize, (off64_t)fileoffset)) == -1) {
532 			(void) flowop_endop(threadflow, flowop, 0);
533 			filebench_log(LOG_ERROR,
534 			    "read file %s failed, offset %llu "
535 			    "io buffer %zd: %s",
536 			    avd_get_str(flowop->fo_fileset->fs_name),
537 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
538 			flowop_endop(threadflow, flowop, 0);
539 			return (FILEBENCH_ERROR);
540 		}
541 		(void) flowop_endop(threadflow, flowop, ret);
542 
543 		if ((ret == 0))
544 			(void) lseek64(filedesc, 0, SEEK_SET);
545 
546 	} else {
547 		(void) flowop_beginop(threadflow, flowop);
548 		if ((ret = read(filedesc, iobuf, iosize)) == -1) {
549 			(void) flowop_endop(threadflow, flowop, 0);
550 			filebench_log(LOG_ERROR,
551 			    "read file %s failed, io buffer %zd: %s",
552 			    avd_get_str(flowop->fo_fileset->fs_name),
553 			    iobuf, strerror(errno));
554 			(void) flowop_endop(threadflow, flowop, 0);
555 			return (FILEBENCH_ERROR);
556 		}
557 		(void) flowop_endop(threadflow, flowop, ret);
558 
559 		if ((ret == 0))
560 			(void) lseek64(filedesc, 0, SEEK_SET);
561 	}
562 
563 	return (FILEBENCH_OK);
564 }
565 
566 #ifdef HAVE_AIO
567 
568 /*
569  * Asynchronous write section. An Asynchronous IO element
570  * (aiolist_t) is used to associate the asynchronous write request with
571  * its subsequent completion. This element includes a aiocb64 struct
572  * that is used by posix aio_xxx calls to track the asynchronous writes.
573  * The flowops aiowrite and aiowait result in calls to these posix
574  * aio_xxx system routines to do the actual asynchronous write IO
575  * operations.
576  */
577 
578 
579 /*
580  * Allocates an asynchronous I/O list (aio, of type
581  * aiolist_t) element. Adds it to the flowop thread's
582  * threadflow aio list. Returns a pointer to the element.
583  */
584 static aiolist_t *
585 aio_allocate(flowop_t *flowop)
586 {
587 	aiolist_t *aiolist;
588 
589 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
590 		filebench_log(LOG_ERROR, "malloc aiolist failed");
591 		filebench_shutdown(1);
592 	}
593 
594 	/* Add to list */
595 	if (flowop->fo_thread->tf_aiolist == NULL) {
596 		flowop->fo_thread->tf_aiolist = aiolist;
597 		aiolist->al_next = NULL;
598 	} else {
599 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
600 		flowop->fo_thread->tf_aiolist = aiolist;
601 	}
602 	return (aiolist);
603 }
604 
605 /*
606  * Searches for the aiolist element that has a matching
607  * completion block, aiocb. If none found returns FILEBENCH_ERROR. If
608  * found, removes the aiolist element from flowop thread's
609  * list and returns FILEBENCH_OK.
610  */
611 static int
612 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
613 {
614 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
615 	aiolist_t *previous = NULL;
616 	aiolist_t *match = NULL;
617 
618 	if (aiocb == NULL) {
619 		filebench_log(LOG_ERROR, "null aiocb deallocate");
620 		return (FILEBENCH_OK);
621 	}
622 
623 	while (aiolist) {
624 		if (aiocb == &(aiolist->al_aiocb)) {
625 			match = aiolist;
626 			break;
627 		}
628 		previous = aiolist;
629 		aiolist = aiolist->al_next;
630 	}
631 
632 	if (match == NULL)
633 		return (FILEBENCH_ERROR);
634 
635 	/* Remove from the list */
636 	if (previous)
637 		previous->al_next = match->al_next;
638 	else
639 		flowop->fo_thread->tf_aiolist = match->al_next;
640 
641 	return (FILEBENCH_OK);
642 }
643 
644 /*
645  * Emulate posix aiowrite(). Determines which file to use,
646  * either one file of a fileset, or the file associated
647  * with a fileobj, allocates and fills an aiolist_t element
648  * for the write, and issues the asynchronous write. This
649  * operation is only valid for random IO, and returns an
650  * error if the flowop is set for sequential IO. Returns
651  * FILEBENCH_OK on success, FILEBENCH_NORSC if iosetup can't
652  * obtain a file to open, and FILEBENCH_ERROR on any
653  * encountered error.
654  */
655 static int
656 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
657 {
658 	caddr_t iobuf;
659 	fbint_t wss;
660 	fbint_t iosize;
661 	int filedesc;
662 	int ret;
663 
664 	iosize = avd_get_int(flowop->fo_iosize);
665 
666 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
667 	    &filedesc, iosize)) != FILEBENCH_OK)
668 		return (ret);
669 
670 	if (avd_get_bool(flowop->fo_random)) {
671 		uint64_t fileoffset;
672 		struct aiocb64 *aiocb;
673 		aiolist_t *aiolist;
674 
675 		if (filebench_randomno64(&fileoffset,
676 		    wss, iosize, NULL) == -1) {
677 			filebench_log(LOG_ERROR,
678 			    "file size smaller than IO size for thread %s",
679 			    flowop->fo_name);
680 			return (FILEBENCH_ERROR);
681 		}
682 
683 		aiolist = aio_allocate(flowop);
684 		aiolist->al_type = AL_WRITE;
685 		aiocb = &aiolist->al_aiocb;
686 
687 		aiocb->aio_fildes = filedesc;
688 		aiocb->aio_buf = iobuf;
689 		aiocb->aio_nbytes = (size_t)iosize;
690 		aiocb->aio_offset = (off64_t)fileoffset;
691 		aiocb->aio_reqprio = 0;
692 
693 		filebench_log(LOG_DEBUG_IMPL,
694 		    "aio fd=%d, bytes=%llu, offset=%llu",
695 		    filedesc, (u_longlong_t)iosize, (u_longlong_t)fileoffset);
696 
697 		flowop_beginop(threadflow, flowop);
698 		if (aio_write64(aiocb) < 0) {
699 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
700 			    strerror(errno));
701 			filebench_shutdown(1);
702 		}
703 		flowop_endop(threadflow, flowop, iosize);
704 	} else {
705 		return (FILEBENCH_ERROR);
706 	}
707 
708 	return (FILEBENCH_OK);
709 }
710 
711 
712 
713 #define	MAXREAP 4096
714 
715 /*
716  * Emulate posix aiowait(). Waits for the completion of half the
717  * outstanding asynchronous IOs, or a single IO, which ever is
718  * larger. The routine will return after a sufficient number of
719  * completed calls issued by any thread in the procflow have
720  * completed, or a 1 second timout elapses. All completed
721  * IO operations are deleted from the thread's aiolist.
722  */
723 static int
724 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
725 {
726 	struct aiocb64 **worklist;
727 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
728 	int uncompleted = 0;
729 
730 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
731 
732 	/* Count the list of pending aios */
733 	while (aio) {
734 		uncompleted++;
735 		aio = aio->al_next;
736 	}
737 
738 	do {
739 		uint_t ncompleted = 0;
740 		uint_t todo;
741 		struct timespec timeout;
742 		int inprogress;
743 		int i;
744 
745 		/* Wait for half of the outstanding requests */
746 		timeout.tv_sec = 1;
747 		timeout.tv_nsec = 0;
748 
749 		if (uncompleted > MAXREAP)
750 			todo = MAXREAP;
751 		else
752 			todo = uncompleted / 2;
753 
754 		if (todo == 0)
755 			todo = 1;
756 
757 		flowop_beginop(threadflow, flowop);
758 
759 #ifdef HAVE_AIOWAITN
760 		if ((aio_waitn64((struct aiocb64 **)worklist,
761 		    MAXREAP, &todo, &timeout) == -1) &&
762 		    errno && (errno != ETIME)) {
763 			filebench_log(LOG_ERROR,
764 			    "aiowait failed: %s, outstanding = %d, "
765 			    "ncompleted = %d ",
766 			    strerror(errno), uncompleted, todo);
767 		}
768 
769 		ncompleted = todo;
770 		/* Take the  completed I/Os from the list */
771 		inprogress = 0;
772 		for (i = 0; i < ncompleted; i++) {
773 			if ((aio_return64(worklist[i]) == -1) &&
774 			    (errno == EINPROGRESS)) {
775 				inprogress++;
776 				continue;
777 			}
778 			if (aio_deallocate(flowop, worklist[i]) < 0) {
779 				filebench_log(LOG_ERROR, "Could not remove "
780 				    "aio from list ");
781 				flowop_endop(threadflow, flowop, 0);
782 				return (FILEBENCH_ERROR);
783 			}
784 		}
785 
786 		uncompleted -= ncompleted;
787 		uncompleted += inprogress;
788 
789 #else
790 
791 		for (ncompleted = 0, inprogress = 0,
792 		    aio = flowop->fo_thread->tf_aiolist;
793 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
794 			int result = aio_error64(&aio->al_aiocb);
795 
796 			if (result == EINPROGRESS) {
797 				inprogress++;
798 				continue;
799 			}
800 
801 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
802 				filebench_log(LOG_ERROR, "aio failed: %s",
803 				    strerror(result));
804 				continue;
805 			}
806 
807 			ncompleted++;
808 
809 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
810 				filebench_log(LOG_ERROR, "Could not remove aio "
811 				    "from list ");
812 				flowop_endop(threadflow, flowop, 0);
813 				return (FILEBENCH_ERROR);
814 			}
815 		}
816 
817 		uncompleted -= ncompleted;
818 
819 #endif
820 		filebench_log(LOG_DEBUG_SCRIPT,
821 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
822 		    ncompleted, uncompleted, inprogress);
823 
824 	} while (uncompleted > MAXREAP);
825 
826 	flowop_endop(threadflow, flowop, 0);
827 
828 	free(worklist);
829 
830 	return (FILEBENCH_OK);
831 }
832 
833 #endif /* HAVE_AIO */
834 
835 /*
836  * Initializes a "flowop_block" flowop. Specifically, it
837  * initializes the flowop's fo_cv and unlocks the fo_lock.
838  */
839 static int
840 flowoplib_block_init(flowop_t *flowop)
841 {
842 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
843 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
844 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
845 	(void) ipc_mutex_unlock(&flowop->fo_lock);
846 
847 	return (FILEBENCH_OK);
848 }
849 
850 /*
851  * Blocks the threadflow until woken up by flowoplib_wakeup.
852  * The routine blocks on the flowop's fo_cv condition variable.
853  */
854 static int
855 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
856 {
857 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
858 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
859 	(void) ipc_mutex_lock(&flowop->fo_lock);
860 
861 	flowop_beginop(threadflow, flowop);
862 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
863 	flowop_endop(threadflow, flowop, 0);
864 
865 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
866 	    flowop->fo_name, flowop->fo_instance);
867 
868 	(void) ipc_mutex_unlock(&flowop->fo_lock);
869 
870 	return (FILEBENCH_OK);
871 }
872 
873 /*
874  * Wakes up one or more target blocking flowops.
875  * Sends broadcasts on the fo_cv condition variables of all
876  * flowops on the target list, except those that are
877  * FLOW_MASTER flowops. The target list consists of all
878  * flowops whose name matches this flowop's "fo_targetname"
879  * attribute. The target list is generated on the first
880  * invocation, and the run will be shutdown if no targets
881  * are found. Otherwise the routine always returns FILEBENCH_OK.
882  */
883 static int
884 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
885 {
886 	flowop_t *target;
887 
888 	/* if this is the first wakeup, create the wakeup list */
889 	if (flowop->fo_targets == NULL) {
890 		flowop_t *result = flowop_find(flowop->fo_targetname);
891 
892 		flowop->fo_targets = result;
893 		if (result == NULL) {
894 			filebench_log(LOG_ERROR,
895 			    "wakeup: could not find op %s for thread %s",
896 			    flowop->fo_targetname,
897 			    threadflow->tf_name);
898 			filebench_shutdown(1);
899 		}
900 		while (result) {
901 			result->fo_targetnext =
902 			    result->fo_resultnext;
903 			result = result->fo_resultnext;
904 		}
905 	}
906 
907 	target = flowop->fo_targets;
908 
909 	/* wakeup the targets */
910 	while (target) {
911 		if (target->fo_instance == FLOW_MASTER) {
912 			target = target->fo_targetnext;
913 			continue;
914 		}
915 		filebench_log(LOG_DEBUG_IMPL,
916 		    "wakeup flow %s-%d at address %zx",
917 		    target->fo_name,
918 		    target->fo_instance,
919 		    &target->fo_cv);
920 
921 		flowop_beginop(threadflow, flowop);
922 		(void) ipc_mutex_lock(&target->fo_lock);
923 		(void) pthread_cond_broadcast(&target->fo_cv);
924 		(void) ipc_mutex_unlock(&target->fo_lock);
925 		flowop_endop(threadflow, flowop, 0);
926 
927 		target = target->fo_targetnext;
928 	}
929 
930 	return (FILEBENCH_OK);
931 }
932 
933 /*
934  * "think time" routines. the "hog" routine consumes cpu cycles as
935  * it "thinks", while the "delay" flowop simply calls sleep() to delay
936  * for a given number of seconds without consuming cpu cycles.
937  */
938 
939 
940 /*
941  * Consumes CPU cycles and memory bandwidth by looping for
942  * flowop->fo_value times. With each loop sets memory location
943  * threadflow->tf_mem to 1.
944  */
945 static int
946 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
947 {
948 	uint64_t value = avd_get_int(flowop->fo_value);
949 	int i;
950 
951 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
952 	flowop_beginop(threadflow, flowop);
953 	if (threadflow->tf_mem != NULL) {
954 		for (i = 0; i < value; i++)
955 			*(threadflow->tf_mem) = 1;
956 	}
957 	flowop_endop(threadflow, flowop, 0);
958 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
959 	return (FILEBENCH_OK);
960 }
961 
962 
963 /*
964  * Delays for fo_value seconds.
965  */
966 static int
967 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
968 {
969 	int value = avd_get_int(flowop->fo_value);
970 
971 	flowop_beginop(threadflow, flowop);
972 	(void) sleep(value);
973 	flowop_endop(threadflow, flowop, 0);
974 	return (FILEBENCH_OK);
975 }
976 
977 /*
978  * Rate limiting routines. This is the event consuming half of the
979  * event system. Each of the four following routines will limit the rate
980  * to one unit of either calls, issued I/O operations, issued filebench
981  * operations, or I/O bandwidth. Since there is only one event generator,
982  * the events will be divided among multiple instances of an event
983  * consumer, and further divided among different consumers if more than
984  * one has been defined. There is no mechanism to enforce equal sharing
985  * of events.
986  */
987 
988 /*
989  * Completes one invocation per posted event. If eventgen_q
990  * has an event count greater than zero, one will be removed
991  * (count decremented), otherwise the calling thread will
992  * block until another event has been posted. Always returns 0
993  */
994 static int
995 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
996 {
997 	/* Immediately bail if not set/enabled */
998 	if (filebench_shm->shm_eventgen_hz == 0)
999 		return (FILEBENCH_OK);
1000 
1001 	if (flowop->fo_initted == 0) {
1002 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1003 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1004 		flowop->fo_initted = 1;
1005 	}
1006 
1007 	flowop_beginop(threadflow, flowop);
1008 	while (filebench_shm->shm_eventgen_hz) {
1009 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1010 		if (filebench_shm->shm_eventgen_q > 0) {
1011 			filebench_shm->shm_eventgen_q--;
1012 			(void) ipc_mutex_unlock(
1013 			    &filebench_shm->shm_eventgen_lock);
1014 			break;
1015 		}
1016 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1017 		    &filebench_shm->shm_eventgen_lock);
1018 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1019 	}
1020 	flowop_endop(threadflow, flowop, 0);
1021 	return (FILEBENCH_OK);
1022 }
1023 
1024 static int
1025 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
1026 {
1027 	if (flowop->fo_targetname[0] != '\0') {
1028 
1029 		/* Try to use statistics from specific flowop */
1030 		flowop->fo_targets =
1031 		    flowop_find_from_list(flowop->fo_targetname,
1032 		    threadflow->tf_thrd_fops);
1033 		if (flowop->fo_targets == NULL) {
1034 			filebench_log(LOG_ERROR,
1035 			    "limit target: could not find flowop %s",
1036 			    flowop->fo_targetname);
1037 			filebench_shutdown(1);
1038 			return (FILEBENCH_ERROR);
1039 		}
1040 	} else {
1041 		/* use total workload statistics */
1042 		flowop->fo_targets = NULL;
1043 	}
1044 	return (FILEBENCH_OK);
1045 }
1046 
1047 /*
1048  * Blocks the calling thread if the number of issued I/O
1049  * operations exceeds the number of posted events, thus
1050  * limiting the average I/O operation rate to the rate
1051  * specified by eventgen_hz. Always returns FILEBENCH_OK.
1052  */
1053 static int
1054 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
1055 {
1056 	uint64_t iops;
1057 	uint64_t delta;
1058 	uint64_t events;
1059 
1060 	/* Immediately bail if not set/enabled */
1061 	if (filebench_shm->shm_eventgen_hz == 0)
1062 		return (FILEBENCH_OK);
1063 
1064 	if (flowop->fo_initted == 0) {
1065 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1066 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1067 		flowop->fo_initted = 1;
1068 
1069 		if (flowoplib_event_find_target(threadflow, flowop)
1070 		    == FILEBENCH_ERROR)
1071 			return (FILEBENCH_ERROR);
1072 
1073 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
1074 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1075 			filebench_log(LOG_ERROR,
1076 			    "WARNING: Flowop %s does no IO",
1077 			    flowop->fo_targets->fo_name);
1078 			filebench_shutdown(1);
1079 			return (FILEBENCH_ERROR);
1080 		}
1081 	}
1082 
1083 	if (flowop->fo_targets) {
1084 		/*
1085 		 * Note that fs_count is already the sum of fs_rcount
1086 		 * and fs_wcount if looking at a single flowop.
1087 		 */
1088 		iops = flowop->fo_targets->fo_stats.fs_count;
1089 	} else {
1090 		(void) ipc_mutex_lock(&controlstats_lock);
1091 		iops = (controlstats.fs_rcount +
1092 		    controlstats.fs_wcount);
1093 		(void) ipc_mutex_unlock(&controlstats_lock);
1094 	}
1095 
1096 	/* Is this the first time around */
1097 	if (flowop->fo_tputlast == 0) {
1098 		flowop->fo_tputlast = iops;
1099 		return (FILEBENCH_OK);
1100 	}
1101 
1102 	delta = iops - flowop->fo_tputlast;
1103 	flowop->fo_tputbucket -= delta;
1104 	flowop->fo_tputlast = iops;
1105 
1106 	/* No need to block if the q isn't empty */
1107 	if (flowop->fo_tputbucket >= 0LL) {
1108 		flowop_endop(threadflow, flowop, 0);
1109 		return (FILEBENCH_OK);
1110 	}
1111 
1112 	iops = flowop->fo_tputbucket * -1;
1113 	events = iops;
1114 
1115 	flowop_beginop(threadflow, flowop);
1116 	while (filebench_shm->shm_eventgen_hz) {
1117 
1118 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1119 		if (filebench_shm->shm_eventgen_q >= events) {
1120 			filebench_shm->shm_eventgen_q -= events;
1121 			(void) ipc_mutex_unlock(
1122 			    &filebench_shm->shm_eventgen_lock);
1123 			flowop->fo_tputbucket += events;
1124 			break;
1125 		}
1126 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1127 		    &filebench_shm->shm_eventgen_lock);
1128 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1129 	}
1130 	flowop_endop(threadflow, flowop, 0);
1131 
1132 	return (FILEBENCH_OK);
1133 }
1134 
1135 /*
1136  * Blocks the calling thread if the number of issued filebench
1137  * operations exceeds the number of posted events, thus limiting
1138  * the average filebench operation rate to the rate specified by
1139  * eventgen_hz. Always returns FILEBENCH_OK.
1140  */
1141 static int
1142 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
1143 {
1144 	uint64_t ops;
1145 	uint64_t delta;
1146 	uint64_t events;
1147 
1148 	/* Immediately bail if not set/enabled */
1149 	if (filebench_shm->shm_eventgen_hz == 0)
1150 		return (FILEBENCH_OK);
1151 
1152 	if (flowop->fo_initted == 0) {
1153 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1154 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1155 		flowop->fo_initted = 1;
1156 
1157 		if (flowoplib_event_find_target(threadflow, flowop)
1158 		    == FILEBENCH_ERROR)
1159 			return (FILEBENCH_ERROR);
1160 	}
1161 
1162 	if (flowop->fo_targets) {
1163 		ops = flowop->fo_targets->fo_stats.fs_count;
1164 	} else {
1165 		(void) ipc_mutex_lock(&controlstats_lock);
1166 		ops = controlstats.fs_count;
1167 		(void) ipc_mutex_unlock(&controlstats_lock);
1168 	}
1169 
1170 	/* Is this the first time around */
1171 	if (flowop->fo_tputlast == 0) {
1172 		flowop->fo_tputlast = ops;
1173 		return (FILEBENCH_OK);
1174 	}
1175 
1176 	delta = ops - flowop->fo_tputlast;
1177 	flowop->fo_tputbucket -= delta;
1178 	flowop->fo_tputlast = ops;
1179 
1180 	/* No need to block if the q isn't empty */
1181 	if (flowop->fo_tputbucket >= 0LL) {
1182 		flowop_endop(threadflow, flowop, 0);
1183 		return (FILEBENCH_OK);
1184 	}
1185 
1186 	ops = flowop->fo_tputbucket * -1;
1187 	events = ops;
1188 
1189 	flowop_beginop(threadflow, flowop);
1190 	while (filebench_shm->shm_eventgen_hz) {
1191 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1192 		if (filebench_shm->shm_eventgen_q >= events) {
1193 			filebench_shm->shm_eventgen_q -= events;
1194 			(void) ipc_mutex_unlock(
1195 			    &filebench_shm->shm_eventgen_lock);
1196 			flowop->fo_tputbucket += events;
1197 			break;
1198 		}
1199 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1200 		    &filebench_shm->shm_eventgen_lock);
1201 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1202 	}
1203 	flowop_endop(threadflow, flowop, 0);
1204 
1205 	return (FILEBENCH_OK);
1206 }
1207 
1208 
1209 /*
1210  * Blocks the calling thread if the number of bytes of I/O
1211  * issued exceeds one megabyte times the number of posted
1212  * events, thus limiting the average I/O byte rate to one
1213  * megabyte times the event rate as set by eventgen_hz.
1214  * Always retuns FILEBENCH_OK.
1215  */
1216 static int
1217 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
1218 {
1219 	uint64_t bytes;
1220 	uint64_t delta;
1221 	uint64_t events;
1222 
1223 	/* Immediately bail if not set/enabled */
1224 	if (filebench_shm->shm_eventgen_hz == 0)
1225 		return (FILEBENCH_OK);
1226 
1227 	if (flowop->fo_initted == 0) {
1228 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1229 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1230 		flowop->fo_initted = 1;
1231 
1232 		if (flowoplib_event_find_target(threadflow, flowop)
1233 		    == FILEBENCH_ERROR)
1234 			return (FILEBENCH_ERROR);
1235 
1236 		if ((flowop->fo_targets) &&
1237 		    ((flowop->fo_targets->fo_attrs &
1238 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1239 			filebench_log(LOG_ERROR,
1240 			    "WARNING: Flowop %s does no Reads or Writes",
1241 			    flowop->fo_targets->fo_name);
1242 			filebench_shutdown(1);
1243 			return (FILEBENCH_ERROR);
1244 		}
1245 	}
1246 
1247 	if (flowop->fo_targets) {
1248 		/*
1249 		 * Note that fs_bytes is already the sum of fs_rbytes
1250 		 * and fs_wbytes if looking at a single flowop.
1251 		 */
1252 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
1253 	} else {
1254 		(void) ipc_mutex_lock(&controlstats_lock);
1255 		bytes = (controlstats.fs_rbytes +
1256 		    controlstats.fs_wbytes);
1257 		(void) ipc_mutex_unlock(&controlstats_lock);
1258 	}
1259 
1260 	/* Is this the first time around? */
1261 	if (flowop->fo_tputlast == 0) {
1262 		flowop->fo_tputlast = bytes;
1263 		return (FILEBENCH_OK);
1264 	}
1265 
1266 	delta = bytes - flowop->fo_tputlast;
1267 	flowop->fo_tputbucket -= delta;
1268 	flowop->fo_tputlast = bytes;
1269 
1270 	/* No need to block if the q isn't empty */
1271 	if (flowop->fo_tputbucket >= 0LL) {
1272 		flowop_endop(threadflow, flowop, 0);
1273 		return (FILEBENCH_OK);
1274 	}
1275 
1276 	bytes = flowop->fo_tputbucket * -1;
1277 	events = (bytes / MB) + 1;
1278 
1279 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1280 	    (u_longlong_t)bytes, (u_longlong_t)events);
1281 
1282 	flowop_beginop(threadflow, flowop);
1283 	while (filebench_shm->shm_eventgen_hz) {
1284 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1285 		if (filebench_shm->shm_eventgen_q >= events) {
1286 			filebench_shm->shm_eventgen_q -= events;
1287 			(void) ipc_mutex_unlock(
1288 			    &filebench_shm->shm_eventgen_lock);
1289 			flowop->fo_tputbucket += (events * MB);
1290 			break;
1291 		}
1292 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1293 		    &filebench_shm->shm_eventgen_lock);
1294 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1295 	}
1296 	flowop_endop(threadflow, flowop, 0);
1297 
1298 	return (FILEBENCH_OK);
1299 }
1300 
1301 /*
1302  * These flowops terminate a benchmark run when either the specified
1303  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1304  * number of I/O operations (flowoplib_finishoncount) have been generated.
1305  */
1306 
1307 
1308 /*
1309  * Stop filebench run when specified number of I/O bytes have been
1310  * transferred. Compares controlstats.fs_bytes with flowop->value,
1311  * and if greater returns 1, stopping the run, if not, returns 0
1312  * to continue running.
1313  */
1314 static int
1315 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1316 {
1317 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
1318 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1319 						    /* Uses constant value */
1320 
1321 	if (flowop->fo_initted == 0) {
1322 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1323 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1324 		flowop->fo_initted = 1;
1325 
1326 		if (flowoplib_event_find_target(threadflow, flowop)
1327 		    == FILEBENCH_ERROR)
1328 			return (FILEBENCH_ERROR);
1329 
1330 		if ((flowop->fo_targets) &&
1331 		    ((flowop->fo_targets->fo_attrs &
1332 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1333 			filebench_log(LOG_ERROR,
1334 			    "WARNING: Flowop %s does no Reads or Writes",
1335 			    flowop->fo_targets->fo_name);
1336 			filebench_shutdown(1);
1337 			return (FILEBENCH_ERROR);
1338 		}
1339 	}
1340 
1341 	if (flowop->fo_targets) {
1342 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1343 	} else {
1344 		(void) ipc_mutex_lock(&controlstats_lock);
1345 		bytes_io = controlstats.fs_bytes;
1346 		(void) ipc_mutex_unlock(&controlstats_lock);
1347 	}
1348 
1349 	flowop_beginop(threadflow, flowop);
1350 	if (bytes_io > byte_lim) {
1351 		flowop_endop(threadflow, flowop, 0);
1352 		return (FILEBENCH_DONE);
1353 	}
1354 	flowop_endop(threadflow, flowop, 0);
1355 
1356 	return (FILEBENCH_OK);
1357 }
1358 
1359 /*
1360  * Stop filebench run when specified number of I/O operations have
1361  * been performed. Compares controlstats.fs_count with *flowop->value,
1362  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1363  * to continue running.
1364  */
1365 static int
1366 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1367 {
1368 	uint64_t ops;
1369 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1370 
1371 	if (flowop->fo_initted == 0) {
1372 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1373 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1374 		flowop->fo_initted = 1;
1375 
1376 		if (flowoplib_event_find_target(threadflow, flowop)
1377 		    == FILEBENCH_ERROR)
1378 			return (FILEBENCH_ERROR);
1379 	}
1380 
1381 	if (flowop->fo_targets) {
1382 		ops = flowop->fo_targets->fo_stats.fs_count;
1383 	} else {
1384 		(void) ipc_mutex_lock(&controlstats_lock);
1385 		ops = controlstats.fs_count;
1386 		(void) ipc_mutex_unlock(&controlstats_lock);
1387 	}
1388 
1389 	flowop_beginop(threadflow, flowop);
1390 	if (ops >= count) {
1391 		flowop_endop(threadflow, flowop, 0);
1392 		return (FILEBENCH_DONE);
1393 	}
1394 	flowop_endop(threadflow, flowop, 0);
1395 
1396 	return (FILEBENCH_OK);
1397 }
1398 
1399 /*
1400  * Semaphore synchronization using either System V semaphores or
1401  * posix semaphores. If System V semaphores are available, they will be
1402  * used, otherwise posix semaphores will be used.
1403  */
1404 
1405 
1406 /*
1407  * Initializes the filebench "block on semaphore" flowop.
1408  * If System V semaphores are implemented, the routine
1409  * initializes the System V semaphore subsystem if it hasn't
1410  * already been initialized, also allocates a pair of semids
1411  * and initializes the highwater System V semaphore.
1412  * If no System V semaphores, then does nothing special.
1413  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1414  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1415  * on success.
1416  */
1417 static int
1418 flowoplib_semblock_init(flowop_t *flowop)
1419 {
1420 
1421 #ifdef HAVE_SYSV_SEM
1422 	int sys_semid;
1423 	struct sembuf sbuf[2];
1424 	int highwater;
1425 
1426 	ipc_seminit();
1427 
1428 	flowop->fo_semid_lw = ipc_semidalloc();
1429 	flowop->fo_semid_hw = ipc_semidalloc();
1430 
1431 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1432 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1433 
1434 	sys_semid = filebench_shm->shm_sys_semid;
1435 
1436 	if ((highwater = flowop->fo_semid_hw) == 0)
1437 		highwater = flowop->fo_constvalue; /* use constant value */
1438 
1439 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1440 
1441 	sbuf[0].sem_num = (short)highwater;
1442 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1443 	sbuf[0].sem_flg = 0;
1444 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1445 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1446 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1447 		return (FILEBENCH_ERROR);
1448 	}
1449 #else
1450 	filebench_log(LOG_DEBUG_IMPL,
1451 	    "flow %s-%d semblock init with posix semaphore",
1452 	    flowop->fo_name, flowop->fo_instance);
1453 
1454 	sem_init(&flowop->fo_sem, 1, 0);
1455 #endif	/* HAVE_SYSV_SEM */
1456 
1457 	if (!(avd_get_bool(flowop->fo_blocking)))
1458 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1459 
1460 	return (FILEBENCH_OK);
1461 }
1462 
1463 /*
1464  * Releases the semids for the System V semaphore allocated
1465  * to this flowop. If not using System V semaphores, then
1466  * it is effectively just a no-op.
1467  */
1468 static void
1469 flowoplib_semblock_destruct(flowop_t *flowop)
1470 {
1471 #ifdef HAVE_SYSV_SEM
1472 	ipc_semidfree(flowop->fo_semid_lw);
1473 	ipc_semidfree(flowop->fo_semid_hw);
1474 	(void) semctl(filebench_shm->shm_sys_semid, 0, IPC_RMID);
1475 	filebench_shm->shm_sys_semid = -1;
1476 #else
1477 	sem_destroy(&flowop->fo_sem);
1478 #endif /* HAVE_SYSV_SEM */
1479 }
1480 
1481 /*
1482  * Attempts to pass a System V or posix semaphore as appropriate,
1483  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1484  * semphores is not available or cannot be acquired, or if the initial
1485  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1486  */
1487 static int
1488 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1489 {
1490 
1491 #ifdef HAVE_SYSV_SEM
1492 	struct sembuf sbuf[2];
1493 	int value = avd_get_int(flowop->fo_value);
1494 	int sys_semid;
1495 	struct timespec timeout;
1496 
1497 	sys_semid = filebench_shm->shm_sys_semid;
1498 
1499 	filebench_log(LOG_DEBUG_IMPL,
1500 	    "flow %s-%d sem blocking on id %x num %x value %d",
1501 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1502 	    flowop->fo_semid_hw, value);
1503 
1504 	/* Post, decrement the increment the hw queue */
1505 	sbuf[0].sem_num = flowop->fo_semid_hw;
1506 	sbuf[0].sem_op = (short)value;
1507 	sbuf[0].sem_flg = 0;
1508 	sbuf[1].sem_num = flowop->fo_semid_lw;
1509 	sbuf[1].sem_op = value * -1;
1510 	sbuf[1].sem_flg = 0;
1511 	timeout.tv_sec = 600;
1512 	timeout.tv_nsec = 0;
1513 
1514 	if (avd_get_bool(flowop->fo_blocking))
1515 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1516 
1517 	flowop_beginop(threadflow, flowop);
1518 
1519 #ifdef HAVE_SEMTIMEDOP
1520 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1521 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1522 #else
1523 	(void) semop(sys_semid, &sbuf[0], 1);
1524 	(void) semop(sys_semid, &sbuf[1], 1);
1525 #endif /* HAVE_SEMTIMEDOP */
1526 
1527 	if (avd_get_bool(flowop->fo_blocking))
1528 		(void) ipc_mutex_lock(&flowop->fo_lock);
1529 
1530 	flowop_endop(threadflow, flowop, 0);
1531 
1532 #else
1533 	int value = avd_get_int(flowop->fo_value);
1534 	int i;
1535 
1536 	filebench_log(LOG_DEBUG_IMPL,
1537 	    "flow %s-%d sem blocking on posix semaphore",
1538 	    flowop->fo_name, flowop->fo_instance);
1539 
1540 	/* Decrement sem by value */
1541 	for (i = 0; i < value; i++) {
1542 		if (sem_wait(&flowop->fo_sem) == -1) {
1543 			filebench_log(LOG_ERROR, "semop wait failed");
1544 			return (FILEBENCH_ERROR);
1545 		}
1546 	}
1547 
1548 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1549 	    flowop->fo_name, flowop->fo_instance);
1550 #endif /* HAVE_SYSV_SEM */
1551 
1552 	return (FILEBENCH_OK);
1553 }
1554 
1555 /*
1556  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1557  */
1558 /* ARGSUSED */
1559 static int
1560 flowoplib_sempost_init(flowop_t *flowop)
1561 {
1562 #ifdef HAVE_SYSV_SEM
1563 	ipc_seminit();
1564 #endif /* HAVE_SYSV_SEM */
1565 	return (FILEBENCH_OK);
1566 }
1567 
1568 /*
1569  * Post to a System V or posix semaphore as appropriate.
1570  * On the first call for a given flowop instance, this routine
1571  * will use the fo_targetname attribute to locate all semblock
1572  * flowops that are expecting posts from this flowop. All
1573  * target flowops on this list will have a post operation done
1574  * to their semaphores on each call.
1575  */
1576 static int
1577 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1578 {
1579 	flowop_t *target;
1580 
1581 	filebench_log(LOG_DEBUG_IMPL,
1582 	    "sempost flow %s-%d",
1583 	    flowop->fo_name,
1584 	    flowop->fo_instance);
1585 
1586 	/* if this is the first post, create the post list */
1587 	if (flowop->fo_targets == NULL) {
1588 		flowop_t *result = flowop_find(flowop->fo_targetname);
1589 
1590 		flowop->fo_targets = result;
1591 
1592 		if (result == NULL) {
1593 			filebench_log(LOG_ERROR,
1594 			    "sempost: could not find op %s for thread %s",
1595 			    flowop->fo_targetname,
1596 			    threadflow->tf_name);
1597 			filebench_shutdown(1);
1598 		}
1599 
1600 		while (result) {
1601 			result->fo_targetnext =
1602 			    result->fo_resultnext;
1603 			result = result->fo_resultnext;
1604 		}
1605 	}
1606 
1607 	target = flowop->fo_targets;
1608 
1609 	flowop_beginop(threadflow, flowop);
1610 	/* post to the targets */
1611 	while (target) {
1612 #ifdef HAVE_SYSV_SEM
1613 		struct sembuf sbuf[2];
1614 		int sys_semid;
1615 		int blocking;
1616 #else
1617 		int i;
1618 #endif /* HAVE_SYSV_SEM */
1619 		struct timespec timeout;
1620 		int value = (int)avd_get_int(flowop->fo_value);
1621 
1622 		if (target->fo_instance == FLOW_MASTER) {
1623 			target = target->fo_targetnext;
1624 			continue;
1625 		}
1626 
1627 #ifdef HAVE_SYSV_SEM
1628 
1629 		filebench_log(LOG_DEBUG_IMPL,
1630 		    "sempost flow %s-%d num %x",
1631 		    target->fo_name,
1632 		    target->fo_instance,
1633 		    target->fo_semid_lw);
1634 
1635 		sys_semid = filebench_shm->shm_sys_semid;
1636 		sbuf[0].sem_num = target->fo_semid_lw;
1637 		sbuf[0].sem_op = (short)value;
1638 		sbuf[0].sem_flg = 0;
1639 		sbuf[1].sem_num = target->fo_semid_hw;
1640 		sbuf[1].sem_op = value * -1;
1641 		sbuf[1].sem_flg = 0;
1642 		timeout.tv_sec = 600;
1643 		timeout.tv_nsec = 0;
1644 
1645 		if (avd_get_bool(flowop->fo_blocking))
1646 			blocking = 1;
1647 		else
1648 			blocking = 0;
1649 
1650 #ifdef HAVE_SEMTIMEDOP
1651 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1652 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1653 #else
1654 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1655 		    (errno && (errno != EAGAIN))) {
1656 #endif /* HAVE_SEMTIMEDOP */
1657 			filebench_log(LOG_ERROR, "semop post failed: %s",
1658 			    strerror(errno));
1659 			return (FILEBENCH_ERROR);
1660 		}
1661 
1662 		filebench_log(LOG_DEBUG_IMPL,
1663 		    "flow %s-%d finished posting",
1664 		    target->fo_name, target->fo_instance);
1665 #else
1666 		filebench_log(LOG_DEBUG_IMPL,
1667 		    "sempost flow %s-%d to posix semaphore",
1668 		    target->fo_name,
1669 		    target->fo_instance);
1670 
1671 		/* Increment sem by value */
1672 		for (i = 0; i < value; i++) {
1673 			if (sem_post(&target->fo_sem) == -1) {
1674 				filebench_log(LOG_ERROR, "semop post failed");
1675 				return (FILEBENCH_ERROR);
1676 			}
1677 		}
1678 
1679 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1680 		    target->fo_name, target->fo_instance);
1681 #endif /* HAVE_SYSV_SEM */
1682 
1683 		target = target->fo_targetnext;
1684 	}
1685 	flowop_endop(threadflow, flowop, 0);
1686 
1687 	return (FILEBENCH_OK);
1688 }
1689 
1690 
1691 /*
1692  * Section for exercising create / open / close / delete operations
1693  * on files within a fileset. For proper operation, the flowop attribute
1694  * "fd", which sets the fo_fdnumber field in the flowop, must be used
1695  * so that the same file is opened and later closed. "fd" is an index
1696  * into a pair of arrays maintained by threadflows, one of which
1697  * contains the operating system assigned file descriptors and the other
1698  * a pointer to the filesetentry whose file the file descriptor
1699  * references. An openfile flowop defined without fd being set will use
1700  * the default (0) fd or, if specified, rotate through fd indices, but
1701  * createfile and closefile must use the default or a specified fd.
1702  * Meanwhile deletefile picks an arbitrary file to delete, regardless
1703  * of fd attribute.
1704  */
1705 
1706 /*
1707  * XXX Making file selection more consistent among the flowops might be good
1708  */
1709 
1710 
1711 /*
1712  * Emulates (and actually does) file open. Obtains a file descriptor
1713  * index, then calls flowoplib_openfile_common() to open. Returns
1714  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1715  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1716  * FILEBENCH_NORSC, FILEBENCH_OK).
1717  */
1718 static int
1719 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1720 {
1721 	int fd = flowoplib_fdnum(threadflow, flowop);
1722 
1723 	if (fd == -1)
1724 		return (FILEBENCH_ERROR);
1725 
1726 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1727 }
1728 
1729 /*
1730  * Common file opening code for filesets. Uses the supplied
1731  * file descriptor index to determine the tf_fd entry to use.
1732  * If the entry is empty (0) and the fileset exists, fileset
1733  * pick is called to select a fileset entry to use. The file
1734  * specified in the filesetentry is opened, and the returned
1735  * operating system file descriptor and a pointer to the
1736  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1737  * respectively. Returns FILEBENCH_ERROR on error,
1738  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1739  * and FILEBENCH_OK on success.
1740  */
1741 static int
1742 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1743 {
1744 	filesetentry_t *file;
1745 	char *fileset_name;
1746 	int tid = 0;
1747 
1748 	if (flowop->fo_fileset == NULL) {
1749 		filebench_log(LOG_ERROR, "flowop NULL file");
1750 		return (FILEBENCH_ERROR);
1751 	}
1752 
1753 	if ((fileset_name =
1754 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1755 		filebench_log(LOG_ERROR,
1756 		    "flowop %s: fileset has no name", flowop->fo_name);
1757 		return (FILEBENCH_ERROR);
1758 	}
1759 
1760 	/*
1761 	 * If the flowop doesn't default to persistent fd
1762 	 * then get unique thread ID for use by fileset_pick
1763 	 */
1764 	if (avd_get_bool(flowop->fo_rotatefd))
1765 		tid = threadflow->tf_utid;
1766 
1767 	if (threadflow->tf_fd[fd] != 0) {
1768 		filebench_log(LOG_ERROR,
1769 		    "flowop %s attempted to open without closing on fd %d",
1770 		    flowop->fo_name, fd);
1771 		return (FILEBENCH_ERROR);
1772 	}
1773 
1774 #ifdef HAVE_RAW_SUPPORT
1775 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1776 		int open_attrs = 0;
1777 		char name[MAXPATHLEN];
1778 
1779 		(void) strcpy(name,
1780 		    avd_get_str(flowop->fo_fileset->fs_path));
1781 		(void) strcat(name, "/");
1782 		(void) strcat(name, fileset_name);
1783 
1784 		if (avd_get_bool(flowop->fo_dsync)) {
1785 #ifdef sun
1786 			open_attrs |= O_DSYNC;
1787 #else
1788 			open_attrs |= O_FSYNC;
1789 #endif
1790 		}
1791 
1792 		filebench_log(LOG_DEBUG_SCRIPT,
1793 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1794 
1795 		threadflow->tf_fd[fd] = open64(name,
1796 		    O_RDWR | open_attrs, 0666);
1797 
1798 		if (threadflow->tf_fd[fd] < 0) {
1799 			filebench_log(LOG_ERROR,
1800 			    "Failed to open raw device %s: %s",
1801 			    name, strerror(errno));
1802 			return (FILEBENCH_ERROR);
1803 		}
1804 
1805 		/* if running on Solaris, use un-buffered io */
1806 #ifdef sun
1807 		(void) directio(threadflow->tf_fd[fd], DIRECTIO_ON);
1808 #endif
1809 
1810 		threadflow->tf_fse[fd] = NULL;
1811 
1812 		return (FILEBENCH_OK);
1813 	}
1814 #endif /* HAVE_RAW_SUPPORT */
1815 
1816 	if ((file = fileset_pick(flowop->fo_fileset,
1817 	    FILESET_PICKEXISTS, tid)) == NULL) {
1818 		filebench_log(LOG_DEBUG_SCRIPT,
1819 		    "flowop %s failed to pick file from %s on fd %d",
1820 		    flowop->fo_name, fileset_name, fd);
1821 		return (FILEBENCH_NORSC);
1822 	}
1823 
1824 	threadflow->tf_fse[fd] = file;
1825 
1826 	flowop_beginop(threadflow, flowop);
1827 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1828 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1829 	flowop_endop(threadflow, flowop, 0);
1830 
1831 	if (threadflow->tf_fd[fd] < 0) {
1832 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1833 		    flowop->fo_name, file->fse_path);
1834 		return (FILEBENCH_ERROR);
1835 	}
1836 
1837 	filebench_log(LOG_DEBUG_SCRIPT,
1838 	    "flowop %s: opened %s fd[%d] = %d",
1839 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1840 
1841 	return (FILEBENCH_OK);
1842 }
1843 
1844 /*
1845  * Emulate create of a file. Uses the flowop's fdnumber to select
1846  * tf_fd and tf_fse array locations to put the created file's file
1847  * descriptor and filesetentry respectively. Uses fileset_pick()
1848  * to select a specific filesetentry whose file does not currently
1849  * exist for the file create operation. Then calls
1850  * fileset_openfile() with the O_CREATE flag set to create the
1851  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1852  * already in use, the flowop has no associated fileset, or
1853  * the create call fails. Returns 1 if a filesetentry with a
1854  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1855  */
1856 static int
1857 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1858 {
1859 	filesetentry_t *file;
1860 	int fd = flowop->fo_fdnumber;
1861 
1862 	if (threadflow->tf_fd[fd] != 0) {
1863 		filebench_log(LOG_ERROR,
1864 		    "flowop %s attempted to create without closing on fd %d",
1865 		    flowop->fo_name, fd);
1866 		return (FILEBENCH_ERROR);
1867 	}
1868 
1869 	if (flowop->fo_fileset == NULL) {
1870 		filebench_log(LOG_ERROR, "flowop NULL file");
1871 		return (FILEBENCH_ERROR);
1872 	}
1873 
1874 #ifdef HAVE_RAW_SUPPORT
1875 	/* can't be used with raw devices */
1876 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1877 		filebench_log(LOG_ERROR,
1878 		    "flowop %s attempted to a createfile on RAW device",
1879 		    flowop->fo_name);
1880 		return (FILEBENCH_ERROR);
1881 	}
1882 #endif /* HAVE_RAW_SUPPORT */
1883 
1884 	if ((file = fileset_pick(flowop->fo_fileset,
1885 	    FILESET_PICKNOEXIST, 0)) == NULL) {
1886 		filebench_log(LOG_DEBUG_SCRIPT,
1887 		    "flowop %s failed to pick file from fileset %s",
1888 		    flowop->fo_name,
1889 		    avd_get_str(flowop->fo_fileset->fs_name));
1890 		return (FILEBENCH_NORSC);
1891 	}
1892 
1893 	threadflow->tf_fse[fd] = file;
1894 
1895 	flowop_beginop(threadflow, flowop);
1896 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1897 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1898 	flowop_endop(threadflow, flowop, 0);
1899 
1900 	if (threadflow->tf_fd[fd] < 0) {
1901 		filebench_log(LOG_ERROR, "failed to create file %s",
1902 		    flowop->fo_name);
1903 		return (FILEBENCH_ERROR);
1904 	}
1905 
1906 	filebench_log(LOG_DEBUG_SCRIPT,
1907 	    "flowop %s: created %s fd[%d] = %d",
1908 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1909 
1910 	return (FILEBENCH_OK);
1911 }
1912 
1913 /*
1914  * Emulates delete of a file. If a valid fd is provided, it uses the
1915  * filesetentry stored at that fd location to select the file to be
1916  * deleted, otherwise it picks an arbitrary filesetentry
1917  * whose file exists. It then uses unlink() to delete it and Clears
1918  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1919  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1920  * filesetentry cannot be found, and FILEBENCH_OK on success.
1921  */
1922 static int
1923 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1924 {
1925 	filesetentry_t *file;
1926 	fileset_t *fileset;
1927 	char path[MAXPATHLEN];
1928 	char *pathtmp;
1929 	int fd = flowop->fo_fdnumber;
1930 
1931 	/* if fd specified, use it to access file */
1932 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1933 
1934 		/* check whether file still open */
1935 		if (threadflow->tf_fd[fd] > 0) {
1936 			filebench_log(LOG_DEBUG_SCRIPT,
1937 			    "flowop %s deleting still open file at fd = %d",
1938 			    flowop->fo_name, fd);
1939 		}
1940 
1941 		/* indicate that the file will be deleted */
1942 		threadflow->tf_fse[fd] = NULL;
1943 
1944 		/* if here, we still have a valid file pointer */
1945 		fileset = file->fse_fileset;
1946 	} else {
1947 		/* Otherwise, pick arbitrary file */
1948 		file = NULL;
1949 		fileset = flowop->fo_fileset;
1950 	}
1951 
1952 
1953 	if (fileset == NULL) {
1954 		filebench_log(LOG_ERROR, "flowop NULL file");
1955 		return (FILEBENCH_ERROR);
1956 	}
1957 
1958 #ifdef HAVE_RAW_SUPPORT
1959 	/* can't be used with raw devices */
1960 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1961 		filebench_log(LOG_ERROR,
1962 		    "flowop %s attempted a deletefile on RAW device",
1963 		    flowop->fo_name);
1964 		return (FILEBENCH_ERROR);
1965 	}
1966 #endif /* HAVE_RAW_SUPPORT */
1967 
1968 	if (file == NULL) {
1969 		if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0))
1970 		    == NULL) {
1971 			filebench_log(LOG_DEBUG_SCRIPT,
1972 			    "flowop %s failed to pick file", flowop->fo_name);
1973 			return (FILEBENCH_NORSC);
1974 		}
1975 	} else {
1976 		(void) ipc_mutex_lock(&file->fse_lock);
1977 	}
1978 
1979 	*path = 0;
1980 	(void) strcpy(path, avd_get_str(fileset->fs_path));
1981 	(void) strcat(path, "/");
1982 	(void) strcat(path, avd_get_str(fileset->fs_name));
1983 	pathtmp = fileset_resolvepath(file);
1984 	(void) strcat(path, pathtmp);
1985 	free(pathtmp);
1986 
1987 	flowop_beginop(threadflow, flowop);
1988 	(void) unlink(path);
1989 	flowop_endop(threadflow, flowop, 0);
1990 	file->fse_flags &= ~FSE_EXISTS;
1991 	(void) ipc_mutex_lock(&fileset->fs_num_files_lock);
1992 	fileset->fs_num_act_files--;
1993 	(void) ipc_mutex_unlock(&fileset->fs_num_files_lock);
1994 	(void) ipc_mutex_unlock(&file->fse_lock);
1995 
1996 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1997 
1998 	return (FILEBENCH_OK);
1999 }
2000 
2001 /*
2002  * Emulates fsync of a file. Obtains the file descriptor index
2003  * from the flowop, obtains the actual file descriptor from
2004  * the threadflow's table, checks to be sure it is still an
2005  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
2006  * if the file no longer is open, FILEBENCH_OK otherwise.
2007  */
2008 static int
2009 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
2010 {
2011 	filesetentry_t *file;
2012 	int fd = flowop->fo_fdnumber;
2013 
2014 	if (threadflow->tf_fd[fd] == 0) {
2015 		filebench_log(LOG_ERROR,
2016 		    "flowop %s attempted to fsync a closed fd %d",
2017 		    flowop->fo_name, fd);
2018 		return (FILEBENCH_ERROR);
2019 	}
2020 
2021 	file = threadflow->tf_fse[fd];
2022 
2023 	if ((file == NULL) ||
2024 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
2025 		filebench_log(LOG_ERROR,
2026 		    "flowop %s attempted to a fsync a RAW device",
2027 		    flowop->fo_name);
2028 		return (FILEBENCH_ERROR);
2029 	}
2030 
2031 	/* Measure time to fsync */
2032 	flowop_beginop(threadflow, flowop);
2033 	(void) fsync(threadflow->tf_fd[fd]);
2034 	flowop_endop(threadflow, flowop, 0);
2035 
2036 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
2037 
2038 	return (FILEBENCH_OK);
2039 }
2040 
2041 /*
2042  * Emulate fsync of an entire fileset. Search through the
2043  * threadflow's file descriptor array, doing fsync() on each
2044  * open file that belongs to the flowop's fileset. Always
2045  * returns FILEBENCH_OK.
2046  */
2047 static int
2048 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
2049 {
2050 	int fd;
2051 
2052 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
2053 		filesetentry_t *file;
2054 
2055 		/* Match the file set to fsync */
2056 		if ((threadflow->tf_fse[fd] == NULL) ||
2057 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
2058 			continue;
2059 
2060 		/* Measure time to fsync */
2061 		flowop_beginop(threadflow, flowop);
2062 		(void) fsync(threadflow->tf_fd[fd]);
2063 		flowop_endop(threadflow, flowop, 0);
2064 
2065 		file = threadflow->tf_fse[fd];
2066 
2067 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
2068 		    file->fse_path);
2069 	}
2070 
2071 	return (FILEBENCH_OK);
2072 }
2073 
2074 /*
2075  * Emulate close of a file.  Obtains the file descriptor index
2076  * from the flowop, obtains the actual file descriptor from the
2077  * threadflow's table, checks to be sure it is still an open
2078  * file, then does a close operation on it. Then sets the
2079  * threadflow file descriptor table entry to 0, and the file set
2080  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
2081  * FILEBENCH_OK otherwise.
2082  */
2083 static int
2084 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
2085 {
2086 	filesetentry_t *file;
2087 	int fd = flowop->fo_fdnumber;
2088 
2089 	if (threadflow->tf_fd[fd] == 0) {
2090 		filebench_log(LOG_ERROR,
2091 		    "flowop %s attempted to close an already closed fd %d",
2092 		    flowop->fo_name, fd);
2093 		return (FILEBENCH_ERROR);
2094 	}
2095 
2096 	/* Measure time to close */
2097 	flowop_beginop(threadflow, flowop);
2098 	(void) close(threadflow->tf_fd[fd]);
2099 	flowop_endop(threadflow, flowop, 0);
2100 
2101 	file = threadflow->tf_fse[fd];
2102 
2103 	threadflow->tf_fd[fd] = 0;
2104 
2105 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
2106 
2107 	return (FILEBENCH_OK);
2108 }
2109 
2110 /*
2111  * Emulate stat of a file. Picks an arbitrary filesetentry with
2112  * an existing file from the flowop's fileset, then performs a
2113  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2114  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2115  * cannot be found, and FILEBENCH_OK on success.
2116  */
2117 static int
2118 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2119 {
2120 	filesetentry_t *file;
2121 	fileset_t *fileset;
2122 	char path[MAXPATHLEN];
2123 	char *pathtmp;
2124 
2125 	if ((fileset = flowop->fo_fileset) == NULL) {
2126 		filebench_log(LOG_ERROR, "flowop NULL file");
2127 		return (FILEBENCH_ERROR);
2128 	}
2129 
2130 	if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0)) == NULL) {
2131 		filebench_log(LOG_DEBUG_SCRIPT,
2132 		    "flowop %s failed to pick file",
2133 		    flowop->fo_name);
2134 		return (FILEBENCH_NORSC);
2135 	}
2136 
2137 	*path = 0;
2138 	(void) strcpy(path, avd_get_str(fileset->fs_path));
2139 	(void) strcat(path, "/");
2140 	(void) strcat(path, avd_get_str(fileset->fs_name));
2141 	pathtmp = fileset_resolvepath(file);
2142 	(void) strcat(path, pathtmp);
2143 	free(pathtmp);
2144 
2145 	flowop_beginop(threadflow, flowop);
2146 	flowop_endop(threadflow, flowop, 0);
2147 
2148 	(void) ipc_mutex_unlock(&file->fse_lock);
2149 
2150 	return (FILEBENCH_OK);
2151 }
2152 
2153 
2154 /*
2155  * Additional reads and writes. Read and write whole files, write
2156  * and append to files. Some of these work with both fileobjs and
2157  * filesets, others only with filesets. The flowoplib_write routine
2158  * writes from thread memory, while the others read or write using
2159  * fo_buf memory. Note that both flowoplib_read() and
2160  * flowoplib_aiowrite() use thread memory as well.
2161  */
2162 
2163 
2164 /*
2165  * Emulate a read of a whole file. The file must be open with
2166  * file descriptor and filesetentry stored at the locations indexed
2167  * by the flowop's fdnumber. It then seeks to the beginning of the
2168  * associated file, and reads fs_iosize bytes at a time until the end
2169  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2170  * out of files, and FILEBENCH_OK on success.
2171  */
2172 static int
2173 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2174 {
2175 	caddr_t iobuf;
2176 	off64_t bytes = 0;
2177 	int filedesc;
2178 	uint64_t wss;
2179 	fbint_t iosize;
2180 	int ret;
2181 	char zerordbuf;
2182 
2183 	/* get the file to use */
2184 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2185 	    &filedesc)) != FILEBENCH_OK)
2186 		return (ret);
2187 
2188 	/* an I/O size of zero means read entire working set with one I/O */
2189 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2190 		iosize = wss;
2191 
2192 	/*
2193 	 * The file may actually be 0 bytes long, in which case skip
2194 	 * the buffer set up call (which would fail) and substitute
2195 	 * a small buffer, which won't really be used.
2196 	 */
2197 	if (iosize == 0) {
2198 		iobuf = (caddr_t)&zerordbuf;
2199 		filebench_log(LOG_DEBUG_SCRIPT,
2200 		    "flowop %s read zero length file", flowop->fo_name);
2201 	} else {
2202 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2203 		    iosize) != 0)
2204 			return (FILEBENCH_ERROR);
2205 	}
2206 
2207 	/* Measure time to read bytes */
2208 	flowop_beginop(threadflow, flowop);
2209 	(void) lseek64(filedesc, 0, SEEK_SET);
2210 	while ((ret = read(filedesc, iobuf, iosize)) > 0)
2211 		bytes += ret;
2212 
2213 	flowop_endop(threadflow, flowop, bytes);
2214 
2215 	if (ret < 0) {
2216 		filebench_log(LOG_ERROR,
2217 		    "readwhole fail Failed to read whole file: %s",
2218 		    strerror(errno));
2219 		return (FILEBENCH_ERROR);
2220 	}
2221 
2222 	return (FILEBENCH_OK);
2223 }
2224 
2225 /*
2226  * Emulate a write to a file of size fo_iosize.  Will write
2227  * to a file from a fileset if the flowop's fo_fileset field
2228  * specifies one or its fdnumber is non zero. Otherwise it
2229  * will write to a fileobj file, if one exists. If the file
2230  * is not currently open, the routine will attempt to open
2231  * it. The flowop's fo_wss parameter will be used to set the
2232  * maximum file size if it is non-zero, otherwise the
2233  * filesetentry's  fse_size will be used. A random memory
2234  * buffer offset is calculated, and, if fo_random is TRUE,
2235  * a random file offset is used for the write. Otherwise the
2236  * write is to the next sequential location. Returns
2237  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2238  * obtain a file, or FILEBENCH_OK on success.
2239  */
2240 static int
2241 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2242 {
2243 	caddr_t iobuf;
2244 	fbint_t wss;
2245 	fbint_t iosize;
2246 	int filedesc;
2247 	int ret;
2248 
2249 	iosize = avd_get_int(flowop->fo_iosize);
2250 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2251 	    &filedesc, iosize)) != FILEBENCH_OK)
2252 		return (ret);
2253 
2254 	if (avd_get_bool(flowop->fo_random)) {
2255 		uint64_t fileoffset;
2256 
2257 		if (filebench_randomno64(&fileoffset,
2258 		    wss, iosize, NULL) == -1) {
2259 			filebench_log(LOG_ERROR,
2260 			    "file size smaller than IO size for thread %s",
2261 			    flowop->fo_name);
2262 			return (FILEBENCH_ERROR);
2263 		}
2264 		flowop_beginop(threadflow, flowop);
2265 		if (pwrite64(filedesc, iobuf,
2266 		    iosize, (off64_t)fileoffset) == -1) {
2267 			filebench_log(LOG_ERROR, "write failed, "
2268 			    "offset %llu io buffer %zd: %s",
2269 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2270 			flowop_endop(threadflow, flowop, 0);
2271 			return (FILEBENCH_ERROR);
2272 		}
2273 		flowop_endop(threadflow, flowop, iosize);
2274 	} else {
2275 		flowop_beginop(threadflow, flowop);
2276 		if (write(filedesc, iobuf, iosize) == -1) {
2277 			filebench_log(LOG_ERROR,
2278 			    "write failed, io buffer %zd: %s",
2279 			    iobuf, strerror(errno));
2280 			flowop_endop(threadflow, flowop, 0);
2281 			return (FILEBENCH_ERROR);
2282 		}
2283 		flowop_endop(threadflow, flowop, iosize);
2284 	}
2285 
2286 	return (FILEBENCH_OK);
2287 }
2288 
2289 /*
2290  * Emulate a write of a whole file.  The size of the file
2291  * is taken from a filesetentry identified by fo_srcfdnumber or
2292  * from the working set size, while the file descriptor used is
2293  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2294  * length length until full file has been written. Returns FILEBENCH_ERROR on
2295  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2296  */
2297 static int
2298 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2299 {
2300 	caddr_t iobuf;
2301 	filesetentry_t *file;
2302 	int wsize;
2303 	off64_t seek;
2304 	off64_t bytes = 0;
2305 	uint64_t wss;
2306 	fbint_t iosize;
2307 	int filedesc;
2308 	int srcfd = flowop->fo_srcfdnumber;
2309 	int ret;
2310 	char zerowrtbuf;
2311 
2312 	/* get the file to use */
2313 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2314 	    &filedesc)) != FILEBENCH_OK)
2315 		return (ret);
2316 
2317 	/* an I/O size of zero means write entire working set with one I/O */
2318 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2319 		iosize = wss;
2320 
2321 	/*
2322 	 * The file may actually be 0 bytes long, in which case skip
2323 	 * the buffer set up call (which would fail) and substitute
2324 	 * a small buffer, which won't really be used.
2325 	 */
2326 	if (iosize == 0) {
2327 		iobuf = (caddr_t)&zerowrtbuf;
2328 		filebench_log(LOG_DEBUG_SCRIPT,
2329 		    "flowop %s wrote zero length file", flowop->fo_name);
2330 	} else {
2331 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2332 		    iosize) != 0)
2333 			return (FILEBENCH_ERROR);
2334 	}
2335 
2336 	file = threadflow->tf_fse[srcfd];
2337 	if ((srcfd != 0) && (file == NULL)) {
2338 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2339 		    flowop->fo_name);
2340 		return (FILEBENCH_ERROR);
2341 	}
2342 
2343 	if (file)
2344 		wss = file->fse_size;
2345 
2346 	wsize = (int)MIN(wss, iosize);
2347 
2348 	/* Measure time to write bytes */
2349 	flowop_beginop(threadflow, flowop);
2350 	for (seek = 0; seek < wss; seek += wsize) {
2351 		ret = write(filedesc, iobuf, wsize);
2352 		if (ret != wsize) {
2353 			filebench_log(LOG_ERROR,
2354 			    "Failed to write %d bytes on fd %d: %s",
2355 			    wsize, filedesc, strerror(errno));
2356 			flowop_endop(threadflow, flowop, 0);
2357 			return (FILEBENCH_ERROR);
2358 		}
2359 		wsize = (int)MIN(wss - seek, iosize);
2360 		bytes += ret;
2361 	}
2362 	flowop_endop(threadflow, flowop, bytes);
2363 
2364 	return (FILEBENCH_OK);
2365 }
2366 
2367 
2368 /*
2369  * Emulate a fixed size append to a file. Will append data to
2370  * a file chosen from a fileset if the flowop's fo_fileset
2371  * field specifies one or if its fdnumber is non zero.
2372  * Otherwise it will write to a fileobj file, if one exists.
2373  * The flowop's fo_wss parameter will be used to set the
2374  * maximum file size if it is non-zero, otherwise the
2375  * filesetentry's fse_size will be used. A random memory
2376  * buffer offset is calculated, then a logical seek to the
2377  * end of file is done followed by a write of fo_iosize
2378  * bytes. Writes are actually done from fo_buf, rather than
2379  * tf_mem as is done with flowoplib_write(), and no check
2380  * is made to see if fo_iosize exceeds the size of fo_buf.
2381  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2382  * files in the fileset, FILEBENCH_OK on success.
2383  */
2384 static int
2385 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2386 {
2387 	caddr_t iobuf;
2388 	int filedesc;
2389 	fbint_t wss;
2390 	fbint_t iosize;
2391 	int ret;
2392 
2393 	iosize = avd_get_int(flowop->fo_iosize);
2394 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2395 	    &filedesc, iosize)) != FILEBENCH_OK)
2396 		return (ret);
2397 
2398 	/* XXX wss is not being used */
2399 
2400 	/* Measure time to write bytes */
2401 	flowop_beginop(threadflow, flowop);
2402 	(void) lseek64(filedesc, 0, SEEK_END);
2403 	ret = write(filedesc, iobuf, iosize);
2404 	if (ret != iosize) {
2405 		filebench_log(LOG_ERROR,
2406 		    "Failed to write %llu bytes on fd %d: %s",
2407 		    (u_longlong_t)iosize, filedesc, strerror(errno));
2408 		flowop_endop(threadflow, flowop, ret);
2409 		return (FILEBENCH_ERROR);
2410 	}
2411 	flowop_endop(threadflow, flowop, ret);
2412 
2413 	return (FILEBENCH_OK);
2414 }
2415 
2416 /*
2417  * Emulate a random size append to a file. Will append data
2418  * to a file chosen from a fileset if the flowop's fo_fileset
2419  * field specifies one or if its fdnumber is non zero. Otherwise
2420  * it will write to a fileobj file, if one exists. The flowop's
2421  * fo_wss parameter will be used to set the maximum file size
2422  * if it is non-zero, otherwise the filesetentry's fse_size
2423  * will be used.  A random transfer size (but at most fo_iosize
2424  * bytes) and a random memory offset are calculated. A logical
2425  * seek to the end of file is done, then writes of up to
2426  * FILE_ALLOC_BLOCK in size are done until the full transfer
2427  * size has been written. Writes are actually done from fo_buf,
2428  * rather than tf_mem as is done with flowoplib_write().
2429  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2430  * files in the fileset, FILEBENCH_OK on success.
2431  */
2432 static int
2433 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2434 {
2435 	caddr_t iobuf;
2436 	uint64_t appendsize;
2437 	int filedesc;
2438 	fbint_t wss;
2439 	fbint_t iosize;
2440 	int ret = 0;
2441 
2442 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2443 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2444 		    flowop->fo_name);
2445 		return (FILEBENCH_ERROR);
2446 	}
2447 
2448 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2449 		return (FILEBENCH_ERROR);
2450 
2451 	/* skip if attempting zero length append */
2452 	if (appendsize == 0) {
2453 		flowop_beginop(threadflow, flowop);
2454 		flowop_endop(threadflow, flowop, 0LL);
2455 		return (FILEBENCH_OK);
2456 	}
2457 
2458 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2459 	    &filedesc, appendsize)) != FILEBENCH_OK)
2460 		return (ret);
2461 
2462 	/* XXX wss is not being used */
2463 
2464 	/* Measure time to write bytes */
2465 	flowop_beginop(threadflow, flowop);
2466 
2467 	(void) lseek64(filedesc, 0, SEEK_END);
2468 	ret = write(filedesc, iobuf, appendsize);
2469 	if (ret != appendsize) {
2470 		filebench_log(LOG_ERROR,
2471 		    "Failed to write %llu bytes on fd %d: %s",
2472 		    (u_longlong_t)appendsize, filedesc, strerror(errno));
2473 		flowop_endop(threadflow, flowop, 0);
2474 		return (FILEBENCH_ERROR);
2475 	}
2476 
2477 	flowop_endop(threadflow, flowop, appendsize);
2478 
2479 	return (FILEBENCH_OK);
2480 }
2481 
/*
 * Per-flowop accumulator for the testrandvar flowop. It is allocated
 * by flowoplib_testrandvar_init(), updated once per call of
 * flowoplib_testrandvar(), and reported and freed by
 * flowoplib_testrandvar_destruct().
 */
typedef struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated */
	double val_sum;		/* running sum of the values */
	double sqr_sum;		/* running sum of the squared values */
} testrandvar_priv_t;
2487 
2488 /*
2489  * flowop to calculate various statistics from the number stream
2490  * produced by a random variable. This allows verification that the
2491  * random distribution used to define the random variable is producing
2492  * the expected distribution of random numbers.
2493  */
2494 /* ARGSUSED */
2495 static int
2496 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2497 {
2498 	testrandvar_priv_t	*mystats;
2499 	double			value;
2500 
2501 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2502 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2503 		filebench_shutdown(1);
2504 		return (-1);
2505 	}
2506 
2507 	value = avd_get_dbl(flowop->fo_value);
2508 
2509 	mystats->sample_count++;
2510 	mystats->val_sum += value;
2511 	mystats->sqr_sum += (value * value);
2512 
2513 	return (0);
2514 }
2515 
2516 /*
2517  * Initialize the private data area used to accumulate the statistics
2518  */
2519 static int
2520 flowoplib_testrandvar_init(flowop_t *flowop)
2521 {
2522 	testrandvar_priv_t	*mystats;
2523 
2524 	if ((mystats = (testrandvar_priv_t *)
2525 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2526 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2527 		filebench_shutdown(1);
2528 		return (-1);
2529 	}
2530 
2531 	mystats->sample_count = 0;
2532 	mystats->val_sum = 0;
2533 	mystats->sqr_sum = 0;
2534 	flowop->fo_private = (void *)mystats;
2535 
2536 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2537 	return (0);
2538 }
2539 
2540 /*
2541  * Print out the accumulated statistics, and free the private storage
2542  */
2543 static void
2544 flowoplib_testrandvar_destruct(flowop_t *flowop)
2545 {
2546 	testrandvar_priv_t	*mystats;
2547 	double mean, std_dev, dbl_count;
2548 
2549 	(void) ipc_mutex_lock(&flowop->fo_lock);
2550 	if ((mystats = (testrandvar_priv_t *)
2551 	    flowop->fo_private) == NULL) {
2552 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2553 		return;
2554 	}
2555 
2556 	flowop->fo_private = NULL;
2557 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2558 
2559 	dbl_count = (double)mystats->sample_count;
2560 	mean = mystats->val_sum / dbl_count;
2561 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2562 
2563 	filebench_log(LOG_VERBOSE,
2564 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2565 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2566 	free(mystats);
2567 }
2568 
/*
 * Prints usage information for flowop operations. The help text is
 * kept as a table of lines which are written to stderr in order.
 */
void
flowoplib_usage()
{
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop deletefile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>]\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,directio]\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,random]\n",
		"                       [,opennext]\n",
		"                       [,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n",
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value=<decrement-to-receive>,\n",
		"                       highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>,\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	size_t idx;

	/* none of the lines contain conversions, so they go out verbatim */
	for (idx = 0; idx < sizeof (usage_text) / sizeof (usage_text[0]);
	    idx++)
		(void) fputs(usage_text[idx], stderr);
}
2658