1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #pragma ident "%Z%%M% %I% %E% SMI" 27 28 #include "config.h" 29 30 #ifdef HAVE_LWPS 31 #include <sys/lwp.h> 32 #endif 33 #include <fcntl.h> 34 #include "filebench.h" 35 #include "flowop.h" 36 #include "stats.h" 37 38 #ifdef LINUX_PORT 39 #include <sys/types.h> 40 #include <linux/unistd.h> 41 #endif 42 43 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name, 44 flowop_t *inherit, int instance, int type); 45 46 /* 47 * A collection of flowop support functions. The actual code that 48 * implements the various flowops is in flowop_library.c. 49 * 50 * Routines for defining, creating, initializing and destroying 51 * flowops, cyclically invoking the flowops on each threadflow's flowop 52 * list, collecting statistics about flowop execution, and other 53 * housekeeping duties are included in this file. 54 */ 55 56 57 /* 58 * Prints the name and instance number of each flowop in 59 * the supplied list to the filebench log. 60 */ 61 int 62 flowop_printlist(flowop_t *list) 63 { 64 flowop_t *flowop = list; 65 66 while (flowop) { 67 filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d", 68 flowop->fo_name, flowop->fo_instance); 69 flowop = flowop->fo_threadnext; 70 } 71 return (0); 72 } 73 74 /* 75 * Prints the name and instance number of all flowops on 76 * the master flowop list to the console and the filebench log. 77 */ 78 void 79 flowop_printall(void) 80 { 81 flowop_t *flowop = filebench_shm->shm_flowoplist; 82 83 while (flowop) { 84 filebench_log(LOG_VERBOSE, "flowop-list %s-%d", 85 flowop->fo_name, flowop->fo_instance); 86 flowop = flowop->fo_next; 87 } 88 } 89 90 #define TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \ 91 (e.tv_nsec - s.tv_nsec)) 92 /* 93 * Puts current high resolution time in start time entry 94 * for threadflow and may also calculate running filebench 95 * overhead statistics. 96 */ 97 void 98 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop) 99 { 100 #ifdef HAVE_PROCFS 101 if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) { 102 char procname[128]; 103 104 (void) snprintf(procname, sizeof (procname), 105 "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self()); 106 threadflow->tf_lwpusagefd = open(procname, O_RDONLY); 107 } 108 109 (void) pread(threadflow->tf_lwpusagefd, 110 &threadflow->tf_susage, 111 sizeof (struct prusage), 0); 112 113 /* Compute overhead time in this thread around op */ 114 if (threadflow->tf_eusage.pr_stime.tv_nsec) { 115 flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] += 116 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime, 117 threadflow->tf_susage.pr_utime) + 118 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime, 119 threadflow->tf_susage.pr_ttime) + 120 TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime, 121 threadflow->tf_susage.pr_stime); 122 } 123 #endif 124 /* Start of op for this thread */ 125 threadflow->tf_stime = gethrtime(); 126 } 127 128 flowstat_t controlstats; 129 pthread_mutex_t controlstats_lock; 130 static int controlstats_zeroed = 0; 131 132 /* 133 * Updates flowop's latency statistics, using saved start 134 * time and current high resolution time. Updates flowop's 135 * io count and transferred bytes statistics. Also updates 136 * threadflow's and flowop's cumulative read or write byte 137 * and io count statistics. 138 */ 139 void 140 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes) 141 { 142 hrtime_t t; 143 144 flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] += 145 (gethrtime() - threadflow->tf_stime); 146 #ifdef HAVE_PROCFS 147 if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage, 148 sizeof (struct prusage), 0)) != sizeof (struct prusage)) 149 filebench_log(LOG_ERROR, "cannot read /proc"); 150 151 t = 152 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime, 153 threadflow->tf_eusage.pr_utime) + 154 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime, 155 threadflow->tf_eusage.pr_ttime) + 156 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime, 157 threadflow->tf_eusage.pr_stime); 158 flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t; 159 160 flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] += 161 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime, 162 threadflow->tf_eusage.pr_tftime) + 163 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime, 164 threadflow->tf_eusage.pr_dftime) + 165 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime, 166 threadflow->tf_eusage.pr_kftime) + 167 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime, 168 threadflow->tf_eusage.pr_kftime) + 169 TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime, 170 threadflow->tf_eusage.pr_slptime); 171 #endif 172 173 flowop->fo_stats.fs_count++; 174 flowop->fo_stats.fs_bytes += bytes; 175 (void) ipc_mutex_lock(&controlstats_lock); 176 if ((flowop->fo_type & FLOW_TYPE_IO) || 177 (flowop->fo_type & FLOW_TYPE_AIO)) { 178 controlstats.fs_count++; 179 controlstats.fs_bytes += bytes; 180 } 181 if (flowop->fo_attrs & FLOW_ATTR_READ) { 182 threadflow->tf_stats.fs_rbytes += bytes; 183 threadflow->tf_stats.fs_rcount++; 184 flowop->fo_stats.fs_rcount++; 185 controlstats.fs_rbytes += bytes; 186 controlstats.fs_rcount++; 187 } else if (flowop->fo_attrs & FLOW_ATTR_WRITE) { 188 threadflow->tf_stats.fs_wbytes += bytes; 189 threadflow->tf_stats.fs_wcount++; 190 flowop->fo_stats.fs_wcount++; 191 controlstats.fs_wbytes += bytes; 192 controlstats.fs_wcount++; 193 } 194 (void) ipc_mutex_unlock(&controlstats_lock); 195 } 196 197 /* 198 * Calls the flowop's initialization function, pointed to by 199 * flowop->fo_init. 200 */ 201 static int 202 flowop_initflow(flowop_t *flowop) 203 { 204 /* 205 * save static copies of two items, in case they are supplied 206 * from random variables 207 */ 208 flowop->fo_constvalue = avd_get_int(flowop->fo_value); 209 flowop->fo_constwss = avd_get_int(flowop->fo_wss); 210 211 if ((*flowop->fo_init)(flowop) < 0) { 212 filebench_log(LOG_ERROR, "flowop %s-%d init failed", 213 flowop->fo_name, flowop->fo_instance); 214 return (-1); 215 } 216 return (0); 217 } 218 219 /* 220 * Calls the flowop's destruct function, pointed to by 221 * flowop->fo_destruct. 222 */ 223 static void 224 flowop_destructflow(flowop_t *flowop) 225 { 226 (*flowop->fo_destruct)(flowop); 227 } 228 229 /* 230 * call the destruct funtions of all the threadflow's flowops, 231 * if it is still flagged as "running". 232 */ 233 void 234 flowop_destruct_all_flows(threadflow_t *threadflow) 235 { 236 flowop_t *flowop; 237 238 (void) ipc_mutex_lock(&threadflow->tf_lock); 239 240 /* prepare to call destruct flow routines, if necessary */ 241 if (threadflow->tf_running == 0) { 242 243 /* allready destroyed */ 244 (void) ipc_mutex_unlock(&threadflow->tf_lock); 245 return; 246 } 247 248 flowop = threadflow->tf_ops; 249 threadflow->tf_running = 0; 250 (void) ipc_mutex_unlock(&threadflow->tf_lock); 251 252 while (flowop) { 253 flowop_destructflow(flowop); 254 flowop = flowop->fo_threadnext; 255 } 256 } 257 258 /* 259 * The final initialization and main execution loop for the 260 * worker threads. Sets threadflow and flowop start times, 261 * waits for all process to start, then creates the runtime 262 * flowops from those defined by the F language workload 263 * script. It does some more initialization, then enters a 264 * loop to repeatedly execute the flowops on the flowop list 265 * until an abort condition is detected, at which time it exits. 266 * This is the starting routine for the new worker thread 267 * created by threadflow_createthread(), and is not currently 268 * called from anywhere else. 269 */ 270 void 271 flowop_start(threadflow_t *threadflow) 272 { 273 flowop_t *flowop; 274 size_t memsize; 275 int ret = 0; 276 277 #ifdef HAVE_PROCFS 278 if (noproc == 0) { 279 char procname[128]; 280 long ctl[2] = {PCSET, PR_MSACCT}; 281 int pfd; 282 283 (void) snprintf(procname, sizeof (procname), 284 "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self()); 285 pfd = open(procname, O_WRONLY); 286 (void) pwrite(pfd, &ctl, sizeof (ctl), 0); 287 (void) close(pfd); 288 } 289 #endif 290 291 (void) ipc_mutex_lock(&controlstats_lock); 292 if (!controlstats_zeroed) { 293 (void) memset(&controlstats, 0, sizeof (controlstats)); 294 controlstats_zeroed = 1; 295 } 296 (void) ipc_mutex_unlock(&controlstats_lock); 297 298 flowop = threadflow->tf_ops; 299 threadflow->tf_stats.fs_stime = gethrtime(); 300 flowop->fo_stats.fs_stime = gethrtime(); 301 302 /* Hold the flowop find lock as reader to prevent lookups */ 303 (void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock); 304 305 /* 306 * Block until all processes have started, acting like 307 * a barrier. The original filebench process initially 308 * holds the run_lock as a reader, preventing any of the 309 * threads from obtaining the writer lock, and hence 310 * passing this point. Once all processes and threads 311 * have been created, the original process unlocks 312 * run_lock, allowing each waiting thread to lock 313 * and then immediately unlock it, then begin running. 314 */ 315 (void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock); 316 (void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock); 317 318 /* Create the runtime flowops from those defined by the script */ 319 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 320 while (flowop) { 321 flowop_t *newflowop; 322 323 if (flowop == threadflow->tf_ops) 324 threadflow->tf_ops = NULL; 325 newflowop = flowop_define_common(threadflow, flowop->fo_name, 326 flowop, 1, 0); 327 if (newflowop == NULL) 328 return; 329 330 /* check for fo_filename attribute, and resolve if present */ 331 if (flowop->fo_filename) { 332 char *name; 333 334 name = avd_get_str(flowop->fo_filename); 335 newflowop->fo_fileset = fileset_find(name); 336 337 if (newflowop->fo_fileset == NULL) { 338 filebench_log(LOG_ERROR, 339 "flowop %s: file %s not found", 340 newflowop->fo_name, name); 341 filebench_shutdown(1); 342 } 343 } 344 345 if (flowop_initflow(newflowop) < 0) { 346 filebench_log(LOG_ERROR, "Flowop init of %s failed", 347 newflowop->fo_name); 348 } 349 flowop = flowop->fo_threadnext; 350 } 351 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 352 353 /* Release the find lock as reader to allow lookups */ 354 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock); 355 356 /* Set to the start of the new flowop list */ 357 flowop = threadflow->tf_ops; 358 359 threadflow->tf_abort = 0; 360 threadflow->tf_running = 1; 361 362 memsize = (size_t)threadflow->tf_constmemsize; 363 364 /* If we are going to use ISM, allocate later */ 365 if (threadflow->tf_attrs & THREADFLOW_USEISM) { 366 threadflow->tf_mem = 367 ipc_ismmalloc(memsize); 368 } else { 369 threadflow->tf_mem = 370 malloc(memsize); 371 } 372 373 (void) memset(threadflow->tf_mem, 0, memsize); 374 filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize); 375 376 #ifdef HAVE_LWPS 377 filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started", 378 threadflow, 379 _lwp_self()); 380 #endif 381 382 /* Main filebench worker loop */ 383 /* CONSTCOND */ 384 while (1) { 385 int i, count; 386 387 /* Abort if asked */ 388 if (threadflow->tf_abort || filebench_shm->shm_f_abort) 389 break; 390 391 /* Be quiet while stats are gathered */ 392 if (filebench_shm->shm_bequiet) { 393 (void) sleep(1); 394 continue; 395 } 396 397 /* Take it easy until everyone is ready to go */ 398 if (!filebench_shm->shm_running) { 399 (void) sleep(1); 400 continue; 401 } 402 403 if (flowop == NULL) { 404 filebench_log(LOG_ERROR, "flowop_read null flowop"); 405 return; 406 } 407 408 if (flowop->fo_stats.fs_stime == 0) 409 flowop->fo_stats.fs_stime = gethrtime(); 410 411 /* Execute the flowop for fo_iters times */ 412 count = avd_get_int(flowop->fo_iters); 413 for (i = 0; i < count; i++) { 414 415 filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop " 416 "%s-%d", threadflow->tf_name, flowop->fo_name, 417 flowop->fo_instance); 418 419 ret = (*flowop->fo_func)(threadflow, flowop); 420 421 /* 422 * Return value FILEBENCH_ERROR means "flowop 423 * failed, stop the filebench run" 424 */ 425 if (ret == FILEBENCH_ERROR) { 426 filebench_log(LOG_ERROR, 427 "%s-%d: flowop %s-%d failed", 428 threadflow->tf_name, 429 threadflow->tf_instance, 430 flowop->fo_name, 431 flowop->fo_instance); 432 (void) ipc_mutex_lock(&threadflow->tf_lock); 433 threadflow->tf_abort = 1; 434 filebench_shm->shm_f_abort = 435 FILEBENCH_ABORT_ERROR; 436 (void) ipc_mutex_unlock(&threadflow->tf_lock); 437 break; 438 } 439 440 /* 441 * Return value of FILEBENCH_NORSC means "stop 442 * the filebench run" if in "end on no work mode", 443 * otherwise it indicates an error 444 */ 445 if (ret == FILEBENCH_NORSC) { 446 (void) ipc_mutex_lock(&threadflow->tf_lock); 447 threadflow->tf_abort = FILEBENCH_DONE; 448 if (filebench_shm->shm_rmode == 449 FILEBENCH_MODE_Q1STDONE) { 450 filebench_shm->shm_f_abort = 451 FILEBENCH_ABORT_RSRC; 452 } else if (filebench_shm->shm_rmode != 453 FILEBENCH_MODE_QALLDONE) { 454 filebench_log(LOG_ERROR1, 455 "WARNING! Run stopped early:\n " 456 " flowop %s-%d could " 457 "not obtain a file. Please\n " 458 " reduce runtime, " 459 "increase fileset entries " 460 "($nfiles), or switch modes.", 461 flowop->fo_name, 462 flowop->fo_instance); 463 filebench_shm->shm_f_abort = 464 FILEBENCH_ABORT_ERROR; 465 } 466 (void) ipc_mutex_unlock(&threadflow->tf_lock); 467 break; 468 } 469 470 /* 471 * Return value of FILEBENCH_DONE means "stop 472 * the filebench run without error" 473 */ 474 if (ret == FILEBENCH_DONE) { 475 (void) ipc_mutex_lock(&threadflow->tf_lock); 476 threadflow->tf_abort = FILEBENCH_DONE; 477 filebench_shm->shm_f_abort = 478 FILEBENCH_ABORT_DONE; 479 (void) ipc_mutex_unlock(&threadflow->tf_lock); 480 break; 481 } 482 483 /* 484 * If we get here and the return is something other 485 * than FILEBENCH_OK, it means a spurious code 486 * was returned, so treat as major error. This 487 * probably indicates a bug in the flowop. 488 */ 489 if (ret != FILEBENCH_OK) { 490 filebench_log(LOG_ERROR, 491 "Flowop %s unexpected return value = %d\n", 492 flowop->fo_name, ret); 493 filebench_shm->shm_f_abort = 494 FILEBENCH_ABORT_ERROR; 495 break; 496 } 497 } 498 499 /* advance to next flowop */ 500 flowop = flowop->fo_threadnext; 501 502 /* but if at end of list, start over from the beginning */ 503 if (flowop == NULL) { 504 flowop = threadflow->tf_ops; 505 threadflow->tf_stats.fs_count++; 506 } 507 } 508 509 #ifdef HAVE_LWPS 510 filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting", 511 _lwp_self()); 512 #endif 513 514 /* Tell flowops to destroy locally acquired state */ 515 flowop_destruct_all_flows(threadflow); 516 517 pthread_exit(&threadflow->tf_abort); 518 } 519 520 void 521 flowop_init(void) 522 { 523 flowoplib_init(); 524 } 525 526 /* 527 * Delete the designated flowop from the thread's flowop list. 528 */ 529 static void 530 flowop_delete(flowop_t **flowoplist, flowop_t *flowop) 531 { 532 flowop_t *entry = *flowoplist; 533 int found = 0; 534 535 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)", 536 flowop->fo_name, 537 flowop->fo_instance); 538 539 /* Delete from thread's flowop list */ 540 if (flowop == *flowoplist) { 541 /* First on list */ 542 *flowoplist = flowop->fo_threadnext; 543 filebench_log(LOG_DEBUG_IMPL, 544 "Delete0 flowop: (%s-%d)", 545 flowop->fo_name, 546 flowop->fo_instance); 547 } else { 548 while (entry->fo_threadnext) { 549 filebench_log(LOG_DEBUG_IMPL, 550 "Delete0 flowop: (%s-%d) == (%s-%d)", 551 entry->fo_threadnext->fo_name, 552 entry->fo_threadnext->fo_instance, 553 flowop->fo_name, 554 flowop->fo_instance); 555 556 if (flowop == entry->fo_threadnext) { 557 /* Delete */ 558 filebench_log(LOG_DEBUG_IMPL, 559 "Deleted0 flowop: (%s-%d)", 560 entry->fo_threadnext->fo_name, 561 entry->fo_threadnext->fo_instance); 562 entry->fo_threadnext = 563 entry->fo_threadnext->fo_threadnext; 564 break; 565 } 566 entry = entry->fo_threadnext; 567 } 568 } 569 570 #ifdef HAVE_PROCFS 571 /* Close /proc stats */ 572 if (flowop->fo_thread) 573 (void) close(flowop->fo_thread->tf_lwpusagefd); 574 #endif 575 576 /* Delete from global list */ 577 entry = filebench_shm->shm_flowoplist; 578 579 if (flowop == filebench_shm->shm_flowoplist) { 580 /* First on list */ 581 filebench_shm->shm_flowoplist = flowop->fo_next; 582 found = 1; 583 } else { 584 while (entry->fo_next) { 585 filebench_log(LOG_DEBUG_IMPL, 586 "Delete flowop: (%s-%d) == (%s-%d)", 587 entry->fo_next->fo_name, 588 entry->fo_next->fo_instance, 589 flowop->fo_name, 590 flowop->fo_instance); 591 592 if (flowop == entry->fo_next) { 593 /* Delete */ 594 entry->fo_next = entry->fo_next->fo_next; 595 found = 1; 596 break; 597 } 598 599 entry = entry->fo_next; 600 } 601 } 602 if (found) { 603 filebench_log(LOG_DEBUG_IMPL, 604 "Deleted flowop: (%s-%d)", 605 flowop->fo_name, 606 flowop->fo_instance); 607 ipc_free(FILEBENCH_FLOWOP, (char *)flowop); 608 } else { 609 filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!", 610 flowop->fo_name, 611 flowop->fo_instance); 612 } 613 } 614 615 /* 616 * Deletes all the flowops from a flowop list. 617 */ 618 void 619 flowop_delete_all(flowop_t **flowoplist) 620 { 621 flowop_t *flowop = *flowoplist; 622 623 filebench_log(LOG_DEBUG_IMPL, "Deleting all flowops..."); 624 while (flowop) { 625 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)", 626 flowop->fo_name, flowop->fo_instance); 627 flowop = flowop->fo_threadnext; 628 } 629 630 flowop = *flowoplist; 631 632 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 633 634 while (flowop) { 635 if (flowop->fo_instance && 636 (flowop->fo_instance == FLOW_MASTER)) { 637 flowop = flowop->fo_threadnext; 638 continue; 639 } 640 flowop_delete(flowoplist, flowop); 641 flowop = flowop->fo_threadnext; 642 } 643 644 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 645 } 646 647 /* 648 * Allocates a flowop entity and initializes it with inherited 649 * contents from the "inherit" flowop, if it is supplied, or 650 * with zeros otherwise. In either case the file descriptor 651 * (fo_fd) is set to -1, and the fo_next and fo_threadnext 652 * pointers are set to NULL, and fo_thread is set to point to 653 * the owning threadflow. The initialized flowop is placed at 654 * the head of the global flowop list, and also placed on the 655 * tail of thethreadflow's tf_ops list. The routine locks the 656 * flowop's fo_lock and leaves it held on return. If successful, 657 * it returns a pointer to the allocated and initialized flowop, 658 * otherwise NULL. 659 * 660 * filebench_shm->shm_flowop_lock must be held by caller. 661 */ 662 static flowop_t * 663 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit, 664 int instance, int type) 665 { 666 flowop_t *flowop; 667 668 if (name == NULL) 669 return (NULL); 670 671 if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) { 672 filebench_log(LOG_ERROR, 673 "flowop_define: Can't malloc flowop"); 674 return (NULL); 675 } 676 677 filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx", 678 name, instance, flowop); 679 680 if (flowop == NULL) 681 return (NULL); 682 683 if (inherit) { 684 (void) memcpy(flowop, inherit, sizeof (flowop_t)); 685 (void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr()); 686 (void) ipc_mutex_lock(&flowop->fo_lock); 687 flowop->fo_next = NULL; 688 flowop->fo_threadnext = NULL; 689 filebench_log(LOG_DEBUG_IMPL, 690 "flowop %s-%d calling init", name, instance); 691 } else { 692 (void) memset(flowop, 0, sizeof (flowop_t)); 693 flowop->fo_iters = avd_int_alloc(1); 694 flowop->fo_type = type; 695 (void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr()); 696 (void) ipc_mutex_lock(&flowop->fo_lock); 697 } 698 699 /* Create backpointer to thread */ 700 flowop->fo_thread = threadflow; 701 702 /* Add flowop to global list */ 703 if (filebench_shm->shm_flowoplist == NULL) { 704 filebench_shm->shm_flowoplist = flowop; 705 flowop->fo_next = NULL; 706 } else { 707 flowop->fo_next = filebench_shm->shm_flowoplist; 708 filebench_shm->shm_flowoplist = flowop; 709 } 710 711 (void) strcpy(flowop->fo_name, name); 712 flowop->fo_instance = instance; 713 714 if (threadflow == NULL) 715 return (flowop); 716 717 /* Add flowop to thread op list */ 718 if (threadflow->tf_ops == NULL) { 719 threadflow->tf_ops = flowop; 720 flowop->fo_threadnext = NULL; 721 } else { 722 flowop_t *flowend; 723 724 /* Find the end of the thread list */ 725 flowend = threadflow->tf_ops; 726 while (flowend->fo_threadnext != NULL) 727 flowend = flowend->fo_threadnext; 728 flowend->fo_threadnext = flowop; 729 flowop->fo_threadnext = NULL; 730 } 731 732 return (flowop); 733 } 734 735 /* 736 * Calls flowop_define_common() to allocate and initialize a 737 * flowop, and holds the shared flowop_lock during the call. 738 * It releases the created flowop's fo_lock when done. 739 */ 740 flowop_t * 741 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit, 742 int instance, int type) 743 { 744 flowop_t *flowop; 745 746 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 747 flowop = flowop_define_common(threadflow, name, 748 inherit, instance, type); 749 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 750 751 if (flowop == NULL) 752 return (NULL); 753 754 (void) ipc_mutex_unlock(&flowop->fo_lock); 755 756 return (flowop); 757 } 758 759 /* 760 * Attempts to take a write lock on the flowop_find_lock that is 761 * defined in interprocess shared memory. Since each call to 762 * flowop_start() holds a read lock on flowop_find_lock, this 763 * routine effectively blocks until all instances of 764 * flowop_start() have finished. The flowop_find() routine calls 765 * this routine so that flowops won't be searched for until all 766 * flowops have been created by flowop_start. 767 */ 768 static void 769 flowop_find_barrier(void) 770 { 771 /* Block on wrlock to ensure find waits for all creates */ 772 (void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock); 773 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock); 774 } 775 776 /* 777 * Returns a list of flowops named "name" from the master 778 * flowop list. 779 */ 780 flowop_t * 781 flowop_find(char *name) 782 { 783 flowop_t *flowop; 784 flowop_t *result = NULL; 785 786 flowop_find_barrier(); 787 788 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 789 790 flowop = filebench_shm->shm_flowoplist; 791 792 while (flowop) { 793 if (strcmp(name, flowop->fo_name) == 0) { 794 795 /* Add flowop to result list */ 796 if (result == NULL) { 797 result = flowop; 798 flowop->fo_resultnext = NULL; 799 } else { 800 flowop->fo_resultnext = result; 801 result = flowop; 802 } 803 } 804 flowop = flowop->fo_next; 805 } 806 807 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 808 809 810 return (result); 811 } 812 813 /* 814 * Returns a pointer to the specified instance of flowop 815 * "name" from the supplied list. 816 */ 817 flowop_t * 818 flowop_find_one(char *name, int instance) 819 { 820 flowop_t *test_flowop; 821 822 flowop_find_barrier(); 823 824 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock); 825 826 test_flowop = filebench_shm->shm_flowoplist; 827 828 while (test_flowop) { 829 if ((strcmp(name, test_flowop->fo_name) == 0) && 830 (instance == test_flowop->fo_instance)) 831 break; 832 833 test_flowop = test_flowop->fo_next; 834 } 835 836 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock); 837 838 return (test_flowop); 839 } 840