xref: /spdk/lib/nvmf/nvmf.c (revision 8a0a98d35e21f282088edf28b9e8da66ec390e3a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/bdev.h"
37 #include "spdk/bit_array.h"
38 #include "spdk/conf.h"
39 #include "spdk/thread.h"
40 #include "spdk/nvmf.h"
41 #include "spdk/trace.h"
42 #include "spdk/endian.h"
43 #include "spdk/string.h"
44 
45 #include "spdk_internal/log.h"
46 
47 #include "nvmf_internal.h"
48 #include "transport.h"
49 
SPDK_LOG_REGISTER_COMPONENT("nvmf", SPDK_LOG_NVMF)

/* Default target option values applied by spdk_nvmf_tgt_opts_init() when the
 * caller does not override them. The *_SIZE values are in bytes. */
#define SPDK_NVMF_DEFAULT_MAX_QUEUE_DEPTH 128
#define SPDK_NVMF_DEFAULT_MAX_QPAIRS_PER_CTRLR 64
#define SPDK_NVMF_DEFAULT_IN_CAPSULE_DATA_SIZE 4096
#define SPDK_NVMF_DEFAULT_MAX_IO_SIZE 131072
#define SPDK_NVMF_DEFAULT_MAX_SUBSYSTEMS 1024
#define SPDK_NVMF_DEFAULT_IO_UNIT_SIZE 131072
58 
59 void
60 spdk_nvmf_tgt_opts_init(struct spdk_nvmf_tgt_opts *opts)
61 {
62 	opts->max_queue_depth = SPDK_NVMF_DEFAULT_MAX_QUEUE_DEPTH;
63 	opts->max_qpairs_per_ctrlr = SPDK_NVMF_DEFAULT_MAX_QPAIRS_PER_CTRLR;
64 	opts->in_capsule_data_size = SPDK_NVMF_DEFAULT_IN_CAPSULE_DATA_SIZE;
65 	opts->max_io_size = SPDK_NVMF_DEFAULT_MAX_IO_SIZE;
66 	opts->max_subsystems = SPDK_NVMF_DEFAULT_MAX_SUBSYSTEMS;
67 	opts->io_unit_size = SPDK_NVMF_DEFAULT_IO_UNIT_SIZE;
68 }
69 
70 static int
71 spdk_nvmf_poll_group_poll(void *ctx)
72 {
73 	struct spdk_nvmf_poll_group *group = ctx;
74 	int rc;
75 	int count = 0;
76 	struct spdk_nvmf_transport_poll_group *tgroup;
77 
78 	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
79 		rc = spdk_nvmf_transport_poll_group_poll(tgroup);
80 		if (rc < 0) {
81 			return -1;
82 		}
83 		count += rc;
84 	}
85 
86 	return count;
87 }
88 
89 static int
90 spdk_nvmf_tgt_create_poll_group(void *io_device, void *ctx_buf)
91 {
92 	struct spdk_nvmf_tgt *tgt = io_device;
93 	struct spdk_nvmf_poll_group *group = ctx_buf;
94 	struct spdk_nvmf_transport *transport;
95 	uint32_t sid;
96 
97 	TAILQ_INIT(&group->tgroups);
98 	TAILQ_INIT(&group->qpairs);
99 
100 	TAILQ_FOREACH(transport, &tgt->transports, link) {
101 		spdk_nvmf_poll_group_add_transport(group, transport);
102 	}
103 
104 	group->num_sgroups = tgt->opts.max_subsystems;
105 	group->sgroups = calloc(tgt->opts.max_subsystems, sizeof(struct spdk_nvmf_subsystem_poll_group));
106 	if (!group->sgroups) {
107 		return -1;
108 	}
109 
110 	for (sid = 0; sid < tgt->opts.max_subsystems; sid++) {
111 		struct spdk_nvmf_subsystem *subsystem;
112 
113 		subsystem = tgt->subsystems[sid];
114 		if (!subsystem) {
115 			continue;
116 		}
117 
118 		spdk_nvmf_poll_group_add_subsystem(group, subsystem);
119 	}
120 
121 	group->poller = spdk_poller_register(spdk_nvmf_poll_group_poll, group, 0);
122 	group->thread = spdk_get_thread();
123 
124 	return 0;
125 }
126 
/*
 * io_device destroy callback: tear down a poll group on its owning thread.
 * Order matters: stop polling first, then disconnect qpairs, destroy the
 * transport poll groups, and finally release all per-subsystem namespace
 * I/O channels.
 */
static void
spdk_nvmf_tgt_destroy_poll_group(void *io_device, void *ctx_buf)
{
	struct spdk_nvmf_poll_group *group = ctx_buf;
	struct spdk_nvmf_qpair *qpair, *qptmp;
	struct spdk_nvmf_transport_poll_group *tgroup, *tmp;
	struct spdk_nvmf_subsystem_poll_group *sgroup;
	uint32_t sid, nsid;

	spdk_poller_unregister(&group->poller);

	/* Safe iteration: disconnecting may remove the qpair from the list. */
	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, qptmp) {
		spdk_nvmf_qpair_disconnect(qpair);
	}

	TAILQ_FOREACH_SAFE(tgroup, &group->tgroups, link, tmp) {
		TAILQ_REMOVE(&group->tgroups, tgroup, link);
		spdk_nvmf_transport_poll_group_destroy(tgroup);
	}

	/* Release every per-namespace I/O channel held by each subsystem slot. */
	for (sid = 0; sid < group->num_sgroups; sid++) {
		sgroup = &group->sgroups[sid];

		for (nsid = 0; nsid < sgroup->num_channels; nsid++) {
			if (sgroup->channels[nsid]) {
				spdk_put_io_channel(sgroup->channels[nsid]);
				sgroup->channels[nsid] = NULL;
			}
		}

		free(sgroup->channels);
	}

	free(group->sgroups);
}
162 
163 struct spdk_nvmf_tgt *
164 spdk_nvmf_tgt_create(struct spdk_nvmf_tgt_opts *opts)
165 {
166 	struct spdk_nvmf_tgt *tgt;
167 
168 	tgt = calloc(1, sizeof(*tgt));
169 	if (!tgt) {
170 		return NULL;
171 	}
172 
173 	if (!opts) {
174 		spdk_nvmf_tgt_opts_init(&tgt->opts);
175 	} else {
176 		tgt->opts = *opts;
177 	}
178 
179 	if ((tgt->opts.max_io_size % tgt->opts.io_unit_size != 0) ||
180 	    (tgt->opts.max_io_size / tgt->opts.io_unit_size > SPDK_NVMF_MAX_SGL_ENTRIES)) {
181 		SPDK_ERRLOG("Unsupported IO size, MaxIO:%d, UnitIO:%d\n", tgt->opts.max_io_size,
182 			    tgt->opts.io_unit_size);
183 		free(tgt);
184 		return NULL;
185 	}
186 
187 	tgt->discovery_genctr = 0;
188 	tgt->discovery_log_page = NULL;
189 	tgt->discovery_log_page_size = 0;
190 	TAILQ_INIT(&tgt->transports);
191 
192 	tgt->subsystems = calloc(tgt->opts.max_subsystems, sizeof(struct spdk_nvmf_subsystem *));
193 	if (!tgt->subsystems) {
194 		free(tgt);
195 		return NULL;
196 	}
197 
198 	spdk_io_device_register(tgt,
199 				spdk_nvmf_tgt_create_poll_group,
200 				spdk_nvmf_tgt_destroy_poll_group,
201 				sizeof(struct spdk_nvmf_poll_group));
202 
203 	SPDK_DEBUGLOG(SPDK_LOG_NVMF, "Max Queue Pairs Per Controller: %d\n",
204 		      tgt->opts.max_qpairs_per_ctrlr);
205 	SPDK_DEBUGLOG(SPDK_LOG_NVMF, "Max Queue Depth: %d\n", tgt->opts.max_queue_depth);
206 	SPDK_DEBUGLOG(SPDK_LOG_NVMF, "Max In Capsule Data: %d bytes\n",
207 		      tgt->opts.in_capsule_data_size);
208 	SPDK_DEBUGLOG(SPDK_LOG_NVMF, "Max I/O Size: %d bytes\n", tgt->opts.max_io_size);
209 	SPDK_DEBUGLOG(SPDK_LOG_NVMF, "I/O Unit Size: %d bytes\n", tgt->opts.io_unit_size);
210 
211 	return tgt;
212 }
213 
214 static void
215 spdk_nvmf_tgt_destroy_cb(void *io_device)
216 {
217 	struct spdk_nvmf_tgt *tgt = io_device;
218 	struct spdk_nvmf_transport *transport, *transport_tmp;
219 	spdk_nvmf_tgt_destroy_done_fn		*destroy_cb_fn;
220 	void					*destroy_cb_arg;
221 	uint32_t i;
222 
223 	if (tgt->discovery_log_page) {
224 		free(tgt->discovery_log_page);
225 	}
226 
227 	if (tgt->subsystems) {
228 		for (i = 0; i < tgt->opts.max_subsystems; i++) {
229 			if (tgt->subsystems[i]) {
230 				spdk_nvmf_subsystem_destroy(tgt->subsystems[i]);
231 			}
232 		}
233 		free(tgt->subsystems);
234 	}
235 
236 	TAILQ_FOREACH_SAFE(transport, &tgt->transports, link, transport_tmp) {
237 		TAILQ_REMOVE(&tgt->transports, transport, link);
238 		spdk_nvmf_transport_destroy(transport);
239 	}
240 
241 	destroy_cb_fn = tgt->destroy_cb_fn;
242 	destroy_cb_arg = tgt->destroy_cb_arg;
243 
244 	free(tgt);
245 
246 	if (destroy_cb_fn) {
247 		destroy_cb_fn(destroy_cb_arg, 0);
248 	}
249 }
250 
251 void
252 spdk_nvmf_tgt_destroy(struct spdk_nvmf_tgt *tgt,
253 		      spdk_nvmf_tgt_destroy_done_fn cb_fn,
254 		      void *cb_arg)
255 {
256 	tgt->destroy_cb_fn = cb_fn;
257 	tgt->destroy_cb_arg = cb_arg;
258 
259 	spdk_io_device_unregister(tgt, spdk_nvmf_tgt_destroy_cb);
260 }
261 
/* Context carried through the spdk_for_each_channel() walk that attaches a
 * newly created transport to every existing poll group during
 * spdk_nvmf_tgt_listen(). Freed in spdk_nvmf_tgt_listen_done(). */
struct spdk_nvmf_tgt_listen_ctx {
	struct spdk_nvmf_tgt *tgt;		/* target that owns the new listener */
	struct spdk_nvmf_transport *transport;	/* transport to add to each poll group */
	struct spdk_nvme_transport_id trid;	/* copy of the transport ID being listened on */

	spdk_nvmf_tgt_listen_done_fn cb_fn;	/* user completion callback */
	void *cb_arg;				/* opaque argument for cb_fn */
};
270 
/*
 * Emit one "construct_nvmf_subsystem" RPC object for the given subsystem:
 * its NQN, host access policy, listen addresses, allowed hosts, serial
 * number, optional max_namespaces, and all namespaces (with nguid/eui64/uuid
 * when set). Discovery subsystems (non-NVMe type) are skipped because they
 * are created implicitly and are not part of the saved configuration.
 */
static void
spdk_nvmf_write_subsystem_config_json(struct spdk_json_write_ctx *w,
				      struct spdk_nvmf_subsystem *subsystem)
{
	struct spdk_nvmf_host *host;
	struct spdk_nvmf_listener *listener;
	const struct spdk_nvme_transport_id *trid;
	struct spdk_nvmf_ns *ns;
	struct spdk_nvmf_ns_opts ns_opts;
	uint32_t max_namespaces;
	char uuid_str[SPDK_UUID_STRING_LEN];
	const char *trtype;
	const char *adrfam;

	if (spdk_nvmf_subsystem_get_type(subsystem) != SPDK_NVMF_SUBTYPE_NVME) {
		return;
	}

	/* { */
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "construct_nvmf_subsystem");

	/*     "params" : { */
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "nqn", spdk_nvmf_subsystem_get_nqn(subsystem));
	spdk_json_write_named_bool(w, "allow_any_host", spdk_nvmf_subsystem_get_allow_any_host(subsystem));

	/*         "listen_addresses" : [ */
	spdk_json_write_named_array_begin(w, "listen_addresses");
	for (listener = spdk_nvmf_subsystem_get_first_listener(subsystem); listener != NULL;
	     listener = spdk_nvmf_subsystem_get_next_listener(subsystem, listener)) {
		trid = spdk_nvmf_listener_get_trid(listener);

		trtype = spdk_nvme_transport_id_trtype_str(trid->trtype);
		adrfam = spdk_nvme_transport_id_adrfam_str(trid->adrfam);

		/*        { */
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "trtype", trtype);
		/* adrfam may be unknown/unspecified, in which case it is omitted. */
		if (adrfam) {
			spdk_json_write_named_string(w, "adrfam", adrfam);
		}

		spdk_json_write_named_string(w, "traddr", trid->traddr);
		spdk_json_write_named_string(w, "trsvcid", trid->trsvcid);
		spdk_json_write_object_end(w);
		/*        } */
	}
	spdk_json_write_array_end(w);
	/*         ] "listen_addresses" */

	/*         "hosts" : [ */
	spdk_json_write_named_array_begin(w, "hosts");
	for (host = spdk_nvmf_subsystem_get_first_host(subsystem); host != NULL;
	     host = spdk_nvmf_subsystem_get_next_host(subsystem, host)) {
		spdk_json_write_string(w, spdk_nvmf_host_get_nqn(host));
	}
	spdk_json_write_array_end(w);
	/*         ] "hosts" */

	spdk_json_write_named_string(w, "serial_number", spdk_nvmf_subsystem_get_sn(subsystem));

	/* max_namespaces == 0 means "unlimited" and is not serialized. */
	max_namespaces = spdk_nvmf_subsystem_get_max_namespaces(subsystem);
	if (max_namespaces != 0) {
		spdk_json_write_named_uint32(w, "max_namespaces", max_namespaces);
	}

	/*         "namespaces" : [ */
	spdk_json_write_named_array_begin(w, "namespaces");
	for (ns = spdk_nvmf_subsystem_get_first_ns(subsystem); ns != NULL;
	     ns = spdk_nvmf_subsystem_get_next_ns(subsystem, ns)) {
		spdk_nvmf_ns_get_opts(ns, &ns_opts, sizeof(ns_opts));

		/*         { */
		spdk_json_write_object_begin(w);
		spdk_json_write_named_uint32(w, "nsid", spdk_nvmf_ns_get_id(ns));
		spdk_json_write_named_string(w, "bdev_name", spdk_bdev_get_name(spdk_nvmf_ns_get_bdev(ns)));

		/* Identifiers are only emitted when non-zero (i.e. actually set). */
		if (!spdk_mem_all_zero(ns_opts.nguid, sizeof(ns_opts.nguid))) {
			SPDK_STATIC_ASSERT(sizeof(ns_opts.nguid) == sizeof(uint64_t) * 2, "size mismatch");
			spdk_json_write_named_string_fmt(w, "nguid", "%016"PRIX64"%016"PRIX64, from_be64(&ns_opts.nguid[0]),
							 from_be64(&ns_opts.nguid[8]));
		}

		if (!spdk_mem_all_zero(ns_opts.eui64, sizeof(ns_opts.eui64))) {
			SPDK_STATIC_ASSERT(sizeof(ns_opts.eui64) == sizeof(uint64_t), "size mismatch");
			spdk_json_write_named_string_fmt(w, "eui64", "%016"PRIX64, from_be64(&ns_opts.eui64));
		}

		if (!spdk_mem_all_zero(&ns_opts.uuid, sizeof(ns_opts.uuid))) {
			spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &ns_opts.uuid);
			spdk_json_write_named_string(w, "uuid",  uuid_str);
		}
		/*         } */
		spdk_json_write_object_end(w);
	}

	/*         ] "namespaces" */
	spdk_json_write_array_end(w);

	/*     } "params" */
	spdk_json_write_object_end(w);

	/* } */
	spdk_json_write_object_end(w);
}
377 
378 void
379 spdk_nvmf_tgt_write_config_json(struct spdk_json_write_ctx *w, struct spdk_nvmf_tgt *tgt)
380 {
381 	struct spdk_nvmf_subsystem *subsystem;
382 
383 	spdk_json_write_object_begin(w);
384 	spdk_json_write_named_string(w, "method", "set_nvmf_target_options");
385 
386 	spdk_json_write_named_object_begin(w, "params");
387 	spdk_json_write_named_uint32(w, "max_queue_depth", tgt->opts.max_queue_depth);
388 	spdk_json_write_named_uint32(w, "max_qpairs_per_ctrlr", tgt->opts.max_qpairs_per_ctrlr);
389 	spdk_json_write_named_uint32(w, "in_capsule_data_size", tgt->opts.in_capsule_data_size);
390 	spdk_json_write_named_uint32(w, "max_io_size", tgt->opts.max_io_size);
391 	spdk_json_write_named_uint32(w, "max_subsystems", tgt->opts.max_subsystems);
392 	spdk_json_write_named_uint32(w, "io_unit_size", tgt->opts.io_unit_size);
393 	spdk_json_write_object_end(w);
394 
395 	spdk_json_write_object_end(w);
396 
397 	subsystem = spdk_nvmf_subsystem_get_first(tgt);
398 	while (subsystem) {
399 		spdk_nvmf_write_subsystem_config_json(w, subsystem);
400 		subsystem = spdk_nvmf_subsystem_get_next(subsystem);
401 	}
402 }
403 
404 static void
405 spdk_nvmf_tgt_listen_done(struct spdk_io_channel_iter *i, int status)
406 {
407 	struct spdk_nvmf_tgt_listen_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
408 
409 	ctx->cb_fn(ctx->cb_arg, status);
410 
411 	free(ctx);
412 }
413 
414 static void
415 spdk_nvmf_tgt_listen_add_transport(struct spdk_io_channel_iter *i)
416 {
417 	struct spdk_nvmf_tgt_listen_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
418 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
419 	struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);
420 	int rc;
421 
422 	rc = spdk_nvmf_poll_group_add_transport(group, ctx->transport);
423 	spdk_for_each_channel_continue(i, rc);
424 }
425 
/*
 * Begin accepting connections on the given transport ID. Creates the
 * transport on first use; if it was newly created, it must also be attached
 * to every existing poll group, which is done asynchronously via
 * spdk_for_each_channel(). cb_fn is always invoked exactly once with the
 * result (possibly synchronously, before this function returns).
 */
void
spdk_nvmf_tgt_listen(struct spdk_nvmf_tgt *tgt,
		     struct spdk_nvme_transport_id *trid,
		     spdk_nvmf_tgt_listen_done_fn cb_fn,
		     void *cb_arg)
{
	struct spdk_nvmf_transport *transport;
	int rc;
	bool propagate = false;

	transport = spdk_nvmf_tgt_get_transport(tgt, trid->trtype);
	if (!transport) {
		transport = spdk_nvmf_transport_create(tgt, trid->trtype);
		if (!transport) {
			SPDK_ERRLOG("Transport initialization failed\n");
			cb_fn(cb_arg, -EINVAL);
			return;
		}
		TAILQ_INSERT_TAIL(&tgt->transports, transport, link);

		/* New transport: existing poll groups don't know about it yet. */
		propagate = true;
	}

	/* NOTE(review): if this fails for a newly created transport, the
	 * transport stays on tgt->transports — presumably intentional so it can
	 * be reused by a later listen call; confirm. */
	rc = spdk_nvmf_transport_listen(transport, trid);
	if (rc < 0) {
		SPDK_ERRLOG("Unable to listen on address '%s'\n", trid->traddr);
		cb_fn(cb_arg, rc);
		return;
	}

	/* A new listener changes the discovery log contents. */
	tgt->discovery_genctr++;

	if (propagate) {
		struct spdk_nvmf_tgt_listen_ctx *ctx;

		ctx = calloc(1, sizeof(*ctx));
		if (!ctx) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		ctx->tgt = tgt;
		ctx->transport = transport;
		ctx->trid = *trid;
		ctx->cb_fn = cb_fn;
		ctx->cb_arg = cb_arg;

		/* Walk every poll group (one per channel); the done callback
		 * fires cb_fn and frees ctx. */
		spdk_for_each_channel(tgt,
				      spdk_nvmf_tgt_listen_add_transport,
				      ctx,
				      spdk_nvmf_tgt_listen_done);
	} else {
		cb_fn(cb_arg, 0);
	}
}
481 
482 struct spdk_nvmf_subsystem *
483 spdk_nvmf_tgt_find_subsystem(struct spdk_nvmf_tgt *tgt, const char *subnqn)
484 {
485 	struct spdk_nvmf_subsystem	*subsystem;
486 	uint32_t sid;
487 
488 	if (!subnqn) {
489 		return NULL;
490 	}
491 
492 	for (sid = 0; sid < tgt->opts.max_subsystems; sid++) {
493 		subsystem = tgt->subsystems[sid];
494 		if (subsystem == NULL) {
495 			continue;
496 		}
497 
498 		if (strcmp(subnqn, subsystem->subnqn) == 0) {
499 			return subsystem;
500 		}
501 	}
502 
503 	return NULL;
504 }
505 
506 struct spdk_nvmf_transport *
507 spdk_nvmf_tgt_get_transport(struct spdk_nvmf_tgt *tgt, enum spdk_nvme_transport_type type)
508 {
509 	struct spdk_nvmf_transport *transport;
510 
511 	TAILQ_FOREACH(transport, &tgt->transports, link) {
512 		if (transport->ops->type == type) {
513 			return transport;
514 		}
515 	}
516 
517 	return NULL;
518 }
519 
520 void
521 spdk_nvmf_tgt_accept(struct spdk_nvmf_tgt *tgt, new_qpair_fn cb_fn)
522 {
523 	struct spdk_nvmf_transport *transport, *tmp;
524 
525 	TAILQ_FOREACH_SAFE(transport, &tgt->transports, link, tmp) {
526 		spdk_nvmf_transport_accept(transport, cb_fn);
527 	}
528 }
529 
530 struct spdk_nvmf_poll_group *
531 spdk_nvmf_poll_group_create(struct spdk_nvmf_tgt *tgt)
532 {
533 	struct spdk_io_channel *ch;
534 
535 	ch = spdk_get_io_channel(tgt);
536 	if (!ch) {
537 		SPDK_ERRLOG("Unable to get I/O channel for target\n");
538 		return NULL;
539 	}
540 
541 	return spdk_io_channel_get_ctx(ch);
542 }
543 
/*
 * Release the poll group's reference on its underlying I/O channel. The
 * actual teardown happens in spdk_nvmf_tgt_destroy_poll_group() once the
 * last reference is dropped.
 */
void
spdk_nvmf_poll_group_destroy(struct spdk_nvmf_poll_group *group)
{
	spdk_put_io_channel(spdk_io_channel_from_ctx(group));
}
552 
/*
 * Attach a new qpair to this poll group and hand it to the poll group of
 * its transport. Returns the transport add result, or -1 when the group
 * has no poll group for the qpair's transport.
 */
int
spdk_nvmf_poll_group_add(struct spdk_nvmf_poll_group *group,
			 struct spdk_nvmf_qpair *qpair)
{
	int rc = -1;
	struct spdk_nvmf_transport_poll_group *tgroup;

	TAILQ_INIT(&qpair->outstanding);
	qpair->group = group;
	qpair->state = SPDK_NVMF_QPAIR_ACTIVATING;

	TAILQ_INSERT_TAIL(&group->qpairs, qpair, link);

	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
		if (tgroup->transport == qpair->transport) {
			rc = spdk_nvmf_transport_poll_group_add(tgroup, qpair);
			break;
		}
	}

	/* NOTE(review): on failure the qpair stays on group->qpairs in the
	 * INACTIVE state rather than being removed — confirm callers handle
	 * the cleanup (e.g. via disconnect). */
	if (rc == 0) {
		qpair->state = SPDK_NVMF_QPAIR_ACTIVE;
	} else {
		qpair->state = SPDK_NVMF_QPAIR_INACTIVE;
	}

	return rc;
}
581 
582 int
583 spdk_nvmf_poll_group_remove(struct spdk_nvmf_poll_group *group,
584 			    struct spdk_nvmf_qpair *qpair)
585 {
586 	int rc = -1;
587 	struct spdk_nvmf_transport_poll_group *tgroup;
588 
589 	TAILQ_REMOVE(&group->qpairs, qpair, link);
590 
591 	qpair->group = NULL;
592 
593 	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
594 		if (tgroup->transport == qpair->transport) {
595 			rc = spdk_nvmf_transport_poll_group_remove(tgroup, qpair);
596 			break;
597 		}
598 	}
599 
600 	return rc;
601 }
602 
/*
 * Thread-message trampoline: destroys the controller on the subsystem's
 * thread (see _spdk_nvmf_qpair_destroy()).
 */
static void
_spdk_nvmf_ctrlr_free(void *ctx)
{
	spdk_nvmf_ctrlr_destruct(ctx);
}
610 
/*
 * Final qpair teardown: remove it from its poll group, mark it inactive,
 * and release the transport resources. If this was the last qpair of its
 * controller, schedule controller destruction on the subsystem's thread.
 * The status argument is the state_cb signature's request status; it is
 * not used here.
 */
static void
_spdk_nvmf_qpair_destroy(void *ctx, int status)
{
	struct spdk_nvmf_qpair *qpair = ctx;
	struct spdk_nvmf_ctrlr *ctrlr = qpair->ctrlr;
	/* Capture qid now: the qpair must not be touched after transport fini. */
	uint16_t qid = qpair->qid;
	uint32_t count;

	spdk_nvmf_poll_group_remove(qpair->group, qpair);

	assert(qpair->state == SPDK_NVMF_QPAIR_DEACTIVATING);
	qpair->state = SPDK_NVMF_QPAIR_INACTIVE;

	spdk_nvmf_transport_qpair_fini(qpair);

	if (!ctrlr) {
		/* The qpair never completed CONNECT, so there is no controller
		 * bookkeeping to update. */
		return;
	}

	/* Clear this queue's bit in the controller's qpair mask under the
	 * controller lock and check how many qpairs remain. */
	pthread_mutex_lock(&ctrlr->mtx);
	spdk_bit_array_clear(ctrlr->qpair_mask, qid);
	count = spdk_bit_array_count_set(ctrlr->qpair_mask);
	pthread_mutex_unlock(&ctrlr->mtx);

	if (count == 0) {
		/* If this was the last queue pair on the controller, also send a message
		 * to the subsystem to remove the controller. */
		spdk_thread_send_msg(ctrlr->subsys->thread, _spdk_nvmf_ctrlr_free, ctrlr);
	}
}
641 
/*
 * Move a qpair from ACTIVE to DEACTIVATING. Destruction happens immediately
 * when no I/O is outstanding; otherwise it is deferred via state_cb, which
 * the request completion path invokes once the last request finishes.
 * Must run on the thread that owns the qpair's poll group.
 */
static void
_spdk_nvmf_qpair_deactivate(void *ctx)
{
	struct spdk_nvmf_qpair *qpair = ctx;

	if (qpair->state == SPDK_NVMF_QPAIR_DEACTIVATING ||
	    qpair->state == SPDK_NVMF_QPAIR_INACTIVE) {
		/* This can occur if the connection is killed by the target,
		 * which results in a notification that the connection
		 * died. */
		return;
	}

	assert(qpair->state == SPDK_NVMF_QPAIR_ACTIVE);
	qpair->state = SPDK_NVMF_QPAIR_DEACTIVATING;

	/* Check for outstanding I/O */
	if (!TAILQ_EMPTY(&qpair->outstanding)) {
		/* Defer destruction until the last outstanding request completes. */
		qpair->state_cb = _spdk_nvmf_qpair_destroy;
		qpair->state_cb_arg = qpair;
		return;
	}

	_spdk_nvmf_qpair_destroy(qpair, 0);
}
667 
668 void
669 spdk_nvmf_qpair_disconnect(struct spdk_nvmf_qpair *qpair)
670 {
671 	if (qpair->group->thread == spdk_get_thread()) {
672 		_spdk_nvmf_qpair_deactivate(qpair);
673 	} else {
674 		/* Send a message to the thread that owns this qpair */
675 		spdk_thread_send_msg(qpair->group->thread, _spdk_nvmf_qpair_deactivate, qpair);
676 	}
677 }
678 
679 int
680 spdk_nvmf_poll_group_add_transport(struct spdk_nvmf_poll_group *group,
681 				   struct spdk_nvmf_transport *transport)
682 {
683 	struct spdk_nvmf_transport_poll_group *tgroup;
684 
685 	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
686 		if (tgroup->transport == transport) {
687 			/* Transport already in the poll group */
688 			return 0;
689 		}
690 	}
691 
692 	tgroup = spdk_nvmf_transport_poll_group_create(transport);
693 	if (!tgroup) {
694 		SPDK_ERRLOG("Unable to create poll group for transport\n");
695 		return -1;
696 	}
697 
698 	TAILQ_INSERT_TAIL(&group->tgroups, tgroup, link);
699 
700 	return 0;
701 }
702 
/*
 * Synchronize this poll group's per-namespace I/O channel array with the
 * subsystem's current namespaces. The array is sized to max_nsid; slots are
 * indexed by nsid - 1. Channels are acquired for new namespaces and
 * released for removed ones. Returns 0 on success, -ENOMEM on allocation
 * failure or if the group has no slot for this subsystem.
 */
static int
poll_group_update_subsystem(struct spdk_nvmf_poll_group *group,
			    struct spdk_nvmf_subsystem *subsystem)
{
	struct spdk_nvmf_subsystem_poll_group *sgroup;
	uint32_t new_num_channels, old_num_channels;
	uint32_t i;
	struct spdk_nvmf_ns *ns;

	/* Make sure our poll group has memory for this subsystem allocated */
	if (subsystem->id >= group->num_sgroups) {
		return -ENOMEM;
	}

	sgroup = &group->sgroups[subsystem->id];

	/* Make sure the array of channels is the correct size */
	new_num_channels = subsystem->max_nsid;
	old_num_channels = sgroup->num_channels;

	if (old_num_channels == 0) {
		if (new_num_channels > 0) {
			/* First allocation */
			sgroup->channels = calloc(new_num_channels, sizeof(sgroup->channels[0]));
			if (!sgroup->channels) {
				return -ENOMEM;
			}
		}
	} else if (new_num_channels > old_num_channels) {
		void *buf;

		/* Make the array larger */
		buf = realloc(sgroup->channels, new_num_channels * sizeof(sgroup->channels[0]));
		if (!buf) {
			/* Original array is still intact and valid on realloc failure. */
			return -ENOMEM;
		}

		sgroup->channels = buf;

		/* Null out the new channels slots */
		for (i = old_num_channels; i < new_num_channels; i++) {
			sgroup->channels[i] = NULL;
		}
	} else if (new_num_channels < old_num_channels) {
		void *buf;

		/* Free the extra I/O channels */
		for (i = new_num_channels; i < old_num_channels; i++) {
			if (sgroup->channels[i]) {
				spdk_put_io_channel(sgroup->channels[i]);
				sgroup->channels[i] = NULL;
			}
		}

		/* Make the array smaller */
		if (new_num_channels > 0) {
			buf = realloc(sgroup->channels, new_num_channels * sizeof(sgroup->channels[0]));
			if (!buf) {
				/* The trailing channels were already released and NULLed,
				 * so returning here leaves the array oversized but safe. */
				return -ENOMEM;
			}
			sgroup->channels = buf;
		} else {
			free(sgroup->channels);
			sgroup->channels = NULL;
		}
	}

	sgroup->num_channels = new_num_channels;

	/* Detect bdevs that were added or removed */
	for (i = 0; i < sgroup->num_channels; i++) {
		ns = subsystem->ns[i];
		if (ns == NULL && sgroup->channels[i] == NULL) {
			/* Both NULL. Leave empty */
		} else if (ns == NULL && sgroup->channels[i] != NULL) {
			/* There was a channel here, but the namespace is gone. */
			spdk_put_io_channel(sgroup->channels[i]);
			sgroup->channels[i] = NULL;
		} else if (ns != NULL && sgroup->channels[i] == NULL) {
			/* A namespace appeared but there is no channel yet */
			sgroup->channels[i] = spdk_bdev_get_io_channel(ns->desc);
		} else {
			/* A namespace was present before and didn't change. */
		}
	}

	return 0;
}
791 
/*
 * Public wrapper around poll_group_update_subsystem(): resize this poll
 * group's per-namespace channel array to match the subsystem's current
 * namespaces. Returns 0 on success or a negative errno.
 */
int
spdk_nvmf_poll_group_update_subsystem(struct spdk_nvmf_poll_group *group,
				      struct spdk_nvmf_subsystem *subsystem)
{
	return poll_group_update_subsystem(group, subsystem);
}
798 
799 int
800 spdk_nvmf_poll_group_add_subsystem(struct spdk_nvmf_poll_group *group,
801 				   struct spdk_nvmf_subsystem *subsystem)
802 {
803 	struct spdk_nvmf_subsystem_poll_group *sgroup;
804 	int rc;
805 
806 	rc = poll_group_update_subsystem(group, subsystem);
807 	if (rc) {
808 		return rc;
809 	}
810 
811 	sgroup = &group->sgroups[subsystem->id];
812 	sgroup->state = SPDK_NVMF_SUBSYSTEM_ACTIVE;
813 	TAILQ_INIT(&sgroup->queued);
814 
815 	return 0;
816 }
817 
818 int
819 spdk_nvmf_poll_group_remove_subsystem(struct spdk_nvmf_poll_group *group,
820 				      struct spdk_nvmf_subsystem *subsystem)
821 {
822 	struct spdk_nvmf_qpair *qpair, *tmp;
823 	struct spdk_nvmf_subsystem_poll_group *sgroup;
824 	uint32_t nsid;
825 
826 	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, tmp) {
827 		if (qpair->ctrlr->subsys == subsystem) {
828 			spdk_nvmf_qpair_disconnect(qpair);
829 		}
830 	}
831 
832 	sgroup = &group->sgroups[subsystem->id];
833 	sgroup->state = SPDK_NVMF_SUBSYSTEM_INACTIVE;
834 
835 	for (nsid = 0; nsid < sgroup->num_channels; nsid++) {
836 		if (sgroup->channels[nsid]) {
837 			spdk_put_io_channel(sgroup->channels[nsid]);
838 			sgroup->channels[nsid] = NULL;
839 		}
840 	}
841 
842 	sgroup->num_channels = 0;
843 	free(sgroup->channels);
844 	sgroup->channels = NULL;
845 
846 	return 0;
847 }
848 
849 int
850 spdk_nvmf_poll_group_pause_subsystem(struct spdk_nvmf_poll_group *group,
851 				     struct spdk_nvmf_subsystem *subsystem)
852 {
853 	struct spdk_nvmf_subsystem_poll_group *sgroup;
854 
855 	if (subsystem->id >= group->num_sgroups) {
856 		return -1;
857 	}
858 
859 	sgroup = &group->sgroups[subsystem->id];
860 	if (sgroup == NULL) {
861 		return -1;
862 	}
863 
864 	assert(sgroup->state == SPDK_NVMF_SUBSYSTEM_ACTIVE);
865 	/* TODO: This currently does not quiesce I/O */
866 	sgroup->state = SPDK_NVMF_SUBSYSTEM_PAUSED;
867 
868 	return 0;
869 }
870 
/*
 * Resume a previously paused subsystem in this poll group: refresh the
 * namespace channel array (namespaces may have changed while paused), mark
 * the slot ACTIVE, and re-submit every request that was queued while
 * paused. Returns 0 on success or a negative errno.
 */
int
spdk_nvmf_poll_group_resume_subsystem(struct spdk_nvmf_poll_group *group,
				      struct spdk_nvmf_subsystem *subsystem)
{
	struct spdk_nvmf_request *req, *tmp;
	struct spdk_nvmf_subsystem_poll_group *sgroup;
	int rc;

	if (subsystem->id >= group->num_sgroups) {
		return -1;
	}

	sgroup = &group->sgroups[subsystem->id];

	assert(sgroup->state == SPDK_NVMF_SUBSYSTEM_PAUSED);

	/* Pick up any namespace additions/removals that happened while paused. */
	rc = poll_group_update_subsystem(group, subsystem);
	if (rc) {
		return rc;
	}

	sgroup->state = SPDK_NVMF_SUBSYSTEM_ACTIVE;

	/* Release all queued requests */
	TAILQ_FOREACH_SAFE(req, &sgroup->queued, link, tmp) {
		TAILQ_REMOVE(&sgroup->queued, req, link);
		spdk_nvmf_request_exec(req);
	}

	return 0;
}
902