xref: /spdk/lib/thread/thread.c (revision 8da7772a7d6daab09e0e7e58ab08eff5c4b7ba92)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/thread.h"
37 #include "spdk/log.h"
38 
39 #ifdef __linux__
40 #include <sys/prctl.h>
41 #endif
42 
43 #ifdef __FreeBSD__
44 #include <pthread_np.h>
45 #endif
46 
47 static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
48 
49 struct io_device {
50 	void			*io_device;
51 	spdk_io_channel_create_cb create_cb;
52 	spdk_io_channel_destroy_cb destroy_cb;
53 	spdk_io_device_unregister_cb unregister_cb;
54 	struct spdk_thread	*unregister_thread;
55 	uint32_t		ctx_size;
56 	uint32_t		for_each_count;
57 	TAILQ_ENTRY(io_device)	tailq;
58 
59 	uint32_t		refcnt;
60 
61 	bool			unregistered;
62 };
63 
64 static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
65 
66 struct spdk_thread {
67 	pthread_t thread_id;
68 	spdk_thread_pass_msg msg_fn;
69 	spdk_start_poller start_poller_fn;
70 	spdk_stop_poller stop_poller_fn;
71 	void *thread_ctx;
72 	TAILQ_HEAD(, spdk_io_channel) io_channels;
73 	TAILQ_ENTRY(spdk_thread) tailq;
74 	char *name;
75 };
76 
77 static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
78 
79 static struct spdk_thread *
80 _get_thread(void)
81 {
82 	pthread_t thread_id;
83 	struct spdk_thread *thread;
84 
85 	thread_id = pthread_self();
86 
87 	thread = NULL;
88 	TAILQ_FOREACH(thread, &g_threads, tailq) {
89 		if (thread->thread_id == thread_id) {
90 			return thread;
91 		}
92 	}
93 
94 	return NULL;
95 }
96 
97 static void
98 _set_thread_name(const char *thread_name)
99 {
100 #if defined(__linux__)
101 	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
102 #elif defined(__FreeBSD__)
103 	pthread_set_name_np(pthread_self(), thread_name);
104 #else
105 #error missing platform support for thread name
106 #endif
107 }
108 
109 struct spdk_thread *
110 spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
111 		     spdk_start_poller start_poller_fn,
112 		     spdk_stop_poller stop_poller_fn,
113 		     void *thread_ctx, const char *name)
114 {
115 	struct spdk_thread *thread;
116 
117 	pthread_mutex_lock(&g_devlist_mutex);
118 
119 	thread = _get_thread();
120 	if (thread) {
121 		SPDK_ERRLOG("Double allocated SPDK thread\n");
122 		pthread_mutex_unlock(&g_devlist_mutex);
123 		return NULL;
124 	}
125 
126 	thread = calloc(1, sizeof(*thread));
127 	if (!thread) {
128 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
129 		pthread_mutex_unlock(&g_devlist_mutex);
130 		return NULL;
131 	}
132 
133 	thread->thread_id = pthread_self();
134 	thread->msg_fn = msg_fn;
135 	thread->start_poller_fn = start_poller_fn;
136 	thread->stop_poller_fn = stop_poller_fn;
137 	thread->thread_ctx = thread_ctx;
138 	TAILQ_INIT(&thread->io_channels);
139 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
140 	if (name) {
141 		_set_thread_name(name);
142 		thread->name = strdup(name);
143 	}
144 
145 	pthread_mutex_unlock(&g_devlist_mutex);
146 
147 	return thread;
148 }
149 
150 void
151 spdk_free_thread(void)
152 {
153 	struct spdk_thread *thread;
154 
155 	pthread_mutex_lock(&g_devlist_mutex);
156 
157 	thread = _get_thread();
158 	if (!thread) {
159 		SPDK_ERRLOG("No thread allocated\n");
160 		pthread_mutex_unlock(&g_devlist_mutex);
161 		return;
162 	}
163 
164 	TAILQ_REMOVE(&g_threads, thread, tailq);
165 	free(thread->name);
166 	free(thread);
167 
168 	pthread_mutex_unlock(&g_devlist_mutex);
169 }
170 
171 struct spdk_thread *
172 spdk_get_thread(void)
173 {
174 	struct spdk_thread *thread;
175 
176 	pthread_mutex_lock(&g_devlist_mutex);
177 
178 	thread = _get_thread();
179 	if (!thread) {
180 		SPDK_ERRLOG("No thread allocated\n");
181 	}
182 
183 	pthread_mutex_unlock(&g_devlist_mutex);
184 
185 	return thread;
186 }
187 
188 const char *
189 spdk_thread_get_name(const struct spdk_thread *thread)
190 {
191 	return thread->name;
192 }
193 
194 void
195 spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx)
196 {
197 	thread->msg_fn(fn, ctx, thread->thread_ctx);
198 }
199 
200 
201 struct spdk_poller *
202 spdk_poller_register(spdk_poller_fn fn,
203 		     void *arg,
204 		     uint64_t period_microseconds)
205 {
206 	struct spdk_thread *thread;
207 	struct spdk_poller *poller;
208 
209 	thread = spdk_get_thread();
210 	if (!thread) {
211 		abort();
212 	}
213 
214 	if (!thread->start_poller_fn || !thread->stop_poller_fn) {
215 		SPDK_ERRLOG("No related functions to start requested poller\n");
216 		abort();
217 	}
218 
219 	poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
220 	if (!poller) {
221 		SPDK_ERRLOG("Unable to start requested poller\n");
222 		abort();
223 	}
224 
225 	return poller;
226 }
227 
228 void
229 spdk_poller_unregister(struct spdk_poller **ppoller)
230 {
231 	struct spdk_thread *thread;
232 	struct spdk_poller *poller;
233 
234 	poller = *ppoller;
235 	if (poller == NULL) {
236 		return;
237 	}
238 
239 	*ppoller = NULL;
240 
241 	thread = spdk_get_thread();
242 
243 	if (thread) {
244 		thread->stop_poller_fn(poller, thread->thread_ctx);
245 	}
246 }
247 
248 struct call_thread {
249 	struct spdk_thread *cur_thread;
250 	spdk_thread_fn fn;
251 	void *ctx;
252 
253 	struct spdk_thread *orig_thread;
254 	spdk_thread_fn cpl;
255 };
256 
257 static void
258 spdk_on_thread(void *ctx)
259 {
260 	struct call_thread *ct = ctx;
261 
262 	ct->fn(ct->ctx);
263 
264 	pthread_mutex_lock(&g_devlist_mutex);
265 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
266 	pthread_mutex_unlock(&g_devlist_mutex);
267 
268 	if (!ct->cur_thread) {
269 		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
270 		free(ctx);
271 	} else {
272 		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
273 	}
274 }
275 
276 void
277 spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl)
278 {
279 	struct call_thread *ct;
280 
281 	ct = calloc(1, sizeof(*ct));
282 	if (!ct) {
283 		SPDK_ERRLOG("Unable to perform thread iteration\n");
284 		cpl(ctx);
285 		return;
286 	}
287 
288 	ct->fn = fn;
289 	ct->ctx = ctx;
290 	ct->cpl = cpl;
291 
292 	pthread_mutex_lock(&g_devlist_mutex);
293 	ct->orig_thread = _get_thread();
294 	ct->cur_thread = TAILQ_FIRST(&g_threads);
295 	pthread_mutex_unlock(&g_devlist_mutex);
296 
297 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
298 }
299 
300 void
301 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
302 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size)
303 {
304 	struct io_device *dev, *tmp;
305 
306 	assert(io_device != NULL);
307 	assert(create_cb != NULL);
308 	assert(destroy_cb != NULL);
309 
310 	dev = calloc(1, sizeof(struct io_device));
311 	if (dev == NULL) {
312 		SPDK_ERRLOG("could not allocate io_device\n");
313 		return;
314 	}
315 
316 	dev->io_device = io_device;
317 	dev->create_cb = create_cb;
318 	dev->destroy_cb = destroy_cb;
319 	dev->unregister_cb = NULL;
320 	dev->ctx_size = ctx_size;
321 	dev->for_each_count = 0;
322 	dev->unregistered = false;
323 	dev->refcnt = 0;
324 
325 	pthread_mutex_lock(&g_devlist_mutex);
326 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
327 		if (tmp->io_device == io_device) {
328 			SPDK_ERRLOG("io_device %p already registered\n", io_device);
329 			free(dev);
330 			pthread_mutex_unlock(&g_devlist_mutex);
331 			return;
332 		}
333 	}
334 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
335 	pthread_mutex_unlock(&g_devlist_mutex);
336 }
337 
338 static void
339 _finish_unregister(void *arg)
340 {
341 	struct io_device *dev = arg;
342 
343 	dev->unregister_cb(dev->io_device);
344 	free(dev);
345 }
346 
347 static void
348 _spdk_io_device_free(struct io_device *dev)
349 {
350 	if (dev->unregister_cb == NULL) {
351 		free(dev);
352 	} else {
353 		assert(dev->unregister_thread != NULL);
354 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
355 	}
356 }
357 
358 void
359 spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
360 {
361 	struct io_device *dev;
362 	uint32_t refcnt;
363 	struct spdk_thread *thread;
364 
365 	thread = spdk_get_thread();
366 
367 	pthread_mutex_lock(&g_devlist_mutex);
368 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
369 		if (dev->io_device == io_device) {
370 			break;
371 		}
372 	}
373 
374 	if (!dev) {
375 		SPDK_ERRLOG("io_device %p not found\n", io_device);
376 		pthread_mutex_unlock(&g_devlist_mutex);
377 		return;
378 	}
379 
380 	if (dev->for_each_count > 0) {
381 		SPDK_ERRLOG("io_device %p has %u for_each calls outstanding\n", io_device, dev->for_each_count);
382 		pthread_mutex_unlock(&g_devlist_mutex);
383 		return;
384 	}
385 
386 	dev->unregister_cb = unregister_cb;
387 	dev->unregistered = true;
388 	TAILQ_REMOVE(&g_io_devices, dev, tailq);
389 	refcnt = dev->refcnt;
390 	dev->unregister_thread = thread;
391 	pthread_mutex_unlock(&g_devlist_mutex);
392 
393 	if (refcnt > 0) {
394 		/* defer deletion */
395 		return;
396 	}
397 
398 	_spdk_io_device_free(dev);
399 }
400 
401 struct spdk_io_channel *
402 spdk_get_io_channel(void *io_device)
403 {
404 	struct spdk_io_channel *ch;
405 	struct spdk_thread *thread;
406 	struct io_device *dev;
407 	int rc;
408 
409 	pthread_mutex_lock(&g_devlist_mutex);
410 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
411 		if (dev->io_device == io_device) {
412 			break;
413 		}
414 	}
415 	if (dev == NULL) {
416 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
417 		pthread_mutex_unlock(&g_devlist_mutex);
418 		return NULL;
419 	}
420 
421 	thread = _get_thread();
422 	if (!thread) {
423 		SPDK_ERRLOG("No thread allocated\n");
424 		pthread_mutex_unlock(&g_devlist_mutex);
425 		return NULL;
426 	}
427 
428 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
429 		if (ch->dev == dev) {
430 			ch->ref++;
431 			/*
432 			 * An I/O channel already exists for this device on this
433 			 *  thread, so return it.
434 			 */
435 			pthread_mutex_unlock(&g_devlist_mutex);
436 			return ch;
437 		}
438 	}
439 
440 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
441 	if (ch == NULL) {
442 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
443 		pthread_mutex_unlock(&g_devlist_mutex);
444 		return NULL;
445 	}
446 
447 	ch->dev = dev;
448 	ch->destroy_cb = dev->destroy_cb;
449 	ch->thread = thread;
450 	ch->ref = 1;
451 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
452 
453 	dev->refcnt++;
454 
455 	pthread_mutex_unlock(&g_devlist_mutex);
456 
457 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
458 	if (rc == -1) {
459 		pthread_mutex_lock(&g_devlist_mutex);
460 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
461 		dev->refcnt--;
462 		free(ch);
463 		pthread_mutex_unlock(&g_devlist_mutex);
464 		return NULL;
465 	}
466 
467 	return ch;
468 }
469 
470 static void
471 _spdk_put_io_channel(void *arg)
472 {
473 	struct spdk_io_channel *ch = arg;
474 	bool do_remove_dev = true;
475 
476 	assert(ch->thread == spdk_get_thread());
477 	assert(ch->ref == 0);
478 
479 	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));
480 
481 	pthread_mutex_lock(&g_devlist_mutex);
482 	ch->dev->refcnt--;
483 
484 	if (!ch->dev->unregistered) {
485 		do_remove_dev = false;
486 	}
487 
488 	if (ch->dev->refcnt > 0) {
489 		do_remove_dev = false;
490 	}
491 
492 	pthread_mutex_unlock(&g_devlist_mutex);
493 
494 	if (do_remove_dev) {
495 		_spdk_io_device_free(ch->dev);
496 	}
497 	free(ch);
498 }
499 
500 void
501 spdk_put_io_channel(struct spdk_io_channel *ch)
502 {
503 	ch->ref--;
504 
505 	if (ch->ref == 0) {
506 		/* If this was the last reference, remove the channel from the list */
507 		pthread_mutex_lock(&g_devlist_mutex);
508 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
509 		pthread_mutex_unlock(&g_devlist_mutex);
510 
511 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
512 	}
513 }
514 
515 struct spdk_io_channel *
516 spdk_io_channel_from_ctx(void *ctx)
517 {
518 	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
519 }
520 
521 struct spdk_thread *
522 spdk_io_channel_get_thread(struct spdk_io_channel *ch)
523 {
524 	return ch->thread;
525 }
526 
527 struct spdk_io_channel_iter {
528 	void *io_device;
529 	struct io_device *dev;
530 	spdk_channel_msg fn;
531 	int status;
532 	void *ctx;
533 	struct spdk_io_channel *ch;
534 
535 	struct spdk_thread *cur_thread;
536 
537 	struct spdk_thread *orig_thread;
538 	spdk_channel_for_each_cpl cpl;
539 };
540 
541 void *
542 spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
543 {
544 	return i->io_device;
545 }
546 
547 struct spdk_io_channel *
548 spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
549 {
550 	return i->ch;
551 }
552 
553 void *
554 spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
555 {
556 	return i->ctx;
557 }
558 
559 static void
560 _call_completion(void *ctx)
561 {
562 	struct spdk_io_channel_iter *i = ctx;
563 
564 	if (i->cpl != NULL) {
565 		i->cpl(i, i->status);
566 	}
567 	free(i);
568 }
569 
570 static void
571 _call_channel(void *ctx)
572 {
573 	struct spdk_io_channel_iter *i = ctx;
574 	struct spdk_io_channel *ch;
575 
576 	/*
577 	 * It is possible that the channel was deleted before this
578 	 *  message had a chance to execute.  If so, skip calling
579 	 *  the fn() on this thread.
580 	 */
581 	pthread_mutex_lock(&g_devlist_mutex);
582 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
583 		if (ch->dev->io_device == i->io_device) {
584 			break;
585 		}
586 	}
587 	pthread_mutex_unlock(&g_devlist_mutex);
588 
589 	if (ch) {
590 		i->fn(i);
591 	} else {
592 		spdk_for_each_channel_continue(i, 0);
593 	}
594 }
595 
596 void
597 spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
598 		      spdk_channel_for_each_cpl cpl)
599 {
600 	struct spdk_thread *thread;
601 	struct spdk_io_channel *ch;
602 	struct spdk_io_channel_iter *i;
603 
604 	i = calloc(1, sizeof(*i));
605 	if (!i) {
606 		SPDK_ERRLOG("Unable to allocate iterator\n");
607 		return;
608 	}
609 
610 	i->io_device = io_device;
611 	i->fn = fn;
612 	i->ctx = ctx;
613 	i->cpl = cpl;
614 
615 	pthread_mutex_lock(&g_devlist_mutex);
616 	i->orig_thread = _get_thread();
617 
618 	TAILQ_FOREACH(thread, &g_threads, tailq) {
619 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
620 			if (ch->dev->io_device == io_device) {
621 				ch->dev->for_each_count++;
622 				i->dev = ch->dev;
623 				i->cur_thread = thread;
624 				i->ch = ch;
625 				pthread_mutex_unlock(&g_devlist_mutex);
626 				spdk_thread_send_msg(thread, _call_channel, i);
627 				return;
628 			}
629 		}
630 	}
631 
632 	pthread_mutex_unlock(&g_devlist_mutex);
633 
634 	cpl(i, 0);
635 
636 	free(i);
637 }
638 
639 void
640 spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
641 {
642 	struct spdk_thread *thread;
643 	struct spdk_io_channel *ch;
644 
645 	assert(i->cur_thread == spdk_get_thread());
646 
647 	i->status = status;
648 
649 	pthread_mutex_lock(&g_devlist_mutex);
650 	if (status) {
651 		goto end;
652 	}
653 	thread = TAILQ_NEXT(i->cur_thread, tailq);
654 	while (thread) {
655 		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
656 			if (ch->dev->io_device == i->io_device) {
657 				i->cur_thread = thread;
658 				i->ch = ch;
659 				pthread_mutex_unlock(&g_devlist_mutex);
660 				spdk_thread_send_msg(thread, _call_channel, i);
661 				return;
662 			}
663 		}
664 		thread = TAILQ_NEXT(thread, tailq);
665 	}
666 
667 end:
668 	i->dev->for_each_count--;
669 	i->ch = NULL;
670 	pthread_mutex_unlock(&g_devlist_mutex);
671 
672 	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
673 }
674