/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"
#include "spdk/event.h"

#include "spdk_internal/log.h"
#include "spdk/queue.h"

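/*
 * GET_IO_LOOP_COUNT bounds how many nbd requests a single poll iteration
 * tries to read from the socket. NBD_BUSY_WAITING_MS and
 * NBD_BUSY_POLLING_INTERVAL_US together bound how long NBD_SET_SOCK is
 * retried when the kernel reports the nbd device as busy.
 */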
#define GET_IO_LOOP_COUNT		16
#define NBD_BUSY_WAITING_MS		1000
#define NBD_BUSY_POLLING_INTERVAL_US	20000

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* Soft disconnect: entered on receiving NBD_CMD_DISC from the kernel */
	NBD_DISK_STATE_SOFTDISC,
	/* Hard disconnect: forced by spdk_nbd_stop(), e.g. on error or bdev hot remove */
	NBD_DISK_STATE_HARDDISC,
};

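/*
 * Each exported disk owns three descriptors: dev_fd is the open /dev/nbdX
 * device node, while spdk_sp_fd and kernel_sp_fd are the two ends of an
 * AF_UNIX socketpair. The kernel end is handed to the nbd kernel module via
 * NBD_SET_SOCK; the SPDK end is polled by spdk_nbd_poll() to exchange
 * requests and responses.
 */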
struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	uint32_t		buf_align;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

void
spdk_nbd_fini(void)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * Stop all running spdk_nbd_disk instances. Explicitly removing each
	 * nbd from the list is unnecessary here, but the _SAFE variant is
	 * required because spdk_nbd_disk_unregister(), called internally by
	 * spdk_nbd_stop(), removes the nbd from the TAILQ.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		spdk_nbd_stop(nbd_idx);
	}
}

static int
spdk_nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	if (spdk_nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
spdk_nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it was ever registered,
	 * so verify it is actually in the list before removing it.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
spdk_nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * check whether nbd has already been registered by nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *spdk_nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *spdk_nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
spdk_nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
spdk_nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "start_nbd_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", spdk_nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", spdk_nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Start an nbd soft disconnect to terminate the transmission phase.
	 * On receiving this ioctl, the nbd kernel module sends an NBD_CMD_DISC
	 * request to this server to announce the disconnect.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
spdk_get_nbd_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
spdk_put_nbd_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been transmitted.
 *
 * \return 1 if some received nbd_io are not yet transmitted.
 *         0 if all received nbd_io have been transmitted.
 */
static int
spdk_nbd_io_xmit_check(struct spdk_nbd_disk *nbd)
{
	if (nbd->io_count == 0) {
		return 0;
	} else if (nbd->io_count == 1 && nbd->io_in_recv != NULL) {
		return 0;
	}

	return 1;
}

/*
 * Check whether all received nbd_io have finished executing, freeing
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still executing in the bdev layer.
 *         0 if all allocated nbd_io have been freed.
 */
static int
spdk_nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		spdk_put_nbd_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/* free io in received_io_list */
	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			spdk_put_nbd_io(nbd, io);
		}
	}

	/* free io in executed_io_list */
	if (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->executed_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);
			spdk_put_nbd_io(nbd, io);
		}
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer;
	 * wait for their completion callbacks.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static void
_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear nbd device only if it is occupied by SPDK app */
		if (nbd->nbd_path && spdk_nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	spdk_nbd_disk_unregister(nbd);

	free(nbd);
}

void
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd == NULL) {
		return;
	}

	nbd->state = NBD_DISK_STATE_HARDDISC;

	/*
	 * The actual stop happens only once all nbd_io have finished
	 * executing; otherwise the last nbd_io_done() completes it.
	 */
	if (!spdk_nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

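/*
 * Nonblocking socket helpers: both return the number of bytes moved on
 * progress, 0 if the socket would block (EAGAIN), -EIO if the peer closed
 * the connection, or -errno on any other failure.
 */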
static int64_t
read_from_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_read;

	bytes_read = read(fd, buf, length);
	if (bytes_read == 0) {
		return -EIO;
	} else if (bytes_read == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_read;
	}
}

static int64_t
write_to_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_written;

	bytes_written = write(fd, buf, length);
	if (bytes_written == 0) {
		return -EIO;
	} else if (bytes_written == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_written;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}

	if (nbd->state == NBD_DISK_STATE_HARDDISC && !spdk_nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(SPDK_LOG_NBD, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     spdk_nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

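/*
 * Translate one nbd request into the corresponding bdev operation:
 * NBD_CMD_READ/WRITE map to spdk_bdev_read()/spdk_bdev_write(), and, when
 * the kernel headers advertise support, NBD_CMD_FLUSH maps to
 * spdk_bdev_flush() and NBD_CMD_TRIM to spdk_bdev_unmap(). NBD_CMD_DISC
 * starts a soft disconnect. On -ENOMEM the io is parked on the bdev
 * io_wait queue and resubmitted later by nbd_resubmit_io().
 */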
static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		spdk_put_nbd_io(nbd, io);
		nbd->state = NBD_DISK_STATE_SOFTDISC;
		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(SPDK_LOG_NBD, "No memory; queueing io to wait for bdev resources.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
spdk_nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int ret = 0;

	/*
	 * For a soft disconnect, the nbd server must still execute all
	 * outstanding requests before closing the connection; only a hard
	 * disconnect stops submission immediately.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				break;
			}
		}
	}

	return ret;
}

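/*
 * Receive state machine for a single nbd_io: first read the fixed-size
 * request header (NBD_IO_RECV_REQ), then, for NBD_CMD_WRITE, the payload
 * (NBD_IO_RECV_PAYLOAD). A short read leaves the io in nbd->io_in_recv
 * with io->offset tracking progress, to be continued on the next poll.
 */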
static int
spdk_nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = spdk_get_nbd_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = read_from_socket(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				       sizeof(io->req) - io->offset);
		if (ret < 0) {
			spdk_put_nbd_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				spdk_put_nbd_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* Only read/write commands carry a payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* io payload allocate */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					spdk_put_nbd_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				nbd->io_in_recv = NULL;
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = read_from_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret < 0) {
			spdk_put_nbd_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			nbd->io_in_recv = NULL;
			TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
		}
	}

	return 0;
}


static int
spdk_nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, ret = 0;

	/*
	 * The nbd server must not accept new requests in either the soft
	 * or the hard disconnect state.
	 */
	if (nbd->state != NBD_DISK_STATE_RUNNING) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		ret = spdk_nbd_io_recv_internal(nbd);
		if (ret != 0) {
			return ret;
		}
	}

	return 0;
}

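/*
 * Transmit one executed nbd_io: first the reply header (NBD_IO_XMIT_RESP),
 * then, for a successful NBD_CMD_READ, the payload (NBD_IO_XMIT_PAYLOAD).
 * A partially sent io is reinserted at the head of executed_io_list so the
 * next poll resumes exactly where this one left off.
 */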
static int
spdk_nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle were already set in nbd_io_done() */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = write_to_socket(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				      sizeof(io->resp) - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only when NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				spdk_put_nbd_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = write_to_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			spdk_put_nbd_io(nbd, io);
			return 0;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	/* Return a positive value on partial progress or EAGAIN (ret == 0) so
	 * that spdk_nbd_io_xmit() stops looping instead of busy-spinning on a
	 * socket that would block; negative errors propagate unchanged. */
	return ret < 0 ? ret : 1;
}

static int
spdk_nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;

	/*
	 * A hard disconnect transmits nothing; a soft disconnect must finish
	 * sending every outstanding response before the connection closes.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		ret = spdk_nbd_io_xmit_internal(nbd);
		if (ret != 0) {
			return ret;
		}
	}

	/*
	 * For a soft disconnect, the connection can be closed once all
	 * outstanding responses have been transmitted.
	 */
	if (nbd->state == NBD_DISK_STATE_SOFTDISC && !spdk_nbd_io_xmit_check(nbd)) {
		return -1;
	}

	return 0;
}

/**
 * Poll an NBD instance.
 *
 * \return 0 on success or negated errno values on error (e.g. connection closed).
 */
static int
_spdk_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int rc;

	/* transmit executed io first */
	rc = spdk_nbd_io_xmit(nbd);
	if (rc < 0) {
		return rc;
	}

	rc = spdk_nbd_io_recv(nbd);
	if (rc < 0) {
		return rc;
	}

	rc = spdk_nbd_io_exec(nbd);

	return rc;
}

static int
spdk_nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _spdk_nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(SPDK_LOG_NBD, "spdk_nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		spdk_nbd_stop(nbd);
	}

	return -1;
}

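/*
 * NBD_DO_IT must be issued from a dedicated pthread because the ioctl does
 * not return until the nbd connection is torn down; blocking an SPDK
 * reactor thread for that long is not an option.
 */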
static void *
nbd_start_kernel(void *arg)
{
	int dev_fd = (int)(intptr_t)arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(dev_fd, NBD_DO_IT);

	pthread_exit(NULL);
}

static void
spdk_nbd_bdev_hot_remove(void *remove_ctx)
{
	struct spdk_nbd_disk *nbd = remove_ctx;

	spdk_nbd_stop(nbd);
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*poller;
	int			polling_count;
};

static void
spdk_nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	int		flag;

	/* Add nbd_disk to the end of the disk list */
	rc = spdk_nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register %s; this should not happen.\n", ctx->nbd->nbd_path);
		assert(false);
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_FLAG_SEND_TRIM
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_FLAGS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#endif

	rc = pthread_create(&tid, NULL, nbd_start_kernel, (void *)(intptr_t)ctx->nbd->dev_fd);
	if (rc != 0) {
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	flag = fcntl(ctx->nbd->spdk_sp_fd, F_GETFL);
	if (fcntl(ctx->nbd->spdk_sp_fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    ctx->nbd->spdk_sp_fd, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	ctx->nbd->nbd_poller = spdk_poller_register(spdk_nbd_poll, ctx->nbd, 0);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	spdk_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

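/*
 * Hand the kernel end of the socketpair to the nbd kernel module. The
 * device may still be busy from a previous user, so on EBUSY this retries
 * via a poller every NBD_BUSY_POLLING_INTERVAL_US microseconds for up to
 * NBD_BUSY_WAITING_MS milliseconds before reporting failure.
 */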
static int
spdk_nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device set up by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);
	if (rc == -1) {
		if (errno == EBUSY && ctx->polling_count-- > 0) {
			if (ctx->poller == NULL) {
				ctx->poller = spdk_poller_register(spdk_nbd_enable_kernel, ctx,
								   NBD_BUSY_POLLING_INTERVAL_US);
			}
			/* If the kernel is busy, check back later */
			return 0;
		}

		/* Save errno before it can be clobbered by the cleanup calls below. */
		rc = -errno;
		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(-rc));
		if (ctx->poller) {
			spdk_poller_unregister(&ctx->poller);
		}

		spdk_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, rc);
		}

		free(ctx);
		return 1;
	}

	if (ctx->poller) {
		spdk_poller_unregister(&ctx->poller);
	}

	spdk_nbd_start_complete(ctx);

	return 1;
}

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (bdev == NULL) {
		SPDK_ERRLOG("no bdev %s exists\n", bdev_name);
		rc = -EINVAL;
		goto err;
	}

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->polling_count = NBD_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;

	rc = spdk_bdev_open(bdev, true, spdk_nbd_bdev_hot_remove, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", spdk_bdev_get_name(bdev), rc);
		goto err;
	}

	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
	if (rc != 0) {
		rc = -errno;
		SPDK_ERRLOG("socketpair failed: %s\n", spdk_strerror(-rc));
		goto err;
	}
	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Make sure nbd_path is not used in this SPDK app */
	if (spdk_nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		rc = -EBUSY;
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(SPDK_LOG_NBD, "Enabling kernel access to bdev %s via %s\n",
		     spdk_bdev_get_name(bdev), nbd_path);

	spdk_nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		spdk_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT("nbd", SPDK_LOG_NBD)
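
/*
 * Usage sketch (illustrative only; "Malloc0" and "/dev/nbd0" are
 * hypothetical names): from an SPDK app thread, an exporter would do
 * roughly the following and later tear down with spdk_nbd_stop().
 *
 *	static void
 *	start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *	{
 *		if (rc != 0) {
 *			SPDK_ERRLOG("nbd start failed: %s\n", spdk_strerror(-rc));
 *		}
 *	}
 *
 *	spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 */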