xref: /onnv-gate/usr/src/cmd/filebench/common/flowop.c (revision 6391:f317d2de8920)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 
30 #ifdef HAVE_LWPS
31 #include <sys/lwp.h>
32 #endif
33 #include <fcntl.h>
34 #include "filebench.h"
35 #include "flowop.h"
36 #include "stats.h"
37 
38 #ifdef LINUX_PORT
39 #include <sys/types.h>
40 #include <linux/unistd.h>
41 #endif
42 
/*
 * Forward declaration: flowop_define_common() is defined further down in
 * this file but is called earlier, from flowop_start().
 */
static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
    flowop_t *inherit, int instance, int type);
45 
46 /*
47  * A collection of flowop support functions. The actual code that
48  * implements the various flowops is in flowop_library.c.
49  *
50  * Routines for defining, creating, initializing and destroying
51  * flowops, cyclically invoking the flowops on each threadflow's flowop
52  * list, collecting statistics about flowop execution, and other
53  * housekeeping duties are included in this file.
54  */
55 
56 
57 /*
58  * Prints the name and instance number of each flowop in
59  * the supplied list to the filebench log.
60  */
61 int
62 flowop_printlist(flowop_t *list)
63 {
64 	flowop_t *flowop = list;
65 
66 	while (flowop) {
67 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
68 		    flowop->fo_name, flowop->fo_instance);
69 		flowop = flowop->fo_threadnext;
70 	}
71 	return (0);
72 }
73 
74 /*
75  * Prints the name and instance number of all flowops on
76  * the master flowop list to the console and the filebench log.
77  */
78 void
79 flowop_printall(void)
80 {
81 	flowop_t *flowop = filebench_shm->shm_flowoplist;
82 
83 	while (flowop) {
84 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
85 		    flowop->fo_name, flowop->fo_instance);
86 		flowop = flowop->fo_next;
87 	}
88 }
89 
/*
 * Difference (e - s) between two timespec-style values (members tv_sec
 * and tv_nsec), in nanoseconds. Every argument use is parenthesized so
 * expressions such as a dereferenced pointer (*p) expand correctly.
 * Each argument is evaluated twice: pass side-effect-free expressions.
 */
#define	TIMESPEC_TO_HRTIME(s, e) \
	((((e).tv_sec - (s).tv_sec) * 1000000000LL) + \
	((e).tv_nsec - (s).tv_nsec))
92 /*
93  * Puts current high resolution time in start time entry
94  * for threadflow and may also calculate running filebench
95  * overhead statistics.
96  */
97 void
98 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
99 {
100 #ifdef HAVE_PROCFS
101 	if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
102 		char procname[128];
103 
104 		(void) snprintf(procname, sizeof (procname),
105 		    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
106 		threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
107 	}
108 
109 	(void) pread(threadflow->tf_lwpusagefd,
110 	    &threadflow->tf_susage,
111 	    sizeof (struct prusage), 0);
112 
113 	/* Compute overhead time in this thread around op */
114 	if (threadflow->tf_eusage.pr_stime.tv_nsec) {
115 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
116 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
117 		    threadflow->tf_susage.pr_utime) +
118 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
119 		    threadflow->tf_susage.pr_ttime) +
120 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
121 		    threadflow->tf_susage.pr_stime);
122 	}
123 #endif
124 	/* Start of op for this thread */
125 	threadflow->tf_stime = gethrtime();
126 }
127 
/*
 * Aggregate statistics updated by flowop_endop() under controlstats_lock:
 * op count and bytes for I/O and AIO flowops, plus read/write byte and
 * op counts for flowops with FLOW_ATTR_READ / FLOW_ATTR_WRITE.
 * controlstats is zeroed once, by the first thread whose flowop_start()
 * finds controlstats_zeroed still 0.
 * NOTE(review): these are ordinary globals (not in filebench_shm), so
 * they presumably aggregate per process; controlstats_lock is not
 * initialized in this file -- confirm it is initialized elsewhere.
 */
flowstat_t controlstats;
pthread_mutex_t controlstats_lock;
static int controlstats_zeroed = 0;
131 
132 /*
133  * Updates flowop's latency statistics, using saved start
134  * time and current high resolution time. Updates flowop's
135  * io count and transferred bytes statistics. Also updates
136  * threadflow's and flowop's cumulative read or write byte
137  * and io count statistics.
138  */
139 void
140 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
141 {
142 	hrtime_t t;
143 
144 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
145 	    (gethrtime() - threadflow->tf_stime);
146 #ifdef HAVE_PROCFS
147 	if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
148 	    sizeof (struct prusage), 0)) != sizeof (struct prusage))
149 		filebench_log(LOG_ERROR, "cannot read /proc");
150 
151 	t =
152 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
153 	    threadflow->tf_eusage.pr_utime) +
154 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
155 	    threadflow->tf_eusage.pr_ttime) +
156 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
157 	    threadflow->tf_eusage.pr_stime);
158 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
159 
160 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
161 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
162 	    threadflow->tf_eusage.pr_tftime) +
163 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
164 	    threadflow->tf_eusage.pr_dftime) +
165 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
166 	    threadflow->tf_eusage.pr_kftime) +
167 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
168 	    threadflow->tf_eusage.pr_kftime) +
169 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
170 	    threadflow->tf_eusage.pr_slptime);
171 #endif
172 
173 	flowop->fo_stats.fs_count++;
174 	flowop->fo_stats.fs_bytes += bytes;
175 	(void) ipc_mutex_lock(&controlstats_lock);
176 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
177 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
178 		controlstats.fs_count++;
179 		controlstats.fs_bytes += bytes;
180 	}
181 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
182 		threadflow->tf_stats.fs_rbytes += bytes;
183 		threadflow->tf_stats.fs_rcount++;
184 		flowop->fo_stats.fs_rcount++;
185 		controlstats.fs_rbytes += bytes;
186 		controlstats.fs_rcount++;
187 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
188 		threadflow->tf_stats.fs_wbytes += bytes;
189 		threadflow->tf_stats.fs_wcount++;
190 		flowop->fo_stats.fs_wcount++;
191 		controlstats.fs_wbytes += bytes;
192 		controlstats.fs_wcount++;
193 	}
194 	(void) ipc_mutex_unlock(&controlstats_lock);
195 }
196 
197 /*
198  * Calls the flowop's initialization function, pointed to by
199  * flowop->fo_init.
200  */
201 static int
202 flowop_initflow(flowop_t *flowop)
203 {
204 	/*
205 	 * save static copies of two items, in case they are supplied
206 	 * from random variables
207 	 */
208 	flowop->fo_constvalue = avd_get_int(flowop->fo_value);
209 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
210 
211 	if ((*flowop->fo_init)(flowop) < 0) {
212 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
213 		    flowop->fo_name, flowop->fo_instance);
214 		return (-1);
215 	}
216 	return (0);
217 }
218 
219 /*
220  * Calls the flowop's destruct function, pointed to by
221  * flowop->fo_destruct.
222  */
223 static void
224 flowop_destructflow(flowop_t *flowop)
225 {
226 	(*flowop->fo_destruct)(flowop);
227 }
228 
229 /*
230  * call the destruct funtions of all the threadflow's flowops,
231  * if it is still flagged as "running".
232  */
233 void
234 flowop_destruct_all_flows(threadflow_t *threadflow)
235 {
236 	flowop_t *flowop;
237 
238 	(void) ipc_mutex_lock(&threadflow->tf_lock);
239 
240 	/* prepare to call destruct flow routines, if necessary */
241 	if (threadflow->tf_running == 0) {
242 
243 		/* allready destroyed */
244 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
245 		return;
246 	}
247 
248 	flowop = threadflow->tf_ops;
249 	threadflow->tf_running = 0;
250 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
251 
252 	while (flowop) {
253 		flowop_destructflow(flowop);
254 		flowop = flowop->fo_threadnext;
255 	}
256 }
257 
258 /*
259  * The final initialization and main execution loop for the
260  * worker threads. Sets threadflow and flowop start times,
261  * waits for all process to start, then creates the runtime
262  * flowops from those defined by the F language workload
263  * script. It does some more initialization, then enters a
264  * loop to repeatedly execute the flowops on the flowop list
265  * until an abort condition is detected, at which time it exits.
266  * This is the starting routine for the new worker thread
267  * created by threadflow_createthread(), and is not currently
268  * called from anywhere else.
269  */
270 void
271 flowop_start(threadflow_t *threadflow)
272 {
273 	flowop_t *flowop;
274 	size_t memsize;
275 	int ret = 0;
276 
277 #ifdef HAVE_PROCFS
278 	if (noproc == 0) {
279 		char procname[128];
280 		long ctl[2] = {PCSET, PR_MSACCT};
281 		int pfd;
282 
283 		(void) snprintf(procname, sizeof (procname),
284 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
285 		pfd = open(procname, O_WRONLY);
286 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
287 		(void) close(pfd);
288 	}
289 #endif
290 
291 	(void) ipc_mutex_lock(&controlstats_lock);
292 	if (!controlstats_zeroed) {
293 		(void) memset(&controlstats, 0, sizeof (controlstats));
294 		controlstats_zeroed = 1;
295 	}
296 	(void) ipc_mutex_unlock(&controlstats_lock);
297 
298 	flowop = threadflow->tf_ops;
299 	threadflow->tf_stats.fs_stime = gethrtime();
300 	flowop->fo_stats.fs_stime = gethrtime();
301 
302 	/* Hold the flowop find lock as reader to prevent lookups */
303 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
304 
305 	/*
306 	 * Block until all processes have started, acting like
307 	 * a barrier. The original filebench process initially
308 	 * holds the run_lock as a reader, preventing any of the
309 	 * threads from obtaining the writer lock, and hence
310 	 * passing this point. Once all processes and threads
311 	 * have been created, the original process unlocks
312 	 * run_lock, allowing each waiting thread to lock
313 	 * and then immediately unlock it, then begin running.
314 	 */
315 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
316 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
317 
318 	/* Create the runtime flowops from those defined by the script */
319 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
320 	while (flowop) {
321 		flowop_t *newflowop;
322 
323 		if (flowop == threadflow->tf_ops)
324 			threadflow->tf_ops = NULL;
325 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
326 		    flowop, 1, 0);
327 		if (newflowop == NULL)
328 			return;
329 
330 		/* check for fo_filename attribute, and resolve if present */
331 		if (flowop->fo_filename) {
332 			char *name;
333 
334 			name = avd_get_str(flowop->fo_filename);
335 			newflowop->fo_fileset = fileset_find(name);
336 
337 			if (newflowop->fo_fileset == NULL) {
338 				filebench_log(LOG_ERROR,
339 				    "flowop %s: file %s not found",
340 				    newflowop->fo_name, name);
341 				filebench_shutdown(1);
342 			}
343 		}
344 
345 		if (flowop_initflow(newflowop) < 0) {
346 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
347 			    newflowop->fo_name);
348 		}
349 		flowop = flowop->fo_threadnext;
350 	}
351 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
352 
353 	/* Release the find lock as reader to allow lookups */
354 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
355 
356 	/* Set to the start of the new flowop list */
357 	flowop = threadflow->tf_ops;
358 
359 	threadflow->tf_abort = 0;
360 	threadflow->tf_running = 1;
361 
362 	memsize = (size_t)threadflow->tf_constmemsize;
363 
364 	/* If we are going to use ISM, allocate later */
365 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
366 		threadflow->tf_mem =
367 		    ipc_ismmalloc(memsize);
368 	} else {
369 		threadflow->tf_mem =
370 		    malloc(memsize);
371 	}
372 
373 	(void) memset(threadflow->tf_mem, 0, memsize);
374 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
375 
376 #ifdef HAVE_LWPS
377 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
378 	    threadflow,
379 	    _lwp_self());
380 #endif
381 
382 	/* Main filebench worker loop */
383 	/* CONSTCOND */
384 	while (1) {
385 		int i, count;
386 
387 		/* Abort if asked */
388 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
389 			break;
390 
391 		/* Be quiet while stats are gathered */
392 		if (filebench_shm->shm_bequiet) {
393 			(void) sleep(1);
394 			continue;
395 		}
396 
397 		/* Take it easy until everyone is ready to go */
398 		if (!filebench_shm->shm_running) {
399 			(void) sleep(1);
400 			continue;
401 		}
402 
403 		if (flowop == NULL) {
404 			filebench_log(LOG_ERROR, "flowop_read null flowop");
405 			return;
406 		}
407 
408 		if (flowop->fo_stats.fs_stime == 0)
409 			flowop->fo_stats.fs_stime = gethrtime();
410 
411 		/* Execute the flowop for fo_iters times */
412 		count = avd_get_int(flowop->fo_iters);
413 		for (i = 0; i < count; i++) {
414 
415 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
416 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
417 			    flowop->fo_instance);
418 
419 			ret = (*flowop->fo_func)(threadflow, flowop);
420 
421 			/*
422 			 * Return value FILEBENCH_ERROR means "flowop
423 			 * failed, stop the filebench run"
424 			 */
425 			if (ret == FILEBENCH_ERROR) {
426 				filebench_log(LOG_ERROR,
427 				    "%s-%d: flowop %s-%d failed",
428 				    threadflow->tf_name,
429 				    threadflow->tf_instance,
430 				    flowop->fo_name,
431 				    flowop->fo_instance);
432 				(void) ipc_mutex_lock(&threadflow->tf_lock);
433 				threadflow->tf_abort = 1;
434 				filebench_shm->shm_f_abort =
435 				    FILEBENCH_ABORT_ERROR;
436 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
437 				break;
438 			}
439 
440 			/*
441 			 * Return value of FILEBENCH_NORSC means "stop
442 			 * the filebench run" if in "end on no work mode",
443 			 * otherwise it indicates an error
444 			 */
445 			if (ret == FILEBENCH_NORSC) {
446 				(void) ipc_mutex_lock(&threadflow->tf_lock);
447 				threadflow->tf_abort = FILEBENCH_DONE;
448 				if (filebench_shm->shm_rmode ==
449 				    FILEBENCH_MODE_Q1STDONE) {
450 					filebench_shm->shm_f_abort =
451 					    FILEBENCH_ABORT_RSRC;
452 				} else if (filebench_shm->shm_rmode !=
453 				    FILEBENCH_MODE_QALLDONE) {
454 					filebench_log(LOG_ERROR1,
455 					    "WARNING! Run stopped early:\n   "
456 					    "             flowop %s-%d could "
457 					    "not obtain a file. Please\n      "
458 					    "          reduce runtime, "
459 					    "increase fileset entries "
460 					    "($nfiles), or switch modes.",
461 					    flowop->fo_name,
462 					    flowop->fo_instance);
463 					filebench_shm->shm_f_abort =
464 					    FILEBENCH_ABORT_ERROR;
465 				}
466 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
467 				break;
468 			}
469 
470 			/*
471 			 * Return value of FILEBENCH_DONE means "stop
472 			 * the filebench run without error"
473 			 */
474 			if (ret == FILEBENCH_DONE) {
475 				(void) ipc_mutex_lock(&threadflow->tf_lock);
476 				threadflow->tf_abort = FILEBENCH_DONE;
477 				filebench_shm->shm_f_abort =
478 				    FILEBENCH_ABORT_DONE;
479 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
480 				break;
481 			}
482 
483 			/*
484 			 * If we get here and the return is something other
485 			 * than FILEBENCH_OK, it means a spurious code
486 			 * was returned, so treat as major error. This
487 			 * probably indicates a bug in the flowop.
488 			 */
489 			if (ret != FILEBENCH_OK) {
490 				filebench_log(LOG_ERROR,
491 				    "Flowop %s unexpected return value = %d\n",
492 				    flowop->fo_name, ret);
493 				filebench_shm->shm_f_abort =
494 				    FILEBENCH_ABORT_ERROR;
495 				break;
496 			}
497 		}
498 
499 		/* advance to next flowop */
500 		flowop = flowop->fo_threadnext;
501 
502 		/* but if at end of list, start over from the beginning */
503 		if (flowop == NULL) {
504 			flowop = threadflow->tf_ops;
505 			threadflow->tf_stats.fs_count++;
506 		}
507 	}
508 
509 #ifdef HAVE_LWPS
510 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
511 	    _lwp_self());
512 #endif
513 
514 	/* Tell flowops to destroy locally acquired state */
515 	flowop_destruct_all_flows(threadflow);
516 
517 	pthread_exit(&threadflow->tf_abort);
518 }
519 
/*
 * Initializes the flowop subsystem. All of the work is delegated to
 * flowoplib_init(); this function adds nothing of its own.
 */
void
flowop_init(void)
{
	flowoplib_init();
}
525 
526 /*
527  * Delete the designated flowop from the thread's flowop list.
528  */
529 static void
530 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
531 {
532 	flowop_t *entry = *flowoplist;
533 	int found = 0;
534 
535 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
536 	    flowop->fo_name,
537 	    flowop->fo_instance);
538 
539 	/* Delete from thread's flowop list */
540 	if (flowop == *flowoplist) {
541 		/* First on list */
542 		*flowoplist = flowop->fo_threadnext;
543 		filebench_log(LOG_DEBUG_IMPL,
544 		    "Delete0 flowop: (%s-%d)",
545 		    flowop->fo_name,
546 		    flowop->fo_instance);
547 	} else {
548 		while (entry->fo_threadnext) {
549 			filebench_log(LOG_DEBUG_IMPL,
550 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
551 			    entry->fo_threadnext->fo_name,
552 			    entry->fo_threadnext->fo_instance,
553 			    flowop->fo_name,
554 			    flowop->fo_instance);
555 
556 			if (flowop == entry->fo_threadnext) {
557 				/* Delete */
558 				filebench_log(LOG_DEBUG_IMPL,
559 				    "Deleted0 flowop: (%s-%d)",
560 				    entry->fo_threadnext->fo_name,
561 				    entry->fo_threadnext->fo_instance);
562 				entry->fo_threadnext =
563 				    entry->fo_threadnext->fo_threadnext;
564 				break;
565 			}
566 			entry = entry->fo_threadnext;
567 		}
568 	}
569 
570 #ifdef HAVE_PROCFS
571 	/* Close /proc stats */
572 	if (flowop->fo_thread)
573 		(void) close(flowop->fo_thread->tf_lwpusagefd);
574 #endif
575 
576 	/* Delete from global list */
577 	entry = filebench_shm->shm_flowoplist;
578 
579 	if (flowop == filebench_shm->shm_flowoplist) {
580 		/* First on list */
581 		filebench_shm->shm_flowoplist = flowop->fo_next;
582 		found = 1;
583 	} else {
584 		while (entry->fo_next) {
585 			filebench_log(LOG_DEBUG_IMPL,
586 			    "Delete flowop: (%s-%d) == (%s-%d)",
587 			    entry->fo_next->fo_name,
588 			    entry->fo_next->fo_instance,
589 			    flowop->fo_name,
590 			    flowop->fo_instance);
591 
592 			if (flowop == entry->fo_next) {
593 				/* Delete */
594 				entry->fo_next = entry->fo_next->fo_next;
595 				found = 1;
596 				break;
597 			}
598 
599 			entry = entry->fo_next;
600 		}
601 	}
602 	if (found) {
603 		filebench_log(LOG_DEBUG_IMPL,
604 		    "Deleted flowop: (%s-%d)",
605 		    flowop->fo_name,
606 		    flowop->fo_instance);
607 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
608 	} else {
609 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
610 		    flowop->fo_name,
611 		    flowop->fo_instance);
612 	}
613 }
614 
615 /*
616  * Deletes all the flowops from a flowop list.
617  */
618 void
619 flowop_delete_all(flowop_t **flowoplist)
620 {
621 	flowop_t *flowop = *flowoplist;
622 
623 	filebench_log(LOG_DEBUG_IMPL, "Deleting all flowops...");
624 	while (flowop) {
625 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
626 		    flowop->fo_name, flowop->fo_instance);
627 		flowop = flowop->fo_threadnext;
628 	}
629 
630 	flowop = *flowoplist;
631 
632 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
633 
634 	while (flowop) {
635 		if (flowop->fo_instance &&
636 		    (flowop->fo_instance == FLOW_MASTER)) {
637 			flowop = flowop->fo_threadnext;
638 			continue;
639 		}
640 		flowop_delete(flowoplist, flowop);
641 		flowop = flowop->fo_threadnext;
642 	}
643 
644 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
645 }
646 
647 /*
648  * Allocates a flowop entity and initializes it with inherited
649  * contents from the "inherit" flowop, if it is supplied, or
650  * with zeros otherwise. In either case the file descriptor
651  * (fo_fd) is set to -1, and the fo_next and fo_threadnext
652  * pointers are set to NULL, and fo_thread is set to point to
653  * the owning threadflow. The initialized flowop is placed at
654  * the head of the global flowop list, and also placed on the
655  * tail of thethreadflow's tf_ops list. The routine locks the
656  * flowop's fo_lock and leaves it held on return. If successful,
657  * it returns a pointer to the allocated and initialized flowop,
658  * otherwise NULL.
659  *
660  * filebench_shm->shm_flowop_lock must be held by caller.
661  */
662 static flowop_t *
663 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
664     int instance, int type)
665 {
666 	flowop_t *flowop;
667 
668 	if (name == NULL)
669 		return (NULL);
670 
671 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
672 		filebench_log(LOG_ERROR,
673 		    "flowop_define: Can't malloc flowop");
674 		return (NULL);
675 	}
676 
677 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
678 	    name, instance, flowop);
679 
680 	if (flowop == NULL)
681 		return (NULL);
682 
683 	if (inherit) {
684 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
685 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
686 		(void) ipc_mutex_lock(&flowop->fo_lock);
687 		flowop->fo_next = NULL;
688 		flowop->fo_threadnext = NULL;
689 		filebench_log(LOG_DEBUG_IMPL,
690 		    "flowop %s-%d calling init", name, instance);
691 	} else {
692 		(void) memset(flowop, 0, sizeof (flowop_t));
693 		flowop->fo_iters = avd_int_alloc(1);
694 		flowop->fo_type = type;
695 		(void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr());
696 		(void) ipc_mutex_lock(&flowop->fo_lock);
697 	}
698 
699 	/* Create backpointer to thread */
700 	flowop->fo_thread = threadflow;
701 
702 	/* Add flowop to global list */
703 	if (filebench_shm->shm_flowoplist == NULL) {
704 		filebench_shm->shm_flowoplist = flowop;
705 		flowop->fo_next = NULL;
706 	} else {
707 		flowop->fo_next = filebench_shm->shm_flowoplist;
708 		filebench_shm->shm_flowoplist = flowop;
709 	}
710 
711 	(void) strcpy(flowop->fo_name, name);
712 	flowop->fo_instance = instance;
713 
714 	if (threadflow == NULL)
715 		return (flowop);
716 
717 	/* Add flowop to thread op list */
718 	if (threadflow->tf_ops == NULL) {
719 		threadflow->tf_ops = flowop;
720 		flowop->fo_threadnext = NULL;
721 	} else {
722 		flowop_t *flowend;
723 
724 		/* Find the end of the thread list */
725 		flowend = threadflow->tf_ops;
726 		while (flowend->fo_threadnext != NULL)
727 			flowend = flowend->fo_threadnext;
728 		flowend->fo_threadnext = flowop;
729 		flowop->fo_threadnext = NULL;
730 	}
731 
732 	return (flowop);
733 }
734 
735 /*
736  * Calls flowop_define_common() to allocate and initialize a
737  * flowop, and holds the shared flowop_lock during the call.
738  * It releases the created flowop's fo_lock when done.
739  */
740 flowop_t *
741 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
742     int instance, int type)
743 {
744 	flowop_t	*flowop;
745 
746 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
747 	flowop = flowop_define_common(threadflow, name,
748 	    inherit, instance, type);
749 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
750 
751 	if (flowop == NULL)
752 		return (NULL);
753 
754 	(void) ipc_mutex_unlock(&flowop->fo_lock);
755 
756 	return (flowop);
757 }
758 
759 /*
760  * Attempts to take a write lock on the flowop_find_lock that is
761  * defined in interprocess shared memory. Since each call to
762  * flowop_start() holds a read lock on flowop_find_lock, this
763  * routine effectively blocks until all instances of
764  * flowop_start() have finished. The flowop_find() routine calls
765  * this routine so that flowops won't be searched for until all
766  * flowops have been created by flowop_start.
767  */
768 static void
769 flowop_find_barrier(void)
770 {
771 	/* Block on wrlock to ensure find waits for all creates */
772 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
773 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
774 }
775 
776 /*
777  * Returns a list of flowops named "name" from the master
778  * flowop list.
779  */
780 flowop_t *
781 flowop_find(char *name)
782 {
783 	flowop_t *flowop;
784 	flowop_t *result = NULL;
785 
786 	flowop_find_barrier();
787 
788 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
789 
790 	flowop = filebench_shm->shm_flowoplist;
791 
792 	while (flowop) {
793 		if (strcmp(name, flowop->fo_name) == 0) {
794 
795 			/* Add flowop to result list */
796 			if (result == NULL) {
797 				result = flowop;
798 				flowop->fo_resultnext = NULL;
799 			} else {
800 				flowop->fo_resultnext = result;
801 				result = flowop;
802 			}
803 		}
804 		flowop = flowop->fo_next;
805 	}
806 
807 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
808 
809 
810 	return (result);
811 }
812 
813 /*
814  * Returns a pointer to the specified instance of flowop
815  * "name" from the supplied list.
816  */
817 flowop_t *
818 flowop_find_one(char *name, int instance)
819 {
820 	flowop_t *test_flowop;
821 
822 	flowop_find_barrier();
823 
824 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
825 
826 	test_flowop = filebench_shm->shm_flowoplist;
827 
828 	while (test_flowop) {
829 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
830 		    (instance == test_flowop->fo_instance))
831 			break;
832 
833 		test_flowop = test_flowop->fo_next;
834 	}
835 
836 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
837 
838 	return (test_flowop);
839 }
840