xref: /onnv-gate/usr/src/cmd/filebench/common/flowop.c (revision 9326:475779da8c08)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include "config.h"
27 
28 #ifdef HAVE_LWPS
29 #include <sys/lwp.h>
30 #endif
31 #include <fcntl.h>
32 #include "filebench.h"
33 #include "flowop.h"
34 #include "stats.h"
35 
36 #ifdef LINUX_PORT
37 #include <sys/types.h>
38 #include <linux/unistd.h>
39 #endif
40 
41 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
42     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
43 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
44 static int flowop_composite_init(flowop_t *flowop);
45 static void flowop_composite_destruct(flowop_t *flowop);
46 
47 /*
48  * A collection of flowop support functions. The actual code that
49  * implements the various flowops is in flowop_library.c.
50  *
51  * Routines for defining, creating, initializing and destroying
52  * flowops, cyclically invoking the flowops on each threadflow's flowop
53  * list, collecting statistics about flowop execution, and other
54  * housekeeping duties are included in this file.
55  *
56  * User Defined Composite Flowops
57  *    The ability to define new flowops as lists of built-in or previously
58  * defined flowops has been added to Filebench. In a sense they are like
59  * in-line subroutines, which can have default attributes set at definition
60  * time and passed arguments at invocation time. Like other flowops (and
61  * unlike conventional subroutines) you can invoke them with an iteration
62  * count (the "iter" attribute), and they will loop through their associated
63  * list of flowops for "iter" number of times each time they are encountered
64  * in the thread or outer composite flowop which invokes them.
65  *
66  * Composite flowops are created with a "define" command, are given a name,
67  * optional default attributes, and local variable definitions on the
68  * "define" command line, followed by a brace enclosed list of flowops
69  * to execute. The enclosed flowops may include attributes that reference
70  * the local variables, as well as constants and global variables.
71  *
72  * Composite flowops are used pretty much like regular flowops, but you can
73  * also set local variables to constants or global variables ($local_var =
74  * [$var | $random_var | string | boolean | integer | double]) as part of
75  * the invocation. Thus each invocation can pass customized values to its
76  * inner flowops, greatly increasing their generality.
77  *
78  * All flowops are placed on a global, singly linked list, with fo_next
79  * being the link pointer for this list. They are also placed on a private
80  * list for the thread or composite flowop they are part of. The tf_thrd_fops
81  * pointer in the thread will point to the list of top level flowops in the
82  * thread, which are linked together by fo_exec_next. If any of these flowops
83  * are composite flowops, they will have a list of second level flowops rooted
84  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
85  * of all flowops, and an n-ary tree of threads, composite flowops, and
86  * flowops, with composite flowops being the branch nodes in the tree.
87  *
88  * To illustrate, if we have three first level flowops, the first of which is
89  * a composite flowop consisting of two other flowops, we get:
90  *
91  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
92  *			   flowop->fo_comp_fops		    |
93  *				    |			    V
94  *				    |			flowop->fo_exec_next
95  *				    |
96  *				    V
97  *				flowop->fo_exec_next -> flowop->fo_exec_next
98  *
99  * And all five flowops (plus others from any other threads) are on a global
100  * list linked with fo_next.
101  */
102 
103 /*
104  * Prints the name and instance number of each flowop in
105  * the supplied list to the filebench log.
106  */
107 int
108 flowop_printlist(flowop_t *list)
109 {
110 	flowop_t *flowop = list;
111 
112 	while (flowop) {
113 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
114 		    flowop->fo_name, flowop->fo_instance);
115 		flowop = flowop->fo_exec_next;
116 	}
117 	return (0);
118 }
119 
120 /*
121  * Prints the name and instance number of all flowops on
122  * the master flowop list to the console and the filebench log.
123  */
124 void
125 flowop_printall(void)
126 {
127 	flowop_t *flowop = filebench_shm->shm_flowoplist;
128 
129 	while (flowop) {
130 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
131 		    flowop->fo_name, flowop->fo_instance);
132 		flowop = flowop->fo_next;
133 	}
134 }
135 
/*
 * Yields the interval from timestamp "s" to timestamp "e" in
 * nanoseconds, as a (signed) long long. Both arguments must be
 * structures having tv_sec and tv_nsec members. The arguments are
 * parenthesized so that expressions such as *ptr may be passed; each
 * argument is evaluated twice, so it must not have side effects.
 */
#define	TIMESPEC_TO_HRTIME(s, e) \
	((((e).tv_sec - (s).tv_sec) * 1000000000LL) + \
	((e).tv_nsec - (s).tv_nsec))
138 /*
139  * Puts current high resolution time in start time entry
140  * for threadflow and may also calculate running filebench
141  * overhead statistics.
142  */
143 void
144 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
145 {
146 #ifdef HAVE_PROCFS
147 	if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
148 		char procname[128];
149 
150 		(void) snprintf(procname, sizeof (procname),
151 		    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
152 		threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
153 	}
154 
155 	(void) pread(threadflow->tf_lwpusagefd,
156 	    &threadflow->tf_susage,
157 	    sizeof (struct prusage), 0);
158 
159 	/* Compute overhead time in this thread around op */
160 	if (threadflow->tf_eusage.pr_stime.tv_nsec) {
161 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
162 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
163 		    threadflow->tf_susage.pr_utime) +
164 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
165 		    threadflow->tf_susage.pr_ttime) +
166 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
167 		    threadflow->tf_susage.pr_stime);
168 	}
169 #endif
170 	/* Start of op for this thread */
171 	threadflow->tf_stime = gethrtime();
172 }
173 
174 flowstat_t controlstats;
175 pthread_mutex_t controlstats_lock;
176 static int controlstats_zeroed = 0;
177 
178 /*
179  * Updates flowop's latency statistics, using saved start
180  * time and current high resolution time. Updates flowop's
181  * io count and transferred bytes statistics. Also updates
182  * threadflow's and flowop's cumulative read or write byte
183  * and io count statistics.
184  */
185 void
186 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
187 {
188 	hrtime_t t;
189 
190 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
191 	    (gethrtime() - threadflow->tf_stime);
192 #ifdef HAVE_PROCFS
193 	if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
194 	    sizeof (struct prusage), 0)) != sizeof (struct prusage))
195 		filebench_log(LOG_ERROR, "cannot read /proc");
196 
197 	t =
198 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
199 	    threadflow->tf_eusage.pr_utime) +
200 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
201 	    threadflow->tf_eusage.pr_ttime) +
202 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
203 	    threadflow->tf_eusage.pr_stime);
204 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
205 
206 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
207 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
208 	    threadflow->tf_eusage.pr_tftime) +
209 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
210 	    threadflow->tf_eusage.pr_dftime) +
211 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
212 	    threadflow->tf_eusage.pr_kftime) +
213 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
214 	    threadflow->tf_eusage.pr_kftime) +
215 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
216 	    threadflow->tf_eusage.pr_slptime);
217 #endif
218 
219 	flowop->fo_stats.fs_count++;
220 	flowop->fo_stats.fs_bytes += bytes;
221 	(void) ipc_mutex_lock(&controlstats_lock);
222 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
223 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
224 		controlstats.fs_count++;
225 		controlstats.fs_bytes += bytes;
226 	}
227 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
228 		threadflow->tf_stats.fs_rbytes += bytes;
229 		threadflow->tf_stats.fs_rcount++;
230 		flowop->fo_stats.fs_rcount++;
231 		controlstats.fs_rbytes += bytes;
232 		controlstats.fs_rcount++;
233 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
234 		threadflow->tf_stats.fs_wbytes += bytes;
235 		threadflow->tf_stats.fs_wcount++;
236 		flowop->fo_stats.fs_wcount++;
237 		controlstats.fs_wbytes += bytes;
238 		controlstats.fs_wcount++;
239 	}
240 	(void) ipc_mutex_unlock(&controlstats_lock);
241 }
242 
243 /*
244  * Calls the flowop's initialization function, pointed to by
245  * flowop->fo_init.
246  */
247 static int
248 flowop_initflow(flowop_t *flowop)
249 {
250 	/*
251 	 * save static copies of two items, in case they are supplied
252 	 * from random variables
253 	 */
254 	if (!AVD_IS_STRING(flowop->fo_value))
255 		flowop->fo_constvalue = avd_get_int(flowop->fo_value);
256 
257 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
258 
259 	if ((*flowop->fo_init)(flowop) < 0) {
260 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
261 		    flowop->fo_name, flowop->fo_instance);
262 		return (-1);
263 	}
264 	return (0);
265 }
266 
267 static int
268 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
269 {
270 	flowop_t *flowop = *ops_list_ptr;
271 
272 	while (flowop) {
273 		flowop_t *newflowop;
274 
275 		if (flowop == *ops_list_ptr)
276 			*ops_list_ptr = NULL;
277 
278 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
279 		    flowop, ops_list_ptr, 1, 0);
280 		if (newflowop == NULL)
281 			return (FILEBENCH_ERROR);
282 
283 		/* check for fo_filename attribute, and resolve if present */
284 		if (flowop->fo_filename) {
285 			char *name;
286 
287 			name = avd_get_str(flowop->fo_filename);
288 			newflowop->fo_fileset = fileset_find(name);
289 
290 			if (newflowop->fo_fileset == NULL) {
291 				filebench_log(LOG_ERROR,
292 				    "flowop %s: file %s not found",
293 				    newflowop->fo_name, name);
294 				filebench_shutdown(1);
295 			}
296 		}
297 
298 		if (flowop_initflow(newflowop) < 0) {
299 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
300 			    newflowop->fo_name);
301 		}
302 
303 		flowop = flowop->fo_exec_next;
304 	}
305 	return (FILEBENCH_OK);
306 }
307 
308 /*
309  * Calls the flowop's destruct function, pointed to by
310  * flowop->fo_destruct.
311  */
312 static void
313 flowop_destructflow(flowop_t *flowop)
314 {
315 	(*flowop->fo_destruct)(flowop);
316 }
317 
318 /*
319  * call the destruct funtions of all the threadflow's flowops,
320  * if it is still flagged as "running".
321  */
322 void
323 flowop_destruct_all_flows(threadflow_t *threadflow)
324 {
325 	flowop_t *flowop;
326 
327 	/* wait a moment to give other threads a chance to stop too */
328 	(void) sleep(1);
329 
330 	(void) ipc_mutex_lock(&threadflow->tf_lock);
331 
332 	/* prepare to call destruct flow routines, if necessary */
333 	if (threadflow->tf_running == 0) {
334 
335 		/* allready destroyed */
336 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
337 		return;
338 	}
339 
340 	flowop = threadflow->tf_thrd_fops;
341 	threadflow->tf_running = 0;
342 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
343 
344 	while (flowop) {
345 		flowop_destructflow(flowop);
346 		flowop = flowop->fo_exec_next;
347 	}
348 }
349 
350 /*
351  * The final initialization and main execution loop for the
352  * worker threads. Sets threadflow and flowop start times,
353  * waits for all process to start, then creates the runtime
354  * flowops from those defined by the F language workload
355  * script. It does some more initialization, then enters a
356  * loop to repeatedly execute the flowops on the flowop list
357  * until an abort condition is detected, at which time it exits.
358  * This is the starting routine for the new worker thread
359  * created by threadflow_createthread(), and is not currently
360  * called from anywhere else.
361  */
362 void
363 flowop_start(threadflow_t *threadflow)
364 {
365 	flowop_t *flowop;
366 	size_t memsize;
367 	int ret = FILEBENCH_OK;
368 
369 #ifdef HAVE_PROCFS
370 	if (noproc == 0) {
371 		char procname[128];
372 		long ctl[2] = {PCSET, PR_MSACCT};
373 		int pfd;
374 
375 		(void) snprintf(procname, sizeof (procname),
376 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
377 		pfd = open(procname, O_WRONLY);
378 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
379 		(void) close(pfd);
380 	}
381 #endif
382 
383 	(void) ipc_mutex_lock(&controlstats_lock);
384 	if (!controlstats_zeroed) {
385 		(void) memset(&controlstats, 0, sizeof (controlstats));
386 		controlstats_zeroed = 1;
387 	}
388 	(void) ipc_mutex_unlock(&controlstats_lock);
389 
390 	flowop = threadflow->tf_thrd_fops;
391 	threadflow->tf_stats.fs_stime = gethrtime();
392 	flowop->fo_stats.fs_stime = gethrtime();
393 
394 	/* Hold the flowop find lock as reader to prevent lookups */
395 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
396 
397 	/*
398 	 * Block until all processes have started, acting like
399 	 * a barrier. The original filebench process initially
400 	 * holds the run_lock as a reader, preventing any of the
401 	 * threads from obtaining the writer lock, and hence
402 	 * passing this point. Once all processes and threads
403 	 * have been created, the original process unlocks
404 	 * run_lock, allowing each waiting thread to lock
405 	 * and then immediately unlock it, then begin running.
406 	 */
407 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
408 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
409 
410 	/* Create the runtime flowops from those defined by the script */
411 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
412 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
413 	    != FILEBENCH_OK) {
414 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
415 		filebench_shutdown(1);
416 		return;
417 	}
418 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
419 
420 	/* Release the find lock as reader to allow lookups */
421 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
422 
423 	/* Set to the start of the new flowop list */
424 	flowop = threadflow->tf_thrd_fops;
425 
426 	threadflow->tf_abort = 0;
427 	threadflow->tf_running = 1;
428 
429 	memsize = (size_t)threadflow->tf_constmemsize;
430 
431 	/* If we are going to use ISM, allocate later */
432 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
433 		threadflow->tf_mem =
434 		    ipc_ismmalloc(memsize);
435 	} else {
436 		threadflow->tf_mem =
437 		    malloc(memsize);
438 	}
439 
440 	(void) memset(threadflow->tf_mem, 0, memsize);
441 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
442 
443 #ifdef HAVE_LWPS
444 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
445 	    threadflow,
446 	    _lwp_self());
447 #endif
448 
449 	/* Main filebench worker loop */
450 	while (ret == FILEBENCH_OK) {
451 		int i, count;
452 
453 		/* Abort if asked */
454 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
455 			break;
456 
457 		/* Be quiet while stats are gathered */
458 		if (filebench_shm->shm_bequiet) {
459 			(void) sleep(1);
460 			continue;
461 		}
462 
463 		/* Take it easy until everyone is ready to go */
464 		if (!filebench_shm->shm_procs_running) {
465 			(void) sleep(1);
466 			continue;
467 		}
468 
469 		if (flowop == NULL) {
470 			filebench_log(LOG_ERROR, "flowop_read null flowop");
471 			return;
472 		}
473 
474 		if (flowop->fo_stats.fs_stime == 0)
475 			flowop->fo_stats.fs_stime = gethrtime();
476 
477 		/* Execute the flowop for fo_iters times */
478 		count = (int)avd_get_int(flowop->fo_iters);
479 		for (i = 0; i < count; i++) {
480 
481 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
482 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
483 			    flowop->fo_instance);
484 
485 			ret = (*flowop->fo_func)(threadflow, flowop);
486 
487 			/*
488 			 * Return value FILEBENCH_ERROR means "flowop
489 			 * failed, stop the filebench run"
490 			 */
491 			if (ret == FILEBENCH_ERROR) {
492 				filebench_log(LOG_ERROR,
493 				    "%s-%d: flowop %s-%d failed",
494 				    threadflow->tf_name,
495 				    threadflow->tf_instance,
496 				    flowop->fo_name,
497 				    flowop->fo_instance);
498 				(void) ipc_mutex_lock(&threadflow->tf_lock);
499 				threadflow->tf_abort = 1;
500 				filebench_shm->shm_f_abort =
501 				    FILEBENCH_ABORT_ERROR;
502 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
503 				break;
504 			}
505 
506 			/*
507 			 * Return value of FILEBENCH_NORSC means "stop
508 			 * the filebench run" if in "end on no work mode",
509 			 * otherwise it indicates an error
510 			 */
511 			if (ret == FILEBENCH_NORSC) {
512 				(void) ipc_mutex_lock(&threadflow->tf_lock);
513 				threadflow->tf_abort = FILEBENCH_DONE;
514 				if (filebench_shm->shm_rmode ==
515 				    FILEBENCH_MODE_Q1STDONE) {
516 					filebench_shm->shm_f_abort =
517 					    FILEBENCH_ABORT_RSRC;
518 				} else if (filebench_shm->shm_rmode !=
519 				    FILEBENCH_MODE_QALLDONE) {
520 					filebench_log(LOG_ERROR1,
521 					    "WARNING! Run stopped early:\n   "
522 					    "             flowop %s-%d could "
523 					    "not obtain a file. Please\n      "
524 					    "          reduce runtime, "
525 					    "increase fileset entries "
526 					    "($nfiles), or switch modes.",
527 					    flowop->fo_name,
528 					    flowop->fo_instance);
529 					filebench_shm->shm_f_abort =
530 					    FILEBENCH_ABORT_ERROR;
531 				}
532 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
533 				break;
534 			}
535 
536 			/*
537 			 * Return value of FILEBENCH_DONE means "stop
538 			 * the filebench run without error"
539 			 */
540 			if (ret == FILEBENCH_DONE) {
541 				(void) ipc_mutex_lock(&threadflow->tf_lock);
542 				threadflow->tf_abort = FILEBENCH_DONE;
543 				filebench_shm->shm_f_abort =
544 				    FILEBENCH_ABORT_DONE;
545 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
546 				break;
547 			}
548 
549 			/*
550 			 * If we get here and the return is something other
551 			 * than FILEBENCH_OK, it means a spurious code
552 			 * was returned, so treat as major error. This
553 			 * probably indicates a bug in the flowop.
554 			 */
555 			if (ret != FILEBENCH_OK) {
556 				filebench_log(LOG_ERROR,
557 				    "Flowop %s unexpected return value = %d\n",
558 				    flowop->fo_name, ret);
559 				filebench_shm->shm_f_abort =
560 				    FILEBENCH_ABORT_ERROR;
561 				break;
562 			}
563 		}
564 
565 		/* advance to next flowop */
566 		flowop = flowop->fo_exec_next;
567 
568 		/* but if at end of list, start over from the beginning */
569 		if (flowop == NULL) {
570 			flowop = threadflow->tf_thrd_fops;
571 			threadflow->tf_stats.fs_count++;
572 		}
573 	}
574 
575 #ifdef HAVE_LWPS
576 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
577 	    _lwp_self());
578 #endif
579 
580 	/* Tell flowops to destroy locally acquired state */
581 	flowop_destruct_all_flows(threadflow);
582 
583 	pthread_exit(&threadflow->tf_abort);
584 }
585 
586 void flowoplib_flowinit(void);
587 void fb_lfs_flowinit(void);
588 
589 void
590 flowop_init(void)
591 {
592 	(void) pthread_mutex_init(&controlstats_lock,
593 	    ipc_mutexattr(IPC_MUTEX_NORMAL));
594 	flowoplib_flowinit();
595 }
596 
597 static int plugin_flowinit_done = FALSE;
598 
599 /*
600  * Initialize any "plug-in" flowops. Called when the first "create fileset"
601  * command is encountered.
602  */
603 void
604 flowop_plugin_flowinit(void)
605 {
606 	if (plugin_flowinit_done)
607 		return;
608 
609 	plugin_flowinit_done = TRUE;
610 
611 	switch (filebench_shm->shm_filesys_type) {
612 	case LOCAL_FS_PLUG:
613 		fb_lfs_flowinit();
614 		break;
615 
616 	case NFS3_PLUG:
617 	case NFS4_PLUG:
618 	case CIFS_PLUG:
619 		break;
620 	}
621 }
622 
623 
624 /*
625  * Delete the designated flowop from the thread's flowop list.
626  */
627 static void
628 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
629 {
630 	flowop_t *entry = *flowoplist;
631 	int found = 0;
632 
633 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
634 	    flowop->fo_name,
635 	    flowop->fo_instance);
636 
637 	/* Delete from thread's flowop list */
638 	if (flowop == *flowoplist) {
639 		/* First on list */
640 		*flowoplist = flowop->fo_exec_next;
641 		filebench_log(LOG_DEBUG_IMPL,
642 		    "Delete0 flowop: (%s-%d)",
643 		    flowop->fo_name,
644 		    flowop->fo_instance);
645 	} else {
646 		while (entry->fo_exec_next) {
647 			filebench_log(LOG_DEBUG_IMPL,
648 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
649 			    entry->fo_exec_next->fo_name,
650 			    entry->fo_exec_next->fo_instance,
651 			    flowop->fo_name,
652 			    flowop->fo_instance);
653 
654 			if (flowop == entry->fo_exec_next) {
655 				/* Delete */
656 				filebench_log(LOG_DEBUG_IMPL,
657 				    "Deleted0 flowop: (%s-%d)",
658 				    entry->fo_exec_next->fo_name,
659 				    entry->fo_exec_next->fo_instance);
660 				entry->fo_exec_next =
661 				    entry->fo_exec_next->fo_exec_next;
662 				break;
663 			}
664 			entry = entry->fo_exec_next;
665 		}
666 	}
667 
668 #ifdef HAVE_PROCFS
669 	/* Close /proc stats */
670 	if (flowop->fo_thread)
671 		(void) close(flowop->fo_thread->tf_lwpusagefd);
672 #endif
673 
674 	/* Delete from global list */
675 	entry = filebench_shm->shm_flowoplist;
676 
677 	if (flowop == filebench_shm->shm_flowoplist) {
678 		/* First on list */
679 		filebench_shm->shm_flowoplist = flowop->fo_next;
680 		found = 1;
681 	} else {
682 		while (entry->fo_next) {
683 			filebench_log(LOG_DEBUG_IMPL,
684 			    "Delete flowop: (%s-%d) == (%s-%d)",
685 			    entry->fo_next->fo_name,
686 			    entry->fo_next->fo_instance,
687 			    flowop->fo_name,
688 			    flowop->fo_instance);
689 
690 			if (flowop == entry->fo_next) {
691 				/* Delete */
692 				entry->fo_next = entry->fo_next->fo_next;
693 				found = 1;
694 				break;
695 			}
696 
697 			entry = entry->fo_next;
698 		}
699 	}
700 	if (found) {
701 		filebench_log(LOG_DEBUG_IMPL,
702 		    "Deleted flowop: (%s-%d)",
703 		    flowop->fo_name,
704 		    flowop->fo_instance);
705 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
706 	} else {
707 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
708 		    flowop->fo_name,
709 		    flowop->fo_instance);
710 	}
711 }
712 
713 /*
714  * Deletes all the flowops from a flowop list.
715  */
716 void
717 flowop_delete_all(flowop_t **flowoplist)
718 {
719 	flowop_t *flowop = *flowoplist;
720 	flowop_t *next_flowop;
721 
722 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
723 
724 	while (flowop) {
725 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
726 		    flowop->fo_name, flowop->fo_instance);
727 
728 		if (flowop->fo_instance &&
729 		    (flowop->fo_instance == FLOW_MASTER)) {
730 			flowop = flowop->fo_exec_next;
731 			continue;
732 		}
733 		next_flowop = flowop->fo_exec_next;
734 		flowop_delete(flowoplist, flowop);
735 		flowop = next_flowop;
736 	}
737 
738 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
739 }
740 
741 /*
742  * Allocates a flowop entity and initializes it with inherited
743  * contents from the "inherit" flowop, if it is supplied, or
744  * with zeros otherwise. In either case the fo_next and fo_exec_next
745  * pointers are set to NULL, and fo_thread is set to point to
746  * the owning threadflow. The initialized flowop is placed at
747  * the head of the global flowop list, and also placed on the
748  * tail of the supplied local flowop list, which will either
749  * be a threadflow's tf_thrd_fops list or a composite flowop's
750  * fo_comp_fops list. The routine locks the flowop's fo_lock and
751  * leaves it held on return. If successful, it returns a pointer
752  * to the allocated and initialized flowop, otherwise it returns NULL.
753  *
754  * filebench_shm->shm_flowop_lock must be held by caller.
755  */
756 static flowop_t *
757 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
758     flowop_t **flowoplist_hdp, int instance, int type)
759 {
760 	flowop_t *flowop;
761 
762 	if (name == NULL)
763 		return (NULL);
764 
765 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
766 		filebench_log(LOG_ERROR,
767 		    "flowop_define: Can't malloc flowop");
768 		return (NULL);
769 	}
770 
771 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
772 	    name, instance, flowop);
773 
774 	if (flowop == NULL)
775 		return (NULL);
776 
777 	if (inherit) {
778 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
779 		(void) pthread_mutex_init(&flowop->fo_lock,
780 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
781 		(void) ipc_mutex_lock(&flowop->fo_lock);
782 		flowop->fo_next = NULL;
783 		flowop->fo_exec_next = NULL;
784 		filebench_log(LOG_DEBUG_IMPL,
785 		    "flowop %s-%d calling init", name, instance);
786 	} else {
787 		(void) memset(flowop, 0, sizeof (flowop_t));
788 		flowop->fo_iters = avd_int_alloc(1);
789 		flowop->fo_type = type;
790 		(void) pthread_mutex_init(&flowop->fo_lock,
791 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
792 		(void) ipc_mutex_lock(&flowop->fo_lock);
793 	}
794 
795 	/* Create backpointer to thread */
796 	flowop->fo_thread = threadflow;
797 
798 	/* Add flowop to global list */
799 	if (filebench_shm->shm_flowoplist == NULL) {
800 		filebench_shm->shm_flowoplist = flowop;
801 		flowop->fo_next = NULL;
802 	} else {
803 		flowop->fo_next = filebench_shm->shm_flowoplist;
804 		filebench_shm->shm_flowoplist = flowop;
805 	}
806 
807 	(void) strcpy(flowop->fo_name, name);
808 	flowop->fo_instance = instance;
809 
810 	if (flowoplist_hdp == NULL)
811 		return (flowop);
812 
813 	/* Add flowop to thread op list */
814 	if (*flowoplist_hdp == NULL) {
815 		*flowoplist_hdp = flowop;
816 		flowop->fo_exec_next = NULL;
817 	} else {
818 		flowop_t *flowend;
819 
820 		/* Find the end of the thread list */
821 		flowend = *flowoplist_hdp;
822 		while (flowend->fo_exec_next != NULL)
823 			flowend = flowend->fo_exec_next;
824 		flowend->fo_exec_next = flowop;
825 		flowop->fo_exec_next = NULL;
826 	}
827 
828 	return (flowop);
829 }
830 
831 /*
832  * Calls flowop_define_common() to allocate and initialize a
833  * flowop, and holds the shared flowop_lock during the call.
834  * It releases the created flowop's fo_lock when done.
835  */
836 flowop_t *
837 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
838     flowop_t **flowoplist_hdp, int instance, int type)
839 {
840 	flowop_t	*flowop;
841 
842 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
843 	flowop = flowop_define_common(threadflow, name,
844 	    inherit, flowoplist_hdp, instance, type);
845 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
846 
847 	if (flowop == NULL)
848 		return (NULL);
849 
850 	(void) ipc_mutex_unlock(&flowop->fo_lock);
851 
852 	return (flowop);
853 }
854 
855 /*
856  * Calls flowop_define_common() to allocate and initialize a
857  * composite flowop, and holds the shared flowop_lock during the call.
858  * It releases the created flowop's fo_lock when done.
859  */
860 flowop_t *
861 flowop_new_composite_define(char *name)
862 {
863 	flowop_t *flowop;
864 
865 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
866 	flowop = flowop_define_common(NULL, name,
867 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
868 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
869 
870 	if (flowop == NULL)
871 		return (NULL);
872 
873 	flowop->fo_func = flowop_composite;
874 	flowop->fo_init = flowop_composite_init;
875 	flowop->fo_destruct = flowop_composite_destruct;
876 	(void) ipc_mutex_unlock(&flowop->fo_lock);
877 
878 	return (flowop);
879 }
880 
881 /*
882  * Attempts to take a write lock on the flowop_find_lock that is
883  * defined in interprocess shared memory. Since each call to
884  * flowop_start() holds a read lock on flowop_find_lock, this
885  * routine effectively blocks until all instances of
886  * flowop_start() have finished. The flowop_find() routine calls
887  * this routine so that flowops won't be searched for until all
888  * flowops have been created by flowop_start.
889  */
890 static void
891 flowop_find_barrier(void)
892 {
893 	/* Block on wrlock to ensure find waits for all creates */
894 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
895 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
896 }
897 
898 /*
899  * Returns a list of flowops named "name" from the master
900  * flowop list.
901  */
902 flowop_t *
903 flowop_find(char *name)
904 {
905 	flowop_t *flowop;
906 	flowop_t *result = NULL;
907 
908 	flowop_find_barrier();
909 
910 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
911 
912 	flowop = filebench_shm->shm_flowoplist;
913 
914 	while (flowop) {
915 		if (strcmp(name, flowop->fo_name) == 0) {
916 
917 			/* Add flowop to result list */
918 			if (result == NULL) {
919 				result = flowop;
920 				flowop->fo_resultnext = NULL;
921 			} else {
922 				flowop->fo_resultnext = result;
923 				result = flowop;
924 			}
925 		}
926 		flowop = flowop->fo_next;
927 	}
928 
929 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
930 
931 
932 	return (result);
933 }
934 
935 /*
936  * Returns a pointer to the specified instance of flowop
937  * "name" from the global list.
938  */
939 flowop_t *
940 flowop_find_one(char *name, int instance)
941 {
942 	flowop_t *test_flowop;
943 
944 	flowop_find_barrier();
945 
946 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
947 
948 	test_flowop = filebench_shm->shm_flowoplist;
949 
950 	while (test_flowop) {
951 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
952 		    (instance == test_flowop->fo_instance))
953 			break;
954 
955 		test_flowop = test_flowop->fo_next;
956 	}
957 
958 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
959 
960 	return (test_flowop);
961 }
962 
963 /*
964  * recursively searches through lists of flowops on a given thread
965  * and those on any included composite flowops for the named flowop.
966  * either returns with a pointer to the named flowop or NULL if it
967  * cannot be found.
968  */
969 static flowop_t *
970 flowop_recurse_search(char *path, char *name, flowop_t *list)
971 {
972 	flowop_t *test_flowop;
973 	char fullname[MAXPATHLEN];
974 
975 	test_flowop = list;
976 
977 	/*
978 	 * when searching a list of inner flowops, "path" is the fullname
979 	 * of the containing composite flowop. Use it to form the
980 	 * full name of the inner flowop to search for.
981 	 */
982 	if (path) {
983 		if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
984 			filebench_log(LOG_ERROR,
985 			    "composite flowop path name %s.%s too long",
986 			    path, name);
987 			return (NULL);
988 		}
989 
990 		/* create composite_name.name for recursive search */
991 		(void) strcpy(fullname, path);
992 		(void) strcat(fullname, ".");
993 		(void) strcat(fullname, name);
994 	} else {
995 		(void) strcpy(fullname, name);
996 	}
997 
998 	/*
999 	 * loop through all flowops on the supplied tf_thrd_fops (flowop)
1000 	 * list or fo_comp_fops (inner flowop) list.
1001 	 */
1002 	while (test_flowop) {
1003 		if (strcmp(fullname, test_flowop->fo_name) == 0)
1004 			return (test_flowop);
1005 
1006 		if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
1007 			flowop_t *found_flowop;
1008 
1009 			found_flowop = flowop_recurse_search(
1010 			    test_flowop->fo_name, name,
1011 			    test_flowop->fo_comp_fops);
1012 
1013 			if (found_flowop)
1014 				return (found_flowop);
1015 		}
1016 		test_flowop = test_flowop->fo_exec_next;
1017 	}
1018 
1019 	/* not found here or on any child lists */
1020 	return (NULL);
1021 }
1022 
1023 /*
1024  * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
1025  * list of flowops. Returns the named flowop if found, or NULL.
1026  */
1027 flowop_t *
1028 flowop_find_from_list(char *name, flowop_t *list)
1029 {
1030 	flowop_t *found_flowop;
1031 
1032 	flowop_find_barrier();
1033 
1034 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
1035 
1036 	found_flowop = flowop_recurse_search(NULL, name, list);
1037 
1038 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
1039 
1040 	return (found_flowop);
1041 }
1042 
1043 /*
1044  * Composite flowop method. Does one pass through its list of
1045  * inner flowops per iteration.
1046  */
1047 static int
1048 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
1049 {
1050 	flowop_t	*inner_flowop;
1051 
1052 	/* get the first flowop in the list */
1053 	inner_flowop = flowop->fo_comp_fops;
1054 
1055 	/* make a pass through the list of sub flowops */
1056 	while (inner_flowop) {
1057 		int	i, count;
1058 
1059 		/* Abort if asked */
1060 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
1061 			return (FILEBENCH_DONE);
1062 
1063 		if (inner_flowop->fo_stats.fs_stime == 0)
1064 			inner_flowop->fo_stats.fs_stime = gethrtime();
1065 
1066 		/* Execute the flowop for fo_iters times */
1067 		count = (int)avd_get_int(inner_flowop->fo_iters);
1068 		for (i = 0; i < count; i++) {
1069 
1070 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
1071 			    "%s-%d", threadflow->tf_name,
1072 			    inner_flowop->fo_name,
1073 			    inner_flowop->fo_instance);
1074 
1075 			switch ((*inner_flowop->fo_func)(threadflow,
1076 			    inner_flowop)) {
1077 
1078 			/* all done */
1079 			case FILEBENCH_DONE:
1080 				return (FILEBENCH_DONE);
1081 
1082 			/* quit if inner flowop limit reached */
1083 			case FILEBENCH_NORSC:
1084 				return (FILEBENCH_NORSC);
1085 
1086 			/* quit on inner flowop error */
1087 			case FILEBENCH_ERROR:
1088 				filebench_log(LOG_ERROR,
1089 				    "inner flowop %s failed",
1090 				    inner_flowop->fo_name);
1091 				return (FILEBENCH_ERROR);
1092 
1093 			/* otherwise keep going */
1094 			default:
1095 				break;
1096 			}
1097 
1098 		}
1099 
1100 		/* advance to next flowop */
1101 		inner_flowop = inner_flowop->fo_exec_next;
1102 	}
1103 
1104 	/* finished with this pass */
1105 	return (FILEBENCH_OK);
1106 }
1107 
1108 /*
1109  * Composite flowop initialization. Creates runtime inner flowops
1110  * from prototype inner flowops.
1111  */
1112 static int
1113 flowop_composite_init(flowop_t *flowop)
1114 {
1115 	int err;
1116 
1117 	err = flowop_create_runtime_flowops(flowop->fo_thread,
1118 	    &flowop->fo_comp_fops);
1119 	if (err != FILEBENCH_OK)
1120 		return (err);
1121 
1122 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1123 	return (0);
1124 }
1125 
1126 /*
1127  * clean up inner flowops
1128  */
1129 static void
1130 flowop_composite_destruct(flowop_t *flowop)
1131 {
1132 	flowop_t *inner_flowop = flowop->fo_comp_fops;
1133 
1134 	while (inner_flowop) {
1135 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1136 		    inner_flowop->fo_name, inner_flowop->fo_instance);
1137 
1138 		if (inner_flowop->fo_instance &&
1139 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
1140 			inner_flowop = inner_flowop->fo_exec_next;
1141 			continue;
1142 		}
1143 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1144 		inner_flowop = inner_flowop->fo_exec_next;
1145 	}
1146 }
1147 
1148 /*
1149  * Support routines for libraries of flowops
1150  */
1151 
1152 int
1153 flowop_init_generic(flowop_t *flowop)
1154 {
1155 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1156 	return (FILEBENCH_OK);
1157 }
1158 
1159 void
1160 flowop_destruct_generic(flowop_t *flowop)
1161 {
1162 	char *buf;
1163 
1164 	/* release any local resources held by the flowop */
1165 	(void) ipc_mutex_lock(&flowop->fo_lock);
1166 	buf = flowop->fo_buf;
1167 	flowop->fo_buf = NULL;
1168 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1169 
1170 	if (buf)
1171 		free(buf);
1172 }
1173 
1174 
1175 /*
1176  * Loops through the supplied list of flowops and creates and initializes
1177  * a flowop for each one by calling flowop_define. As a side effect of
1178  * calling flowop define, the created flowops are placed on the
1179  * master flowop list. All created flowops are set to instance "0".
1180  */
1181 void
1182 flowop_flow_init(flowop_proto_t *list, int nops)
1183 {
1184 	int i;
1185 
1186 	for (i = 0; i < nops; i++) {
1187 		flowop_t *flowop;
1188 		flowop_proto_t *fl;
1189 
1190 		fl = &(list[i]);
1191 
1192 		if ((flowop = flowop_define(NULL,
1193 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
1194 			filebench_log(LOG_ERROR,
1195 			    "failed to create flowop %s\n",
1196 			    fl->fl_name);
1197 			filebench_shutdown(1);
1198 		}
1199 
1200 		flowop->fo_func = fl->fl_func;
1201 		flowop->fo_init = fl->fl_init;
1202 		flowop->fo_destruct = fl->fl_destruct;
1203 		flowop->fo_attrs = fl->fl_attrs;
1204 	}
1205 }
1206