xref: /spdk/lib/nbd/nbd.c (revision cc6920a4763d4b9a43aa40583c8397d8f14fa100)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/string.h"
36 
37 #include <linux/nbd.h>
38 
39 #include "spdk/nbd.h"
40 #include "nbd_internal.h"
41 #include "spdk/bdev.h"
42 #include "spdk/endian.h"
43 #include "spdk/env.h"
44 #include "spdk/likely.h"
45 #include "spdk/log.h"
46 #include "spdk/util.h"
47 #include "spdk/thread.h"
48 
49 #include "spdk/queue.h"
50 
/* Max nbd requests consumed per poll iteration (bounds per-poll work). */
#define GET_IO_LOOP_COUNT		16
/* How long to retry NBD_SET_SOCK while the device reports EBUSY. */
#define NBD_START_BUSY_WAITING_MS	1000
/* How long to wait for the NBD_DO_IT pthread to exit during stop. */
#define NBD_STOP_BUSY_WAITING_MS	10000
/* Interval between busy-wait retries, in microseconds. */
#define NBD_BUSY_POLLING_INTERVAL_US	20000
/* Io timeout handed to the kernel via NBD_SET_TIMEOUT, in seconds. */
#define NBD_IO_TIMEOUT_S		60

/*
 * Per-io state machine: request header -> (write payload) ->
 * response header -> (read payload).
 */
enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};
67 
/* Context for one nbd command, from request receive to response transmit. */
struct nbd_io {
	/* Owning nbd disk instance. */
	struct spdk_nbd_disk	*nbd;
	/* Current position in the recv/xmit state machine. */
	enum nbd_io_state_t	state;

	/* DMA-capable data buffer for read/write commands; NULL otherwise. */
	void			*payload;
	uint32_t		payload_size;

	/* Raw request/reply headers as exchanged with the kernel nbd driver. */
	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};
89 
/* One exported nbd device: a bdev bridged to /dev/nbdX via a socketpair. */
struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	/* fd of the /dev/nbdX device node; -1 when not open. */
	int			dev_fd;
	char			*nbd_path;
	/* Kernel side of the AF_UNIX socketpair (handed over via NBD_SET_SOCK). */
	int			kernel_sp_fd;
	/* SPDK side of the socketpair, polled for requests/responses. */
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	/* Alignment required for payload buffers (at least the bdev's buf align). */
	uint32_t		buf_align;

	/* Poller used to busy-wait during start (EBUSY) and stop. */
	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	/* Io whose header/payload is currently being read from the socket. */
	struct nbd_io		*io_in_recv;
	/* Fully received, not yet submitted to the bdev. */
	TAILQ_HEAD(, nbd_io)	received_io_list;
	/* Completed by the bdev, awaiting transmission back to the kernel. */
	TAILQ_HEAD(, nbd_io)	executed_io_list;
	/* Submitted to the bdev, completion pending. */
	TAILQ_HEAD(, nbd_io)	processing_io_list;

	bool			is_started;
	bool			is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};
120 
/* Global list of every exported nbd disk in this application. */
struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
/* Completion callback/argument stored by spdk_nbd_fini(). */
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd);
135 
/* One-time module init: prepare the global exported-disk list. Always 0. */
int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}
143 
144 static void
145 _nbd_fini(void *arg1)
146 {
147 	struct spdk_nbd_disk *nbd, *nbd_tmp;
148 
149 	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
150 		if (!nbd->is_closing) {
151 			spdk_nbd_stop(nbd);
152 		}
153 	}
154 
155 	/* Check if all nbds closed */
156 	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
157 		g_fini_cb_fn(g_fini_cb_arg);
158 	} else {
159 		spdk_thread_send_msg(spdk_get_thread(),
160 				     _nbd_fini, NULL);
161 	}
162 }
163 
/*
 * Begin asynchronous module teardown.  cb_fn(cb_arg) is invoked once
 * every exported disk has finished closing (possibly on a later message).
 */
void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}
172 
173 static int
174 nbd_disk_register(struct spdk_nbd_disk *nbd)
175 {
176 	/* Make sure nbd_path is not used in this SPDK app */
177 	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
178 		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
179 		return -EBUSY;
180 	}
181 
182 	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);
183 
184 	return 0;
185 }
186 
187 static void
188 nbd_disk_unregister(struct spdk_nbd_disk *nbd)
189 {
190 	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;
191 
192 	/*
193 	 * nbd disk may be stopped before registered.
194 	 * check whether it was registered.
195 	 */
196 	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
197 		if (nbd == nbd_idx) {
198 			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
199 			break;
200 		}
201 	}
202 }
203 
204 struct spdk_nbd_disk *
205 nbd_disk_find_by_nbd_path(const char *nbd_path)
206 {
207 	struct spdk_nbd_disk *nbd;
208 
209 	/*
210 	 * check whether nbd has already been registered by nbd path.
211 	 */
212 	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
213 		if (!strcmp(nbd->nbd_path, nbd_path)) {
214 			return nbd;
215 		}
216 	}
217 
218 	return NULL;
219 }
220 
/* Iteration helper: first disk in the global exported-disk list. */
struct spdk_nbd_disk *nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

/* Iteration helper: disk following prev in the global list. */
struct spdk_nbd_disk *nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

/* Accessor: the /dev/nbdX path this disk is exported on. */
const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

/* Accessor: name of the backing bdev. */
const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}
242 
243 void
244 spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
245 {
246 	struct spdk_nbd_disk *nbd;
247 
248 	spdk_json_write_array_begin(w);
249 
250 	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
251 		spdk_json_write_object_begin(w);
252 
253 		spdk_json_write_named_string(w, "method", "nbd_start_disk");
254 
255 		spdk_json_write_named_object_begin(w, "params");
256 		spdk_json_write_named_string(w, "nbd_device",  nbd_disk_get_nbd_path(nbd));
257 		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
258 		spdk_json_write_object_end(w);
259 
260 		spdk_json_write_object_end(w);
261 	}
262 
263 	spdk_json_write_array_end(w);
264 }
265 
/* Request a soft disconnect of the kernel nbd device. */
void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * nbd soft-disconnection to terminate transmission phase.
	 * After receiving this ioctl command, nbd kernel module will send
	 * a NBD_CMD_DISC type io to nbd server in order to inform server.
	 * The ioctl return value is ignored here (best-effort request).
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}
276 
277 static struct nbd_io *
278 nbd_get_io(struct spdk_nbd_disk *nbd)
279 {
280 	struct nbd_io *io;
281 
282 	io = calloc(1, sizeof(*io));
283 	if (!io) {
284 		return NULL;
285 	}
286 
287 	io->nbd = nbd;
288 	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);
289 
290 	nbd->io_count++;
291 
292 	return io;
293 }
294 
295 static void
296 nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
297 {
298 	if (io->payload) {
299 		spdk_free(io->payload);
300 	}
301 	free(io);
302 
303 	nbd->io_count--;
304 }
305 
/*
 * Check whether received nbd_io are all executed,
 * and put back executed nbd_io instead of transmitting them
 *
 * \return 1 there is still some nbd_io under executing
 *         0 all nbd_io gotten are freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands in the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may be under executing in bdev.
	 * Wait for their done operation.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}
335 
/*
 * Release every resource owned by the nbd instance and free it.
 *
 * Also doubles as a retry poller: while the NBD_DO_IT pthread has not
 * exited yet, it returns SPDK_POLLER_BUSY and re-runs on an interval
 * until the pthread finishes or the wait times out.
 */
static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	/* Closing spdk_sp_fd is what makes the kernel's NBD_DO_IT return. */
	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure after the exit of nbd_start_kernel pthread */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		/* Timed out waiting for the pthread; proceed with teardown anyway. */
		SPDK_ERRLOG("Failed to wait for returning of NBD_DO_IT ioctl.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear nbd device only if it is occupied by SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}
408 
409 int
410 spdk_nbd_stop(struct spdk_nbd_disk *nbd)
411 {
412 	int rc = 0;
413 
414 	if (nbd == NULL) {
415 		return rc;
416 	}
417 
418 	nbd->is_closing = true;
419 
420 	/* if nbd is not started, it will continue to call nbd stop later */
421 	if (!nbd->is_started) {
422 		return 1;
423 	}
424 
425 	/*
426 	 * Stop action should be called only after all nbd_io are executed.
427 	 */
428 
429 	rc = nbd_cleanup_io(nbd);
430 	if (!rc) {
431 		_nbd_stop(nbd);
432 	}
433 
434 	return rc;
435 }
436 
/*
 * Perform one non-blocking read or write on the nbd socket.
 *
 * \return bytes transferred, 0 when the fd would block (EAGAIN),
 *         -EIO when the peer closed the connection, or a negated
 *         errno for any other failure.
 */
static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc = read_op ? read(fd, buf, length) : write(fd, buf, length);

	if (rc > 0) {
		return rc;
	}

	if (rc == 0) {
		/* Peer closed the connection. */
		return -EIO;
	}

	/* rc == -1: EAGAIN means "retry later", anything else is fatal. */
	return (errno == EAGAIN) ? 0 : -errno;
}
459 
/*
 * bdev completion callback: fill in the nbd reply header and move the io
 * from the processing list to the executed list for transmission.
 * Also called directly with bdev_io == NULL to fail an io locally.
 */
static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	/* Echo the request handle so the kernel can match the reply. */
	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When there begins to have executed_io, enable socket writable notice in order to
	 * get it processed in nbd_io_xmit
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_REMOVE(&nbd->processing_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}
488 
489 static void
490 nbd_resubmit_io(void *arg)
491 {
492 	struct nbd_io *io = (struct nbd_io *)arg;
493 	struct spdk_nbd_disk *nbd = io->nbd;
494 	int rc = 0;
495 
496 	rc = nbd_submit_bdev_io(nbd, io);
497 	if (rc) {
498 		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s , io_type %d, returned %d.\n",
499 			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
500 	}
501 }
502 
503 static void
504 nbd_queue_io(struct nbd_io *io)
505 {
506 	int rc;
507 	struct spdk_bdev *bdev = io->nbd->bdev;
508 
509 	io->bdev_io_wait.bdev = bdev;
510 	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
511 	io->bdev_io_wait.cb_arg = io;
512 
513 	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
514 	if (rc != 0) {
515 		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
516 		nbd_io_done(NULL, false, io);
517 	}
518 }
519 
/*
 * Translate one nbd request into the corresponding bdev call.
 *
 * Completion (success or failure) is always reported via nbd_io_done().
 * -ENOMEM from the bdev layer queues the io for later resubmission; any
 * other submission failure fails the io immediately.  Always returns 0.
 */
static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		/* Flush covers the entire device range. */
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	default:
		/* Unknown/unsupported command type: fail the io below. */
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_queue_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}
565 
566 static int
567 nbd_io_exec(struct spdk_nbd_disk *nbd)
568 {
569 	struct nbd_io *io, *io_tmp;
570 	int io_count = 0;
571 	int ret = 0;
572 
573 	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
574 		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
575 			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
576 			TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
577 			ret = nbd_submit_bdev_io(nbd, io);
578 			if (ret < 0) {
579 				return ret;
580 			}
581 
582 			io_count++;
583 		}
584 	}
585 
586 	return io_count;
587 }
588 
/*
 * Advance the single in-progress receive state machine one step: read
 * (part of) a request header and, for writes, its payload.
 *
 * Returns the number of bytes received on this call, or a negative errno
 * on socket/allocation/protocol error.  A short read leaves io->offset
 * mid-way; the next call resumes where this one left off.
 */
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	/* Allocate a fresh io context if no receive is in flight. */
	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			if (from_be32(&io->req.type) == NBD_CMD_DISC) {
				nbd->is_closing = true;
				nbd->io_in_recv = NULL;
				/* Wake the xmit path so the close is noticed promptly. */
				if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
					spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
				}
				nbd_put_io(nbd, io);
				/* After receiving NBD_CMD_DISC, nbd will not receive any new commands */
				return received;
			}

			/* io except read/write should ignore payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* io payload allocate */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					/* Shutting down: fail the io instead of executing it. */
					TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				/* Shutting down: fail the io instead of executing it. */
				TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}

	}

	return received;
}
706 
707 static int
708 nbd_io_recv(struct spdk_nbd_disk *nbd)
709 {
710 	int i, rc, ret = 0;
711 
712 	/*
713 	 * nbd server should not accept request after closing command
714 	 */
715 	if (nbd->is_closing) {
716 		return 0;
717 	}
718 
719 	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
720 		rc = nbd_io_recv_internal(nbd);
721 		if (rc < 0) {
722 			return rc;
723 		}
724 		ret += rc;
725 		if (nbd->is_closing) {
726 			break;
727 		}
728 	}
729 
730 	return ret;
731 }
732 
/*
 * Transmit (part of) one executed io: the reply header first, then the
 * read payload when applicable.  Returns bytes sent on this call (0 when
 * the socket would block) or a negative errno; a partially-sent io is
 * put back at the head of the executed list so the next call resumes it.
 */
static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handler are already set in io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only when NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}
798 
799 static int
800 nbd_io_xmit(struct spdk_nbd_disk *nbd)
801 {
802 	int ret = 0;
803 	int rc;
804 
805 	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
806 		rc = nbd_io_xmit_internal(nbd);
807 		if (rc < 0) {
808 			return rc;
809 		}
810 
811 		ret += rc;
812 	}
813 
814 	/* When there begins to have no executed_io, disable socket writable notice */
815 	if (nbd->interrupt_mode) {
816 		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
817 	}
818 
819 	return ret;
820 }
821 
/**
 * Poll an NBD instance once: transmit, then receive, then execute.
 *
 * \return total work done (bytes moved plus ios submitted) on success,
 *         or a negated errno on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int sent, received, executed;

	/* Flush completed io back to the kernel before reading new requests. */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}
850 
851 static int
852 nbd_poll(void *arg)
853 {
854 	struct spdk_nbd_disk *nbd = arg;
855 	int rc;
856 
857 	rc = _nbd_poll(nbd);
858 	if (rc < 0) {
859 		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
860 			     spdk_strerror(-rc), rc);
861 		_nbd_stop(nbd);
862 		return SPDK_POLLER_IDLE;
863 	}
864 	if (nbd->is_closing) {
865 		spdk_nbd_stop(nbd);
866 	}
867 
868 	return SPDK_POLLER_BUSY;
869 }
870 
/*
 * Body of the dedicated pthread: NBD_DO_IT hands the socket to the
 * kernel nbd driver and blocks for the lifetime of the device.
 */
static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	/* Don't inherit the SPDK reactor's CPU affinity. */
	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	/* Signal _nbd_stop() that this pthread has finished. */
	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}
885 
886 static void
887 nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
888 {
889 	struct nbd_io *io, *io_tmp;
890 
891 	nbd->is_closing = true;
892 	nbd_cleanup_io(nbd);
893 
894 	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
895 		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
896 			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
897 			TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
898 		}
899 	}
900 	if (!TAILQ_EMPTY(&nbd->processing_io_list)) {
901 		TAILQ_FOREACH_SAFE(io, &nbd->processing_io_list, tailq, io_tmp) {
902 			nbd_io_done(NULL, false, io);
903 		}
904 	}
905 }
906 
907 static void
908 nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
909 		  void *event_ctx)
910 {
911 	switch (type) {
912 	case SPDK_BDEV_EVENT_REMOVE:
913 		nbd_bdev_hot_remove(event_ctx);
914 		break;
915 	default:
916 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
917 		break;
918 	}
919 }
920 
/* Carries the user callback through the asynchronous start sequence. */
struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
};
926 
/* Poller mode callback: record whether the app runs in interrupt mode,
 * so the io paths know whether to toggle fd event notifications. */
static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}
934 
935 static void
936 nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
937 {
938 	int		rc;
939 	pthread_t	tid;
940 	unsigned long	nbd_flags = 0;
941 
942 	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
943 	if (rc == -1) {
944 		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
945 		rc = -errno;
946 		goto err;
947 	}
948 
949 	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
950 	if (rc == -1) {
951 		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
952 		rc = -errno;
953 		goto err;
954 	}
955 
956 #ifdef NBD_SET_TIMEOUT
957 	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
958 	if (rc == -1) {
959 		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
960 		rc = -errno;
961 		goto err;
962 	}
963 #else
964 	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
965 #endif
966 
967 #ifdef NBD_FLAG_SEND_FLUSH
968 	nbd_flags |= NBD_FLAG_SEND_FLUSH;
969 #endif
970 #ifdef NBD_FLAG_SEND_TRIM
971 	nbd_flags |= NBD_FLAG_SEND_TRIM;
972 #endif
973 	if (nbd_flags) {
974 		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
975 		if (rc == -1) {
976 			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
977 			rc = -errno;
978 			goto err;
979 		}
980 	}
981 
982 	ctx->nbd->has_nbd_pthread = true;
983 	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx->nbd);
984 	if (rc != 0) {
985 		ctx->nbd->has_nbd_pthread = false;
986 		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
987 		rc = -rc;
988 		goto err;
989 	}
990 
991 	rc = pthread_detach(tid);
992 	if (rc != 0) {
993 		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
994 		rc = -rc;
995 		goto err;
996 	}
997 
998 	if (spdk_interrupt_mode_is_enabled()) {
999 		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
1000 	}
1001 
1002 	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
1003 	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);
1004 
1005 	if (ctx->cb_fn) {
1006 		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
1007 	}
1008 
1009 	/* nbd will possibly receive stop command while initing */
1010 	ctx->nbd->is_started = true;
1011 
1012 	free(ctx);
1013 	return;
1014 
1015 err:
1016 	_nbd_stop(ctx->nbd);
1017 	if (ctx->cb_fn) {
1018 		ctx->cb_fn(ctx->cb_arg, NULL, rc);
1019 	}
1020 	free(ctx);
1021 }
1022 
1023 static int
1024 nbd_enable_kernel(void *arg)
1025 {
1026 	struct spdk_nbd_start_ctx *ctx = arg;
1027 	int rc;
1028 
1029 	/* Declare device setup by this process */
1030 	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);
1031 
1032 	if (rc) {
1033 		if (errno == EBUSY) {
1034 			if (ctx->nbd->retry_poller == NULL) {
1035 				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
1036 				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
1037 							 NBD_BUSY_POLLING_INTERVAL_US);
1038 				return SPDK_POLLER_BUSY;
1039 			} else if (ctx->nbd->retry_count-- > 0) {
1040 				/* Repeatedly unregister and register retry poller to avoid scan-build error */
1041 				spdk_poller_unregister(&ctx->nbd->retry_poller);
1042 				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
1043 							 NBD_BUSY_POLLING_INTERVAL_US);
1044 				return SPDK_POLLER_BUSY;
1045 			}
1046 		}
1047 
1048 		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
1049 		if (ctx->nbd->retry_poller) {
1050 			spdk_poller_unregister(&ctx->nbd->retry_poller);
1051 		}
1052 
1053 		_nbd_stop(ctx->nbd);
1054 
1055 		if (ctx->cb_fn) {
1056 			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
1057 		}
1058 
1059 		free(ctx);
1060 		return SPDK_POLLER_BUSY;
1061 	}
1062 
1063 	if (ctx->nbd->retry_poller) {
1064 		spdk_poller_unregister(&ctx->nbd->retry_poller);
1065 	}
1066 
1067 	nbd_start_complete(ctx);
1068 
1069 	return SPDK_POLLER_BUSY;
1070 }
1071 
1072 void
1073 spdk_nbd_start(const char *bdev_name, const char *nbd_path,
1074 	       spdk_nbd_start_cb cb_fn, void *cb_arg)
1075 {
1076 	struct spdk_nbd_start_ctx	*ctx = NULL;
1077 	struct spdk_nbd_disk		*nbd = NULL;
1078 	struct spdk_bdev		*bdev;
1079 	int				rc;
1080 	int				sp[2];
1081 
1082 	nbd = calloc(1, sizeof(*nbd));
1083 	if (nbd == NULL) {
1084 		rc = -ENOMEM;
1085 		goto err;
1086 	}
1087 
1088 	nbd->dev_fd = -1;
1089 	nbd->spdk_sp_fd = -1;
1090 	nbd->kernel_sp_fd = -1;
1091 
1092 	ctx = calloc(1, sizeof(*ctx));
1093 	if (ctx == NULL) {
1094 		rc = -ENOMEM;
1095 		goto err;
1096 	}
1097 
1098 	ctx->nbd = nbd;
1099 	ctx->cb_fn = cb_fn;
1100 	ctx->cb_arg = cb_arg;
1101 
1102 	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
1103 	if (rc != 0) {
1104 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1105 		goto err;
1106 	}
1107 
1108 	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
1109 	nbd->bdev = bdev;
1110 
1111 	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
1112 	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);
1113 
1114 	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
1115 	if (rc != 0) {
1116 		SPDK_ERRLOG("socketpair failed\n");
1117 		rc = -errno;
1118 		goto err;
1119 	}
1120 
1121 	nbd->spdk_sp_fd = sp[0];
1122 	nbd->kernel_sp_fd = sp[1];
1123 	nbd->nbd_path = strdup(nbd_path);
1124 	if (!nbd->nbd_path) {
1125 		SPDK_ERRLOG("strdup allocation failure\n");
1126 		rc = -ENOMEM;
1127 		goto err;
1128 	}
1129 
1130 	TAILQ_INIT(&nbd->received_io_list);
1131 	TAILQ_INIT(&nbd->executed_io_list);
1132 	TAILQ_INIT(&nbd->processing_io_list);
1133 
1134 	/* Add nbd_disk to the end of disk list */
1135 	rc = nbd_disk_register(ctx->nbd);
1136 	if (rc != 0) {
1137 		goto err;
1138 	}
1139 
1140 	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
1141 	if (nbd->dev_fd == -1) {
1142 		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
1143 		rc = -errno;
1144 		goto err;
1145 	}
1146 
1147 	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
1148 		     bdev_name, nbd_path);
1149 
1150 	nbd_enable_kernel(ctx);
1151 	return;
1152 
1153 err:
1154 	free(ctx);
1155 	if (nbd) {
1156 		_nbd_stop(nbd);
1157 	}
1158 
1159 	if (cb_fn) {
1160 		cb_fn(cb_arg, NULL, rc);
1161 	}
1162 }
1163 
/* Public accessor: the /dev/nbdX path of an exported disk. */
const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}
1169 
1170 SPDK_LOG_REGISTER_COMPONENT(nbd)
1171