xref: /onnv-gate/usr/src/cmd/filebench/common/threadflow.c (revision 6286:edb6e4556869)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 #include <pthread.h>
30 #ifdef HAVE_LWPS
31 #include <sys/lwp.h>
32 #endif
33 #include <signal.h>
34 #include "threadflow.h"
35 #include "filebench.h"
36 #include "flowop.h"
37 #include "ipc.h"
38 
39 static threadflow_t *threadflow_define_common(procflow_t *procflow,
40     char *name, threadflow_t *inherit, int instance);
41 
42 /*
43  * Threadflows are filebench entities which manage operating system
44  * threads. Each worker threadflow spawns a separate filebench thread,
45  * with attributes inherited from a FLOW_MASTER threadflow created during
46  * f model language parsing. This section contains routines to define,
47  * create, control, and delete threadflows.
48  *
49  * Each thread defined in the f model creates a FLOW_MASTER
50  * threadflow which encapsulates the defined attributes and flowops of
51  * the f language thread, including the number of instances to create.
52  * At runtime, a worker threadflow instance with an associated filebench
53  * thread is created, which runs until told to quit or is specifically
54  * deleted.
55  */
56 
57 
58 /*
59  * Prints information about threadflow syntax.
60  */
61 void
62 threadflow_usage(void)
63 {
64 	(void) fprintf(stderr, "  thread  name=<name>[,instances=<count>]\n");
65 	(void) fprintf(stderr, "\n");
66 	(void) fprintf(stderr, "  {\n");
67 	(void) fprintf(stderr, "    flowop ...\n");
68 	(void) fprintf(stderr, "    flowop ...\n");
69 	(void) fprintf(stderr, "    flowop ...\n");
70 	(void) fprintf(stderr, "  }\n");
71 	(void) fprintf(stderr, "\n");
72 }
73 
74 /*
75  * Creates a thread for the supplied threadflow. If interprocess
76  * shared memory is desired, then increments the amount of shared
77  * memory needed by the amount specified in the threadflow's
78  * tf_memsize parameter. The thread starts in routine
79  * flowop_start() with a poineter to the threadflow supplied
80  * as the argument.
81  */
82 static int
83 threadflow_createthread(threadflow_t *threadflow)
84 {
85 	fbint_t memsize;
86 	memsize = avd_get_int(threadflow->tf_memsize);
87 	threadflow->tf_constmemsize = memsize;
88 
89 	filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld",
90 	    threadflow->tf_name, memsize);
91 
92 	if (threadflow->tf_attrs & THREADFLOW_USEISM)
93 		filebench_shm->shm_required += memsize;
94 
95 	if (pthread_create(&threadflow->tf_tid, NULL,
96 	    (void *(*)(void*))flowop_start, threadflow) != 0) {
97 		filebench_log(LOG_ERROR, "thread create failed");
98 		filebench_shutdown(1);
99 		return (FILEBENCH_ERROR);
100 	}
101 
102 	return (FILEBENCH_OK);
103 }
104 
105 /*
106  * Terminates (exits) all the threads of the procflow (process).
107  * The procflow is determined from a process private pointer
108  * initialized by threadflow_init().
109  */
110 /* ARGSUSED */
111 static void
112 threadflow_cancel(int arg1)
113 {
114 	threadflow_t *threadflow;
115 
116 #ifdef HAVE_LWPS
117 	filebench_log(LOG_DEBUG_IMPL, "Thread signal handler on tid %d",
118 	    _lwp_self());
119 #endif
120 
121 	threadflow = my_procflow->pf_threads;
122 	my_procflow->pf_running = 0;
123 
124 	while (threadflow) {
125 		if (threadflow->tf_tid) {
126 			/* make sure thread has been cleaned up */
127 			flowop_destruct_all_flows(threadflow);
128 
129 			(void) pthread_cancel(threadflow->tf_tid);
130 			filebench_log(LOG_DEBUG_IMPL, "Thread %d cancelled...",
131 			    threadflow->tf_tid);
132 		}
133 		threadflow = threadflow->tf_next;
134 	}
135 
136 	exit(0);
137 }
138 
139 /*
140  * Creates threads for the threadflows associated with a procflow.
141  * The routine iterates through the list of threadflows in the
142  * supplied procflow's pf_threads list. For each threadflow on
143  * the list, it defines tf_instances number of cloned
144  * threadflows, and then calls threadflow_createthread() for
145  * each to create and start the actual operating system thread.
146  * Note that each of the newly defined threadflows will be linked
147  * into the procflows threadflow list, but at the head of the
148  * list, so they will not become part of the supplied set. After
149  * all the threads have been created, threadflow_init enters
150  * a join loop for all the threads in the newly defined
151  * threadflows. Once all the created threads have exited,
152  * threadflow_init will return 0. If errors are encountered, it
153  * will return a non zero value.
154  */
155 int
156 threadflow_init(procflow_t *procflow)
157 {
158 	threadflow_t *threadflow = procflow->pf_threads;
159 	int ret = 0;
160 
161 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
162 
163 	(void) signal(SIGUSR1, threadflow_cancel);
164 
165 	while (threadflow) {
166 		threadflow_t *newthread;
167 		int instances;
168 		int i;
169 
170 		instances = avd_get_int(threadflow->tf_instances);
171 		filebench_log(LOG_VERBOSE,
172 		    "Starting %d %s threads",
173 		    instances, threadflow->tf_name);
174 
175 		for (i = 1; i < instances; i++) {
176 			/* Create threads */
177 			newthread =
178 			    threadflow_define_common(procflow,
179 			    threadflow->tf_name, threadflow, i + 1);
180 			if (newthread == NULL)
181 				return (-1);
182 			ret |= threadflow_createthread(newthread);
183 		}
184 
185 		newthread = threadflow_define_common(procflow,
186 		    threadflow->tf_name,
187 		    threadflow, 1);
188 
189 		if (newthread == NULL)
190 			return (-1);
191 
192 		/* Create each thread */
193 		ret |= threadflow_createthread(newthread);
194 
195 		threadflow = threadflow->tf_next;
196 	}
197 
198 	threadflow = procflow->pf_threads;
199 
200 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
201 
202 	while (threadflow) {
203 		void *status;
204 
205 		/* wait for all threads to finish */
206 		if (threadflow->tf_tid)
207 			(void) pthread_join(threadflow->tf_tid, &status);
208 
209 		ret |= *(int *)status;
210 		threadflow = threadflow->tf_next;
211 	}
212 
213 	procflow->pf_running = 0;
214 
215 	return (ret);
216 }
217 
218 /*
219  * Tells the threadflow's thread to stop and optionally signals
220  * its associated process to end the thread.
221  */
222 static void
223 threadflow_kill(threadflow_t *threadflow, int wait_cnt)
224 {
225 	/* Tell thread to finish */
226 	threadflow->tf_abort = 1;
227 
228 	/* wait a bit for threadflow to stop */
229 	while (wait_cnt && threadflow->tf_running) {
230 		(void) sleep(1);
231 		wait_cnt--;
232 	}
233 
234 #ifdef USE_PROCESS_MODEL
235 #ifdef HAVE_SIGSEND
236 	(void) sigsend(P_PID, threadflow->tf_process->pf_pid, SIGUSR1);
237 #else
238 	(void) kill(threadflow->tf_process->pf_pid, SIGUSR1);
239 #endif
240 #else /* USE_PROCESS_MODEL */
241 	threadflow->tf_process->pf_running = 0;
242 #endif /* USE_PROCESS_MODEL */
243 }
244 
245 /*
246  * Deletes the specified threadflow from the specified threadflow
247  * list after first terminating the threadflow's thread, deleting
248  * the threadflow's flowops, and finally freeing the threadflow
249  * entity. It also subtracts the threadflow's shared memory
250  * requirements from the total amount required, shm_required. If
251  * the specified threadflow is found, returns 0, otherwise
252  * returns -1.
253  */
254 static int
255 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow,
256     int wait_cnt)
257 {
258 	threadflow_t *entry = *threadlist;
259 
260 	filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)",
261 	    threadflow->tf_name,
262 	    threadflow->tf_instance);
263 
264 	if (threadflow->tf_attrs & THREADFLOW_USEISM)
265 		filebench_shm->shm_required -= threadflow->tf_constmemsize;
266 
267 	if (threadflow == *threadlist) {
268 		/* First on list */
269 		filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)",
270 		    threadflow->tf_name,
271 		    threadflow->tf_instance);
272 
273 		threadflow_kill(threadflow, wait_cnt);
274 		flowop_delete_all(&threadflow->tf_ops);
275 		*threadlist = threadflow->tf_next;
276 		ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
277 		return (0);
278 	}
279 
280 	while (entry->tf_next) {
281 		filebench_log(LOG_DEBUG_IMPL,
282 		    "Delete thread: (%s-%d) == (%s-%d)",
283 		    entry->tf_next->tf_name,
284 		    entry->tf_next->tf_instance,
285 		    threadflow->tf_name,
286 		    threadflow->tf_instance);
287 
288 		if (threadflow == entry->tf_next) {
289 			/* Delete */
290 			filebench_log(LOG_DEBUG_IMPL,
291 			    "Deleted thread: (%s-%d)",
292 			    entry->tf_next->tf_name,
293 			    entry->tf_next->tf_instance);
294 			threadflow_kill(entry->tf_next, wait_cnt);
295 			flowop_delete_all(&entry->tf_next->tf_ops);
296 			ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
297 			entry->tf_next = entry->tf_next->tf_next;
298 			return (0);
299 		}
300 		entry = entry->tf_next;
301 	}
302 
303 	return (-1);
304 }
305 
306 /*
307  * Given a pointer to the thread list of a procflow, cycles
308  * through all the threadflows on the list, deleting each one
309  * except the FLOW_MASTER.
310  */
311 void
312 threadflow_delete_all(threadflow_t **threadlist, int wait_cnt)
313 {
314 	threadflow_t *threadflow = *threadlist;
315 
316 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
317 
318 	filebench_log(LOG_DEBUG_IMPL, "Deleting all threads");
319 
320 	while (threadflow) {
321 		if (threadflow->tf_instance &&
322 		    (threadflow->tf_instance == FLOW_MASTER)) {
323 			threadflow = threadflow->tf_next;
324 			continue;
325 		}
326 		(void) threadflow_delete(threadlist, threadflow, wait_cnt);
327 		threadflow = threadflow->tf_next;
328 		/* grow more impatient */
329 		if (wait_cnt > 0)
330 			wait_cnt--;
331 	}
332 
333 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
334 }
335 
336 /*
337  * Waits till all threadflows are started, or a timeout occurs.
338  * Checks through the list of threadflows, waiting up to 10
339  * seconds for each one to set its tf_running flag to 1. If not
340  * set after 10 seconds, continues on to the next threadflow
341  * anyway.
342  */
343 void
344 threadflow_allstarted(pid_t pid, threadflow_t *threadflow)
345 {
346 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
347 
348 	while (threadflow) {
349 		int waits;
350 
351 		if ((threadflow->tf_instance == 0) ||
352 		    (threadflow->tf_instance == FLOW_MASTER)) {
353 			threadflow = threadflow->tf_next;
354 			continue;
355 		}
356 
357 		filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d",
358 		    pid,
359 		    threadflow->tf_name,
360 		    threadflow->tf_instance);
361 
362 		waits = 10;
363 		while (waits && threadflow->tf_running == 0) {
364 			(void) ipc_mutex_unlock(
365 			    &filebench_shm->threadflow_lock);
366 			if (waits < 3)
367 				filebench_log(LOG_INFO,
368 				    "Waiting for pid %d thread %s-%d",
369 				    pid,
370 				    threadflow->tf_name,
371 				    threadflow->tf_instance);
372 
373 			(void) sleep(1);
374 			(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
375 			waits--;
376 		}
377 
378 		threadflow = threadflow->tf_next;
379 	}
380 
381 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
382 }
383 
384 /*
385  * Create an in-memory thread object linked to a parent procflow.
386  * A threadflow entity is allocated from shared memory and
387  * initialized from the "inherit" threadflow if supplied,
388  * otherwise to zeros. The threadflow is assigned a unique
389  * thread id, the supplied instance number, the supplied name
390  * and added to the procflow's pf_thread list. If no name is
391  * supplied or the threadflow can't be allocated, NULL is
392  * returned Otherwise a pointer to the newly allocated threadflow
393  * is returned.
394  *
395  * The filebench_shm->threadflow_lock must be held by the caller.
396  */
397 static threadflow_t *
398 threadflow_define_common(procflow_t *procflow, char *name,
399     threadflow_t *inherit, int instance)
400 {
401 	threadflow_t *threadflow;
402 	threadflow_t **threadlistp = &procflow->pf_threads;
403 
404 	if (name == NULL)
405 		return (NULL);
406 
407 	threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW);
408 
409 	if (threadflow == NULL)
410 		return (NULL);
411 
412 	if (inherit)
413 		(void) memcpy(threadflow, inherit, sizeof (threadflow_t));
414 	else
415 		(void) memset(threadflow, 0, sizeof (threadflow_t));
416 
417 	threadflow->tf_utid = ++filebench_shm->utid;
418 
419 	threadflow->tf_instance = instance;
420 	(void) strcpy(threadflow->tf_name, name);
421 	threadflow->tf_process = procflow;
422 
423 	filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d",
424 	    name, instance);
425 
426 	/* Add threadflow to list */
427 	if (*threadlistp == NULL) {
428 		*threadlistp = threadflow;
429 		threadflow->tf_next = NULL;
430 	} else {
431 		threadflow->tf_next = *threadlistp;
432 		*threadlistp = threadflow;
433 	}
434 
435 	return (threadflow);
436 }
437 
438 /*
439  * Create an in memory FLOW_MASTER thread object as described
440  * by the syntax. Acquire the  filebench_shm->threadflow_lock and
441  * call threadflow_define_common() to create a threadflow entity.
442  * Set the number of instances to create at runtime,
443  * tf_instances, to "instances". Return the threadflow pointer
444  * returned by the threadflow_define_common call.
445  */
446 threadflow_t *
447 threadflow_define(procflow_t *procflow, char *name,
448     threadflow_t *inherit, avd_t instances)
449 {
450 	threadflow_t *threadflow;
451 
452 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
453 
454 	if ((threadflow = threadflow_define_common(procflow, name,
455 	    inherit, FLOW_MASTER)) == NULL)
456 		return (NULL);
457 
458 	threadflow->tf_instances = instances;
459 
460 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
461 
462 	return (threadflow);
463 }
464 
465 
466 /*
467  * Searches the provided threadflow list for the named threadflow.
468  * A pointer to the threadflow is returned, or NULL if threadflow
469  * is not found.
470  */
471 threadflow_t *
472 threadflow_find(threadflow_t *threadlist, char *name)
473 {
474 	threadflow_t *threadflow = threadlist;
475 
476 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
477 
478 	while (threadflow) {
479 		if (strcmp(name, threadflow->tf_name) == 0) {
480 
481 			(void) ipc_mutex_unlock(
482 			    &filebench_shm->threadflow_lock);
483 
484 			return (threadflow);
485 		}
486 		threadflow = threadflow->tf_next;
487 	}
488 
489 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
490 
491 
492 	return (NULL);
493 }
494