xref: /onnv-gate/usr/src/cmd/filebench/common/flowop.c (revision 6550:43ed8ddd4789)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 
30 #ifdef HAVE_LWPS
31 #include <sys/lwp.h>
32 #endif
33 #include <fcntl.h>
34 #include "filebench.h"
35 #include "flowop.h"
36 #include "stats.h"
37 
38 #ifdef LINUX_PORT
39 #include <sys/types.h>
40 #include <linux/unistd.h>
41 #endif
42 
43 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
44     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
45 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
46 static int flowop_composite_init(flowop_t *flowop);
47 static void flowop_composite_destruct(flowop_t *flowop);
48 
49 /*
50  * A collection of flowop support functions. The actual code that
51  * implements the various flowops is in flowop_library.c.
52  *
53  * Routines for defining, creating, initializing and destroying
54  * flowops, cyclically invoking the flowops on each threadflow's flowop
55  * list, collecting statistics about flowop execution, and other
56  * housekeeping duties are included in this file.
57  *
58  * User Defined Composite Flowops
59  *    The ability to define new flowops as lists of built-in or previously
60  * defined flowops has been added to Filebench. In a sense they are like
61  * in-line subroutines, which can have default attributes set at definition
62  * time and passed arguments at invocation time. Like other flowops (and
63  * unlike conventional subroutines) you can invoke them with an iteration
64  * count (the "iter" attribute), and they will loop through their associated
65  * list of flowops for "iter" number of times each time they are encountered
66  * in the thread or outer composite flowop which invokes them.
67  *
68  * Composite flowops are created with a "define" command, are given a name,
69  * optional default attributes, and local variable definitions on the
70  * "define" command line, followed by a brace enclosed list of flowops
71  * to execute. The enclosed flowops may include attributes that reference
72  * the local variables, as well as constants and global variables.
73  *
74  * Composite flowops are used pretty much like regular flowops, but you can
75  * also set local variables to constants or global variables ($local_var =
76  * [$var | $random_var | string | boolean | integer | double]) as part of
77  * the invocation. Thus each invocation can pass customized values to its
78  * inner flowops, greatly increasing their generality.
79  *
80  * All flowops are placed on a global, singly linked list, with fo_next
 * being the link pointer for this list. They are also placed on a private
82  * list for the thread or composite flowop they are part of. The tf_thrd_fops
83  * pointer in the thread will point to the list of top level flowops in the
84  * thread, which are linked together by fo_exec_next. If any of these flowops
85  * are composite flowops, they will have a list of second level flowops rooted
86  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
 * of all flowops, and an n-ary tree of threads, composite flowops, and
88  * flowops, with composite flowops being the branch nodes in the tree.
89  *
90  * To illustrate, if we have three first level flowops, the first of which is
91  * a composite flowop consisting of two other flowops, we get:
92  *
93  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
94  *			   flowop->fo_comp_fops		    |
95  *				    |			    V
96  *				    |			flowop->fo_exec_next
97  *				    |
98  *				    V
99  *				flowop->fo_exec_next -> flowop->fo_exec_next
100  *
101  * And all five flowops (plus others from any other threads) are on a global
102  * list linked with fo_next.
103  */
104 
105 /*
106  * Prints the name and instance number of each flowop in
107  * the supplied list to the filebench log.
108  */
109 int
110 flowop_printlist(flowop_t *list)
111 {
112 	flowop_t *flowop = list;
113 
114 	while (flowop) {
115 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
116 		    flowop->fo_name, flowop->fo_instance);
117 		flowop = flowop->fo_exec_next;
118 	}
119 	return (0);
120 }
121 
122 /*
123  * Prints the name and instance number of all flowops on
124  * the master flowop list to the console and the filebench log.
125  */
126 void
127 flowop_printall(void)
128 {
129 	flowop_t *flowop = filebench_shm->shm_flowoplist;
130 
131 	while (flowop) {
132 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
133 		    flowop->fo_name, flowop->fo_instance);
134 		flowop = flowop->fo_next;
135 	}
136 }
137 
/*
 * Yields the interval from timestamp "s" to timestamp "e" in
 * nanoseconds. Both arguments must be structure lvalues or expressions
 * (not pointers) having tv_sec and tv_nsec members. Each argument is
 * evaluated twice, so it must be free of side effects. The arguments
 * are parenthesized so that expressions such as "*ptr" expand correctly.
 */
#define	TIMESPEC_TO_HRTIME(s, e) \
	((((e).tv_sec - (s).tv_sec) * 1000000000LL) + \
	((e).tv_nsec - (s).tv_nsec))
140 /*
141  * Puts current high resolution time in start time entry
142  * for threadflow and may also calculate running filebench
143  * overhead statistics.
144  */
145 void
146 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
147 {
148 #ifdef HAVE_PROCFS
149 	if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
150 		char procname[128];
151 
152 		(void) snprintf(procname, sizeof (procname),
153 		    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
154 		threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
155 	}
156 
157 	(void) pread(threadflow->tf_lwpusagefd,
158 	    &threadflow->tf_susage,
159 	    sizeof (struct prusage), 0);
160 
161 	/* Compute overhead time in this thread around op */
162 	if (threadflow->tf_eusage.pr_stime.tv_nsec) {
163 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
164 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
165 		    threadflow->tf_susage.pr_utime) +
166 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
167 		    threadflow->tf_susage.pr_ttime) +
168 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
169 		    threadflow->tf_susage.pr_stime);
170 	}
171 #endif
172 	/* Start of op for this thread */
173 	threadflow->tf_stime = gethrtime();
174 }
175 
176 flowstat_t controlstats;
177 pthread_mutex_t controlstats_lock;
178 static int controlstats_zeroed = 0;
179 
180 /*
181  * Updates flowop's latency statistics, using saved start
182  * time and current high resolution time. Updates flowop's
183  * io count and transferred bytes statistics. Also updates
184  * threadflow's and flowop's cumulative read or write byte
185  * and io count statistics.
186  */
187 void
188 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
189 {
190 	hrtime_t t;
191 
192 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
193 	    (gethrtime() - threadflow->tf_stime);
194 #ifdef HAVE_PROCFS
195 	if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
196 	    sizeof (struct prusage), 0)) != sizeof (struct prusage))
197 		filebench_log(LOG_ERROR, "cannot read /proc");
198 
199 	t =
200 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
201 	    threadflow->tf_eusage.pr_utime) +
202 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
203 	    threadflow->tf_eusage.pr_ttime) +
204 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
205 	    threadflow->tf_eusage.pr_stime);
206 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
207 
208 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
209 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
210 	    threadflow->tf_eusage.pr_tftime) +
211 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
212 	    threadflow->tf_eusage.pr_dftime) +
213 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
214 	    threadflow->tf_eusage.pr_kftime) +
215 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
216 	    threadflow->tf_eusage.pr_kftime) +
217 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
218 	    threadflow->tf_eusage.pr_slptime);
219 #endif
220 
221 	flowop->fo_stats.fs_count++;
222 	flowop->fo_stats.fs_bytes += bytes;
223 	(void) ipc_mutex_lock(&controlstats_lock);
224 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
225 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
226 		controlstats.fs_count++;
227 		controlstats.fs_bytes += bytes;
228 	}
229 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
230 		threadflow->tf_stats.fs_rbytes += bytes;
231 		threadflow->tf_stats.fs_rcount++;
232 		flowop->fo_stats.fs_rcount++;
233 		controlstats.fs_rbytes += bytes;
234 		controlstats.fs_rcount++;
235 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
236 		threadflow->tf_stats.fs_wbytes += bytes;
237 		threadflow->tf_stats.fs_wcount++;
238 		flowop->fo_stats.fs_wcount++;
239 		controlstats.fs_wbytes += bytes;
240 		controlstats.fs_wcount++;
241 	}
242 	(void) ipc_mutex_unlock(&controlstats_lock);
243 }
244 
245 /*
246  * Calls the flowop's initialization function, pointed to by
247  * flowop->fo_init.
248  */
249 static int
250 flowop_initflow(flowop_t *flowop)
251 {
252 	/*
253 	 * save static copies of two items, in case they are supplied
254 	 * from random variables
255 	 */
256 	flowop->fo_constvalue = avd_get_int(flowop->fo_value);
257 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
258 
259 	if ((*flowop->fo_init)(flowop) < 0) {
260 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
261 		    flowop->fo_name, flowop->fo_instance);
262 		return (-1);
263 	}
264 	return (0);
265 }
266 
267 static int
268 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
269 {
270 	flowop_t *flowop = *ops_list_ptr;
271 
272 	while (flowop) {
273 		flowop_t *newflowop;
274 
275 		if (flowop == *ops_list_ptr)
276 			*ops_list_ptr = NULL;
277 
278 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
279 		    flowop, ops_list_ptr, 1, 0);
280 		if (newflowop == NULL)
281 			return (FILEBENCH_ERROR);
282 
283 		/* check for fo_filename attribute, and resolve if present */
284 		if (flowop->fo_filename) {
285 			char *name;
286 
287 			name = avd_get_str(flowop->fo_filename);
288 			newflowop->fo_fileset = fileset_find(name);
289 
290 			if (newflowop->fo_fileset == NULL) {
291 				filebench_log(LOG_ERROR,
292 				    "flowop %s: file %s not found",
293 				    newflowop->fo_name, name);
294 				filebench_shutdown(1);
295 			}
296 		}
297 
298 		if (flowop_initflow(newflowop) < 0) {
299 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
300 			    newflowop->fo_name);
301 		}
302 
303 		flowop = flowop->fo_exec_next;
304 	}
305 	return (FILEBENCH_OK);
306 }
307 
308 /*
309  * Calls the flowop's destruct function, pointed to by
310  * flowop->fo_destruct.
311  */
312 static void
313 flowop_destructflow(flowop_t *flowop)
314 {
315 	(*flowop->fo_destruct)(flowop);
316 }
317 
318 /*
319  * call the destruct funtions of all the threadflow's flowops,
320  * if it is still flagged as "running".
321  */
322 void
323 flowop_destruct_all_flows(threadflow_t *threadflow)
324 {
325 	flowop_t *flowop;
326 
327 	(void) ipc_mutex_lock(&threadflow->tf_lock);
328 
329 	/* prepare to call destruct flow routines, if necessary */
330 	if (threadflow->tf_running == 0) {
331 
332 		/* allready destroyed */
333 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
334 		return;
335 	}
336 
337 	flowop = threadflow->tf_thrd_fops;
338 	threadflow->tf_running = 0;
339 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
340 
341 	while (flowop) {
342 		flowop_destructflow(flowop);
343 		flowop = flowop->fo_exec_next;
344 	}
345 }
346 
347 /*
348  * The final initialization and main execution loop for the
349  * worker threads. Sets threadflow and flowop start times,
350  * waits for all process to start, then creates the runtime
351  * flowops from those defined by the F language workload
352  * script. It does some more initialization, then enters a
353  * loop to repeatedly execute the flowops on the flowop list
354  * until an abort condition is detected, at which time it exits.
355  * This is the starting routine for the new worker thread
356  * created by threadflow_createthread(), and is not currently
357  * called from anywhere else.
358  */
359 void
360 flowop_start(threadflow_t *threadflow)
361 {
362 	flowop_t *flowop;
363 	size_t memsize;
364 	int ret = 0;
365 
366 #ifdef HAVE_PROCFS
367 	if (noproc == 0) {
368 		char procname[128];
369 		long ctl[2] = {PCSET, PR_MSACCT};
370 		int pfd;
371 
372 		(void) snprintf(procname, sizeof (procname),
373 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
374 		pfd = open(procname, O_WRONLY);
375 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
376 		(void) close(pfd);
377 	}
378 #endif
379 
380 	(void) ipc_mutex_lock(&controlstats_lock);
381 	if (!controlstats_zeroed) {
382 		(void) memset(&controlstats, 0, sizeof (controlstats));
383 		controlstats_zeroed = 1;
384 	}
385 	(void) ipc_mutex_unlock(&controlstats_lock);
386 
387 	flowop = threadflow->tf_thrd_fops;
388 	threadflow->tf_stats.fs_stime = gethrtime();
389 	flowop->fo_stats.fs_stime = gethrtime();
390 
391 	/* Hold the flowop find lock as reader to prevent lookups */
392 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
393 
394 	/*
395 	 * Block until all processes have started, acting like
396 	 * a barrier. The original filebench process initially
397 	 * holds the run_lock as a reader, preventing any of the
398 	 * threads from obtaining the writer lock, and hence
399 	 * passing this point. Once all processes and threads
400 	 * have been created, the original process unlocks
401 	 * run_lock, allowing each waiting thread to lock
402 	 * and then immediately unlock it, then begin running.
403 	 */
404 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
405 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
406 
407 	/* Create the runtime flowops from those defined by the script */
408 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
409 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
410 	    != FILEBENCH_OK) {
411 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
412 		filebench_shutdown(1);
413 		return;
414 	}
415 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
416 
417 	/* Release the find lock as reader to allow lookups */
418 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
419 
420 	/* Set to the start of the new flowop list */
421 	flowop = threadflow->tf_thrd_fops;
422 
423 	threadflow->tf_abort = 0;
424 	threadflow->tf_running = 1;
425 
426 	memsize = (size_t)threadflow->tf_constmemsize;
427 
428 	/* If we are going to use ISM, allocate later */
429 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
430 		threadflow->tf_mem =
431 		    ipc_ismmalloc(memsize);
432 	} else {
433 		threadflow->tf_mem =
434 		    malloc(memsize);
435 	}
436 
437 	(void) memset(threadflow->tf_mem, 0, memsize);
438 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
439 
440 #ifdef HAVE_LWPS
441 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
442 	    threadflow,
443 	    _lwp_self());
444 #endif
445 
446 	/* Main filebench worker loop */
447 	/* CONSTCOND */
448 	while (1) {
449 		int i, count;
450 
451 		/* Abort if asked */
452 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
453 			break;
454 
455 		/* Be quiet while stats are gathered */
456 		if (filebench_shm->shm_bequiet) {
457 			(void) sleep(1);
458 			continue;
459 		}
460 
461 		/* Take it easy until everyone is ready to go */
462 		if (!filebench_shm->shm_running) {
463 			(void) sleep(1);
464 			continue;
465 		}
466 
467 		if (flowop == NULL) {
468 			filebench_log(LOG_ERROR, "flowop_read null flowop");
469 			return;
470 		}
471 
472 		if (flowop->fo_stats.fs_stime == 0)
473 			flowop->fo_stats.fs_stime = gethrtime();
474 
475 		/* Execute the flowop for fo_iters times */
476 		count = (int)avd_get_int(flowop->fo_iters);
477 		for (i = 0; i < count; i++) {
478 
479 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
480 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
481 			    flowop->fo_instance);
482 
483 			ret = (*flowop->fo_func)(threadflow, flowop);
484 
485 			/*
486 			 * Return value FILEBENCH_ERROR means "flowop
487 			 * failed, stop the filebench run"
488 			 */
489 			if (ret == FILEBENCH_ERROR) {
490 				filebench_log(LOG_ERROR,
491 				    "%s-%d: flowop %s-%d failed",
492 				    threadflow->tf_name,
493 				    threadflow->tf_instance,
494 				    flowop->fo_name,
495 				    flowop->fo_instance);
496 				(void) ipc_mutex_lock(&threadflow->tf_lock);
497 				threadflow->tf_abort = 1;
498 				filebench_shm->shm_f_abort =
499 				    FILEBENCH_ABORT_ERROR;
500 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
501 				break;
502 			}
503 
504 			/*
505 			 * Return value of FILEBENCH_NORSC means "stop
506 			 * the filebench run" if in "end on no work mode",
507 			 * otherwise it indicates an error
508 			 */
509 			if (ret == FILEBENCH_NORSC) {
510 				(void) ipc_mutex_lock(&threadflow->tf_lock);
511 				threadflow->tf_abort = FILEBENCH_DONE;
512 				if (filebench_shm->shm_rmode ==
513 				    FILEBENCH_MODE_Q1STDONE) {
514 					filebench_shm->shm_f_abort =
515 					    FILEBENCH_ABORT_RSRC;
516 				} else if (filebench_shm->shm_rmode !=
517 				    FILEBENCH_MODE_QALLDONE) {
518 					filebench_log(LOG_ERROR1,
519 					    "WARNING! Run stopped early:\n   "
520 					    "             flowop %s-%d could "
521 					    "not obtain a file. Please\n      "
522 					    "          reduce runtime, "
523 					    "increase fileset entries "
524 					    "($nfiles), or switch modes.",
525 					    flowop->fo_name,
526 					    flowop->fo_instance);
527 					filebench_shm->shm_f_abort =
528 					    FILEBENCH_ABORT_ERROR;
529 				}
530 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
531 				break;
532 			}
533 
534 			/*
535 			 * Return value of FILEBENCH_DONE means "stop
536 			 * the filebench run without error"
537 			 */
538 			if (ret == FILEBENCH_DONE) {
539 				(void) ipc_mutex_lock(&threadflow->tf_lock);
540 				threadflow->tf_abort = FILEBENCH_DONE;
541 				filebench_shm->shm_f_abort =
542 				    FILEBENCH_ABORT_DONE;
543 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
544 				break;
545 			}
546 
547 			/*
548 			 * If we get here and the return is something other
549 			 * than FILEBENCH_OK, it means a spurious code
550 			 * was returned, so treat as major error. This
551 			 * probably indicates a bug in the flowop.
552 			 */
553 			if (ret != FILEBENCH_OK) {
554 				filebench_log(LOG_ERROR,
555 				    "Flowop %s unexpected return value = %d\n",
556 				    flowop->fo_name, ret);
557 				filebench_shm->shm_f_abort =
558 				    FILEBENCH_ABORT_ERROR;
559 				break;
560 			}
561 		}
562 
563 		/* advance to next flowop */
564 		flowop = flowop->fo_exec_next;
565 
566 		/* but if at end of list, start over from the beginning */
567 		if (flowop == NULL) {
568 			flowop = threadflow->tf_thrd_fops;
569 			threadflow->tf_stats.fs_count++;
570 		}
571 	}
572 
573 #ifdef HAVE_LWPS
574 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
575 	    _lwp_self());
576 #endif
577 
578 	/* Tell flowops to destroy locally acquired state */
579 	flowop_destruct_all_flows(threadflow);
580 
581 	pthread_exit(&threadflow->tf_abort);
582 }
583 
/*
 * Flowop subsystem initialization entry point; all of the work is
 * delegated to flowoplib_init().
 */
void
flowop_init(void)
{
	flowoplib_init();
}
589 
590 /*
591  * Delete the designated flowop from the thread's flowop list.
592  */
593 static void
594 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
595 {
596 	flowop_t *entry = *flowoplist;
597 	int found = 0;
598 
599 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
600 	    flowop->fo_name,
601 	    flowop->fo_instance);
602 
603 	/* Delete from thread's flowop list */
604 	if (flowop == *flowoplist) {
605 		/* First on list */
606 		*flowoplist = flowop->fo_exec_next;
607 		filebench_log(LOG_DEBUG_IMPL,
608 		    "Delete0 flowop: (%s-%d)",
609 		    flowop->fo_name,
610 		    flowop->fo_instance);
611 	} else {
612 		while (entry->fo_exec_next) {
613 			filebench_log(LOG_DEBUG_IMPL,
614 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
615 			    entry->fo_exec_next->fo_name,
616 			    entry->fo_exec_next->fo_instance,
617 			    flowop->fo_name,
618 			    flowop->fo_instance);
619 
620 			if (flowop == entry->fo_exec_next) {
621 				/* Delete */
622 				filebench_log(LOG_DEBUG_IMPL,
623 				    "Deleted0 flowop: (%s-%d)",
624 				    entry->fo_exec_next->fo_name,
625 				    entry->fo_exec_next->fo_instance);
626 				entry->fo_exec_next =
627 				    entry->fo_exec_next->fo_exec_next;
628 				break;
629 			}
630 			entry = entry->fo_exec_next;
631 		}
632 	}
633 
634 #ifdef HAVE_PROCFS
635 	/* Close /proc stats */
636 	if (flowop->fo_thread)
637 		(void) close(flowop->fo_thread->tf_lwpusagefd);
638 #endif
639 
640 	/* Delete from global list */
641 	entry = filebench_shm->shm_flowoplist;
642 
643 	if (flowop == filebench_shm->shm_flowoplist) {
644 		/* First on list */
645 		filebench_shm->shm_flowoplist = flowop->fo_next;
646 		found = 1;
647 	} else {
648 		while (entry->fo_next) {
649 			filebench_log(LOG_DEBUG_IMPL,
650 			    "Delete flowop: (%s-%d) == (%s-%d)",
651 			    entry->fo_next->fo_name,
652 			    entry->fo_next->fo_instance,
653 			    flowop->fo_name,
654 			    flowop->fo_instance);
655 
656 			if (flowop == entry->fo_next) {
657 				/* Delete */
658 				entry->fo_next = entry->fo_next->fo_next;
659 				found = 1;
660 				break;
661 			}
662 
663 			entry = entry->fo_next;
664 		}
665 	}
666 	if (found) {
667 		filebench_log(LOG_DEBUG_IMPL,
668 		    "Deleted flowop: (%s-%d)",
669 		    flowop->fo_name,
670 		    flowop->fo_instance);
671 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
672 	} else {
673 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
674 		    flowop->fo_name,
675 		    flowop->fo_instance);
676 	}
677 }
678 
679 /*
680  * Deletes all the flowops from a flowop list.
681  */
682 void
683 flowop_delete_all(flowop_t **flowoplist)
684 {
685 	flowop_t *flowop = *flowoplist;
686 
687 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
688 
689 	while (flowop) {
690 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
691 		    flowop->fo_name, flowop->fo_instance);
692 
693 		if (flowop->fo_instance &&
694 		    (flowop->fo_instance == FLOW_MASTER)) {
695 			flowop = flowop->fo_exec_next;
696 			continue;
697 		}
698 		flowop_delete(flowoplist, flowop);
699 		flowop = flowop->fo_exec_next;
700 	}
701 
702 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
703 }
704 
705 /*
706  * Allocates a flowop entity and initializes it with inherited
707  * contents from the "inherit" flowop, if it is supplied, or
708  * with zeros otherwise. In either case the fo_next and fo_exec_next
709  * pointers are set to NULL, and fo_thread is set to point to
710  * the owning threadflow. The initialized flowop is placed at
711  * the head of the global flowop list, and also placed on the
712  * tail of the supplied local flowop list, which will either
713  * be a threadflow's tf_thrd_fops list or a composite flowop's
714  * fo_comp_fops list. The routine locks the flowop's fo_lock and
715  * leaves it held on return. If successful, it returns a pointer
716  * to the allocated and initialized flowop, otherwise it returns NULL.
717  *
718  * filebench_shm->shm_flowop_lock must be held by caller.
719  */
720 static flowop_t *
721 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
722     flowop_t **flowoplist_hdp, int instance, int type)
723 {
724 	flowop_t *flowop;
725 
726 	if (name == NULL)
727 		return (NULL);
728 
729 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
730 		filebench_log(LOG_ERROR,
731 		    "flowop_define: Can't malloc flowop");
732 		return (NULL);
733 	}
734 
735 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
736 	    name, instance, flowop);
737 
738 	if (flowop == NULL)
739 		return (NULL);
740 
741 	if (inherit) {
742 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
743 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
744 		(void) ipc_mutex_lock(&flowop->fo_lock);
745 		flowop->fo_next = NULL;
746 		flowop->fo_exec_next = NULL;
747 		filebench_log(LOG_DEBUG_IMPL,
748 		    "flowop %s-%d calling init", name, instance);
749 	} else {
750 		(void) memset(flowop, 0, sizeof (flowop_t));
751 		flowop->fo_iters = avd_int_alloc(1);
752 		flowop->fo_type = type;
753 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
754 		(void) ipc_mutex_lock(&flowop->fo_lock);
755 	}
756 
757 	/* Create backpointer to thread */
758 	flowop->fo_thread = threadflow;
759 
760 	/* Add flowop to global list */
761 	if (filebench_shm->shm_flowoplist == NULL) {
762 		filebench_shm->shm_flowoplist = flowop;
763 		flowop->fo_next = NULL;
764 	} else {
765 		flowop->fo_next = filebench_shm->shm_flowoplist;
766 		filebench_shm->shm_flowoplist = flowop;
767 	}
768 
769 	(void) strcpy(flowop->fo_name, name);
770 	flowop->fo_instance = instance;
771 
772 	if (flowoplist_hdp == NULL)
773 		return (flowop);
774 
775 	/* Add flowop to thread op list */
776 	if (*flowoplist_hdp == NULL) {
777 		*flowoplist_hdp = flowop;
778 		flowop->fo_exec_next = NULL;
779 	} else {
780 		flowop_t *flowend;
781 
782 		/* Find the end of the thread list */
783 		flowend = *flowoplist_hdp;
784 		while (flowend->fo_exec_next != NULL)
785 			flowend = flowend->fo_exec_next;
786 		flowend->fo_exec_next = flowop;
787 		flowop->fo_exec_next = NULL;
788 	}
789 
790 	return (flowop);
791 }
792 
793 /*
794  * Calls flowop_define_common() to allocate and initialize a
795  * flowop, and holds the shared flowop_lock during the call.
796  * It releases the created flowop's fo_lock when done.
797  */
798 flowop_t *
799 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
800     flowop_t **flowoplist_hdp, int instance, int type)
801 {
802 	flowop_t	*flowop;
803 
804 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
805 	flowop = flowop_define_common(threadflow, name,
806 	    inherit, flowoplist_hdp, instance, type);
807 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
808 
809 	if (flowop == NULL)
810 		return (NULL);
811 
812 	(void) ipc_mutex_unlock(&flowop->fo_lock);
813 	return (flowop);
814 }
815 
816 /*
817  * Calls flowop_define_common() to allocate and initialize a
818  * composite flowop, and holds the shared flowop_lock during the call.
819  * It releases the created flowop's fo_lock when done.
820  */
821 flowop_t *
822 flowop_new_composite_define(char *name)
823 {
824 	flowop_t *flowop;
825 
826 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
827 	flowop = flowop_define_common(NULL, name,
828 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
829 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
830 
831 	if (flowop == NULL)
832 		return (NULL);
833 
834 	flowop->fo_func = flowop_composite;
835 	flowop->fo_init = flowop_composite_init;
836 	flowop->fo_destruct = flowop_composite_destruct;
837 	(void) ipc_mutex_unlock(&flowop->fo_lock);
838 
839 	return (flowop);
840 }
841 
842 /*
843  * Attempts to take a write lock on the flowop_find_lock that is
844  * defined in interprocess shared memory. Since each call to
845  * flowop_start() holds a read lock on flowop_find_lock, this
846  * routine effectively blocks until all instances of
847  * flowop_start() have finished. The flowop_find() routine calls
848  * this routine so that flowops won't be searched for until all
849  * flowops have been created by flowop_start.
850  */
851 static void
852 flowop_find_barrier(void)
853 {
854 	/* Block on wrlock to ensure find waits for all creates */
855 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
856 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
857 }
858 
859 /*
860  * Returns a list of flowops named "name" from the master
861  * flowop list.
862  */
863 flowop_t *
864 flowop_find(char *name)
865 {
866 	flowop_t *flowop;
867 	flowop_t *result = NULL;
868 
869 	flowop_find_barrier();
870 
871 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
872 
873 	flowop = filebench_shm->shm_flowoplist;
874 
875 	while (flowop) {
876 		if (strcmp(name, flowop->fo_name) == 0) {
877 
878 			/* Add flowop to result list */
879 			if (result == NULL) {
880 				result = flowop;
881 				flowop->fo_resultnext = NULL;
882 			} else {
883 				flowop->fo_resultnext = result;
884 				result = flowop;
885 			}
886 		}
887 		flowop = flowop->fo_next;
888 	}
889 
890 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
891 
892 
893 	return (result);
894 }
895 
896 /*
897  * Returns a pointer to the specified instance of flowop
898  * "name" from the supplied list.
899  */
900 flowop_t *
901 flowop_find_one(char *name, int instance)
902 {
903 	flowop_t *test_flowop;
904 
905 	flowop_find_barrier();
906 
907 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
908 
909 	test_flowop = filebench_shm->shm_flowoplist;
910 
911 	while (test_flowop) {
912 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
913 		    (instance == test_flowop->fo_instance))
914 			break;
915 
916 		test_flowop = test_flowop->fo_next;
917 	}
918 
919 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
920 
921 	return (test_flowop);
922 }
923 
924 /*
925  * Composite flowop method. Does one pass through its list of
926  * inner flowops per iteration.
927  */
928 static int
929 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
930 {
931 	flowop_t	*inner_flowop;
932 
933 	/* get the first flowop in the list */
934 	inner_flowop = flowop->fo_comp_fops;
935 
936 	/* make a pass through the list of sub flowops */
937 	while (inner_flowop) {
938 		int	i, count;
939 
940 		/* Abort if asked */
941 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
942 			return (FILEBENCH_DONE);
943 
944 		if (inner_flowop->fo_stats.fs_stime == 0)
945 			inner_flowop->fo_stats.fs_stime = gethrtime();
946 
947 		/* Execute the flowop for fo_iters times */
948 		count = (int)avd_get_int(inner_flowop->fo_iters);
949 		for (i = 0; i < count; i++) {
950 
951 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
952 			    "%s-%d", threadflow->tf_name,
953 			    inner_flowop->fo_name,
954 			    inner_flowop->fo_instance);
955 
956 			switch ((*inner_flowop->fo_func)(threadflow,
957 			    inner_flowop)) {
958 
959 			/* all done */
960 			case FILEBENCH_DONE:
961 				return (FILEBENCH_DONE);
962 
963 			/* quit if inner flowop limit reached */
964 			case FILEBENCH_NORSC:
965 				return (FILEBENCH_NORSC);
966 
967 			/* quit on inner flowop error */
968 			case FILEBENCH_ERROR:
969 				filebench_log(LOG_ERROR,
970 				    "inner flowop %s failed",
971 				    inner_flowop->fo_name);
972 				return (FILEBENCH_ERROR);
973 
974 			/* otherwise keep going */
975 			default:
976 				break;
977 			}
978 
979 		}
980 
981 		/* advance to next flowop */
982 		inner_flowop = inner_flowop->fo_exec_next;
983 	}
984 
985 	/* finished with this pass */
986 	return (FILEBENCH_OK);
987 }
988 
989 /*
990  * Composite flowop initialization. Creates runtime inner flowops
991  * from prototype inner flowops.
992  */
993 static int
994 flowop_composite_init(flowop_t *flowop)
995 {
996 	int err;
997 
998 	err = flowop_create_runtime_flowops(flowop->fo_thread,
999 	    &flowop->fo_comp_fops);
1000 	if (err != FILEBENCH_OK)
1001 		return (err);
1002 
1003 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1004 	return (0);
1005 }
1006 
1007 /*
1008  * clean up inner flowops
1009  */
1010 static void
1011 flowop_composite_destruct(flowop_t *flowop)
1012 {
1013 	flowop_t *inner_flowop = flowop->fo_comp_fops;
1014 
1015 	while (inner_flowop) {
1016 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1017 		    inner_flowop->fo_name, inner_flowop->fo_instance);
1018 
1019 		if (inner_flowop->fo_instance &&
1020 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
1021 			inner_flowop = inner_flowop->fo_exec_next;
1022 			continue;
1023 		}
1024 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1025 		inner_flowop = inner_flowop->fo_exec_next;
1026 	}
1027 }
1028