1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 #include "config.h"
27
28 #ifdef HAVE_LWPS
29 #include <sys/lwp.h>
30 #endif
31 #include <fcntl.h>
32 #include "filebench.h"
33 #include "flowop.h"
34 #include "stats.h"
35
36 #ifdef LINUX_PORT
37 #include <sys/types.h>
38 #include <linux/unistd.h>
39 #endif
40
41 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
42 flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
43 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
44 static int flowop_composite_init(flowop_t *flowop);
45 static void flowop_composite_destruct(flowop_t *flowop);
46
47 /*
48 * A collection of flowop support functions. The actual code that
49 * implements the various flowops is in flowop_library.c.
50 *
51 * Routines for defining, creating, initializing and destroying
52 * flowops, cyclically invoking the flowops on each threadflow's flowop
53 * list, collecting statistics about flowop execution, and other
54 * housekeeping duties are included in this file.
55 *
56 * User Defined Composite Flowops
57 * The ability to define new flowops as lists of built-in or previously
58 * defined flowops has been added to Filebench. In a sense they are like
59 * in-line subroutines, which can have default attributes set at definition
60 * time and passed arguments at invocation time. Like other flowops (and
61 * unlike conventional subroutines) you can invoke them with an iteration
62 * count (the "iter" attribute), and they will loop through their associated
63 * list of flowops for "iter" number of times each time they are encountered
64 * in the thread or outer composite flowop which invokes them.
65 *
66 * Composite flowops are created with a "define" command, are given a name,
67 * optional default attributes, and local variable definitions on the
68 * "define" command line, followed by a brace enclosed list of flowops
69 * to execute. The enclosed flowops may include attributes that reference
70 * the local variables, as well as constants and global variables.
71 *
72 * Composite flowops are used pretty much like regular flowops, but you can
73 * also set local variables to constants or global variables ($local_var =
74 * [$var | $random_var | string | boolean | integer | double]) as part of
75 * the invocation. Thus each invocation can pass customized values to its
76 * inner flowops, greatly increasing their generality.
77 *
 * All flowops are placed on a global, singly linked list, with fo_next
 * being the link pointer for this list. They are also placed on a private
 * list for the thread or composite flowop they are part of. The tf_thrd_fops
 * pointer in the thread will point to the list of top level flowops in the
 * thread, which are linked together by fo_exec_next. If any of these flowops
 * are composite flowops, they will have a list of second level flowops rooted
 * at the composite flowop's fo_comp_fops pointer. So, there is one big list
 * of all flowops, and an n-ary tree of threads, composite flowops, and
 * flowops, with composite flowops being the branch nodes in the tree.
87 *
88 * To illustrate, if we have three first level flowops, the first of which is
89 * a composite flowop consisting of two other flowops, we get:
90 *
 * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
 *                         flowop->fo_comp_fops                 |
 *                                 |                            V
 *                                 |                 flowop->fo_exec_next
 *                                 |
 *                                 V
 *                         flowop->fo_exec_next -> flowop->fo_exec_next
98 *
99 * And all five flowops (plus others from any other threads) are on a global
100 * list linked with fo_next.
101 */
102
103 /*
104 * Prints the name and instance number of each flowop in
105 * the supplied list to the filebench log.
106 */
107 int
flowop_printlist(flowop_t * list)108 flowop_printlist(flowop_t *list)
109 {
110 flowop_t *flowop = list;
111
112 while (flowop) {
113 filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
114 flowop->fo_name, flowop->fo_instance);
115 flowop = flowop->fo_exec_next;
116 }
117 return (0);
118 }
119
120 /*
121 * Prints the name and instance number of all flowops on
122 * the master flowop list to the console and the filebench log.
123 */
124 void
flowop_printall(void)125 flowop_printall(void)
126 {
127 flowop_t *flowop = filebench_shm->shm_flowoplist;
128
129 while (flowop) {
130 filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
131 flowop->fo_name, flowop->fo_instance);
132 flowop = flowop->fo_next;
133 }
134 }
135
/*
 * Difference (e - s) between two timespec-style values, in nanoseconds.
 * "s" is the start value and "e" the end value; each must be an
 * expression yielding a structure with tv_sec and tv_nsec members.
 * The arguments are parenthesized so that expressions such as "*ptr"
 * expand correctly. Each argument is evaluated twice, so it must not
 * have side effects.
 */
#define	TIMESPEC_TO_HRTIME(s, e) \
	((((e).tv_sec - (s).tv_sec) * 1000000000LL) + \
	((e).tv_nsec - (s).tv_nsec))
138 /*
139 * Puts current high resolution time in start time entry
140 * for threadflow and may also calculate running filebench
141 * overhead statistics.
142 */
143 void
flowop_beginop(threadflow_t * threadflow,flowop_t * flowop)144 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
145 {
146 #ifdef HAVE_PROCFS
147 if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
148 if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
149 char procname[128];
150
151 (void) snprintf(procname, sizeof (procname),
152 "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
153 threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
154 }
155
156 (void) pread(threadflow->tf_lwpusagefd,
157 &threadflow->tf_susage,
158 sizeof (struct prusage), 0);
159
160 /* Compute overhead time in this thread around op */
161 if (threadflow->tf_eusage.pr_stime.tv_nsec) {
162 flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
163 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
164 threadflow->tf_susage.pr_utime) +
165 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
166 threadflow->tf_susage.pr_ttime) +
167 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
168 threadflow->tf_susage.pr_stime);
169 }
170 }
171 #endif
172
173 /* Start of op for this thread */
174 threadflow->tf_stime = gethrtime();
175 }
176
177 flowstat_t controlstats;
178 pthread_mutex_t controlstats_lock;
179 static int controlstats_zeroed = 0;
180
181 /*
182 * Updates flowop's latency statistics, using saved start
183 * time and current high resolution time. Updates flowop's
184 * io count and transferred bytes statistics. Also updates
185 * threadflow's and flowop's cumulative read or write byte
186 * and io count statistics.
187 */
188 void
flowop_endop(threadflow_t * threadflow,flowop_t * flowop,int64_t bytes)189 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
190 {
191 hrtime_t t;
192
193 flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
194 (gethrtime() - threadflow->tf_stime);
195 #ifdef HAVE_PROCFS
196 if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
197 if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
198 sizeof (struct prusage), 0)) != sizeof (struct prusage))
199 filebench_log(LOG_ERROR, "cannot read /proc");
200
201 t =
202 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
203 threadflow->tf_eusage.pr_utime) +
204 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
205 threadflow->tf_eusage.pr_ttime) +
206 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
207 threadflow->tf_eusage.pr_stime);
208 flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
209
210 flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
211 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
212 threadflow->tf_eusage.pr_tftime) +
213 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
214 threadflow->tf_eusage.pr_dftime) +
215 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
216 threadflow->tf_eusage.pr_kftime) +
217 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
218 threadflow->tf_eusage.pr_kftime) +
219 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
220 threadflow->tf_eusage.pr_slptime);
221 }
222 #endif
223
224 flowop->fo_stats.fs_count++;
225 flowop->fo_stats.fs_bytes += bytes;
226 (void) ipc_mutex_lock(&controlstats_lock);
227 if ((flowop->fo_type & FLOW_TYPE_IO) ||
228 (flowop->fo_type & FLOW_TYPE_AIO)) {
229 controlstats.fs_count++;
230 controlstats.fs_bytes += bytes;
231 }
232 if (flowop->fo_attrs & FLOW_ATTR_READ) {
233 threadflow->tf_stats.fs_rbytes += bytes;
234 threadflow->tf_stats.fs_rcount++;
235 flowop->fo_stats.fs_rcount++;
236 controlstats.fs_rbytes += bytes;
237 controlstats.fs_rcount++;
238 } else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
239 threadflow->tf_stats.fs_wbytes += bytes;
240 threadflow->tf_stats.fs_wcount++;
241 flowop->fo_stats.fs_wcount++;
242 controlstats.fs_wbytes += bytes;
243 controlstats.fs_wcount++;
244 }
245 (void) ipc_mutex_unlock(&controlstats_lock);
246 }
247
248 /*
249 * Calls the flowop's initialization function, pointed to by
250 * flowop->fo_init.
251 */
252 static int
flowop_initflow(flowop_t * flowop)253 flowop_initflow(flowop_t *flowop)
254 {
255 /*
256 * save static copies of two items, in case they are supplied
257 * from random variables
258 */
259 if (!AVD_IS_STRING(flowop->fo_value))
260 flowop->fo_constvalue = avd_get_int(flowop->fo_value);
261
262 flowop->fo_constwss = avd_get_int(flowop->fo_wss);
263
264 if ((*flowop->fo_init)(flowop) < 0) {
265 filebench_log(LOG_ERROR, "flowop %s-%d init failed",
266 flowop->fo_name, flowop->fo_instance);
267 return (-1);
268 }
269 return (0);
270 }
271
272 static int
flowop_create_runtime_flowops(threadflow_t * threadflow,flowop_t ** ops_list_ptr)273 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
274 {
275 flowop_t *flowop = *ops_list_ptr;
276
277 while (flowop) {
278 flowop_t *newflowop;
279
280 if (flowop == *ops_list_ptr)
281 *ops_list_ptr = NULL;
282
283 newflowop = flowop_define_common(threadflow, flowop->fo_name,
284 flowop, ops_list_ptr, 1, 0);
285 if (newflowop == NULL)
286 return (FILEBENCH_ERROR);
287
288 /* check for fo_filename attribute, and resolve if present */
289 if (flowop->fo_filename) {
290 char *name;
291
292 name = avd_get_str(flowop->fo_filename);
293 newflowop->fo_fileset = fileset_find(name);
294
295 if (newflowop->fo_fileset == NULL) {
296 filebench_log(LOG_ERROR,
297 "flowop %s: file %s not found",
298 newflowop->fo_name, name);
299 filebench_shutdown(1);
300 }
301 }
302
303 if (flowop_initflow(newflowop) < 0) {
304 filebench_log(LOG_ERROR, "Flowop init of %s failed",
305 newflowop->fo_name);
306 }
307
308 flowop = flowop->fo_exec_next;
309 }
310 return (FILEBENCH_OK);
311 }
312
313 /*
314 * Calls the flowop's destruct function, pointed to by
315 * flowop->fo_destruct.
316 */
317 static void
flowop_destructflow(flowop_t * flowop)318 flowop_destructflow(flowop_t *flowop)
319 {
320 (*flowop->fo_destruct)(flowop);
321 }
322
323 /*
324 * call the destruct funtions of all the threadflow's flowops,
325 * if it is still flagged as "running".
326 */
327 void
flowop_destruct_all_flows(threadflow_t * threadflow)328 flowop_destruct_all_flows(threadflow_t *threadflow)
329 {
330 flowop_t *flowop;
331
332 /* wait a moment to give other threads a chance to stop too */
333 (void) sleep(1);
334
335 (void) ipc_mutex_lock(&threadflow->tf_lock);
336
337 /* prepare to call destruct flow routines, if necessary */
338 if (threadflow->tf_running == 0) {
339
340 /* allready destroyed */
341 (void) ipc_mutex_unlock(&threadflow->tf_lock);
342 return;
343 }
344
345 flowop = threadflow->tf_thrd_fops;
346 threadflow->tf_running = 0;
347 (void) ipc_mutex_unlock(&threadflow->tf_lock);
348
349 while (flowop) {
350 flowop_destructflow(flowop);
351 flowop = flowop->fo_exec_next;
352 }
353 }
354
355 /*
356 * The final initialization and main execution loop for the
357 * worker threads. Sets threadflow and flowop start times,
358 * waits for all process to start, then creates the runtime
359 * flowops from those defined by the F language workload
360 * script. It does some more initialization, then enters a
361 * loop to repeatedly execute the flowops on the flowop list
362 * until an abort condition is detected, at which time it exits.
363 * This is the starting routine for the new worker thread
364 * created by threadflow_createthread(), and is not currently
365 * called from anywhere else.
366 */
367 void
flowop_start(threadflow_t * threadflow)368 flowop_start(threadflow_t *threadflow)
369 {
370 flowop_t *flowop;
371 size_t memsize;
372 int ret = FILEBENCH_OK;
373
374 #ifdef HAVE_PROCFS
375 if (noproc == 0) {
376 char procname[128];
377 long ctl[2] = {PCSET, PR_MSACCT};
378 int pfd;
379
380 (void) snprintf(procname, sizeof (procname),
381 "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
382 pfd = open(procname, O_WRONLY);
383 (void) pwrite(pfd, &ctl, sizeof (ctl), 0);
384 (void) close(pfd);
385 }
386 #endif
387
388 (void) ipc_mutex_lock(&controlstats_lock);
389 if (!controlstats_zeroed) {
390 (void) memset(&controlstats, 0, sizeof (controlstats));
391 controlstats_zeroed = 1;
392 }
393 (void) ipc_mutex_unlock(&controlstats_lock);
394
395 flowop = threadflow->tf_thrd_fops;
396 threadflow->tf_stats.fs_stime = gethrtime();
397 flowop->fo_stats.fs_stime = gethrtime();
398
399 /* Hold the flowop find lock as reader to prevent lookups */
400 (void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
401
402 /*
403 * Block until all processes have started, acting like
404 * a barrier. The original filebench process initially
405 * holds the run_lock as a reader, preventing any of the
406 * threads from obtaining the writer lock, and hence
407 * passing this point. Once all processes and threads
408 * have been created, the original process unlocks
409 * run_lock, allowing each waiting thread to lock
410 * and then immediately unlock it, then begin running.
411 */
412 (void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
413 (void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
414
415 /* Create the runtime flowops from those defined by the script */
416 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
417 if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
418 != FILEBENCH_OK) {
419 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
420 filebench_shutdown(1);
421 return;
422 }
423 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
424
425 /* Release the find lock as reader to allow lookups */
426 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
427
428 /* Set to the start of the new flowop list */
429 flowop = threadflow->tf_thrd_fops;
430
431 threadflow->tf_abort = 0;
432 threadflow->tf_running = 1;
433
434 memsize = (size_t)threadflow->tf_constmemsize;
435
436 /* If we are going to use ISM, allocate later */
437 if (threadflow->tf_attrs & THREADFLOW_USEISM) {
438 threadflow->tf_mem =
439 ipc_ismmalloc(memsize);
440 } else {
441 threadflow->tf_mem =
442 malloc(memsize);
443 }
444
445 (void) memset(threadflow->tf_mem, 0, memsize);
446 filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
447
448 #ifdef HAVE_LWPS
449 filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
450 threadflow,
451 _lwp_self());
452 #endif
453
454 /* Main filebench worker loop */
455 while (ret == FILEBENCH_OK) {
456 int i, count;
457
458 /* Abort if asked */
459 if (threadflow->tf_abort || filebench_shm->shm_f_abort)
460 break;
461
462 /* Be quiet while stats are gathered */
463 if (filebench_shm->shm_bequiet) {
464 (void) sleep(1);
465 continue;
466 }
467
468 /* Take it easy until everyone is ready to go */
469 if (!filebench_shm->shm_procs_running) {
470 (void) sleep(1);
471 continue;
472 }
473
474 if (flowop == NULL) {
475 filebench_log(LOG_ERROR, "flowop_read null flowop");
476 return;
477 }
478
479 if (flowop->fo_stats.fs_stime == 0)
480 flowop->fo_stats.fs_stime = gethrtime();
481
482 /* Execute the flowop for fo_iters times */
483 count = (int)avd_get_int(flowop->fo_iters);
484 for (i = 0; i < count; i++) {
485
486 filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
487 "%s-%d", threadflow->tf_name, flowop->fo_name,
488 flowop->fo_instance);
489
490 ret = (*flowop->fo_func)(threadflow, flowop);
491
492 /*
493 * Return value FILEBENCH_ERROR means "flowop
494 * failed, stop the filebench run"
495 */
496 if (ret == FILEBENCH_ERROR) {
497 filebench_log(LOG_ERROR,
498 "%s-%d: flowop %s-%d failed",
499 threadflow->tf_name,
500 threadflow->tf_instance,
501 flowop->fo_name,
502 flowop->fo_instance);
503 (void) ipc_mutex_lock(&threadflow->tf_lock);
504 threadflow->tf_abort = 1;
505 filebench_shm->shm_f_abort =
506 FILEBENCH_ABORT_ERROR;
507 (void) ipc_mutex_unlock(&threadflow->tf_lock);
508 break;
509 }
510
511 /*
512 * Return value of FILEBENCH_NORSC means "stop
513 * the filebench run" if in "end on no work mode",
514 * otherwise it indicates an error
515 */
516 if (ret == FILEBENCH_NORSC) {
517 (void) ipc_mutex_lock(&threadflow->tf_lock);
518 threadflow->tf_abort = FILEBENCH_DONE;
519 if (filebench_shm->shm_rmode ==
520 FILEBENCH_MODE_Q1STDONE) {
521 filebench_shm->shm_f_abort =
522 FILEBENCH_ABORT_RSRC;
523 } else if (filebench_shm->shm_rmode !=
524 FILEBENCH_MODE_QALLDONE) {
525 filebench_log(LOG_ERROR1,
526 "WARNING! Run stopped early:\n "
527 " flowop %s-%d could "
528 "not obtain a file. Please\n "
529 " reduce runtime, "
530 "increase fileset entries "
531 "($nfiles), or switch modes.",
532 flowop->fo_name,
533 flowop->fo_instance);
534 filebench_shm->shm_f_abort =
535 FILEBENCH_ABORT_ERROR;
536 }
537 (void) ipc_mutex_unlock(&threadflow->tf_lock);
538 break;
539 }
540
541 /*
542 * Return value of FILEBENCH_DONE means "stop
543 * the filebench run without error"
544 */
545 if (ret == FILEBENCH_DONE) {
546 (void) ipc_mutex_lock(&threadflow->tf_lock);
547 threadflow->tf_abort = FILEBENCH_DONE;
548 filebench_shm->shm_f_abort =
549 FILEBENCH_ABORT_DONE;
550 (void) ipc_mutex_unlock(&threadflow->tf_lock);
551 break;
552 }
553
554 /*
555 * If we get here and the return is something other
556 * than FILEBENCH_OK, it means a spurious code
557 * was returned, so treat as major error. This
558 * probably indicates a bug in the flowop.
559 */
560 if (ret != FILEBENCH_OK) {
561 filebench_log(LOG_ERROR,
562 "Flowop %s unexpected return value = %d\n",
563 flowop->fo_name, ret);
564 filebench_shm->shm_f_abort =
565 FILEBENCH_ABORT_ERROR;
566 break;
567 }
568 }
569
570 /* advance to next flowop */
571 flowop = flowop->fo_exec_next;
572
573 /* but if at end of list, start over from the beginning */
574 if (flowop == NULL) {
575 flowop = threadflow->tf_thrd_fops;
576 threadflow->tf_stats.fs_count++;
577 }
578 }
579
580 #ifdef HAVE_LWPS
581 filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
582 _lwp_self());
583 #endif
584
585 /* Tell flowops to destroy locally acquired state */
586 flowop_destruct_all_flows(threadflow);
587
588 pthread_exit(&threadflow->tf_abort);
589 }
590
591 void flowoplib_flowinit(void);
592 void fb_lfs_flowinit(void);
593
594 void
flowop_init(void)595 flowop_init(void)
596 {
597 (void) pthread_mutex_init(&controlstats_lock,
598 ipc_mutexattr(IPC_MUTEX_NORMAL));
599 flowoplib_flowinit();
600 }
601
602 static int plugin_flowinit_done = FALSE;
603
604 /*
605 * Initialize any "plug-in" flowops. Called when the first "create fileset"
606 * command is encountered.
607 */
608 void
flowop_plugin_flowinit(void)609 flowop_plugin_flowinit(void)
610 {
611 if (plugin_flowinit_done)
612 return;
613
614 plugin_flowinit_done = TRUE;
615
616 switch (filebench_shm->shm_filesys_type) {
617 case LOCAL_FS_PLUG:
618 fb_lfs_flowinit();
619 break;
620
621 case NFS3_PLUG:
622 case NFS4_PLUG:
623 case CIFS_PLUG:
624 break;
625 }
626 }
627
628
629 /*
630 * Delete the designated flowop from the thread's flowop list.
631 */
632 static void
flowop_delete(flowop_t ** flowoplist,flowop_t * flowop)633 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
634 {
635 flowop_t *entry = *flowoplist;
636 int found = 0;
637
638 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
639 flowop->fo_name,
640 flowop->fo_instance);
641
642 /* Delete from thread's flowop list */
643 if (flowop == *flowoplist) {
644 /* First on list */
645 *flowoplist = flowop->fo_exec_next;
646 filebench_log(LOG_DEBUG_IMPL,
647 "Delete0 flowop: (%s-%d)",
648 flowop->fo_name,
649 flowop->fo_instance);
650 } else {
651 while (entry->fo_exec_next) {
652 filebench_log(LOG_DEBUG_IMPL,
653 "Delete0 flowop: (%s-%d) == (%s-%d)",
654 entry->fo_exec_next->fo_name,
655 entry->fo_exec_next->fo_instance,
656 flowop->fo_name,
657 flowop->fo_instance);
658
659 if (flowop == entry->fo_exec_next) {
660 /* Delete */
661 filebench_log(LOG_DEBUG_IMPL,
662 "Deleted0 flowop: (%s-%d)",
663 entry->fo_exec_next->fo_name,
664 entry->fo_exec_next->fo_instance);
665 entry->fo_exec_next =
666 entry->fo_exec_next->fo_exec_next;
667 break;
668 }
669 entry = entry->fo_exec_next;
670 }
671 }
672
673 #ifdef HAVE_PROCFS
674 /* Close /proc stats */
675 if (flowop->fo_thread)
676 (void) close(flowop->fo_thread->tf_lwpusagefd);
677 #endif
678
679 /* Delete from global list */
680 entry = filebench_shm->shm_flowoplist;
681
682 if (flowop == filebench_shm->shm_flowoplist) {
683 /* First on list */
684 filebench_shm->shm_flowoplist = flowop->fo_next;
685 found = 1;
686 } else {
687 while (entry->fo_next) {
688 filebench_log(LOG_DEBUG_IMPL,
689 "Delete flowop: (%s-%d) == (%s-%d)",
690 entry->fo_next->fo_name,
691 entry->fo_next->fo_instance,
692 flowop->fo_name,
693 flowop->fo_instance);
694
695 if (flowop == entry->fo_next) {
696 /* Delete */
697 entry->fo_next = entry->fo_next->fo_next;
698 found = 1;
699 break;
700 }
701
702 entry = entry->fo_next;
703 }
704 }
705 if (found) {
706 filebench_log(LOG_DEBUG_IMPL,
707 "Deleted flowop: (%s-%d)",
708 flowop->fo_name,
709 flowop->fo_instance);
710 ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
711 } else {
712 filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
713 flowop->fo_name,
714 flowop->fo_instance);
715 }
716 }
717
718 /*
719 * Deletes all the flowops from a flowop list.
720 */
721 void
flowop_delete_all(flowop_t ** flowoplist)722 flowop_delete_all(flowop_t **flowoplist)
723 {
724 flowop_t *flowop = *flowoplist;
725 flowop_t *next_flowop;
726
727 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
728
729 while (flowop) {
730 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
731 flowop->fo_name, flowop->fo_instance);
732
733 if (flowop->fo_instance &&
734 (flowop->fo_instance == FLOW_MASTER)) {
735 flowop = flowop->fo_exec_next;
736 continue;
737 }
738 next_flowop = flowop->fo_exec_next;
739 flowop_delete(flowoplist, flowop);
740 flowop = next_flowop;
741 }
742
743 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
744 }
745
746 /*
747 * Allocates a flowop entity and initializes it with inherited
748 * contents from the "inherit" flowop, if it is supplied, or
749 * with zeros otherwise. In either case the fo_next and fo_exec_next
750 * pointers are set to NULL, and fo_thread is set to point to
751 * the owning threadflow. The initialized flowop is placed at
752 * the head of the global flowop list, and also placed on the
753 * tail of the supplied local flowop list, which will either
754 * be a threadflow's tf_thrd_fops list or a composite flowop's
755 * fo_comp_fops list. The routine locks the flowop's fo_lock and
756 * leaves it held on return. If successful, it returns a pointer
757 * to the allocated and initialized flowop, otherwise it returns NULL.
758 *
759 * filebench_shm->shm_flowop_lock must be held by caller.
760 */
761 static flowop_t *
flowop_define_common(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)762 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
763 flowop_t **flowoplist_hdp, int instance, int type)
764 {
765 flowop_t *flowop;
766
767 if (name == NULL)
768 return (NULL);
769
770 if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
771 filebench_log(LOG_ERROR,
772 "flowop_define: Can't malloc flowop");
773 return (NULL);
774 }
775
776 filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
777 name, instance, flowop);
778
779 if (flowop == NULL)
780 return (NULL);
781
782 if (inherit) {
783 (void) memcpy(flowop, inherit, sizeof (flowop_t));
784 (void) pthread_mutex_init(&flowop->fo_lock,
785 ipc_mutexattr(IPC_MUTEX_PRI_ROB));
786 (void) ipc_mutex_lock(&flowop->fo_lock);
787 flowop->fo_next = NULL;
788 flowop->fo_exec_next = NULL;
789 filebench_log(LOG_DEBUG_IMPL,
790 "flowop %s-%d calling init", name, instance);
791 } else {
792 (void) memset(flowop, 0, sizeof (flowop_t));
793 flowop->fo_iters = avd_int_alloc(1);
794 flowop->fo_type = type;
795 (void) pthread_mutex_init(&flowop->fo_lock,
796 ipc_mutexattr(IPC_MUTEX_PRI_ROB));
797 (void) ipc_mutex_lock(&flowop->fo_lock);
798 }
799
800 /* Create backpointer to thread */
801 flowop->fo_thread = threadflow;
802
803 /* Add flowop to global list */
804 if (filebench_shm->shm_flowoplist == NULL) {
805 filebench_shm->shm_flowoplist = flowop;
806 flowop->fo_next = NULL;
807 } else {
808 flowop->fo_next = filebench_shm->shm_flowoplist;
809 filebench_shm->shm_flowoplist = flowop;
810 }
811
812 (void) strcpy(flowop->fo_name, name);
813 flowop->fo_instance = instance;
814
815 if (flowoplist_hdp == NULL)
816 return (flowop);
817
818 /* Add flowop to thread op list */
819 if (*flowoplist_hdp == NULL) {
820 *flowoplist_hdp = flowop;
821 flowop->fo_exec_next = NULL;
822 } else {
823 flowop_t *flowend;
824
825 /* Find the end of the thread list */
826 flowend = *flowoplist_hdp;
827 while (flowend->fo_exec_next != NULL)
828 flowend = flowend->fo_exec_next;
829 flowend->fo_exec_next = flowop;
830 flowop->fo_exec_next = NULL;
831 }
832
833 return (flowop);
834 }
835
836 /*
837 * Calls flowop_define_common() to allocate and initialize a
838 * flowop, and holds the shared flowop_lock during the call.
839 * It releases the created flowop's fo_lock when done.
840 */
841 flowop_t *
flowop_define(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)842 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
843 flowop_t **flowoplist_hdp, int instance, int type)
844 {
845 flowop_t *flowop;
846
847 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
848 flowop = flowop_define_common(threadflow, name,
849 inherit, flowoplist_hdp, instance, type);
850 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
851
852 if (flowop == NULL)
853 return (NULL);
854
855 (void) ipc_mutex_unlock(&flowop->fo_lock);
856
857 return (flowop);
858 }
859
860 /*
861 * Calls flowop_define_common() to allocate and initialize a
862 * composite flowop, and holds the shared flowop_lock during the call.
863 * It releases the created flowop's fo_lock when done.
864 */
865 flowop_t *
flowop_new_composite_define(char * name)866 flowop_new_composite_define(char *name)
867 {
868 flowop_t *flowop;
869
870 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
871 flowop = flowop_define_common(NULL, name,
872 NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
873 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
874
875 if (flowop == NULL)
876 return (NULL);
877
878 flowop->fo_func = flowop_composite;
879 flowop->fo_init = flowop_composite_init;
880 flowop->fo_destruct = flowop_composite_destruct;
881 (void) ipc_mutex_unlock(&flowop->fo_lock);
882
883 return (flowop);
884 }
885
886 /*
887 * Attempts to take a write lock on the flowop_find_lock that is
888 * defined in interprocess shared memory. Since each call to
889 * flowop_start() holds a read lock on flowop_find_lock, this
890 * routine effectively blocks until all instances of
891 * flowop_start() have finished. The flowop_find() routine calls
892 * this routine so that flowops won't be searched for until all
893 * flowops have been created by flowop_start.
894 */
895 static void
flowop_find_barrier(void)896 flowop_find_barrier(void)
897 {
898 /* Block on wrlock to ensure find waits for all creates */
899 (void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
900 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
901 }
902
903 /*
904 * Returns a list of flowops named "name" from the master
905 * flowop list.
906 */
907 flowop_t *
flowop_find(char * name)908 flowop_find(char *name)
909 {
910 flowop_t *flowop;
911 flowop_t *result = NULL;
912
913 flowop_find_barrier();
914
915 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
916
917 flowop = filebench_shm->shm_flowoplist;
918
919 while (flowop) {
920 if (strcmp(name, flowop->fo_name) == 0) {
921
922 /* Add flowop to result list */
923 if (result == NULL) {
924 result = flowop;
925 flowop->fo_resultnext = NULL;
926 } else {
927 flowop->fo_resultnext = result;
928 result = flowop;
929 }
930 }
931 flowop = flowop->fo_next;
932 }
933
934 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
935
936
937 return (result);
938 }
939
940 /*
941 * Returns a pointer to the specified instance of flowop
942 * "name" from the global list.
943 */
944 flowop_t *
flowop_find_one(char * name,int instance)945 flowop_find_one(char *name, int instance)
946 {
947 flowop_t *test_flowop;
948
949 flowop_find_barrier();
950
951 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
952
953 test_flowop = filebench_shm->shm_flowoplist;
954
955 while (test_flowop) {
956 if ((strcmp(name, test_flowop->fo_name) == 0) &&
957 (instance == test_flowop->fo_instance))
958 break;
959
960 test_flowop = test_flowop->fo_next;
961 }
962
963 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
964
965 return (test_flowop);
966 }
967
968 /*
969 * recursively searches through lists of flowops on a given thread
970 * and those on any included composite flowops for the named flowop.
971 * either returns with a pointer to the named flowop or NULL if it
972 * cannot be found.
973 */
974 static flowop_t *
flowop_recurse_search(char * path,char * name,flowop_t * list)975 flowop_recurse_search(char *path, char *name, flowop_t *list)
976 {
977 flowop_t *test_flowop;
978 char fullname[MAXPATHLEN];
979
980 test_flowop = list;
981
982 /*
983 * when searching a list of inner flowops, "path" is the fullname
984 * of the containing composite flowop. Use it to form the
985 * full name of the inner flowop to search for.
986 */
987 if (path) {
988 if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
989 filebench_log(LOG_ERROR,
990 "composite flowop path name %s.%s too long",
991 path, name);
992 return (NULL);
993 }
994
995 /* create composite_name.name for recursive search */
996 (void) strcpy(fullname, path);
997 (void) strcat(fullname, ".");
998 (void) strcat(fullname, name);
999 } else {
1000 (void) strcpy(fullname, name);
1001 }
1002
1003 /*
1004 * loop through all flowops on the supplied tf_thrd_fops (flowop)
1005 * list or fo_comp_fops (inner flowop) list.
1006 */
1007 while (test_flowop) {
1008 if (strcmp(fullname, test_flowop->fo_name) == 0)
1009 return (test_flowop);
1010
1011 if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
1012 flowop_t *found_flowop;
1013
1014 found_flowop = flowop_recurse_search(
1015 test_flowop->fo_name, name,
1016 test_flowop->fo_comp_fops);
1017
1018 if (found_flowop)
1019 return (found_flowop);
1020 }
1021 test_flowop = test_flowop->fo_exec_next;
1022 }
1023
1024 /* not found here or on any child lists */
1025 return (NULL);
1026 }
1027
1028 /*
1029 * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
1030 * list of flowops. Returns the named flowop if found, or NULL.
1031 */
1032 flowop_t *
flowop_find_from_list(char * name,flowop_t * list)1033 flowop_find_from_list(char *name, flowop_t *list)
1034 {
1035 flowop_t *found_flowop;
1036
1037 flowop_find_barrier();
1038
1039 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
1040
1041 found_flowop = flowop_recurse_search(NULL, name, list);
1042
1043 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
1044
1045 return (found_flowop);
1046 }
1047
1048 /*
1049 * Composite flowop method. Does one pass through its list of
1050 * inner flowops per iteration.
1051 */
1052 static int
flowop_composite(threadflow_t * threadflow,flowop_t * flowop)1053 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
1054 {
1055 flowop_t *inner_flowop;
1056
1057 /* get the first flowop in the list */
1058 inner_flowop = flowop->fo_comp_fops;
1059
1060 /* make a pass through the list of sub flowops */
1061 while (inner_flowop) {
1062 int i, count;
1063
1064 /* Abort if asked */
1065 if (threadflow->tf_abort || filebench_shm->shm_f_abort)
1066 return (FILEBENCH_DONE);
1067
1068 if (inner_flowop->fo_stats.fs_stime == 0)
1069 inner_flowop->fo_stats.fs_stime = gethrtime();
1070
1071 /* Execute the flowop for fo_iters times */
1072 count = (int)avd_get_int(inner_flowop->fo_iters);
1073 for (i = 0; i < count; i++) {
1074
1075 filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
1076 "%s-%d", threadflow->tf_name,
1077 inner_flowop->fo_name,
1078 inner_flowop->fo_instance);
1079
1080 switch ((*inner_flowop->fo_func)(threadflow,
1081 inner_flowop)) {
1082
1083 /* all done */
1084 case FILEBENCH_DONE:
1085 return (FILEBENCH_DONE);
1086
1087 /* quit if inner flowop limit reached */
1088 case FILEBENCH_NORSC:
1089 return (FILEBENCH_NORSC);
1090
1091 /* quit on inner flowop error */
1092 case FILEBENCH_ERROR:
1093 filebench_log(LOG_ERROR,
1094 "inner flowop %s failed",
1095 inner_flowop->fo_name);
1096 return (FILEBENCH_ERROR);
1097
1098 /* otherwise keep going */
1099 default:
1100 break;
1101 }
1102
1103 }
1104
1105 /* advance to next flowop */
1106 inner_flowop = inner_flowop->fo_exec_next;
1107 }
1108
1109 /* finished with this pass */
1110 return (FILEBENCH_OK);
1111 }
1112
1113 /*
1114 * Composite flowop initialization. Creates runtime inner flowops
1115 * from prototype inner flowops.
1116 */
1117 static int
flowop_composite_init(flowop_t * flowop)1118 flowop_composite_init(flowop_t *flowop)
1119 {
1120 int err;
1121
1122 err = flowop_create_runtime_flowops(flowop->fo_thread,
1123 &flowop->fo_comp_fops);
1124 if (err != FILEBENCH_OK)
1125 return (err);
1126
1127 (void) ipc_mutex_unlock(&flowop->fo_lock);
1128 return (0);
1129 }
1130
1131 /*
1132 * clean up inner flowops
1133 */
1134 static void
flowop_composite_destruct(flowop_t * flowop)1135 flowop_composite_destruct(flowop_t *flowop)
1136 {
1137 flowop_t *inner_flowop = flowop->fo_comp_fops;
1138
1139 while (inner_flowop) {
1140 filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1141 inner_flowop->fo_name, inner_flowop->fo_instance);
1142
1143 if (inner_flowop->fo_instance &&
1144 (inner_flowop->fo_instance == FLOW_MASTER)) {
1145 inner_flowop = inner_flowop->fo_exec_next;
1146 continue;
1147 }
1148 flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1149 inner_flowop = inner_flowop->fo_exec_next;
1150 }
1151 }
1152
1153 /*
1154 * Support routines for libraries of flowops
1155 */
1156
1157 int
flowop_init_generic(flowop_t * flowop)1158 flowop_init_generic(flowop_t *flowop)
1159 {
1160 (void) ipc_mutex_unlock(&flowop->fo_lock);
1161 return (FILEBENCH_OK);
1162 }
1163
1164 void
flowop_destruct_generic(flowop_t * flowop)1165 flowop_destruct_generic(flowop_t *flowop)
1166 {
1167 char *buf;
1168
1169 /* release any local resources held by the flowop */
1170 (void) ipc_mutex_lock(&flowop->fo_lock);
1171 buf = flowop->fo_buf;
1172 flowop->fo_buf = NULL;
1173 (void) ipc_mutex_unlock(&flowop->fo_lock);
1174
1175 if (buf)
1176 free(buf);
1177 }
1178
1179
1180 /*
1181 * Loops through the supplied list of flowops and creates and initializes
1182 * a flowop for each one by calling flowop_define. As a side effect of
1183 * calling flowop define, the created flowops are placed on the
1184 * master flowop list. All created flowops are set to instance "0".
1185 */
1186 void
flowop_flow_init(flowop_proto_t * list,int nops)1187 flowop_flow_init(flowop_proto_t *list, int nops)
1188 {
1189 int i;
1190
1191 for (i = 0; i < nops; i++) {
1192 flowop_t *flowop;
1193 flowop_proto_t *fl;
1194
1195 fl = &(list[i]);
1196
1197 if ((flowop = flowop_define(NULL,
1198 fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
1199 filebench_log(LOG_ERROR,
1200 "failed to create flowop %s\n",
1201 fl->fl_name);
1202 filebench_shutdown(1);
1203 }
1204
1205 flowop->fo_func = fl->fl_func;
1206 flowop->fo_init = fl->fl_init;
1207 flowop->fo_destruct = fl->fl_destruct;
1208 flowop->fo_attrs = fl->fl_attrs;
1209 }
1210 }
1211