/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

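/*
 * Tunables for the nbd server. GET_IO_LOOP_COUNT bounds how many requests
 * nbd_io_recv() reads from the socket per poll. The busy-waiting values
 * bound how long start/stop keep retrying while the kernel holds the device
 * busy, e.g. NBD_STOP_BUSY_WAITING_MS * 1000 / NBD_BUSY_POLLING_INTERVAL_US
 * retries of _nbd_stop(). NBD_IO_TIMEOUT_S is handed to the kernel via
 * ioctl(NBD_SET_TIMEOUT).
 */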
#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

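/*
 * Per-request context. req and resp are the raw NBD wire structures from
 * <linux/nbd.h>; their multi-byte fields stay in network (big-endian) byte
 * order, hence the to_be32()/from_be32() conversions throughout this file.
 */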
struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* Soft disconnection triggered by receiving NBD_CMD_DISC */
	NBD_DISK_STATE_SOFTDISC,
	/* Hard disconnection forced by error or shutdown conditions */
	NBD_DISK_STATE_HARDDISC,
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	uint32_t		buf_align;

	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

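/*
 * Walk all exported disks and stop them; if any nbd_io is still in flight,
 * reschedule this function on the current SPDK thread until the disk list
 * drains, then invoke the fini callback.
 */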
static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	/* Change all nbds into closing state */
	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd->state != NBD_DISK_STATE_HARDDISC) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check if all nbds closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is registered,
	 * so check whether it was actually registered.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd has already been registered with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Issue an nbd soft disconnect to terminate the transmission phase.
	 * After receiving this ioctl command, the nbd kernel module will send
	 * an NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been executed, and put back
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still being executed,
 *         0 if all allocated nbd_io have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands in the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure after the exit of the nbd_start_kernel pthread */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Timed out waiting for the NBD_DO_IT ioctl to return.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->state = NBD_DISK_STATE_HARDDISC;

	/*
	 * Stop action should be called only after all nbd_io are executed.
	 */

	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}

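/*
 * Read from or write to the nbd socket.
 *
 * \return the number of bytes transferred, 0 if the socket would block
 * (EAGAIN), -EIO if the peer closed the connection, or a negated errno
 * value on any other failure.
 */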
static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* Once there is executed_io to transmit, enable the socket writable notice
	 * so that it gets processed in nbd_io_xmit
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		nbd->state = NBD_DISK_STATE_SOFTDISC;
		rc = spdk_bdev_abort(desc, ch, io, nbd_io_done, io);

		/* Once there is executed_io to send, enable the socket writable notice */
		if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
			spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
		}

		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory; queueing io for retry.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

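/*
 * Submit every fully received nbd_io to the bdev layer.
 *
 * \return the number of submitted I/Os, or a negated errno value on error.
 */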
static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				return ret;
			}

			io_count++;
		}
	}

	return io_count;
}

static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* io types other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely(nbd->state == NBD_DISK_STATE_RUNNING)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely(nbd->state == NBD_DISK_STATE_RUNNING)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}
	}

	return received;
}

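/*
 * Receive requests from the socket, bounded to GET_IO_LOOP_COUNT requests
 * per call so a single poll does unbounded work.
 *
 * \return the number of bytes received, or a negated errno value on error.
 */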
static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept new requests in either the soft
	 * or the hard disconnect state.
	 */
	if (nbd->state != NBD_DISK_STATE_RUNNING) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
	}

	return ret;
}

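/*
 * Transmit one executed nbd_io: first the nbd_reply header, then, for a
 * successful NBD_CMD_READ, the payload. A partially sent io is re-inserted
 * at the head of executed_io_list and resumed on the next call.
 *
 * \return the number of bytes sent, or a negated errno value on error.
 */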
static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* Once there is no more executed_io to send, disable the socket writable notice */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return the number of I/O operations processed (possibly 0) on success,
 * or a negated errno value on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		/* _nbd_stop() may free nbd, so do not dereference it below */
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->state != NBD_DISK_STATE_RUNNING) {
		if (nbd->state == NBD_DISK_STATE_HARDDISC && !nbd_cleanup_io(nbd)) {
			_nbd_stop(nbd);
		} else if (nbd->state == NBD_DISK_STATE_SOFTDISC) {
			spdk_nbd_stop(nbd);
		}
	}

	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	spdk_nbd_stop(nbd);
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
};

static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}

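/*
 * Final stage of starting a disk: program the kernel nbd device (block
 * size, size in blocks, io timeout and feature flags via ioctl), spawn the
 * detached pthread that blocks in ioctl(NBD_DO_IT), and register the
 * nbd_poll poller (plus an fd-based interrupt when interrupt mode is
 * enabled) before invoking the user's start callback.
 */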
static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	unsigned long	nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	nbd_flags |= NBD_FLAG_SEND_FLUSH;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	nbd_flags |= NBD_FLAG_SEND_TRIM;
#endif
	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx->nbd);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

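/*
 * Hand the kernel its end of the socketpair via ioctl(NBD_SET_SOCK). EBUSY
 * (e.g. a previous user of the device has not fully torn down yet) is
 * retried every NBD_BUSY_POLLING_INTERVAL_US for up to
 * NBD_START_BUSY_WAITING_MS before giving up.
 */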
static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and re-register the retry poller to avoid a scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		/* Capture errno before the cleanup calls below can clobber it */
		rc = -errno;
		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(-rc));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, rc);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}

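/*
 * Minimal usage sketch (illustrative only; assumes the caller runs on an
 * SPDK app thread and that bdev "Malloc0" and device node /dev/nbd0 exist):
 *
 *	static void
 *	start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *	{
 *		if (rc != 0) {
 *			SPDK_ERRLOG("nbd start failed: %s\n", spdk_strerror(-rc));
 *			return;
 *		}
 *		printf("exported at %s\n", spdk_nbd_get_path(nbd));
 *	}
 *
 *	spdk_nbd_init();
 *	spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 */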
void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)