xref: /spdk/lib/fuse_dispatcher/fuse_dispatcher.c (revision 66289a6dbe28217365daa40fd92dcf327871c2e8)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/event.h"
8 #include "spdk/log.h"
9 #include "spdk/string.h"
10 #include "spdk/fsdev.h"
11 #include "spdk/rpc.h"
12 #include "spdk/env.h"
13 #include "spdk/util.h"
14 #include "spdk/thread.h"
15 #include "spdk/likely.h"
16 #include "spdk_internal/fuse_dispatcher.h"
17 #include "linux/fuse_kernel.h"
18 
19 #ifndef UNUSED
20 #define UNUSED(x) (void)(x)
21 #endif
22 
23 /* TODO: values, see https://libfuse.github.io/doxygen/structfuse__conn__info.html */
24 #define DEFAULT_TIME_GRAN 1
25 #define DEFAULT_MAX_BACKGROUND 1024
26 #define DEFAULT_CONGESTION_THRESHOLD 1024
27 #define DEFAULT_MAX_READAHEAD 0x00020000
28 #define OFFSET_MAX 0x7fffffffffffffffLL
29 
30 /*
31  * NOTE: It appeared that the open flags have different values on the different HW architechtures.
32  *
33  * This code handles the open flags translation in case they're originated from a platform with
34  * a different HW architecture.
35  *
36  * Currently supported:
37  *  - X86
38  *  - X86_64
39  *  - ARM
40  *  - ARM64
41  */
42 /* See https://lxr.missinglinkelectronics.com/linux/arch/arm/include/uapi/asm/fcntl.h */
43 #define ARM_O_DIRECTORY      040000 /* must be a directory */
44 #define ARM_O_NOFOLLOW      0100000 /* don't follow links */
45 #define ARM_O_DIRECT        0200000 /* direct disk access hint - currently ignored */
46 #define ARM_O_LARGEFILE     0400000
47 
48 /* See https://lxr.missinglinkelectronics.com/linux/include/uapi/asm-generic/fcntl.h */
49 #define X86_O_DIRECT        00040000        /* direct disk access hint */
50 #define X86_O_LARGEFILE     00100000
51 #define X86_O_DIRECTORY     00200000        /* must be a directory */
52 #define X86_O_NOFOLLOW      00400000        /* don't follow links */
53 
54 static inline bool
55 fsdev_d2h_open_flags(enum spdk_fuse_arch fuse_arch, uint32_t flags, uint32_t *translated_flags)
56 {
57 	bool res = true;
58 
59 	*translated_flags = flags;
60 
61 	/* NOTE: we always check the original flags to avoid situation where the arch and the native flags
62 	 * overlap and previously set native flag could be interpreted as original arch flag.
63 	 */
64 #define REPLACE_FLAG(arch_flag, native_flag) \
65 	do { \
66 		if (flags & (arch_flag)) { \
67 			*translated_flags &= ~(arch_flag); \
68 			*translated_flags |= (native_flag); \
69 		} \
70 	} while(0)
71 
72 	switch (fuse_arch) {
73 	case SPDK_FUSE_ARCH_NATIVE:
74 #if defined(__x86_64__) || defined(__i386__)
75 	case SPDK_FUSE_ARCH_X86:
76 	case SPDK_FUSE_ARCH_X86_64:
77 #endif
78 #if defined(__aarch64__) || defined(__arm__)
79 	case SPDK_FUSE_ARCH_ARM:
80 	case SPDK_FUSE_ARCH_ARM64:
81 #endif
82 		/* No translation required */
83 		break;
84 #if defined(__x86_64__) || defined(__i386__)
85 	case SPDK_FUSE_ARCH_ARM:
86 	case SPDK_FUSE_ARCH_ARM64:
87 		/* Relace the ARM-specific flags with the native ones */
88 		REPLACE_FLAG(ARM_O_DIRECTORY, O_DIRECTORY);
89 		REPLACE_FLAG(ARM_O_NOFOLLOW, O_NOFOLLOW);
90 		REPLACE_FLAG(ARM_O_DIRECT, O_DIRECT);
91 		REPLACE_FLAG(ARM_O_LARGEFILE, O_LARGEFILE);
92 		break;
93 #endif
94 #if defined(__aarch64__) || defined(__arm__)
95 	case SPDK_FUSE_ARCH_X86:
96 	case SPDK_FUSE_ARCH_X86_64:
97 		/* Relace the X86-specific flags with the native ones */
98 		REPLACE_FLAG(X86_O_DIRECTORY, O_DIRECTORY);
99 		REPLACE_FLAG(X86_O_NOFOLLOW, O_NOFOLLOW);
100 		REPLACE_FLAG(X86_O_DIRECT, O_DIRECT);
101 		REPLACE_FLAG(X86_O_LARGEFILE, O_LARGEFILE);
102 		break;
103 #endif
104 	default:
105 		SPDK_ERRLOG("Unsupported FUSE arch: %d\n", fuse_arch);
106 		assert(0);
107 		res = false;
108 		break;
109 	}
110 
111 #undef REPLACE_FLAG
112 
113 	return res;
114 }
115 
116 struct spdk_fuse_mgr {
117 	struct spdk_mempool *fuse_io_pool;
118 	uint32_t ref_cnt;
119 	pthread_mutex_t lock;
120 };
121 
122 static struct spdk_fuse_mgr g_fuse_mgr = {
123 	.fuse_io_pool = NULL,
124 	.ref_cnt = 0,
125 	.lock = PTHREAD_MUTEX_INITIALIZER,
126 };
127 
128 struct fuse_forget_data {
129 	uint64_t ino;
130 	uint64_t nlookup;
131 };
132 
133 struct iov_offs {
134 	size_t iov_offs;
135 	size_t buf_offs;
136 };
137 
138 struct fuse_io {
139 	/** For SG buffer cases, array of iovecs for input. */
140 	struct iovec *in_iov;
141 
142 	/** For SG buffer cases, number of iovecs in in_iov array. */
143 	int in_iovcnt;
144 
145 	/** For SG buffer cases, array of iovecs for output. */
146 	struct iovec *out_iov;
147 
148 	/** For SG buffer cases, number of iovecs in out_iov array. */
149 	int out_iovcnt;
150 
151 	struct iov_offs in_offs;
152 	struct iov_offs out_offs;
153 
154 	spdk_fuse_dispatcher_submit_cpl_cb cpl_cb;
155 	void *cpl_cb_arg;
156 	struct spdk_io_channel *ch;
157 	struct spdk_fuse_dispatcher *disp;
158 
159 	struct fuse_in_header hdr;
160 	bool in_hdr_with_data;
161 
162 	union {
163 		struct {
164 			struct spdk_thread *thread;
165 			struct fuse_init_in *in;
166 			bool legacy_in;
167 			struct spdk_fsdev_mount_opts opts;
168 			size_t out_len;
169 			int error;
170 		} init;
171 		struct {
172 			bool plus;
173 			uint32_t size;
174 			char *writep;
175 			uint32_t bytes_written;
176 		} readdir;
177 		struct {
178 			uint32_t to_forget;
179 			int status;
180 		} batch_forget;
181 
182 		struct {
183 			int status;
184 		} fsdev_close;
185 	} u;
186 };
187 
188 struct spdk_fuse_dispatcher {
189 	/**
190 	 * fsdev descriptor
191 	 */
192 	struct spdk_fsdev_desc *desc;
193 
194 	/**
195 	 * fsdev thread
196 	 */
197 	struct spdk_thread *fsdev_thread;
198 
199 	/**
200 	 * Major version of the protocol (read-only)
201 	 */
202 	unsigned proto_major;
203 
204 	/**
205 	 * Minor version of the protocol (read-only)
206 	 */
207 	unsigned proto_minor;
208 
209 	/**
210 	 * FUSE request source's architecture
211 	 */
212 	enum spdk_fuse_arch fuse_arch;
213 
214 	/**
215 	 * Root file object
216 	 */
217 	struct spdk_fsdev_file_object *root_fobject;
218 
219 	/**
220 	 * Event callback
221 	 */
222 	spdk_fuse_dispatcher_event_cb event_cb;
223 
224 	/**
225 	 * Event callback's context
226 	 */
227 	void *event_ctx;
228 
229 	/**
230 	 * Name of the underlying fsdev
231 	 *
232 	 * NOTE: must be last
233 	 */
234 	char fsdev_name[];
235 };
236 
237 struct spdk_fuse_dispatcher_channel {
238 	struct spdk_io_channel *fsdev_io_ch;
239 };
240 
241 #define __disp_to_io_dev(disp)	(((char *)disp) + 1)
242 #define __disp_from_io_dev(io_dev)	((struct spdk_fuse_dispatcher *)(((char *)io_dev) - 1))
243 #define __disp_ch_from_io_ch(io_ch)	((struct spdk_fuse_dispatcher_channel *)spdk_io_channel_get_ctx(io_ch))
244 
245 static inline const char *
246 fuse_dispatcher_name(struct spdk_fuse_dispatcher *disp)
247 {
248 	return disp->fsdev_name;
249 }
250 
251 static inline uint64_t
252 file_ino(struct fuse_io *fuse_io, const struct spdk_fsdev_file_object *fobject)
253 {
254 	return (fuse_io->disp->root_fobject == fobject) ? FUSE_ROOT_ID : (uint64_t)(uintptr_t)fobject;
255 }
256 
257 static struct spdk_fsdev_file_object *
258 ino_to_object(struct fuse_io *fuse_io, uint64_t ino)
259 {
260 	return (ino == FUSE_ROOT_ID) ?
261 	       fuse_io->disp->root_fobject :
262 	       (struct spdk_fsdev_file_object *)(uintptr_t)ino;
263 }
264 
265 static struct spdk_fsdev_file_object *
266 file_object(struct fuse_io *fuse_io)
267 {
268 	return ino_to_object(fuse_io, fuse_io->hdr.nodeid);
269 }
270 
271 static inline uint64_t
272 file_fh(const struct spdk_fsdev_file_handle *fhandle)
273 {
274 	return (uint64_t)(uintptr_t)fhandle;
275 }
276 
277 static struct spdk_fsdev_file_handle *
278 file_handle(uint64_t fh)
279 {
280 	return (struct spdk_fsdev_file_handle *)(uintptr_t)fh;
281 }
282 
283 static inline uint16_t
284 fsdev_io_d2h_u16(struct fuse_io *fuse_io, uint16_t v)
285 {
286 	return v;
287 }
288 
289 static inline uint16_t
290 fsdev_io_h2d_u16(struct fuse_io *fuse_io, uint16_t v)
291 {
292 	return v;
293 }
294 
295 static inline uint32_t
296 fsdev_io_d2h_u32(struct fuse_io *fuse_io, uint32_t v)
297 {
298 	return v;
299 }
300 
301 static inline uint32_t
302 fsdev_io_h2d_u32(struct fuse_io *fuse_io, uint32_t v)
303 {
304 	return v;
305 }
306 
307 static inline int32_t
308 fsdev_io_h2d_i32(struct fuse_io *fuse_io, int32_t v)
309 {
310 	return v;
311 }
312 
313 static inline uint64_t
314 fsdev_io_d2h_u64(struct fuse_io *fuse_io, uint64_t v)
315 {
316 	return v;
317 }
318 
319 static inline uint64_t
320 fsdev_io_h2d_u64(struct fuse_io *fuse_io, uint64_t v)
321 {
322 	return v;
323 }
324 
325 static inline unsigned
326 fsdev_io_proto_minor(struct fuse_io *fuse_io)
327 {
328 	return fuse_io->disp->proto_minor;
329 }
330 
331 static inline void *
332 _iov_arr_get_buf_info(struct iovec *iovs, size_t cnt, struct iov_offs *offs, size_t *size)
333 {
334 	struct iovec *iov;
335 
336 	assert(offs->iov_offs <= cnt);
337 
338 	if (offs->iov_offs == cnt) {
339 		assert(!offs->buf_offs);
340 		*size = 0;
341 		return NULL;
342 	}
343 
344 	iov = &iovs[offs->iov_offs];
345 
346 	assert(offs->buf_offs < iov->iov_len);
347 
348 	*size = iov->iov_len - offs->buf_offs;
349 
350 	return ((char *)iov->iov_base) + offs->buf_offs;
351 }
352 
353 static inline void *
354 _iov_arr_get_buf(struct iovec *iovs, size_t cnt, struct iov_offs *offs, size_t size,
355 		 const char *direction)
356 {
357 	char *arg_buf;
358 	size_t arg_size;
359 
360 	arg_buf = _iov_arr_get_buf_info(iovs, cnt, offs, &arg_size);
361 	if (!arg_buf) {
362 		SPDK_INFOLOG(fuse_dispatcher, "No %s arg header attached at %zu:%zu\n", direction, offs->iov_offs,
363 			     offs->buf_offs);
364 		return NULL;
365 	}
366 
367 	if (!arg_size) {
368 		SPDK_INFOLOG(fuse_dispatcher, "%s arg of zero length attached at %zu:%zu\n", direction,
369 			     offs->iov_offs, offs->buf_offs);
370 		return NULL;
371 	}
372 
373 	if (size > arg_size) {
374 		SPDK_INFOLOG(fuse_dispatcher, "%s arg is too small (%zu > %zu) at %zu:%zu\n", direction, size,
375 			     arg_size, offs->iov_offs, offs->buf_offs);
376 		return NULL;
377 	}
378 
379 	if (size == arg_size) {
380 		offs->iov_offs++;
381 		offs->buf_offs = 0;
382 	} else {
383 		offs->buf_offs += size;
384 	}
385 
386 	return arg_buf;
387 }
388 
389 static inline const char *
390 _fsdev_io_in_arg_get_str(struct fuse_io *fuse_io)
391 {
392 	char *arg_buf;
393 	size_t arg_size, len;
394 
395 	arg_buf = _iov_arr_get_buf_info(fuse_io->in_iov, fuse_io->in_iovcnt, &fuse_io->in_offs,
396 					&arg_size);
397 	if (!arg_buf) {
398 		SPDK_ERRLOG("No IN arg header attached at %zu:%zu\n", fuse_io->in_offs.iov_offs,
399 			    fuse_io->in_offs.buf_offs);
400 		return NULL;
401 	}
402 
403 	len = strnlen(arg_buf, arg_size);
404 	if (len == arg_size) {
405 		SPDK_ERRLOG("no string or bad string attached at %zu:%zu\n", fuse_io->in_offs.iov_offs,
406 			    fuse_io->in_offs.buf_offs);
407 		return NULL;
408 	}
409 
410 	fuse_io->in_offs.buf_offs += len + 1;
411 
412 	if (len + 1 == arg_size) {
413 		fuse_io->in_offs.iov_offs++;
414 		fuse_io->in_offs.buf_offs = 0;
415 	}
416 
417 	return arg_buf;
418 }
419 
420 static inline void *
421 _fsdev_io_in_arg_get_buf(struct fuse_io *fuse_io, size_t size)
422 {
423 	return _iov_arr_get_buf(fuse_io->in_iov, fuse_io->in_iovcnt, &fuse_io->in_offs, size, "IN");
424 }
425 
426 
427 static inline void *
428 _fsdev_io_out_arg_get_buf(struct fuse_io *fuse_io, size_t size)
429 {
430 	return _iov_arr_get_buf(fuse_io->out_iov, fuse_io->out_iovcnt, &fuse_io->out_offs, size,
431 				"OUT");
432 }
433 
434 static bool
435 _fuse_op_requires_reply(uint32_t opcode)
436 {
437 	switch (opcode) {
438 	case FUSE_FORGET:
439 	case FUSE_BATCH_FORGET:
440 		return false;
441 	default:
442 		return true;
443 	}
444 }
445 
446 static void
447 convert_stat(struct fuse_io *fuse_io, struct spdk_fsdev_file_object *fobject,
448 	     const struct spdk_fsdev_file_attr *attr, struct fuse_attr *fattr)
449 {
450 	fattr->ino	= fsdev_io_h2d_u64(fuse_io, attr->ino);
451 	fattr->mode	= fsdev_io_h2d_u32(fuse_io, attr->mode);
452 	fattr->nlink	= fsdev_io_h2d_u32(fuse_io, attr->nlink);
453 	fattr->uid	= fsdev_io_h2d_u32(fuse_io, attr->uid);
454 	fattr->gid	= fsdev_io_h2d_u32(fuse_io, attr->gid);
455 	fattr->rdev	= fsdev_io_h2d_u32(fuse_io, attr->rdev);
456 	fattr->size	= fsdev_io_h2d_u64(fuse_io, attr->size);
457 	fattr->blksize	= fsdev_io_h2d_u32(fuse_io, attr->blksize);
458 	fattr->blocks	= fsdev_io_h2d_u64(fuse_io, attr->blocks);
459 	fattr->atime	= fsdev_io_h2d_u64(fuse_io, attr->atime);
460 	fattr->mtime	= fsdev_io_h2d_u64(fuse_io, attr->mtime);
461 	fattr->ctime	= fsdev_io_h2d_u64(fuse_io, attr->ctime);
462 	fattr->atimensec = fsdev_io_h2d_u32(fuse_io, attr->atimensec);
463 	fattr->mtimensec = fsdev_io_h2d_u32(fuse_io, attr->mtimensec);
464 	fattr->ctimensec = fsdev_io_h2d_u32(fuse_io, attr->ctimensec);
465 }
466 
467 static uint32_t
468 calc_timeout_sec(uint32_t ms)
469 {
470 	return ms / 1000;
471 }
472 
473 static uint32_t
474 calc_timeout_nsec(uint32_t ms)
475 {
476 	return (ms % 1000) * 1000000;
477 }
478 
479 static void
480 fill_entry(struct fuse_io *fuse_io, struct fuse_entry_out *arg,
481 	   struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr)
482 {
483 	arg->nodeid = fsdev_io_h2d_u64(fuse_io, file_ino(fuse_io, fobject));
484 	arg->generation = 0;
485 	arg->entry_valid = fsdev_io_h2d_u64(fuse_io, calc_timeout_sec(attr->valid_ms));
486 	arg->entry_valid_nsec = fsdev_io_h2d_u32(fuse_io, calc_timeout_nsec(attr->valid_ms));
487 	arg->attr_valid = fsdev_io_h2d_u64(fuse_io, calc_timeout_sec(attr->valid_ms));
488 	arg->attr_valid_nsec = fsdev_io_h2d_u32(fuse_io, calc_timeout_nsec(attr->valid_ms));
489 	convert_stat(fuse_io, fobject, attr, &arg->attr);
490 }
491 
492 static void
493 fill_open(struct fuse_io *fuse_io, struct fuse_open_out *arg,
494 	  struct spdk_fsdev_file_handle *fhandle)
495 {
496 	arg->fh = fsdev_io_h2d_u64(fuse_io, file_fh(fhandle));
497 	arg->open_flags = fsdev_io_h2d_u64(fuse_io, FOPEN_DIRECT_IO);
498 }
499 
500 static void
501 convert_statfs(struct fuse_io *fuse_io, const struct spdk_fsdev_file_statfs *statfs,
502 	       struct fuse_kstatfs *kstatfs)
503 {
504 	kstatfs->bsize	 = fsdev_io_h2d_u32(fuse_io, statfs->bsize);
505 	kstatfs->frsize	 = fsdev_io_h2d_u32(fuse_io, statfs->frsize);
506 	kstatfs->blocks	 = fsdev_io_h2d_u64(fuse_io, statfs->blocks);
507 	kstatfs->bfree	 = fsdev_io_h2d_u64(fuse_io, statfs->bfree);
508 	kstatfs->bavail	 = fsdev_io_h2d_u64(fuse_io, statfs->bavail);
509 	kstatfs->files	 = fsdev_io_h2d_u64(fuse_io, statfs->files);
510 	kstatfs->ffree	 = fsdev_io_h2d_u64(fuse_io, statfs->ffree);
511 	kstatfs->namelen = fsdev_io_h2d_u32(fuse_io, statfs->namelen);
512 }
513 
514 static struct fuse_out_header *
515 fuse_dispatcher_fill_out_hdr(struct fuse_io *fuse_io, size_t out_len, int error)
516 {
517 	struct fuse_out_header *hdr;
518 	struct iovec *out;
519 	uint32_t len;
520 
521 	assert(fuse_io->out_iovcnt >= 1);
522 	assert(error <= 0);
523 
524 	out = fuse_io->out_iov;
525 
526 	if (out->iov_len < sizeof(*hdr)) {
527 		SPDK_ERRLOG("Bad out header len: %zu < %zu\n", out->iov_len, sizeof(*hdr));
528 		return NULL;
529 	}
530 
531 	if (error < -1000) {
532 		SPDK_ERRLOG("Bad completion error value: %" PRIu32 "\n", error);
533 		return NULL;
534 	}
535 
536 	len = sizeof(*hdr) + out_len;
537 
538 	hdr = out->iov_base;
539 	memset(hdr, 0, sizeof(*hdr));
540 
541 	hdr->unique = fsdev_io_h2d_u64(fuse_io, fuse_io->hdr.unique);
542 	hdr->error = fsdev_io_h2d_i32(fuse_io, error);
543 	hdr->len = fsdev_io_h2d_u32(fuse_io, len);
544 
545 	return hdr;
546 }
547 
548 static void
549 fuse_dispatcher_io_complete_final(struct fuse_io *fuse_io, int error)
550 {
551 	spdk_fuse_dispatcher_submit_cpl_cb cpl_cb = fuse_io->cpl_cb;
552 	void *cpl_cb_arg = fuse_io->cpl_cb_arg;
553 
554 	/* NOTE: it's important to free fuse_io before the completion callback,
555 	 * as the callback can destroy the dispatcher
556 	 */
557 	spdk_mempool_put(g_fuse_mgr.fuse_io_pool, fuse_io);
558 
559 	cpl_cb(cpl_cb_arg, error);
560 }
561 
562 static void
563 fuse_dispatcher_io_complete(struct fuse_io *fuse_io, uint32_t out_len, int error)
564 {
565 	struct fuse_out_header *hdr = fuse_dispatcher_fill_out_hdr(fuse_io, out_len, error);
566 
567 	assert(_fuse_op_requires_reply(fuse_io->hdr.opcode));
568 
569 	if (!hdr) {
570 		SPDK_ERRLOG("Completion failed: cannot fill out header\n");
571 		return;
572 	}
573 
574 	SPDK_DEBUGLOG(fuse_dispatcher,
575 		      "Completing IO#%" PRIu64 " (err=%d, out_len=%" PRIu32 ")\n",
576 		      fuse_io->hdr.unique, error, out_len);
577 
578 	fuse_dispatcher_io_complete_final(fuse_io, error);
579 }
580 
581 static void
582 fuse_dispatcher_io_copy_and_complete(struct fuse_io *fuse_io, const void *out, uint32_t out_len,
583 				     int error)
584 {
585 	if (out && out_len) {
586 		void *buf = _fsdev_io_out_arg_get_buf(fuse_io, out_len);
587 		if (buf) {
588 			memcpy(buf, out, out_len);
589 		} else {
590 			SPDK_ERRLOG("Completion failed: cannot get buf to copy %" PRIu32 " bytes\n", out_len);
591 			error = EINVAL;
592 			out_len = 0;
593 		}
594 	}
595 
596 	fuse_dispatcher_io_complete(fuse_io, out_len, error);
597 }
598 
599 static void
600 fuse_dispatcher_io_complete_none(struct fuse_io *fuse_io, int err)
601 {
602 	SPDK_DEBUGLOG(fuse_dispatcher, "Completing IO#%" PRIu64 "(err=%d)\n",
603 		      fuse_io->hdr.unique, err);
604 	fuse_dispatcher_io_complete_final(fuse_io, err);
605 }
606 
607 static void
608 fuse_dispatcher_io_complete_ok(struct fuse_io *fuse_io, uint32_t out_len)
609 {
610 	fuse_dispatcher_io_complete(fuse_io, out_len, 0);
611 }
612 
613 static void
614 fuse_dispatcher_io_complete_err(struct fuse_io *fuse_io, int err)
615 {
616 	fuse_dispatcher_io_complete(fuse_io, 0, err);
617 }
618 
619 static void
620 fuse_dispatcher_io_complete_entry(struct fuse_io *fuse_io, struct spdk_fsdev_file_object *fobject,
621 				  const struct spdk_fsdev_file_attr *attr)
622 {
623 	struct fuse_entry_out arg;
624 	size_t size = fsdev_io_proto_minor(fuse_io) < 9 ?
625 		      FUSE_COMPAT_ENTRY_OUT_SIZE : sizeof(arg);
626 
627 	memset(&arg, 0, sizeof(arg));
628 	fill_entry(fuse_io, &arg, fobject, attr);
629 
630 	fuse_dispatcher_io_copy_and_complete(fuse_io, &arg, size, 0);
631 }
632 
633 static void
634 fuse_dispatcher_io_complete_open(struct fuse_io *fuse_io, struct spdk_fsdev_file_handle *fhandle)
635 {
636 	struct fuse_open_out *arg;
637 
638 	arg = _fsdev_io_out_arg_get_buf(fuse_io, sizeof(*arg));
639 	if (!arg) {
640 		SPDK_ERRLOG("Cannot get fuse_open_out\n");
641 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
642 		return;
643 	}
644 
645 	fill_open(fuse_io, arg, fhandle);
646 
647 	fuse_dispatcher_io_complete_ok(fuse_io, sizeof(*arg));
648 }
649 
650 static void
651 fuse_dispatcher_io_complete_create(struct fuse_io *fuse_io, struct spdk_fsdev_file_object *fobject,
652 				   const struct spdk_fsdev_file_attr *attr,
653 				   struct spdk_fsdev_file_handle *fhandle)
654 {
655 	char buf[sizeof(struct fuse_entry_out) + sizeof(struct fuse_open_out)];
656 	size_t entrysize = fsdev_io_proto_minor(fuse_io) < 9 ?
657 			   FUSE_COMPAT_ENTRY_OUT_SIZE : sizeof(struct fuse_entry_out);
658 	struct fuse_entry_out *earg = (struct fuse_entry_out *) buf;
659 	struct fuse_open_out *oarg = (struct fuse_open_out *)(buf + entrysize);
660 
661 	memset(buf, 0, sizeof(buf));
662 	fill_entry(fuse_io, earg, fobject, attr);
663 	fill_open(fuse_io, oarg, fhandle);
664 
665 	fuse_dispatcher_io_copy_and_complete(fuse_io, buf, entrysize + sizeof(struct fuse_open_out), 0);
666 }
667 
668 static void
669 fuse_dispatcher_io_complete_xattr(struct fuse_io *fuse_io, uint32_t count)
670 {
671 	struct fuse_getxattr_out *arg;
672 
673 	arg = _fsdev_io_out_arg_get_buf(fuse_io, sizeof(*arg));
674 	if (!arg) {
675 		SPDK_ERRLOG("Cannot get fuse_getxattr_out\n");
676 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
677 		return;
678 	}
679 
680 	arg->size = fsdev_io_h2d_i32(fuse_io, count);
681 
682 	fuse_dispatcher_io_complete_ok(fuse_io, sizeof(*arg));
683 }
684 
685 static void
686 fuse_dispatcher_io_complete_write(struct fuse_io *fuse_io, uint32_t data_size, int error)
687 {
688 	struct fuse_write_out *arg;
689 
690 	arg = _fsdev_io_out_arg_get_buf(fuse_io, sizeof(*arg));
691 	if (!arg) {
692 		SPDK_ERRLOG("Cannot get fuse_write_out\n");
693 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
694 		return;
695 	}
696 
697 	arg->size = fsdev_io_d2h_u32(fuse_io, data_size);
698 
699 	fuse_dispatcher_io_complete(fuse_io, sizeof(*arg), error);
700 }
701 
702 static void
703 fuse_dispatcher_io_complete_statfs(struct fuse_io *fuse_io,
704 				   const struct spdk_fsdev_file_statfs *statfs)
705 {
706 	struct fuse_statfs_out arg;
707 	size_t size = fsdev_io_proto_minor(fuse_io) < 4 ?
708 		      FUSE_COMPAT_STATFS_SIZE : sizeof(arg);
709 
710 	memset(&arg, 0, sizeof(arg));
711 	convert_statfs(fuse_io, statfs, &arg.st);
712 
713 	return fuse_dispatcher_io_copy_and_complete(fuse_io, &arg, size, 0);
714 }
715 
716 static void
717 fuse_dispatcher_io_complete_attr(struct fuse_io *fuse_io, const struct spdk_fsdev_file_attr *attr)
718 {
719 	struct fuse_attr_out arg;
720 	size_t size = fsdev_io_proto_minor(fuse_io) < 9 ?
721 		      FUSE_COMPAT_ATTR_OUT_SIZE : sizeof(arg);
722 
723 	memset(&arg, 0, sizeof(arg));
724 	arg.attr_valid = fsdev_io_h2d_u64(fuse_io, calc_timeout_sec(attr->valid_ms));
725 	arg.attr_valid_nsec = fsdev_io_h2d_u32(fuse_io, calc_timeout_nsec(attr->valid_ms));
726 	convert_stat(fuse_io, file_object(fuse_io), attr, &arg.attr);
727 
728 	fuse_dispatcher_io_copy_and_complete(fuse_io, &arg, size, 0);
729 }
730 
731 /* `buf` is allowed to be empty so that the proper size may be
732    allocated by the caller */
733 static size_t
734 fuse_dispatcher_add_direntry(struct fuse_io *fuse_io, char *buf, size_t bufsize,
735 			     const char *name, struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr,
736 			     off_t off)
737 {
738 	size_t namelen;
739 	size_t entlen;
740 	size_t entlen_padded;
741 	struct fuse_dirent *dirent;
742 
743 	namelen = strlen(name);
744 	entlen = FUSE_NAME_OFFSET + namelen;
745 	entlen_padded = FUSE_DIRENT_ALIGN(entlen);
746 
747 	if ((buf == NULL) || (entlen_padded > bufsize)) {
748 		return entlen_padded;
749 	}
750 
751 	dirent = (struct fuse_dirent *) buf;
752 	dirent->ino = file_ino(fuse_io, fobject);
753 	dirent->off = fsdev_io_h2d_u64(fuse_io, off);
754 	dirent->namelen = fsdev_io_h2d_u32(fuse_io, namelen);
755 	dirent->type = fsdev_io_h2d_u32(fuse_io, (attr->mode & 0170000) >> 12);
756 	memcpy(dirent->name, name, namelen);
757 	memset(dirent->name + namelen, 0, entlen_padded - entlen);
758 
759 	return entlen_padded;
760 }
761 
762 /* `buf` is allowed to be empty so that the proper size may be
763    allocated by the caller */
764 static size_t
765 fuse_dispatcher_add_direntry_plus(struct fuse_io *fuse_io, char *buf, size_t bufsize,
766 				  const char *name, struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr,
767 				  off_t off)
768 {
769 	size_t namelen;
770 	size_t entlen;
771 	size_t entlen_padded;
772 
773 	namelen = strlen(name);
774 	entlen = FUSE_NAME_OFFSET_DIRENTPLUS + namelen;
775 	entlen_padded = FUSE_DIRENT_ALIGN(entlen);
776 	if ((buf == NULL) || (entlen_padded > bufsize)) {
777 		return entlen_padded;
778 	}
779 
780 	struct fuse_direntplus *dp = (struct fuse_direntplus *) buf;
781 	memset(&dp->entry_out, 0, sizeof(dp->entry_out));
782 	fill_entry(fuse_io, &dp->entry_out, fobject, attr);
783 
784 	struct fuse_dirent *dirent = &dp->dirent;
785 	dirent->ino = fsdev_io_h2d_u64(fuse_io, attr->ino);
786 	dirent->off = fsdev_io_h2d_u64(fuse_io, off);
787 	dirent->namelen = fsdev_io_h2d_u32(fuse_io, namelen);
788 	dirent->type = fsdev_io_h2d_u32(fuse_io, (attr->mode & 0170000) >> 12);
789 	memcpy(dirent->name, name, namelen);
790 	memset(dirent->name + namelen, 0, entlen_padded - entlen);
791 
792 	return entlen_padded;
793 }
794 
795 /*
796  * Static FUSE commands handlers
797  */
798 static inline struct spdk_fsdev_desc *
799 fuse_io_desc(struct fuse_io *fuse_io)
800 {
801 	return fuse_io->disp->desc;
802 }
803 
804 static void
805 do_lookup_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
806 		  struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr)
807 {
808 	struct fuse_io *fuse_io = cb_arg;
809 
810 	if (!status) {
811 		fuse_dispatcher_io_complete_entry(fuse_io, fobject, attr);
812 	} else {
813 		fuse_dispatcher_io_complete_err(fuse_io, status);
814 	}
815 }
816 
817 static void
818 do_lookup(struct fuse_io *fuse_io)
819 {
820 	int err;
821 	const char *name = _fsdev_io_in_arg_get_str(fuse_io);
822 	if (!name) {
823 		SPDK_ERRLOG("No name or bad name attached\n");
824 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
825 		return;
826 	}
827 
828 	err = spdk_fsdev_lookup(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
829 				file_object(fuse_io), name, do_lookup_cpl_clb, fuse_io);
830 	if (err) {
831 		fuse_dispatcher_io_complete_err(fuse_io, err);
832 	}
833 }
834 
835 static void
836 do_forget_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
837 {
838 	struct fuse_io *fuse_io = cb_arg;
839 
840 	fuse_dispatcher_io_complete_none(fuse_io, status); /* FUSE_FORGET requires no response */
841 }
842 
843 static void
844 do_forget(struct fuse_io *fuse_io)
845 {
846 	int err;
847 	struct fuse_forget_in *arg;
848 
849 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
850 	if (!arg) {
851 		SPDK_ERRLOG("Cannot get fuse_forget_in\n");
852 		fuse_dispatcher_io_complete_none(fuse_io, EINVAL); /* FUSE_FORGET requires no response */
853 		return;
854 	}
855 
856 	err = spdk_fsdev_forget(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
857 				file_object(fuse_io), fsdev_io_d2h_u64(fuse_io, arg->nlookup),
858 				do_forget_cpl_clb, fuse_io);
859 	if (err) {
860 		fuse_dispatcher_io_complete_err(fuse_io, err);
861 	}
862 }
863 
864 static void
865 do_getattr_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
866 		   const struct spdk_fsdev_file_attr *attr)
867 {
868 	struct fuse_io *fuse_io = cb_arg;
869 
870 	if (!status) {
871 		fuse_dispatcher_io_complete_attr(fuse_io, attr);
872 	} else {
873 		fuse_dispatcher_io_complete_err(fuse_io, status);
874 	}
875 }
876 
877 static void
878 do_getattr(struct fuse_io *fuse_io)
879 {
880 	int err;
881 	uint64_t fh = 0;
882 
883 	if (fsdev_io_proto_minor(fuse_io) >= 9) {
884 		struct fuse_getattr_in *arg;
885 
886 		arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
887 		if (!arg) {
888 			SPDK_ERRLOG("Cannot get fuse_getattr_in\n");
889 			fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
890 			return;
891 		}
892 
893 		if (fsdev_io_d2h_u64(fuse_io, arg->getattr_flags) & FUSE_GETATTR_FH) {
894 			fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
895 		}
896 	}
897 
898 	err = spdk_fsdev_getattr(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
899 				 file_object(fuse_io), file_handle(fh), do_getattr_cpl_clb, fuse_io);
900 	if (err) {
901 		fuse_dispatcher_io_complete_err(fuse_io, err);
902 	}
903 }
904 
905 static void
906 do_setattr_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
907 		   const struct spdk_fsdev_file_attr *attr)
908 {
909 	struct fuse_io *fuse_io = cb_arg;
910 
911 	if (!status) {
912 		fuse_dispatcher_io_complete_attr(fuse_io, attr);
913 	} else {
914 		fuse_dispatcher_io_complete_err(fuse_io, status);
915 	}
916 }
917 
918 static void
919 do_setattr(struct fuse_io *fuse_io)
920 {
921 	int err;
922 	struct fuse_setattr_in *arg;
923 	uint32_t valid;
924 	uint64_t fh = 0;
925 	struct spdk_fsdev_file_attr attr;
926 
927 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
928 	if (!arg) {
929 		SPDK_ERRLOG("Cannot get fuse_setattr_in\n");
930 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
931 		return;
932 	}
933 
934 	memset(&attr, 0, sizeof(attr));
935 	attr.mode      = fsdev_io_d2h_u32(fuse_io, arg->mode);
936 	attr.uid       = fsdev_io_d2h_u32(fuse_io, arg->uid);
937 	attr.gid       = fsdev_io_d2h_u32(fuse_io, arg->gid);
938 	attr.size      = fsdev_io_d2h_u64(fuse_io, arg->size);
939 	attr.atime     = fsdev_io_d2h_u64(fuse_io, arg->atime);
940 	attr.mtime     = fsdev_io_d2h_u64(fuse_io, arg->mtime);
941 	attr.ctime     = fsdev_io_d2h_u64(fuse_io, arg->ctime);
942 	attr.atimensec = fsdev_io_d2h_u32(fuse_io, arg->atimensec);
943 	attr.mtimensec = fsdev_io_d2h_u32(fuse_io, arg->mtimensec);
944 	attr.ctimensec = fsdev_io_d2h_u32(fuse_io, arg->ctimensec);
945 
946 	valid = fsdev_io_d2h_u64(fuse_io, arg->valid);
947 	if (valid & FATTR_FH) {
948 		valid &= ~FATTR_FH;
949 		fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
950 	}
951 
952 	valid &=
953 		FSDEV_SET_ATTR_MODE |
954 		FSDEV_SET_ATTR_UID |
955 		FSDEV_SET_ATTR_GID |
956 		FSDEV_SET_ATTR_SIZE |
957 		FSDEV_SET_ATTR_ATIME |
958 		FSDEV_SET_ATTR_MTIME |
959 		FSDEV_SET_ATTR_ATIME_NOW |
960 		FSDEV_SET_ATTR_MTIME_NOW |
961 		FSDEV_SET_ATTR_CTIME;
962 
963 	err = spdk_fsdev_setattr(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
964 				 file_object(fuse_io), file_handle(fh), &attr, valid,
965 				 do_setattr_cpl_clb, fuse_io);
966 	if (err) {
967 		fuse_dispatcher_io_complete_err(fuse_io, err);
968 	}
969 }
970 
971 static void
972 do_readlink_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status, const char *linkname)
973 {
974 	struct fuse_io *fuse_io = cb_arg;
975 
976 	if (!status) {
977 		fuse_dispatcher_io_copy_and_complete(fuse_io, linkname, strlen(linkname) + 1, 0);
978 	} else {
979 		fuse_dispatcher_io_complete_err(fuse_io, status);
980 	}
981 }
982 
983 static void
984 do_readlink(struct fuse_io *fuse_io)
985 {
986 	int err;
987 
988 	err = spdk_fsdev_readlink(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
989 				  file_object(fuse_io), do_readlink_cpl_clb, fuse_io);
990 	if (err) {
991 		fuse_dispatcher_io_complete_err(fuse_io, err);
992 	}
993 }
994 
995 static void
996 do_symlink_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
997 		   struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr)
998 {
999 	struct fuse_io *fuse_io = cb_arg;
1000 
1001 	if (!status) {
1002 		fuse_dispatcher_io_complete_entry(fuse_io, fobject, attr);
1003 	} else {
1004 		fuse_dispatcher_io_complete_err(fuse_io, status);
1005 	}
1006 }
1007 
1008 static void
1009 do_symlink(struct fuse_io *fuse_io)
1010 {
1011 	int err;
1012 	const char *name, *linkname;
1013 
1014 	name = _fsdev_io_in_arg_get_str(fuse_io);
1015 	if (!name) {
1016 		SPDK_ERRLOG("Cannot get name\n");
1017 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1018 		return;
1019 	}
1020 
1021 	linkname = _fsdev_io_in_arg_get_str(fuse_io);
1022 	if (!linkname) {
1023 		SPDK_ERRLOG("Cannot get linkname\n");
1024 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1025 		return;
1026 	}
1027 
1028 	err = spdk_fsdev_symlink(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1029 				 file_object(fuse_io), name, linkname, fuse_io->hdr.uid, fuse_io->hdr.gid,
1030 				 do_symlink_cpl_clb, fuse_io);
1031 	if (err) {
1032 		fuse_dispatcher_io_complete_err(fuse_io, err);
1033 	}
1034 }
1035 
1036 static void
1037 do_mknod_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
1038 		 struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr)
1039 {
1040 	struct fuse_io *fuse_io = cb_arg;
1041 
1042 	if (!status) {
1043 		fuse_dispatcher_io_complete_entry(fuse_io, fobject, attr);
1044 	} else {
1045 		fuse_dispatcher_io_complete_err(fuse_io, status);
1046 	}
1047 }
1048 
1049 static void
1050 do_mknod(struct fuse_io *fuse_io)
1051 {
1052 	int err;
1053 	bool compat = fsdev_io_proto_minor(fuse_io) < 12;
1054 	struct fuse_mknod_in *arg;
1055 	const char *name;
1056 
1057 	arg = _fsdev_io_in_arg_get_buf(fuse_io, compat ? FUSE_COMPAT_MKNOD_IN_SIZE : sizeof(*arg));
1058 	if (!arg) {
1059 		SPDK_ERRLOG("Cannot get fuse_mknod_in (compat=%d)\n", compat);
1060 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1061 		return;
1062 	}
1063 
1064 	name = _fsdev_io_in_arg_get_str(fuse_io);
1065 	if (!name) {
1066 		SPDK_ERRLOG("Cannot get name (compat=%d)\n", compat);
1067 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1068 		return;
1069 	}
1070 
1071 	err = spdk_fsdev_mknod(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1072 			       file_object(fuse_io), name, fsdev_io_d2h_u32(fuse_io, arg->mode),
1073 			       fsdev_io_d2h_u32(fuse_io, arg->rdev), fuse_io->hdr.uid, fuse_io->hdr.gid,
1074 			       do_mknod_cpl_clb, fuse_io);
1075 	if (err) {
1076 		fuse_dispatcher_io_complete_err(fuse_io, err);
1077 	}
1078 }
1079 
1080 static void
1081 do_mkdir_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
1082 		 struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr)
1083 {
1084 	struct fuse_io *fuse_io = cb_arg;
1085 
1086 	if (!status) {
1087 		fuse_dispatcher_io_complete_entry(fuse_io, fobject, attr);
1088 	} else {
1089 		fuse_dispatcher_io_complete_err(fuse_io, status);
1090 	}
1091 }
1092 
1093 static void
1094 do_mkdir(struct fuse_io *fuse_io)
1095 {
1096 	int err;
1097 	bool compat = fsdev_io_proto_minor(fuse_io) < 12;
1098 	struct fuse_mkdir_in *arg;
1099 	const char *name;
1100 
1101 	arg = _fsdev_io_in_arg_get_buf(fuse_io, compat ? sizeof(uint32_t) : sizeof(*arg));
1102 	if (!arg) {
1103 		SPDK_ERRLOG("Cannot get fuse_mkdir_in (compat=%d)\n", compat);
1104 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1105 		return;
1106 	}
1107 
1108 	name = _fsdev_io_in_arg_get_str(fuse_io);
1109 	if (!name) {
1110 		SPDK_ERRLOG("Cannot get name (compat=%d)\n", compat);
1111 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1112 		return;
1113 	}
1114 
1115 	err = spdk_fsdev_mkdir(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1116 			       file_object(fuse_io), name, fsdev_io_d2h_u32(fuse_io, arg->mode),
1117 			       fuse_io->hdr.uid, fuse_io->hdr.gid, do_mkdir_cpl_clb, fuse_io);
1118 	if (err) {
1119 		fuse_dispatcher_io_complete_err(fuse_io, err);
1120 	}
1121 }
1122 
1123 static void
1124 do_unlink_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1125 {
1126 	struct fuse_io *fuse_io = cb_arg;
1127 
1128 	fuse_dispatcher_io_complete_err(fuse_io, status);
1129 }
1130 
1131 static void
1132 do_unlink(struct fuse_io *fuse_io)
1133 {
1134 	int err;
1135 	const char *name;
1136 
1137 	name = _fsdev_io_in_arg_get_str(fuse_io);
1138 	if (!name) {
1139 		SPDK_ERRLOG("Cannot get name\n");
1140 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1141 		return;
1142 	}
1143 
1144 	err = spdk_fsdev_unlink(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1145 				file_object(fuse_io), name, do_unlink_cpl_clb, fuse_io);
1146 	if (err) {
1147 		fuse_dispatcher_io_complete_err(fuse_io, err);
1148 	}
1149 }
1150 
1151 static void
1152 do_rmdir_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1153 {
1154 	struct fuse_io *fuse_io = cb_arg;
1155 
1156 	fuse_dispatcher_io_complete_err(fuse_io, status);
1157 }
1158 
1159 static void
1160 do_rmdir(struct fuse_io *fuse_io)
1161 {
1162 	int err;
1163 	const char *name;
1164 
1165 	name = _fsdev_io_in_arg_get_str(fuse_io);
1166 	if (!name) {
1167 		SPDK_ERRLOG("Cannot get name\n");
1168 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1169 		return;
1170 	}
1171 
1172 	err = spdk_fsdev_rmdir(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1173 			       file_object(fuse_io), name, do_rmdir_cpl_clb, fuse_io);
1174 	if (err) {
1175 		fuse_dispatcher_io_complete_err(fuse_io, err);
1176 	}
1177 }
1178 
1179 static void
1180 do_rename_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1181 {
1182 	struct fuse_io *fuse_io = cb_arg;
1183 
1184 	fuse_dispatcher_io_complete_err(fuse_io, status);
1185 }
1186 
1187 static void
1188 do_rename_common(struct fuse_io *fuse_io, bool version2)
1189 {
1190 	int err;
1191 	uint64_t newdir;
1192 	const char *oldname;
1193 	const char *newname;
1194 	uint32_t flags = 0;
1195 
1196 	if (!version2) {
1197 		struct fuse_rename_in *arg;
1198 		arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1199 		if (!arg) {
1200 			SPDK_ERRLOG("Cannot get fuse_rename_in\n");
1201 			fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1202 			return;
1203 		}
1204 		newdir = fsdev_io_d2h_u64(fuse_io, arg->newdir);
1205 	} else {
1206 		struct fuse_rename2_in *arg;
1207 		arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1208 		if (!arg) {
1209 			SPDK_ERRLOG("Cannot get fuse_rename2_in\n");
1210 			fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1211 			return;
1212 		}
1213 		newdir = fsdev_io_d2h_u64(fuse_io, arg->newdir);
1214 		flags = fsdev_io_d2h_u64(fuse_io, arg->flags);
1215 	}
1216 
1217 	oldname = _fsdev_io_in_arg_get_str(fuse_io);
1218 	if (!oldname) {
1219 		SPDK_ERRLOG("Cannot get oldname\n");
1220 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1221 		return;
1222 	}
1223 
1224 	newname = _fsdev_io_in_arg_get_str(fuse_io);
1225 	if (!newname) {
1226 		SPDK_ERRLOG("Cannot get newname\n");
1227 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1228 		return;
1229 	}
1230 
1231 	err = spdk_fsdev_rename(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1232 				file_object(fuse_io), oldname, ino_to_object(fuse_io, newdir),
1233 				newname, flags, do_rename_cpl_clb, fuse_io);
1234 	if (err) {
1235 		fuse_dispatcher_io_complete_err(fuse_io, err);
1236 	}
1237 }
1238 
1239 static void
1240 do_rename(struct fuse_io *fuse_io)
1241 {
1242 	do_rename_common(fuse_io, false);
1243 }
1244 
1245 static void
1246 do_rename2(struct fuse_io *fuse_io)
1247 {
1248 	do_rename_common(fuse_io, true);
1249 }
1250 
1251 static void
1252 do_link_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
1253 		struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr)
1254 {
1255 	struct fuse_io *fuse_io = cb_arg;
1256 
1257 	if (!status) {
1258 		fuse_dispatcher_io_complete_entry(fuse_io, fobject, attr);
1259 	} else {
1260 		fuse_dispatcher_io_complete_err(fuse_io, status);
1261 	}
1262 }
1263 
1264 static void
1265 do_link(struct fuse_io *fuse_io)
1266 {
1267 	int err;
1268 	struct fuse_link_in *arg;
1269 	const char *name;
1270 	uint64_t oldnodeid;
1271 
1272 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1273 	if (!arg) {
1274 		SPDK_ERRLOG("Cannot get fuse_link_in\n");
1275 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1276 		return;
1277 	}
1278 
1279 	name = _fsdev_io_in_arg_get_str(fuse_io);
1280 	if (!name) {
1281 		SPDK_ERRLOG("Cannot get name\n");
1282 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1283 		return;
1284 	}
1285 
1286 	oldnodeid = fsdev_io_d2h_u64(fuse_io, arg->oldnodeid);
1287 
1288 	err = spdk_fsdev_link(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1289 			      ino_to_object(fuse_io, oldnodeid), file_object(fuse_io), name,
1290 			      do_link_cpl_clb, fuse_io);
1291 	if (err) {
1292 		fuse_dispatcher_io_complete_err(fuse_io, err);
1293 	}
1294 }
1295 
1296 static void
1297 do_fopen_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
1298 		 struct spdk_fsdev_file_handle *fhandle)
1299 {
1300 	struct fuse_io *fuse_io = cb_arg;
1301 
1302 	if (!status) {
1303 		fuse_dispatcher_io_complete_open(fuse_io, fhandle);
1304 	} else {
1305 		fuse_dispatcher_io_complete_err(fuse_io, status);
1306 	}
1307 }
1308 
1309 static void
1310 do_open(struct fuse_io *fuse_io)
1311 {
1312 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
1313 	int err;
1314 	struct fuse_open_in *arg;
1315 	uint32_t flags;
1316 
1317 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1318 	if (!arg) {
1319 		SPDK_ERRLOG("Cannot get fuse_forget_in\n");
1320 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1321 		return;
1322 	}
1323 
1324 	if (!fsdev_d2h_open_flags(disp->fuse_arch, fsdev_io_d2h_u32(fuse_io, arg->flags), &flags)) {
1325 		SPDK_ERRLOG("Cannot translate flags\n");
1326 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1327 		return;
1328 	}
1329 
1330 	err = spdk_fsdev_fopen(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1331 			       file_object(fuse_io), flags,
1332 			       do_fopen_cpl_clb, fuse_io);
1333 	if (err) {
1334 		fuse_dispatcher_io_complete_err(fuse_io, err);
1335 	}
1336 }
1337 
1338 static void
1339 do_read_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status, uint32_t data_size)
1340 {
1341 	struct fuse_io *fuse_io = cb_arg;
1342 
1343 	fuse_dispatcher_io_complete(fuse_io, data_size, status);
1344 }
1345 
1346 static void
1347 do_read(struct fuse_io *fuse_io)
1348 {
1349 	int err;
1350 	bool compat = fsdev_io_proto_minor(fuse_io) < 9;
1351 	struct fuse_read_in *arg;
1352 	uint64_t fh;
1353 	uint32_t flags = 0;
1354 
1355 	arg = _fsdev_io_in_arg_get_buf(fuse_io,
1356 				       compat ? offsetof(struct fuse_read_in, lock_owner) : sizeof(*arg));
1357 	if (!arg) {
1358 		SPDK_ERRLOG("Cannot get fuse_read_in\n");
1359 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1360 		return;
1361 	}
1362 
1363 
1364 	if (!compat) {
1365 		flags = fsdev_io_d2h_u32(fuse_io, arg->flags);
1366 	}
1367 
1368 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
1369 
1370 	err = spdk_fsdev_read(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1371 			      file_object(fuse_io), file_handle(fh),
1372 			      fsdev_io_d2h_u32(fuse_io, arg->size), fsdev_io_d2h_u64(fuse_io, arg->offset),
1373 			      flags, fuse_io->out_iov + 1, fuse_io->out_iovcnt - 1, NULL,
1374 			      do_read_cpl_clb, fuse_io);
1375 	if (err) {
1376 		fuse_dispatcher_io_complete_err(fuse_io, err);
1377 	}
1378 }
1379 
1380 static void
1381 do_write_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status, uint32_t data_size)
1382 {
1383 	struct fuse_io *fuse_io = cb_arg;
1384 
1385 	fuse_dispatcher_io_complete_write(fuse_io, data_size, status);
1386 }
1387 
1388 static void
1389 do_write(struct fuse_io *fuse_io)
1390 {
1391 	int err;
1392 	bool compat = fsdev_io_proto_minor(fuse_io) < 9;
1393 	struct fuse_write_in *arg;
1394 	uint64_t fh;
1395 	uint64_t flags = 0;
1396 
1397 	arg = _fsdev_io_in_arg_get_buf(fuse_io,
1398 				       compat ? FUSE_COMPAT_WRITE_IN_SIZE : sizeof(*arg));
1399 	if (!arg) {
1400 		SPDK_ERRLOG("Cannot get fuse_write_in\n");
1401 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1402 		return;
1403 	}
1404 
1405 	if (fuse_io->in_offs.buf_offs) {
1406 		SPDK_ERRLOG("Data IOVs should be separate from the header IOV\n");
1407 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1408 		return;
1409 	}
1410 
1411 	if (!compat) {
1412 		flags = fsdev_io_d2h_u32(fuse_io, arg->flags);
1413 	}
1414 
1415 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
1416 
1417 	err = spdk_fsdev_write(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1418 			       file_object(fuse_io), file_handle(fh),
1419 			       fsdev_io_d2h_u32(fuse_io, arg->size), fsdev_io_d2h_u64(fuse_io, arg->offset),
1420 			       flags, fuse_io->in_iov + fuse_io->in_offs.iov_offs, fuse_io->in_iovcnt - fuse_io->in_offs.iov_offs,
1421 			       NULL, do_write_cpl_clb, fuse_io);
1422 	if (err) {
1423 		fuse_dispatcher_io_complete_err(fuse_io, err);
1424 	}
1425 }
1426 
1427 static void
1428 do_statfs_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
1429 		  const struct spdk_fsdev_file_statfs *statfs)
1430 {
1431 	struct fuse_io *fuse_io = cb_arg;
1432 
1433 	if (!status) {
1434 		fuse_dispatcher_io_complete_statfs(fuse_io, statfs);
1435 	} else {
1436 		fuse_dispatcher_io_complete_err(fuse_io, status);
1437 	}
1438 }
1439 
1440 static void
1441 do_statfs(struct fuse_io *fuse_io)
1442 {
1443 	int err;
1444 
1445 	err = spdk_fsdev_statfs(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1446 				file_object(fuse_io), do_statfs_cpl_clb, fuse_io);
1447 	if (err) {
1448 		fuse_dispatcher_io_complete_err(fuse_io, err);
1449 	}
1450 }
1451 
1452 static void
1453 do_release_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1454 {
1455 	struct fuse_io *fuse_io = cb_arg;
1456 
1457 	fuse_dispatcher_io_complete_err(fuse_io, status);
1458 }
1459 
1460 static void
1461 do_release(struct fuse_io *fuse_io)
1462 {
1463 	int err;
1464 	bool compat = fsdev_io_proto_minor(fuse_io) < 8;
1465 	struct fuse_release_in *arg;
1466 	uint64_t fh;
1467 
1468 	arg = _fsdev_io_in_arg_get_buf(fuse_io,
1469 				       compat ? offsetof(struct fuse_release_in, lock_owner) : sizeof(*arg));
1470 	if (!arg) {
1471 		SPDK_ERRLOG("Cannot get fuse_release_in\n");
1472 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1473 		return;
1474 	}
1475 
1476 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
1477 
1478 	err = spdk_fsdev_release(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1479 				 file_object(fuse_io), file_handle(fh),
1480 				 do_release_cpl_clb, fuse_io);
1481 	if (err) {
1482 		fuse_dispatcher_io_complete_err(fuse_io, err);
1483 	}
1484 }
1485 
1486 static void
1487 do_fsync_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1488 {
1489 	struct fuse_io *fuse_io = cb_arg;
1490 
1491 	fuse_dispatcher_io_complete_err(fuse_io, status);
1492 }
1493 
1494 static void
1495 do_fsync(struct fuse_io *fuse_io)
1496 {
1497 	int err;
1498 	struct fuse_fsync_in *arg;
1499 	uint64_t fh;
1500 	bool datasync;
1501 
1502 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1503 	if (!arg) {
1504 		SPDK_ERRLOG("Cannot get fuse_fsync_in\n");
1505 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1506 		return;
1507 	}
1508 
1509 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
1510 	datasync = (fsdev_io_d2h_u32(fuse_io, arg->fsync_flags) & 1) ? true : false;
1511 
1512 	err = spdk_fsdev_fsync(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1513 			       file_object(fuse_io), file_handle(fh), datasync,
1514 			       do_fsync_cpl_clb, fuse_io);
1515 	if (err) {
1516 		fuse_dispatcher_io_complete_err(fuse_io, err);
1517 	}
1518 }
1519 
1520 static void
1521 do_setxattr_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1522 {
1523 	struct fuse_io *fuse_io = cb_arg;
1524 
1525 	fuse_dispatcher_io_complete_err(fuse_io, status);
1526 }
1527 
1528 static void
1529 do_setxattr(struct fuse_io *fuse_io)
1530 {
1531 	int err;
1532 	struct fuse_setxattr_in *arg;
1533 	const char *name;
1534 	const char *value;
1535 	uint32_t size;
1536 
1537 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1538 	if (!arg) {
1539 		SPDK_ERRLOG("Cannot get fuse_setxattr_in\n");
1540 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1541 		return;
1542 	}
1543 
1544 	name = _fsdev_io_in_arg_get_str(fuse_io);
1545 	if (!name) {
1546 		SPDK_ERRLOG("Cannot get name\n");
1547 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1548 		return;
1549 	}
1550 
1551 	size = fsdev_io_d2h_u32(fuse_io, arg->size);
1552 	value = _fsdev_io_in_arg_get_buf(fuse_io, size);
1553 	if (!value) {
1554 		SPDK_ERRLOG("Cannot get value of %" PRIu32 " bytes\n", size);
1555 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1556 		return;
1557 	}
1558 
1559 	err = spdk_fsdev_setxattr(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1560 				  file_object(fuse_io), name, value, size, fsdev_io_d2h_u32(fuse_io, arg->flags),
1561 				  do_setxattr_cpl_clb, fuse_io);
1562 	if (err) {
1563 		fuse_dispatcher_io_complete_err(fuse_io, err);
1564 	}
1565 }
1566 
1567 static void
1568 do_getxattr_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status, size_t value_size)
1569 {
1570 	struct fuse_io *fuse_io = cb_arg;
1571 
1572 	if (!status) {
1573 		fuse_dispatcher_io_complete_xattr(fuse_io, value_size);
1574 	} else {
1575 		fuse_dispatcher_io_complete_err(fuse_io, status);
1576 	}
1577 }
1578 
1579 static void
1580 do_getxattr(struct fuse_io *fuse_io)
1581 {
1582 	int err;
1583 	struct fuse_getxattr_in *arg;
1584 	const char *name;
1585 	char *buff;
1586 	uint32_t size;
1587 	struct iov_offs out_offs_bu;
1588 
1589 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1590 	if (!arg) {
1591 		SPDK_ERRLOG("Cannot get fuse_getxattr_in\n");
1592 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1593 		return;
1594 	}
1595 
1596 	name = _fsdev_io_in_arg_get_str(fuse_io);
1597 	if (!name) {
1598 		SPDK_ERRLOG("Cannot get name\n");
1599 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1600 		return;
1601 	}
1602 
1603 	if (fuse_io->out_iovcnt < 2) {
1604 		SPDK_ERRLOG("No buffer to getxattr\n");
1605 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1606 		return;
1607 	}
1608 
1609 	size = fsdev_io_d2h_u32(fuse_io, arg->size);
1610 
1611 	/* NOTE: we want to avoid an additionl allocation and copy and put the xattr directly to the buffer provided in out_iov.
1612 	 * In order to do so we have to preserve the out_offs, advance it to get the buffer pointer and then restore to allow
1613 	 * the fuse_dispatcher_io_complete_xattr() to fill the fuse_getxattr_out which precedes this buffer.
1614 	 */
1615 	out_offs_bu = fuse_io->out_offs; /* Preserve the out offset */
1616 
1617 	/* Skip the fuse_getxattr_out */
1618 	_fsdev_io_out_arg_get_buf(fuse_io, sizeof(struct fuse_getxattr_out));
1619 	size -= sizeof(struct fuse_getxattr_out);
1620 
1621 	buff = _fsdev_io_out_arg_get_buf(fuse_io, size); /* Get the buffer for the xattr */
1622 	if (!buff) {
1623 		SPDK_INFOLOG(fuse_dispatcher, "NULL buffer, probably asking for the size\n");
1624 		size = 0;
1625 	}
1626 
1627 	fuse_io->out_offs = out_offs_bu; /* Restore the out offset */
1628 
1629 	err = spdk_fsdev_getxattr(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1630 				  file_object(fuse_io), name, buff, size,
1631 				  do_getxattr_cpl_clb, fuse_io);
1632 	if (err) {
1633 		fuse_dispatcher_io_complete_err(fuse_io, err);
1634 	}
1635 }
1636 
1637 static void
1638 do_listxattr_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status, size_t size,
1639 		     bool size_only)
1640 {
1641 	struct fuse_io *fuse_io = cb_arg;
1642 
1643 	if (status) {
1644 		fuse_dispatcher_io_complete_err(fuse_io, status);
1645 	} else if (size_only) {
1646 		fuse_dispatcher_io_complete_xattr(fuse_io, size);
1647 	} else {
1648 		fuse_dispatcher_io_complete_ok(fuse_io, size);
1649 	}
1650 }
1651 
1652 static void
1653 do_listxattr(struct fuse_io *fuse_io)
1654 {
1655 	int err;
1656 	struct fuse_getxattr_in *arg;
1657 	struct iovec *iov;
1658 	uint32_t size;
1659 
1660 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
1661 	if (!arg) {
1662 		SPDK_ERRLOG("Cannot get fuse_getxattr_in\n");
1663 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1664 		return;
1665 	}
1666 
1667 	size = fsdev_io_d2h_u32(fuse_io, arg->size);
1668 	iov = fuse_io->out_iov + 1;
1669 	if (iov->iov_len < size) {
1670 		SPDK_ERRLOG("Wrong iov len (%zu < %" PRIu32")\n", iov->iov_len, size);
1671 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1672 		return;
1673 	}
1674 
1675 	err = spdk_fsdev_listxattr(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1676 				   file_object(fuse_io), iov->iov_base, size,
1677 				   do_listxattr_cpl_clb, fuse_io);
1678 	if (err) {
1679 		fuse_dispatcher_io_complete_err(fuse_io, err);
1680 	}
1681 }
1682 
1683 static void
1684 do_removexattr_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1685 {
1686 	struct fuse_io *fuse_io = cb_arg;
1687 
1688 	fuse_dispatcher_io_complete_err(fuse_io, status);
1689 }
1690 
1691 static void
1692 do_removexattr(struct fuse_io *fuse_io)
1693 {
1694 	int err;
1695 	const char *name = _fsdev_io_in_arg_get_str(fuse_io);
1696 
1697 	if (!name) {
1698 		SPDK_ERRLOG("Cannot get name\n");
1699 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1700 		return;
1701 	}
1702 
1703 	err = spdk_fsdev_removexattr(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1704 				     file_object(fuse_io), name, do_removexattr_cpl_clb, fuse_io);
1705 	if (err) {
1706 		fuse_dispatcher_io_complete_err(fuse_io, err);
1707 	}
1708 }
1709 
1710 static void
1711 do_flush_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
1712 {
1713 	struct fuse_io *fuse_io = cb_arg;
1714 
1715 	fuse_dispatcher_io_complete_err(fuse_io, status);
1716 }
1717 
1718 static void
1719 do_flush(struct fuse_io *fuse_io)
1720 {
1721 	int err;
1722 	bool compat = fsdev_io_proto_minor(fuse_io) < 7;
1723 	struct fuse_flush_in *arg;
1724 	uint64_t fh;
1725 
1726 	arg = _fsdev_io_in_arg_get_buf(fuse_io,
1727 				       compat ? offsetof(struct fuse_flush_in, lock_owner) : sizeof(*arg));
1728 	if (!arg) {
1729 		SPDK_ERRLOG("Cannot get fuse_flush_in\n");
1730 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1731 		return;
1732 	}
1733 
1734 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
1735 
1736 	err = spdk_fsdev_flush(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1737 			       file_object(fuse_io), file_handle(fh),
1738 			       do_flush_cpl_clb, fuse_io);
1739 	if (err) {
1740 		fuse_dispatcher_io_complete_err(fuse_io, err);
1741 	}
1742 }
1743 
1744 static void
1745 do_mount_rollback_cpl_clb(void *cb_arg, struct spdk_io_channel *ch)
1746 {
1747 	struct fuse_io *fuse_io = cb_arg;
1748 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
1749 
1750 	UNUSED(disp);
1751 
1752 	SPDK_DEBUGLOG(fuse_dispatcher, "%s unmounted\n", fuse_dispatcher_name(disp));
1753 
1754 	/* The IO is FUSE_INIT, so we complete it with the appropriate error */
1755 	fuse_dispatcher_io_complete_err(fuse_io, fuse_io->u.init.error);
1756 }
1757 
1758 static void fuse_dispatcher_mount_rollback_msg(void *ctx);
1759 
1760 static void
1761 fuse_dispatcher_mount_rollback(struct fuse_io *fuse_io)
1762 {
1763 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
1764 	int rc;
1765 
1766 	rc = spdk_fsdev_umount(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
1767 			       do_mount_rollback_cpl_clb, fuse_io);
1768 	if (rc) {
1769 		/* It can only fail due to a lack of the IO objects, so we retry until one of them will be available */
1770 		SPDK_WARNLOG("%s: umount cannot be initiated (err=%d). Retrying...\n",
1771 			     fuse_dispatcher_name(disp), rc);
1772 		spdk_thread_send_msg(spdk_get_thread(), fuse_dispatcher_mount_rollback_msg, fuse_io);
1773 	}
1774 }
1775 
1776 static void
1777 fuse_dispatcher_mount_rollback_msg(void *ctx)
1778 {
1779 	struct fuse_io *fuse_io = ctx;
1780 
1781 	fuse_dispatcher_mount_rollback(fuse_io);
1782 }
1783 
1784 static void
1785 fuse_dispatcher_fsdev_remove_put_channel(struct spdk_io_channel_iter *i)
1786 {
1787 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
1788 	struct spdk_fuse_dispatcher_channel *ch = __disp_ch_from_io_ch(io_ch);
1789 
1790 	assert(ch->fsdev_io_ch);
1791 	spdk_put_io_channel(ch->fsdev_io_ch);
1792 	ch->fsdev_io_ch = NULL;
1793 
1794 	spdk_for_each_channel_continue(i, 0);
1795 }
1796 
1797 static void
1798 fuse_dispatcher_fsdev_remove_put_channel_done(struct spdk_io_channel_iter *i, int status)
1799 {
1800 	struct spdk_fuse_dispatcher *disp = spdk_io_channel_iter_get_ctx(i);
1801 
1802 	if (status) {
1803 		SPDK_WARNLOG("%s: putting channels failed with %d\n", fuse_dispatcher_name(disp), status);
1804 	}
1805 
1806 	disp->event_cb(SPDK_FUSE_DISP_EVENT_FSDEV_REMOVE, disp, disp->event_ctx);
1807 }
1808 
1809 static void
1810 fuse_dispatcher_fsdev_event_cb(enum spdk_fsdev_event_type type, struct spdk_fsdev *fsdev,
1811 			       void *event_ctx)
1812 {
1813 	struct spdk_fuse_dispatcher *disp = event_ctx;
1814 
1815 	SPDK_NOTICELOG("%s received fsdev event %d\n", fuse_dispatcher_name(disp), type);
1816 
1817 	switch (type) {
1818 	case SPDK_FSDEV_EVENT_REMOVE:
1819 		SPDK_NOTICELOG("%s received SPDK_FSDEV_EVENT_REMOVE\n", fuse_dispatcher_name(disp));
1820 		/* Put the channels, to prevent the further IO submission */
1821 		spdk_for_each_channel(__disp_to_io_dev(disp),
1822 				      fuse_dispatcher_fsdev_remove_put_channel,
1823 				      disp,
1824 				      fuse_dispatcher_fsdev_remove_put_channel_done);
1825 		break;
1826 	default:
1827 		SPDK_NOTICELOG("%s received an unknown fsdev event %d\n", fuse_dispatcher_name(disp), type);
1828 		break;
1829 	}
1830 }
1831 
1832 static int
1833 do_mount_prepare_completion(struct fuse_io *fuse_io)
1834 {
1835 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
1836 	struct fuse_init_out outarg;
1837 	size_t outargsize = sizeof(outarg);
1838 	uint32_t max_readahead = DEFAULT_MAX_READAHEAD;
1839 	uint32_t flags = 0;
1840 	void *out_buf;
1841 
1842 	assert(disp->desc);
1843 
1844 	memset(&outarg, 0, sizeof(outarg));
1845 	outarg.major = fsdev_io_h2d_u32(fuse_io, FUSE_KERNEL_VERSION);
1846 	outarg.minor = fsdev_io_h2d_u32(fuse_io, FUSE_KERNEL_MINOR_VERSION);
1847 
1848 	if (disp->proto_minor < 5) {
1849 		outargsize = FUSE_COMPAT_INIT_OUT_SIZE;
1850 	} else if (disp->proto_minor < 23) {
1851 		outargsize = FUSE_COMPAT_22_INIT_OUT_SIZE;
1852 	}
1853 
1854 	if (!fuse_io->u.init.legacy_in) {
1855 		max_readahead = fsdev_io_d2h_u32(fuse_io, fuse_io->u.init.in->max_readahead);
1856 		flags = fsdev_io_d2h_u32(fuse_io, fuse_io->u.init.in->flags);
1857 
1858 		SPDK_INFOLOG(fuse_dispatcher, "max_readahead: %" PRIu32 " flags=0x%" PRIx32 "\n",
1859 			     max_readahead, flags);
1860 	}
1861 
1862 	/* Always enable big writes, this is superseded by the max_write option */
1863 	outarg.flags = FUSE_BIG_WRITES;
1864 
1865 #define LL_SET_DEFAULT(cond, cap) \
1866 	if ((cond) && flags & (cap)) \
1867 		outarg.flags |= (cap)
1868 	LL_SET_DEFAULT(true, FUSE_ASYNC_READ);
1869 	LL_SET_DEFAULT(true, FUSE_AUTO_INVAL_DATA);
1870 	LL_SET_DEFAULT(true, FUSE_ASYNC_DIO);
1871 	LL_SET_DEFAULT(true, FUSE_ATOMIC_O_TRUNC);
1872 	LL_SET_DEFAULT(true, FUSE_FLOCK_LOCKS);
1873 	LL_SET_DEFAULT(true, FUSE_DO_READDIRPLUS);
1874 	LL_SET_DEFAULT(true, FUSE_READDIRPLUS_AUTO);
1875 	LL_SET_DEFAULT(true, FUSE_EXPORT_SUPPORT);
1876 	LL_SET_DEFAULT(fuse_io->u.init.opts.writeback_cache_enabled, FUSE_WRITEBACK_CACHE);
1877 
1878 	outarg.flags = fsdev_io_h2d_u32(fuse_io, outarg.flags);
1879 	outarg.max_readahead = fsdev_io_h2d_u32(fuse_io, max_readahead);
1880 	outarg.max_write = fsdev_io_h2d_u32(fuse_io, fuse_io->u.init.opts.max_write);
1881 	if (fsdev_io_proto_minor(fuse_io) >= 13) {
1882 		outarg.max_background = fsdev_io_h2d_u16(fuse_io, DEFAULT_MAX_BACKGROUND);
1883 		outarg.congestion_threshold = fsdev_io_h2d_u16(fuse_io, DEFAULT_CONGESTION_THRESHOLD);
1884 	}
1885 
1886 	if (fsdev_io_proto_minor(fuse_io) >= 23) {
1887 		outarg.time_gran = fsdev_io_h2d_u32(fuse_io, DEFAULT_TIME_GRAN);
1888 	}
1889 
1890 	SPDK_INFOLOG(fuse_dispatcher, "INIT: %" PRIu32 ".%" PRIu32 "\n",
1891 		     fsdev_io_d2h_u32(fuse_io, outarg.major), fsdev_io_d2h_u32(fuse_io, outarg.minor));
1892 	SPDK_INFOLOG(fuse_dispatcher, "flags: 0x%08" PRIx32 "\n", fsdev_io_d2h_u32(fuse_io, outarg.flags));
1893 	SPDK_INFOLOG(fuse_dispatcher, "max_readahead: %" PRIu32 "\n",
1894 		     fsdev_io_d2h_u32(fuse_io, outarg.max_readahead));
1895 	SPDK_INFOLOG(fuse_dispatcher, "max_write: %" PRIu32 "\n",
1896 		     fsdev_io_d2h_u32(fuse_io, outarg.max_write));
1897 	SPDK_INFOLOG(fuse_dispatcher, "max_background: %" PRIu16 "\n",
1898 		     fsdev_io_d2h_u16(fuse_io, outarg.max_background));
1899 	SPDK_INFOLOG(fuse_dispatcher, "congestion_threshold: %" PRIu16 "\n",
1900 		     fsdev_io_d2h_u16(fuse_io, outarg.congestion_threshold));
1901 	SPDK_INFOLOG(fuse_dispatcher, "time_gran: %" PRIu32 "\n", fsdev_io_d2h_u32(fuse_io,
1902 			outarg.time_gran));
1903 
1904 	out_buf = _fsdev_io_out_arg_get_buf(fuse_io, outargsize);
1905 	if (!out_buf) {
1906 		SPDK_ERRLOG("Cannot get buf to copy fuse_init_out of %zu bytes\n", outargsize);
1907 		return -EINVAL;
1908 	}
1909 
1910 	memcpy(out_buf, &outarg, outargsize);
1911 
1912 	fuse_io->u.init.out_len = outargsize;
1913 	return 0;
1914 }
1915 
1916 static void
1917 do_mount_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
1918 		 const struct spdk_fsdev_mount_opts *opts, struct spdk_fsdev_file_object *root_fobject)
1919 {
1920 	struct fuse_io *fuse_io = cb_arg;
1921 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
1922 	int rc;
1923 
1924 	if (status) {
1925 		SPDK_ERRLOG("%s: spdk_fsdev_mount failed (err=%d)\n", fuse_dispatcher_name(disp), status);
1926 		fuse_dispatcher_io_complete_err(fuse_io, status);
1927 		return;
1928 	}
1929 
1930 	SPDK_DEBUGLOG(fuse_dispatcher, "%s: spdk_fsdev_mount succeeded\n", fuse_dispatcher_name(disp));
1931 	disp->root_fobject = root_fobject;
1932 	rc = do_mount_prepare_completion(fuse_io);
1933 	if (rc) {
1934 		SPDK_ERRLOG("%s: mount completion preparation failed with %d\n", fuse_dispatcher_name(disp), rc);
1935 		fuse_io->u.init.error = rc;
1936 		disp->root_fobject = NULL;
1937 		fuse_dispatcher_mount_rollback(fuse_io);
1938 		return;
1939 	}
1940 
1941 	fuse_dispatcher_io_complete_ok(fuse_io, fuse_io->u.init.out_len);
1942 }
1943 
1944 static void
1945 do_init(struct fuse_io *fuse_io)
1946 {
1947 	size_t compat_size = offsetof(struct fuse_init_in, max_readahead);
1948 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
1949 	uint32_t flags = 0;
1950 	int rc;
1951 
1952 	/* First try to read the legacy header */
1953 	fuse_io->u.init.in = _fsdev_io_in_arg_get_buf(fuse_io, compat_size);
1954 	if (!fuse_io->u.init.in) {
1955 		SPDK_ERRLOG("Cannot get fuse_init_in\n");
1956 		fuse_dispatcher_io_complete_err(fuse_io, -EBADR);
1957 		return;
1958 	}
1959 
1960 	disp->proto_major = fsdev_io_d2h_u32(fuse_io, fuse_io->u.init.in->major);
1961 	disp->proto_minor = fsdev_io_d2h_u32(fuse_io, fuse_io->u.init.in->minor);
1962 
1963 	SPDK_DEBUGLOG(fuse_dispatcher, "Proto version: %" PRIu32 ".%" PRIu32 "\n",
1964 		      disp->proto_major,
1965 		      disp->proto_minor);
1966 
1967 	/* Now try to read the whole struct */
1968 	if (disp->proto_major == 7 && disp->proto_minor >= 6) {
1969 		void *arg_extra = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*fuse_io->u.init.in) - compat_size);
1970 		if (!arg_extra) {
1971 			SPDK_ERRLOG("INIT: protocol version: %" PRIu32 ".%" PRIu32 " but legacy data found\n",
1972 				    disp->proto_major, disp->proto_minor);
1973 			fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
1974 			return;
1975 		}
1976 		fuse_io->u.init.legacy_in = false;
1977 	} else {
1978 		fuse_io->u.init.legacy_in = true;
1979 	}
1980 
1981 	if (disp->proto_major < 7) {
1982 		SPDK_ERRLOG("INIT: unsupported major protocol version: %" PRIu32 "\n",
1983 			    disp->proto_major);
1984 		fuse_dispatcher_io_complete_err(fuse_io, -EAGAIN);
1985 		return;
1986 	}
1987 
1988 	if (disp->proto_major > 7) {
1989 		/* Wait for a second INIT request with a 7.X version */
1990 
1991 		struct fuse_init_out outarg;
1992 		size_t outargsize = sizeof(outarg);
1993 
1994 		memset(&outarg, 0, sizeof(outarg));
1995 		outarg.major = fsdev_io_h2d_u32(fuse_io, FUSE_KERNEL_VERSION);
1996 		outarg.minor = fsdev_io_h2d_u32(fuse_io, FUSE_KERNEL_MINOR_VERSION);
1997 
1998 		fuse_dispatcher_io_copy_and_complete(fuse_io, &outarg, outargsize, 0);
1999 		return;
2000 	}
2001 
2002 	if (!fuse_io->u.init.legacy_in) {
2003 		flags = fsdev_io_d2h_u32(fuse_io, fuse_io->u.init.in->flags);
2004 
2005 		SPDK_INFOLOG(fuse_dispatcher, "flags=0x%" PRIx32 "\n", flags);
2006 	}
2007 
2008 	memset(&fuse_io->u.init.opts, 0, sizeof(fuse_io->u.init.opts));
2009 	fuse_io->u.init.opts.opts_size = sizeof(fuse_io->u.init.opts);
2010 	fuse_io->u.init.opts.max_write = 0;
2011 	fuse_io->u.init.opts.writeback_cache_enabled = flags & FUSE_WRITEBACK_CACHE ? true : false;
2012 	fuse_io->u.init.thread = spdk_get_thread();
2013 
2014 	rc = spdk_fsdev_mount(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2015 			      &fuse_io->u.init.opts, do_mount_cpl_clb, fuse_io);
2016 	if (rc) {
2017 		SPDK_ERRLOG("%s: failed to initiate mount (err=%d)\n", fuse_dispatcher_name(disp), rc);
2018 		fuse_dispatcher_io_complete_err(fuse_io, rc);
2019 	}
2020 }
2021 
2022 static void
2023 do_opendir_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
2024 		   struct spdk_fsdev_file_handle *fhandle)
2025 {
2026 	struct fuse_io *fuse_io = cb_arg;
2027 
2028 	if (!status) {
2029 		fuse_dispatcher_io_complete_open(fuse_io, fhandle);
2030 	} else {
2031 		fuse_dispatcher_io_complete_err(fuse_io, status);
2032 	}
2033 }
2034 
2035 static void
2036 do_opendir(struct fuse_io *fuse_io)
2037 {
2038 	int err;
2039 	struct fuse_open_in *arg;
2040 
2041 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2042 	if (!arg) {
2043 		SPDK_ERRLOG("Cannot get fuse_open_in\n");
2044 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2045 		return;
2046 	}
2047 
2048 	err = spdk_fsdev_opendir(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2049 				 file_object(fuse_io), fsdev_io_d2h_u32(fuse_io, arg->flags),
2050 				 do_opendir_cpl_clb, fuse_io);
2051 	if (err) {
2052 		fuse_dispatcher_io_complete_err(fuse_io, err);
2053 	}
2054 }
2055 
2056 static int
2057 do_readdir_entry_clb(void *cb_arg, struct spdk_io_channel *ch, const char *name,
2058 		     struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr, off_t offset)
2059 {
2060 	struct fuse_io *fuse_io = cb_arg;
2061 	size_t bytes_remained = fuse_io->u.readdir.size - fuse_io->u.readdir.bytes_written;
2062 	size_t direntry_bytes;
2063 
2064 	direntry_bytes = fuse_io->u.readdir.plus ?
2065 			 fuse_dispatcher_add_direntry_plus(fuse_io, fuse_io->u.readdir.writep, bytes_remained,
2066 					 name, fobject, attr, offset) :
2067 			 fuse_dispatcher_add_direntry(fuse_io, fuse_io->u.readdir.writep, bytes_remained,
2068 					 name, fobject, attr, offset);
2069 
2070 	if (direntry_bytes > bytes_remained) {
2071 		return EAGAIN;
2072 	}
2073 
2074 	fuse_io->u.readdir.writep += direntry_bytes;
2075 	fuse_io->u.readdir.bytes_written += direntry_bytes;
2076 
2077 	return 0;
2078 }
2079 
2080 static void
2081 do_readdir_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
2082 {
2083 	struct fuse_io *fuse_io = cb_arg;
2084 
2085 	if (!status || (status == EAGAIN && fuse_io->u.readdir.bytes_written == fuse_io->u.readdir.size)) {
2086 		fuse_dispatcher_io_complete_ok(fuse_io, fuse_io->u.readdir.bytes_written);
2087 	} else {
2088 		fuse_dispatcher_io_complete_err(fuse_io, status);
2089 	}
2090 }
2091 
2092 static void
2093 do_readdir_common(struct fuse_io *fuse_io, bool plus)
2094 {
2095 	int err;
2096 	struct fuse_read_in *arg;
2097 	uint64_t fh;
2098 	uint32_t size;
2099 
2100 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2101 	if (!arg) {
2102 		SPDK_ERRLOG("Cannot get fuse_read_in\n");
2103 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2104 		return;
2105 	}
2106 
2107 	size = fsdev_io_d2h_u32(fuse_io, arg->size);
2108 
2109 	fuse_io->u.readdir.writep = _fsdev_io_out_arg_get_buf(fuse_io, size);
2110 	if (!fuse_io->u.readdir.writep) {
2111 		SPDK_ERRLOG("Cannot get buffer of %" PRIu32 " bytes\n", size);
2112 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2113 		return;
2114 	}
2115 
2116 	fuse_io->u.readdir.plus = plus;
2117 	fuse_io->u.readdir.size = size;
2118 	fuse_io->u.readdir.bytes_written = 0;
2119 
2120 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
2121 
2122 	err = spdk_fsdev_readdir(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2123 				 file_object(fuse_io), file_handle(fh),
2124 				 fsdev_io_d2h_u64(fuse_io, arg->offset),
2125 				 do_readdir_entry_clb, do_readdir_cpl_clb, fuse_io);
2126 	if (err) {
2127 		fuse_dispatcher_io_complete_err(fuse_io, err);
2128 	}
2129 }
2130 
2131 static void
2132 do_readdir(struct fuse_io *fuse_io)
2133 {
2134 	do_readdir_common(fuse_io, false);
2135 }
2136 
2137 static void
2138 do_readdirplus(struct fuse_io *fuse_io)
2139 {
2140 	do_readdir_common(fuse_io, true);
2141 }
2142 
2143 static void
2144 do_releasedir_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
2145 {
2146 	struct fuse_io *fuse_io = cb_arg;
2147 
2148 	fuse_dispatcher_io_complete_err(fuse_io, status);
2149 }
2150 
2151 static void
2152 do_releasedir(struct fuse_io *fuse_io)
2153 {
2154 	int err;
2155 	struct fuse_release_in *arg;
2156 	uint64_t fh;
2157 
2158 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2159 	if (!arg) {
2160 		SPDK_ERRLOG("Cannot get fuse_release_in\n");
2161 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2162 		return;
2163 	}
2164 
2165 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
2166 
2167 	err = spdk_fsdev_releasedir(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2168 				    file_object(fuse_io), file_handle(fh),
2169 				    do_releasedir_cpl_clb, fuse_io);
2170 	if (err) {
2171 		fuse_dispatcher_io_complete_err(fuse_io, err);
2172 	}
2173 }
2174 
2175 static void
2176 do_fsyncdir_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
2177 {
2178 	struct fuse_io *fuse_io = cb_arg;
2179 
2180 	fuse_dispatcher_io_complete_err(fuse_io, status);
2181 }
2182 
2183 static void
2184 do_fsyncdir(struct fuse_io *fuse_io)
2185 {
2186 	int err;
2187 	struct fuse_fsync_in *arg;
2188 	uint64_t fh;
2189 	bool datasync;
2190 
2191 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2192 	if (!arg) {
2193 		SPDK_ERRLOG("Cannot get fuse_fsync_in\n");
2194 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2195 		return;
2196 	}
2197 
2198 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
2199 	datasync = (fsdev_io_d2h_u32(fuse_io, arg->fsync_flags) & 1) ? true : false;
2200 
2201 	err = spdk_fsdev_fsyncdir(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2202 				  file_object(fuse_io), file_handle(fh), datasync,
2203 				  do_fsyncdir_cpl_clb, fuse_io);
2204 	if (err) {
2205 		fuse_dispatcher_io_complete_err(fuse_io, err);
2206 	}
2207 }
2208 
2209 static void
2210 do_getlk(struct fuse_io *fuse_io)
2211 {
2212 	SPDK_ERRLOG("GETLK is not supported\n");
2213 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2214 }
2215 
2216 static void
2217 do_setlk_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
2218 {
2219 	struct fuse_io *fuse_io = cb_arg;
2220 
2221 	fuse_dispatcher_io_complete_err(fuse_io, status);
2222 }
2223 
2224 static void
2225 do_setlk_common(struct fuse_io *fuse_io)
2226 {
2227 	int err;
2228 	struct fuse_lk_in *arg;
2229 	uint64_t fh;
2230 	uint32_t lk_flags;
2231 
2232 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2233 	if (!arg) {
2234 		SPDK_ERRLOG("Cannot get fuse_lk_in\n");
2235 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2236 		return;
2237 	}
2238 
2239 	lk_flags = fsdev_io_d2h_u64(fuse_io, arg->lk_flags);
2240 
2241 	if (lk_flags & FUSE_LK_FLOCK) {
2242 		int op = 0;
2243 
2244 		switch (arg->lk.type) {
2245 		case F_RDLCK:
2246 			op = LOCK_SH;
2247 			break;
2248 		case F_WRLCK:
2249 			op = LOCK_EX;
2250 			break;
2251 		case F_UNLCK:
2252 			op = LOCK_UN;
2253 			break;
2254 		}
2255 
2256 		fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
2257 
2258 		err = spdk_fsdev_flock(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2259 				       file_object(fuse_io), file_handle(fh), op,
2260 				       do_setlk_cpl_clb, fuse_io);
2261 		if (err) {
2262 			fuse_dispatcher_io_complete_err(fuse_io, err);
2263 		}
2264 	} else {
2265 		SPDK_ERRLOG("SETLK: with no FUSE_LK_FLOCK is not supported\n");
2266 		fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2267 	}
2268 }
2269 
2270 static void
2271 do_setlk(struct fuse_io *fuse_io)
2272 {
2273 	do_setlk_common(fuse_io);
2274 }
2275 
2276 static void
2277 do_setlkw(struct fuse_io *fuse_io)
2278 {
2279 	SPDK_ERRLOG("SETLKW is not supported\n");
2280 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2281 }
2282 
2283 static void
2284 do_access(struct fuse_io *fuse_io)
2285 {
2286 	SPDK_ERRLOG("ACCESS is not supported\n");
2287 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2288 }
2289 
2290 static void
2291 do_create_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status,
2292 		  struct spdk_fsdev_file_object *fobject, const struct spdk_fsdev_file_attr *attr,
2293 		  struct spdk_fsdev_file_handle *fhandle)
2294 {
2295 	struct fuse_io *fuse_io = cb_arg;
2296 
2297 	if (!status) {
2298 		fuse_dispatcher_io_complete_create(fuse_io, fobject, attr, fhandle);
2299 	} else {
2300 		fuse_dispatcher_io_complete_err(fuse_io, status);
2301 	}
2302 }
2303 
2304 static void
2305 do_create(struct fuse_io *fuse_io)
2306 {
2307 	int err;
2308 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
2309 	bool compat = fsdev_io_proto_minor(fuse_io) < 12;
2310 	struct fuse_create_in *arg;
2311 	const char *name;
2312 	uint32_t flags, mode, umask = 0;
2313 	size_t arg_size = compat ? sizeof(struct fuse_open_in) : sizeof(*arg);
2314 
2315 	arg = _fsdev_io_in_arg_get_buf(fuse_io, arg_size);
2316 	if (!arg) {
2317 		SPDK_ERRLOG("Cannot get fuse_create_in (compat=%d)\n", compat);
2318 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2319 		return;
2320 	}
2321 
2322 	name = _fsdev_io_in_arg_get_str(fuse_io);
2323 	if (!name) {
2324 		SPDK_ERRLOG("Cannot get name (compat=%d)\n", compat);
2325 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2326 		return;
2327 	}
2328 
2329 	mode =  fsdev_io_d2h_u32(fuse_io, arg->mode);
2330 	if (!compat) {
2331 		umask = fsdev_io_d2h_u32(fuse_io, arg->umask);
2332 	}
2333 
2334 	if (!fsdev_d2h_open_flags(disp->fuse_arch, fsdev_io_d2h_u32(fuse_io, arg->flags), &flags)) {
2335 		SPDK_ERRLOG("Cannot translate flags\n");
2336 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2337 		return;
2338 	}
2339 
2340 	err = spdk_fsdev_create(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2341 				file_object(fuse_io), name, mode, flags, umask, fuse_io->hdr.uid,
2342 				fuse_io->hdr.gid, do_create_cpl_clb, fuse_io);
2343 	if (err) {
2344 		fuse_dispatcher_io_complete_err(fuse_io, err);
2345 	}
2346 }
2347 
2348 static void
2349 do_abort_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
2350 {
2351 	struct fuse_io *fuse_io = cb_arg;
2352 
2353 	fuse_dispatcher_io_complete_err(fuse_io, status);
2354 }
2355 
2356 static void
2357 do_interrupt(struct fuse_io *fuse_io)
2358 {
2359 	int err;
2360 	struct fuse_interrupt_in *arg;
2361 	uint64_t unique;
2362 
2363 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2364 	if (!arg) {
2365 		SPDK_ERRLOG("Cannot get fuse_access_in\n");
2366 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2367 		return;
2368 	}
2369 
2370 	unique = fsdev_io_d2h_u64(fuse_io, arg->unique);
2371 
2372 	SPDK_DEBUGLOG(fuse_dispatcher, "INTERRUPT: %" PRIu64 "\n", unique);
2373 
2374 	err = spdk_fsdev_abort(fuse_io_desc(fuse_io), fuse_io->ch, unique, do_abort_cpl_clb, fuse_io);
2375 	if (err) {
2376 		fuse_dispatcher_io_complete_err(fuse_io, err);
2377 	}
2378 }
2379 
2380 static void
2381 do_bmap(struct fuse_io *fuse_io)
2382 {
2383 	SPDK_ERRLOG("BMAP is not supported\n");
2384 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2385 }
2386 
2387 static void
2388 do_ioctl(struct fuse_io *fuse_io)
2389 {
2390 	SPDK_ERRLOG("IOCTL is not supported\n");
2391 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2392 }
2393 
2394 static void
2395 do_poll(struct fuse_io *fuse_io)
2396 {
2397 	SPDK_ERRLOG("POLL is not supported\n");
2398 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2399 }
2400 
2401 static void
2402 do_fallocate_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
2403 {
2404 	struct fuse_io *fuse_io = cb_arg;
2405 
2406 	fuse_dispatcher_io_complete_err(fuse_io, status);
2407 }
2408 
2409 static void
2410 do_fallocate(struct fuse_io *fuse_io)
2411 {
2412 	int err;
2413 	struct fuse_fallocate_in *arg;
2414 	uint64_t fh;
2415 
2416 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2417 	if (!arg) {
2418 		SPDK_ERRLOG("Cannot get fuse_fallocate_in\n");
2419 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2420 		return;
2421 	}
2422 
2423 	fh = fsdev_io_d2h_u64(fuse_io, arg->fh);
2424 
2425 	err = spdk_fsdev_fallocate(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2426 				   file_object(fuse_io), file_handle(fh),
2427 				   fsdev_io_d2h_u32(fuse_io, arg->mode), fsdev_io_d2h_u64(fuse_io, arg->offset),
2428 				   fsdev_io_d2h_u64(fuse_io, arg->length),
2429 				   do_fallocate_cpl_clb, fuse_io);
2430 	if (err) {
2431 		fuse_dispatcher_io_complete_err(fuse_io, err);
2432 	}
2433 }
2434 
2435 static void
2436 do_umount_cpl_clb(void *cb_arg, struct spdk_io_channel *ch)
2437 {
2438 	struct fuse_io *fuse_io = cb_arg;
2439 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
2440 
2441 	disp->proto_major = disp->proto_minor = 0;
2442 	disp->root_fobject = NULL;
2443 	SPDK_DEBUGLOG(fuse_dispatcher, "%s unmounted\n", fuse_dispatcher_name(disp));
2444 	fuse_dispatcher_io_complete_err(fuse_io, 0);
2445 }
2446 
2447 static void
2448 do_destroy(struct fuse_io *fuse_io)
2449 {
2450 	struct spdk_fuse_dispatcher *disp = fuse_io->disp;
2451 	int rc;
2452 
2453 	rc = spdk_fsdev_umount(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique, do_umount_cpl_clb,
2454 			       fuse_io);
2455 	if (rc) {
2456 		SPDK_ERRLOG("%s: failed to initiate umount (err=%d)\n", fuse_dispatcher_name(disp), rc);
2457 		fuse_dispatcher_io_complete_err(fuse_io, rc);
2458 		return;
2459 	}
2460 }
2461 
2462 static void
2463 do_batch_forget_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status)
2464 {
2465 	struct fuse_io *fuse_io = cb_arg;
2466 
2467 	if (status) {
2468 		fuse_io->u.batch_forget.status = status;
2469 	}
2470 
2471 	fuse_io->u.batch_forget.to_forget--;
2472 
2473 	if (!fuse_io->u.batch_forget.to_forget) {
2474 		/* FUSE_BATCH_FORGET requires no response */
2475 		fuse_dispatcher_io_complete_none(fuse_io, fuse_io->u.batch_forget.status);
2476 	}
2477 }
2478 
2479 static void
2480 do_batch_forget(struct fuse_io *fuse_io)
2481 {
2482 	int err;
2483 	struct fuse_batch_forget_in *arg;
2484 	struct fuse_forget_data *forgets;
2485 	size_t scount;
2486 	uint32_t count, i;
2487 
2488 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2489 	if (!arg) {
2490 		SPDK_ERRLOG("Cannot get fuse_batch_forget_in\n");
2491 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2492 		return;
2493 	}
2494 
2495 	/* Prevent integer overflow.  The compiler emits the following warning
2496 	 * unless we use the scount local variable:
2497 	 *
2498 	 * error: comparison is always false due to limited range of data type
2499 	 * [-Werror=type-limits]
2500 	 *
2501 	 * This may be true on 64-bit hosts but we need this check for 32-bit
2502 	 * hosts.
2503 	 */
2504 	scount = fsdev_io_d2h_u32(fuse_io, arg->count);
2505 	if (scount > SIZE_MAX / sizeof(forgets[0])) {
2506 		SPDK_WARNLOG("Too many forgets (%zu >= %zu)\n", scount,
2507 			     SIZE_MAX / sizeof(forgets[0]));
2508 		/* FUSE_BATCH_FORGET requires no response */
2509 		fuse_dispatcher_io_complete_none(fuse_io, -EINVAL);
2510 		return;
2511 	}
2512 
2513 	count = scount;
2514 	if (!count) {
2515 		SPDK_WARNLOG("0 forgets requested\n");
2516 		/* FUSE_BATCH_FORGET requires no response */
2517 		fuse_dispatcher_io_complete_none(fuse_io, -EINVAL);
2518 		return;
2519 	}
2520 
2521 	forgets = _fsdev_io_in_arg_get_buf(fuse_io, count * sizeof(forgets[0]));
2522 	if (!forgets) {
2523 		SPDK_WARNLOG("Cannot get expected forgets (%" PRIu32 ")\n", count);
2524 		/* FUSE_BATCH_FORGET requires no response */
2525 		fuse_dispatcher_io_complete_none(fuse_io, -EINVAL);
2526 		return;
2527 	}
2528 
2529 	fuse_io->u.batch_forget.to_forget = 0;
2530 	fuse_io->u.batch_forget.status = 0;
2531 
2532 	for (i = 0; i < count; i++) {
2533 		uint64_t ino = fsdev_io_d2h_u64(fuse_io, forgets[i].ino);
2534 		uint64_t nlookup = fsdev_io_d2h_u64(fuse_io, forgets[i].nlookup);
2535 		err = spdk_fsdev_forget(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2536 					ino_to_object(fuse_io, ino), nlookup,
2537 					do_batch_forget_cpl_clb, fuse_io);
2538 		if (!err) {
2539 			fuse_io->u.batch_forget.to_forget++;
2540 		} else {
2541 			fuse_io->u.batch_forget.status = err;
2542 		}
2543 	}
2544 
2545 	if (!fuse_io->u.batch_forget.to_forget) {
2546 		/* FUSE_BATCH_FORGET requires no response */
2547 		fuse_dispatcher_io_complete_none(fuse_io, fuse_io->u.batch_forget.status);
2548 	}
2549 }
2550 
2551 static void
2552 do_copy_file_range_cpl_clb(void *cb_arg, struct spdk_io_channel *ch, int status, uint32_t data_size)
2553 {
2554 	struct fuse_io *fuse_io = cb_arg;
2555 
2556 	fuse_dispatcher_io_complete_write(fuse_io, data_size, status);
2557 }
2558 
2559 static void
2560 do_copy_file_range(struct fuse_io *fuse_io)
2561 {
2562 	int err;
2563 	struct fuse_copy_file_range_in *arg;
2564 	uint64_t fh_in, fh_out, nodeid_out;
2565 
2566 	arg = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*arg));
2567 	if (!arg) {
2568 		SPDK_ERRLOG("Cannot get fuse_copy_file_range_in\n");
2569 		fuse_dispatcher_io_complete_err(fuse_io, -EINVAL);
2570 		return;
2571 	}
2572 
2573 	fh_in = fsdev_io_d2h_u64(fuse_io, arg->fh_in);
2574 	nodeid_out = fsdev_io_d2h_u64(fuse_io, arg->nodeid_out);
2575 	fh_out = fsdev_io_d2h_u64(fuse_io, arg->fh_out);
2576 
2577 	err = spdk_fsdev_copy_file_range(fuse_io_desc(fuse_io), fuse_io->ch, fuse_io->hdr.unique,
2578 					 file_object(fuse_io), file_handle(fh_in), fsdev_io_d2h_u64(fuse_io, arg->off_in),
2579 					 ino_to_object(fuse_io, nodeid_out), file_handle(fh_out), fsdev_io_d2h_u64(fuse_io, arg->off_out),
2580 					 fsdev_io_d2h_u64(fuse_io, arg->len), fsdev_io_d2h_u64(fuse_io, arg->flags),
2581 					 do_copy_file_range_cpl_clb, fuse_io);
2582 
2583 	if (err) {
2584 		fuse_dispatcher_io_complete_err(fuse_io, err);
2585 	}
2586 }
2587 
2588 static void
2589 do_setupmapping(struct fuse_io *fuse_io)
2590 {
2591 	SPDK_ERRLOG("SETUPMAPPING is not supported\n");
2592 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2593 }
2594 
2595 static void
2596 do_removemapping(struct fuse_io *fuse_io)
2597 {
2598 	SPDK_ERRLOG("REMOVEMAPPING is not supported\n");
2599 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2600 }
2601 
2602 static void
2603 do_syncfs(struct fuse_io *fuse_io)
2604 {
2605 	SPDK_ERRLOG("SYNCFS is not supported\n");
2606 	fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2607 }
2608 
2609 static const struct {
2610 	void (*func)(struct fuse_io *fuse_io);
2611 	const char *name;
2612 } fuse_ll_ops[] = {
2613 	[FUSE_LOOKUP]	   = { do_lookup,      "LOOKUP"	     },
2614 	[FUSE_FORGET]	   = { do_forget,      "FORGET"	     },
2615 	[FUSE_GETATTR]	   = { do_getattr,     "GETATTR"     },
2616 	[FUSE_SETATTR]	   = { do_setattr,     "SETATTR"     },
2617 	[FUSE_READLINK]	   = { do_readlink,    "READLINK"    },
2618 	[FUSE_SYMLINK]	   = { do_symlink,     "SYMLINK"     },
2619 	[FUSE_MKNOD]	   = { do_mknod,       "MKNOD"	     },
2620 	[FUSE_MKDIR]	   = { do_mkdir,       "MKDIR"	     },
2621 	[FUSE_UNLINK]	   = { do_unlink,      "UNLINK"	     },
2622 	[FUSE_RMDIR]	   = { do_rmdir,       "RMDIR"	     },
2623 	[FUSE_RENAME]	   = { do_rename,      "RENAME"	     },
2624 	[FUSE_LINK]	   = { do_link,	       "LINK"	     },
2625 	[FUSE_OPEN]	   = { do_open,	       "OPEN"	     },
2626 	[FUSE_READ]	   = { do_read,       "READ"	     },
2627 	[FUSE_WRITE]	   = { do_write,       "WRITE"	     },
2628 	[FUSE_STATFS]	   = { do_statfs,      "STATFS"	     },
2629 	[FUSE_RELEASE]	   = { do_release,     "RELEASE"     },
2630 	[FUSE_FSYNC]	   = { do_fsync,       "FSYNC"	     },
2631 	[FUSE_SETXATTR]	   = { do_setxattr,    "SETXATTR"    },
2632 	[FUSE_GETXATTR]	   = { do_getxattr,    "GETXATTR"    },
2633 	[FUSE_LISTXATTR]   = { do_listxattr,   "LISTXATTR"   },
2634 	[FUSE_REMOVEXATTR] = { do_removexattr, "REMOVEXATTR" },
2635 	[FUSE_FLUSH]	   = { do_flush,       "FLUSH"	     },
2636 	[FUSE_INIT]	   = { do_init,	       "INIT"	     },
2637 	[FUSE_OPENDIR]	   = { do_opendir,     "OPENDIR"     },
2638 	[FUSE_READDIR]	   = { do_readdir,     "READDIR"     },
2639 	[FUSE_RELEASEDIR]  = { do_releasedir,  "RELEASEDIR"  },
2640 	[FUSE_FSYNCDIR]	   = { do_fsyncdir,    "FSYNCDIR"    },
2641 	[FUSE_GETLK]	   = { do_getlk,       "GETLK"	     },
2642 	[FUSE_SETLK]	   = { do_setlk,       "SETLK"	     },
2643 	[FUSE_SETLKW]	   = { do_setlkw,      "SETLKW"	     },
2644 	[FUSE_ACCESS]	   = { do_access,      "ACCESS"	     },
2645 	[FUSE_CREATE]	   = { do_create,      "CREATE"	     },
2646 	[FUSE_INTERRUPT]   = { do_interrupt,   "INTERRUPT"   },
2647 	[FUSE_BMAP]	   = { do_bmap,	       "BMAP"	     },
2648 	[FUSE_IOCTL]	   = { do_ioctl,       "IOCTL"	     },
2649 	[FUSE_POLL]	   = { do_poll,        "POLL"	     },
2650 	[FUSE_FALLOCATE]   = { do_fallocate,   "FALLOCATE"   },
2651 	[FUSE_DESTROY]	   = { do_destroy,     "DESTROY"     },
2652 	[FUSE_NOTIFY_REPLY] = { NULL,    "NOTIFY_REPLY" },
2653 	[FUSE_BATCH_FORGET] = { do_batch_forget, "BATCH_FORGET" },
2654 	[FUSE_READDIRPLUS] = { do_readdirplus,	"READDIRPLUS"},
2655 	[FUSE_RENAME2]     = { do_rename2,      "RENAME2"    },
2656 	[FUSE_COPY_FILE_RANGE] = { do_copy_file_range, "COPY_FILE_RANGE" },
2657 	[FUSE_SETUPMAPPING]  = { do_setupmapping, "SETUPMAPPING" },
2658 	[FUSE_REMOVEMAPPING] = { do_removemapping, "REMOVEMAPPING" },
2659 	[FUSE_SYNCFS] = { do_syncfs, "SYNCFS" },
2660 };
2661 
2662 static int
2663 spdk_fuse_dispatcher_handle_fuse_req(struct spdk_fuse_dispatcher *disp, struct fuse_io *fuse_io)
2664 {
2665 	struct fuse_in_header *hdr;
2666 
2667 	if (!fuse_io->in_iovcnt || !fuse_io->in_iov) {
2668 		SPDK_ERRLOG("Bad IO: no IN iov (%d, %p)\n", fuse_io->in_iovcnt, fuse_io->in_iov);
2669 		goto exit;
2670 	}
2671 
2672 	hdr = _fsdev_io_in_arg_get_buf(fuse_io, sizeof(*hdr));
2673 	if (!hdr) {
2674 		SPDK_ERRLOG("Bad IO: cannot get fuse_in_header\n");
2675 		goto exit;
2676 	}
2677 
2678 	fuse_io->hdr.opcode = fsdev_io_d2h_u32(fuse_io, hdr->opcode);
2679 
2680 	if (spdk_unlikely(!fuse_io->ch)) {
2681 		/* FUSE_INIT is allowed with no channel. It'll open the fsdev and get channels */
2682 		if (fuse_io->hdr.opcode != FUSE_INIT) {
2683 			/* The fsdev is not currently active. Complete this request. */
2684 			SPDK_ERRLOG("IO (%" PRIu32 ") arrived while there's no channel\n", fuse_io->hdr.opcode);
2685 			goto exit;
2686 		}
2687 	}
2688 
2689 	if (spdk_likely(_fuse_op_requires_reply(hdr->opcode))) {
2690 		struct fuse_out_header *out_hdr = _fsdev_io_out_arg_get_buf(fuse_io, sizeof(*out_hdr));
2691 		if (!out_hdr) {
2692 			SPDK_ERRLOG("Bad IO: cannot get out_hdr\n");
2693 			goto exit;
2694 		}
2695 
2696 		UNUSED(out_hdr); /* We don't need it here, we just made a check and a reservation */
2697 	}
2698 
2699 	if (fuse_io->hdr.opcode >= SPDK_COUNTOF(fuse_ll_ops)) {
2700 		SPDK_ERRLOG("Bad IO: opt_code is out of range (%" PRIu32 " > %zu)\n", fuse_io->hdr.opcode,
2701 			    SPDK_COUNTOF(fuse_ll_ops));
2702 		fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2703 		return 0;
2704 	}
2705 
2706 	if (!fuse_ll_ops[fuse_io->hdr.opcode].func) {
2707 		SPDK_ERRLOG("Bad IO: no handler for (%" PRIu32 ") %s\n", fuse_io->hdr.opcode,
2708 			    fuse_ll_ops[fuse_io->hdr.opcode].name);
2709 		fuse_dispatcher_io_complete_err(fuse_io, -ENOSYS);
2710 		return 0;
2711 	}
2712 
2713 	fuse_io->hdr.len = fsdev_io_d2h_u32(fuse_io, hdr->len);
2714 	fuse_io->hdr.unique = fsdev_io_d2h_u64(fuse_io, hdr->unique);
2715 	fuse_io->hdr.nodeid = fsdev_io_d2h_u64(fuse_io, hdr->nodeid);
2716 	fuse_io->hdr.uid = fsdev_io_d2h_u32(fuse_io, hdr->uid);
2717 	fuse_io->hdr.gid = fsdev_io_d2h_u32(fuse_io, hdr->gid);
2718 	fuse_io->hdr.pid = fsdev_io_d2h_u32(fuse_io, hdr->pid);
2719 
2720 	SPDK_DEBUGLOG(fuse_dispatcher, "IO arrived: %" PRIu32 " (%s) len=%" PRIu32 " unique=%" PRIu64
2721 		      " nodeid=%" PRIu64 " uid=%" PRIu32 " gid=%" PRIu32 " pid=%" PRIu32 "\n", fuse_io->hdr.opcode,
2722 		      fuse_ll_ops[fuse_io->hdr.opcode].name, fuse_io->hdr.len, fuse_io->hdr.unique,
2723 		      fuse_io->hdr.nodeid, fuse_io->hdr.uid, fuse_io->hdr.gid, fuse_io->hdr.pid);
2724 
2725 	fuse_ll_ops[fuse_io->hdr.opcode].func(fuse_io);
2726 	return 0;
2727 
2728 exit:
2729 	spdk_mempool_put(g_fuse_mgr.fuse_io_pool, fuse_io);
2730 	return -EINVAL;
2731 }
2732 
2733 static int
2734 fuse_dispatcher_channel_create(void *io_device, void *ctx_buf)
2735 {
2736 	struct spdk_fuse_dispatcher	*disp = __disp_from_io_dev(io_device);
2737 	struct spdk_fuse_dispatcher_channel	*ch = ctx_buf;
2738 
2739 	if (disp->desc) {
2740 		ch->fsdev_io_ch = spdk_fsdev_get_io_channel(disp->desc);
2741 	}
2742 
2743 	return 0;
2744 }
2745 
2746 static void
2747 fuse_dispatcher_channel_destroy(void *io_device, void *ctx_buf)
2748 {
2749 	struct spdk_fuse_dispatcher	*disp = __disp_from_io_dev(io_device);
2750 	struct spdk_fuse_dispatcher_channel	*ch = ctx_buf;
2751 
2752 	UNUSED(disp);
2753 
2754 	if (ch->fsdev_io_ch) {
2755 		assert(disp->desc);
2756 		spdk_put_io_channel(ch->fsdev_io_ch);
2757 		ch->fsdev_io_ch = NULL;
2758 	}
2759 }
2760 
2761 struct fuse_dispatcher_create_ctx {
2762 	struct spdk_fuse_dispatcher *disp;
2763 	spdk_fuse_dispatcher_create_cpl_cb cb;
2764 	void *cb_arg;
2765 };
2766 
2767 static void
2768 fuse_dispatcher_get_channel_rollback(struct spdk_io_channel_iter *i)
2769 {
2770 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
2771 	struct spdk_fuse_dispatcher_channel *ch = __disp_ch_from_io_ch(io_ch);
2772 
2773 	if (ch->fsdev_io_ch) {
2774 		spdk_put_io_channel(ch->fsdev_io_ch);
2775 		ch->fsdev_io_ch = NULL;
2776 	}
2777 
2778 	spdk_for_each_channel_continue(i, 0);
2779 }
2780 
2781 static void
2782 fuse_dispatcher_get_channel_rollback_done(struct spdk_io_channel_iter *i, int status)
2783 {
2784 	struct fuse_dispatcher_create_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
2785 	struct spdk_fuse_dispatcher *disp = ctx->disp;
2786 
2787 	if (status) {
2788 		SPDK_WARNLOG("%s: getting channels failed with %d\n", fuse_dispatcher_name(disp), status);
2789 		spdk_fsdev_close(disp->desc);
2790 		free(disp);
2791 		disp = NULL;
2792 	}
2793 
2794 	ctx->cb(ctx->cb_arg, disp);
2795 	free(ctx);
2796 }
2797 
2798 static void
2799 fuse_dispatcher_undo_create_get_channel(struct fuse_dispatcher_create_ctx *ctx)
2800 {
2801 	spdk_for_each_channel(__disp_to_io_dev(ctx->disp),
2802 			      fuse_dispatcher_get_channel_rollback,
2803 			      ctx,
2804 			      fuse_dispatcher_get_channel_rollback_done);
2805 
2806 }
2807 
2808 static void
2809 fuse_dispatcher_get_channel(struct spdk_io_channel_iter *i)
2810 {
2811 	struct fuse_dispatcher_create_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
2812 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
2813 	struct spdk_fuse_dispatcher_channel *ch = __disp_ch_from_io_ch(io_ch);
2814 	struct spdk_fuse_dispatcher *disp = ctx->disp;
2815 
2816 	assert(!ch->fsdev_io_ch);
2817 
2818 	ch->fsdev_io_ch = spdk_fsdev_get_io_channel(disp->desc);
2819 
2820 	spdk_for_each_channel_continue(i, 0);
2821 }
2822 
2823 static void
2824 fuse_dispatcher_get_channel_done(struct spdk_io_channel_iter *i, int status)
2825 {
2826 	struct fuse_dispatcher_create_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
2827 	struct spdk_fuse_dispatcher *disp = ctx->disp;
2828 
2829 	if (status) {
2830 		SPDK_ERRLOG("%s: getting channels failed with %d\n", fuse_dispatcher_name(disp), status);
2831 		fuse_dispatcher_undo_create_get_channel(ctx);
2832 		return;
2833 	}
2834 
2835 	SPDK_DEBUGLOG(fuse_dispatcher, "%s: getting succeeded\n", fuse_dispatcher_name(disp));
2836 	ctx->cb(ctx->cb_arg, disp);
2837 	free(ctx);
2838 }
2839 
2840 int
2841 spdk_fuse_dispatcher_create(const char *fsdev_name, spdk_fuse_dispatcher_event_cb event_cb,
2842 			    void *event_ctx, spdk_fuse_dispatcher_create_cpl_cb cb, void *cb_arg)
2843 {
2844 	struct spdk_fuse_dispatcher *disp;
2845 	struct fuse_dispatcher_create_ctx *ctx;
2846 	char *io_dev_name;
2847 	size_t fsdev_name_len;
2848 	int rc;
2849 
2850 	if (!fsdev_name || !event_cb || !cb) {
2851 		SPDK_ERRLOG("Invalid params\n");
2852 		return -EINVAL;
2853 	}
2854 
2855 	ctx = calloc(1, sizeof(*ctx));
2856 	if (!ctx) {
2857 		SPDK_ERRLOG("%s: could not alloc context\n", fsdev_name);
2858 		return -ENOMEM;
2859 	}
2860 
2861 	io_dev_name = spdk_sprintf_alloc("fuse_disp_%s", fsdev_name);
2862 	if (!io_dev_name) {
2863 		SPDK_ERRLOG("Could not format io_dev name (%s)\n", fsdev_name);
2864 		rc = -ENOMEM;
2865 		goto io_dev_name_failed;
2866 	}
2867 
2868 	fsdev_name_len = strlen(fsdev_name);
2869 
2870 	disp = calloc(1, sizeof(*disp) + fsdev_name_len + 1);
2871 	if (!disp) {
2872 		SPDK_ERRLOG("Could not allocate spdk_fuse_dispatcher\n");
2873 		rc = -ENOMEM;
2874 		goto disp_alloc_failed;
2875 	}
2876 
2877 	rc = spdk_fsdev_open(fsdev_name, fuse_dispatcher_fsdev_event_cb, disp, &disp->desc);
2878 	if (rc) {
2879 		SPDK_ERRLOG("Could not open fsdev %s (err=%d)\n", fsdev_name, rc);
2880 		goto fsdev_open_failed;
2881 	}
2882 
2883 	pthread_mutex_lock(&g_fuse_mgr.lock);
2884 	if (!g_fuse_mgr.ref_cnt) {
2885 		struct spdk_fsdev_opts opts;
2886 		spdk_fsdev_get_opts(&opts, sizeof(opts));
2887 
2888 		g_fuse_mgr.fuse_io_pool = spdk_mempool_create("FUSE_disp_ios", opts.fsdev_io_pool_size,
2889 					  sizeof(struct fuse_io), opts.fsdev_io_cache_size, SPDK_ENV_NUMA_ID_ANY);
2890 		if (!g_fuse_mgr.fuse_io_pool) {
2891 			pthread_mutex_unlock(&g_fuse_mgr.lock);
2892 			SPDK_ERRLOG("Could not create mempool\n");
2893 			rc = -ENOMEM;
2894 			goto mempool_create_failed;
2895 		}
2896 	}
2897 	g_fuse_mgr.ref_cnt++;
2898 	pthread_mutex_unlock(&g_fuse_mgr.lock);
2899 
2900 	spdk_io_device_register(__disp_to_io_dev(disp),
2901 				fuse_dispatcher_channel_create, fuse_dispatcher_channel_destroy,
2902 				sizeof(struct spdk_fuse_dispatcher_channel),
2903 				io_dev_name);
2904 
2905 	free(io_dev_name);
2906 
2907 	memcpy(disp->fsdev_name, fsdev_name, fsdev_name_len + 1);
2908 	disp->event_cb = event_cb;
2909 	disp->event_ctx = event_ctx;
2910 	disp->fuse_arch = SPDK_FUSE_ARCH_NATIVE;
2911 	disp->fsdev_thread = spdk_get_thread();
2912 
2913 	ctx->disp = disp;
2914 	ctx->cb = cb;
2915 	ctx->cb_arg = cb_arg;
2916 
2917 	spdk_for_each_channel(__disp_to_io_dev(disp),
2918 			      fuse_dispatcher_get_channel,
2919 			      ctx,
2920 			      fuse_dispatcher_get_channel_done);
2921 
2922 	return 0;
2923 
2924 mempool_create_failed:
2925 	spdk_fsdev_close(disp->desc);
2926 fsdev_open_failed:
2927 	free(disp);
2928 disp_alloc_failed:
2929 	free(io_dev_name);
2930 io_dev_name_failed:
2931 	free(ctx);
2932 	return rc;
2933 }
2934 
2935 int
2936 spdk_fuse_dispatcher_set_arch(struct spdk_fuse_dispatcher *disp, enum spdk_fuse_arch fuse_arch)
2937 {
2938 	switch (fuse_arch) {
2939 	case SPDK_FUSE_ARCH_NATIVE:
2940 	case SPDK_FUSE_ARCH_X86:
2941 	case SPDK_FUSE_ARCH_X86_64:
2942 	case SPDK_FUSE_ARCH_ARM:
2943 	case SPDK_FUSE_ARCH_ARM64:
2944 		SPDK_NOTICELOG("FUSE arch set to %d\n", fuse_arch);
2945 		disp->fuse_arch = fuse_arch;
2946 		return 0;
2947 	default:
2948 		return -EINVAL;
2949 	}
2950 }
2951 
2952 const char *
2953 spdk_fuse_dispatcher_get_fsdev_name(struct spdk_fuse_dispatcher *disp)
2954 {
2955 	return fuse_dispatcher_name(disp);
2956 }
2957 
2958 struct spdk_io_channel *
2959 spdk_fuse_dispatcher_get_io_channel(struct spdk_fuse_dispatcher *disp)
2960 {
2961 	return spdk_get_io_channel(__disp_to_io_dev(disp));
2962 }
2963 
2964 int
2965 spdk_fuse_dispatcher_submit_request(struct spdk_fuse_dispatcher *disp,
2966 				    struct spdk_io_channel *ch,
2967 				    struct iovec *in_iov, int in_iovcnt,
2968 				    struct iovec *out_iov, int out_iovcnt,
2969 				    spdk_fuse_dispatcher_submit_cpl_cb clb, void *cb_arg)
2970 {
2971 	struct fuse_io *fuse_io;
2972 	struct spdk_fuse_dispatcher_channel *disp_ch = __disp_ch_from_io_ch(ch);
2973 
2974 	fuse_io = spdk_mempool_get(g_fuse_mgr.fuse_io_pool);
2975 
2976 	if (!fuse_io) {
2977 		SPDK_ERRLOG("We ran out of FUSE IOs\n");
2978 		return -ENOBUFS;
2979 	}
2980 
2981 	fuse_io->disp = disp;
2982 	fuse_io->ch = disp_ch->fsdev_io_ch;
2983 	fuse_io->in_iov = in_iov;
2984 	fuse_io->in_iovcnt = in_iovcnt;
2985 	fuse_io->out_iov = out_iov;
2986 	fuse_io->out_iovcnt = out_iovcnt;
2987 	fuse_io->cpl_cb = clb;
2988 	fuse_io->cpl_cb_arg = cb_arg;
2989 
2990 	fuse_io->in_offs.iov_offs = 0;
2991 	fuse_io->in_offs.buf_offs = 0;
2992 	fuse_io->out_offs.iov_offs = 0;
2993 	fuse_io->out_offs.buf_offs = 0;
2994 
2995 	return spdk_fuse_dispatcher_handle_fuse_req(disp, fuse_io);
2996 }
2997 
2998 struct fuse_dispatcher_delete_ctx {
2999 	struct spdk_fuse_dispatcher *disp;
3000 	struct spdk_thread *thread;
3001 	spdk_fuse_dispatcher_delete_cpl_cb cb;
3002 	void *cb_arg;
3003 };
3004 
3005 static void
3006 fuse_dispatcher_delete_done(struct fuse_dispatcher_delete_ctx *ctx, int status)
3007 {
3008 	struct spdk_fuse_dispatcher *disp = ctx->disp;
3009 
3010 	if (!status) {
3011 		SPDK_DEBUGLOG(fuse_dispatcher, "%s: deletion succeeded\n", fuse_dispatcher_name(disp));
3012 
3013 		spdk_io_device_unregister(__disp_to_io_dev(disp), NULL);
3014 
3015 		free(disp);
3016 
3017 		pthread_mutex_lock(&g_fuse_mgr.lock);
3018 		g_fuse_mgr.ref_cnt--;
3019 		if (!g_fuse_mgr.ref_cnt) {
3020 			spdk_mempool_free(g_fuse_mgr.fuse_io_pool);
3021 			g_fuse_mgr.fuse_io_pool = NULL;
3022 		}
3023 		pthread_mutex_unlock(&g_fuse_mgr.lock);
3024 	} else {
3025 		SPDK_ERRLOG("%s: deletion failed with %d\n", fuse_dispatcher_name(disp), status);
3026 	}
3027 
3028 	ctx->cb(ctx->cb_arg, (uint32_t)(-status));
3029 	free(ctx);
3030 }
3031 
3032 static void
3033 fuse_dispatcher_delete_put_channel(struct spdk_io_channel_iter *i)
3034 {
3035 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
3036 	struct spdk_fuse_dispatcher_channel *ch = __disp_ch_from_io_ch(io_ch);
3037 
3038 	if (ch->fsdev_io_ch) {
3039 		spdk_put_io_channel(ch->fsdev_io_ch);
3040 		ch->fsdev_io_ch = NULL;
3041 	}
3042 
3043 	spdk_for_each_channel_continue(i, 0);
3044 }
3045 
3046 static void
3047 fuse_dispatcher_delete_done_msg(void *_ctx)
3048 {
3049 	struct fuse_dispatcher_delete_ctx *ctx = _ctx;
3050 
3051 	fuse_dispatcher_delete_done(ctx, 0);
3052 }
3053 
3054 static void
3055 fuse_dispatcher_delete_close_fsdev_msg(void *_ctx)
3056 {
3057 	struct fuse_dispatcher_delete_ctx *ctx = _ctx;
3058 	struct spdk_fuse_dispatcher *disp = ctx->disp;
3059 
3060 	spdk_fsdev_close(disp->desc);
3061 
3062 	spdk_thread_send_msg(ctx->thread, fuse_dispatcher_delete_done_msg, ctx);
3063 }
3064 
3065 static void
3066 fuse_dispatcher_delete_put_channel_done(struct spdk_io_channel_iter *i, int status)
3067 {
3068 	struct fuse_dispatcher_delete_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3069 	struct spdk_fuse_dispatcher *disp = ctx->disp;
3070 
3071 	if (status) {
3072 		SPDK_ERRLOG("%s: putting channels failed with %d\n", fuse_dispatcher_name(disp), status);
3073 		fuse_dispatcher_delete_done(ctx, status);
3074 		return;
3075 	}
3076 
3077 	SPDK_DEBUGLOG(fuse_dispatcher, "%s: putting channels succeeded. Releasing the fdev\n",
3078 		      fuse_dispatcher_name(disp));
3079 
3080 	spdk_thread_send_msg(disp->fsdev_thread, fuse_dispatcher_delete_close_fsdev_msg, ctx);
3081 }
3082 
3083 int
3084 spdk_fuse_dispatcher_delete(struct spdk_fuse_dispatcher *disp,
3085 			    spdk_fuse_dispatcher_delete_cpl_cb cb, void *cb_arg)
3086 {
3087 	struct fuse_dispatcher_delete_ctx *ctx;
3088 
3089 	ctx = calloc(1, sizeof(*ctx));
3090 	if (!ctx) {
3091 		SPDK_ERRLOG("cannot allocate context\n");
3092 		return -ENOMEM;
3093 	}
3094 
3095 	ctx->disp = disp;
3096 	ctx->cb = cb;
3097 	ctx->cb_arg = cb_arg;
3098 	ctx->thread = spdk_get_thread();
3099 
3100 	if (disp->desc) {
3101 		SPDK_DEBUGLOG(fuse_dispatcher, "%s: fsdev still open. Releasing the channels.\n",
3102 			      fuse_dispatcher_name(disp));
3103 
3104 		spdk_for_each_channel(__disp_to_io_dev(disp),
3105 				      fuse_dispatcher_delete_put_channel,
3106 				      ctx,
3107 				      fuse_dispatcher_delete_put_channel_done);
3108 	} else {
3109 		fuse_dispatcher_delete_done(ctx, 0);
3110 	}
3111 
3112 	return 0;
3113 }
3114 
3115 SPDK_LOG_REGISTER_COMPONENT(fuse_dispatcher)
3116