/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2017 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

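/*
 * Per-io state machine (see nbd_io_recv_internal() and nbd_io_xmit_internal()):
 * reads flow RECV_REQ -> XMIT_RESP -> XMIT_PAYLOAD, writes flow
 * RECV_REQ -> RECV_PAYLOAD -> XMIT_RESP, and all other commands flow
 * RECV_REQ -> XMIT_RESP.
 */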
enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

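/*
 * Each nbd_io moves across three lists as it is serviced: received_io_list
 * (fully parsed from the socket, waiting for submission), processing_io_list
 * (submitted to the bdev layer), and executed_io_list (completed, waiting to
 * be transmitted back as a reply).
 */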
struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	uint32_t		buf_align;

	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;
	TAILQ_HEAD(, nbd_io)	processing_io_list;

	bool			is_started;
	bool			is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (!nbd->is_closing) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check whether all nbds have closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is registered;
	 * check whether it was actually registered.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * check whether nbd has already been registered by nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Soft-disconnect to terminate the transmission phase.
	 * Upon receiving this ioctl command, the nbd kernel module sends
	 * an NBD_CMD_DISC request to the nbd server to notify it of the
	 * disconnect.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been executed, and put back
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still being executed,
 *         0 if all nbd_io obtained so far have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands from the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

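/*
 * Tear down an nbd disk. Note that this function is also registered as a
 * poller callback: while the nbd_start_kernel pthread is still blocked in the
 * NBD_DO_IT ioctl, teardown is retried on a timer until the pthread exits or
 * the busy-wait budget is exhausted.
 */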
static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure only after the nbd_start_kernel pthread has exited */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Failed to wait for the NBD_DO_IT ioctl to return.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->is_closing = true;

	/* If nbd has not started yet, the stop will be retried later */
	if (!nbd->is_started) {
		return 1;
	}

	/*
	 * The stop action may proceed only after all nbd_io have been executed.
	 */

	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}

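/*
 * Perform one nonblocking read or write on the socketpair fd.
 *
 * \return the number of bytes transferred on success,
 *         0 if the operation would block (EAGAIN),
 *         -EIO if the peer closed the connection, or
 *         -errno on any other socket error.
 */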
static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When the executed_io_list becomes non-empty, enable the socket writable
	 * notification so that the response gets transmitted in nbd_io_xmit().
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_REMOVE(&nbd->processing_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

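/*
 * If the bdev layer ran out of internal resources (-ENOMEM), park the io on
 * the bdev's io_wait queue; nbd_resubmit_io() will be called back to retry
 * the submission once resources free up.
 */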
static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
		TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
		TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
		ret = nbd_submit_bdev_io(nbd, io);
		if (ret < 0) {
			return ret;
		}

		io_count++;
	}

	return io_count;
}

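/*
 * Parse requests from the kernel one at a time. Each request on the wire is a
 * fixed-size struct nbd_request from <linux/nbd.h> (big-endian magic, type,
 * an opaque 8-byte handle, offset and length), optionally followed by a write
 * payload. io_in_recv holds the partially received io across calls, since the
 * socket is nonblocking.
 */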
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			if (from_be32(&io->req.type) == NBD_CMD_DISC) {
				nbd->is_closing = true;
				nbd->io_in_recv = NULL;
				if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
					spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
				}
				nbd_put_io(nbd, io);
				/* After receiving NBD_CMD_DISC, nbd will not receive any new commands */
				return received;
			}

			/* Only read/write requests carry a payload; other ios ignore it */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}
	}

	return received;
}

static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server must not accept new requests after the disconnect command.
	 */
	if (nbd->is_closing) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
		if (nbd->is_closing) {
			break;
		}
	}

	return ret;
}

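/*
 * Transmit one executed io back to the kernel. A reply on the wire is a
 * fixed-size struct nbd_reply from <linux/nbd.h> (big-endian magic and error
 * plus the 8-byte handle copied from the request), followed by the payload
 * for successful reads.
 */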
static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done() */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit a payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* Once the executed_io_list is drained, disable the socket writable notification */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return a non-negative count of bytes transferred and ios executed on
 * success, or a negated errno value on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->is_closing && nbd->io_count == 0) {
		spdk_nbd_stop(nbd);
	}

	return rc == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

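/*
 * Startup sequence: spdk_nbd_start() opens the bdev and the /dev/nbdX device,
 * then nbd_enable_kernel() hands one end of a socketpair to the kernel
 * (NBD_SET_SOCK, retrying while the device is busy), nbd_start_continue()
 * configures the device and spawns nbd_start_kernel() on a pthread to sit in
 * the blocking NBD_DO_IT ioctl, and nbd_start_complete() finally invokes the
 * user callback on the original SPDK thread.
 */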
struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	struct spdk_thread	*thread;
};

static void
nbd_start_complete(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	/* nbd may receive a stop command while it is still initializing */
	ctx->nbd->is_started = true;

	free(ctx);
}

static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	struct spdk_nbd_disk *nbd = ctx->nbd;

	spdk_unaffinitize_thread();

	/* Send a message to complete the start context - this is the
	 * latest point we can do it, since the NBD_DO_IT ioctl will
	 * block in the kernel.
	 */
	spdk_thread_send_msg(ctx->thread, nbd_start_complete, ctx);

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	nbd->is_closing = true;
	nbd_cleanup_io(nbd);

	TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
		TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
		nbd_io_done(NULL, false, io);
	}
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

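/*
 * Callback registered via spdk_poller_register_interrupt(); invoked when the
 * poller switches between poll mode and interrupt mode. In interrupt mode,
 * nbd_poll() is driven by the socketpair fd (see SPDK_INTERRUPT_REGISTER in
 * nbd_start_continue()), and EVENT_OUT is enabled only while responses are
 * pending (see nbd_io_done() and nbd_io_xmit()).
 */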
static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}

static void
nbd_start_continue(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	unsigned long	nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
		nbd_flags |= NBD_FLAG_SEND_FLUSH;
	}
#endif
#ifdef NBD_FLAG_SEND_TRIM
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		nbd_flags |= NBD_FLAG_SEND_TRIM;
	}
#endif

	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);
	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Claim the nbd device for this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and register the retry poller to avoid a scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_continue(ctx);

	return SPDK_POLLER_BUSY;
}

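/*
 * Export a bdev via the kernel nbd driver. Typical usage from an SPDK app
 * thread looks roughly like the sketch below ("Malloc0" and /dev/nbd0 are
 * illustrative placeholders, not requirements of this API):
 *
 *	static void
 *	start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("exported at %s\n", spdk_nbd_get_path(nbd));
 *		}
 *	}
 *
 *	spdk_nbd_init();
 *	spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 */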
void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->thread = spdk_get_thread();

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);
	TAILQ_INIT(&nbd->processing_io_list);

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

1144 
1145 const char *
1146 spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
1147 {
1148 	return nbd->nbd_path;
1149 }
1150 
1151 SPDK_LOG_REGISTER_COMPONENT(nbd)
1152