xref: /spdk/lib/nbd/nbd.c (revision ba23cec1820104cc710ad776f0127e1cf82033aa)
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk_internal/log.h"
#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_BUSY_WAITING_MS		1000
#define NBD_BUSY_POLLING_INTERVAL_US	20000

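/*
 * Each nbd_io moves through a simple state machine: the request header is
 * received from the socket, then (for writes) the payload, then the io is
 * executed against the bdev, and finally the response header and (for
 * reads) the payload are transmitted back to the kernel.
 */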
enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* Soft disconnection caused by receiving NBD_CMD_DISC */
	NBD_DISK_STATE_SOFTDISC,
	/* Hard disconnection initiated locally, e.g. by spdk_nbd_stop() or a fatal error */
	NBD_DISK_STATE_HARDDISC,
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	uint32_t		buf_align;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

void
spdk_nbd_fini(void)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * Stop all running spdk_nbd_disk instances. Removing them from
	 * the TAILQ here is unnecessary, but the _SAFE variant is needed
	 * because spdk_nbd_stop() calls nbd_disk_unregister() internally,
	 * which removes the nbd from the TAILQ.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		spdk_nbd_stop(nbd_idx);
	}
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it was ever registered,
	 * so check that it is actually on the list before removing it.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered
	 * under this nbd device path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Request an nbd soft disconnect to terminate the transmission
	 * phase. After receiving this ioctl, the nbd kernel module sends
	 * an NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

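/*
 * nbd_io objects are allocated by nbd_get_io() and released by
 * nbd_put_io(); nbd->io_count tracks how many are outstanding, which
 * the disconnect paths use to decide when teardown is safe.
 */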
static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been transmitted.
 *
 * \return 1 if some received nbd_io have not been transmitted yet.
 *         0 if all received nbd_io have been transmitted.
 */
static int
nbd_io_xmit_check(struct spdk_nbd_disk *nbd)
{
	if (nbd->io_count == 0) {
		return 0;
	} else if (nbd->io_count == 1 && nbd->io_in_recv != NULL) {
		return 0;
	}

	return 1;
}

/*
 * Check whether all received nbd_io have finished executing,
 * freeing executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still executing.
 *         0 if every allocated nbd_io has been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/* free io in received_io_list */
	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			nbd_put_io(nbd, io);
		}
	}

	/* free io in executed_io_list */
	if (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->executed_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);
			nbd_put_io(nbd, io);
		}
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer;
	 * wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

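/*
 * Release all resources owned by the nbd disk: the io channel, the bdev
 * descriptor, both socketpair fds, and the nbd device fd. Must only be
 * called once no nbd_io remain outstanding (see nbd_cleanup_io()).
 */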
static void
_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	nbd_disk_unregister(nbd);

	free(nbd);
}

void
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd == NULL) {
		return;
	}

	nbd->state = NBD_DISK_STATE_HARDDISC;

	/*
	 * The actual stop can only happen after all outstanding nbd_io
	 * have completed; otherwise it is deferred to nbd_io_done().
	 */
	if (!nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

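/*
 * Nonblocking socket helpers. Both return the number of bytes
 * transferred on success, 0 if the socket would block (EAGAIN),
 * -EIO if the peer closed the connection, and -errno on any
 * other error.
 */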
static int64_t
read_from_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_read;

	bytes_read = read(fd, buf, length);
	if (bytes_read == 0) {
		return -EIO;
	} else if (bytes_read == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_read;
	}
}

static int64_t
write_to_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_written;

	bytes_written = write(fd, buf, length);
	if (bytes_written == 0) {
		return -EIO;
	} else if (bytes_written == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_written;
	}
}

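/*
 * Completion callback for bdev I/O. Records the status in the nbd
 * response, moves the io to the executed list for transmission, and,
 * if a hard disconnect is in progress, finishes the deferred stop
 * once the last outstanding io completes.
 */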
static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}

	if (nbd->state == NBD_DISK_STATE_HARDDISC && !nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(SPDK_LOG_NBD, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

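/*
 * Map a received NBD command onto the corresponding bdev operation.
 * On -ENOMEM the io is queued to be resubmitted once bdev resources
 * free up; any other submission error completes the io with EIO.
 */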
static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		nbd_put_io(nbd, io);
		nbd->state = NBD_DISK_STATE_SOFTDISC;
		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(SPDK_LOG_NBD, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

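/*
 * Submit every io on the received list to the bdev. Skipped entirely
 * on hard disconnect, where pending io are freed rather than executed.
 */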
static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int ret = 0;

	/*
	 * For a soft disconnect, the nbd server must still execute all
	 * outstanding requests before closing the connection, so only a
	 * hard disconnect stops execution here.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				break;
			}
		}
	}

	return ret;
}

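/*
 * Receive a single nbd request from the socket, driven by the io state
 * machine: first the fixed-size request header, then, for writes, the
 * payload. A fully received io is appended to the received list.
 */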
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = read_from_socket(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				       sizeof(io->req) - io->offset);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* request magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* requests other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				nbd->io_in_recv = NULL;
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = read_from_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			nbd->io_in_recv = NULL;
			TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
		}
	}

	return 0;
}

static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, ret = 0;

	/*
	 * The nbd server should not accept new requests in either the
	 * soft or the hard disconnect state.
	 */
	if (nbd->state != NBD_DISK_STATE_RUNNING) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		ret = nbd_io_recv_internal(nbd);
		if (ret != 0) {
			return ret;
		}
	}

	return 0;
}

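/*
 * Transmit one executed io back to the kernel: first the response
 * header, then, for successful reads, the payload.
 */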
static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done() */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = write_to_socket(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				      sizeof(io->resp) - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = write_to_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return 0;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;

	/*
	 * For a soft disconnect, the nbd server must still transmit
	 * responses for all outstanding requests before closing the
	 * connection, so only a hard disconnect stops transmission here.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		ret = nbd_io_xmit_internal(nbd);
		if (ret != 0) {
			return ret;
		}
	}

	/*
	 * For a soft disconnect, the nbd server can close the connection
	 * once all outstanding requests have been transmitted.
	 */
	if (nbd->state == NBD_DISK_STATE_SOFTDISC && !nbd_io_xmit_check(nbd)) {
		return -1;
	}

	return 0;
}

/**
 * Poll an NBD instance.
 *
 * \return 0 on success or negated errno values on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int rc;

	/* transmit executed io first */
	rc = nbd_io_xmit(nbd);
	if (rc < 0) {
		return rc;
	}

	rc = nbd_io_recv(nbd);
	if (rc < 0) {
		return rc;
	}

	rc = nbd_io_exec(nbd);

	return rc;
}

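/*
 * Poller entry point registered on the SPDK thread; tears the
 * connection down if _nbd_poll() reports a fatal error.
 */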
static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(SPDK_LOG_NBD, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		spdk_nbd_stop(nbd);
	}

	return -1;
}

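/*
 * Runs on a dedicated pthread because the NBD_DO_IT ioctl does not
 * return until the nbd connection is torn down.
 */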
static void *
nbd_start_kernel(void *arg)
{
	int dev_fd = (int)(intptr_t)arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(dev_fd, NBD_DO_IT);

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(void *remove_ctx)
{
	struct spdk_nbd_disk *nbd = remove_ctx;

	spdk_nbd_stop(nbd);
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*poller;
	int			polling_count;
};

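/*
 * Final stage of spdk_nbd_start(): register the disk, configure the
 * kernel nbd device via ioctls, spawn the NBD_DO_IT thread, make the
 * SPDK side of the socketpair nonblocking, and start the poller.
 */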
static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	int		flag;

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register %s; this should not happen.\n", ctx->nbd->nbd_path);
		assert(false);
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_FLAG_SEND_TRIM
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_FLAGS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#endif

	rc = pthread_create(&tid, NULL, nbd_start_kernel, (void *)(intptr_t)ctx->nbd->dev_fd);
	if (rc != 0) {
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	flag = fcntl(ctx->nbd->spdk_sp_fd, F_GETFL);
	if (fcntl(ctx->nbd->spdk_sp_fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    ctx->nbd->spdk_sp_fd, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	spdk_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

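/*
 * Hand the kernel its end of the socketpair via NBD_SET_SOCK. If
 * another process still holds the device, retry on a poller until
 * polling_count is exhausted.
 */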
static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare that this process is setting up the nbd device */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);
	if (rc == -1) {
		if (errno == EBUSY && ctx->polling_count-- > 0) {
			if (ctx->poller == NULL) {
				ctx->poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
								   NBD_BUSY_POLLING_INTERVAL_US);
			}
			/* If the kernel is busy, check back later */
			return 0;
		}

		/* Save errno before the calls below can clobber it */
		rc = -errno;
		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(-rc));
		if (ctx->poller) {
			spdk_poller_unregister(&ctx->poller);
		}

		spdk_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, rc);
		}

		free(ctx);
		return 1;
	}

	if (ctx->poller) {
		spdk_poller_unregister(&ctx->poller);
	}

	nbd_start_complete(ctx);

	return 1;
}

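/*
 * Start exporting a bdev through an nbd device node. This opens the
 * bdev, creates the socketpair shared with the kernel, opens nbd_path,
 * and then kicks off nbd_enable_kernel(), which completes asynchronously
 * via cb_fn.
 */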
void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (bdev == NULL) {
		SPDK_ERRLOG("no bdev %s exists\n", bdev_name);
		rc = -EINVAL;
		goto err;
	}

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->polling_count = NBD_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;

	rc = spdk_bdev_open(bdev, true, nbd_bdev_hot_remove, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", spdk_bdev_get_name(bdev), rc);
		goto err;
	}

	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Make sure nbd_path is not already used by this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		rc = -EBUSY;
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(SPDK_LOG_NBD, "Enabling kernel access to bdev %s via %s\n",
		     spdk_bdev_get_name(bdev), nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		spdk_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT("nbd", SPDK_LOG_NBD)