15184Sek110237 /* 25184Sek110237 * CDDL HEADER START 35184Sek110237 * 45184Sek110237 * The contents of this file are subject to the terms of the 55184Sek110237 * Common Development and Distribution License (the "License"). 65184Sek110237 * You may not use this file except in compliance with the License. 75184Sek110237 * 85184Sek110237 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 95184Sek110237 * or http://www.opensolaris.org/os/licensing. 105184Sek110237 * See the License for the specific language governing permissions 115184Sek110237 * and limitations under the License. 125184Sek110237 * 135184Sek110237 * When distributing Covered Code, include this CDDL HEADER in each 145184Sek110237 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 155184Sek110237 * If applicable, add the following below this CDDL HEADER, with the 165184Sek110237 * fields enclosed by brackets "[]" replaced with your own identifying 175184Sek110237 * information: Portions Copyright [yyyy] [name of copyright owner] 185184Sek110237 * 195184Sek110237 * CDDL HEADER END 205184Sek110237 */ 215184Sek110237 /* 228615SAndrew.W.Wilson@sun.com * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 235184Sek110237 * Use is subject to license terms. 246613Sek110237 * 256613Sek110237 * Portions Copyright 2008 Denis Cheng 265184Sek110237 */ 275184Sek110237 285184Sek110237 #include "config.h" 295184Sek110237 #include <pthread.h> 305184Sek110237 #ifdef HAVE_LWPS 315184Sek110237 #include <sys/lwp.h> 325184Sek110237 #endif 335184Sek110237 #include <signal.h> 346613Sek110237 356613Sek110237 #include "filebench.h" 365184Sek110237 #include "threadflow.h" 375184Sek110237 #include "flowop.h" 385184Sek110237 #include "ipc.h" 395184Sek110237 405184Sek110237 static threadflow_t *threadflow_define_common(procflow_t *procflow, 415184Sek110237 char *name, threadflow_t *inherit, int instance); 425184Sek110237 435184Sek110237 /* 445184Sek110237 * Threadflows are filebench entities which manage operating system 455184Sek110237 * threads. Each worker threadflow spawns a separate filebench thread, 465184Sek110237 * with attributes inherited from a FLOW_MASTER threadflow created during 475184Sek110237 * f model language parsing. This section contains routines to define, 485184Sek110237 * create, control, and delete threadflows. 495184Sek110237 * 505184Sek110237 * Each thread defined in the f model creates a FLOW_MASTER 515184Sek110237 * threadflow which encapsulates the defined attributes and flowops of 525184Sek110237 * the f language thread, including the number of instances to create. 535184Sek110237 * At runtime, a worker threadflow instance with an associated filebench 545184Sek110237 * thread is created, which runs until told to quit or is specifically 555184Sek110237 * deleted. 565184Sek110237 */ 575184Sek110237 585184Sek110237 595184Sek110237 /* 605184Sek110237 * Prints information about threadflow syntax. 615184Sek110237 */ 625184Sek110237 void 635184Sek110237 threadflow_usage(void) 645184Sek110237 { 655184Sek110237 (void) fprintf(stderr, " thread name=<name>[,instances=<count>]\n"); 665184Sek110237 (void) fprintf(stderr, "\n"); 675184Sek110237 (void) fprintf(stderr, " {\n"); 685184Sek110237 (void) fprintf(stderr, " flowop ...\n"); 695184Sek110237 (void) fprintf(stderr, " flowop ...\n"); 705184Sek110237 (void) fprintf(stderr, " flowop ...\n"); 715184Sek110237 (void) fprintf(stderr, " }\n"); 725184Sek110237 (void) fprintf(stderr, "\n"); 735184Sek110237 } 745184Sek110237 755184Sek110237 /* 765184Sek110237 * Creates a thread for the supplied threadflow. If interprocess 775184Sek110237 * shared memory is desired, then increments the amount of shared 785184Sek110237 * memory needed by the amount specified in the threadflow's 795184Sek110237 * tf_memsize parameter. The thread starts in routine 805184Sek110237 * flowop_start() with a poineter to the threadflow supplied 815184Sek110237 * as the argument. 825184Sek110237 */ 835184Sek110237 static int 845184Sek110237 threadflow_createthread(threadflow_t *threadflow) 855184Sek110237 { 866212Saw148015 fbint_t memsize; 876212Saw148015 memsize = avd_get_int(threadflow->tf_memsize); 886212Saw148015 threadflow->tf_constmemsize = memsize; 896212Saw148015 905184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld", 916212Saw148015 threadflow->tf_name, memsize); 925184Sek110237 935184Sek110237 if (threadflow->tf_attrs & THREADFLOW_USEISM) 946212Saw148015 filebench_shm->shm_required += memsize; 955184Sek110237 965184Sek110237 if (pthread_create(&threadflow->tf_tid, NULL, 975184Sek110237 (void *(*)(void*))flowop_start, threadflow) != 0) { 985184Sek110237 filebench_log(LOG_ERROR, "thread create failed"); 995184Sek110237 filebench_shutdown(1); 1006084Saw148015 return (FILEBENCH_ERROR); 1015184Sek110237 } 1025184Sek110237 1036084Saw148015 return (FILEBENCH_OK); 1045184Sek110237 } 1055184Sek110237 1065184Sek110237 /* 1075184Sek110237 * Creates threads for the threadflows associated with a procflow. 1085184Sek110237 * The routine iterates through the list of threadflows in the 1095184Sek110237 * supplied procflow's pf_threads list. For each threadflow on 1105184Sek110237 * the list, it defines tf_instances number of cloned 1115184Sek110237 * threadflows, and then calls threadflow_createthread() for 1125184Sek110237 * each to create and start the actual operating system thread. 1135184Sek110237 * Note that each of the newly defined threadflows will be linked 1145184Sek110237 * into the procflows threadflow list, but at the head of the 1155184Sek110237 * list, so they will not become part of the supplied set. After 1165184Sek110237 * all the threads have been created, threadflow_init enters 1175184Sek110237 * a join loop for all the threads in the newly defined 1185184Sek110237 * threadflows. Once all the created threads have exited, 1195184Sek110237 * threadflow_init will return 0. If errors are encountered, it 1205184Sek110237 * will return a non zero value. 1215184Sek110237 */ 1225184Sek110237 int 1235184Sek110237 threadflow_init(procflow_t *procflow) 1245184Sek110237 { 1255184Sek110237 threadflow_t *threadflow = procflow->pf_threads; 1265184Sek110237 int ret = 0; 1275184Sek110237 1286391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 1295184Sek110237 1305184Sek110237 while (threadflow) { 1315184Sek110237 threadflow_t *newthread; 1326212Saw148015 int instances; 1335184Sek110237 int i; 1345184Sek110237 1356212Saw148015 instances = avd_get_int(threadflow->tf_instances); 1365184Sek110237 filebench_log(LOG_VERBOSE, 1376286Saw148015 "Starting %d %s threads", 1386212Saw148015 instances, threadflow->tf_name); 1395184Sek110237 1406212Saw148015 for (i = 1; i < instances; i++) { 1415184Sek110237 /* Create threads */ 1425184Sek110237 newthread = 1435184Sek110237 threadflow_define_common(procflow, 1445184Sek110237 threadflow->tf_name, threadflow, i + 1); 1455184Sek110237 if (newthread == NULL) 1465184Sek110237 return (-1); 1476084Saw148015 ret |= threadflow_createthread(newthread); 1485184Sek110237 } 1495184Sek110237 1505184Sek110237 newthread = threadflow_define_common(procflow, 1515184Sek110237 threadflow->tf_name, 1525184Sek110237 threadflow, 1); 1535184Sek110237 1545184Sek110237 if (newthread == NULL) 1555184Sek110237 return (-1); 1565184Sek110237 1576084Saw148015 /* Create each thread */ 1586084Saw148015 ret |= threadflow_createthread(newthread); 1595184Sek110237 1605184Sek110237 threadflow = threadflow->tf_next; 1615184Sek110237 } 1625184Sek110237 1635184Sek110237 threadflow = procflow->pf_threads; 1645184Sek110237 1656391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 1665184Sek110237 1675184Sek110237 while (threadflow) { 1686333Sek110237 /* wait for all threads to finish */ 1696333Sek110237 if (threadflow->tf_tid) { 1706333Sek110237 void *status; 1715184Sek110237 1726333Sek110237 if (pthread_join(threadflow->tf_tid, &status) == 0) 1736333Sek110237 ret += *(int *)status; 1746333Sek110237 } 1755184Sek110237 threadflow = threadflow->tf_next; 1765184Sek110237 } 1775184Sek110237 1785184Sek110237 procflow->pf_running = 0; 1795184Sek110237 1805184Sek110237 return (ret); 1815184Sek110237 } 1825184Sek110237 1835184Sek110237 /* 1845184Sek110237 * Tells the threadflow's thread to stop and optionally signals 1855184Sek110237 * its associated process to end the thread. 1865184Sek110237 */ 1875184Sek110237 static void 1888762SAndrew.W.Wilson@sun.com threadflow_kill(threadflow_t *threadflow) 1895184Sek110237 { 1908762SAndrew.W.Wilson@sun.com int wait_cnt = 2; 1918762SAndrew.W.Wilson@sun.com 1925184Sek110237 /* Tell thread to finish */ 1935184Sek110237 threadflow->tf_abort = 1; 1945184Sek110237 1956084Saw148015 /* wait a bit for threadflow to stop */ 1966084Saw148015 while (wait_cnt && threadflow->tf_running) { 1976084Saw148015 (void) sleep(1); 1986084Saw148015 wait_cnt--; 1996084Saw148015 } 2006084Saw148015 2018762SAndrew.W.Wilson@sun.com if (threadflow->tf_running) { 2028762SAndrew.W.Wilson@sun.com threadflow->tf_running = FALSE; 203*8769SAndrew.W.Wilson@sun.com (void) pthread_kill(threadflow->tf_tid, SIGKILL); 2048762SAndrew.W.Wilson@sun.com } 2055184Sek110237 } 2065184Sek110237 2075184Sek110237 /* 2085184Sek110237 * Deletes the specified threadflow from the specified threadflow 2095184Sek110237 * list after first terminating the threadflow's thread, deleting 2105184Sek110237 * the threadflow's flowops, and finally freeing the threadflow 2115184Sek110237 * entity. It also subtracts the threadflow's shared memory 2125184Sek110237 * requirements from the total amount required, shm_required. If 2135184Sek110237 * the specified threadflow is found, returns 0, otherwise 2145184Sek110237 * returns -1. 2155184Sek110237 */ 2165184Sek110237 static int 2178762SAndrew.W.Wilson@sun.com threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow) 2185184Sek110237 { 2195184Sek110237 threadflow_t *entry = *threadlist; 2205184Sek110237 2215184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)", 2225184Sek110237 threadflow->tf_name, 2235184Sek110237 threadflow->tf_instance); 2245184Sek110237 2256212Saw148015 if (threadflow->tf_attrs & THREADFLOW_USEISM) 2266212Saw148015 filebench_shm->shm_required -= threadflow->tf_constmemsize; 2275184Sek110237 2285184Sek110237 if (threadflow == *threadlist) { 2295184Sek110237 /* First on list */ 2305184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)", 2315184Sek110237 threadflow->tf_name, 2325184Sek110237 threadflow->tf_instance); 2335184Sek110237 2348762SAndrew.W.Wilson@sun.com threadflow_kill(threadflow); 2356550Saw148015 flowop_delete_all(&threadflow->tf_thrd_fops); 2365184Sek110237 *threadlist = threadflow->tf_next; 2376613Sek110237 (void) pthread_mutex_destroy(&threadflow->tf_lock); 2385184Sek110237 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 2395184Sek110237 return (0); 2405184Sek110237 } 2415184Sek110237 2425184Sek110237 while (entry->tf_next) { 2435184Sek110237 filebench_log(LOG_DEBUG_IMPL, 2445184Sek110237 "Delete thread: (%s-%d) == (%s-%d)", 2455184Sek110237 entry->tf_next->tf_name, 2465184Sek110237 entry->tf_next->tf_instance, 2475184Sek110237 threadflow->tf_name, 2485184Sek110237 threadflow->tf_instance); 2495184Sek110237 2505184Sek110237 if (threadflow == entry->tf_next) { 2515184Sek110237 /* Delete */ 2525184Sek110237 filebench_log(LOG_DEBUG_IMPL, 2535184Sek110237 "Deleted thread: (%s-%d)", 2545184Sek110237 entry->tf_next->tf_name, 2555184Sek110237 entry->tf_next->tf_instance); 2568762SAndrew.W.Wilson@sun.com threadflow_kill(entry->tf_next); 2576550Saw148015 flowop_delete_all(&entry->tf_next->tf_thrd_fops); 2586613Sek110237 (void) pthread_mutex_destroy(&threadflow->tf_lock); 2595184Sek110237 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 2605184Sek110237 entry->tf_next = entry->tf_next->tf_next; 2615184Sek110237 return (0); 2625184Sek110237 } 2635184Sek110237 entry = entry->tf_next; 2645184Sek110237 } 2655184Sek110237 2665184Sek110237 return (-1); 2675184Sek110237 } 2685184Sek110237 2695184Sek110237 /* 2705184Sek110237 * Given a pointer to the thread list of a procflow, cycles 2715184Sek110237 * through all the threadflows on the list, deleting each one 2725184Sek110237 * except the FLOW_MASTER. 2735184Sek110237 */ 2745184Sek110237 void 2758762SAndrew.W.Wilson@sun.com threadflow_delete_all(threadflow_t **threadlist) 2765184Sek110237 { 2778615SAndrew.W.Wilson@sun.com threadflow_t *threadflow; 2785184Sek110237 2796391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 2805184Sek110237 2818615SAndrew.W.Wilson@sun.com threadflow = *threadlist; 2825184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Deleting all threads"); 2835184Sek110237 2845184Sek110237 while (threadflow) { 2855184Sek110237 if (threadflow->tf_instance && 2865184Sek110237 (threadflow->tf_instance == FLOW_MASTER)) { 2875184Sek110237 threadflow = threadflow->tf_next; 2885184Sek110237 continue; 2895184Sek110237 } 2908762SAndrew.W.Wilson@sun.com (void) threadflow_delete(threadlist, threadflow); 2915184Sek110237 threadflow = threadflow->tf_next; 2925184Sek110237 } 2935184Sek110237 2946391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 2955184Sek110237 } 2965184Sek110237 2975184Sek110237 /* 2985184Sek110237 * Waits till all threadflows are started, or a timeout occurs. 2995184Sek110237 * Checks through the list of threadflows, waiting up to 10 3005184Sek110237 * seconds for each one to set its tf_running flag to 1. If not 3015184Sek110237 * set after 10 seconds, continues on to the next threadflow 3025184Sek110237 * anyway. 3035184Sek110237 */ 3045184Sek110237 void 3055184Sek110237 threadflow_allstarted(pid_t pid, threadflow_t *threadflow) 3065184Sek110237 { 3076391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 3085184Sek110237 3095184Sek110237 while (threadflow) { 3105184Sek110237 int waits; 3115184Sek110237 3125184Sek110237 if ((threadflow->tf_instance == 0) || 3135184Sek110237 (threadflow->tf_instance == FLOW_MASTER)) { 3145184Sek110237 threadflow = threadflow->tf_next; 3155184Sek110237 continue; 3165184Sek110237 } 3175184Sek110237 3185184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d", 3195184Sek110237 pid, 3205184Sek110237 threadflow->tf_name, 3215184Sek110237 threadflow->tf_instance); 3225184Sek110237 3235184Sek110237 waits = 10; 3246391Saw148015 while (waits && (threadflow->tf_running == 0) && 3256391Saw148015 (filebench_shm->shm_f_abort == 0)) { 3265184Sek110237 (void) ipc_mutex_unlock( 3276391Saw148015 &filebench_shm->shm_threadflow_lock); 3285184Sek110237 if (waits < 3) 3295184Sek110237 filebench_log(LOG_INFO, 3305184Sek110237 "Waiting for pid %d thread %s-%d", 3315184Sek110237 pid, 3325184Sek110237 threadflow->tf_name, 3335184Sek110237 threadflow->tf_instance); 3345184Sek110237 3355184Sek110237 (void) sleep(1); 3366391Saw148015 (void) ipc_mutex_lock( 3376391Saw148015 &filebench_shm->shm_threadflow_lock); 3385184Sek110237 waits--; 3395184Sek110237 } 3405184Sek110237 3415184Sek110237 threadflow = threadflow->tf_next; 3425184Sek110237 } 3435184Sek110237 3446391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 3455184Sek110237 } 3465184Sek110237 3475184Sek110237 /* 3485184Sek110237 * Create an in-memory thread object linked to a parent procflow. 3495184Sek110237 * A threadflow entity is allocated from shared memory and 3505184Sek110237 * initialized from the "inherit" threadflow if supplied, 3515184Sek110237 * otherwise to zeros. The threadflow is assigned a unique 3525184Sek110237 * thread id, the supplied instance number, the supplied name 3535184Sek110237 * and added to the procflow's pf_thread list. If no name is 3545184Sek110237 * supplied or the threadflow can't be allocated, NULL is 3555184Sek110237 * returned Otherwise a pointer to the newly allocated threadflow 3565184Sek110237 * is returned. 3575184Sek110237 * 3586391Saw148015 * The filebench_shm->shm_threadflow_lock must be held by the caller. 3595184Sek110237 */ 3605184Sek110237 static threadflow_t * 3615184Sek110237 threadflow_define_common(procflow_t *procflow, char *name, 3625184Sek110237 threadflow_t *inherit, int instance) 3635184Sek110237 { 3645184Sek110237 threadflow_t *threadflow; 3655184Sek110237 threadflow_t **threadlistp = &procflow->pf_threads; 3665184Sek110237 3675184Sek110237 if (name == NULL) 3685184Sek110237 return (NULL); 3695184Sek110237 3705184Sek110237 threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW); 3715184Sek110237 3725184Sek110237 if (threadflow == NULL) 3735184Sek110237 return (NULL); 3745184Sek110237 3755184Sek110237 if (inherit) 3765184Sek110237 (void) memcpy(threadflow, inherit, sizeof (threadflow_t)); 3775184Sek110237 else 3785184Sek110237 (void) memset(threadflow, 0, sizeof (threadflow_t)); 3795184Sek110237 3806391Saw148015 threadflow->tf_utid = ++filebench_shm->shm_utid; 3815184Sek110237 3825184Sek110237 threadflow->tf_instance = instance; 3835184Sek110237 (void) strcpy(threadflow->tf_name, name); 3845184Sek110237 threadflow->tf_process = procflow; 3857556SAndrew.W.Wilson@sun.com (void) pthread_mutex_init(&threadflow->tf_lock, 3867556SAndrew.W.Wilson@sun.com ipc_mutexattr(IPC_MUTEX_NORMAL)); 3875184Sek110237 3885184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d", 3895184Sek110237 name, instance); 3905184Sek110237 3915184Sek110237 /* Add threadflow to list */ 3925184Sek110237 if (*threadlistp == NULL) { 3935184Sek110237 *threadlistp = threadflow; 3945184Sek110237 threadflow->tf_next = NULL; 3955184Sek110237 } else { 3965184Sek110237 threadflow->tf_next = *threadlistp; 3975184Sek110237 *threadlistp = threadflow; 3985184Sek110237 } 3995184Sek110237 4005184Sek110237 return (threadflow); 4015184Sek110237 } 4025184Sek110237 4035184Sek110237 /* 4045184Sek110237 * Create an in memory FLOW_MASTER thread object as described 4056391Saw148015 * by the syntax. Acquire the filebench_shm->shm_threadflow_lock and 4065184Sek110237 * call threadflow_define_common() to create a threadflow entity. 4075184Sek110237 * Set the number of instances to create at runtime, 4085184Sek110237 * tf_instances, to "instances". Return the threadflow pointer 4095184Sek110237 * returned by the threadflow_define_common call. 4105184Sek110237 */ 4115184Sek110237 threadflow_t * 4125184Sek110237 threadflow_define(procflow_t *procflow, char *name, 4136212Saw148015 threadflow_t *inherit, avd_t instances) 4145184Sek110237 { 4155184Sek110237 threadflow_t *threadflow; 4165184Sek110237 4176391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 4185184Sek110237 4195184Sek110237 if ((threadflow = threadflow_define_common(procflow, name, 4205184Sek110237 inherit, FLOW_MASTER)) == NULL) 4215184Sek110237 return (NULL); 4225184Sek110237 4235184Sek110237 threadflow->tf_instances = instances; 4245184Sek110237 4256391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 4265184Sek110237 4275184Sek110237 return (threadflow); 4285184Sek110237 } 4295184Sek110237 4305184Sek110237 4315184Sek110237 /* 4325184Sek110237 * Searches the provided threadflow list for the named threadflow. 4335184Sek110237 * A pointer to the threadflow is returned, or NULL if threadflow 4345184Sek110237 * is not found. 4355184Sek110237 */ 4365184Sek110237 threadflow_t * 4375184Sek110237 threadflow_find(threadflow_t *threadlist, char *name) 4385184Sek110237 { 4395184Sek110237 threadflow_t *threadflow = threadlist; 4405184Sek110237 4416391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 4425184Sek110237 4435184Sek110237 while (threadflow) { 4445184Sek110237 if (strcmp(name, threadflow->tf_name) == 0) { 4455184Sek110237 4465184Sek110237 (void) ipc_mutex_unlock( 4476391Saw148015 &filebench_shm->shm_threadflow_lock); 4485184Sek110237 4495184Sek110237 return (threadflow); 4505184Sek110237 } 4515184Sek110237 threadflow = threadflow->tf_next; 4525184Sek110237 } 4535184Sek110237 4546391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 4555184Sek110237 4565184Sek110237 4575184Sek110237 return (NULL); 4585184Sek110237 } 459