1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2007 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #pragma ident "%Z%%M% %I% %E% SMI" 27 28 #include "config.h" 29 #include <pthread.h> 30 #ifdef HAVE_LWPS 31 #include <sys/lwp.h> 32 #endif 33 #include <signal.h> 34 #include "threadflow.h" 35 #include "filebench.h" 36 #include "flowop.h" 37 #include "ipc.h" 38 39 static threadflow_t *threadflow_define_common(procflow_t *procflow, 40 char *name, threadflow_t *inherit, int instance); 41 42 /* 43 * Threadflows are filebench entities which manage operating system 44 * threads. Each worker threadflow spawns a separate filebench thread, 45 * with attributes inherited from a FLOW_MASTER threadflow created during 46 * f model language parsing. This section contains routines to define, 47 * create, control, and delete threadflows. 48 * 49 * Each thread defined in the f model creates a FLOW_MASTER 50 * threadflow which encapsulates the defined attributes and flowops of 51 * the f language thread, including the number of instances to create. 52 * At runtime, a worker threadflow instance with an associated filebench 53 * thread is created, which runs until told to quit or is specifically 54 * deleted. 55 */ 56 57 58 /* 59 * Prints information about threadflow syntax. 60 */ 61 void 62 threadflow_usage(void) 63 { 64 (void) fprintf(stderr, " thread name=<name>[,instances=<count>]\n"); 65 (void) fprintf(stderr, "\n"); 66 (void) fprintf(stderr, " {\n"); 67 (void) fprintf(stderr, " flowop ...\n"); 68 (void) fprintf(stderr, " flowop ...\n"); 69 (void) fprintf(stderr, " flowop ...\n"); 70 (void) fprintf(stderr, " }\n"); 71 (void) fprintf(stderr, "\n"); 72 } 73 74 /* 75 * Creates a thread for the supplied threadflow. If interprocess 76 * shared memory is desired, then increments the amount of shared 77 * memory needed by the amount specified in the threadflow's 78 * tf_memsize parameter. The thread starts in routine 79 * flowop_start() with a poineter to the threadflow supplied 80 * as the argument. 81 */ 82 static int 83 threadflow_createthread(threadflow_t *threadflow) 84 { 85 int fp = 0; 86 87 filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld", 88 threadflow->tf_name, 89 *threadflow->tf_memsize); 90 91 if (threadflow->tf_attrs & THREADFLOW_USEISM) 92 filebench_shm->shm_required += (*threadflow->tf_memsize); 93 94 if (pthread_create(&threadflow->tf_tid, NULL, 95 (void *(*)(void*))flowop_start, threadflow) != 0) { 96 filebench_log(LOG_ERROR, "thread create failed"); 97 filebench_shutdown(1); 98 } 99 100 /* XXX */ 101 return (fp < 0); 102 } 103 104 #ifndef USE_PROCESS_MODEL 105 static procflow_t *my_procflow; 106 107 /* 108 * Terminates (exits) all the threads of the procflow (process). 109 * The procflow is determined from a process private pointer 110 * initialized by threadflow_init(). 111 */ 112 /* ARGSUSED */ 113 static void 114 threadflow_cancel(int arg1) 115 { 116 threadflow_t *threadflow = my_procflow->pf_threads; 117 118 #ifdef HAVE_LWPS 119 filebench_log(LOG_DEBUG_IMPL, "Thread signal handler on tid %d", 120 _lwp_self()); 121 #endif 122 123 my_procflow->pf_running = 0; 124 exit(0); 125 126 while (threadflow) { 127 if (threadflow->tf_tid) { 128 (void) pthread_cancel(threadflow->tf_tid); 129 filebench_log(LOG_DEBUG_IMPL, "Thread %d cancelled...", 130 threadflow->tf_tid); 131 } 132 threadflow = threadflow->tf_next; 133 } 134 } 135 #endif /* USE_PROCESS_MODEL */ 136 137 /* 138 * Creates threads for the threadflows associated with a procflow. 139 * The routine iterates through the list of threadflows in the 140 * supplied procflow's pf_threads list. For each threadflow on 141 * the list, it defines tf_instances number of cloned 142 * threadflows, and then calls threadflow_createthread() for 143 * each to create and start the actual operating system thread. 144 * Note that each of the newly defined threadflows will be linked 145 * into the procflows threadflow list, but at the head of the 146 * list, so they will not become part of the supplied set. After 147 * all the threads have been created, threadflow_init enters 148 * a join loop for all the threads in the newly defined 149 * threadflows. Once all the created threads have exited, 150 * threadflow_init will return 0. If errors are encountered, it 151 * will return a non zero value. 152 */ 153 int 154 threadflow_init(procflow_t *procflow) 155 { 156 threadflow_t *threadflow = procflow->pf_threads; 157 int ret = 0; 158 159 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 160 #ifndef USE_PROCESS_MODEL 161 my_procflow = procflow; 162 163 (void) signal(SIGUSR1, threadflow_cancel); 164 #endif 165 while (threadflow) { 166 threadflow_t *newthread; 167 int i; 168 169 filebench_log(LOG_VERBOSE, 170 "Starting %lld %s threads", 171 *(threadflow->tf_instances), 172 threadflow->tf_name); 173 174 for (i = 1; i < *threadflow->tf_instances; i++) { 175 /* Create threads */ 176 newthread = 177 threadflow_define_common(procflow, 178 threadflow->tf_name, threadflow, i + 1); 179 if (newthread == NULL) 180 return (-1); 181 ret += threadflow_createthread(newthread); 182 } 183 184 newthread = threadflow_define_common(procflow, 185 threadflow->tf_name, 186 threadflow, 1); 187 188 if (newthread == NULL) 189 return (-1); 190 191 /* Create threads */ 192 ret += threadflow_createthread(newthread); 193 194 threadflow = threadflow->tf_next; 195 } 196 197 threadflow = procflow->pf_threads; 198 199 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 200 201 while (threadflow) { 202 void *status; 203 204 if (threadflow->tf_tid) 205 (void) pthread_join(threadflow->tf_tid, &status); 206 207 ret += *(int *)status; 208 threadflow = threadflow->tf_next; 209 } 210 211 procflow->pf_running = 0; 212 213 return (ret); 214 } 215 216 /* 217 * Tells the threadflow's thread to stop and optionally signals 218 * its associated process to end the thread. 219 */ 220 static void 221 threadflow_kill(threadflow_t *threadflow) 222 { 223 /* Tell thread to finish */ 224 threadflow->tf_abort = 1; 225 226 #ifdef USE_PROCESS_MODEL 227 #ifdef HAVE_SIGSEND 228 (void) sigsend(P_PID, threadflow->tf_process->pf_pid, SIGUSR1); 229 #else 230 (void) kill(threadflow->tf_process->pf_pid, SIGUSR1); 231 #endif 232 #else /* USE_PROCESS_MODEL */ 233 threadflow->tf_process->pf_running = 0; 234 #endif /* USE_PROCESS_MODEL */ 235 } 236 237 /* 238 * Deletes the specified threadflow from the specified threadflow 239 * list after first terminating the threadflow's thread, deleting 240 * the threadflow's flowops, and finally freeing the threadflow 241 * entity. It also subtracts the threadflow's shared memory 242 * requirements from the total amount required, shm_required. If 243 * the specified threadflow is found, returns 0, otherwise 244 * returns -1. 245 */ 246 static int 247 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow) 248 { 249 threadflow_t *entry = *threadlist; 250 251 filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)", 252 threadflow->tf_name, 253 threadflow->tf_instance); 254 255 if (threadflow->tf_attrs & THREADFLOW_USEISM) { 256 filebench_shm->shm_required -= (*threadflow->tf_memsize); 257 } 258 259 if (threadflow == *threadlist) { 260 /* First on list */ 261 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)", 262 threadflow->tf_name, 263 threadflow->tf_instance); 264 265 threadflow_kill(threadflow); 266 flowop_delete_all(&threadflow->tf_ops); 267 *threadlist = threadflow->tf_next; 268 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 269 return (0); 270 } 271 272 while (entry->tf_next) { 273 filebench_log(LOG_DEBUG_IMPL, 274 "Delete thread: (%s-%d) == (%s-%d)", 275 entry->tf_next->tf_name, 276 entry->tf_next->tf_instance, 277 threadflow->tf_name, 278 threadflow->tf_instance); 279 280 if (threadflow == entry->tf_next) { 281 /* Delete */ 282 filebench_log(LOG_DEBUG_IMPL, 283 "Deleted thread: (%s-%d)", 284 entry->tf_next->tf_name, 285 entry->tf_next->tf_instance); 286 threadflow_kill(entry->tf_next); 287 flowop_delete_all(&entry->tf_next->tf_ops); 288 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 289 entry->tf_next = entry->tf_next->tf_next; 290 return (0); 291 } 292 entry = entry->tf_next; 293 } 294 295 return (-1); 296 } 297 298 /* 299 * Given a pointer to the thread list of a procflow, cycles 300 * through all the threadflows on the list, deleting each one 301 * except the FLOW_MASTER. 302 */ 303 void 304 threadflow_delete_all(threadflow_t **threadlist) 305 { 306 threadflow_t *threadflow = *threadlist; 307 308 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 309 310 filebench_log(LOG_DEBUG_IMPL, "Deleting all threads"); 311 312 while (threadflow) { 313 if (threadflow->tf_instance && 314 (threadflow->tf_instance == FLOW_MASTER)) { 315 threadflow = threadflow->tf_next; 316 continue; 317 } 318 (void) threadflow_delete(threadlist, threadflow); 319 threadflow = threadflow->tf_next; 320 } 321 322 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 323 } 324 325 /* 326 * Waits till all threadflows are started, or a timeout occurs. 327 * Checks through the list of threadflows, waiting up to 10 328 * seconds for each one to set its tf_running flag to 1. If not 329 * set after 10 seconds, continues on to the next threadflow 330 * anyway. 331 */ 332 void 333 threadflow_allstarted(pid_t pid, threadflow_t *threadflow) 334 { 335 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 336 337 while (threadflow) { 338 int waits; 339 340 if ((threadflow->tf_instance == 0) || 341 (threadflow->tf_instance == FLOW_MASTER)) { 342 threadflow = threadflow->tf_next; 343 continue; 344 } 345 346 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d", 347 pid, 348 threadflow->tf_name, 349 threadflow->tf_instance); 350 351 waits = 10; 352 while (waits && threadflow->tf_running == 0) { 353 (void) ipc_mutex_unlock( 354 &filebench_shm->threadflow_lock); 355 if (waits < 3) 356 filebench_log(LOG_INFO, 357 "Waiting for pid %d thread %s-%d", 358 pid, 359 threadflow->tf_name, 360 threadflow->tf_instance); 361 362 (void) sleep(1); 363 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 364 waits--; 365 } 366 367 threadflow = threadflow->tf_next; 368 } 369 370 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 371 } 372 373 /* 374 * Create an in-memory thread object linked to a parent procflow. 375 * A threadflow entity is allocated from shared memory and 376 * initialized from the "inherit" threadflow if supplied, 377 * otherwise to zeros. The threadflow is assigned a unique 378 * thread id, the supplied instance number, the supplied name 379 * and added to the procflow's pf_thread list. If no name is 380 * supplied or the threadflow can't be allocated, NULL is 381 * returned Otherwise a pointer to the newly allocated threadflow 382 * is returned. 383 * 384 * The filebench_shm->threadflow_lock must be held by the caller. 385 */ 386 static threadflow_t * 387 threadflow_define_common(procflow_t *procflow, char *name, 388 threadflow_t *inherit, int instance) 389 { 390 threadflow_t *threadflow; 391 threadflow_t **threadlistp = &procflow->pf_threads; 392 393 if (name == NULL) 394 return (NULL); 395 396 threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW); 397 398 if (threadflow == NULL) 399 return (NULL); 400 401 if (inherit) 402 (void) memcpy(threadflow, inherit, sizeof (threadflow_t)); 403 else 404 (void) memset(threadflow, 0, sizeof (threadflow_t)); 405 406 threadflow->tf_utid = ++filebench_shm->utid; 407 408 threadflow->tf_instance = instance; 409 (void) strcpy(threadflow->tf_name, name); 410 threadflow->tf_process = procflow; 411 412 filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d", 413 name, instance); 414 415 /* Add threadflow to list */ 416 if (*threadlistp == NULL) { 417 *threadlistp = threadflow; 418 threadflow->tf_next = NULL; 419 } else { 420 threadflow->tf_next = *threadlistp; 421 *threadlistp = threadflow; 422 } 423 424 return (threadflow); 425 } 426 427 /* 428 * Create an in memory FLOW_MASTER thread object as described 429 * by the syntax. Acquire the filebench_shm->threadflow_lock and 430 * call threadflow_define_common() to create a threadflow entity. 431 * Set the number of instances to create at runtime, 432 * tf_instances, to "instances". Return the threadflow pointer 433 * returned by the threadflow_define_common call. 434 */ 435 threadflow_t * 436 threadflow_define(procflow_t *procflow, char *name, 437 threadflow_t *inherit, var_integer_t instances) 438 { 439 threadflow_t *threadflow; 440 441 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 442 443 if ((threadflow = threadflow_define_common(procflow, name, 444 inherit, FLOW_MASTER)) == NULL) 445 return (NULL); 446 447 threadflow->tf_instances = instances; 448 449 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 450 451 return (threadflow); 452 } 453 454 455 /* 456 * Searches the provided threadflow list for the named threadflow. 457 * A pointer to the threadflow is returned, or NULL if threadflow 458 * is not found. 459 */ 460 threadflow_t * 461 threadflow_find(threadflow_t *threadlist, char *name) 462 { 463 threadflow_t *threadflow = threadlist; 464 465 (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); 466 467 while (threadflow) { 468 if (strcmp(name, threadflow->tf_name) == 0) { 469 470 (void) ipc_mutex_unlock( 471 &filebench_shm->threadflow_lock); 472 473 return (threadflow); 474 } 475 threadflow = threadflow->tf_next; 476 } 477 478 (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); 479 480 481 return (NULL); 482 } 483