/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"
#include "spdk/event.h"

#include "spdk_internal/log.h"
#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_BUSY_WAITING_MS		1000
#define NBD_BUSY_POLLING_INTERVAL_US	20000

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* soft disconnection, triggered by receiving NBD_CMD_DISC */
	NBD_DISK_STATE_SOFTDISC,
	/* hard disconnection, forced on the server side (stop, hot remove, errors) */
	NBD_DISK_STATE_HARDDISC,
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	uint32_t		buf_align;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

void
spdk_nbd_fini(void)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * Stop all running spdk_nbd_disk instances. Removing entries from
	 * the list here is unnecessary, but the _SAFE variant is needed,
	 * since spdk_nbd_disk_unregister (called internally by
	 * spdk_nbd_stop) removes each nbd from the TAILQ.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		spdk_nbd_stop(nbd_idx);
	}
}

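/*
 * Add an nbd disk to the global list. Fails with -EBUSY if another
 * disk already claims the same nbd path.
 */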
static int
spdk_nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	if (spdk_nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
spdk_nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it was ever registered,
	 * so check whether it is actually on the list.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
spdk_nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered
	 * with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
spdk_nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
spdk_nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
spdk_nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
spdk_nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "start_nbd_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", spdk_nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", spdk_nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Request an nbd soft disconnect to terminate the transmission
	 * phase. After receiving this ioctl, the nbd kernel module sends
	 * an NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
spdk_get_nbd_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
spdk_put_nbd_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_dma_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been transmitted.
 *
 * \return 1 if some received nbd_io have not been transmitted yet.
 *         0 if all received nbd_io have been transmitted.
 */
static int
spdk_nbd_io_xmit_check(struct spdk_nbd_disk *nbd)
{
	if (nbd->io_count == 0) {
		return 0;
	} else if (nbd->io_count == 1 && nbd->io_in_recv != NULL) {
		return 0;
	}

	return 1;
}

/*
 * Check whether all received nbd_io have finished executing, freeing
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still executing.
 *         0 if all allocated nbd_io have been freed.
 */
static int
spdk_nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		spdk_put_nbd_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/* free io in received_io_list */
	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			spdk_put_nbd_io(nbd, io);
		}
	}

	/* free io in executed_io_list */
	if (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->executed_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);
			spdk_put_nbd_io(nbd, io);
		}
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer;
	 * wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

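/*
 * Tear down an nbd disk: release the io channel and bdev descriptor,
 * close the socket pair and the nbd device (clearing its queue and
 * socket if this app still owns the device), unregister the poller,
 * and free the structure.
 */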
static void
_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && spdk_nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	spdk_nbd_disk_unregister(nbd);

	free(nbd);
}

void
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd == NULL) {
		return;
	}

	nbd->state = NBD_DISK_STATE_HARDDISC;

	/*
	 * The disk can only be torn down after all nbd_io have been
	 * executed; otherwise nbd_io_done() finishes the teardown when
	 * the last outstanding io completes.
	 */
	if (!spdk_nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

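/*
 * Read up to length bytes from fd. Returns the number of bytes read,
 * 0 if the read would block, or a negated errno on error; EOF is
 * reported as -EIO.
 */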
static int64_t
read_from_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_read;

	bytes_read = read(fd, buf, length);
	if (bytes_read == 0) {
		return -EIO;
	} else if (bytes_read == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_read;
	}
}

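/*
 * Write up to length bytes to fd. Returns the number of bytes written,
 * 0 if the write would block, or a negated errno on error; a zero-byte
 * write is reported as -EIO.
 */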
static int64_t
write_to_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_written;

	bytes_written = write(fd, buf, length);
	if (bytes_written == 0) {
		return -EIO;
	} else if (bytes_written == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_written;
	}
}

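/*
 * bdev completion callback: record the request status in the nbd reply,
 * queue the io for transmission, and finish a pending hard disconnect
 * once the last outstanding io has drained.
 */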
static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}

	if (nbd->state == NBD_DISK_STATE_HARDDISC && !spdk_nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

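/* io_wait callback: retry a bdev submission that failed with -ENOMEM. */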
static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(SPDK_LOG_NBD, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     spdk_nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

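/*
 * Register the io to be resubmitted once a bdev_io becomes available.
 * If even the wait entry cannot be queued, fail the io immediately.
 */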
static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

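/* Translate one nbd request into the corresponding bdev operation. */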
static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		spdk_put_nbd_io(nbd, io);
		nbd->state = NBD_DISK_STATE_SOFTDISC;
		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(SPDK_LOG_NBD, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

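/* Submit all fully received requests to the bdev layer. */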
static int
spdk_nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int ret = 0;

	/*
	 * For a soft disconnect, the nbd server must still execute all
	 * outstanding requests before closing the connection, so only
	 * skip execution on a hard disconnect.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				break;
			}
		}
	}

	return ret;
}

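/*
 * Make progress on receiving a single request from the socket: first
 * the fixed-size header, then the payload for writes. Returns 0 on
 * progress (possibly partial), or a negated errno on error.
 */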
static int
spdk_nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = spdk_get_nbd_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = read_from_socket(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				       sizeof(io->req) - io->offset);
		if (ret < 0) {
			spdk_put_nbd_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				spdk_put_nbd_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* requests other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* io payload allocate */
			if (io->payload_size) {
				io->payload = spdk_dma_malloc(io->payload_size, nbd->buf_align, NULL);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					spdk_put_nbd_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				nbd->io_in_recv = NULL;
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = read_from_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret < 0) {
			spdk_put_nbd_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			nbd->io_in_recv = NULL;
			TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
		}
	}

	return 0;
}

static int
spdk_nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, ret = 0;

	/*
	 * The nbd server should not accept new requests in either the
	 * soft or the hard disconnect state.
	 */
	if (nbd->state != NBD_DISK_STATE_RUNNING) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		ret = spdk_nbd_io_recv_internal(nbd);
		if (ret != 0) {
			return ret;
		}
	}

	return 0;
}

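/*
 * Transmit one executed io back to the kernel: the reply header first,
 * then the payload for successful reads. A partial write leaves the io
 * at the head of the executed list for the next poll.
 */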
static int
spdk_nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = write_to_socket(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				      sizeof(io->resp) - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only when NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				spdk_put_nbd_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = write_to_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			spdk_put_nbd_io(nbd, io);
			return 0;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret;
}

static int
spdk_nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;

	/*
	 * For a soft disconnect, the nbd server must transmit replies for
	 * all outstanding requests before closing the connection, so only
	 * skip transmission on a hard disconnect.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		ret = spdk_nbd_io_xmit_internal(nbd);
		if (ret != 0) {
			return ret;
		}
	}

	/*
	 * For a soft disconnect, the nbd server can close the connection
	 * once all outstanding requests have been transmitted.
	 */
	if (nbd->state == NBD_DISK_STATE_SOFTDISC && !spdk_nbd_io_xmit_check(nbd)) {
		return -1;
	}

	return 0;
}

/**
 * Poll an NBD instance.
 *
 * \return 0 on success or negated errno values on error (e.g. connection closed).
 */
static int
_spdk_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int rc;

	/* transmit executed io first */
	rc = spdk_nbd_io_xmit(nbd);
	if (rc < 0) {
		return rc;
	}

	rc = spdk_nbd_io_recv(nbd);
	if (rc < 0) {
		return rc;
	}

	rc = spdk_nbd_io_exec(nbd);

	return rc;
}

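/*
 * Poller entry point. Stops the nbd disk if polling reports a fatal
 * error (e.g. the connection was closed).
 */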
static int
spdk_nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _spdk_nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(SPDK_LOG_NBD, "spdk_nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		spdk_nbd_stop(nbd);
	}

	return -1;
}

static void *
nbd_start_kernel(void *arg)
{
	int dev_fd = (int)(intptr_t)arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(dev_fd, NBD_DO_IT);

	pthread_exit(NULL);
}

static void
spdk_nbd_bdev_hot_remove(void *remove_ctx)
{
	struct spdk_nbd_disk *nbd = remove_ctx;

	spdk_nbd_stop(nbd);
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*poller;
	int			polling_count;
};

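/*
 * Finish device setup after the socket has been handed to the kernel:
 * register the disk, size the kernel device, spawn the NBD_DO_IT
 * thread, make the SPDK side of the socket non-blocking, and start
 * the poller.
 */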
static void
spdk_nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	int		flag;

	/* Add nbd_disk to the end of disk list */
	rc = spdk_nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register %s; this should not happen.\n", ctx->nbd->nbd_path);
		assert(false);
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_FLAG_SEND_TRIM
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_FLAGS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#endif

	rc = pthread_create(&tid, NULL, nbd_start_kernel, (void *)(intptr_t)ctx->nbd->dev_fd);
	if (rc != 0) {
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	flag = fcntl(ctx->nbd->spdk_sp_fd, F_GETFL);
	if (fcntl(ctx->nbd->spdk_sp_fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    ctx->nbd->spdk_sp_fd, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	ctx->nbd->nbd_poller = spdk_poller_register(spdk_nbd_poll, ctx->nbd, 0);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	spdk_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

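/*
 * Hand the kernel end of the socket pair to the nbd device, retrying
 * on a poller while the device reports EBUSY. Returns nonzero once
 * the attempt has concluded, successfully or not.
 */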
static int
spdk_nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);
	if (rc == -1) {
		if (errno == EBUSY && ctx->polling_count-- > 0) {
			if (ctx->poller == NULL) {
				ctx->poller = spdk_poller_register(spdk_nbd_enable_kernel, ctx,
								   NBD_BUSY_POLLING_INTERVAL_US);
			}
			/* If the kernel is busy, check back later */
			return 0;
		}

		/* Capture errno before the cleanup calls below can clobber it */
		rc = -errno;
		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->poller) {
			spdk_poller_unregister(&ctx->poller);
		}

		spdk_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, rc);
		}

		free(ctx);
		return 1;
	}

	if (ctx->poller) {
		spdk_poller_unregister(&ctx->poller);
	}

	spdk_nbd_start_complete(ctx);

	return 1;
}

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (bdev == NULL) {
		SPDK_ERRLOG("no bdev %s exists\n", bdev_name);
		rc = -EINVAL;
		goto err;
	}

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->polling_count = NBD_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;

	rc = spdk_bdev_open(bdev, true, spdk_nbd_bdev_hot_remove, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", spdk_bdev_get_name(bdev), rc);
		goto err;
	}

	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Make sure nbd_path is not used in this SPDK app */
	if (spdk_nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		rc = -EBUSY;
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(SPDK_LOG_NBD, "Enabling kernel access to bdev %s via %s\n",
		     spdk_bdev_get_name(bdev), nbd_path);

	spdk_nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		spdk_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}
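
/*
 * Example usage (a minimal sketch; the bdev name, nbd path, and
 * callback below are illustrative, not part of this file):
 *
 *	static void
 *	start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *	{
 *		if (rc != 0) {
 *			SPDK_ERRLOG("Failed to export bdev: %s\n", spdk_strerror(-rc));
 *			return;
 *		}
 *		printf("bdev exported at %s\n", spdk_nbd_get_path(nbd));
 *	}
 *
 *	spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 */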

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT("nbd", SPDK_LOG_NBD)