xref: /spdk/lib/nvmf/nvmf.c (revision 4f01d6e5959513c869e0fd554367bf0716e5295c)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2018-2019, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021, 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/bdev.h"
10 #include "spdk/bit_array.h"
11 #include "spdk/thread.h"
12 #include "spdk/nvmf.h"
13 #include "spdk/endian.h"
14 #include "spdk/string.h"
15 #include "spdk/log.h"
16 #include "spdk_internal/usdt.h"
17 
18 #include "nvmf_internal.h"
19 #include "transport.h"
20 
SPDK_LOG_REGISTER_COMPONENT(nvmf)

/* Default number of subsystem slots per target when the creator does not
 * specify max_subsystems. */
#define SPDK_NVMF_DEFAULT_MAX_SUBSYSTEMS 1024

/* Process-wide list of every NVMe-oF target created via spdk_nvmf_tgt_create(). */
static TAILQ_HEAD(, spdk_nvmf_tgt) g_nvmf_tgts = TAILQ_HEAD_INITIALIZER(g_nvmf_tgts);

/* Completion callback type invoked once a qpair disconnect finishes. */
typedef void (*nvmf_qpair_disconnect_cpl)(void *ctx, int status);
/* supplied to a single call to nvmf_qpair_disconnect */
struct nvmf_qpair_disconnect_ctx {
	struct spdk_nvmf_qpair *qpair;		/* qpair being torn down */
	struct spdk_nvmf_ctrlr *ctrlr;		/* owning controller, if any */
	nvmf_qpair_disconnect_cb cb_fn;		/* user completion callback */
	struct spdk_thread *thread;		/* thread to run the callback on */
	void *ctx;				/* opaque argument for cb_fn */
	uint16_t qid;				/* queue id of the qpair */
};
38 
/*
 * There are several times when we need to iterate through the list of all qpairs and selectively delete them.
 * In order to do this sequentially without overlap, we must provide a context to recover the next qpair from
 * to enable calling nvmf_qpair_disconnect on the next desired qpair.
 */
struct nvmf_qpair_disconnect_many_ctx {
	struct spdk_nvmf_subsystem *subsystem;	/* subsystem being paused/removed, if any */
	struct spdk_nvmf_poll_group *group;	/* poll group whose qpairs are iterated */
	spdk_nvmf_poll_group_mod_done cpl_fn;	/* completion callback for the whole batch */
	void *cpl_ctx;				/* opaque argument for cpl_fn */
};
50 
51 static struct spdk_nvmf_referral *
52 nvmf_tgt_find_referral(struct spdk_nvmf_tgt *tgt,
53 		       const struct spdk_nvme_transport_id *trid)
54 {
55 	struct spdk_nvmf_referral *referral;
56 
57 	TAILQ_FOREACH(referral, &tgt->referrals, link) {
58 		if (spdk_nvme_transport_id_compare(&referral->trid, trid) == 0) {
59 			return referral;
60 		}
61 	}
62 
63 	return NULL;
64 }
65 
/* Add a discovery-log referral entry to @tgt.  The caller's opts structure
 * may be an older, smaller ABI version; only uopts->size bytes are consumed.
 * Adding a transport ID that is already present is a no-op returning 0. */
int
spdk_nvmf_tgt_add_referral(struct spdk_nvmf_tgt *tgt,
			   const struct spdk_nvmf_referral_opts *uopts)
{
	struct spdk_nvmf_referral *referral;
	struct spdk_nvmf_referral_opts opts = {};
	struct spdk_nvme_transport_id *trid = &opts.trid;

	/* Copy only the portion of uopts the caller's ABI version provides. */
	memcpy(&opts, uopts, spdk_min(uopts->size, sizeof(opts)));

	/* If the entry already exists, just ignore it. */
	if (nvmf_tgt_find_referral(tgt, trid)) {
		return 0;
	}

	referral = calloc(1, sizeof(*referral));
	if (!referral) {
		SPDK_ERRLOG("Failed to allocate memory for a referral\n");
		return -ENOMEM;
	}

	referral->entry.subtype = SPDK_NVMF_SUBTYPE_DISCOVERY;
	referral->entry.treq.secure_channel = opts.secure_channel ?
					      SPDK_NVMF_TREQ_SECURE_CHANNEL_REQUIRED
					      : SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_REQUIRED;
	referral->entry.cntlid =
		0xffff; /* Discovery controller shall support the dynamic controller model */
	referral->entry.trtype = trid->trtype;
	referral->entry.adrfam = trid->adrfam;
	snprintf(referral->entry.subnqn, sizeof(referral->entry.subnqn), "%s", SPDK_NVMF_DISCOVERY_NQN);
	memcpy(&referral->trid, trid, sizeof(struct spdk_nvme_transport_id));
	/* Discovery log page string fields are space-padded, not NUL-terminated. */
	spdk_strcpy_pad(referral->entry.trsvcid, trid->trsvcid, sizeof(referral->entry.trsvcid), ' ');
	spdk_strcpy_pad(referral->entry.traddr, trid->traddr, sizeof(referral->entry.traddr), ' ');

	TAILQ_INSERT_HEAD(&tgt->referrals, referral, link);
	/* Bump the discovery log generation counter so hosts see the new entry. */
	nvmf_update_discovery_log(tgt, NULL);

	return 0;
}
105 
106 int
107 spdk_nvmf_tgt_remove_referral(struct spdk_nvmf_tgt *tgt,
108 			      const struct spdk_nvmf_referral_opts *uopts)
109 {
110 	struct spdk_nvmf_referral *referral;
111 	struct spdk_nvmf_referral_opts opts = {};
112 
113 	memcpy(&opts, uopts, spdk_min(uopts->size, sizeof(opts)));
114 
115 	referral = nvmf_tgt_find_referral(tgt, &opts.trid);
116 	if (referral == NULL) {
117 		return -ENOENT;
118 	}
119 
120 	TAILQ_REMOVE(&tgt->referrals, referral, link);
121 	nvmf_update_discovery_log(tgt, NULL);
122 
123 	free(referral);
124 
125 	return 0;
126 }
127 
/* Transition @qpair to @state.  Must run on the qpair's poll group thread;
 * the assert enforces that single-threaded ownership. */
static void
nvmf_qpair_set_state(struct spdk_nvmf_qpair *qpair,
		     enum spdk_nvmf_qpair_state state)
{
	assert(qpair != NULL);
	assert(qpair->group->thread == spdk_get_thread());

	qpair->state = state;
}
137 
138 static int
139 nvmf_poll_group_poll(void *ctx)
140 {
141 	struct spdk_nvmf_poll_group *group = ctx;
142 	int rc;
143 	int count = 0;
144 	struct spdk_nvmf_transport_poll_group *tgroup;
145 
146 	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
147 		rc = nvmf_transport_poll_group_poll(tgroup);
148 		if (rc < 0) {
149 			return SPDK_POLLER_BUSY;
150 		}
151 		count += rc;
152 	}
153 
154 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
155 }
156 
/*
 * Reset and clean up the poll group (I/O channel code will actually free the
 * group).
 */
static void
nvmf_tgt_cleanup_poll_group(struct spdk_nvmf_poll_group *group)
{
	struct spdk_nvmf_transport_poll_group *tgroup, *tmp;
	struct spdk_nvmf_subsystem_poll_group *sgroup;
	uint32_t sid, nsid;

	/* Tear down every per-transport poll group. */
	TAILQ_FOREACH_SAFE(tgroup, &group->tgroups, link, tmp) {
		TAILQ_REMOVE(&group->tgroups, tgroup, link);
		nvmf_transport_poll_group_destroy(tgroup);
	}

	/* Release per-subsystem state: drop each namespace's bdev channel,
	 * then free the namespace info array. */
	for (sid = 0; sid < group->num_sgroups; sid++) {
		sgroup = &group->sgroups[sid];

		assert(sgroup != NULL);

		for (nsid = 0; nsid < sgroup->num_ns; nsid++) {
			if (sgroup->ns_info[nsid].channel) {
				spdk_put_io_channel(sgroup->ns_info[nsid].channel);
				sgroup->ns_info[nsid].channel = NULL;
			}
		}

		free(sgroup->ns_info);
	}

	free(group->sgroups);

	spdk_poller_unregister(&group->poller);

	/* Notify whoever requested the destroy, if anyone. */
	if (group->destroy_cb_fn) {
		group->destroy_cb_fn(group->destroy_cb_arg, 0);
	}
}
196 
/*
 * Callback to unregister a poll group from the target, and clean up its state.
 */
static void
nvmf_tgt_destroy_poll_group(void *io_device, void *ctx_buf)
{
	struct spdk_nvmf_tgt *tgt = io_device;
	struct spdk_nvmf_poll_group *group = ctx_buf;

	SPDK_DTRACE_PROBE1_TICKS(nvmf_destroy_poll_group, spdk_thread_get_id(group->thread));

	/* Unlink from the target's group list under the target mutex; other
	 * threads may be iterating tgt->poll_groups concurrently. */
	pthread_mutex_lock(&tgt->mutex);
	TAILQ_REMOVE(&tgt->poll_groups, group, link);
	tgt->num_poll_groups--;
	pthread_mutex_unlock(&tgt->mutex);

	/* Poll groups must not disappear while polling is being paused/resumed. */
	assert(!(tgt->state == NVMF_TGT_PAUSING || tgt->state == NVMF_TGT_RESUMING));
	nvmf_tgt_cleanup_poll_group(group);
}
216 
217 static int
218 nvmf_poll_group_add_transport(struct spdk_nvmf_poll_group *group,
219 			      struct spdk_nvmf_transport *transport)
220 {
221 	struct spdk_nvmf_transport_poll_group *tgroup;
222 
223 	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
224 		if (tgroup->transport == transport) {
225 			/* Transport already in the poll group */
226 			return 0;
227 		}
228 	}
229 
230 	tgroup = nvmf_transport_poll_group_create(transport, group);
231 	if (!tgroup) {
232 		SPDK_ERRLOG("Unable to create poll group for transport\n");
233 		return -1;
234 	}
235 	SPDK_DTRACE_PROBE2_TICKS(nvmf_transport_poll_group_create, transport,
236 				 spdk_thread_get_id(group->thread));
237 
238 	tgroup->group = group;
239 	TAILQ_INSERT_TAIL(&group->tgroups, tgroup, link);
240 
241 	return 0;
242 }
243 
/* io_device channel-create callback: initialize a poll group for the calling
 * thread, attach every existing transport and subsystem, and register the
 * group with the target.  Returns 0 on success or a negative errno; on
 * failure all partially-created state is torn down. */
static int
nvmf_tgt_create_poll_group(void *io_device, void *ctx_buf)
{
	struct spdk_nvmf_tgt *tgt = io_device;
	struct spdk_nvmf_poll_group *group = ctx_buf;
	struct spdk_nvmf_transport *transport;
	struct spdk_nvmf_subsystem *subsystem;
	struct spdk_thread *thread = spdk_get_thread();
	int rc;

	group->tgt = tgt;
	TAILQ_INIT(&group->tgroups);
	TAILQ_INIT(&group->qpairs);
	group->thread = thread;
	pthread_mutex_init(&group->mutex, NULL);

	/* Poll with no period: run every scheduler iteration. */
	group->poller = SPDK_POLLER_REGISTER(nvmf_poll_group_poll, group, 0);

	SPDK_DTRACE_PROBE1_TICKS(nvmf_create_poll_group, spdk_thread_get_id(thread));

	/* Give each existing transport a per-group poll group. */
	TAILQ_FOREACH(transport, &tgt->transports, link) {
		rc = nvmf_poll_group_add_transport(group, transport);
		if (rc != 0) {
			nvmf_tgt_cleanup_poll_group(group);
			return rc;
		}
	}

	/* Pre-allocate one subsystem poll group slot per possible subsystem id. */
	group->num_sgroups = tgt->max_subsystems;
	group->sgroups = calloc(tgt->max_subsystems, sizeof(struct spdk_nvmf_subsystem_poll_group));
	if (!group->sgroups) {
		nvmf_tgt_cleanup_poll_group(group);
		return -ENOMEM;
	}

	for (subsystem = spdk_nvmf_subsystem_get_first(tgt);
	     subsystem != NULL;
	     subsystem = spdk_nvmf_subsystem_get_next(subsystem)) {
		if (nvmf_poll_group_add_subsystem(group, subsystem, NULL, NULL) != 0) {
			nvmf_tgt_cleanup_poll_group(group);
			return -1;
		}
	}

	/* Publish the fully-initialized group on the target's list. */
	pthread_mutex_lock(&tgt->mutex);
	tgt->num_poll_groups++;
	TAILQ_INSERT_TAIL(&tgt->poll_groups, group, link);
	pthread_mutex_unlock(&tgt->mutex);

	return 0;
}
295 
/* Disconnect every qpair on the poll group, re-sending ourselves as a
 * message until the qpair list drains, then drop the group's channel
 * reference (which eventually triggers nvmf_tgt_destroy_poll_group). */
static void
_nvmf_tgt_disconnect_qpairs(void *ctx)
{
	struct spdk_nvmf_qpair *qpair, *qpair_tmp;
	struct nvmf_qpair_disconnect_many_ctx *qpair_ctx = ctx;
	struct spdk_nvmf_poll_group *group = qpair_ctx->group;
	struct spdk_io_channel *ch;
	int rc;

	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, qpair_tmp) {
		rc = spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
		if (rc && rc != -EINPROGRESS) {
			/* Hard failure; stop iterating and retry via message below. */
			break;
		}
	}

	if (TAILQ_EMPTY(&group->qpairs)) {
		/* When the refcount from the channels reaches 0, nvmf_tgt_destroy_poll_group will be called. */
		ch = spdk_io_channel_from_ctx(group);
		spdk_put_io_channel(ch);
		free(qpair_ctx);
		return;
	}

	/* Some qpairs are in process of being disconnected. Send a message and try to remove them again */
	spdk_thread_send_msg(spdk_get_thread(), _nvmf_tgt_disconnect_qpairs, ctx);
}
323 
/* Kick off asynchronous disconnection of all qpairs on @group as part of
 * poll group teardown.  On allocation failure the teardown silently stalls
 * (no callback exists to report the error to). */
static void
nvmf_tgt_destroy_poll_group_qpairs(struct spdk_nvmf_poll_group *group)
{
	struct nvmf_qpair_disconnect_many_ctx *ctx;

	SPDK_DTRACE_PROBE1_TICKS(nvmf_destroy_poll_group_qpairs, spdk_thread_get_id(group->thread));

	ctx = calloc(1, sizeof(struct nvmf_qpair_disconnect_many_ctx));
	if (!ctx) {
		SPDK_ERRLOG("Failed to allocate memory for destroy poll group ctx\n");
		return;
	}

	ctx->group = group;
	/* ctx is freed by _nvmf_tgt_disconnect_qpairs once the list drains. */
	_nvmf_tgt_disconnect_qpairs(ctx);
}
340 
341 struct spdk_nvmf_tgt *
342 spdk_nvmf_tgt_create(struct spdk_nvmf_target_opts *opts)
343 {
344 	struct spdk_nvmf_tgt *tgt, *tmp_tgt;
345 
346 	if (strnlen(opts->name, NVMF_TGT_NAME_MAX_LENGTH) == NVMF_TGT_NAME_MAX_LENGTH) {
347 		SPDK_ERRLOG("Provided target name exceeds the max length of %u.\n", NVMF_TGT_NAME_MAX_LENGTH);
348 		return NULL;
349 	}
350 
351 	TAILQ_FOREACH(tmp_tgt, &g_nvmf_tgts, link) {
352 		if (!strncmp(opts->name, tmp_tgt->name, NVMF_TGT_NAME_MAX_LENGTH)) {
353 			SPDK_ERRLOG("Provided target name must be unique.\n");
354 			return NULL;
355 		}
356 	}
357 
358 	tgt = calloc(1, sizeof(*tgt));
359 	if (!tgt) {
360 		return NULL;
361 	}
362 
363 	snprintf(tgt->name, NVMF_TGT_NAME_MAX_LENGTH, "%s", opts->name);
364 
365 	if (!opts || !opts->max_subsystems) {
366 		tgt->max_subsystems = SPDK_NVMF_DEFAULT_MAX_SUBSYSTEMS;
367 	} else {
368 		tgt->max_subsystems = opts->max_subsystems;
369 	}
370 
371 	if (!opts) {
372 		tgt->crdt[0] = 0;
373 		tgt->crdt[1] = 0;
374 		tgt->crdt[2] = 0;
375 	} else {
376 		tgt->crdt[0] = opts->crdt[0];
377 		tgt->crdt[1] = opts->crdt[1];
378 		tgt->crdt[2] = opts->crdt[2];
379 	}
380 
381 	if (!opts) {
382 		tgt->discovery_filter = SPDK_NVMF_TGT_DISCOVERY_MATCH_ANY;
383 	} else {
384 		tgt->discovery_filter = opts->discovery_filter;
385 	}
386 
387 	tgt->discovery_genctr = 0;
388 	TAILQ_INIT(&tgt->transports);
389 	TAILQ_INIT(&tgt->poll_groups);
390 	TAILQ_INIT(&tgt->referrals);
391 	tgt->num_poll_groups = 0;
392 
393 	tgt->subsystem_ids = spdk_bit_array_create(tgt->max_subsystems);
394 	if (tgt->subsystem_ids == NULL) {
395 		free(tgt);
396 		return NULL;
397 	}
398 
399 	RB_INIT(&tgt->subsystems);
400 
401 	pthread_mutex_init(&tgt->mutex, NULL);
402 
403 	spdk_io_device_register(tgt,
404 				nvmf_tgt_create_poll_group,
405 				nvmf_tgt_destroy_poll_group,
406 				sizeof(struct spdk_nvmf_poll_group),
407 				tgt->name);
408 
409 	tgt->state = NVMF_TGT_RUNNING;
410 
411 	TAILQ_INSERT_HEAD(&g_nvmf_tgts, tgt, link);
412 
413 	return tgt;
414 }
415 
/* Destroy the target's transports one at a time; each transport destroy
 * re-invokes this function as its completion callback.  Once the list is
 * empty, free the target itself and fire the user's destroy callback. */
static void
_nvmf_tgt_destroy_next_transport(void *ctx)
{
	struct spdk_nvmf_tgt *tgt = ctx;
	struct spdk_nvmf_transport *transport;

	if (!TAILQ_EMPTY(&tgt->transports)) {
		transport = TAILQ_FIRST(&tgt->transports);
		TAILQ_REMOVE(&tgt->transports, transport, link);
		spdk_nvmf_transport_destroy(transport, _nvmf_tgt_destroy_next_transport, tgt);
	} else {
		/* Capture the callback before freeing tgt, which owns it. */
		spdk_nvmf_tgt_destroy_done_fn *destroy_cb_fn = tgt->destroy_cb_fn;
		void *destroy_cb_arg = tgt->destroy_cb_arg;

		pthread_mutex_destroy(&tgt->mutex);
		free(tgt);

		if (destroy_cb_fn) {
			destroy_cb_fn(destroy_cb_arg, 0);
		}
	}
}
438 
/* io_device unregister callback: free referrals, destroy every subsystem
 * (re-entering itself for async destroys), then move on to destroying the
 * transports.  May be invoked multiple times as -EINPROGRESS destroys
 * complete. */
static void
nvmf_tgt_destroy_cb(void *io_device)
{
	struct spdk_nvmf_tgt *tgt = io_device;
	struct spdk_nvmf_subsystem *subsystem, *subsystem_next;
	int rc;
	struct spdk_nvmf_referral *referral;

	/* Referrals are plain list nodes; just unlink and free them all. */
	while ((referral = TAILQ_FIRST(&tgt->referrals))) {
		TAILQ_REMOVE(&tgt->referrals, referral, link);
		free(referral);
	}

	/* We will be freeing subsystems in this loop, so we always need to get the next one
	 * ahead of time, since we can't call get_next() on a subsystem that's been freed.
	 */
	for (subsystem = spdk_nvmf_subsystem_get_first(tgt),
	     subsystem_next = spdk_nvmf_subsystem_get_next(subsystem);
	     subsystem != NULL;
	     subsystem = subsystem_next,
	     subsystem_next = spdk_nvmf_subsystem_get_next(subsystem_next)) {
		nvmf_subsystem_remove_all_listeners(subsystem, true);

		rc = spdk_nvmf_subsystem_destroy(subsystem, nvmf_tgt_destroy_cb, tgt);
		if (rc) {
			if (rc == -EINPROGRESS) {
				/* If rc is -EINPROGRESS, nvmf_tgt_destroy_cb will be called again when subsystem #i
				 * is destroyed, nvmf_tgt_destroy_cb will continue to destroy other subsystems if any */
				return;
			} else {
				SPDK_ERRLOG("Failed to destroy subsystem %s, rc %d\n", subsystem->subnqn, rc);
			}
		}
	}
	spdk_bit_array_free(&tgt->subsystem_ids);
	_nvmf_tgt_destroy_next_transport(tgt);
}
476 
/* Begin asynchronous destruction of @tgt.  @cb_fn (may be NULL) is invoked
 * with @cb_arg once all subsystems, transports, and the target itself have
 * been freed.  Must not be called while polling is being paused or resumed. */
void
spdk_nvmf_tgt_destroy(struct spdk_nvmf_tgt *tgt,
		      spdk_nvmf_tgt_destroy_done_fn cb_fn,
		      void *cb_arg)
{
	assert(!(tgt->state == NVMF_TGT_PAUSING || tgt->state == NVMF_TGT_RESUMING));

	tgt->destroy_cb_fn = cb_fn;
	tgt->destroy_cb_arg = cb_arg;

	/* Remove from the global list first so new lookups cannot find it. */
	TAILQ_REMOVE(&g_nvmf_tgts, tgt, link);

	/* nvmf_tgt_destroy_cb runs after all poll group channels are released. */
	spdk_io_device_unregister(tgt, nvmf_tgt_destroy_cb);
}
491 
/* Return the target's name (owned by the target; valid until destroy). */
const char *
spdk_nvmf_tgt_get_name(struct spdk_nvmf_tgt *tgt)
{
	return tgt->name;
}
497 
498 struct spdk_nvmf_tgt *
499 spdk_nvmf_get_tgt(const char *name)
500 {
501 	struct spdk_nvmf_tgt *tgt;
502 	uint32_t num_targets = 0;
503 
504 	TAILQ_FOREACH(tgt, &g_nvmf_tgts, link) {
505 		if (name) {
506 			if (!strncmp(tgt->name, name, NVMF_TGT_NAME_MAX_LENGTH)) {
507 				return tgt;
508 			}
509 		}
510 		num_targets++;
511 	}
512 
513 	/*
514 	 * special case. If there is only one target and
515 	 * no name was specified, return the only available
516 	 * target. If there is more than one target, name must
517 	 * be specified.
518 	 */
519 	if (!name && num_targets == 1) {
520 		return TAILQ_FIRST(&g_nvmf_tgts);
521 	}
522 
523 	return NULL;
524 }
525 
/* Return the first target in the global list, or NULL if none exist. */
struct spdk_nvmf_tgt *
spdk_nvmf_get_first_tgt(void)
{
	return TAILQ_FIRST(&g_nvmf_tgts);
}
531 
/* Return the target following @prev in the global list, or NULL at the end. */
struct spdk_nvmf_tgt *
spdk_nvmf_get_next_tgt(struct spdk_nvmf_tgt *prev)
{
	return TAILQ_NEXT(prev, link);
}
537 
/* Emit the RPC-replayable JSON configuration for one NVMe subsystem:
 * an nvmf_create_subsystem call followed by nvmf_subsystem_add_host,
 * nvmf_subsystem_add_ns, and nvmf_subsystem_add_listener calls for each
 * host, namespace, and listener.  Discovery subsystems are skipped. */
static void
nvmf_write_subsystem_config_json(struct spdk_json_write_ctx *w,
				 struct spdk_nvmf_subsystem *subsystem)
{
	struct spdk_nvmf_host *host;
	struct spdk_nvmf_subsystem_listener *listener;
	const struct spdk_nvme_transport_id *trid;
	struct spdk_nvmf_ns *ns;
	struct spdk_nvmf_ns_opts ns_opts;
	uint32_t max_namespaces;
	struct spdk_nvmf_transport *transport;

	if (spdk_nvmf_subsystem_get_type(subsystem) != SPDK_NVMF_SUBTYPE_NVME) {
		return;
	}

	/* { */
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "nvmf_create_subsystem");

	/*     "params" : { */
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "nqn", spdk_nvmf_subsystem_get_nqn(subsystem));
	spdk_json_write_named_bool(w, "allow_any_host", spdk_nvmf_subsystem_get_allow_any_host(subsystem));
	spdk_json_write_named_string(w, "serial_number", spdk_nvmf_subsystem_get_sn(subsystem));
	spdk_json_write_named_string(w, "model_number", spdk_nvmf_subsystem_get_mn(subsystem));

	/* 0 means "unlimited"; omit the key in that case. */
	max_namespaces = spdk_nvmf_subsystem_get_max_namespaces(subsystem);
	if (max_namespaces != 0) {
		spdk_json_write_named_uint32(w, "max_namespaces", max_namespaces);
	}

	spdk_json_write_named_uint32(w, "min_cntlid", spdk_nvmf_subsystem_get_min_cntlid(subsystem));
	spdk_json_write_named_uint32(w, "max_cntlid", spdk_nvmf_subsystem_get_max_cntlid(subsystem));
	spdk_json_write_named_bool(w, "ana_reporting", nvmf_subsystem_get_ana_reporting(subsystem));

	/*     } "params" */
	spdk_json_write_object_end(w);

	/* } */
	spdk_json_write_object_end(w);

	/* One nvmf_subsystem_add_host RPC object per allowed host. */
	for (host = spdk_nvmf_subsystem_get_first_host(subsystem); host != NULL;
	     host = spdk_nvmf_subsystem_get_next_host(subsystem, host)) {

		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "nvmf_subsystem_add_host");

		/*     "params" : { */
		spdk_json_write_named_object_begin(w, "params");

		spdk_json_write_named_string(w, "nqn", spdk_nvmf_subsystem_get_nqn(subsystem));
		spdk_json_write_named_string(w, "host", spdk_nvmf_host_get_nqn(host));

		/* Let each transport append its own per-host parameters. */
		TAILQ_FOREACH(transport, &subsystem->tgt->transports, link) {
			if (transport->ops->subsystem_dump_host != NULL) {
				transport->ops->subsystem_dump_host(transport, subsystem, host->nqn, w);
			}
		}

		/*     } "params" */
		spdk_json_write_object_end(w);

		/* } */
		spdk_json_write_object_end(w);
	}

	/* One nvmf_subsystem_add_ns RPC object per namespace. */
	for (ns = spdk_nvmf_subsystem_get_first_ns(subsystem); ns != NULL;
	     ns = spdk_nvmf_subsystem_get_next_ns(subsystem, ns)) {
		spdk_nvmf_ns_get_opts(ns, &ns_opts, sizeof(ns_opts));

		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "nvmf_subsystem_add_ns");

		/*     "params" : { */
		spdk_json_write_named_object_begin(w, "params");

		spdk_json_write_named_string(w, "nqn", spdk_nvmf_subsystem_get_nqn(subsystem));

		/*     "namespace" : { */
		spdk_json_write_named_object_begin(w, "namespace");

		spdk_json_write_named_uint32(w, "nsid", spdk_nvmf_ns_get_id(ns));
		spdk_json_write_named_string(w, "bdev_name", spdk_bdev_get_name(spdk_nvmf_ns_get_bdev(ns)));

		/* Emit identifiers only when they were explicitly set (non-zero). */
		if (!spdk_mem_all_zero(ns_opts.nguid, sizeof(ns_opts.nguid))) {
			SPDK_STATIC_ASSERT(sizeof(ns_opts.nguid) == sizeof(uint64_t) * 2, "size mismatch");
			spdk_json_write_named_string_fmt(w, "nguid", "%016"PRIX64"%016"PRIX64, from_be64(&ns_opts.nguid[0]),
							 from_be64(&ns_opts.nguid[8]));
		}

		if (!spdk_mem_all_zero(ns_opts.eui64, sizeof(ns_opts.eui64))) {
			SPDK_STATIC_ASSERT(sizeof(ns_opts.eui64) == sizeof(uint64_t), "size mismatch");
			spdk_json_write_named_string_fmt(w, "eui64", "%016"PRIX64, from_be64(&ns_opts.eui64));
		}

		if (!spdk_uuid_is_null(&ns_opts.uuid)) {
			spdk_json_write_named_uuid(w, "uuid",  &ns_opts.uuid);
		}

		if (nvmf_subsystem_get_ana_reporting(subsystem)) {
			spdk_json_write_named_uint32(w, "anagrpid", ns_opts.anagrpid);
		}

		/*     "namespace" */
		spdk_json_write_object_end(w);

		/*     } "params" */
		spdk_json_write_object_end(w);

		/* } */
		spdk_json_write_object_end(w);
	}

	/* One nvmf_subsystem_add_listener RPC object per listener. */
	for (listener = spdk_nvmf_subsystem_get_first_listener(subsystem); listener != NULL;
	     listener = spdk_nvmf_subsystem_get_next_listener(subsystem, listener)) {
		transport = listener->transport;
		trid = spdk_nvmf_subsystem_listener_get_trid(listener);

		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "nvmf_subsystem_add_listener");

		/*     "params" : { */
		spdk_json_write_named_object_begin(w, "params");

		spdk_json_write_named_string(w, "nqn", spdk_nvmf_subsystem_get_nqn(subsystem));

		spdk_json_write_named_object_begin(w, "listen_address");
		nvmf_transport_listen_dump_trid(trid, w);
		spdk_json_write_object_end(w);
		if (transport->ops->listen_dump_opts) {
			transport->ops->listen_dump_opts(transport, trid, w);
		}

		spdk_json_write_named_bool(w, "secure_channel", listener->opts.secure_channel);

		/*     } "params" */
		spdk_json_write_object_end(w);

		/* } */
		spdk_json_write_object_end(w);
	}

}
682 
683 void
684 spdk_nvmf_tgt_write_config_json(struct spdk_json_write_ctx *w, struct spdk_nvmf_tgt *tgt)
685 {
686 	struct spdk_nvmf_subsystem *subsystem;
687 	struct spdk_nvmf_transport *transport;
688 
689 	spdk_json_write_object_begin(w);
690 	spdk_json_write_named_string(w, "method", "nvmf_set_max_subsystems");
691 
692 	spdk_json_write_named_object_begin(w, "params");
693 	spdk_json_write_named_uint32(w, "max_subsystems", tgt->max_subsystems);
694 	spdk_json_write_object_end(w);
695 
696 	spdk_json_write_object_end(w);
697 
698 	spdk_json_write_object_begin(w);
699 	spdk_json_write_named_string(w, "method", "nvmf_set_crdt");
700 	spdk_json_write_named_object_begin(w, "params");
701 	spdk_json_write_named_uint32(w, "crdt1", tgt->crdt[0]);
702 	spdk_json_write_named_uint32(w, "crdt2", tgt->crdt[1]);
703 	spdk_json_write_named_uint32(w, "crdt3", tgt->crdt[2]);
704 	spdk_json_write_object_end(w);
705 	spdk_json_write_object_end(w);
706 
707 	/* write transports */
708 	TAILQ_FOREACH(transport, &tgt->transports, link) {
709 		spdk_json_write_object_begin(w);
710 		spdk_json_write_named_string(w, "method", "nvmf_create_transport");
711 		nvmf_transport_dump_opts(transport, w, true);
712 		spdk_json_write_object_end(w);
713 	}
714 
715 	subsystem = spdk_nvmf_subsystem_get_first(tgt);
716 	while (subsystem) {
717 		nvmf_write_subsystem_config_json(w, subsystem);
718 		subsystem = spdk_nvmf_subsystem_get_next(subsystem);
719 	}
720 }
721 
/* ABI-versioned copy of listen opts: copy each field from @opts_src to
 * @opts only when @opts_size shows the caller's struct version actually
 * contains that field.  Keeps old callers working as fields are appended. */
static void
nvmf_listen_opts_copy(struct spdk_nvmf_listen_opts *opts,
		      const struct spdk_nvmf_listen_opts *opts_src, size_t opts_size)
{
	assert(opts);
	assert(opts_src);

	opts->opts_size = opts_size;

/* Copy a field only if it lies entirely within the caller-declared size. */
#define SET_FIELD(field) \
    if (offsetof(struct spdk_nvmf_listen_opts, field) + sizeof(opts->field) <= opts_size) { \
                 opts->field = opts_src->field; \
    } \

	SET_FIELD(transport_specific);
	SET_FIELD(secure_channel);
#undef SET_FIELD

	/* Do not remove this statement, you should always update this statement when you adding a new field,
	 * and do not forget to add the SET_FIELD statement for your added field. */
	SPDK_STATIC_ASSERT(sizeof(struct spdk_nvmf_listen_opts) == 17, "Incorrect size");
}
744 
/* Initialize @opts (of caller-declared size @opts_size) to default values.
 * Copying from a zeroed local makes every default the zero value. */
void
spdk_nvmf_listen_opts_init(struct spdk_nvmf_listen_opts *opts, size_t opts_size)
{
	struct spdk_nvmf_listen_opts opts_local = {};

	/* local version of opts should have defaults set here */

	nvmf_listen_opts_copy(opts, &opts_local, opts_size);
}
754 
/* Begin listening for connections on @trid using the matching transport.
 * @opts must be non-NULL with opts_size set (ABI versioning).  Returns 0 on
 * success, -EINVAL on bad arguments or unknown transport, or the transport's
 * negative error code. */
int
spdk_nvmf_tgt_listen_ext(struct spdk_nvmf_tgt *tgt, const struct spdk_nvme_transport_id *trid,
			 struct spdk_nvmf_listen_opts *opts)
{
	struct spdk_nvmf_transport *transport;
	int rc;
	struct spdk_nvmf_listen_opts opts_local = {};

	if (!opts) {
		SPDK_ERRLOG("opts should not be NULL\n");
		return -EINVAL;
	}

	if (!opts->opts_size) {
		SPDK_ERRLOG("The opts_size in opts structure should not be zero\n");
		return -EINVAL;
	}

	transport = spdk_nvmf_tgt_get_transport(tgt, trid->trstring);
	if (!transport) {
		SPDK_ERRLOG("Unable to find %s transport. The transport must be created first also make sure it is properly registered.\n",
			    trid->trstring);
		return -EINVAL;
	}

	/* Normalize the caller's (possibly older/smaller) opts into our version. */
	nvmf_listen_opts_copy(&opts_local, opts, opts->opts_size);
	rc = spdk_nvmf_transport_listen(transport, trid, &opts_local);
	if (rc < 0) {
		SPDK_ERRLOG("Unable to listen on address '%s'\n", trid->traddr);
	}

	return rc;
}
788 
789 int
790 spdk_nvmf_tgt_stop_listen(struct spdk_nvmf_tgt *tgt,
791 			  struct spdk_nvme_transport_id *trid)
792 {
793 	struct spdk_nvmf_transport *transport;
794 	int rc;
795 
796 	transport = spdk_nvmf_tgt_get_transport(tgt, trid->trstring);
797 	if (!transport) {
798 		SPDK_ERRLOG("Unable to find %s transport. The transport must be created first also make sure it is properly registered.\n",
799 			    trid->trstring);
800 		return -EINVAL;
801 	}
802 
803 	rc = spdk_nvmf_transport_stop_listen(transport, trid);
804 	if (rc < 0) {
805 		SPDK_ERRLOG("Failed to stop listening on address '%s'\n", trid->traddr);
806 		return rc;
807 	}
808 	return 0;
809 }
810 
/* Context carried through the for_each_channel walk when adding (and, on
 * failure, rolling back) a transport across all poll groups. */
struct spdk_nvmf_tgt_add_transport_ctx {
	struct spdk_nvmf_tgt *tgt;		/* target gaining the transport */
	struct spdk_nvmf_transport *transport;	/* transport being added */
	spdk_nvmf_tgt_add_transport_done_fn cb_fn;	/* user completion callback */
	void *cb_arg;				/* opaque argument for cb_fn */
	int status;				/* first failure status, for rollback */
};
818 
/* Rollback completion: all poll groups have dropped the transport; report
 * the original failure status to the caller and free the context. */
static void
_nvmf_tgt_remove_transport_done(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_nvmf_tgt_add_transport_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	/* Report the saved add-failure status, not the rollback status. */
	ctx->cb_fn(ctx->cb_arg, ctx->status);
	free(ctx);
}
827 
/* Per-channel rollback step: remove this poll group's tgroup for the
 * transport that failed to be added everywhere. */
static void
_nvmf_tgt_remove_transport(struct spdk_io_channel_iter *i)
{
	struct spdk_nvmf_tgt_add_transport_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);
	struct spdk_nvmf_transport_poll_group *tgroup, *tmp;

	TAILQ_FOREACH_SAFE(tgroup, &group->tgroups, link, tmp) {
		if (tgroup->transport == ctx->transport) {
			TAILQ_REMOVE(&group->tgroups, tgroup, link);
			nvmf_transport_poll_group_destroy(tgroup);
		}
	}

	spdk_for_each_channel_continue(i, 0);
}
845 
/* Add-transport completion: on failure, start a rollback walk that removes
 * the transport from every poll group; on success, link the transport to
 * the target and notify the caller. */
static void
_nvmf_tgt_add_transport_done(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_nvmf_tgt_add_transport_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	if (status) {
		ctx->status = status;
		spdk_for_each_channel(ctx->tgt,
				      _nvmf_tgt_remove_transport,
				      ctx,
				      _nvmf_tgt_remove_transport_done);
		return;
	}

	ctx->transport->tgt = ctx->tgt;
	TAILQ_INSERT_TAIL(&ctx->tgt->transports, ctx->transport, link);
	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}
865 
/* Per-channel step of adding a transport: create this poll group's tgroup
 * and pass the result to the channel iterator. */
static void
_nvmf_tgt_add_transport(struct spdk_io_channel_iter *i)
{
	struct spdk_nvmf_tgt_add_transport_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);
	int rc;

	rc = nvmf_poll_group_add_transport(group, ctx->transport);
	spdk_for_each_channel_continue(i, rc);
}
877 
/* Asynchronously add @transport to @tgt, installing it on every poll group.
 * @cb_fn is invoked with 0 on success, -EEXIST if a transport with the same
 * name already exists, -ENOMEM on allocation failure, or the first
 * per-group failure (after rollback). */
void
spdk_nvmf_tgt_add_transport(struct spdk_nvmf_tgt *tgt,
			    struct spdk_nvmf_transport *transport,
			    spdk_nvmf_tgt_add_transport_done_fn cb_fn,
			    void *cb_arg)
{
	struct spdk_nvmf_tgt_add_transport_ctx *ctx;

	SPDK_DTRACE_PROBE2_TICKS(nvmf_tgt_add_transport, transport, tgt->name);

	if (spdk_nvmf_tgt_get_transport(tgt, transport->ops->name)) {
		cb_fn(cb_arg, -EEXIST);
		return; /* transport already created */
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->tgt = tgt;
	ctx->transport = transport;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Walk every poll group channel; completion handles link-in/rollback. */
	spdk_for_each_channel(tgt,
			      _nvmf_tgt_add_transport,
			      ctx,
			      _nvmf_tgt_add_transport_done);
}
909 
/* Context for pausing/resuming polling across all of a target's poll groups. */
struct nvmf_tgt_pause_ctx {
	struct spdk_nvmf_tgt *tgt;		/* target being paused or resumed */
	spdk_nvmf_tgt_pause_polling_cb_fn cb_fn;	/* user completion callback */
	void *cb_arg;				/* opaque argument for cb_fn */
};
915 
/* Pause completion: every group's poller is unregistered; mark the target
 * PAUSED and notify the caller. */
static void
_nvmf_tgt_pause_polling_done(struct spdk_io_channel_iter *i, int status)
{
	struct nvmf_tgt_pause_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->tgt->state = NVMF_TGT_PAUSED;

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}
926 
/* Per-channel pause step: stop this poll group's poller on its own thread. */
static void
_nvmf_tgt_pause_polling(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);

	spdk_poller_unregister(&group->poller);

	spdk_for_each_channel_continue(i, 0);
}
937 
938 int
939 spdk_nvmf_tgt_pause_polling(struct spdk_nvmf_tgt *tgt, spdk_nvmf_tgt_pause_polling_cb_fn cb_fn,
940 			    void *cb_arg)
941 {
942 	struct nvmf_tgt_pause_ctx *ctx;
943 
944 	SPDK_DTRACE_PROBE2_TICKS(nvmf_tgt_pause_polling, tgt, tgt->name);
945 
946 	switch (tgt->state) {
947 	case NVMF_TGT_PAUSING:
948 	case NVMF_TGT_RESUMING:
949 		return -EBUSY;
950 	case NVMF_TGT_RUNNING:
951 		break;
952 	default:
953 		return -EINVAL;
954 	}
955 
956 	ctx = calloc(1, sizeof(*ctx));
957 	if (!ctx) {
958 		return -ENOMEM;
959 	}
960 
961 
962 	tgt->state = NVMF_TGT_PAUSING;
963 
964 	ctx->tgt = tgt;
965 	ctx->cb_fn = cb_fn;
966 	ctx->cb_arg = cb_arg;
967 
968 	spdk_for_each_channel(tgt,
969 			      _nvmf_tgt_pause_polling,
970 			      ctx,
971 			      _nvmf_tgt_pause_polling_done);
972 	return 0;
973 }
974 
/* Resume completion: every group's poller is re-registered; mark the target
 * RUNNING and notify the caller. */
static void
_nvmf_tgt_resume_polling_done(struct spdk_io_channel_iter *i, int status)
{
	struct nvmf_tgt_pause_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->tgt->state = NVMF_TGT_RUNNING;

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}
985 
/* Per-channel resume step: re-register this poll group's poller on its own
 * thread.  The poller must have been unregistered by a prior pause. */
static void
_nvmf_tgt_resume_polling(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_nvmf_poll_group *group = spdk_io_channel_get_ctx(ch);

	assert(group->poller == NULL);
	group->poller = SPDK_POLLER_REGISTER(nvmf_poll_group_poll, group, 0);

	spdk_for_each_channel_continue(i, 0);
}
997 
/* Asynchronously resume polling on every poll group of @tgt.
 * Returns 0 when the resume was started (cb_fn fires on completion),
 * -EBUSY while a pause/resume is already in flight, -EINVAL from any state
 * other than PAUSED, or -ENOMEM on allocation failure. */
int
spdk_nvmf_tgt_resume_polling(struct spdk_nvmf_tgt *tgt, spdk_nvmf_tgt_resume_polling_cb_fn cb_fn,
			     void *cb_arg)
{
	struct nvmf_tgt_pause_ctx *ctx;

	SPDK_DTRACE_PROBE2_TICKS(nvmf_tgt_resume_polling, tgt, tgt->name);

	switch (tgt->state) {
	case NVMF_TGT_PAUSING:
	case NVMF_TGT_RESUMING:
		return -EBUSY;
	case NVMF_TGT_PAUSED:
		break;
	default:
		return -EINVAL;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return -ENOMEM;
	}

	tgt->state = NVMF_TGT_RESUMING;

	ctx->tgt = tgt;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Re-register each group's poller on its own thread, then complete. */
	spdk_for_each_channel(tgt,
			      _nvmf_tgt_resume_polling,
			      ctx,
			      _nvmf_tgt_resume_polling_done);
	return 0;
}
1033 
1034 struct spdk_nvmf_subsystem *
1035 spdk_nvmf_tgt_find_subsystem(struct spdk_nvmf_tgt *tgt, const char *subnqn)
1036 {
1037 	struct spdk_nvmf_subsystem subsystem;
1038 
1039 	if (!subnqn) {
1040 		return NULL;
1041 	}
1042 
1043 	/* Ensure that subnqn is null terminated */
1044 	if (!memchr(subnqn, '\0', SPDK_NVMF_NQN_MAX_LEN + 1)) {
1045 		SPDK_ERRLOG("Connect SUBNQN is not null terminated\n");
1046 		return NULL;
1047 	}
1048 
1049 	snprintf(subsystem.subnqn, sizeof(subsystem.subnqn), "%s", subnqn);
1050 	return RB_FIND(subsystem_tree, &tgt->subsystems, &subsystem);
1051 }
1052 
1053 struct spdk_nvmf_transport *
1054 spdk_nvmf_tgt_get_transport(struct spdk_nvmf_tgt *tgt, const char *transport_name)
1055 {
1056 	struct spdk_nvmf_transport *transport;
1057 
1058 	TAILQ_FOREACH(transport, &tgt->transports, link) {
1059 		if (!strncasecmp(transport->ops->name, transport_name, SPDK_NVMF_TRSTRING_MAX_LEN)) {
1060 			return transport;
1061 		}
1062 	}
1063 	return NULL;
1064 }
1065 
/* Context for handing a freshly accepted qpair off to its poll group thread
 * (allocated in spdk_nvmf_tgt_new_qpair, freed in _nvmf_poll_group_add). */
struct nvmf_new_qpair_ctx {
	struct spdk_nvmf_qpair *qpair;		/* qpair being handed off */
	struct spdk_nvmf_poll_group *group;	/* destination poll group */
};
1070 
1071 static void
1072 _nvmf_poll_group_add(void *_ctx)
1073 {
1074 	struct nvmf_new_qpair_ctx *ctx = _ctx;
1075 	struct spdk_nvmf_qpair *qpair = ctx->qpair;
1076 	struct spdk_nvmf_poll_group *group = ctx->group;
1077 
1078 	free(_ctx);
1079 
1080 	if (spdk_nvmf_poll_group_add(group, qpair) != 0) {
1081 		SPDK_ERRLOG("Unable to add the qpair to a poll group.\n");
1082 		spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
1083 	}
1084 }
1085 
1086 void
1087 spdk_nvmf_tgt_new_qpair(struct spdk_nvmf_tgt *tgt, struct spdk_nvmf_qpair *qpair)
1088 {
1089 	struct spdk_nvmf_poll_group *group;
1090 	struct nvmf_new_qpair_ctx *ctx;
1091 
1092 	group = spdk_nvmf_get_optimal_poll_group(qpair);
1093 	if (group == NULL) {
1094 		if (tgt->next_poll_group == NULL) {
1095 			tgt->next_poll_group = TAILQ_FIRST(&tgt->poll_groups);
1096 			if (tgt->next_poll_group == NULL) {
1097 				SPDK_ERRLOG("No poll groups exist.\n");
1098 				spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
1099 				return;
1100 			}
1101 		}
1102 		group = tgt->next_poll_group;
1103 		tgt->next_poll_group = TAILQ_NEXT(group, link);
1104 	}
1105 
1106 	ctx = calloc(1, sizeof(*ctx));
1107 	if (!ctx) {
1108 		SPDK_ERRLOG("Unable to send message to poll group.\n");
1109 		spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
1110 		return;
1111 	}
1112 
1113 	ctx->qpair = qpair;
1114 	ctx->group = group;
1115 
1116 	pthread_mutex_lock(&group->mutex);
1117 	group->current_unassociated_qpairs++;
1118 	pthread_mutex_unlock(&group->mutex);
1119 
1120 	spdk_thread_send_msg(group->thread, _nvmf_poll_group_add, ctx);
1121 }
1122 
/* Create (or reference) the calling thread's poll group for this target.
 * The poll group lives in the target I/O channel's context area. */
struct spdk_nvmf_poll_group *
spdk_nvmf_poll_group_create(struct spdk_nvmf_tgt *tgt)
{
	struct spdk_io_channel *ch = spdk_get_io_channel(tgt);

	if (ch == NULL) {
		SPDK_ERRLOG("Unable to get I/O channel for target\n");
		return NULL;
	}

	return spdk_io_channel_get_ctx(ch);
}
1136 
/* Begin destroying a poll group: record the completion callback and tear
 * down the group's qpairs.  cb_fn fires once destruction finishes. */
void
spdk_nvmf_poll_group_destroy(struct spdk_nvmf_poll_group *group,
			     spdk_nvmf_poll_group_destroy_done_fn cb_fn,
			     void *cb_arg)
{
	/* Only one destroy may be outstanding per poll group. */
	assert(group->destroy_cb_fn == NULL);
	group->destroy_cb_fn = cb_fn;
	group->destroy_cb_arg = cb_arg;

	/* This function will put the io_channel associated with this poll group */
	nvmf_tgt_destroy_poll_group_qpairs(group);
}
1149 
1150 int
1151 spdk_nvmf_poll_group_add(struct spdk_nvmf_poll_group *group,
1152 			 struct spdk_nvmf_qpair *qpair)
1153 {
1154 	int rc = -1;
1155 	struct spdk_nvmf_transport_poll_group *tgroup;
1156 
1157 	TAILQ_INIT(&qpair->outstanding);
1158 	qpair->group = group;
1159 	qpair->ctrlr = NULL;
1160 	qpair->disconnect_started = false;
1161 
1162 	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
1163 		if (tgroup->transport == qpair->transport) {
1164 			rc = nvmf_transport_poll_group_add(tgroup, qpair);
1165 			break;
1166 		}
1167 	}
1168 
1169 	/* We add the qpair to the group only it is successfully added into the tgroup */
1170 	if (rc == 0) {
1171 		SPDK_DTRACE_PROBE2_TICKS(nvmf_poll_group_add_qpair, qpair, spdk_thread_get_id(group->thread));
1172 		TAILQ_INSERT_TAIL(&group->qpairs, qpair, link);
1173 		nvmf_qpair_set_state(qpair, SPDK_NVMF_QPAIR_ACTIVE);
1174 	}
1175 
1176 	return rc;
1177 }
1178 
/* Thread-message trampoline: destroy a controller on its subsystem's thread. */
static void
_nvmf_ctrlr_destruct(void *ctx)
{
	nvmf_ctrlr_destruct((struct spdk_nvmf_ctrlr *)ctx);
}
1186 
1187 static void
1188 _nvmf_ctrlr_free_from_qpair(void *ctx)
1189 {
1190 	struct nvmf_qpair_disconnect_ctx *qpair_ctx = ctx;
1191 	struct spdk_nvmf_ctrlr *ctrlr = qpair_ctx->ctrlr;
1192 	uint32_t count;
1193 
1194 	spdk_bit_array_clear(ctrlr->qpair_mask, qpair_ctx->qid);
1195 	count = spdk_bit_array_count_set(ctrlr->qpair_mask);
1196 	if (count == 0) {
1197 		assert(!ctrlr->in_destruct);
1198 		SPDK_DEBUGLOG(nvmf, "Last qpair %u, destroy ctrlr 0x%hx\n", qpair_ctx->qid, ctrlr->cntlid);
1199 		ctrlr->in_destruct = true;
1200 		spdk_thread_send_msg(ctrlr->subsys->thread, _nvmf_ctrlr_destruct, ctrlr);
1201 	}
1202 	free(qpair_ctx);
1203 }
1204 
/* Final stage of qpair teardown, invoked by the transport once the qpair is
 * fully destroyed.  Releases the qpair's slot in its controller (possibly
 * triggering controller destruction) and fires the caller's disconnect
 * callback on the thread that requested the disconnect. */
static void
_nvmf_transport_qpair_fini_complete(void *cb_ctx)
{
	struct nvmf_qpair_disconnect_ctx *qpair_ctx = cb_ctx;
	struct spdk_nvmf_ctrlr *ctrlr;
	/* Store cb args since cb_ctx can be freed in _nvmf_ctrlr_free_from_qpair */
	nvmf_qpair_disconnect_cb cb_fn = qpair_ctx->cb_fn;
	void *cb_arg = qpair_ctx->ctx;
	struct spdk_thread *cb_thread = qpair_ctx->thread;

	ctrlr = qpair_ctx->ctrlr;
	SPDK_DEBUGLOG(nvmf, "Finish destroying qid %u\n", qpair_ctx->qid);

	if (ctrlr) {
		if (qpair_ctx->qid == 0) {
			/* Admin qpair is removed, so set the pointer to NULL.
			 * This operation is safe since we are on ctrlr thread now, admin qpair's thread is the same
			 * as controller's thread */
			assert(ctrlr->thread == spdk_get_thread());
			ctrlr->admin_qpair = NULL;
		}
		/* Free qpair id from controller's bit mask and destroy the controller if it is the last qpair */
		if (ctrlr->thread) {
			spdk_thread_send_msg(ctrlr->thread, _nvmf_ctrlr_free_from_qpair, qpair_ctx);
		} else {
			/* No controller thread recorded; do the bookkeeping inline. */
			_nvmf_ctrlr_free_from_qpair(qpair_ctx);
		}
	} else {
		/* Qpair never got a controller; nothing to release but the ctx. */
		free(qpair_ctx);
	}

	/* qpair_ctx may already be freed at this point - use the copies above. */
	if (cb_fn) {
		spdk_thread_send_msg(cb_thread, cb_fn, cb_arg);
	}
}
1240 
1241 void
1242 spdk_nvmf_poll_group_remove(struct spdk_nvmf_qpair *qpair)
1243 {
1244 	struct spdk_nvmf_transport_poll_group *tgroup;
1245 	int rc;
1246 
1247 	SPDK_DTRACE_PROBE2_TICKS(nvmf_poll_group_remove_qpair, qpair,
1248 				 spdk_thread_get_id(qpair->group->thread));
1249 	nvmf_qpair_set_state(qpair, SPDK_NVMF_QPAIR_ERROR);
1250 
1251 	/* Find the tgroup and remove the qpair from the tgroup */
1252 	TAILQ_FOREACH(tgroup, &qpair->group->tgroups, link) {
1253 		if (tgroup->transport == qpair->transport) {
1254 			rc = nvmf_transport_poll_group_remove(tgroup, qpair);
1255 			if (rc && (rc != ENOTSUP)) {
1256 				SPDK_ERRLOG("Cannot remove qpair=%p from transport group=%p\n",
1257 					    qpair, tgroup);
1258 			}
1259 			break;
1260 		}
1261 	}
1262 
1263 	TAILQ_REMOVE(&qpair->group->qpairs, qpair, link);
1264 	qpair->group = NULL;
1265 }
1266 
1267 static void
1268 _nvmf_qpair_sgroup_req_clean(struct spdk_nvmf_subsystem_poll_group *sgroup,
1269 			     const struct spdk_nvmf_qpair *qpair)
1270 {
1271 	struct spdk_nvmf_request *req, *tmp;
1272 	TAILQ_FOREACH_SAFE(req, &sgroup->queued, link, tmp) {
1273 		if (req->qpair == qpair) {
1274 			TAILQ_REMOVE(&sgroup->queued, req, link);
1275 			if (nvmf_transport_req_free(req)) {
1276 				SPDK_ERRLOG("Transport request free error!\n");
1277 			}
1278 		}
1279 	}
1280 }
1281 
/* Second stage of qpair disconnect, entered once the qpair is DEACTIVATING
 * and has no outstanding requests (directly, or via the qpair's state_cb).
 * Updates group statistics, purges the qpair's queued requests from the
 * subsystem poll group(s), detaches the qpair from its poll group and asks
 * the transport to finish destruction. */
static void
_nvmf_qpair_destroy(void *ctx, int status)
{
	struct nvmf_qpair_disconnect_ctx *qpair_ctx = ctx;
	struct spdk_nvmf_qpair *qpair = qpair_ctx->qpair;
	struct spdk_nvmf_ctrlr *ctrlr = qpair->ctrlr;
	struct spdk_nvmf_subsystem_poll_group *sgroup;
	uint32_t sid;

	assert(qpair->state == SPDK_NVMF_QPAIR_DEACTIVATING);
	/* Remember the qid; the qpair itself is gone by the fini callback. */
	qpair_ctx->qid = qpair->qid;

	if (qpair->connect_received) {
		if (0 == qpair->qid) {
			assert(qpair->group->stat.current_admin_qpairs > 0);
			qpair->group->stat.current_admin_qpairs--;
		} else {
			assert(qpair->group->stat.current_io_qpairs > 0);
			qpair->group->stat.current_io_qpairs--;
		}
	} else {
		/* Never connected - it is still counted as unassociated. */
		pthread_mutex_lock(&qpair->group->mutex);
		qpair->group->current_unassociated_qpairs--;
		pthread_mutex_unlock(&qpair->group->mutex);
	}

	if (ctrlr) {
		/* Only the controller's subsystem can hold queued requests. */
		sgroup = &qpair->group->sgroups[ctrlr->subsys->id];
		_nvmf_qpair_sgroup_req_clean(sgroup, qpair);
	} else {
		/* No controller - sweep every subsystem poll group. */
		for (sid = 0; sid < qpair->group->num_sgroups; sid++) {
			sgroup = &qpair->group->sgroups[sid];
			assert(sgroup != NULL);
			_nvmf_qpair_sgroup_req_clean(sgroup, qpair);
		}
	}

	qpair_ctx->ctrlr = ctrlr;
	spdk_nvmf_poll_group_remove(qpair);
	nvmf_transport_qpair_fini(qpair, _nvmf_transport_qpair_fini_complete, qpair_ctx);
}
1323 
1324 static void
1325 _nvmf_qpair_disconnect_msg(void *ctx)
1326 {
1327 	struct nvmf_qpair_disconnect_ctx *qpair_ctx = ctx;
1328 
1329 	spdk_nvmf_qpair_disconnect(qpair_ctx->qpair, qpair_ctx->cb_fn, qpair_ctx->ctx);
1330 	free(ctx);
1331 }
1332 
/* Deprecation notice: the cb_fn/ctx arguments of spdk_nvmf_qpair_disconnect
 * are deprecated and scheduled for removal in v24.01. */
SPDK_LOG_DEPRECATION_REGISTER(spdk_nvmf_qpair_disconnect, "cb_fn and ctx are deprecated", "v24.01",
			      0);
1335 
/*
 * Begin tearing down a qpair.  May be called from any thread: when invoked
 * off the qpair's poll group thread, the call is forwarded to that thread.
 * The (deprecated) cb_fn/ctx pair, if provided, fires on the caller's group
 * thread once the qpair has been fully destroyed.
 *
 * Returns 0 on success, -EINPROGRESS if a disconnect is already under way,
 * or -ENOMEM on allocation failure.
 */
int
spdk_nvmf_qpair_disconnect(struct spdk_nvmf_qpair *qpair, nvmf_qpair_disconnect_cb cb_fn, void *ctx)
{
	struct spdk_nvmf_poll_group *group = qpair->group;
	struct nvmf_qpair_disconnect_ctx *qpair_ctx;

	/* Only the first caller proceeds; concurrent callers see the flag set. */
	if (__atomic_test_and_set(&qpair->disconnect_started, __ATOMIC_RELAXED)) {
		return -EINPROGRESS;
	}

	if (cb_fn || ctx) {
		SPDK_LOG_DEPRECATED(spdk_nvmf_qpair_disconnect);
	}

	/* If we get a qpair in the uninitialized state, we can just destroy it immediately */
	if (qpair->state == SPDK_NVMF_QPAIR_UNINITIALIZED) {
		nvmf_transport_qpair_fini(qpair, NULL, NULL);
		if (cb_fn) {
			cb_fn(ctx);
		}
		return 0;
	}

	assert(group != NULL);
	if (spdk_get_thread() != group->thread) {
		/* clear the atomic so we can set it on the next call on the proper thread. */
		__atomic_clear(&qpair->disconnect_started, __ATOMIC_RELAXED);
		qpair_ctx = calloc(1, sizeof(struct nvmf_qpair_disconnect_ctx));
		if (!qpair_ctx) {
			SPDK_ERRLOG("Unable to allocate context for nvmf_qpair_disconnect\n");
			return -ENOMEM;
		}
		qpair_ctx->qpair = qpair;
		qpair_ctx->cb_fn = cb_fn;
		qpair_ctx->thread = group->thread;
		qpair_ctx->ctx = ctx;
		/* Retry the disconnect on the qpair's own poll group thread. */
		spdk_thread_send_msg(group->thread, _nvmf_qpair_disconnect_msg, qpair_ctx);
		return 0;
	}

	SPDK_DTRACE_PROBE2_TICKS(nvmf_qpair_disconnect, qpair, spdk_thread_get_id(group->thread));
	assert(qpair->state == SPDK_NVMF_QPAIR_ACTIVE);
	nvmf_qpair_set_state(qpair, SPDK_NVMF_QPAIR_DEACTIVATING);

	qpair_ctx = calloc(1, sizeof(struct nvmf_qpair_disconnect_ctx));
	if (!qpair_ctx) {
		SPDK_ERRLOG("Unable to allocate context for nvmf_qpair_disconnect\n");
		return -ENOMEM;
	}

	qpair_ctx->qpair = qpair;
	qpair_ctx->cb_fn = cb_fn;
	qpair_ctx->thread = group->thread;
	qpair_ctx->ctx = ctx;

	/* Check for outstanding I/O */
	if (!TAILQ_EMPTY(&qpair->outstanding)) {
		/* Defer destruction: _nvmf_qpair_destroy runs via state_cb once
		 * the outstanding requests drain. */
		SPDK_DTRACE_PROBE2_TICKS(nvmf_poll_group_drain_qpair, qpair, spdk_thread_get_id(group->thread));
		qpair->state_cb = _nvmf_qpair_destroy;
		qpair->state_cb_arg = qpair_ctx;
		nvmf_qpair_abort_pending_zcopy_reqs(qpair);
		nvmf_qpair_free_aer(qpair);
		return 0;
	}

	/* Nothing outstanding - destroy right away. */
	_nvmf_qpair_destroy(qpair_ctx, 0);

	return 0;
}
1405 
/* Fill *trid with the qpair's peer (remote) transport ID.  The buffer is
 * zeroed first so fields the transport does not set read as zero.  Returns
 * the transport callback's result. */
int
spdk_nvmf_qpair_get_peer_trid(struct spdk_nvmf_qpair *qpair,
			      struct spdk_nvme_transport_id *trid)
{
	memset(trid, 0, sizeof(*trid));
	return nvmf_transport_qpair_get_peer_trid(qpair, trid);
}
1413 
/* Fill *trid with the qpair's local transport ID.  The buffer is zeroed
 * first so fields the transport does not set read as zero.  Returns the
 * transport callback's result. */
int
spdk_nvmf_qpair_get_local_trid(struct spdk_nvmf_qpair *qpair,
			       struct spdk_nvme_transport_id *trid)
{
	memset(trid, 0, sizeof(*trid));
	return nvmf_transport_qpair_get_local_trid(qpair, trid);
}
1421 
/* Fill *trid with the transport ID of the listener this qpair connected to.
 * The buffer is zeroed first so fields the transport does not set read as
 * zero.  Returns the transport callback's result. */
int
spdk_nvmf_qpair_get_listen_trid(struct spdk_nvmf_qpair *qpair,
				struct spdk_nvme_transport_id *trid)
{
	memset(trid, 0, sizeof(*trid));
	return nvmf_transport_qpair_get_listen_trid(qpair, trid);
}
1429 
/*
 * Synchronize this poll group's cached per-subsystem namespace state
 * (sgroup->ns_info) with the subsystem's current namespace list: resize the
 * ns_info array to subsystem->max_nsid, acquire/release bdev I/O channels
 * for namespaces that appeared/disappeared/changed, and refresh the cached
 * size and reservation data.  If anything changed, raise namespace and ANA
 * change async events on controllers whose admin qpair lives on this group.
 *
 * Returns 0 on success, -ENOMEM on allocation/channel failure, -EINVAL if a
 * namespace has more registrants than the cache can hold.
 */
static int
poll_group_update_subsystem(struct spdk_nvmf_poll_group *group,
			    struct spdk_nvmf_subsystem *subsystem)
{
	struct spdk_nvmf_subsystem_poll_group *sgroup;
	uint32_t new_num_ns, old_num_ns;
	uint32_t i, j;
	struct spdk_nvmf_ns *ns;
	struct spdk_nvmf_registrant *reg, *tmp;
	struct spdk_io_channel *ch;
	struct spdk_nvmf_subsystem_pg_ns_info *ns_info;
	struct spdk_nvmf_ctrlr *ctrlr;
	bool ns_changed;

	/* Make sure our poll group has memory for this subsystem allocated */
	if (subsystem->id >= group->num_sgroups) {
		return -ENOMEM;
	}

	sgroup = &group->sgroups[subsystem->id];

	/* Make sure the array of namespace information is the correct size */
	new_num_ns = subsystem->max_nsid;
	old_num_ns = sgroup->num_ns;

	ns_changed = false;

	if (old_num_ns == 0) {
		if (new_num_ns > 0) {
			/* First allocation */
			sgroup->ns_info = calloc(new_num_ns, sizeof(struct spdk_nvmf_subsystem_pg_ns_info));
			if (!sgroup->ns_info) {
				return -ENOMEM;
			}
		}
	} else if (new_num_ns > old_num_ns) {
		void *buf;

		/* Make the array larger */
		buf = realloc(sgroup->ns_info, new_num_ns * sizeof(struct spdk_nvmf_subsystem_pg_ns_info));
		if (!buf) {
			return -ENOMEM;
		}

		sgroup->ns_info = buf;

		/* Null out the new namespace information slots */
		for (i = old_num_ns; i < new_num_ns; i++) {
			memset(&sgroup->ns_info[i], 0, sizeof(struct spdk_nvmf_subsystem_pg_ns_info));
		}
	} else if (new_num_ns < old_num_ns) {
		void *buf;

		/* Free the extra I/O channels */
		for (i = new_num_ns; i < old_num_ns; i++) {
			ns_info = &sgroup->ns_info[i];

			if (ns_info->channel) {
				spdk_put_io_channel(ns_info->channel);
				ns_info->channel = NULL;
			}
		}

		/* Make the array smaller */
		if (new_num_ns > 0) {
			buf = realloc(sgroup->ns_info, new_num_ns * sizeof(struct spdk_nvmf_subsystem_pg_ns_info));
			if (!buf) {
				return -ENOMEM;
			}
			sgroup->ns_info = buf;
		} else {
			free(sgroup->ns_info);
			sgroup->ns_info = NULL;
		}
	}

	sgroup->num_ns = new_num_ns;

	/* Detect bdevs that were added or removed */
	for (i = 0; i < sgroup->num_ns; i++) {
		ns = subsystem->ns[i];
		ns_info = &sgroup->ns_info[i];
		ch = ns_info->channel;

		if (ns == NULL && ch == NULL) {
			/* Both NULL. Leave empty */
		} else if (ns == NULL && ch != NULL) {
			/* There was a channel here, but the namespace is gone. */
			ns_changed = true;
			spdk_put_io_channel(ch);
			ns_info->channel = NULL;
		} else if (ns != NULL && ch == NULL) {
			/* A namespace appeared but there is no channel yet */
			ns_changed = true;
			ch = spdk_bdev_get_io_channel(ns->desc);
			if (ch == NULL) {
				SPDK_ERRLOG("Could not allocate I/O channel.\n");
				return -ENOMEM;
			}
			ns_info->channel = ch;
		} else if (spdk_uuid_compare(&ns_info->uuid, spdk_bdev_get_uuid(ns->bdev)) != 0) {
			/* A namespace was here before, but was replaced by a new one. */
			ns_changed = true;
			spdk_put_io_channel(ns_info->channel);
			memset(ns_info, 0, sizeof(*ns_info));

			ch = spdk_bdev_get_io_channel(ns->desc);
			if (ch == NULL) {
				SPDK_ERRLOG("Could not allocate I/O channel.\n");
				return -ENOMEM;
			}
			ns_info->channel = ch;
		} else if (ns_info->num_blocks != spdk_bdev_get_num_blocks(ns->bdev)) {
			/* Namespace is still there but size has changed */
			SPDK_DEBUGLOG(nvmf, "Namespace resized: subsystem_id %u,"
				      " nsid %u, pg %p, old %" PRIu64 ", new %" PRIu64 "\n",
				      subsystem->id,
				      ns->nsid,
				      group,
				      ns_info->num_blocks,
				      spdk_bdev_get_num_blocks(ns->bdev));
			ns_changed = true;
		}

		if (ns == NULL) {
			memset(ns_info, 0, sizeof(*ns_info));
		} else {
			/* Refresh the cached identity, size and reservation state. */
			ns_info->uuid = *spdk_bdev_get_uuid(ns->bdev);
			ns_info->num_blocks = spdk_bdev_get_num_blocks(ns->bdev);
			ns_info->crkey = ns->crkey;
			ns_info->rtype = ns->rtype;
			if (ns->holder) {
				ns_info->holder_id = ns->holder->hostid;
			}

			/* Re-snapshot the registrant host IDs. */
			memset(&ns_info->reg_hostid, 0, SPDK_NVMF_MAX_NUM_REGISTRANTS * sizeof(struct spdk_uuid));
			j = 0;
			TAILQ_FOREACH_SAFE(reg, &ns->registrants, link, tmp) {
				if (j >= SPDK_NVMF_MAX_NUM_REGISTRANTS) {
					SPDK_ERRLOG("Maximum %u registrants can support.\n", SPDK_NVMF_MAX_NUM_REGISTRANTS);
					return -EINVAL;
				}
				ns_info->reg_hostid[j++] = reg->hostid;
			}
		}
	}

	if (ns_changed) {
		/* Notify controllers owned by this thread whose admin qpair is on
		 * this poll group. */
		TAILQ_FOREACH(ctrlr, &subsystem->ctrlrs, link) {
			if (ctrlr->thread != spdk_get_thread()) {
				continue;
			}
			/* It is possible that a ctrlr was added but the admin_qpair hasn't been
			 * assigned yet.
			 */
			if (!ctrlr->admin_qpair) {
				continue;
			}
			if (ctrlr->admin_qpair->group == group) {
				nvmf_ctrlr_async_event_ns_notice(ctrlr);
				nvmf_ctrlr_async_event_ana_change_notice(ctrlr);
			}
		}
	}

	return 0;
}
1597 
/* Internal-API wrapper around poll_group_update_subsystem(); see that
 * function for semantics and return values. */
int
nvmf_poll_group_update_subsystem(struct spdk_nvmf_poll_group *group,
				 struct spdk_nvmf_subsystem *subsystem)
{
	return poll_group_update_subsystem(group, subsystem);
}
1604 
1605 int
1606 nvmf_poll_group_add_subsystem(struct spdk_nvmf_poll_group *group,
1607 			      struct spdk_nvmf_subsystem *subsystem,
1608 			      spdk_nvmf_poll_group_mod_done cb_fn, void *cb_arg)
1609 {
1610 	int rc = 0;
1611 	struct spdk_nvmf_subsystem_poll_group *sgroup = &group->sgroups[subsystem->id];
1612 	uint32_t i;
1613 
1614 	TAILQ_INIT(&sgroup->queued);
1615 
1616 	rc = poll_group_update_subsystem(group, subsystem);
1617 	if (rc) {
1618 		nvmf_poll_group_remove_subsystem(group, subsystem, NULL, NULL);
1619 		goto fini;
1620 	}
1621 
1622 	sgroup->state = SPDK_NVMF_SUBSYSTEM_ACTIVE;
1623 
1624 	for (i = 0; i < sgroup->num_ns; i++) {
1625 		sgroup->ns_info[i].state = SPDK_NVMF_SUBSYSTEM_ACTIVE;
1626 	}
1627 
1628 fini:
1629 	if (cb_fn) {
1630 		cb_fn(cb_arg, rc);
1631 	}
1632 
1633 	SPDK_DTRACE_PROBE2_TICKS(nvmf_poll_group_add_subsystem, spdk_thread_get_id(group->thread),
1634 				 subsystem->subnqn);
1635 
1636 	return rc;
1637 }
1638 
1639 static void
1640 _nvmf_poll_group_remove_subsystem_cb(void *ctx, int status)
1641 {
1642 	struct nvmf_qpair_disconnect_many_ctx *qpair_ctx = ctx;
1643 	struct spdk_nvmf_subsystem *subsystem;
1644 	struct spdk_nvmf_poll_group *group;
1645 	struct spdk_nvmf_subsystem_poll_group *sgroup;
1646 	spdk_nvmf_poll_group_mod_done cpl_fn = NULL;
1647 	void *cpl_ctx = NULL;
1648 	uint32_t nsid;
1649 
1650 	group = qpair_ctx->group;
1651 	subsystem = qpair_ctx->subsystem;
1652 	cpl_fn = qpair_ctx->cpl_fn;
1653 	cpl_ctx = qpair_ctx->cpl_ctx;
1654 	sgroup = &group->sgroups[subsystem->id];
1655 
1656 	if (status) {
1657 		goto fini;
1658 	}
1659 
1660 	for (nsid = 0; nsid < sgroup->num_ns; nsid++) {
1661 		if (sgroup->ns_info[nsid].channel) {
1662 			spdk_put_io_channel(sgroup->ns_info[nsid].channel);
1663 			sgroup->ns_info[nsid].channel = NULL;
1664 		}
1665 	}
1666 
1667 	sgroup->num_ns = 0;
1668 	free(sgroup->ns_info);
1669 	sgroup->ns_info = NULL;
1670 fini:
1671 	free(qpair_ctx);
1672 	if (cpl_fn) {
1673 		cpl_fn(cpl_ctx, status);
1674 	}
1675 }
1676 
1677 static void nvmf_poll_group_remove_subsystem_msg(void *ctx);
1678 
1679 static void
1680 nvmf_poll_group_remove_subsystem_msg(void *ctx)
1681 {
1682 	struct spdk_nvmf_qpair *qpair, *qpair_tmp;
1683 	struct spdk_nvmf_subsystem *subsystem;
1684 	struct spdk_nvmf_poll_group *group;
1685 	struct nvmf_qpair_disconnect_many_ctx *qpair_ctx = ctx;
1686 	bool qpairs_found = false;
1687 	int rc = 0;
1688 
1689 	group = qpair_ctx->group;
1690 	subsystem = qpair_ctx->subsystem;
1691 
1692 	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, qpair_tmp) {
1693 		if ((qpair->ctrlr != NULL) && (qpair->ctrlr->subsys == subsystem)) {
1694 			qpairs_found = true;
1695 			rc = spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
1696 			if (rc && rc != -EINPROGRESS) {
1697 				break;
1698 			}
1699 		}
1700 	}
1701 
1702 	if (!qpairs_found) {
1703 		_nvmf_poll_group_remove_subsystem_cb(ctx, 0);
1704 		return;
1705 	}
1706 
1707 	/* Some qpairs are in process of being disconnected. Send a message and try to remove them again */
1708 	spdk_thread_send_msg(spdk_get_thread(), nvmf_poll_group_remove_subsystem_msg, ctx);
1709 }
1710 
1711 void
1712 nvmf_poll_group_remove_subsystem(struct spdk_nvmf_poll_group *group,
1713 				 struct spdk_nvmf_subsystem *subsystem,
1714 				 spdk_nvmf_poll_group_mod_done cb_fn, void *cb_arg)
1715 {
1716 	struct spdk_nvmf_subsystem_poll_group *sgroup;
1717 	struct nvmf_qpair_disconnect_many_ctx *ctx;
1718 	uint32_t i;
1719 
1720 	SPDK_DTRACE_PROBE3_TICKS(nvmf_poll_group_remove_subsystem, group, spdk_thread_get_id(group->thread),
1721 				 subsystem->subnqn);
1722 
1723 	ctx = calloc(1, sizeof(struct nvmf_qpair_disconnect_many_ctx));
1724 	if (!ctx) {
1725 		SPDK_ERRLOG("Unable to allocate memory for context to remove poll subsystem\n");
1726 		if (cb_fn) {
1727 			cb_fn(cb_arg, -1);
1728 		}
1729 		return;
1730 	}
1731 
1732 	ctx->group = group;
1733 	ctx->subsystem = subsystem;
1734 	ctx->cpl_fn = cb_fn;
1735 	ctx->cpl_ctx = cb_arg;
1736 
1737 	sgroup = &group->sgroups[subsystem->id];
1738 	sgroup->state = SPDK_NVMF_SUBSYSTEM_INACTIVE;
1739 
1740 	for (i = 0; i < sgroup->num_ns; i++) {
1741 		sgroup->ns_info[i].state = SPDK_NVMF_SUBSYSTEM_INACTIVE;
1742 	}
1743 
1744 	nvmf_poll_group_remove_subsystem_msg(ctx);
1745 }
1746 
1747 void
1748 nvmf_poll_group_pause_subsystem(struct spdk_nvmf_poll_group *group,
1749 				struct spdk_nvmf_subsystem *subsystem,
1750 				uint32_t nsid,
1751 				spdk_nvmf_poll_group_mod_done cb_fn, void *cb_arg)
1752 {
1753 	struct spdk_nvmf_subsystem_poll_group *sgroup;
1754 	struct spdk_nvmf_subsystem_pg_ns_info *ns_info = NULL;
1755 	int rc = 0;
1756 	uint32_t i;
1757 
1758 	if (subsystem->id >= group->num_sgroups) {
1759 		rc = -1;
1760 		goto fini;
1761 	}
1762 
1763 	sgroup = &group->sgroups[subsystem->id];
1764 	if (sgroup->state == SPDK_NVMF_SUBSYSTEM_PAUSED) {
1765 		goto fini;
1766 	}
1767 	sgroup->state = SPDK_NVMF_SUBSYSTEM_PAUSING;
1768 
1769 	if (nsid == SPDK_NVME_GLOBAL_NS_TAG) {
1770 		for (i = 0; i < sgroup->num_ns; i++) {
1771 			ns_info = &sgroup->ns_info[i];
1772 			ns_info->state = SPDK_NVMF_SUBSYSTEM_PAUSING;
1773 		}
1774 	} else {
1775 		/* NOTE: This implicitly also checks for 0, since 0 - 1 wraps around to UINT32_MAX. */
1776 		if (nsid - 1 < sgroup->num_ns) {
1777 			ns_info  = &sgroup->ns_info[nsid - 1];
1778 			ns_info->state = SPDK_NVMF_SUBSYSTEM_PAUSING;
1779 		}
1780 	}
1781 
1782 	if (sgroup->mgmt_io_outstanding > 0) {
1783 		assert(sgroup->cb_fn == NULL);
1784 		sgroup->cb_fn = cb_fn;
1785 		assert(sgroup->cb_arg == NULL);
1786 		sgroup->cb_arg = cb_arg;
1787 		return;
1788 	}
1789 
1790 	if (nsid == SPDK_NVME_GLOBAL_NS_TAG) {
1791 		for (i = 0; i < sgroup->num_ns; i++) {
1792 			ns_info = &sgroup->ns_info[i];
1793 
1794 			if (ns_info->io_outstanding > 0) {
1795 				assert(sgroup->cb_fn == NULL);
1796 				sgroup->cb_fn = cb_fn;
1797 				assert(sgroup->cb_arg == NULL);
1798 				sgroup->cb_arg = cb_arg;
1799 				return;
1800 			}
1801 		}
1802 	} else {
1803 		if (ns_info != NULL && ns_info->io_outstanding > 0) {
1804 			assert(sgroup->cb_fn == NULL);
1805 			sgroup->cb_fn = cb_fn;
1806 			assert(sgroup->cb_arg == NULL);
1807 			sgroup->cb_arg = cb_arg;
1808 			return;
1809 		}
1810 	}
1811 
1812 	assert(sgroup->mgmt_io_outstanding == 0);
1813 	sgroup->state = SPDK_NVMF_SUBSYSTEM_PAUSED;
1814 fini:
1815 	if (cb_fn) {
1816 		cb_fn(cb_arg, rc);
1817 	}
1818 }
1819 
1820 void
1821 nvmf_poll_group_resume_subsystem(struct spdk_nvmf_poll_group *group,
1822 				 struct spdk_nvmf_subsystem *subsystem,
1823 				 spdk_nvmf_poll_group_mod_done cb_fn, void *cb_arg)
1824 {
1825 	struct spdk_nvmf_request *req, *tmp;
1826 	struct spdk_nvmf_subsystem_poll_group *sgroup;
1827 	int rc = 0;
1828 	uint32_t i;
1829 
1830 	if (subsystem->id >= group->num_sgroups) {
1831 		rc = -1;
1832 		goto fini;
1833 	}
1834 
1835 	sgroup = &group->sgroups[subsystem->id];
1836 
1837 	if (sgroup->state == SPDK_NVMF_SUBSYSTEM_ACTIVE) {
1838 		goto fini;
1839 	}
1840 
1841 	rc = poll_group_update_subsystem(group, subsystem);
1842 	if (rc) {
1843 		goto fini;
1844 	}
1845 
1846 	for (i = 0; i < sgroup->num_ns; i++) {
1847 		sgroup->ns_info[i].state = SPDK_NVMF_SUBSYSTEM_ACTIVE;
1848 	}
1849 
1850 	sgroup->state = SPDK_NVMF_SUBSYSTEM_ACTIVE;
1851 
1852 	/* Release all queued requests */
1853 	TAILQ_FOREACH_SAFE(req, &sgroup->queued, link, tmp) {
1854 		TAILQ_REMOVE(&sgroup->queued, req, link);
1855 		if (spdk_nvmf_request_using_zcopy(req)) {
1856 			spdk_nvmf_request_zcopy_start(req);
1857 		} else {
1858 			spdk_nvmf_request_exec(req);
1859 		}
1860 
1861 	}
1862 fini:
1863 	if (cb_fn) {
1864 		cb_fn(cb_arg, rc);
1865 	}
1866 }
1867 
1868 
1869 struct spdk_nvmf_poll_group *
1870 spdk_nvmf_get_optimal_poll_group(struct spdk_nvmf_qpair *qpair)
1871 {
1872 	struct spdk_nvmf_transport_poll_group *tgroup;
1873 
1874 	tgroup = nvmf_transport_get_optimal_poll_group(qpair->transport, qpair);
1875 
1876 	if (tgroup == NULL) {
1877 		return NULL;
1878 	}
1879 
1880 	return tgroup->group;
1881 }
1882 
1883 void
1884 spdk_nvmf_poll_group_dump_stat(struct spdk_nvmf_poll_group *group, struct spdk_json_write_ctx *w)
1885 {
1886 	struct spdk_nvmf_transport_poll_group *tgroup;
1887 
1888 	spdk_json_write_object_begin(w);
1889 
1890 	spdk_json_write_named_string(w, "name", spdk_thread_get_name(spdk_get_thread()));
1891 	spdk_json_write_named_uint32(w, "admin_qpairs", group->stat.admin_qpairs);
1892 	spdk_json_write_named_uint32(w, "io_qpairs", group->stat.io_qpairs);
1893 	spdk_json_write_named_uint32(w, "current_admin_qpairs", group->stat.current_admin_qpairs);
1894 	spdk_json_write_named_uint32(w, "current_io_qpairs", group->stat.current_io_qpairs);
1895 	spdk_json_write_named_uint64(w, "pending_bdev_io", group->stat.pending_bdev_io);
1896 	spdk_json_write_named_uint64(w, "completed_nvme_io", group->stat.completed_nvme_io);
1897 
1898 	spdk_json_write_named_array_begin(w, "transports");
1899 
1900 	TAILQ_FOREACH(tgroup, &group->tgroups, link) {
1901 		spdk_json_write_object_begin(w);
1902 		/*
1903 		 * The trtype field intentionally contains a transport name as this is more informative.
1904 		 * The field has not been renamed for backward compatibility.
1905 		 */
1906 		spdk_json_write_named_string(w, "trtype", spdk_nvmf_get_transport_name(tgroup->transport));
1907 
1908 		if (tgroup->transport->ops->poll_group_dump_stat) {
1909 			tgroup->transport->ops->poll_group_dump_stat(tgroup, w);
1910 		}
1911 
1912 		spdk_json_write_object_end(w);
1913 	}
1914 
1915 	spdk_json_write_array_end(w);
1916 	spdk_json_write_object_end(w);
1917 }
1918