xref: /spdk/lib/nbd/nbd.c (revision 4e8e97c886e47e337dc470ac8c1ffa044d729af0)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/string.h"
36 
37 #include <linux/nbd.h>
38 
39 #include "spdk/nbd.h"
40 #include "nbd_internal.h"
41 #include "spdk/bdev.h"
42 #include "spdk/endian.h"
43 #include "spdk/env.h"
44 #include "spdk/log.h"
45 #include "spdk/util.h"
46 #include "spdk/thread.h"
47 
48 #include "spdk/queue.h"
49 
/* Max receive iterations per nbd_io_recv() call */
#define GET_IO_LOOP_COUNT		16
/* Total time budget for retrying NBD_SET_SOCK while kernel is busy */
#define NBD_BUSY_WAITING_MS		1000
/* Interval between NBD_SET_SOCK retries */
#define NBD_BUSY_POLLING_INTERVAL_US	20000
53 
/* Per-io state machine: a request flows receive -> (payload) -> reply -> (payload). */
enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};
64 
/*
 * Context for one nbd request as it moves through receive, bdev
 * execution and reply transmission.
 */
struct nbd_io {
	/* Back-pointer to the owning disk */
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	/* Data buffer for read/write payload; NULL for other commands */
	void			*payload;
	uint32_t		payload_size;

	/* Request header received from the kernel */
	struct nbd_request	req;
	/* Reply header sent back to the kernel */
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	/* Link for received_io_list / executed_io_list */
	TAILQ_ENTRY(nbd_io)	tailq;
};
86 
/* Lifecycle state of an exported nbd disk. */
enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* soft disconnection caused by receiving nbd_cmd_disc */
	NBD_DISK_STATE_SOFTDISC,
	/* hard disconnection caused by mandatory conditions */
	NBD_DISK_STATE_HARDDISC,
};
94 
/* One bdev exported through a kernel /dev/nbdX device. */
struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	/* fd of the opened /dev/nbdX device, used for NBD_* ioctls */
	int			dev_fd;
	char			*nbd_path;
	/* socketpair: kernel_sp_fd is handed to the kernel via NBD_SET_SOCK,
	 * spdk_sp_fd is served (nonblocking) by nbd_poller */
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	/* alignment required for io payload buffers */
	uint32_t		buf_align;

	/* request currently being received from the socket, if any */
	struct nbd_io		*io_in_recv;
	/* fully received io waiting for bdev submission */
	TAILQ_HEAD(, nbd_io)	received_io_list;
	/* bdev-completed io waiting for reply transmission */
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};
116 
/* Global registry of all exported nbd disks. */
struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
122 
/* Forward declaration: nbd_resubmit_io() below calls this. */
static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
125 
126 int
127 spdk_nbd_init(void)
128 {
129 	TAILQ_INIT(&g_spdk_nbd.disk_head);
130 
131 	return 0;
132 }
133 
134 void
135 spdk_nbd_fini(void)
136 {
137 	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;
138 
139 	/*
140 	 * Stop running spdk_nbd_disk.
141 	 * Here, nbd removing are unnecessary, but _SAFE variant
142 	 * is needed, since internal nbd_disk_unregister will
143 	 * remove nbd from TAILQ.
144 	 */
145 	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
146 		spdk_nbd_stop(nbd_idx);
147 	}
148 }
149 
150 static int
151 nbd_disk_register(struct spdk_nbd_disk *nbd)
152 {
153 	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
154 		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
155 		return -EBUSY;
156 	}
157 
158 	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);
159 
160 	return 0;
161 }
162 
163 static void
164 nbd_disk_unregister(struct spdk_nbd_disk *nbd)
165 {
166 	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;
167 
168 	/*
169 	 * nbd disk may be stopped before registered.
170 	 * check whether it was registered.
171 	 */
172 	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
173 		if (nbd == nbd_idx) {
174 			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
175 			break;
176 		}
177 	}
178 }
179 
180 struct spdk_nbd_disk *
181 nbd_disk_find_by_nbd_path(const char *nbd_path)
182 {
183 	struct spdk_nbd_disk *nbd;
184 
185 	/*
186 	 * check whether nbd has already been registered by nbd path.
187 	 */
188 	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
189 		if (!strcmp(nbd->nbd_path, nbd_path)) {
190 			return nbd;
191 		}
192 	}
193 
194 	return NULL;
195 }
196 
197 struct spdk_nbd_disk *nbd_disk_first(void)
198 {
199 	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
200 }
201 
202 struct spdk_nbd_disk *nbd_disk_next(struct spdk_nbd_disk *prev)
203 {
204 	return TAILQ_NEXT(prev, tailq);
205 }
206 
207 const char *
208 nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
209 {
210 	return nbd->nbd_path;
211 }
212 
213 const char *
214 nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
215 {
216 	return spdk_bdev_get_name(nbd->bdev);
217 }
218 
219 void
220 spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
221 {
222 	struct spdk_nbd_disk *nbd;
223 
224 	spdk_json_write_array_begin(w);
225 
226 	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
227 		spdk_json_write_object_begin(w);
228 
229 		spdk_json_write_named_string(w, "method", "nbd_start_disk");
230 
231 		spdk_json_write_named_object_begin(w, "params");
232 		spdk_json_write_named_string(w, "nbd_device",  nbd_disk_get_nbd_path(nbd));
233 		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
234 		spdk_json_write_object_end(w);
235 
236 		spdk_json_write_object_end(w);
237 	}
238 
239 	spdk_json_write_array_end(w);
240 }
241 
242 void
243 nbd_disconnect(struct spdk_nbd_disk *nbd)
244 {
245 	/*
246 	 * nbd soft-disconnection to terminate transmission phase.
247 	 * After receiving this ioctl command, nbd kernel module will send
248 	 * a NBD_CMD_DISC type io to nbd server in order to inform server.
249 	 */
250 	ioctl(nbd->dev_fd, NBD_DISCONNECT);
251 }
252 
253 static struct nbd_io *
254 nbd_get_io(struct spdk_nbd_disk *nbd)
255 {
256 	struct nbd_io *io;
257 
258 	io = calloc(1, sizeof(*io));
259 	if (!io) {
260 		return NULL;
261 	}
262 
263 	io->nbd = nbd;
264 	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);
265 
266 	nbd->io_count++;
267 
268 	return io;
269 }
270 
271 static void
272 nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
273 {
274 	if (io->payload) {
275 		spdk_free(io->payload);
276 	}
277 	free(io);
278 
279 	nbd->io_count--;
280 }
281 
282 /*
283  * Check whether received nbd_io are all transmitted.
284  *
285  * \return 1 there is still some nbd_io not transmitted.
286  *         0 all nbd_io received are transmitted.
287  */
288 static int
289 nbd_io_xmit_check(struct spdk_nbd_disk *nbd)
290 {
291 	if (nbd->io_count == 0) {
292 		return 0;
293 	} else if (nbd->io_count == 1 && nbd->io_in_recv != NULL) {
294 		return 0;
295 	}
296 
297 	return 1;
298 }
299 
300 /*
301  * Check whether received nbd_io are all executed,
302  * and put back executed nbd_io instead of transmitting them
303  *
304  * \return 1 there is still some nbd_io under executing
305  *         0 all nbd_io gotten are freed.
306  */
307 static int
308 nbd_cleanup_io(struct spdk_nbd_disk *nbd)
309 {
310 	struct nbd_io *io, *io_tmp;
311 
312 	/* free io_in_recv */
313 	if (nbd->io_in_recv != NULL) {
314 		nbd_put_io(nbd, nbd->io_in_recv);
315 		nbd->io_in_recv = NULL;
316 	}
317 
318 	/* free io in received_io_list */
319 	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
320 		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
321 			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
322 			nbd_put_io(nbd, io);
323 		}
324 	}
325 
326 	/* free io in executed_io_list */
327 	if (!TAILQ_EMPTY(&nbd->executed_io_list)) {
328 		TAILQ_FOREACH_SAFE(io, &nbd->executed_io_list, tailq, io_tmp) {
329 			TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);
330 			nbd_put_io(nbd, io);
331 		}
332 	}
333 
334 	/*
335 	 * Some nbd_io may be under executing in bdev.
336 	 * Wait for their done operation.
337 	 */
338 	if (nbd->io_count != 0) {
339 		return 1;
340 	}
341 
342 	return 0;
343 }
344 
345 static void
346 _nbd_stop(struct spdk_nbd_disk *nbd)
347 {
348 	if (nbd->ch) {
349 		spdk_put_io_channel(nbd->ch);
350 	}
351 
352 	if (nbd->bdev_desc) {
353 		spdk_bdev_close(nbd->bdev_desc);
354 	}
355 
356 	if (nbd->spdk_sp_fd >= 0) {
357 		close(nbd->spdk_sp_fd);
358 	}
359 
360 	if (nbd->kernel_sp_fd >= 0) {
361 		close(nbd->kernel_sp_fd);
362 	}
363 
364 	if (nbd->dev_fd >= 0) {
365 		/* Clear nbd device only if it is occupied by SPDK app */
366 		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
367 			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
368 			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
369 		}
370 		close(nbd->dev_fd);
371 	}
372 
373 	if (nbd->nbd_path) {
374 		free(nbd->nbd_path);
375 	}
376 
377 	if (nbd->nbd_poller) {
378 		spdk_poller_unregister(&nbd->nbd_poller);
379 	}
380 
381 	nbd_disk_unregister(nbd);
382 
383 	free(nbd);
384 }
385 
386 void
387 spdk_nbd_stop(struct spdk_nbd_disk *nbd)
388 {
389 	if (nbd == NULL) {
390 		return;
391 	}
392 
393 	nbd->state = NBD_DISK_STATE_HARDDISC;
394 
395 	/*
396 	 * Stop action should be called only after all nbd_io are executed.
397 	 */
398 	if (!nbd_cleanup_io(nbd)) {
399 		_nbd_stop(nbd);
400 	}
401 }
402 
403 static int64_t
404 read_from_socket(int fd, void *buf, size_t length)
405 {
406 	ssize_t bytes_read;
407 
408 	bytes_read = read(fd, buf, length);
409 	if (bytes_read == 0) {
410 		return -EIO;
411 	} else if (bytes_read == -1) {
412 		if (errno != EAGAIN) {
413 			return -errno;
414 		}
415 		return 0;
416 	} else {
417 		return bytes_read;
418 	}
419 }
420 
421 static int64_t
422 write_to_socket(int fd, void *buf, size_t length)
423 {
424 	ssize_t bytes_written;
425 
426 	bytes_written = write(fd, buf, length);
427 	if (bytes_written == 0) {
428 		return -EIO;
429 	} else if (bytes_written == -1) {
430 		if (errno != EAGAIN) {
431 			return -errno;
432 		}
433 		return 0;
434 	} else {
435 		return bytes_written;
436 	}
437 }
438 
439 static void
440 nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
441 {
442 	struct nbd_io	*io = cb_arg;
443 	struct spdk_nbd_disk *nbd = io->nbd;
444 
445 	if (success) {
446 		io->resp.error = 0;
447 	} else {
448 		to_be32(&io->resp.error, EIO);
449 	}
450 
451 	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));
452 	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);
453 
454 	if (bdev_io != NULL) {
455 		spdk_bdev_free_io(bdev_io);
456 	}
457 
458 	if (nbd->state == NBD_DISK_STATE_HARDDISC && !nbd_cleanup_io(nbd)) {
459 		_nbd_stop(nbd);
460 	}
461 }
462 
463 static void
464 nbd_resubmit_io(void *arg)
465 {
466 	struct nbd_io *io = (struct nbd_io *)arg;
467 	struct spdk_nbd_disk *nbd = io->nbd;
468 	int rc = 0;
469 
470 	rc = nbd_submit_bdev_io(nbd, io);
471 	if (rc) {
472 		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s , io_type %d, returned %d.\n",
473 			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
474 	}
475 }
476 
477 static void
478 nbd_queue_io(struct nbd_io *io)
479 {
480 	int rc;
481 	struct spdk_bdev *bdev = io->nbd->bdev;
482 
483 	io->bdev_io_wait.bdev = bdev;
484 	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
485 	io->bdev_io_wait.cb_arg = io;
486 
487 	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
488 	if (rc != 0) {
489 		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
490 		nbd_io_done(NULL, false, io);
491 	}
492 }
493 
/*
 * Dispatch a fully-received nbd_io to the bdev layer by command type.
 * Ownership of the io transfers here: on success the completion
 * callback (nbd_io_done) frees or requeues it; on -ENOMEM the io is
 * parked on the bdev io_wait queue for retry; any other submission
 * error completes the io with an EIO reply.  Always returns 0.
 */
static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		/* nbd flush carries no byte range: flush the whole bdev. */
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		/* Soft disconnect: free the io (no reply is sent for DISC). */
		nbd_put_io(nbd, io);
		nbd->state = NBD_DISK_STATE_SOFTDISC;
		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_queue_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}
543 
544 static int
545 nbd_io_exec(struct spdk_nbd_disk *nbd)
546 {
547 	struct nbd_io *io, *io_tmp;
548 	int io_count = 0;
549 	int ret = 0;
550 
551 	/*
552 	 * For soft disconnection, nbd server must handle all outstanding
553 	 * request before closing connection.
554 	 */
555 	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
556 		return 0;
557 	}
558 
559 	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
560 		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
561 			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
562 			ret = nbd_submit_bdev_io(nbd, io);
563 			if (ret < 0) {
564 				return ret;
565 			}
566 
567 			io_count++;
568 		}
569 	}
570 
571 	return io_count;
572 }
573 
/*
 * Advance the receive state machine one step: read nbd request header
 * bytes from the socket and, for NBD_CMD_WRITE, the write payload.
 * A fully received request is appended to received_io_list.
 *
 * \return number of bytes received by this call (possibly 0), or a
 *         negated errno on socket error, bad request magic, or
 *         allocation failure.
 */
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	/* Start a fresh io if none is currently mid-receive. */
	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		/* Resume a partial header read at io->offset. */
		ret = read_from_socket(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				       sizeof(io->req) - io->offset);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* io except read/write should ignore payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* io payload allocate */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				nbd->io_in_recv = NULL;
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		/* Note: falls through from the header branch in the same call. */
		ret = read_from_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			nbd->io_in_recv = NULL;
			TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
		}

	}

	return received;
}
670 
671 static int
672 nbd_io_recv(struct spdk_nbd_disk *nbd)
673 {
674 	int i, rc, ret = 0;
675 
676 	/*
677 	 * nbd server should not accept request in both soft and hard
678 	 * disconnect states.
679 	 */
680 	if (nbd->state != NBD_DISK_STATE_RUNNING) {
681 		return 0;
682 	}
683 
684 	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
685 		rc = nbd_io_recv_internal(nbd);
686 		if (rc < 0) {
687 			return rc;
688 		}
689 		ret += rc;
690 	}
691 
692 	return ret;
693 }
694 
/*
 * Transmit one executed io's reply header and, for successful reads,
 * its payload.  If the socket cannot take all bytes right now the io
 * is reinserted at the head of executed_io_list to resume later.
 *
 * \return bytes sent by this call (possibly 0), or a negated errno on
 *         socket error.
 */
static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handler are already set in io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		/* Resume a partial header write at io->offset. */
		ret = write_to_socket(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				      sizeof(io->resp) - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only when NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = write_to_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	/* Partial progress (or would-block): keep the io at the list head. */
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}
759 
760 static int
761 nbd_io_xmit(struct spdk_nbd_disk *nbd)
762 {
763 	int ret = 0;
764 	int rc;
765 
766 	/*
767 	 * For soft disconnection, nbd server must handle all outstanding
768 	 * request before closing connection.
769 	 */
770 	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
771 		return 0;
772 	}
773 
774 	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
775 		rc = nbd_io_xmit_internal(nbd);
776 		if (rc < 0) {
777 			return rc;
778 		}
779 
780 		ret += rc;
781 	}
782 
783 	/*
784 	 * For soft disconnection, nbd server can close connection after all
785 	 * outstanding request are transmitted.
786 	 */
787 	if (nbd->state == NBD_DISK_STATE_SOFTDISC && !nbd_io_xmit_check(nbd)) {
788 		return -1;
789 	}
790 
791 	return ret;
792 }
793 
/**
 * Poll an NBD instance: transmit, then receive, then execute.
 *
 * \return total work done (bytes moved + io executed), or a negated
 *         errno on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int rc, total = 0;

	/* transmit executed io first */
	rc = nbd_io_xmit(nbd);
	if (rc < 0) {
		return rc;
	}
	total += rc;

	rc = nbd_io_recv(nbd);
	if (rc < 0) {
		return rc;
	}
	total += rc;

	rc = nbd_io_exec(nbd);
	if (rc < 0) {
		return rc;
	}
	total += rc;

	return total;
}
822 
823 static int
824 nbd_poll(void *arg)
825 {
826 	struct spdk_nbd_disk *nbd = arg;
827 	int rc;
828 
829 	rc = _nbd_poll(nbd);
830 	if (rc < 0) {
831 		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
832 			     spdk_strerror(-rc), rc);
833 		spdk_nbd_stop(nbd);
834 	}
835 
836 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
837 }
838 
/* Pthread entry: hand control to the kernel nbd driver; blocks in
 * NBD_DO_IT until the SPDK side of the socket pair is closed. */
static void *
nbd_start_kernel(void *arg)
{
	int dev_fd = (int)(intptr_t)arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(dev_fd, NBD_DO_IT);

	return NULL;
}
851 
/* bdev hot-remove callback: stop the nbd disk backed by that bdev. */
static void
nbd_bdev_hot_remove(void *remove_ctx)
{
	spdk_nbd_stop(remove_ctx);
}
859 
/*
 * Transient context for the asynchronous start sequence
 * (nbd_enable_kernel -> nbd_start_complete); freed once the user
 * callback has been invoked.
 */
struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	/* retry poller used while NBD_SET_SOCK reports EBUSY */
	struct spdk_poller	*poller;
	/* remaining NBD_SET_SOCK retries */
	int			polling_count;
};
867 
/*
 * Final stage of the start sequence: register the disk, configure the
 * kernel nbd device (block size, size in blocks, flags), spawn the
 * detached NBD_DO_IT thread, switch the SPDK socket to nonblocking
 * mode and register the poller.  On any failure the disk is stopped
 * and the user callback receives NULL with a negated error code.
 * Consumes ctx in all paths.
 */
static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	int		flag;

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		/* spdk_nbd_start() already rejected duplicate paths. */
		SPDK_ERRLOG("Failed to register %s, it should not happen.\n", ctx->nbd->nbd_path);
		assert(false);
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_FLAG_SEND_TRIM
	/* Advertise unmap support to the kernel when the header allows it. */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_FLAGS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#endif

	/* The NBD_DO_IT ioctl blocks, so it gets its own detached thread. */
	rc = pthread_create(&tid, NULL, nbd_start_kernel, (void *)(intptr_t)ctx->nbd->dev_fd);
	if (rc != 0) {
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	/* The poller relies on nonblocking socket semantics. */
	flag = fcntl(ctx->nbd->spdk_sp_fd, F_GETFL);
	if (fcntl(ctx->nbd->spdk_sp_fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    ctx->nbd->spdk_sp_fd, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	spdk_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}
944 
945 static int
946 nbd_enable_kernel(void *arg)
947 {
948 	struct spdk_nbd_start_ctx *ctx = arg;
949 	int rc;
950 
951 	/* Declare device setup by this process */
952 	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);
953 	if (rc == -1) {
954 		if (errno == EBUSY && ctx->polling_count-- > 0) {
955 			if (ctx->poller == NULL) {
956 				ctx->poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
957 								   NBD_BUSY_POLLING_INTERVAL_US);
958 			}
959 			/* If the kernel is busy, check back later */
960 			return SPDK_POLLER_BUSY;
961 		}
962 
963 		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
964 		if (ctx->poller) {
965 			spdk_poller_unregister(&ctx->poller);
966 		}
967 
968 		spdk_nbd_stop(ctx->nbd);
969 
970 		if (ctx->cb_fn) {
971 			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
972 		}
973 
974 		free(ctx);
975 		return SPDK_POLLER_BUSY;
976 	}
977 
978 	if (ctx->poller) {
979 		spdk_poller_unregister(&ctx->poller);
980 	}
981 
982 	nbd_start_complete(ctx);
983 
984 	return SPDK_POLLER_BUSY;
985 }
986 
987 void
988 spdk_nbd_start(const char *bdev_name, const char *nbd_path,
989 	       spdk_nbd_start_cb cb_fn, void *cb_arg)
990 {
991 	struct spdk_nbd_start_ctx	*ctx = NULL;
992 	struct spdk_nbd_disk		*nbd = NULL;
993 	struct spdk_bdev		*bdev;
994 	int				rc;
995 	int				sp[2];
996 
997 	bdev = spdk_bdev_get_by_name(bdev_name);
998 	if (bdev == NULL) {
999 		SPDK_ERRLOG("no bdev %s exists\n", bdev_name);
1000 		rc = -EINVAL;
1001 		goto err;
1002 	}
1003 
1004 	nbd = calloc(1, sizeof(*nbd));
1005 	if (nbd == NULL) {
1006 		rc = -ENOMEM;
1007 		goto err;
1008 	}
1009 
1010 	nbd->dev_fd = -1;
1011 	nbd->spdk_sp_fd = -1;
1012 	nbd->kernel_sp_fd = -1;
1013 
1014 	ctx = calloc(1, sizeof(*ctx));
1015 	if (ctx == NULL) {
1016 		rc = -ENOMEM;
1017 		goto err;
1018 	}
1019 
1020 	ctx->nbd = nbd;
1021 	ctx->cb_fn = cb_fn;
1022 	ctx->cb_arg = cb_arg;
1023 	ctx->polling_count = NBD_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
1024 
1025 	rc = spdk_bdev_open(bdev, true, nbd_bdev_hot_remove, nbd, &nbd->bdev_desc);
1026 	if (rc != 0) {
1027 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", spdk_bdev_get_name(bdev), rc);
1028 		goto err;
1029 	}
1030 
1031 	nbd->bdev = bdev;
1032 
1033 	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
1034 	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);
1035 
1036 	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
1037 	if (rc != 0) {
1038 		SPDK_ERRLOG("socketpair failed\n");
1039 		rc = -errno;
1040 		goto err;
1041 	}
1042 
1043 	nbd->spdk_sp_fd = sp[0];
1044 	nbd->kernel_sp_fd = sp[1];
1045 	nbd->nbd_path = strdup(nbd_path);
1046 	if (!nbd->nbd_path) {
1047 		SPDK_ERRLOG("strdup allocation failure\n");
1048 		rc = -ENOMEM;
1049 		goto err;
1050 	}
1051 
1052 	TAILQ_INIT(&nbd->received_io_list);
1053 	TAILQ_INIT(&nbd->executed_io_list);
1054 
1055 	/* Make sure nbd_path is not used in this SPDK app */
1056 	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
1057 		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
1058 		rc = -EBUSY;
1059 		goto err;
1060 	}
1061 
1062 	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
1063 	if (nbd->dev_fd == -1) {
1064 		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
1065 		rc = -errno;
1066 		goto err;
1067 	}
1068 
1069 	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
1070 		     spdk_bdev_get_name(bdev), nbd_path);
1071 
1072 	nbd_enable_kernel(ctx);
1073 	return;
1074 
1075 err:
1076 	free(ctx);
1077 	if (nbd) {
1078 		spdk_nbd_stop(nbd);
1079 	}
1080 
1081 	if (cb_fn) {
1082 		cb_fn(cb_arg, NULL, rc);
1083 	}
1084 }
1085 
1086 const char *
1087 spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
1088 {
1089 	return nbd->nbd_path;
1090 }
1091 
1092 SPDK_LOG_REGISTER_COMPONENT(nbd)
1093