1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #include "config.h" 27 28 #ifdef HAVE_LWPS 29 #include <sys/lwp.h> 30 #endif 31 #include <fcntl.h> 32 #include "filebench.h" 33 #include "flowop.h" 34 #include "stats.h" 35 36 #ifdef LINUX_PORT 37 #include <sys/types.h> 38 #include <linux/unistd.h> 39 #endif 40 41 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name, 42 flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type); 43 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop); 44 static int flowop_composite_init(flowop_t *flowop); 45 static void flowop_composite_destruct(flowop_t *flowop); 46 47 /* 48 * A collection of flowop support functions. The actual code that 49 * implements the various flowops is in flowop_library.c. 50 * 51 * Routines for defining, creating, initializing and destroying 52 * flowops, cyclically invoking the flowops on each threadflow's flowop 53 * list, collecting statistics about flowop execution, and other 54 * housekeeping duties are included in this file. 
 *
 * User Defined Composite Flowops
 *    The ability to define new flowops as lists of built-in or previously
 * defined flowops has been added to Filebench. In a sense they are like
 * in-line subroutines, which can have default attributes set at definition
 * time and passed arguments at invocation time. Like other flowops (and
 * unlike conventional subroutines) you can invoke them with an iteration
 * count (the "iter" attribute), and they will loop through their associated
 * list of flowops for "iter" number of times each time they are encountered
 * in the thread or outer composite flowop which invokes them.
 *
 * Composite flowops are created with a "define" command, are given a name,
 * optional default attributes, and local variable definitions on the
 * "define" command line, followed by a brace enclosed list of flowops
 * to execute. The enclosed flowops may include attributes that reference
 * the local variables, as well as constants and global variables.
 *
 * Composite flowops are used pretty much like regular flowops, but you can
 * also set local variables to constants or global variables ($local_var =
 * [$var | $random_var | string | boolean | integer | double]) as part of
 * the invocation. Thus each invocation can pass customized values to its
 * inner flowops, greatly increasing their generality.
 *
 * All flowops are placed on a global, singly linked list, with fo_next
 * being the link pointer for this list. They are also placed on a private
 * list for the thread or composite flowop they are part of. The tf_thrd_fops
 * pointer in the thread will point to the list of top level flowops in the
 * thread, which are linked together by fo_exec_next. If any of these flowops
 * are composite flowops, they will have a list of second level flowops rooted
 * at the composite flowop's fo_comp_fops pointer.
So, there is one big list 85 * of all flowops, and an n-arry tree of threads, composite flowops, and 86 * flowops, with composite flowops being the branch nodes in the tree. 87 * 88 * To illustrate, if we have three first level flowops, the first of which is 89 * a composite flowop consisting of two other flowops, we get: 90 * 91 * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next 92 * flowop->fo_comp_fops | 93 * | V 94 * | flowop->fo_exec_next 95 * | 96 * V 97 * flowop->fo_exec_next -> flowop->fo_exec_next 98 * 99 * And all five flowops (plus others from any other threads) are on a global 100 * list linked with fo_next. 101 */ 102 103 /* 104 * Prints the name and instance number of each flowop in 105 * the supplied list to the filebench log. 106 */ 107 int 108 flowop_printlist(flowop_t *list) 109 { 110 flowop_t *flowop = list; 111 112 while (flowop) { 113 filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d", 114 flowop->fo_name, flowop->fo_instance); 115 flowop = flowop->fo_exec_next; 116 } 117 return (0); 118 } 119 120 /* 121 * Prints the name and instance number of all flowops on 122 * the master flowop list to the console and the filebench log. 123 */ 124 void 125 flowop_printall(void) 126 { 127 flowop_t *flowop = filebench_shm->shm_flowoplist; 128 129 while (flowop) { 130 filebench_log(LOG_VERBOSE, "flowop-list %s-%d", 131 flowop->fo_name, flowop->fo_instance); 132 flowop = flowop->fo_next; 133 } 134 } 135 136 #define TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \ 137 (e.tv_nsec - s.tv_nsec)) 138 /* 139 * Puts current high resolution time in start time entry 140 * for threadflow and may also calculate running filebench 141 * overhead statistics. 
142 */ 143 void 144 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop) 145 { 146 #ifdef HAVE_PROCFS 147 if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) { 148 char procname[128]; 149 150 (void) snprintf(procname, sizeof (procname), 151 "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self()); 152 threadflow->tf_lwpusagefd = open(procname, O_RDONLY); 153 } 154 155 (void) pread(threadflow->tf_lwpusagefd, 156 &threadflow->tf_susage, 157 sizeof (struct prusage), 0); 158 159 /* Compute overhead time in this thread around op */ 160 if (threadflow->tf_eusage.pr_stime.tv_nsec) { 161 flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] += 162 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime, 163 threadflow->tf_susage.pr_utime) + 164 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime, 165 threadflow->tf_susage.pr_ttime) + 166 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime, 167 threadflow->tf_susage.pr_stime); 168 } 169 #endif 170 /* Start of op for this thread */ 171 threadflow->tf_stime = gethrtime(); 172 } 173 174 flowstat_t controlstats; 175 pthread_mutex_t controlstats_lock; 176 static int controlstats_zeroed = 0; 177 178 /* 179 * Updates flowop's latency statistics, using saved start 180 * time and current high resolution time. Updates flowop's 181 * io count and transferred bytes statistics. Also updates 182 * threadflow's and flowop's cumulative read or write byte 183 * and io count statistics. 
184 */ 185 void 186 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes) 187 { 188 hrtime_t t; 189 190 flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] += 191 (gethrtime() - threadflow->tf_stime); 192 #ifdef HAVE_PROCFS 193 if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage, 194 sizeof (struct prusage), 0)) != sizeof (struct prusage)) 195 filebench_log(LOG_ERROR, "cannot read /proc"); 196 197 t = 198 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime, 199 threadflow->tf_eusage.pr_utime) + 200 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime, 201 threadflow->tf_eusage.pr_ttime) + 202 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime, 203 threadflow->tf_eusage.pr_stime); 204 flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t; 205 206 flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] += 207 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime, 208 threadflow->tf_eusage.pr_tftime) + 209 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime, 210 threadflow->tf_eusage.pr_dftime) + 211 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime, 212 threadflow->tf_eusage.pr_kftime) + 213 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime, 214 threadflow->tf_eusage.pr_kftime) + 215 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime, 216 threadflow->tf_eusage.pr_slptime); 217 #endif 218 219 flowop->fo_stats.fs_count++; 220 flowop->fo_stats.fs_bytes += bytes; 221 (void) ipc_mutex_lock(&controlstats_lock); 222 if ((flowop->fo_type & FLOW_TYPE_IO) || 223 (flowop->fo_type & FLOW_TYPE_AIO)) { 224 controlstats.fs_count++; 225 controlstats.fs_bytes += bytes; 226 } 227 if (flowop->fo_attrs & FLOW_ATTR_READ) { 228 threadflow->tf_stats.fs_rbytes += bytes; 229 threadflow->tf_stats.fs_rcount++; 230 flowop->fo_stats.fs_rcount++; 231 controlstats.fs_rbytes += bytes; 232 controlstats.fs_rcount++; 233 } else if (flowop->fo_attrs & FLOW_ATTR_WRITE) { 234 threadflow->tf_stats.fs_wbytes += bytes; 235 threadflow->tf_stats.fs_wcount++; 236 flowop->fo_stats.fs_wcount++; 237 
controlstats.fs_wbytes += bytes; 238 controlstats.fs_wcount++; 239 } 240 (void) ipc_mutex_unlock(&controlstats_lock); 241 } 242 243 /* 244 * Calls the flowop's initialization function, pointed to by 245 * flowop->fo_init. 246 */ 247 static int 248 flowop_initflow(flowop_t *flowop) 249 { 250 /* 251 * save static copies of two items, in case they are supplied 252 * from random variables 253 */ 254 if (!AVD_IS_STRING(flowop->fo_value)) 255 flowop->fo_constvalue = avd_get_int(flowop->fo_value); 256 257 flowop->fo_constwss = avd_get_int(flowop->fo_wss); 258 259 if ((*flowop->fo_init)(flowop) < 0) { 260 filebench_log(LOG_ERROR, "flowop %s-%d init failed", 261 flowop->fo_name, flowop->fo_instance); 262 return (-1); 263 } 264 return (0); 265 } 266 267 static int 268 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr) 269 { 270 flowop_t *flowop = *ops_list_ptr; 271 272 while (flowop) { 273 flowop_t *newflowop; 274 275 if (flowop == *ops_list_ptr) 276 *ops_list_ptr = NULL; 277 278 newflowop = flowop_define_common(threadflow, flowop->fo_name, 279 flowop, ops_list_ptr, 1, 0); 280 if (newflowop == NULL) 281 return (FILEBENCH_ERROR); 282 283 /* check for fo_filename attribute, and resolve if present */ 284 if (flowop->fo_filename) { 285 char *name; 286 287 name = avd_get_str(flowop->fo_filename); 288 newflowop->fo_fileset = fileset_find(name); 289 290 if (newflowop->fo_fileset == NULL) { 291 filebench_log(LOG_ERROR, 292 "flowop %s: file %s not found", 293 newflowop->fo_name, name); 294 filebench_shutdown(1); 295 } 296 } 297 298 if (flowop_initflow(newflowop) < 0) { 299 filebench_log(LOG_ERROR, "Flowop init of %s failed", 300 newflowop->fo_name); 301 } 302 303 flowop = flowop->fo_exec_next; 304 } 305 return (FILEBENCH_OK); 306 } 307 308 /* 309 * Calls the flowop's destruct function, pointed to by 310 * flowop->fo_destruct. 
311 */ 312 static void 313 flowop_destructflow(flowop_t *flowop) 314 { 315 (*flowop->fo_destruct)(flowop); 316 } 317 318 /* 319 * call the destruct funtions of all the threadflow's flowops, 320 * if it is still flagged as "running". 321 */ 322 void 323 flowop_destruct_all_flows(threadflow_t *threadflow) 324 { 325 flowop_t *flowop; 326 327 /* wait a moment to give other threads a chance to stop too */ 328 (void) sleep(1); 329 330 (void) ipc_mutex_lock(&threadflow->tf_lock); 331 332 /* prepare to call destruct flow routines, if necessary */ 333 if (threadflow->tf_running == 0) { 334 335 /* allready destroyed */ 336 (void) ipc_mutex_unlock(&threadflow->tf_lock); 337 return; 338 } 339 340 flowop = threadflow->tf_thrd_fops; 341 threadflow->tf_running = 0; 342 (void) ipc_mutex_unlock(&threadflow->tf_lock); 343 344 while (flowop) { 345 flowop_destructflow(flowop); 346 flowop = flowop->fo_exec_next; 347 } 348 } 349 350 /* 351 * The final initialization and main execution loop for the 352 * worker threads. Sets threadflow and flowop start times, 353 * waits for all process to start, then creates the runtime 354 * flowops from those defined by the F language workload 355 * script. It does some more initialization, then enters a 356 * loop to repeatedly execute the flowops on the flowop list 357 * until an abort condition is detected, at which time it exits. 358 * This is the starting routine for the new worker thread 359 * created by threadflow_createthread(), and is not currently 360 * called from anywhere else. 
361 */ 362 void 363 flowop_start(threadflow_t *threadflow) 364 { 365 flowop_t *flowop; 366 size_t memsize; 367 int ret = 0; 368 369 #ifdef HAVE_PROCFS 370 if (noproc == 0) { 371 char procname[128]; 372 long ctl[2] = {PCSET, PR_MSACCT}; 373 int pfd; 374 375 (void) snprintf(procname, sizeof (procname), 376 "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self()); 377 pfd = open(procname, O_WRONLY); 378 (void) pwrite(pfd, &ctl, sizeof (ctl), 0); 379 (void) close(pfd); 380 } 381 #endif 382 383 (void) ipc_mutex_lock(&controlstats_lock); 384 if (!controlstats_zeroed) { 385 (void) memset(&controlstats, 0, sizeof (controlstats)); 386 controlstats_zeroed = 1; 387 } 388 (void) ipc_mutex_unlock(&controlstats_lock); 389 390 flowop = threadflow->tf_thrd_fops; 391 threadflow->tf_stats.fs_stime = gethrtime(); 392 flowop->fo_stats.fs_stime = gethrtime(); 393 394 /* Hold the flowop find lock as reader to prevent lookups */ 395 (void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock); 396 397 /* 398 * Block until all processes have started, acting like 399 * a barrier. The original filebench process initially 400 * holds the run_lock as a reader, preventing any of the 401 * threads from obtaining the writer lock, and hence 402 * passing this point. Once all processes and threads 403 * have been created, the original process unlocks 404 * run_lock, allowing each waiting thread to lock 405 * and then immediately unlock it, then begin running. 
406 */ 407 (void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock); 408 (void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock); 409 410 /* Create the runtime flowops from those defined by the script */ 411 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 412 if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops) 413 != FILEBENCH_OK) { 414 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 415 filebench_shutdown(1); 416 return; 417 } 418 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 419 420 /* Release the find lock as reader to allow lookups */ 421 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock); 422 423 /* Set to the start of the new flowop list */ 424 flowop = threadflow->tf_thrd_fops; 425 426 threadflow->tf_abort = 0; 427 threadflow->tf_running = 1; 428 429 memsize = (size_t)threadflow->tf_constmemsize; 430 431 /* If we are going to use ISM, allocate later */ 432 if (threadflow->tf_attrs & THREADFLOW_USEISM) { 433 threadflow->tf_mem = 434 ipc_ismmalloc(memsize); 435 } else { 436 threadflow->tf_mem = 437 malloc(memsize); 438 } 439 440 (void) memset(threadflow->tf_mem, 0, memsize); 441 filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize); 442 443 #ifdef HAVE_LWPS 444 filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started", 445 threadflow, 446 _lwp_self()); 447 #endif 448 449 /* Main filebench worker loop */ 450 /* CONSTCOND */ 451 while (1) { 452 int i, count; 453 454 /* Abort if asked */ 455 if (threadflow->tf_abort || filebench_shm->shm_f_abort) 456 break; 457 458 /* Be quiet while stats are gathered */ 459 if (filebench_shm->shm_bequiet) { 460 (void) sleep(1); 461 continue; 462 } 463 464 /* Take it easy until everyone is ready to go */ 465 if (!filebench_shm->shm_procs_running) { 466 (void) sleep(1); 467 continue; 468 } 469 470 if (flowop == NULL) { 471 filebench_log(LOG_ERROR, "flowop_read null flowop"); 472 return; 473 } 474 475 if (flowop->fo_stats.fs_stime == 0) 476 
flowop->fo_stats.fs_stime = gethrtime(); 477 478 /* Execute the flowop for fo_iters times */ 479 count = (int)avd_get_int(flowop->fo_iters); 480 for (i = 0; i < count; i++) { 481 482 filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop " 483 "%s-%d", threadflow->tf_name, flowop->fo_name, 484 flowop->fo_instance); 485 486 ret = (*flowop->fo_func)(threadflow, flowop); 487 488 /* 489 * Return value FILEBENCH_ERROR means "flowop 490 * failed, stop the filebench run" 491 */ 492 if (ret == FILEBENCH_ERROR) { 493 filebench_log(LOG_ERROR, 494 "%s-%d: flowop %s-%d failed", 495 threadflow->tf_name, 496 threadflow->tf_instance, 497 flowop->fo_name, 498 flowop->fo_instance); 499 (void) ipc_mutex_lock(&threadflow->tf_lock); 500 threadflow->tf_abort = 1; 501 filebench_shm->shm_f_abort = 502 FILEBENCH_ABORT_ERROR; 503 (void) ipc_mutex_unlock(&threadflow->tf_lock); 504 break; 505 } 506 507 /* 508 * Return value of FILEBENCH_NORSC means "stop 509 * the filebench run" if in "end on no work mode", 510 * otherwise it indicates an error 511 */ 512 if (ret == FILEBENCH_NORSC) { 513 (void) ipc_mutex_lock(&threadflow->tf_lock); 514 threadflow->tf_abort = FILEBENCH_DONE; 515 if (filebench_shm->shm_rmode == 516 FILEBENCH_MODE_Q1STDONE) { 517 filebench_shm->shm_f_abort = 518 FILEBENCH_ABORT_RSRC; 519 } else if (filebench_shm->shm_rmode != 520 FILEBENCH_MODE_QALLDONE) { 521 filebench_log(LOG_ERROR1, 522 "WARNING! Run stopped early:\n " 523 " flowop %s-%d could " 524 "not obtain a file. 
Please\n " 525 " reduce runtime, " 526 "increase fileset entries " 527 "($nfiles), or switch modes.", 528 flowop->fo_name, 529 flowop->fo_instance); 530 filebench_shm->shm_f_abort = 531 FILEBENCH_ABORT_ERROR; 532 } 533 (void) ipc_mutex_unlock(&threadflow->tf_lock); 534 break; 535 } 536 537 /* 538 * Return value of FILEBENCH_DONE means "stop 539 * the filebench run without error" 540 */ 541 if (ret == FILEBENCH_DONE) { 542 (void) ipc_mutex_lock(&threadflow->tf_lock); 543 threadflow->tf_abort = FILEBENCH_DONE; 544 filebench_shm->shm_f_abort = 545 FILEBENCH_ABORT_DONE; 546 (void) ipc_mutex_unlock(&threadflow->tf_lock); 547 break; 548 } 549 550 /* 551 * If we get here and the return is something other 552 * than FILEBENCH_OK, it means a spurious code 553 * was returned, so treat as major error. This 554 * probably indicates a bug in the flowop. 555 */ 556 if (ret != FILEBENCH_OK) { 557 filebench_log(LOG_ERROR, 558 "Flowop %s unexpected return value = %d\n", 559 flowop->fo_name, ret); 560 filebench_shm->shm_f_abort = 561 FILEBENCH_ABORT_ERROR; 562 break; 563 } 564 } 565 566 /* advance to next flowop */ 567 flowop = flowop->fo_exec_next; 568 569 /* but if at end of list, start over from the beginning */ 570 if (flowop == NULL) { 571 flowop = threadflow->tf_thrd_fops; 572 threadflow->tf_stats.fs_count++; 573 } 574 } 575 576 #ifdef HAVE_LWPS 577 filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting", 578 _lwp_self()); 579 #endif 580 581 /* Tell flowops to destroy locally acquired state */ 582 flowop_destruct_all_flows(threadflow); 583 584 pthread_exit(&threadflow->tf_abort); 585 } 586 587 void 588 flowop_init(void) 589 { 590 (void) pthread_mutex_init(&controlstats_lock, 591 ipc_mutexattr(IPC_MUTEX_NORMAL)); 592 flowoplib_init(); 593 } 594 595 /* 596 * Delete the designated flowop from the thread's flowop list. 
597 */ 598 static void 599 flowop_delete(flowop_t **flowoplist, flowop_t *flowop) 600 { 601 flowop_t *entry = *flowoplist; 602 int found = 0; 603 604 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)", 605 flowop->fo_name, 606 flowop->fo_instance); 607 608 /* Delete from thread's flowop list */ 609 if (flowop == *flowoplist) { 610 /* First on list */ 611 *flowoplist = flowop->fo_exec_next; 612 filebench_log(LOG_DEBUG_IMPL, 613 "Delete0 flowop: (%s-%d)", 614 flowop->fo_name, 615 flowop->fo_instance); 616 } else { 617 while (entry->fo_exec_next) { 618 filebench_log(LOG_DEBUG_IMPL, 619 "Delete0 flowop: (%s-%d) == (%s-%d)", 620 entry->fo_exec_next->fo_name, 621 entry->fo_exec_next->fo_instance, 622 flowop->fo_name, 623 flowop->fo_instance); 624 625 if (flowop == entry->fo_exec_next) { 626 /* Delete */ 627 filebench_log(LOG_DEBUG_IMPL, 628 "Deleted0 flowop: (%s-%d)", 629 entry->fo_exec_next->fo_name, 630 entry->fo_exec_next->fo_instance); 631 entry->fo_exec_next = 632 entry->fo_exec_next->fo_exec_next; 633 break; 634 } 635 entry = entry->fo_exec_next; 636 } 637 } 638 639 #ifdef HAVE_PROCFS 640 /* Close /proc stats */ 641 if (flowop->fo_thread) 642 (void) close(flowop->fo_thread->tf_lwpusagefd); 643 #endif 644 645 /* Delete from global list */ 646 entry = filebench_shm->shm_flowoplist; 647 648 if (flowop == filebench_shm->shm_flowoplist) { 649 /* First on list */ 650 filebench_shm->shm_flowoplist = flowop->fo_next; 651 found = 1; 652 } else { 653 while (entry->fo_next) { 654 filebench_log(LOG_DEBUG_IMPL, 655 "Delete flowop: (%s-%d) == (%s-%d)", 656 entry->fo_next->fo_name, 657 entry->fo_next->fo_instance, 658 flowop->fo_name, 659 flowop->fo_instance); 660 661 if (flowop == entry->fo_next) { 662 /* Delete */ 663 entry->fo_next = entry->fo_next->fo_next; 664 found = 1; 665 break; 666 } 667 668 entry = entry->fo_next; 669 } 670 } 671 if (found) { 672 filebench_log(LOG_DEBUG_IMPL, 673 "Deleted flowop: (%s-%d)", 674 flowop->fo_name, 675 flowop->fo_instance); 676 
ipc_free(FILEBENCH_FLOWOP, (char *)flowop); 677 } else { 678 filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!", 679 flowop->fo_name, 680 flowop->fo_instance); 681 } 682 } 683 684 /* 685 * Deletes all the flowops from a flowop list. 686 */ 687 void 688 flowop_delete_all(flowop_t **flowoplist) 689 { 690 flowop_t *flowop = *flowoplist; 691 692 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 693 694 while (flowop) { 695 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)", 696 flowop->fo_name, flowop->fo_instance); 697 698 if (flowop->fo_instance && 699 (flowop->fo_instance == FLOW_MASTER)) { 700 flowop = flowop->fo_exec_next; 701 continue; 702 } 703 flowop_delete(flowoplist, flowop); 704 flowop = flowop->fo_exec_next; 705 } 706 707 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 708 } 709 710 /* 711 * Allocates a flowop entity and initializes it with inherited 712 * contents from the "inherit" flowop, if it is supplied, or 713 * with zeros otherwise. In either case the fo_next and fo_exec_next 714 * pointers are set to NULL, and fo_thread is set to point to 715 * the owning threadflow. The initialized flowop is placed at 716 * the head of the global flowop list, and also placed on the 717 * tail of the supplied local flowop list, which will either 718 * be a threadflow's tf_thrd_fops list or a composite flowop's 719 * fo_comp_fops list. The routine locks the flowop's fo_lock and 720 * leaves it held on return. If successful, it returns a pointer 721 * to the allocated and initialized flowop, otherwise it returns NULL. 722 * 723 * filebench_shm->shm_flowop_lock must be held by caller. 
724 */ 725 static flowop_t * 726 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit, 727 flowop_t **flowoplist_hdp, int instance, int type) 728 { 729 flowop_t *flowop; 730 731 if (name == NULL) 732 return (NULL); 733 734 if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) { 735 filebench_log(LOG_ERROR, 736 "flowop_define: Can't malloc flowop"); 737 return (NULL); 738 } 739 740 filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx", 741 name, instance, flowop); 742 743 if (flowop == NULL) 744 return (NULL); 745 746 if (inherit) { 747 (void) memcpy(flowop, inherit, sizeof (flowop_t)); 748 (void) pthread_mutex_init(&flowop->fo_lock, 749 ipc_mutexattr(IPC_MUTEX_PRI_ROB)); 750 (void) ipc_mutex_lock(&flowop->fo_lock); 751 flowop->fo_next = NULL; 752 flowop->fo_exec_next = NULL; 753 filebench_log(LOG_DEBUG_IMPL, 754 "flowop %s-%d calling init", name, instance); 755 } else { 756 (void) memset(flowop, 0, sizeof (flowop_t)); 757 flowop->fo_iters = avd_int_alloc(1); 758 flowop->fo_type = type; 759 (void) pthread_mutex_init(&flowop->fo_lock, 760 ipc_mutexattr(IPC_MUTEX_PRI_ROB)); 761 (void) ipc_mutex_lock(&flowop->fo_lock); 762 } 763 764 /* Create backpointer to thread */ 765 flowop->fo_thread = threadflow; 766 767 /* Add flowop to global list */ 768 if (filebench_shm->shm_flowoplist == NULL) { 769 filebench_shm->shm_flowoplist = flowop; 770 flowop->fo_next = NULL; 771 } else { 772 flowop->fo_next = filebench_shm->shm_flowoplist; 773 filebench_shm->shm_flowoplist = flowop; 774 } 775 776 (void) strcpy(flowop->fo_name, name); 777 flowop->fo_instance = instance; 778 779 if (flowoplist_hdp == NULL) 780 return (flowop); 781 782 /* Add flowop to thread op list */ 783 if (*flowoplist_hdp == NULL) { 784 *flowoplist_hdp = flowop; 785 flowop->fo_exec_next = NULL; 786 } else { 787 flowop_t *flowend; 788 789 /* Find the end of the thread list */ 790 flowend = *flowoplist_hdp; 791 while (flowend->fo_exec_next != NULL) 792 flowend = 
flowend->fo_exec_next; 793 flowend->fo_exec_next = flowop; 794 flowop->fo_exec_next = NULL; 795 } 796 797 return (flowop); 798 } 799 800 /* 801 * Calls flowop_define_common() to allocate and initialize a 802 * flowop, and holds the shared flowop_lock during the call. 803 * It releases the created flowop's fo_lock when done. 804 */ 805 flowop_t * 806 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit, 807 flowop_t **flowoplist_hdp, int instance, int type) 808 { 809 flowop_t *flowop; 810 811 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 812 flowop = flowop_define_common(threadflow, name, 813 inherit, flowoplist_hdp, instance, type); 814 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 815 816 if (flowop == NULL) 817 return (NULL); 818 819 (void) ipc_mutex_unlock(&flowop->fo_lock); 820 821 return (flowop); 822 } 823 824 /* 825 * Calls flowop_define_common() to allocate and initialize a 826 * composite flowop, and holds the shared flowop_lock during the call. 827 * It releases the created flowop's fo_lock when done. 828 */ 829 flowop_t * 830 flowop_new_composite_define(char *name) 831 { 832 flowop_t *flowop; 833 834 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 835 flowop = flowop_define_common(NULL, name, 836 NULL, NULL, 0, FLOW_TYPE_COMPOSITE); 837 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 838 839 if (flowop == NULL) 840 return (NULL); 841 842 flowop->fo_func = flowop_composite; 843 flowop->fo_init = flowop_composite_init; 844 flowop->fo_destruct = flowop_composite_destruct; 845 (void) ipc_mutex_unlock(&flowop->fo_lock); 846 847 return (flowop); 848 } 849 850 /* 851 * Attempts to take a write lock on the flowop_find_lock that is 852 * defined in interprocess shared memory. Since each call to 853 * flowop_start() holds a read lock on flowop_find_lock, this 854 * routine effectively blocks until all instances of 855 * flowop_start() have finished. 
The flowop_find() routine calls 856 * this routine so that flowops won't be searched for until all 857 * flowops have been created by flowop_start. 858 */ 859 static void 860 flowop_find_barrier(void) 861 { 862 /* Block on wrlock to ensure find waits for all creates */ 863 (void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock); 864 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock); 865 } 866 867 /* 868 * Returns a list of flowops named "name" from the master 869 * flowop list. 870 */ 871 flowop_t * 872 flowop_find(char *name) 873 { 874 flowop_t *flowop; 875 flowop_t *result = NULL; 876 877 flowop_find_barrier(); 878 879 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 880 881 flowop = filebench_shm->shm_flowoplist; 882 883 while (flowop) { 884 if (strcmp(name, flowop->fo_name) == 0) { 885 886 /* Add flowop to result list */ 887 if (result == NULL) { 888 result = flowop; 889 flowop->fo_resultnext = NULL; 890 } else { 891 flowop->fo_resultnext = result; 892 result = flowop; 893 } 894 } 895 flowop = flowop->fo_next; 896 } 897 898 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 899 900 901 return (result); 902 } 903 904 /* 905 * Returns a pointer to the specified instance of flowop 906 * "name" from the global list. 907 */ 908 flowop_t * 909 flowop_find_one(char *name, int instance) 910 { 911 flowop_t *test_flowop; 912 913 flowop_find_barrier(); 914 915 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 916 917 test_flowop = filebench_shm->shm_flowoplist; 918 919 while (test_flowop) { 920 if ((strcmp(name, test_flowop->fo_name) == 0) && 921 (instance == test_flowop->fo_instance)) 922 break; 923 924 test_flowop = test_flowop->fo_next; 925 } 926 927 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 928 929 return (test_flowop); 930 } 931 932 /* 933 * recursively searches through lists of flowops on a given thread 934 * and those on any included composite flowops for the named flowop. 
935 * either returns with a pointer to the named flowop or NULL if it 936 * cannot be found. 937 */ 938 static flowop_t * 939 flowop_recurse_search(char *path, char *name, flowop_t *list) 940 { 941 flowop_t *test_flowop; 942 char fullname[MAXPATHLEN]; 943 944 test_flowop = list; 945 946 /* 947 * when searching a list of inner flowops, "path" is the fullname 948 * of the containing composite flowop. Use it to form the 949 * full name of the inner flowop to search for. 950 */ 951 if (path) { 952 if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) { 953 filebench_log(LOG_ERROR, 954 "composite flowop path name %s.%s too long", 955 path, name); 956 return (NULL); 957 } 958 959 /* create composite_name.name for recursive search */ 960 (void) strcpy(fullname, path); 961 (void) strcat(fullname, "."); 962 (void) strcat(fullname, name); 963 } else { 964 (void) strcpy(fullname, name); 965 } 966 967 /* 968 * loop through all flowops on the supplied tf_thrd_fops (flowop) 969 * list or fo_comp_fops (inner flowop) list. 970 */ 971 while (test_flowop) { 972 if (strcmp(fullname, test_flowop->fo_name) == 0) 973 return (test_flowop); 974 975 if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) { 976 flowop_t *found_flowop; 977 978 found_flowop = flowop_recurse_search( 979 test_flowop->fo_name, name, 980 test_flowop->fo_comp_fops); 981 982 if (found_flowop) 983 return (found_flowop); 984 } 985 test_flowop = test_flowop->fo_exec_next; 986 } 987 988 /* not found here or on any child lists */ 989 return (NULL); 990 } 991 992 /* 993 * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops 994 * list of flowops. Returns the named flowop if found, or NULL. 
995 */ 996 flowop_t * 997 flowop_find_from_list(char *name, flowop_t *list) 998 { 999 flowop_t *found_flowop; 1000 1001 flowop_find_barrier(); 1002 1003 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 1004 1005 found_flowop = flowop_recurse_search(NULL, name, list); 1006 1007 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 1008 1009 return (found_flowop); 1010 } 1011 1012 /* 1013 * Composite flowop method. Does one pass through its list of 1014 * inner flowops per iteration. 1015 */ 1016 static int 1017 flowop_composite(threadflow_t *threadflow, flowop_t *flowop) 1018 { 1019 flowop_t *inner_flowop; 1020 1021 /* get the first flowop in the list */ 1022 inner_flowop = flowop->fo_comp_fops; 1023 1024 /* make a pass through the list of sub flowops */ 1025 while (inner_flowop) { 1026 int i, count; 1027 1028 /* Abort if asked */ 1029 if (threadflow->tf_abort || filebench_shm->shm_f_abort) 1030 return (FILEBENCH_DONE); 1031 1032 if (inner_flowop->fo_stats.fs_stime == 0) 1033 inner_flowop->fo_stats.fs_stime = gethrtime(); 1034 1035 /* Execute the flowop for fo_iters times */ 1036 count = (int)avd_get_int(inner_flowop->fo_iters); 1037 for (i = 0; i < count; i++) { 1038 1039 filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop " 1040 "%s-%d", threadflow->tf_name, 1041 inner_flowop->fo_name, 1042 inner_flowop->fo_instance); 1043 1044 switch ((*inner_flowop->fo_func)(threadflow, 1045 inner_flowop)) { 1046 1047 /* all done */ 1048 case FILEBENCH_DONE: 1049 return (FILEBENCH_DONE); 1050 1051 /* quit if inner flowop limit reached */ 1052 case FILEBENCH_NORSC: 1053 return (FILEBENCH_NORSC); 1054 1055 /* quit on inner flowop error */ 1056 case FILEBENCH_ERROR: 1057 filebench_log(LOG_ERROR, 1058 "inner flowop %s failed", 1059 inner_flowop->fo_name); 1060 return (FILEBENCH_ERROR); 1061 1062 /* otherwise keep going */ 1063 default: 1064 break; 1065 } 1066 1067 } 1068 1069 /* advance to next flowop */ 1070 inner_flowop = inner_flowop->fo_exec_next; 1071 } 1072 1073 
/* finished with this pass */ 1074 return (FILEBENCH_OK); 1075 } 1076 1077 /* 1078 * Composite flowop initialization. Creates runtime inner flowops 1079 * from prototype inner flowops. 1080 */ 1081 static int 1082 flowop_composite_init(flowop_t *flowop) 1083 { 1084 int err; 1085 1086 err = flowop_create_runtime_flowops(flowop->fo_thread, 1087 &flowop->fo_comp_fops); 1088 if (err != FILEBENCH_OK) 1089 return (err); 1090 1091 (void) ipc_mutex_unlock(&flowop->fo_lock); 1092 return (0); 1093 } 1094 1095 /* 1096 * clean up inner flowops 1097 */ 1098 static void 1099 flowop_composite_destruct(flowop_t *flowop) 1100 { 1101 flowop_t *inner_flowop = flowop->fo_comp_fops; 1102 1103 while (inner_flowop) { 1104 filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)", 1105 inner_flowop->fo_name, inner_flowop->fo_instance); 1106 1107 if (inner_flowop->fo_instance && 1108 (inner_flowop->fo_instance == FLOW_MASTER)) { 1109 inner_flowop = inner_flowop->fo_exec_next; 1110 continue; 1111 } 1112 flowop_delete(&flowop->fo_comp_fops, inner_flowop); 1113 inner_flowop = inner_flowop->fo_exec_next; 1114 } 1115 } 1116