1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  */
4 
5 #include "spdk/stdinc.h"
6 #include "spdk/fsdev.h"
7 #include "spdk/config.h"
8 #include "spdk/env.h"
9 #include "spdk/likely.h"
10 #include "spdk/queue.h"
11 #include "spdk/util.h"
12 #include "spdk/notify.h"
13 #include "spdk/fsdev_module.h"
14 #include "spdk/log.h"
15 #include "spdk/string.h"
16 #include "fsdev_internal.h"
17 
18 #define SPDK_FSDEV_IO_POOL_SIZE (64 * 1024 - 1)
19 #define SPDK_FSDEV_IO_CACHE_SIZE 256
20 
21 static struct spdk_fsdev_opts g_fsdev_opts = {
22 	.fsdev_io_pool_size = SPDK_FSDEV_IO_POOL_SIZE,
23 	.fsdev_io_cache_size = SPDK_FSDEV_IO_CACHE_SIZE,
24 };
25 
26 TAILQ_HEAD(spdk_fsdev_list, spdk_fsdev);
27 
28 RB_HEAD(fsdev_name_tree, spdk_fsdev_name);
29 
30 static int
31 fsdev_name_cmp(struct spdk_fsdev_name *name1, struct spdk_fsdev_name *name2)
32 {
33 	return strcmp(name1->name, name2->name);
34 }
35 
36 RB_GENERATE_STATIC(fsdev_name_tree, spdk_fsdev_name, node, fsdev_name_cmp);
37 
38 struct spdk_fsdev_mgr {
39 	struct spdk_mempool *fsdev_io_pool;
40 
41 	TAILQ_HEAD(fsdev_module_list, spdk_fsdev_module) fsdev_modules;
42 
43 	struct spdk_fsdev_list fsdevs;
44 	struct fsdev_name_tree fsdev_names;
45 
46 	bool init_complete;
47 	bool module_init_complete;
48 
49 	struct spdk_spinlock spinlock;
50 };
51 
52 static struct spdk_fsdev_mgr g_fsdev_mgr = {
53 	.fsdev_modules = TAILQ_HEAD_INITIALIZER(g_fsdev_mgr.fsdev_modules),
54 	.fsdevs = TAILQ_HEAD_INITIALIZER(g_fsdev_mgr.fsdevs),
55 	.fsdev_names = RB_INITIALIZER(g_fsdev_mgr.fsdev_names),
56 	.init_complete = false,
57 	.module_init_complete = false,
58 };
59 
60 static void
61 __attribute__((constructor))
62 _fsdev_init(void)
63 {
64 	spdk_spin_init(&g_fsdev_mgr.spinlock);
65 }
66 
67 
68 static spdk_fsdev_init_cb	g_init_cb_fn = NULL;
69 static void			*g_init_cb_arg = NULL;
70 
71 static spdk_fsdev_fini_cb	g_fini_cb_fn = NULL;
72 static void			*g_fini_cb_arg = NULL;
73 static struct spdk_thread	*g_fini_thread = NULL;
74 
75 struct spdk_fsdev_mgmt_channel {
76 	/*
77 	 * Each thread keeps a cache of fsdev_io - this allows
78 	 *  fsdev threads which are *not* DPDK threads to still
79 	 *  benefit from a per-thread fsdev_io cache.  Without
80 	 *  this, non-DPDK threads fetching from the mempool
81 	 *  incur a cmpxchg on get and put.
82 	 */
83 	fsdev_io_stailq_t per_thread_cache;
84 	uint32_t	per_thread_cache_count;
85 	uint32_t	fsdev_io_cache_size;
86 
87 	TAILQ_HEAD(, spdk_fsdev_shared_resource) shared_resources;
88 };
89 
90 /*
91  * Per-module (or per-io_device) data. Multiple fsdevs built on the same io_device
92  * queue their I/O awaiting retry here, which makes it possible to retry sending
93  * I/O to one fsdev after I/O from another fsdev completes.
94  */
95 struct spdk_fsdev_shared_resource {
96 	/* The fsdev management channel */
97 	struct spdk_fsdev_mgmt_channel *mgmt_ch;
98 
99 	/*
100 	 * Count of I/O submitted to fsdev module and waiting for completion.
101 	 * Incremented before submit_request() is called on an spdk_fsdev_io.
102 	 */
103 	uint64_t		io_outstanding;
104 
105 	/* I/O channel allocated by a fsdev module */
106 	struct spdk_io_channel	*shared_ch;
107 
108 	/* Refcount of fsdev channels using this resource */
109 	uint32_t		ref;
110 
111 	TAILQ_ENTRY(spdk_fsdev_shared_resource) link;
112 };
113 
114 struct spdk_fsdev_channel {
115 	struct spdk_fsdev	*fsdev;
116 
117 	/* The channel for the underlying device */
118 	struct spdk_io_channel	*channel;
119 
120 	/* Per io_device per thread data */
121 	struct spdk_fsdev_shared_resource *shared_resource;
122 
123 	/*
124 	 * Count of I/O submitted to the underlying dev module through this channel
125 	 * and waiting for completion.
126 	 */
127 	uint64_t		io_outstanding;
128 
129 	/*
130 	 * List of all submitted I/Os.
131 	 */
132 	fsdev_io_tailq_t	io_submitted;
133 };
134 
135 struct spdk_fsdev_desc {
136 	struct spdk_fsdev		*fsdev;
137 	struct spdk_thread		*thread;
138 	struct {
139 		spdk_fsdev_event_cb_t event_fn;
140 		void *ctx;
141 	}				callback;
142 	bool				closed;
143 	struct spdk_spinlock		spinlock;
144 	uint32_t			refs;
145 	TAILQ_ENTRY(spdk_fsdev_desc)	link;
146 };
147 
148 #define __fsdev_to_io_dev(fsdev)	(((char *)fsdev) + 1)
149 #define __fsdev_from_io_dev(io_dev)	((struct spdk_fsdev *)(((char *)io_dev) - 1))
150 #define __io_ch_to_fsdev_mgmt_ch(io_ch)	((struct spdk_fsdev_mgmt_channel *)spdk_io_channel_get_ctx(io_ch))
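
/*
 * Note: the io_device registered for an fsdev is the fsdev pointer shifted by one
 * byte, presumably so that it can never collide with an io_device that the fsdev
 * module itself might register using the same spdk_fsdev pointer.
 */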
151 
152 static struct spdk_fsdev *
153 fsdev_get_by_name(const char *fsdev_name)
154 {
155 	struct spdk_fsdev_name find;
156 	struct spdk_fsdev_name *res;
157 
158 	find.name = (char *)fsdev_name;
159 	res = RB_FIND(fsdev_name_tree, &g_fsdev_mgr.fsdev_names, &find);
160 	if (res != NULL) {
161 		return res->fsdev;
162 	}
163 
164 	return NULL;
165 }
166 
167 static int
168 fsdev_module_get_max_ctx_size(void)
169 {
170 	struct spdk_fsdev_module *fsdev_module;
171 	int max_fsdev_module_size = 0;
172 
173 	TAILQ_FOREACH(fsdev_module, &g_fsdev_mgr.fsdev_modules, internal.tailq) {
174 		if (fsdev_module->get_ctx_size && fsdev_module->get_ctx_size() > max_fsdev_module_size) {
175 			max_fsdev_module_size = fsdev_module->get_ctx_size();
176 		}
177 	}
178 
179 	return max_fsdev_module_size;
180 }
181 
182 void
183 spdk_fsdev_subsystem_config_json(struct spdk_json_write_ctx *w)
184 {
185 	struct spdk_fsdev_module *fsdev_module;
186 	struct spdk_fsdev *fsdev;
187 
188 	assert(w != NULL);
189 
190 	spdk_json_write_array_begin(w);
191 
192 	spdk_json_write_object_begin(w);
193 	spdk_json_write_named_string(w, "method", "fsdev_set_opts");
194 	spdk_json_write_named_object_begin(w, "params");
195 	spdk_json_write_named_uint32(w, "fsdev_io_pool_size", g_fsdev_opts.fsdev_io_pool_size);
196 	spdk_json_write_named_uint32(w, "fsdev_io_cache_size", g_fsdev_opts.fsdev_io_cache_size);
197 	spdk_json_write_object_end(w); /* params */
198 	spdk_json_write_object_end(w);
199 
200 	TAILQ_FOREACH(fsdev_module, &g_fsdev_mgr.fsdev_modules, internal.tailq) {
201 		if (fsdev_module->config_json) {
202 			fsdev_module->config_json(w);
203 		}
204 	}
205 
206 	spdk_spin_lock(&g_fsdev_mgr.spinlock);
207 
208 	TAILQ_FOREACH(fsdev, &g_fsdev_mgr.fsdevs, internal.link) {
209 		if (fsdev->fn_table->write_config_json) {
210 			fsdev->fn_table->write_config_json(fsdev, w);
211 		}
212 	}
213 
214 	spdk_spin_unlock(&g_fsdev_mgr.spinlock);
215 	spdk_json_write_array_end(w);
216 }
217 
218 static void
219 fsdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
220 {
221 	struct spdk_fsdev_mgmt_channel *ch = ctx_buf;
222 	struct spdk_fsdev_io *fsdev_io;
223 
224 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
225 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
226 	}
227 
228 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
229 		fsdev_io = STAILQ_FIRST(&ch->per_thread_cache);
230 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
231 		ch->per_thread_cache_count--;
232 		spdk_mempool_put(g_fsdev_mgr.fsdev_io_pool, (void *)fsdev_io);
233 	}
234 
235 	assert(ch->per_thread_cache_count == 0);
236 	return;
237 }
238 
239 static int
240 fsdev_mgmt_channel_create(void *io_device, void *ctx_buf)
241 {
242 	struct spdk_fsdev_mgmt_channel *ch = ctx_buf;
243 	struct spdk_fsdev_io *fsdev_io;
244 	uint32_t i;
245 
246 	STAILQ_INIT(&ch->per_thread_cache);
247 	ch->fsdev_io_cache_size = g_fsdev_opts.fsdev_io_cache_size;
248 
249 	/* Pre-populate fsdev_io cache to ensure this thread cannot be starved. */
250 	ch->per_thread_cache_count = 0;
251 	for (i = 0; i < ch->fsdev_io_cache_size; i++) {
252 		fsdev_io = spdk_mempool_get(g_fsdev_mgr.fsdev_io_pool);
253 		if (fsdev_io == NULL) {
254 			SPDK_ERRLOG("You need to increase fsdev_io_pool_size using the fsdev_set_opts RPC.\n");
255 			assert(false);
256 			fsdev_mgmt_channel_destroy(io_device, ctx_buf);
257 			return -1;
258 		}
259 		ch->per_thread_cache_count++;
260 		STAILQ_INSERT_HEAD(&ch->per_thread_cache, fsdev_io, internal.buf_link);
261 	}
262 
263 	TAILQ_INIT(&ch->shared_resources);
264 	return 0;
265 }
266 
267 static void
268 fsdev_init_complete(int rc)
269 {
270 	spdk_fsdev_init_cb cb_fn = g_init_cb_fn;
271 	void *cb_arg = g_init_cb_arg;
272 
273 	g_fsdev_mgr.init_complete = true;
274 	g_init_cb_fn = NULL;
275 	g_init_cb_arg = NULL;
276 
277 	cb_fn(cb_arg, rc);
278 }
279 
280 static void
281 fsdev_init_failed(void *cb_arg)
282 {
283 	fsdev_init_complete(-1);
284 }
285 
286 static int
287 fsdev_modules_init(void)
288 {
289 	struct spdk_fsdev_module *module;
290 	int rc = 0;
291 
292 	TAILQ_FOREACH(module, &g_fsdev_mgr.fsdev_modules, internal.tailq) {
293 		rc = module->module_init();
294 		if (rc != 0) {
295 			spdk_thread_send_msg(spdk_get_thread(), fsdev_init_failed, module);
296 			return rc;
297 		}
298 	}
299 
300 	return 0;
301 }
302 
303 void
304 spdk_fsdev_initialize(spdk_fsdev_init_cb cb_fn, void *cb_arg)
305 {
306 	int rc = 0;
307 	char mempool_name[32];
308 
309 	assert(cb_fn != NULL);
310 
311 	g_init_cb_fn = cb_fn;
312 	g_init_cb_arg = cb_arg;
313 
314 	spdk_notify_type_register("fsdev_register");
315 	spdk_notify_type_register("fsdev_unregister");
316 
317 	snprintf(mempool_name, sizeof(mempool_name), "fsdev_io_%d", getpid());
318 
319 	g_fsdev_mgr.fsdev_io_pool = spdk_mempool_create(mempool_name,
320 				    g_fsdev_opts.fsdev_io_pool_size,
321 				    sizeof(struct spdk_fsdev_io) +
322 				    fsdev_module_get_max_ctx_size(),
323 				    0,
324 				    SPDK_ENV_NUMA_ID_ANY);
325 
326 	if (g_fsdev_mgr.fsdev_io_pool == NULL) {
327 		SPDK_ERRLOG("Could not allocate spdk_fsdev_io pool\n");
328 		fsdev_init_complete(-1);
329 		return;
330 	}
331 
332 	spdk_io_device_register(&g_fsdev_mgr, fsdev_mgmt_channel_create,
333 				fsdev_mgmt_channel_destroy,
334 				sizeof(struct spdk_fsdev_mgmt_channel),
335 				"fsdev_mgr");
336 
337 	rc = fsdev_modules_init();
338 	g_fsdev_mgr.module_init_complete = true;
339 	if (rc != 0) {
340 		SPDK_ERRLOG("fsdev modules init failed\n");
341 		return;
342 	}
343 
344 	fsdev_init_complete(0);
345 }
346 
347 static void
348 fsdev_mgr_unregister_cb(void *io_device)
349 {
350 	spdk_fsdev_fini_cb cb_fn = g_fini_cb_fn;
351 
352 	if (g_fsdev_mgr.fsdev_io_pool) {
353 		if (spdk_mempool_count(g_fsdev_mgr.fsdev_io_pool) != g_fsdev_opts.fsdev_io_pool_size) {
354 			SPDK_ERRLOG("fsdev IO pool count is %zu but should be %u\n",
355 				    spdk_mempool_count(g_fsdev_mgr.fsdev_io_pool),
356 				    g_fsdev_opts.fsdev_io_pool_size);
357 		}
358 
359 		spdk_mempool_free(g_fsdev_mgr.fsdev_io_pool);
360 	}
361 
362 	cb_fn(g_fini_cb_arg);
363 	g_fini_cb_fn = NULL;
364 	g_fini_cb_arg = NULL;
365 	g_fsdev_mgr.init_complete = false;
366 	g_fsdev_mgr.module_init_complete = false;
367 }
368 
369 static void
370 fsdev_module_fini_iter(void *arg)
371 {
372 	struct spdk_fsdev_module *fsdev_module;
373 
374 	/* FIXME: Handling initialization failures is currently broken,
375 	 * so we don't even try to clean up after successfully
376 	 * initialized modules. If module_init_complete is false,
377 	 * just call fsdev_mgr_unregister_cb().
378 	 */
379 	if (!g_fsdev_mgr.module_init_complete) {
380 		fsdev_mgr_unregister_cb(NULL);
381 		return;
382 	}
383 
384 	/* Start iterating from the last touched module */
385 	fsdev_module = TAILQ_LAST(&g_fsdev_mgr.fsdev_modules, fsdev_module_list);
386 	while (fsdev_module) {
387 		if (fsdev_module->module_fini) {
388 			fsdev_module->module_fini();
389 		}
390 
391 		fsdev_module = TAILQ_PREV(fsdev_module, fsdev_module_list,
392 					  internal.tailq);
393 	}
394 
395 	spdk_io_device_unregister(&g_fsdev_mgr, fsdev_mgr_unregister_cb);
396 }
397 
398 static void
399 fsdev_finish_unregister_fsdevs_iter(void *cb_arg, int fsdeverrno)
400 {
401 	struct spdk_fsdev *fsdev = cb_arg;
402 
403 	if (fsdeverrno && fsdev) {
404 		SPDK_WARNLOG("Unable to unregister fsdev '%s' during spdk_fsdev_finish()\n",
405 			     fsdev->name);
406 
407 		/*
408 		 * Since the call to spdk_fsdev_unregister() failed, we have no way to free this
409 		 *  fsdev; try to recover by manually removing this fsdev from the list and moving
410 		 *  on to the next fsdev in the list.
411 		 */
412 		TAILQ_REMOVE(&g_fsdev_mgr.fsdevs, fsdev, internal.link);
413 	}
414 
415 	fsdev = TAILQ_FIRST(&g_fsdev_mgr.fsdevs);
416 	if (!fsdev) {
417 		SPDK_DEBUGLOG(fsdev, "Done unregistering fsdevs\n");
418 		/*
419 		 * Fsdev module finish needs to be deferred as we might be in the middle of some context
420 		 * that will use this fsdev (or private fsdev driver ctx data)
421 		 * after returning.
422 		 */
423 		spdk_thread_send_msg(spdk_get_thread(), fsdev_module_fini_iter, NULL);
424 		return;
425 	}
426 
427 	SPDK_DEBUGLOG(fsdev, "Unregistering fsdev '%s'\n", fsdev->name);
428 	spdk_fsdev_unregister(fsdev, fsdev_finish_unregister_fsdevs_iter, fsdev);
429 	return;
430 }
431 
432 void
433 spdk_fsdev_finish(spdk_fsdev_fini_cb cb_fn, void *cb_arg)
434 {
435 	assert(cb_fn != NULL);
436 	g_fini_thread = spdk_get_thread();
437 	g_fini_cb_fn = cb_fn;
438 	g_fini_cb_arg = cb_arg;
439 	fsdev_finish_unregister_fsdevs_iter(NULL, 0);
440 }
441 
442 struct spdk_fsdev_io *
443 fsdev_channel_get_io(struct spdk_fsdev_channel *channel)
444 {
445 	struct spdk_fsdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
446 	struct spdk_fsdev_io *fsdev_io;
447 
448 	if (ch->per_thread_cache_count > 0) {
449 		fsdev_io = STAILQ_FIRST(&ch->per_thread_cache);
450 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
451 		ch->per_thread_cache_count--;
452 	} else {
453 		fsdev_io = spdk_mempool_get(g_fsdev_mgr.fsdev_io_pool);
454 	}
455 
456 	return fsdev_io;
457 }
458 
459 void
460 spdk_fsdev_free_io(struct spdk_fsdev_io *fsdev_io)
461 {
462 	struct spdk_fsdev_mgmt_channel *ch;
463 
464 	assert(fsdev_io != NULL);
465 
466 	ch = fsdev_io->internal.ch->shared_resource->mgmt_ch;
467 
468 	if (ch->per_thread_cache_count < ch->fsdev_io_cache_size) {
469 		ch->per_thread_cache_count++;
470 		STAILQ_INSERT_HEAD(&ch->per_thread_cache, fsdev_io, internal.buf_link);
471 	} else {
472 		spdk_mempool_put(g_fsdev_mgr.fsdev_io_pool, (void *)fsdev_io);
473 	}
474 }
475 
476 void
477 fsdev_io_submit(struct spdk_fsdev_io *fsdev_io)
478 {
479 	struct spdk_fsdev *fsdev = fsdev_io->fsdev;
480 	struct spdk_fsdev_channel *ch = fsdev_io->internal.ch;
481 	struct spdk_fsdev_shared_resource *shared_resource = ch->shared_resource;
482 
483 	TAILQ_INSERT_TAIL(&ch->io_submitted, fsdev_io, internal.ch_link);
484 
485 	ch->io_outstanding++;
486 	shared_resource->io_outstanding++;
487 	fsdev_io->internal.in_submit_request = true;
488 	fsdev->fn_table->submit_request(ch->channel, fsdev_io);
489 	fsdev_io->internal.in_submit_request = false;
490 }
491 
492 static void
493 fsdev_channel_destroy_resource(struct spdk_fsdev_channel *ch)
494 {
495 	struct spdk_fsdev_shared_resource *shared_resource;
496 
497 	spdk_put_io_channel(ch->channel);
498 
499 	shared_resource = ch->shared_resource;
500 
501 	assert(TAILQ_EMPTY(&ch->io_submitted));
502 	assert(ch->io_outstanding == 0);
503 	assert(shared_resource->ref > 0);
504 	shared_resource->ref--;
505 	if (shared_resource->ref == 0) {
506 		assert(shared_resource->io_outstanding == 0);
507 		TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
508 		spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
509 		free(shared_resource);
510 	}
511 }
512 
513 static void
514 fsdev_desc_free(struct spdk_fsdev_desc *desc)
515 {
516 	spdk_spin_destroy(&desc->spinlock);
517 	free(desc);
518 }
519 
520 
521 static int
522 fsdev_channel_create(void *io_device, void *ctx_buf)
523 {
524 	struct spdk_fsdev		*fsdev = __fsdev_from_io_dev(io_device);
525 	struct spdk_fsdev_channel	*ch = ctx_buf;
526 	struct spdk_io_channel		*mgmt_io_ch;
527 	struct spdk_fsdev_mgmt_channel	*mgmt_ch;
528 	struct spdk_fsdev_shared_resource *shared_resource;
529 
530 	ch->fsdev = fsdev;
531 	ch->channel = fsdev->fn_table->get_io_channel(fsdev->ctxt);
532 	if (!ch->channel) {
533 		return -1;
534 	}
535 
536 	mgmt_io_ch = spdk_get_io_channel(&g_fsdev_mgr);
537 	if (!mgmt_io_ch) {
538 		spdk_put_io_channel(ch->channel);
539 		return -1;
540 	}
541 
542 	mgmt_ch = __io_ch_to_fsdev_mgmt_ch(mgmt_io_ch);
543 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
544 		if (shared_resource->shared_ch == ch->channel) {
545 			spdk_put_io_channel(mgmt_io_ch);
546 			shared_resource->ref++;
547 			break;
548 		}
549 	}
550 
551 	if (shared_resource == NULL) {
552 		shared_resource = calloc(1, sizeof(*shared_resource));
553 		if (shared_resource == NULL) {
554 			spdk_put_io_channel(ch->channel);
555 			spdk_put_io_channel(mgmt_io_ch);
556 			return -1;
557 		}
558 
559 		shared_resource->mgmt_ch = mgmt_ch;
560 		shared_resource->io_outstanding = 0;
561 		shared_resource->shared_ch = ch->channel;
562 		shared_resource->ref = 1;
563 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
564 	}
565 
566 	ch->io_outstanding = 0;
567 	ch->shared_resource = shared_resource;
568 	TAILQ_INIT(&ch->io_submitted);
569 	return 0;
570 }
571 
572 static void
573 fsdev_channel_destroy(void *io_device, void *ctx_buf)
574 {
575 	struct spdk_fsdev_channel *ch = ctx_buf;
576 
577 	SPDK_DEBUGLOG(fsdev, "Destroying channel %p for fsdev %s on thread %p\n",
578 		      ch, ch->fsdev->name,
579 		      spdk_get_thread());
580 	fsdev_channel_destroy_resource(ch);
581 }
582 
583 /*
584  * If the name already exists in the global fsdev name tree, RB_INSERT() returns a pointer
585  * to it. Hence we do not have to call fsdev_get_by_name() when using this function.
586  */
587 static int
588 fsdev_name_add(struct spdk_fsdev_name *fsdev_name, struct spdk_fsdev *fsdev, const char *name)
589 {
590 	struct spdk_fsdev_name *tmp;
591 
592 	fsdev_name->name = strdup(name);
593 	if (fsdev_name->name == NULL) {
594 		SPDK_ERRLOG("Unable to allocate fsdev name\n");
595 		return -ENOMEM;
596 	}
597 
598 	fsdev_name->fsdev = fsdev;
599 
600 	spdk_spin_lock(&g_fsdev_mgr.spinlock);
601 	tmp = RB_INSERT(fsdev_name_tree, &g_fsdev_mgr.fsdev_names, fsdev_name);
602 	spdk_spin_unlock(&g_fsdev_mgr.spinlock);
603 	if (tmp != NULL) {
604 		SPDK_ERRLOG("Fsdev name %s already exists\n", name);
605 		free(fsdev_name->name);
606 		return -EEXIST;
607 	}
608 
609 	return 0;
610 }
611 
612 static void
613 fsdev_name_del_unsafe(struct spdk_fsdev_name *fsdev_name)
614 {
615 	RB_REMOVE(fsdev_name_tree, &g_fsdev_mgr.fsdev_names, fsdev_name);
616 	free(fsdev_name->name);
617 }
618 
619 struct spdk_io_channel *
620 spdk_fsdev_get_io_channel(struct spdk_fsdev_desc *desc)
621 {
622 	return spdk_get_io_channel(__fsdev_to_io_dev(spdk_fsdev_desc_get_fsdev(desc)));
623 }
624 
625 int
626 spdk_fsdev_set_opts(const struct spdk_fsdev_opts *opts)
627 {
628 	uint32_t min_pool_size;
629 
630 	if (!opts) {
631 		SPDK_ERRLOG("opts cannot be NULL\n");
632 		return -EINVAL;
633 	}
634 
635 	if (!opts->opts_size) {
636 		SPDK_ERRLOG("opts_size inside opts cannot be zero\n");
637 		return -EINVAL;
638 	}
639 
640 	/*
641 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
642 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
643 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
644 	 */
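	/*
	 * For example (illustrative numbers only): with the default fsdev_io_cache_size
	 * of 256 and an application running 4 SPDK threads, the minimum acceptable
	 * fsdev_io_pool_size is 256 * (4 + 1) = 1280.
	 */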
645 	min_pool_size = opts->fsdev_io_cache_size * (spdk_thread_get_count() + 1);
646 	if (opts->fsdev_io_pool_size < min_pool_size) {
647 		SPDK_ERRLOG("fsdev_io_pool_size %" PRIu32 " is not compatible with fsdev_io_cache_size %" PRIu32
648 			    " and %" PRIu32 " threads\n", opts->fsdev_io_pool_size, opts->fsdev_io_cache_size,
649 			    spdk_thread_get_count());
650 		SPDK_ERRLOG("fsdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
651 		return -EINVAL;
652 	}
653 
654 #define SET_FIELD(field) \
655 	if (offsetof(struct spdk_fsdev_opts, field) + sizeof(opts->field) <= opts->opts_size) { \
656 		g_fsdev_opts.field = opts->field; \
657 	}
658 
659 	SET_FIELD(fsdev_io_pool_size);
660 	SET_FIELD(fsdev_io_cache_size);
661 
662 	g_fsdev_opts.opts_size = opts->opts_size;
663 
664 #undef SET_FIELD
665 
666 	return 0;
667 }
668 
669 int
670 spdk_fsdev_get_opts(struct spdk_fsdev_opts *opts, size_t opts_size)
671 {
672 	if (!opts) {
673 		SPDK_ERRLOG("opts should not be NULL\n");
674 		return -EINVAL;
675 	}
676 
677 	if (!opts_size) {
678 		SPDK_ERRLOG("opts_size should not be zero\n");
679 		return -EINVAL;
680 	}
681 
682 	opts->opts_size = opts_size;
683 
684 #define SET_FIELD(field) \
685 	if (offsetof(struct spdk_fsdev_opts, field) + sizeof(opts->field) <= opts_size) { \
686 		opts->field = g_fsdev_opts.field; \
687 	}
688 
689 	SET_FIELD(fsdev_io_pool_size);
690 	SET_FIELD(fsdev_io_cache_size);
691 
692 	/* Do not remove this statement; always update it when adding a new field,
693 	 * and do not forget to add the corresponding SET_FIELD statement for the new field. */
694 	SPDK_STATIC_ASSERT(sizeof(struct spdk_fsdev_opts) == 12, "Incorrect size");
695 
696 #undef SET_FIELD
697 	return 0;
698 }
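
/*
 * Illustrative usage sketch (not part of this translation unit): an application
 * may tune the I/O pools before calling spdk_fsdev_initialize(). The values below
 * are arbitrary examples.
 *
 *	struct spdk_fsdev_opts opts = {};
 *
 *	spdk_fsdev_get_opts(&opts, sizeof(opts));
 *	opts.fsdev_io_pool_size = 128 * 1024 - 1;
 *	opts.fsdev_io_cache_size = 512;
 *	if (spdk_fsdev_set_opts(&opts) != 0) {
 *		SPDK_ERRLOG("Failed to set fsdev opts\n");
 *	}
 */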
699 
700 int
701 spdk_fsdev_get_memory_domains(struct spdk_fsdev *fsdev, struct spdk_memory_domain **domains,
702 			      int array_size)
703 {
704 	if (!fsdev) {
705 		return -EINVAL;
706 	}
707 
708 	if (fsdev->fn_table->get_memory_domains) {
709 		return fsdev->fn_table->get_memory_domains(fsdev->ctxt, domains, array_size);
710 	}
711 
712 	return 0;
713 }
714 
715 const char *
716 spdk_fsdev_get_module_name(const struct spdk_fsdev *fsdev)
717 {
718 	return fsdev->module->name;
719 }
720 
721 const char *
722 spdk_fsdev_get_name(const struct spdk_fsdev *fsdev)
723 {
724 	return fsdev->name;
725 }
726 
727 static inline void
728 fsdev_io_complete(void *ctx)
729 {
730 	struct spdk_fsdev_io *fsdev_io = ctx;
731 	struct spdk_fsdev_channel *fsdev_ch = fsdev_io->internal.ch;
732 
733 	if (spdk_unlikely(fsdev_io->internal.in_submit_request)) {
734 		/*
735 		 * Defer completion to avoid potential infinite recursion if the
736 		 * user's completion callback issues a new I/O.
737 		 */
738 		spdk_thread_send_msg(spdk_fsdev_io_get_thread(fsdev_io),
739 				     fsdev_io_complete, fsdev_io);
740 		return;
741 	}
742 
743 	TAILQ_REMOVE(&fsdev_ch->io_submitted, fsdev_io, internal.ch_link);
744 
745 	assert(fsdev_io->internal.cb_fn != NULL);
746 	assert(spdk_get_thread() == spdk_fsdev_io_get_thread(fsdev_io));
747 	fsdev_io->internal.cb_fn(fsdev_io, fsdev_io->internal.cb_arg);
748 }
749 
750 
751 void
752 spdk_fsdev_io_complete(struct spdk_fsdev_io *fsdev_io, int status)
753 {
754 	struct spdk_fsdev_channel *fsdev_ch = fsdev_io->internal.ch;
755 	struct spdk_fsdev_shared_resource *shared_resource = fsdev_ch->shared_resource;
756 
757 	assert(status <= 0);
758 	fsdev_io->internal.status = status;
759 	assert(fsdev_ch->io_outstanding > 0);
760 	assert(shared_resource->io_outstanding > 0);
761 	fsdev_ch->io_outstanding--;
762 	shared_resource->io_outstanding--;
763 	fsdev_io_complete(fsdev_io);
764 }
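
/*
 * Illustrative module-side flow (hypothetical names): a module's submit_request()
 * callback processes the I/O and, once done (possibly asynchronously), reports the
 * result back through spdk_fsdev_io_complete(), e.g.:
 *
 *	static void
 *	my_fs_submit_request(struct spdk_io_channel *ch, struct spdk_fsdev_io *fsdev_io)
 *	{
 *		int rc = my_fs_do_io(ch, fsdev_io);	// hypothetical helper
 *
 *		spdk_fsdev_io_complete(fsdev_io, rc);	// rc <= 0, 0 on success
 *	}
 */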
765 
766 struct spdk_thread *
767 spdk_fsdev_io_get_thread(struct spdk_fsdev_io *fsdev_io)
768 {
769 	return spdk_io_channel_get_thread(fsdev_io->internal.ch->channel);
770 }
771 
772 struct spdk_io_channel *
773 spdk_fsdev_io_get_io_channel(struct spdk_fsdev_io *fsdev_io)
774 {
775 	return fsdev_io->internal.ch->channel;
776 }
777 
778 static int
779 fsdev_register(struct spdk_fsdev *fsdev)
780 {
781 	char *fsdev_name;
782 	int ret;
783 
784 	assert(fsdev->module != NULL);
785 
786 	if (!fsdev->name) {
787 		SPDK_ERRLOG("Fsdev name is NULL\n");
788 		return -EINVAL;
789 	}
790 
791 	if (!strlen(fsdev->name)) {
792 		SPDK_ERRLOG("Fsdev name must not be an empty string\n");
793 		return -EINVAL;
794 	}
795 
796 	/* Users often register their own I/O devices using the fsdev name. In
797 	 * order to avoid conflicts, prepend fsdev_. */
798 	fsdev_name = spdk_sprintf_alloc("fsdev_%s", fsdev->name);
799 	if (!fsdev_name) {
800 		SPDK_ERRLOG("Unable to allocate memory for internal fsdev name.\n");
801 		return -ENOMEM;
802 	}
803 
804 	fsdev->internal.status = SPDK_FSDEV_STATUS_READY;
805 	TAILQ_INIT(&fsdev->internal.open_descs);
806 
807 	ret = fsdev_name_add(&fsdev->internal.fsdev_name, fsdev, fsdev->name);
808 	if (ret != 0) {
809 		free(fsdev_name);
810 		return ret;
811 	}
812 
813 	spdk_io_device_register(__fsdev_to_io_dev(fsdev),
814 				fsdev_channel_create, fsdev_channel_destroy,
815 				sizeof(struct spdk_fsdev_channel),
816 				fsdev_name);
817 
818 	free(fsdev_name);
819 
820 	spdk_spin_init(&fsdev->internal.spinlock);
821 
822 	SPDK_DEBUGLOG(fsdev, "Inserting fsdev %s into list\n", fsdev->name);
823 	TAILQ_INSERT_TAIL(&g_fsdev_mgr.fsdevs, fsdev, internal.link);
824 	return 0;
825 }
826 
827 static void
828 fsdev_destroy_cb(void *io_device)
829 {
830 	int			rc;
831 	struct spdk_fsdev	*fsdev;
832 	spdk_fsdev_unregister_cb cb_fn;
833 	void			*cb_arg;
834 
835 	fsdev = __fsdev_from_io_dev(io_device);
836 	cb_fn = fsdev->internal.unregister_cb;
837 	cb_arg = fsdev->internal.unregister_ctx;
838 
839 	spdk_spin_destroy(&fsdev->internal.spinlock);
840 
841 	rc = fsdev->fn_table->destruct(fsdev->ctxt);
842 	if (rc < 0) {
843 		SPDK_ERRLOG("destruct failed\n");
844 	}
845 	if (rc <= 0 && cb_fn != NULL) {
846 		cb_fn(cb_arg, rc);
847 	}
848 }
849 
850 void
851 spdk_fsdev_destruct_done(struct spdk_fsdev *fsdev, int fsdeverrno)
852 {
853 	if (fsdev->internal.unregister_cb != NULL) {
854 		fsdev->internal.unregister_cb(fsdev->internal.unregister_ctx, fsdeverrno);
855 	}
856 }
857 
858 static void
859 _remove_notify(void *arg)
860 {
861 	struct spdk_fsdev_desc *desc = arg;
862 
863 	spdk_spin_lock(&desc->spinlock);
864 	desc->refs--;
865 
866 	if (!desc->closed) {
867 		spdk_spin_unlock(&desc->spinlock);
868 		desc->callback.event_fn(SPDK_FSDEV_EVENT_REMOVE, desc->fsdev, desc->callback.ctx);
869 		return;
870 	} else if (0 == desc->refs) {
871 		/* This descriptor was closed after this remove_notify message was sent.
872 		 * spdk_fsdev_close() could not free the descriptor since this message was
873 		 * in flight, so we free it now using fsdev_desc_free().
874 		 */
875 		spdk_spin_unlock(&desc->spinlock);
876 		fsdev_desc_free(desc);
877 		return;
878 	}
879 	spdk_spin_unlock(&desc->spinlock);
880 }
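
/*
 * Illustrative event callback (hypothetical names): a descriptor owner is expected
 * to close its descriptor when it receives SPDK_FSDEV_EVENT_REMOVE, e.g.:
 *
 *	static void
 *	my_event_cb(enum spdk_fsdev_event_type type, struct spdk_fsdev *fsdev, void *ctx)
 *	{
 *		struct my_ctx *my = ctx;	// hypothetical per-descriptor context
 *
 *		if (type == SPDK_FSDEV_EVENT_REMOVE) {
 *			spdk_fsdev_close(my->desc);
 *		}
 *	}
 */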
881 
882 /* Must be called while holding g_fsdev_mgr.spinlock and fsdev->internal.spinlock.
883  * returns: 0 - fsdev removed and ready to be destructed.
884  *          -EBUSY - fsdev can't be destructed yet.  */
885 static int
886 fsdev_unregister_unsafe(struct spdk_fsdev *fsdev)
887 {
888 	struct spdk_fsdev_desc	*desc, *tmp;
889 	int			rc = 0;
890 
891 	/* Notify each descriptor about hotremoval */
892 	TAILQ_FOREACH_SAFE(desc, &fsdev->internal.open_descs, link, tmp) {
893 		rc = -EBUSY;
894 		spdk_spin_lock(&desc->spinlock);
895 		/*
896 		 * Defer invocation of the event_cb to a separate message that will
897 		 *  run later on its thread.  This ensures this context unwinds and
898 		 *  we don't recursively unregister this fsdev again if the event_cb
899 		 *  immediately closes its descriptor.
900 		 */
901 		desc->refs++;
902 		spdk_thread_send_msg(desc->thread, _remove_notify, desc);
903 		spdk_spin_unlock(&desc->spinlock);
904 	}
905 
906 	/* If there are no descriptors, proceed removing the fsdev */
907 	if (rc == 0) {
908 		TAILQ_REMOVE(&g_fsdev_mgr.fsdevs, fsdev, internal.link);
909 		SPDK_DEBUGLOG(fsdev, "Removing fsdev %s from list done\n", fsdev->name);
910 		fsdev_name_del_unsafe(&fsdev->internal.fsdev_name);
911 		spdk_notify_send("fsdev_unregister", spdk_fsdev_get_name(fsdev));
912 	}
913 
914 	return rc;
915 }
916 
917 static void
918 fsdev_unregister(struct spdk_fsdev *fsdev, void *_ctx, int status)
919 {
920 	int rc;
921 
922 	spdk_spin_lock(&g_fsdev_mgr.spinlock);
923 	spdk_spin_lock(&fsdev->internal.spinlock);
924 	/*
925 	 * Set the status to REMOVING only after any channel aborts have completed. Otherwise,
926 	 * the last spdk_fsdev_close() may call spdk_io_device_unregister() while
927 	 * spdk_fsdev_for_each_channel() is still executing, in which case
928 	 * spdk_io_device_unregister() may fail.
929 	 */
930 	fsdev->internal.status = SPDK_FSDEV_STATUS_REMOVING;
931 	rc = fsdev_unregister_unsafe(fsdev);
932 	spdk_spin_unlock(&fsdev->internal.spinlock);
933 	spdk_spin_unlock(&g_fsdev_mgr.spinlock);
934 
935 	if (rc == 0) {
936 		spdk_io_device_unregister(__fsdev_to_io_dev(fsdev), fsdev_destroy_cb);
937 	}
938 }
939 
940 void
941 spdk_fsdev_unregister(struct spdk_fsdev *fsdev, spdk_fsdev_unregister_cb cb_fn, void *cb_arg)
942 {
943 	struct spdk_thread	*thread;
944 
945 	SPDK_DEBUGLOG(fsdev, "Removing fsdev %s from list\n", fsdev->name);
946 
947 	thread = spdk_get_thread();
948 	if (!thread) {
949 		/* The user called this from a non-SPDK thread. */
950 		if (cb_fn != NULL) {
951 			cb_fn(cb_arg, -ENOTSUP);
952 		}
953 		return;
954 	}
955 
956 	spdk_spin_lock(&g_fsdev_mgr.spinlock);
957 	if (fsdev->internal.status == SPDK_FSDEV_STATUS_UNREGISTERING ||
958 	    fsdev->internal.status == SPDK_FSDEV_STATUS_REMOVING) {
959 		spdk_spin_unlock(&g_fsdev_mgr.spinlock);
960 		if (cb_fn) {
961 			cb_fn(cb_arg, -EBUSY);
962 		}
963 		return;
964 	}
965 
966 	spdk_spin_lock(&fsdev->internal.spinlock);
967 	fsdev->internal.status = SPDK_FSDEV_STATUS_UNREGISTERING;
968 	fsdev->internal.unregister_cb = cb_fn;
969 	fsdev->internal.unregister_ctx = cb_arg;
970 	spdk_spin_unlock(&fsdev->internal.spinlock);
971 	spdk_spin_unlock(&g_fsdev_mgr.spinlock);
972 
973 	/* @todo: bdev aborts IOs on all channels here. */
974 	fsdev_unregister(fsdev, fsdev, 0);
975 }
976 
977 static void
978 _tmp_fsdev_event_cb(enum spdk_fsdev_event_type type, struct spdk_fsdev *fsdev, void *ctx)
979 {
980 	SPDK_NOTICELOG("Unexpected fsdev event type: %d\n", type);
981 }
982 
983 int
984 spdk_fsdev_unregister_by_name(const char *fsdev_name, struct spdk_fsdev_module *module,
985 			      spdk_fsdev_unregister_cb cb_fn, void *cb_arg)
986 {
987 	struct spdk_fsdev_desc *desc;
988 	struct spdk_fsdev *fsdev;
989 	int rc;
990 
991 	rc = spdk_fsdev_open(fsdev_name, _tmp_fsdev_event_cb, NULL, &desc);
992 	if (rc != 0) {
993 		SPDK_ERRLOG("Failed to open fsdev with name: %s\n", fsdev_name);
994 		return rc;
995 	}
996 
997 	fsdev = spdk_fsdev_desc_get_fsdev(desc);
998 
999 	if (fsdev->module != module) {
1000 		spdk_fsdev_close(desc);
1001 		SPDK_ERRLOG("Fsdev %s was not registered by the specified module.\n",
1002 			    fsdev_name);
1003 		return -ENODEV;
1004 	}
1005 
1006 	spdk_fsdev_unregister(fsdev, cb_fn, cb_arg);
1007 	spdk_fsdev_close(desc);
1008 
1009 	return 0;
1010 }
1011 
1012 static int
1013 fsdev_open(struct spdk_fsdev *fsdev, struct spdk_fsdev_desc *desc)
1014 {
1015 	struct spdk_thread *thread;
1016 
1017 	thread = spdk_get_thread();
1018 	if (!thread) {
1019 		SPDK_ERRLOG("Cannot open fsdev from non-SPDK thread.\n");
1020 		return -ENOTSUP;
1021 	}
1022 
1023 	SPDK_DEBUGLOG(fsdev, "Opening descriptor %p for fsdev %s on thread %p\n",
1024 		      desc, fsdev->name, spdk_get_thread());
1025 
1026 	desc->fsdev = fsdev;
1027 	desc->thread = thread;
1028 
1029 	spdk_spin_lock(&fsdev->internal.spinlock);
1030 	if (fsdev->internal.status == SPDK_FSDEV_STATUS_UNREGISTERING ||
1031 	    fsdev->internal.status == SPDK_FSDEV_STATUS_REMOVING) {
1032 		spdk_spin_unlock(&fsdev->internal.spinlock);
1033 		return -ENODEV;
1034 	}
1035 
1036 	TAILQ_INSERT_TAIL(&fsdev->internal.open_descs, desc, link);
1037 	spdk_spin_unlock(&fsdev->internal.spinlock);
1038 	return 0;
1039 }
1040 
1041 static int
1042 fsdev_desc_alloc(struct spdk_fsdev *fsdev, spdk_fsdev_event_cb_t event_cb, void *event_ctx,
1043 		 struct spdk_fsdev_desc **_desc)
1044 {
1045 	struct spdk_fsdev_desc *desc;
1046 
1047 	desc = calloc(1, sizeof(*desc));
1048 	if (desc == NULL) {
1049 		SPDK_ERRLOG("Failed to allocate memory for fsdev descriptor\n");
1050 		return -ENOMEM;
1051 	}
1052 
1053 	desc->callback.event_fn = event_cb;
1054 	desc->callback.ctx = event_ctx;
1055 	spdk_spin_init(&desc->spinlock);
1056 	*_desc = desc;
1057 	return 0;
1058 }
1059 
1060 int
1061 spdk_fsdev_open(const char *fsdev_name, spdk_fsdev_event_cb_t event_cb, void *event_ctx,
1062 		struct spdk_fsdev_desc **_desc)
1063 {
1064 	struct spdk_fsdev_desc *desc;
1065 	struct spdk_fsdev *fsdev;
1066 	int rc;
1067 
1068 	if (event_cb == NULL) {
1069 		SPDK_ERRLOG("Missing event callback function\n");
1070 		return -EINVAL;
1071 	}
1072 
1073 	spdk_spin_lock(&g_fsdev_mgr.spinlock);
1074 
1075 	fsdev = fsdev_get_by_name(fsdev_name);
1076 	if (fsdev == NULL) {
1077 		SPDK_NOTICELOG("Currently unable to find fsdev with name: %s\n", fsdev_name);
1078 		spdk_spin_unlock(&g_fsdev_mgr.spinlock);
1079 		return -ENODEV;
1080 	}
1081 
1082 	rc = fsdev_desc_alloc(fsdev, event_cb, event_ctx, &desc);
1083 	if (rc != 0) {
1084 		spdk_spin_unlock(&g_fsdev_mgr.spinlock);
1085 		return rc;
1086 	}
1087 
1088 	rc = fsdev_open(fsdev, desc);
1089 	if (rc != 0) {
1090 		fsdev_desc_free(desc);
1091 		desc = NULL;
1092 	}
1093 
1094 	*_desc = desc;
1095 	spdk_spin_unlock(&g_fsdev_mgr.spinlock);
1096 	return rc;
1097 }
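
/*
 * Illustrative consumer flow (hypothetical names), built only from APIs in this file:
 *
 *	struct spdk_fsdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *	int rc;
 *
 *	rc = spdk_fsdev_open("myfs0", my_event_cb, my_ctx, &desc);
 *	if (rc == 0) {
 *		ch = spdk_fsdev_get_io_channel(desc);
 *		// ... submit I/O on ch ...
 *		spdk_put_io_channel(ch);
 *		spdk_fsdev_close(desc);
 *	}
 */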
1098 
1099 static void
1100 fsdev_close(struct spdk_fsdev *fsdev, struct spdk_fsdev_desc *desc)
1101 {
1102 	int rc;
1103 
1104 	spdk_spin_lock(&fsdev->internal.spinlock);
1105 	spdk_spin_lock(&desc->spinlock);
1106 
1107 	TAILQ_REMOVE(&fsdev->internal.open_descs, desc, link);
1108 	desc->closed = true;
1109 	if (0 == desc->refs) {
1110 		spdk_spin_unlock(&desc->spinlock);
1111 		fsdev_desc_free(desc);
1112 	} else {
1113 		spdk_spin_unlock(&desc->spinlock);
1114 	}
1115 
1116 	if (fsdev->internal.status == SPDK_FSDEV_STATUS_REMOVING &&
1117 	    TAILQ_EMPTY(&fsdev->internal.open_descs)) {
1118 		rc = fsdev_unregister_unsafe(fsdev);
1119 		spdk_spin_unlock(&fsdev->internal.spinlock);
1120 
1121 		if (rc == 0) {
1122 			spdk_io_device_unregister(__fsdev_to_io_dev(fsdev), fsdev_destroy_cb);
1123 		}
1124 	} else {
1125 		spdk_spin_unlock(&fsdev->internal.spinlock);
1126 	}
1127 }
1128 
1129 void
1130 spdk_fsdev_close(struct spdk_fsdev_desc *desc)
1131 {
1132 	struct spdk_fsdev *fsdev = spdk_fsdev_desc_get_fsdev(desc);
1133 
1134 	SPDK_DEBUGLOG(fsdev, "Closing descriptor %p for fsdev %s on thread %p\n",
1135 		      desc, fsdev->name, spdk_get_thread());
1136 	assert(desc->thread == spdk_get_thread());
1137 	spdk_spin_lock(&g_fsdev_mgr.spinlock);
1138 	fsdev_close(fsdev, desc);
1139 	spdk_spin_unlock(&g_fsdev_mgr.spinlock);
1140 }
1141 
1142 int
1143 spdk_fsdev_register(struct spdk_fsdev *fsdev)
1144 {
1145 	int rc;
1146 
1147 	rc = fsdev_register(fsdev);
1148 	if (rc != 0) {
1149 		return rc;
1150 	}
1151 
1152 	spdk_notify_send("fsdev_register", spdk_fsdev_get_name(fsdev));
1153 	return rc;
1154 }
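
/*
 * Illustrative registration sketch (hypothetical module and helpers); only the
 * spdk_fsdev fields that this file relies on are shown:
 *
 *	fsdev->name = strdup("myfs0");
 *	fsdev->ctxt = my_fs_ctx;		// module-private context
 *	fsdev->module = &my_fs_module;		// the registering spdk_fsdev_module
 *	fsdev->fn_table = &my_fs_fn_table;	// provides destruct, submit_request, get_io_channel
 *	rc = spdk_fsdev_register(fsdev);
 */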
1155 
1156 struct spdk_fsdev *
1157 spdk_fsdev_desc_get_fsdev(struct spdk_fsdev_desc *desc)
1158 {
1159 	assert(desc != NULL);
1160 	return desc->fsdev;
1161 }
1162 
1163 void
1164 spdk_fsdev_module_list_add(struct spdk_fsdev_module *fsdev_module)
1165 {
1166 
1167 	if (spdk_fsdev_module_list_find(fsdev_module->name)) {
1168 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", fsdev_module->name);
1169 		assert(false);
1170 	}
1171 
1172 	TAILQ_INSERT_TAIL(&g_fsdev_mgr.fsdev_modules, fsdev_module, internal.tailq);
1173 }
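
/*
 * Illustrative module definition (hypothetical names); the fields shown are the ones
 * this file invokes. Modules are normally added to the list at startup, typically via
 * a registration macro in fsdev_module.h that ends up calling
 * spdk_fsdev_module_list_add():
 *
 *	static struct spdk_fsdev_module my_fs_module = {
 *		.name = "my_fs",
 *		.module_init = my_fs_module_init,
 *		.module_fini = my_fs_module_fini,
 *		.get_ctx_size = my_fs_get_ctx_size,
 *		.config_json = my_fs_config_json,
 *	};
 */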
1174 
1175 struct spdk_fsdev_module *
1176 spdk_fsdev_module_list_find(const char *name)
1177 {
1178 	struct spdk_fsdev_module *fsdev_module;
1179 
1180 	TAILQ_FOREACH(fsdev_module, &g_fsdev_mgr.fsdev_modules, internal.tailq) {
1181 		if (strcmp(name, fsdev_module->name) == 0) {
1182 			break;
1183 		}
1184 	}
1185 
1186 	return fsdev_module;
1187 }
1188 
1189 SPDK_LOG_REGISTER_COMPONENT(fsdev)
1190