xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 7556:55f6926392fe)
15184Sek110237 /*
25184Sek110237  * CDDL HEADER START
35184Sek110237  *
45184Sek110237  * The contents of this file are subject to the terms of the
55184Sek110237  * Common Development and Distribution License (the "License").
65184Sek110237  * You may not use this file except in compliance with the License.
75184Sek110237  *
85184Sek110237  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
95184Sek110237  * or http://www.opensolaris.org/os/licensing.
105184Sek110237  * See the License for the specific language governing permissions
115184Sek110237  * and limitations under the License.
125184Sek110237  *
135184Sek110237  * When distributing Covered Code, include this CDDL HEADER in each
145184Sek110237  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
155184Sek110237  * If applicable, add the following below this CDDL HEADER, with the
165184Sek110237  * fields enclosed by brackets "[]" replaced with your own identifying
175184Sek110237  * information: Portions Copyright [yyyy] [name of copyright owner]
185184Sek110237  *
195184Sek110237  * CDDL HEADER END
205184Sek110237  */
215184Sek110237 /*
226084Saw148015  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
235184Sek110237  * Use is subject to license terms.
246613Sek110237  *
256613Sek110237  * Portions Copyright 2008 Denis Cheng
265184Sek110237  */
275184Sek110237 
285184Sek110237 #include "config.h"
295184Sek110237 
305184Sek110237 #include <sys/types.h>
315184Sek110237 #ifdef HAVE_SYS_ASYNCH_H
325184Sek110237 #include <sys/asynch.h>
335184Sek110237 #endif
345184Sek110237 #include <sys/ipc.h>
355184Sek110237 #include <sys/sem.h>
365184Sek110237 #include <sys/errno.h>
375184Sek110237 #include <sys/time.h>
385184Sek110237 #include <inttypes.h>
395184Sek110237 #include <fcntl.h>
406212Saw148015 #include <math.h>
415184Sek110237 
425184Sek110237 #ifdef HAVE_UTILITY_H
435184Sek110237 #include <utility.h>
445184Sek110237 #endif /* HAVE_UTILITY_H */
455184Sek110237 
465184Sek110237 #ifdef HAVE_AIO
475184Sek110237 #include <aio.h>
485184Sek110237 #endif /* HAVE_AIO */
495184Sek110237 
505184Sek110237 #ifdef HAVE_LIBAIO_H
515184Sek110237 #include <libaio.h>
525184Sek110237 #endif /* HAVE_LIBAIO_H */
535184Sek110237 
545184Sek110237 #ifdef HAVE_SYS_ASYNC_H
555184Sek110237 #include <sys/asynch.h>
565184Sek110237 #endif /* HAVE_SYS_ASYNC_H */
575184Sek110237 
585184Sek110237 #ifdef HAVE_AIO_H
595184Sek110237 #include <aio.h>
605184Sek110237 #endif /* HAVE_AIO_H */
615184Sek110237 
625184Sek110237 #ifndef HAVE_UINT_T
635184Sek110237 #define	uint_t unsigned int
645184Sek110237 #endif /* HAVE_UINT_T */
655184Sek110237 
665184Sek110237 #ifndef HAVE_AIOCB64_T
675184Sek110237 #define	aiocb64 aiocb
685184Sek110237 #endif /* HAVE_AIOCB64_T */
695184Sek110237 
705184Sek110237 #ifndef HAVE_SYSV_SEM
715184Sek110237 #include <semaphore.h>
725184Sek110237 #endif /* HAVE_SYSV_SEM */
735184Sek110237 
745184Sek110237 #include "filebench.h"
755184Sek110237 #include "flowop.h"
765184Sek110237 #include "fileset.h"
776212Saw148015 #include "fb_random.h"
785184Sek110237 
795184Sek110237 /*
805184Sek110237  * These routines implement the flowops from the f language. Each
815184Sek110237  * flowop has a name such as "read", and a set of function pointers
825184Sek110237  * to call for initialization, execution and destruction of the flowop.
835184Sek110237  * The table flowoplib_funcs[] contains a flowoplib struct for each
845184Sek110237  * implemented flowop. Most flowops use a generic initialization function
855184Sek110237  * and all currently use a generic destruction function. All flowop
865184Sek110237  * functions referenced from the table are in this file, though, of
875184Sek110237  * course, they often call functions from other files.
885184Sek110237  *
895184Sek110237  * The flowop_init() routine uses the flowoplib_funcs[] table to
905184Sek110237  * create an initial set of "instance 0" flowops, one for each type of
915184Sek110237  * flowop, from which all other flowops are derived. These "instance 0"
925184Sek110237  * flowops are initialized with information from the table including
935184Sek110237  * pointers for their fo_init, fo_func and fo_destroy functions. When
945184Sek110237  * a flowop definition is encountered in an f language script, the
955184Sek110237  * "type" of flowop, such as "read" is used to search for the
965184Sek110237  * "instance 0" flowop named "read", then a new flowop is allocated
975184Sek110237  * which inherits its function pointers and other initial properties
985184Sek110237  * from the instance 0 flowop, and is given a new name as specified
995184Sek110237  * by the "name=" attribute.
1005184Sek110237  */
1015184Sek110237 
1025184Sek110237 static int flowoplib_init_generic(flowop_t *flowop);
1035184Sek110237 static void flowoplib_destruct_generic(flowop_t *flowop);
1046084Saw148015 static void flowoplib_destruct_noop(flowop_t *flowop);
1055184Sek110237 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
106*7556SAndrew.W.Wilson@sun.com static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
1075184Sek110237 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
1085184Sek110237 #ifdef HAVE_AIO
1095184Sek110237 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
1105184Sek110237 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
1115184Sek110237 #endif
1125184Sek110237 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
1135184Sek110237 static int flowoplib_block_init(flowop_t *flowop);
1145184Sek110237 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
1155184Sek110237 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
1165184Sek110237 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
1175184Sek110237 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
1185184Sek110237 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
1195184Sek110237 static int flowoplib_sempost_init(flowop_t *flowop);
1205184Sek110237 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
1215184Sek110237 static int flowoplib_semblock_init(flowop_t *flowop);
1225184Sek110237 static void flowoplib_semblock_destruct(flowop_t *flowop);
1235184Sek110237 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
1245184Sek110237 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
1255184Sek110237 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
1265184Sek110237 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
1275184Sek110237 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
1285184Sek110237 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
1295184Sek110237 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
1305184Sek110237 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
1315184Sek110237 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
1325184Sek110237 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
1335184Sek110237 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
1345184Sek110237 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
1355184Sek110237 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
1365184Sek110237 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
1375184Sek110237 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
1385184Sek110237 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
1395184Sek110237 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
1405184Sek110237 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
1416212Saw148015 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
1426212Saw148015 static int flowoplib_testrandvar_init(flowop_t *flowop);
1436212Saw148015 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
1445184Sek110237 
1455184Sek110237 typedef struct flowoplib {
1465184Sek110237 	int	fl_type;
1475184Sek110237 	int	fl_attrs;
1485184Sek110237 	char	*fl_name;
1495184Sek110237 	int	(*fl_init)();
1505184Sek110237 	int	(*fl_func)();
1515184Sek110237 	void	(*fl_destruct)();
1525184Sek110237 } flowoplib_t;
1535184Sek110237 
1545184Sek110237 static flowoplib_t flowoplib_funcs[] = {
1555184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
1565184Sek110237 	flowoplib_write, flowoplib_destruct_generic,
1575184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
1585184Sek110237 	flowoplib_read, flowoplib_destruct_generic,
1595184Sek110237 #ifdef HAVE_AIO
1605184Sek110237 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
1615184Sek110237 	flowoplib_aiowrite, flowoplib_destruct_generic,
1625184Sek110237 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
1635184Sek110237 	flowoplib_aiowait, flowoplib_destruct_generic,
1645184Sek110237 #endif
1655184Sek110237 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
1665184Sek110237 	flowoplib_block, flowoplib_destruct_generic,
1675184Sek110237 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
1685184Sek110237 	flowoplib_wakeup, flowoplib_destruct_generic,
1695184Sek110237 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
1705184Sek110237 	flowoplib_semblock, flowoplib_semblock_destruct,
1715184Sek110237 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
1726084Saw148015 	flowoplib_sempost, flowoplib_destruct_noop,
1735184Sek110237 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
1745184Sek110237 	flowoplib_hog, flowoplib_destruct_generic,
1755184Sek110237 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
1765184Sek110237 	flowoplib_delay, flowoplib_destruct_generic,
1775184Sek110237 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
1785184Sek110237 	flowoplib_eventlimit, flowoplib_destruct_generic,
1795184Sek110237 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
1805184Sek110237 	flowoplib_bwlimit, flowoplib_destruct_generic,
1815184Sek110237 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
1825184Sek110237 	flowoplib_iopslimit, flowoplib_destruct_generic,
1835184Sek110237 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
1845184Sek110237 	flowoplib_opslimit, flowoplib_destruct_generic,
1855184Sek110237 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
1865184Sek110237 	flowoplib_finishoncount, flowoplib_destruct_generic,
1875184Sek110237 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
1885184Sek110237 	flowoplib_finishonbytes, flowoplib_destruct_generic,
1895184Sek110237 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
1905184Sek110237 	flowoplib_openfile, flowoplib_destruct_generic,
1915184Sek110237 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
1925184Sek110237 	flowoplib_createfile, flowoplib_destruct_generic,
1935184Sek110237 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
1945184Sek110237 	flowoplib_closefile, flowoplib_destruct_generic,
1955184Sek110237 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
1965184Sek110237 	flowoplib_fsync, flowoplib_destruct_generic,
1975184Sek110237 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
1985184Sek110237 	flowoplib_fsyncset, flowoplib_destruct_generic,
1995184Sek110237 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
2005184Sek110237 	flowoplib_statfile, flowoplib_destruct_generic,
2015184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
2025184Sek110237 	flowoplib_readwholefile, flowoplib_destruct_generic,
2035184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
2045184Sek110237 	flowoplib_appendfile, flowoplib_destruct_generic,
2055184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
2065184Sek110237 	flowoplib_appendfilerand, flowoplib_destruct_generic,
2075184Sek110237 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
2085184Sek110237 	flowoplib_deletefile, flowoplib_destruct_generic,
2095184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
2106212Saw148015 	flowoplib_writewholefile, flowoplib_destruct_generic,
211*7556SAndrew.W.Wilson@sun.com 	FLOW_TYPE_OTHER, 0, "print", flowoplib_init_generic,
212*7556SAndrew.W.Wilson@sun.com 	flowoplib_print, flowoplib_destruct_generic,
2136212Saw148015 	/* routine to calculate mean and stddev for output from a randvar */
2146212Saw148015 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
2156212Saw148015 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
2165184Sek110237 };
2175184Sek110237 
2185184Sek110237 /*
2195184Sek110237  * Loops through the master list of flowops defined in this
2205184Sek110237  * module, and creates and initializes a flowop for each one
2215184Sek110237  * by calling flowop_define. As a side effect of calling
2225184Sek110237  * flowop define, the created flowops are placed on the
2235184Sek110237  * master flowop list. All created flowops are set to
2245184Sek110237  * instance "0".
2255184Sek110237  */
2265184Sek110237 void
2275184Sek110237 flowoplib_init()
2285184Sek110237 {
2295184Sek110237 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
2305184Sek110237 	int i;
2315184Sek110237 
2325184Sek110237 	for (i = 0; i < nops; i++) {
2335184Sek110237 		flowop_t *flowop;
2345184Sek110237 		flowoplib_t *fl;
2355184Sek110237 
2365184Sek110237 		fl = &flowoplib_funcs[i];
2375184Sek110237 
2385184Sek110237 		if ((flowop = flowop_define(NULL,
2396550Saw148015 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
2405184Sek110237 			filebench_log(LOG_ERROR,
2415184Sek110237 			    "failed to create flowop %s\n",
2425184Sek110237 			    fl->fl_name);
2435184Sek110237 			filebench_shutdown(1);
2445184Sek110237 		}
2455184Sek110237 
2465184Sek110237 		flowop->fo_func = fl->fl_func;
2475184Sek110237 		flowop->fo_init = fl->fl_init;
2485184Sek110237 		flowop->fo_destruct = fl->fl_destruct;
2495184Sek110237 		flowop->fo_attrs = fl->fl_attrs;
2505184Sek110237 	}
2515184Sek110237 }
2525184Sek110237 
2535184Sek110237 static int
2545184Sek110237 flowoplib_init_generic(flowop_t *flowop)
2555184Sek110237 {
2565184Sek110237 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2576084Saw148015 	return (FILEBENCH_OK);
2585184Sek110237 }
2595184Sek110237 
2605184Sek110237 static void
2615184Sek110237 flowoplib_destruct_generic(flowop_t *flowop)
2625184Sek110237 {
2636084Saw148015 	char *buf;
2646084Saw148015 
2656084Saw148015 	/* release any local resources held by the flowop */
2666084Saw148015 	(void) ipc_mutex_lock(&flowop->fo_lock);
2676084Saw148015 	buf = flowop->fo_buf;
2686084Saw148015 	flowop->fo_buf = NULL;
2696084Saw148015 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2706084Saw148015 
2716084Saw148015 	if (buf)
2726084Saw148015 		free(buf);
2736084Saw148015 }
2746084Saw148015 
2756084Saw148015 /*
2766084Saw148015  * Special total noop destruct
2776084Saw148015  */
2786084Saw148015 /* ARGSUSED */
2796084Saw148015 static void
2806084Saw148015 flowoplib_destruct_noop(flowop_t *flowop)
2816084Saw148015 {
2825184Sek110237 }
2835184Sek110237 
2845184Sek110237 /*
2855184Sek110237  * Generates a file attribute from flags in the supplied flowop.
2865184Sek110237  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
2875184Sek110237  */
2885184Sek110237 static int
2895184Sek110237 flowoplib_fileattrs(flowop_t *flowop)
2905184Sek110237 {
2915184Sek110237 	int attrs = 0;
2925184Sek110237 
2936212Saw148015 	if (avd_get_bool(flowop->fo_directio))
2945184Sek110237 		attrs |= FLOW_ATTR_DIRECTIO;
2955184Sek110237 
2966212Saw148015 	if (avd_get_bool(flowop->fo_dsync))
2975184Sek110237 		attrs |= FLOW_ATTR_DSYNC;
2985184Sek110237 
2995184Sek110237 	return (attrs);
3005184Sek110237 }
3015184Sek110237 
3025184Sek110237 /*
3035184Sek110237  * Searches for a file descriptor. Tries the flowop's
3045184Sek110237  * fo_fdnumber first and returns with it if it has been
3055184Sek110237  * explicitly set (greater than 0). It next checks to
3065184Sek110237  * see if a rotating file descriptor policy is in effect,
3075184Sek110237  * and if not returns the fdnumber regardless of what
3085184Sek110237  * it is. (note that if it is 0, it just selects to the
3095184Sek110237  * default file descriptor in the threadflow's tf_fd
3105184Sek110237  * array). If the rotating fd policy is in effect, it
3115184Sek110237  * cycles from the end of the tf_fd array to one location
3125184Sek110237  * beyond the maximum needed by the number of entries in
3135184Sek110237  * the associated fileset on each invocation, then starts
3145184Sek110237  * over from the end.
3155184Sek110237  *
3165184Sek110237  * The routine returns an index into the threadflow's
3175184Sek110237  * tf_fd table where the actual file descriptor will be
3185184Sek110237  * found. Note: the calling routine must not call this
3195184Sek110237  * routine if the flowop does not have a fileset, and the
3205184Sek110237  * flowop's fo_fdnumber is zero and fo_rotatefd is
3215184Sek110237  * asserted, or an addressing fault may occur.
3225184Sek110237  */
3235673Saw148015 static int
3245184Sek110237 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
3255184Sek110237 {
3266212Saw148015 	fbint_t	entries;
3276391Saw148015 	int fdnumber = flowop->fo_fdnumber;
3286212Saw148015 
3295184Sek110237 	/* If the script sets the fd explicitly */
3306391Saw148015 	if (fdnumber > 0)
3316391Saw148015 		return (fdnumber);
3325184Sek110237 
3335184Sek110237 	/* If the flowop defaults to persistent fd */
3346212Saw148015 	if (!avd_get_bool(flowop->fo_rotatefd))
3356391Saw148015 		return (fdnumber);
3366391Saw148015 
3376391Saw148015 	if (flowop->fo_fileset == NULL) {
3386391Saw148015 		filebench_log(LOG_ERROR, "flowop NULL file");
3396391Saw148015 		return (FILEBENCH_ERROR);
3406391Saw148015 	}
3415184Sek110237 
3426212Saw148015 	entries = flowop->fo_fileset->fs_constentries;
3436212Saw148015 
3445184Sek110237 	/* Rotate the fd on each flowop invocation */
3456212Saw148015 	if (entries > (THREADFLOW_MAXFD / 2)) {
3465184Sek110237 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
3476286Saw148015 		    " (too many files : %llu",
3486286Saw148015 		    flowop->fo_name, (u_longlong_t)entries);
3496084Saw148015 		return (FILEBENCH_ERROR);
3505184Sek110237 	}
3515184Sek110237 
3525184Sek110237 	/* First time around */
3535184Sek110237 	if (threadflow->tf_fdrotor == 0)
3545184Sek110237 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
3555184Sek110237 
3565184Sek110237 	/* One fd for every file in the set */
3576212Saw148015 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
3585184Sek110237 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
3595184Sek110237 
3605184Sek110237 
3615184Sek110237 	threadflow->tf_fdrotor--;
3625184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
3635184Sek110237 	    threadflow->tf_fdrotor);
3645184Sek110237 	return (threadflow->tf_fdrotor);
3655184Sek110237 }
3665184Sek110237 
3675184Sek110237 /*
3685673Saw148015  * Determines the file descriptor to use, and attempts to open
3695673Saw148015  * the file if it is not already open. Also determines the wss
3706084Saw148015  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
3716084Saw148015  * if flowop_openfile_common couldn't obtain an appropriate file
3726084Saw148015  * from a the fileset, and FILEBENCH_OK otherwise.
3735673Saw148015  */
3745673Saw148015 static int
3755673Saw148015 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
3766212Saw148015     fbint_t *wssp, int *filedescp)
3775673Saw148015 {
3785673Saw148015 	int fd = flowoplib_fdnum(threadflow, flowop);
3795673Saw148015 
3805673Saw148015 	if (fd == -1)
3816084Saw148015 		return (FILEBENCH_ERROR);
3825673Saw148015 
3835673Saw148015 	if (threadflow->tf_fd[fd] == 0) {
3846084Saw148015 		int ret;
3856084Saw148015 
3866084Saw148015 		if ((ret = flowoplib_openfile_common(
3876084Saw148015 		    threadflow, flowop, fd)) != FILEBENCH_OK)
3886084Saw148015 			return (ret);
3895673Saw148015 
3905673Saw148015 		if (threadflow->tf_fse[fd]) {
3915673Saw148015 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
3925673Saw148015 			    threadflow->tf_fse[fd]->fse_path);
3935673Saw148015 		} else {
3945673Saw148015 			filebench_log(LOG_DEBUG_IMPL,
3955673Saw148015 			    "opened device %s/%s",
3966212Saw148015 			    avd_get_str(flowop->fo_fileset->fs_path),
3976212Saw148015 			    avd_get_str(flowop->fo_fileset->fs_name));
3985673Saw148015 		}
3995673Saw148015 	}
4005673Saw148015 
4015673Saw148015 	*filedescp = threadflow->tf_fd[fd];
4025673Saw148015 
4036212Saw148015 	if ((*wssp = flowop->fo_constwss) == 0) {
4045673Saw148015 		if (threadflow->tf_fse[fd])
4055673Saw148015 			*wssp = threadflow->tf_fse[fd]->fse_size;
4065673Saw148015 		else
4076212Saw148015 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
4085673Saw148015 	}
4095673Saw148015 
4106084Saw148015 	return (FILEBENCH_OK);
4115673Saw148015 }
4125673Saw148015 
4135673Saw148015 /*
4145673Saw148015  * Determines the io buffer or random offset into tf_mem for
4156084Saw148015  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
4165673Saw148015  */
4175673Saw148015 static int
4185673Saw148015 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
4196212Saw148015     caddr_t *iobufp, fbint_t iosize)
4205673Saw148015 {
4215673Saw148015 	long memsize;
4225673Saw148015 	size_t memoffset;
4235673Saw148015 
4245673Saw148015 	if (iosize == 0) {
4255673Saw148015 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
4265673Saw148015 		    flowop->fo_name);
4276084Saw148015 		return (FILEBENCH_ERROR);
4285673Saw148015 	}
4295673Saw148015 
4306212Saw148015 	if ((memsize = threadflow->tf_constmemsize) != 0) {
4315673Saw148015 
4325673Saw148015 		/* use tf_mem for I/O with random offset */
4336212Saw148015 		if (filebench_randomno(&memoffset,
4346212Saw148015 		    memsize, iosize, NULL) == -1) {
4355673Saw148015 			filebench_log(LOG_ERROR,
4365673Saw148015 			    "tf_memsize smaller than IO size for thread %s",
4375673Saw148015 			    flowop->fo_name);
4386084Saw148015 			return (FILEBENCH_ERROR);
4395673Saw148015 		}
4405673Saw148015 		*iobufp = threadflow->tf_mem + memoffset;
4415673Saw148015 
4425673Saw148015 	} else {
4435673Saw148015 		/* use private I/O buffer */
4445673Saw148015 		if ((flowop->fo_buf != NULL) &&
4455673Saw148015 		    (flowop->fo_buf_size < iosize)) {
4466212Saw148015 			/* too small, so free up and re-allocate */
4475673Saw148015 			free(flowop->fo_buf);
4485673Saw148015 			flowop->fo_buf = NULL;
4495673Saw148015 		}
4506212Saw148015 
4516212Saw148015 		/*
4526212Saw148015 		 * Allocate memory for the  buffer. The memory is freed
4536212Saw148015 		 * by flowop_destruct_generic() or by this routine if more
4546212Saw148015 		 * memory is needed for the buffer.
4556212Saw148015 		 */
4565673Saw148015 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
4575673Saw148015 		    = (char *)malloc(iosize)) == NULL))
4586084Saw148015 			return (FILEBENCH_ERROR);
4595673Saw148015 
4605673Saw148015 		flowop->fo_buf_size = iosize;
4615673Saw148015 		*iobufp = flowop->fo_buf;
4625673Saw148015 	}
4636084Saw148015 	return (FILEBENCH_OK);
4645673Saw148015 }
4655673Saw148015 
4665673Saw148015 /*
4675673Saw148015  * Determines the file descriptor to use, opens it if necessary, the
4685673Saw148015  * io buffer or random offset into tf_mem for IO operation and the wss
4696084Saw148015  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
4705673Saw148015  */
4715673Saw148015 static int
4725673Saw148015 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
4736212Saw148015     fbint_t *wssp, caddr_t *iobufp, int *filedescp, fbint_t iosize)
4745673Saw148015 {
4756084Saw148015 	int ret;
4766084Saw148015 
4776084Saw148015 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
4786084Saw148015 	    FILEBENCH_OK)
4796084Saw148015 		return (ret);
4805673Saw148015 
4816084Saw148015 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
4826084Saw148015 	    FILEBENCH_OK)
4836084Saw148015 		return (ret);
4845673Saw148015 
4856084Saw148015 	return (FILEBENCH_OK);
4865673Saw148015 }
4875673Saw148015 
4885673Saw148015 /*
4895184Sek110237  * Emulate posix read / pread. If the flowop has a fileset,
4905184Sek110237  * a file descriptor number index is fetched, otherwise a
4915184Sek110237  * supplied fileobj file is used. In either case the specified
4925184Sek110237  * file will be opened if not already open. If the flowop has
4936084Saw148015  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
4945184Sek110237  * returned.
4955184Sek110237  *
4965184Sek110237  * The actual read is done to a random offset in the
4975184Sek110237  * threadflow's thread memory (tf_mem), with a size set by
4985184Sek110237  * fo_iosize and at either a random disk offset within the
4995184Sek110237  * working set size, or at the next sequential location. If
5006084Saw148015  * any errors are encountered, FILEBENCH_ERROR is returned,
5016084Saw148015  * if no appropriate file can be obtained from the fileset then
5026084Saw148015  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
5035184Sek110237  */
5045184Sek110237 static int
5055184Sek110237 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
5065184Sek110237 {
5075673Saw148015 	caddr_t iobuf;
5086212Saw148015 	fbint_t wss;
5096212Saw148015 	fbint_t iosize;
5105184Sek110237 	int filedesc;
5115184Sek110237 	int ret;
5125184Sek110237 
5136212Saw148015 
5146212Saw148015 	iosize = avd_get_int(flowop->fo_iosize);
5156084Saw148015 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
5166212Saw148015 	    &filedesc, iosize)) != FILEBENCH_OK)
5176084Saw148015 		return (ret);
5185184Sek110237 
5196212Saw148015 	if (avd_get_bool(flowop->fo_random)) {
5205184Sek110237 		uint64_t fileoffset;
5215184Sek110237 
5226212Saw148015 		if (filebench_randomno64(&fileoffset,
5236212Saw148015 		    wss, iosize, NULL) == -1) {
5245184Sek110237 			filebench_log(LOG_ERROR,
5255184Sek110237 			    "file size smaller than IO size for thread %s",
5265184Sek110237 			    flowop->fo_name);
5276084Saw148015 			return (FILEBENCH_ERROR);
5285184Sek110237 		}
5295184Sek110237 
5305184Sek110237 		(void) flowop_beginop(threadflow, flowop);
5315673Saw148015 		if ((ret = pread64(filedesc, iobuf,
5326212Saw148015 		    iosize, (off64_t)fileoffset)) == -1) {
5335673Saw148015 			(void) flowop_endop(threadflow, flowop, 0);
5345184Sek110237 			filebench_log(LOG_ERROR,
5356286Saw148015 			    "read file %s failed, offset %llu "
5365673Saw148015 			    "io buffer %zd: %s",
5376212Saw148015 			    avd_get_str(flowop->fo_fileset->fs_name),
5386286Saw148015 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
5395673Saw148015 			flowop_endop(threadflow, flowop, 0);
5406084Saw148015 			return (FILEBENCH_ERROR);
5415184Sek110237 		}
5425673Saw148015 		(void) flowop_endop(threadflow, flowop, ret);
5435184Sek110237 
5445184Sek110237 		if ((ret == 0))
5455184Sek110237 			(void) lseek64(filedesc, 0, SEEK_SET);
5465184Sek110237 
5475184Sek110237 	} else {
5485184Sek110237 		(void) flowop_beginop(threadflow, flowop);
5496212Saw148015 		if ((ret = read(filedesc, iobuf, iosize)) == -1) {
5506212Saw148015 			(void) flowop_endop(threadflow, flowop, 0);
5515184Sek110237 			filebench_log(LOG_ERROR,
5525673Saw148015 			    "read file %s failed, io buffer %zd: %s",
5536212Saw148015 			    avd_get_str(flowop->fo_fileset->fs_name),
5545673Saw148015 			    iobuf, strerror(errno));
5555673Saw148015 			(void) flowop_endop(threadflow, flowop, 0);
5566084Saw148015 			return (FILEBENCH_ERROR);
5575184Sek110237 		}
5585673Saw148015 		(void) flowop_endop(threadflow, flowop, ret);
5595184Sek110237 
5605184Sek110237 		if ((ret == 0))
5615184Sek110237 			(void) lseek64(filedesc, 0, SEEK_SET);
5625184Sek110237 	}
5635184Sek110237 
5646084Saw148015 	return (FILEBENCH_OK);
5655184Sek110237 }
5665184Sek110237 
5675184Sek110237 #ifdef HAVE_AIO
5685184Sek110237 
5695184Sek110237 /*
5705184Sek110237  * Asynchronous write section. An Asynchronous IO element
5715184Sek110237  * (aiolist_t) is used to associate the asynchronous write request with
5725184Sek110237  * its subsequent completion. This element includes a aiocb64 struct
5735184Sek110237  * that is used by posix aio_xxx calls to track the asynchronous writes.
5745184Sek110237  * The flowops aiowrite and aiowait result in calls to these posix
5755184Sek110237  * aio_xxx system routines to do the actual asynchronous write IO
5765184Sek110237  * operations.
5775184Sek110237  */
5785184Sek110237 
5795184Sek110237 
5805184Sek110237 /*
5815184Sek110237  * Allocates an asynchronous I/O list (aio, of type
5825184Sek110237  * aiolist_t) element. Adds it to the flowop thread's
5835184Sek110237  * threadflow aio list. Returns a pointer to the element.
5845184Sek110237  */
5855184Sek110237 static aiolist_t *
5865184Sek110237 aio_allocate(flowop_t *flowop)
5875184Sek110237 {
5885184Sek110237 	aiolist_t *aiolist;
5895184Sek110237 
5905184Sek110237 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
5915184Sek110237 		filebench_log(LOG_ERROR, "malloc aiolist failed");
5925184Sek110237 		filebench_shutdown(1);
5935184Sek110237 	}
5945184Sek110237 
5955184Sek110237 	/* Add to list */
5965184Sek110237 	if (flowop->fo_thread->tf_aiolist == NULL) {
5975184Sek110237 		flowop->fo_thread->tf_aiolist = aiolist;
5985184Sek110237 		aiolist->al_next = NULL;
5995184Sek110237 	} else {
6005184Sek110237 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
6015184Sek110237 		flowop->fo_thread->tf_aiolist = aiolist;
6025184Sek110237 	}
6035184Sek110237 	return (aiolist);
6045184Sek110237 }
6055184Sek110237 
6065184Sek110237 /*
6075184Sek110237  * Searches for the aiolist element that has a matching
6086084Saw148015  * completion block, aiocb. If none found returns FILEBENCH_ERROR. If
6095184Sek110237  * found, removes the aiolist element from flowop thread's
6106084Saw148015  * list and returns FILEBENCH_OK.
6115184Sek110237  */
6125184Sek110237 static int
6135184Sek110237 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
6145184Sek110237 {
6155184Sek110237 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
6165184Sek110237 	aiolist_t *previous = NULL;
6175184Sek110237 	aiolist_t *match = NULL;
6185184Sek110237 
6195184Sek110237 	if (aiocb == NULL) {
6205184Sek110237 		filebench_log(LOG_ERROR, "null aiocb deallocate");
6216084Saw148015 		return (FILEBENCH_OK);
6225184Sek110237 	}
6235184Sek110237 
6245184Sek110237 	while (aiolist) {
6255184Sek110237 		if (aiocb == &(aiolist->al_aiocb)) {
6265184Sek110237 			match = aiolist;
6275184Sek110237 			break;
6285184Sek110237 		}
6295184Sek110237 		previous = aiolist;
6305184Sek110237 		aiolist = aiolist->al_next;
6315184Sek110237 	}
6325184Sek110237 
6335184Sek110237 	if (match == NULL)
6346084Saw148015 		return (FILEBENCH_ERROR);
6355184Sek110237 
6365184Sek110237 	/* Remove from the list */
6375184Sek110237 	if (previous)
6385184Sek110237 		previous->al_next = match->al_next;
6395184Sek110237 	else
6405184Sek110237 		flowop->fo_thread->tf_aiolist = match->al_next;
6415184Sek110237 
6426084Saw148015 	return (FILEBENCH_OK);
6435184Sek110237 }
6445184Sek110237 
6455184Sek110237 /*
6465184Sek110237  * Emulate posix aiowrite(). Determines which file to use,
6475184Sek110237  * either one file of a fileset, or the file associated
6485184Sek110237  * with a fileobj, allocates and fills an aiolist_t element
6495184Sek110237  * for the write, and issues the asynchronous write. This
6505184Sek110237  * operation is only valid for random IO, and returns an
6516084Saw148015  * error if the flowop is set for sequential IO. Returns
6526084Saw148015  * FILEBENCH_OK on success, FILEBENCH_NORSC if iosetup can't
6536084Saw148015  * obtain a file to open, and FILEBENCH_ERROR on any
6546084Saw148015  * encountered error.
6555184Sek110237  */
6565184Sek110237 static int
6575184Sek110237 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
6585184Sek110237 {
6595673Saw148015 	caddr_t iobuf;
6606212Saw148015 	fbint_t wss;
6616212Saw148015 	fbint_t iosize;
6625184Sek110237 	int filedesc;
6636084Saw148015 	int ret;
6645184Sek110237 
6656212Saw148015 	iosize = avd_get_int(flowop->fo_iosize);
6666212Saw148015 
6676084Saw148015 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
6686212Saw148015 	    &filedesc, iosize)) != FILEBENCH_OK)
6696084Saw148015 		return (ret);
6705184Sek110237 
6716212Saw148015 	if (avd_get_bool(flowop->fo_random)) {
6725184Sek110237 		uint64_t fileoffset;
6735184Sek110237 		struct aiocb64 *aiocb;
6745184Sek110237 		aiolist_t *aiolist;
6755184Sek110237 
6765184Sek110237 		if (filebench_randomno64(&fileoffset,
6776212Saw148015 		    wss, iosize, NULL) == -1) {
6785184Sek110237 			filebench_log(LOG_ERROR,
6795184Sek110237 			    "file size smaller than IO size for thread %s",
6805184Sek110237 			    flowop->fo_name);
6816084Saw148015 			return (FILEBENCH_ERROR);
6825184Sek110237 		}
6835184Sek110237 
6845184Sek110237 		aiolist = aio_allocate(flowop);
6855184Sek110237 		aiolist->al_type = AL_WRITE;
6865184Sek110237 		aiocb = &aiolist->al_aiocb;
6875184Sek110237 
6885184Sek110237 		aiocb->aio_fildes = filedesc;
6895673Saw148015 		aiocb->aio_buf = iobuf;
6906212Saw148015 		aiocb->aio_nbytes = (size_t)iosize;
6915184Sek110237 		aiocb->aio_offset = (off64_t)fileoffset;
6925184Sek110237 		aiocb->aio_reqprio = 0;
6935184Sek110237 
6945184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
6956286Saw148015 		    "aio fd=%d, bytes=%llu, offset=%llu",
6966286Saw148015 		    filedesc, (u_longlong_t)iosize, (u_longlong_t)fileoffset);
6975184Sek110237 
6985184Sek110237 		flowop_beginop(threadflow, flowop);
6995184Sek110237 		if (aio_write64(aiocb) < 0) {
7005184Sek110237 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
7015184Sek110237 			    strerror(errno));
7025184Sek110237 			filebench_shutdown(1);
7035184Sek110237 		}
7046212Saw148015 		flowop_endop(threadflow, flowop, iosize);
7055184Sek110237 	} else {
7066084Saw148015 		return (FILEBENCH_ERROR);
7075184Sek110237 	}
7085184Sek110237 
7096084Saw148015 	return (FILEBENCH_OK);
7105184Sek110237 }
7115184Sek110237 
7125184Sek110237 
7135184Sek110237 
7145184Sek110237 #define	MAXREAP 4096
7155184Sek110237 
7165184Sek110237 /*
7175184Sek110237  * Emulate posix aiowait(). Waits for the completion of half the
7185184Sek110237  * outstanding asynchronous IOs, or a single IO, which ever is
7195184Sek110237  * larger. The routine will return after a sufficient number of
7205184Sek110237  * completed calls issued by any thread in the procflow have
7215184Sek110237  * completed, or a 1 second timout elapses. All completed
7225184Sek110237  * IO operations are deleted from the thread's aiolist.
7235184Sek110237  */
7245184Sek110237 static int
7255184Sek110237 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
7265184Sek110237 {
7275184Sek110237 	struct aiocb64 **worklist;
7285184Sek110237 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
7295184Sek110237 	int uncompleted = 0;
7305184Sek110237 
7315184Sek110237 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
7325184Sek110237 
7335184Sek110237 	/* Count the list of pending aios */
7345184Sek110237 	while (aio) {
7355184Sek110237 		uncompleted++;
7365184Sek110237 		aio = aio->al_next;
7375184Sek110237 	}
7385184Sek110237 
7395184Sek110237 	do {
7405184Sek110237 		uint_t ncompleted = 0;
7415184Sek110237 		uint_t todo;
7425184Sek110237 		struct timespec timeout;
7435184Sek110237 		int inprogress;
7445184Sek110237 		int i;
7455184Sek110237 
7465184Sek110237 		/* Wait for half of the outstanding requests */
7475184Sek110237 		timeout.tv_sec = 1;
7485184Sek110237 		timeout.tv_nsec = 0;
7495184Sek110237 
7505184Sek110237 		if (uncompleted > MAXREAP)
7515184Sek110237 			todo = MAXREAP;
7525184Sek110237 		else
7535184Sek110237 			todo = uncompleted / 2;
7545184Sek110237 
7555184Sek110237 		if (todo == 0)
7565184Sek110237 			todo = 1;
7575184Sek110237 
7585184Sek110237 		flowop_beginop(threadflow, flowop);
7595184Sek110237 
7605184Sek110237 #ifdef HAVE_AIOWAITN
7615184Sek110237 		if ((aio_waitn64((struct aiocb64 **)worklist,
7625184Sek110237 		    MAXREAP, &todo, &timeout) == -1) &&
7635184Sek110237 		    errno && (errno != ETIME)) {
7645184Sek110237 			filebench_log(LOG_ERROR,
7655184Sek110237 			    "aiowait failed: %s, outstanding = %d, "
7665184Sek110237 			    "ncompleted = %d ",
7675184Sek110237 			    strerror(errno), uncompleted, todo);
7685184Sek110237 		}
7695184Sek110237 
7705184Sek110237 		ncompleted = todo;
7715184Sek110237 		/* Take the  completed I/Os from the list */
7725184Sek110237 		inprogress = 0;
7735184Sek110237 		for (i = 0; i < ncompleted; i++) {
7745184Sek110237 			if ((aio_return64(worklist[i]) == -1) &&
7755184Sek110237 			    (errno == EINPROGRESS)) {
7765184Sek110237 				inprogress++;
7775184Sek110237 				continue;
7785184Sek110237 			}
7795184Sek110237 			if (aio_deallocate(flowop, worklist[i]) < 0) {
7805184Sek110237 				filebench_log(LOG_ERROR, "Could not remove "
7815184Sek110237 				    "aio from list ");
7825673Saw148015 				flowop_endop(threadflow, flowop, 0);
7836084Saw148015 				return (FILEBENCH_ERROR);
7845184Sek110237 			}
7855184Sek110237 		}
7865184Sek110237 
7875184Sek110237 		uncompleted -= ncompleted;
7885184Sek110237 		uncompleted += inprogress;
7895184Sek110237 
7905184Sek110237 #else
7915184Sek110237 
7925184Sek110237 		for (ncompleted = 0, inprogress = 0,
7935184Sek110237 		    aio = flowop->fo_thread->tf_aiolist;
7945184Sek110237 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
7956613Sek110237 			int result = aio_error64(&aio->al_aiocb);
7965184Sek110237 
7975184Sek110237 			if (result == EINPROGRESS) {
7985184Sek110237 				inprogress++;
7995184Sek110237 				continue;
8005184Sek110237 			}
8015184Sek110237 
8025184Sek110237 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
8035184Sek110237 				filebench_log(LOG_ERROR, "aio failed: %s",
8045184Sek110237 				    strerror(result));
8055184Sek110237 				continue;
8065184Sek110237 			}
8075184Sek110237 
8085184Sek110237 			ncompleted++;
8095184Sek110237 
8105184Sek110237 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
8115184Sek110237 				filebench_log(LOG_ERROR, "Could not remove aio "
8125184Sek110237 				    "from list ");
8135673Saw148015 				flowop_endop(threadflow, flowop, 0);
8146084Saw148015 				return (FILEBENCH_ERROR);
8155184Sek110237 			}
8165184Sek110237 		}
8175184Sek110237 
8185184Sek110237 		uncompleted -= ncompleted;
8195184Sek110237 
8205184Sek110237 #endif
8215184Sek110237 		filebench_log(LOG_DEBUG_SCRIPT,
8225184Sek110237 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
8235184Sek110237 		    ncompleted, uncompleted, inprogress);
8245184Sek110237 
8255184Sek110237 	} while (uncompleted > MAXREAP);
8265184Sek110237 
8275673Saw148015 	flowop_endop(threadflow, flowop, 0);
8285184Sek110237 
8295184Sek110237 	free(worklist);
8305184Sek110237 
8316084Saw148015 	return (FILEBENCH_OK);
8325184Sek110237 }
8335184Sek110237 
8345184Sek110237 #endif /* HAVE_AIO */
8355184Sek110237 
8365184Sek110237 /*
8375184Sek110237  * Initializes a "flowop_block" flowop. Specifically, it
8385184Sek110237  * initializes the flowop's fo_cv and unlocks the fo_lock.
8395184Sek110237  */
8405184Sek110237 static int
8415184Sek110237 flowoplib_block_init(flowop_t *flowop)
8425184Sek110237 {
8435184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
8445184Sek110237 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
8455184Sek110237 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
8465184Sek110237 	(void) ipc_mutex_unlock(&flowop->fo_lock);
8475184Sek110237 
8486084Saw148015 	return (FILEBENCH_OK);
8495184Sek110237 }
8505184Sek110237 
8515184Sek110237 /*
8525184Sek110237  * Blocks the threadflow until woken up by flowoplib_wakeup.
8535184Sek110237  * The routine blocks on the flowop's fo_cv condition variable.
8545184Sek110237  */
8555184Sek110237 static int
8565184Sek110237 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
8575184Sek110237 {
8585184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
8595184Sek110237 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
8605184Sek110237 	(void) ipc_mutex_lock(&flowop->fo_lock);
8615184Sek110237 
8625184Sek110237 	flowop_beginop(threadflow, flowop);
8635184Sek110237 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
8645673Saw148015 	flowop_endop(threadflow, flowop, 0);
8655184Sek110237 
8665184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
8675184Sek110237 	    flowop->fo_name, flowop->fo_instance);
8685184Sek110237 
8695184Sek110237 	(void) ipc_mutex_unlock(&flowop->fo_lock);
8705184Sek110237 
8716084Saw148015 	return (FILEBENCH_OK);
8725184Sek110237 }
8735184Sek110237 
8745184Sek110237 /*
8755184Sek110237  * Wakes up one or more target blocking flowops.
8765184Sek110237  * Sends broadcasts on the fo_cv condition variables of all
8775184Sek110237  * flowops on the target list, except those that are
8785184Sek110237  * FLOW_MASTER flowops. The target list consists of all
8795184Sek110237  * flowops whose name matches this flowop's "fo_targetname"
8805184Sek110237  * attribute. The target list is generated on the first
8815184Sek110237  * invocation, and the run will be shutdown if no targets
8826084Saw148015  * are found. Otherwise the routine always returns FILEBENCH_OK.
8835184Sek110237  */
8845184Sek110237 static int
8855184Sek110237 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
8865184Sek110237 {
8875184Sek110237 	flowop_t *target;
8885184Sek110237 
8895184Sek110237 	/* if this is the first wakeup, create the wakeup list */
8905184Sek110237 	if (flowop->fo_targets == NULL) {
8915184Sek110237 		flowop_t *result = flowop_find(flowop->fo_targetname);
8925184Sek110237 
8935184Sek110237 		flowop->fo_targets = result;
8945184Sek110237 		if (result == NULL) {
8955184Sek110237 			filebench_log(LOG_ERROR,
8965184Sek110237 			    "wakeup: could not find op %s for thread %s",
8975184Sek110237 			    flowop->fo_targetname,
8985184Sek110237 			    threadflow->tf_name);
8995184Sek110237 			filebench_shutdown(1);
9005184Sek110237 		}
9015184Sek110237 		while (result) {
9025184Sek110237 			result->fo_targetnext =
9035184Sek110237 			    result->fo_resultnext;
9045184Sek110237 			result = result->fo_resultnext;
9055184Sek110237 		}
9065184Sek110237 	}
9075184Sek110237 
9085184Sek110237 	target = flowop->fo_targets;
9095184Sek110237 
9105184Sek110237 	/* wakeup the targets */
9115184Sek110237 	while (target) {
9125184Sek110237 		if (target->fo_instance == FLOW_MASTER) {
9135184Sek110237 			target = target->fo_targetnext;
9145184Sek110237 			continue;
9155184Sek110237 		}
9165184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
9175184Sek110237 		    "wakeup flow %s-%d at address %zx",
9185184Sek110237 		    target->fo_name,
9195184Sek110237 		    target->fo_instance,
9205184Sek110237 		    &target->fo_cv);
9215184Sek110237 
9225184Sek110237 		flowop_beginop(threadflow, flowop);
9235184Sek110237 		(void) ipc_mutex_lock(&target->fo_lock);
9245184Sek110237 		(void) pthread_cond_broadcast(&target->fo_cv);
9255184Sek110237 		(void) ipc_mutex_unlock(&target->fo_lock);
9265673Saw148015 		flowop_endop(threadflow, flowop, 0);
9275184Sek110237 
9285184Sek110237 		target = target->fo_targetnext;
9295184Sek110237 	}
9305184Sek110237 
9316084Saw148015 	return (FILEBENCH_OK);
9325184Sek110237 }
9335184Sek110237 
9345184Sek110237 /*
9355184Sek110237  * "think time" routines. the "hog" routine consumes cpu cycles as
9365184Sek110237  * it "thinks", while the "delay" flowop simply calls sleep() to delay
9375184Sek110237  * for a given number of seconds without consuming cpu cycles.
9385184Sek110237  */
9395184Sek110237 
9405184Sek110237 
9415184Sek110237 /*
9425184Sek110237  * Consumes CPU cycles and memory bandwidth by looping for
9435184Sek110237  * flowop->fo_value times. With each loop sets memory location
9445184Sek110237  * threadflow->tf_mem to 1.
9455184Sek110237  */
9465184Sek110237 static int
9475184Sek110237 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
9485184Sek110237 {
9496212Saw148015 	uint64_t value = avd_get_int(flowop->fo_value);
9505184Sek110237 	int i;
9515184Sek110237 
9525673Saw148015 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
9535184Sek110237 	flowop_beginop(threadflow, flowop);
9545673Saw148015 	if (threadflow->tf_mem != NULL) {
9555673Saw148015 		for (i = 0; i < value; i++)
9565673Saw148015 			*(threadflow->tf_mem) = 1;
9575673Saw148015 	}
9585673Saw148015 	flowop_endop(threadflow, flowop, 0);
9595184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
9606084Saw148015 	return (FILEBENCH_OK);
9615184Sek110237 }
9625184Sek110237 
9635184Sek110237 
9645184Sek110237 /*
9655184Sek110237  * Delays for fo_value seconds.
9665184Sek110237  */
9675184Sek110237 static int
9685184Sek110237 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
9695184Sek110237 {
9706212Saw148015 	int value = avd_get_int(flowop->fo_value);
9715184Sek110237 
9725184Sek110237 	flowop_beginop(threadflow, flowop);
9735184Sek110237 	(void) sleep(value);
9745673Saw148015 	flowop_endop(threadflow, flowop, 0);
9756084Saw148015 	return (FILEBENCH_OK);
9765184Sek110237 }
9775184Sek110237 
/*
 * Rate limiting routines. This is the event consuming half of the
 * event system. Each of the four following routines will limit the rate
 * to one unit of either calls, issued I/O operations, issued filebench
 * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
 * consumer, and further divided among different consumers if more than
 * one has been defined. There is no mechanism to enforce equal sharing
 * of events.
 */
9885184Sek110237 
9895184Sek110237 /*
9905184Sek110237  * Completes one invocation per posted event. If eventgen_q
9915184Sek110237  * has an event count greater than zero, one will be removed
9925184Sek110237  * (count decremented), otherwise the calling thread will
9935184Sek110237  * block until another event has been posted. Always returns 0
9945184Sek110237  */
9955184Sek110237 static int
9965184Sek110237 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
9975184Sek110237 {
9985184Sek110237 	/* Immediately bail if not set/enabled */
9996391Saw148015 	if (filebench_shm->shm_eventgen_hz == 0)
10006084Saw148015 		return (FILEBENCH_OK);
10015184Sek110237 
10025184Sek110237 	if (flowop->fo_initted == 0) {
10035184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
10045184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
10055184Sek110237 		flowop->fo_initted = 1;
10065184Sek110237 	}
10075184Sek110237 
10085184Sek110237 	flowop_beginop(threadflow, flowop);
10096391Saw148015 	while (filebench_shm->shm_eventgen_hz) {
10106391Saw148015 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
10116391Saw148015 		if (filebench_shm->shm_eventgen_q > 0) {
10126391Saw148015 			filebench_shm->shm_eventgen_q--;
10136391Saw148015 			(void) ipc_mutex_unlock(
10146391Saw148015 			    &filebench_shm->shm_eventgen_lock);
10155184Sek110237 			break;
10165184Sek110237 		}
10176391Saw148015 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
10186391Saw148015 		    &filebench_shm->shm_eventgen_lock);
10196391Saw148015 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
10205184Sek110237 	}
10215673Saw148015 	flowop_endop(threadflow, flowop, 0);
10226084Saw148015 	return (FILEBENCH_OK);
10235184Sek110237 }
10245184Sek110237 
10256701Saw148015 static int
10266701Saw148015 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
10276701Saw148015 {
10286701Saw148015 	if (flowop->fo_targetname[0] != '\0') {
10296701Saw148015 
10306701Saw148015 		/* Try to use statistics from specific flowop */
10316701Saw148015 		flowop->fo_targets =
10326701Saw148015 		    flowop_find_from_list(flowop->fo_targetname,
10336701Saw148015 		    threadflow->tf_thrd_fops);
10346701Saw148015 		if (flowop->fo_targets == NULL) {
10356701Saw148015 			filebench_log(LOG_ERROR,
10366701Saw148015 			    "limit target: could not find flowop %s",
10376701Saw148015 			    flowop->fo_targetname);
10386701Saw148015 			filebench_shutdown(1);
10396701Saw148015 			return (FILEBENCH_ERROR);
10406701Saw148015 		}
10416701Saw148015 	} else {
10426701Saw148015 		/* use total workload statistics */
10436701Saw148015 		flowop->fo_targets = NULL;
10446701Saw148015 	}
10456701Saw148015 	return (FILEBENCH_OK);
10466701Saw148015 }
10476701Saw148015 
10485184Sek110237 /*
10495184Sek110237  * Blocks the calling thread if the number of issued I/O
10505184Sek110237  * operations exceeds the number of posted events, thus
10515184Sek110237  * limiting the average I/O operation rate to the rate
10526084Saw148015  * specified by eventgen_hz. Always returns FILEBENCH_OK.
10535184Sek110237  */
10545184Sek110237 static int
10555184Sek110237 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
10565184Sek110237 {
10575184Sek110237 	uint64_t iops;
10585184Sek110237 	uint64_t delta;
10595673Saw148015 	uint64_t events;
10605184Sek110237 
10615184Sek110237 	/* Immediately bail if not set/enabled */
10626391Saw148015 	if (filebench_shm->shm_eventgen_hz == 0)
10636084Saw148015 		return (FILEBENCH_OK);
10645184Sek110237 
10655184Sek110237 	if (flowop->fo_initted == 0) {
10665184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
10675184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
10685184Sek110237 		flowop->fo_initted = 1;
10696701Saw148015 
10706701Saw148015 		if (flowoplib_event_find_target(threadflow, flowop)
10716701Saw148015 		    == FILEBENCH_ERROR)
10726701Saw148015 			return (FILEBENCH_ERROR);
10736701Saw148015 
10746701Saw148015 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
10756701Saw148015 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
10766701Saw148015 			filebench_log(LOG_ERROR,
10776701Saw148015 			    "WARNING: Flowop %s does no IO",
10786701Saw148015 			    flowop->fo_targets->fo_name);
10796701Saw148015 			filebench_shutdown(1);
10806701Saw148015 			return (FILEBENCH_ERROR);
10816701Saw148015 		}
10825184Sek110237 	}
10835184Sek110237 
10846701Saw148015 	if (flowop->fo_targets) {
10856701Saw148015 		/*
10866701Saw148015 		 * Note that fs_count is already the sum of fs_rcount
10876701Saw148015 		 * and fs_wcount if looking at a single flowop.
10886701Saw148015 		 */
10896701Saw148015 		iops = flowop->fo_targets->fo_stats.fs_count;
10906701Saw148015 	} else {
10916701Saw148015 		(void) ipc_mutex_lock(&controlstats_lock);
10926701Saw148015 		iops = (controlstats.fs_rcount +
10936701Saw148015 		    controlstats.fs_wcount);
10946701Saw148015 		(void) ipc_mutex_unlock(&controlstats_lock);
10956701Saw148015 	}
10965184Sek110237 
10975184Sek110237 	/* Is this the first time around */
10985184Sek110237 	if (flowop->fo_tputlast == 0) {
10995184Sek110237 		flowop->fo_tputlast = iops;
11006084Saw148015 		return (FILEBENCH_OK);
11015184Sek110237 	}
11025184Sek110237 
11035184Sek110237 	delta = iops - flowop->fo_tputlast;
11045184Sek110237 	flowop->fo_tputbucket -= delta;
11055184Sek110237 	flowop->fo_tputlast = iops;
11065184Sek110237 
11075184Sek110237 	/* No need to block if the q isn't empty */
11085184Sek110237 	if (flowop->fo_tputbucket >= 0LL) {
11095673Saw148015 		flowop_endop(threadflow, flowop, 0);
11106084Saw148015 		return (FILEBENCH_OK);
11115184Sek110237 	}
11125184Sek110237 
11135184Sek110237 	iops = flowop->fo_tputbucket * -1;
11145184Sek110237 	events = iops;
11155184Sek110237 
11165184Sek110237 	flowop_beginop(threadflow, flowop);
11176391Saw148015 	while (filebench_shm->shm_eventgen_hz) {
11185184Sek110237 
11196391Saw148015 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
11206391Saw148015 		if (filebench_shm->shm_eventgen_q >= events) {
11216391Saw148015 			filebench_shm->shm_eventgen_q -= events;
11226391Saw148015 			(void) ipc_mutex_unlock(
11236391Saw148015 			    &filebench_shm->shm_eventgen_lock);
11245184Sek110237 			flowop->fo_tputbucket += events;
11255184Sek110237 			break;
11265184Sek110237 		}
11276391Saw148015 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
11286391Saw148015 		    &filebench_shm->shm_eventgen_lock);
11296391Saw148015 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
11305184Sek110237 	}
11315673Saw148015 	flowop_endop(threadflow, flowop, 0);
11325184Sek110237 
11336084Saw148015 	return (FILEBENCH_OK);
11345184Sek110237 }
11355184Sek110237 
11365184Sek110237 /*
11375184Sek110237  * Blocks the calling thread if the number of issued filebench
11385184Sek110237  * operations exceeds the number of posted events, thus limiting
11395184Sek110237  * the average filebench operation rate to the rate specified by
11406084Saw148015  * eventgen_hz. Always returns FILEBENCH_OK.
11415184Sek110237  */
11425184Sek110237 static int
11435184Sek110237 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
11445184Sek110237 {
11455184Sek110237 	uint64_t ops;
11465184Sek110237 	uint64_t delta;
11475673Saw148015 	uint64_t events;
11485184Sek110237 
11495184Sek110237 	/* Immediately bail if not set/enabled */
11506391Saw148015 	if (filebench_shm->shm_eventgen_hz == 0)
11516084Saw148015 		return (FILEBENCH_OK);
11525184Sek110237 
11535184Sek110237 	if (flowop->fo_initted == 0) {
11545184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
11555184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
11565184Sek110237 		flowop->fo_initted = 1;
11576701Saw148015 
11586701Saw148015 		if (flowoplib_event_find_target(threadflow, flowop)
11596701Saw148015 		    == FILEBENCH_ERROR)
11606701Saw148015 			return (FILEBENCH_ERROR);
11615184Sek110237 	}
11625184Sek110237 
11636701Saw148015 	if (flowop->fo_targets) {
11646701Saw148015 		ops = flowop->fo_targets->fo_stats.fs_count;
11656701Saw148015 	} else {
11666701Saw148015 		(void) ipc_mutex_lock(&controlstats_lock);
11676701Saw148015 		ops = controlstats.fs_count;
11686701Saw148015 		(void) ipc_mutex_unlock(&controlstats_lock);
11696701Saw148015 	}
11705184Sek110237 
11715184Sek110237 	/* Is this the first time around */
11725184Sek110237 	if (flowop->fo_tputlast == 0) {
11735184Sek110237 		flowop->fo_tputlast = ops;
11746084Saw148015 		return (FILEBENCH_OK);
11755184Sek110237 	}
11765184Sek110237 
11775184Sek110237 	delta = ops - flowop->fo_tputlast;
11785184Sek110237 	flowop->fo_tputbucket -= delta;
11795184Sek110237 	flowop->fo_tputlast = ops;
11805184Sek110237 
11815184Sek110237 	/* No need to block if the q isn't empty */
11825184Sek110237 	if (flowop->fo_tputbucket >= 0LL) {
11835673Saw148015 		flowop_endop(threadflow, flowop, 0);
11846084Saw148015 		return (FILEBENCH_OK);
11855184Sek110237 	}
11865184Sek110237 
11875184Sek110237 	ops = flowop->fo_tputbucket * -1;
11885184Sek110237 	events = ops;
11895184Sek110237 
11905184Sek110237 	flowop_beginop(threadflow, flowop);
11916391Saw148015 	while (filebench_shm->shm_eventgen_hz) {
11926391Saw148015 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
11936391Saw148015 		if (filebench_shm->shm_eventgen_q >= events) {
11946391Saw148015 			filebench_shm->shm_eventgen_q -= events;
11956391Saw148015 			(void) ipc_mutex_unlock(
11966391Saw148015 			    &filebench_shm->shm_eventgen_lock);
11975184Sek110237 			flowop->fo_tputbucket += events;
11985184Sek110237 			break;
11995184Sek110237 		}
12006391Saw148015 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
12016391Saw148015 		    &filebench_shm->shm_eventgen_lock);
12026391Saw148015 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
12035184Sek110237 	}
12045673Saw148015 	flowop_endop(threadflow, flowop, 0);
12055184Sek110237 
12066084Saw148015 	return (FILEBENCH_OK);
12075184Sek110237 }
12085184Sek110237 
12095184Sek110237 
12105184Sek110237 /*
12115184Sek110237  * Blocks the calling thread if the number of bytes of I/O
12125184Sek110237  * issued exceeds one megabyte times the number of posted
12135184Sek110237  * events, thus limiting the average I/O byte rate to one
12145184Sek110237  * megabyte times the event rate as set by eventgen_hz.
12156084Saw148015  * Always retuns FILEBENCH_OK.
12165184Sek110237  */
12175184Sek110237 static int
12185184Sek110237 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
12195184Sek110237 {
12205184Sek110237 	uint64_t bytes;
12215184Sek110237 	uint64_t delta;
12225673Saw148015 	uint64_t events;
12235184Sek110237 
12245184Sek110237 	/* Immediately bail if not set/enabled */
12256391Saw148015 	if (filebench_shm->shm_eventgen_hz == 0)
12266084Saw148015 		return (FILEBENCH_OK);
12275184Sek110237 
12285184Sek110237 	if (flowop->fo_initted == 0) {
12295184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
12305184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
12315184Sek110237 		flowop->fo_initted = 1;
12326701Saw148015 
12336701Saw148015 		if (flowoplib_event_find_target(threadflow, flowop)
12346701Saw148015 		    == FILEBENCH_ERROR)
12356701Saw148015 			return (FILEBENCH_ERROR);
12366701Saw148015 
12376701Saw148015 		if ((flowop->fo_targets) &&
12386701Saw148015 		    ((flowop->fo_targets->fo_attrs &
12396701Saw148015 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
12406701Saw148015 			filebench_log(LOG_ERROR,
12416701Saw148015 			    "WARNING: Flowop %s does no Reads or Writes",
12426701Saw148015 			    flowop->fo_targets->fo_name);
12436701Saw148015 			filebench_shutdown(1);
12446701Saw148015 			return (FILEBENCH_ERROR);
12456701Saw148015 		}
12465184Sek110237 	}
12475184Sek110237 
12486701Saw148015 	if (flowop->fo_targets) {
12496701Saw148015 		/*
12506701Saw148015 		 * Note that fs_bytes is already the sum of fs_rbytes
12516701Saw148015 		 * and fs_wbytes if looking at a single flowop.
12526701Saw148015 		 */
12536701Saw148015 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
12546701Saw148015 	} else {
12556701Saw148015 		(void) ipc_mutex_lock(&controlstats_lock);
12566701Saw148015 		bytes = (controlstats.fs_rbytes +
12576701Saw148015 		    controlstats.fs_wbytes);
12586701Saw148015 		(void) ipc_mutex_unlock(&controlstats_lock);
12596701Saw148015 	}
12606701Saw148015 
12616701Saw148015 	/* Is this the first time around? */
12625184Sek110237 	if (flowop->fo_tputlast == 0) {
12635184Sek110237 		flowop->fo_tputlast = bytes;
12646084Saw148015 		return (FILEBENCH_OK);
12655184Sek110237 	}
12665184Sek110237 
12675184Sek110237 	delta = bytes - flowop->fo_tputlast;
12685184Sek110237 	flowop->fo_tputbucket -= delta;
12695184Sek110237 	flowop->fo_tputlast = bytes;
12705184Sek110237 
12715184Sek110237 	/* No need to block if the q isn't empty */
12725184Sek110237 	if (flowop->fo_tputbucket >= 0LL) {
12735673Saw148015 		flowop_endop(threadflow, flowop, 0);
12746084Saw148015 		return (FILEBENCH_OK);
12755184Sek110237 	}
12765184Sek110237 
12775184Sek110237 	bytes = flowop->fo_tputbucket * -1;
12785184Sek110237 	events = (bytes / MB) + 1;
12795184Sek110237 
12806286Saw148015 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
12816286Saw148015 	    (u_longlong_t)bytes, (u_longlong_t)events);
12825184Sek110237 
12835184Sek110237 	flowop_beginop(threadflow, flowop);
12846391Saw148015 	while (filebench_shm->shm_eventgen_hz) {
12856391Saw148015 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
12866391Saw148015 		if (filebench_shm->shm_eventgen_q >= events) {
12876391Saw148015 			filebench_shm->shm_eventgen_q -= events;
12886391Saw148015 			(void) ipc_mutex_unlock(
12896391Saw148015 			    &filebench_shm->shm_eventgen_lock);
12905184Sek110237 			flowop->fo_tputbucket += (events * MB);
12915184Sek110237 			break;
12925184Sek110237 		}
12936391Saw148015 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
12946391Saw148015 		    &filebench_shm->shm_eventgen_lock);
12956391Saw148015 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
12965184Sek110237 	}
12975673Saw148015 	flowop_endop(threadflow, flowop, 0);
12985184Sek110237 
12996084Saw148015 	return (FILEBENCH_OK);
13005184Sek110237 }
13015184Sek110237 
13025184Sek110237 /*
13035184Sek110237  * These flowops terminate a benchmark run when either the specified
13045184Sek110237  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
13055184Sek110237  * number of I/O operations (flowoplib_finishoncount) have been generated.
13065184Sek110237  */
13075184Sek110237 
13085184Sek110237 
13095184Sek110237 /*
13105184Sek110237  * Stop filebench run when specified number of I/O bytes have been
13116212Saw148015  * transferred. Compares controlstats.fs_bytes with flowop->value,
13125184Sek110237  * and if greater returns 1, stopping the run, if not, returns 0
13135184Sek110237  * to continue running.
13145184Sek110237  */
13155184Sek110237 static int
13165184Sek110237 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
13175184Sek110237 {
13186701Saw148015 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
13196701Saw148015 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
13206701Saw148015 						    /* Uses constant value */
13216701Saw148015 
13226701Saw148015 	if (flowop->fo_initted == 0) {
13236701Saw148015 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
13246701Saw148015 		    flowop, threadflow->tf_name, threadflow->tf_instance);
13256701Saw148015 		flowop->fo_initted = 1;
13266701Saw148015 
13276701Saw148015 		if (flowoplib_event_find_target(threadflow, flowop)
13286701Saw148015 		    == FILEBENCH_ERROR)
13296701Saw148015 			return (FILEBENCH_ERROR);
13306701Saw148015 
13316701Saw148015 		if ((flowop->fo_targets) &&
13326701Saw148015 		    ((flowop->fo_targets->fo_attrs &
13336701Saw148015 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
13346701Saw148015 			filebench_log(LOG_ERROR,
13356701Saw148015 			    "WARNING: Flowop %s does no Reads or Writes",
13366701Saw148015 			    flowop->fo_targets->fo_name);
13376701Saw148015 			filebench_shutdown(1);
13386701Saw148015 			return (FILEBENCH_ERROR);
13396701Saw148015 		}
13406701Saw148015 	}
13416701Saw148015 
13426701Saw148015 	if (flowop->fo_targets) {
13436701Saw148015 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
13446701Saw148015 	} else {
13456701Saw148015 		(void) ipc_mutex_lock(&controlstats_lock);
13466701Saw148015 		bytes_io = controlstats.fs_bytes;
13476701Saw148015 		(void) ipc_mutex_unlock(&controlstats_lock);
13486701Saw148015 	}
13495184Sek110237 
13505184Sek110237 	flowop_beginop(threadflow, flowop);
13516701Saw148015 	if (bytes_io > byte_lim) {
13525673Saw148015 		flowop_endop(threadflow, flowop, 0);
13536084Saw148015 		return (FILEBENCH_DONE);
13545184Sek110237 	}
13555673Saw148015 	flowop_endop(threadflow, flowop, 0);
13565184Sek110237 
13576084Saw148015 	return (FILEBENCH_OK);
13585184Sek110237 }
13595184Sek110237 
13605184Sek110237 /*
13615184Sek110237  * Stop filebench run when specified number of I/O operations have
13625184Sek110237  * been performed. Compares controlstats.fs_count with *flowop->value,
13636084Saw148015  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
13646084Saw148015  * to continue running.
13655184Sek110237  */
13665184Sek110237 static int
13675184Sek110237 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
13685184Sek110237 {
13695184Sek110237 	uint64_t ops;
13706212Saw148015 	uint64_t count = flowop->fo_constvalue; /* use constant value */
13715184Sek110237 
13726701Saw148015 	if (flowop->fo_initted == 0) {
13736701Saw148015 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
13746701Saw148015 		    flowop, threadflow->tf_name, threadflow->tf_instance);
13756701Saw148015 		flowop->fo_initted = 1;
13766701Saw148015 
13776701Saw148015 		if (flowoplib_event_find_target(threadflow, flowop)
13786701Saw148015 		    == FILEBENCH_ERROR)
13796701Saw148015 			return (FILEBENCH_ERROR);
13806701Saw148015 	}
13816701Saw148015 
13826701Saw148015 	if (flowop->fo_targets) {
13836701Saw148015 		ops = flowop->fo_targets->fo_stats.fs_count;
13846701Saw148015 	} else {
13856701Saw148015 		(void) ipc_mutex_lock(&controlstats_lock);
13866701Saw148015 		ops = controlstats.fs_count;
13876701Saw148015 		(void) ipc_mutex_unlock(&controlstats_lock);
13886701Saw148015 	}
13895184Sek110237 
13905184Sek110237 	flowop_beginop(threadflow, flowop);
13916084Saw148015 	if (ops >= count) {
13925673Saw148015 		flowop_endop(threadflow, flowop, 0);
13936084Saw148015 		return (FILEBENCH_DONE);
13945184Sek110237 	}
13955673Saw148015 	flowop_endop(threadflow, flowop, 0);
13965184Sek110237 
13976084Saw148015 	return (FILEBENCH_OK);
13985184Sek110237 }
13995184Sek110237 
14005184Sek110237 /*
14015184Sek110237  * Semaphore synchronization using either System V semaphores or
14025184Sek110237  * posix semaphores. If System V semaphores are available, they will be
14035184Sek110237  * used, otherwise posix semaphores will be used.
14045184Sek110237  */
14055184Sek110237 
14065184Sek110237 
14075184Sek110237 /*
14085184Sek110237  * Initializes the filebench "block on semaphore" flowop.
14095184Sek110237  * If System V semaphores are implemented, the routine
14105184Sek110237  * initializes the System V semaphore subsystem if it hasn't
14115184Sek110237  * already been initialized, also allocates a pair of semids
14125184Sek110237  * and initializes the highwater System V semaphore.
14135184Sek110237  * If no System V semaphores, then does nothing special.
14146084Saw148015  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
14156084Saw148015  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
14165184Sek110237  * on success.
14175184Sek110237  */
14185184Sek110237 static int
14195184Sek110237 flowoplib_semblock_init(flowop_t *flowop)
14205184Sek110237 {
14215184Sek110237 
14225184Sek110237 #ifdef HAVE_SYSV_SEM
14236391Saw148015 	int sys_semid;
14245184Sek110237 	struct sembuf sbuf[2];
14255184Sek110237 	int highwater;
14265184Sek110237 
14275184Sek110237 	ipc_seminit();
14285184Sek110237 
14295184Sek110237 	flowop->fo_semid_lw = ipc_semidalloc();
14305184Sek110237 	flowop->fo_semid_hw = ipc_semidalloc();
14315184Sek110237 
14325184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
14335184Sek110237 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
14345184Sek110237 
14356391Saw148015 	sys_semid = filebench_shm->shm_sys_semid;
14365184Sek110237 
14375184Sek110237 	if ((highwater = flowop->fo_semid_hw) == 0)
14386212Saw148015 		highwater = flowop->fo_constvalue; /* use constant value */
14395184Sek110237 
14405184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
14415184Sek110237 
14425673Saw148015 	sbuf[0].sem_num = (short)highwater;
14436212Saw148015 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
14445184Sek110237 	sbuf[0].sem_flg = 0;
14456391Saw148015 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
14465184Sek110237 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
14475184Sek110237 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
14486084Saw148015 		return (FILEBENCH_ERROR);
14495184Sek110237 	}
14505184Sek110237 #else
14515184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
14525184Sek110237 	    "flow %s-%d semblock init with posix semaphore",
14535184Sek110237 	    flowop->fo_name, flowop->fo_instance);
14545184Sek110237 
14555184Sek110237 	sem_init(&flowop->fo_sem, 1, 0);
14565184Sek110237 #endif	/* HAVE_SYSV_SEM */
14575184Sek110237 
14586212Saw148015 	if (!(avd_get_bool(flowop->fo_blocking)))
14595184Sek110237 		(void) ipc_mutex_unlock(&flowop->fo_lock);
14605184Sek110237 
14616084Saw148015 	return (FILEBENCH_OK);
14625184Sek110237 }
14635184Sek110237 
14645184Sek110237 /*
14655184Sek110237  * Releases the semids for the System V semaphore allocated
14665184Sek110237  * to this flowop. If not using System V semaphores, then
14676084Saw148015  * it is effectively just a no-op.
14685184Sek110237  */
14695184Sek110237 static void
14705184Sek110237 flowoplib_semblock_destruct(flowop_t *flowop)
14715184Sek110237 {
14725184Sek110237 #ifdef HAVE_SYSV_SEM
14735184Sek110237 	ipc_semidfree(flowop->fo_semid_lw);
14745184Sek110237 	ipc_semidfree(flowop->fo_semid_hw);
14755184Sek110237 #else
14765184Sek110237 	sem_destroy(&flowop->fo_sem);
14775184Sek110237 #endif /* HAVE_SYSV_SEM */
14785184Sek110237 }
14795184Sek110237 
14805184Sek110237 /*
14815184Sek110237  * Attempts to pass a System V or posix semaphore as appropriate,
14826084Saw148015  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
14835184Sek110237  * semphores is not available or cannot be acquired, or if the initial
14846084Saw148015  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
14855184Sek110237  */
14865184Sek110237 static int
14875184Sek110237 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
14885184Sek110237 {
14895184Sek110237 
14905184Sek110237 #ifdef HAVE_SYSV_SEM
14915184Sek110237 	struct sembuf sbuf[2];
14926212Saw148015 	int value = avd_get_int(flowop->fo_value);
14936391Saw148015 	int sys_semid;
14945184Sek110237 	struct timespec timeout;
14955184Sek110237 
14966391Saw148015 	sys_semid = filebench_shm->shm_sys_semid;
14975184Sek110237 
14985184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
14995184Sek110237 	    "flow %s-%d sem blocking on id %x num %x value %d",
15006391Saw148015 	    flowop->fo_name, flowop->fo_instance, sys_semid,
15015184Sek110237 	    flowop->fo_semid_hw, value);
15025184Sek110237 
15035184Sek110237 	/* Post, decrement the increment the hw queue */
15045184Sek110237 	sbuf[0].sem_num = flowop->fo_semid_hw;
15055673Saw148015 	sbuf[0].sem_op = (short)value;
15065184Sek110237 	sbuf[0].sem_flg = 0;
15075184Sek110237 	sbuf[1].sem_num = flowop->fo_semid_lw;
15085184Sek110237 	sbuf[1].sem_op = value * -1;
15095184Sek110237 	sbuf[1].sem_flg = 0;
15105184Sek110237 	timeout.tv_sec = 600;
15115184Sek110237 	timeout.tv_nsec = 0;
15125184Sek110237 
15136212Saw148015 	if (avd_get_bool(flowop->fo_blocking))
15145184Sek110237 		(void) ipc_mutex_unlock(&flowop->fo_lock);
15155184Sek110237 
15165184Sek110237 	flowop_beginop(threadflow, flowop);
15175184Sek110237 
15185184Sek110237 #ifdef HAVE_SEMTIMEDOP
15196391Saw148015 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
15206391Saw148015 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
15215184Sek110237 #else
15226391Saw148015 	(void) semop(sys_semid, &sbuf[0], 1);
15236391Saw148015 	(void) semop(sys_semid, &sbuf[1], 1);
15245184Sek110237 #endif /* HAVE_SEMTIMEDOP */
15255184Sek110237 
15266212Saw148015 	if (avd_get_bool(flowop->fo_blocking))
15275184Sek110237 		(void) ipc_mutex_lock(&flowop->fo_lock);
15285184Sek110237 
15295673Saw148015 	flowop_endop(threadflow, flowop, 0);
15305184Sek110237 
15315184Sek110237 #else
15326212Saw148015 	int value = avd_get_int(flowop->fo_value);
15335184Sek110237 	int i;
15345184Sek110237 
15355184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
15365184Sek110237 	    "flow %s-%d sem blocking on posix semaphore",
15375184Sek110237 	    flowop->fo_name, flowop->fo_instance);
15385184Sek110237 
15395184Sek110237 	/* Decrement sem by value */
15405184Sek110237 	for (i = 0; i < value; i++) {
15415184Sek110237 		if (sem_wait(&flowop->fo_sem) == -1) {
15425184Sek110237 			filebench_log(LOG_ERROR, "semop wait failed");
15436084Saw148015 			return (FILEBENCH_ERROR);
15445184Sek110237 		}
15455184Sek110237 	}
15465184Sek110237 
15475184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
15485184Sek110237 	    flowop->fo_name, flowop->fo_instance);
15495184Sek110237 #endif /* HAVE_SYSV_SEM */
15505184Sek110237 
15516084Saw148015 	return (FILEBENCH_OK);
15525184Sek110237 }
15535184Sek110237 
15545184Sek110237 /*
15556084Saw148015  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
15565184Sek110237  */
15575184Sek110237 /* ARGSUSED */
15585184Sek110237 static int
15595184Sek110237 flowoplib_sempost_init(flowop_t *flowop)
15605184Sek110237 {
15615184Sek110237 #ifdef HAVE_SYSV_SEM
15625184Sek110237 	ipc_seminit();
15635184Sek110237 #endif /* HAVE_SYSV_SEM */
15646084Saw148015 	return (FILEBENCH_OK);
15655184Sek110237 }
15665184Sek110237 
15675184Sek110237 /*
15685184Sek110237  * Post to a System V or posix semaphore as appropriate.
15695184Sek110237  * On the first call for a given flowop instance, this routine
15705184Sek110237  * will use the fo_targetname attribute to locate all semblock
15715184Sek110237  * flowops that are expecting posts from this flowop. All
15725184Sek110237  * target flowops on this list will have a post operation done
15735184Sek110237  * to their semaphores on each call.
15745184Sek110237  */
15755184Sek110237 static int
15765184Sek110237 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
15775184Sek110237 {
15785184Sek110237 	flowop_t *target;
15795184Sek110237 
15805184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
15815184Sek110237 	    "sempost flow %s-%d",
15825184Sek110237 	    flowop->fo_name,
15835184Sek110237 	    flowop->fo_instance);
15845184Sek110237 
15855184Sek110237 	/* if this is the first post, create the post list */
15865184Sek110237 	if (flowop->fo_targets == NULL) {
15875184Sek110237 		flowop_t *result = flowop_find(flowop->fo_targetname);
15885184Sek110237 
15895184Sek110237 		flowop->fo_targets = result;
15905184Sek110237 
15915184Sek110237 		if (result == NULL) {
15925184Sek110237 			filebench_log(LOG_ERROR,
15935184Sek110237 			    "sempost: could not find op %s for thread %s",
15945184Sek110237 			    flowop->fo_targetname,
15955184Sek110237 			    threadflow->tf_name);
15965184Sek110237 			filebench_shutdown(1);
15975184Sek110237 		}
15985184Sek110237 
15995184Sek110237 		while (result) {
16005184Sek110237 			result->fo_targetnext =
16015184Sek110237 			    result->fo_resultnext;
16025184Sek110237 			result = result->fo_resultnext;
16035184Sek110237 		}
16045184Sek110237 	}
16055184Sek110237 
16065184Sek110237 	target = flowop->fo_targets;
16075184Sek110237 
16085184Sek110237 	flowop_beginop(threadflow, flowop);
16095184Sek110237 	/* post to the targets */
16105184Sek110237 	while (target) {
16115184Sek110237 #ifdef HAVE_SYSV_SEM
16125184Sek110237 		struct sembuf sbuf[2];
16136391Saw148015 		int sys_semid;
16145184Sek110237 		int blocking;
16155184Sek110237 #else
16165184Sek110237 		int i;
16175184Sek110237 #endif /* HAVE_SYSV_SEM */
16185184Sek110237 		struct timespec timeout;
16196550Saw148015 		int value = (int)avd_get_int(flowop->fo_value);
16205184Sek110237 
16215184Sek110237 		if (target->fo_instance == FLOW_MASTER) {
16225184Sek110237 			target = target->fo_targetnext;
16235184Sek110237 			continue;
16245184Sek110237 		}
16255184Sek110237 
16265184Sek110237 #ifdef HAVE_SYSV_SEM
16275184Sek110237 
16285184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
16295184Sek110237 		    "sempost flow %s-%d num %x",
16305184Sek110237 		    target->fo_name,
16315184Sek110237 		    target->fo_instance,
16325184Sek110237 		    target->fo_semid_lw);
16335184Sek110237 
16346391Saw148015 		sys_semid = filebench_shm->shm_sys_semid;
16355184Sek110237 		sbuf[0].sem_num = target->fo_semid_lw;
16365673Saw148015 		sbuf[0].sem_op = (short)value;
16375184Sek110237 		sbuf[0].sem_flg = 0;
16385184Sek110237 		sbuf[1].sem_num = target->fo_semid_hw;
16395184Sek110237 		sbuf[1].sem_op = value * -1;
16405184Sek110237 		sbuf[1].sem_flg = 0;
16415184Sek110237 		timeout.tv_sec = 600;
16425184Sek110237 		timeout.tv_nsec = 0;
16435184Sek110237 
16446212Saw148015 		if (avd_get_bool(flowop->fo_blocking))
16455184Sek110237 			blocking = 1;
16465184Sek110237 		else
16475184Sek110237 			blocking = 0;
16485184Sek110237 
16495184Sek110237 #ifdef HAVE_SEMTIMEDOP
16506391Saw148015 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
16515184Sek110237 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
16525184Sek110237 #else
16536391Saw148015 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
16545184Sek110237 		    (errno && (errno != EAGAIN))) {
16555184Sek110237 #endif /* HAVE_SEMTIMEDOP */
16565184Sek110237 			filebench_log(LOG_ERROR, "semop post failed: %s",
16575184Sek110237 			    strerror(errno));
16586084Saw148015 			return (FILEBENCH_ERROR);
16595184Sek110237 		}
16605184Sek110237 
16615184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
16625184Sek110237 		    "flow %s-%d finished posting",
16635184Sek110237 		    target->fo_name, target->fo_instance);
16645184Sek110237 #else
16655184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
16665184Sek110237 		    "sempost flow %s-%d to posix semaphore",
16675184Sek110237 		    target->fo_name,
16685184Sek110237 		    target->fo_instance);
16695184Sek110237 
16705184Sek110237 		/* Increment sem by value */
16715184Sek110237 		for (i = 0; i < value; i++) {
16725184Sek110237 			if (sem_post(&target->fo_sem) == -1) {
16735184Sek110237 				filebench_log(LOG_ERROR, "semop post failed");
16746084Saw148015 				return (FILEBENCH_ERROR);
16755184Sek110237 			}
16765184Sek110237 		}
16775184Sek110237 
16785184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
16795184Sek110237 		    target->fo_name, target->fo_instance);
16805184Sek110237 #endif /* HAVE_SYSV_SEM */
16815184Sek110237 
16825184Sek110237 		target = target->fo_targetnext;
16835184Sek110237 	}
16845673Saw148015 	flowop_endop(threadflow, flowop, 0);
16855184Sek110237 
16866084Saw148015 	return (FILEBENCH_OK);
16875184Sek110237 }
16885184Sek110237 
16895184Sek110237 
16905184Sek110237 /*
16915184Sek110237  * Section for exercising create / open / close / delete operations
16925184Sek110237  * on files within a fileset. For proper operation, the flowop attribute
16935184Sek110237  * "fd", which sets the fo_fdnumber field in the flowop, must be used
16945184Sek110237  * so that the same file is opened and later closed. "fd" is an index
16955184Sek110237  * into a pair of arrays maintained by threadflows, one of which
16965184Sek110237  * contains the operating system assigned file descriptors and the other
16975184Sek110237  * a pointer to the filesetentry whose file the file descriptor
16985184Sek110237  * references. An openfile flowop defined without fd being set will use
16995184Sek110237  * the default (0) fd or, if specified, rotate through fd indices, but
17005184Sek110237  * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile deletes the file recorded at a specified non-zero
 * fd if there is one, and otherwise picks an arbitrary existing file.
17035184Sek110237  */
17045184Sek110237 
17055184Sek110237 /*
 * XXX Making file selection more consistent among the flowops might be good
17075184Sek110237  */
17085184Sek110237 
17095184Sek110237 
17105184Sek110237 /*
17115184Sek110237  * Emulates (and actually does) file open. Obtains a file descriptor
17126084Saw148015  * index, then calls flowoplib_openfile_common() to open. Returns
17136084Saw148015  * FILEBENCH_ERROR if no file descriptor is found, and returns the
17146084Saw148015  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
17156084Saw148015  * FILEBENCH_NORSC, FILEBENCH_OK).
17165184Sek110237  */
17175184Sek110237 static int
17185184Sek110237 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
17195184Sek110237 {
17205184Sek110237 	int fd = flowoplib_fdnum(threadflow, flowop);
17215184Sek110237 
17225184Sek110237 	if (fd == -1)
17236084Saw148015 		return (FILEBENCH_ERROR);
17245184Sek110237 
17255184Sek110237 	return (flowoplib_openfile_common(threadflow, flowop, fd));
17265184Sek110237 }
17275184Sek110237 
17285184Sek110237 /*
17295184Sek110237  * Common file opening code for filesets. Uses the supplied
17305184Sek110237  * file descriptor index to determine the tf_fd entry to use.
17315184Sek110237  * If the entry is empty (0) and the fileset exists, fileset
17325184Sek110237  * pick is called to select a fileset entry to use. The file
17335184Sek110237  * specified in the filesetentry is opened, and the returned
17345184Sek110237  * operating system file descriptor and a pointer to the
17355184Sek110237  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
17366084Saw148015  * respectively. Returns FILEBENCH_ERROR on error,
17376084Saw148015  * FILEBENCH_NORSC if no suitable filesetentry can be found,
17386084Saw148015  * and FILEBENCH_OK on success.
17395184Sek110237  */
17405184Sek110237 static int
17415184Sek110237 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
17425184Sek110237 {
17435184Sek110237 	filesetentry_t *file;
17446212Saw148015 	char *fileset_name;
17455184Sek110237 	int tid = 0;
17465184Sek110237 
17476391Saw148015 	if (flowop->fo_fileset == NULL) {
17486391Saw148015 		filebench_log(LOG_ERROR, "flowop NULL file");
17496391Saw148015 		return (FILEBENCH_ERROR);
17506391Saw148015 	}
17516391Saw148015 
17526212Saw148015 	if ((fileset_name =
17536212Saw148015 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
17546212Saw148015 		filebench_log(LOG_ERROR,
17556212Saw148015 		    "flowop %s: fileset has no name", flowop->fo_name);
17566212Saw148015 		return (FILEBENCH_ERROR);
17576212Saw148015 	}
17586212Saw148015 
17595184Sek110237 	/*
17605184Sek110237 	 * If the flowop doesn't default to persistent fd
17615184Sek110237 	 * then get unique thread ID for use by fileset_pick
17625184Sek110237 	 */
17636212Saw148015 	if (avd_get_bool(flowop->fo_rotatefd))
17645184Sek110237 		tid = threadflow->tf_utid;
17655184Sek110237 
17665184Sek110237 	if (threadflow->tf_fd[fd] != 0) {
17675184Sek110237 		filebench_log(LOG_ERROR,
17685184Sek110237 		    "flowop %s attempted to open without closing on fd %d",
17695184Sek110237 		    flowop->fo_name, fd);
17706084Saw148015 		return (FILEBENCH_ERROR);
17715184Sek110237 	}
17725184Sek110237 
17735673Saw148015 #ifdef HAVE_RAW_SUPPORT
17745673Saw148015 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
17755673Saw148015 		int open_attrs = 0;
17765673Saw148015 		char name[MAXPATHLEN];
17775673Saw148015 
17786212Saw148015 		(void) strcpy(name,
17796212Saw148015 		    avd_get_str(flowop->fo_fileset->fs_path));
17805673Saw148015 		(void) strcat(name, "/");
17816212Saw148015 		(void) strcat(name, fileset_name);
17825673Saw148015 
17836212Saw148015 		if (avd_get_bool(flowop->fo_dsync)) {
17845673Saw148015 #ifdef sun
17855673Saw148015 			open_attrs |= O_DSYNC;
17865673Saw148015 #else
17875673Saw148015 			open_attrs |= O_FSYNC;
17885673Saw148015 #endif
17895673Saw148015 		}
17905673Saw148015 
17915673Saw148015 		filebench_log(LOG_DEBUG_SCRIPT,
17925673Saw148015 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
17935673Saw148015 
17945673Saw148015 		threadflow->tf_fd[fd] = open64(name,
17955673Saw148015 		    O_RDWR | open_attrs, 0666);
17965673Saw148015 
17975673Saw148015 		if (threadflow->tf_fd[fd] < 0) {
17985673Saw148015 			filebench_log(LOG_ERROR,
17995673Saw148015 			    "Failed to open raw device %s: %s",
18005673Saw148015 			    name, strerror(errno));
18016084Saw148015 			return (FILEBENCH_ERROR);
18025673Saw148015 		}
18035673Saw148015 
18045673Saw148015 		/* if running on Solaris, use un-buffered io */
18055673Saw148015 #ifdef sun
18065673Saw148015 		(void) directio(threadflow->tf_fd[fd], DIRECTIO_ON);
18075673Saw148015 #endif
18085673Saw148015 
18095673Saw148015 		threadflow->tf_fse[fd] = NULL;
18105673Saw148015 
18116084Saw148015 		return (FILEBENCH_OK);
18125673Saw148015 	}
18135673Saw148015 #endif /* HAVE_RAW_SUPPORT */
18145673Saw148015 
18155184Sek110237 	if ((file = fileset_pick(flowop->fo_fileset,
18165184Sek110237 	    FILESET_PICKEXISTS, tid)) == NULL) {
18176084Saw148015 		filebench_log(LOG_DEBUG_SCRIPT,
18185184Sek110237 		    "flowop %s failed to pick file from %s on fd %d",
18196212Saw148015 		    flowop->fo_name, fileset_name, fd);
18206084Saw148015 		return (FILEBENCH_NORSC);
18215184Sek110237 	}
18225184Sek110237 
18235184Sek110237 	threadflow->tf_fse[fd] = file;
18245184Sek110237 
18255184Sek110237 	flowop_beginop(threadflow, flowop);
18265184Sek110237 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
18275184Sek110237 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
18285673Saw148015 	flowop_endop(threadflow, flowop, 0);
18295184Sek110237 
18305184Sek110237 	if (threadflow->tf_fd[fd] < 0) {
18316212Saw148015 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
18326212Saw148015 		    flowop->fo_name, file->fse_path);
18336084Saw148015 		return (FILEBENCH_ERROR);
18345184Sek110237 	}
18355184Sek110237 
18365184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT,
18375184Sek110237 	    "flowop %s: opened %s fd[%d] = %d",
18385184Sek110237 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
18395184Sek110237 
18406084Saw148015 	return (FILEBENCH_OK);
18415184Sek110237 }
18425184Sek110237 
18435184Sek110237 /*
18445184Sek110237  * Emulate create of a file. Uses the flowop's fdnumber to select
18455184Sek110237  * tf_fd and tf_fse array locations to put the created file's file
18465184Sek110237  * descriptor and filesetentry respectively. Uses fileset_pick()
18475184Sek110237  * to select a specific filesetentry whose file does not currently
18485184Sek110237  * exist for the file create operation. Then calls
18495184Sek110237  * fileset_openfile() with the O_CREATE flag set to create the
18506084Saw148015  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
18515184Sek110237  * already in use, the flowop has no associated fileset, or
18525184Sek110237  * the create call fails. Returns 1 if a filesetentry with a
18536084Saw148015  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
18545184Sek110237  */
18555184Sek110237 static int
18565184Sek110237 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
18575184Sek110237 {
18585184Sek110237 	filesetentry_t *file;
18595184Sek110237 	int fd = flowop->fo_fdnumber;
18605184Sek110237 
18615184Sek110237 	if (threadflow->tf_fd[fd] != 0) {
18625184Sek110237 		filebench_log(LOG_ERROR,
18635184Sek110237 		    "flowop %s attempted to create without closing on fd %d",
18645184Sek110237 		    flowop->fo_name, fd);
18656084Saw148015 		return (FILEBENCH_ERROR);
18665184Sek110237 	}
18675184Sek110237 
18685184Sek110237 	if (flowop->fo_fileset == NULL) {
18695184Sek110237 		filebench_log(LOG_ERROR, "flowop NULL file");
18706084Saw148015 		return (FILEBENCH_ERROR);
18715184Sek110237 	}
18725184Sek110237 
18735673Saw148015 #ifdef HAVE_RAW_SUPPORT
18745673Saw148015 	/* can't be used with raw devices */
18755673Saw148015 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
18765673Saw148015 		filebench_log(LOG_ERROR,
18775673Saw148015 		    "flowop %s attempted to a createfile on RAW device",
18785673Saw148015 		    flowop->fo_name);
18796084Saw148015 		return (FILEBENCH_ERROR);
18805673Saw148015 	}
18815673Saw148015 #endif /* HAVE_RAW_SUPPORT */
18825673Saw148015 
18835184Sek110237 	if ((file = fileset_pick(flowop->fo_fileset,
18845184Sek110237 	    FILESET_PICKNOEXIST, 0)) == NULL) {
18856084Saw148015 		filebench_log(LOG_DEBUG_SCRIPT,
18866084Saw148015 		    "flowop %s failed to pick file from fileset %s",
18876212Saw148015 		    flowop->fo_name,
18886212Saw148015 		    avd_get_str(flowop->fo_fileset->fs_name));
18896084Saw148015 		return (FILEBENCH_NORSC);
18905184Sek110237 	}
18915184Sek110237 
18925184Sek110237 	threadflow->tf_fse[fd] = file;
18935184Sek110237 
18945184Sek110237 	flowop_beginop(threadflow, flowop);
18955184Sek110237 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
18965184Sek110237 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
18975673Saw148015 	flowop_endop(threadflow, flowop, 0);
18985184Sek110237 
18995184Sek110237 	if (threadflow->tf_fd[fd] < 0) {
19005184Sek110237 		filebench_log(LOG_ERROR, "failed to create file %s",
19015184Sek110237 		    flowop->fo_name);
19026084Saw148015 		return (FILEBENCH_ERROR);
19035184Sek110237 	}
19045184Sek110237 
19055184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT,
19065184Sek110237 	    "flowop %s: created %s fd[%d] = %d",
19075184Sek110237 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
19085184Sek110237 
19096084Saw148015 	return (FILEBENCH_OK);
19105184Sek110237 }
19115184Sek110237 
19125184Sek110237 /*
19136391Saw148015  * Emulates delete of a file. If a valid fd is provided, it uses the
19146391Saw148015  * filesetentry stored at that fd location to select the file to be
19156391Saw148015  * deleted, otherwise it picks an arbitrary filesetentry
19166391Saw148015  * whose file exists. It then uses unlink() to delete it and Clears
19176084Saw148015  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
19186084Saw148015  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
19196084Saw148015  * filesetentry cannot be found, and FILEBENCH_OK on success.
19205184Sek110237  */
19215184Sek110237 static int
19225184Sek110237 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
19235184Sek110237 {
19245184Sek110237 	filesetentry_t *file;
19255184Sek110237 	fileset_t *fileset;
19265184Sek110237 	char path[MAXPATHLEN];
19275184Sek110237 	char *pathtmp;
19286391Saw148015 	int fd = flowop->fo_fdnumber;
19295184Sek110237 
19306391Saw148015 	/* if fd specified, use it to access file */
19316391Saw148015 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
19326391Saw148015 
19336391Saw148015 		/* check whether file still open */
19346391Saw148015 		if (threadflow->tf_fd[fd] > 0) {
19356391Saw148015 			filebench_log(LOG_DEBUG_SCRIPT,
19366391Saw148015 			    "flowop %s deleting still open file at fd = %d",
19376391Saw148015 			    flowop->fo_name, fd);
19386391Saw148015 		}
19396391Saw148015 
19406391Saw148015 		/* indicate that the file will be deleted */
19416391Saw148015 		threadflow->tf_fse[fd] = NULL;
19426391Saw148015 
19436391Saw148015 		/* if here, we still have a valid file pointer */
19446391Saw148015 		fileset = file->fse_fileset;
19456391Saw148015 	} else {
19466391Saw148015 		/* Otherwise, pick arbitrary file */
19476391Saw148015 		file = NULL;
19486391Saw148015 		fileset = flowop->fo_fileset;
19496391Saw148015 	}
19506391Saw148015 
19516391Saw148015 
19526391Saw148015 	if (fileset == NULL) {
19535184Sek110237 		filebench_log(LOG_ERROR, "flowop NULL file");
19546084Saw148015 		return (FILEBENCH_ERROR);
19555184Sek110237 	}
19565184Sek110237 
19575673Saw148015 #ifdef HAVE_RAW_SUPPORT
19585673Saw148015 	/* can't be used with raw devices */
19596391Saw148015 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
19605673Saw148015 		filebench_log(LOG_ERROR,
19615673Saw148015 		    "flowop %s attempted a deletefile on RAW device",
19625673Saw148015 		    flowop->fo_name);
19636084Saw148015 		return (FILEBENCH_ERROR);
19645673Saw148015 	}
19655673Saw148015 #endif /* HAVE_RAW_SUPPORT */
19665673Saw148015 
19676391Saw148015 	if (file == NULL) {
1968*7556SAndrew.W.Wilson@sun.com 		/* pick arbitrary, existing (allocated) file */
19696391Saw148015 		if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0))
19706391Saw148015 		    == NULL) {
19716391Saw148015 			filebench_log(LOG_DEBUG_SCRIPT,
19726391Saw148015 			    "flowop %s failed to pick file", flowop->fo_name);
19736391Saw148015 			return (FILEBENCH_NORSC);
19746391Saw148015 		}
19756391Saw148015 	} else {
1976*7556SAndrew.W.Wilson@sun.com 		/* delete specific file. wait for it to be non-busy */
1977*7556SAndrew.W.Wilson@sun.com 		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1978*7556SAndrew.W.Wilson@sun.com 		while (file->fse_flags & FSE_BUSY) {
1979*7556SAndrew.W.Wilson@sun.com 			file->fse_flags |= FSE_THRD_WAITNG;
1980*7556SAndrew.W.Wilson@sun.com 			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1981*7556SAndrew.W.Wilson@sun.com 			    &fileset->fs_pick_lock);
1982*7556SAndrew.W.Wilson@sun.com 		}
1983*7556SAndrew.W.Wilson@sun.com 
1984*7556SAndrew.W.Wilson@sun.com 		/* File now available, grab it for deletion */
1985*7556SAndrew.W.Wilson@sun.com 		file->fse_flags |= FSE_BUSY;
1986*7556SAndrew.W.Wilson@sun.com 		fileset->fs_idle_files--;
1987*7556SAndrew.W.Wilson@sun.com 		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
19885184Sek110237 	}
19895184Sek110237 
19905184Sek110237 	*path = 0;
19916212Saw148015 	(void) strcpy(path, avd_get_str(fileset->fs_path));
19925184Sek110237 	(void) strcat(path, "/");
19936212Saw148015 	(void) strcat(path, avd_get_str(fileset->fs_name));
19945184Sek110237 	pathtmp = fileset_resolvepath(file);
19955184Sek110237 	(void) strcat(path, pathtmp);
19965184Sek110237 	free(pathtmp);
19975184Sek110237 
1998*7556SAndrew.W.Wilson@sun.com 	/* delete the selected file */
19995184Sek110237 	flowop_beginop(threadflow, flowop);
20005184Sek110237 	(void) unlink(path);
20015673Saw148015 	flowop_endop(threadflow, flowop, 0);
2002*7556SAndrew.W.Wilson@sun.com 
2003*7556SAndrew.W.Wilson@sun.com 	/* indicate that it is no longer busy and no longer exists */
2004*7556SAndrew.W.Wilson@sun.com 	fileset_unbusy(file, TRUE, FALSE);
20055184Sek110237 
20065184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
20075184Sek110237 
20086084Saw148015 	return (FILEBENCH_OK);
20095184Sek110237 }
20105184Sek110237 
20115184Sek110237 /*
20125184Sek110237  * Emulates fsync of a file. Obtains the file descriptor index
20135184Sek110237  * from the flowop, obtains the actual file descriptor from
20145184Sek110237  * the threadflow's table, checks to be sure it is still an
20156084Saw148015  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
20166084Saw148015  * if the file no longer is open, FILEBENCH_OK otherwise.
20175184Sek110237  */
20185184Sek110237 static int
20195184Sek110237 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
20205184Sek110237 {
20215184Sek110237 	filesetentry_t *file;
20225184Sek110237 	int fd = flowop->fo_fdnumber;
20235184Sek110237 
20245184Sek110237 	if (threadflow->tf_fd[fd] == 0) {
20255184Sek110237 		filebench_log(LOG_ERROR,
20265184Sek110237 		    "flowop %s attempted to fsync a closed fd %d",
20275184Sek110237 		    flowop->fo_name, fd);
20286084Saw148015 		return (FILEBENCH_ERROR);
20295184Sek110237 	}
20305184Sek110237 
20315673Saw148015 	file = threadflow->tf_fse[fd];
20325673Saw148015 
20335673Saw148015 	if ((file == NULL) ||
20345673Saw148015 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
20355673Saw148015 		filebench_log(LOG_ERROR,
20365673Saw148015 		    "flowop %s attempted to a fsync a RAW device",
20375673Saw148015 		    flowop->fo_name);
20386084Saw148015 		return (FILEBENCH_ERROR);
20395673Saw148015 	}
20405673Saw148015 
20415184Sek110237 	/* Measure time to fsync */
20425184Sek110237 	flowop_beginop(threadflow, flowop);
20435184Sek110237 	(void) fsync(threadflow->tf_fd[fd]);
20445673Saw148015 	flowop_endop(threadflow, flowop, 0);
20455184Sek110237 
20465184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
20475184Sek110237 
20486084Saw148015 	return (FILEBENCH_OK);
20495184Sek110237 }
20505184Sek110237 
20515184Sek110237 /*
20525184Sek110237  * Emulate fsync of an entire fileset. Search through the
20535184Sek110237  * threadflow's file descriptor array, doing fsync() on each
20545184Sek110237  * open file that belongs to the flowop's fileset. Always
20556084Saw148015  * returns FILEBENCH_OK.
20565184Sek110237  */
20575184Sek110237 static int
20585184Sek110237 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
20595184Sek110237 {
20605184Sek110237 	int fd;
20615184Sek110237 
20625184Sek110237 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
20635184Sek110237 		filesetentry_t *file;
20645184Sek110237 
20655184Sek110237 		/* Match the file set to fsync */
20665184Sek110237 		if ((threadflow->tf_fse[fd] == NULL) ||
20675184Sek110237 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
20685184Sek110237 			continue;
20695184Sek110237 
20705184Sek110237 		/* Measure time to fsync */
20715184Sek110237 		flowop_beginop(threadflow, flowop);
20725184Sek110237 		(void) fsync(threadflow->tf_fd[fd]);
20735673Saw148015 		flowop_endop(threadflow, flowop, 0);
20745184Sek110237 
20755184Sek110237 		file = threadflow->tf_fse[fd];
20765184Sek110237 
20775184Sek110237 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
20785184Sek110237 		    file->fse_path);
20795184Sek110237 	}
20805184Sek110237 
20816084Saw148015 	return (FILEBENCH_OK);
20825184Sek110237 }
20835184Sek110237 
20845184Sek110237 /*
20855184Sek110237  * Emulate close of a file.  Obtains the file descriptor index
20865184Sek110237  * from the flowop, obtains the actual file descriptor from the
20875184Sek110237  * threadflow's table, checks to be sure it is still an open
20885184Sek110237  * file, then does a close operation on it. Then sets the
20895184Sek110237  * threadflow file descriptor table entry to 0, and the file set
20906084Saw148015  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
20916084Saw148015  * FILEBENCH_OK otherwise.
20925184Sek110237  */
20935184Sek110237 static int
20945184Sek110237 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
20955184Sek110237 {
20965184Sek110237 	filesetentry_t *file;
20975184Sek110237 	int fd = flowop->fo_fdnumber;
20985184Sek110237 
20995184Sek110237 	if (threadflow->tf_fd[fd] == 0) {
21005184Sek110237 		filebench_log(LOG_ERROR,
21015184Sek110237 		    "flowop %s attempted to close an already closed fd %d",
21025184Sek110237 		    flowop->fo_name, fd);
21036084Saw148015 		return (FILEBENCH_ERROR);
21045184Sek110237 	}
21055184Sek110237 
21065184Sek110237 	/* Measure time to close */
21075184Sek110237 	flowop_beginop(threadflow, flowop);
21085184Sek110237 	(void) close(threadflow->tf_fd[fd]);
21095673Saw148015 	flowop_endop(threadflow, flowop, 0);
21105184Sek110237 
21115184Sek110237 	file = threadflow->tf_fse[fd];
21125184Sek110237 
21135184Sek110237 	threadflow->tf_fd[fd] = 0;
21145184Sek110237 
21155184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
21165184Sek110237 
21176084Saw148015 	return (FILEBENCH_OK);
21185184Sek110237 }
21195184Sek110237 
21205184Sek110237 /*
21215184Sek110237  * Emulate stat of a file. Picks an arbitrary filesetentry with
21225184Sek110237  * an existing file from the flowop's fileset, then performs a
21236084Saw148015  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
21246084Saw148015  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
21256084Saw148015  * cannot be found, and FILEBENCH_OK on success.
21265184Sek110237  */
21275184Sek110237 static int
21285184Sek110237 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
21295184Sek110237 {
21305184Sek110237 	filesetentry_t *file;
21315184Sek110237 	fileset_t *fileset;
2132*7556SAndrew.W.Wilson@sun.com 	struct stat statbuf;
2133*7556SAndrew.W.Wilson@sun.com 	int fd = flowop->fo_fdnumber;
2134*7556SAndrew.W.Wilson@sun.com 
2135*7556SAndrew.W.Wilson@sun.com 	/* if fd specified and the file is open, use it to access file */
2136*7556SAndrew.W.Wilson@sun.com 	if ((fd > 0) && ((threadflow->tf_fd[fd]) > 0)) {
2137*7556SAndrew.W.Wilson@sun.com 
2138*7556SAndrew.W.Wilson@sun.com 		/* check whether file handle still valid */
2139*7556SAndrew.W.Wilson@sun.com 		if ((file = threadflow->tf_fse[fd]) == NULL) {
2140*7556SAndrew.W.Wilson@sun.com 			filebench_log(LOG_DEBUG_SCRIPT,
2141*7556SAndrew.W.Wilson@sun.com 			    "flowop %s trying to stat NULL file at fd = %d",
2142*7556SAndrew.W.Wilson@sun.com 			    flowop->fo_name, fd);
2143*7556SAndrew.W.Wilson@sun.com 			return (FILEBENCH_ERROR);
2144*7556SAndrew.W.Wilson@sun.com 		}
2145*7556SAndrew.W.Wilson@sun.com 
2146*7556SAndrew.W.Wilson@sun.com 		/* if here, we still have a valid file pointer */
2147*7556SAndrew.W.Wilson@sun.com 		fileset = file->fse_fileset;
2148*7556SAndrew.W.Wilson@sun.com 	} else {
2149*7556SAndrew.W.Wilson@sun.com 		/* Otherwise, pick arbitrary file */
2150*7556SAndrew.W.Wilson@sun.com 		file = NULL;
2151*7556SAndrew.W.Wilson@sun.com 		fileset = flowop->fo_fileset;
2152*7556SAndrew.W.Wilson@sun.com 	}
2153*7556SAndrew.W.Wilson@sun.com 
2154*7556SAndrew.W.Wilson@sun.com 
2155*7556SAndrew.W.Wilson@sun.com 	if (fileset == NULL) {
2156*7556SAndrew.W.Wilson@sun.com 		filebench_log(LOG_ERROR,
2157*7556SAndrew.W.Wilson@sun.com 		    "statfile with no fileset specified");
2158*7556SAndrew.W.Wilson@sun.com 		return (FILEBENCH_ERROR);
2159*7556SAndrew.W.Wilson@sun.com 	}
2160*7556SAndrew.W.Wilson@sun.com 
2161*7556SAndrew.W.Wilson@sun.com #ifdef HAVE_RAW_SUPPORT
2162*7556SAndrew.W.Wilson@sun.com 	/* can't be used with raw devices */
2163*7556SAndrew.W.Wilson@sun.com 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2164*7556SAndrew.W.Wilson@sun.com 		filebench_log(LOG_ERROR,
2165*7556SAndrew.W.Wilson@sun.com 		    "flowop %s attempted do a statfile on a RAW device",
2166*7556SAndrew.W.Wilson@sun.com 		    flowop->fo_name);
21676084Saw148015 		return (FILEBENCH_ERROR);
21685184Sek110237 	}
2169*7556SAndrew.W.Wilson@sun.com #endif /* HAVE_RAW_SUPPORT */
2170*7556SAndrew.W.Wilson@sun.com 
2171*7556SAndrew.W.Wilson@sun.com 	if (file == NULL) {
2172*7556SAndrew.W.Wilson@sun.com 		char path[MAXPATHLEN];
2173*7556SAndrew.W.Wilson@sun.com 		char *pathtmp;
2174*7556SAndrew.W.Wilson@sun.com 
2175*7556SAndrew.W.Wilson@sun.com 		/* pick arbitrary, existing (allocated) file */
2176*7556SAndrew.W.Wilson@sun.com 		if ((file = fileset_pick(fileset, FILESET_PICKEXISTS, 0))
2177*7556SAndrew.W.Wilson@sun.com 		    == NULL) {
2178*7556SAndrew.W.Wilson@sun.com 			filebench_log(LOG_DEBUG_SCRIPT,
2179*7556SAndrew.W.Wilson@sun.com 			    "Statfile flowop %s failed to pick file",
2180*7556SAndrew.W.Wilson@sun.com 			    flowop->fo_name);
2181*7556SAndrew.W.Wilson@sun.com 			return (FILEBENCH_NORSC);
2182*7556SAndrew.W.Wilson@sun.com 		}
2183*7556SAndrew.W.Wilson@sun.com 
2184*7556SAndrew.W.Wilson@sun.com 		/* resolve path and do a stat on file */
2185*7556SAndrew.W.Wilson@sun.com 		*path = 0;
2186*7556SAndrew.W.Wilson@sun.com 		(void) strcpy(path, avd_get_str(fileset->fs_path));
2187*7556SAndrew.W.Wilson@sun.com 		(void) strcat(path, "/");
2188*7556SAndrew.W.Wilson@sun.com 		(void) strcat(path, avd_get_str(fileset->fs_name));
2189*7556SAndrew.W.Wilson@sun.com 		pathtmp = fileset_resolvepath(file);
2190*7556SAndrew.W.Wilson@sun.com 		(void) strcat(path, pathtmp);
2191*7556SAndrew.W.Wilson@sun.com 		free(pathtmp);
2192*7556SAndrew.W.Wilson@sun.com 
2193*7556SAndrew.W.Wilson@sun.com 		/* stat the file */
2194*7556SAndrew.W.Wilson@sun.com 		flowop_beginop(threadflow, flowop);
2195*7556SAndrew.W.Wilson@sun.com 		if (stat(path, &statbuf) == -1)
2196*7556SAndrew.W.Wilson@sun.com 			filebench_log(LOG_ERROR,
2197*7556SAndrew.W.Wilson@sun.com 			    "statfile flowop %s failed", flowop->fo_name);
2198*7556SAndrew.W.Wilson@sun.com 		flowop_endop(threadflow, flowop, 0);
2199*7556SAndrew.W.Wilson@sun.com 
2200*7556SAndrew.W.Wilson@sun.com 		fileset_unbusy(file, FALSE, FALSE);
2201*7556SAndrew.W.Wilson@sun.com 	} else {
2202*7556SAndrew.W.Wilson@sun.com 		/* stat specific file */
2203*7556SAndrew.W.Wilson@sun.com 		flowop_beginop(threadflow, flowop);
2204*7556SAndrew.W.Wilson@sun.com 		if (fstat(threadflow->tf_fd[fd], &statbuf) == -1)
2205*7556SAndrew.W.Wilson@sun.com 			filebench_log(LOG_ERROR,
2206*7556SAndrew.W.Wilson@sun.com 			    "statfile flowop %s failed", flowop->fo_name);
2207*7556SAndrew.W.Wilson@sun.com 		flowop_endop(threadflow, flowop, 0);
2208*7556SAndrew.W.Wilson@sun.com 
22095184Sek110237 	}
22105184Sek110237 
22116084Saw148015 	return (FILEBENCH_OK);
22125184Sek110237 }
22135184Sek110237 
22145184Sek110237 
22155184Sek110237 /*
22165184Sek110237  * Additional reads and writes. Read and write whole files, write
22175184Sek110237  * and append to files. Some of these work with both fileobjs and
22185184Sek110237  * filesets, others only with filesets. The flowoplib_write routine
22195184Sek110237  * writes from thread memory, while the others read or write using
22205184Sek110237  * fo_buf memory. Note that both flowoplib_read() and
22215184Sek110237  * flowoplib_aiowrite() use thread memory as well.
22225184Sek110237  */
22235184Sek110237 
22245184Sek110237 
22255184Sek110237 /*
22265673Saw148015  * Emulate a read of a whole file. The file must be open with
22275673Saw148015  * file descriptor and filesetentry stored at the locations indexed
22285673Saw148015  * by the flowop's fdnumber. It then seeks to the beginning of the
22295673Saw148015  * associated file, and reads fs_iosize bytes at a time until the end
22306084Saw148015  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
22316084Saw148015  * out of files, and FILEBENCH_OK on success.
22325184Sek110237  */
22335184Sek110237 static int
22345184Sek110237 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
22355184Sek110237 {
22365673Saw148015 	caddr_t iobuf;
22375184Sek110237 	off64_t bytes = 0;
22385673Saw148015 	int filedesc;
22396212Saw148015 	uint64_t wss;
22406212Saw148015 	fbint_t iosize;
22415184Sek110237 	int ret;
22426212Saw148015 	char zerordbuf;
22435184Sek110237 
22445673Saw148015 	/* get the file to use */
22456084Saw148015 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
22466084Saw148015 	    &filedesc)) != FILEBENCH_OK)
22476084Saw148015 		return (ret);
22485184Sek110237 
22495673Saw148015 	/* an I/O size of zero means read entire working set with one I/O */
22506212Saw148015 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
22515673Saw148015 		iosize = wss;
22525184Sek110237 
22536212Saw148015 	/*
22546212Saw148015 	 * The file may actually be 0 bytes long, in which case skip
22556212Saw148015 	 * the buffer set up call (which would fail) and substitute
22566212Saw148015 	 * a small buffer, which won't really be used.
22576212Saw148015 	 */
22586212Saw148015 	if (iosize == 0) {
22596212Saw148015 		iobuf = (caddr_t)&zerordbuf;
22606212Saw148015 		filebench_log(LOG_DEBUG_SCRIPT,
22616212Saw148015 		    "flowop %s read zero length file", flowop->fo_name);
22626212Saw148015 	} else {
22636212Saw148015 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
22646212Saw148015 		    iosize) != 0)
22656212Saw148015 			return (FILEBENCH_ERROR);
22666212Saw148015 	}
22675184Sek110237 
22685184Sek110237 	/* Measure time to read bytes */
22695184Sek110237 	flowop_beginop(threadflow, flowop);
22705673Saw148015 	(void) lseek64(filedesc, 0, SEEK_SET);
22715673Saw148015 	while ((ret = read(filedesc, iobuf, iosize)) > 0)
22725184Sek110237 		bytes += ret;
22735184Sek110237 
22745673Saw148015 	flowop_endop(threadflow, flowop, bytes);
22755184Sek110237 
22765184Sek110237 	if (ret < 0) {
22775184Sek110237 		filebench_log(LOG_ERROR,
22786391Saw148015 		    "readwhole fail Failed to read whole file: %s",
22796391Saw148015 		    strerror(errno));
22806084Saw148015 		return (FILEBENCH_ERROR);
22815184Sek110237 	}
22825184Sek110237 
22836084Saw148015 	return (FILEBENCH_OK);
22845184Sek110237 }
22855184Sek110237 
22865184Sek110237 /*
22875184Sek110237  * Emulate a write to a file of size fo_iosize.  Will write
22885184Sek110237  * to a file from a fileset if the flowop's fo_fileset field
22895184Sek110237  * specifies one or its fdnumber is non zero. Otherwise it
22905184Sek110237  * will write to a fileobj file, if one exists. If the file
22915184Sek110237  * is not currently open, the routine will attempt to open
22925184Sek110237  * it. The flowop's fo_wss parameter will be used to set the
22935184Sek110237  * maximum file size if it is non-zero, otherwise the
22945184Sek110237  * filesetentry's  fse_size will be used. A random memory
22955184Sek110237  * buffer offset is calculated, and, if fo_random is TRUE,
22965184Sek110237  * a random file offset is used for the write. Otherwise the
22976084Saw148015  * write is to the next sequential location. Returns
22986084Saw148015  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
22996084Saw148015  * obtain a file, or FILEBENCH_OK on success.
23005184Sek110237  */
23015184Sek110237 static int
23025184Sek110237 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
23035184Sek110237 {
23045673Saw148015 	caddr_t iobuf;
23056212Saw148015 	fbint_t wss;
23066212Saw148015 	fbint_t iosize;
23075184Sek110237 	int filedesc;
23086084Saw148015 	int ret;
23095184Sek110237 
23106212Saw148015 	iosize = avd_get_int(flowop->fo_iosize);
23116084Saw148015 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
23126212Saw148015 	    &filedesc, iosize)) != FILEBENCH_OK)
23136084Saw148015 		return (ret);
23145184Sek110237 
23156212Saw148015 	if (avd_get_bool(flowop->fo_random)) {
23165184Sek110237 		uint64_t fileoffset;
23175184Sek110237 
23185184Sek110237 		if (filebench_randomno64(&fileoffset,
23196212Saw148015 		    wss, iosize, NULL) == -1) {
23205184Sek110237 			filebench_log(LOG_ERROR,
23215184Sek110237 			    "file size smaller than IO size for thread %s",
23225184Sek110237 			    flowop->fo_name);
23236084Saw148015 			return (FILEBENCH_ERROR);
23245184Sek110237 		}
23255184Sek110237 		flowop_beginop(threadflow, flowop);
23265673Saw148015 		if (pwrite64(filedesc, iobuf,
23276212Saw148015 		    iosize, (off64_t)fileoffset) == -1) {
23285184Sek110237 			filebench_log(LOG_ERROR, "write failed, "
23296286Saw148015 			    "offset %llu io buffer %zd: %s",
23306286Saw148015 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
23315673Saw148015 			flowop_endop(threadflow, flowop, 0);
23326084Saw148015 			return (FILEBENCH_ERROR);
23335184Sek110237 		}
23346212Saw148015 		flowop_endop(threadflow, flowop, iosize);
23355184Sek110237 	} else {
23365184Sek110237 		flowop_beginop(threadflow, flowop);
23376391Saw148015 		if (write(filedesc, iobuf, iosize) == -1) {
23385184Sek110237 			filebench_log(LOG_ERROR,
23395673Saw148015 			    "write failed, io buffer %zd: %s",
23405673Saw148015 			    iobuf, strerror(errno));
23415673Saw148015 			flowop_endop(threadflow, flowop, 0);
23426084Saw148015 			return (FILEBENCH_ERROR);
23435184Sek110237 		}
23446212Saw148015 		flowop_endop(threadflow, flowop, iosize);
23455184Sek110237 	}
23465184Sek110237 
23476084Saw148015 	return (FILEBENCH_OK);
23485184Sek110237 }
23495184Sek110237 
23505184Sek110237 /*
23515184Sek110237  * Emulate a write of a whole file.  The size of the file
23525673Saw148015  * is taken from a filesetentry identified by fo_srcfdnumber or
23535673Saw148015  * from the working set size, while the file descriptor used is
23545673Saw148015  * identified by fo_fdnumber. Does multiple writes of fo_iosize
23556084Saw148015  * length length until full file has been written. Returns FILEBENCH_ERROR on
23566084Saw148015  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
23575184Sek110237  */
23585184Sek110237 static int
23595184Sek110237 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
23605184Sek110237 {
23615673Saw148015 	caddr_t iobuf;
23625184Sek110237 	filesetentry_t *file;
23635184Sek110237 	int wsize;
23645184Sek110237 	off64_t seek;
23655184Sek110237 	off64_t bytes = 0;
23665673Saw148015 	uint64_t wss;
23676212Saw148015 	fbint_t iosize;
23685673Saw148015 	int filedesc;
23695184Sek110237 	int srcfd = flowop->fo_srcfdnumber;
23705184Sek110237 	int ret;
23716212Saw148015 	char zerowrtbuf;
23725184Sek110237 
23735673Saw148015 	/* get the file to use */
23746084Saw148015 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
23756084Saw148015 	    &filedesc)) != FILEBENCH_OK)
23766084Saw148015 		return (ret);
23775184Sek110237 
23786212Saw148015 	/* an I/O size of zero means write entire working set with one I/O */
23796212Saw148015 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
23805673Saw148015 		iosize = wss;
23815184Sek110237 
23826212Saw148015 	/*
23836212Saw148015 	 * The file may actually be 0 bytes long, in which case skip
23846212Saw148015 	 * the buffer set up call (which would fail) and substitute
23856212Saw148015 	 * a small buffer, which won't really be used.
23866212Saw148015 	 */
23876212Saw148015 	if (iosize == 0) {
23886212Saw148015 		iobuf = (caddr_t)&zerowrtbuf;
23896212Saw148015 		filebench_log(LOG_DEBUG_SCRIPT,
23906212Saw148015 		    "flowop %s wrote zero length file", flowop->fo_name);
23916212Saw148015 	} else {
23926212Saw148015 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
23936212Saw148015 		    iosize) != 0)
23946212Saw148015 			return (FILEBENCH_ERROR);
23956212Saw148015 	}
23965184Sek110237 
23975184Sek110237 	file = threadflow->tf_fse[srcfd];
23985673Saw148015 	if ((srcfd != 0) && (file == NULL)) {
23995673Saw148015 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
24005184Sek110237 		    flowop->fo_name);
24016084Saw148015 		return (FILEBENCH_ERROR);
24025184Sek110237 	}
24035184Sek110237 
24045673Saw148015 	if (file)
24055673Saw148015 		wss = file->fse_size;
24065673Saw148015 
24075673Saw148015 	wsize = (int)MIN(wss, iosize);
24085184Sek110237 
24095184Sek110237 	/* Measure time to write bytes */
24105184Sek110237 	flowop_beginop(threadflow, flowop);
24115673Saw148015 	for (seek = 0; seek < wss; seek += wsize) {
24125673Saw148015 		ret = write(filedesc, iobuf, wsize);
24135184Sek110237 		if (ret != wsize) {
24145184Sek110237 			filebench_log(LOG_ERROR,
24155184Sek110237 			    "Failed to write %d bytes on fd %d: %s",
24165673Saw148015 			    wsize, filedesc, strerror(errno));
24175673Saw148015 			flowop_endop(threadflow, flowop, 0);
24186084Saw148015 			return (FILEBENCH_ERROR);
24195184Sek110237 		}
24205673Saw148015 		wsize = (int)MIN(wss - seek, iosize);
24215184Sek110237 		bytes += ret;
24225184Sek110237 	}
24235673Saw148015 	flowop_endop(threadflow, flowop, bytes);
24245184Sek110237 
24256084Saw148015 	return (FILEBENCH_OK);
24265184Sek110237 }
24275184Sek110237 
24285184Sek110237 
24295184Sek110237 /*
24305184Sek110237  * Emulate a fixed size append to a file. Will append data to
24315184Sek110237  * a file chosen from a fileset if the flowop's fo_fileset
24325184Sek110237  * field specifies one or if its fdnumber is non zero.
24335184Sek110237  * Otherwise it will write to a fileobj file, if one exists.
24345184Sek110237  * The flowop's fo_wss parameter will be used to set the
24355184Sek110237  * maximum file size if it is non-zero, otherwise the
24365184Sek110237  * filesetentry's fse_size will be used. A random memory
24375184Sek110237  * buffer offset is calculated, then a logical seek to the
24385184Sek110237  * end of file is done followed by a write of fo_iosize
24395184Sek110237  * bytes. Writes are actually done from fo_buf, rather than
24405184Sek110237  * tf_mem as is done with flowoplib_write(), and no check
24415184Sek110237  * is made to see if fo_iosize exceeds the size of fo_buf.
24426084Saw148015  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
24436084Saw148015  * files in the fileset, FILEBENCH_OK on success.
24445184Sek110237  */
24455184Sek110237 static int
24465184Sek110237 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
24475184Sek110237 {
24485673Saw148015 	caddr_t iobuf;
24495673Saw148015 	int filedesc;
24506212Saw148015 	fbint_t wss;
24516212Saw148015 	fbint_t iosize;
24525184Sek110237 	int ret;
24535184Sek110237 
24546212Saw148015 	iosize = avd_get_int(flowop->fo_iosize);
24556084Saw148015 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
24566084Saw148015 	    &filedesc, iosize)) != FILEBENCH_OK)
24576084Saw148015 		return (ret);
24585184Sek110237 
24595184Sek110237 	/* XXX wss is not being used */
24605184Sek110237 
24615184Sek110237 	/* Measure time to write bytes */
24625184Sek110237 	flowop_beginop(threadflow, flowop);
24635184Sek110237 	(void) lseek64(filedesc, 0, SEEK_END);
24645673Saw148015 	ret = write(filedesc, iobuf, iosize);
24655673Saw148015 	if (ret != iosize) {
24665184Sek110237 		filebench_log(LOG_ERROR,
24676286Saw148015 		    "Failed to write %llu bytes on fd %d: %s",
24686286Saw148015 		    (u_longlong_t)iosize, filedesc, strerror(errno));
24696212Saw148015 		flowop_endop(threadflow, flowop, ret);
24706084Saw148015 		return (FILEBENCH_ERROR);
24715184Sek110237 	}
24726212Saw148015 	flowop_endop(threadflow, flowop, ret);
24735184Sek110237 
24746084Saw148015 	return (FILEBENCH_OK);
24755184Sek110237 }
24765184Sek110237 
24775184Sek110237 /*
24785184Sek110237  * Emulate a random size append to a file. Will append data
24795184Sek110237  * to a file chosen from a fileset if the flowop's fo_fileset
24805184Sek110237  * field specifies one or if its fdnumber is non zero. Otherwise
24815184Sek110237  * it will write to a fileobj file, if one exists. The flowop's
24825184Sek110237  * fo_wss parameter will be used to set the maximum file size
24835184Sek110237  * if it is non-zero, otherwise the filesetentry's fse_size
24845184Sek110237  * will be used.  A random transfer size (but at most fo_iosize
24855184Sek110237  * bytes) and a random memory offset are calculated. A logical
24865184Sek110237  * seek to the end of file is done, then writes of up to
24875184Sek110237  * FILE_ALLOC_BLOCK in size are done until the full transfer
24885184Sek110237  * size has been written. Writes are actually done from fo_buf,
24895184Sek110237  * rather than tf_mem as is done with flowoplib_write().
24906084Saw148015  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
24916084Saw148015  * files in the fileset, FILEBENCH_OK on success.
24925184Sek110237  */
24935184Sek110237 static int
24945184Sek110237 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
24955184Sek110237 {
24965673Saw148015 	caddr_t iobuf;
24975184Sek110237 	uint64_t appendsize;
24985673Saw148015 	int filedesc;
24996212Saw148015 	fbint_t wss;
25006212Saw148015 	fbint_t iosize;
25016212Saw148015 	int ret = 0;
25025184Sek110237 
25036212Saw148015 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
25046212Saw148015 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
25056212Saw148015 		    flowop->fo_name);
25066212Saw148015 		return (FILEBENCH_ERROR);
25076212Saw148015 	}
25086212Saw148015 
25096212Saw148015 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
25106084Saw148015 		return (FILEBENCH_ERROR);
25115184Sek110237 
25125673Saw148015 	/* skip if attempting zero length append */
25135673Saw148015 	if (appendsize == 0) {
25145673Saw148015 		flowop_beginop(threadflow, flowop);
25155673Saw148015 		flowop_endop(threadflow, flowop, 0LL);
25166084Saw148015 		return (FILEBENCH_OK);
25175673Saw148015 	}
25185184Sek110237 
25196084Saw148015 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
25206084Saw148015 	    &filedesc, appendsize)) != FILEBENCH_OK)
25216084Saw148015 		return (ret);
25225673Saw148015 
25235184Sek110237 	/* XXX wss is not being used */
25245184Sek110237 
25255673Saw148015 	/* Measure time to write bytes */
25265673Saw148015 	flowop_beginop(threadflow, flowop);
25275673Saw148015 
25285673Saw148015 	(void) lseek64(filedesc, 0, SEEK_END);
25295673Saw148015 	ret = write(filedesc, iobuf, appendsize);
25305673Saw148015 	if (ret != appendsize) {
25315673Saw148015 		filebench_log(LOG_ERROR,
25326286Saw148015 		    "Failed to write %llu bytes on fd %d: %s",
25336286Saw148015 		    (u_longlong_t)appendsize, filedesc, strerror(errno));
25345673Saw148015 		flowop_endop(threadflow, flowop, 0);
25356084Saw148015 		return (FILEBENCH_ERROR);
25365184Sek110237 	}
25375184Sek110237 
25385673Saw148015 	flowop_endop(threadflow, flowop, appendsize);
25395184Sek110237 
25406084Saw148015 	return (FILEBENCH_OK);
25415184Sek110237 }
25425184Sek110237 
/*
 * Running totals kept by the testrandvar flowop in its fo_private area.
 */
struct testrandvar_priv {
	uint64_t sample_count;	/* number of values accumulated */
	double val_sum;		/* sum of the values */
	double sqr_sum;		/* sum of the squares of the values */
};

typedef struct testrandvar_priv testrandvar_priv_t;
25486212Saw148015 
25496212Saw148015 /*
25506212Saw148015  * flowop to calculate various statistics from the number stream
25516212Saw148015  * produced by a random variable. This allows verification that the
25526212Saw148015  * random distribution used to define the random variable is producing
25536212Saw148015  * the expected distribution of random numbers.
25546212Saw148015  */
25556212Saw148015 /* ARGSUSED */
25566212Saw148015 static int
25576212Saw148015 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
25586212Saw148015 {
25596212Saw148015 	testrandvar_priv_t	*mystats;
25606212Saw148015 	double			value;
25616212Saw148015 
25626212Saw148015 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
25636212Saw148015 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
25646212Saw148015 		filebench_shutdown(1);
25656212Saw148015 		return (-1);
25666212Saw148015 	}
25676212Saw148015 
25686212Saw148015 	value = avd_get_dbl(flowop->fo_value);
25696212Saw148015 
25706212Saw148015 	mystats->sample_count++;
25716212Saw148015 	mystats->val_sum += value;
25726212Saw148015 	mystats->sqr_sum += (value * value);
25736212Saw148015 
25746212Saw148015 	return (0);
25756212Saw148015 }
25766212Saw148015 
25776212Saw148015 /*
25786212Saw148015  * Initialize the private data area used to accumulate the statistics
25796212Saw148015  */
25806212Saw148015 static int
25816212Saw148015 flowoplib_testrandvar_init(flowop_t *flowop)
25826212Saw148015 {
25836212Saw148015 	testrandvar_priv_t	*mystats;
25846212Saw148015 
25856212Saw148015 	if ((mystats = (testrandvar_priv_t *)
25866212Saw148015 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
25876212Saw148015 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
25886212Saw148015 		filebench_shutdown(1);
25896212Saw148015 		return (-1);
25906212Saw148015 	}
25916212Saw148015 
25926212Saw148015 	mystats->sample_count = 0;
25936212Saw148015 	mystats->val_sum = 0;
25946212Saw148015 	mystats->sqr_sum = 0;
25956212Saw148015 	flowop->fo_private = (void *)mystats;
25966212Saw148015 
25976212Saw148015 	(void) ipc_mutex_unlock(&flowop->fo_lock);
25986212Saw148015 	return (0);
25996212Saw148015 }
26006212Saw148015 
26016212Saw148015 /*
26026212Saw148015  * Print out the accumulated statistics, and free the private storage
26036212Saw148015  */
26046212Saw148015 static void
26056212Saw148015 flowoplib_testrandvar_destruct(flowop_t *flowop)
26066212Saw148015 {
26076212Saw148015 	testrandvar_priv_t	*mystats;
26086212Saw148015 	double mean, std_dev, dbl_count;
26096212Saw148015 
26106212Saw148015 	(void) ipc_mutex_lock(&flowop->fo_lock);
26116212Saw148015 	if ((mystats = (testrandvar_priv_t *)
26126212Saw148015 	    flowop->fo_private) == NULL) {
26136212Saw148015 		(void) ipc_mutex_unlock(&flowop->fo_lock);
26146212Saw148015 		return;
26156212Saw148015 	}
26166212Saw148015 
26176212Saw148015 	flowop->fo_private = NULL;
26186212Saw148015 	(void) ipc_mutex_unlock(&flowop->fo_lock);
26196212Saw148015 
26206212Saw148015 	dbl_count = (double)mystats->sample_count;
26216212Saw148015 	mean = mystats->val_sum / dbl_count;
26226212Saw148015 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
26236212Saw148015 
26246212Saw148015 	filebench_log(LOG_VERBOSE,
26256286Saw148015 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
26266286Saw148015 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
26276212Saw148015 	free(mystats);
26286212Saw148015 }
26295184Sek110237 
26305184Sek110237 /*
2631*7556SAndrew.W.Wilson@sun.com  * prints message to the console from within a thread
2632*7556SAndrew.W.Wilson@sun.com  */
2633*7556SAndrew.W.Wilson@sun.com static int
2634*7556SAndrew.W.Wilson@sun.com flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2635*7556SAndrew.W.Wilson@sun.com {
2636*7556SAndrew.W.Wilson@sun.com 	procflow_t *procflow;
2637*7556SAndrew.W.Wilson@sun.com 
2638*7556SAndrew.W.Wilson@sun.com 	procflow = threadflow->tf_process;
2639*7556SAndrew.W.Wilson@sun.com 	filebench_log(LOG_INFO,
2640*7556SAndrew.W.Wilson@sun.com 	    "Message from process (%s,%d), thread (%s,%d): %s",
2641*7556SAndrew.W.Wilson@sun.com 	    procflow->pf_name, procflow->pf_instance,
2642*7556SAndrew.W.Wilson@sun.com 	    threadflow->tf_name, threadflow->tf_instance,
2643*7556SAndrew.W.Wilson@sun.com 	    avd_get_str(flowop->fo_value));
2644*7556SAndrew.W.Wilson@sun.com 
2645*7556SAndrew.W.Wilson@sun.com 	return (FILEBENCH_OK);
2646*7556SAndrew.W.Wilson@sun.com }
2647*7556SAndrew.W.Wilson@sun.com 
2648*7556SAndrew.W.Wilson@sun.com /*
26495184Sek110237  * Prints usage information for flowop operations.
26505184Sek110237  */
26515184Sek110237 void
26525184Sek110237 flowoplib_usage()
26535184Sek110237 {
26545184Sek110237 	(void) fprintf(stderr,
26555184Sek110237 	    "flowop [openfile|createfile] name=<name>,fileset=<fname>\n");
26565184Sek110237 	(void) fprintf(stderr,
26575184Sek110237 	    "                       [,fd=<file desc num>]\n");
26585184Sek110237 	(void) fprintf(stderr, "\n");
26595184Sek110237 	(void) fprintf(stderr,
26605184Sek110237 	    "flowop closefile name=<name>,fd=<file desc num>]\n");
26615184Sek110237 	(void) fprintf(stderr, "\n");
26625184Sek110237 	(void) fprintf(stderr, "flowop deletefile name=<name>\n");
26635184Sek110237 	(void) fprintf(stderr, "                       [,fileset=<fname>]\n");
26645184Sek110237 	(void) fprintf(stderr,
26655184Sek110237 	    "                       [,fd=<file desc num>]\n");
26665184Sek110237 	(void) fprintf(stderr, "\n");
26675184Sek110237 	(void) fprintf(stderr, "flowop statfile name=<name>\n");
26685184Sek110237 	(void) fprintf(stderr, "                       [,fileset=<fname>]\n");
26695184Sek110237 	(void) fprintf(stderr,
26705184Sek110237 	    "                       [,fd=<file desc num>]\n");
26715184Sek110237 	(void) fprintf(stderr, "\n");
26725184Sek110237 	(void) fprintf(stderr,
26735184Sek110237 	    "flowop fsync name=<name>,fd=<file desc num>]\n");
26745184Sek110237 	(void) fprintf(stderr, "\n");
26755184Sek110237 	(void) fprintf(stderr,
26765184Sek110237 	    "flowop fsyncset name=<name>,fileset=<fname>]\n");
26775184Sek110237 	(void) fprintf(stderr, "\n");
26785184Sek110237 	(void) fprintf(stderr, "flowop [write|read|aiowrite] name=<name>, \n");
26795184Sek110237 	(void) fprintf(stderr,
26805184Sek110237 	    "                       filename|fileset=<fname>,\n");
26815184Sek110237 	(void) fprintf(stderr, "                       iosize=<size>\n");
26825184Sek110237 	(void) fprintf(stderr, "                       [,directio]\n");
26835184Sek110237 	(void) fprintf(stderr, "                       [,dsync]\n");
26845184Sek110237 	(void) fprintf(stderr, "                       [,iters=<count>]\n");
26855184Sek110237 	(void) fprintf(stderr, "                       [,random]\n");
26865184Sek110237 	(void) fprintf(stderr, "                       [,opennext]\n");
26875184Sek110237 	(void) fprintf(stderr, "                       [,workingset=<size>]\n");
26885184Sek110237 	(void) fprintf(stderr,
26895184Sek110237 	    "flowop [appendfile|appendfilerand] name=<name>, \n");
26905184Sek110237 	(void) fprintf(stderr,
26915184Sek110237 	    "                       filename|fileset=<fname>,\n");
26925184Sek110237 	(void) fprintf(stderr, "                       iosize=<size>\n");
26935184Sek110237 	(void) fprintf(stderr, "                       [,dsync]\n");
26945184Sek110237 	(void) fprintf(stderr, "                       [,iters=<count>]\n");
26955184Sek110237 	(void) fprintf(stderr, "                       [,workingset=<size>]\n");
26965184Sek110237 	(void) fprintf(stderr,
26975184Sek110237 	    "flowop [readwholefile|writewholefile] name=<name>, \n");
26985184Sek110237 	(void) fprintf(stderr,
26995184Sek110237 	    "                       filename|fileset=<fname>,\n");
27005184Sek110237 	(void) fprintf(stderr, "                       iosize=<size>\n");
27015184Sek110237 	(void) fprintf(stderr, "                       [,dsync]\n");
27025184Sek110237 	(void) fprintf(stderr, "                       [,iters=<count>]\n");
27035184Sek110237 	(void) fprintf(stderr, "\n");
27045184Sek110237 	(void) fprintf(stderr, "flowop aiowait name=<name>,target="
27055184Sek110237 	    "<aiowrite-flowop>\n");
27065184Sek110237 	(void) fprintf(stderr, "\n");
27075184Sek110237 	(void) fprintf(stderr, "flowop sempost name=<name>,"
27085184Sek110237 	    "target=<semblock-flowop>,\n");
27095184Sek110237 	(void) fprintf(stderr,
27105184Sek110237 	    "                       value=<increment-to-post>\n");
27115184Sek110237 	(void) fprintf(stderr, "\n");
27125184Sek110237 	(void) fprintf(stderr, "flowop semblock name=<name>,value="
27135184Sek110237 	    "<decrement-to-receive>,\n");
27145184Sek110237 	(void) fprintf(stderr, "                       highwater="
27155184Sek110237 	    "<inbound-queue-max>\n");
27165184Sek110237 	(void) fprintf(stderr, "\n");
27175184Sek110237 	(void) fprintf(stderr, "flowop block name=<name>\n");
27185184Sek110237 	(void) fprintf(stderr, "\n");
27195184Sek110237 	(void) fprintf(stderr,
27205184Sek110237 	    "flowop wakeup name=<name>,target=<block-flowop>,\n");
27215184Sek110237 	(void) fprintf(stderr, "\n");
27225184Sek110237 	(void) fprintf(stderr,
27235184Sek110237 	    "flowop hog name=<name>,value=<number-of-mem-ops>\n");
27245184Sek110237 	(void) fprintf(stderr,
27255184Sek110237 	    "flowop delay name=<name>,value=<number-of-seconds>\n");
27265184Sek110237 	(void) fprintf(stderr, "\n");
27275184Sek110237 	(void) fprintf(stderr, "flowop eventlimit name=<name>\n");
27285184Sek110237 	(void) fprintf(stderr, "flowop bwlimit name=<name>,value=<mb/s>\n");
27295184Sek110237 	(void) fprintf(stderr, "flowop iopslimit name=<name>,value=<iop/s>\n");
27305184Sek110237 	(void) fprintf(stderr,
27315184Sek110237 	    "flowop finishoncount name=<name>,value=<ops/s>\n");
27325184Sek110237 	(void) fprintf(stderr,
27335184Sek110237 	    "flowop finishonbytes name=<name>,value=<bytes>\n");
27345184Sek110237 	(void) fprintf(stderr, "\n");
27355184Sek110237 	(void) fprintf(stderr, "\n");
27365184Sek110237 }
2737