xref: /onnv-gate/usr/src/cmd/filebench/common/flowop.c (revision 6701:4213fadfdec4)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 
30 #ifdef HAVE_LWPS
31 #include <sys/lwp.h>
32 #endif
33 #include <fcntl.h>
34 #include "filebench.h"
35 #include "flowop.h"
36 #include "stats.h"
37 
38 #ifdef LINUX_PORT
39 #include <sys/types.h>
40 #include <linux/unistd.h>
41 #endif
42 
43 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
44     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
45 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
46 static int flowop_composite_init(flowop_t *flowop);
47 static void flowop_composite_destruct(flowop_t *flowop);
48 
49 /*
50  * A collection of flowop support functions. The actual code that
51  * implements the various flowops is in flowop_library.c.
52  *
53  * Routines for defining, creating, initializing and destroying
54  * flowops, cyclically invoking the flowops on each threadflow's flowop
55  * list, collecting statistics about flowop execution, and other
56  * housekeeping duties are included in this file.
57  *
58  * User Defined Composite Flowops
59  *    The ability to define new flowops as lists of built-in or previously
60  * defined flowops has been added to Filebench. In a sense they are like
61  * in-line subroutines, which can have default attributes set at definition
62  * time and passed arguments at invocation time. Like other flowops (and
63  * unlike conventional subroutines) you can invoke them with an iteration
64  * count (the "iter" attribute), and they will loop through their associated
65  * list of flowops for "iter" number of times each time they are encountered
66  * in the thread or outer composite flowop which invokes them.
67  *
68  * Composite flowops are created with a "define" command, are given a name,
69  * optional default attributes, and local variable definitions on the
70  * "define" command line, followed by a brace enclosed list of flowops
71  * to execute. The enclosed flowops may include attributes that reference
72  * the local variables, as well as constants and global variables.
73  *
74  * Composite flowops are used pretty much like regular flowops, but you can
75  * also set local variables to constants or global variables ($local_var =
76  * [$var | $random_var | string | boolean | integer | double]) as part of
77  * the invocation. Thus each invocation can pass customized values to its
78  * inner flowops, greatly increasing their generality.
79  *
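 * As an illustration only (a hypothetical fragment, not taken from any
 * shipped workload; see the F language documentation for exact syntax),
 * a composite flowop might be defined and later invoked roughly as:
 *
 *	define flowop name=myop,$myiosize=4k
 *	{
 *	  flowop write name=wr1,iosize=$myiosize,iters=2
 *	}
 *	...
 *	flowop myop name=op1,iters=10,$myiosize=8k
 *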
80  * All flowops are placed on a global, singly linked list, with fo_next
81  * being the link pointer for this list. They are also placed on a private
82  * list for the thread or composite flowop they are part of. The tf_thrd_fops
83  * pointer in the thread will point to the list of top level flowops in the
84  * thread, which are linked together by fo_exec_next. If any of these flowops
85  * are composite flowops, they will have a list of second level flowops rooted
86  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
87  * of all flowops, and an n-ary tree of threads, composite flowops, and
88  * flowops, with composite flowops being the branch nodes in the tree.
89  *
90  * To illustrate, if we have three first level flowops, the first of which is
91  * a composite flowop consisting of two other flowops, we get:
92  *
93  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
94  *			   flowop->fo_comp_fops		    |
95  *				    |			    V
96  *				    |			flowop->fo_exec_next
97  *				    |
98  *				    V
99  *				flowop->fo_exec_next -> flowop->fo_exec_next
100  *
101  * And all five flowops (plus others from any other threads) are on a global
102  * list linked with fo_next.
103  */
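
/*
 * To make the structure above concrete, the sketch below (illustrative
 * only, and excluded from compilation) shows one way the two level tree
 * could be walked. It relies only on fields already used in this file
 * (fo_exec_next, fo_comp_fops, fo_type, fo_name, fo_instance); the
 * function name flowop_walk_example is hypothetical and not part of
 * Filebench.
 */
#if 0
static void
flowop_walk_example(flowop_t *list, int depth)
{
	flowop_t *flowop;

	/* walk one level of the tree via the fo_exec_next links */
	for (flowop = list; flowop; flowop = flowop->fo_exec_next) {
		filebench_log(LOG_DEBUG_IMPL, "%*s%s-%d", depth * 2, "",
		    flowop->fo_name, flowop->fo_instance);

		/* composite flowops are the branch nodes; descend into them */
		if (flowop->fo_type == FLOW_TYPE_COMPOSITE)
			flowop_walk_example(flowop->fo_comp_fops, depth + 1);
	}
}
#endif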
104 
105 /*
106  * Prints the name and instance number of each flowop in
107  * the supplied list to the filebench log.
108  */
109 int
110 flowop_printlist(flowop_t *list)
111 {
112 	flowop_t *flowop = list;
113 
114 	while (flowop) {
115 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
116 		    flowop->fo_name, flowop->fo_instance);
117 		flowop = flowop->fo_exec_next;
118 	}
119 	return (0);
120 }
121 
122 /*
123  * Prints the name and instance number of all flowops on
124  * the master flowop list to the console and the filebench log.
125  */
126 void
127 flowop_printall(void)
128 {
129 	flowop_t *flowop = filebench_shm->shm_flowoplist;
130 
131 	while (flowop) {
132 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
133 		    flowop->fo_name, flowop->fo_instance);
134 		flowop = flowop->fo_next;
135 	}
136 }
137 
138 #define	TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \
139 					(e.tv_nsec - s.tv_nsec))
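/*
 * For example, with s = {1, 900000000} and e = {2, 100000000} the macro
 * evaluates to (1 * 1000000000) + (100000000 - 900000000) = 200000000 ns;
 * a negative nanosecond difference is compensated by the seconds term.
 */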
140 /*
141  * Records the current high resolution time in the threadflow's
142  * start time entry and, when /proc usage data is available, also
143  * accumulates running filebench overhead statistics.
144  */
145 void
146 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
147 {
148 #ifdef HAVE_PROCFS
149 	if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
150 		char procname[128];
151 
152 		(void) snprintf(procname, sizeof (procname),
153 		    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
154 		threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
155 	}
156 
157 	(void) pread(threadflow->tf_lwpusagefd,
158 	    &threadflow->tf_susage,
159 	    sizeof (struct prusage), 0);
160 
161 	/* Compute overhead time in this thread around op */
162 	if (threadflow->tf_eusage.pr_stime.tv_nsec) {
163 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
164 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
165 		    threadflow->tf_susage.pr_utime) +
166 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
167 		    threadflow->tf_susage.pr_ttime) +
168 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
169 		    threadflow->tf_susage.pr_stime);
170 	}
171 #endif
172 	/* Start of op for this thread */
173 	threadflow->tf_stime = gethrtime();
174 }
175 
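/*
 * Aggregate I/O statistics summed across all I/O and AIO flowops,
 * protected by controlstats_lock and zeroed once by the first worker
 * thread to reach flowop_start().
 */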
176 flowstat_t controlstats;
177 pthread_mutex_t controlstats_lock;
178 static int controlstats_zeroed = 0;
179 
180 /*
181  * Updates flowop's latency statistics, using saved start
182  * time and current high resolution time. Updates flowop's
183  * io count and transferred bytes statistics. Also updates
184  * threadflow's and flowop's cumulative read or write byte
185  * and io count statistics.
186  */
187 void
188 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
189 {
190 	hrtime_t t;
191 
192 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
193 	    (gethrtime() - threadflow->tf_stime);
194 #ifdef HAVE_PROCFS
195 	if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
196 	    sizeof (struct prusage), 0)) != sizeof (struct prusage))
197 		filebench_log(LOG_ERROR, "cannot read /proc");
198 
199 	t =
200 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
201 	    threadflow->tf_eusage.pr_utime) +
202 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
203 	    threadflow->tf_eusage.pr_ttime) +
204 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
205 	    threadflow->tf_eusage.pr_stime);
206 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
207 
208 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
209 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
210 	    threadflow->tf_eusage.pr_tftime) +
211 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
212 	    threadflow->tf_eusage.pr_dftime) +
213 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
214 	    threadflow->tf_eusage.pr_kftime) +
215 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ltime,
216 	    threadflow->tf_eusage.pr_ltime) +
217 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
218 	    threadflow->tf_eusage.pr_slptime);
219 #endif
220 
221 	flowop->fo_stats.fs_count++;
222 	flowop->fo_stats.fs_bytes += bytes;
223 	(void) ipc_mutex_lock(&controlstats_lock);
224 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
225 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
226 		controlstats.fs_count++;
227 		controlstats.fs_bytes += bytes;
228 	}
229 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
230 		threadflow->tf_stats.fs_rbytes += bytes;
231 		threadflow->tf_stats.fs_rcount++;
232 		flowop->fo_stats.fs_rcount++;
233 		controlstats.fs_rbytes += bytes;
234 		controlstats.fs_rcount++;
235 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
236 		threadflow->tf_stats.fs_wbytes += bytes;
237 		threadflow->tf_stats.fs_wcount++;
238 		flowop->fo_stats.fs_wcount++;
239 		controlstats.fs_wbytes += bytes;
240 		controlstats.fs_wcount++;
241 	}
242 	(void) ipc_mutex_unlock(&controlstats_lock);
243 }
244 
245 /*
246  * Calls the flowop's initialization function, pointed to by
247  * flowop->fo_init.
248  */
249 static int
250 flowop_initflow(flowop_t *flowop)
251 {
252 	/*
253 	 * save static copies of two items, in case they are supplied
254 	 * from random variables
255 	 */
256 	flowop->fo_constvalue = avd_get_int(flowop->fo_value);
257 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
258 
259 	if ((*flowop->fo_init)(flowop) < 0) {
260 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
261 		    flowop->fo_name, flowop->fo_instance);
262 		return (-1);
263 	}
264 	return (0);
265 }
266 
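/*
 * Creates the runtime flowops for a threadflow (or composite flowop) by
 * duplicating each prototype flowop on the supplied list with
 * flowop_define_common(), resolving any fo_filename attribute to its
 * fileset, and then calling the new flowop's initialization routine.
 * The supplied list head ends up pointing to the runtime flowops.
 */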
267 static int
268 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
269 {
270 	flowop_t *flowop = *ops_list_ptr;
271 
272 	while (flowop) {
273 		flowop_t *newflowop;
274 
275 		if (flowop == *ops_list_ptr)
276 			*ops_list_ptr = NULL;
277 
278 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
279 		    flowop, ops_list_ptr, 1, 0);
280 		if (newflowop == NULL)
281 			return (FILEBENCH_ERROR);
282 
283 		/* check for fo_filename attribute, and resolve if present */
284 		if (flowop->fo_filename) {
285 			char *name;
286 
287 			name = avd_get_str(flowop->fo_filename);
288 			newflowop->fo_fileset = fileset_find(name);
289 
290 			if (newflowop->fo_fileset == NULL) {
291 				filebench_log(LOG_ERROR,
292 				    "flowop %s: file %s not found",
293 				    newflowop->fo_name, name);
294 				filebench_shutdown(1);
295 			}
296 		}
297 
298 		if (flowop_initflow(newflowop) < 0) {
299 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
300 			    newflowop->fo_name);
301 		}
302 
303 		flowop = flowop->fo_exec_next;
304 	}
305 	return (FILEBENCH_OK);
306 }
307 
308 /*
309  * Calls the flowop's destruct function, pointed to by
310  * flowop->fo_destruct.
311  */
312 static void
313 flowop_destructflow(flowop_t *flowop)
314 {
315 	(*flowop->fo_destruct)(flowop);
316 }
317 
318 /*
319  * Calls the destruct functions of all the threadflow's flowops,
320  * if the threadflow is still flagged as "running".
321  */
322 void
323 flowop_destruct_all_flows(threadflow_t *threadflow)
324 {
325 	flowop_t *flowop;
326 
327 	/* wait a moment to give other threads a chance to stop too */
328 	(void) sleep(1);
329 
330 	(void) ipc_mutex_lock(&threadflow->tf_lock);
331 
332 	/* prepare to call destruct flow routines, if necessary */
333 	if (threadflow->tf_running == 0) {
334 
335 		/* already destroyed */
336 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
337 		return;
338 	}
339 
340 	flowop = threadflow->tf_thrd_fops;
341 	threadflow->tf_running = 0;
342 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
343 
344 	while (flowop) {
345 		flowop_destructflow(flowop);
346 		flowop = flowop->fo_exec_next;
347 	}
348 }
349 
350 /*
351  * The final initialization and main execution loop for the
352  * worker threads. Sets threadflow and flowop start times,
353  * waits for all processes to start, then creates the runtime
354  * flowops from those defined by the F language workload
355  * script. It does some more initialization, then enters a
356  * loop to repeatedly execute the flowops on the flowop list
357  * until an abort condition is detected, at which time it exits.
358  * This is the starting routine for the new worker thread
359  * created by threadflow_createthread(), and is not currently
360  * called from anywhere else.
361  */
362 void
363 flowop_start(threadflow_t *threadflow)
364 {
365 	flowop_t *flowop;
366 	size_t memsize;
367 	int ret = 0;
368 
369 #ifdef HAVE_PROCFS
370 	if (noproc == 0) {
371 		char procname[128];
372 		long ctl[2] = {PCSET, PR_MSACCT};
373 		int pfd;
374 
375 		(void) snprintf(procname, sizeof (procname),
376 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
377 		pfd = open(procname, O_WRONLY);
378 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
379 		(void) close(pfd);
380 	}
381 #endif
382 
383 	(void) ipc_mutex_lock(&controlstats_lock);
384 	if (!controlstats_zeroed) {
385 		(void) memset(&controlstats, 0, sizeof (controlstats));
386 		controlstats_zeroed = 1;
387 	}
388 	(void) ipc_mutex_unlock(&controlstats_lock);
389 
390 	flowop = threadflow->tf_thrd_fops;
391 	threadflow->tf_stats.fs_stime = gethrtime();
392 	flowop->fo_stats.fs_stime = gethrtime();
393 
394 	/* Hold the flowop find lock as reader to prevent lookups */
395 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
396 
397 	/*
398 	 * Block until all processes have started, acting like
399 	 * a barrier. The original filebench process initially
400 	 * holds the run_lock as a reader, preventing any of the
401 	 * threads from obtaining the writer lock, and hence
402 	 * passing this point. Once all processes and threads
403 	 * have been created, the original process unlocks
404 	 * run_lock, allowing each waiting thread to lock
405 	 * and then immediately unlock it, then begin running.
406 	 */
407 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
408 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
409 
410 	/* Create the runtime flowops from those defined by the script */
411 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
412 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
413 	    != FILEBENCH_OK) {
414 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
415 		filebench_shutdown(1);
416 		return;
417 	}
418 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
419 
420 	/* Release the find lock as reader to allow lookups */
421 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
422 
423 	/* Set to the start of the new flowop list */
424 	flowop = threadflow->tf_thrd_fops;
425 
426 	threadflow->tf_abort = 0;
427 	threadflow->tf_running = 1;
428 
429 	memsize = (size_t)threadflow->tf_constmemsize;
430 
431 	/* Allocate thread memory, from ISM if requested */
432 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
433 		threadflow->tf_mem =
434 		    ipc_ismmalloc(memsize);
435 	} else {
436 		threadflow->tf_mem =
437 		    malloc(memsize);
438 	}
439 
440 	(void) memset(threadflow->tf_mem, 0, memsize);
441 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
442 
443 #ifdef HAVE_LWPS
444 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
445 	    threadflow,
446 	    _lwp_self());
447 #endif
448 
449 	/* Main filebench worker loop */
450 	/* CONSTCOND */
451 	while (1) {
452 		int i, count;
453 
454 		/* Abort if asked */
455 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
456 			break;
457 
458 		/* Be quiet while stats are gathered */
459 		if (filebench_shm->shm_bequiet) {
460 			(void) sleep(1);
461 			continue;
462 		}
463 
464 		/* Take it easy until everyone is ready to go */
465 		if (!filebench_shm->shm_procs_running) {
466 			(void) sleep(1);
467 			continue;
468 		}
469 
470 		if (flowop == NULL) {
471 			filebench_log(LOG_ERROR, "flowop_start: null flowop");
472 			return;
473 		}
474 
475 		if (flowop->fo_stats.fs_stime == 0)
476 			flowop->fo_stats.fs_stime = gethrtime();
477 
478 		/* Execute the flowop for fo_iters times */
479 		count = (int)avd_get_int(flowop->fo_iters);
480 		for (i = 0; i < count; i++) {
481 
482 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
483 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
484 			    flowop->fo_instance);
485 
486 			ret = (*flowop->fo_func)(threadflow, flowop);
487 
488 			/*
489 			 * Return value FILEBENCH_ERROR means "flowop
490 			 * failed, stop the filebench run"
491 			 */
492 			if (ret == FILEBENCH_ERROR) {
493 				filebench_log(LOG_ERROR,
494 				    "%s-%d: flowop %s-%d failed",
495 				    threadflow->tf_name,
496 				    threadflow->tf_instance,
497 				    flowop->fo_name,
498 				    flowop->fo_instance);
499 				(void) ipc_mutex_lock(&threadflow->tf_lock);
500 				threadflow->tf_abort = 1;
501 				filebench_shm->shm_f_abort =
502 				    FILEBENCH_ABORT_ERROR;
503 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
504 				break;
505 			}
506 
507 			/*
508 			 * Return value of FILEBENCH_NORSC means "stop
509 			 * the filebench run" if in "end on no work mode",
510 			 * otherwise it indicates an error
511 			 */
512 			if (ret == FILEBENCH_NORSC) {
513 				(void) ipc_mutex_lock(&threadflow->tf_lock);
514 				threadflow->tf_abort = FILEBENCH_DONE;
515 				if (filebench_shm->shm_rmode ==
516 				    FILEBENCH_MODE_Q1STDONE) {
517 					filebench_shm->shm_f_abort =
518 					    FILEBENCH_ABORT_RSRC;
519 				} else if (filebench_shm->shm_rmode !=
520 				    FILEBENCH_MODE_QALLDONE) {
521 					filebench_log(LOG_ERROR1,
522 					    "WARNING! Run stopped early:\n   "
523 					    "             flowop %s-%d could "
524 					    "not obtain a file. Please\n      "
525 					    "          reduce runtime, "
526 					    "increase fileset entries "
527 					    "($nfiles), or switch modes.",
528 					    flowop->fo_name,
529 					    flowop->fo_instance);
530 					filebench_shm->shm_f_abort =
531 					    FILEBENCH_ABORT_ERROR;
532 				}
533 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
534 				break;
535 			}
536 
537 			/*
538 			 * Return value of FILEBENCH_DONE means "stop
539 			 * the filebench run without error"
540 			 */
541 			if (ret == FILEBENCH_DONE) {
542 				(void) ipc_mutex_lock(&threadflow->tf_lock);
543 				threadflow->tf_abort = FILEBENCH_DONE;
544 				filebench_shm->shm_f_abort =
545 				    FILEBENCH_ABORT_DONE;
546 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
547 				break;
548 			}
549 
550 			/*
551 			 * If we get here and the return is something other
552 			 * than FILEBENCH_OK, it means a spurious code
553 			 * was returned, so treat as major error. This
554 			 * probably indicates a bug in the flowop.
555 			 */
556 			if (ret != FILEBENCH_OK) {
557 				filebench_log(LOG_ERROR,
558 				    "Flowop %s unexpected return value = %d\n",
559 				    flowop->fo_name, ret);
560 				filebench_shm->shm_f_abort =
561 				    FILEBENCH_ABORT_ERROR;
562 				break;
563 			}
564 		}
565 
566 		/* advance to next flowop */
567 		flowop = flowop->fo_exec_next;
568 
569 		/* but if at end of list, start over from the beginning */
570 		if (flowop == NULL) {
571 			flowop = threadflow->tf_thrd_fops;
572 			threadflow->tf_stats.fs_count++;
573 		}
574 	}
575 
576 #ifdef HAVE_LWPS
577 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
578 	    _lwp_self());
579 #endif
580 
581 	/* Tell flowops to destroy locally acquired state */
582 	flowop_destruct_all_flows(threadflow);
583 
584 	pthread_exit(&threadflow->tf_abort);
585 }
586 
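/*
 * Initializes the controlstats_lock mutex and the library of built-in
 * flowops (flowoplib_init()).
 */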
587 void
588 flowop_init(void)
589 {
590 	(void) pthread_mutex_init(&controlstats_lock, ipc_mutexattr());
591 	flowoplib_init();
592 }
593 
594 /*
595  * Deletes the designated flowop from the local and global flowop lists.
596  */
597 static void
598 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
599 {
600 	flowop_t *entry = *flowoplist;
601 	int found = 0;
602 
603 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
604 	    flowop->fo_name,
605 	    flowop->fo_instance);
606 
607 	/* Delete from thread's flowop list */
608 	if (flowop == *flowoplist) {
609 		/* First on list */
610 		*flowoplist = flowop->fo_exec_next;
611 		filebench_log(LOG_DEBUG_IMPL,
612 		    "Delete0 flowop: (%s-%d)",
613 		    flowop->fo_name,
614 		    flowop->fo_instance);
615 	} else {
616 		while (entry->fo_exec_next) {
617 			filebench_log(LOG_DEBUG_IMPL,
618 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
619 			    entry->fo_exec_next->fo_name,
620 			    entry->fo_exec_next->fo_instance,
621 			    flowop->fo_name,
622 			    flowop->fo_instance);
623 
624 			if (flowop == entry->fo_exec_next) {
625 				/* Delete */
626 				filebench_log(LOG_DEBUG_IMPL,
627 				    "Deleted0 flowop: (%s-%d)",
628 				    entry->fo_exec_next->fo_name,
629 				    entry->fo_exec_next->fo_instance);
630 				entry->fo_exec_next =
631 				    entry->fo_exec_next->fo_exec_next;
632 				break;
633 			}
634 			entry = entry->fo_exec_next;
635 		}
636 	}
637 
638 #ifdef HAVE_PROCFS
639 	/* Close /proc stats */
640 	if (flowop->fo_thread)
641 		(void) close(flowop->fo_thread->tf_lwpusagefd);
642 #endif
643 
644 	/* Delete from global list */
645 	entry = filebench_shm->shm_flowoplist;
646 
647 	if (flowop == filebench_shm->shm_flowoplist) {
648 		/* First on list */
649 		filebench_shm->shm_flowoplist = flowop->fo_next;
650 		found = 1;
651 	} else {
652 		while (entry->fo_next) {
653 			filebench_log(LOG_DEBUG_IMPL,
654 			    "Delete flowop: (%s-%d) == (%s-%d)",
655 			    entry->fo_next->fo_name,
656 			    entry->fo_next->fo_instance,
657 			    flowop->fo_name,
658 			    flowop->fo_instance);
659 
660 			if (flowop == entry->fo_next) {
661 				/* Delete */
662 				entry->fo_next = entry->fo_next->fo_next;
663 				found = 1;
664 				break;
665 			}
666 
667 			entry = entry->fo_next;
668 		}
669 	}
670 	if (found) {
671 		filebench_log(LOG_DEBUG_IMPL,
672 		    "Deleted flowop: (%s-%d)",
673 		    flowop->fo_name,
674 		    flowop->fo_instance);
675 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
676 	} else {
677 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
678 		    flowop->fo_name,
679 		    flowop->fo_instance);
680 	}
681 }
682 
683 /*
684  * Deletes all the flowops on a flowop list, except FLOW_MASTER instances.
685  */
686 void
687 flowop_delete_all(flowop_t **flowoplist)
688 {
689 	flowop_t *flowop = *flowoplist;
690 
691 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
692 
693 	while (flowop) {
694 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
695 		    flowop->fo_name, flowop->fo_instance);
696 
697 		if (flowop->fo_instance &&
698 		    (flowop->fo_instance == FLOW_MASTER)) {
699 			flowop = flowop->fo_exec_next;
700 			continue;
701 		}
702 		flowop_delete(flowoplist, flowop);
703 		flowop = flowop->fo_exec_next;
704 	}
705 
706 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
707 }
708 
709 /*
710  * Allocates a flowop entity and initializes it with inherited
711  * contents from the "inherit" flowop, if it is supplied, or
712  * with zeros otherwise. In either case the fo_next and fo_exec_next
713  * pointers are set to NULL, and fo_thread is set to point to
714  * the owning threadflow. The initialized flowop is placed at
715  * the head of the global flowop list, and also placed on the
716  * tail of the supplied local flowop list, which will either
717  * be a threadflow's tf_thrd_fops list or a composite flowop's
718  * fo_comp_fops list. The routine locks the flowop's fo_lock and
719  * leaves it held on return. If successful, it returns a pointer
720  * to the allocated and initialized flowop, otherwise it returns NULL.
721  *
722  * filebench_shm->shm_flowop_lock must be held by caller.
723  */
724 static flowop_t *
725 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
726     flowop_t **flowoplist_hdp, int instance, int type)
727 {
728 	flowop_t *flowop;
729 
730 	if (name == NULL)
731 		return (NULL);
732 
733 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
734 		filebench_log(LOG_ERROR,
735 		    "flowop_define: Can't malloc flowop");
736 		return (NULL);
737 	}
738 
739 	filebench_log(LOG_DEBUG_IMPL, "defining flowop %s-%d, addr %zx",
740 	    name, instance, flowop);
741 
742 	if (flowop == NULL)
743 		return (NULL);
744 
745 	if (inherit) {
746 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
747 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
748 		(void) ipc_mutex_lock(&flowop->fo_lock);
749 		flowop->fo_next = NULL;
750 		flowop->fo_exec_next = NULL;
751 		filebench_log(LOG_DEBUG_IMPL,
752 		    "flowop %s-%d calling init", name, instance);
753 	} else {
754 		(void) memset(flowop, 0, sizeof (flowop_t));
755 		flowop->fo_iters = avd_int_alloc(1);
756 		flowop->fo_type = type;
757 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
758 		(void) ipc_mutex_lock(&flowop->fo_lock);
759 	}
760 
761 	/* Create backpointer to thread */
762 	flowop->fo_thread = threadflow;
763 
764 	/* Add flowop to global list */
765 	if (filebench_shm->shm_flowoplist == NULL) {
766 		filebench_shm->shm_flowoplist = flowop;
767 		flowop->fo_next = NULL;
768 	} else {
769 		flowop->fo_next = filebench_shm->shm_flowoplist;
770 		filebench_shm->shm_flowoplist = flowop;
771 	}
772 
773 	(void) strcpy(flowop->fo_name, name);
774 	flowop->fo_instance = instance;
775 
776 	if (flowoplist_hdp == NULL)
777 		return (flowop);
778 
779 	/* Add flowop to thread op list */
780 	if (*flowoplist_hdp == NULL) {
781 		*flowoplist_hdp = flowop;
782 		flowop->fo_exec_next = NULL;
783 	} else {
784 		flowop_t *flowend;
785 
786 		/* Find the end of the thread list */
787 		flowend = *flowoplist_hdp;
788 		while (flowend->fo_exec_next != NULL)
789 			flowend = flowend->fo_exec_next;
790 		flowend->fo_exec_next = flowop;
791 		flowop->fo_exec_next = NULL;
792 	}
793 
794 	return (flowop);
795 }
796 
797 /*
798  * Calls flowop_define_common() to allocate and initialize a
799  * flowop, and holds the shared flowop_lock during the call.
800  * It releases the created flowop's fo_lock when done.
801  */
802 flowop_t *
803 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
804     flowop_t **flowoplist_hdp, int instance, int type)
805 {
806 	flowop_t	*flowop;
807 
808 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
809 	flowop = flowop_define_common(threadflow, name,
810 	    inherit, flowoplist_hdp, instance, type);
811 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
812 
813 	if (flowop == NULL)
814 		return (NULL);
815 
816 	(void) ipc_mutex_unlock(&flowop->fo_lock);
817 	return (flowop);
818 }
819 
820 /*
821  * Calls flowop_define_common() to allocate and initialize a
822  * composite flowop, and holds the shared flowop_lock during the call.
823  * It releases the created flowop's fo_lock when done.
824  */
825 flowop_t *
826 flowop_new_composite_define(char *name)
827 {
828 	flowop_t *flowop;
829 
830 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
831 	flowop = flowop_define_common(NULL, name,
832 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
833 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
834 
835 	if (flowop == NULL)
836 		return (NULL);
837 
838 	flowop->fo_func = flowop_composite;
839 	flowop->fo_init = flowop_composite_init;
840 	flowop->fo_destruct = flowop_composite_destruct;
841 	(void) ipc_mutex_unlock(&flowop->fo_lock);
842 
843 	return (flowop);
844 }
845 
846 /*
847  * Attempts to take a write lock on the flowop_find_lock that is
848  * defined in interprocess shared memory. Since each call to
849  * flowop_start() holds a read lock on flowop_find_lock, this
850  * routine effectively blocks until all instances of
851  * flowop_start() have finished. The flowop_find() routine calls
852  * this routine so that flowops won't be searched for until all
853  * flowops have been created by flowop_start.
854  */
855 static void
856 flowop_find_barrier(void)
857 {
858 	/* Block on wrlock to ensure find waits for all creates */
859 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
860 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
861 }
862 
863 /*
864  * Returns a list of flowops named "name" from the master
865  * flowop list.
866  */
867 flowop_t *
868 flowop_find(char *name)
869 {
870 	flowop_t *flowop;
871 	flowop_t *result = NULL;
872 
873 	flowop_find_barrier();
874 
875 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
876 
877 	flowop = filebench_shm->shm_flowoplist;
878 
879 	while (flowop) {
880 		if (strcmp(name, flowop->fo_name) == 0) {
881 
882 			/* Add flowop to result list */
883 			if (result == NULL) {
884 				result = flowop;
885 				flowop->fo_resultnext = NULL;
886 			} else {
887 				flowop->fo_resultnext = result;
888 				result = flowop;
889 			}
890 		}
891 		flowop = flowop->fo_next;
892 	}
893 
894 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
895 
896 
897 	return (result);
898 }
899 
900 /*
901  * Returns a pointer to the specified instance of flowop
902  * "name" from the global list.
903  */
904 flowop_t *
905 flowop_find_one(char *name, int instance)
906 {
907 	flowop_t *test_flowop;
908 
909 	flowop_find_barrier();
910 
911 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
912 
913 	test_flowop = filebench_shm->shm_flowoplist;
914 
915 	while (test_flowop) {
916 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
917 		    (instance == test_flowop->fo_instance))
918 			break;
919 
920 		test_flowop = test_flowop->fo_next;
921 	}
922 
923 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
924 
925 	return (test_flowop);
926 }
927 
928 /*
929  * Recursively searches the list of flowops on a given thread, and the
930  * lists of any included composite flowops, for the named flowop.
931  * Returns a pointer to the named flowop, or NULL if it cannot be
932  * found.
933  */
934 static flowop_t *
935 flowop_recurse_search(char *path, char *name, flowop_t *list)
936 {
937 	flowop_t *test_flowop;
938 	char fullname[MAXPATHLEN];
939 
940 	test_flowop = list;
941 
942 	/*
943 	 * when searching a list of inner flowops, "path" is the fullname
944 	 * of the containing composite flowop. Use it to form the
945 	 * full name of the inner flowop to search for.
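	 * For example, an inner flowop named "write1" inside a composite
	 * flowop named "comp1" is searched for under the full name
	 * "comp1.write1".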
946 	 */
947 	if (path) {
948 		if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
949 			filebench_log(LOG_ERROR,
950 			    "composite flowop path name %s.%s too long",
951 			    path, name);
952 			return (NULL);
953 		}
954 
955 		/* create composite_name.name for recursive search */
956 		(void) strcpy(fullname, path);
957 		(void) strcat(fullname, ".");
958 		(void) strcat(fullname, name);
959 	} else {
960 		(void) strcpy(fullname, name);
961 	}
962 
963 	/*
964 	 * loop through all flowops on the supplied tf_thrd_fops (flowop)
965 	 * list or fo_comp_fops (inner flowop) list.
966 	 */
967 	while (test_flowop) {
968 		if (strcmp(fullname, test_flowop->fo_name) == 0)
969 			return (test_flowop);
970 
971 		if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
972 			flowop_t *found_flowop;
973 
974 			found_flowop = flowop_recurse_search(
975 			    test_flowop->fo_name, name,
976 			    test_flowop->fo_comp_fops);
977 
978 			if (found_flowop)
979 				return (found_flowop);
980 		}
981 		test_flowop = test_flowop->fo_exec_next;
982 	}
983 
984 	/* not found here or on any child lists */
985 	return (NULL);
986 }
987 
988 /*
989  * Searches the supplied tf_thrd_fops list of flowops for the flowop
990  * named "name". Returns a pointer to it if found, or NULL otherwise.
991  */
992 flowop_t *
993 flowop_find_from_list(char *name, flowop_t *list)
994 {
995 	flowop_t *found_flowop;
996 
997 	flowop_find_barrier();
998 
999 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
1000 
1001 	found_flowop = flowop_recurse_search(NULL, name, list);
1002 
1003 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
1004 
1005 	return (found_flowop);
1006 }
1007 
1008 /*
1009  * Composite flowop method. Does one pass through its list of
1010  * inner flowops per iteration.
1011  */
1012 static int
1013 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
1014 {
1015 	flowop_t	*inner_flowop;
1016 
1017 	/* get the first flowop in the list */
1018 	inner_flowop = flowop->fo_comp_fops;
1019 
1020 	/* make a pass through the list of sub flowops */
1021 	while (inner_flowop) {
1022 		int	i, count;
1023 
1024 		/* Abort if asked */
1025 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
1026 			return (FILEBENCH_DONE);
1027 
1028 		if (inner_flowop->fo_stats.fs_stime == 0)
1029 			inner_flowop->fo_stats.fs_stime = gethrtime();
1030 
1031 		/* Execute the flowop for fo_iters times */
1032 		count = (int)avd_get_int(inner_flowop->fo_iters);
1033 		for (i = 0; i < count; i++) {
1034 
1035 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
1036 			    "%s-%d", threadflow->tf_name,
1037 			    inner_flowop->fo_name,
1038 			    inner_flowop->fo_instance);
1039 
1040 			switch ((*inner_flowop->fo_func)(threadflow,
1041 			    inner_flowop)) {
1042 
1043 			/* all done */
1044 			case FILEBENCH_DONE:
1045 				return (FILEBENCH_DONE);
1046 
1047 			/* quit if inner flowop limit reached */
1048 			case FILEBENCH_NORSC:
1049 				return (FILEBENCH_NORSC);
1050 
1051 			/* quit on inner flowop error */
1052 			case FILEBENCH_ERROR:
1053 				filebench_log(LOG_ERROR,
1054 				    "inner flowop %s failed",
1055 				    inner_flowop->fo_name);
1056 				return (FILEBENCH_ERROR);
1057 
1058 			/* otherwise keep going */
1059 			default:
1060 				break;
1061 			}
1062 
1063 		}
1064 
1065 		/* advance to next flowop */
1066 		inner_flowop = inner_flowop->fo_exec_next;
1067 	}
1068 
1069 	/* finished with this pass */
1070 	return (FILEBENCH_OK);
1071 }
1072 
1073 /*
1074  * Composite flowop initialization. Creates runtime inner flowops
1075  * from prototype inner flowops.
1076  */
1077 static int
1078 flowop_composite_init(flowop_t *flowop)
1079 {
1080 	int err;
1081 
1082 	err = flowop_create_runtime_flowops(flowop->fo_thread,
1083 	    &flowop->fo_comp_fops);
1084 	if (err != FILEBENCH_OK)
1085 		return (err);
1086 
1087 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1088 	return (0);
1089 }
1090 
1091 /*
1092  * Cleans up a composite flowop's inner flowops.
1093  */
1094 static void
1095 flowop_composite_destruct(flowop_t *flowop)
1096 {
1097 	flowop_t *inner_flowop = flowop->fo_comp_fops;
1098 
1099 	while (inner_flowop) {
1100 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1101 		    inner_flowop->fo_name, inner_flowop->fo_instance);
1102 
1103 		if (inner_flowop->fo_instance &&
1104 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
1105 			inner_flowop = inner_flowop->fo_exec_next;
1106 			continue;
1107 		}
1108 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1109 		inner_flowop = inner_flowop->fo_exec_next;
1110 	}
1111 }
1112