xref: /spdk/lib/nvmf/transport.c (revision 7ff7ec0ed88d5acad07010b6c577326debc22f7c)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2018-2019, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "nvmf_internal.h"
10 #include "transport.h"
11 
12 #include "spdk/config.h"
13 #include "spdk/log.h"
14 #include "spdk/nvmf.h"
15 #include "spdk/nvmf_transport.h"
16 #include "spdk/queue.h"
17 #include "spdk/util.h"
18 #include "spdk_internal/usdt.h"
19 
20 #define NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS 120000
21 
22 struct nvmf_transport_ops_list_element {
23 	struct spdk_nvmf_transport_ops			ops;
24 	TAILQ_ENTRY(nvmf_transport_ops_list_element)	link;
25 };
26 
27 TAILQ_HEAD(nvmf_transport_ops_list, nvmf_transport_ops_list_element)
28 g_spdk_nvmf_transport_ops = TAILQ_HEAD_INITIALIZER(g_spdk_nvmf_transport_ops);
29 
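/* Look up a registered transport by name.  The comparison is case-insensitive
 * ("TCP" and "tcp" resolve to the same ops); NULL is returned when no transport
 * with that name has been registered. */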
30 static inline const struct spdk_nvmf_transport_ops *
31 nvmf_get_transport_ops(const char *transport_name)
32 {
33 	struct nvmf_transport_ops_list_element *ops;
34 	TAILQ_FOREACH(ops, &g_spdk_nvmf_transport_ops, link) {
35 		if (strcasecmp(transport_name, ops->ops.name) == 0) {
36 			return &ops->ops;
37 		}
38 	}
39 	return NULL;
40 }
41 
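/* Add a transport ops table to the global registry.  Transport modules normally
 * call this once at load time from a constructor.  A minimal sketch (my_ops and
 * the my_transport_* callbacks are hypothetical names, not part of this file):
 *
 *   static const struct spdk_nvmf_transport_ops my_ops = {
 *       .name      = "MYTR",
 *       .type      = SPDK_NVME_TRANSPORT_CUSTOM,
 *       .opts_init = my_transport_opts_init,
 *       .create    = my_transport_create,
 *       .destroy   = my_transport_destroy,
 *       ...
 *   };
 *   SPDK_NVMF_TRANSPORT_REGISTER(my_transport, &my_ops);
 *
 * SPDK_NVMF_TRANSPORT_REGISTER() (declared in spdk/nvmf_transport.h) expands to
 * a constructor that calls spdk_nvmf_transport_register(). */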
42 void
43 spdk_nvmf_transport_register(const struct spdk_nvmf_transport_ops *ops)
44 {
45 	struct nvmf_transport_ops_list_element *new_ops;
46 
47 	if (nvmf_get_transport_ops(ops->name) != NULL) {
48 		SPDK_ERRLOG("Double registering nvmf transport type %s.\n", ops->name);
49 		assert(false);
50 		return;
51 	}
52 
53 	new_ops = calloc(1, sizeof(*new_ops));
54 	if (new_ops == NULL) {
55 		SPDK_ERRLOG("Unable to allocate memory to register new transport type %s.\n", ops->name);
56 		assert(false);
57 		return;
58 	}
59 
60 	new_ops->ops = *ops;
61 
62 	TAILQ_INSERT_TAIL(&g_spdk_nvmf_transport_ops, new_ops, link);
63 }
64 
65 const struct spdk_nvmf_transport_opts *
66 spdk_nvmf_get_transport_opts(struct spdk_nvmf_transport *transport)
67 {
68 	return &transport->opts;
69 }
70 
71 void
72 nvmf_transport_dump_opts(struct spdk_nvmf_transport *transport, struct spdk_json_write_ctx *w,
73 			 bool named)
74 {
75 	const struct spdk_nvmf_transport_opts *opts = spdk_nvmf_get_transport_opts(transport);
76 
77 	named ? spdk_json_write_named_object_begin(w, "params") : spdk_json_write_object_begin(w);
78 
79 	spdk_json_write_named_string(w, "trtype", spdk_nvmf_get_transport_name(transport));
80 	spdk_json_write_named_uint32(w, "max_queue_depth", opts->max_queue_depth);
81 	spdk_json_write_named_uint32(w, "max_io_qpairs_per_ctrlr", opts->max_qpairs_per_ctrlr - 1);
82 	spdk_json_write_named_uint32(w, "in_capsule_data_size", opts->in_capsule_data_size);
83 	spdk_json_write_named_uint32(w, "max_io_size", opts->max_io_size);
84 	spdk_json_write_named_uint32(w, "io_unit_size", opts->io_unit_size);
85 	spdk_json_write_named_uint32(w, "max_aq_depth", opts->max_aq_depth);
86 	spdk_json_write_named_uint32(w, "num_shared_buffers", opts->num_shared_buffers);
87 	spdk_json_write_named_uint32(w, "buf_cache_size", opts->buf_cache_size);
88 	spdk_json_write_named_bool(w, "dif_insert_or_strip", opts->dif_insert_or_strip);
89 	spdk_json_write_named_bool(w, "zcopy", opts->zcopy);
90 
91 	if (transport->ops->dump_opts) {
92 		transport->ops->dump_opts(transport, w);
93 	}
94 
95 	spdk_json_write_named_uint32(w, "abort_timeout_sec", opts->abort_timeout_sec);
96 	spdk_json_write_named_uint32(w, "ack_timeout", opts->ack_timeout);
97 	spdk_json_write_named_uint32(w, "data_wr_pool_size", opts->data_wr_pool_size);
98 	spdk_json_write_object_end(w);
99 }
100 
101 void
102 nvmf_transport_listen_dump_trid(const struct spdk_nvme_transport_id *trid,
103 				struct spdk_json_write_ctx *w)
104 {
105 	const char *adrfam = spdk_nvme_transport_id_adrfam_str(trid->adrfam);
106 
107 	spdk_json_write_named_string(w, "trtype", trid->trstring);
108 	spdk_json_write_named_string(w, "adrfam", adrfam ? adrfam : "unknown");
109 	spdk_json_write_named_string(w, "traddr", trid->traddr);
110 	spdk_json_write_named_string(w, "trsvcid", trid->trsvcid);
111 }
112 
113 spdk_nvme_transport_type_t
114 spdk_nvmf_get_transport_type(struct spdk_nvmf_transport *transport)
115 {
116 	return transport->ops->type;
117 }
118 
119 const char *
120 spdk_nvmf_get_transport_name(struct spdk_nvmf_transport *transport)
121 {
122 	return transport->ops->name;
123 }
124 
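/* Copy opts_src into opts field by field, but only for fields that fit entirely
 * within opts_size.  This keeps the API ABI-compatible: a caller compiled
 * against an older, smaller spdk_nvmf_transport_opts passes its smaller
 * opts_size and the newer fields are simply left untouched. */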
125 static void
126 nvmf_transport_opts_copy(struct spdk_nvmf_transport_opts *opts,
127 			 struct spdk_nvmf_transport_opts *opts_src,
128 			 size_t opts_size)
129 {
130 	assert(opts);
131 	assert(opts_src);
132 
133 	opts->opts_size = opts_size;
134 
135 #define SET_FIELD(field) \
136 	if (offsetof(struct spdk_nvmf_transport_opts, field) + sizeof(opts->field) <= opts_size) { \
137 		opts->field = opts_src->field; \
138 	} \
139 
140 	SET_FIELD(max_queue_depth);
141 	SET_FIELD(max_qpairs_per_ctrlr);
142 	SET_FIELD(in_capsule_data_size);
143 	SET_FIELD(max_io_size);
144 	SET_FIELD(io_unit_size);
145 	SET_FIELD(max_aq_depth);
146 	SET_FIELD(buf_cache_size);
147 	SET_FIELD(num_shared_buffers);
148 	SET_FIELD(dif_insert_or_strip);
149 	SET_FIELD(abort_timeout_sec);
150 	SET_FIELD(association_timeout);
151 	SET_FIELD(transport_specific);
152 	SET_FIELD(acceptor_poll_rate);
153 	SET_FIELD(zcopy);
154 	SET_FIELD(ack_timeout);
155 	SET_FIELD(data_wr_pool_size);
156 
157 	/* Do not remove this statement.  Always update it when adding a new field,
158 	 * and do not forget to add a SET_FIELD statement for the new field. */
159 	SPDK_STATIC_ASSERT(sizeof(struct spdk_nvmf_transport_opts) == 72, "Incorrect size");
160 
161 #undef SET_FIELD
163 }
164 
165 struct nvmf_transport_create_ctx {
166 	const struct spdk_nvmf_transport_ops *ops;
167 	struct spdk_nvmf_transport_opts opts;
168 	void *cb_arg;
169 	spdk_nvmf_transport_create_done_cb cb_fn;
170 };
171 
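/* A transport that sets both num_shared_buffers and buf_cache_size to 0 opts out
 * of the shared iobuf pools entirely, e.g. when it manages its own data buffers. */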
172 static bool
173 nvmf_transport_use_iobuf(struct spdk_nvmf_transport *transport)
174 {
175 	return transport->opts.num_shared_buffers || transport->opts.buf_cache_size;
176 }
177 
178 static void
179 nvmf_transport_create_async_done(void *cb_arg, struct spdk_nvmf_transport *transport)
180 {
181 	struct nvmf_transport_create_ctx *ctx = cb_arg;
182 	int chars_written;
183 
184 	if (!transport) {
185 		SPDK_ERRLOG("Failed to create transport.\n");
186 		goto err;
187 	}
188 
189 	pthread_mutex_init(&transport->mutex, NULL);
190 	TAILQ_INIT(&transport->listeners);
191 	transport->ops = ctx->ops;
192 	transport->opts = ctx->opts;
193 	chars_written = snprintf(transport->iobuf_name, MAX_MEMPOOL_NAME_LENGTH, "%s_%s", "nvmf",
194 				 transport->ops->name);
195 	if (chars_written < 0) {
196 		SPDK_ERRLOG("Unable to generate transport data buffer pool name.\n");
197 		goto err;
198 	}
199 
200 	if (nvmf_transport_use_iobuf(transport)) {
201 		spdk_iobuf_register_module(transport->iobuf_name);
202 	}
203 
204 	ctx->cb_fn(ctx->cb_arg, transport);
205 	free(ctx);
206 	return;
207 
208 err:
209 	if (transport) {
210 		transport->ops->destroy(transport, NULL, NULL);
211 	}
212 
213 	ctx->cb_fn(ctx->cb_arg, NULL);
214 	free(ctx);
215 }
216 
217 static void
218 _nvmf_transport_create_done(void *ctx)
219 {
220 	struct nvmf_transport_create_ctx *_ctx = (struct nvmf_transport_create_ctx *)ctx;
221 
222 	nvmf_transport_create_async_done(_ctx, _ctx->ops->create(&_ctx->opts));
223 }
224 
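/* Common implementation behind spdk_nvmf_transport_create() and
 * spdk_nvmf_transport_create_async().  It validates the options against the
 * iobuf pool configuration and then invokes the transport's create callback:
 * synchronously (directly, or via a thread message for the async API) when
 * ops->create is provided, otherwise through ops->create_async. */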
225 static int
226 nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
227 		      spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg, bool sync)
228 {
229 	struct nvmf_transport_create_ctx *ctx;
230 	struct spdk_iobuf_opts opts_iobuf = {};
231 	int rc;
232 	uint64_t count;
233 
234 	ctx = calloc(1, sizeof(*ctx));
235 	if (!ctx) {
236 		return -ENOMEM;
237 	}
238 
239 	if (!opts) {
240 		SPDK_ERRLOG("opts should not be NULL\n");
241 		goto err;
242 	}
243 
244 	if (!opts->opts_size) {
245 		SPDK_ERRLOG("The opts_size in opts structure should not be zero\n");
246 		goto err;
247 	}
248 
249 	ctx->ops = nvmf_get_transport_ops(transport_name);
250 	if (!ctx->ops) {
251 		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
252 		goto err;
253 	}
254 
255 	nvmf_transport_opts_copy(&ctx->opts, opts, opts->opts_size);
256 	if (ctx->opts.max_io_size != 0 && (!spdk_u32_is_pow2(ctx->opts.max_io_size) ||
257 					   ctx->opts.max_io_size < 8192)) {
258 		SPDK_ERRLOG("max_io_size %u must be a power of 2 and greater than or equal to 8KB\n",
259 			    ctx->opts.max_io_size);
260 		goto err;
261 	}
262 
263 	if (ctx->opts.max_aq_depth < SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE) {
264 		SPDK_ERRLOG("max_aq_depth %u is less than the minimum defined by the NVMe-oF spec; using the minimum value\n",
265 			    ctx->opts.max_aq_depth);
266 		ctx->opts.max_aq_depth = SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE;
267 	}
268 
269 	spdk_iobuf_get_opts(&opts_iobuf, sizeof(opts_iobuf));
270 	if (ctx->opts.io_unit_size == 0) {
271 		SPDK_ERRLOG("io_unit_size cannot be 0\n");
272 		goto err;
273 	}
274 	if (ctx->opts.io_unit_size > opts_iobuf.large_bufsize) {
275 		SPDK_ERRLOG("io_unit_size %u is larger than iobuf pool large buffer size %u\n",
276 			    ctx->opts.io_unit_size, opts_iobuf.large_bufsize);
277 		goto err;
278 	}
279 
280 	if (ctx->opts.io_unit_size <= opts_iobuf.small_bufsize) {
281 		/* We'll be using the small buffer pool only */
282 		count = opts_iobuf.small_pool_count;
283 	} else {
284 		count = spdk_min(opts_iobuf.small_pool_count, opts_iobuf.large_pool_count);
285 	}
286 
287 	if (ctx->opts.num_shared_buffers > count) {
288 		SPDK_WARNLOG("The num_shared_buffers value (%u) is larger than the available iobuf"
289 			     " pool size (%lu). Please increase the iobuf pool sizes.\n",
290 			     ctx->opts.num_shared_buffers, count);
291 	}
292 
293 	ctx->cb_fn = cb_fn;
294 	ctx->cb_arg = cb_arg;
295 
296 	/* Prioritize sync create operation. */
297 	if (ctx->ops->create) {
298 		if (sync) {
299 			_nvmf_transport_create_done(ctx);
300 			return 0;
301 		}
302 
303 		rc = spdk_thread_send_msg(spdk_get_thread(), _nvmf_transport_create_done, ctx);
304 		if (rc) {
305 			goto err;
306 		}
307 
308 		return 0;
309 	}
310 
311 	assert(ctx->ops->create_async);
312 	rc = ctx->ops->create_async(&ctx->opts, nvmf_transport_create_async_done, ctx);
313 	if (rc) {
314 		SPDK_ERRLOG("Unable to create new transport of type %s\n", transport_name);
315 		goto err;
316 	}
317 
318 	return 0;
319 err:
320 	free(ctx);
321 	return -1;
322 }
323 
324 int
325 spdk_nvmf_transport_create_async(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
326 				 spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg)
327 {
328 	return nvmf_transport_create(transport_name, opts, cb_fn, cb_arg, false);
329 }
330 
331 static void
332 nvmf_transport_create_sync_done(void *cb_arg, struct spdk_nvmf_transport *transport)
333 {
334 	struct spdk_nvmf_transport **_transport = cb_arg;
335 
336 	*_transport = transport;
337 }
338 
339 struct spdk_nvmf_transport *
340 spdk_nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts)
341 {
342 	struct spdk_nvmf_transport *transport = NULL;
343 
344 	/* This API only supports transports that implement the synchronous create operation. */
345 	assert(nvmf_get_transport_ops(transport_name) && nvmf_get_transport_ops(transport_name)->create);
346 
347 	nvmf_transport_create(transport_name, opts, nvmf_transport_create_sync_done, &transport, true);
348 	return transport;
349 }
350 
351 struct spdk_nvmf_transport *
352 spdk_nvmf_transport_get_first(struct spdk_nvmf_tgt *tgt)
353 {
354 	return TAILQ_FIRST(&tgt->transports);
355 }
356 
357 struct spdk_nvmf_transport *
358 spdk_nvmf_transport_get_next(struct spdk_nvmf_transport *transport)
359 {
360 	return TAILQ_NEXT(transport, link);
361 }
362 
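/* Tear down every listener, detach from the iobuf framework if this transport
 * registered with it, and delegate the final cleanup to the transport's destroy
 * callback, which may complete asynchronously through cb_fn. */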
363 int
364 spdk_nvmf_transport_destroy(struct spdk_nvmf_transport *transport,
365 			    spdk_nvmf_transport_destroy_done_cb cb_fn, void *cb_arg)
366 {
367 	struct spdk_nvmf_listener *listener, *listener_tmp;
368 
369 	TAILQ_FOREACH_SAFE(listener, &transport->listeners, link, listener_tmp) {
370 		TAILQ_REMOVE(&transport->listeners, listener, link);
371 		transport->ops->stop_listen(transport, &listener->trid);
372 		free(listener);
373 	}
374 
375 	if (nvmf_transport_use_iobuf(transport)) {
376 		spdk_iobuf_unregister_module(transport->iobuf_name);
377 	}
378 
379 	pthread_mutex_destroy(&transport->mutex);
380 	return transport->ops->destroy(transport, cb_fn, cb_arg);
381 }
382 
383 struct spdk_nvmf_listener *
384 nvmf_transport_find_listener(struct spdk_nvmf_transport *transport,
385 			     const struct spdk_nvme_transport_id *trid)
386 {
387 	struct spdk_nvmf_listener *listener;
388 
389 	TAILQ_FOREACH(listener, &transport->listeners, link) {
390 		if (spdk_nvme_transport_id_compare(&listener->trid, trid) == 0) {
391 			return listener;
392 		}
393 	}
394 
395 	return NULL;
396 }
397 
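/* Begin accepting new connections on trid.  Listeners are reference counted: if
 * a listener for the same trid already exists, only its refcount is incremented
 * and the transport's listen callback is not invoked again. */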
398 int
399 spdk_nvmf_transport_listen(struct spdk_nvmf_transport *transport,
400 			   const struct spdk_nvme_transport_id *trid, struct spdk_nvmf_listen_opts *opts)
401 {
402 	struct spdk_nvmf_listener *listener;
403 	int rc;
404 
405 	listener = nvmf_transport_find_listener(transport, trid);
406 	if (!listener) {
407 		listener = calloc(1, sizeof(*listener));
408 		if (!listener) {
409 			return -ENOMEM;
410 		}
411 
412 		listener->ref = 1;
413 		listener->trid = *trid;
414 		TAILQ_INSERT_TAIL(&transport->listeners, listener, link);
415 		pthread_mutex_lock(&transport->mutex);
416 		rc = transport->ops->listen(transport, &listener->trid, opts);
417 		pthread_mutex_unlock(&transport->mutex);
418 		if (rc != 0) {
419 			TAILQ_REMOVE(&transport->listeners, listener, link);
420 			free(listener);
421 		}
422 		return rc;
423 	}
424 
425 	++listener->ref;
426 
427 	return 0;
428 }
429 
430 int
431 spdk_nvmf_transport_stop_listen(struct spdk_nvmf_transport *transport,
432 				const struct spdk_nvme_transport_id *trid)
433 {
434 	struct spdk_nvmf_listener *listener;
435 
436 	listener = nvmf_transport_find_listener(transport, trid);
437 	if (!listener) {
438 		return -ENOENT;
439 	}
440 
441 	if (--listener->ref == 0) {
442 		TAILQ_REMOVE(&transport->listeners, listener, link);
443 		pthread_mutex_lock(&transport->mutex);
444 		transport->ops->stop_listen(transport, trid);
445 		pthread_mutex_unlock(&transport->mutex);
446 		free(listener);
447 	}
448 
449 	return 0;
450 }
451 
452 struct nvmf_stop_listen_ctx {
453 	struct spdk_nvmf_transport *transport;
454 	struct spdk_nvme_transport_id trid;
455 	struct spdk_nvmf_subsystem *subsystem;
456 	spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn;
457 	void *cb_arg;
458 };
459 
460 static void
461 nvmf_stop_listen_fini(struct spdk_io_channel_iter *i, int status)
462 {
463 	struct nvmf_stop_listen_ctx *ctx;
464 	struct spdk_nvmf_transport *transport;
465 	int rc = status;
466 
467 	ctx = spdk_io_channel_iter_get_ctx(i);
468 	transport = ctx->transport;
469 	assert(transport != NULL);
470 
471 	rc = spdk_nvmf_transport_stop_listen(transport, &ctx->trid);
472 	if (rc) {
473 		SPDK_ERRLOG("Failed to stop listening on address '%s'\n", ctx->trid.traddr);
474 	}
475 
476 	if (ctx->cb_fn) {
477 		ctx->cb_fn(ctx->cb_arg, rc);
478 	}
479 	free(ctx);
480 }
481 
482 static void nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i);
483 
484 static void
485 nvmf_stop_listen_disconnect_qpairs_msg(void *ctx)
486 {
487 	nvmf_stop_listen_disconnect_qpairs((struct spdk_io_channel_iter *)ctx);
488 }
489 
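/* Disconnect all qpairs on this poll group that were accepted on the trid being
 * removed (optionally restricted to one subsystem).  Because the disconnects
 * complete asynchronously, the function re-sends itself to the same thread until
 * no matching qpair remains, and only then lets the for_each_channel iteration
 * continue. */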
490 static void
491 nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i)
492 {
493 	struct nvmf_stop_listen_ctx *ctx;
494 	struct spdk_nvmf_poll_group *group;
495 	struct spdk_io_channel *ch;
496 	struct spdk_nvmf_qpair *qpair, *tmp_qpair;
497 	struct spdk_nvme_transport_id tmp_trid;
498 	bool qpair_found = false;
499 
500 	ctx = spdk_io_channel_iter_get_ctx(i);
501 	ch = spdk_io_channel_iter_get_channel(i);
502 	group = spdk_io_channel_get_ctx(ch);
503 
504 	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, tmp_qpair) {
505 		if (spdk_nvmf_qpair_get_listen_trid(qpair, &tmp_trid)) {
506 			continue;
507 		}
508 
509 		/* Skip qpairs that don't match the listen trid and subsystem pointer.  If
510 		 * ctx->subsystem is NULL, disconnect every qpair that matches the listen
511 		 * trid, regardless of subsystem. */
512 		if (!spdk_nvme_transport_id_compare(&ctx->trid, &tmp_trid)) {
513 			if (ctx->subsystem == NULL ||
514 			    (qpair->ctrlr != NULL && ctx->subsystem == qpair->ctrlr->subsys)) {
515 				spdk_nvmf_qpair_disconnect(qpair);
516 				qpair_found = true;
517 			}
518 		}
519 	}
520 	if (qpair_found) {
521 		spdk_thread_send_msg(spdk_get_thread(), nvmf_stop_listen_disconnect_qpairs_msg, i);
522 		return;
523 	}
524 
525 	spdk_for_each_channel_continue(i, 0);
526 }
527 
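/* Asynchronous stop_listen: first walk every poll group and disconnect the
 * qpairs that were accepted on trid (limited to one subsystem when one is
 * given), then drop the listener itself in nvmf_stop_listen_fini() and report
 * the result through cb_fn. */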
528 int
529 spdk_nvmf_transport_stop_listen_async(struct spdk_nvmf_transport *transport,
530 				      const struct spdk_nvme_transport_id *trid,
531 				      struct spdk_nvmf_subsystem *subsystem,
532 				      spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn,
533 				      void *cb_arg)
534 {
535 	struct nvmf_stop_listen_ctx *ctx;
536 
537 	if (trid->subnqn[0] != '\0') {
538 		SPDK_ERRLOG("subnqn should be empty, use subsystem pointer instead\n");
539 		return -EINVAL;
540 	}
541 
542 	ctx = calloc(1, sizeof(struct nvmf_stop_listen_ctx));
543 	if (ctx == NULL) {
544 		return -ENOMEM;
545 	}
546 
547 	ctx->trid = *trid;
548 	ctx->subsystem = subsystem;
549 	ctx->transport = transport;
550 	ctx->cb_fn = cb_fn;
551 	ctx->cb_arg = cb_arg;
552 
553 	spdk_for_each_channel(transport->tgt, nvmf_stop_listen_disconnect_qpairs, ctx,
554 			      nvmf_stop_listen_fini);
555 
556 	return 0;
557 }
558 
559 void
560 nvmf_transport_listener_discover(struct spdk_nvmf_transport *transport,
561 				 struct spdk_nvme_transport_id *trid,
562 				 struct spdk_nvmf_discovery_log_page_entry *entry)
563 {
564 	transport->ops->listener_discover(transport, trid, entry);
565 }
566 
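/* Create the transport-specific poll group and, unless the transport runs
 * without shared buffers, attach an iobuf channel whose cache is sized from
 * buf_cache_size (falling back to an uncached channel if the reservation
 * fails). */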
567 struct spdk_nvmf_transport_poll_group *
568 nvmf_transport_poll_group_create(struct spdk_nvmf_transport *transport,
569 				 struct spdk_nvmf_poll_group *group)
570 {
571 	struct spdk_nvmf_transport_poll_group *tgroup;
572 	struct spdk_iobuf_opts opts_iobuf = {};
573 	uint32_t buf_cache_size, small_cache_size, large_cache_size;
574 	int rc;
575 
576 	pthread_mutex_lock(&transport->mutex);
577 	tgroup = transport->ops->poll_group_create(transport, group);
578 	pthread_mutex_unlock(&transport->mutex);
579 	if (!tgroup) {
580 		return NULL;
581 	}
582 	tgroup->transport = transport;
583 
584 	STAILQ_INIT(&tgroup->pending_buf_queue);
585 
586 	if (!nvmf_transport_use_iobuf(transport)) {
587 		/* We aren't going to allocate any shared buffers or cache, so just return now. */
588 		return tgroup;
589 	}
590 
591 	buf_cache_size = transport->opts.buf_cache_size;
592 
593 	/* buf_cache_size of UINT32_MAX means the value should be calculated dynamically
594 	 * based on the number of buffers in the shared pool and the number of poll groups
595 	 * that are sharing them.  We allocate 75% of the pool for the cache, and then
596 	 * divide that by number of poll groups to determine the buf_cache_size for this
597 	 * poll group.
598 	 */
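	/* For example (a sketch, not values taken from this code): with 4095 shared
	 * buffers and 4 poll groups, each poll group gets (4095 * 3 / 4) / 4 = 767
	 * cache entries, with integer division at every step. */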
599 	if (buf_cache_size == UINT32_MAX) {
600 		uint32_t num_shared_buffers = transport->opts.num_shared_buffers;
601 
602 		/* Theoretically the nvmf library can dynamically add poll groups to
603 		 * the target, after transports have already been created.  We aren't
604 		 * going to try to really handle this case efficiently, just do enough
605 		 * here to ensure we don't divide-by-zero.
606 		 */
607 		uint16_t num_poll_groups = group->tgt->num_poll_groups ? : spdk_env_get_core_count();
608 
609 		buf_cache_size = (num_shared_buffers * 3 / 4) / num_poll_groups;
610 	}
611 
612 	spdk_iobuf_get_opts(&opts_iobuf, sizeof(opts_iobuf));
613 	small_cache_size = buf_cache_size;
614 	if (transport->opts.io_unit_size <= opts_iobuf.small_bufsize) {
615 		large_cache_size = 0;
616 	} else {
617 		large_cache_size = buf_cache_size;
618 	}
619 
620 	tgroup->buf_cache = calloc(1, sizeof(*tgroup->buf_cache));
621 	if (!tgroup->buf_cache) {
622 		SPDK_ERRLOG("Unable to allocate an iobuf channel in the poll group.\n");
623 		goto err;
624 	}
625 
626 	rc = spdk_iobuf_channel_init(tgroup->buf_cache, transport->iobuf_name, small_cache_size,
627 				     large_cache_size);
628 	if (rc != 0) {
629 		SPDK_ERRLOG("Unable to reserve the full number of buffers for the poll group buffer cache.\n");
630 		rc = spdk_iobuf_channel_init(tgroup->buf_cache, transport->iobuf_name, 0, 0);
631 		if (rc != 0) {
632 			SPDK_ERRLOG("Unable to create an iobuf channel in the poll group.\n");
633 			goto err;
634 		}
635 	}
636 
637 	return tgroup;
638 err:
639 	transport->ops->poll_group_destroy(tgroup);
640 	return NULL;
641 }
642 
643 struct spdk_nvmf_transport_poll_group *
644 nvmf_transport_get_optimal_poll_group(struct spdk_nvmf_transport *transport,
645 				      struct spdk_nvmf_qpair *qpair)
646 {
647 	struct spdk_nvmf_transport_poll_group *tgroup;
648 
649 	if (transport->ops->get_optimal_poll_group) {
650 		pthread_mutex_lock(&transport->mutex);
651 		tgroup = transport->ops->get_optimal_poll_group(qpair);
652 		pthread_mutex_unlock(&transport->mutex);
653 
654 		return tgroup;
655 	} else {
656 		return NULL;
657 	}
658 }
659 
660 void
661 nvmf_transport_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
662 {
663 	struct spdk_nvmf_transport *transport;
664 	struct spdk_iobuf_channel *ch = NULL;
665 
666 	transport = group->transport;
667 
668 	if (!STAILQ_EMPTY(&group->pending_buf_queue)) {
669 		SPDK_ERRLOG("Pending I/O list wasn't empty on poll group destruction\n");
670 	}
671 
672 	if (nvmf_transport_use_iobuf(transport)) {
673 		/* The call to poll_group_destroy not only frees the group memory, but also
674 		 * releases any remaining buffers.  Cache the channel pointer so we can still
675 		 * release the iobuf resources after the group has been freed. */
676 		ch = group->buf_cache;
677 	}
678 
679 	pthread_mutex_lock(&transport->mutex);
680 	transport->ops->poll_group_destroy(group);
681 	pthread_mutex_unlock(&transport->mutex);
682 
683 	if (nvmf_transport_use_iobuf(transport)) {
684 		spdk_iobuf_channel_fini(ch);
685 		free(ch);
686 	}
687 }
688 
689 int
690 nvmf_transport_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
691 			      struct spdk_nvmf_qpair *qpair)
692 {
693 	if (qpair->transport) {
694 		assert(qpair->transport == group->transport);
695 		if (qpair->transport != group->transport) {
696 			return -1;
697 		}
698 	} else {
699 		qpair->transport = group->transport;
700 	}
701 
702 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_add, qpair, qpair->qid,
703 			   spdk_thread_get_id(group->group->thread));
704 
705 	return group->transport->ops->poll_group_add(group, qpair);
706 }
707 
708 int
709 nvmf_transport_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
710 				 struct spdk_nvmf_qpair *qpair)
711 {
712 	int rc = ENOTSUP;
713 
714 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_remove, qpair, qpair->qid,
715 			   spdk_thread_get_id(group->group->thread));
716 
717 	assert(qpair->transport == group->transport);
718 	if (group->transport->ops->poll_group_remove) {
719 		rc = group->transport->ops->poll_group_remove(group, qpair);
720 	}
721 
722 	return rc;
723 }
724 
725 int
726 nvmf_transport_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
727 {
728 	return group->transport->ops->poll_group_poll(group);
729 }
730 
731 int
732 nvmf_transport_req_free(struct spdk_nvmf_request *req)
733 {
734 	return req->qpair->transport->ops->req_free(req);
735 }
736 
737 int
738 nvmf_transport_req_complete(struct spdk_nvmf_request *req)
739 {
740 	return req->qpair->transport->ops->req_complete(req);
741 }
742 
743 void
744 nvmf_transport_qpair_fini(struct spdk_nvmf_qpair *qpair,
745 			  spdk_nvmf_transport_qpair_fini_cb cb_fn,
746 			  void *cb_arg)
747 {
748 	SPDK_DTRACE_PROBE1(nvmf_transport_qpair_fini, qpair);
749 
750 	qpair->transport->ops->qpair_fini(qpair, cb_fn, cb_arg);
751 }
752 
753 int
754 nvmf_transport_qpair_get_peer_trid(struct spdk_nvmf_qpair *qpair,
755 				   struct spdk_nvme_transport_id *trid)
756 {
757 	return qpair->transport->ops->qpair_get_peer_trid(qpair, trid);
758 }
759 
760 int
761 nvmf_transport_qpair_get_local_trid(struct spdk_nvmf_qpair *qpair,
762 				    struct spdk_nvme_transport_id *trid)
763 {
764 	return qpair->transport->ops->qpair_get_local_trid(qpair, trid);
765 }
766 
767 int
768 nvmf_transport_qpair_get_listen_trid(struct spdk_nvmf_qpair *qpair,
769 				     struct spdk_nvme_transport_id *trid)
770 {
771 	return qpair->transport->ops->qpair_get_listen_trid(qpair, trid);
772 }
773 
774 void
775 nvmf_transport_qpair_abort_request(struct spdk_nvmf_qpair *qpair,
776 				   struct spdk_nvmf_request *req)
777 {
778 	if (qpair->transport->ops->qpair_abort_request) {
779 		qpair->transport->ops->qpair_abort_request(qpair, req);
780 	}
781 }
782 
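/* Fill opts with the transport's default options.  A typical caller flow (a
 * sketch, error handling omitted; create_done_cb and cb_arg are caller-provided
 * names, not part of this file):
 *
 *   struct spdk_nvmf_transport_opts opts = {};
 *
 *   if (spdk_nvmf_transport_opts_init("TCP", &opts, sizeof(opts))) {
 *       opts.max_queue_depth = 256;   // override selected defaults
 *       spdk_nvmf_transport_create_async("TCP", &opts, create_done_cb, cb_arg);
 *   }
 *
 * Passing sizeof(opts) as opts_size lets callers built against an older, smaller
 * opts layout keep working. */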
783 bool
784 spdk_nvmf_transport_opts_init(const char *transport_name,
785 			      struct spdk_nvmf_transport_opts *opts, size_t opts_size)
786 {
787 	const struct spdk_nvmf_transport_ops *ops;
788 	struct spdk_nvmf_transport_opts opts_local = {};
789 
790 	ops = nvmf_get_transport_ops(transport_name);
791 	if (!ops) {
792 		SPDK_ERRLOG("Transport type %s unavailable.\n", transport_name);
793 		return false;
794 	}
795 
796 	if (!opts) {
797 		SPDK_ERRLOG("opts should not be NULL\n");
798 		return false;
799 	}
800 
801 	if (!opts_size) {
802 		SPDK_ERRLOG("opts_size should not be zero\n");
803 		return false;
804 	}
805 
806 	opts_local.association_timeout = NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS;
807 	opts_local.acceptor_poll_rate = SPDK_NVMF_DEFAULT_ACCEPT_POLL_RATE_US;
808 	opts_local.disable_command_passthru = false;
809 	ops->opts_init(&opts_local);
810 
811 	nvmf_transport_opts_copy(opts, &opts_local, opts_size);
812 
813 	return true;
814 }
815 
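/* Return every iovec of a pool-backed request to the poll group's iobuf channel
 * and reset the request's iovec bookkeeping. */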
816 void
817 spdk_nvmf_request_free_buffers(struct spdk_nvmf_request *req,
818 			       struct spdk_nvmf_transport_poll_group *group,
819 			       struct spdk_nvmf_transport *transport)
820 {
821 	uint32_t i;
822 
823 	for (i = 0; i < req->iovcnt; i++) {
824 		spdk_iobuf_put(group->buf_cache, req->iov[i].iov_base, req->iov[i].iov_len);
825 		req->iov[i].iov_base = NULL;
826 		req->iov[i].iov_len = 0;
827 	}
828 	req->iovcnt = 0;
829 	req->data_from_pool = false;
830 }
831 
832 typedef int (*set_buffer_callback)(struct spdk_nvmf_request *req, void *buf,
833 				   uint32_t length, uint32_t io_unit_size);
834 static int
835 nvmf_request_set_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
836 			uint32_t io_unit_size)
837 {
838 	req->iov[req->iovcnt].iov_base = buf;
839 	req->iov[req->iovcnt].iov_len  = spdk_min(length, io_unit_size);
840 	length -= req->iov[req->iovcnt].iov_len;
841 	req->iovcnt++;
842 
843 	return length;
844 }
845 
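/* Split an I/O of the given length into io_unit_size iobuf buffers, handing each
 * one to cb_func.  For example (a sketch): length = 12288 with io_unit_size =
 * 8192 needs SPDK_CEIL_DIV(12288, 8192) = 2 buffers, filled as one 8192-byte
 * iovec followed by one 4096-byte iovec. */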
846 static int
847 nvmf_request_get_buffers(struct spdk_nvmf_request *req,
848 			 struct spdk_nvmf_transport_poll_group *group,
849 			 struct spdk_nvmf_transport *transport,
850 			 uint32_t length, uint32_t io_unit_size,
851 			 set_buffer_callback cb_func)
852 {
853 	uint32_t num_buffers;
854 	uint32_t i = 0;
855 	void *buffer;
856 
857 	/* If the number of buffers is too large, then we know the I/O is larger than allowed.
858 	 *  Fail it.
859 	 */
860 	num_buffers = SPDK_CEIL_DIV(length, io_unit_size);
861 	if (spdk_unlikely(num_buffers > NVMF_REQ_MAX_BUFFERS)) {
862 		return -EINVAL;
863 	}
864 
865 	while (i < num_buffers) {
866 		buffer = spdk_iobuf_get(group->buf_cache, spdk_min(io_unit_size, length), NULL, NULL);
867 		if (spdk_unlikely(buffer == NULL)) {
868 			return -ENOMEM;
869 		}
870 		length = cb_func(req, buffer, length, io_unit_size);
871 		i++;
872 	}
873 
874 	assert(length == 0);
875 
876 	return 0;
877 }
878 
879 int
880 spdk_nvmf_request_get_buffers(struct spdk_nvmf_request *req,
881 			      struct spdk_nvmf_transport_poll_group *group,
882 			      struct spdk_nvmf_transport *transport,
883 			      uint32_t length)
884 {
885 	int rc;
886 
887 	assert(nvmf_transport_use_iobuf(transport));
888 
889 	req->iovcnt = 0;
890 	rc = nvmf_request_get_buffers(req, group, transport, length,
891 				      transport->opts.io_unit_size,
892 				      nvmf_request_set_buffer);
893 	if (spdk_likely(rc == 0)) {
894 		req->data_from_pool = true;
895 	} else if (rc == -ENOMEM) {
896 		spdk_nvmf_request_free_buffers(req, group, transport);
897 	}
898 
899 	return rc;
900 }
901 
902 static int
903 nvmf_request_set_stripped_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
904 				 uint32_t io_unit_size)
905 {
906 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
907 
908 	data->iov[data->iovcnt].iov_base = buf;
909 	data->iov[data->iovcnt].iov_len  = spdk_min(length, io_unit_size);
910 	length -= data->iov[data->iovcnt].iov_len;
911 	data->iovcnt++;
912 
913 	return length;
914 }
915 
916 void
917 nvmf_request_free_stripped_buffers(struct spdk_nvmf_request *req,
918 				   struct spdk_nvmf_transport_poll_group *group,
919 				   struct spdk_nvmf_transport *transport)
920 {
921 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
922 	uint32_t i;
923 
924 	for (i = 0; i < data->iovcnt; i++) {
925 		spdk_iobuf_put(group->buf_cache, data->iov[i].iov_base, data->iov[i].iov_len);
926 	}
927 	free(data);
928 	req->stripped_data = NULL;
929 }
930 
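/* Allocate a second set of buffers sized for the data portion of DIF-protected
 * blocks.  The effective io_unit_size is scaled from the extended block size to
 * the data block size; for example (a sketch): io_unit_size = 8192 with
 * block_size = 520 and md_size = 8 gives 8192 / 520 * 512 = 7680 bytes. */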
931 int
932 nvmf_request_get_stripped_buffers(struct spdk_nvmf_request *req,
933 				  struct spdk_nvmf_transport_poll_group *group,
934 				  struct spdk_nvmf_transport *transport,
935 				  uint32_t length)
936 {
937 	uint32_t block_size = req->dif.dif_ctx.block_size;
938 	uint32_t data_block_size = block_size - req->dif.dif_ctx.md_size;
939 	uint32_t io_unit_size = transport->opts.io_unit_size / block_size * data_block_size;
940 	struct spdk_nvmf_stripped_data *data;
941 	uint32_t i;
942 	int rc;
943 
944 	/* Data blocks must be block aligned */
945 	for (i = 0; i < req->iovcnt; i++) {
946 		if (req->iov[i].iov_len % block_size) {
947 			return -EINVAL;
948 		}
949 	}
950 
951 	data = calloc(1, sizeof(*data));
952 	if (data == NULL) {
953 		SPDK_ERRLOG("Unable to allocate memory for stripped_data.\n");
954 		return -ENOMEM;
955 	}
956 	req->stripped_data = data;
957 	req->stripped_data->iovcnt = 0;
958 
959 	rc = nvmf_request_get_buffers(req, group, transport, length, io_unit_size,
960 				      nvmf_request_set_stripped_buffer);
961 	if (rc == -ENOMEM) {
962 		nvmf_request_free_stripped_buffers(req, group, transport);
963 		return rc;
964 	}
965 	return rc;
966 }
967