xref: /onnv-gate/usr/src/cmd/filebench/common/flowop.c (revision 6613:38664cf1a8a1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 
30 #ifdef HAVE_LWPS
31 #include <sys/lwp.h>
32 #endif
33 #include <fcntl.h>
34 #include "filebench.h"
35 #include "flowop.h"
36 #include "stats.h"
37 
38 #ifdef LINUX_PORT
39 #include <sys/types.h>
40 #include <linux/unistd.h>
41 #endif
42 
43 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
44     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
45 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
46 static int flowop_composite_init(flowop_t *flowop);
47 static void flowop_composite_destruct(flowop_t *flowop);
48 
49 /*
50  * A collection of flowop support functions. The actual code that
51  * implements the various flowops is in flowop_library.c.
52  *
53  * Routines for defining, creating, initializing and destroying
54  * flowops, cyclically invoking the flowops on each threadflow's flowop
55  * list, collecting statistics about flowop execution, and other
56  * housekeeping duties are included in this file.
57  *
58  * User Defined Composite Flowops
59  *    The ability to define new flowops as lists of built-in or previously
60  * defined flowops has been added to Filebench. In a sense they are like
61  * in-line subroutines, which can have default attributes set at definition
62  * time and passed arguments at invocation time. Like other flowops (and
63  * unlike conventional subroutines) you can invoke them with an iteration
64  * count (the "iter" attribute), and they will loop through their associated
65  * list of flowops for "iter" number of times each time they are encountered
66  * in the thread or outer composite flowop which invokes them.
67  *
68  * Composite flowops are created with a "define" command, are given a name,
69  * optional default attributes, and local variable definitions on the
70  * "define" command line, followed by a brace enclosed list of flowops
71  * to execute. The enclosed flowops may include attributes that reference
72  * the local variables, as well as constants and global variables.
73  *
74  * Composite flowops are used pretty much like regular flowops, but you can
75  * also set local variables to constants or global variables ($local_var =
76  * [$var | $random_var | string | boolean | integer | double]) as part of
77  * the invocation. Thus each invocation can pass customized values to its
78  * inner flowops, greatly increasing their generality.
79  *
80  * All flowops are placed on a global, singly linked list, with fo_next
81  * being the link pointer for this list. They are also placed on a private
82  * list for the thread or composite flowop they are part of. The tf_thrd_fops
83  * pointer in the thread will point to the list of top level flowops in the
84  * thread, which are linked together by fo_exec_next. If any of these flowops
85  * are composite flowops, they will have a list of second level flowops rooted
86  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
87  * of all flowops, and an n-ary tree of threads, composite flowops, and
88  * flowops, with composite flowops being the branch nodes in the tree.
89  *
90  * To illustrate, if we have three first level flowops, the first of which is
91  * a composite flowop consisting of two other flowops, we get:
92  *
93  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
94  *			   flowop->fo_comp_fops		    |
95  *				    |			    V
96  *				    |			flowop->fo_exec_next
97  *				    |
98  *				    V
99  *				flowop->fo_exec_next -> flowop->fo_exec_next
100  *
101  * And all five flowops (plus others from any other threads) are on a global
102  * list linked with fo_next.
103  */
104 
105 /*
106  * Prints the name and instance number of each flowop in
107  * the supplied list to the filebench log.
108  */
109 int
110 flowop_printlist(flowop_t *list)
111 {
112 	flowop_t *flowop = list;
113 
114 	while (flowop) {
115 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
116 		    flowop->fo_name, flowop->fo_instance);
117 		flowop = flowop->fo_exec_next;
118 	}
119 	return (0);
120 }
121 
122 /*
123  * Prints the name and instance number of all flowops on
124  * the master flowop list to the console and the filebench log.
125  */
126 void
127 flowop_printall(void)
128 {
129 	flowop_t *flowop = filebench_shm->shm_flowoplist;
130 
131 	while (flowop) {
132 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
133 		    flowop->fo_name, flowop->fo_instance);
134 		flowop = flowop->fo_next;
135 	}
136 }
137 
/*
 * Yields the interval, in nanoseconds, from start time "s" to end
 * time "e". Both arguments must be objects with tv_sec and tv_nsec
 * members. The arguments are fully parenthesized so that expressions
 * such as "*ptr" may be passed; note that each argument is evaluated
 * twice, so it must not have side effects.
 */
#define	TIMESPEC_TO_HRTIME(s, e) \
	((((e).tv_sec - (s).tv_sec) * 1000000000LL) + \
	((e).tv_nsec - (s).tv_nsec))
140 /*
141  * Puts current high resolution time in start time entry
142  * for threadflow and may also calculate running filebench
143  * overhead statistics.
144  */
145 void
146 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
147 {
148 #ifdef HAVE_PROCFS
149 	if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
150 		char procname[128];
151 
152 		(void) snprintf(procname, sizeof (procname),
153 		    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
154 		threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
155 	}
156 
157 	(void) pread(threadflow->tf_lwpusagefd,
158 	    &threadflow->tf_susage,
159 	    sizeof (struct prusage), 0);
160 
161 	/* Compute overhead time in this thread around op */
162 	if (threadflow->tf_eusage.pr_stime.tv_nsec) {
163 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
164 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
165 		    threadflow->tf_susage.pr_utime) +
166 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
167 		    threadflow->tf_susage.pr_ttime) +
168 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
169 		    threadflow->tf_susage.pr_stime);
170 	}
171 #endif
172 	/* Start of op for this thread */
173 	threadflow->tf_stime = gethrtime();
174 }
175 
176 flowstat_t controlstats;
177 pthread_mutex_t controlstats_lock;
178 static int controlstats_zeroed = 0;
179 
180 /*
181  * Updates flowop's latency statistics, using saved start
182  * time and current high resolution time. Updates flowop's
183  * io count and transferred bytes statistics. Also updates
184  * threadflow's and flowop's cumulative read or write byte
185  * and io count statistics.
186  */
187 void
188 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
189 {
190 	hrtime_t t;
191 
192 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
193 	    (gethrtime() - threadflow->tf_stime);
194 #ifdef HAVE_PROCFS
195 	if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
196 	    sizeof (struct prusage), 0)) != sizeof (struct prusage))
197 		filebench_log(LOG_ERROR, "cannot read /proc");
198 
199 	t =
200 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
201 	    threadflow->tf_eusage.pr_utime) +
202 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
203 	    threadflow->tf_eusage.pr_ttime) +
204 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
205 	    threadflow->tf_eusage.pr_stime);
206 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
207 
208 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
209 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
210 	    threadflow->tf_eusage.pr_tftime) +
211 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
212 	    threadflow->tf_eusage.pr_dftime) +
213 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
214 	    threadflow->tf_eusage.pr_kftime) +
215 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
216 	    threadflow->tf_eusage.pr_kftime) +
217 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
218 	    threadflow->tf_eusage.pr_slptime);
219 #endif
220 
221 	flowop->fo_stats.fs_count++;
222 	flowop->fo_stats.fs_bytes += bytes;
223 	(void) ipc_mutex_lock(&controlstats_lock);
224 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
225 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
226 		controlstats.fs_count++;
227 		controlstats.fs_bytes += bytes;
228 	}
229 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
230 		threadflow->tf_stats.fs_rbytes += bytes;
231 		threadflow->tf_stats.fs_rcount++;
232 		flowop->fo_stats.fs_rcount++;
233 		controlstats.fs_rbytes += bytes;
234 		controlstats.fs_rcount++;
235 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
236 		threadflow->tf_stats.fs_wbytes += bytes;
237 		threadflow->tf_stats.fs_wcount++;
238 		flowop->fo_stats.fs_wcount++;
239 		controlstats.fs_wbytes += bytes;
240 		controlstats.fs_wcount++;
241 	}
242 	(void) ipc_mutex_unlock(&controlstats_lock);
243 }
244 
245 /*
246  * Calls the flowop's initialization function, pointed to by
247  * flowop->fo_init.
248  */
249 static int
250 flowop_initflow(flowop_t *flowop)
251 {
252 	/*
253 	 * save static copies of two items, in case they are supplied
254 	 * from random variables
255 	 */
256 	flowop->fo_constvalue = avd_get_int(flowop->fo_value);
257 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
258 
259 	if ((*flowop->fo_init)(flowop) < 0) {
260 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
261 		    flowop->fo_name, flowop->fo_instance);
262 		return (-1);
263 	}
264 	return (0);
265 }
266 
267 static int
268 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
269 {
270 	flowop_t *flowop = *ops_list_ptr;
271 
272 	while (flowop) {
273 		flowop_t *newflowop;
274 
275 		if (flowop == *ops_list_ptr)
276 			*ops_list_ptr = NULL;
277 
278 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
279 		    flowop, ops_list_ptr, 1, 0);
280 		if (newflowop == NULL)
281 			return (FILEBENCH_ERROR);
282 
283 		/* check for fo_filename attribute, and resolve if present */
284 		if (flowop->fo_filename) {
285 			char *name;
286 
287 			name = avd_get_str(flowop->fo_filename);
288 			newflowop->fo_fileset = fileset_find(name);
289 
290 			if (newflowop->fo_fileset == NULL) {
291 				filebench_log(LOG_ERROR,
292 				    "flowop %s: file %s not found",
293 				    newflowop->fo_name, name);
294 				filebench_shutdown(1);
295 			}
296 		}
297 
298 		if (flowop_initflow(newflowop) < 0) {
299 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
300 			    newflowop->fo_name);
301 		}
302 
303 		flowop = flowop->fo_exec_next;
304 	}
305 	return (FILEBENCH_OK);
306 }
307 
308 /*
309  * Calls the flowop's destruct function, pointed to by
310  * flowop->fo_destruct.
311  */
312 static void
313 flowop_destructflow(flowop_t *flowop)
314 {
315 	(*flowop->fo_destruct)(flowop);
316 }
317 
318 /*
319  * call the destruct funtions of all the threadflow's flowops,
320  * if it is still flagged as "running".
321  */
322 void
323 flowop_destruct_all_flows(threadflow_t *threadflow)
324 {
325 	flowop_t *flowop;
326 
327 	(void) ipc_mutex_lock(&threadflow->tf_lock);
328 
329 	/* prepare to call destruct flow routines, if necessary */
330 	if (threadflow->tf_running == 0) {
331 
332 		/* allready destroyed */
333 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
334 		return;
335 	}
336 
337 	flowop = threadflow->tf_thrd_fops;
338 	threadflow->tf_running = 0;
339 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
340 
341 	while (flowop) {
342 		flowop_destructflow(flowop);
343 		flowop = flowop->fo_exec_next;
344 	}
345 }
346 
347 /*
348  * The final initialization and main execution loop for the
349  * worker threads. Sets threadflow and flowop start times,
350  * waits for all process to start, then creates the runtime
351  * flowops from those defined by the F language workload
352  * script. It does some more initialization, then enters a
353  * loop to repeatedly execute the flowops on the flowop list
354  * until an abort condition is detected, at which time it exits.
355  * This is the starting routine for the new worker thread
356  * created by threadflow_createthread(), and is not currently
357  * called from anywhere else.
358  */
359 void
360 flowop_start(threadflow_t *threadflow)
361 {
362 	flowop_t *flowop;
363 	size_t memsize;
364 	int ret = 0;
365 
366 #ifdef HAVE_PROCFS
367 	if (noproc == 0) {
368 		char procname[128];
369 		long ctl[2] = {PCSET, PR_MSACCT};
370 		int pfd;
371 
372 		(void) snprintf(procname, sizeof (procname),
373 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
374 		pfd = open(procname, O_WRONLY);
375 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
376 		(void) close(pfd);
377 	}
378 #endif
379 
380 	(void) ipc_mutex_lock(&controlstats_lock);
381 	if (!controlstats_zeroed) {
382 		(void) memset(&controlstats, 0, sizeof (controlstats));
383 		controlstats_zeroed = 1;
384 	}
385 	(void) ipc_mutex_unlock(&controlstats_lock);
386 
387 	flowop = threadflow->tf_thrd_fops;
388 	threadflow->tf_stats.fs_stime = gethrtime();
389 	flowop->fo_stats.fs_stime = gethrtime();
390 
391 	/* Hold the flowop find lock as reader to prevent lookups */
392 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
393 
394 	/*
395 	 * Block until all processes have started, acting like
396 	 * a barrier. The original filebench process initially
397 	 * holds the run_lock as a reader, preventing any of the
398 	 * threads from obtaining the writer lock, and hence
399 	 * passing this point. Once all processes and threads
400 	 * have been created, the original process unlocks
401 	 * run_lock, allowing each waiting thread to lock
402 	 * and then immediately unlock it, then begin running.
403 	 */
404 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
405 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
406 
407 	/* Create the runtime flowops from those defined by the script */
408 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
409 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
410 	    != FILEBENCH_OK) {
411 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
412 		filebench_shutdown(1);
413 		return;
414 	}
415 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
416 
417 	/* Release the find lock as reader to allow lookups */
418 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
419 
420 	/* Set to the start of the new flowop list */
421 	flowop = threadflow->tf_thrd_fops;
422 
423 	threadflow->tf_abort = 0;
424 	threadflow->tf_running = 1;
425 
426 	memsize = (size_t)threadflow->tf_constmemsize;
427 
428 	/* If we are going to use ISM, allocate later */
429 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
430 		threadflow->tf_mem =
431 		    ipc_ismmalloc(memsize);
432 	} else {
433 		threadflow->tf_mem =
434 		    malloc(memsize);
435 	}
436 
437 	(void) memset(threadflow->tf_mem, 0, memsize);
438 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
439 
440 #ifdef HAVE_LWPS
441 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
442 	    threadflow,
443 	    _lwp_self());
444 #endif
445 
446 	/* Main filebench worker loop */
447 	/* CONSTCOND */
448 	while (1) {
449 		int i, count;
450 
451 		/* Abort if asked */
452 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
453 			break;
454 
455 		/* Be quiet while stats are gathered */
456 		if (filebench_shm->shm_bequiet) {
457 			(void) sleep(1);
458 			continue;
459 		}
460 
461 		/* Take it easy until everyone is ready to go */
462 		if (!filebench_shm->shm_running) {
463 			(void) sleep(1);
464 			continue;
465 		}
466 
467 		if (flowop == NULL) {
468 			filebench_log(LOG_ERROR, "flowop_read null flowop");
469 			return;
470 		}
471 
472 		if (flowop->fo_stats.fs_stime == 0)
473 			flowop->fo_stats.fs_stime = gethrtime();
474 
475 		/* Execute the flowop for fo_iters times */
476 		count = (int)avd_get_int(flowop->fo_iters);
477 		for (i = 0; i < count; i++) {
478 
479 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
480 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
481 			    flowop->fo_instance);
482 
483 			ret = (*flowop->fo_func)(threadflow, flowop);
484 
485 			/*
486 			 * Return value FILEBENCH_ERROR means "flowop
487 			 * failed, stop the filebench run"
488 			 */
489 			if (ret == FILEBENCH_ERROR) {
490 				filebench_log(LOG_ERROR,
491 				    "%s-%d: flowop %s-%d failed",
492 				    threadflow->tf_name,
493 				    threadflow->tf_instance,
494 				    flowop->fo_name,
495 				    flowop->fo_instance);
496 				(void) ipc_mutex_lock(&threadflow->tf_lock);
497 				threadflow->tf_abort = 1;
498 				filebench_shm->shm_f_abort =
499 				    FILEBENCH_ABORT_ERROR;
500 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
501 				break;
502 			}
503 
504 			/*
505 			 * Return value of FILEBENCH_NORSC means "stop
506 			 * the filebench run" if in "end on no work mode",
507 			 * otherwise it indicates an error
508 			 */
509 			if (ret == FILEBENCH_NORSC) {
510 				(void) ipc_mutex_lock(&threadflow->tf_lock);
511 				threadflow->tf_abort = FILEBENCH_DONE;
512 				if (filebench_shm->shm_rmode ==
513 				    FILEBENCH_MODE_Q1STDONE) {
514 					filebench_shm->shm_f_abort =
515 					    FILEBENCH_ABORT_RSRC;
516 				} else if (filebench_shm->shm_rmode !=
517 				    FILEBENCH_MODE_QALLDONE) {
518 					filebench_log(LOG_ERROR1,
519 					    "WARNING! Run stopped early:\n   "
520 					    "             flowop %s-%d could "
521 					    "not obtain a file. Please\n      "
522 					    "          reduce runtime, "
523 					    "increase fileset entries "
524 					    "($nfiles), or switch modes.",
525 					    flowop->fo_name,
526 					    flowop->fo_instance);
527 					filebench_shm->shm_f_abort =
528 					    FILEBENCH_ABORT_ERROR;
529 				}
530 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
531 				break;
532 			}
533 
534 			/*
535 			 * Return value of FILEBENCH_DONE means "stop
536 			 * the filebench run without error"
537 			 */
538 			if (ret == FILEBENCH_DONE) {
539 				(void) ipc_mutex_lock(&threadflow->tf_lock);
540 				threadflow->tf_abort = FILEBENCH_DONE;
541 				filebench_shm->shm_f_abort =
542 				    FILEBENCH_ABORT_DONE;
543 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
544 				break;
545 			}
546 
547 			/*
548 			 * If we get here and the return is something other
549 			 * than FILEBENCH_OK, it means a spurious code
550 			 * was returned, so treat as major error. This
551 			 * probably indicates a bug in the flowop.
552 			 */
553 			if (ret != FILEBENCH_OK) {
554 				filebench_log(LOG_ERROR,
555 				    "Flowop %s unexpected return value = %d\n",
556 				    flowop->fo_name, ret);
557 				filebench_shm->shm_f_abort =
558 				    FILEBENCH_ABORT_ERROR;
559 				break;
560 			}
561 		}
562 
563 		/* advance to next flowop */
564 		flowop = flowop->fo_exec_next;
565 
566 		/* but if at end of list, start over from the beginning */
567 		if (flowop == NULL) {
568 			flowop = threadflow->tf_thrd_fops;
569 			threadflow->tf_stats.fs_count++;
570 		}
571 	}
572 
573 #ifdef HAVE_LWPS
574 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
575 	    _lwp_self());
576 #endif
577 
578 	/* Tell flowops to destroy locally acquired state */
579 	flowop_destruct_all_flows(threadflow);
580 
581 	pthread_exit(&threadflow->tf_abort);
582 }
583 
584 void
585 flowop_init(void)
586 {
587 	(void) pthread_mutex_init(&controlstats_lock, ipc_mutexattr());
588 	flowoplib_init();
589 }
590 
591 /*
592  * Delete the designated flowop from the thread's flowop list.
593  */
594 static void
595 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
596 {
597 	flowop_t *entry = *flowoplist;
598 	int found = 0;
599 
600 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
601 	    flowop->fo_name,
602 	    flowop->fo_instance);
603 
604 	/* Delete from thread's flowop list */
605 	if (flowop == *flowoplist) {
606 		/* First on list */
607 		*flowoplist = flowop->fo_exec_next;
608 		filebench_log(LOG_DEBUG_IMPL,
609 		    "Delete0 flowop: (%s-%d)",
610 		    flowop->fo_name,
611 		    flowop->fo_instance);
612 	} else {
613 		while (entry->fo_exec_next) {
614 			filebench_log(LOG_DEBUG_IMPL,
615 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
616 			    entry->fo_exec_next->fo_name,
617 			    entry->fo_exec_next->fo_instance,
618 			    flowop->fo_name,
619 			    flowop->fo_instance);
620 
621 			if (flowop == entry->fo_exec_next) {
622 				/* Delete */
623 				filebench_log(LOG_DEBUG_IMPL,
624 				    "Deleted0 flowop: (%s-%d)",
625 				    entry->fo_exec_next->fo_name,
626 				    entry->fo_exec_next->fo_instance);
627 				entry->fo_exec_next =
628 				    entry->fo_exec_next->fo_exec_next;
629 				break;
630 			}
631 			entry = entry->fo_exec_next;
632 		}
633 	}
634 
635 #ifdef HAVE_PROCFS
636 	/* Close /proc stats */
637 	if (flowop->fo_thread)
638 		(void) close(flowop->fo_thread->tf_lwpusagefd);
639 #endif
640 
641 	/* Delete from global list */
642 	entry = filebench_shm->shm_flowoplist;
643 
644 	if (flowop == filebench_shm->shm_flowoplist) {
645 		/* First on list */
646 		filebench_shm->shm_flowoplist = flowop->fo_next;
647 		found = 1;
648 	} else {
649 		while (entry->fo_next) {
650 			filebench_log(LOG_DEBUG_IMPL,
651 			    "Delete flowop: (%s-%d) == (%s-%d)",
652 			    entry->fo_next->fo_name,
653 			    entry->fo_next->fo_instance,
654 			    flowop->fo_name,
655 			    flowop->fo_instance);
656 
657 			if (flowop == entry->fo_next) {
658 				/* Delete */
659 				entry->fo_next = entry->fo_next->fo_next;
660 				found = 1;
661 				break;
662 			}
663 
664 			entry = entry->fo_next;
665 		}
666 	}
667 	if (found) {
668 		filebench_log(LOG_DEBUG_IMPL,
669 		    "Deleted flowop: (%s-%d)",
670 		    flowop->fo_name,
671 		    flowop->fo_instance);
672 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
673 	} else {
674 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
675 		    flowop->fo_name,
676 		    flowop->fo_instance);
677 	}
678 }
679 
680 /*
681  * Deletes all the flowops from a flowop list.
682  */
683 void
684 flowop_delete_all(flowop_t **flowoplist)
685 {
686 	flowop_t *flowop = *flowoplist;
687 
688 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
689 
690 	while (flowop) {
691 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
692 		    flowop->fo_name, flowop->fo_instance);
693 
694 		if (flowop->fo_instance &&
695 		    (flowop->fo_instance == FLOW_MASTER)) {
696 			flowop = flowop->fo_exec_next;
697 			continue;
698 		}
699 		flowop_delete(flowoplist, flowop);
700 		flowop = flowop->fo_exec_next;
701 	}
702 
703 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
704 }
705 
706 /*
707  * Allocates a flowop entity and initializes it with inherited
708  * contents from the "inherit" flowop, if it is supplied, or
709  * with zeros otherwise. In either case the fo_next and fo_exec_next
710  * pointers are set to NULL, and fo_thread is set to point to
711  * the owning threadflow. The initialized flowop is placed at
712  * the head of the global flowop list, and also placed on the
713  * tail of the supplied local flowop list, which will either
714  * be a threadflow's tf_thrd_fops list or a composite flowop's
715  * fo_comp_fops list. The routine locks the flowop's fo_lock and
716  * leaves it held on return. If successful, it returns a pointer
717  * to the allocated and initialized flowop, otherwise it returns NULL.
718  *
719  * filebench_shm->shm_flowop_lock must be held by caller.
720  */
721 static flowop_t *
722 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
723     flowop_t **flowoplist_hdp, int instance, int type)
724 {
725 	flowop_t *flowop;
726 
727 	if (name == NULL)
728 		return (NULL);
729 
730 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
731 		filebench_log(LOG_ERROR,
732 		    "flowop_define: Can't malloc flowop");
733 		return (NULL);
734 	}
735 
736 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
737 	    name, instance, flowop);
738 
739 	if (flowop == NULL)
740 		return (NULL);
741 
742 	if (inherit) {
743 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
744 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
745 		(void) ipc_mutex_lock(&flowop->fo_lock);
746 		flowop->fo_next = NULL;
747 		flowop->fo_exec_next = NULL;
748 		filebench_log(LOG_DEBUG_IMPL,
749 		    "flowop %s-%d calling init", name, instance);
750 	} else {
751 		(void) memset(flowop, 0, sizeof (flowop_t));
752 		flowop->fo_iters = avd_int_alloc(1);
753 		flowop->fo_type = type;
754 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
755 		(void) ipc_mutex_lock(&flowop->fo_lock);
756 	}
757 
758 	/* Create backpointer to thread */
759 	flowop->fo_thread = threadflow;
760 
761 	/* Add flowop to global list */
762 	if (filebench_shm->shm_flowoplist == NULL) {
763 		filebench_shm->shm_flowoplist = flowop;
764 		flowop->fo_next = NULL;
765 	} else {
766 		flowop->fo_next = filebench_shm->shm_flowoplist;
767 		filebench_shm->shm_flowoplist = flowop;
768 	}
769 
770 	(void) strcpy(flowop->fo_name, name);
771 	flowop->fo_instance = instance;
772 
773 	if (flowoplist_hdp == NULL)
774 		return (flowop);
775 
776 	/* Add flowop to thread op list */
777 	if (*flowoplist_hdp == NULL) {
778 		*flowoplist_hdp = flowop;
779 		flowop->fo_exec_next = NULL;
780 	} else {
781 		flowop_t *flowend;
782 
783 		/* Find the end of the thread list */
784 		flowend = *flowoplist_hdp;
785 		while (flowend->fo_exec_next != NULL)
786 			flowend = flowend->fo_exec_next;
787 		flowend->fo_exec_next = flowop;
788 		flowop->fo_exec_next = NULL;
789 	}
790 
791 	return (flowop);
792 }
793 
794 /*
795  * Calls flowop_define_common() to allocate and initialize a
796  * flowop, and holds the shared flowop_lock during the call.
797  * It releases the created flowop's fo_lock when done.
798  */
799 flowop_t *
800 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
801     flowop_t **flowoplist_hdp, int instance, int type)
802 {
803 	flowop_t	*flowop;
804 
805 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
806 	flowop = flowop_define_common(threadflow, name,
807 	    inherit, flowoplist_hdp, instance, type);
808 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
809 
810 	if (flowop == NULL)
811 		return (NULL);
812 
813 	(void) ipc_mutex_unlock(&flowop->fo_lock);
814 	return (flowop);
815 }
816 
817 /*
818  * Calls flowop_define_common() to allocate and initialize a
819  * composite flowop, and holds the shared flowop_lock during the call.
820  * It releases the created flowop's fo_lock when done.
821  */
822 flowop_t *
823 flowop_new_composite_define(char *name)
824 {
825 	flowop_t *flowop;
826 
827 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
828 	flowop = flowop_define_common(NULL, name,
829 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
830 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
831 
832 	if (flowop == NULL)
833 		return (NULL);
834 
835 	flowop->fo_func = flowop_composite;
836 	flowop->fo_init = flowop_composite_init;
837 	flowop->fo_destruct = flowop_composite_destruct;
838 	(void) ipc_mutex_unlock(&flowop->fo_lock);
839 
840 	return (flowop);
841 }
842 
843 /*
844  * Attempts to take a write lock on the flowop_find_lock that is
845  * defined in interprocess shared memory. Since each call to
846  * flowop_start() holds a read lock on flowop_find_lock, this
847  * routine effectively blocks until all instances of
848  * flowop_start() have finished. The flowop_find() routine calls
849  * this routine so that flowops won't be searched for until all
850  * flowops have been created by flowop_start.
851  */
852 static void
853 flowop_find_barrier(void)
854 {
855 	/* Block on wrlock to ensure find waits for all creates */
856 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
857 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
858 }
859 
860 /*
861  * Returns a list of flowops named "name" from the master
862  * flowop list.
863  */
864 flowop_t *
865 flowop_find(char *name)
866 {
867 	flowop_t *flowop;
868 	flowop_t *result = NULL;
869 
870 	flowop_find_barrier();
871 
872 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
873 
874 	flowop = filebench_shm->shm_flowoplist;
875 
876 	while (flowop) {
877 		if (strcmp(name, flowop->fo_name) == 0) {
878 
879 			/* Add flowop to result list */
880 			if (result == NULL) {
881 				result = flowop;
882 				flowop->fo_resultnext = NULL;
883 			} else {
884 				flowop->fo_resultnext = result;
885 				result = flowop;
886 			}
887 		}
888 		flowop = flowop->fo_next;
889 	}
890 
891 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
892 
893 
894 	return (result);
895 }
896 
897 /*
898  * Returns a pointer to the specified instance of flowop
899  * "name" from the supplied list.
900  */
901 flowop_t *
902 flowop_find_one(char *name, int instance)
903 {
904 	flowop_t *test_flowop;
905 
906 	flowop_find_barrier();
907 
908 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
909 
910 	test_flowop = filebench_shm->shm_flowoplist;
911 
912 	while (test_flowop) {
913 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
914 		    (instance == test_flowop->fo_instance))
915 			break;
916 
917 		test_flowop = test_flowop->fo_next;
918 	}
919 
920 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
921 
922 	return (test_flowop);
923 }
924 
925 /*
926  * Composite flowop method. Does one pass through its list of
927  * inner flowops per iteration.
928  */
929 static int
930 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
931 {
932 	flowop_t	*inner_flowop;
933 
934 	/* get the first flowop in the list */
935 	inner_flowop = flowop->fo_comp_fops;
936 
937 	/* make a pass through the list of sub flowops */
938 	while (inner_flowop) {
939 		int	i, count;
940 
941 		/* Abort if asked */
942 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
943 			return (FILEBENCH_DONE);
944 
945 		if (inner_flowop->fo_stats.fs_stime == 0)
946 			inner_flowop->fo_stats.fs_stime = gethrtime();
947 
948 		/* Execute the flowop for fo_iters times */
949 		count = (int)avd_get_int(inner_flowop->fo_iters);
950 		for (i = 0; i < count; i++) {
951 
952 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
953 			    "%s-%d", threadflow->tf_name,
954 			    inner_flowop->fo_name,
955 			    inner_flowop->fo_instance);
956 
957 			switch ((*inner_flowop->fo_func)(threadflow,
958 			    inner_flowop)) {
959 
960 			/* all done */
961 			case FILEBENCH_DONE:
962 				return (FILEBENCH_DONE);
963 
964 			/* quit if inner flowop limit reached */
965 			case FILEBENCH_NORSC:
966 				return (FILEBENCH_NORSC);
967 
968 			/* quit on inner flowop error */
969 			case FILEBENCH_ERROR:
970 				filebench_log(LOG_ERROR,
971 				    "inner flowop %s failed",
972 				    inner_flowop->fo_name);
973 				return (FILEBENCH_ERROR);
974 
975 			/* otherwise keep going */
976 			default:
977 				break;
978 			}
979 
980 		}
981 
982 		/* advance to next flowop */
983 		inner_flowop = inner_flowop->fo_exec_next;
984 	}
985 
986 	/* finished with this pass */
987 	return (FILEBENCH_OK);
988 }
989 
990 /*
991  * Composite flowop initialization. Creates runtime inner flowops
992  * from prototype inner flowops.
993  */
994 static int
995 flowop_composite_init(flowop_t *flowop)
996 {
997 	int err;
998 
999 	err = flowop_create_runtime_flowops(flowop->fo_thread,
1000 	    &flowop->fo_comp_fops);
1001 	if (err != FILEBENCH_OK)
1002 		return (err);
1003 
1004 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1005 	return (0);
1006 }
1007 
1008 /*
1009  * clean up inner flowops
1010  */
1011 static void
1012 flowop_composite_destruct(flowop_t *flowop)
1013 {
1014 	flowop_t *inner_flowop = flowop->fo_comp_fops;
1015 
1016 	while (inner_flowop) {
1017 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1018 		    inner_flowop->fo_name, inner_flowop->fo_instance);
1019 
1020 		if (inner_flowop->fo_instance &&
1021 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
1022 			inner_flowop = inner_flowop->fo_exec_next;
1023 			continue;
1024 		}
1025 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1026 		inner_flowop = inner_flowop->fo_exec_next;
1027 	}
1028 }
1029