xref: /spdk/lib/nbd/nbd.c (revision b30d57cdad6d2bc75cc1e4e2ebbcebcb0d98dcfa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/string.h"
36 
37 #include <linux/nbd.h>
38 
39 #include "spdk/nbd.h"
40 #include "nbd_internal.h"
41 #include "spdk/bdev.h"
42 #include "spdk/endian.h"
43 #include "spdk/env.h"
44 #include "spdk/log.h"
45 #include "spdk/util.h"
46 #include "spdk/thread.h"
47 
48 #include "spdk/queue.h"
49 
50 #define GET_IO_LOOP_COUNT		16
51 #define NBD_BUSY_WAITING_MS		1000
52 #define NBD_BUSY_POLLING_INTERVAL_US	20000
53 
/*
 * Per-IO state machine: an nbd_io advances through these states as its
 * request, payload and response move across the kernel<->SPDK socketpair.
 */
enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};
64 
/*
 * Context for one NBD request, from socket reception through bdev
 * submission to response transmission.
 */
struct nbd_io {
	/* Owning nbd disk instance */
	struct spdk_nbd_disk	*nbd;
	/* Current position in the recv/exec/xmit state machine */
	enum nbd_io_state_t	state;

	/* Payload buffer for read/write commands (allocated with spdk_malloc) */
	void			*payload;
	uint32_t		payload_size;

	/* Request header as received from the kernel (big-endian fields) */
	struct nbd_request	req;
	/* Response header to send back; magic is pre-set in nbd_get_io() */
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	/* Link on received_io_list or executed_io_list */
	TAILQ_ENTRY(nbd_io)	tailq;
};
86 
/* Lifecycle state of an exported nbd disk. */
enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* soft disconnection caused by receiving nbd_cmd_disc */
	NBD_DISK_STATE_SOFTDISC,
	/* hard disconnection caused by mandatory conditions */
	NBD_DISK_STATE_HARDDISC,
};
94 
/* One bdev exported to the kernel through a /dev/nbdX device. */
struct spdk_nbd_disk {
	/* Exported bdev and the descriptor/channel used to access it */
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	/* fd of the opened /dev/nbdX device node (-1 if not open) */
	int			dev_fd;
	/* strdup'd /dev/nbdX path; owned and freed by _nbd_stop() */
	char			*nbd_path;
	/* socketpair: kernel_sp_fd is handed to the kernel nbd driver,
	 * spdk_sp_fd is polled by this module */
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	/* exactly one of poller/intr is active, depending on interrupt mode */
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	/* alignment requirement for payload buffers (>= 64) */
	uint32_t		buf_align;

	/* request currently being (partially) received, if any */
	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	/* Link on g_spdk_nbd.disk_head */
	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};
117 
/* Module-wide registry of all exported nbd disks. */
struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};
121 
122 static struct spdk_nbd_disk_globals g_spdk_nbd;
123 static spdk_nbd_fini_cb g_fini_cb_fn;
124 static void *g_fini_cb_arg;
125 
126 static void _nbd_fini(void *arg1);
127 
128 static int
129 nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
130 
/*
 * Initialize the global nbd disk list.
 *
 * \return 0 always.
 */
int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}
138 
139 static void
140 _nbd_stop_async(void *arg)
141 {
142 	struct spdk_nbd_disk *nbd = arg;
143 	int rc;
144 
145 	rc = spdk_nbd_stop(nbd);
146 	if (rc) {
147 		/* spdk_nbd_stop failed because some IO are still executing. Send a message
148 		* to this thread to try again later. */
149 		spdk_thread_send_msg(spdk_get_thread(),
150 				     _nbd_stop_async, nbd);
151 	} else {
152 		_nbd_fini(NULL);
153 	}
154 }
155 
156 static void
157 _nbd_fini(void *arg1)
158 {
159 	struct spdk_nbd_disk *nbd_first;
160 
161 	nbd_first = TAILQ_FIRST(&g_spdk_nbd.disk_head);
162 	if (nbd_first) {
163 		/* Stop running spdk_nbd_disk */
164 		spdk_thread_send_msg(spdk_io_channel_get_thread(nbd_first->ch),
165 				     _nbd_stop_async, nbd_first);
166 	} else {
167 		/* We can directly call final function here, because
168 		 spdk_subsystem_fini_next handles the case: current thread does not equal
169 		 to g_final_thread */
170 		g_fini_cb_fn(g_fini_cb_arg);
171 	}
172 }
173 
/*
 * Begin asynchronous teardown of every exported nbd disk.
 *
 * \param cb_fn  invoked once all disks are stopped.
 * \param cb_arg argument passed through to cb_fn.
 */
void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}
182 
183 static int
184 nbd_disk_register(struct spdk_nbd_disk *nbd)
185 {
186 	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
187 		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
188 		return -EBUSY;
189 	}
190 
191 	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);
192 
193 	return 0;
194 }
195 
196 static void
197 nbd_disk_unregister(struct spdk_nbd_disk *nbd)
198 {
199 	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;
200 
201 	/*
202 	 * nbd disk may be stopped before registered.
203 	 * check whether it was registered.
204 	 */
205 	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
206 		if (nbd == nbd_idx) {
207 			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
208 			break;
209 		}
210 	}
211 }
212 
213 struct spdk_nbd_disk *
214 nbd_disk_find_by_nbd_path(const char *nbd_path)
215 {
216 	struct spdk_nbd_disk *nbd;
217 
218 	/*
219 	 * check whether nbd has already been registered by nbd path.
220 	 */
221 	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
222 		if (!strcmp(nbd->nbd_path, nbd_path)) {
223 			return nbd;
224 		}
225 	}
226 
227 	return NULL;
228 }
229 
230 struct spdk_nbd_disk *nbd_disk_first(void)
231 {
232 	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
233 }
234 
235 struct spdk_nbd_disk *nbd_disk_next(struct spdk_nbd_disk *prev)
236 {
237 	return TAILQ_NEXT(prev, tailq);
238 }
239 
/* Accessor: /dev/nbdX path this disk is exported on. */
const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}
245 
/* Accessor: name of the bdev backing this nbd disk. */
const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}
251 
/*
 * Emit a JSON array of "nbd_start_disk" RPC calls that would recreate the
 * current set of exported disks (used for config save/replay).
 */
void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device",  nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}
274 
/* Request a soft disconnect of the given nbd device. */
void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * nbd soft-disconnection to terminate transmission phase.
	 * After receiving this ioctl command, nbd kernel module will send
	 * a NBD_CMD_DISC type io to nbd server in order to inform server.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}
285 
286 static struct nbd_io *
287 nbd_get_io(struct spdk_nbd_disk *nbd)
288 {
289 	struct nbd_io *io;
290 
291 	io = calloc(1, sizeof(*io));
292 	if (!io) {
293 		return NULL;
294 	}
295 
296 	io->nbd = nbd;
297 	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);
298 
299 	nbd->io_count++;
300 
301 	return io;
302 }
303 
304 static void
305 nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
306 {
307 	if (io->payload) {
308 		spdk_free(io->payload);
309 	}
310 	free(io);
311 
312 	nbd->io_count--;
313 }
314 
315 /*
316  * Check whether received nbd_io are all transmitted.
317  *
318  * \return 1 there is still some nbd_io not transmitted.
319  *         0 all nbd_io received are transmitted.
320  */
321 static int
322 nbd_io_xmit_check(struct spdk_nbd_disk *nbd)
323 {
324 	if (nbd->io_count == 0) {
325 		return 0;
326 	} else if (nbd->io_count == 1 && nbd->io_in_recv != NULL) {
327 		return 0;
328 	}
329 
330 	return 1;
331 }
332 
333 /*
334  * Check whether received nbd_io are all executed,
335  * and put back executed nbd_io instead of transmitting them
336  *
337  * \return 1 there is still some nbd_io under executing
338  *         0 all nbd_io gotten are freed.
339  */
340 static int
341 nbd_cleanup_io(struct spdk_nbd_disk *nbd)
342 {
343 	/* free io_in_recv */
344 	if (nbd->io_in_recv != NULL) {
345 		nbd_put_io(nbd, nbd->io_in_recv);
346 		nbd->io_in_recv = NULL;
347 	}
348 
349 	/*
350 	 * Some nbd_io may be under executing in bdev.
351 	 * Wait for their done operation.
352 	 */
353 	if (nbd->io_count != 0) {
354 		return 1;
355 	}
356 
357 	return 0;
358 }
359 
/*
 * Final teardown of an nbd disk: release the io channel and bdev
 * descriptor, unregister pollers, close both socketpair fds and the
 * /dev/nbdX fd, then unregister and free the structure.
 *
 * NOTE(review): the kernel queue/socket are cleared only while this nbd is
 * still registered (i.e. genuinely owned by this SPDK app), so the
 * registration check below must run before nbd_disk_unregister().
 */
static void
_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
	}

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear nbd device only if it is occupied by SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	nbd_disk_unregister(nbd);

	free(nbd);
}
404 
405 int
406 spdk_nbd_stop(struct spdk_nbd_disk *nbd)
407 {
408 	int rc = 0;
409 
410 	if (nbd == NULL) {
411 		return rc;
412 	}
413 
414 	nbd->state = NBD_DISK_STATE_HARDDISC;
415 
416 	/*
417 	 * Stop action should be called only after all nbd_io are executed.
418 	 */
419 
420 	rc = nbd_cleanup_io(nbd);
421 	if (!rc) {
422 		_nbd_stop(nbd);
423 	}
424 
425 	return rc;
426 }
427 
/*
 * Perform one non-blocking read or write on the nbd socket.
 *
 * \return bytes transferred (> 0), 0 if the socket would block (EAGAIN),
 *         -EIO on end-of-stream, or -errno for any other failure.
 */
static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t n = read_op ? read(fd, buf, length) : write(fd, buf, length);

	if (n > 0) {
		return n;
	}

	if (n == 0) {
		/* Peer closed the connection. */
		return -EIO;
	}

	/* n == -1: EAGAIN just means "retry later"; anything else is fatal. */
	return (errno == EAGAIN) ? 0 : -errno;
}
450 
/*
 * bdev completion callback: fill in the nbd reply header and queue the io
 * on executed_io_list for transmission back to the kernel.
 *
 * Also invoked directly with bdev_io == NULL when submission itself failed.
 * May free the whole disk at the tail if a hard disconnect was waiting on
 * this completion.
 */
static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		/* nbd protocol carries the error as a big-endian errno value */
		to_be32(&io->resp.error, EIO);
	}

	/* Echo the request handle so the kernel can match reply to request */
	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When there begins to have executed_io, enable socket writable notice in order to
	 * get it processed in nbd_io_xmit
	 */
	if (nbd->intr && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}

	/* A pending hard disconnect completes once no io remains executing */
	if (nbd->state == NBD_DISK_STATE_HARDDISC && !nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}
482 
483 static void
484 nbd_resubmit_io(void *arg)
485 {
486 	struct nbd_io *io = (struct nbd_io *)arg;
487 	struct spdk_nbd_disk *nbd = io->nbd;
488 	int rc = 0;
489 
490 	rc = nbd_submit_bdev_io(nbd, io);
491 	if (rc) {
492 		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s , io_type %d, returned %d.\n",
493 			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
494 	}
495 }
496 
497 static void
498 nbd_queue_io(struct nbd_io *io)
499 {
500 	int rc;
501 	struct spdk_bdev *bdev = io->nbd->bdev;
502 
503 	io->bdev_io_wait.bdev = bdev;
504 	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
505 	io->bdev_io_wait.cb_arg = io;
506 
507 	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
508 	if (rc != 0) {
509 		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
510 		nbd_io_done(NULL, false, io);
511 	}
512 }
513 
514 static int
515 nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
516 {
517 	struct spdk_bdev_desc *desc = nbd->bdev_desc;
518 	struct spdk_io_channel *ch = nbd->ch;
519 	int rc = 0;
520 
521 	switch (from_be32(&io->req.type)) {
522 	case NBD_CMD_READ:
523 		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
524 				    io->payload_size, nbd_io_done, io);
525 		break;
526 	case NBD_CMD_WRITE:
527 		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
528 				     io->payload_size, nbd_io_done, io);
529 		break;
530 #ifdef NBD_FLAG_SEND_FLUSH
531 	case NBD_CMD_FLUSH:
532 		rc = spdk_bdev_flush(desc, ch, 0,
533 				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
534 				     nbd_io_done, io);
535 		break;
536 #endif
537 #ifdef NBD_FLAG_SEND_TRIM
538 	case NBD_CMD_TRIM:
539 		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
540 				     from_be32(&io->req.len), nbd_io_done, io);
541 		break;
542 #endif
543 	case NBD_CMD_DISC:
544 		nbd_put_io(nbd, io);
545 		nbd->state = NBD_DISK_STATE_SOFTDISC;
546 
547 		/* when there begins to have executed_io to send, enable socket writable notice */
548 		if (nbd->intr && TAILQ_EMPTY(&nbd->executed_io_list)) {
549 			spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
550 		}
551 
552 		break;
553 	default:
554 		rc = -1;
555 	}
556 
557 	if (rc < 0) {
558 		if (rc == -ENOMEM) {
559 			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
560 			nbd_queue_io(io);
561 		} else {
562 			SPDK_ERRLOG("nbd io failed in nbd_queue_io, rc=%d.\n", rc);
563 			nbd_io_done(NULL, false, io);
564 		}
565 	}
566 
567 	return 0;
568 }
569 
570 static int
571 nbd_io_exec(struct spdk_nbd_disk *nbd)
572 {
573 	struct nbd_io *io, *io_tmp;
574 	int io_count = 0;
575 	int ret = 0;
576 
577 	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
578 		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
579 			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
580 			ret = nbd_submit_bdev_io(nbd, io);
581 			if (ret < 0) {
582 				return ret;
583 			}
584 
585 			io_count++;
586 		}
587 	}
588 
589 	return io_count;
590 }
591 
/*
 * Advance the receive state machine by one step: read as much of the
 * request header (and, for writes, the payload) as the non-blocking socket
 * allows.  A fully received io is moved to received_io_list.
 *
 * \return bytes received this call (possibly 0 on EAGAIN), or a negative
 *         errno on socket error, bad magic, or allocation failure.
 */
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	/* Start a fresh io if none is partially received. */
	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		/* io->offset tracks how much of the header has arrived so far. */
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* io except read/write should ignore payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* io payload allocate */
			if (io->payload_size) {
				/* DMA-capable, aligned buffer since it goes to the bdev layer */
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				/* writes need their payload before execution */
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				nbd->io_in_recv = NULL;
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			}
		}
	}

	/* Not "else if": a header completed above may continue straight
	 * into payload reception in the same call. */
	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			nbd->io_in_recv = NULL;
			TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
		}

	}

	return received;
}
688 
689 static int
690 nbd_io_recv(struct spdk_nbd_disk *nbd)
691 {
692 	int i, rc, ret = 0;
693 
694 	/*
695 	 * nbd server should not accept request in both soft and hard
696 	 * disconnect states.
697 	 */
698 	if (nbd->state != NBD_DISK_STATE_RUNNING) {
699 		return 0;
700 	}
701 
702 	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
703 		rc = nbd_io_recv_internal(nbd);
704 		if (rc < 0) {
705 			return rc;
706 		}
707 		ret += rc;
708 	}
709 
710 	return ret;
711 }
712 
/*
 * Transmit (part of) the oldest executed io: first the reply header, then,
 * for successful reads, the payload.  A fully transmitted io is freed; a
 * partially transmitted one goes back to the head of executed_io_list.
 *
 * \return bytes sent this call (0 if nothing pending or socket would
 *         block), or a negative errno on socket failure.
 */
static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed.  It will be inserted
	 *  back to the head if it cannot be completed.  This approach is specifically
	 *  taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handler are already set in io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		/* io->offset tracks progress through the reply header */
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only when NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	/* EAGAIN (ret == 0) still reports any header bytes already sent */
	return ret < 0 ? ret : sent;
}
778 
779 static int
780 nbd_io_xmit(struct spdk_nbd_disk *nbd)
781 {
782 	int ret = 0;
783 	int rc;
784 
785 	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
786 		rc = nbd_io_xmit_internal(nbd);
787 		if (rc < 0) {
788 			return rc;
789 		}
790 
791 		ret += rc;
792 	}
793 
794 	/* When there begins to have no executed_io, disable socket writable notice */
795 	if (nbd->intr) {
796 		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
797 	}
798 
799 	/*
800 	 * For soft disconnection, nbd server can close connection after all
801 	 * outstanding request are transmitted.
802 	 */
803 	if (nbd->state == NBD_DISK_STATE_SOFTDISC && !nbd_io_xmit_check(nbd)) {
804 		return -1;
805 	}
806 
807 	return ret;
808 }
809 
/**
 * Poll an NBD instance.
 *
 * \return 0 on success or negated errno values on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int activity = 0;
	int rc;

	/* transmit executed io first */
	rc = nbd_io_xmit(nbd);
	if (rc < 0) {
		return rc;
	}
	activity += rc;

	rc = nbd_io_recv(nbd);
	if (rc < 0) {
		return rc;
	}
	activity += rc;

	rc = nbd_io_exec(nbd);
	if (rc < 0) {
		return rc;
	}
	activity += rc;

	return activity;
}
838 
839 static int
840 nbd_poll(void *arg)
841 {
842 	struct spdk_nbd_disk *nbd = arg;
843 	int rc;
844 
845 	rc = _nbd_poll(nbd);
846 	if (rc < 0) {
847 		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
848 			     spdk_strerror(-rc), rc);
849 		spdk_nbd_stop(nbd);
850 	}
851 
852 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
853 }
854 
/*
 * Body of the dedicated pthread that parks inside the kernel nbd driver.
 *
 * \param arg the /dev/nbdX fd, smuggled through the void* as an intptr_t.
 */
static void *
nbd_start_kernel(void *arg)
{
	int dev_fd = (int)(intptr_t)arg;

	/* Detach from SPDK core affinity; this thread only sleeps in the kernel */
	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(dev_fd, NBD_DO_IT);

	pthread_exit(NULL);
}
867 
/* Bdev hot-remove handler: tear the nbd export down. */
static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	spdk_nbd_stop(nbd);
}
873 
874 static void
875 nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
876 		  void *event_ctx)
877 {
878 	switch (type) {
879 	case SPDK_BDEV_EVENT_REMOVE:
880 		nbd_bdev_hot_remove(event_ctx);
881 		break;
882 	default:
883 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
884 		break;
885 	}
886 }
887 
/* Transient context for the asynchronous spdk_nbd_start() sequence. */
struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	/* user callback invoked once startup succeeds or fails */
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	/* retry poller used while NBD_SET_SOCK reports EBUSY */
	struct spdk_poller	*poller;
	/* remaining EBUSY retries before giving up */
	int			polling_count;
};
895 
/*
 * Final stage of nbd startup, run after NBD_SET_SOCK succeeded: register
 * the disk, program device geometry/flags via ioctl, spawn the NBD_DO_IT
 * kernel thread, make the SPDK-side socket non-blocking, and install the
 * poller (or interrupt handler).  Invokes the user callback on both
 * success and failure and frees ctx either way.
 */
static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int		rc;
	pthread_t	tid;
	int		flag;

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		/* Path uniqueness was already checked in spdk_nbd_start() */
		SPDK_ERRLOG("Failed to register %s, it should not happen.\n", ctx->nbd->nbd_path);
		assert(false);
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_FLAG_SEND_TRIM
	/* Advertise unmap/trim support when the kernel headers provide the flag */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_FLAGS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#endif

	/* Dedicated thread that blocks inside the kernel in NBD_DO_IT */
	rc = pthread_create(&tid, NULL, nbd_start_kernel, (void *)(intptr_t)ctx->nbd->dev_fd);
	if (rc != 0) {
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	/* The SPDK-side socket must be non-blocking for the poll loop */
	flag = fcntl(ctx->nbd->spdk_sp_fd, F_GETFL);
	if (fcntl(ctx->nbd->spdk_sp_fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    ctx->nbd->spdk_sp_fd, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	} else {
		ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	}

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	spdk_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}
976 
/*
 * Hand the kernel its end of the socketpair via NBD_SET_SOCK.  If the
 * device is busy (e.g. a previous owner has not finished releasing it),
 * retry on a poller every NBD_BUSY_POLLING_INTERVAL_US until
 * polling_count runs out.  On success continues to nbd_start_complete().
 *
 * Used both as a direct call and as a poller callback, hence the
 * SPDK_POLLER_* return values.
 */
static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;
	int flag;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (!rc) {
		/* Kernel-side socket fd is also made non-blocking */
		flag = fcntl(ctx->nbd->kernel_sp_fd, F_GETFL);
		rc = fcntl(ctx->nbd->kernel_sp_fd, F_SETFL, flag | O_NONBLOCK);
		if (rc < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
				    ctx->nbd->kernel_sp_fd, spdk_strerror(errno));
		}
	}

	if (rc) {
		/* NOTE(review): errno here is assumed to still hold the value from
		 * the failing ioctl/fcntl above */
		if (errno == EBUSY && ctx->polling_count-- > 0) {
			if (ctx->poller == NULL) {
				ctx->poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
								   NBD_BUSY_POLLING_INTERVAL_US);
			}
			/* If the kernel is busy, check back later */
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->poller) {
			spdk_poller_unregister(&ctx->poller);
		}

		spdk_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->poller) {
		spdk_poller_unregister(&ctx->poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}
1029 
/*
 * Export a bdev through a /dev/nbdX device, asynchronously.
 *
 * Opens the bdev, creates the kernel<->SPDK socketpair, opens the nbd
 * device node, then continues in nbd_enable_kernel() /
 * nbd_start_complete().  cb_fn receives the new disk on success or
 * (NULL, negative errno) on failure.
 *
 * \param bdev_name name of the bdev to export.
 * \param nbd_path  path of the nbd device node, e.g. "/dev/nbd0".
 * \param cb_fn     completion callback (may be NULL).
 * \param cb_arg    argument passed through to cb_fn.
 */
void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	/* Mark all fds invalid so _nbd_stop() on the err path skips them */
	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	/* Total EBUSY retry budget: NBD_BUSY_WAITING_MS worth of polls */
	ctx->polling_count = NBD_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	/* Payload buffers must satisfy the bdev's alignment; floor at 64 */
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Make sure nbd_path is not used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		rc = -EBUSY;
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	/* Continue asynchronously: NBD_SET_SOCK may need EBUSY retries */
	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		spdk_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}
1122 
/* Public accessor: /dev/nbdX path of an exported disk. */
const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}
1128 
1129 SPDK_LOG_REGISTER_COMPONENT(nbd)
1130