/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/thread.h"
#include "spdk/log.h"

#ifdef __linux__
#include <sys/prctl.h>
#endif

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

struct io_device {
	void			*io_device;
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread	*unregister_thread;
	uint32_t		ctx_size;
	uint32_t		for_each_count;
	TAILQ_ENTRY(io_device)	tailq;

	uint32_t		refcnt;

	bool			unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

struct spdk_thread {
	pthread_t thread_id;
	spdk_thread_pass_msg msg_fn;
	spdk_start_poller start_poller_fn;
	spdk_stop_poller stop_poller_fn;
	void *thread_ctx;
	TAILQ_HEAD(, spdk_io_channel) io_channels;
	TAILQ_ENTRY(spdk_thread) tailq;
	char *name;
};

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;
static struct spdk_thread *
_get_thread(void)
{
	pthread_t thread_id;
	struct spdk_thread *thread;

	thread_id = pthread_self();

	thread = NULL;
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		if (thread->thread_id == thread_id) {
			return thread;
		}
	}

	return NULL;
}

static void
_set_thread_name(const char *thread_name)
{
#if defined(__linux__)
	prctl(PR_SET_NAME, thread_name, 0, 0, 0);
#elif defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), thread_name);
#else
#error missing platform support for thread name
#endif
}

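/*
 * Register the calling pthread as an SPDK thread.  The caller provides the
 * message-passing and poller start/stop callbacks that this library uses to
 * hand work to the thread; thread_ctx is passed back to those callbacks.
 */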
struct spdk_thread *
spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
		     spdk_start_poller start_poller_fn,
		     spdk_stop_poller stop_poller_fn,
		     void *thread_ctx, const char *name)
{
	struct spdk_thread *thread;

	pthread_mutex_lock(&g_devlist_mutex);

	thread = _get_thread();
	if (thread) {
		SPDK_ERRLOG("Double allocated SPDK thread\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = calloc(1, sizeof(*thread));
	if (!thread) {
		SPDK_ERRLOG("Unable to allocate memory for thread\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread->thread_id = pthread_self();
	thread->msg_fn = msg_fn;
	thread->start_poller_fn = start_poller_fn;
	thread->stop_poller_fn = stop_poller_fn;
	thread->thread_ctx = thread_ctx;
	TAILQ_INIT(&thread->io_channels);
	TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
	g_thread_count++;
	if (name) {
		_set_thread_name(name);
		thread->name = strdup(name);
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	return thread;
}

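/*
 * Unregister the calling thread and free its spdk_thread bookkeeping.
 */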
void
spdk_free_thread(void)
{
	struct spdk_thread *thread;

	pthread_mutex_lock(&g_devlist_mutex);

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	free(thread->name);
	free(thread);

	pthread_mutex_unlock(&g_devlist_mutex);
}

uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count.  We could acquire the
	 *  lock and iterate through the TAILQ of threads to count them, but that
	 *  count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}

struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread;

	pthread_mutex_lock(&g_devlist_mutex);

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	return thread;
}

const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}

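/*
 * Ask the target thread to run fn(ctx) by handing the message to the msg_fn
 * callback that was registered when the thread was allocated.
 */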
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx)
{
	thread->msg_fn(fn, ctx, thread->thread_ctx);
}


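/*
 * Register a poller on the calling thread.  The poller itself is created by
 * the start_poller_fn callback supplied to spdk_allocate_thread(); this
 * function aborts if no such callback was registered.
 */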
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	thread = spdk_get_thread();
	if (!thread) {
		abort();
	}

	if (!thread->start_poller_fn || !thread->stop_poller_fn) {
		SPDK_ERRLOG("No related functions to start requested poller\n");
		abort();
	}

	poller = thread->start_poller_fn(thread->thread_ctx, fn, arg, period_microseconds);
	if (!poller) {
		SPDK_ERRLOG("Unable to start requested poller\n");
		abort();
	}

	return poller;
}

void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();

	if (thread) {
		thread->stop_poller_fn(poller, thread->thread_ctx);
	}
}

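/* Iteration state for spdk_for_each_thread(). */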
struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_thread_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_thread_fn cpl;
};

static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}

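/*
 * Run fn(ctx) on every registered thread in turn, then run cpl(ctx) on the
 * thread that started the iteration.
 */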
void
spdk_for_each_thread(spdk_thread_fn fn, void *ctx, spdk_thread_fn cpl)
{
	struct call_thread *ct;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->orig_thread = _get_thread();
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}

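/*
 * Register an I/O device.  ctx_size extra bytes are allocated behind every
 * spdk_io_channel created for this device and passed to create_cb/destroy_cb
 * as the per-channel context.
 */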
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size)
{
	struct io_device *dev, *tmp;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered\n", io_device);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}

static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	dev->unregister_cb(dev->io_device);
	free(dev);
}

static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}

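/*
 * Unregister an I/O device.  If channels still reference the device, freeing
 * it (and calling the optional unregister_cb) is deferred until the last
 * channel is released.
 */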
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %p has %u for_each calls outstanding\n", io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}

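/*
 * Get the calling thread's I/O channel for io_device, creating it via the
 * device's create_cb on first use.  Channels are reference counted per
 * thread, so repeated calls return the same channel.
 */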
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;
			/*
			 * An I/O channel already exists for this device on this
			 *  thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc == -1) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}

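/*
 * Runs on the channel's owning thread after its last reference is dropped:
 * call the destroy callback, release the device reference, and free the
 * channel (and the device, if it was unregistered and this was its last
 * channel).
 */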
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;

	assert(ch->thread == spdk_get_thread());
	assert(ch->ref == 0);

	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}

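/*
 * Release a reference to an I/O channel.  When the last reference is
 * dropped, the channel is unlinked from its thread and destroyed via a
 * message sent to the owning thread.
 */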
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	ch->ref--;

	if (ch->ref == 0) {
		/* If this was the last reference, remove the channel from the list */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		pthread_mutex_unlock(&g_devlist_mutex);

		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}

struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}

struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}

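/* State carried from thread to thread during spdk_for_each_channel(). */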
struct spdk_io_channel_iter {
	void *io_device;
	struct io_device *dev;
	spdk_channel_msg fn;
	int status;
	void *ctx;
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};

void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}

struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}

void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}

static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}

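/*
 * Runs on each target thread: if the thread still has a channel for the
 * device, invoke the user's fn; otherwise skip ahead to the next thread.
 */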
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 *  message had a chance to execute.  If so, skip calling
	 *  the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		spdk_for_each_channel_continue(i, 0);
	}
}

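/*
 * Call fn once for each existing channel of io_device, hopping from thread
 * to thread.  Each fn invocation is expected to call
 * spdk_for_each_channel_continue() to advance; cpl runs on the originating
 * thread when the iteration ends.
 */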
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}

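/*
 * Advance an spdk_for_each_channel() iteration to the next thread that has a
 * channel for the device.  A non-zero status ends the iteration early; the
 * status is reported to the completion callback.
 */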
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}