xref: /onnv-gate/usr/src/cmd/filebench/common/flowop.c (revision 9356:2ff1c33b24c1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include "config.h"
27 
28 #ifdef HAVE_LWPS
29 #include <sys/lwp.h>
30 #endif
31 #include <fcntl.h>
32 #include "filebench.h"
33 #include "flowop.h"
34 #include "stats.h"
35 
36 #ifdef LINUX_PORT
37 #include <sys/types.h>
38 #include <linux/unistd.h>
39 #endif
40 
41 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
42     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
43 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
44 static int flowop_composite_init(flowop_t *flowop);
45 static void flowop_composite_destruct(flowop_t *flowop);
46 
47 /*
48  * A collection of flowop support functions. The actual code that
49  * implements the various flowops is in flowop_library.c.
50  *
51  * Routines for defining, creating, initializing and destroying
52  * flowops, cyclically invoking the flowops on each threadflow's flowop
53  * list, collecting statistics about flowop execution, and other
54  * housekeeping duties are included in this file.
55  *
56  * User Defined Composite Flowops
57  *    The ability to define new flowops as lists of built-in or previously
58  * defined flowops has been added to Filebench. In a sense they are like
59  * in-line subroutines, which can have default attributes set at definition
60  * time and passed arguments at invocation time. Like other flowops (and
61  * unlike conventional subroutines) you can invoke them with an iteration
62  * count (the "iter" attribute), and they will loop through their associated
63  * list of flowops for "iter" number of times each time they are encountered
64  * in the thread or outer composite flowop which invokes them.
65  *
66  * Composite flowops are created with a "define" command, are given a name,
67  * optional default attributes, and local variable definitions on the
68  * "define" command line, followed by a brace enclosed list of flowops
69  * to execute. The enclosed flowops may include attributes that reference
70  * the local variables, as well as constants and global variables.
71  *
72  * Composite flowops are used pretty much like regular flowops, but you can
73  * also set local variables to constants or global variables ($local_var =
74  * [$var | $random_var | string | boolean | integer | double]) as part of
75  * the invocation. Thus each invocation can pass customized values to its
76  * inner flowops, greatly increasing their generality.
77  *
78  * All flowops are placed on a global, singly linked list, with fo_next
79  * being the link pointer for this list. They are also placed on a private
80  * list for the thread or composite flowop they are part of. The tf_thrd_fops
81  * pointer in the thread will point to the list of top level flowops in the
82  * thread, which are linked together by fo_exec_next. If any of these flowops
83  * are composite flowops, they will have a list of second level flowops rooted
84  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
85  * of all flowops, and an n-ary tree of threads, composite flowops, and
86  * flowops, with composite flowops being the branch nodes in the tree.
87  *
88  * To illustrate, if we have three first level flowops, the first of which is
89  * a composite flowop consisting of two other flowops, we get:
90  *
91  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
92  *			   flowop->fo_comp_fops		    |
93  *				    |			    V
94  *				    |			flowop->fo_exec_next
95  *				    |
96  *				    V
97  *				flowop->fo_exec_next -> flowop->fo_exec_next
98  *
99  * And all five flowops (plus others from any other threads) are on a global
100  * list linked with fo_next.
101  */
102 
103 /*
104  * Prints the name and instance number of each flowop in
105  * the supplied list to the filebench log.
106  */
107 int
flowop_printlist(flowop_t * list)108 flowop_printlist(flowop_t *list)
109 {
110 	flowop_t *flowop = list;
111 
112 	while (flowop) {
113 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
114 		    flowop->fo_name, flowop->fo_instance);
115 		flowop = flowop->fo_exec_next;
116 	}
117 	return (0);
118 }
119 
120 /*
121  * Prints the name and instance number of all flowops on
122  * the master flowop list to the console and the filebench log.
123  */
124 void
flowop_printall(void)125 flowop_printall(void)
126 {
127 	flowop_t *flowop = filebench_shm->shm_flowoplist;
128 
129 	while (flowop) {
130 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
131 		    flowop->fo_name, flowop->fo_instance);
132 		flowop = flowop->fo_next;
133 	}
134 }
135 
/*
 * Nanoseconds elapsed from start time "s" to end time "e", where both
 * are objects with tv_sec and tv_nsec members. The arguments are
 * parenthesized so that any expression yielding such an object (for
 * example "*ptr") may be passed. Each argument is evaluated twice, so
 * it must not have side effects.
 */
#define	TIMESPEC_TO_HRTIME(s, e) \
	((((e).tv_sec - (s).tv_sec) * 1000000000LL) + \
	((e).tv_nsec - (s).tv_nsec))
138 /*
139  * Puts current high resolution time in start time entry
140  * for threadflow and may also calculate running filebench
141  * overhead statistics.
142  */
143 void
flowop_beginop(threadflow_t * threadflow,flowop_t * flowop)144 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
145 {
146 #ifdef HAVE_PROCFS
147 	if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
148 		if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
149 			char procname[128];
150 
151 			(void) snprintf(procname, sizeof (procname),
152 			    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
153 			threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
154 		}
155 
156 		(void) pread(threadflow->tf_lwpusagefd,
157 		    &threadflow->tf_susage,
158 		    sizeof (struct prusage), 0);
159 
160 		/* Compute overhead time in this thread around op */
161 		if (threadflow->tf_eusage.pr_stime.tv_nsec) {
162 			flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
163 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
164 			    threadflow->tf_susage.pr_utime) +
165 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
166 			    threadflow->tf_susage.pr_ttime) +
167 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
168 			    threadflow->tf_susage.pr_stime);
169 		}
170 	}
171 #endif
172 
173 	/* Start of op for this thread */
174 	threadflow->tf_stime = gethrtime();
175 }
176 
177 flowstat_t controlstats;
178 pthread_mutex_t controlstats_lock;
179 static int controlstats_zeroed = 0;
180 
181 /*
182  * Updates flowop's latency statistics, using saved start
183  * time and current high resolution time. Updates flowop's
184  * io count and transferred bytes statistics. Also updates
185  * threadflow's and flowop's cumulative read or write byte
186  * and io count statistics.
187  */
188 void
flowop_endop(threadflow_t * threadflow,flowop_t * flowop,int64_t bytes)189 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
190 {
191 	hrtime_t t;
192 
193 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
194 	    (gethrtime() - threadflow->tf_stime);
195 #ifdef HAVE_PROCFS
196 	if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
197 		if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
198 		    sizeof (struct prusage), 0)) != sizeof (struct prusage))
199 			filebench_log(LOG_ERROR, "cannot read /proc");
200 
201 		t =
202 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
203 		    threadflow->tf_eusage.pr_utime) +
204 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
205 		    threadflow->tf_eusage.pr_ttime) +
206 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
207 		    threadflow->tf_eusage.pr_stime);
208 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
209 
210 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
211 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
212 		    threadflow->tf_eusage.pr_tftime) +
213 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
214 		    threadflow->tf_eusage.pr_dftime) +
215 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
216 		    threadflow->tf_eusage.pr_kftime) +
217 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
218 		    threadflow->tf_eusage.pr_kftime) +
219 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
220 		    threadflow->tf_eusage.pr_slptime);
221 	}
222 #endif
223 
224 	flowop->fo_stats.fs_count++;
225 	flowop->fo_stats.fs_bytes += bytes;
226 	(void) ipc_mutex_lock(&controlstats_lock);
227 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
228 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
229 		controlstats.fs_count++;
230 		controlstats.fs_bytes += bytes;
231 	}
232 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
233 		threadflow->tf_stats.fs_rbytes += bytes;
234 		threadflow->tf_stats.fs_rcount++;
235 		flowop->fo_stats.fs_rcount++;
236 		controlstats.fs_rbytes += bytes;
237 		controlstats.fs_rcount++;
238 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
239 		threadflow->tf_stats.fs_wbytes += bytes;
240 		threadflow->tf_stats.fs_wcount++;
241 		flowop->fo_stats.fs_wcount++;
242 		controlstats.fs_wbytes += bytes;
243 		controlstats.fs_wcount++;
244 	}
245 	(void) ipc_mutex_unlock(&controlstats_lock);
246 }
247 
248 /*
249  * Calls the flowop's initialization function, pointed to by
250  * flowop->fo_init.
251  */
252 static int
flowop_initflow(flowop_t * flowop)253 flowop_initflow(flowop_t *flowop)
254 {
255 	/*
256 	 * save static copies of two items, in case they are supplied
257 	 * from random variables
258 	 */
259 	if (!AVD_IS_STRING(flowop->fo_value))
260 		flowop->fo_constvalue = avd_get_int(flowop->fo_value);
261 
262 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
263 
264 	if ((*flowop->fo_init)(flowop) < 0) {
265 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
266 		    flowop->fo_name, flowop->fo_instance);
267 		return (-1);
268 	}
269 	return (0);
270 }
271 
272 static int
flowop_create_runtime_flowops(threadflow_t * threadflow,flowop_t ** ops_list_ptr)273 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
274 {
275 	flowop_t *flowop = *ops_list_ptr;
276 
277 	while (flowop) {
278 		flowop_t *newflowop;
279 
280 		if (flowop == *ops_list_ptr)
281 			*ops_list_ptr = NULL;
282 
283 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
284 		    flowop, ops_list_ptr, 1, 0);
285 		if (newflowop == NULL)
286 			return (FILEBENCH_ERROR);
287 
288 		/* check for fo_filename attribute, and resolve if present */
289 		if (flowop->fo_filename) {
290 			char *name;
291 
292 			name = avd_get_str(flowop->fo_filename);
293 			newflowop->fo_fileset = fileset_find(name);
294 
295 			if (newflowop->fo_fileset == NULL) {
296 				filebench_log(LOG_ERROR,
297 				    "flowop %s: file %s not found",
298 				    newflowop->fo_name, name);
299 				filebench_shutdown(1);
300 			}
301 		}
302 
303 		if (flowop_initflow(newflowop) < 0) {
304 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
305 			    newflowop->fo_name);
306 		}
307 
308 		flowop = flowop->fo_exec_next;
309 	}
310 	return (FILEBENCH_OK);
311 }
312 
313 /*
314  * Calls the flowop's destruct function, pointed to by
315  * flowop->fo_destruct.
316  */
317 static void
flowop_destructflow(flowop_t * flowop)318 flowop_destructflow(flowop_t *flowop)
319 {
320 	(*flowop->fo_destruct)(flowop);
321 }
322 
323 /*
324  * call the destruct funtions of all the threadflow's flowops,
325  * if it is still flagged as "running".
326  */
327 void
flowop_destruct_all_flows(threadflow_t * threadflow)328 flowop_destruct_all_flows(threadflow_t *threadflow)
329 {
330 	flowop_t *flowop;
331 
332 	/* wait a moment to give other threads a chance to stop too */
333 	(void) sleep(1);
334 
335 	(void) ipc_mutex_lock(&threadflow->tf_lock);
336 
337 	/* prepare to call destruct flow routines, if necessary */
338 	if (threadflow->tf_running == 0) {
339 
340 		/* allready destroyed */
341 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
342 		return;
343 	}
344 
345 	flowop = threadflow->tf_thrd_fops;
346 	threadflow->tf_running = 0;
347 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
348 
349 	while (flowop) {
350 		flowop_destructflow(flowop);
351 		flowop = flowop->fo_exec_next;
352 	}
353 }
354 
355 /*
356  * The final initialization and main execution loop for the
357  * worker threads. Sets threadflow and flowop start times,
358  * waits for all process to start, then creates the runtime
359  * flowops from those defined by the F language workload
360  * script. It does some more initialization, then enters a
361  * loop to repeatedly execute the flowops on the flowop list
362  * until an abort condition is detected, at which time it exits.
363  * This is the starting routine for the new worker thread
364  * created by threadflow_createthread(), and is not currently
365  * called from anywhere else.
366  */
367 void
flowop_start(threadflow_t * threadflow)368 flowop_start(threadflow_t *threadflow)
369 {
370 	flowop_t *flowop;
371 	size_t memsize;
372 	int ret = FILEBENCH_OK;
373 
374 #ifdef HAVE_PROCFS
375 	if (noproc == 0) {
376 		char procname[128];
377 		long ctl[2] = {PCSET, PR_MSACCT};
378 		int pfd;
379 
380 		(void) snprintf(procname, sizeof (procname),
381 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
382 		pfd = open(procname, O_WRONLY);
383 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
384 		(void) close(pfd);
385 	}
386 #endif
387 
388 	(void) ipc_mutex_lock(&controlstats_lock);
389 	if (!controlstats_zeroed) {
390 		(void) memset(&controlstats, 0, sizeof (controlstats));
391 		controlstats_zeroed = 1;
392 	}
393 	(void) ipc_mutex_unlock(&controlstats_lock);
394 
395 	flowop = threadflow->tf_thrd_fops;
396 	threadflow->tf_stats.fs_stime = gethrtime();
397 	flowop->fo_stats.fs_stime = gethrtime();
398 
399 	/* Hold the flowop find lock as reader to prevent lookups */
400 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
401 
402 	/*
403 	 * Block until all processes have started, acting like
404 	 * a barrier. The original filebench process initially
405 	 * holds the run_lock as a reader, preventing any of the
406 	 * threads from obtaining the writer lock, and hence
407 	 * passing this point. Once all processes and threads
408 	 * have been created, the original process unlocks
409 	 * run_lock, allowing each waiting thread to lock
410 	 * and then immediately unlock it, then begin running.
411 	 */
412 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
413 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
414 
415 	/* Create the runtime flowops from those defined by the script */
416 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
417 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
418 	    != FILEBENCH_OK) {
419 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
420 		filebench_shutdown(1);
421 		return;
422 	}
423 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
424 
425 	/* Release the find lock as reader to allow lookups */
426 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
427 
428 	/* Set to the start of the new flowop list */
429 	flowop = threadflow->tf_thrd_fops;
430 
431 	threadflow->tf_abort = 0;
432 	threadflow->tf_running = 1;
433 
434 	memsize = (size_t)threadflow->tf_constmemsize;
435 
436 	/* If we are going to use ISM, allocate later */
437 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
438 		threadflow->tf_mem =
439 		    ipc_ismmalloc(memsize);
440 	} else {
441 		threadflow->tf_mem =
442 		    malloc(memsize);
443 	}
444 
445 	(void) memset(threadflow->tf_mem, 0, memsize);
446 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
447 
448 #ifdef HAVE_LWPS
449 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
450 	    threadflow,
451 	    _lwp_self());
452 #endif
453 
454 	/* Main filebench worker loop */
455 	while (ret == FILEBENCH_OK) {
456 		int i, count;
457 
458 		/* Abort if asked */
459 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
460 			break;
461 
462 		/* Be quiet while stats are gathered */
463 		if (filebench_shm->shm_bequiet) {
464 			(void) sleep(1);
465 			continue;
466 		}
467 
468 		/* Take it easy until everyone is ready to go */
469 		if (!filebench_shm->shm_procs_running) {
470 			(void) sleep(1);
471 			continue;
472 		}
473 
474 		if (flowop == NULL) {
475 			filebench_log(LOG_ERROR, "flowop_read null flowop");
476 			return;
477 		}
478 
479 		if (flowop->fo_stats.fs_stime == 0)
480 			flowop->fo_stats.fs_stime = gethrtime();
481 
482 		/* Execute the flowop for fo_iters times */
483 		count = (int)avd_get_int(flowop->fo_iters);
484 		for (i = 0; i < count; i++) {
485 
486 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
487 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
488 			    flowop->fo_instance);
489 
490 			ret = (*flowop->fo_func)(threadflow, flowop);
491 
492 			/*
493 			 * Return value FILEBENCH_ERROR means "flowop
494 			 * failed, stop the filebench run"
495 			 */
496 			if (ret == FILEBENCH_ERROR) {
497 				filebench_log(LOG_ERROR,
498 				    "%s-%d: flowop %s-%d failed",
499 				    threadflow->tf_name,
500 				    threadflow->tf_instance,
501 				    flowop->fo_name,
502 				    flowop->fo_instance);
503 				(void) ipc_mutex_lock(&threadflow->tf_lock);
504 				threadflow->tf_abort = 1;
505 				filebench_shm->shm_f_abort =
506 				    FILEBENCH_ABORT_ERROR;
507 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
508 				break;
509 			}
510 
511 			/*
512 			 * Return value of FILEBENCH_NORSC means "stop
513 			 * the filebench run" if in "end on no work mode",
514 			 * otherwise it indicates an error
515 			 */
516 			if (ret == FILEBENCH_NORSC) {
517 				(void) ipc_mutex_lock(&threadflow->tf_lock);
518 				threadflow->tf_abort = FILEBENCH_DONE;
519 				if (filebench_shm->shm_rmode ==
520 				    FILEBENCH_MODE_Q1STDONE) {
521 					filebench_shm->shm_f_abort =
522 					    FILEBENCH_ABORT_RSRC;
523 				} else if (filebench_shm->shm_rmode !=
524 				    FILEBENCH_MODE_QALLDONE) {
525 					filebench_log(LOG_ERROR1,
526 					    "WARNING! Run stopped early:\n   "
527 					    "             flowop %s-%d could "
528 					    "not obtain a file. Please\n      "
529 					    "          reduce runtime, "
530 					    "increase fileset entries "
531 					    "($nfiles), or switch modes.",
532 					    flowop->fo_name,
533 					    flowop->fo_instance);
534 					filebench_shm->shm_f_abort =
535 					    FILEBENCH_ABORT_ERROR;
536 				}
537 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
538 				break;
539 			}
540 
541 			/*
542 			 * Return value of FILEBENCH_DONE means "stop
543 			 * the filebench run without error"
544 			 */
545 			if (ret == FILEBENCH_DONE) {
546 				(void) ipc_mutex_lock(&threadflow->tf_lock);
547 				threadflow->tf_abort = FILEBENCH_DONE;
548 				filebench_shm->shm_f_abort =
549 				    FILEBENCH_ABORT_DONE;
550 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
551 				break;
552 			}
553 
554 			/*
555 			 * If we get here and the return is something other
556 			 * than FILEBENCH_OK, it means a spurious code
557 			 * was returned, so treat as major error. This
558 			 * probably indicates a bug in the flowop.
559 			 */
560 			if (ret != FILEBENCH_OK) {
561 				filebench_log(LOG_ERROR,
562 				    "Flowop %s unexpected return value = %d\n",
563 				    flowop->fo_name, ret);
564 				filebench_shm->shm_f_abort =
565 				    FILEBENCH_ABORT_ERROR;
566 				break;
567 			}
568 		}
569 
570 		/* advance to next flowop */
571 		flowop = flowop->fo_exec_next;
572 
573 		/* but if at end of list, start over from the beginning */
574 		if (flowop == NULL) {
575 			flowop = threadflow->tf_thrd_fops;
576 			threadflow->tf_stats.fs_count++;
577 		}
578 	}
579 
580 #ifdef HAVE_LWPS
581 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
582 	    _lwp_self());
583 #endif
584 
585 	/* Tell flowops to destroy locally acquired state */
586 	flowop_destruct_all_flows(threadflow);
587 
588 	pthread_exit(&threadflow->tf_abort);
589 }
590 
591 void flowoplib_flowinit(void);
592 void fb_lfs_flowinit(void);
593 
594 void
flowop_init(void)595 flowop_init(void)
596 {
597 	(void) pthread_mutex_init(&controlstats_lock,
598 	    ipc_mutexattr(IPC_MUTEX_NORMAL));
599 	flowoplib_flowinit();
600 }
601 
602 static int plugin_flowinit_done = FALSE;
603 
604 /*
605  * Initialize any "plug-in" flowops. Called when the first "create fileset"
606  * command is encountered.
607  */
608 void
flowop_plugin_flowinit(void)609 flowop_plugin_flowinit(void)
610 {
611 	if (plugin_flowinit_done)
612 		return;
613 
614 	plugin_flowinit_done = TRUE;
615 
616 	switch (filebench_shm->shm_filesys_type) {
617 	case LOCAL_FS_PLUG:
618 		fb_lfs_flowinit();
619 		break;
620 
621 	case NFS3_PLUG:
622 	case NFS4_PLUG:
623 	case CIFS_PLUG:
624 		break;
625 	}
626 }
627 
628 
629 /*
630  * Delete the designated flowop from the thread's flowop list.
631  */
632 static void
flowop_delete(flowop_t ** flowoplist,flowop_t * flowop)633 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
634 {
635 	flowop_t *entry = *flowoplist;
636 	int found = 0;
637 
638 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
639 	    flowop->fo_name,
640 	    flowop->fo_instance);
641 
642 	/* Delete from thread's flowop list */
643 	if (flowop == *flowoplist) {
644 		/* First on list */
645 		*flowoplist = flowop->fo_exec_next;
646 		filebench_log(LOG_DEBUG_IMPL,
647 		    "Delete0 flowop: (%s-%d)",
648 		    flowop->fo_name,
649 		    flowop->fo_instance);
650 	} else {
651 		while (entry->fo_exec_next) {
652 			filebench_log(LOG_DEBUG_IMPL,
653 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
654 			    entry->fo_exec_next->fo_name,
655 			    entry->fo_exec_next->fo_instance,
656 			    flowop->fo_name,
657 			    flowop->fo_instance);
658 
659 			if (flowop == entry->fo_exec_next) {
660 				/* Delete */
661 				filebench_log(LOG_DEBUG_IMPL,
662 				    "Deleted0 flowop: (%s-%d)",
663 				    entry->fo_exec_next->fo_name,
664 				    entry->fo_exec_next->fo_instance);
665 				entry->fo_exec_next =
666 				    entry->fo_exec_next->fo_exec_next;
667 				break;
668 			}
669 			entry = entry->fo_exec_next;
670 		}
671 	}
672 
673 #ifdef HAVE_PROCFS
674 	/* Close /proc stats */
675 	if (flowop->fo_thread)
676 		(void) close(flowop->fo_thread->tf_lwpusagefd);
677 #endif
678 
679 	/* Delete from global list */
680 	entry = filebench_shm->shm_flowoplist;
681 
682 	if (flowop == filebench_shm->shm_flowoplist) {
683 		/* First on list */
684 		filebench_shm->shm_flowoplist = flowop->fo_next;
685 		found = 1;
686 	} else {
687 		while (entry->fo_next) {
688 			filebench_log(LOG_DEBUG_IMPL,
689 			    "Delete flowop: (%s-%d) == (%s-%d)",
690 			    entry->fo_next->fo_name,
691 			    entry->fo_next->fo_instance,
692 			    flowop->fo_name,
693 			    flowop->fo_instance);
694 
695 			if (flowop == entry->fo_next) {
696 				/* Delete */
697 				entry->fo_next = entry->fo_next->fo_next;
698 				found = 1;
699 				break;
700 			}
701 
702 			entry = entry->fo_next;
703 		}
704 	}
705 	if (found) {
706 		filebench_log(LOG_DEBUG_IMPL,
707 		    "Deleted flowop: (%s-%d)",
708 		    flowop->fo_name,
709 		    flowop->fo_instance);
710 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
711 	} else {
712 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
713 		    flowop->fo_name,
714 		    flowop->fo_instance);
715 	}
716 }
717 
718 /*
719  * Deletes all the flowops from a flowop list.
720  */
721 void
flowop_delete_all(flowop_t ** flowoplist)722 flowop_delete_all(flowop_t **flowoplist)
723 {
724 	flowop_t *flowop = *flowoplist;
725 	flowop_t *next_flowop;
726 
727 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
728 
729 	while (flowop) {
730 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
731 		    flowop->fo_name, flowop->fo_instance);
732 
733 		if (flowop->fo_instance &&
734 		    (flowop->fo_instance == FLOW_MASTER)) {
735 			flowop = flowop->fo_exec_next;
736 			continue;
737 		}
738 		next_flowop = flowop->fo_exec_next;
739 		flowop_delete(flowoplist, flowop);
740 		flowop = next_flowop;
741 	}
742 
743 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
744 }
745 
746 /*
747  * Allocates a flowop entity and initializes it with inherited
748  * contents from the "inherit" flowop, if it is supplied, or
749  * with zeros otherwise. In either case the fo_next and fo_exec_next
750  * pointers are set to NULL, and fo_thread is set to point to
751  * the owning threadflow. The initialized flowop is placed at
752  * the head of the global flowop list, and also placed on the
753  * tail of the supplied local flowop list, which will either
754  * be a threadflow's tf_thrd_fops list or a composite flowop's
755  * fo_comp_fops list. The routine locks the flowop's fo_lock and
756  * leaves it held on return. If successful, it returns a pointer
757  * to the allocated and initialized flowop, otherwise it returns NULL.
758  *
759  * filebench_shm->shm_flowop_lock must be held by caller.
760  */
761 static flowop_t *
flowop_define_common(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)762 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
763     flowop_t **flowoplist_hdp, int instance, int type)
764 {
765 	flowop_t *flowop;
766 
767 	if (name == NULL)
768 		return (NULL);
769 
770 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
771 		filebench_log(LOG_ERROR,
772 		    "flowop_define: Can't malloc flowop");
773 		return (NULL);
774 	}
775 
776 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
777 	    name, instance, flowop);
778 
779 	if (flowop == NULL)
780 		return (NULL);
781 
782 	if (inherit) {
783 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
784 		(void) pthread_mutex_init(&flowop->fo_lock,
785 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
786 		(void) ipc_mutex_lock(&flowop->fo_lock);
787 		flowop->fo_next = NULL;
788 		flowop->fo_exec_next = NULL;
789 		filebench_log(LOG_DEBUG_IMPL,
790 		    "flowop %s-%d calling init", name, instance);
791 	} else {
792 		(void) memset(flowop, 0, sizeof (flowop_t));
793 		flowop->fo_iters = avd_int_alloc(1);
794 		flowop->fo_type = type;
795 		(void) pthread_mutex_init(&flowop->fo_lock,
796 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
797 		(void) ipc_mutex_lock(&flowop->fo_lock);
798 	}
799 
800 	/* Create backpointer to thread */
801 	flowop->fo_thread = threadflow;
802 
803 	/* Add flowop to global list */
804 	if (filebench_shm->shm_flowoplist == NULL) {
805 		filebench_shm->shm_flowoplist = flowop;
806 		flowop->fo_next = NULL;
807 	} else {
808 		flowop->fo_next = filebench_shm->shm_flowoplist;
809 		filebench_shm->shm_flowoplist = flowop;
810 	}
811 
812 	(void) strcpy(flowop->fo_name, name);
813 	flowop->fo_instance = instance;
814 
815 	if (flowoplist_hdp == NULL)
816 		return (flowop);
817 
818 	/* Add flowop to thread op list */
819 	if (*flowoplist_hdp == NULL) {
820 		*flowoplist_hdp = flowop;
821 		flowop->fo_exec_next = NULL;
822 	} else {
823 		flowop_t *flowend;
824 
825 		/* Find the end of the thread list */
826 		flowend = *flowoplist_hdp;
827 		while (flowend->fo_exec_next != NULL)
828 			flowend = flowend->fo_exec_next;
829 		flowend->fo_exec_next = flowop;
830 		flowop->fo_exec_next = NULL;
831 	}
832 
833 	return (flowop);
834 }
835 
836 /*
837  * Calls flowop_define_common() to allocate and initialize a
838  * flowop, and holds the shared flowop_lock during the call.
839  * It releases the created flowop's fo_lock when done.
840  */
841 flowop_t *
flowop_define(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)842 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
843     flowop_t **flowoplist_hdp, int instance, int type)
844 {
845 	flowop_t	*flowop;
846 
847 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
848 	flowop = flowop_define_common(threadflow, name,
849 	    inherit, flowoplist_hdp, instance, type);
850 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
851 
852 	if (flowop == NULL)
853 		return (NULL);
854 
855 	(void) ipc_mutex_unlock(&flowop->fo_lock);
856 
857 	return (flowop);
858 }
859 
860 /*
861  * Calls flowop_define_common() to allocate and initialize a
862  * composite flowop, and holds the shared flowop_lock during the call.
863  * It releases the created flowop's fo_lock when done.
864  */
865 flowop_t *
flowop_new_composite_define(char * name)866 flowop_new_composite_define(char *name)
867 {
868 	flowop_t *flowop;
869 
870 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
871 	flowop = flowop_define_common(NULL, name,
872 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
873 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
874 
875 	if (flowop == NULL)
876 		return (NULL);
877 
878 	flowop->fo_func = flowop_composite;
879 	flowop->fo_init = flowop_composite_init;
880 	flowop->fo_destruct = flowop_composite_destruct;
881 	(void) ipc_mutex_unlock(&flowop->fo_lock);
882 
883 	return (flowop);
884 }
885 
886 /*
887  * Attempts to take a write lock on the flowop_find_lock that is
888  * defined in interprocess shared memory. Since each call to
889  * flowop_start() holds a read lock on flowop_find_lock, this
890  * routine effectively blocks until all instances of
891  * flowop_start() have finished. The flowop_find() routine calls
892  * this routine so that flowops won't be searched for until all
893  * flowops have been created by flowop_start.
894  */
895 static void
flowop_find_barrier(void)896 flowop_find_barrier(void)
897 {
898 	/* Block on wrlock to ensure find waits for all creates */
899 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
900 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
901 }
902 
903 /*
904  * Returns a list of flowops named "name" from the master
905  * flowop list.
906  */
907 flowop_t *
flowop_find(char * name)908 flowop_find(char *name)
909 {
910 	flowop_t *flowop;
911 	flowop_t *result = NULL;
912 
913 	flowop_find_barrier();
914 
915 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
916 
917 	flowop = filebench_shm->shm_flowoplist;
918 
919 	while (flowop) {
920 		if (strcmp(name, flowop->fo_name) == 0) {
921 
922 			/* Add flowop to result list */
923 			if (result == NULL) {
924 				result = flowop;
925 				flowop->fo_resultnext = NULL;
926 			} else {
927 				flowop->fo_resultnext = result;
928 				result = flowop;
929 			}
930 		}
931 		flowop = flowop->fo_next;
932 	}
933 
934 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
935 
936 
937 	return (result);
938 }
939 
940 /*
941  * Returns a pointer to the specified instance of flowop
942  * "name" from the global list.
943  */
944 flowop_t *
flowop_find_one(char * name,int instance)945 flowop_find_one(char *name, int instance)
946 {
947 	flowop_t *test_flowop;
948 
949 	flowop_find_barrier();
950 
951 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
952 
953 	test_flowop = filebench_shm->shm_flowoplist;
954 
955 	while (test_flowop) {
956 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
957 		    (instance == test_flowop->fo_instance))
958 			break;
959 
960 		test_flowop = test_flowop->fo_next;
961 	}
962 
963 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
964 
965 	return (test_flowop);
966 }
967 
968 /*
969  * recursively searches through lists of flowops on a given thread
970  * and those on any included composite flowops for the named flowop.
971  * either returns with a pointer to the named flowop or NULL if it
972  * cannot be found.
973  */
974 static flowop_t *
flowop_recurse_search(char * path,char * name,flowop_t * list)975 flowop_recurse_search(char *path, char *name, flowop_t *list)
976 {
977 	flowop_t *test_flowop;
978 	char fullname[MAXPATHLEN];
979 
980 	test_flowop = list;
981 
982 	/*
983 	 * when searching a list of inner flowops, "path" is the fullname
984 	 * of the containing composite flowop. Use it to form the
985 	 * full name of the inner flowop to search for.
986 	 */
987 	if (path) {
988 		if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
989 			filebench_log(LOG_ERROR,
990 			    "composite flowop path name %s.%s too long",
991 			    path, name);
992 			return (NULL);
993 		}
994 
995 		/* create composite_name.name for recursive search */
996 		(void) strcpy(fullname, path);
997 		(void) strcat(fullname, ".");
998 		(void) strcat(fullname, name);
999 	} else {
1000 		(void) strcpy(fullname, name);
1001 	}
1002 
1003 	/*
1004 	 * loop through all flowops on the supplied tf_thrd_fops (flowop)
1005 	 * list or fo_comp_fops (inner flowop) list.
1006 	 */
1007 	while (test_flowop) {
1008 		if (strcmp(fullname, test_flowop->fo_name) == 0)
1009 			return (test_flowop);
1010 
1011 		if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
1012 			flowop_t *found_flowop;
1013 
1014 			found_flowop = flowop_recurse_search(
1015 			    test_flowop->fo_name, name,
1016 			    test_flowop->fo_comp_fops);
1017 
1018 			if (found_flowop)
1019 				return (found_flowop);
1020 		}
1021 		test_flowop = test_flowop->fo_exec_next;
1022 	}
1023 
1024 	/* not found here or on any child lists */
1025 	return (NULL);
1026 }
1027 
1028 /*
1029  * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
1030  * list of flowops. Returns the named flowop if found, or NULL.
1031  */
1032 flowop_t *
flowop_find_from_list(char * name,flowop_t * list)1033 flowop_find_from_list(char *name, flowop_t *list)
1034 {
1035 	flowop_t *found_flowop;
1036 
1037 	flowop_find_barrier();
1038 
1039 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
1040 
1041 	found_flowop = flowop_recurse_search(NULL, name, list);
1042 
1043 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
1044 
1045 	return (found_flowop);
1046 }
1047 
1048 /*
1049  * Composite flowop method. Does one pass through its list of
1050  * inner flowops per iteration.
1051  */
1052 static int
flowop_composite(threadflow_t * threadflow,flowop_t * flowop)1053 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
1054 {
1055 	flowop_t	*inner_flowop;
1056 
1057 	/* get the first flowop in the list */
1058 	inner_flowop = flowop->fo_comp_fops;
1059 
1060 	/* make a pass through the list of sub flowops */
1061 	while (inner_flowop) {
1062 		int	i, count;
1063 
1064 		/* Abort if asked */
1065 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
1066 			return (FILEBENCH_DONE);
1067 
1068 		if (inner_flowop->fo_stats.fs_stime == 0)
1069 			inner_flowop->fo_stats.fs_stime = gethrtime();
1070 
1071 		/* Execute the flowop for fo_iters times */
1072 		count = (int)avd_get_int(inner_flowop->fo_iters);
1073 		for (i = 0; i < count; i++) {
1074 
1075 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
1076 			    "%s-%d", threadflow->tf_name,
1077 			    inner_flowop->fo_name,
1078 			    inner_flowop->fo_instance);
1079 
1080 			switch ((*inner_flowop->fo_func)(threadflow,
1081 			    inner_flowop)) {
1082 
1083 			/* all done */
1084 			case FILEBENCH_DONE:
1085 				return (FILEBENCH_DONE);
1086 
1087 			/* quit if inner flowop limit reached */
1088 			case FILEBENCH_NORSC:
1089 				return (FILEBENCH_NORSC);
1090 
1091 			/* quit on inner flowop error */
1092 			case FILEBENCH_ERROR:
1093 				filebench_log(LOG_ERROR,
1094 				    "inner flowop %s failed",
1095 				    inner_flowop->fo_name);
1096 				return (FILEBENCH_ERROR);
1097 
1098 			/* otherwise keep going */
1099 			default:
1100 				break;
1101 			}
1102 
1103 		}
1104 
1105 		/* advance to next flowop */
1106 		inner_flowop = inner_flowop->fo_exec_next;
1107 	}
1108 
1109 	/* finished with this pass */
1110 	return (FILEBENCH_OK);
1111 }
1112 
1113 /*
1114  * Composite flowop initialization. Creates runtime inner flowops
1115  * from prototype inner flowops.
1116  */
1117 static int
flowop_composite_init(flowop_t * flowop)1118 flowop_composite_init(flowop_t *flowop)
1119 {
1120 	int err;
1121 
1122 	err = flowop_create_runtime_flowops(flowop->fo_thread,
1123 	    &flowop->fo_comp_fops);
1124 	if (err != FILEBENCH_OK)
1125 		return (err);
1126 
1127 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1128 	return (0);
1129 }
1130 
1131 /*
1132  * clean up inner flowops
1133  */
1134 static void
flowop_composite_destruct(flowop_t * flowop)1135 flowop_composite_destruct(flowop_t *flowop)
1136 {
1137 	flowop_t *inner_flowop = flowop->fo_comp_fops;
1138 
1139 	while (inner_flowop) {
1140 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1141 		    inner_flowop->fo_name, inner_flowop->fo_instance);
1142 
1143 		if (inner_flowop->fo_instance &&
1144 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
1145 			inner_flowop = inner_flowop->fo_exec_next;
1146 			continue;
1147 		}
1148 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1149 		inner_flowop = inner_flowop->fo_exec_next;
1150 	}
1151 }
1152 
1153 /*
1154  * Support routines for libraries of flowops
1155  */
1156 
1157 int
flowop_init_generic(flowop_t * flowop)1158 flowop_init_generic(flowop_t *flowop)
1159 {
1160 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1161 	return (FILEBENCH_OK);
1162 }
1163 
1164 void
flowop_destruct_generic(flowop_t * flowop)1165 flowop_destruct_generic(flowop_t *flowop)
1166 {
1167 	char *buf;
1168 
1169 	/* release any local resources held by the flowop */
1170 	(void) ipc_mutex_lock(&flowop->fo_lock);
1171 	buf = flowop->fo_buf;
1172 	flowop->fo_buf = NULL;
1173 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1174 
1175 	if (buf)
1176 		free(buf);
1177 }
1178 
1179 
1180 /*
1181  * Loops through the supplied list of flowops and creates and initializes
1182  * a flowop for each one by calling flowop_define. As a side effect of
1183  * calling flowop define, the created flowops are placed on the
1184  * master flowop list. All created flowops are set to instance "0".
1185  */
1186 void
flowop_flow_init(flowop_proto_t * list,int nops)1187 flowop_flow_init(flowop_proto_t *list, int nops)
1188 {
1189 	int i;
1190 
1191 	for (i = 0; i < nops; i++) {
1192 		flowop_t *flowop;
1193 		flowop_proto_t *fl;
1194 
1195 		fl = &(list[i]);
1196 
1197 		if ((flowop = flowop_define(NULL,
1198 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
1199 			filebench_log(LOG_ERROR,
1200 			    "failed to create flowop %s\n",
1201 			    fl->fl_name);
1202 			filebench_shutdown(1);
1203 		}
1204 
1205 		flowop->fo_func = fl->fl_func;
1206 		flowop->fo_init = fl->fl_init;
1207 		flowop->fo_destruct = fl->fl_destruct;
1208 		flowop->fo_attrs = fl->fl_attrs;
1209 	}
1210 }
1211