xref: /onnv-gate/usr/src/cmd/filebench/common/flowop_library.c (revision 5184:da60d2b4a9e2)
1*5184Sek110237 /*
2*5184Sek110237  * CDDL HEADER START
3*5184Sek110237  *
4*5184Sek110237  * The contents of this file are subject to the terms of the
5*5184Sek110237  * Common Development and Distribution License (the "License").
6*5184Sek110237  * You may not use this file except in compliance with the License.
7*5184Sek110237  *
8*5184Sek110237  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9*5184Sek110237  * or http://www.opensolaris.org/os/licensing.
10*5184Sek110237  * See the License for the specific language governing permissions
11*5184Sek110237  * and limitations under the License.
12*5184Sek110237  *
13*5184Sek110237  * When distributing Covered Code, include this CDDL HEADER in each
14*5184Sek110237  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15*5184Sek110237  * If applicable, add the following below this CDDL HEADER, with the
16*5184Sek110237  * fields enclosed by brackets "[]" replaced with your own identifying
17*5184Sek110237  * information: Portions Copyright [yyyy] [name of copyright owner]
18*5184Sek110237  *
19*5184Sek110237  * CDDL HEADER END
20*5184Sek110237  */
21*5184Sek110237 /*
22*5184Sek110237  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23*5184Sek110237  * Use is subject to license terms.
24*5184Sek110237  */
25*5184Sek110237 
26*5184Sek110237 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27*5184Sek110237 
28*5184Sek110237 #include "config.h"
29*5184Sek110237 
30*5184Sek110237 #include <sys/types.h>
31*5184Sek110237 #ifdef HAVE_SYS_ASYNCH_H
32*5184Sek110237 #include <sys/asynch.h>
33*5184Sek110237 #endif
34*5184Sek110237 #include <sys/ipc.h>
35*5184Sek110237 #include <sys/sem.h>
36*5184Sek110237 #include <sys/errno.h>
37*5184Sek110237 #include <sys/time.h>
38*5184Sek110237 #include <inttypes.h>
39*5184Sek110237 #include <fcntl.h>
#include <signal.h>
40*5184Sek110237 
41*5184Sek110237 #ifdef HAVE_UTILITY_H
42*5184Sek110237 #include <utility.h>
43*5184Sek110237 #endif /* HAVE_UTILITY_H */
44*5184Sek110237 
45*5184Sek110237 #ifdef HAVE_AIO
46*5184Sek110237 #include <aio.h>
47*5184Sek110237 #endif /* HAVE_AIO */
48*5184Sek110237 
49*5184Sek110237 #ifdef HAVE_LIBAIO_H
50*5184Sek110237 #include <libaio.h>
51*5184Sek110237 #endif /* HAVE_LIBAIO_H */
52*5184Sek110237 
53*5184Sek110237 #ifdef HAVE_SYS_ASYNC_H
54*5184Sek110237 #include <sys/asynch.h>
55*5184Sek110237 #endif /* HAVE_SYS_ASYNC_H */
56*5184Sek110237 
57*5184Sek110237 #ifdef HAVE_AIO_H
58*5184Sek110237 #include <aio.h>
59*5184Sek110237 #endif /* HAVE_AIO_H */
60*5184Sek110237 
61*5184Sek110237 #ifndef HAVE_UINT_T
62*5184Sek110237 #define	uint_t unsigned int
63*5184Sek110237 #endif /* HAVE_UINT_T */
64*5184Sek110237 
65*5184Sek110237 #ifndef HAVE_AIOCB64_T
66*5184Sek110237 #define	aiocb64 aiocb
67*5184Sek110237 #endif /* HAVE_AIOCB64_T */
68*5184Sek110237 
69*5184Sek110237 #ifndef HAVE_SYSV_SEM
70*5184Sek110237 #include <semaphore.h>
71*5184Sek110237 #endif /* HAVE_SYSV_SEM */
72*5184Sek110237 
73*5184Sek110237 #include "filebench.h"
74*5184Sek110237 #include "flowop.h"
75*5184Sek110237 #include "fileset.h"
76*5184Sek110237 
77*5184Sek110237 /*
78*5184Sek110237  * These routines implement the flowops from the f language. Each
79*5184Sek110237  * flowop has a name such as "read", and a set of function pointers
80*5184Sek110237  * to call for initialization, execution and destruction of the flowop.
81*5184Sek110237  * The table flowoplib_funcs[] contains a flowoplib struct for each
82*5184Sek110237  * implemented flowop. Most flowops use a generic initialization function
83*5184Sek110237  * and all currently use a generic destruction function. All flowop
84*5184Sek110237  * functions referenced from the table are in this file, though, of
85*5184Sek110237  * course, they often call functions from other files.
86*5184Sek110237  *
87*5184Sek110237  * The flowop_init() routine uses the flowoplib_funcs[] table to
88*5184Sek110237  * create an initial set of "instance 0" flowops, one for each type of
89*5184Sek110237  * flowop, from which all other flowops are derived. These "instance 0"
90*5184Sek110237  * flowops are initialized with information from the table including
91*5184Sek110237  * pointers for their fo_init, fo_func and fo_destroy functions. When
92*5184Sek110237  * a flowop definition is encountered in an f language script, the
93*5184Sek110237  * "type" of flowop, such as "read" is used to search for the
94*5184Sek110237  * "instance 0" flowop named "read", then a new flowop is allocated
95*5184Sek110237  * which inherits its function pointers and other initial properties
96*5184Sek110237  * from the instance 0 flowop, and is given a new name as specified
97*5184Sek110237  * by the "name=" attribute.
98*5184Sek110237  */
99*5184Sek110237 
100*5184Sek110237 static int flowoplib_init_generic(flowop_t *flowop);
101*5184Sek110237 static void flowoplib_destruct_generic(flowop_t *flowop);
102*5184Sek110237 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
103*5184Sek110237 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
104*5184Sek110237 #ifdef HAVE_AIO
105*5184Sek110237 static int flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop);
106*5184Sek110237 static int flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop);
107*5184Sek110237 #endif
108*5184Sek110237 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
109*5184Sek110237 static int flowoplib_block_init(flowop_t *flowop);
110*5184Sek110237 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
111*5184Sek110237 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
112*5184Sek110237 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
113*5184Sek110237 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
114*5184Sek110237 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
115*5184Sek110237 static int flowoplib_sempost_init(flowop_t *flowop);
116*5184Sek110237 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
117*5184Sek110237 static int flowoplib_semblock_init(flowop_t *flowop);
118*5184Sek110237 static void flowoplib_semblock_destruct(flowop_t *flowop);
119*5184Sek110237 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
120*5184Sek110237 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
121*5184Sek110237 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
122*5184Sek110237 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
123*5184Sek110237 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
124*5184Sek110237 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
125*5184Sek110237 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
126*5184Sek110237 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
127*5184Sek110237 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
128*5184Sek110237 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
129*5184Sek110237 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
130*5184Sek110237 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
131*5184Sek110237 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
132*5184Sek110237 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
133*5184Sek110237 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
134*5184Sek110237 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
135*5184Sek110237 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
136*5184Sek110237 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
137*5184Sek110237 
/*
 * One row of the flowop library table. flowoplib_init() copies fl_init,
 * fl_func, fl_destruct and fl_attrs into the "instance 0" flowop that it
 * creates for fl_name.
 *
 * Field order is significant: flowoplib_funcs[] uses positional
 * initializers.
 *
 * NOTE(review): the empty parameter lists declare unprototyped function
 * pointers (valid through C17). Under C23, "()" means "(void)", and the
 * table initializers would no longer type-check.
 */
struct flowoplib {
	int	fl_type;		/* FLOW_TYPE_* category */
	int	fl_attrs;		/* FLOW_ATTR_* flags, or 0 */
	char	*fl_name;		/* flowop name used in scripts */
	int	(*fl_init)();		/* copied to fo_init */
	int	(*fl_func)();		/* copied to fo_func */
	void	(*fl_destruct)();	/* copied to fo_destruct */
};
typedef struct flowoplib flowoplib_t;
146*5184Sek110237 
147*5184Sek110237 static flowoplib_t flowoplib_funcs[] = {
148*5184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowoplib_init_generic,
149*5184Sek110237 	flowoplib_write, flowoplib_destruct_generic,
150*5184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowoplib_init_generic,
151*5184Sek110237 	flowoplib_read, flowoplib_destruct_generic,
152*5184Sek110237 #ifdef HAVE_AIO
153*5184Sek110237 	FLOW_TYPE_AIO, FLOW_ATTR_WRITE, "aiowrite", flowoplib_init_generic,
154*5184Sek110237 	flowoplib_aiowrite, flowoplib_destruct_generic,
155*5184Sek110237 	FLOW_TYPE_AIO, 0, "aiowait", flowoplib_init_generic,
156*5184Sek110237 	flowoplib_aiowait, flowoplib_destruct_generic,
157*5184Sek110237 #endif
158*5184Sek110237 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
159*5184Sek110237 	flowoplib_block, flowoplib_destruct_generic,
160*5184Sek110237 	FLOW_TYPE_SYNC, 0, "wakeup", flowoplib_init_generic,
161*5184Sek110237 	flowoplib_wakeup, flowoplib_destruct_generic,
162*5184Sek110237 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
163*5184Sek110237 	flowoplib_semblock, flowoplib_semblock_destruct,
164*5184Sek110237 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
165*5184Sek110237 	flowoplib_sempost, flowoplib_destruct_generic,
166*5184Sek110237 	FLOW_TYPE_OTHER, 0, "hog", flowoplib_init_generic,
167*5184Sek110237 	flowoplib_hog, flowoplib_destruct_generic,
168*5184Sek110237 	FLOW_TYPE_OTHER, 0, "delay", flowoplib_init_generic,
169*5184Sek110237 	flowoplib_delay, flowoplib_destruct_generic,
170*5184Sek110237 	FLOW_TYPE_OTHER, 0, "eventlimit", flowoplib_init_generic,
171*5184Sek110237 	flowoplib_eventlimit, flowoplib_destruct_generic,
172*5184Sek110237 	FLOW_TYPE_OTHER, 0, "bwlimit", flowoplib_init_generic,
173*5184Sek110237 	flowoplib_bwlimit, flowoplib_destruct_generic,
174*5184Sek110237 	FLOW_TYPE_OTHER, 0, "iopslimit", flowoplib_init_generic,
175*5184Sek110237 	flowoplib_iopslimit, flowoplib_destruct_generic,
176*5184Sek110237 	FLOW_TYPE_OTHER, 0, "opslimit", flowoplib_init_generic,
177*5184Sek110237 	flowoplib_opslimit, flowoplib_destruct_generic,
178*5184Sek110237 	FLOW_TYPE_OTHER, 0, "finishoncount", flowoplib_init_generic,
179*5184Sek110237 	flowoplib_finishoncount, flowoplib_destruct_generic,
180*5184Sek110237 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowoplib_init_generic,
181*5184Sek110237 	flowoplib_finishonbytes, flowoplib_destruct_generic,
182*5184Sek110237 	FLOW_TYPE_IO, 0, "openfile", flowoplib_init_generic,
183*5184Sek110237 	flowoplib_openfile, flowoplib_destruct_generic,
184*5184Sek110237 	FLOW_TYPE_IO, 0, "createfile", flowoplib_init_generic,
185*5184Sek110237 	flowoplib_createfile, flowoplib_destruct_generic,
186*5184Sek110237 	FLOW_TYPE_IO, 0, "closefile", flowoplib_init_generic,
187*5184Sek110237 	flowoplib_closefile, flowoplib_destruct_generic,
188*5184Sek110237 	FLOW_TYPE_IO, 0, "fsync", flowoplib_init_generic,
189*5184Sek110237 	flowoplib_fsync, flowoplib_destruct_generic,
190*5184Sek110237 	FLOW_TYPE_IO, 0, "fsyncset", flowoplib_init_generic,
191*5184Sek110237 	flowoplib_fsyncset, flowoplib_destruct_generic,
192*5184Sek110237 	FLOW_TYPE_IO, 0, "statfile", flowoplib_init_generic,
193*5184Sek110237 	flowoplib_statfile, flowoplib_destruct_generic,
194*5184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowoplib_init_generic,
195*5184Sek110237 	flowoplib_readwholefile, flowoplib_destruct_generic,
196*5184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowoplib_init_generic,
197*5184Sek110237 	flowoplib_appendfile, flowoplib_destruct_generic,
198*5184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowoplib_init_generic,
199*5184Sek110237 	flowoplib_appendfilerand, flowoplib_destruct_generic,
200*5184Sek110237 	FLOW_TYPE_IO, 0, "deletefile", flowoplib_init_generic,
201*5184Sek110237 	flowoplib_deletefile, flowoplib_destruct_generic,
202*5184Sek110237 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowoplib_init_generic,
203*5184Sek110237 	flowoplib_writewholefile, flowoplib_destruct_generic
204*5184Sek110237 };
205*5184Sek110237 
206*5184Sek110237 /*
207*5184Sek110237  * Loops through the master list of flowops defined in this
208*5184Sek110237  * module, and creates and initializes a flowop for each one
209*5184Sek110237  * by calling flowop_define. As a side effect of calling
210*5184Sek110237  * flowop define, the created flowops are placed on the
211*5184Sek110237  * master flowop list. All created flowops are set to
212*5184Sek110237  * instance "0".
213*5184Sek110237  */
214*5184Sek110237 void
215*5184Sek110237 flowoplib_init()
216*5184Sek110237 {
217*5184Sek110237 	int nops = sizeof (flowoplib_funcs) / sizeof (flowoplib_t);
218*5184Sek110237 	int i;
219*5184Sek110237 
220*5184Sek110237 	for (i = 0; i < nops; i++) {
221*5184Sek110237 		flowop_t *flowop;
222*5184Sek110237 		flowoplib_t *fl;
223*5184Sek110237 
224*5184Sek110237 		fl = &flowoplib_funcs[i];
225*5184Sek110237 
226*5184Sek110237 		if ((flowop = flowop_define(NULL,
227*5184Sek110237 		    fl->fl_name, NULL, 0, fl->fl_type)) == 0) {
228*5184Sek110237 			filebench_log(LOG_ERROR,
229*5184Sek110237 			    "failed to create flowop %s\n",
230*5184Sek110237 			    fl->fl_name);
231*5184Sek110237 			filebench_shutdown(1);
232*5184Sek110237 		}
233*5184Sek110237 
234*5184Sek110237 		flowop->fo_func = fl->fl_func;
235*5184Sek110237 		flowop->fo_init = fl->fl_init;
236*5184Sek110237 		flowop->fo_destruct = fl->fl_destruct;
237*5184Sek110237 		flowop->fo_attrs = fl->fl_attrs;
238*5184Sek110237 	}
239*5184Sek110237 }
240*5184Sek110237 
241*5184Sek110237 static int
242*5184Sek110237 flowoplib_init_generic(flowop_t *flowop)
243*5184Sek110237 {
244*5184Sek110237 	(void) ipc_mutex_unlock(&flowop->fo_lock);
245*5184Sek110237 	return (0);
246*5184Sek110237 }
247*5184Sek110237 
248*5184Sek110237 /* ARGSUSED */
249*5184Sek110237 static void
250*5184Sek110237 flowoplib_destruct_generic(flowop_t *flowop)
251*5184Sek110237 {
252*5184Sek110237 }
253*5184Sek110237 
254*5184Sek110237 /*
255*5184Sek110237  * Generates a file attribute from flags in the supplied flowop.
256*5184Sek110237  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
257*5184Sek110237  */
258*5184Sek110237 static int
259*5184Sek110237 flowoplib_fileattrs(flowop_t *flowop)
260*5184Sek110237 {
261*5184Sek110237 	int attrs = 0;
262*5184Sek110237 
263*5184Sek110237 	if (*flowop->fo_directio)
264*5184Sek110237 		attrs |= FLOW_ATTR_DIRECTIO;
265*5184Sek110237 
266*5184Sek110237 	if (*flowop->fo_dsync)
267*5184Sek110237 		attrs |= FLOW_ATTR_DSYNC;
268*5184Sek110237 
269*5184Sek110237 	return (attrs);
270*5184Sek110237 }
271*5184Sek110237 
272*5184Sek110237 /*
273*5184Sek110237  * Searches for a file descriptor. Tries the flowop's
274*5184Sek110237  * fo_fdnumber first and returns with it if it has been
275*5184Sek110237  * explicitly set (greater than 0). It next checks to
276*5184Sek110237  * see if a rotating file descriptor policy is in effect,
277*5184Sek110237  * and if not returns the fdnumber regardless of what
278*5184Sek110237  * it is. (note that if it is 0, it just selects to the
279*5184Sek110237  * default file descriptor in the threadflow's tf_fd
280*5184Sek110237  * array). If the rotating fd policy is in effect, it
281*5184Sek110237  * cycles from the end of the tf_fd array to one location
282*5184Sek110237  * beyond the maximum needed by the number of entries in
283*5184Sek110237  * the associated fileset on each invocation, then starts
284*5184Sek110237  * over from the end.
285*5184Sek110237  *
286*5184Sek110237  * The routine returns an index into the threadflow's
287*5184Sek110237  * tf_fd table where the actual file descriptor will be
288*5184Sek110237  * found. Note: the calling routine must not call this
289*5184Sek110237  * routine if the flowop does not have a fileset, and the
290*5184Sek110237  * flowop's fo_fdnumber is zero and fo_rotatefd is
291*5184Sek110237  * asserted, or an addressing fault may occur.
292*5184Sek110237  */
293*5184Sek110237 int
294*5184Sek110237 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
295*5184Sek110237 {
296*5184Sek110237 	/* If the script sets the fd explicitly */
297*5184Sek110237 	if (flowop->fo_fdnumber > 0)
298*5184Sek110237 		return (flowop->fo_fdnumber);
299*5184Sek110237 
300*5184Sek110237 	/* If the flowop defaults to persistent fd */
301*5184Sek110237 	if (!integer_isset(flowop->fo_rotatefd))
302*5184Sek110237 		return (flowop->fo_fdnumber);
303*5184Sek110237 
304*5184Sek110237 	/* Rotate the fd on each flowop invocation */
305*5184Sek110237 	if (*(flowop->fo_fileset->fs_entries) > (THREADFLOW_MAXFD / 2)) {
306*5184Sek110237 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
307*5184Sek110237 		    " (too many files : %d", flowop->fo_name,
308*5184Sek110237 		    *(flowop->fo_fileset->fs_entries));
309*5184Sek110237 		return (-1);
310*5184Sek110237 	}
311*5184Sek110237 
312*5184Sek110237 	/* First time around */
313*5184Sek110237 	if (threadflow->tf_fdrotor == 0)
314*5184Sek110237 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
315*5184Sek110237 
316*5184Sek110237 	/* One fd for every file in the set */
317*5184Sek110237 	if (*(flowop->fo_fileset->fs_entries) ==
318*5184Sek110237 	    (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
319*5184Sek110237 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
320*5184Sek110237 
321*5184Sek110237 
322*5184Sek110237 	threadflow->tf_fdrotor--;
323*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
324*5184Sek110237 	    threadflow->tf_fdrotor);
325*5184Sek110237 	return (threadflow->tf_fdrotor);
326*5184Sek110237 }
327*5184Sek110237 
328*5184Sek110237 /*
329*5184Sek110237  * Emulate posix read / pread. If the flowop has a fileset,
330*5184Sek110237  * a file descriptor number index is fetched, otherwise a
331*5184Sek110237  * supplied fileobj file is used. In either case the specified
332*5184Sek110237  * file will be opened if not already open. If the flowop has
333*5184Sek110237  * neither a fileset or fileobj, an error is logged and -1
334*5184Sek110237  * returned.
335*5184Sek110237  *
336*5184Sek110237  * The actual read is done to a random offset in the
337*5184Sek110237  * threadflow's thread memory (tf_mem), with a size set by
338*5184Sek110237  * fo_iosize and at either a random disk offset within the
339*5184Sek110237  * working set size, or at the next sequential location. If
340*5184Sek110237  * any errors are encountered, -1 is returned, if successful,
341*5184Sek110237  * 0 is returned.
342*5184Sek110237  */
343*5184Sek110237 static int
344*5184Sek110237 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
345*5184Sek110237 {
346*5184Sek110237 	size_t memoffset;
347*5184Sek110237 	vinteger_t wss;
348*5184Sek110237 	long memsize, round;
349*5184Sek110237 	int filedesc;
350*5184Sek110237 	int ret;
351*5184Sek110237 
352*5184Sek110237 	if (flowop->fo_fileset || flowop->fo_fdnumber) {
353*5184Sek110237 		int fd = flowoplib_fdnum(threadflow, flowop);
354*5184Sek110237 
355*5184Sek110237 		if (fd == -1)
356*5184Sek110237 			return (-1);
357*5184Sek110237 
358*5184Sek110237 		if (threadflow->tf_fd[fd] == 0) {
359*5184Sek110237 			(void) flowoplib_openfile_common(threadflow,
360*5184Sek110237 			    flowop, fd);
361*5184Sek110237 			filebench_log(LOG_DEBUG_IMPL, "read opened file %s",
362*5184Sek110237 			    threadflow->tf_fse[fd]->fse_path);
363*5184Sek110237 		}
364*5184Sek110237 		filedesc = threadflow->tf_fd[fd];
365*5184Sek110237 		if (*flowop->fo_wss == 0)
366*5184Sek110237 			wss = threadflow->tf_fse[fd]->fse_size;
367*5184Sek110237 		else
368*5184Sek110237 			wss = *flowop->fo_wss;
369*5184Sek110237 	} else {
370*5184Sek110237 		if (flowop->fo_file == NULL) {
371*5184Sek110237 			filebench_log(LOG_ERROR, "flowop NULL file");
372*5184Sek110237 			return (-1);
373*5184Sek110237 		}
374*5184Sek110237 		if (flowop->fo_fd < 0)
375*5184Sek110237 			flowop->fo_fd = fileobj_open(flowop->fo_file,
376*5184Sek110237 			    flowoplib_fileattrs(flowop));
377*5184Sek110237 
378*5184Sek110237 		if (flowop->fo_fd < 0) {
379*5184Sek110237 			filebench_log(LOG_ERROR, "failed to open file %s",
380*5184Sek110237 			    flowop->fo_file->fo_name);
381*5184Sek110237 			return (-1);
382*5184Sek110237 		}
383*5184Sek110237 		filedesc = flowop->fo_fd;
384*5184Sek110237 		if (*flowop->fo_wss == 0)
385*5184Sek110237 			wss = *flowop->fo_file->fo_size;
386*5184Sek110237 		else
387*5184Sek110237 			wss = *flowop->fo_wss;
388*5184Sek110237 	}
389*5184Sek110237 
390*5184Sek110237 	if (*flowop->fo_iosize == 0) {
391*5184Sek110237 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
392*5184Sek110237 		    flowop->fo_name);
393*5184Sek110237 		return (-1);
394*5184Sek110237 	}
395*5184Sek110237 
396*5184Sek110237 	memsize = *threadflow->tf_memsize;
397*5184Sek110237 	round = *flowop->fo_iosize;
398*5184Sek110237 
399*5184Sek110237 	if (filebench_randomno(&memoffset, memsize, round) == -1) {
400*5184Sek110237 		filebench_log(LOG_ERROR,
401*5184Sek110237 		    "tf_memsize smaller than IO size for thread %s",
402*5184Sek110237 		    flowop->fo_name);
403*5184Sek110237 		return (-1);
404*5184Sek110237 	}
405*5184Sek110237 
406*5184Sek110237 	if (*flowop->fo_random) {
407*5184Sek110237 		uint64_t fileoffset;
408*5184Sek110237 
409*5184Sek110237 		if (filebench_randomno64(&fileoffset, wss,
410*5184Sek110237 		    *flowop->fo_iosize) == -1) {
411*5184Sek110237 			filebench_log(LOG_ERROR,
412*5184Sek110237 			    "file size smaller than IO size for thread %s",
413*5184Sek110237 			    flowop->fo_name);
414*5184Sek110237 			return (-1);
415*5184Sek110237 		}
416*5184Sek110237 
417*5184Sek110237 		(void) flowop_beginop(threadflow, flowop);
418*5184Sek110237 		if ((ret = pread64(filedesc, threadflow->tf_mem + memoffset,
419*5184Sek110237 		    *flowop->fo_iosize, (off64_t)fileoffset)) == -1) {
420*5184Sek110237 			(void) flowop_endop(threadflow, flowop);
421*5184Sek110237 			filebench_log(LOG_ERROR,
422*5184Sek110237 			    "read file %s failed, offset %lld "
423*5184Sek110237 			    "memoffset %zd: %s",
424*5184Sek110237 			    flowop->fo_file->fo_name,
425*5184Sek110237 			    fileoffset, memoffset, strerror(errno));
426*5184Sek110237 			flowop_endop(threadflow, flowop);
427*5184Sek110237 			return (-1);
428*5184Sek110237 		}
429*5184Sek110237 		(void) flowop_endop(threadflow, flowop);
430*5184Sek110237 
431*5184Sek110237 		if ((ret == 0))
432*5184Sek110237 			(void) lseek64(filedesc, 0, SEEK_SET);
433*5184Sek110237 
434*5184Sek110237 	} else {
435*5184Sek110237 		(void) flowop_beginop(threadflow, flowop);
436*5184Sek110237 		if ((ret = read(filedesc, threadflow->tf_mem + memoffset,
437*5184Sek110237 		    *flowop->fo_iosize)) == -1) {
438*5184Sek110237 			filebench_log(LOG_ERROR,
439*5184Sek110237 			    "read file %s failed, memoffset %zd: %s",
440*5184Sek110237 			    flowop->fo_file->fo_name,
441*5184Sek110237 			    memoffset, strerror(errno));
442*5184Sek110237 			(void) flowop_endop(threadflow, flowop);
443*5184Sek110237 			return (-1);
444*5184Sek110237 		}
445*5184Sek110237 		(void) flowop_endop(threadflow, flowop);
446*5184Sek110237 
447*5184Sek110237 		if ((ret == 0))
448*5184Sek110237 			(void) lseek64(filedesc, 0, SEEK_SET);
449*5184Sek110237 	}
450*5184Sek110237 
451*5184Sek110237 	return (0);
452*5184Sek110237 }
453*5184Sek110237 
454*5184Sek110237 #ifdef HAVE_AIO
455*5184Sek110237 
456*5184Sek110237 /*
457*5184Sek110237  * Asynchronous write section. An Asynchronous IO element
458*5184Sek110237  * (aiolist_t) is used to associate the asynchronous write request with
459*5184Sek110237  * its subsequent completion. This element includes an aiocb64 struct
460*5184Sek110237  * that is used by posix aio_xxx calls to track the asynchronous writes.
461*5184Sek110237  * The flowops aiowrite and aiowait result in calls to these posix
462*5184Sek110237  * aio_xxx system routines to do the actual asynchronous write IO
463*5184Sek110237  * operations.
464*5184Sek110237  */
465*5184Sek110237 
466*5184Sek110237 
467*5184Sek110237 /*
468*5184Sek110237  * Allocates an asynchronous I/O list (aio, of type
469*5184Sek110237  * aiolist_t) element. Adds it to the flowop thread's
470*5184Sek110237  * threadflow aio list. Returns a pointer to the element.
471*5184Sek110237  */
472*5184Sek110237 static aiolist_t *
473*5184Sek110237 aio_allocate(flowop_t *flowop)
474*5184Sek110237 {
475*5184Sek110237 	aiolist_t *aiolist;
476*5184Sek110237 
477*5184Sek110237 	if ((aiolist = malloc(sizeof (aiolist_t))) == NULL) {
478*5184Sek110237 		filebench_log(LOG_ERROR, "malloc aiolist failed");
479*5184Sek110237 		filebench_shutdown(1);
480*5184Sek110237 	}
481*5184Sek110237 
482*5184Sek110237 	/* Add to list */
483*5184Sek110237 	if (flowop->fo_thread->tf_aiolist == NULL) {
484*5184Sek110237 		flowop->fo_thread->tf_aiolist = aiolist;
485*5184Sek110237 		aiolist->al_next = NULL;
486*5184Sek110237 	} else {
487*5184Sek110237 		aiolist->al_next = flowop->fo_thread->tf_aiolist;
488*5184Sek110237 		flowop->fo_thread->tf_aiolist = aiolist;
489*5184Sek110237 	}
490*5184Sek110237 	return (aiolist);
491*5184Sek110237 }
492*5184Sek110237 
493*5184Sek110237 /*
494*5184Sek110237  * Searches for the aiolist element that has a matching
495*5184Sek110237  * completion block, aiocb. If none found returns -1. If
496*5184Sek110237  * found, removes the aiolist element from flowop thread's
497*5184Sek110237  * list and returns 0.
498*5184Sek110237  */
499*5184Sek110237 static int
500*5184Sek110237 aio_deallocate(flowop_t *flowop, struct aiocb64 *aiocb)
501*5184Sek110237 {
502*5184Sek110237 	aiolist_t *aiolist = flowop->fo_thread->tf_aiolist;
503*5184Sek110237 	aiolist_t *previous = NULL;
504*5184Sek110237 	aiolist_t *match = NULL;
505*5184Sek110237 
506*5184Sek110237 	if (aiocb == NULL) {
507*5184Sek110237 		filebench_log(LOG_ERROR, "null aiocb deallocate");
508*5184Sek110237 		return (0);
509*5184Sek110237 	}
510*5184Sek110237 
511*5184Sek110237 	while (aiolist) {
512*5184Sek110237 		if (aiocb == &(aiolist->al_aiocb)) {
513*5184Sek110237 			match = aiolist;
514*5184Sek110237 			break;
515*5184Sek110237 		}
516*5184Sek110237 		previous = aiolist;
517*5184Sek110237 		aiolist = aiolist->al_next;
518*5184Sek110237 	}
519*5184Sek110237 
520*5184Sek110237 	if (match == NULL)
521*5184Sek110237 		return (-1);
522*5184Sek110237 
523*5184Sek110237 	/* Remove from the list */
524*5184Sek110237 	if (previous)
525*5184Sek110237 		previous->al_next = match->al_next;
526*5184Sek110237 	else
527*5184Sek110237 		flowop->fo_thread->tf_aiolist = match->al_next;
528*5184Sek110237 
529*5184Sek110237 	return (0);
530*5184Sek110237 }
531*5184Sek110237 
532*5184Sek110237 /*
533*5184Sek110237  * Emulate posix aiowrite(). Determines which file to use,
534*5184Sek110237  * either one file of a fileset, or the file associated
535*5184Sek110237  * with a fileobj, allocates and fills an aiolist_t element
536*5184Sek110237  * for the write, and issues the asynchronous write. This
537*5184Sek110237  * operation is only valid for random IO, and returns an
538*5184Sek110237  * error if the flowop is set for sequential IO. Returns 0
539*5184Sek110237  * on success, -1 on any encountered error.
540*5184Sek110237  */
541*5184Sek110237 static int
542*5184Sek110237 flowoplib_aiowrite(threadflow_t *threadflow, flowop_t *flowop)
543*5184Sek110237 {
544*5184Sek110237 	size_t memoffset;
545*5184Sek110237 	vinteger_t wss;
546*5184Sek110237 	long memsize, round;
547*5184Sek110237 	int filedesc;
548*5184Sek110237 
549*5184Sek110237 	if (flowop->fo_fileset || flowop->fo_fdnumber) {
550*5184Sek110237 		int fd = flowoplib_fdnum(threadflow, flowop);
551*5184Sek110237 
552*5184Sek110237 		if (fd == -1)
553*5184Sek110237 			return (-1);
554*5184Sek110237 
555*5184Sek110237 		if (threadflow->tf_fd[fd] == 0) {
556*5184Sek110237 			(void) flowoplib_openfile_common(threadflow,
557*5184Sek110237 			    flowop, fd);
558*5184Sek110237 			filebench_log(LOG_DEBUG_IMPL,
559*5184Sek110237 			    "writefile opened file %s",
560*5184Sek110237 			    threadflow->tf_fse[fd]->fse_path);
561*5184Sek110237 		}
562*5184Sek110237 		filedesc = threadflow->tf_fd[fd];
563*5184Sek110237 		if (*flowop->fo_wss == 0)
564*5184Sek110237 			wss = threadflow->tf_fse[fd]->fse_size;
565*5184Sek110237 		else
566*5184Sek110237 			wss = *flowop->fo_wss;
567*5184Sek110237 	} else {
568*5184Sek110237 		if (flowop->fo_file == NULL) {
569*5184Sek110237 			filebench_log(LOG_ERROR, "flowop NULL file");
570*5184Sek110237 			return (-1);
571*5184Sek110237 		}
572*5184Sek110237 		if (flowop->fo_fd < 0)
573*5184Sek110237 			flowop->fo_fd = fileobj_open(flowop->fo_file,
574*5184Sek110237 			    flowoplib_fileattrs(flowop));
575*5184Sek110237 
576*5184Sek110237 		if (flowop->fo_fd < 0) {
577*5184Sek110237 			filebench_log(LOG_ERROR, "failed to open file %s",
578*5184Sek110237 			    flowop->fo_file->fo_name);
579*5184Sek110237 			return (-1);
580*5184Sek110237 		}
581*5184Sek110237 		filedesc = flowop->fo_fd;
582*5184Sek110237 		if (*flowop->fo_wss == 0)
583*5184Sek110237 			wss = *flowop->fo_file->fo_size;
584*5184Sek110237 		else
585*5184Sek110237 			wss = *flowop->fo_wss;
586*5184Sek110237 	}
587*5184Sek110237 
588*5184Sek110237 	if (*flowop->fo_iosize == 0) {
589*5184Sek110237 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
590*5184Sek110237 		    flowop->fo_name);
591*5184Sek110237 		return (-1);
592*5184Sek110237 	}
593*5184Sek110237 
594*5184Sek110237 	memsize = *threadflow->tf_memsize;
595*5184Sek110237 	round = *flowop->fo_iosize;
596*5184Sek110237 
597*5184Sek110237 	/* Select memory offset for IO */
598*5184Sek110237 	if (filebench_randomno(&memoffset, memsize, round) == -1) {
599*5184Sek110237 		filebench_log(LOG_ERROR,
600*5184Sek110237 		    "tf_memsize smaller than IO size for thread %s",
601*5184Sek110237 		    flowop->fo_name);
602*5184Sek110237 		return (-1);
603*5184Sek110237 	}
604*5184Sek110237 
605*5184Sek110237 	if (*flowop->fo_random) {
606*5184Sek110237 		uint64_t fileoffset;
607*5184Sek110237 		struct aiocb64 *aiocb;
608*5184Sek110237 		aiolist_t *aiolist;
609*5184Sek110237 
610*5184Sek110237 		if (filebench_randomno64(&fileoffset,
611*5184Sek110237 		    wss, *flowop->fo_iosize) == -1) {
612*5184Sek110237 			filebench_log(LOG_ERROR,
613*5184Sek110237 			    "file size smaller than IO size for thread %s",
614*5184Sek110237 			    flowop->fo_name);
615*5184Sek110237 			return (-1);
616*5184Sek110237 		}
617*5184Sek110237 
618*5184Sek110237 		aiolist = aio_allocate(flowop);
619*5184Sek110237 		aiolist->al_type = AL_WRITE;
620*5184Sek110237 		aiocb = &aiolist->al_aiocb;
621*5184Sek110237 
622*5184Sek110237 		aiocb->aio_fildes = filedesc;
623*5184Sek110237 		aiocb->aio_buf = threadflow->tf_mem + memoffset;
624*5184Sek110237 		aiocb->aio_nbytes = *flowop->fo_iosize;
625*5184Sek110237 		aiocb->aio_offset = (off64_t)fileoffset;
626*5184Sek110237 		aiocb->aio_reqprio = 0;
627*5184Sek110237 
628*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
629*5184Sek110237 		    "aio fd=%d, bytes=%lld, offset=%lld",
630*5184Sek110237 		    filedesc, *flowop->fo_iosize, fileoffset);
631*5184Sek110237 
632*5184Sek110237 		flowop_beginop(threadflow, flowop);
633*5184Sek110237 		if (aio_write64(aiocb) < 0) {
634*5184Sek110237 			filebench_log(LOG_ERROR, "aiowrite failed: %s",
635*5184Sek110237 			    strerror(errno));
636*5184Sek110237 			filebench_shutdown(1);
637*5184Sek110237 		}
638*5184Sek110237 		flowop_endop(threadflow, flowop);
639*5184Sek110237 	} else {
640*5184Sek110237 		return (-1);
641*5184Sek110237 	}
642*5184Sek110237 
643*5184Sek110237 	return (0);
644*5184Sek110237 }
645*5184Sek110237 
646*5184Sek110237 
647*5184Sek110237 
648*5184Sek110237 #define	MAXREAP 4096	/* aiowait worklist size; max completions reaped per pass */
649*5184Sek110237 
650*5184Sek110237 /*
651*5184Sek110237  * Emulate posix aiowait(). Waits for the completion of half the
652*5184Sek110237  * outstanding asynchronous IOs, or a single IO, which ever is
653*5184Sek110237  * larger. The routine will return after a sufficient number of
654*5184Sek110237  * completed calls issued by any thread in the procflow have
655*5184Sek110237  * completed, or a 1 second timout elapses. All completed
656*5184Sek110237  * IO operations are deleted from the thread's aiolist.
657*5184Sek110237  */
658*5184Sek110237 static int
659*5184Sek110237 flowoplib_aiowait(threadflow_t *threadflow, flowop_t *flowop)
660*5184Sek110237 {
661*5184Sek110237 	struct aiocb64 **worklist;
662*5184Sek110237 	aiolist_t *aio = flowop->fo_thread->tf_aiolist;
663*5184Sek110237 	int uncompleted = 0;
664*5184Sek110237 
665*5184Sek110237 	worklist = calloc(MAXREAP, sizeof (struct aiocb64 *));
666*5184Sek110237 
667*5184Sek110237 	/* Count the list of pending aios */
668*5184Sek110237 	while (aio) {
669*5184Sek110237 		uncompleted++;
670*5184Sek110237 		aio = aio->al_next;
671*5184Sek110237 	}
672*5184Sek110237 
673*5184Sek110237 	do {
674*5184Sek110237 		uint_t ncompleted = 0;
675*5184Sek110237 		uint_t todo;
676*5184Sek110237 		struct timespec timeout;
677*5184Sek110237 		int inprogress;
678*5184Sek110237 		int i;
679*5184Sek110237 
680*5184Sek110237 		/* Wait for half of the outstanding requests */
681*5184Sek110237 		timeout.tv_sec = 1;
682*5184Sek110237 		timeout.tv_nsec = 0;
683*5184Sek110237 
684*5184Sek110237 		if (uncompleted > MAXREAP)
685*5184Sek110237 			todo = MAXREAP;
686*5184Sek110237 		else
687*5184Sek110237 			todo = uncompleted / 2;
688*5184Sek110237 
689*5184Sek110237 		if (todo == 0)
690*5184Sek110237 			todo = 1;
691*5184Sek110237 
692*5184Sek110237 		flowop_beginop(threadflow, flowop);
693*5184Sek110237 
694*5184Sek110237 #ifdef HAVE_AIOWAITN
695*5184Sek110237 		if ((aio_waitn64((struct aiocb64 **)worklist,
696*5184Sek110237 		    MAXREAP, &todo, &timeout) == -1) &&
697*5184Sek110237 		    errno && (errno != ETIME)) {
698*5184Sek110237 			filebench_log(LOG_ERROR,
699*5184Sek110237 			    "aiowait failed: %s, outstanding = %d, "
700*5184Sek110237 			    "ncompleted = %d ",
701*5184Sek110237 			    strerror(errno), uncompleted, todo);
702*5184Sek110237 		}
703*5184Sek110237 
704*5184Sek110237 		ncompleted = todo;
705*5184Sek110237 		/* Take the  completed I/Os from the list */
706*5184Sek110237 		inprogress = 0;
707*5184Sek110237 		for (i = 0; i < ncompleted; i++) {
708*5184Sek110237 			if ((aio_return64(worklist[i]) == -1) &&
709*5184Sek110237 			    (errno == EINPROGRESS)) {
710*5184Sek110237 				inprogress++;
711*5184Sek110237 				continue;
712*5184Sek110237 			}
713*5184Sek110237 			if (aio_deallocate(flowop, worklist[i]) < 0) {
714*5184Sek110237 				filebench_log(LOG_ERROR, "Could not remove "
715*5184Sek110237 				    "aio from list ");
716*5184Sek110237 				flowop_endop(threadflow, flowop);
717*5184Sek110237 				return (-1);
718*5184Sek110237 			}
719*5184Sek110237 		}
720*5184Sek110237 
721*5184Sek110237 		uncompleted -= ncompleted;
722*5184Sek110237 		uncompleted += inprogress;
723*5184Sek110237 
724*5184Sek110237 #else
725*5184Sek110237 
726*5184Sek110237 		for (ncompleted = 0, inprogress = 0,
727*5184Sek110237 		    aio = flowop->fo_thread->tf_aiolist;
728*5184Sek110237 		    ncompleted < todo, aio != NULL; aio = aio->al_next) {
729*5184Sek110237 
730*5184Sek110237 			result = aio_error64(&aio->al_aiocb);
731*5184Sek110237 
732*5184Sek110237 			if (result == EINPROGRESS) {
733*5184Sek110237 				inprogress++;
734*5184Sek110237 				continue;
735*5184Sek110237 			}
736*5184Sek110237 
737*5184Sek110237 			if ((aio_return64(&aio->al_aiocb) == -1) || result) {
738*5184Sek110237 				filebench_log(LOG_ERROR, "aio failed: %s",
739*5184Sek110237 				    strerror(result));
740*5184Sek110237 				continue;
741*5184Sek110237 			}
742*5184Sek110237 
743*5184Sek110237 			ncompleted++;
744*5184Sek110237 
745*5184Sek110237 			if (aio_deallocate(flowop, &aio->al_aiocb) < 0) {
746*5184Sek110237 				filebench_log(LOG_ERROR, "Could not remove aio "
747*5184Sek110237 				    "from list ");
748*5184Sek110237 				flowop_endop(threadflow, flowop);
749*5184Sek110237 				return (-1);
750*5184Sek110237 			}
751*5184Sek110237 		}
752*5184Sek110237 
753*5184Sek110237 		uncompleted -= ncompleted;
754*5184Sek110237 
755*5184Sek110237 #endif
756*5184Sek110237 		filebench_log(LOG_DEBUG_SCRIPT,
757*5184Sek110237 		    "aio2 completed %d ios, uncompleted = %d, inprogress = %d",
758*5184Sek110237 		    ncompleted, uncompleted, inprogress);
759*5184Sek110237 
760*5184Sek110237 	} while (uncompleted > MAXREAP);
761*5184Sek110237 
762*5184Sek110237 	flowop_endop(threadflow, flowop);
763*5184Sek110237 
764*5184Sek110237 	free(worklist);
765*5184Sek110237 
766*5184Sek110237 	return (0);
767*5184Sek110237 }
768*5184Sek110237 
769*5184Sek110237 #endif /* HAVE_AIO */
770*5184Sek110237 
771*5184Sek110237 /*
772*5184Sek110237  * Initializes a "flowop_block" flowop. Specifically, it
773*5184Sek110237  * initializes the flowop's fo_cv and unlocks the fo_lock.
774*5184Sek110237  */
775*5184Sek110237 static int
776*5184Sek110237 flowoplib_block_init(flowop_t *flowop)
777*5184Sek110237 {
778*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
779*5184Sek110237 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
780*5184Sek110237 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
781*5184Sek110237 	(void) ipc_mutex_unlock(&flowop->fo_lock);
782*5184Sek110237 
783*5184Sek110237 	return (0);
784*5184Sek110237 }
785*5184Sek110237 
786*5184Sek110237 /*
787*5184Sek110237  * Blocks the threadflow until woken up by flowoplib_wakeup.
788*5184Sek110237  * The routine blocks on the flowop's fo_cv condition variable.
789*5184Sek110237  */
790*5184Sek110237 static int
791*5184Sek110237 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
792*5184Sek110237 {
793*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
794*5184Sek110237 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
795*5184Sek110237 	(void) ipc_mutex_lock(&flowop->fo_lock);
796*5184Sek110237 
797*5184Sek110237 	flowop_beginop(threadflow, flowop);
798*5184Sek110237 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
799*5184Sek110237 	flowop_endop(threadflow, flowop);
800*5184Sek110237 
801*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
802*5184Sek110237 	    flowop->fo_name, flowop->fo_instance);
803*5184Sek110237 
804*5184Sek110237 	(void) ipc_mutex_unlock(&flowop->fo_lock);
805*5184Sek110237 
806*5184Sek110237 	return (0);
807*5184Sek110237 }
808*5184Sek110237 
809*5184Sek110237 /*
810*5184Sek110237  * Wakes up one or more target blocking flowops.
811*5184Sek110237  * Sends broadcasts on the fo_cv condition variables of all
812*5184Sek110237  * flowops on the target list, except those that are
813*5184Sek110237  * FLOW_MASTER flowops. The target list consists of all
814*5184Sek110237  * flowops whose name matches this flowop's "fo_targetname"
815*5184Sek110237  * attribute. The target list is generated on the first
816*5184Sek110237  * invocation, and the run will be shutdown if no targets
817*5184Sek110237  * are found. Otherwise the routine always returns 0.
818*5184Sek110237  */
819*5184Sek110237 static int
820*5184Sek110237 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
821*5184Sek110237 {
822*5184Sek110237 	flowop_t *target;
823*5184Sek110237 
824*5184Sek110237 	/* if this is the first wakeup, create the wakeup list */
825*5184Sek110237 	if (flowop->fo_targets == NULL) {
826*5184Sek110237 		flowop_t *result = flowop_find(flowop->fo_targetname);
827*5184Sek110237 
828*5184Sek110237 		flowop->fo_targets = result;
829*5184Sek110237 		if (result == NULL) {
830*5184Sek110237 			filebench_log(LOG_ERROR,
831*5184Sek110237 			    "wakeup: could not find op %s for thread %s",
832*5184Sek110237 			    flowop->fo_targetname,
833*5184Sek110237 			    threadflow->tf_name);
834*5184Sek110237 			filebench_shutdown(1);
835*5184Sek110237 		}
836*5184Sek110237 		while (result) {
837*5184Sek110237 			result->fo_targetnext =
838*5184Sek110237 			    result->fo_resultnext;
839*5184Sek110237 			result = result->fo_resultnext;
840*5184Sek110237 		}
841*5184Sek110237 	}
842*5184Sek110237 
843*5184Sek110237 	target = flowop->fo_targets;
844*5184Sek110237 
845*5184Sek110237 	/* wakeup the targets */
846*5184Sek110237 	while (target) {
847*5184Sek110237 		if (target->fo_instance == FLOW_MASTER) {
848*5184Sek110237 			target = target->fo_targetnext;
849*5184Sek110237 			continue;
850*5184Sek110237 		}
851*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
852*5184Sek110237 		    "wakeup flow %s-%d at address %zx",
853*5184Sek110237 		    target->fo_name,
854*5184Sek110237 		    target->fo_instance,
855*5184Sek110237 		    &target->fo_cv);
856*5184Sek110237 
857*5184Sek110237 		flowop_beginop(threadflow, flowop);
858*5184Sek110237 		(void) ipc_mutex_lock(&target->fo_lock);
859*5184Sek110237 		(void) pthread_cond_broadcast(&target->fo_cv);
860*5184Sek110237 		(void) ipc_mutex_unlock(&target->fo_lock);
861*5184Sek110237 		flowop_endop(threadflow, flowop);
862*5184Sek110237 
863*5184Sek110237 		target = target->fo_targetnext;
864*5184Sek110237 	}
865*5184Sek110237 
866*5184Sek110237 	return (0);
867*5184Sek110237 }
868*5184Sek110237 
/*
 * "Think time" routines. The "hog" routine consumes CPU cycles as
 * it "thinks", while the "delay" flowop simply calls sleep() to delay
 * for a given number of seconds without consuming CPU cycles.
 */
874*5184Sek110237 
875*5184Sek110237 
876*5184Sek110237 /*
877*5184Sek110237  * Consumes CPU cycles and memory bandwidth by looping for
878*5184Sek110237  * flowop->fo_value times. With each loop sets memory location
879*5184Sek110237  * threadflow->tf_mem to 1.
880*5184Sek110237  */
881*5184Sek110237 static int
882*5184Sek110237 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
883*5184Sek110237 {
884*5184Sek110237 	uint64_t value = *flowop->fo_value;
885*5184Sek110237 	int i;
886*5184Sek110237 
887*5184Sek110237 	flowop_beginop(threadflow, flowop);
888*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
889*5184Sek110237 	for (i = 0; i < value; i++)
890*5184Sek110237 		*(threadflow->tf_mem) = 1;
891*5184Sek110237 	flowop_endop(threadflow, flowop);
892*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
893*5184Sek110237 	return (0);
894*5184Sek110237 }
895*5184Sek110237 
896*5184Sek110237 
897*5184Sek110237 /*
898*5184Sek110237  * Delays for fo_value seconds.
899*5184Sek110237  */
900*5184Sek110237 static int
901*5184Sek110237 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
902*5184Sek110237 {
903*5184Sek110237 	int value = *flowop->fo_value;
904*5184Sek110237 
905*5184Sek110237 	flowop_beginop(threadflow, flowop);
906*5184Sek110237 	(void) sleep(value);
907*5184Sek110237 	flowop_endop(threadflow, flowop);
908*5184Sek110237 	return (0);
909*5184Sek110237 }
910*5184Sek110237 
/*
 * Rate limiting routines. This is the event consuming half of the
 * event system. Each of the four following routines will limit the rate
 * to one unit of either calls, issued I/O operations, issued filebench
 * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
 * consumer, and further divided among different consumers if more than
 * one has been defined. There is no mechanism to enforce equal sharing
 * of events.
 */
921*5184Sek110237 
922*5184Sek110237 /*
923*5184Sek110237  * Completes one invocation per posted event. If eventgen_q
924*5184Sek110237  * has an event count greater than zero, one will be removed
925*5184Sek110237  * (count decremented), otherwise the calling thread will
926*5184Sek110237  * block until another event has been posted. Always returns 0
927*5184Sek110237  */
928*5184Sek110237 static int
929*5184Sek110237 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
930*5184Sek110237 {
931*5184Sek110237 	/* Immediately bail if not set/enabled */
932*5184Sek110237 	if (filebench_shm->eventgen_hz == 0)
933*5184Sek110237 		return (0);
934*5184Sek110237 
935*5184Sek110237 	if (flowop->fo_initted == 0) {
936*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
937*5184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
938*5184Sek110237 		flowop->fo_initted = 1;
939*5184Sek110237 	}
940*5184Sek110237 
941*5184Sek110237 	flowop_beginop(threadflow, flowop);
942*5184Sek110237 	while (filebench_shm->eventgen_hz) {
943*5184Sek110237 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
944*5184Sek110237 		if (filebench_shm->eventgen_q > 0) {
945*5184Sek110237 			filebench_shm->eventgen_q--;
946*5184Sek110237 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
947*5184Sek110237 			break;
948*5184Sek110237 		}
949*5184Sek110237 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
950*5184Sek110237 		    &filebench_shm->eventgen_lock);
951*5184Sek110237 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
952*5184Sek110237 	}
953*5184Sek110237 	flowop_endop(threadflow, flowop);
954*5184Sek110237 	return (0);
955*5184Sek110237 }
956*5184Sek110237 
957*5184Sek110237 /*
958*5184Sek110237  * Blocks the calling thread if the number of issued I/O
959*5184Sek110237  * operations exceeds the number of posted events, thus
960*5184Sek110237  * limiting the average I/O operation rate to the rate
961*5184Sek110237  * specified by eventgen_hz. Always returns 0.
962*5184Sek110237  */
963*5184Sek110237 static int
964*5184Sek110237 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
965*5184Sek110237 {
966*5184Sek110237 	uint64_t iops;
967*5184Sek110237 	uint64_t delta;
968*5184Sek110237 	int events;
969*5184Sek110237 
970*5184Sek110237 	/* Immediately bail if not set/enabled */
971*5184Sek110237 	if (filebench_shm->eventgen_hz == 0)
972*5184Sek110237 		return (0);
973*5184Sek110237 
974*5184Sek110237 	if (flowop->fo_initted == 0) {
975*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
976*5184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
977*5184Sek110237 		flowop->fo_initted = 1;
978*5184Sek110237 	}
979*5184Sek110237 
980*5184Sek110237 	iops = (controlstats.fs_rcount +
981*5184Sek110237 	    controlstats.fs_wcount);
982*5184Sek110237 
983*5184Sek110237 	/* Is this the first time around */
984*5184Sek110237 	if (flowop->fo_tputlast == 0) {
985*5184Sek110237 		flowop->fo_tputlast = iops;
986*5184Sek110237 		return (0);
987*5184Sek110237 	}
988*5184Sek110237 
989*5184Sek110237 	delta = iops - flowop->fo_tputlast;
990*5184Sek110237 	flowop->fo_tputbucket -= delta;
991*5184Sek110237 	flowop->fo_tputlast = iops;
992*5184Sek110237 
993*5184Sek110237 	/* No need to block if the q isn't empty */
994*5184Sek110237 	if (flowop->fo_tputbucket >= 0LL) {
995*5184Sek110237 		flowop_endop(threadflow, flowop);
996*5184Sek110237 		return (0);
997*5184Sek110237 	}
998*5184Sek110237 
999*5184Sek110237 	iops = flowop->fo_tputbucket * -1;
1000*5184Sek110237 	events = iops;
1001*5184Sek110237 
1002*5184Sek110237 	flowop_beginop(threadflow, flowop);
1003*5184Sek110237 	while (filebench_shm->eventgen_hz) {
1004*5184Sek110237 
1005*5184Sek110237 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
1006*5184Sek110237 		if (filebench_shm->eventgen_q >= events) {
1007*5184Sek110237 			filebench_shm->eventgen_q -= events;
1008*5184Sek110237 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1009*5184Sek110237 			flowop->fo_tputbucket += events;
1010*5184Sek110237 			break;
1011*5184Sek110237 		}
1012*5184Sek110237 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
1013*5184Sek110237 		    &filebench_shm->eventgen_lock);
1014*5184Sek110237 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1015*5184Sek110237 	}
1016*5184Sek110237 	flowop_endop(threadflow, flowop);
1017*5184Sek110237 
1018*5184Sek110237 	return (0);
1019*5184Sek110237 }
1020*5184Sek110237 
1021*5184Sek110237 /*
1022*5184Sek110237  * Blocks the calling thread if the number of issued filebench
1023*5184Sek110237  * operations exceeds the number of posted events, thus limiting
1024*5184Sek110237  * the average filebench operation rate to the rate specified by
1025*5184Sek110237  * eventgen_hz. Always returns 0.
1026*5184Sek110237  */
1027*5184Sek110237 static int
1028*5184Sek110237 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
1029*5184Sek110237 {
1030*5184Sek110237 	uint64_t ops;
1031*5184Sek110237 	uint64_t delta;
1032*5184Sek110237 	int events;
1033*5184Sek110237 
1034*5184Sek110237 	/* Immediately bail if not set/enabled */
1035*5184Sek110237 	if (filebench_shm->eventgen_hz == 0)
1036*5184Sek110237 		return (0);
1037*5184Sek110237 
1038*5184Sek110237 	if (flowop->fo_initted == 0) {
1039*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1040*5184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1041*5184Sek110237 		flowop->fo_initted = 1;
1042*5184Sek110237 	}
1043*5184Sek110237 
1044*5184Sek110237 	ops = controlstats.fs_count;
1045*5184Sek110237 
1046*5184Sek110237 	/* Is this the first time around */
1047*5184Sek110237 	if (flowop->fo_tputlast == 0) {
1048*5184Sek110237 		flowop->fo_tputlast = ops;
1049*5184Sek110237 		return (0);
1050*5184Sek110237 	}
1051*5184Sek110237 
1052*5184Sek110237 	delta = ops - flowop->fo_tputlast;
1053*5184Sek110237 	flowop->fo_tputbucket -= delta;
1054*5184Sek110237 	flowop->fo_tputlast = ops;
1055*5184Sek110237 
1056*5184Sek110237 	/* No need to block if the q isn't empty */
1057*5184Sek110237 	if (flowop->fo_tputbucket >= 0LL) {
1058*5184Sek110237 		flowop_endop(threadflow, flowop);
1059*5184Sek110237 		return (0);
1060*5184Sek110237 	}
1061*5184Sek110237 
1062*5184Sek110237 	ops = flowop->fo_tputbucket * -1;
1063*5184Sek110237 	events = ops;
1064*5184Sek110237 
1065*5184Sek110237 	flowop_beginop(threadflow, flowop);
1066*5184Sek110237 	while (filebench_shm->eventgen_hz) {
1067*5184Sek110237 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
1068*5184Sek110237 		if (filebench_shm->eventgen_q >= events) {
1069*5184Sek110237 			filebench_shm->eventgen_q -= events;
1070*5184Sek110237 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1071*5184Sek110237 			flowop->fo_tputbucket += events;
1072*5184Sek110237 			break;
1073*5184Sek110237 		}
1074*5184Sek110237 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
1075*5184Sek110237 		    &filebench_shm->eventgen_lock);
1076*5184Sek110237 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1077*5184Sek110237 	}
1078*5184Sek110237 	flowop_endop(threadflow, flowop);
1079*5184Sek110237 
1080*5184Sek110237 	return (0);
1081*5184Sek110237 }
1082*5184Sek110237 
1083*5184Sek110237 
1084*5184Sek110237 /*
1085*5184Sek110237  * Blocks the calling thread if the number of bytes of I/O
1086*5184Sek110237  * issued exceeds one megabyte times the number of posted
1087*5184Sek110237  * events, thus limiting the average I/O byte rate to one
1088*5184Sek110237  * megabyte times the event rate as set by eventgen_hz.
1089*5184Sek110237  * Always retuns 0.
1090*5184Sek110237  */
1091*5184Sek110237 static int
1092*5184Sek110237 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
1093*5184Sek110237 {
1094*5184Sek110237 	uint64_t bytes;
1095*5184Sek110237 	uint64_t delta;
1096*5184Sek110237 	int events;
1097*5184Sek110237 
1098*5184Sek110237 	/* Immediately bail if not set/enabled */
1099*5184Sek110237 	if (filebench_shm->eventgen_hz == 0)
1100*5184Sek110237 		return (0);
1101*5184Sek110237 
1102*5184Sek110237 	if (flowop->fo_initted == 0) {
1103*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1104*5184Sek110237 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1105*5184Sek110237 		flowop->fo_initted = 1;
1106*5184Sek110237 	}
1107*5184Sek110237 
1108*5184Sek110237 	bytes = (controlstats.fs_rbytes +
1109*5184Sek110237 	    controlstats.fs_wbytes);
1110*5184Sek110237 
1111*5184Sek110237 	/* Is this the first time around */
1112*5184Sek110237 	if (flowop->fo_tputlast == 0) {
1113*5184Sek110237 		flowop->fo_tputlast = bytes;
1114*5184Sek110237 		return (0);
1115*5184Sek110237 	}
1116*5184Sek110237 
1117*5184Sek110237 	delta = bytes - flowop->fo_tputlast;
1118*5184Sek110237 	flowop->fo_tputbucket -= delta;
1119*5184Sek110237 	flowop->fo_tputlast = bytes;
1120*5184Sek110237 
1121*5184Sek110237 	/* No need to block if the q isn't empty */
1122*5184Sek110237 	if (flowop->fo_tputbucket >= 0LL) {
1123*5184Sek110237 		flowop_endop(threadflow, flowop);
1124*5184Sek110237 		return (0);
1125*5184Sek110237 	}
1126*5184Sek110237 
1127*5184Sek110237 	bytes = flowop->fo_tputbucket * -1;
1128*5184Sek110237 	events = (bytes / MB) + 1;
1129*5184Sek110237 
1130*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "%lld bytes, %lld events",
1131*5184Sek110237 	    bytes, events);
1132*5184Sek110237 
1133*5184Sek110237 	flowop_beginop(threadflow, flowop);
1134*5184Sek110237 	while (filebench_shm->eventgen_hz) {
1135*5184Sek110237 		(void) ipc_mutex_lock(&filebench_shm->eventgen_lock);
1136*5184Sek110237 		if (filebench_shm->eventgen_q >= events) {
1137*5184Sek110237 			filebench_shm->eventgen_q -= events;
1138*5184Sek110237 			(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1139*5184Sek110237 			flowop->fo_tputbucket += (events * MB);
1140*5184Sek110237 			break;
1141*5184Sek110237 		}
1142*5184Sek110237 		(void) pthread_cond_wait(&filebench_shm->eventgen_cv,
1143*5184Sek110237 		    &filebench_shm->eventgen_lock);
1144*5184Sek110237 		(void) ipc_mutex_unlock(&filebench_shm->eventgen_lock);
1145*5184Sek110237 	}
1146*5184Sek110237 	flowop_endop(threadflow, flowop);
1147*5184Sek110237 
1148*5184Sek110237 	return (0);
1149*5184Sek110237 }
1150*5184Sek110237 
1151*5184Sek110237 /*
1152*5184Sek110237  * These flowops terminate a benchmark run when either the specified
1153*5184Sek110237  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1154*5184Sek110237  * number of I/O operations (flowoplib_finishoncount) have been generated.
1155*5184Sek110237  */
1156*5184Sek110237 
1157*5184Sek110237 
1158*5184Sek110237 /*
1159*5184Sek110237  * Stop filebench run when specified number of I/O bytes have been
1160*5184Sek110237  * transferred. Compares controlstats.fs_bytes with *flowop->value,
1161*5184Sek110237  * and if greater returns 1, stopping the run, if not, returns 0
1162*5184Sek110237  * to continue running.
1163*5184Sek110237  */
1164*5184Sek110237 static int
1165*5184Sek110237 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1166*5184Sek110237 {
1167*5184Sek110237 	uint64_t b;
1168*5184Sek110237 	uint64_t bytes = *flowop->fo_value;
1169*5184Sek110237 
1170*5184Sek110237 	b = controlstats.fs_bytes;
1171*5184Sek110237 
1172*5184Sek110237 	flowop_beginop(threadflow, flowop);
1173*5184Sek110237 	if (b > bytes) {
1174*5184Sek110237 		flowop_endop(threadflow, flowop);
1175*5184Sek110237 		return (1);
1176*5184Sek110237 	}
1177*5184Sek110237 	flowop_endop(threadflow, flowop);
1178*5184Sek110237 
1179*5184Sek110237 	return (0);
1180*5184Sek110237 }
1181*5184Sek110237 
1182*5184Sek110237 /*
1183*5184Sek110237  * Stop filebench run when specified number of I/O operations have
1184*5184Sek110237  * been performed. Compares controlstats.fs_count with *flowop->value,
1185*5184Sek110237  * and if greater returns 1, stopping the run, if not, returns 0 to
1186*5184Sek110237  * continue running.
1187*5184Sek110237  */
1188*5184Sek110237 static int
1189*5184Sek110237 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1190*5184Sek110237 {
1191*5184Sek110237 	uint64_t ops;
1192*5184Sek110237 	uint64_t count = *flowop->fo_value;
1193*5184Sek110237 
1194*5184Sek110237 	ops = controlstats.fs_count;
1195*5184Sek110237 
1196*5184Sek110237 	flowop_beginop(threadflow, flowop);
1197*5184Sek110237 	if (ops > count) {
1198*5184Sek110237 		flowop_endop(threadflow, flowop);
1199*5184Sek110237 		return (1);
1200*5184Sek110237 	}
1201*5184Sek110237 	flowop_endop(threadflow, flowop);
1202*5184Sek110237 
1203*5184Sek110237 	return (0);
1204*5184Sek110237 }
1205*5184Sek110237 
1206*5184Sek110237 /*
1207*5184Sek110237  * Semaphore synchronization using either System V semaphores or
1208*5184Sek110237  * posix semaphores. If System V semaphores are available, they will be
1209*5184Sek110237  * used, otherwise posix semaphores will be used.
1210*5184Sek110237  */
1211*5184Sek110237 
1212*5184Sek110237 
1213*5184Sek110237 /*
1214*5184Sek110237  * Initializes the filebench "block on semaphore" flowop.
1215*5184Sek110237  * If System V semaphores are implemented, the routine
1216*5184Sek110237  * initializes the System V semaphore subsystem if it hasn't
1217*5184Sek110237  * already been initialized, also allocates a pair of semids
1218*5184Sek110237  * and initializes the highwater System V semaphore.
1219*5184Sek110237  * If no System V semaphores, then does nothing special.
1220*5184Sek110237  * Returns -1 if it cannot acquire a set of System V semphores
1221*5184Sek110237  * or if the initial post to the semaphore set fails. Returns 0
1222*5184Sek110237  * on success.
1223*5184Sek110237  */
1224*5184Sek110237 static int
1225*5184Sek110237 flowoplib_semblock_init(flowop_t *flowop)
1226*5184Sek110237 {
1227*5184Sek110237 
1228*5184Sek110237 #ifdef HAVE_SYSV_SEM
1229*5184Sek110237 	int semid;
1230*5184Sek110237 	struct sembuf sbuf[2];
1231*5184Sek110237 	int highwater;
1232*5184Sek110237 
1233*5184Sek110237 	ipc_seminit();
1234*5184Sek110237 
1235*5184Sek110237 	flowop->fo_semid_lw = ipc_semidalloc();
1236*5184Sek110237 	flowop->fo_semid_hw = ipc_semidalloc();
1237*5184Sek110237 
1238*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1239*5184Sek110237 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1240*5184Sek110237 
1241*5184Sek110237 	/*
1242*5184Sek110237 	 * Raise the number of the hw queue, causing the posting side to
1243*5184Sek110237 	 * block if queue is > 2 x blocking value
1244*5184Sek110237 	 */
1245*5184Sek110237 	if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) {
1246*5184Sek110237 		filebench_log(LOG_ERROR, "semblock init lookup %x failed: %s",
1247*5184Sek110237 		    filebench_shm->semkey,
1248*5184Sek110237 		    strerror(errno));
1249*5184Sek110237 		return (-1);
1250*5184Sek110237 	}
1251*5184Sek110237 
1252*5184Sek110237 	if ((highwater = flowop->fo_semid_hw) == 0)
1253*5184Sek110237 		highwater = *flowop->fo_value;
1254*5184Sek110237 
1255*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1256*5184Sek110237 
1257*5184Sek110237 	sbuf[0].sem_num = highwater;
1258*5184Sek110237 	sbuf[0].sem_op = *flowop->fo_highwater;
1259*5184Sek110237 	sbuf[0].sem_flg = 0;
1260*5184Sek110237 	if ((semop(semid, &sbuf[0], 1) == -1) && errno) {
1261*5184Sek110237 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1262*5184Sek110237 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1263*5184Sek110237 		return (-1);
1264*5184Sek110237 	}
1265*5184Sek110237 #else
1266*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
1267*5184Sek110237 	    "flow %s-%d semblock init with posix semaphore",
1268*5184Sek110237 	    flowop->fo_name, flowop->fo_instance);
1269*5184Sek110237 
1270*5184Sek110237 	sem_init(&flowop->fo_sem, 1, 0);
1271*5184Sek110237 #endif	/* HAVE_SYSV_SEM */
1272*5184Sek110237 
1273*5184Sek110237 	if (!(*flowop->fo_blocking))
1274*5184Sek110237 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1275*5184Sek110237 
1276*5184Sek110237 	return (0);
1277*5184Sek110237 }
1278*5184Sek110237 
1279*5184Sek110237 /*
1280*5184Sek110237  * Releases the semids for the System V semaphore allocated
1281*5184Sek110237  * to this flowop. If not using System V semaphores, then
1282*5184Sek110237  * it is effectively just a no-op. Always returns 0.
1283*5184Sek110237  */
1284*5184Sek110237 static void
1285*5184Sek110237 flowoplib_semblock_destruct(flowop_t *flowop)
1286*5184Sek110237 {
1287*5184Sek110237 #ifdef HAVE_SYSV_SEM
1288*5184Sek110237 	ipc_semidfree(flowop->fo_semid_lw);
1289*5184Sek110237 	ipc_semidfree(flowop->fo_semid_hw);
1290*5184Sek110237 #else
1291*5184Sek110237 	sem_destroy(&flowop->fo_sem);
1292*5184Sek110237 #endif /* HAVE_SYSV_SEM */
1293*5184Sek110237 }
1294*5184Sek110237 
1295*5184Sek110237 /*
1296*5184Sek110237  * Attempts to pass a System V or posix semaphore as appropriate,
1297*5184Sek110237  * and blocks if necessary. Returns -1 if a set of System V
1298*5184Sek110237  * semphores is not available or cannot be acquired, or if the initial
1299*5184Sek110237  * post to the semaphore set fails. Returns 0 on success.
1300*5184Sek110237  */
1301*5184Sek110237 static int
1302*5184Sek110237 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1303*5184Sek110237 {
1304*5184Sek110237 
1305*5184Sek110237 #ifdef HAVE_SYSV_SEM
1306*5184Sek110237 	struct sembuf sbuf[2];
1307*5184Sek110237 	int value = *flowop->fo_value;
1308*5184Sek110237 	int semid;
1309*5184Sek110237 	struct timespec timeout;
1310*5184Sek110237 
1311*5184Sek110237 	if ((semid = semget(filebench_shm->semkey, FILEBENCH_NSEMS, 0)) == -1) {
1312*5184Sek110237 		filebench_log(LOG_ERROR, "lookup semop %x failed: %s",
1313*5184Sek110237 		    filebench_shm->semkey,
1314*5184Sek110237 		    strerror(errno));
1315*5184Sek110237 		return (-1);
1316*5184Sek110237 	}
1317*5184Sek110237 
1318*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
1319*5184Sek110237 	    "flow %s-%d sem blocking on id %x num %x value %d",
1320*5184Sek110237 	    flowop->fo_name, flowop->fo_instance, semid,
1321*5184Sek110237 	    flowop->fo_semid_hw, value);
1322*5184Sek110237 
1323*5184Sek110237 	/* Post, decrement the increment the hw queue */
1324*5184Sek110237 	sbuf[0].sem_num = flowop->fo_semid_hw;
1325*5184Sek110237 	sbuf[0].sem_op = value;
1326*5184Sek110237 	sbuf[0].sem_flg = 0;
1327*5184Sek110237 	sbuf[1].sem_num = flowop->fo_semid_lw;
1328*5184Sek110237 	sbuf[1].sem_op = value * -1;
1329*5184Sek110237 	sbuf[1].sem_flg = 0;
1330*5184Sek110237 	timeout.tv_sec = 600;
1331*5184Sek110237 	timeout.tv_nsec = 0;
1332*5184Sek110237 
1333*5184Sek110237 	if (*flowop->fo_blocking)
1334*5184Sek110237 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1335*5184Sek110237 
1336*5184Sek110237 	flowop_beginop(threadflow, flowop);
1337*5184Sek110237 
1338*5184Sek110237 #ifdef HAVE_SEMTIMEDOP
1339*5184Sek110237 	(void) semtimedop(semid, &sbuf[0], 1, &timeout);
1340*5184Sek110237 	(void) semtimedop(semid, &sbuf[1], 1, &timeout);
1341*5184Sek110237 #else
1342*5184Sek110237 	(void) semop(semid, &sbuf[0], 1);
1343*5184Sek110237 	(void) semop(semid, &sbuf[1], 1);
1344*5184Sek110237 #endif /* HAVE_SEMTIMEDOP */
1345*5184Sek110237 
1346*5184Sek110237 	if (*flowop->fo_blocking)
1347*5184Sek110237 		(void) ipc_mutex_lock(&flowop->fo_lock);
1348*5184Sek110237 
1349*5184Sek110237 	flowop_endop(threadflow, flowop);
1350*5184Sek110237 
1351*5184Sek110237 #else
1352*5184Sek110237 	int value = *flowop->fo_value;
1353*5184Sek110237 	int i;
1354*5184Sek110237 
1355*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
1356*5184Sek110237 	    "flow %s-%d sem blocking on posix semaphore",
1357*5184Sek110237 	    flowop->fo_name, flowop->fo_instance);
1358*5184Sek110237 
1359*5184Sek110237 	/* Decrement sem by value */
1360*5184Sek110237 	for (i = 0; i < value; i++) {
1361*5184Sek110237 		if (sem_wait(&flowop->fo_sem) == -1) {
1362*5184Sek110237 			filebench_log(LOG_ERROR, "semop wait failed");
1363*5184Sek110237 			return (-1);
1364*5184Sek110237 		}
1365*5184Sek110237 	}
1366*5184Sek110237 
1367*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1368*5184Sek110237 	    flowop->fo_name, flowop->fo_instance);
1369*5184Sek110237 #endif /* HAVE_SYSV_SEM */
1370*5184Sek110237 
1371*5184Sek110237 	return (0);
1372*5184Sek110237 }
1373*5184Sek110237 
1374*5184Sek110237 /*
1375*5184Sek110237  * Calls ipc_seminit(), and does so whether System V semaphores
1376*5184Sek110237  * are available or not. Hence it will cause ipc_seminit to log errors
1377*5184Sek110237  * if they are not. Always returns 0.
1378*5184Sek110237  */
1379*5184Sek110237 /* ARGSUSED */
1380*5184Sek110237 static int
1381*5184Sek110237 flowoplib_sempost_init(flowop_t *flowop)
1382*5184Sek110237 {
1383*5184Sek110237 #ifdef HAVE_SYSV_SEM
1384*5184Sek110237 	ipc_seminit();
1385*5184Sek110237 #endif /* HAVE_SYSV_SEM */
1386*5184Sek110237 	return (0);
1387*5184Sek110237 }
1388*5184Sek110237 
1389*5184Sek110237 /*
1390*5184Sek110237  * Post to a System V or posix semaphore as appropriate.
1391*5184Sek110237  * On the first call for a given flowop instance, this routine
1392*5184Sek110237  * will use the fo_targetname attribute to locate all semblock
1393*5184Sek110237  * flowops that are expecting posts from this flowop. All
1394*5184Sek110237  * target flowops on this list will have a post operation done
1395*5184Sek110237  * to their semaphores on each call.
1396*5184Sek110237  */
1397*5184Sek110237 static int
1398*5184Sek110237 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1399*5184Sek110237 {
1400*5184Sek110237 	flowop_t *target;
1401*5184Sek110237 
1402*5184Sek110237 	filebench_log(LOG_DEBUG_IMPL,
1403*5184Sek110237 	    "sempost flow %s-%d",
1404*5184Sek110237 	    flowop->fo_name,
1405*5184Sek110237 	    flowop->fo_instance);
1406*5184Sek110237 
1407*5184Sek110237 	/* if this is the first post, create the post list */
1408*5184Sek110237 	if (flowop->fo_targets == NULL) {
1409*5184Sek110237 		flowop_t *result = flowop_find(flowop->fo_targetname);
1410*5184Sek110237 
1411*5184Sek110237 		flowop->fo_targets = result;
1412*5184Sek110237 
1413*5184Sek110237 		if (result == NULL) {
1414*5184Sek110237 			filebench_log(LOG_ERROR,
1415*5184Sek110237 			    "sempost: could not find op %s for thread %s",
1416*5184Sek110237 			    flowop->fo_targetname,
1417*5184Sek110237 			    threadflow->tf_name);
1418*5184Sek110237 			filebench_shutdown(1);
1419*5184Sek110237 		}
1420*5184Sek110237 
1421*5184Sek110237 		while (result) {
1422*5184Sek110237 			result->fo_targetnext =
1423*5184Sek110237 			    result->fo_resultnext;
1424*5184Sek110237 			result = result->fo_resultnext;
1425*5184Sek110237 		}
1426*5184Sek110237 	}
1427*5184Sek110237 
1428*5184Sek110237 	target = flowop->fo_targets;
1429*5184Sek110237 
1430*5184Sek110237 	flowop_beginop(threadflow, flowop);
1431*5184Sek110237 	/* post to the targets */
1432*5184Sek110237 	while (target) {
1433*5184Sek110237 #ifdef HAVE_SYSV_SEM
1434*5184Sek110237 		struct sembuf sbuf[2];
1435*5184Sek110237 		int semid;
1436*5184Sek110237 		int blocking;
1437*5184Sek110237 #else
1438*5184Sek110237 		int i;
1439*5184Sek110237 #endif /* HAVE_SYSV_SEM */
1440*5184Sek110237 		int value = *flowop->fo_value;
1441*5184Sek110237 		struct timespec timeout;
1442*5184Sek110237 
1443*5184Sek110237 		if (target->fo_instance == FLOW_MASTER) {
1444*5184Sek110237 			target = target->fo_targetnext;
1445*5184Sek110237 			continue;
1446*5184Sek110237 		}
1447*5184Sek110237 
1448*5184Sek110237 #ifdef HAVE_SYSV_SEM
1449*5184Sek110237 
1450*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
1451*5184Sek110237 		    "sempost flow %s-%d num %x",
1452*5184Sek110237 		    target->fo_name,
1453*5184Sek110237 		    target->fo_instance,
1454*5184Sek110237 		    target->fo_semid_lw);
1455*5184Sek110237 		/* ipc_mutex_lock(&target->fo_lock); */
1456*5184Sek110237 
1457*5184Sek110237 		if ((semid = semget(filebench_shm->semkey,
1458*5184Sek110237 		    FILEBENCH_NSEMS, 0)) == -1) {
1459*5184Sek110237 			filebench_log(LOG_ERROR,
1460*5184Sek110237 			    "lookup semop %x failed: %s",
1461*5184Sek110237 			    filebench_shm->semkey,
1462*5184Sek110237 			    strerror(errno));
1463*5184Sek110237 			/* ipc_mutex_unlock(&target->fo_lock); */
1464*5184Sek110237 			return (-1);
1465*5184Sek110237 		}
1466*5184Sek110237 
1467*5184Sek110237 		sbuf[0].sem_num = target->fo_semid_lw;
1468*5184Sek110237 		sbuf[0].sem_op = value;
1469*5184Sek110237 		sbuf[0].sem_flg = 0;
1470*5184Sek110237 		sbuf[1].sem_num = target->fo_semid_hw;
1471*5184Sek110237 		sbuf[1].sem_op = value * -1;
1472*5184Sek110237 		sbuf[1].sem_flg = 0;
1473*5184Sek110237 		timeout.tv_sec = 600;
1474*5184Sek110237 		timeout.tv_nsec = 0;
1475*5184Sek110237 
1476*5184Sek110237 		if (*flowop->fo_blocking)
1477*5184Sek110237 			blocking = 1;
1478*5184Sek110237 		else
1479*5184Sek110237 			blocking = 0;
1480*5184Sek110237 
1481*5184Sek110237 #ifdef HAVE_SEMTIMEDOP
1482*5184Sek110237 		if ((semtimedop(semid, &sbuf[0], blocking + 1,
1483*5184Sek110237 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1484*5184Sek110237 #else
1485*5184Sek110237 		if ((semop(semid, &sbuf[0], blocking + 1) == -1) &&
1486*5184Sek110237 		    (errno && (errno != EAGAIN))) {
1487*5184Sek110237 #endif /* HAVE_SEMTIMEDOP */
1488*5184Sek110237 			filebench_log(LOG_ERROR, "semop post failed: %s",
1489*5184Sek110237 			    strerror(errno));
1490*5184Sek110237 			/* ipc_mutex_unlock(&target->fo_lock); */
1491*5184Sek110237 			return (-1);
1492*5184Sek110237 		}
1493*5184Sek110237 
1494*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
1495*5184Sek110237 		    "flow %s-%d finished posting",
1496*5184Sek110237 		    target->fo_name, target->fo_instance);
1497*5184Sek110237 #else
1498*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL,
1499*5184Sek110237 		    "sempost flow %s-%d to posix semaphore",
1500*5184Sek110237 		    target->fo_name,
1501*5184Sek110237 		    target->fo_instance);
1502*5184Sek110237 
1503*5184Sek110237 		/* Increment sem by value */
1504*5184Sek110237 		for (i = 0; i < value; i++) {
1505*5184Sek110237 			if (sem_post(&target->fo_sem) == -1) {
1506*5184Sek110237 				filebench_log(LOG_ERROR, "semop post failed");
1507*5184Sek110237 				/* ipc_mutex_unlock(&target->fo_lock); */
1508*5184Sek110237 				return (-1);
1509*5184Sek110237 			}
1510*5184Sek110237 		}
1511*5184Sek110237 
1512*5184Sek110237 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1513*5184Sek110237 		    target->fo_name, target->fo_instance);
1514*5184Sek110237 #endif /* HAVE_SYSV_SEM */
1515*5184Sek110237 
1516*5184Sek110237 		target = target->fo_targetnext;
1517*5184Sek110237 	}
1518*5184Sek110237 	flowop_endop(threadflow, flowop);
1519*5184Sek110237 
1520*5184Sek110237 	return (0);
1521*5184Sek110237 }
1522*5184Sek110237 
1523*5184Sek110237 
1524*5184Sek110237 /*
1525*5184Sek110237  * Section for exercising create / open / close / delete operations
1526*5184Sek110237  * on files within a fileset. For proper operation, the flowop attribute
1527*5184Sek110237  * "fd", which sets the fo_fdnumber field in the flowop, must be used
1528*5184Sek110237  * so that the same file is opened and later closed. "fd" is an index
1529*5184Sek110237  * into a pair of arrays maintained by threadflows, one of which
1530*5184Sek110237  * contains the operating system assigned file descriptors and the other
1531*5184Sek110237  * a pointer to the filesetentry whose file the file descriptor
1532*5184Sek110237  * references. An openfile flowop defined without fd being set will use
1533*5184Sek110237  * the default (0) fd or, if specified, rotate through fd indices, but
1534*5184Sek110237  * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile picks an arbitrary file to delete, regardless
1536*5184Sek110237  * of fd attribute.
1537*5184Sek110237  */
1538*5184Sek110237 
1539*5184Sek110237 /*
 * XXX Making file selection more consistent among the flowops might be good
1541*5184Sek110237  */
1542*5184Sek110237 
1543*5184Sek110237 
1544*5184Sek110237 /*
1545*5184Sek110237  * Emulates (and actually does) file open. Obtains a file descriptor
1546*5184Sek110237  * index, then calls flowoplib_openfile_common() to open. Returns -1
1547*5184Sek110237  * if not file descriptor is found or flowoplib_openfile_common
1548*5184Sek110237  * encounters an error, otherwise 0.
1549*5184Sek110237  */
1550*5184Sek110237 static int
1551*5184Sek110237 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1552*5184Sek110237 {
1553*5184Sek110237 	int fd = flowoplib_fdnum(threadflow, flowop);
1554*5184Sek110237 
1555*5184Sek110237 	if (fd == -1)
1556*5184Sek110237 		return (-1);
1557*5184Sek110237 
1558*5184Sek110237 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1559*5184Sek110237 }
1560*5184Sek110237 
1561*5184Sek110237 /*
1562*5184Sek110237  * Common file opening code for filesets. Uses the supplied
1563*5184Sek110237  * file descriptor index to determine the tf_fd entry to use.
1564*5184Sek110237  * If the entry is empty (0) and the fileset exists, fileset
1565*5184Sek110237  * pick is called to select a fileset entry to use. The file
1566*5184Sek110237  * specified in the filesetentry is opened, and the returned
1567*5184Sek110237  * operating system file descriptor and a pointer to the
1568*5184Sek110237  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1569*5184Sek110237  * respectively. Returns -1 on error, 0 on success.
1570*5184Sek110237  */
1571*5184Sek110237 static int
1572*5184Sek110237 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1573*5184Sek110237 {
1574*5184Sek110237 	filesetentry_t *file;
1575*5184Sek110237 	int tid = 0;
1576*5184Sek110237 
1577*5184Sek110237 	/*
1578*5184Sek110237 	 * If the flowop doesn't default to persistent fd
1579*5184Sek110237 	 * then get unique thread ID for use by fileset_pick
1580*5184Sek110237 	 */
1581*5184Sek110237 	if (integer_isset(flowop->fo_rotatefd))
1582*5184Sek110237 		tid = threadflow->tf_utid;
1583*5184Sek110237 
1584*5184Sek110237 	if (threadflow->tf_fd[fd] != 0) {
1585*5184Sek110237 		filebench_log(LOG_ERROR,
1586*5184Sek110237 		    "flowop %s attempted to open without closing on fd %d",
1587*5184Sek110237 		    flowop->fo_name, fd);
1588*5184Sek110237 		return (-1);
1589*5184Sek110237 	}
1590*5184Sek110237 
1591*5184Sek110237 	if (flowop->fo_fileset == NULL) {
1592*5184Sek110237 		filebench_log(LOG_ERROR, "flowop NULL file");
1593*5184Sek110237 		return (-1);
1594*5184Sek110237 	}
1595*5184Sek110237 
1596*5184Sek110237 	if ((file = fileset_pick(flowop->fo_fileset,
1597*5184Sek110237 	    FILESET_PICKEXISTS, tid)) == NULL) {
1598*5184Sek110237 		filebench_log(LOG_ERROR,
1599*5184Sek110237 		    "flowop %s failed to pick file from %s on fd %d",
1600*5184Sek110237 		    flowop->fo_name,
1601*5184Sek110237 		    flowop->fo_fileset->fs_name, fd);
1602*5184Sek110237 		return (-1);
1603*5184Sek110237 	}
1604*5184Sek110237 
1605*5184Sek110237 	threadflow->tf_fse[fd] = file;
1606*5184Sek110237 
1607*5184Sek110237 	flowop_beginop(threadflow, flowop);
1608*5184Sek110237 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1609*5184Sek110237 	    file, O_RDWR, 0666, flowoplib_fileattrs(flowop));
1610*5184Sek110237 	flowop_endop(threadflow, flowop);
1611*5184Sek110237 
1612*5184Sek110237 	if (threadflow->tf_fd[fd] < 0) {
1613*5184Sek110237 		filebench_log(LOG_ERROR, "failed to open file %s",
1614*5184Sek110237 		    flowop->fo_name);
1615*5184Sek110237 		return (-1);
1616*5184Sek110237 	}
1617*5184Sek110237 
1618*5184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT,
1619*5184Sek110237 	    "flowop %s: opened %s fd[%d] = %d",
1620*5184Sek110237 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1621*5184Sek110237 
1622*5184Sek110237 	return (0);
1623*5184Sek110237 }
1624*5184Sek110237 
1625*5184Sek110237 /*
1626*5184Sek110237  * Emulate create of a file. Uses the flowop's fdnumber to select
1627*5184Sek110237  * tf_fd and tf_fse array locations to put the created file's file
1628*5184Sek110237  * descriptor and filesetentry respectively. Uses fileset_pick()
1629*5184Sek110237  * to select a specific filesetentry whose file does not currently
1630*5184Sek110237  * exist for the file create operation. Then calls
1631*5184Sek110237  * fileset_openfile() with the O_CREATE flag set to create the
1632*5184Sek110237  * file. Returns -1 if the array index specified by fdnumber is
1633*5184Sek110237  * already in use, the flowop has no associated fileset, or
1634*5184Sek110237  * the create call fails. Returns 1 if a filesetentry with a
1635*5184Sek110237  * nonexistent file cannot be found. Returns 0 on success.
1636*5184Sek110237  */
1637*5184Sek110237 static int
1638*5184Sek110237 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1639*5184Sek110237 {
1640*5184Sek110237 	filesetentry_t *file;
1641*5184Sek110237 	int fd = flowop->fo_fdnumber;
1642*5184Sek110237 
1643*5184Sek110237 	if (threadflow->tf_fd[fd] != 0) {
1644*5184Sek110237 		filebench_log(LOG_ERROR,
1645*5184Sek110237 		    "flowop %s attempted to create without closing on fd %d",
1646*5184Sek110237 		    flowop->fo_name, fd);
1647*5184Sek110237 		return (-1);
1648*5184Sek110237 	}
1649*5184Sek110237 
1650*5184Sek110237 	if (flowop->fo_fileset == NULL) {
1651*5184Sek110237 		filebench_log(LOG_ERROR, "flowop NULL file");
1652*5184Sek110237 		return (-1);
1653*5184Sek110237 	}
1654*5184Sek110237 
1655*5184Sek110237 	if ((file = fileset_pick(flowop->fo_fileset,
1656*5184Sek110237 	    FILESET_PICKNOEXIST, 0)) == NULL) {
1657*5184Sek110237 		filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file",
1658*5184Sek110237 		    flowop->fo_name);
1659*5184Sek110237 		return (1);
1660*5184Sek110237 	}
1661*5184Sek110237 
1662*5184Sek110237 	threadflow->tf_fse[fd] = file;
1663*5184Sek110237 
1664*5184Sek110237 	flowop_beginop(threadflow, flowop);
1665*5184Sek110237 	threadflow->tf_fd[fd] = fileset_openfile(flowop->fo_fileset,
1666*5184Sek110237 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1667*5184Sek110237 	flowop_endop(threadflow, flowop);
1668*5184Sek110237 
1669*5184Sek110237 	if (threadflow->tf_fd[fd] < 0) {
1670*5184Sek110237 		filebench_log(LOG_ERROR, "failed to create file %s",
1671*5184Sek110237 		    flowop->fo_name);
1672*5184Sek110237 		return (-1);
1673*5184Sek110237 	}
1674*5184Sek110237 
1675*5184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT,
1676*5184Sek110237 	    "flowop %s: created %s fd[%d] = %d",
1677*5184Sek110237 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1678*5184Sek110237 
1679*5184Sek110237 	return (0);
1680*5184Sek110237 }
1681*5184Sek110237 
1682*5184Sek110237 /*
1683*5184Sek110237  * Emulates delete of a file. Picks an arbitrary filesetentry
1684*5184Sek110237  * whose file exists and uses unlink() to delete it. Clears
1685*5184Sek110237  * the FSE_EXISTS flag for the filesetentry. Returns -1 if the
1686*5184Sek110237  * flowop has no associated fileset. Returns 1 if an appropriate
1687*5184Sek110237  * filesetentry cannot be found, and 0 on success.
1688*5184Sek110237  */
1689*5184Sek110237 static int
1690*5184Sek110237 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1691*5184Sek110237 {
1692*5184Sek110237 	filesetentry_t *file;
1693*5184Sek110237 	fileset_t *fileset;
1694*5184Sek110237 	char path[MAXPATHLEN];
1695*5184Sek110237 	char *pathtmp;
1696*5184Sek110237 
1697*5184Sek110237 	if (flowop->fo_fileset == NULL) {
1698*5184Sek110237 		filebench_log(LOG_ERROR, "flowop NULL file");
1699*5184Sek110237 		return (-1);
1700*5184Sek110237 	}
1701*5184Sek110237 
1702*5184Sek110237 	fileset = flowop->fo_fileset;
1703*5184Sek110237 
1704*5184Sek110237 	if ((file = fileset_pick(flowop->fo_fileset,
1705*5184Sek110237 	    FILESET_PICKEXISTS, 0)) == NULL) {
1706*5184Sek110237 		filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file",
1707*5184Sek110237 		    flowop->fo_name);
1708*5184Sek110237 		return (1);
1709*5184Sek110237 	}
1710*5184Sek110237 
1711*5184Sek110237 	*path = 0;
1712*5184Sek110237 	(void) strcpy(path, *fileset->fs_path);
1713*5184Sek110237 	(void) strcat(path, "/");
1714*5184Sek110237 	(void) strcat(path, fileset->fs_name);
1715*5184Sek110237 	pathtmp = fileset_resolvepath(file);
1716*5184Sek110237 	(void) strcat(path, pathtmp);
1717*5184Sek110237 	free(pathtmp);
1718*5184Sek110237 
1719*5184Sek110237 	flowop_beginop(threadflow, flowop);
1720*5184Sek110237 	(void) unlink(path);
1721*5184Sek110237 	flowop_endop(threadflow, flowop);
1722*5184Sek110237 	file->fse_flags &= ~FSE_EXISTS;
1723*5184Sek110237 	(void) ipc_mutex_unlock(&file->fse_lock);
1724*5184Sek110237 
1725*5184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1726*5184Sek110237 
1727*5184Sek110237 	return (0);
1728*5184Sek110237 }
1729*5184Sek110237 
1730*5184Sek110237 /*
1731*5184Sek110237  * Emulates fsync of a file. Obtains the file descriptor index
1732*5184Sek110237  * from the flowop, obtains the actual file descriptor from
1733*5184Sek110237  * the threadflow's table, checks to be sure it is still an
1734*5184Sek110237  * open file, then does an fsync operation on it. Returns -1
1735*5184Sek110237  * if the file no longer is open, 0 otherwise.
1736*5184Sek110237  */
1737*5184Sek110237 static int
1738*5184Sek110237 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1739*5184Sek110237 {
1740*5184Sek110237 	filesetentry_t *file;
1741*5184Sek110237 	int fd = flowop->fo_fdnumber;
1742*5184Sek110237 
1743*5184Sek110237 	if (threadflow->tf_fd[fd] == 0) {
1744*5184Sek110237 		filebench_log(LOG_ERROR,
1745*5184Sek110237 		    "flowop %s attempted to fsync a closed fd %d",
1746*5184Sek110237 		    flowop->fo_name, fd);
1747*5184Sek110237 		return (-1);
1748*5184Sek110237 	}
1749*5184Sek110237 
1750*5184Sek110237 	/* Measure time to fsync */
1751*5184Sek110237 	flowop_beginop(threadflow, flowop);
1752*5184Sek110237 	(void) fsync(threadflow->tf_fd[fd]);
1753*5184Sek110237 	flowop_endop(threadflow, flowop);
1754*5184Sek110237 
1755*5184Sek110237 	file = threadflow->tf_fse[fd];
1756*5184Sek110237 
1757*5184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1758*5184Sek110237 
1759*5184Sek110237 	return (0);
1760*5184Sek110237 }
1761*5184Sek110237 
1762*5184Sek110237 /*
1763*5184Sek110237  * Emulate fsync of an entire fileset. Search through the
1764*5184Sek110237  * threadflow's file descriptor array, doing fsync() on each
1765*5184Sek110237  * open file that belongs to the flowop's fileset. Always
1766*5184Sek110237  * returns 0.
1767*5184Sek110237  */
1768*5184Sek110237 static int
1769*5184Sek110237 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1770*5184Sek110237 {
1771*5184Sek110237 	int fd;
1772*5184Sek110237 
1773*5184Sek110237 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1774*5184Sek110237 		filesetentry_t *file;
1775*5184Sek110237 
1776*5184Sek110237 		/* Match the file set to fsync */
1777*5184Sek110237 		if ((threadflow->tf_fse[fd] == NULL) ||
1778*5184Sek110237 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1779*5184Sek110237 			continue;
1780*5184Sek110237 
1781*5184Sek110237 		/* Measure time to fsync */
1782*5184Sek110237 		flowop_beginop(threadflow, flowop);
1783*5184Sek110237 		(void) fsync(threadflow->tf_fd[fd]);
1784*5184Sek110237 		flowop_endop(threadflow, flowop);
1785*5184Sek110237 
1786*5184Sek110237 		file = threadflow->tf_fse[fd];
1787*5184Sek110237 
1788*5184Sek110237 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1789*5184Sek110237 		    file->fse_path);
1790*5184Sek110237 	}
1791*5184Sek110237 
1792*5184Sek110237 	return (0);
1793*5184Sek110237 }
1794*5184Sek110237 
1795*5184Sek110237 /*
1796*5184Sek110237  * Emulate close of a file.  Obtains the file descriptor index
1797*5184Sek110237  * from the flowop, obtains the actual file descriptor from the
1798*5184Sek110237  * threadflow's table, checks to be sure it is still an open
1799*5184Sek110237  * file, then does a close operation on it. Then sets the
1800*5184Sek110237  * threadflow file descriptor table entry to 0, and the file set
1801*5184Sek110237  * entry pointer to NULL. Returns -1 if the file was not open,
1802*5184Sek110237  * 0 otherwise.
1803*5184Sek110237  */
1804*5184Sek110237 static int
1805*5184Sek110237 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1806*5184Sek110237 {
1807*5184Sek110237 	filesetentry_t *file;
1808*5184Sek110237 	int fd = flowop->fo_fdnumber;
1809*5184Sek110237 
1810*5184Sek110237 	if (threadflow->tf_fd[fd] == 0) {
1811*5184Sek110237 		filebench_log(LOG_ERROR,
1812*5184Sek110237 		    "flowop %s attempted to close an already closed fd %d",
1813*5184Sek110237 		    flowop->fo_name, fd);
1814*5184Sek110237 		return (-1);
1815*5184Sek110237 	}
1816*5184Sek110237 
1817*5184Sek110237 	/* Measure time to close */
1818*5184Sek110237 	flowop_beginop(threadflow, flowop);
1819*5184Sek110237 	(void) close(threadflow->tf_fd[fd]);
1820*5184Sek110237 	flowop_endop(threadflow, flowop);
1821*5184Sek110237 
1822*5184Sek110237 	file = threadflow->tf_fse[fd];
1823*5184Sek110237 
1824*5184Sek110237 	threadflow->tf_fd[fd] = 0;
1825*5184Sek110237 	threadflow->tf_fse[fd] = NULL;
1826*5184Sek110237 
1827*5184Sek110237 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1828*5184Sek110237 
1829*5184Sek110237 	return (0);
1830*5184Sek110237 }
1831*5184Sek110237 
1832*5184Sek110237 /*
1833*5184Sek110237  * Emulate stat of a file. Picks an arbitrary filesetentry with
1834*5184Sek110237  * an existing file from the flowop's fileset, then performs a
1835*5184Sek110237  * stat() operation on it. Returns -1 if the flowop has no
1836*5184Sek110237  * associated fileset. Returns 1 if an appropriate filesetentry
1837*5184Sek110237  * cannot be found, and 0 on success.
1838*5184Sek110237  */
1839*5184Sek110237 static int
1840*5184Sek110237 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
1841*5184Sek110237 {
1842*5184Sek110237 	filesetentry_t *file;
1843*5184Sek110237 	fileset_t *fileset;
1844*5184Sek110237 	char path[MAXPATHLEN];
1845*5184Sek110237 	char *pathtmp;
1846*5184Sek110237 
1847*5184Sek110237 	if (flowop->fo_fileset == NULL) {
1848*5184Sek110237 		filebench_log(LOG_ERROR, "flowop NULL file");
1849*5184Sek110237 		return (-1);
1850*5184Sek110237 	}
1851*5184Sek110237 
1852*5184Sek110237 	fileset = flowop->fo_fileset;
1853*5184Sek110237 
1854*5184Sek110237 	if ((file = fileset_pick(flowop->fo_fileset,
1855*5184Sek110237 	    FILESET_PICKEXISTS, 0)) == NULL) {
1856*5184Sek110237 		filebench_log(LOG_DEBUG_SCRIPT, "flowop %s failed to pick file",
1857*5184Sek110237 		    flowop->fo_name);
1858*5184Sek110237 		return (1);
1859*5184Sek110237 	}
1860*5184Sek110237 
1861*5184Sek110237 	*path = 0;
1862*5184Sek110237 	(void) strcpy(path, *fileset->fs_path);
1863*5184Sek110237 	(void) strcat(path, "/");
1864*5184Sek110237 	(void) strcat(path, fileset->fs_name);
1865*5184Sek110237 	pathtmp = fileset_resolvepath(file);
1866*5184Sek110237 	(void) strcat(path, pathtmp);
1867*5184Sek110237 	free(pathtmp);
1868*5184Sek110237 
1869*5184Sek110237 	flowop_beginop(threadflow, flowop);
1870*5184Sek110237 	flowop_endop(threadflow, flowop);
1871*5184Sek110237 
1872*5184Sek110237 	(void) ipc_mutex_unlock(&file->fse_lock);
1873*5184Sek110237 
1874*5184Sek110237 	return (0);
1875*5184Sek110237 }
1876*5184Sek110237 
1877*5184Sek110237 
1878*5184Sek110237 /*
1879*5184Sek110237  * Additional reads and writes. Read and write whole files, write
1880*5184Sek110237  * and append to files. Some of these work with both fileobjs and
1881*5184Sek110237  * filesets, others only with filesets. The flowoplib_write routine
1882*5184Sek110237  * writes from thread memory, while the others read or write using
1883*5184Sek110237  * fo_buf memory. Note that both flowoplib_read() and
1884*5184Sek110237  * flowoplib_aiowrite() use thread memory as well.
1885*5184Sek110237  */
1886*5184Sek110237 
1887*5184Sek110237 
1888*5184Sek110237 /*
1889*5184Sek110237  * Emulate a read of a whole file.  The file must be open
1890*5184Sek110237  * with file descriptor and filesetentry stored at the
1891*5184Sek110237  * locations indexed by the flowop's fdnumber. It then seeks
1892*5184Sek110237  * to the beginning of the associated file, and reads
1893*5184Sek110237  * FILE_ALLOC_BLOCK bytes at a time until the end of the
1894*5184Sek110237  * file. Returns -1 on error, 0 on success.
1895*5184Sek110237  */
1896*5184Sek110237 static int
1897*5184Sek110237 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
1898*5184Sek110237 {
1899*5184Sek110237 	size_t memoffset;
1900*5184Sek110237 	long memsize, round;
1901*5184Sek110237 	off64_t bytes = 0;
1902*5184Sek110237 	int fd = flowop->fo_fdnumber;
1903*5184Sek110237 	int ret;
1904*5184Sek110237 
1905*5184Sek110237 	if (threadflow->tf_fd[fd] == 0) {
1906*5184Sek110237 		filebench_log(LOG_ERROR,
1907*5184Sek110237 		    "flowop %s attempted to read a closed fd %d",
1908*5184Sek110237 		    flowop->fo_name, fd);
1909*5184Sek110237 		return (-1);
1910*5184Sek110237 	}
1911*5184Sek110237 
1912*5184Sek110237 	if ((flowop->fo_buf == NULL) &&
1913*5184Sek110237 	    ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL))
1914*5184Sek110237 		return (-1);
1915*5184Sek110237 
1916*5184Sek110237 	if (threadflow->tf_fse[fd] == NULL) {
1917*5184Sek110237 		filebench_log(LOG_ERROR, "flowop %s: NULL file",
1918*5184Sek110237 		    flowop->fo_name);
1919*5184Sek110237 		return (-1);
1920*5184Sek110237 	}
1921*5184Sek110237 
1922*5184Sek110237 	memsize = *threadflow->tf_memsize;
1923*5184Sek110237 	round = *flowop->fo_iosize;
1924*5184Sek110237 	if (filebench_randomno(&memoffset, memsize, round) == -1) {
1925*5184Sek110237 		filebench_log(LOG_ERROR,
1926*5184Sek110237 		    "tf_memsize smaller than IO size for thread %s",
1927*5184Sek110237 		    flowop->fo_name);
1928*5184Sek110237 		return (-1);
1929*5184Sek110237 	}
1930*5184Sek110237 
1931*5184Sek110237 	/* Measure time to read bytes */
1932*5184Sek110237 	flowop_beginop(threadflow, flowop);
1933*5184Sek110237 	(void) lseek64(threadflow->tf_fd[fd], 0, SEEK_SET);
1934*5184Sek110237 	while ((ret = read(threadflow->tf_fd[fd], flowop->fo_buf,
1935*5184Sek110237 	    FILE_ALLOC_BLOCK)) > 0)
1936*5184Sek110237 		bytes += ret;
1937*5184Sek110237 
1938*5184Sek110237 	flowop_endop(threadflow, flowop);
1939*5184Sek110237 
1940*5184Sek110237 	if (ret < 0) {
1941*5184Sek110237 		filebench_log(LOG_ERROR,
1942*5184Sek110237 		    "Failed to read fd %d: %s",
1943*5184Sek110237 		    fd, strerror(errno));
1944*5184Sek110237 		return (-1);
1945*5184Sek110237 	}
1946*5184Sek110237 
1947*5184Sek110237 	if (flowop->fo_iosize == NULL)
1948*5184Sek110237 		flowop->fo_iosize = integer_alloc(bytes);
1949*5184Sek110237 	*(flowop->fo_iosize) = bytes;
1950*5184Sek110237 
1951*5184Sek110237 	return (0);
1952*5184Sek110237 }
1953*5184Sek110237 
1954*5184Sek110237 /*
1955*5184Sek110237  * Emulate a write to a file of size fo_iosize.  Will write
1956*5184Sek110237  * to a file from a fileset if the flowop's fo_fileset field
1957*5184Sek110237  * specifies one or its fdnumber is non zero. Otherwise it
1958*5184Sek110237  * will write to a fileobj file, if one exists. If the file
1959*5184Sek110237  * is not currently open, the routine will attempt to open
1960*5184Sek110237  * it. The flowop's fo_wss parameter will be used to set the
1961*5184Sek110237  * maximum file size if it is non-zero, otherwise the
1962*5184Sek110237  * filesetentry's  fse_size will be used. A random memory
1963*5184Sek110237  * buffer offset is calculated, and, if fo_random is TRUE,
1964*5184Sek110237  * a random file offset is used for the write. Otherwise the
1965*5184Sek110237  * write is to the next sequential location. Returns 1 on
1966*5184Sek110237  * errors, 0 on success.
1967*5184Sek110237  */
1968*5184Sek110237 static int
1969*5184Sek110237 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
1970*5184Sek110237 {
1971*5184Sek110237 	size_t memoffset;
1972*5184Sek110237 	vinteger_t wss;
1973*5184Sek110237 	long memsize, round;
1974*5184Sek110237 	int filedesc;
1975*5184Sek110237 
1976*5184Sek110237 	if (flowop->fo_fileset || flowop->fo_fdnumber) {
1977*5184Sek110237 		int fd = flowoplib_fdnum(threadflow, flowop);
1978*5184Sek110237 
1979*5184Sek110237 		if (fd == -1)
1980*5184Sek110237 			return (-1);
1981*5184Sek110237 
1982*5184Sek110237 		if (threadflow->tf_fd[fd] == 0) {
1983*5184Sek110237 			(void) flowoplib_openfile_common(threadflow,
1984*5184Sek110237 			    flowop, fd);
1985*5184Sek110237 			filebench_log(LOG_DEBUG_IMPL, "read opened file %s",
1986*5184Sek110237 			    threadflow->tf_fse[fd]->fse_path);
1987*5184Sek110237 		}
1988*5184Sek110237 		filedesc = threadflow->tf_fd[fd];
1989*5184Sek110237 		if (*flowop->fo_wss == 0)
1990*5184Sek110237 			wss = threadflow->tf_fse[fd]->fse_size;
1991*5184Sek110237 		else
1992*5184Sek110237 			wss = *flowop->fo_wss;
1993*5184Sek110237 	} else {
1994*5184Sek110237 		if (flowop->fo_file == NULL) {
1995*5184Sek110237 			filebench_log(LOG_ERROR, "flowop NULL file");
1996*5184Sek110237 			return (-1);
1997*5184Sek110237 		}
1998*5184Sek110237 		if (flowop->fo_fd < 0)
1999*5184Sek110237 			flowop->fo_fd = fileobj_open(flowop->fo_file,
2000*5184Sek110237 			    flowoplib_fileattrs(flowop));
2001*5184Sek110237 
2002*5184Sek110237 		if (flowop->fo_fd < 0) {
2003*5184Sek110237 			filebench_log(LOG_ERROR, "failed to open file %s",
2004*5184Sek110237 			    flowop->fo_file->fo_name);
2005*5184Sek110237 			return (-1);
2006*5184Sek110237 		}
2007*5184Sek110237 		filedesc = flowop->fo_fd;
2008*5184Sek110237 		if (*flowop->fo_wss == 0)
2009*5184Sek110237 			wss = *flowop->fo_file->fo_size;
2010*5184Sek110237 		else
2011*5184Sek110237 			wss = *flowop->fo_wss;
2012*5184Sek110237 	}
2013*5184Sek110237 
2014*5184Sek110237 	if ((flowop->fo_buf == NULL) &&
2015*5184Sek110237 	    ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) {
2016*5184Sek110237 		return (-1);
2017*5184Sek110237 	}
2018*5184Sek110237 
2019*5184Sek110237 	if (*flowop->fo_iosize == 0) {
2020*5184Sek110237 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
2021*5184Sek110237 		    flowop->fo_name);
2022*5184Sek110237 		return (-1);
2023*5184Sek110237 	}
2024*5184Sek110237 
2025*5184Sek110237 	memsize = *threadflow->tf_memsize;
2026*5184Sek110237 	round = *flowop->fo_iosize;
2027*5184Sek110237 
2028*5184Sek110237 	/* Select memory offset for IO */
2029*5184Sek110237 	if (filebench_randomno(&memoffset, memsize, round) == -1) {
2030*5184Sek110237 		filebench_log(LOG_ERROR,
2031*5184Sek110237 		    "tf_memsize smaller than IO size for thread %s",
2032*5184Sek110237 		    flowop->fo_name);
2033*5184Sek110237 		return (-1);
2034*5184Sek110237 	}
2035*5184Sek110237 
2036*5184Sek110237 	if (*flowop->fo_random) {
2037*5184Sek110237 		uint64_t fileoffset;
2038*5184Sek110237 
2039*5184Sek110237 		if (filebench_randomno64(&fileoffset,
2040*5184Sek110237 		    wss, *flowop->fo_iosize) == -1) {
2041*5184Sek110237 			filebench_log(LOG_ERROR,
2042*5184Sek110237 			    "file size smaller than IO size for thread %s",
2043*5184Sek110237 			    flowop->fo_name);
2044*5184Sek110237 			return (-1);
2045*5184Sek110237 		}
2046*5184Sek110237 		flowop_beginop(threadflow, flowop);
2047*5184Sek110237 		if (pwrite64(filedesc, threadflow->tf_mem + memoffset,
2048*5184Sek110237 		    *flowop->fo_iosize, (off64_t)fileoffset) == -1) {
2049*5184Sek110237 			filebench_log(LOG_ERROR, "write failed, "
2050*5184Sek110237 			    "offset %lld memoffset %zd: %s",
2051*5184Sek110237 			    fileoffset, memoffset, strerror(errno));
2052*5184Sek110237 			flowop_endop(threadflow, flowop);
2053*5184Sek110237 			return (-1);
2054*5184Sek110237 		}
2055*5184Sek110237 		flowop_endop(threadflow, flowop);
2056*5184Sek110237 	} else {
2057*5184Sek110237 		flowop_beginop(threadflow, flowop);
2058*5184Sek110237 		if (write(filedesc, threadflow->tf_mem + memoffset,
2059*5184Sek110237 		    *flowop->fo_iosize) == -1) {
2060*5184Sek110237 			filebench_log(LOG_ERROR,
2061*5184Sek110237 			    "write failed, memoffset %zd: %s",
2062*5184Sek110237 			    memoffset, strerror(errno));
2063*5184Sek110237 			flowop_endop(threadflow, flowop);
2064*5184Sek110237 			return (-1);
2065*5184Sek110237 		}
2066*5184Sek110237 		flowop_endop(threadflow, flowop);
2067*5184Sek110237 	}
2068*5184Sek110237 
2069*5184Sek110237 	return (0);
2070*5184Sek110237 }
2071*5184Sek110237 
2072*5184Sek110237 /*
2073*5184Sek110237  * Emulate a write of a whole file.  The size of the file
2074*5184Sek110237  * is taken from a filesetentry identified by fo_srcfdnumber,
2075*5184Sek110237  * while the file descriptor used is identified by
2076*5184Sek110237  * fo_fdnumber. Does multiple writes of FILE_ALLOC_BLOCK
2077*5184Sek110237  * length until full file has been written. Returns -1 on
2078*5184Sek110237  * error, 0 on success and sets flowop->fo_iosize to the
2079*5184Sek110237  * number of bytes actually written.
2080*5184Sek110237  */
2081*5184Sek110237 static int
2082*5184Sek110237 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2083*5184Sek110237 {
2084*5184Sek110237 	size_t memoffset;
2085*5184Sek110237 	filesetentry_t *file;
2086*5184Sek110237 	int wsize;
2087*5184Sek110237 	off64_t seek;
2088*5184Sek110237 	off64_t bytes = 0;
2089*5184Sek110237 	long memsize, round;
2090*5184Sek110237 	int fd = flowop->fo_fdnumber;
2091*5184Sek110237 	int srcfd = flowop->fo_srcfdnumber;
2092*5184Sek110237 	int ret;
2093*5184Sek110237 
2094*5184Sek110237 	if (threadflow->tf_fd[fd] == 0) {
2095*5184Sek110237 		filebench_log(LOG_ERROR,
2096*5184Sek110237 		    "flowop %s attempted to write a closed fd %d",
2097*5184Sek110237 		    flowop->fo_name, fd);
2098*5184Sek110237 		return (-1);
2099*5184Sek110237 	}
2100*5184Sek110237 
2101*5184Sek110237 	if ((flowop->fo_buf == NULL) &&
2102*5184Sek110237 	    ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) {
2103*5184Sek110237 		return (-1);
2104*5184Sek110237 	}
2105*5184Sek110237 
2106*5184Sek110237 	memsize = *threadflow->tf_memsize;
2107*5184Sek110237 	round = *flowop->fo_iosize;
2108*5184Sek110237 	if (filebench_randomno(&memoffset, memsize, round) == -1) {
2109*5184Sek110237 		filebench_log(LOG_ERROR,
2110*5184Sek110237 		    "tf_memsize smaller than IO size for thread %s",
2111*5184Sek110237 		    flowop->fo_name);
2112*5184Sek110237 	}
2113*5184Sek110237 
2114*5184Sek110237 	file = threadflow->tf_fse[srcfd];
2115*5184Sek110237 	if (((srcfd != 0) && (file == NULL)) ||
2116*5184Sek110237 	    ((file = threadflow->tf_fse[fd]) == NULL)) {
2117*5184Sek110237 		filebench_log(LOG_ERROR, "flowop %s: NULL file",
2118*5184Sek110237 		    flowop->fo_name);
2119*5184Sek110237 		return (-1);
2120*5184Sek110237 	}
2121*5184Sek110237 
2122*5184Sek110237 	wsize = MIN(file->fse_size, FILE_ALLOC_BLOCK);
2123*5184Sek110237 
2124*5184Sek110237 	/* Measure time to write bytes */
2125*5184Sek110237 	flowop_beginop(threadflow, flowop);
2126*5184Sek110237 	for (seek = 0; seek < file->fse_size; seek += FILE_ALLOC_BLOCK) {
2127*5184Sek110237 		ret = write(threadflow->tf_fd[fd], flowop->fo_buf, wsize);
2128*5184Sek110237 		if (ret != wsize) {
2129*5184Sek110237 			filebench_log(LOG_ERROR,
2130*5184Sek110237 			    "Failed to write %d bytes on fd %d: %s",
2131*5184Sek110237 			    threadflow->tf_fd[fd], fd, strerror(errno));
2132*5184Sek110237 			flowop_endop(threadflow, flowop);
2133*5184Sek110237 			return (-1);
2134*5184Sek110237 		}
2135*5184Sek110237 		bytes += ret;
2136*5184Sek110237 	}
2137*5184Sek110237 	flowop_endop(threadflow, flowop);
2138*5184Sek110237 
2139*5184Sek110237 	if (flowop->fo_iosize == NULL)
2140*5184Sek110237 		flowop->fo_iosize = integer_alloc(bytes);
2141*5184Sek110237 	*(flowop->fo_iosize) = bytes;
2142*5184Sek110237 
2143*5184Sek110237 	return (0);
2144*5184Sek110237 }
2145*5184Sek110237 
2146*5184Sek110237 
2147*5184Sek110237 /*
2148*5184Sek110237  * Emulate a fixed size append to a file. Will append data to
2149*5184Sek110237  * a file chosen from a fileset if the flowop's fo_fileset
2150*5184Sek110237  * field specifies one or if its fdnumber is non zero.
2151*5184Sek110237  * Otherwise it will write to a fileobj file, if one exists.
2152*5184Sek110237  * The flowop's fo_wss parameter will be used to set the
2153*5184Sek110237  * maximum file size if it is non-zero, otherwise the
2154*5184Sek110237  * filesetentry's fse_size will be used. A random memory
2155*5184Sek110237  * buffer offset is calculated, then a logical seek to the
2156*5184Sek110237  * end of file is done followed by a write of fo_iosize
2157*5184Sek110237  * bytes. Writes are actually done from fo_buf, rather than
2158*5184Sek110237  * tf_mem as is done with flowoplib_write(), and no check
2159*5184Sek110237  * is made to see if fo_iosize exceeds the size of fo_buf.
2160*5184Sek110237  * Returns -1 on error, 0 on success.
2161*5184Sek110237  */
2162*5184Sek110237 static int
2163*5184Sek110237 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2164*5184Sek110237 {
2165*5184Sek110237 	size_t memoffset;
2166*5184Sek110237 	off64_t wsize;
2167*5184Sek110237 	long memsize;
2168*5184Sek110237 	int fd, filedesc;
2169*5184Sek110237 	/* LINTED E_FUNC_SET_NOT_USED */
2170*5184Sek110237 	vinteger_t wss;
2171*5184Sek110237 	int ret;
2172*5184Sek110237 
2173*5184Sek110237 	if (flowop->fo_fileset || flowop->fo_fdnumber) {
2174*5184Sek110237 		fd  = flowoplib_fdnum(threadflow, flowop);
2175*5184Sek110237 
2176*5184Sek110237 		if (fd == -1)
2177*5184Sek110237 			return (-1);
2178*5184Sek110237 
2179*5184Sek110237 		if (threadflow->tf_fd[fd] == 0) {
2180*5184Sek110237 			(void) flowoplib_openfile_common(threadflow,
2181*5184Sek110237 			    flowop, fd);
2182*5184Sek110237 			filebench_log(LOG_DEBUG_IMPL, "read opened file %s",
2183*5184Sek110237 			    threadflow->tf_fse[fd]->fse_path);
2184*5184Sek110237 		}
2185*5184Sek110237 		filedesc = threadflow->tf_fd[fd];
2186*5184Sek110237 		if (*flowop->fo_wss == 0)
2187*5184Sek110237 			wss = threadflow->tf_fse[fd]->fse_size;
2188*5184Sek110237 		else
2189*5184Sek110237 			wss = *flowop->fo_wss;
2190*5184Sek110237 	} else {
2191*5184Sek110237 		if (flowop->fo_file == NULL) {
2192*5184Sek110237 			filebench_log(LOG_ERROR, "flowop NULL file");
2193*5184Sek110237 			return (-1);
2194*5184Sek110237 		}
2195*5184Sek110237 		if (flowop->fo_fd < 0)
2196*5184Sek110237 			flowop->fo_fd = fileobj_open(flowop->fo_file,
2197*5184Sek110237 			    flowoplib_fileattrs(flowop));
2198*5184Sek110237 
2199*5184Sek110237 		if (flowop->fo_fd < 0) {
2200*5184Sek110237 			filebench_log(LOG_ERROR, "failed to open file %s",
2201*5184Sek110237 			    flowop->fo_file->fo_name);
2202*5184Sek110237 			return (-1);
2203*5184Sek110237 		}
2204*5184Sek110237 		filedesc = flowop->fo_fd;
2205*5184Sek110237 		if (*flowop->fo_wss == 0)
2206*5184Sek110237 			wss = *flowop->fo_file->fo_size;
2207*5184Sek110237 		else
2208*5184Sek110237 			wss = *flowop->fo_wss;
2209*5184Sek110237 	}
2210*5184Sek110237 	/* XXX wss is not being used */
2211*5184Sek110237 
2212*5184Sek110237 	if ((flowop->fo_buf == NULL) &&
2213*5184Sek110237 	    ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) {
2214*5184Sek110237 		return (-1);
2215*5184Sek110237 	}
2216*5184Sek110237 
2217*5184Sek110237 	memsize = *threadflow->tf_memsize;
2218*5184Sek110237 	wsize = *flowop->fo_iosize;
2219*5184Sek110237 	if (filebench_randomno(&memoffset, memsize, wsize) == -1) {
2220*5184Sek110237 		filebench_log(LOG_ERROR,
2221*5184Sek110237 		    "tf_memsize smaller than IO size for thread %s",
2222*5184Sek110237 		    flowop->fo_name);
2223*5184Sek110237 		return (-1);
2224*5184Sek110237 	}
2225*5184Sek110237 
2226*5184Sek110237 	/* Measure time to write bytes */
2227*5184Sek110237 	flowop_beginop(threadflow, flowop);
2228*5184Sek110237 	(void) lseek64(filedesc, 0, SEEK_END);
2229*5184Sek110237 	ret = write(filedesc, flowop->fo_buf, wsize);
2230*5184Sek110237 	if (ret != wsize) {
2231*5184Sek110237 		filebench_log(LOG_ERROR,
2232*5184Sek110237 		    "Failed to write %d bytes on fd %d: %s",
2233*5184Sek110237 		    wsize, fd, strerror(errno));
2234*5184Sek110237 		flowop_endop(threadflow, flowop);
2235*5184Sek110237 		return (-1);
2236*5184Sek110237 	}
2237*5184Sek110237 	flowop_endop(threadflow, flowop);
2238*5184Sek110237 
2239*5184Sek110237 	return (0);
2240*5184Sek110237 }
2241*5184Sek110237 
2242*5184Sek110237 /*
2243*5184Sek110237  * Emulate a random size append to a file. Will append data
2244*5184Sek110237  * to a file chosen from a fileset if the flowop's fo_fileset
2245*5184Sek110237  * field specifies one or if its fdnumber is non zero. Otherwise
2246*5184Sek110237  * it will write to a fileobj file, if one exists. The flowop's
2247*5184Sek110237  * fo_wss parameter will be used to set the maximum file size
2248*5184Sek110237  * if it is non-zero, otherwise the filesetentry's fse_size
2249*5184Sek110237  * will be used.  A random transfer size (but at most fo_iosize
2250*5184Sek110237  * bytes) and a random memory offset are calculated. A logical
2251*5184Sek110237  * seek to the end of file is done, then writes of up to
2252*5184Sek110237  * FILE_ALLOC_BLOCK in size are done until the full transfer
2253*5184Sek110237  * size has been written. Writes are actually done from fo_buf,
2254*5184Sek110237  * rather than tf_mem as is done with flowoplib_write().
2255*5184Sek110237  * Returns -1 on error, 0 on success.
2256*5184Sek110237  */
2257*5184Sek110237 static int
2258*5184Sek110237 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2259*5184Sek110237 {
2260*5184Sek110237 	size_t memoffset;
2261*5184Sek110237 	uint64_t appendsize;
2262*5184Sek110237 	off64_t seek;
2263*5184Sek110237 	long memsize, round;
2264*5184Sek110237 	int fd, filedesc;
2265*5184Sek110237 	/* LINTED E_FUNC_SET_NOT_USED */
2266*5184Sek110237 	vinteger_t wss;
2267*5184Sek110237 
2268*5184Sek110237 	if (flowop->fo_fileset || flowop->fo_fdnumber) {
2269*5184Sek110237 		fd = flowoplib_fdnum(threadflow, flowop);
2270*5184Sek110237 
2271*5184Sek110237 		if (fd == -1)
2272*5184Sek110237 			return (-1);
2273*5184Sek110237 
2274*5184Sek110237 		if (threadflow->tf_fd[fd] == 0) {
2275*5184Sek110237 			(void) flowoplib_openfile_common(threadflow,
2276*5184Sek110237 			    flowop, fd);
2277*5184Sek110237 			filebench_log(LOG_DEBUG_IMPL, "append opened file %s",
2278*5184Sek110237 			    threadflow->tf_fse[fd]->fse_path);
2279*5184Sek110237 		}
2280*5184Sek110237 		filedesc = threadflow->tf_fd[fd];
2281*5184Sek110237 		if (*flowop->fo_wss == 0)
2282*5184Sek110237 			wss = threadflow->tf_fse[fd]->fse_size;
2283*5184Sek110237 		else
2284*5184Sek110237 			wss = *flowop->fo_wss;
2285*5184Sek110237 	} else {
2286*5184Sek110237 		if (flowop->fo_file == NULL) {
2287*5184Sek110237 			filebench_log(LOG_ERROR, "flowop NULL file");
2288*5184Sek110237 			return (-1);
2289*5184Sek110237 		}
2290*5184Sek110237 		if (flowop->fo_fd < 0)
2291*5184Sek110237 			flowop->fo_fd = fileobj_open(flowop->fo_file,
2292*5184Sek110237 			    flowoplib_fileattrs(flowop));
2293*5184Sek110237 
2294*5184Sek110237 		if (flowop->fo_fd < 0) {
2295*5184Sek110237 			filebench_log(LOG_ERROR, "failed to open file %s",
2296*5184Sek110237 			    flowop->fo_file->fo_name);
2297*5184Sek110237 			return (-1);
2298*5184Sek110237 		}
2299*5184Sek110237 		filedesc = flowop->fo_fd;
2300*5184Sek110237 		if (*flowop->fo_wss == 0)
2301*5184Sek110237 			wss = *flowop->fo_file->fo_size;
2302*5184Sek110237 		else
2303*5184Sek110237 			wss = *flowop->fo_wss;
2304*5184Sek110237 	}
2305*5184Sek110237 	/* XXX wss is not being used */
2306*5184Sek110237 
2307*5184Sek110237 	if ((flowop->fo_buf == NULL) &&
2308*5184Sek110237 	    ((flowop->fo_buf = (char *)malloc(FILE_ALLOC_BLOCK)) == NULL)) {
2309*5184Sek110237 		return (-1);
2310*5184Sek110237 	}
2311*5184Sek110237 
2312*5184Sek110237 	memsize = *threadflow->tf_memsize;
2313*5184Sek110237 	round = *flowop->fo_iosize;
2314*5184Sek110237 	if (filebench_randomno(&memoffset, memsize, round) == -1) {
2315*5184Sek110237 		filebench_log(LOG_ERROR, "tf_memsize smaller than IO size"
2316*5184Sek110237 		    "for thread %s", flowop->fo_name);
2317*5184Sek110237 		return (-1);
2318*5184Sek110237 	}
2319*5184Sek110237 	if (filebench_randomno64(&appendsize, *flowop->fo_iosize, 1LL) == -1) {
2320*5184Sek110237 		filebench_log(LOG_ERROR, "tf_memsize smaller than IO size"
2321*5184Sek110237 		    "for thread %s", flowop->fo_name);
2322*5184Sek110237 		return (-1);
2323*5184Sek110237 	}
2324*5184Sek110237 
2325*5184Sek110237 	/* Measure time to write bytes */
2326*5184Sek110237 	flowop_beginop(threadflow, flowop);
2327*5184Sek110237 	for (seek = 0; seek < appendsize; seek += FILE_ALLOC_BLOCK) {
2328*5184Sek110237 		off64_t wsize;
2329*5184Sek110237 		int ret = 0;
2330*5184Sek110237 
2331*5184Sek110237 		(void) lseek64(filedesc, 0, SEEK_END);
2332*5184Sek110237 		wsize = ((appendsize - seek) > FILE_ALLOC_BLOCK) ?
2333*5184Sek110237 		    FILE_ALLOC_BLOCK : (appendsize - seek);
2334*5184Sek110237 		ret = write(filedesc, flowop->fo_buf, wsize);
2335*5184Sek110237 		if (ret != wsize) {
2336*5184Sek110237 			filebench_log(LOG_ERROR,
2337*5184Sek110237 			    "Failed to write %d bytes on fd %d: %s",
2338*5184Sek110237 			    wsize, fd, strerror(errno));
2339*5184Sek110237 			flowop_endop(threadflow, flowop);
2340*5184Sek110237 			return (-1);
2341*5184Sek110237 		}
2342*5184Sek110237 	}
2343*5184Sek110237 	flowop_endop(threadflow, flowop);
2344*5184Sek110237 
2345*5184Sek110237 	return (0);
2346*5184Sek110237 }
2347*5184Sek110237 
2348*5184Sek110237 
/*
 * Prints usage information for flowop operations to stderr.
 * Each table entry is emitted verbatim, in order.
 */
void
flowoplib_usage(void)
{
	static const char *const usage[] = {
		"flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop closefile name=<name>,fd=<file desc num>\n",
		"\n",
		"flowop deletefile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop statfile name=<name>\n",
		"                       [,fileset=<fname>]\n",
		"                       [,fd=<file desc num>]\n",
		"\n",
		"flowop fsync name=<name>,fd=<file desc num>\n",
		"\n",
		"flowop fsyncset name=<name>,fileset=<fname>\n",
		"\n",
		"flowop [write|read|aiowrite] name=<name>,\n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,directio]\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,random]\n",
		"                       [,opennext]\n",
		"                       [,workingset=<size>]\n",
		"\n",
		"flowop [appendfile|appendfilerand] name=<name>,\n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"                       [,workingset=<size>]\n",
		"\n",
		"flowop [readwholefile|writewholefile] name=<name>,\n",
		"                       filename|fileset=<fname>,\n",
		"                       iosize=<size>\n",
		"                       [,dsync]\n",
		"                       [,iters=<count>]\n",
		"\n",
		"flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
		"\n",
		"flowop sempost name=<name>,target=<semblock-flowop>,\n",
		"                       value=<increment-to-post>\n",
		"\n",
		"flowop semblock name=<name>,value=<decrement-to-receive>,\n",
		"                       highwater=<inbound-queue-max>\n",
		"\n",
		"flowop block name=<name>\n",
		"\n",
		"flowop wakeup name=<name>,target=<block-flowop>\n",
		"\n",
		"flowop hog name=<name>,value=<number-of-mem-ops>\n",
		"flowop delay name=<name>,value=<number-of-seconds>\n",
		"\n",
		"flowop eventlimit name=<name>\n",
		"flowop bwlimit name=<name>,value=<mb/s>\n",
		"flowop iopslimit name=<name>,value=<iop/s>\n",
		"flowop finishoncount name=<name>,value=<ops/s>\n",
		"flowop finishonbytes name=<name>,value=<bytes>\n",
		"\n",
		"\n"
	};
	size_t i;

	for (i = 0; i < sizeof (usage) / sizeof (usage[0]); i++)
		(void) fputs(usage[i], stderr);
}
2438