1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 *
25 * Portions Copyright 2008 Denis Cheng
26 */
27
28 #include "config.h"
29 #include <pthread.h>
30 #ifdef HAVE_LWPS
31 #include <sys/lwp.h>
32 #endif
33 #include <signal.h>
34
35 #include "filebench.h"
36 #include "threadflow.h"
37 #include "flowop.h"
38 #include "ipc.h"
39
40 static threadflow_t *threadflow_define_common(procflow_t *procflow,
41 char *name, threadflow_t *inherit, int instance);
42
43 /*
44 * Threadflows are filebench entities which manage operating system
45 * threads. Each worker threadflow spawns a separate filebench thread,
46 * with attributes inherited from a FLOW_MASTER threadflow created during
47 * f model language parsing. This section contains routines to define,
48 * create, control, and delete threadflows.
49 *
50 * Each thread defined in the f model creates a FLOW_MASTER
51 * threadflow which encapsulates the defined attributes and flowops of
52 * the f language thread, including the number of instances to create.
53 * At runtime, a worker threadflow instance with an associated filebench
54 * thread is created, which runs until told to quit or is specifically
55 * deleted.
56 */
57
58
/*
 * Writes a short summary of the "thread" statement syntax to stderr,
 * one line at a time.
 */
void
threadflow_usage(void)
{
	static const char *const usage_lines[] = {
		" thread name=<name>[,instances=<count>]\n",
		"\n",
		" {\n",
		" flowop ...\n",
		" flowop ...\n",
		" flowop ...\n",
		" }\n",
		"\n"
	};
	size_t idx;

	for (idx = 0; idx < sizeof (usage_lines) / sizeof (usage_lines[0]);
	    idx++)
		(void) fputs(usage_lines[idx], stderr);
}
74
75 /*
76 * Creates a thread for the supplied threadflow. If interprocess
77 * shared memory is desired, then increments the amount of shared
78 * memory needed by the amount specified in the threadflow's
79 * tf_memsize parameter. The thread starts in routine
80 * flowop_start() with a poineter to the threadflow supplied
81 * as the argument.
82 */
83 static int
threadflow_createthread(threadflow_t * threadflow)84 threadflow_createthread(threadflow_t *threadflow)
85 {
86 fbint_t memsize;
87 memsize = avd_get_int(threadflow->tf_memsize);
88 threadflow->tf_constmemsize = memsize;
89
90 filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld",
91 threadflow->tf_name, memsize);
92
93 if (threadflow->tf_attrs & THREADFLOW_USEISM)
94 filebench_shm->shm_required += memsize;
95
96 if (pthread_create(&threadflow->tf_tid, NULL,
97 (void *(*)(void*))flowop_start, threadflow) != 0) {
98 filebench_log(LOG_ERROR, "thread create failed");
99 filebench_shutdown(1);
100 return (FILEBENCH_ERROR);
101 }
102
103 return (FILEBENCH_OK);
104 }
105
106 /*
107 * Creates threads for the threadflows associated with a procflow.
108 * The routine iterates through the list of threadflows in the
109 * supplied procflow's pf_threads list. For each threadflow on
110 * the list, it defines tf_instances number of cloned
111 * threadflows, and then calls threadflow_createthread() for
112 * each to create and start the actual operating system thread.
113 * Note that each of the newly defined threadflows will be linked
114 * into the procflows threadflow list, but at the head of the
115 * list, so they will not become part of the supplied set. After
116 * all the threads have been created, threadflow_init enters
117 * a join loop for all the threads in the newly defined
118 * threadflows. Once all the created threads have exited,
119 * threadflow_init will return 0. If errors are encountered, it
120 * will return a non zero value.
121 */
122 int
threadflow_init(procflow_t * procflow)123 threadflow_init(procflow_t *procflow)
124 {
125 threadflow_t *threadflow = procflow->pf_threads;
126 int ret = 0;
127
128 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
129
130 while (threadflow) {
131 threadflow_t *newthread;
132 int instances;
133 int i;
134
135 instances = avd_get_int(threadflow->tf_instances);
136 filebench_log(LOG_VERBOSE,
137 "Starting %d %s threads",
138 instances, threadflow->tf_name);
139
140 for (i = 1; i < instances; i++) {
141 /* Create threads */
142 newthread =
143 threadflow_define_common(procflow,
144 threadflow->tf_name, threadflow, i + 1);
145 if (newthread == NULL)
146 return (-1);
147 ret |= threadflow_createthread(newthread);
148 }
149
150 newthread = threadflow_define_common(procflow,
151 threadflow->tf_name,
152 threadflow, 1);
153
154 if (newthread == NULL)
155 return (-1);
156
157 /* Create each thread */
158 ret |= threadflow_createthread(newthread);
159
160 threadflow = threadflow->tf_next;
161 }
162
163 threadflow = procflow->pf_threads;
164
165 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
166
167 while (threadflow) {
168 /* wait for all threads to finish */
169 if (threadflow->tf_tid) {
170 void *status;
171
172 if (pthread_join(threadflow->tf_tid, &status) == 0)
173 ret += *(int *)status;
174 }
175 threadflow = threadflow->tf_next;
176 }
177
178 procflow->pf_running = 0;
179
180 return (ret);
181 }
182
183 /*
184 * Tells the threadflow's thread to stop and optionally signals
185 * its associated process to end the thread.
186 */
187 static void
threadflow_kill(threadflow_t * threadflow)188 threadflow_kill(threadflow_t *threadflow)
189 {
190 int wait_cnt = 2;
191
192 /* Tell thread to finish */
193 threadflow->tf_abort = 1;
194
195 /* wait a bit for threadflow to stop */
196 while (wait_cnt && threadflow->tf_running) {
197 (void) sleep(1);
198 wait_cnt--;
199 }
200
201 if (threadflow->tf_running) {
202 threadflow->tf_running = FALSE;
203 (void) pthread_kill(threadflow->tf_tid, SIGKILL);
204 }
205 }
206
207 /*
208 * Deletes the specified threadflow from the specified threadflow
209 * list after first terminating the threadflow's thread, deleting
210 * the threadflow's flowops, and finally freeing the threadflow
211 * entity. It also subtracts the threadflow's shared memory
212 * requirements from the total amount required, shm_required. If
213 * the specified threadflow is found, returns 0, otherwise
214 * returns -1.
215 */
216 static int
threadflow_delete(threadflow_t ** threadlist,threadflow_t * threadflow)217 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow)
218 {
219 threadflow_t *entry = *threadlist;
220
221 filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)",
222 threadflow->tf_name,
223 threadflow->tf_instance);
224
225 if (threadflow->tf_attrs & THREADFLOW_USEISM)
226 filebench_shm->shm_required -= threadflow->tf_constmemsize;
227
228 if (threadflow == *threadlist) {
229 /* First on list */
230 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)",
231 threadflow->tf_name,
232 threadflow->tf_instance);
233
234 threadflow_kill(threadflow);
235 flowop_delete_all(&threadflow->tf_thrd_fops);
236 *threadlist = threadflow->tf_next;
237 (void) pthread_mutex_destroy(&threadflow->tf_lock);
238 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
239 return (0);
240 }
241
242 while (entry->tf_next) {
243 filebench_log(LOG_DEBUG_IMPL,
244 "Delete thread: (%s-%d) == (%s-%d)",
245 entry->tf_next->tf_name,
246 entry->tf_next->tf_instance,
247 threadflow->tf_name,
248 threadflow->tf_instance);
249
250 if (threadflow == entry->tf_next) {
251 /* Delete */
252 filebench_log(LOG_DEBUG_IMPL,
253 "Deleted thread: (%s-%d)",
254 entry->tf_next->tf_name,
255 entry->tf_next->tf_instance);
256 threadflow_kill(entry->tf_next);
257 flowop_delete_all(&entry->tf_next->tf_thrd_fops);
258 (void) pthread_mutex_destroy(&threadflow->tf_lock);
259 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
260 entry->tf_next = entry->tf_next->tf_next;
261 return (0);
262 }
263 entry = entry->tf_next;
264 }
265
266 return (-1);
267 }
268
269 /*
270 * Given a pointer to the thread list of a procflow, cycles
271 * through all the threadflows on the list, deleting each one
272 * except the FLOW_MASTER.
273 */
274 void
threadflow_delete_all(threadflow_t ** threadlist)275 threadflow_delete_all(threadflow_t **threadlist)
276 {
277 threadflow_t *threadflow;
278
279 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
280
281 threadflow = *threadlist;
282 filebench_log(LOG_DEBUG_IMPL, "Deleting all threads");
283
284 while (threadflow) {
285 if (threadflow->tf_instance &&
286 (threadflow->tf_instance == FLOW_MASTER)) {
287 threadflow = threadflow->tf_next;
288 continue;
289 }
290 (void) threadflow_delete(threadlist, threadflow);
291 threadflow = threadflow->tf_next;
292 }
293
294 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
295 }
296
297 /*
298 * Waits till all threadflows are started, or a timeout occurs.
299 * Checks through the list of threadflows, waiting up to 10
300 * seconds for each one to set its tf_running flag to 1. If not
301 * set after 10 seconds, continues on to the next threadflow
302 * anyway.
303 */
304 void
threadflow_allstarted(pid_t pid,threadflow_t * threadflow)305 threadflow_allstarted(pid_t pid, threadflow_t *threadflow)
306 {
307 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
308
309 while (threadflow) {
310 int waits;
311
312 if ((threadflow->tf_instance == 0) ||
313 (threadflow->tf_instance == FLOW_MASTER)) {
314 threadflow = threadflow->tf_next;
315 continue;
316 }
317
318 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d",
319 pid,
320 threadflow->tf_name,
321 threadflow->tf_instance);
322
323 waits = 10;
324 while (waits && (threadflow->tf_running == 0) &&
325 (filebench_shm->shm_f_abort == 0)) {
326 (void) ipc_mutex_unlock(
327 &filebench_shm->shm_threadflow_lock);
328 if (waits < 3)
329 filebench_log(LOG_INFO,
330 "Waiting for pid %d thread %s-%d",
331 pid,
332 threadflow->tf_name,
333 threadflow->tf_instance);
334
335 (void) sleep(1);
336 (void) ipc_mutex_lock(
337 &filebench_shm->shm_threadflow_lock);
338 waits--;
339 }
340
341 threadflow = threadflow->tf_next;
342 }
343
344 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
345 }
346
347 /*
348 * Create an in-memory thread object linked to a parent procflow.
349 * A threadflow entity is allocated from shared memory and
350 * initialized from the "inherit" threadflow if supplied,
351 * otherwise to zeros. The threadflow is assigned a unique
352 * thread id, the supplied instance number, the supplied name
353 * and added to the procflow's pf_thread list. If no name is
354 * supplied or the threadflow can't be allocated, NULL is
355 * returned Otherwise a pointer to the newly allocated threadflow
356 * is returned.
357 *
358 * The filebench_shm->shm_threadflow_lock must be held by the caller.
359 */
360 static threadflow_t *
threadflow_define_common(procflow_t * procflow,char * name,threadflow_t * inherit,int instance)361 threadflow_define_common(procflow_t *procflow, char *name,
362 threadflow_t *inherit, int instance)
363 {
364 threadflow_t *threadflow;
365 threadflow_t **threadlistp = &procflow->pf_threads;
366
367 if (name == NULL)
368 return (NULL);
369
370 threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW);
371
372 if (threadflow == NULL)
373 return (NULL);
374
375 if (inherit)
376 (void) memcpy(threadflow, inherit, sizeof (threadflow_t));
377 else
378 (void) memset(threadflow, 0, sizeof (threadflow_t));
379
380 threadflow->tf_utid = ++filebench_shm->shm_utid;
381
382 threadflow->tf_instance = instance;
383 (void) strcpy(threadflow->tf_name, name);
384 threadflow->tf_process = procflow;
385 (void) pthread_mutex_init(&threadflow->tf_lock,
386 ipc_mutexattr(IPC_MUTEX_NORMAL));
387
388 filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d",
389 name, instance);
390
391 /* Add threadflow to list */
392 if (*threadlistp == NULL) {
393 *threadlistp = threadflow;
394 threadflow->tf_next = NULL;
395 } else {
396 threadflow->tf_next = *threadlistp;
397 *threadlistp = threadflow;
398 }
399
400 return (threadflow);
401 }
402
403 /*
404 * Create an in memory FLOW_MASTER thread object as described
405 * by the syntax. Acquire the filebench_shm->shm_threadflow_lock and
406 * call threadflow_define_common() to create a threadflow entity.
407 * Set the number of instances to create at runtime,
408 * tf_instances, to "instances". Return the threadflow pointer
409 * returned by the threadflow_define_common call.
410 */
411 threadflow_t *
threadflow_define(procflow_t * procflow,char * name,threadflow_t * inherit,avd_t instances)412 threadflow_define(procflow_t *procflow, char *name,
413 threadflow_t *inherit, avd_t instances)
414 {
415 threadflow_t *threadflow;
416
417 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
418
419 if ((threadflow = threadflow_define_common(procflow, name,
420 inherit, FLOW_MASTER)) == NULL)
421 return (NULL);
422
423 threadflow->tf_instances = instances;
424
425 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
426
427 return (threadflow);
428 }
429
430
431 /*
432 * Searches the provided threadflow list for the named threadflow.
433 * A pointer to the threadflow is returned, or NULL if threadflow
434 * is not found.
435 */
436 threadflow_t *
threadflow_find(threadflow_t * threadlist,char * name)437 threadflow_find(threadflow_t *threadlist, char *name)
438 {
439 threadflow_t *threadflow = threadlist;
440
441 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
442
443 while (threadflow) {
444 if (strcmp(name, threadflow->tf_name) == 0) {
445
446 (void) ipc_mutex_unlock(
447 &filebench_shm->shm_threadflow_lock);
448
449 return (threadflow);
450 }
451 threadflow = threadflow->tf_next;
452 }
453
454 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
455
456
457 return (NULL);
458 }
459