15184Sek110237 /*
25184Sek110237 * CDDL HEADER START
35184Sek110237 *
45184Sek110237 * The contents of this file are subject to the terms of the
55184Sek110237 * Common Development and Distribution License (the "License").
65184Sek110237 * You may not use this file except in compliance with the License.
75184Sek110237 *
85184Sek110237 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
95184Sek110237 * or http://www.opensolaris.org/os/licensing.
105184Sek110237 * See the License for the specific language governing permissions
115184Sek110237 * and limitations under the License.
125184Sek110237 *
135184Sek110237 * When distributing Covered Code, include this CDDL HEADER in each
145184Sek110237 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
155184Sek110237 * If applicable, add the following below this CDDL HEADER, with the
165184Sek110237 * fields enclosed by brackets "[]" replaced with your own identifying
175184Sek110237 * information: Portions Copyright [yyyy] [name of copyright owner]
185184Sek110237 *
195184Sek110237 * CDDL HEADER END
205184Sek110237 */
215184Sek110237 /*
228615SAndrew.W.Wilson@sun.com * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
235184Sek110237 * Use is subject to license terms.
246613Sek110237 *
256613Sek110237 * Portions Copyright 2008 Denis Cheng
265184Sek110237 */
275184Sek110237
285184Sek110237 #include "config.h"
295184Sek110237 #include <pthread.h>
305184Sek110237 #ifdef HAVE_LWPS
315184Sek110237 #include <sys/lwp.h>
325184Sek110237 #endif
335184Sek110237 #include <signal.h>
346613Sek110237
356613Sek110237 #include "filebench.h"
365184Sek110237 #include "threadflow.h"
375184Sek110237 #include "flowop.h"
385184Sek110237 #include "ipc.h"
395184Sek110237
405184Sek110237 static threadflow_t *threadflow_define_common(procflow_t *procflow,
415184Sek110237 char *name, threadflow_t *inherit, int instance);
425184Sek110237
435184Sek110237 /*
445184Sek110237 * Threadflows are filebench entities which manage operating system
455184Sek110237 * threads. Each worker threadflow spawns a separate filebench thread,
465184Sek110237 * with attributes inherited from a FLOW_MASTER threadflow created during
475184Sek110237 * f model language parsing. This section contains routines to define,
485184Sek110237 * create, control, and delete threadflows.
495184Sek110237 *
505184Sek110237 * Each thread defined in the f model creates a FLOW_MASTER
515184Sek110237 * threadflow which encapsulates the defined attributes and flowops of
525184Sek110237 * the f language thread, including the number of instances to create.
535184Sek110237 * At runtime, a worker threadflow instance with an associated filebench
545184Sek110237 * thread is created, which runs until told to quit or is specifically
555184Sek110237 * deleted.
565184Sek110237 */
575184Sek110237
585184Sek110237
595184Sek110237 /*
605184Sek110237 * Prints information about threadflow syntax.
615184Sek110237 */
625184Sek110237 void
threadflow_usage(void)635184Sek110237 threadflow_usage(void)
645184Sek110237 {
655184Sek110237 (void) fprintf(stderr, " thread name=<name>[,instances=<count>]\n");
665184Sek110237 (void) fprintf(stderr, "\n");
675184Sek110237 (void) fprintf(stderr, " {\n");
685184Sek110237 (void) fprintf(stderr, " flowop ...\n");
695184Sek110237 (void) fprintf(stderr, " flowop ...\n");
705184Sek110237 (void) fprintf(stderr, " flowop ...\n");
715184Sek110237 (void) fprintf(stderr, " }\n");
725184Sek110237 (void) fprintf(stderr, "\n");
735184Sek110237 }
745184Sek110237
755184Sek110237 /*
765184Sek110237 * Creates a thread for the supplied threadflow. If interprocess
775184Sek110237 * shared memory is desired, then increments the amount of shared
785184Sek110237 * memory needed by the amount specified in the threadflow's
795184Sek110237 * tf_memsize parameter. The thread starts in routine
805184Sek110237 * flowop_start() with a poineter to the threadflow supplied
815184Sek110237 * as the argument.
825184Sek110237 */
835184Sek110237 static int
threadflow_createthread(threadflow_t * threadflow)845184Sek110237 threadflow_createthread(threadflow_t *threadflow)
855184Sek110237 {
866212Saw148015 fbint_t memsize;
876212Saw148015 memsize = avd_get_int(threadflow->tf_memsize);
886212Saw148015 threadflow->tf_constmemsize = memsize;
896212Saw148015
905184Sek110237 filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld",
916212Saw148015 threadflow->tf_name, memsize);
925184Sek110237
935184Sek110237 if (threadflow->tf_attrs & THREADFLOW_USEISM)
946212Saw148015 filebench_shm->shm_required += memsize;
955184Sek110237
965184Sek110237 if (pthread_create(&threadflow->tf_tid, NULL,
975184Sek110237 (void *(*)(void*))flowop_start, threadflow) != 0) {
985184Sek110237 filebench_log(LOG_ERROR, "thread create failed");
995184Sek110237 filebench_shutdown(1);
1006084Saw148015 return (FILEBENCH_ERROR);
1015184Sek110237 }
1025184Sek110237
1036084Saw148015 return (FILEBENCH_OK);
1045184Sek110237 }
1055184Sek110237
1065184Sek110237 /*
1075184Sek110237 * Creates threads for the threadflows associated with a procflow.
1085184Sek110237 * The routine iterates through the list of threadflows in the
1095184Sek110237 * supplied procflow's pf_threads list. For each threadflow on
1105184Sek110237 * the list, it defines tf_instances number of cloned
1115184Sek110237 * threadflows, and then calls threadflow_createthread() for
1125184Sek110237 * each to create and start the actual operating system thread.
1135184Sek110237 * Note that each of the newly defined threadflows will be linked
1145184Sek110237 * into the procflows threadflow list, but at the head of the
1155184Sek110237 * list, so they will not become part of the supplied set. After
1165184Sek110237 * all the threads have been created, threadflow_init enters
1175184Sek110237 * a join loop for all the threads in the newly defined
1185184Sek110237 * threadflows. Once all the created threads have exited,
1195184Sek110237 * threadflow_init will return 0. If errors are encountered, it
1205184Sek110237 * will return a non zero value.
1215184Sek110237 */
1225184Sek110237 int
threadflow_init(procflow_t * procflow)1235184Sek110237 threadflow_init(procflow_t *procflow)
1245184Sek110237 {
1255184Sek110237 threadflow_t *threadflow = procflow->pf_threads;
1265184Sek110237 int ret = 0;
1275184Sek110237
1286391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
1295184Sek110237
1305184Sek110237 while (threadflow) {
1315184Sek110237 threadflow_t *newthread;
1326212Saw148015 int instances;
1335184Sek110237 int i;
1345184Sek110237
1356212Saw148015 instances = avd_get_int(threadflow->tf_instances);
1365184Sek110237 filebench_log(LOG_VERBOSE,
1376286Saw148015 "Starting %d %s threads",
1386212Saw148015 instances, threadflow->tf_name);
1395184Sek110237
1406212Saw148015 for (i = 1; i < instances; i++) {
1415184Sek110237 /* Create threads */
1425184Sek110237 newthread =
1435184Sek110237 threadflow_define_common(procflow,
1445184Sek110237 threadflow->tf_name, threadflow, i + 1);
1455184Sek110237 if (newthread == NULL)
1465184Sek110237 return (-1);
1476084Saw148015 ret |= threadflow_createthread(newthread);
1485184Sek110237 }
1495184Sek110237
1505184Sek110237 newthread = threadflow_define_common(procflow,
1515184Sek110237 threadflow->tf_name,
1525184Sek110237 threadflow, 1);
1535184Sek110237
1545184Sek110237 if (newthread == NULL)
1555184Sek110237 return (-1);
1565184Sek110237
1576084Saw148015 /* Create each thread */
1586084Saw148015 ret |= threadflow_createthread(newthread);
1595184Sek110237
1605184Sek110237 threadflow = threadflow->tf_next;
1615184Sek110237 }
1625184Sek110237
1635184Sek110237 threadflow = procflow->pf_threads;
1645184Sek110237
1656391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
1665184Sek110237
1675184Sek110237 while (threadflow) {
1686333Sek110237 /* wait for all threads to finish */
1696333Sek110237 if (threadflow->tf_tid) {
1706333Sek110237 void *status;
1715184Sek110237
1726333Sek110237 if (pthread_join(threadflow->tf_tid, &status) == 0)
1736333Sek110237 ret += *(int *)status;
1746333Sek110237 }
1755184Sek110237 threadflow = threadflow->tf_next;
1765184Sek110237 }
1775184Sek110237
1785184Sek110237 procflow->pf_running = 0;
1795184Sek110237
1805184Sek110237 return (ret);
1815184Sek110237 }
1825184Sek110237
1835184Sek110237 /*
1845184Sek110237 * Tells the threadflow's thread to stop and optionally signals
1855184Sek110237 * its associated process to end the thread.
1865184Sek110237 */
1875184Sek110237 static void
threadflow_kill(threadflow_t * threadflow)1888762SAndrew.W.Wilson@sun.com threadflow_kill(threadflow_t *threadflow)
1895184Sek110237 {
1908762SAndrew.W.Wilson@sun.com int wait_cnt = 2;
1918762SAndrew.W.Wilson@sun.com
1925184Sek110237 /* Tell thread to finish */
1935184Sek110237 threadflow->tf_abort = 1;
1945184Sek110237
1956084Saw148015 /* wait a bit for threadflow to stop */
1966084Saw148015 while (wait_cnt && threadflow->tf_running) {
1976084Saw148015 (void) sleep(1);
1986084Saw148015 wait_cnt--;
1996084Saw148015 }
2006084Saw148015
2018762SAndrew.W.Wilson@sun.com if (threadflow->tf_running) {
2028762SAndrew.W.Wilson@sun.com threadflow->tf_running = FALSE;
203*8769SAndrew.W.Wilson@sun.com (void) pthread_kill(threadflow->tf_tid, SIGKILL);
2048762SAndrew.W.Wilson@sun.com }
2055184Sek110237 }
2065184Sek110237
2075184Sek110237 /*
2085184Sek110237 * Deletes the specified threadflow from the specified threadflow
2095184Sek110237 * list after first terminating the threadflow's thread, deleting
2105184Sek110237 * the threadflow's flowops, and finally freeing the threadflow
2115184Sek110237 * entity. It also subtracts the threadflow's shared memory
2125184Sek110237 * requirements from the total amount required, shm_required. If
2135184Sek110237 * the specified threadflow is found, returns 0, otherwise
2145184Sek110237 * returns -1.
2155184Sek110237 */
2165184Sek110237 static int
threadflow_delete(threadflow_t ** threadlist,threadflow_t * threadflow)2178762SAndrew.W.Wilson@sun.com threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow)
2185184Sek110237 {
2195184Sek110237 threadflow_t *entry = *threadlist;
2205184Sek110237
2215184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)",
2225184Sek110237 threadflow->tf_name,
2235184Sek110237 threadflow->tf_instance);
2245184Sek110237
2256212Saw148015 if (threadflow->tf_attrs & THREADFLOW_USEISM)
2266212Saw148015 filebench_shm->shm_required -= threadflow->tf_constmemsize;
2275184Sek110237
2285184Sek110237 if (threadflow == *threadlist) {
2295184Sek110237 /* First on list */
2305184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)",
2315184Sek110237 threadflow->tf_name,
2325184Sek110237 threadflow->tf_instance);
2335184Sek110237
2348762SAndrew.W.Wilson@sun.com threadflow_kill(threadflow);
2356550Saw148015 flowop_delete_all(&threadflow->tf_thrd_fops);
2365184Sek110237 *threadlist = threadflow->tf_next;
2376613Sek110237 (void) pthread_mutex_destroy(&threadflow->tf_lock);
2385184Sek110237 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
2395184Sek110237 return (0);
2405184Sek110237 }
2415184Sek110237
2425184Sek110237 while (entry->tf_next) {
2435184Sek110237 filebench_log(LOG_DEBUG_IMPL,
2445184Sek110237 "Delete thread: (%s-%d) == (%s-%d)",
2455184Sek110237 entry->tf_next->tf_name,
2465184Sek110237 entry->tf_next->tf_instance,
2475184Sek110237 threadflow->tf_name,
2485184Sek110237 threadflow->tf_instance);
2495184Sek110237
2505184Sek110237 if (threadflow == entry->tf_next) {
2515184Sek110237 /* Delete */
2525184Sek110237 filebench_log(LOG_DEBUG_IMPL,
2535184Sek110237 "Deleted thread: (%s-%d)",
2545184Sek110237 entry->tf_next->tf_name,
2555184Sek110237 entry->tf_next->tf_instance);
2568762SAndrew.W.Wilson@sun.com threadflow_kill(entry->tf_next);
2576550Saw148015 flowop_delete_all(&entry->tf_next->tf_thrd_fops);
2586613Sek110237 (void) pthread_mutex_destroy(&threadflow->tf_lock);
2595184Sek110237 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
2605184Sek110237 entry->tf_next = entry->tf_next->tf_next;
2615184Sek110237 return (0);
2625184Sek110237 }
2635184Sek110237 entry = entry->tf_next;
2645184Sek110237 }
2655184Sek110237
2665184Sek110237 return (-1);
2675184Sek110237 }
2685184Sek110237
2695184Sek110237 /*
2705184Sek110237 * Given a pointer to the thread list of a procflow, cycles
2715184Sek110237 * through all the threadflows on the list, deleting each one
2725184Sek110237 * except the FLOW_MASTER.
2735184Sek110237 */
2745184Sek110237 void
threadflow_delete_all(threadflow_t ** threadlist)2758762SAndrew.W.Wilson@sun.com threadflow_delete_all(threadflow_t **threadlist)
2765184Sek110237 {
2778615SAndrew.W.Wilson@sun.com threadflow_t *threadflow;
2785184Sek110237
2796391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
2805184Sek110237
2818615SAndrew.W.Wilson@sun.com threadflow = *threadlist;
2825184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Deleting all threads");
2835184Sek110237
2845184Sek110237 while (threadflow) {
2855184Sek110237 if (threadflow->tf_instance &&
2865184Sek110237 (threadflow->tf_instance == FLOW_MASTER)) {
2875184Sek110237 threadflow = threadflow->tf_next;
2885184Sek110237 continue;
2895184Sek110237 }
2908762SAndrew.W.Wilson@sun.com (void) threadflow_delete(threadlist, threadflow);
2915184Sek110237 threadflow = threadflow->tf_next;
2925184Sek110237 }
2935184Sek110237
2946391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
2955184Sek110237 }
2965184Sek110237
2975184Sek110237 /*
2985184Sek110237 * Waits till all threadflows are started, or a timeout occurs.
2995184Sek110237 * Checks through the list of threadflows, waiting up to 10
3005184Sek110237 * seconds for each one to set its tf_running flag to 1. If not
3015184Sek110237 * set after 10 seconds, continues on to the next threadflow
3025184Sek110237 * anyway.
3035184Sek110237 */
3045184Sek110237 void
threadflow_allstarted(pid_t pid,threadflow_t * threadflow)3055184Sek110237 threadflow_allstarted(pid_t pid, threadflow_t *threadflow)
3065184Sek110237 {
3076391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
3085184Sek110237
3095184Sek110237 while (threadflow) {
3105184Sek110237 int waits;
3115184Sek110237
3125184Sek110237 if ((threadflow->tf_instance == 0) ||
3135184Sek110237 (threadflow->tf_instance == FLOW_MASTER)) {
3145184Sek110237 threadflow = threadflow->tf_next;
3155184Sek110237 continue;
3165184Sek110237 }
3175184Sek110237
3185184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d",
3195184Sek110237 pid,
3205184Sek110237 threadflow->tf_name,
3215184Sek110237 threadflow->tf_instance);
3225184Sek110237
3235184Sek110237 waits = 10;
3246391Saw148015 while (waits && (threadflow->tf_running == 0) &&
3256391Saw148015 (filebench_shm->shm_f_abort == 0)) {
3265184Sek110237 (void) ipc_mutex_unlock(
3276391Saw148015 &filebench_shm->shm_threadflow_lock);
3285184Sek110237 if (waits < 3)
3295184Sek110237 filebench_log(LOG_INFO,
3305184Sek110237 "Waiting for pid %d thread %s-%d",
3315184Sek110237 pid,
3325184Sek110237 threadflow->tf_name,
3335184Sek110237 threadflow->tf_instance);
3345184Sek110237
3355184Sek110237 (void) sleep(1);
3366391Saw148015 (void) ipc_mutex_lock(
3376391Saw148015 &filebench_shm->shm_threadflow_lock);
3385184Sek110237 waits--;
3395184Sek110237 }
3405184Sek110237
3415184Sek110237 threadflow = threadflow->tf_next;
3425184Sek110237 }
3435184Sek110237
3446391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
3455184Sek110237 }
3465184Sek110237
3475184Sek110237 /*
3485184Sek110237 * Create an in-memory thread object linked to a parent procflow.
3495184Sek110237 * A threadflow entity is allocated from shared memory and
3505184Sek110237 * initialized from the "inherit" threadflow if supplied,
3515184Sek110237 * otherwise to zeros. The threadflow is assigned a unique
3525184Sek110237 * thread id, the supplied instance number, the supplied name
3535184Sek110237 * and added to the procflow's pf_thread list. If no name is
3545184Sek110237 * supplied or the threadflow can't be allocated, NULL is
3555184Sek110237 * returned Otherwise a pointer to the newly allocated threadflow
3565184Sek110237 * is returned.
3575184Sek110237 *
3586391Saw148015 * The filebench_shm->shm_threadflow_lock must be held by the caller.
3595184Sek110237 */
3605184Sek110237 static threadflow_t *
threadflow_define_common(procflow_t * procflow,char * name,threadflow_t * inherit,int instance)3615184Sek110237 threadflow_define_common(procflow_t *procflow, char *name,
3625184Sek110237 threadflow_t *inherit, int instance)
3635184Sek110237 {
3645184Sek110237 threadflow_t *threadflow;
3655184Sek110237 threadflow_t **threadlistp = &procflow->pf_threads;
3665184Sek110237
3675184Sek110237 if (name == NULL)
3685184Sek110237 return (NULL);
3695184Sek110237
3705184Sek110237 threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW);
3715184Sek110237
3725184Sek110237 if (threadflow == NULL)
3735184Sek110237 return (NULL);
3745184Sek110237
3755184Sek110237 if (inherit)
3765184Sek110237 (void) memcpy(threadflow, inherit, sizeof (threadflow_t));
3775184Sek110237 else
3785184Sek110237 (void) memset(threadflow, 0, sizeof (threadflow_t));
3795184Sek110237
3806391Saw148015 threadflow->tf_utid = ++filebench_shm->shm_utid;
3815184Sek110237
3825184Sek110237 threadflow->tf_instance = instance;
3835184Sek110237 (void) strcpy(threadflow->tf_name, name);
3845184Sek110237 threadflow->tf_process = procflow;
3857556SAndrew.W.Wilson@sun.com (void) pthread_mutex_init(&threadflow->tf_lock,
3867556SAndrew.W.Wilson@sun.com ipc_mutexattr(IPC_MUTEX_NORMAL));
3875184Sek110237
3885184Sek110237 filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d",
3895184Sek110237 name, instance);
3905184Sek110237
3915184Sek110237 /* Add threadflow to list */
3925184Sek110237 if (*threadlistp == NULL) {
3935184Sek110237 *threadlistp = threadflow;
3945184Sek110237 threadflow->tf_next = NULL;
3955184Sek110237 } else {
3965184Sek110237 threadflow->tf_next = *threadlistp;
3975184Sek110237 *threadlistp = threadflow;
3985184Sek110237 }
3995184Sek110237
4005184Sek110237 return (threadflow);
4015184Sek110237 }
4025184Sek110237
4035184Sek110237 /*
4045184Sek110237 * Create an in memory FLOW_MASTER thread object as described
4056391Saw148015 * by the syntax. Acquire the filebench_shm->shm_threadflow_lock and
4065184Sek110237 * call threadflow_define_common() to create a threadflow entity.
4075184Sek110237 * Set the number of instances to create at runtime,
4085184Sek110237 * tf_instances, to "instances". Return the threadflow pointer
4095184Sek110237 * returned by the threadflow_define_common call.
4105184Sek110237 */
4115184Sek110237 threadflow_t *
threadflow_define(procflow_t * procflow,char * name,threadflow_t * inherit,avd_t instances)4125184Sek110237 threadflow_define(procflow_t *procflow, char *name,
4136212Saw148015 threadflow_t *inherit, avd_t instances)
4145184Sek110237 {
4155184Sek110237 threadflow_t *threadflow;
4165184Sek110237
4176391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
4185184Sek110237
4195184Sek110237 if ((threadflow = threadflow_define_common(procflow, name,
4205184Sek110237 inherit, FLOW_MASTER)) == NULL)
4215184Sek110237 return (NULL);
4225184Sek110237
4235184Sek110237 threadflow->tf_instances = instances;
4245184Sek110237
4256391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
4265184Sek110237
4275184Sek110237 return (threadflow);
4285184Sek110237 }
4295184Sek110237
4305184Sek110237
4315184Sek110237 /*
4325184Sek110237 * Searches the provided threadflow list for the named threadflow.
4335184Sek110237 * A pointer to the threadflow is returned, or NULL if threadflow
4345184Sek110237 * is not found.
4355184Sek110237 */
4365184Sek110237 threadflow_t *
threadflow_find(threadflow_t * threadlist,char * name)4375184Sek110237 threadflow_find(threadflow_t *threadlist, char *name)
4385184Sek110237 {
4395184Sek110237 threadflow_t *threadflow = threadlist;
4405184Sek110237
4416391Saw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
4425184Sek110237
4435184Sek110237 while (threadflow) {
4445184Sek110237 if (strcmp(name, threadflow->tf_name) == 0) {
4455184Sek110237
4465184Sek110237 (void) ipc_mutex_unlock(
4476391Saw148015 &filebench_shm->shm_threadflow_lock);
4485184Sek110237
4495184Sek110237 return (threadflow);
4505184Sek110237 }
4515184Sek110237 threadflow = threadflow->tf_next;
4525184Sek110237 }
4535184Sek110237
4546391Saw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
4555184Sek110237
4565184Sek110237
4575184Sek110237 return (NULL);
4585184Sek110237 }
459