1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #pragma ident "%Z%%M% %I% %E% SMI" 27 28 #include "config.h" 29 #include <pthread.h> 30 #ifdef HAVE_LWPS 31 #include <sys/lwp.h> 32 #endif 33 #include <signal.h> 34 #include "threadflow.h" 35 #include "filebench.h" 36 #include "flowop.h" 37 #include "ipc.h" 38 39 static threadflow_t *threadflow_define_common(procflow_t *procflow, 40 char *name, threadflow_t *inherit, int instance); 41 42 /* 43 * Threadflows are filebench entities which manage operating system 44 * threads. Each worker threadflow spawns a separate filebench thread, 45 * with attributes inherited from a FLOW_MASTER threadflow created during 46 * f model language parsing. This section contains routines to define, 47 * create, control, and delete threadflows. 48 * 49 * Each thread defined in the f model creates a FLOW_MASTER 50 * threadflow which encapsulates the defined attributes and flowops of 51 * the f language thread, including the number of instances to create. 52 * At runtime, a worker threadflow instance with an associated filebench 53 * thread is created, which runs until told to quit or is specifically 54 * deleted. 55 */ 56 57 58 /* 59 * Prints information about threadflow syntax. 60 */ 61 void 62 threadflow_usage(void) 63 { 64 (void) fprintf(stderr, " thread name=<name>[,instances=<count>]\n"); 65 (void) fprintf(stderr, "\n"); 66 (void) fprintf(stderr, " {\n"); 67 (void) fprintf(stderr, " flowop ...\n"); 68 (void) fprintf(stderr, " flowop ...\n"); 69 (void) fprintf(stderr, " flowop ...\n"); 70 (void) fprintf(stderr, " }\n"); 71 (void) fprintf(stderr, "\n"); 72 } 73 74 /* 75 * Creates a thread for the supplied threadflow. If interprocess 76 * shared memory is desired, then increments the amount of shared 77 * memory needed by the amount specified in the threadflow's 78 * tf_memsize parameter. The thread starts in routine 79 * flowop_start() with a poineter to the threadflow supplied 80 * as the argument. 81 */ 82 static int 83 threadflow_createthread(threadflow_t *threadflow) 84 { 85 fbint_t memsize; 86 memsize = avd_get_int(threadflow->tf_memsize); 87 threadflow->tf_constmemsize = memsize; 88 89 filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld", 90 threadflow->tf_name, memsize); 91 92 if (threadflow->tf_attrs & THREADFLOW_USEISM) 93 filebench_shm->shm_required += memsize; 94 95 if (pthread_create(&threadflow->tf_tid, NULL, 96 (void *(*)(void*))flowop_start, threadflow) != 0) { 97 filebench_log(LOG_ERROR, "thread create failed"); 98 filebench_shutdown(1); 99 return (FILEBENCH_ERROR); 100 } 101 102 return (FILEBENCH_OK); 103 } 104 105 /* 106 * Terminates (exits) all the threads of the procflow (process). 107 * The procflow is determined from a process private pointer 108 * initialized by threadflow_init(). 109 */ 110 /* ARGSUSED */ 111 static void 112 threadflow_cancel(int arg1) 113 { 114 threadflow_t *threadflow; 115 116 #ifdef HAVE_LWPS 117 filebench_log(LOG_DEBUG_IMPL, "Thread signal handler on tid %d", 118 _lwp_self()); 119 #endif 120 121 threadflow = my_procflow->pf_threads; 122 my_procflow->pf_running = 0; 123 124 while (threadflow) { 125 if (threadflow->tf_tid) { 126 /* make sure thread has been cleaned up */ 127 flowop_destruct_all_flows(threadflow); 128 129 (void) pthread_cancel(threadflow->tf_tid); 130 filebench_log(LOG_DEBUG_IMPL, "Thread %d cancelled...", 131 threadflow->tf_tid); 132 } 133 threadflow = threadflow->tf_next; 134 } 135 136 exit(0); 137 } 138 139 /* 140 * Creates threads for the threadflows associated with a procflow. 141 * The routine iterates through the list of threadflows in the 142 * supplied procflow's pf_threads list. For each threadflow on 143 * the list, it defines tf_instances number of cloned 144 * threadflows, and then calls threadflow_createthread() for 145 * each to create and start the actual operating system thread. 146 * Note that each of the newly defined threadflows will be linked 147 * into the procflows threadflow list, but at the head of the 148 * list, so they will not become part of the supplied set. After 149 * all the threads have been created, threadflow_init enters 150 * a join loop for all the threads in the newly defined 151 * threadflows. Once all the created threads have exited, 152 * threadflow_init will return 0. If errors are encountered, it 153 * will return a non zero value. 154 */ 155 int 156 threadflow_init(procflow_t *procflow) 157 { 158 threadflow_t *threadflow = procflow->pf_threads; 159 int ret = 0; 160 161 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 162 163 (void) signal(SIGUSR1, threadflow_cancel); 164 165 while (threadflow) { 166 threadflow_t *newthread; 167 int instances; 168 int i; 169 170 instances = avd_get_int(threadflow->tf_instances); 171 filebench_log(LOG_VERBOSE, 172 "Starting %d %s threads", 173 instances, threadflow->tf_name); 174 175 for (i = 1; i < instances; i++) { 176 /* Create threads */ 177 newthread = 178 threadflow_define_common(procflow, 179 threadflow->tf_name, threadflow, i + 1); 180 if (newthread == NULL) 181 return (-1); 182 ret |= threadflow_createthread(newthread); 183 } 184 185 newthread = threadflow_define_common(procflow, 186 threadflow->tf_name, 187 threadflow, 1); 188 189 if (newthread == NULL) 190 return (-1); 191 192 /* Create each thread */ 193 ret |= threadflow_createthread(newthread); 194 195 threadflow = threadflow->tf_next; 196 } 197 198 threadflow = procflow->pf_threads; 199 200 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 201 202 while (threadflow) { 203 void *status; 204 205 /* wait for all threads to finish */ 206 if (threadflow->tf_tid) 207 (void) pthread_join(threadflow->tf_tid, &status); 208 209 ret |= *(int *)status; 210 threadflow = threadflow->tf_next; 211 } 212 213 procflow->pf_running = 0; 214 215 return (ret); 216 } 217 218 /* 219 * Tells the threadflow's thread to stop and optionally signals 220 * its associated process to end the thread. 221 */ 222 static void 223 threadflow_kill(threadflow_t *threadflow, int wait_cnt) 224 { 225 /* Tell thread to finish */ 226 threadflow->tf_abort = 1; 227 228 /* wait a bit for threadflow to stop */ 229 while (wait_cnt && threadflow->tf_running) { 230 (void) sleep(1); 231 wait_cnt--; 232 } 233 234 #ifdef USE_PROCESS_MODEL 235 #ifdef HAVE_SIGSEND 236 (void) sigsend(P_PID, threadflow->tf_process->pf_pid, SIGUSR1); 237 #else 238 (void) kill(threadflow->tf_process->pf_pid, SIGUSR1); 239 #endif 240 #else /* USE_PROCESS_MODEL */ 241 threadflow->tf_process->pf_running = 0; 242 #endif /* USE_PROCESS_MODEL */ 243 } 244 245 /* 246 * Deletes the specified threadflow from the specified threadflow 247 * list after first terminating the threadflow's thread, deleting 248 * the threadflow's flowops, and finally freeing the threadflow 249 * entity. It also subtracts the threadflow's shared memory 250 * requirements from the total amount required, shm_required. If 251 * the specified threadflow is found, returns 0, otherwise 252 * returns -1. 253 */ 254 static int 255 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow, 256 int wait_cnt) 257 { 258 threadflow_t *entry = *threadlist; 259 260 filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)", 261 threadflow->tf_name, 262 threadflow->tf_instance); 263 264 if (threadflow->tf_attrs & THREADFLOW_USEISM) 265 filebench_shm->shm_required -= threadflow->tf_constmemsize; 266 267 if (threadflow == *threadlist) { 268 /* First on list */ 269 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)", 270 threadflow->tf_name, 271 threadflow->tf_instance); 272 273 threadflow_kill(threadflow, wait_cnt); 274 flowop_delete_all(&threadflow->tf_ops); 275 *threadlist = threadflow->tf_next; 276 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 277 return (0); 278 } 279 280 while (entry->tf_next) { 281 filebench_log(LOG_DEBUG_IMPL, 282 "Delete thread: (%s-%d) == (%s-%d)", 283 entry->tf_next->tf_name, 284 entry->tf_next->tf_instance, 285 threadflow->tf_name, 286 threadflow->tf_instance); 287 288 if (threadflow == entry->tf_next) { 289 /* Delete */ 290 filebench_log(LOG_DEBUG_IMPL, 291 "Deleted thread: (%s-%d)", 292 entry->tf_next->tf_name, 293 entry->tf_next->tf_instance); 294 threadflow_kill(entry->tf_next, wait_cnt); 295 flowop_delete_all(&entry->tf_next->tf_ops); 296 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 297 entry->tf_next = entry->tf_next->tf_next; 298 return (0); 299 } 300 entry = entry->tf_next; 301 } 302 303 return (-1); 304 } 305 306 /* 307 * Given a pointer to the thread list of a procflow, cycles 308 * through all the threadflows on the list, deleting each one 309 * except the FLOW_MASTER. 310 */ 311 void 312 threadflow_delete_all(threadflow_t **threadlist, int wait_cnt) 313 { 314 threadflow_t *threadflow = *threadlist; 315 316 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 317 318 filebench_log(LOG_DEBUG_IMPL, "Deleting all threads"); 319 320 while (threadflow) { 321 if (threadflow->tf_instance && 322 (threadflow->tf_instance == FLOW_MASTER)) { 323 threadflow = threadflow->tf_next; 324 continue; 325 } 326 (void) threadflow_delete(threadlist, threadflow, wait_cnt); 327 threadflow = threadflow->tf_next; 328 /* grow more impatient */ 329 if (wait_cnt > 0) 330 wait_cnt--; 331 } 332 333 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 334 } 335 336 /* 337 * Waits till all threadflows are started, or a timeout occurs. 338 * Checks through the list of threadflows, waiting up to 10 339 * seconds for each one to set its tf_running flag to 1. If not 340 * set after 10 seconds, continues on to the next threadflow 341 * anyway. 342 */ 343 void 344 threadflow_allstarted(pid_t pid, threadflow_t *threadflow) 345 { 346 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 347 348 while (threadflow) { 349 int waits; 350 351 if ((threadflow->tf_instance == 0) || 352 (threadflow->tf_instance == FLOW_MASTER)) { 353 threadflow = threadflow->tf_next; 354 continue; 355 } 356 357 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d", 358 pid, 359 threadflow->tf_name, 360 threadflow->tf_instance); 361 362 waits = 10; 363 while (waits && threadflow->tf_running == 0) { 364 (void) ipc_mutex_unlock( 365 &filebench_shm->threadflow_lock); 366 if (waits < 3) 367 filebench_log(LOG_INFO, 368 "Waiting for pid %d thread %s-%d", 369 pid, 370 threadflow->tf_name, 371 threadflow->tf_instance); 372 373 (void) sleep(1); 374 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 375 waits--; 376 } 377 378 threadflow = threadflow->tf_next; 379 } 380 381 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 382 } 383 384 /* 385 * Create an in-memory thread object linked to a parent procflow. 386 * A threadflow entity is allocated from shared memory and 387 * initialized from the "inherit" threadflow if supplied, 388 * otherwise to zeros. The threadflow is assigned a unique 389 * thread id, the supplied instance number, the supplied name 390 * and added to the procflow's pf_thread list. If no name is 391 * supplied or the threadflow can't be allocated, NULL is 392 * returned Otherwise a pointer to the newly allocated threadflow 393 * is returned. 394 * 395 * The filebench_shm->threadflow_lock must be held by the caller. 396 */ 397 static threadflow_t * 398 threadflow_define_common(procflow_t *procflow, char *name, 399 threadflow_t *inherit, int instance) 400 { 401 threadflow_t *threadflow; 402 threadflow_t **threadlistp = &procflow->pf_threads; 403 404 if (name == NULL) 405 return (NULL); 406 407 threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW); 408 409 if (threadflow == NULL) 410 return (NULL); 411 412 if (inherit) 413 (void) memcpy(threadflow, inherit, sizeof (threadflow_t)); 414 else 415 (void) memset(threadflow, 0, sizeof (threadflow_t)); 416 417 threadflow->tf_utid = ++filebench_shm->utid; 418 419 threadflow->tf_instance = instance; 420 (void) strcpy(threadflow->tf_name, name); 421 threadflow->tf_process = procflow; 422 423 filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d", 424 name, instance); 425 426 /* Add threadflow to list */ 427 if (*threadlistp == NULL) { 428 *threadlistp = threadflow; 429 threadflow->tf_next = NULL; 430 } else { 431 threadflow->tf_next = *threadlistp; 432 *threadlistp = threadflow; 433 } 434 435 return (threadflow); 436 } 437 438 /* 439 * Create an in memory FLOW_MASTER thread object as described 440 * by the syntax. Acquire the filebench_shm->threadflow_lock and 441 * call threadflow_define_common() to create a threadflow entity. 442 * Set the number of instances to create at runtime, 443 * tf_instances, to "instances". Return the threadflow pointer 444 * returned by the threadflow_define_common call. 445 */ 446 threadflow_t * 447 threadflow_define(procflow_t *procflow, char *name, 448 threadflow_t *inherit, avd_t instances) 449 { 450 threadflow_t *threadflow; 451 452 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 453 454 if ((threadflow = threadflow_define_common(procflow, name, 455 inherit, FLOW_MASTER)) == NULL) 456 return (NULL); 457 458 threadflow->tf_instances = instances; 459 460 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 461 462 return (threadflow); 463 } 464 465 466 /* 467 * Searches the provided threadflow list for the named threadflow. 468 * A pointer to the threadflow is returned, or NULL if threadflow 469 * is not found. 470 */ 471 threadflow_t * 472 threadflow_find(threadflow_t *threadlist, char *name) 473 { 474 threadflow_t *threadflow = threadlist; 475 476 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 477 478 while (threadflow) { 479 if (strcmp(name, threadflow->tf_name) == 0) { 480 481 (void) ipc_mutex_unlock( 482 &filebench_shm->threadflow_lock); 483 484 return (threadflow); 485 } 486 threadflow = threadflow->tf_next; 487 } 488 489 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 490 491 492 return (NULL); 493 } 494