xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 9801:4a9784073e11)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #ifdef HAVE_SYS_ASYNCH_H
32 #include <sys/asynch.h>
33 #endif
34 #include <stddef.h>
35 #include <sys/ipc.h>
36 #include <sys/sem.h>
37 #include <sys/errno.h>
38 #include <sys/time.h>
39 #include <inttypes.h>
40 #include <fcntl.h>
41 #include <math.h>
42 #include <dirent.h>
43 
44 #ifdef HAVE_UTILITY_H
45 #include <utility.h>
46 #endif /* HAVE_UTILITY_H */
47 
48 #ifdef HAVE_SYS_ASYNC_H
49 #include <sys/asynch.h>
50 #endif /* HAVE_SYS_ASYNC_H */
51 
52 #ifndef HAVE_SYSV_SEM
53 #include <semaphore.h>
54 #endif /* HAVE_SYSV_SEM */
55 
56 #include "filebench.h"
57 #include "flowop.h"
58 #include "fileset.h"
59 #include "fb_random.h"
60 #include "utils.h"
61 #include "fsplug.h"
62 
63 /*
64  * These routines implement the flowops from the f language. Each
65  * flowop has a name such as "read", and a set of function pointers
66  * to call for initialization, execution and destruction of the flowop.
67  * The table flowoplib_funcs[] contains a flowoplib struct for each
68  * implemented flowop. Most flowops use a generic initialization function
69  * and all currently use a generic destruction function. All flowop
70  * functions referenced from the table are in this file, though, of
71  * course, they often call functions from other files.
72  *
73  * The flowop_init() routine uses the flowoplib_funcs[] table to
74  * create an initial set of "instance 0" flowops, one for each type of
75  * flowop, from which all other flowops are derived. These "instance 0"
76  * flowops are initialized with information from the table including
77  * pointers for their fo_init, fo_func and fo_destroy functions. When
78  * a flowop definition is encountered in an f language script, the
79  * "type" of flowop, such as "read" is used to search for the
80  * "instance 0" flowop named "read", then a new flowop is allocated
81  * which inherits its function pointers and other initial properties
82  * from the instance 0 flowop, and is given a new name as specified
83  * by the "name=" attribute.
84  */
85 
86 static void flowoplib_destruct_noop(flowop_t *flowop);
87 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
88 static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
89 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
90 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
91 static int flowoplib_block_init(flowop_t *flowop);
92 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
93 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
94 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
95 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
96 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
97 static int flowoplib_sempost_init(flowop_t *flowop);
98 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
99 static int flowoplib_semblock_init(flowop_t *flowop);
100 static void flowoplib_semblock_destruct(flowop_t *flowop);
101 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
102 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
103 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
104 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
105 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
106 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
107 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
108 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
109 static int flowoplib_makedir(threadflow_t *, flowop_t *flowop);
110 static int flowoplib_removedir(threadflow_t *, flowop_t *flowop);
111 static int flowoplib_listdir(threadflow_t *, flowop_t *flowop);
112 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
113 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
114 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
115 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
116 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
117 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
118 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
119 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
120 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
121 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
122 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
123 static int flowoplib_testrandvar_init(flowop_t *flowop);
124 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
125 
126 static flowop_proto_t flowoplib_funcs[] = {
127 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowop_init_generic,
128 	flowoplib_write, flowop_destruct_generic,
129 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowop_init_generic,
130 	flowoplib_read, flowop_destruct_generic,
131 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
132 	flowoplib_block, flowop_destruct_generic,
133 	FLOW_TYPE_SYNC, 0, "wakeup", flowop_init_generic,
134 	flowoplib_wakeup, flowop_destruct_generic,
135 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
136 	flowoplib_semblock, flowoplib_semblock_destruct,
137 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
138 	flowoplib_sempost, flowoplib_destruct_noop,
139 	FLOW_TYPE_OTHER, 0, "hog", flowop_init_generic,
140 	flowoplib_hog, flowop_destruct_generic,
141 	FLOW_TYPE_OTHER, 0, "delay", flowop_init_generic,
142 	flowoplib_delay, flowop_destruct_generic,
143 	FLOW_TYPE_OTHER, 0, "eventlimit", flowop_init_generic,
144 	flowoplib_eventlimit, flowop_destruct_generic,
145 	FLOW_TYPE_OTHER, 0, "bwlimit", flowop_init_generic,
146 	flowoplib_bwlimit, flowop_destruct_generic,
147 	FLOW_TYPE_OTHER, 0, "iopslimit", flowop_init_generic,
148 	flowoplib_iopslimit, flowop_destruct_generic,
149 	FLOW_TYPE_OTHER, 0, "opslimit", flowop_init_generic,
150 	flowoplib_opslimit, flowop_destruct_generic,
151 	FLOW_TYPE_OTHER, 0, "finishoncount", flowop_init_generic,
152 	flowoplib_finishoncount, flowop_destruct_generic,
153 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowop_init_generic,
154 	flowoplib_finishonbytes, flowop_destruct_generic,
155 	FLOW_TYPE_IO, 0, "openfile", flowop_init_generic,
156 	flowoplib_openfile, flowop_destruct_generic,
157 	FLOW_TYPE_IO, 0, "createfile", flowop_init_generic,
158 	flowoplib_createfile, flowop_destruct_generic,
159 	FLOW_TYPE_IO, 0, "closefile", flowop_init_generic,
160 	flowoplib_closefile, flowop_destruct_generic,
161 	FLOW_TYPE_IO, 0, "makedir", flowop_init_generic,
162 	flowoplib_makedir, flowop_destruct_generic,
163 	FLOW_TYPE_IO, 0, "removedir", flowop_init_generic,
164 	flowoplib_removedir, flowop_destruct_generic,
165 	FLOW_TYPE_IO, 0, "listdir", flowop_init_generic,
166 	flowoplib_listdir, flowop_destruct_generic,
167 	FLOW_TYPE_IO, 0, "fsync", flowop_init_generic,
168 	flowoplib_fsync, flowop_destruct_generic,
169 	FLOW_TYPE_IO, 0, "fsyncset", flowop_init_generic,
170 	flowoplib_fsyncset, flowop_destruct_generic,
171 	FLOW_TYPE_IO, 0, "statfile", flowop_init_generic,
172 	flowoplib_statfile, flowop_destruct_generic,
173 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowop_init_generic,
174 	flowoplib_readwholefile, flowop_destruct_generic,
175 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowop_init_generic,
176 	flowoplib_appendfile, flowop_destruct_generic,
177 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowop_init_generic,
178 	flowoplib_appendfilerand, flowop_destruct_generic,
179 	FLOW_TYPE_IO, 0, "deletefile", flowop_init_generic,
180 	flowoplib_deletefile, flowop_destruct_generic,
181 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowop_init_generic,
182 	flowoplib_writewholefile, flowop_destruct_generic,
183 	FLOW_TYPE_OTHER, 0, "print", flowop_init_generic,
184 	flowoplib_print, flowop_destruct_generic,
185 	/* routine to calculate mean and stddev for output from a randvar */
186 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
187 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
188 };
189 
190 /*
191  * Loops through the list of flowops defined in this
192  * module, and creates and initializes a flowop for each one
193  * by calling flowop_flow_init. As a side effect of calling
194  * flowop_flow_init, the created flowops are placed on the
195  * master flowop list. All created flowops are set to
196  * instance "0".
197  */
198 void
flowoplib_flowinit()199 flowoplib_flowinit()
200 {
201 	int nops = sizeof (flowoplib_funcs) / sizeof (flowop_proto_t);
202 
203 	flowop_flow_init(flowoplib_funcs, nops);
204 }
205 
206 /*
207  * Special total noop destruct
208  */
209 /* ARGSUSED */
210 static void
flowoplib_destruct_noop(flowop_t * flowop)211 flowoplib_destruct_noop(flowop_t *flowop)
212 {
213 }
214 
215 /*
216  * Generates a file attribute from flags in the supplied flowop.
217  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
218  */
219 static int
flowoplib_fileattrs(flowop_t * flowop)220 flowoplib_fileattrs(flowop_t *flowop)
221 {
222 	int attrs = 0;
223 
224 	if (avd_get_bool(flowop->fo_directio))
225 		attrs |= FLOW_ATTR_DIRECTIO;
226 
227 	if (avd_get_bool(flowop->fo_dsync))
228 		attrs |= FLOW_ATTR_DSYNC;
229 
230 	return (attrs);
231 }
232 
233 /*
234  * Obtain a filesetentry for a file. Result placed where filep points.
235  * Supply with a flowop and a flag to indicate whether an existent or
236  * non-existent file is required. Returns FILEBENCH_NORSC if all out
237  * of the appropriate type of directories, FILEBENCH_ERROR if the
238  * flowop does not point to a fileset, and FILEBENCH_OK otherwise.
239  */
240 static int
flowoplib_pickfile(filesetentry_t ** filep,flowop_t * flowop,int flags,int tid)241 flowoplib_pickfile(filesetentry_t **filep, flowop_t *flowop, int flags, int tid)
242 {
243 	fileset_t	*fileset;
244 	int		fileindex;
245 
246 	if ((fileset = flowop->fo_fileset) == NULL) {
247 		filebench_log(LOG_ERROR, "flowop NO fileset");
248 		return (FILEBENCH_ERROR);
249 	}
250 
251 	if (flowop->fo_fileindex) {
252 		fileindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
253 		    ((double)(fileset->fs_constentries / 2)));
254 		fileindex = fileindex % fileset->fs_constentries;
255 		flags |= FILESET_PICKBYINDEX;
256 	} else {
257 		fileindex = 0;
258 	}
259 
260 	if ((*filep = fileset_pick(fileset, FILESET_PICKFILE | flags,
261 	    tid, fileindex)) == NULL) {
262 		filebench_log(LOG_DEBUG_SCRIPT,
263 		    "flowop %s failed to pick file from fileset %s",
264 		    flowop->fo_name,
265 		    avd_get_str(fileset->fs_name));
266 		return (FILEBENCH_NORSC);
267 	}
268 
269 	return (FILEBENCH_OK);
270 }
271 
272 /*
273  * Obtain a filesetentry for a leaf directory. Result placed where dirp
274  * points. Supply with flowop and a flag to indicate whether an existent
275  * or non-existent leaf directory is required. Returns FILEBENCH_NORSC
276  * if all out of the appropriate type of directories, FILEBENCH_ERROR
277  * if the flowop does not point to a fileset, and FILEBENCH_OK otherwise.
278  */
279 static int
flowoplib_pickleafdir(filesetentry_t ** dirp,flowop_t * flowop,int flags)280 flowoplib_pickleafdir(filesetentry_t **dirp, flowop_t *flowop, int flags)
281 {
282 	fileset_t	*fileset;
283 	int		dirindex;
284 
285 	if ((fileset = flowop->fo_fileset) == NULL) {
286 		filebench_log(LOG_ERROR, "flowop NO fileset");
287 		return (FILEBENCH_ERROR);
288 	}
289 
290 	if (flowop->fo_fileindex) {
291 		dirindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
292 		    ((double)(fileset->fs_constleafdirs / 2)));
293 		dirindex = dirindex % fileset->fs_constleafdirs;
294 		flags |= FILESET_PICKBYINDEX;
295 	} else {
296 		dirindex = 0;
297 	}
298 
299 	if ((*dirp = fileset_pick(fileset,
300 	    FILESET_PICKLEAFDIR | flags, 0, dirindex)) == NULL) {
301 		filebench_log(LOG_DEBUG_SCRIPT,
302 		    "flowop %s failed to pick directory from fileset %s",
303 		    flowop->fo_name,
304 		    avd_get_str(fileset->fs_name));
305 		return (FILEBENCH_NORSC);
306 	}
307 
308 	return (FILEBENCH_OK);
309 }
310 
311 /*
312  * Searches for a file descriptor. Tries the flowop's
313  * fo_fdnumber first and returns with it if it has been
314  * explicitly set (greater than 0). It next checks to
315  * see if a rotating file descriptor policy is in effect,
316  * and if not returns the fdnumber regardless of what
317  * it is. (note that if it is 0, it just selects to the
318  * default file descriptor in the threadflow's tf_fd
319  * array). If the rotating fd policy is in effect, it
320  * cycles from the end of the tf_fd array to one location
321  * beyond the maximum needed by the number of entries in
322  * the associated fileset on each invocation, then starts
323  * over from the end.
324  *
325  * The routine returns an index into the threadflow's
326  * tf_fd table where the actual file descriptor will be
327  * found. Note: the calling routine must not call this
328  * routine if the flowop does not have a fileset, and the
329  * flowop's fo_fdnumber is zero and fo_rotatefd is
330  * asserted, or an addressing fault may occur.
331  */
332 static int
flowoplib_fdnum(threadflow_t * threadflow,flowop_t * flowop)333 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
334 {
335 	fbint_t	entries;
336 	int fdnumber = flowop->fo_fdnumber;
337 
338 	/* If the script sets the fd explicitly */
339 	if (fdnumber > 0)
340 		return (fdnumber);
341 
342 	/* If the flowop defaults to persistent fd */
343 	if (!avd_get_bool(flowop->fo_rotatefd))
344 		return (fdnumber);
345 
346 	if (flowop->fo_fileset == NULL) {
347 		filebench_log(LOG_ERROR, "flowop NULL file");
348 		return (FILEBENCH_ERROR);
349 	}
350 
351 	entries = flowop->fo_fileset->fs_constentries;
352 
353 	/* Rotate the fd on each flowop invocation */
354 	if (entries > (THREADFLOW_MAXFD / 2)) {
355 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
356 		    " (too many files : %llu",
357 		    flowop->fo_name, (u_longlong_t)entries);
358 		return (FILEBENCH_ERROR);
359 	}
360 
361 	/* First time around */
362 	if (threadflow->tf_fdrotor == 0)
363 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
364 
365 	/* One fd for every file in the set */
366 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
367 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
368 
369 
370 	threadflow->tf_fdrotor--;
371 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
372 	    threadflow->tf_fdrotor);
373 	return (threadflow->tf_fdrotor);
374 }
375 
376 /*
377  * Determines the file descriptor to use, and attempts to open
378  * the file if it is not already open. Also determines the wss
379  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
380  * if flowop_openfile_common couldn't obtain an appropriate file
381  * from a the fileset, and FILEBENCH_OK otherwise.
382  */
383 static int
flowoplib_filesetup(threadflow_t * threadflow,flowop_t * flowop,fbint_t * wssp,fb_fdesc_t ** fdescp)384 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
385     fbint_t *wssp, fb_fdesc_t **fdescp)
386 {
387 	int fd = flowoplib_fdnum(threadflow, flowop);
388 
389 	if (fd == -1)
390 		return (FILEBENCH_ERROR);
391 
392 	/* check for conflicting fdnumber and file name */
393 	if ((fd > 0) && (threadflow->tf_fse[fd] != NULL)) {
394 		char *fd_based_name;
395 
396 		fd_based_name =
397 		    avd_get_str(threadflow->tf_fse[fd]->fse_fileset->fs_name);
398 
399 		if (flowop->fo_filename != NULL) {
400 			char *fo_based_name;
401 
402 			fo_based_name = avd_get_str(flowop->fo_filename);
403 			if (strcmp(fd_based_name, fo_based_name) != 0) {
404 				filebench_log(LOG_ERROR, "Name of fd refer"
405 				    "enced fileset name (%s) CONFLICTS with"
406 				    " flowop supplied fileset name (%s)",
407 				    fd_based_name, fo_based_name);
408 				filebench_shutdown(1);
409 				return (FILEBENCH_ERROR);
410 			}
411 		}
412 	}
413 
414 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
415 		int ret;
416 
417 		if ((ret = flowoplib_openfile_common(
418 		    threadflow, flowop, fd)) != FILEBENCH_OK)
419 			return (ret);
420 
421 		if (threadflow->tf_fse[fd]) {
422 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
423 			    threadflow->tf_fse[fd]->fse_path);
424 		} else {
425 			filebench_log(LOG_DEBUG_IMPL,
426 			    "opened device %s/%s",
427 			    avd_get_str(flowop->fo_fileset->fs_path),
428 			    avd_get_str(flowop->fo_fileset->fs_name));
429 		}
430 	}
431 
432 	*fdescp = &(threadflow->tf_fd[fd]);
433 
434 	if ((*wssp = flowop->fo_constwss) == 0) {
435 		if (threadflow->tf_fse[fd])
436 			*wssp = threadflow->tf_fse[fd]->fse_size;
437 		else
438 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
439 	}
440 
441 	return (FILEBENCH_OK);
442 }
443 
444 /*
445  * Determines the io buffer or random offset into tf_mem for
446  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
447  */
448 static int
flowoplib_iobufsetup(threadflow_t * threadflow,flowop_t * flowop,caddr_t * iobufp,fbint_t iosize)449 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
450     caddr_t *iobufp, fbint_t iosize)
451 {
452 	long memsize;
453 	size_t memoffset;
454 
455 	if (iosize == 0) {
456 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
457 		    flowop->fo_name);
458 		return (FILEBENCH_ERROR);
459 	}
460 
461 	if ((memsize = threadflow->tf_constmemsize) != 0) {
462 
463 		/* use tf_mem for I/O with random offset */
464 		if (filebench_randomno(&memoffset,
465 		    memsize, iosize, NULL) == -1) {
466 			filebench_log(LOG_ERROR,
467 			    "tf_memsize smaller than IO size for thread %s",
468 			    flowop->fo_name);
469 			return (FILEBENCH_ERROR);
470 		}
471 		*iobufp = threadflow->tf_mem + memoffset;
472 
473 	} else {
474 		/* use private I/O buffer */
475 		if ((flowop->fo_buf != NULL) &&
476 		    (flowop->fo_buf_size < iosize)) {
477 			/* too small, so free up and re-allocate */
478 			free(flowop->fo_buf);
479 			flowop->fo_buf = NULL;
480 		}
481 
482 		/*
483 		 * Allocate memory for the  buffer. The memory is freed
484 		 * by flowop_destruct_generic() or by this routine if more
485 		 * memory is needed for the buffer.
486 		 */
487 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
488 		    = (char *)malloc(iosize)) == NULL))
489 			return (FILEBENCH_ERROR);
490 
491 		flowop->fo_buf_size = iosize;
492 		*iobufp = flowop->fo_buf;
493 	}
494 	return (FILEBENCH_OK);
495 }
496 
497 /*
498  * Determines the file descriptor to use, opens it if necessary, the
499  * io buffer or random offset into tf_mem for IO operation and the wss
500  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
501  */
502 int
flowoplib_iosetup(threadflow_t * threadflow,flowop_t * flowop,fbint_t * wssp,caddr_t * iobufp,fb_fdesc_t ** filedescp,fbint_t iosize)503 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
504     fbint_t *wssp, caddr_t *iobufp, fb_fdesc_t **filedescp, fbint_t iosize)
505 {
506 	int ret;
507 
508 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
509 	    FILEBENCH_OK)
510 		return (ret);
511 
512 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
513 	    FILEBENCH_OK)
514 		return (ret);
515 
516 	return (FILEBENCH_OK);
517 }
518 
519 /*
520  * Emulate posix read / pread. If the flowop has a fileset,
521  * a file descriptor number index is fetched, otherwise a
522  * supplied fileobj file is used. In either case the specified
523  * file will be opened if not already open. If the flowop has
524  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
525  * returned.
526  *
527  * The actual read is done to a random offset in the
528  * threadflow's thread memory (tf_mem), with a size set by
529  * fo_iosize and at either a random disk offset within the
530  * working set size, or at the next sequential location. If
531  * any errors are encountered, FILEBENCH_ERROR is returned,
532  * if no appropriate file can be obtained from the fileset then
533  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
534  */
535 static int
flowoplib_read(threadflow_t * threadflow,flowop_t * flowop)536 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
537 {
538 	caddr_t iobuf;
539 	fbint_t wss;
540 	fbint_t iosize;
541 	fb_fdesc_t *fdesc;
542 	int ret;
543 
544 
545 	iosize = avd_get_int(flowop->fo_iosize);
546 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
547 	    &fdesc, iosize)) != FILEBENCH_OK)
548 		return (ret);
549 
550 	if (avd_get_bool(flowop->fo_random)) {
551 		uint64_t fileoffset;
552 
553 		if (filebench_randomno64(&fileoffset,
554 		    wss, iosize, NULL) == -1) {
555 			filebench_log(LOG_ERROR,
556 			    "file size smaller than IO size for thread %s",
557 			    flowop->fo_name);
558 			return (FILEBENCH_ERROR);
559 		}
560 
561 		(void) flowop_beginop(threadflow, flowop);
562 		if ((ret = FB_PREAD(fdesc, iobuf,
563 		    iosize, (off64_t)fileoffset)) == -1) {
564 			(void) flowop_endop(threadflow, flowop, 0);
565 			filebench_log(LOG_ERROR,
566 			    "read file %s failed, offset %llu "
567 			    "io buffer %zd: %s",
568 			    avd_get_str(flowop->fo_fileset->fs_name),
569 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
570 			flowop_endop(threadflow, flowop, 0);
571 			return (FILEBENCH_ERROR);
572 		}
573 		(void) flowop_endop(threadflow, flowop, ret);
574 
575 		if ((ret == 0))
576 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
577 
578 	} else {
579 		(void) flowop_beginop(threadflow, flowop);
580 		if ((ret = FB_READ(fdesc, iobuf, iosize)) == -1) {
581 			(void) flowop_endop(threadflow, flowop, 0);
582 			filebench_log(LOG_ERROR,
583 			    "read file %s failed, io buffer %zd: %s",
584 			    avd_get_str(flowop->fo_fileset->fs_name),
585 			    iobuf, strerror(errno));
586 			(void) flowop_endop(threadflow, flowop, 0);
587 			return (FILEBENCH_ERROR);
588 		}
589 		(void) flowop_endop(threadflow, flowop, ret);
590 
591 		if ((ret == 0))
592 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
593 	}
594 
595 	return (FILEBENCH_OK);
596 }
597 
598 /*
599  * Initializes a "flowop_block" flowop. Specifically, it
600  * initializes the flowop's fo_cv and unlocks the fo_lock.
601  */
602 static int
flowoplib_block_init(flowop_t * flowop)603 flowoplib_block_init(flowop_t *flowop)
604 {
605 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
606 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
607 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
608 	(void) ipc_mutex_unlock(&flowop->fo_lock);
609 
610 	return (FILEBENCH_OK);
611 }
612 
613 /*
614  * Blocks the threadflow until woken up by flowoplib_wakeup.
615  * The routine blocks on the flowop's fo_cv condition variable.
616  */
617 static int
flowoplib_block(threadflow_t * threadflow,flowop_t * flowop)618 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
619 {
620 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
621 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
622 	(void) ipc_mutex_lock(&flowop->fo_lock);
623 
624 	flowop_beginop(threadflow, flowop);
625 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
626 	flowop_endop(threadflow, flowop, 0);
627 
628 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
629 	    flowop->fo_name, flowop->fo_instance);
630 
631 	(void) ipc_mutex_unlock(&flowop->fo_lock);
632 
633 	return (FILEBENCH_OK);
634 }
635 
636 /*
637  * Wakes up one or more target blocking flowops.
638  * Sends broadcasts on the fo_cv condition variables of all
639  * flowops on the target list, except those that are
640  * FLOW_MASTER flowops. The target list consists of all
641  * flowops whose name matches this flowop's "fo_targetname"
642  * attribute. The target list is generated on the first
643  * invocation, and the run will be shutdown if no targets
644  * are found. Otherwise the routine always returns FILEBENCH_OK.
645  */
646 static int
flowoplib_wakeup(threadflow_t * threadflow,flowop_t * flowop)647 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
648 {
649 	flowop_t *target;
650 
651 	/* if this is the first wakeup, create the wakeup list */
652 	if (flowop->fo_targets == NULL) {
653 		flowop_t *result = flowop_find(flowop->fo_targetname);
654 
655 		flowop->fo_targets = result;
656 		if (result == NULL) {
657 			filebench_log(LOG_ERROR,
658 			    "wakeup: could not find op %s for thread %s",
659 			    flowop->fo_targetname,
660 			    threadflow->tf_name);
661 			filebench_shutdown(1);
662 		}
663 		while (result) {
664 			result->fo_targetnext =
665 			    result->fo_resultnext;
666 			result = result->fo_resultnext;
667 		}
668 	}
669 
670 	target = flowop->fo_targets;
671 
672 	/* wakeup the targets */
673 	while (target) {
674 		if (target->fo_instance == FLOW_MASTER) {
675 			target = target->fo_targetnext;
676 			continue;
677 		}
678 		filebench_log(LOG_DEBUG_IMPL,
679 		    "wakeup flow %s-%d at address %zx",
680 		    target->fo_name,
681 		    target->fo_instance,
682 		    &target->fo_cv);
683 
684 		flowop_beginop(threadflow, flowop);
685 		(void) ipc_mutex_lock(&target->fo_lock);
686 		(void) pthread_cond_broadcast(&target->fo_cv);
687 		(void) ipc_mutex_unlock(&target->fo_lock);
688 		flowop_endop(threadflow, flowop, 0);
689 
690 		target = target->fo_targetnext;
691 	}
692 
693 	return (FILEBENCH_OK);
694 }
695 
696 /*
697  * "think time" routines. the "hog" routine consumes cpu cycles as
698  * it "thinks", while the "delay" flowop simply calls sleep() to delay
699  * for a given number of seconds without consuming cpu cycles.
700  */
701 
702 
703 /*
704  * Consumes CPU cycles and memory bandwidth by looping for
705  * flowop->fo_value times. With each loop sets memory location
706  * threadflow->tf_mem to 1.
707  */
708 static int
flowoplib_hog(threadflow_t * threadflow,flowop_t * flowop)709 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
710 {
711 	uint64_t value = avd_get_int(flowop->fo_value);
712 	int i;
713 
714 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
715 	flowop_beginop(threadflow, flowop);
716 	if (threadflow->tf_mem != NULL) {
717 		for (i = 0; i < value; i++)
718 			*(threadflow->tf_mem) = 1;
719 	}
720 	flowop_endop(threadflow, flowop, 0);
721 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
722 	return (FILEBENCH_OK);
723 }
724 
725 
726 /*
727  * Delays for fo_value seconds.
728  */
729 static int
flowoplib_delay(threadflow_t * threadflow,flowop_t * flowop)730 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
731 {
732 	int value = avd_get_int(flowop->fo_value);
733 
734 	flowop_beginop(threadflow, flowop);
735 	(void) sleep(value);
736 	flowop_endop(threadflow, flowop, 0);
737 	return (FILEBENCH_OK);
738 }
739 
740 /*
741  * Rate limiting routines. This is the event consuming half of the
742  * event system. Each of the four following routines will limit the rate
743  * to one unit of either calls, issued I/O operations, issued filebench
744  * operations, or I/O bandwidth. Since there is only one event generator,
745  * the events will be divided among multiple instances of an event
746  * consumer, and further divided among different consumers if more than
747  * one has been defined. There is no mechanism to enforce equal sharing
748  * of events.
749  */
750 
751 /*
752  * Completes one invocation per posted event. If eventgen_q
753  * has an event count greater than zero, one will be removed
754  * (count decremented), otherwise the calling thread will
755  * block until another event has been posted. Always returns 0
756  */
757 static int
flowoplib_eventlimit(threadflow_t * threadflow,flowop_t * flowop)758 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
759 {
760 	/* Immediately bail if not set/enabled */
761 	if (!filebench_shm->shm_eventgen_enabled)
762 		return (FILEBENCH_OK);
763 
764 	if (flowop->fo_initted == 0) {
765 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
766 		    flowop, threadflow->tf_name, threadflow->tf_instance);
767 		flowop->fo_initted = 1;
768 	}
769 
770 	flowop_beginop(threadflow, flowop);
771 	while (filebench_shm->shm_eventgen_enabled) {
772 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
773 		if (filebench_shm->shm_eventgen_q > 0) {
774 			filebench_shm->shm_eventgen_q--;
775 			(void) ipc_mutex_unlock(
776 			    &filebench_shm->shm_eventgen_lock);
777 			break;
778 		}
779 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
780 		    &filebench_shm->shm_eventgen_lock);
781 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
782 	}
783 	flowop_endop(threadflow, flowop, 0);
784 	return (FILEBENCH_OK);
785 }
786 
787 static int
flowoplib_event_find_target(threadflow_t * threadflow,flowop_t * flowop)788 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
789 {
790 	if (flowop->fo_targetname[0] != '\0') {
791 
792 		/* Try to use statistics from specific flowop */
793 		flowop->fo_targets =
794 		    flowop_find_from_list(flowop->fo_targetname,
795 		    threadflow->tf_thrd_fops);
796 		if (flowop->fo_targets == NULL) {
797 			filebench_log(LOG_ERROR,
798 			    "limit target: could not find flowop %s",
799 			    flowop->fo_targetname);
800 			filebench_shutdown(1);
801 			return (FILEBENCH_ERROR);
802 		}
803 	} else {
804 		/* use total workload statistics */
805 		flowop->fo_targets = NULL;
806 	}
807 	return (FILEBENCH_OK);
808 }
809 
810 /*
811  * Blocks the calling thread if the number of issued I/O
812  * operations exceeds the number of posted events, thus
813  * limiting the average I/O operation rate to the rate
814  * specified by eventgen_hz. Always returns FILEBENCH_OK.
815  */
816 static int
flowoplib_iopslimit(threadflow_t * threadflow,flowop_t * flowop)817 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
818 {
819 	uint64_t iops;
820 	uint64_t delta;
821 	uint64_t events;
822 
823 	/* Immediately bail if not set/enabled */
824 	if (!filebench_shm->shm_eventgen_enabled)
825 		return (FILEBENCH_OK);
826 
827 	if (flowop->fo_initted == 0) {
828 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
829 		    flowop, threadflow->tf_name, threadflow->tf_instance);
830 		flowop->fo_initted = 1;
831 
832 		if (flowoplib_event_find_target(threadflow, flowop)
833 		    == FILEBENCH_ERROR)
834 			return (FILEBENCH_ERROR);
835 
836 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
837 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
838 			filebench_log(LOG_ERROR,
839 			    "WARNING: Flowop %s does no IO",
840 			    flowop->fo_targets->fo_name);
841 			filebench_shutdown(1);
842 			return (FILEBENCH_ERROR);
843 		}
844 	}
845 
846 	if (flowop->fo_targets) {
847 		/*
848 		 * Note that fs_count is already the sum of fs_rcount
849 		 * and fs_wcount if looking at a single flowop.
850 		 */
851 		iops = flowop->fo_targets->fo_stats.fs_count;
852 	} else {
853 		(void) ipc_mutex_lock(&controlstats_lock);
854 		iops = (controlstats.fs_rcount +
855 		    controlstats.fs_wcount);
856 		(void) ipc_mutex_unlock(&controlstats_lock);
857 	}
858 
859 	/* Is this the first time around */
860 	if (flowop->fo_tputlast == 0) {
861 		flowop->fo_tputlast = iops;
862 		return (FILEBENCH_OK);
863 	}
864 
865 	delta = iops - flowop->fo_tputlast;
866 	flowop->fo_tputbucket -= delta;
867 	flowop->fo_tputlast = iops;
868 
869 	/* No need to block if the q isn't empty */
870 	if (flowop->fo_tputbucket >= 0LL) {
871 		flowop_endop(threadflow, flowop, 0);
872 		return (FILEBENCH_OK);
873 	}
874 
875 	iops = flowop->fo_tputbucket * -1;
876 	events = iops;
877 
878 	flowop_beginop(threadflow, flowop);
879 	while (filebench_shm->shm_eventgen_enabled) {
880 
881 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
882 		if (filebench_shm->shm_eventgen_q >= events) {
883 			filebench_shm->shm_eventgen_q -= events;
884 			(void) ipc_mutex_unlock(
885 			    &filebench_shm->shm_eventgen_lock);
886 			flowop->fo_tputbucket += events;
887 			break;
888 		}
889 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
890 		    &filebench_shm->shm_eventgen_lock);
891 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
892 	}
893 	flowop_endop(threadflow, flowop, 0);
894 
895 	return (FILEBENCH_OK);
896 }
897 
898 /*
899  * Blocks the calling thread if the number of issued filebench
900  * operations exceeds the number of posted events, thus limiting
901  * the average filebench operation rate to the rate specified by
902  * eventgen_hz. Always returns FILEBENCH_OK.
903  */
904 static int
flowoplib_opslimit(threadflow_t * threadflow,flowop_t * flowop)905 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
906 {
907 	uint64_t ops;
908 	uint64_t delta;
909 	uint64_t events;
910 
911 	/* Immediately bail if not set/enabled */
912 	if (!filebench_shm->shm_eventgen_enabled)
913 		return (FILEBENCH_OK);
914 
915 	if (flowop->fo_initted == 0) {
916 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
917 		    flowop, threadflow->tf_name, threadflow->tf_instance);
918 		flowop->fo_initted = 1;
919 
920 		if (flowoplib_event_find_target(threadflow, flowop)
921 		    == FILEBENCH_ERROR)
922 			return (FILEBENCH_ERROR);
923 	}
924 
925 	if (flowop->fo_targets) {
926 		ops = flowop->fo_targets->fo_stats.fs_count;
927 	} else {
928 		(void) ipc_mutex_lock(&controlstats_lock);
929 		ops = controlstats.fs_count;
930 		(void) ipc_mutex_unlock(&controlstats_lock);
931 	}
932 
933 	/* Is this the first time around */
934 	if (flowop->fo_tputlast == 0) {
935 		flowop->fo_tputlast = ops;
936 		return (FILEBENCH_OK);
937 	}
938 
939 	delta = ops - flowop->fo_tputlast;
940 	flowop->fo_tputbucket -= delta;
941 	flowop->fo_tputlast = ops;
942 
943 	/* No need to block if the q isn't empty */
944 	if (flowop->fo_tputbucket >= 0LL) {
945 		flowop_endop(threadflow, flowop, 0);
946 		return (FILEBENCH_OK);
947 	}
948 
949 	ops = flowop->fo_tputbucket * -1;
950 	events = ops;
951 
952 	flowop_beginop(threadflow, flowop);
953 	while (filebench_shm->shm_eventgen_enabled) {
954 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
955 		if (filebench_shm->shm_eventgen_q >= events) {
956 			filebench_shm->shm_eventgen_q -= events;
957 			(void) ipc_mutex_unlock(
958 			    &filebench_shm->shm_eventgen_lock);
959 			flowop->fo_tputbucket += events;
960 			break;
961 		}
962 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
963 		    &filebench_shm->shm_eventgen_lock);
964 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
965 	}
966 	flowop_endop(threadflow, flowop, 0);
967 
968 	return (FILEBENCH_OK);
969 }
970 
971 
972 /*
973  * Blocks the calling thread if the number of bytes of I/O
974  * issued exceeds one megabyte times the number of posted
975  * events, thus limiting the average I/O byte rate to one
976  * megabyte times the event rate as set by eventgen_hz.
977  * Always retuns FILEBENCH_OK.
978  */
979 static int
flowoplib_bwlimit(threadflow_t * threadflow,flowop_t * flowop)980 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
981 {
982 	uint64_t bytes;
983 	uint64_t delta;
984 	uint64_t events;
985 
986 	/* Immediately bail if not set/enabled */
987 	if (!filebench_shm->shm_eventgen_enabled)
988 		return (FILEBENCH_OK);
989 
990 	if (flowop->fo_initted == 0) {
991 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
992 		    flowop, threadflow->tf_name, threadflow->tf_instance);
993 		flowop->fo_initted = 1;
994 
995 		if (flowoplib_event_find_target(threadflow, flowop)
996 		    == FILEBENCH_ERROR)
997 			return (FILEBENCH_ERROR);
998 
999 		if ((flowop->fo_targets) &&
1000 		    ((flowop->fo_targets->fo_attrs &
1001 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1002 			filebench_log(LOG_ERROR,
1003 			    "WARNING: Flowop %s does no Reads or Writes",
1004 			    flowop->fo_targets->fo_name);
1005 			filebench_shutdown(1);
1006 			return (FILEBENCH_ERROR);
1007 		}
1008 	}
1009 
1010 	if (flowop->fo_targets) {
1011 		/*
1012 		 * Note that fs_bytes is already the sum of fs_rbytes
1013 		 * and fs_wbytes if looking at a single flowop.
1014 		 */
1015 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
1016 	} else {
1017 		(void) ipc_mutex_lock(&controlstats_lock);
1018 		bytes = (controlstats.fs_rbytes +
1019 		    controlstats.fs_wbytes);
1020 		(void) ipc_mutex_unlock(&controlstats_lock);
1021 	}
1022 
1023 	/* Is this the first time around? */
1024 	if (flowop->fo_tputlast == 0) {
1025 		flowop->fo_tputlast = bytes;
1026 		return (FILEBENCH_OK);
1027 	}
1028 
1029 	delta = bytes - flowop->fo_tputlast;
1030 	flowop->fo_tputbucket -= delta;
1031 	flowop->fo_tputlast = bytes;
1032 
1033 	/* No need to block if the q isn't empty */
1034 	if (flowop->fo_tputbucket >= 0LL) {
1035 		flowop_endop(threadflow, flowop, 0);
1036 		return (FILEBENCH_OK);
1037 	}
1038 
1039 	bytes = flowop->fo_tputbucket * -1;
1040 	events = (bytes / MB) + 1;
1041 
1042 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1043 	    (u_longlong_t)bytes, (u_longlong_t)events);
1044 
1045 	flowop_beginop(threadflow, flowop);
1046 	while (filebench_shm->shm_eventgen_enabled) {
1047 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1048 		if (filebench_shm->shm_eventgen_q >= events) {
1049 			filebench_shm->shm_eventgen_q -= events;
1050 			(void) ipc_mutex_unlock(
1051 			    &filebench_shm->shm_eventgen_lock);
1052 			flowop->fo_tputbucket += (events * MB);
1053 			break;
1054 		}
1055 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1056 		    &filebench_shm->shm_eventgen_lock);
1057 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1058 	}
1059 	flowop_endop(threadflow, flowop, 0);
1060 
1061 	return (FILEBENCH_OK);
1062 }
1063 
1064 /*
1065  * These flowops terminate a benchmark run when either the specified
1066  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1067  * number of I/O operations (flowoplib_finishoncount) have been generated.
1068  */
1069 
1070 
1071 /*
1072  * Stop filebench run when specified number of I/O bytes have been
1073  * transferred. Compares controlstats.fs_bytes with flowop->value,
1074  * and if greater returns 1, stopping the run, if not, returns 0
1075  * to continue running.
1076  */
1077 static int
flowoplib_finishonbytes(threadflow_t * threadflow,flowop_t * flowop)1078 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1079 {
1080 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
1081 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1082 						    /* Uses constant value */
1083 
1084 	if (flowop->fo_initted == 0) {
1085 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1086 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1087 		flowop->fo_initted = 1;
1088 
1089 		if (flowoplib_event_find_target(threadflow, flowop)
1090 		    == FILEBENCH_ERROR)
1091 			return (FILEBENCH_ERROR);
1092 
1093 		if ((flowop->fo_targets) &&
1094 		    ((flowop->fo_targets->fo_attrs &
1095 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1096 			filebench_log(LOG_ERROR,
1097 			    "WARNING: Flowop %s does no Reads or Writes",
1098 			    flowop->fo_targets->fo_name);
1099 			filebench_shutdown(1);
1100 			return (FILEBENCH_ERROR);
1101 		}
1102 	}
1103 
1104 	if (flowop->fo_targets) {
1105 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1106 	} else {
1107 		(void) ipc_mutex_lock(&controlstats_lock);
1108 		bytes_io = controlstats.fs_bytes;
1109 		(void) ipc_mutex_unlock(&controlstats_lock);
1110 	}
1111 
1112 	flowop_beginop(threadflow, flowop);
1113 	if (bytes_io > byte_lim) {
1114 		flowop_endop(threadflow, flowop, 0);
1115 		return (FILEBENCH_DONE);
1116 	}
1117 	flowop_endop(threadflow, flowop, 0);
1118 
1119 	return (FILEBENCH_OK);
1120 }
1121 
1122 /*
1123  * Stop filebench run when specified number of I/O operations have
1124  * been performed. Compares controlstats.fs_count with *flowop->value,
1125  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1126  * to continue running.
1127  */
1128 static int
flowoplib_finishoncount(threadflow_t * threadflow,flowop_t * flowop)1129 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1130 {
1131 	uint64_t ops;
1132 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1133 
1134 	if (flowop->fo_initted == 0) {
1135 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1136 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1137 		flowop->fo_initted = 1;
1138 
1139 		if (flowoplib_event_find_target(threadflow, flowop)
1140 		    == FILEBENCH_ERROR)
1141 			return (FILEBENCH_ERROR);
1142 	}
1143 
1144 	if (flowop->fo_targets) {
1145 		ops = flowop->fo_targets->fo_stats.fs_count;
1146 	} else {
1147 		(void) ipc_mutex_lock(&controlstats_lock);
1148 		ops = controlstats.fs_count;
1149 		(void) ipc_mutex_unlock(&controlstats_lock);
1150 	}
1151 
1152 	flowop_beginop(threadflow, flowop);
1153 	if (ops >= count) {
1154 		flowop_endop(threadflow, flowop, 0);
1155 		return (FILEBENCH_DONE);
1156 	}
1157 	flowop_endop(threadflow, flowop, 0);
1158 
1159 	return (FILEBENCH_OK);
1160 }
1161 
1162 /*
1163  * Semaphore synchronization using either System V semaphores or
1164  * posix semaphores. If System V semaphores are available, they will be
1165  * used, otherwise posix semaphores will be used.
1166  */
1167 
1168 
1169 /*
1170  * Initializes the filebench "block on semaphore" flowop.
1171  * If System V semaphores are implemented, the routine
1172  * initializes the System V semaphore subsystem if it hasn't
1173  * already been initialized, also allocates a pair of semids
1174  * and initializes the highwater System V semaphore.
1175  * If no System V semaphores, then does nothing special.
1176  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1177  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1178  * on success.
1179  */
1180 static int
flowoplib_semblock_init(flowop_t * flowop)1181 flowoplib_semblock_init(flowop_t *flowop)
1182 {
1183 
1184 #ifdef HAVE_SYSV_SEM
1185 	int sys_semid;
1186 	struct sembuf sbuf[2];
1187 	int highwater;
1188 
1189 	ipc_seminit();
1190 
1191 	flowop->fo_semid_lw = ipc_semidalloc();
1192 	flowop->fo_semid_hw = ipc_semidalloc();
1193 
1194 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1195 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1196 
1197 	sys_semid = filebench_shm->shm_sys_semid;
1198 
1199 	if ((highwater = flowop->fo_semid_hw) == 0)
1200 		highwater = flowop->fo_constvalue; /* use constant value */
1201 
1202 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1203 
1204 	sbuf[0].sem_num = (short)highwater;
1205 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1206 	sbuf[0].sem_flg = 0;
1207 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1208 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1209 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1210 		return (FILEBENCH_ERROR);
1211 	}
1212 #else
1213 	filebench_log(LOG_DEBUG_IMPL,
1214 	    "flow %s-%d semblock init with posix semaphore",
1215 	    flowop->fo_name, flowop->fo_instance);
1216 
1217 	sem_init(&flowop->fo_sem, 1, 0);
1218 #endif	/* HAVE_SYSV_SEM */
1219 
1220 	if (!(avd_get_bool(flowop->fo_blocking)))
1221 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1222 
1223 	return (FILEBENCH_OK);
1224 }
1225 
1226 /*
1227  * Releases the semids for the System V semaphore allocated
1228  * to this flowop. If not using System V semaphores, then
1229  * it is effectively just a no-op.
1230  */
1231 static void
flowoplib_semblock_destruct(flowop_t * flowop)1232 flowoplib_semblock_destruct(flowop_t *flowop)
1233 {
1234 #ifdef HAVE_SYSV_SEM
1235 	ipc_semidfree(flowop->fo_semid_lw);
1236 	ipc_semidfree(flowop->fo_semid_hw);
1237 #else
1238 	sem_destroy(&flowop->fo_sem);
1239 #endif /* HAVE_SYSV_SEM */
1240 }
1241 
1242 /*
1243  * Attempts to pass a System V or posix semaphore as appropriate,
1244  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1245  * semphores is not available or cannot be acquired, or if the initial
1246  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1247  */
1248 static int
flowoplib_semblock(threadflow_t * threadflow,flowop_t * flowop)1249 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1250 {
1251 
1252 #ifdef HAVE_SYSV_SEM
1253 	struct sembuf sbuf[2];
1254 	int value = avd_get_int(flowop->fo_value);
1255 	int sys_semid;
1256 	struct timespec timeout;
1257 
1258 	sys_semid = filebench_shm->shm_sys_semid;
1259 
1260 	filebench_log(LOG_DEBUG_IMPL,
1261 	    "flow %s-%d sem blocking on id %x num %x value %d",
1262 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1263 	    flowop->fo_semid_hw, value);
1264 
1265 	/* Post, decrement the increment the hw queue */
1266 	sbuf[0].sem_num = flowop->fo_semid_hw;
1267 	sbuf[0].sem_op = (short)value;
1268 	sbuf[0].sem_flg = 0;
1269 	sbuf[1].sem_num = flowop->fo_semid_lw;
1270 	sbuf[1].sem_op = value * -1;
1271 	sbuf[1].sem_flg = 0;
1272 	timeout.tv_sec = 600;
1273 	timeout.tv_nsec = 0;
1274 
1275 	if (avd_get_bool(flowop->fo_blocking))
1276 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1277 
1278 	flowop_beginop(threadflow, flowop);
1279 
1280 #ifdef HAVE_SEMTIMEDOP
1281 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1282 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1283 #else
1284 	(void) semop(sys_semid, &sbuf[0], 1);
1285 	(void) semop(sys_semid, &sbuf[1], 1);
1286 #endif /* HAVE_SEMTIMEDOP */
1287 
1288 	if (avd_get_bool(flowop->fo_blocking))
1289 		(void) ipc_mutex_lock(&flowop->fo_lock);
1290 
1291 	flowop_endop(threadflow, flowop, 0);
1292 
1293 #else
1294 	int value = avd_get_int(flowop->fo_value);
1295 	int i;
1296 
1297 	filebench_log(LOG_DEBUG_IMPL,
1298 	    "flow %s-%d sem blocking on posix semaphore",
1299 	    flowop->fo_name, flowop->fo_instance);
1300 
1301 	/* Decrement sem by value */
1302 	for (i = 0; i < value; i++) {
1303 		if (sem_wait(&flowop->fo_sem) == -1) {
1304 			filebench_log(LOG_ERROR, "semop wait failed");
1305 			return (FILEBENCH_ERROR);
1306 		}
1307 	}
1308 
1309 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1310 	    flowop->fo_name, flowop->fo_instance);
1311 #endif /* HAVE_SYSV_SEM */
1312 
1313 	return (FILEBENCH_OK);
1314 }
1315 
1316 /*
1317  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1318  */
1319 /* ARGSUSED */
1320 static int
flowoplib_sempost_init(flowop_t * flowop)1321 flowoplib_sempost_init(flowop_t *flowop)
1322 {
1323 #ifdef HAVE_SYSV_SEM
1324 	ipc_seminit();
1325 #endif /* HAVE_SYSV_SEM */
1326 	return (FILEBENCH_OK);
1327 }
1328 
1329 /*
1330  * Post to a System V or posix semaphore as appropriate.
1331  * On the first call for a given flowop instance, this routine
1332  * will use the fo_targetname attribute to locate all semblock
1333  * flowops that are expecting posts from this flowop. All
1334  * target flowops on this list will have a post operation done
1335  * to their semaphores on each call.
1336  */
1337 static int
flowoplib_sempost(threadflow_t * threadflow,flowop_t * flowop)1338 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1339 {
1340 	flowop_t *target;
1341 
1342 	filebench_log(LOG_DEBUG_IMPL,
1343 	    "sempost flow %s-%d",
1344 	    flowop->fo_name,
1345 	    flowop->fo_instance);
1346 
1347 	/* if this is the first post, create the post list */
1348 	if (flowop->fo_targets == NULL) {
1349 		flowop_t *result = flowop_find(flowop->fo_targetname);
1350 
1351 		flowop->fo_targets = result;
1352 
1353 		if (result == NULL) {
1354 			filebench_log(LOG_ERROR,
1355 			    "sempost: could not find op %s for thread %s",
1356 			    flowop->fo_targetname,
1357 			    threadflow->tf_name);
1358 			filebench_shutdown(1);
1359 		}
1360 
1361 		while (result) {
1362 			result->fo_targetnext =
1363 			    result->fo_resultnext;
1364 			result = result->fo_resultnext;
1365 		}
1366 	}
1367 
1368 	target = flowop->fo_targets;
1369 
1370 	flowop_beginop(threadflow, flowop);
1371 	/* post to the targets */
1372 	while (target) {
1373 #ifdef HAVE_SYSV_SEM
1374 		struct sembuf sbuf[2];
1375 		int sys_semid;
1376 		int blocking;
1377 #else
1378 		int i;
1379 #endif /* HAVE_SYSV_SEM */
1380 		struct timespec timeout;
1381 		int value = (int)avd_get_int(flowop->fo_value);
1382 
1383 		if (target->fo_instance == FLOW_MASTER) {
1384 			target = target->fo_targetnext;
1385 			continue;
1386 		}
1387 
1388 #ifdef HAVE_SYSV_SEM
1389 
1390 		filebench_log(LOG_DEBUG_IMPL,
1391 		    "sempost flow %s-%d num %x",
1392 		    target->fo_name,
1393 		    target->fo_instance,
1394 		    target->fo_semid_lw);
1395 
1396 		sys_semid = filebench_shm->shm_sys_semid;
1397 		sbuf[0].sem_num = target->fo_semid_lw;
1398 		sbuf[0].sem_op = (short)value;
1399 		sbuf[0].sem_flg = 0;
1400 		sbuf[1].sem_num = target->fo_semid_hw;
1401 		sbuf[1].sem_op = value * -1;
1402 		sbuf[1].sem_flg = 0;
1403 		timeout.tv_sec = 600;
1404 		timeout.tv_nsec = 0;
1405 
1406 		if (avd_get_bool(flowop->fo_blocking))
1407 			blocking = 1;
1408 		else
1409 			blocking = 0;
1410 
1411 #ifdef HAVE_SEMTIMEDOP
1412 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1413 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1414 #else
1415 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1416 		    (errno && (errno != EAGAIN))) {
1417 #endif /* HAVE_SEMTIMEDOP */
1418 			filebench_log(LOG_ERROR, "semop post failed: %s",
1419 			    strerror(errno));
1420 			return (FILEBENCH_ERROR);
1421 		}
1422 
1423 		filebench_log(LOG_DEBUG_IMPL,
1424 		    "flow %s-%d finished posting",
1425 		    target->fo_name, target->fo_instance);
1426 #else
1427 		filebench_log(LOG_DEBUG_IMPL,
1428 		    "sempost flow %s-%d to posix semaphore",
1429 		    target->fo_name,
1430 		    target->fo_instance);
1431 
1432 		/* Increment sem by value */
1433 		for (i = 0; i < value; i++) {
1434 			if (sem_post(&target->fo_sem) == -1) {
1435 				filebench_log(LOG_ERROR, "semop post failed");
1436 				return (FILEBENCH_ERROR);
1437 			}
1438 		}
1439 
1440 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1441 		    target->fo_name, target->fo_instance);
1442 #endif /* HAVE_SYSV_SEM */
1443 
1444 		target = target->fo_targetnext;
1445 	}
1446 	flowop_endop(threadflow, flowop, 0);
1447 
1448 	return (FILEBENCH_OK);
1449 }
1450 
1451 
/*
 * Section for exercising create / open / close / delete operations
 * on files within a fileset. For proper operation, the flowop attribute
 * "fd", which sets the fo_fdnumber field in the flowop, must be used
 * so that the same file is opened and later closed. "fd" is an index
 * into a pair of arrays maintained by threadflows, one of which
 * contains the operating system assigned file descriptors and the other
 * a pointer to the filesetentry whose file the file descriptor
 * references. An openfile flowop defined without fd being set will use
 * the default (0) fd or, if specified, rotate through fd indices, but
 * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile deletes the file referenced by a specified
 * (non-zero) fd if there is one, and otherwise picks an arbitrary
 * file to delete.
 */
1466 
1467 /*
1468  * Emulates (and actually does) file open. Obtains a file descriptor
1469  * index, then calls flowoplib_openfile_common() to open. Returns
1470  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1471  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1472  * FILEBENCH_NORSC, FILEBENCH_OK).
1473  */
1474 static int
1475 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1476 {
1477 	int fd = flowoplib_fdnum(threadflow, flowop);
1478 
1479 	if (fd == -1)
1480 		return (FILEBENCH_ERROR);
1481 
1482 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1483 }
1484 
1485 /*
1486  * Common file opening code for filesets. Uses the supplied
1487  * file descriptor index to determine the tf_fd entry to use.
1488  * If the entry is empty (0) and the fileset exists, fileset
1489  * pick is called to select a fileset entry to use. The file
1490  * specified in the filesetentry is opened, and the returned
1491  * operating system file descriptor and a pointer to the
1492  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1493  * respectively. Returns FILEBENCH_ERROR on error,
1494  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1495  * and FILEBENCH_OK on success.
1496  */
1497 static int
1498 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1499 {
1500 	filesetentry_t *file;
1501 	char *fileset_name;
1502 	int tid = 0;
1503 	int openflag = 0;
1504 	int err;
1505 
1506 	if (flowop->fo_fileset == NULL) {
1507 		filebench_log(LOG_ERROR, "flowop NULL file");
1508 		return (FILEBENCH_ERROR);
1509 	}
1510 
1511 	if ((fileset_name =
1512 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1513 		filebench_log(LOG_ERROR,
1514 		    "flowop %s: fileset has no name", flowop->fo_name);
1515 		return (FILEBENCH_ERROR);
1516 	}
1517 
1518 	/*
1519 	 * set the open flag for read only or read/write, as appropriate.
1520 	 */
1521 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE)
1522 		openflag = O_RDONLY;
1523 	else
1524 		openflag = O_RDWR;
1525 
1526 	/*
1527 	 * If the flowop doesn't default to persistent fd
1528 	 * then get unique thread ID for use by fileset_pick
1529 	 */
1530 	if (avd_get_bool(flowop->fo_rotatefd))
1531 		tid = threadflow->tf_utid;
1532 
1533 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1534 		filebench_log(LOG_ERROR,
1535 		    "flowop %s attempted to open without closing on fd %d",
1536 		    flowop->fo_name, fd);
1537 		return (FILEBENCH_ERROR);
1538 	}
1539 
1540 #ifdef HAVE_RAW_SUPPORT
1541 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1542 		int open_attrs = 0;
1543 		char name[MAXPATHLEN];
1544 
1545 		(void) fb_strlcpy(name,
1546 		    avd_get_str(flowop->fo_fileset->fs_path), MAXPATHLEN);
1547 		(void) fb_strlcat(name, "/", MAXPATHLEN);
1548 		(void) fb_strlcat(name, fileset_name, MAXPATHLEN);
1549 
1550 		if (avd_get_bool(flowop->fo_dsync)) {
1551 #ifdef sun
1552 			open_attrs |= O_DSYNC;
1553 #else
1554 			open_attrs |= O_FSYNC;
1555 #endif
1556 		}
1557 
1558 		filebench_log(LOG_DEBUG_SCRIPT,
1559 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1560 
1561 		if (FB_OPEN(&(threadflow->tf_fd[fd]), name,
1562 		    openflag | open_attrs, 0666) == FILEBENCH_ERROR) {
1563 			filebench_log(LOG_ERROR,
1564 			    "Failed to open raw device %s: %s",
1565 			    name, strerror(errno));
1566 			return (FILEBENCH_ERROR);
1567 		}
1568 
1569 		/* if running on Solaris, use un-buffered io */
1570 #ifdef sun
1571 		(void) directio(threadflow->tf_fd[fd].fd_num, DIRECTIO_ON);
1572 #endif
1573 
1574 		threadflow->tf_fse[fd] = NULL;
1575 
1576 		return (FILEBENCH_OK);
1577 	}
1578 #endif /* HAVE_RAW_SUPPORT */
1579 
1580 	if ((err = flowoplib_pickfile(&file, flowop,
1581 	    FILESET_PICKEXISTS, tid)) != FILEBENCH_OK) {
1582 		filebench_log(LOG_DEBUG_SCRIPT,
1583 		    "flowop %s failed to pick file from %s on fd %d",
1584 		    flowop->fo_name, fileset_name, fd);
1585 		return (err);
1586 	}
1587 
1588 	threadflow->tf_fse[fd] = file;
1589 
1590 	flowop_beginop(threadflow, flowop);
1591 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1592 	    file, openflag, 0666, flowoplib_fileattrs(flowop));
1593 	flowop_endop(threadflow, flowop, 0);
1594 
1595 	if (err == FILEBENCH_ERROR) {
1596 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1597 		    flowop->fo_name, file->fse_path);
1598 		return (FILEBENCH_ERROR);
1599 	}
1600 
1601 	filebench_log(LOG_DEBUG_SCRIPT,
1602 	    "flowop %s: opened %s fd[%d] = %d",
1603 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1604 
1605 	return (FILEBENCH_OK);
1606 }
1607 
1608 /*
1609  * Emulate create of a file. Uses the flowop's fdnumber to select
1610  * tf_fd and tf_fse array locations to put the created file's file
1611  * descriptor and filesetentry respectively. Uses flowoplib_pickfile()
1612  * to select a specific filesetentry whose file does not currently
1613  * exist for the file create operation. Then calls
1614  * fileset_openfile() with the O_CREATE flag set to create the
1615  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1616  * already in use, the flowop has no associated fileset, or
1617  * the create call fails. Returns 1 if a filesetentry with a
1618  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1619  */
1620 static int
1621 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1622 {
1623 	filesetentry_t *file;
1624 	int fd = flowop->fo_fdnumber;
1625 	int err;
1626 
1627 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1628 		filebench_log(LOG_ERROR,
1629 		    "flowop %s attempted to create without closing on fd %d",
1630 		    flowop->fo_name, fd);
1631 		return (FILEBENCH_ERROR);
1632 	}
1633 
1634 	if (flowop->fo_fileset == NULL) {
1635 		filebench_log(LOG_ERROR, "flowop NULL file");
1636 		return (FILEBENCH_ERROR);
1637 	}
1638 
1639 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE) {
1640 		filebench_log(LOG_ERROR, "Can not CREATE the READONLY file %s",
1641 		    avd_get_str(flowop->fo_fileset->fs_name));
1642 		return (FILEBENCH_ERROR);
1643 	}
1644 
1645 
1646 #ifdef HAVE_RAW_SUPPORT
1647 	/* can't be used with raw devices */
1648 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1649 		filebench_log(LOG_ERROR,
1650 		    "flowop %s attempted to a createfile on RAW device",
1651 		    flowop->fo_name);
1652 		return (FILEBENCH_ERROR);
1653 	}
1654 #endif /* HAVE_RAW_SUPPORT */
1655 
1656 	if ((err = flowoplib_pickfile(&file, flowop,
1657 	    FILESET_PICKNOEXIST, 0)) != FILEBENCH_OK) {
1658 		filebench_log(LOG_DEBUG_SCRIPT,
1659 		    "flowop %s failed to pick file from fileset %s",
1660 		    flowop->fo_name,
1661 		    avd_get_str(flowop->fo_fileset->fs_name));
1662 		return (err);
1663 	}
1664 
1665 	threadflow->tf_fse[fd] = file;
1666 
1667 	flowop_beginop(threadflow, flowop);
1668 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1669 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1670 	flowop_endop(threadflow, flowop, 0);
1671 
1672 	if (err == FILEBENCH_ERROR) {
1673 		filebench_log(LOG_ERROR, "failed to create file %s",
1674 		    flowop->fo_name);
1675 		return (FILEBENCH_ERROR);
1676 	}
1677 
1678 	filebench_log(LOG_DEBUG_SCRIPT,
1679 	    "flowop %s: created %s fd[%d] = %d",
1680 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1681 
1682 	return (FILEBENCH_OK);
1683 }
1684 
1685 /*
1686  * Emulates delete of a file. If a valid fd is provided, it uses the
1687  * filesetentry stored at that fd location to select the file to be
1688  * deleted, otherwise it picks an arbitrary filesetentry
1689  * whose file exists. It then uses unlink() to delete it and Clears
1690  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1691  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1692  * filesetentry cannot be found, and FILEBENCH_OK on success.
1693  */
1694 static int
1695 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1696 {
1697 	filesetentry_t *file;
1698 	fileset_t *fileset;
1699 	char path[MAXPATHLEN];
1700 	char *pathtmp;
1701 	int fd = flowop->fo_fdnumber;
1702 
1703 	/* if fd specified, use it to access file */
1704 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1705 
1706 		/* indicate that the file will be deleted */
1707 		threadflow->tf_fse[fd] = NULL;
1708 
1709 		/* if here, we still have a valid file pointer */
1710 		fileset = file->fse_fileset;
1711 	} else {
1712 
1713 		/* Otherwise, pick arbitrary file */
1714 		file = NULL;
1715 		fileset = flowop->fo_fileset;
1716 	}
1717 
1718 
1719 	if (fileset == NULL) {
1720 		filebench_log(LOG_ERROR, "flowop NULL file");
1721 		return (FILEBENCH_ERROR);
1722 	}
1723 
1724 #ifdef HAVE_RAW_SUPPORT
1725 	/* can't be used with raw devices */
1726 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1727 		filebench_log(LOG_ERROR,
1728 		    "flowop %s attempted a deletefile on RAW device",
1729 		    flowop->fo_name);
1730 		return (FILEBENCH_ERROR);
1731 	}
1732 #endif /* HAVE_RAW_SUPPORT */
1733 
1734 	if (file == NULL) {
1735 		int err;
1736 
1737 		/* pick arbitrary, existing (allocated) file */
1738 		if ((err = flowoplib_pickfile(&file, flowop,
1739 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
1740 			filebench_log(LOG_DEBUG_SCRIPT,
1741 			    "flowop %s failed to pick file", flowop->fo_name);
1742 			return (err);
1743 		}
1744 	} else {
1745 		/* delete specific file. wait for it to be non-busy */
1746 		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1747 		while (file->fse_flags & FSE_BUSY) {
1748 			file->fse_flags |= FSE_THRD_WAITNG;
1749 			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1750 			    &fileset->fs_pick_lock);
1751 		}
1752 
1753 		/* File now available, grab it for deletion */
1754 		file->fse_flags |= FSE_BUSY;
1755 		fileset->fs_idle_files--;
1756 		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1757 	}
1758 
1759 	/* don't delete if anyone (other than me) has file open */
1760 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
1761 		if (file->fse_open_cnt > 1) {
1762 			filebench_log(LOG_DEBUG_SCRIPT,
1763 			    "flowop %s can't delete file opened by other"
1764 			    " threads at fd = %d", flowop->fo_name, fd);
1765 			fileset_unbusy(file, FALSE, FALSE, 0);
1766 			return (FILEBENCH_OK);
1767 		} else {
1768 			filebench_log(LOG_DEBUG_SCRIPT,
1769 			    "flowop %s deleting still open file at fd = %d",
1770 			    flowop->fo_name, fd);
1771 		}
1772 	} else if (file->fse_open_cnt > 0) {
1773 		filebench_log(LOG_DEBUG_SCRIPT,
1774 		    "flowop %s can't delete file opened by other"
1775 		    " threads at fd = %d, open count = %d",
1776 		    flowop->fo_name, fd, file->fse_open_cnt);
1777 		fileset_unbusy(file, FALSE, FALSE, 0);
1778 		return (FILEBENCH_OK);
1779 	}
1780 
1781 	(void) fb_strlcpy(path, avd_get_str(fileset->fs_path), MAXPATHLEN);
1782 	(void) fb_strlcat(path, "/", MAXPATHLEN);
1783 	(void) fb_strlcat(path, avd_get_str(fileset->fs_name), MAXPATHLEN);
1784 	pathtmp = fileset_resolvepath(file);
1785 	(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
1786 	free(pathtmp);
1787 
1788 	/* delete the selected file */
1789 	flowop_beginop(threadflow, flowop);
1790 	(void) FB_UNLINK(path);
1791 	flowop_endop(threadflow, flowop, 0);
1792 
1793 	/* indicate that it is no longer busy and no longer exists */
1794 	fileset_unbusy(file, TRUE, FALSE, -file->fse_open_cnt);
1795 
1796 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1797 
1798 	return (FILEBENCH_OK);
1799 }
1800 
1801 /*
1802  * Emulates fsync of a file. Obtains the file descriptor index
1803  * from the flowop, obtains the actual file descriptor from
1804  * the threadflow's table, checks to be sure it is still an
1805  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
1806  * if the file no longer is open, FILEBENCH_OK otherwise.
1807  */
1808 static int
1809 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1810 {
1811 	filesetentry_t *file;
1812 	int fd = flowop->fo_fdnumber;
1813 
1814 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1815 		filebench_log(LOG_ERROR,
1816 		    "flowop %s attempted to fsync a closed fd %d",
1817 		    flowop->fo_name, fd);
1818 		return (FILEBENCH_ERROR);
1819 	}
1820 
1821 	file = threadflow->tf_fse[fd];
1822 
1823 	if ((file == NULL) ||
1824 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1825 		filebench_log(LOG_ERROR,
1826 		    "flowop %s attempted to a fsync a RAW device",
1827 		    flowop->fo_name);
1828 		return (FILEBENCH_ERROR);
1829 	}
1830 
1831 	/* Measure time to fsync */
1832 	flowop_beginop(threadflow, flowop);
1833 	(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1834 	flowop_endop(threadflow, flowop, 0);
1835 
1836 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1837 
1838 	return (FILEBENCH_OK);
1839 }
1840 
1841 /*
1842  * Emulate fsync of an entire fileset. Search through the
1843  * threadflow's file descriptor array, doing fsync() on each
1844  * open file that belongs to the flowop's fileset. Always
1845  * returns FILEBENCH_OK.
1846  */
1847 static int
1848 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1849 {
1850 	int fd;
1851 
1852 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1853 		filesetentry_t *file;
1854 
1855 		/* Match the file set to fsync */
1856 		if ((threadflow->tf_fse[fd] == NULL) ||
1857 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1858 			continue;
1859 
1860 		/* Measure time to fsync */
1861 		flowop_beginop(threadflow, flowop);
1862 		(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1863 		flowop_endop(threadflow, flowop, 0);
1864 
1865 		file = threadflow->tf_fse[fd];
1866 
1867 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1868 		    file->fse_path);
1869 	}
1870 
1871 	return (FILEBENCH_OK);
1872 }
1873 
1874 /*
1875  * Emulate close of a file.  Obtains the file descriptor index
1876  * from the flowop, obtains the actual file descriptor from the
1877  * threadflow's table, checks to be sure it is still an open
1878  * file, then does a close operation on it. Then sets the
1879  * threadflow file descriptor table entry to 0, and the file set
1880  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
1881  * FILEBENCH_OK otherwise.
1882  */
1883 static int
1884 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1885 {
1886 	filesetentry_t *file;
1887 	fileset_t *fileset;
1888 	int fd = flowop->fo_fdnumber;
1889 
1890 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1891 		filebench_log(LOG_ERROR,
1892 		    "flowop %s attempted to close an already closed fd %d",
1893 		    flowop->fo_name, fd);
1894 		return (FILEBENCH_ERROR);
1895 	}
1896 
1897 	file = threadflow->tf_fse[fd];
1898 	fileset = file->fse_fileset;
1899 
1900 	/* Wait for it to be non-busy */
1901 	(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1902 	while (file->fse_flags & FSE_BUSY) {
1903 		file->fse_flags |= FSE_THRD_WAITNG;
1904 		(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1905 		    &fileset->fs_pick_lock);
1906 	}
1907 
1908 	/* File now available, grab it for closing */
1909 	file->fse_flags |= FSE_BUSY;
1910 
1911 	/* if last open, set declare idle */
1912 	if (file->fse_open_cnt == 1)
1913 		fileset->fs_idle_files--;
1914 
1915 	(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1916 
1917 	/* Measure time to close */
1918 	flowop_beginop(threadflow, flowop);
1919 	(void) FB_CLOSE(&threadflow->tf_fd[fd]);
1920 	flowop_endop(threadflow, flowop, 0);
1921 
1922 	fileset_unbusy(file, FALSE, FALSE, -1);
1923 
1924 	threadflow->tf_fd[fd].fd_ptr = NULL;
1925 
1926 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1927 
1928 	return (FILEBENCH_OK);
1929 }
1930 
1931 /*
1932  * Obtain the full pathname of the directory described by the filesetentry
1933  * indicated by "dir", and copy it into the character array pointed to by
1934  * path. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
1935  */
1936 static int
1937 flowoplib_getdirpath(filesetentry_t *dir, char *path)
1938 {
1939 	char		*fileset_path;
1940 	char		*fileset_name;
1941 	char		*part_path;
1942 
1943 	if ((fileset_path = avd_get_str(dir->fse_fileset->fs_path)) == NULL) {
1944 		filebench_log(LOG_ERROR, "Fileset path not set");
1945 		return (FILEBENCH_ERROR);
1946 	}
1947 
1948 	if ((fileset_name = avd_get_str(dir->fse_fileset->fs_name)) == NULL) {
1949 		filebench_log(LOG_ERROR, "Fileset name not set");
1950 		return (FILEBENCH_ERROR);
1951 	}
1952 
1953 	(void) fb_strlcpy(path, fileset_path, MAXPATHLEN);
1954 	(void) fb_strlcat(path, "/", MAXPATHLEN);
1955 	(void) fb_strlcat(path, fileset_name, MAXPATHLEN);
1956 
1957 	if ((part_path = fileset_resolvepath(dir)) == NULL)
1958 		return (FILEBENCH_ERROR);
1959 
1960 	(void) fb_strlcat(path, part_path, MAXPATHLEN);
1961 	free(part_path);
1962 
1963 	return (FILEBENCH_OK);
1964 }
1965 
1966 /*
1967  * Use mkdir to create a directory.  Obtains the fileset name from the
1968  * flowop, selects a non-existent leaf directory and obtains its full
1969  * path, then uses mkdir to create it on the storage subsystem (make it
1970  * existent). Returns FILEBENCH_NORSC is there are no more non-existent
1971  * directories in the fileset, FILEBENCH_ERROR on other errors, and
1972  * FILEBENCH_OK on success.
1973  */
1974 static int
1975 flowoplib_makedir(threadflow_t *threadflow, flowop_t *flowop)
1976 {
1977 	filesetentry_t	*dir;
1978 	int		ret;
1979 	char		full_path[MAXPATHLEN];
1980 
1981 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
1982 	    FILESET_PICKNOEXIST)) != FILEBENCH_OK)
1983 		return (ret);
1984 
1985 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
1986 		return (ret);
1987 
1988 	flowop_beginop(threadflow, flowop);
1989 	(void) FB_MKDIR(full_path, 0755);
1990 	flowop_endop(threadflow, flowop, 0);
1991 
1992 	/* indicate that it is no longer busy and now exists */
1993 	fileset_unbusy(dir, TRUE, TRUE, 0);
1994 
1995 	return (FILEBENCH_OK);
1996 }
1997 
1998 /*
1999  * Use rmdir to delete a directory.  Obtains the fileset name from the
2000  * flowop, selects an existent leaf directory and obtains its full path,
2001  * then uses rmdir to remove it from the storage subsystem (make it
2002  * non-existent). Returns FILEBENCH_NORSC is there are no more existent
2003  * directories in the fileset, FILEBENCH_ERROR on other errors, and
2004  * FILEBENCH_OK on success.
2005  */
2006 static int
2007 flowoplib_removedir(threadflow_t *threadflow, flowop_t *flowop)
2008 {
2009 	filesetentry_t *dir;
2010 	int		ret;
2011 	char		full_path[MAXPATHLEN];
2012 
2013 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
2014 	    FILESET_PICKEXISTS)) != FILEBENCH_OK)
2015 		return (ret);
2016 
2017 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2018 		return (ret);
2019 
2020 	flowop_beginop(threadflow, flowop);
2021 	(void) FB_RMDIR(full_path);
2022 	flowop_endop(threadflow, flowop, 0);
2023 
2024 	/* indicate that it is no longer busy and no longer exists */
2025 	fileset_unbusy(dir, TRUE, FALSE, 0);
2026 
2027 	return (FILEBENCH_OK);
2028 }
2029 
2030 /*
2031  * Use opendir(), multiple readdir() calls, and closedir() to list the
2032  * contents of a directory.  Obtains the fileset name from the
2033  * flowop, selects a normal subdirectory (which always exist) and obtains
2034  * its full path, then uses opendir() to get a DIR handle to it from the
2035  * file system, a readdir() loop to access each directory entry, and
2036  * finally cleans up with a closedir(). The latency reported is the total
2037  * for all this activity, and it also reports the total number of bytes
2038  * in the entries as the amount "read". Returns FILEBENCH_ERROR on errors,
2039  * and FILEBENCH_OK on success.
2040  */
2041 static int
2042 flowoplib_listdir(threadflow_t *threadflow, flowop_t *flowop)
2043 {
2044 	fileset_t	*fileset;
2045 	filesetentry_t	*dir;
2046 	DIR		*dir_handle;
2047 	struct dirent	*direntp;
2048 	int		dir_bytes = 0;
2049 	int		ret;
2050 	char		full_path[MAXPATHLEN];
2051 
2052 	if ((fileset = flowop->fo_fileset) == NULL) {
2053 		filebench_log(LOG_ERROR, "flowop NO fileset");
2054 		return (FILEBENCH_ERROR);
2055 	}
2056 
2057 	if ((dir = fileset_pick(fileset, FILESET_PICKDIR, 0, 0)) == NULL) {
2058 		filebench_log(LOG_DEBUG_SCRIPT,
2059 		    "flowop %s failed to pick directory from fileset %s",
2060 		    flowop->fo_name,
2061 		    avd_get_str(fileset->fs_name));
2062 		return (FILEBENCH_ERROR);
2063 	}
2064 
2065 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2066 		return (ret);
2067 
2068 	flowop_beginop(threadflow, flowop);
2069 
2070 	/* open the directory */
2071 	if ((dir_handle = FB_OPENDIR(full_path)) == NULL) {
2072 		filebench_log(LOG_ERROR,
2073 		    "flowop %s failed to open directory in fileset %s\n",
2074 		    flowop->fo_name, avd_get_str(fileset->fs_name));
2075 		return (FILEBENCH_ERROR);
2076 	}
2077 
2078 	/* read through the directory entries */
2079 	while ((direntp = FB_READDIR(dir_handle)) != NULL) {
2080 		dir_bytes += (strlen(direntp->d_name) +
2081 		    sizeof (struct dirent) - 1);
2082 	}
2083 
2084 	/* close the directory */
2085 	(void) FB_CLOSEDIR(dir_handle);
2086 
2087 	flowop_endop(threadflow, flowop, dir_bytes);
2088 
2089 	/* indicate that it is no longer busy */
2090 	fileset_unbusy(dir, FALSE, FALSE, 0);
2091 
2092 	return (FILEBENCH_OK);
2093 }
2094 
2095 /*
2096  * Emulate stat of a file. Picks an arbitrary filesetentry with
2097  * an existing file from the flowop's fileset, then performs a
2098  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2099  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2100  * cannot be found, and FILEBENCH_OK on success.
2101  */
2102 static int
2103 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2104 {
2105 	filesetentry_t *file;
2106 	fileset_t *fileset;
2107 	struct stat64 statbuf;
2108 	int fd = flowop->fo_fdnumber;
2109 
2110 	/* if fd specified and the file is open, use it to access file */
2111 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
2112 
2113 		/* check whether file handle still valid */
2114 		if ((file = threadflow->tf_fse[fd]) == NULL) {
2115 			filebench_log(LOG_DEBUG_SCRIPT,
2116 			    "flowop %s trying to stat NULL file at fd = %d",
2117 			    flowop->fo_name, fd);
2118 			return (FILEBENCH_ERROR);
2119 		}
2120 
2121 		/* if here, we still have a valid file pointer */
2122 		fileset = file->fse_fileset;
2123 	} else {
2124 		/* Otherwise, pick arbitrary file */
2125 		file = NULL;
2126 		fileset = flowop->fo_fileset;
2127 	}
2128 
2129 	if (fileset == NULL) {
2130 		filebench_log(LOG_ERROR,
2131 		    "statfile with no fileset specified");
2132 		return (FILEBENCH_ERROR);
2133 	}
2134 
2135 #ifdef HAVE_RAW_SUPPORT
2136 	/* can't be used with raw devices */
2137 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2138 		filebench_log(LOG_ERROR,
2139 		    "flowop %s attempted do a statfile on a RAW device",
2140 		    flowop->fo_name);
2141 		return (FILEBENCH_ERROR);
2142 	}
2143 #endif /* HAVE_RAW_SUPPORT */
2144 
2145 	if (file == NULL) {
2146 		char path[MAXPATHLEN];
2147 		char *pathtmp;
2148 		int err;
2149 
2150 		/* pick arbitrary, existing (allocated) file */
2151 		if ((err = flowoplib_pickfile(&file, flowop,
2152 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
2153 			filebench_log(LOG_DEBUG_SCRIPT,
2154 			    "Statfile flowop %s failed to pick file",
2155 			    flowop->fo_name);
2156 			return (err);
2157 		}
2158 
2159 		/* resolve path and do a stat on file */
2160 		(void) fb_strlcpy(path, avd_get_str(fileset->fs_path),
2161 		    MAXPATHLEN);
2162 		(void) fb_strlcat(path, "/", MAXPATHLEN);
2163 		(void) fb_strlcat(path, avd_get_str(fileset->fs_name),
2164 		    MAXPATHLEN);
2165 		pathtmp = fileset_resolvepath(file);
2166 		(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
2167 		free(pathtmp);
2168 
2169 		/* stat the file */
2170 		flowop_beginop(threadflow, flowop);
2171 		if (FB_STAT(path, &statbuf) == -1)
2172 			filebench_log(LOG_ERROR,
2173 			    "statfile flowop %s failed", flowop->fo_name);
2174 		flowop_endop(threadflow, flowop, 0);
2175 
2176 		fileset_unbusy(file, FALSE, FALSE, 0);
2177 	} else {
2178 		/* stat specific file */
2179 		flowop_beginop(threadflow, flowop);
2180 		if (FB_FSTAT(&threadflow->tf_fd[fd], &statbuf) == -1)
2181 			filebench_log(LOG_ERROR,
2182 			    "statfile flowop %s failed", flowop->fo_name);
2183 		flowop_endop(threadflow, flowop, 0);
2184 
2185 	}
2186 
2187 	return (FILEBENCH_OK);
2188 }
2189 
2190 
2191 /*
2192  * Additional reads and writes. Read and write whole files, write
2193  * and append to files. Some of these work with both fileobjs and
2194  * filesets, others only with filesets. The flowoplib_write routine
2195  * writes from thread memory, while the others read or write using
2196  * fo_buf memory. Note that both flowoplib_read() and
2197  * flowoplib_aiowrite() use thread memory as well.
2198  */
2199 
2200 
2201 /*
2202  * Emulate a read of a whole file. The file must be open with
2203  * file descriptor and filesetentry stored at the locations indexed
2204  * by the flowop's fdnumber. It then seeks to the beginning of the
2205  * associated file, and reads fs_iosize bytes at a time until the end
2206  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2207  * out of files, and FILEBENCH_OK on success.
2208  */
2209 static int
2210 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2211 {
2212 	caddr_t iobuf;
2213 	off64_t bytes = 0;
2214 	fb_fdesc_t *fdesc;
2215 	uint64_t wss;
2216 	fbint_t iosize;
2217 	int ret;
2218 	char zerordbuf;
2219 
2220 	/* get the file to use */
2221 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2222 	    &fdesc)) != FILEBENCH_OK)
2223 		return (ret);
2224 
2225 	/* an I/O size of zero means read entire working set with one I/O */
2226 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2227 		iosize = wss;
2228 
2229 	/*
2230 	 * The file may actually be 0 bytes long, in which case skip
2231 	 * the buffer set up call (which would fail) and substitute
2232 	 * a small buffer, which won't really be used.
2233 	 */
2234 	if (iosize == 0) {
2235 		iobuf = (caddr_t)&zerordbuf;
2236 		filebench_log(LOG_DEBUG_SCRIPT,
2237 		    "flowop %s read zero length file", flowop->fo_name);
2238 	} else {
2239 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2240 		    iosize) != 0)
2241 			return (FILEBENCH_ERROR);
2242 	}
2243 
2244 	/* Measure time to read bytes */
2245 	flowop_beginop(threadflow, flowop);
2246 	(void) FB_LSEEK(fdesc, 0, SEEK_SET);
2247 	while ((ret = FB_READ(fdesc, iobuf, iosize)) > 0)
2248 		bytes += ret;
2249 
2250 	flowop_endop(threadflow, flowop, bytes);
2251 
2252 	if (ret < 0) {
2253 		filebench_log(LOG_ERROR,
2254 		    "readwhole fail Failed to read whole file: %s",
2255 		    strerror(errno));
2256 		return (FILEBENCH_ERROR);
2257 	}
2258 
2259 	return (FILEBENCH_OK);
2260 }
2261 
2262 /*
2263  * Emulate a write to a file of size fo_iosize.  Will write
2264  * to a file from a fileset if the flowop's fo_fileset field
2265  * specifies one or its fdnumber is non zero. Otherwise it
2266  * will write to a fileobj file, if one exists. If the file
2267  * is not currently open, the routine will attempt to open
2268  * it. The flowop's fo_wss parameter will be used to set the
2269  * maximum file size if it is non-zero, otherwise the
2270  * filesetentry's  fse_size will be used. A random memory
2271  * buffer offset is calculated, and, if fo_random is TRUE,
2272  * a random file offset is used for the write. Otherwise the
2273  * write is to the next sequential location. Returns
2274  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2275  * obtain a file, or FILEBENCH_OK on success.
2276  */
2277 static int
2278 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2279 {
2280 	caddr_t iobuf;
2281 	fbint_t wss;
2282 	fbint_t iosize;
2283 	fb_fdesc_t *fdesc;
2284 	int ret;
2285 
2286 	iosize = avd_get_int(flowop->fo_iosize);
2287 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2288 	    &fdesc, iosize)) != FILEBENCH_OK)
2289 		return (ret);
2290 
2291 	if (avd_get_bool(flowop->fo_random)) {
2292 		uint64_t fileoffset;
2293 
2294 		if (filebench_randomno64(&fileoffset,
2295 		    wss, iosize, NULL) == -1) {
2296 			filebench_log(LOG_ERROR,
2297 			    "file size smaller than IO size for thread %s",
2298 			    flowop->fo_name);
2299 			return (FILEBENCH_ERROR);
2300 		}
2301 		flowop_beginop(threadflow, flowop);
2302 		if (FB_PWRITE(fdesc, iobuf,
2303 		    iosize, (off64_t)fileoffset) == -1) {
2304 			filebench_log(LOG_ERROR, "write failed, "
2305 			    "offset %llu io buffer %zd: %s",
2306 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2307 			flowop_endop(threadflow, flowop, 0);
2308 			return (FILEBENCH_ERROR);
2309 		}
2310 		flowop_endop(threadflow, flowop, iosize);
2311 	} else {
2312 		flowop_beginop(threadflow, flowop);
2313 		if (FB_WRITE(fdesc, iobuf, iosize) == -1) {
2314 			filebench_log(LOG_ERROR,
2315 			    "write failed, io buffer %zd: %s",
2316 			    iobuf, strerror(errno));
2317 			flowop_endop(threadflow, flowop, 0);
2318 			return (FILEBENCH_ERROR);
2319 		}
2320 		flowop_endop(threadflow, flowop, iosize);
2321 	}
2322 
2323 	return (FILEBENCH_OK);
2324 }
2325 
2326 /*
2327  * Emulate a write of a whole file.  The size of the file
2328  * is taken from a filesetentry identified by fo_srcfdnumber or
2329  * from the working set size, while the file descriptor used is
2330  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2331  * length length until full file has been written. Returns FILEBENCH_ERROR on
2332  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2333  */
2334 static int
2335 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2336 {
2337 	caddr_t iobuf;
2338 	filesetentry_t *file;
2339 	int wsize;
2340 	off64_t seek;
2341 	off64_t bytes = 0;
2342 	uint64_t wss;
2343 	fbint_t iosize;
2344 	fb_fdesc_t *fdesc;
2345 	int srcfd = flowop->fo_srcfdnumber;
2346 	int ret;
2347 	char zerowrtbuf;
2348 
2349 	/* get the file to use */
2350 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2351 	    &fdesc)) != FILEBENCH_OK)
2352 		return (ret);
2353 
2354 	/* an I/O size of zero means write entire working set with one I/O */
2355 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2356 		iosize = wss;
2357 
2358 	/*
2359 	 * The file may actually be 0 bytes long, in which case skip
2360 	 * the buffer set up call (which would fail) and substitute
2361 	 * a small buffer, which won't really be used.
2362 	 */
2363 	if (iosize == 0) {
2364 		iobuf = (caddr_t)&zerowrtbuf;
2365 		filebench_log(LOG_DEBUG_SCRIPT,
2366 		    "flowop %s wrote zero length file", flowop->fo_name);
2367 	} else {
2368 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2369 		    iosize) != 0)
2370 			return (FILEBENCH_ERROR);
2371 	}
2372 
2373 	file = threadflow->tf_fse[srcfd];
2374 	if ((srcfd != 0) && (file == NULL)) {
2375 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2376 		    flowop->fo_name);
2377 		return (FILEBENCH_ERROR);
2378 	}
2379 
2380 	if (file)
2381 		wss = file->fse_size;
2382 
2383 	wsize = (int)MIN(wss, iosize);
2384 
2385 	/* Measure time to write bytes */
2386 	flowop_beginop(threadflow, flowop);
2387 	for (seek = 0; seek < wss; seek += wsize) {
2388 		ret = FB_WRITE(fdesc, iobuf, wsize);
2389 		if (ret != wsize) {
2390 			filebench_log(LOG_ERROR,
2391 			    "Failed to write %d bytes on fd %d: %s",
2392 			    wsize, fdesc->fd_num, strerror(errno));
2393 			flowop_endop(threadflow, flowop, 0);
2394 			return (FILEBENCH_ERROR);
2395 		}
2396 		wsize = (int)MIN(wss - seek, iosize);
2397 		bytes += ret;
2398 	}
2399 	flowop_endop(threadflow, flowop, bytes);
2400 
2401 	return (FILEBENCH_OK);
2402 }
2403 
2404 
2405 /*
2406  * Emulate a fixed size append to a file. Will append data to
2407  * a file chosen from a fileset if the flowop's fo_fileset
2408  * field specifies one or if its fdnumber is non zero.
2409  * Otherwise it will write to a fileobj file, if one exists.
2410  * The flowop's fo_wss parameter will be used to set the
2411  * maximum file size if it is non-zero, otherwise the
2412  * filesetentry's fse_size will be used. A random memory
2413  * buffer offset is calculated, then a logical seek to the
2414  * end of file is done followed by a write of fo_iosize
2415  * bytes. Writes are actually done from fo_buf, rather than
2416  * tf_mem as is done with flowoplib_write(), and no check
2417  * is made to see if fo_iosize exceeds the size of fo_buf.
2418  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2419  * files in the fileset, FILEBENCH_OK on success.
2420  */
2421 static int
2422 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2423 {
2424 	caddr_t iobuf;
2425 	fb_fdesc_t *fdesc;
2426 	fbint_t wss;
2427 	fbint_t iosize;
2428 	int ret;
2429 
2430 	iosize = avd_get_int(flowop->fo_iosize);
2431 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2432 	    &fdesc, iosize)) != FILEBENCH_OK)
2433 		return (ret);
2434 
2435 	/* XXX wss is not being used */
2436 
2437 	/* Measure time to write bytes */
2438 	flowop_beginop(threadflow, flowop);
2439 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2440 	ret = FB_WRITE(fdesc, iobuf, iosize);
2441 	if (ret != iosize) {
2442 		filebench_log(LOG_ERROR,
2443 		    "Failed to write %llu bytes on fd %d: %s",
2444 		    (u_longlong_t)iosize, fdesc->fd_num, strerror(errno));
2445 		flowop_endop(threadflow, flowop, ret);
2446 		return (FILEBENCH_ERROR);
2447 	}
2448 	flowop_endop(threadflow, flowop, ret);
2449 
2450 	return (FILEBENCH_OK);
2451 }
2452 
2453 /*
2454  * Emulate a random size append to a file. Will append data
2455  * to a file chosen from a fileset if the flowop's fo_fileset
2456  * field specifies one or if its fdnumber is non zero. Otherwise
2457  * it will write to a fileobj file, if one exists. The flowop's
2458  * fo_wss parameter will be used to set the maximum file size
2459  * if it is non-zero, otherwise the filesetentry's fse_size
2460  * will be used.  A random transfer size (but at most fo_iosize
2461  * bytes) and a random memory offset are calculated. A logical
2462  * seek to the end of file is done, then writes of up to
2463  * FILE_ALLOC_BLOCK in size are done until the full transfer
2464  * size has been written. Writes are actually done from fo_buf,
2465  * rather than tf_mem as is done with flowoplib_write().
2466  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2467  * files in the fileset, FILEBENCH_OK on success.
2468  */
2469 static int
2470 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2471 {
2472 	caddr_t iobuf;
2473 	uint64_t appendsize;
2474 	fb_fdesc_t *fdesc;
2475 	fbint_t wss;
2476 	fbint_t iosize;
2477 	int ret = 0;
2478 
2479 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2480 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2481 		    flowop->fo_name);
2482 		return (FILEBENCH_ERROR);
2483 	}
2484 
2485 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2486 		return (FILEBENCH_ERROR);
2487 
2488 	/* skip if attempting zero length append */
2489 	if (appendsize == 0) {
2490 		flowop_beginop(threadflow, flowop);
2491 		flowop_endop(threadflow, flowop, 0LL);
2492 		return (FILEBENCH_OK);
2493 	}
2494 
2495 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2496 	    &fdesc, appendsize)) != FILEBENCH_OK)
2497 		return (ret);
2498 
2499 	/* XXX wss is not being used */
2500 
2501 	/* Measure time to write bytes */
2502 	flowop_beginop(threadflow, flowop);
2503 
2504 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2505 	ret = FB_WRITE(fdesc, iobuf, appendsize);
2506 	if (ret != appendsize) {
2507 		filebench_log(LOG_ERROR,
2508 		    "Failed to write %llu bytes on fd %d: %s",
2509 		    (u_longlong_t)appendsize, fdesc->fd_num, strerror(errno));
2510 		flowop_endop(threadflow, flowop, 0);
2511 		return (FILEBENCH_ERROR);
2512 	}
2513 
2514 	flowop_endop(threadflow, flowop, appendsize);
2515 
2516 	return (FILEBENCH_OK);
2517 }
2518 
/*
 * Running totals kept in a flowop's fo_private area by the testrandvar
 * flowop; one value is folded in per flowoplib_testrandvar() call.
 */
typedef struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated */
	double val_sum;		/* sum of the accumulated values */
	double sqr_sum;		/* sum of the squares of the values */
} testrandvar_priv_t;
2524 
2525 /*
2526  * flowop to calculate various statistics from the number stream
2527  * produced by a random variable. This allows verification that the
2528  * random distribution used to define the random variable is producing
2529  * the expected distribution of random numbers.
2530  */
2531 /* ARGSUSED */
2532 static int
2533 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2534 {
2535 	testrandvar_priv_t	*mystats;
2536 	double			value;
2537 
2538 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2539 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2540 		filebench_shutdown(1);
2541 		return (-1);
2542 	}
2543 
2544 	value = avd_get_dbl(flowop->fo_value);
2545 
2546 	mystats->sample_count++;
2547 	mystats->val_sum += value;
2548 	mystats->sqr_sum += (value * value);
2549 
2550 	return (0);
2551 }
2552 
2553 /*
2554  * Initialize the private data area used to accumulate the statistics
2555  */
2556 static int
2557 flowoplib_testrandvar_init(flowop_t *flowop)
2558 {
2559 	testrandvar_priv_t	*mystats;
2560 
2561 	if ((mystats = (testrandvar_priv_t *)
2562 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2563 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2564 		filebench_shutdown(1);
2565 		return (-1);
2566 	}
2567 
2568 	mystats->sample_count = 0;
2569 	mystats->val_sum = 0;
2570 	mystats->sqr_sum = 0;
2571 	flowop->fo_private = (void *)mystats;
2572 
2573 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2574 	return (0);
2575 }
2576 
2577 /*
2578  * Print out the accumulated statistics, and free the private storage
2579  */
2580 static void
2581 flowoplib_testrandvar_destruct(flowop_t *flowop)
2582 {
2583 	testrandvar_priv_t	*mystats;
2584 	double mean, std_dev, dbl_count;
2585 
2586 	(void) ipc_mutex_lock(&flowop->fo_lock);
2587 	if ((mystats = (testrandvar_priv_t *)
2588 	    flowop->fo_private) == NULL) {
2589 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2590 		return;
2591 	}
2592 
2593 	flowop->fo_private = NULL;
2594 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2595 
2596 	dbl_count = (double)mystats->sample_count;
2597 	mean = mystats->val_sum / dbl_count;
2598 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2599 
2600 	filebench_log(LOG_VERBOSE,
2601 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2602 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2603 	free(mystats);
2604 }
2605 
2606 /*
2607  * prints message to the console from within a thread
2608  */
2609 static int
2610 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2611 {
2612 	procflow_t *procflow;
2613 
2614 	procflow = threadflow->tf_process;
2615 	filebench_log(LOG_INFO,
2616 	    "Message from process (%s,%d), thread (%s,%d): %s",
2617 	    procflow->pf_name, procflow->pf_instance,
2618 	    threadflow->tf_name, threadflow->tf_instance,
2619 	    avd_get_str(flowop->fo_value));
2620 
2621 	return (FILEBENCH_OK);
2622 }
2623 
/*
 * Prints usage information for flowop operations to stderr. The text
 * is held in a table, one output line per entry, and written out in
 * order.
 */
void
flowoplib_usage()
{
	static const char *const usage_text[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop deletefile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>]\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>]\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,directio]\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,random]\n",
		"                       [,opennext]\n",
		"                       [,workingset=<size>]\n",
		"flowop [appendfile|appendfilerand] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,workingset=<size>]\n",
		"flowop [readwholefile|writewholefile] name=<name>, \n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target="
		    "<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,"
		    "target=<semblock-flowop>,\n",
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value="
		    "<decrement-to-receive>,\n",
		"                       highwater="
		    "<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>,\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	size_t line;

	for (line = 0;
	    line < sizeof (usage_text) / sizeof (usage_text[0]); line++)
		(void) fprintf(stderr, "%s", usage_text[line]);
}
2713