xref: /spdk/lib/nvmf/transport.c (revision 6ecf0442ef0837776cebebdd988431f2420be303)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2018-2019, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "nvmf_internal.h"
10 #include "transport.h"
11 
12 #include "spdk/config.h"
13 #include "spdk/log.h"
14 #include "spdk/nvmf.h"
15 #include "spdk/nvmf_transport.h"
16 #include "spdk/queue.h"
17 #include "spdk/util.h"
18 #include "spdk_internal/usdt.h"
19 
20 #define NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS 120000
21 
22 struct nvmf_transport_ops_list_element {
23 	struct spdk_nvmf_transport_ops			ops;
24 	TAILQ_ENTRY(nvmf_transport_ops_list_element)	link;
25 };
26 
27 TAILQ_HEAD(nvmf_transport_ops_list, nvmf_transport_ops_list_element)
28 g_spdk_nvmf_transport_ops = TAILQ_HEAD_INITIALIZER(g_spdk_nvmf_transport_ops);
29 
30 static inline const struct spdk_nvmf_transport_ops *
31 nvmf_get_transport_ops(const char *transport_name)
32 {
33 	struct nvmf_transport_ops_list_element *ops;
34 	TAILQ_FOREACH(ops, &g_spdk_nvmf_transport_ops, link) {
35 		if (strcasecmp(transport_name, ops->ops.name) == 0) {
36 			return &ops->ops;
37 		}
38 	}
39 	return NULL;
40 }
41 
42 void
43 spdk_nvmf_transport_register(const struct spdk_nvmf_transport_ops *ops)
44 {
45 	struct nvmf_transport_ops_list_element *new_ops;
46 
47 	if (nvmf_get_transport_ops(ops->name) != NULL) {
48 		SPDK_ERRLOG("Double registering nvmf transport type %s.\n", ops->name);
49 		assert(false);
50 		return;
51 	}
52 
53 	new_ops = calloc(1, sizeof(*new_ops));
54 	if (new_ops == NULL) {
55 		SPDK_ERRLOG("Unable to allocate memory to register new transport type %s.\n", ops->name);
56 		assert(false);
57 		return;
58 	}
59 
60 	new_ops->ops = *ops;
61 
62 	TAILQ_INSERT_TAIL(&g_spdk_nvmf_transport_ops, new_ops, link);
63 }
64 
65 const struct spdk_nvmf_transport_opts *
66 spdk_nvmf_get_transport_opts(struct spdk_nvmf_transport *transport)
67 {
68 	return &transport->opts;
69 }
70 
71 void
72 nvmf_transport_dump_opts(struct spdk_nvmf_transport *transport, struct spdk_json_write_ctx *w,
73 			 bool named)
74 {
75 	const struct spdk_nvmf_transport_opts *opts = spdk_nvmf_get_transport_opts(transport);
76 
77 	named ? spdk_json_write_named_object_begin(w, "params") : spdk_json_write_object_begin(w);
78 
79 	spdk_json_write_named_string(w, "trtype", spdk_nvmf_get_transport_name(transport));
80 	spdk_json_write_named_uint32(w, "max_queue_depth", opts->max_queue_depth);
81 	spdk_json_write_named_uint32(w, "max_io_qpairs_per_ctrlr", opts->max_qpairs_per_ctrlr - 1);
82 	spdk_json_write_named_uint32(w, "in_capsule_data_size", opts->in_capsule_data_size);
83 	spdk_json_write_named_uint32(w, "max_io_size", opts->max_io_size);
84 	spdk_json_write_named_uint32(w, "io_unit_size", opts->io_unit_size);
85 	spdk_json_write_named_uint32(w, "max_aq_depth", opts->max_aq_depth);
86 	spdk_json_write_named_uint32(w, "num_shared_buffers", opts->num_shared_buffers);
87 	spdk_json_write_named_uint32(w, "buf_cache_size", opts->buf_cache_size);
88 	spdk_json_write_named_bool(w, "dif_insert_or_strip", opts->dif_insert_or_strip);
89 	spdk_json_write_named_bool(w, "zcopy", opts->zcopy);
90 
91 	if (transport->ops->dump_opts) {
92 		transport->ops->dump_opts(transport, w);
93 	}
94 
95 	spdk_json_write_named_uint32(w, "abort_timeout_sec", opts->abort_timeout_sec);
96 	spdk_json_write_object_end(w);
97 }
98 
99 void
100 nvmf_transport_listen_dump_trid(const struct spdk_nvme_transport_id *trid,
101 				struct spdk_json_write_ctx *w)
102 {
103 	const char *adrfam = spdk_nvme_transport_id_adrfam_str(trid->adrfam);
104 
105 	spdk_json_write_named_string(w, "trtype", trid->trstring);
106 	spdk_json_write_named_string(w, "adrfam", adrfam ? adrfam : "unknown");
107 	spdk_json_write_named_string(w, "traddr", trid->traddr);
108 	spdk_json_write_named_string(w, "trsvcid", trid->trsvcid);
109 }
110 
111 spdk_nvme_transport_type_t
112 spdk_nvmf_get_transport_type(struct spdk_nvmf_transport *transport)
113 {
114 	return transport->ops->type;
115 }
116 
117 const char *
118 spdk_nvmf_get_transport_name(struct spdk_nvmf_transport *transport)
119 {
120 	return transport->ops->name;
121 }
122 
123 static void
124 nvmf_transport_opts_copy(struct spdk_nvmf_transport_opts *opts,
125 			 struct spdk_nvmf_transport_opts *opts_src,
126 			 size_t opts_size)
127 {
128 	assert(opts);
129 	assert(opts_src);
130 
131 	opts->opts_size = opts_size;
132 
133 #define SET_FIELD(field) \
134 	if (offsetof(struct spdk_nvmf_transport_opts, field) + sizeof(opts->field) <= opts_size) { \
135 		opts->field = opts_src->field; \
136 	} \
137 
138 	SET_FIELD(max_queue_depth);
139 	SET_FIELD(max_qpairs_per_ctrlr);
140 	SET_FIELD(in_capsule_data_size);
141 	SET_FIELD(max_io_size);
142 	SET_FIELD(io_unit_size);
143 	SET_FIELD(max_aq_depth);
144 	SET_FIELD(buf_cache_size);
145 	SET_FIELD(num_shared_buffers);
146 	SET_FIELD(dif_insert_or_strip);
147 	SET_FIELD(abort_timeout_sec);
148 	SET_FIELD(association_timeout);
149 	SET_FIELD(transport_specific);
150 	SET_FIELD(acceptor_poll_rate);
151 	SET_FIELD(zcopy);
152 
153 	/* Do not remove this statement, you should always update this statement when you adding a new field,
154 	 * and do not forget to add the SET_FIELD statement for your added field. */
155 	SPDK_STATIC_ASSERT(sizeof(struct spdk_nvmf_transport_opts) == 64, "Incorrect size");
156 
157 #undef SET_FIELD
158 #undef FILED_CHECK
159 }
160 
161 struct nvmf_transport_create_ctx {
162 	const struct spdk_nvmf_transport_ops *ops;
163 	struct spdk_nvmf_transport_opts opts;
164 	void *cb_arg;
165 	spdk_nvmf_transport_create_done_cb cb_fn;
166 };
167 
168 static void
169 nvmf_transport_create_async_done(void *cb_arg, struct spdk_nvmf_transport *transport)
170 {
171 	struct nvmf_transport_create_ctx *ctx = cb_arg;
172 	int chars_written;
173 
174 	if (!transport) {
175 		SPDK_ERRLOG("Failed to create transport.\n");
176 		goto err;
177 	}
178 
179 	pthread_mutex_init(&transport->mutex, NULL);
180 	TAILQ_INIT(&transport->listeners);
181 	transport->ops = ctx->ops;
182 	transport->opts = ctx->opts;
183 	chars_written = snprintf(transport->iobuf_name, MAX_MEMPOOL_NAME_LENGTH, "%s_%s_%s", "spdk_nvmf",
184 				 transport->ops->name, "data");
185 	if (chars_written < 0) {
186 		SPDK_ERRLOG("Unable to generate transport data buffer pool name.\n");
187 		goto err;
188 	}
189 
190 	if (ctx->opts.num_shared_buffers) {
191 		spdk_iobuf_register_module(transport->iobuf_name);
192 	}
193 
194 	ctx->cb_fn(ctx->cb_arg, transport);
195 	free(ctx);
196 	return;
197 
198 err:
199 	if (transport) {
200 		transport->ops->destroy(transport, NULL, NULL);
201 	}
202 
203 	ctx->cb_fn(ctx->cb_arg, NULL);
204 	free(ctx);
205 }
206 
207 static void
208 _nvmf_transport_create_done(void *ctx)
209 {
210 	struct nvmf_transport_create_ctx *_ctx = (struct nvmf_transport_create_ctx *)ctx;
211 
212 	nvmf_transport_create_async_done(_ctx, _ctx->ops->create(&_ctx->opts));
213 }
214 
215 static int
216 nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
217 		      spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg, bool sync)
218 {
219 	struct nvmf_transport_create_ctx *ctx;
220 	struct spdk_iobuf_opts opts_iobuf = {};
221 	int rc;
222 	uint64_t count;
223 
224 	ctx = calloc(1, sizeof(*ctx));
225 	if (!ctx) {
226 		return -ENOMEM;
227 	}
228 
229 	if (!opts) {
230 		SPDK_ERRLOG("opts should not be NULL\n");
231 		goto err;
232 	}
233 
234 	if (!opts->opts_size) {
235 		SPDK_ERRLOG("The opts_size in opts structure should not be zero\n");
236 		goto err;
237 	}
238 
239 	ctx->ops = nvmf_get_transport_ops(transport_name);
240 	if (!ctx->ops) {
241 		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
242 		goto err;
243 	}
244 
245 	nvmf_transport_opts_copy(&ctx->opts, opts, opts->opts_size);
246 	if (ctx->opts.max_io_size != 0 && (!spdk_u32_is_pow2(ctx->opts.max_io_size) ||
247 					   ctx->opts.max_io_size < 8192)) {
248 		SPDK_ERRLOG("max_io_size %u must be a power of 2 and be greater than or equal 8KB\n",
249 			    ctx->opts.max_io_size);
250 		goto err;
251 	}
252 
253 	if (ctx->opts.max_aq_depth < SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE) {
254 		SPDK_ERRLOG("max_aq_depth %u is less than minimum defined by NVMf spec, use min value\n",
255 			    ctx->opts.max_aq_depth);
256 		ctx->opts.max_aq_depth = SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE;
257 	}
258 
259 	spdk_iobuf_get_opts(&opts_iobuf);
260 	if (ctx->opts.io_unit_size > opts_iobuf.large_bufsize) {
261 		SPDK_ERRLOG("io_unit_size %u is larger than iobuf pool large buffer size %d\n",
262 			    ctx->opts.io_unit_size, opts_iobuf.large_bufsize);
263 		goto err;
264 	}
265 
266 	if (ctx->opts.io_unit_size <= opts_iobuf.small_bufsize) {
267 		/* We'll be using the small buffer pool only */
268 		count = opts_iobuf.small_pool_count;
269 	} else {
270 		count = spdk_min(opts_iobuf.small_pool_count, opts_iobuf.large_pool_count);
271 	}
272 
273 	if (ctx->opts.num_shared_buffers > count) {
274 		SPDK_WARNLOG("The num_shared_buffers value (%u) is larger than the available iobuf"
275 			     " pool size (%lu). Please increase the iobuf pool sizes.\n",
276 			     ctx->opts.num_shared_buffers, count);
277 	}
278 
279 	ctx->cb_fn = cb_fn;
280 	ctx->cb_arg = cb_arg;
281 
282 	/* Prioritize sync create operation. */
283 	if (ctx->ops->create) {
284 		if (sync) {
285 			_nvmf_transport_create_done(ctx);
286 			return 0;
287 		}
288 
289 		rc = spdk_thread_send_msg(spdk_get_thread(), _nvmf_transport_create_done, ctx);
290 		if (rc) {
291 			goto err;
292 		}
293 
294 		return 0;
295 	}
296 
297 	assert(ctx->ops->create_async);
298 	rc = ctx->ops->create_async(&ctx->opts, nvmf_transport_create_async_done, ctx);
299 	if (rc) {
300 		SPDK_ERRLOG("Unable to create new transport of type %s\n", transport_name);
301 		goto err;
302 	}
303 
304 	return 0;
305 err:
306 	free(ctx);
307 	return -1;
308 }
309 
310 int
311 spdk_nvmf_transport_create_async(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
312 				 spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg)
313 {
314 	return nvmf_transport_create(transport_name, opts, cb_fn, cb_arg, false);
315 }
316 
/*
 * Completion callback for the synchronous create path: cb_arg points at the caller's
 * transport pointer, which receives the result (NULL on failure).
 */
static void
nvmf_transport_create_sync_done(void *cb_arg, struct spdk_nvmf_transport *transport)
{
	struct spdk_nvmf_transport **result = cb_arg;

	*result = transport;
}
324 
325 struct spdk_nvmf_transport *
326 spdk_nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts)
327 {
328 	struct spdk_nvmf_transport *transport = NULL;
329 
330 	/* Current implementation supports synchronous version of create operation only. */
331 	assert(nvmf_get_transport_ops(transport_name) && nvmf_get_transport_ops(transport_name)->create);
332 
333 	nvmf_transport_create(transport_name, opts, nvmf_transport_create_sync_done, &transport, true);
334 	return transport;
335 }
336 
337 struct spdk_nvmf_transport *
338 spdk_nvmf_transport_get_first(struct spdk_nvmf_tgt *tgt)
339 {
340 	return TAILQ_FIRST(&tgt->transports);
341 }
342 
343 struct spdk_nvmf_transport *
344 spdk_nvmf_transport_get_next(struct spdk_nvmf_transport *transport)
345 {
346 	return TAILQ_NEXT(transport, link);
347 }
348 
349 int
350 spdk_nvmf_transport_destroy(struct spdk_nvmf_transport *transport,
351 			    spdk_nvmf_transport_destroy_done_cb cb_fn, void *cb_arg)
352 {
353 	struct spdk_nvmf_listener *listener, *listener_tmp;
354 
355 	TAILQ_FOREACH_SAFE(listener, &transport->listeners, link, listener_tmp) {
356 		TAILQ_REMOVE(&transport->listeners, listener, link);
357 		transport->ops->stop_listen(transport, &listener->trid);
358 		free(listener);
359 	}
360 
361 	if (transport->opts.num_shared_buffers) {
362 		spdk_iobuf_unregister_module(transport->iobuf_name);
363 	}
364 
365 	pthread_mutex_destroy(&transport->mutex);
366 	return transport->ops->destroy(transport, cb_fn, cb_arg);
367 }
368 
369 struct spdk_nvmf_listener *
370 nvmf_transport_find_listener(struct spdk_nvmf_transport *transport,
371 			     const struct spdk_nvme_transport_id *trid)
372 {
373 	struct spdk_nvmf_listener *listener;
374 
375 	TAILQ_FOREACH(listener, &transport->listeners, link) {
376 		if (spdk_nvme_transport_id_compare(&listener->trid, trid) == 0) {
377 			return listener;
378 		}
379 	}
380 
381 	return NULL;
382 }
383 
384 int
385 spdk_nvmf_transport_listen(struct spdk_nvmf_transport *transport,
386 			   const struct spdk_nvme_transport_id *trid, struct spdk_nvmf_listen_opts *opts)
387 {
388 	struct spdk_nvmf_listener *listener;
389 	int rc;
390 
391 	listener = nvmf_transport_find_listener(transport, trid);
392 	if (!listener) {
393 		listener = calloc(1, sizeof(*listener));
394 		if (!listener) {
395 			return -ENOMEM;
396 		}
397 
398 		listener->ref = 1;
399 		listener->trid = *trid;
400 		TAILQ_INSERT_TAIL(&transport->listeners, listener, link);
401 		pthread_mutex_lock(&transport->mutex);
402 		rc = transport->ops->listen(transport, &listener->trid, opts);
403 		pthread_mutex_unlock(&transport->mutex);
404 		if (rc != 0) {
405 			TAILQ_REMOVE(&transport->listeners, listener, link);
406 			free(listener);
407 		}
408 		return rc;
409 	}
410 
411 	++listener->ref;
412 
413 	return 0;
414 }
415 
416 int
417 spdk_nvmf_transport_stop_listen(struct spdk_nvmf_transport *transport,
418 				const struct spdk_nvme_transport_id *trid)
419 {
420 	struct spdk_nvmf_listener *listener;
421 
422 	listener = nvmf_transport_find_listener(transport, trid);
423 	if (!listener) {
424 		return -ENOENT;
425 	}
426 
427 	if (--listener->ref == 0) {
428 		TAILQ_REMOVE(&transport->listeners, listener, link);
429 		pthread_mutex_lock(&transport->mutex);
430 		transport->ops->stop_listen(transport, trid);
431 		pthread_mutex_unlock(&transport->mutex);
432 		free(listener);
433 	}
434 
435 	return 0;
436 }
437 
438 struct nvmf_stop_listen_ctx {
439 	struct spdk_nvmf_transport *transport;
440 	struct spdk_nvme_transport_id trid;
441 	struct spdk_nvmf_subsystem *subsystem;
442 	spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn;
443 	void *cb_arg;
444 };
445 
446 static void
447 nvmf_stop_listen_fini(struct spdk_io_channel_iter *i, int status)
448 {
449 	struct nvmf_stop_listen_ctx *ctx;
450 	struct spdk_nvmf_transport *transport;
451 	int rc = status;
452 
453 	ctx = spdk_io_channel_iter_get_ctx(i);
454 	transport = ctx->transport;
455 	assert(transport != NULL);
456 
457 	rc = spdk_nvmf_transport_stop_listen(transport, &ctx->trid);
458 	if (rc) {
459 		SPDK_ERRLOG("Failed to stop listening on address '%s'\n", ctx->trid.traddr);
460 	}
461 
462 	if (ctx->cb_fn) {
463 		ctx->cb_fn(ctx->cb_arg, rc);
464 	}
465 	free(ctx);
466 }
467 
static void nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i);

/*
 * Thread-message adapter: ctx is the channel iterator, and the disconnect pass is
 * simply run again for it.
 */
static void
nvmf_stop_listen_disconnect_qpairs_msg(void *ctx)
{
	struct spdk_io_channel_iter *iter = ctx;

	nvmf_stop_listen_disconnect_qpairs(iter);
}
475 
476 static void
477 nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i)
478 {
479 	struct nvmf_stop_listen_ctx *ctx;
480 	struct spdk_nvmf_poll_group *group;
481 	struct spdk_io_channel *ch;
482 	struct spdk_nvmf_qpair *qpair, *tmp_qpair;
483 	struct spdk_nvme_transport_id tmp_trid;
484 	bool qpair_found = false;
485 
486 	ctx = spdk_io_channel_iter_get_ctx(i);
487 	ch = spdk_io_channel_iter_get_channel(i);
488 	group = spdk_io_channel_get_ctx(ch);
489 
490 	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, tmp_qpair) {
491 		if (spdk_nvmf_qpair_get_listen_trid(qpair, &tmp_trid)) {
492 			continue;
493 		}
494 
495 		/* Skip qpairs that don't match the listen trid and subsystem pointer.  If
496 		 * the ctx->subsystem is NULL, it means disconnect all qpairs that match
497 		 * the listen trid. */
498 		if (!spdk_nvme_transport_id_compare(&ctx->trid, &tmp_trid)) {
499 			if (ctx->subsystem == NULL ||
500 			    (qpair->ctrlr != NULL && ctx->subsystem == qpair->ctrlr->subsys)) {
501 				spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
502 				qpair_found = true;
503 			}
504 		}
505 	}
506 	if (qpair_found) {
507 		spdk_thread_send_msg(spdk_get_thread(), nvmf_stop_listen_disconnect_qpairs_msg, i);
508 		return;
509 	}
510 
511 	spdk_for_each_channel_continue(i, 0);
512 }
513 
514 int
515 spdk_nvmf_transport_stop_listen_async(struct spdk_nvmf_transport *transport,
516 				      const struct spdk_nvme_transport_id *trid,
517 				      struct spdk_nvmf_subsystem *subsystem,
518 				      spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn,
519 				      void *cb_arg)
520 {
521 	struct nvmf_stop_listen_ctx *ctx;
522 
523 	if (trid->subnqn[0] != '\0') {
524 		SPDK_ERRLOG("subnqn should be empty, use subsystem pointer instead\n");
525 		return -EINVAL;
526 	}
527 
528 	ctx = calloc(1, sizeof(struct nvmf_stop_listen_ctx));
529 	if (ctx == NULL) {
530 		return -ENOMEM;
531 	}
532 
533 	ctx->trid = *trid;
534 	ctx->subsystem = subsystem;
535 	ctx->transport = transport;
536 	ctx->cb_fn = cb_fn;
537 	ctx->cb_arg = cb_arg;
538 
539 	spdk_for_each_channel(transport->tgt, nvmf_stop_listen_disconnect_qpairs, ctx,
540 			      nvmf_stop_listen_fini);
541 
542 	return 0;
543 }
544 
545 void
546 nvmf_transport_listener_discover(struct spdk_nvmf_transport *transport,
547 				 struct spdk_nvme_transport_id *trid,
548 				 struct spdk_nvmf_discovery_log_page_entry *entry)
549 {
550 	transport->ops->listener_discover(transport, trid, entry);
551 }
552 
553 struct spdk_nvmf_transport_poll_group *
554 nvmf_transport_poll_group_create(struct spdk_nvmf_transport *transport,
555 				 struct spdk_nvmf_poll_group *group)
556 {
557 	struct spdk_nvmf_transport_poll_group *tgroup;
558 	struct spdk_iobuf_opts opts_iobuf = {};
559 	uint32_t buf_cache_size, small_cache_size, large_cache_size;
560 	int rc;
561 
562 	pthread_mutex_lock(&transport->mutex);
563 	tgroup = transport->ops->poll_group_create(transport, group);
564 	pthread_mutex_unlock(&transport->mutex);
565 	if (!tgroup) {
566 		return NULL;
567 	}
568 	tgroup->transport = transport;
569 
570 	STAILQ_INIT(&tgroup->pending_buf_queue);
571 
572 	if (transport->opts.buf_cache_size == 0) {
573 		/* We aren't going to allocate any buffers for the cache, so just return now. */
574 		return tgroup;
575 	}
576 
577 	buf_cache_size = transport->opts.buf_cache_size;
578 
579 	/* buf_cache_size of UINT32_MAX means the value should be calculated dynamically
580 	 * based on the number of buffers in the shared pool and the number of poll groups
581 	 * that are sharing them.  We allocate 75% of the pool for the cache, and then
582 	 * divide that by number of poll groups to determine the buf_cache_size for this
583 	 * poll group.
584 	 */
585 	if (buf_cache_size == UINT32_MAX) {
586 		uint32_t num_shared_buffers = transport->opts.num_shared_buffers;
587 
588 		/* Theoretically the nvmf library can dynamically add poll groups to
589 		 * the target, after transports have already been created.  We aren't
590 		 * going to try to really handle this case efficiently, just do enough
591 		 * here to ensure we don't divide-by-zero.
592 		 */
593 		uint16_t num_poll_groups = group->tgt->num_poll_groups ? : spdk_env_get_core_count();
594 
595 		buf_cache_size = (num_shared_buffers * 3 / 4) / num_poll_groups;
596 	}
597 
598 	spdk_iobuf_get_opts(&opts_iobuf);
599 	small_cache_size = buf_cache_size;
600 	if (transport->opts.io_unit_size <= opts_iobuf.small_bufsize) {
601 		large_cache_size = 0;
602 	} else {
603 		large_cache_size = buf_cache_size;
604 	}
605 
606 	rc = spdk_iobuf_channel_init(&tgroup->buf_cache, transport->iobuf_name,
607 				     small_cache_size, large_cache_size);
608 
609 	if (rc != 0) {
610 		SPDK_ERRLOG("Unable to reserve the full number of buffers for the pg buffer cache.\n");
611 		rc = spdk_iobuf_channel_init(&tgroup->buf_cache, transport->iobuf_name, 0, 0);
612 		if (rc != 0) {
613 			SPDK_ERRLOG("Unable to create an iobuf channel in the poll group.\n");
614 			transport->ops->poll_group_destroy(tgroup);
615 			return NULL;
616 		}
617 	}
618 
619 	return tgroup;
620 }
621 
622 struct spdk_nvmf_transport_poll_group *
623 nvmf_transport_get_optimal_poll_group(struct spdk_nvmf_transport *transport,
624 				      struct spdk_nvmf_qpair *qpair)
625 {
626 	struct spdk_nvmf_transport_poll_group *tgroup;
627 
628 	if (transport->ops->get_optimal_poll_group) {
629 		pthread_mutex_lock(&transport->mutex);
630 		tgroup = transport->ops->get_optimal_poll_group(qpair);
631 		pthread_mutex_unlock(&transport->mutex);
632 
633 		return tgroup;
634 	} else {
635 		return NULL;
636 	}
637 }
638 
639 void
640 nvmf_transport_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
641 {
642 	struct spdk_nvmf_transport *transport;
643 	struct spdk_iobuf_channel ch;
644 
645 	transport = group->transport;
646 
647 	if (!STAILQ_EMPTY(&group->pending_buf_queue)) {
648 		SPDK_ERRLOG("Pending I/O list wasn't empty on poll group destruction\n");
649 	}
650 
651 	if (transport->opts.buf_cache_size) {
652 		/* The call to poll_group_destroy both frees the group memory,
653 		 * but also releases any remaining buffers. Make a copy of
654 		 * the channel onto the stack so we can still release the
655 		 * resources after the group has been freed. */
656 		ch = group->buf_cache;
657 	}
658 
659 	pthread_mutex_lock(&transport->mutex);
660 	transport->ops->poll_group_destroy(group);
661 	pthread_mutex_unlock(&transport->mutex);
662 
663 	if (transport->opts.buf_cache_size) {
664 		spdk_iobuf_channel_fini(&ch);
665 	}
666 }
667 
668 int
669 nvmf_transport_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
670 			      struct spdk_nvmf_qpair *qpair)
671 {
672 	if (qpair->transport) {
673 		assert(qpair->transport == group->transport);
674 		if (qpair->transport != group->transport) {
675 			return -1;
676 		}
677 	} else {
678 		qpair->transport = group->transport;
679 	}
680 
681 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_add, qpair, qpair->qid,
682 			   spdk_thread_get_id(group->group->thread));
683 
684 	return group->transport->ops->poll_group_add(group, qpair);
685 }
686 
687 int
688 nvmf_transport_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
689 				 struct spdk_nvmf_qpair *qpair)
690 {
691 	int rc = ENOTSUP;
692 
693 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_remove, qpair, qpair->qid,
694 			   spdk_thread_get_id(group->group->thread));
695 
696 	assert(qpair->transport == group->transport);
697 	if (group->transport->ops->poll_group_remove) {
698 		rc = group->transport->ops->poll_group_remove(group, qpair);
699 	}
700 
701 	return rc;
702 }
703 
704 int
705 nvmf_transport_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
706 {
707 	return group->transport->ops->poll_group_poll(group);
708 }
709 
710 int
711 nvmf_transport_req_free(struct spdk_nvmf_request *req)
712 {
713 	return req->qpair->transport->ops->req_free(req);
714 }
715 
716 int
717 nvmf_transport_req_complete(struct spdk_nvmf_request *req)
718 {
719 	return req->qpair->transport->ops->req_complete(req);
720 }
721 
722 void
723 nvmf_transport_qpair_fini(struct spdk_nvmf_qpair *qpair,
724 			  spdk_nvmf_transport_qpair_fini_cb cb_fn,
725 			  void *cb_arg)
726 {
727 	SPDK_DTRACE_PROBE1(nvmf_transport_qpair_fini, qpair);
728 
729 	qpair->transport->ops->qpair_fini(qpair, cb_fn, cb_arg);
730 }
731 
732 int
733 nvmf_transport_qpair_get_peer_trid(struct spdk_nvmf_qpair *qpair,
734 				   struct spdk_nvme_transport_id *trid)
735 {
736 	return qpair->transport->ops->qpair_get_peer_trid(qpair, trid);
737 }
738 
739 int
740 nvmf_transport_qpair_get_local_trid(struct spdk_nvmf_qpair *qpair,
741 				    struct spdk_nvme_transport_id *trid)
742 {
743 	return qpair->transport->ops->qpair_get_local_trid(qpair, trid);
744 }
745 
746 int
747 nvmf_transport_qpair_get_listen_trid(struct spdk_nvmf_qpair *qpair,
748 				     struct spdk_nvme_transport_id *trid)
749 {
750 	return qpair->transport->ops->qpair_get_listen_trid(qpair, trid);
751 }
752 
753 void
754 nvmf_transport_qpair_abort_request(struct spdk_nvmf_qpair *qpair,
755 				   struct spdk_nvmf_request *req)
756 {
757 	if (qpair->transport->ops->qpair_abort_request) {
758 		qpair->transport->ops->qpair_abort_request(qpair, req);
759 	}
760 }
761 
762 bool
763 spdk_nvmf_transport_opts_init(const char *transport_name,
764 			      struct spdk_nvmf_transport_opts *opts, size_t opts_size)
765 {
766 	const struct spdk_nvmf_transport_ops *ops;
767 	struct spdk_nvmf_transport_opts opts_local = {};
768 
769 	ops = nvmf_get_transport_ops(transport_name);
770 	if (!ops) {
771 		SPDK_ERRLOG("Transport type %s unavailable.\n", transport_name);
772 		return false;
773 	}
774 
775 	if (!opts) {
776 		SPDK_ERRLOG("opts should not be NULL\n");
777 		return false;
778 	}
779 
780 	if (!opts_size) {
781 		SPDK_ERRLOG("opts_size inside opts should not be zero value\n");
782 		return false;
783 	}
784 
785 	opts_local.association_timeout = NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS;
786 	opts_local.acceptor_poll_rate = SPDK_NVMF_DEFAULT_ACCEPT_POLL_RATE_US;
787 	ops->opts_init(&opts_local);
788 
789 	nvmf_transport_opts_copy(opts, &opts_local, opts_size);
790 
791 	return true;
792 }
793 
794 void
795 spdk_nvmf_request_free_buffers(struct spdk_nvmf_request *req,
796 			       struct spdk_nvmf_transport_poll_group *group,
797 			       struct spdk_nvmf_transport *transport)
798 {
799 	uint32_t i;
800 
801 	for (i = 0; i < req->iovcnt; i++) {
802 		spdk_iobuf_put(&group->buf_cache, req->iov[i].iov_base, req->iov[i].iov_len);
803 		req->iov[i].iov_base = NULL;
804 		req->iov[i].iov_len = 0;
805 	}
806 	req->iovcnt = 0;
807 	req->data_from_pool = false;
808 }
809 
810 typedef int (*set_buffer_callback)(struct spdk_nvmf_request *req, void *buf,
811 				   uint32_t length,	uint32_t io_unit_size);
812 static int
813 nvmf_request_set_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
814 			uint32_t io_unit_size)
815 {
816 	req->iov[req->iovcnt].iov_base = buf;
817 	req->iov[req->iovcnt].iov_len  = spdk_min(length, io_unit_size);
818 	length -= req->iov[req->iovcnt].iov_len;
819 	req->iovcnt++;
820 
821 	return length;
822 }
823 
824 static int
825 nvmf_request_get_buffers(struct spdk_nvmf_request *req,
826 			 struct spdk_nvmf_transport_poll_group *group,
827 			 struct spdk_nvmf_transport *transport,
828 			 uint32_t length, uint32_t io_unit_size,
829 			 set_buffer_callback cb_func)
830 {
831 	uint32_t num_buffers;
832 	uint32_t i = 0;
833 	void *buffer;
834 
835 	/* If the number of buffers is too large, then we know the I/O is larger than allowed.
836 	 *  Fail it.
837 	 */
838 	num_buffers = SPDK_CEIL_DIV(length, io_unit_size);
839 	if (num_buffers > NVMF_REQ_MAX_BUFFERS) {
840 		return -EINVAL;
841 	}
842 
843 	while (i < num_buffers) {
844 		buffer = spdk_iobuf_get(&group->buf_cache, spdk_min(io_unit_size, length), NULL, NULL);
845 		if (buffer == NULL) {
846 			return -ENOMEM;
847 		}
848 		length = cb_func(req, buffer, length, io_unit_size);
849 		i++;
850 	}
851 
852 	assert(length == 0);
853 
854 	return 0;
855 }
856 
857 int
858 spdk_nvmf_request_get_buffers(struct spdk_nvmf_request *req,
859 			      struct spdk_nvmf_transport_poll_group *group,
860 			      struct spdk_nvmf_transport *transport,
861 			      uint32_t length)
862 {
863 	int rc;
864 
865 	req->iovcnt = 0;
866 	rc = nvmf_request_get_buffers(req, group, transport, length,
867 				      transport->opts.io_unit_size,
868 				      nvmf_request_set_buffer);
869 	if (!rc) {
870 		req->data_from_pool = true;
871 	} else if (rc == -ENOMEM) {
872 		spdk_nvmf_request_free_buffers(req, group, transport);
873 		return rc;
874 	}
875 
876 	return rc;
877 }
878 
879 static int
880 nvmf_request_set_stripped_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
881 				 uint32_t io_unit_size)
882 {
883 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
884 
885 	data->iov[data->iovcnt].iov_base = buf;
886 	data->iov[data->iovcnt].iov_len  = spdk_min(length, io_unit_size);
887 	length -= data->iov[data->iovcnt].iov_len;
888 	data->iovcnt++;
889 
890 	return length;
891 }
892 
893 void
894 nvmf_request_free_stripped_buffers(struct spdk_nvmf_request *req,
895 				   struct spdk_nvmf_transport_poll_group *group,
896 				   struct spdk_nvmf_transport *transport)
897 {
898 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
899 	uint32_t i;
900 
901 	for (i = 0; i < data->iovcnt; i++) {
902 		spdk_iobuf_put(&group->buf_cache, data->iov[i].iov_base, data->iov[i].iov_len);
903 	}
904 	free(data);
905 	req->stripped_data = NULL;
906 }
907 
908 int
909 nvmf_request_get_stripped_buffers(struct spdk_nvmf_request *req,
910 				  struct spdk_nvmf_transport_poll_group *group,
911 				  struct spdk_nvmf_transport *transport,
912 				  uint32_t length)
913 {
914 	uint32_t block_size = req->dif.dif_ctx.block_size;
915 	uint32_t data_block_size = block_size - req->dif.dif_ctx.md_size;
916 	uint32_t io_unit_size = transport->opts.io_unit_size / block_size * data_block_size;
917 	struct spdk_nvmf_stripped_data *data;
918 	uint32_t i;
919 	int rc;
920 
921 	/* Data blocks must be block aligned */
922 	for (i = 0; i < req->iovcnt; i++) {
923 		if (req->iov[i].iov_len % block_size) {
924 			return -EINVAL;
925 		}
926 	}
927 
928 	data = calloc(1, sizeof(*data));
929 	if (data == NULL) {
930 		SPDK_ERRLOG("Unable to allocate memory for stripped_data.\n");
931 		return -ENOMEM;
932 	}
933 	req->stripped_data = data;
934 	req->stripped_data->iovcnt = 0;
935 
936 	rc = nvmf_request_get_buffers(req, group, transport, length, io_unit_size,
937 				      nvmf_request_set_stripped_buffer);
938 	if (rc == -ENOMEM) {
939 		nvmf_request_free_stripped_buffers(req, group, transport);
940 		return rc;
941 	}
942 	return rc;
943 }
944