xref: /spdk/lib/nbd/nbd.c (revision da2fd6651a9cd4732b0910d30291821e77f4d643)
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

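/*
 * A sketch of the per-io state flow implemented below:
 * NBD_IO_RECV_REQ -> NBD_IO_RECV_PAYLOAD (writes only) ->
 * NBD_IO_XMIT_RESP -> NBD_IO_XMIT_PAYLOAD (reads only).
 */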
enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

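/*
 * For reference, the classic NBD wire format used by io->req and io->resp
 * below (defined in <linux/nbd.h>, not in this file; all multi-byte fields
 * are big-endian on the wire):
 *
 *	struct nbd_request { __be32 magic; __be32 type; char handle[8];
 *			     __be64 from; __be32 len; };
 *	struct nbd_reply   { __be32 magic; __be32 error; char handle[8]; };
 *
 * NBD_REQUEST_MAGIC is 0x25609513 and NBD_REPLY_MAGIC is 0x67446698.
 */
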
struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	uint32_t		buf_align;

	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	bool			is_started;
	bool			is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (!nbd->is_closing) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check whether all nbd disks have been closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not already in use by this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is registered,
	 * so check whether it was actually registered.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

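/*
 * Illustrative shape of the JSON emitted above for a single exported disk
 * (the device path and bdev name here are hypothetical):
 *
 *	[{
 *	   "method": "nbd_start_disk",
 *	   "params": { "nbd_device": "/dev/nbd0", "bdev_name": "Malloc0" }
 *	}]
 */
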
void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Issue an nbd soft disconnect to terminate the transmission phase.
	 * After receiving this ioctl, the nbd kernel module sends an
	 * NBD_CMD_DISC request to the nbd server to notify it of the disconnect.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been executed,
 * and free executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still being executed,
 *         0 if all nbd_io received so far have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands from the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure only after the nbd_start_kernel pthread has exited */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Timed out waiting for the NBD_DO_IT ioctl to return.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->is_closing = true;

	/* If nbd has not started yet, the stop will be retried from nbd_poll() later */
	if (!nbd->is_started) {
		return 1;
	}

	/*
	 * The stop procedure may run only after all nbd_io have been executed.
	 */
	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}

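/*
 * Read from or write to the nbd socket without blocking.
 *
 * \return the number of bytes transferred on success,
 *         0 if the socket would block (EAGAIN),
 *         -EIO if the peer closed the connection,
 *         or another negated errno value on failure.
 */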
static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When the executed_io_list transitions from empty to non-empty, enable the
	 * socket writable notification so the response gets transmitted in nbd_io_xmit()
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		nbd->is_closing = true;
		rc = spdk_bdev_abort(desc, ch, io, nbd_io_done, io);

		/* Once there is executed_io to send, enable the socket writable notification */
		if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
			spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
		}

		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory; queueing io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				return ret;
			}

			io_count++;
		}
	}

	return io_count;
}

static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* Only read/write commands carry a payload; others have none */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* Allocate the io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* Advance to the next io state */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}
	}

	return received;
}

static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept new requests after a close command.
	 */
	if (nbd->is_closing) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
	}

	return ret;
}

static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove the IO from the list now, assuming it will be completed.  It will be
	 *  inserted back at the head if it cannot be completed.  This approach is
	 *  specifically taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle were already set in nbd_io_done() */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* Transmit the payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* Once the executed_io_list is drained, disable the socket writable notification */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return the number of events processed on success (>= 0), or a negated
 *         errno value on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->is_closing) {
		spdk_nbd_stop(nbd);
	}

	return SPDK_POLLER_BUSY;
}

static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	spdk_nbd_stop(nbd);
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
};

static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}

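/*
 * Note on interrupt mode: while executed_io_list is non-empty, the interrupt
 * source watches both SPDK_INTERRUPT_EVENT_IN and SPDK_INTERRUPT_EVENT_OUT on
 * spdk_sp_fd (see nbd_io_done() and nbd_submit_bdev_io()); once nbd_io_xmit()
 * drains the list, it drops back to SPDK_INTERRUPT_EVENT_IN only.
 */
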
static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	unsigned long	nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	nbd_flags |= NBD_FLAG_SEND_FLUSH;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	nbd_flags |= NBD_FLAG_SEND_TRIM;
#endif
	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx->nbd);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	/* nbd may receive a stop command while it is still initializing */
	ctx->nbd->is_started = true;

	free(ctx);
	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare that this process is setting up the device */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);
	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and register the retry poller to avoid a scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Add the nbd disk to the end of the global disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

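/*
 * Example usage (a minimal sketch; start_cb, fini_cb and the names "Malloc0"
 * and "/dev/nbd0" are hypothetical):
 *
 *	static void
 *	start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *	{
 *		if (rc != 0) {
 *			SPDK_ERRLOG("export failed: %s\n", spdk_strerror(-rc));
 *			return;
 *		}
 *		printf("bdev exported at %s\n", spdk_nbd_get_path(nbd));
 *	}
 *
 *	spdk_nbd_init();
 *	spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 *	...
 *	spdk_nbd_fini(fini_cb, NULL);
 */
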
const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)