xref: /spdk/lib/thread/thread.c (revision f56b2300633de2070005dd0fa244edb2a7a060d2)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/thread.h"
37 #include "spdk/log.h"
38 
39 #ifdef __linux__
40 #include <sys/prctl.h>
41 #endif
42 
43 #ifdef __FreeBSD__
44 #include <pthread_np.h>
45 #endif
46 
/* Protects g_io_devices, g_threads, per-thread channel lists, and device refcounts. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Internal bookkeeping for one registered I/O device. */
struct io_device {
	void			*io_device;	/* Opaque key supplied at registration. */
	spdk_io_channel_create_cb create_cb;	/* Per-thread channel constructor. */
	spdk_io_channel_destroy_cb destroy_cb;	/* Per-thread channel destructor. */
	spdk_io_device_unregister_cb unregister_cb; /* Optional completion for unregister; NULL until then. */
	struct spdk_thread	*unregister_thread; /* Thread unregister_cb must run on. */
	uint32_t		ctx_size;	/* Extra context bytes allocated after each channel. */
	uint32_t		for_each_count;	/* Outstanding spdk_for_each_channel() iterations. */
	TAILQ_ENTRY(io_device)	tailq;

	uint32_t		refcnt;		/* Live channels referencing this device. */

	bool			unregistered;	/* Set once spdk_io_device_unregister() has run. */
};

/* All registered io_devices; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
65 
/* Per-thread state tracked by this library. */
struct spdk_thread {
	pthread_t thread_id;			/* OS thread this record belongs to. */
	spdk_thread_pass_msg msg_fn;		/* Environment hook: deliver a message to this thread. */
	spdk_start_poller start_poller_fn;	/* Environment hook: start a poller on this thread. */
	spdk_stop_poller stop_poller_fn;	/* Environment hook: stop a poller on this thread. */
	void *thread_ctx;			/* Opaque context passed back to the hooks above. */
	TAILQ_HEAD(, spdk_io_channel) io_channels; /* Channels owned by this thread. */
	TAILQ_ENTRY(spdk_thread) tailq;
	char *name;				/* Optional heap-allocated name; may be NULL. */
};

/* All allocated SPDK threads; guarded by g_devlist_mutex. */
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
/* Cached count of entries in g_threads; see spdk_thread_get_count(). */
static uint32_t g_thread_count = 0;
79 
80 static struct spdk_thread *
81 _get_thread(void)
82 {
83 	pthread_t thread_id;
84 	struct spdk_thread *thread;
85 
86 	thread_id = pthread_self();
87 
88 	thread = NULL;
89 	TAILQ_FOREACH(thread, &g_threads, tailq) {
90 		if (thread->thread_id == thread_id) {
91 			return thread;
92 		}
93 	}
94 
95 	return NULL;
96 }
97 
/*
 * Set the OS-visible name of the calling thread (e.g. shown by top/ps).
 * Linux uses prctl(PR_SET_NAME); FreeBSD uses pthread_set_name_np().
 * Other platforms fail at compile time so the gap is noticed, not silent.
 */
static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}
109 
110 struct spdk_thread *
111 spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
112 		     spdk_start_poller start_poller_fn,
113 		     spdk_stop_poller stop_poller_fn,
114 		     void *thread_ctx, const char *name)
115 {
116 	struct spdk_thread *thread;
117 
118 	pthread_mutex_lock(&g_devlist_mutex);
119 
120 	thread = _get_thread();
121 	if (thread) {
122 		SPDK_ERRLOG("Double allocated SPDK thread\n");
123 		pthread_mutex_unlock(&g_devlist_mutex);
124 		return NULL;
125 	}
126 
127 	thread = calloc(1, sizeof(*thread));
128 	if (!thread) {
129 		SPDK_ERRLOG("Unable to allocate memory for thread\n");
130 		pthread_mutex_unlock(&g_devlist_mutex);
131 		return NULL;
132 	}
133 
134 	thread->thread_id = pthread_self();
135 	thread->msg_fn = msg_fn;
136 	thread->start_poller_fn = start_poller_fn;
137 	thread->stop_poller_fn = stop_poller_fn;
138 	thread->thread_ctx = thread_ctx;
139 	TAILQ_INIT(&thread->io_channels);
140 	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
141 	g_thread_count++;
142 	if (name) {
143 		_set_thread_name(name);
144 		thread->name = strdup(name);
145 	}
146 
147 	pthread_mutex_unlock(&g_devlist_mutex);
148 
149 	return thread;
150 }
151 
152 void
153 spdk_free_thread(void)
154 {
155 	struct spdk_thread *thread;
156 
157 	pthread_mutex_lock(&g_devlist_mutex);
158 
159 	thread = _get_thread();
160 	if (!thread) {
161 		SPDK_ERRLOG("No thread allocated\n");
162 		pthread_mutex_unlock(&g_devlist_mutex);
163 		return;
164 	}
165 
166 	assert(g_thread_count > 0);
167 	g_thread_count--;
168 	TAILQ_REMOVE(&g_threads, thread, tailq);
169 	free(thread->name);
170 	free(thread);
171 
172 	pthread_mutex_unlock(&g_devlist_mutex);
173 }
174 
/* Return the number of currently allocated SPDK threads. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
185 
186 struct spdk_thread *
187 spdk_get_thread(void)
188 {
189 	struct spdk_thread *thread;
190 
191 	pthread_mutex_lock(&g_devlist_mutex);
192 
193 	thread = _get_thread();
194 	if (!thread) {
195 		SPDK_ERRLOG("No thread allocated\n");
196 	}
197 
198 	pthread_mutex_unlock(&g_devlist_mutex);
199 
200 	return thread;
201 }
202 
/* Return the name given at allocation time; may be NULL if none was set. */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
208 
/*
 * Ask the environment to run fn(ctx) on the given thread by delegating to
 * the msg_fn hook supplied at spdk_allocate_thread() time.
 */
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx)
{
	thread->msg_fn(fn, ctx, thread->thread_ctx);
}
214 
215 
216 struct spdk_poller *
217 spdk_poller_register(spdk_poller_fn fn,
218 		     void *arg,
219 		     uint64_t period_microseconds)
220 {
221 	struct spdk_thread *thread;
222 	struct spdk_poller *poller;
223 
224 	thread = spdk_get_thread();
225 	if (!thread) {
226 		assert(false);
227 		return NULL;
228 	}
229 
230 	if (!thread->start_poller_fn || !thread->stop_poller_fn) {
231 		SPDK_ERRLOG("No related functions to start requested poller\n");
232 		assert(false);
233 		return NULL;
234 	}
235 
236 	poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
237 	if (!poller) {
238 		SPDK_ERRLOG("Unable to start requested poller\n");
239 		assert(false);
240 		return NULL;
241 	}
242 
243 	return poller;
244 }
245 
246 void
247 spdk_poller_unregister(struct spdk_poller **ppoller)
248 {
249 	struct spdk_thread *thread;
250 	struct spdk_poller *poller;
251 
252 	poller = *ppoller;
253 	if (poller == NULL) {
254 		return;
255 	}
256 
257 	*ppoller = NULL;
258 
259 	thread = spdk_get_thread();
260 
261 	if (thread) {
262 		thread->stop_poller_fn(poller, thread->thread_ctx);
263 	}
264 }
265 
/* Bookkeeping for one spdk_for_each_thread() traversal. */
struct call_thread {
	struct spdk_thread *cur_thread;	/* Thread currently executing fn. */
	spdk_thread_fn fn;		/* Function run once on every thread. */
	void *ctx;			/* User context passed to fn and cpl. */

	struct spdk_thread *orig_thread; /* Thread that started the traversal. */
	spdk_thread_fn cpl;		/* Completion run on orig_thread at the end. */
};
274 
275 static void
276 spdk_on_thread(void *ctx)
277 {
278 	struct call_thread *ct = ctx;
279 
280 	ct->fn(ct->ctx);
281 
282 	pthread_mutex_lock(&g_devlist_mutex);
283 	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
284 	pthread_mutex_unlock(&g_devlist_mutex);
285 
286 	if (!ct->cur_thread) {
287 		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
288 		free(ctx);
289 	} else {
290 		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
291 	}
292 }
293 
294 void
295 spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl)
296 {
297 	struct call_thread *ct;
298 
299 	ct = calloc(1, sizeof(*ct));
300 	if (!ct) {
301 		SPDK_ERRLOG("Unable to perform thread iteration\n");
302 		cpl(ctx);
303 		return;
304 	}
305 
306 	ct->fn = fn;
307 	ct->ctx = ctx;
308 	ct->cpl = cpl;
309 
310 	pthread_mutex_lock(&g_devlist_mutex);
311 	ct->orig_thread = _get_thread();
312 	ct->cur_thread = TAILQ_FIRST(&g_threads);
313 	pthread_mutex_unlock(&g_devlist_mutex);
314 
315 	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
316 }
317 
318 void
319 spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
320 			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size)
321 {
322 	struct io_device *dev, *tmp;
323 
324 	assert(io_device != NULL);
325 	assert(create_cb != NULL);
326 	assert(destroy_cb != NULL);
327 
328 	dev = calloc(1, sizeof(struct io_device));
329 	if (dev == NULL) {
330 		SPDK_ERRLOG("could not allocate io_device\n");
331 		return;
332 	}
333 
334 	dev->io_device = io_device;
335 	dev->create_cb = create_cb;
336 	dev->destroy_cb = destroy_cb;
337 	dev->unregister_cb = NULL;
338 	dev->ctx_size = ctx_size;
339 	dev->for_each_count = 0;
340 	dev->unregistered = false;
341 	dev->refcnt = 0;
342 
343 	pthread_mutex_lock(&g_devlist_mutex);
344 	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
345 		if (tmp->io_device == io_device) {
346 			SPDK_ERRLOG("io_device %p already registered\n", io_device);
347 			free(dev);
348 			pthread_mutex_unlock(&g_devlist_mutex);
349 			return;
350 		}
351 	}
352 	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
353 	pthread_mutex_unlock(&g_devlist_mutex);
354 }
355 
/*
 * Runs on the thread that called spdk_io_device_unregister(): invoke the
 * user's unregister callback, then release the io_device bookkeeping.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	dev->unregister_cb(dev->io_device);
	free(dev);
}
364 
365 static void
366 _spdk_io_device_free(struct io_device *dev)
367 {
368 	if (dev->unregister_cb == NULL) {
369 		free(dev);
370 	} else {
371 		assert(dev->unregister_thread != NULL);
372 		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
373 	}
374 }
375 
/*
 * Remove io_device from the global registry and arrange for it to be freed.
 *
 * If channels still reference the device (refcnt > 0), the actual free is
 * deferred: the device is marked unregistered and the last
 * spdk_put_io_channel() will finish the teardown.  unregister_cb, if
 * non-NULL, is invoked on the calling thread once the device is freed.
 *
 * Unregistering while spdk_for_each_channel() walks are outstanding is
 * refused with an error.
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	/* Captured up front: this is the thread unregister_cb will run on. */
	thread = spdk_get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %p has %u for_each calls outstanding\n", io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt under the lock; it can only drop once we release it,
	 * since the device is no longer discoverable via g_io_devices. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
419 
420 struct spdk_io_channel *
421 spdk_get_io_channel(void *io_device)
422 {
423 	struct spdk_io_channel *ch;
424 	struct spdk_thread *thread;
425 	struct io_device *dev;
426 	int rc;
427 
428 	pthread_mutex_lock(&g_devlist_mutex);
429 	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
430 		if (dev->io_device == io_device) {
431 			break;
432 		}
433 	}
434 	if (dev == NULL) {
435 		SPDK_ERRLOG("could not find io_device %p\n", io_device);
436 		pthread_mutex_unlock(&g_devlist_mutex);
437 		return NULL;
438 	}
439 
440 	thread = _get_thread();
441 	if (!thread) {
442 		SPDK_ERRLOG("No thread allocated\n");
443 		pthread_mutex_unlock(&g_devlist_mutex);
444 		return NULL;
445 	}
446 
447 	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
448 		if (ch->dev == dev) {
449 			ch->ref++;
450 			/*
451 			 * An I/O channel already exists for this device on this
452 			 *  thread, so return it.
453 			 */
454 			pthread_mutex_unlock(&g_devlist_mutex);
455 			return ch;
456 		}
457 	}
458 
459 	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
460 	if (ch == NULL) {
461 		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
462 		pthread_mutex_unlock(&g_devlist_mutex);
463 		return NULL;
464 	}
465 
466 	ch->dev = dev;
467 	ch->destroy_cb = dev->destroy_cb;
468 	ch->thread = thread;
469 	ch->ref = 1;
470 	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);
471 
472 	dev->refcnt++;
473 
474 	pthread_mutex_unlock(&g_devlist_mutex);
475 
476 	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
477 	if (rc == -1) {
478 		pthread_mutex_lock(&g_devlist_mutex);
479 		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
480 		dev->refcnt--;
481 		free(ch);
482 		pthread_mutex_unlock(&g_devlist_mutex);
483 		return NULL;
484 	}
485 
486 	return ch;
487 }
488 
/*
 * Deferred release of an I/O channel; always runs on the channel's owning
 * thread (sent as a message from spdk_put_io_channel()).  Destroys the
 * channel if its refcount is still zero, and frees the io_device itself if
 * this was the last channel of an already-unregistered device.
 */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;

	assert(ch->thread == spdk_get_thread());

	if (ch->ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 *  after this message was sent but before it had a chance to
		 *  execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* The device is only freed here if it was already unregistered... */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	/* ...and no other channels still hold a reference to it. */
	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
531 
532 void
533 spdk_put_io_channel(struct spdk_io_channel *ch)
534 {
535 	ch->ref--;
536 
537 	if (ch->ref == 0) {
538 		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
539 	}
540 }
541 
/*
 * Recover the spdk_io_channel from a per-channel ctx pointer.  The ctx area
 * is allocated immediately after the channel struct (see
 * spdk_get_io_channel), so stepping back sizeof(struct spdk_io_channel)
 * bytes yields the channel.
 */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}
547 
/* Return the SPDK thread that owns this channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
553 
/* State for one spdk_for_each_channel() traversal. */
struct spdk_io_channel_iter {
	void *io_device;		/* Key identifying the device being iterated. */
	struct io_device *dev;		/* Resolved device; its for_each_count is held during the walk. */
	spdk_channel_msg fn;		/* Called once per matching channel. */
	int status;			/* Status passed to the completion; set by _continue(). */
	void *ctx;			/* User context. */
	struct spdk_io_channel *ch;	/* Channel for the current step; NULL at completion. */

	struct spdk_thread *cur_thread;	/* Thread whose channel is being visited. */

	struct spdk_thread *orig_thread; /* Thread that started the iteration. */
	spdk_channel_for_each_cpl cpl;	/* Completion callback run on orig_thread. */
};
567 
/* Accessor for fn/cpl callbacks: the io_device key being iterated. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
573 
/* Accessor for fn callbacks: the channel for the current iteration step. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
579 
/* Accessor for fn/cpl callbacks: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
585 
586 static void
587 _call_completion(void *ctx)
588 {
589 	struct spdk_io_channel_iter *i = ctx;
590 
591 	if (i->cpl != NULL) {
592 		i->cpl(i, i->status);
593 	}
594 	free(i);
595 }
596 
597 static void
598 _call_channel(void *ctx)
599 {
600 	struct spdk_io_channel_iter *i = ctx;
601 	struct spdk_io_channel *ch;
602 
603 	/*
604 	 * It is possible that the channel was deleted before this
605 	 *  message had a chance to execute.  If so, skip calling
606 	 *  the fn() on this thread.
607 	 */
608 	pthread_mutex_lock(&g_devlist_mutex);
609 	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
610 		if (ch->dev->io_device == i->io_device) {
611 			break;
612 		}
613 	}
614 	pthread_mutex_unlock(&g_devlist_mutex);
615 
616 	if (ch) {
617 		i->fn(i);
618 	} else {
619 		spdk_for_each_channel_continue(i, 0);
620 	}
621 }
622 
/*
 * Run fn once on each thread's channel for io_device, hopping between
 * threads via message passing, then run cpl on the originating thread.
 * The device's for_each_count is bumped for the duration of the walk so
 * spdk_io_device_unregister() refuses to race with it.
 *
 * NOTE(review): if the iterator allocation fails, this returns after
 * logging WITHOUT ever invoking cpl — the caller hangs waiting for a
 * completion.  This is inconsistent with spdk_for_each_thread(), which
 * calls its completion on allocation failure; cpl cannot safely be called
 * here with a NULL iterator, so fixing this needs an interface decision.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread owning a channel for this device and start there. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device: complete immediately. */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
663 
/*
 * Called by the user's fn (or by _call_channel when a channel vanished) to
 * advance the iteration started by spdk_for_each_channel().
 *
 * A non-zero status aborts the walk: the status is recorded and the
 * completion is dispatched without visiting the remaining threads.
 * Otherwise the next thread owning a channel for the device is located and
 * the iteration hops there.  Must be called on i->cur_thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Abort the walk; fall through to the completion path. */
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Walk finished (or aborted): release the unregister guard taken in
	 * spdk_for_each_channel() and hand the completion back to the origin. */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
699