xref: /onnv-gate/usr/src/cmd/filebench/common/threadflow.c (revision 5184:da60d2b4a9e2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "config.h"
29 #include <pthread.h>
30 #ifdef HAVE_LWPS
31 #include <sys/lwp.h>
32 #endif
33 #include <signal.h>
34 #include "threadflow.h"
35 #include "filebench.h"
36 #include "flowop.h"
37 #include "ipc.h"
38 
39 static threadflow_t *threadflow_define_common(procflow_t *procflow,
40     char *name, threadflow_t *inherit, int instance);
41 
42 /*
43  * Threadflows are filebench entities which manage operating system
44  * threads. Each worker threadflow spawns a separate filebench thread,
45  * with attributes inherited from a FLOW_MASTER threadflow created during
46  * f model language parsing. This section contains routines to define,
47  * create, control, and delete threadflows.
48  *
49  * Each thread defined in the f model creates a FLOW_MASTER
50  * threadflow which encapsulates the defined attributes and flowops of
51  * the f language thread, including the number of instances to create.
52  * At runtime, a worker threadflow instance with an associated filebench
53  * thread is created, which runs until told to quit or is specifically
54  * deleted.
55  */
56 
57 
/*
 * Writes a short synopsis of the "thread" definition syntax to
 * stderr.
 */
void
threadflow_usage(void)
{
	static const char synopsis[] =
	    "  thread  name=<name>[,instances=<count>]\n"
	    "\n"
	    "  {\n"
	    "    flowop ...\n"
	    "    flowop ...\n"
	    "    flowop ...\n"
	    "  }\n"
	    "\n";

	(void) fputs(synopsis, stderr);
}
73 
74 /*
75  * Creates a thread for the supplied threadflow. If interprocess
76  * shared memory is desired, then increments the amount of shared
77  * memory needed by the amount specified in the threadflow's
78  * tf_memsize parameter. The thread starts in routine
79  * flowop_start() with a poineter to the threadflow supplied
80  * as the argument.
81  */
82 static int
83 threadflow_createthread(threadflow_t *threadflow)
84 {
85 	int fp = 0;
86 
87 	filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld",
88 	    threadflow->tf_name,
89 	    *threadflow->tf_memsize);
90 
91 	if (threadflow->tf_attrs & THREADFLOW_USEISM)
92 		filebench_shm->shm_required += (*threadflow->tf_memsize);
93 
94 	if (pthread_create(&threadflow->tf_tid, NULL,
95 	    (void *(*)(void*))flowop_start, threadflow) != 0) {
96 		filebench_log(LOG_ERROR, "thread create failed");
97 		filebench_shutdown(1);
98 	}
99 
100 	/* XXX */
101 	return (fp < 0);
102 }
103 
104 #ifndef USE_PROCESS_MODEL
105 static procflow_t *my_procflow;
106 
107 /*
108  * Terminates (exits) all the threads of the procflow (process).
109  * The procflow is determined from a process private pointer
110  * initialized by threadflow_init().
111  */
112 /* ARGSUSED */
113 static void
114 threadflow_cancel(int arg1)
115 {
116 	threadflow_t *threadflow = my_procflow->pf_threads;
117 
118 #ifdef HAVE_LWPS
119 	filebench_log(LOG_DEBUG_IMPL, "Thread signal handler on tid %d",
120 	    _lwp_self());
121 #endif
122 
123 	my_procflow->pf_running = 0;
124 	exit(0);
125 
126 	while (threadflow) {
127 		if (threadflow->tf_tid) {
128 			(void) pthread_cancel(threadflow->tf_tid);
129 			filebench_log(LOG_DEBUG_IMPL, "Thread %d cancelled...",
130 			    threadflow->tf_tid);
131 		}
132 		threadflow = threadflow->tf_next;
133 	}
134 }
135 #endif /* USE_PROCESS_MODEL */
136 
137 /*
138  * Creates threads for the threadflows associated with a procflow.
139  * The routine iterates through the list of threadflows in the
140  * supplied procflow's pf_threads list. For each threadflow on
141  * the list, it defines tf_instances number of cloned
142  * threadflows, and then calls threadflow_createthread() for
143  * each to create and start the actual operating system thread.
144  * Note that each of the newly defined threadflows will be linked
145  * into the procflows threadflow list, but at the head of the
146  * list, so they will not become part of the supplied set. After
147  * all the threads have been created, threadflow_init enters
148  * a join loop for all the threads in the newly defined
149  * threadflows. Once all the created threads have exited,
150  * threadflow_init will return 0. If errors are encountered, it
151  * will return a non zero value.
152  */
153 int
154 threadflow_init(procflow_t *procflow)
155 {
156 	threadflow_t *threadflow = procflow->pf_threads;
157 	int ret = 0;
158 
159 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
160 #ifndef USE_PROCESS_MODEL
161 	my_procflow = procflow;
162 
163 	(void) signal(SIGUSR1, threadflow_cancel);
164 #endif
165 	while (threadflow) {
166 		threadflow_t *newthread;
167 		int i;
168 
169 		filebench_log(LOG_VERBOSE,
170 		    "Starting %lld %s threads",
171 		    *(threadflow->tf_instances),
172 		    threadflow->tf_name);
173 
174 		for (i = 1; i < *threadflow->tf_instances; i++) {
175 			/* Create threads */
176 			newthread =
177 			    threadflow_define_common(procflow,
178 			    threadflow->tf_name, threadflow, i + 1);
179 			if (newthread == NULL)
180 				return (-1);
181 			ret += threadflow_createthread(newthread);
182 		}
183 
184 		newthread = threadflow_define_common(procflow,
185 		    threadflow->tf_name,
186 		    threadflow, 1);
187 
188 		if (newthread == NULL)
189 			return (-1);
190 
191 		/* Create threads */
192 		ret += threadflow_createthread(newthread);
193 
194 		threadflow = threadflow->tf_next;
195 	}
196 
197 	threadflow = procflow->pf_threads;
198 
199 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
200 
201 	while (threadflow) {
202 		void *status;
203 
204 		if (threadflow->tf_tid)
205 			(void) pthread_join(threadflow->tf_tid, &status);
206 
207 		ret += *(int *)status;
208 		threadflow = threadflow->tf_next;
209 	}
210 
211 	procflow->pf_running = 0;
212 
213 	return (ret);
214 }
215 
216 /*
217  * Tells the threadflow's thread to stop and optionally signals
218  * its associated process to end the thread.
219  */
220 static void
221 threadflow_kill(threadflow_t *threadflow)
222 {
223 	/* Tell thread to finish */
224 	threadflow->tf_abort = 1;
225 
226 #ifdef USE_PROCESS_MODEL
227 #ifdef HAVE_SIGSEND
228 	(void) sigsend(P_PID, threadflow->tf_process->pf_pid, SIGUSR1);
229 #else
230 	(void) kill(threadflow->tf_process->pf_pid, SIGUSR1);
231 #endif
232 #else /* USE_PROCESS_MODEL */
233 	threadflow->tf_process->pf_running = 0;
234 #endif /* USE_PROCESS_MODEL */
235 }
236 
237 /*
238  * Deletes the specified threadflow from the specified threadflow
239  * list after first terminating the threadflow's thread, deleting
240  * the threadflow's flowops, and finally freeing the threadflow
241  * entity. It also subtracts the threadflow's shared memory
242  * requirements from the total amount required, shm_required. If
243  * the specified threadflow is found, returns 0, otherwise
244  * returns -1.
245  */
246 static int
247 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow)
248 {
249 	threadflow_t *entry = *threadlist;
250 
251 	filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)",
252 	    threadflow->tf_name,
253 	    threadflow->tf_instance);
254 
255 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
256 		filebench_shm->shm_required -= (*threadflow->tf_memsize);
257 	}
258 
259 	if (threadflow == *threadlist) {
260 		/* First on list */
261 		filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)",
262 		    threadflow->tf_name,
263 		    threadflow->tf_instance);
264 
265 		threadflow_kill(threadflow);
266 		flowop_delete_all(&threadflow->tf_ops);
267 		*threadlist = threadflow->tf_next;
268 		ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
269 		return (0);
270 	}
271 
272 	while (entry->tf_next) {
273 		filebench_log(LOG_DEBUG_IMPL,
274 		    "Delete thread: (%s-%d) == (%s-%d)",
275 		    entry->tf_next->tf_name,
276 		    entry->tf_next->tf_instance,
277 		    threadflow->tf_name,
278 		    threadflow->tf_instance);
279 
280 		if (threadflow == entry->tf_next) {
281 			/* Delete */
282 			filebench_log(LOG_DEBUG_IMPL,
283 			    "Deleted thread: (%s-%d)",
284 			    entry->tf_next->tf_name,
285 			    entry->tf_next->tf_instance);
286 			threadflow_kill(entry->tf_next);
287 			flowop_delete_all(&entry->tf_next->tf_ops);
288 			ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
289 			entry->tf_next = entry->tf_next->tf_next;
290 			return (0);
291 		}
292 		entry = entry->tf_next;
293 	}
294 
295 	return (-1);
296 }
297 
298 /*
299  * Given a pointer to the thread list of a procflow, cycles
300  * through all the threadflows on the list, deleting each one
301  * except the FLOW_MASTER.
302  */
303 void
304 threadflow_delete_all(threadflow_t **threadlist)
305 {
306 	threadflow_t *threadflow = *threadlist;
307 
308 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
309 
310 	filebench_log(LOG_DEBUG_IMPL, "Deleting all threads");
311 
312 	while (threadflow) {
313 		if (threadflow->tf_instance &&
314 		    (threadflow->tf_instance == FLOW_MASTER)) {
315 			threadflow = threadflow->tf_next;
316 			continue;
317 		}
318 		(void) threadflow_delete(threadlist, threadflow);
319 		threadflow = threadflow->tf_next;
320 	}
321 
322 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
323 }
324 
325 /*
326  * Waits till all threadflows are started, or a timeout occurs.
327  * Checks through the list of threadflows, waiting up to 10
328  * seconds for each one to set its tf_running flag to 1. If not
329  * set after 10 seconds, continues on to the next threadflow
330  * anyway.
331  */
332 void
333 threadflow_allstarted(pid_t pid, threadflow_t *threadflow)
334 {
335 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
336 
337 	while (threadflow) {
338 		int waits;
339 
340 		if ((threadflow->tf_instance == 0) ||
341 		    (threadflow->tf_instance == FLOW_MASTER)) {
342 			threadflow = threadflow->tf_next;
343 			continue;
344 		}
345 
346 		filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d",
347 		    pid,
348 		    threadflow->tf_name,
349 		    threadflow->tf_instance);
350 
351 		waits = 10;
352 		while (waits && threadflow->tf_running == 0) {
353 			(void) ipc_mutex_unlock(
354 			    &filebench_shm->threadflow_lock);
355 			if (waits < 3)
356 				filebench_log(LOG_INFO,
357 				    "Waiting for pid %d thread %s-%d",
358 				    pid,
359 				    threadflow->tf_name,
360 				    threadflow->tf_instance);
361 
362 			(void) sleep(1);
363 			(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
364 			waits--;
365 		}
366 
367 		threadflow = threadflow->tf_next;
368 	}
369 
370 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
371 }
372 
373 /*
374  * Create an in-memory thread object linked to a parent procflow.
375  * A threadflow entity is allocated from shared memory and
376  * initialized from the "inherit" threadflow if supplied,
377  * otherwise to zeros. The threadflow is assigned a unique
378  * thread id, the supplied instance number, the supplied name
379  * and added to the procflow's pf_thread list. If no name is
380  * supplied or the threadflow can't be allocated, NULL is
381  * returned Otherwise a pointer to the newly allocated threadflow
382  * is returned.
383  *
384  * The filebench_shm->threadflow_lock must be held by the caller.
385  */
386 static threadflow_t *
387 threadflow_define_common(procflow_t *procflow, char *name,
388     threadflow_t *inherit, int instance)
389 {
390 	threadflow_t *threadflow;
391 	threadflow_t **threadlistp = &procflow->pf_threads;
392 
393 	if (name == NULL)
394 		return (NULL);
395 
396 	threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW);
397 
398 	if (threadflow == NULL)
399 		return (NULL);
400 
401 	if (inherit)
402 		(void) memcpy(threadflow, inherit, sizeof (threadflow_t));
403 	else
404 		(void) memset(threadflow, 0, sizeof (threadflow_t));
405 
406 	threadflow->tf_utid = ++filebench_shm->utid;
407 
408 	threadflow->tf_instance = instance;
409 	(void) strcpy(threadflow->tf_name, name);
410 	threadflow->tf_process = procflow;
411 
412 	filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d",
413 	    name, instance);
414 
415 	/* Add threadflow to list */
416 	if (*threadlistp == NULL) {
417 		*threadlistp = threadflow;
418 		threadflow->tf_next = NULL;
419 	} else {
420 		threadflow->tf_next = *threadlistp;
421 		*threadlistp = threadflow;
422 	}
423 
424 	return (threadflow);
425 }
426 
427 /*
428  * Create an in memory FLOW_MASTER thread object as described
429  * by the syntax. Acquire the  filebench_shm->threadflow_lock and
430  * call threadflow_define_common() to create a threadflow entity.
431  * Set the number of instances to create at runtime,
432  * tf_instances, to "instances". Return the threadflow pointer
433  * returned by the threadflow_define_common call.
434  */
435 threadflow_t *
436 threadflow_define(procflow_t *procflow, char *name,
437     threadflow_t *inherit, var_integer_t instances)
438 {
439 	threadflow_t *threadflow;
440 
441 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
442 
443 	if ((threadflow = threadflow_define_common(procflow, name,
444 	    inherit, FLOW_MASTER)) == NULL)
445 		return (NULL);
446 
447 	threadflow->tf_instances = instances;
448 
449 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
450 
451 	return (threadflow);
452 }
453 
454 
455 /*
456  * Searches the provided threadflow list for the named threadflow.
457  * A pointer to the threadflow is returned, or NULL if threadflow
458  * is not found.
459  */
460 threadflow_t *
461 threadflow_find(threadflow_t *threadlist, char *name)
462 {
463 	threadflow_t *threadflow = threadlist;
464 
465 	(void) ipc_mutex_lock(&filebench_shm->threadflow_lock);
466 
467 	while (threadflow) {
468 		if (strcmp(name, threadflow->tf_name) == 0) {
469 
470 			(void) ipc_mutex_unlock(
471 			    &filebench_shm->threadflow_lock);
472 
473 			return (threadflow);
474 		}
475 		threadflow = threadflow->tf_next;
476 	}
477 
478 	(void) ipc_mutex_unlock(&filebench_shm->threadflow_lock);
479 
480 
481 	return (NULL);
482 }
483