xref: /spdk/lib/nvmf/transport.c (revision 1e3d25b901a6b9d2dce4999e2ecbc02f98d79f05)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2018-2019, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "nvmf_internal.h"
10 #include "transport.h"
11 
12 #include "spdk/config.h"
13 #include "spdk/log.h"
14 #include "spdk/nvmf.h"
15 #include "spdk/nvmf_transport.h"
16 #include "spdk/queue.h"
17 #include "spdk/util.h"
18 #include "spdk_internal/usdt.h"
19 
20 #define NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS 120000
21 
22 struct nvmf_transport_ops_list_element {
23 	struct spdk_nvmf_transport_ops			ops;
24 	TAILQ_ENTRY(nvmf_transport_ops_list_element)	link;
25 };
26 
27 TAILQ_HEAD(nvmf_transport_ops_list, nvmf_transport_ops_list_element)
28 g_spdk_nvmf_transport_ops = TAILQ_HEAD_INITIALIZER(g_spdk_nvmf_transport_ops);
29 
30 static inline const struct spdk_nvmf_transport_ops *
31 nvmf_get_transport_ops(const char *transport_name)
32 {
33 	struct nvmf_transport_ops_list_element *ops;
34 	TAILQ_FOREACH(ops, &g_spdk_nvmf_transport_ops, link) {
35 		if (strcasecmp(transport_name, ops->ops.name) == 0) {
36 			return &ops->ops;
37 		}
38 	}
39 	return NULL;
40 }
41 
42 void
43 spdk_nvmf_transport_register(const struct spdk_nvmf_transport_ops *ops)
44 {
45 	struct nvmf_transport_ops_list_element *new_ops;
46 
47 	if (nvmf_get_transport_ops(ops->name) != NULL) {
48 		SPDK_ERRLOG("Double registering nvmf transport type %s.\n", ops->name);
49 		assert(false);
50 		return;
51 	}
52 
53 	new_ops = calloc(1, sizeof(*new_ops));
54 	if (new_ops == NULL) {
55 		SPDK_ERRLOG("Unable to allocate memory to register new transport type %s.\n", ops->name);
56 		assert(false);
57 		return;
58 	}
59 
60 	new_ops->ops = *ops;
61 
62 	TAILQ_INSERT_TAIL(&g_spdk_nvmf_transport_ops, new_ops, link);
63 }
64 
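/*
 * Illustrative sketch, not part of this file: a transport module normally
 * fills in a struct spdk_nvmf_transport_ops and registers it at load time
 * with the SPDK_NVMF_TRANSPORT_REGISTER() constructor macro from
 * spdk/nvmf_transport.h, which calls spdk_nvmf_transport_register() above.
 * The "foo" transport and its callbacks below are hypothetical.
 *
 *	static const struct spdk_nvmf_transport_ops foo_ops = {
 *		.name = "FOO",
 *		.type = SPDK_NVME_TRANSPORT_CUSTOM,
 *		.opts_init = foo_opts_init,
 *		.create = foo_create,
 *		.destroy = foo_destroy,
 *		...
 *	};
 *
 *	SPDK_NVMF_TRANSPORT_REGISTER(foo, &foo_ops);
 */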
65 const struct spdk_nvmf_transport_opts *
66 spdk_nvmf_get_transport_opts(struct spdk_nvmf_transport *transport)
67 {
68 	return &transport->opts;
69 }
70 
71 void
72 nvmf_transport_dump_opts(struct spdk_nvmf_transport *transport, struct spdk_json_write_ctx *w,
73 			 bool named)
74 {
75 	const struct spdk_nvmf_transport_opts *opts = spdk_nvmf_get_transport_opts(transport);
76 
77 	named ? spdk_json_write_named_object_begin(w, "params") : spdk_json_write_object_begin(w);
78 
79 	spdk_json_write_named_string(w, "trtype", spdk_nvmf_get_transport_name(transport));
80 	spdk_json_write_named_uint32(w, "max_queue_depth", opts->max_queue_depth);
81 	spdk_json_write_named_uint32(w, "max_io_qpairs_per_ctrlr", opts->max_qpairs_per_ctrlr - 1);
82 	spdk_json_write_named_uint32(w, "in_capsule_data_size", opts->in_capsule_data_size);
83 	spdk_json_write_named_uint32(w, "max_io_size", opts->max_io_size);
84 	spdk_json_write_named_uint32(w, "io_unit_size", opts->io_unit_size);
85 	spdk_json_write_named_uint32(w, "max_aq_depth", opts->max_aq_depth);
86 	spdk_json_write_named_uint32(w, "num_shared_buffers", opts->num_shared_buffers);
87 	spdk_json_write_named_uint32(w, "buf_cache_size", opts->buf_cache_size);
88 	spdk_json_write_named_bool(w, "dif_insert_or_strip", opts->dif_insert_or_strip);
89 	spdk_json_write_named_bool(w, "zcopy", opts->zcopy);
90 
91 	if (transport->ops->dump_opts) {
92 		transport->ops->dump_opts(transport, w);
93 	}
94 
95 	spdk_json_write_named_uint32(w, "abort_timeout_sec", opts->abort_timeout_sec);
96 	spdk_json_write_object_end(w);
97 }
98 
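/*
 * For reference, with named == true the writer above produces a "params"
 * object along these lines; the values shown are illustrative, not defaults
 * defined in this file.  Note that max_io_qpairs_per_ctrlr is reported as
 * max_qpairs_per_ctrlr - 1 because one qpair is reserved for the admin queue.
 *
 *	"params": {
 *		"trtype": "TCP",
 *		"max_queue_depth": 128,
 *		"max_io_qpairs_per_ctrlr": 127,
 *		"in_capsule_data_size": 4096,
 *		"max_io_size": 131072,
 *		"io_unit_size": 131072,
 *		...
 *		"abort_timeout_sec": 1
 *	}
 */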
99 void
100 nvmf_transport_listen_dump_opts(struct spdk_nvmf_transport *transport,
101 				const struct spdk_nvme_transport_id *trid, struct spdk_json_write_ctx *w)
102 {
103 	const char *adrfam = spdk_nvme_transport_id_adrfam_str(trid->adrfam);
104 
105 	spdk_json_write_named_object_begin(w, "listen_address");
106 
107 	spdk_json_write_named_string(w, "trtype", trid->trstring);
108 	spdk_json_write_named_string(w, "adrfam", adrfam ? adrfam : "unknown");
109 	spdk_json_write_named_string(w, "traddr", trid->traddr);
110 	spdk_json_write_named_string(w, "trsvcid", trid->trsvcid);
111 
112 	spdk_json_write_object_end(w);
113 
114 	if (transport->ops->listen_dump_opts) {
115 		transport->ops->listen_dump_opts(transport, trid, w);
116 	}
117 }
118 
119 spdk_nvme_transport_type_t
120 spdk_nvmf_get_transport_type(struct spdk_nvmf_transport *transport)
121 {
122 	return transport->ops->type;
123 }
124 
125 const char *
126 spdk_nvmf_get_transport_name(struct spdk_nvmf_transport *transport)
127 {
128 	return transport->ops->name;
129 }
130 
131 static void
132 nvmf_transport_opts_copy(struct spdk_nvmf_transport_opts *opts,
133 			 struct spdk_nvmf_transport_opts *opts_src,
134 			 size_t opts_size)
135 {
136 	assert(opts);
137 	assert(opts_src);
138 
139 	opts->opts_size = opts_size;
140 
141 #define SET_FIELD(field) \
142 	if (offsetof(struct spdk_nvmf_transport_opts, field) + sizeof(opts->field) <= opts_size) { \
143 		opts->field = opts_src->field; \
144 	} \
145 
146 	SET_FIELD(max_queue_depth);
147 	SET_FIELD(max_qpairs_per_ctrlr);
148 	SET_FIELD(in_capsule_data_size);
149 	SET_FIELD(max_io_size);
150 	SET_FIELD(io_unit_size);
151 	SET_FIELD(max_aq_depth);
152 	SET_FIELD(buf_cache_size);
153 	SET_FIELD(num_shared_buffers);
154 	SET_FIELD(dif_insert_or_strip);
155 	SET_FIELD(abort_timeout_sec);
156 	SET_FIELD(association_timeout);
157 	SET_FIELD(transport_specific);
158 	SET_FIELD(acceptor_poll_rate);
159 	SET_FIELD(zcopy);
160 
161 	/* Do not remove this statement. Always update it when adding a new field,
162 	 * and do not forget to add a SET_FIELD statement for the new field. */
163 	SPDK_STATIC_ASSERT(sizeof(struct spdk_nvmf_transport_opts) == 64, "Incorrect size");
164 
165 #undef SET_FIELD
167 }
168 
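/*
 * The opts_size check in SET_FIELD() above is what keeps this copy ABI
 * compatible: only fields that lie entirely within the caller-supplied
 * opts_size are copied, so a caller compiled against an older, smaller
 * struct spdk_nvmf_transport_opts (one that predates, say, the zcopy field)
 * simply has the newer fields left at whatever values the destination
 * already holds.
 */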
169 struct nvmf_transport_create_ctx {
170 	const struct spdk_nvmf_transport_ops *ops;
171 	struct spdk_nvmf_transport_opts opts;
172 	void *cb_arg;
173 	spdk_nvmf_transport_create_done_cb cb_fn;
174 };
175 
176 static void
177 nvmf_transport_create_async_done(void *cb_arg, struct spdk_nvmf_transport *transport)
178 {
179 	struct nvmf_transport_create_ctx *ctx = cb_arg;
180 	int chars_written;
181 
182 	if (!transport) {
183 		SPDK_ERRLOG("Failed to create transport.\n");
184 		goto err;
185 	}
186 
187 	pthread_mutex_init(&transport->mutex, NULL);
188 	TAILQ_INIT(&transport->listeners);
189 	transport->ops = ctx->ops;
190 	transport->opts = ctx->opts;
191 	chars_written = snprintf(transport->iobuf_name, MAX_MEMPOOL_NAME_LENGTH, "%s_%s_%s", "spdk_nvmf",
192 				 transport->ops->name, "data");
193 	if (chars_written < 0) {
194 		SPDK_ERRLOG("Unable to generate transport data buffer pool name.\n");
195 		goto err;
196 	}
197 
198 	if (ctx->opts.num_shared_buffers) {
199 		spdk_iobuf_register_module(transport->iobuf_name);
200 	}
201 
202 	ctx->cb_fn(ctx->cb_arg, transport);
203 	free(ctx);
204 	return;
205 
206 err:
207 	if (transport) {
208 		transport->ops->destroy(transport, NULL, NULL);
209 	}
210 
211 	ctx->cb_fn(ctx->cb_arg, NULL);
212 	free(ctx);
213 }
214 
215 static void
216 _nvmf_transport_create_done(void *ctx)
217 {
218 	struct nvmf_transport_create_ctx *_ctx = (struct nvmf_transport_create_ctx *)ctx;
219 
220 	nvmf_transport_create_async_done(_ctx, _ctx->ops->create(&_ctx->opts));
221 }
222 
223 static int
224 nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
225 		      spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg, bool sync)
226 {
227 	struct nvmf_transport_create_ctx *ctx;
228 	struct spdk_iobuf_opts opts_iobuf = {};
229 	int rc;
230 	uint64_t count;
231 
232 	ctx = calloc(1, sizeof(*ctx));
233 	if (!ctx) {
234 		return -ENOMEM;
235 	}
236 
237 	if (!opts) {
238 		SPDK_ERRLOG("opts should not be NULL\n");
239 		goto err;
240 	}
241 
242 	if (!opts->opts_size) {
243 		SPDK_ERRLOG("The opts_size in the opts structure should not be zero\n");
244 		goto err;
245 	}
246 
247 	ctx->ops = nvmf_get_transport_ops(transport_name);
248 	if (!ctx->ops) {
249 		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
250 		goto err;
251 	}
252 
253 	nvmf_transport_opts_copy(&ctx->opts, opts, opts->opts_size);
254 	if (ctx->opts.max_io_size != 0 && (!spdk_u32_is_pow2(ctx->opts.max_io_size) ||
255 					   ctx->opts.max_io_size < 8192)) {
256 		SPDK_ERRLOG("max_io_size %u must be a power of 2 and greater than or equal to 8KB\n",
257 			    ctx->opts.max_io_size);
258 		goto err;
259 	}
260 
261 	if (ctx->opts.max_aq_depth < SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE) {
262 		SPDK_ERRLOG("max_aq_depth %u is less than the minimum defined by the NVMe-oF spec, using the minimum value\n",
263 			    ctx->opts.max_aq_depth);
264 		ctx->opts.max_aq_depth = SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE;
265 	}
266 
267 	spdk_iobuf_get_opts(&opts_iobuf);
268 	if (ctx->opts.io_unit_size > opts_iobuf.large_bufsize) {
269 		SPDK_ERRLOG("io_unit_size %u is larger than iobuf pool large buffer size %u\n",
270 			    ctx->opts.io_unit_size, opts_iobuf.large_bufsize);
271 		goto err;
272 	}
273 
274 	if (ctx->opts.io_unit_size <= opts_iobuf.small_bufsize) {
275 		/* We'll be using the small buffer pool only */
276 		count = opts_iobuf.small_pool_count;
277 	} else {
278 		count = spdk_min(opts_iobuf.small_pool_count, opts_iobuf.large_pool_count);
279 	}
280 
281 	if (ctx->opts.num_shared_buffers > count) {
282 		SPDK_WARNLOG("The num_shared_buffers value (%u) is larger than the available iobuf"
283 			     " pool size (%lu). Please increase the iobuf pool sizes.\n",
284 			     ctx->opts.num_shared_buffers, count);
285 	}
286 
287 	ctx->cb_fn = cb_fn;
288 	ctx->cb_arg = cb_arg;
289 
290 	/* Prefer the transport's synchronous create callback when it provides one. */
291 	if (ctx->ops->create) {
292 		if (sync) {
293 			_nvmf_transport_create_done(ctx);
294 			return 0;
295 		}
296 
297 		rc = spdk_thread_send_msg(spdk_get_thread(), _nvmf_transport_create_done, ctx);
298 		if (rc) {
299 			goto err;
300 		}
301 
302 		return 0;
303 	}
304 
305 	assert(ctx->ops->create_async);
306 	rc = ctx->ops->create_async(&ctx->opts, nvmf_transport_create_async_done, ctx);
307 	if (rc) {
308 		SPDK_ERRLOG("Unable to create new transport of type %s\n", transport_name);
309 		goto err;
310 	}
311 
312 	return 0;
313 err:
314 	free(ctx);
315 	return -1;
316 }
317 
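/*
 * Sketch of the asynchronous path from the transport's point of view; the
 * "foo" transport below is hypothetical.  A transport that implements
 * .create_async instead of .create allocates an object embedding
 * struct spdk_nvmf_transport, finishes any asynchronous setup, and then
 * invokes the callback it was given (nvmf_transport_create_async_done()
 * above), passing NULL on failure.
 *
 *	static int
 *	foo_create_async(struct spdk_nvmf_transport_opts *opts,
 *			 spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg)
 *	{
 *		struct foo_transport *ftransport = calloc(1, sizeof(*ftransport));
 *
 *		if (ftransport == NULL) {
 *			cb_fn(cb_arg, NULL);
 *			return 0;
 *		}
 *		... asynchronous initialization, then ...
 *		cb_fn(cb_arg, &ftransport->transport);
 *		return 0;
 *	}
 */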
318 int
319 spdk_nvmf_transport_create_async(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
320 				 spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg)
321 {
322 	return nvmf_transport_create(transport_name, opts, cb_fn, cb_arg, false);
323 }
324 
325 static void
326 nvmf_transport_create_sync_done(void *cb_arg, struct spdk_nvmf_transport *transport)
327 {
328 	struct spdk_nvmf_transport **_transport = cb_arg;
329 
330 	*_transport = transport;
331 }
332 
333 struct spdk_nvmf_transport *
334 spdk_nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts)
335 {
336 	struct spdk_nvmf_transport *transport = NULL;
337 
338 	/* The current implementation supports only the synchronous version of the create operation. */
339 	assert(nvmf_get_transport_ops(transport_name) && nvmf_get_transport_ops(transport_name)->create);
340 
341 	nvmf_transport_create(transport_name, opts, nvmf_transport_create_sync_done, &transport, true);
342 	return transport;
343 }
344 
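/*
 * Typical creation sequence from an application or RPC handler, shown here
 * only as a hedged illustration (error handling omitted; "TCP", the tgt
 * variable and add_transport_done callback are assumptions, not defined in
 * this file):
 *
 *	struct spdk_nvmf_transport_opts opts = {};
 *	struct spdk_nvmf_transport *transport;
 *
 *	spdk_nvmf_transport_opts_init("TCP", &opts, sizeof(opts));
 *	opts.max_queue_depth = 128;
 *	transport = spdk_nvmf_transport_create("TCP", &opts);
 *	spdk_nvmf_tgt_add_transport(tgt, transport, add_transport_done, NULL);
 */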
345 struct spdk_nvmf_transport *
346 spdk_nvmf_transport_get_first(struct spdk_nvmf_tgt *tgt)
347 {
348 	return TAILQ_FIRST(&tgt->transports);
349 }
350 
351 struct spdk_nvmf_transport *
352 spdk_nvmf_transport_get_next(struct spdk_nvmf_transport *transport)
353 {
354 	return TAILQ_NEXT(transport, link);
355 }
356 
357 int
358 spdk_nvmf_transport_destroy(struct spdk_nvmf_transport *transport,
359 			    spdk_nvmf_transport_destroy_done_cb cb_fn, void *cb_arg)
360 {
361 	struct spdk_nvmf_listener *listener, *listener_tmp;
362 
363 	TAILQ_FOREACH_SAFE(listener, &transport->listeners, link, listener_tmp) {
364 		TAILQ_REMOVE(&transport->listeners, listener, link);
365 		transport->ops->stop_listen(transport, &listener->trid);
366 		free(listener);
367 	}
368 
369 	if (transport->opts.num_shared_buffers) {
370 		spdk_iobuf_unregister_module(transport->iobuf_name);
371 	}
372 
373 	pthread_mutex_destroy(&transport->mutex);
374 	return transport->ops->destroy(transport, cb_fn, cb_arg);
375 }
376 
377 struct spdk_nvmf_listener *
378 nvmf_transport_find_listener(struct spdk_nvmf_transport *transport,
379 			     const struct spdk_nvme_transport_id *trid)
380 {
381 	struct spdk_nvmf_listener *listener;
382 
383 	TAILQ_FOREACH(listener, &transport->listeners, link) {
384 		if (spdk_nvme_transport_id_compare(&listener->trid, trid) == 0) {
385 			return listener;
386 		}
387 	}
388 
389 	return NULL;
390 }
391 
392 int
393 spdk_nvmf_transport_listen(struct spdk_nvmf_transport *transport,
394 			   const struct spdk_nvme_transport_id *trid, struct spdk_nvmf_listen_opts *opts)
395 {
396 	struct spdk_nvmf_listener *listener;
397 	int rc;
398 
399 	listener = nvmf_transport_find_listener(transport, trid);
400 	if (!listener) {
401 		listener = calloc(1, sizeof(*listener));
402 		if (!listener) {
403 			return -ENOMEM;
404 		}
405 
406 		listener->ref = 1;
407 		listener->trid = *trid;
408 		TAILQ_INSERT_TAIL(&transport->listeners, listener, link);
409 		pthread_mutex_lock(&transport->mutex);
410 		rc = transport->ops->listen(transport, &listener->trid, opts);
411 		pthread_mutex_unlock(&transport->mutex);
412 		if (rc != 0) {
413 			TAILQ_REMOVE(&transport->listeners, listener, link);
414 			free(listener);
415 		}
416 		return rc;
417 	}
418 
419 	++listener->ref;
420 
421 	return 0;
422 }
423 
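/*
 * Illustrative listener setup (hedged example, not part of this file): the
 * caller fills in a transport ID and listen opts and calls
 * spdk_nvmf_transport_listen() above.  Repeated calls with the same trid only
 * bump the reference count; spdk_nvmf_transport_stop_listen() below drops it,
 * and the transport's stop_listen callback runs only when the count reaches
 * zero.
 *
 *	struct spdk_nvme_transport_id trid = {};
 *	struct spdk_nvmf_listen_opts lopts = {};
 *
 *	spdk_nvme_trid_populate_transport(&trid, SPDK_NVME_TRANSPORT_TCP);
 *	trid.adrfam = SPDK_NVMF_ADRFAM_IPV4;
 *	snprintf(trid.traddr, sizeof(trid.traddr), "127.0.0.1");
 *	snprintf(trid.trsvcid, sizeof(trid.trsvcid), "4420");
 *	spdk_nvmf_listen_opts_init(&lopts, sizeof(lopts));
 *	rc = spdk_nvmf_transport_listen(transport, &trid, &lopts);
 */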
424 int
425 spdk_nvmf_transport_stop_listen(struct spdk_nvmf_transport *transport,
426 				const struct spdk_nvme_transport_id *trid)
427 {
428 	struct spdk_nvmf_listener *listener;
429 
430 	listener = nvmf_transport_find_listener(transport, trid);
431 	if (!listener) {
432 		return -ENOENT;
433 	}
434 
435 	if (--listener->ref == 0) {
436 		TAILQ_REMOVE(&transport->listeners, listener, link);
437 		pthread_mutex_lock(&transport->mutex);
438 		transport->ops->stop_listen(transport, trid);
439 		pthread_mutex_unlock(&transport->mutex);
440 		free(listener);
441 	}
442 
443 	return 0;
444 }
445 
446 struct nvmf_stop_listen_ctx {
447 	struct spdk_nvmf_transport *transport;
448 	struct spdk_nvme_transport_id trid;
449 	struct spdk_nvmf_subsystem *subsystem;
450 	spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn;
451 	void *cb_arg;
452 };
453 
454 static void
455 nvmf_stop_listen_fini(struct spdk_io_channel_iter *i, int status)
456 {
457 	struct nvmf_stop_listen_ctx *ctx;
458 	struct spdk_nvmf_transport *transport;
459 	int rc = status;
460 
461 	ctx = spdk_io_channel_iter_get_ctx(i);
462 	transport = ctx->transport;
463 	assert(transport != NULL);
464 
465 	rc = spdk_nvmf_transport_stop_listen(transport, &ctx->trid);
466 	if (rc) {
467 		SPDK_ERRLOG("Failed to stop listening on address '%s'\n", ctx->trid.traddr);
468 	}
469 
470 	if (ctx->cb_fn) {
471 		ctx->cb_fn(ctx->cb_arg, rc);
472 	}
473 	free(ctx);
474 }
475 
476 static void nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i);
477 
478 static void
479 nvmf_stop_listen_disconnect_qpairs_msg(void *ctx)
480 {
481 	nvmf_stop_listen_disconnect_qpairs((struct spdk_io_channel_iter *)ctx);
482 }
483 
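/*
 * Disconnect, on this poll group, every qpair that matches the trid (and the
 * subsystem, when one was given).  Because disconnects complete
 * asynchronously, the function re-sends itself to the same thread for as long
 * as it keeps finding matching qpairs; only when none remain does it continue
 * the channel iteration so that nvmf_stop_listen_fini() can finally stop the
 * listener.
 */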
484 static void
485 nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i)
486 {
487 	struct nvmf_stop_listen_ctx *ctx;
488 	struct spdk_nvmf_poll_group *group;
489 	struct spdk_io_channel *ch;
490 	struct spdk_nvmf_qpair *qpair, *tmp_qpair;
491 	struct spdk_nvme_transport_id tmp_trid;
492 	bool qpair_found = false;
493 
494 	ctx = spdk_io_channel_iter_get_ctx(i);
495 	ch = spdk_io_channel_iter_get_channel(i);
496 	group = spdk_io_channel_get_ctx(ch);
497 
498 	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, tmp_qpair) {
499 		if (spdk_nvmf_qpair_get_listen_trid(qpair, &tmp_trid)) {
500 			continue;
501 		}
502 
503 		/* Skip qpairs that don't match the listen trid and the subsystem pointer.
504 		 * If ctx->subsystem is NULL, disconnect all qpairs that match the listen
505 		 * trid. */
506 		if (!spdk_nvme_transport_id_compare(&ctx->trid, &tmp_trid)) {
507 			if (ctx->subsystem == NULL ||
508 			    (qpair->ctrlr != NULL && ctx->subsystem == qpair->ctrlr->subsys)) {
509 				spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
510 				qpair_found = true;
511 			}
512 		}
513 	}
514 	if (qpair_found) {
515 		spdk_thread_send_msg(spdk_get_thread(), nvmf_stop_listen_disconnect_qpairs_msg, i);
516 		return;
517 	}
518 
519 	spdk_for_each_channel_continue(i, 0);
520 }
521 
522 int
523 spdk_nvmf_transport_stop_listen_async(struct spdk_nvmf_transport *transport,
524 				      const struct spdk_nvme_transport_id *trid,
525 				      struct spdk_nvmf_subsystem *subsystem,
526 				      spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn,
527 				      void *cb_arg)
528 {
529 	struct nvmf_stop_listen_ctx *ctx;
530 
531 	if (trid->subnqn[0] != '\0') {
532 		SPDK_ERRLOG("subnqn should be empty, use subsystem pointer instead\n");
533 		return -EINVAL;
534 	}
535 
536 	ctx = calloc(1, sizeof(struct nvmf_stop_listen_ctx));
537 	if (ctx == NULL) {
538 		return -ENOMEM;
539 	}
540 
541 	ctx->trid = *trid;
542 	ctx->subsystem = subsystem;
543 	ctx->transport = transport;
544 	ctx->cb_fn = cb_fn;
545 	ctx->cb_arg = cb_arg;
546 
547 	spdk_for_each_channel(transport->tgt, nvmf_stop_listen_disconnect_qpairs, ctx,
548 			      nvmf_stop_listen_fini);
549 
550 	return 0;
551 }
552 
553 void
554 nvmf_transport_listener_discover(struct spdk_nvmf_transport *transport,
555 				 struct spdk_nvme_transport_id *trid,
556 				 struct spdk_nvmf_discovery_log_page_entry *entry)
557 {
558 	transport->ops->listener_discover(transport, trid, entry);
559 }
560 
561 struct spdk_nvmf_transport_poll_group *
562 nvmf_transport_poll_group_create(struct spdk_nvmf_transport *transport,
563 				 struct spdk_nvmf_poll_group *group)
564 {
565 	struct spdk_nvmf_transport_poll_group *tgroup;
566 	struct spdk_iobuf_opts opts_iobuf = {};
567 	uint32_t buf_cache_size, small_cache_size, large_cache_size;
568 	int rc;
569 
570 	pthread_mutex_lock(&transport->mutex);
571 	tgroup = transport->ops->poll_group_create(transport, group);
572 	pthread_mutex_unlock(&transport->mutex);
573 	if (!tgroup) {
574 		return NULL;
575 	}
576 	tgroup->transport = transport;
577 
578 	STAILQ_INIT(&tgroup->pending_buf_queue);
579 
580 	if (transport->opts.buf_cache_size == 0) {
581 		/* We aren't going to allocate any buffers for the cache, so just return now. */
582 		return tgroup;
583 	}
584 
585 	buf_cache_size = transport->opts.buf_cache_size;
586 
587 	/* buf_cache_size of UINT32_MAX means the value should be calculated dynamically
588 	 * based on the number of buffers in the shared pool and the number of poll groups
589 	 * that are sharing them.  We allocate 75% of the pool for the cache, and then
590 	 * divide that by number of poll groups to determine the buf_cache_size for this
591 	 * poll group.
592 	 */
593 	if (buf_cache_size == UINT32_MAX) {
594 		uint32_t num_shared_buffers = transport->opts.num_shared_buffers;
595 
596 		/* Theoretically the nvmf library can dynamically add poll groups to
597 		 * the target, after transports have already been created.  We aren't
598 		 * going to try to really handle this case efficiently, just do enough
599 		 * here to ensure we don't divide-by-zero.
600 		 */
601 		uint16_t num_poll_groups = group->tgt->num_poll_groups ? : spdk_env_get_core_count();
602 
603 		buf_cache_size = (num_shared_buffers * 3 / 4) / num_poll_groups;
604 	}
605 
606 	spdk_iobuf_get_opts(&opts_iobuf);
607 	small_cache_size = buf_cache_size;
608 	if (transport->opts.io_unit_size <= opts_iobuf.small_bufsize) {
609 		large_cache_size = 0;
610 	} else {
611 		large_cache_size = buf_cache_size;
612 	}
613 
614 	rc = spdk_iobuf_channel_init(&tgroup->buf_cache, transport->iobuf_name,
615 				     small_cache_size, large_cache_size);
616 
617 	if (rc != 0) {
618 		SPDK_ERRLOG("Unable to reserve the full number of buffers for the poll group buffer cache.\n");
619 		rc = spdk_iobuf_channel_init(&tgroup->buf_cache, transport->iobuf_name, 0, 0);
620 		if (rc != 0) {
621 			SPDK_ERRLOG("Unable to create an iobuf channel in the poll group.\n");
622 			transport->ops->poll_group_destroy(tgroup);
623 			return NULL;
624 		}
625 	}
626 
627 	return tgroup;
628 }
629 
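/*
 * Worked example of the dynamic cache sizing above: with
 * num_shared_buffers = 4096 and 4 poll groups, 75% of the pool (3072 buffers)
 * is set aside for caching and each poll group gets
 * (4096 * 3 / 4) / 4 = 768 cached buffers.  The numbers are illustrative
 * only; the real values come from the transport and iobuf configuration.
 */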
630 struct spdk_nvmf_transport_poll_group *
631 nvmf_transport_get_optimal_poll_group(struct spdk_nvmf_transport *transport,
632 				      struct spdk_nvmf_qpair *qpair)
633 {
634 	struct spdk_nvmf_transport_poll_group *tgroup;
635 
636 	if (transport->ops->get_optimal_poll_group) {
637 		pthread_mutex_lock(&transport->mutex);
638 		tgroup = transport->ops->get_optimal_poll_group(qpair);
639 		pthread_mutex_unlock(&transport->mutex);
640 
641 		return tgroup;
642 	} else {
643 		return NULL;
644 	}
645 }
646 
647 void
648 nvmf_transport_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
649 {
650 	struct spdk_nvmf_transport *transport;
651 	struct spdk_iobuf_channel ch;
652 
653 	transport = group->transport;
654 
655 	if (!STAILQ_EMPTY(&group->pending_buf_queue)) {
656 		SPDK_ERRLOG("Pending I/O list wasn't empty on poll group destruction\n");
657 	}
658 
659 	if (transport->opts.buf_cache_size) {
660 		/* The call to poll_group_destroy not only frees the group memory
661 		 * but also releases any remaining buffers. Make a copy of
662 		 * the channel onto the stack so we can still release the
663 		 * resources after the group has been freed. */
664 		ch = group->buf_cache;
665 	}
666 
667 	pthread_mutex_lock(&transport->mutex);
668 	transport->ops->poll_group_destroy(group);
669 	pthread_mutex_unlock(&transport->mutex);
670 
671 	if (transport->opts.buf_cache_size) {
672 		spdk_iobuf_channel_fini(&ch);
673 	}
674 }
675 
676 int
677 nvmf_transport_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
678 			      struct spdk_nvmf_qpair *qpair)
679 {
680 	if (qpair->transport) {
681 		assert(qpair->transport == group->transport);
682 		if (qpair->transport != group->transport) {
683 			return -1;
684 		}
685 	} else {
686 		qpair->transport = group->transport;
687 	}
688 
689 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_add, qpair, qpair->qid,
690 			   spdk_thread_get_id(group->group->thread));
691 
692 	return group->transport->ops->poll_group_add(group, qpair);
693 }
694 
695 int
696 nvmf_transport_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
697 				 struct spdk_nvmf_qpair *qpair)
698 {
699 	int rc = ENOTSUP;
700 
701 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_remove, qpair, qpair->qid,
702 			   spdk_thread_get_id(group->group->thread));
703 
704 	assert(qpair->transport == group->transport);
705 	if (group->transport->ops->poll_group_remove) {
706 		rc = group->transport->ops->poll_group_remove(group, qpair);
707 	}
708 
709 	return rc;
710 }
711 
712 int
713 nvmf_transport_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
714 {
715 	return group->transport->ops->poll_group_poll(group);
716 }
717 
718 int
719 nvmf_transport_req_free(struct spdk_nvmf_request *req)
720 {
721 	return req->qpair->transport->ops->req_free(req);
722 }
723 
724 int
725 nvmf_transport_req_complete(struct spdk_nvmf_request *req)
726 {
727 	return req->qpair->transport->ops->req_complete(req);
728 }
729 
730 void
731 nvmf_transport_qpair_fini(struct spdk_nvmf_qpair *qpair,
732 			  spdk_nvmf_transport_qpair_fini_cb cb_fn,
733 			  void *cb_arg)
734 {
735 	SPDK_DTRACE_PROBE1(nvmf_transport_qpair_fini, qpair);
736 
737 	qpair->transport->ops->qpair_fini(qpair, cb_fn, cb_arg);
738 }
739 
740 int
741 nvmf_transport_qpair_get_peer_trid(struct spdk_nvmf_qpair *qpair,
742 				   struct spdk_nvme_transport_id *trid)
743 {
744 	return qpair->transport->ops->qpair_get_peer_trid(qpair, trid);
745 }
746 
747 int
748 nvmf_transport_qpair_get_local_trid(struct spdk_nvmf_qpair *qpair,
749 				    struct spdk_nvme_transport_id *trid)
750 {
751 	return qpair->transport->ops->qpair_get_local_trid(qpair, trid);
752 }
753 
754 int
755 nvmf_transport_qpair_get_listen_trid(struct spdk_nvmf_qpair *qpair,
756 				     struct spdk_nvme_transport_id *trid)
757 {
758 	return qpair->transport->ops->qpair_get_listen_trid(qpair, trid);
759 }
760 
761 void
762 nvmf_transport_qpair_abort_request(struct spdk_nvmf_qpair *qpair,
763 				   struct spdk_nvmf_request *req)
764 {
765 	if (qpair->transport->ops->qpair_abort_request) {
766 		qpair->transport->ops->qpair_abort_request(qpair, req);
767 	}
768 }
769 
770 bool
771 spdk_nvmf_transport_opts_init(const char *transport_name,
772 			      struct spdk_nvmf_transport_opts *opts, size_t opts_size)
773 {
774 	const struct spdk_nvmf_transport_ops *ops;
775 	struct spdk_nvmf_transport_opts opts_local = {};
776 
777 	ops = nvmf_get_transport_ops(transport_name);
778 	if (!ops) {
779 		SPDK_ERRLOG("Transport type %s unavailable.\n", transport_name);
780 		return false;
781 	}
782 
783 	if (!opts) {
784 		SPDK_ERRLOG("opts should not be NULL\n");
785 		return false;
786 	}
787 
788 	if (!opts_size) {
789 		SPDK_ERRLOG("The opts_size in the opts structure should not be zero\n");
790 		return false;
791 	}
792 
793 	opts_local.association_timeout = NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS;
794 	opts_local.acceptor_poll_rate = SPDK_NVMF_DEFAULT_ACCEPT_POLL_RATE_US;
795 	ops->opts_init(&opts_local);
796 
797 	nvmf_transport_opts_copy(opts, &opts_local, opts_size);
798 
799 	return true;
800 }
801 
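/*
 * Note on the ordering above: association_timeout and acceptor_poll_rate get
 * library-wide defaults before ops->opts_init() runs, so a transport may
 * override them in its opts_init callback.  nvmf_transport_opts_copy() then
 * copies back only the fields that fit within the caller-supplied opts_size.
 */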
802 void
803 spdk_nvmf_request_free_buffers(struct spdk_nvmf_request *req,
804 			       struct spdk_nvmf_transport_poll_group *group,
805 			       struct spdk_nvmf_transport *transport)
806 {
807 	uint32_t i;
808 
809 	for (i = 0; i < req->iovcnt; i++) {
810 		spdk_iobuf_put(&group->buf_cache, req->iov[i].iov_base, req->iov[i].iov_len);
811 		req->iov[i].iov_base = NULL;
812 		req->iov[i].iov_len = 0;
813 	}
814 	req->iovcnt = 0;
815 	req->data_from_pool = false;
816 }
817 
818 typedef int (*set_buffer_callback)(struct spdk_nvmf_request *req, void *buf,
819 				   uint32_t length, uint32_t io_unit_size);
820 static int
821 nvmf_request_set_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
822 			uint32_t io_unit_size)
823 {
824 	req->iov[req->iovcnt].iov_base = buf;
825 	req->iov[req->iovcnt].iov_len  = spdk_min(length, io_unit_size);
826 	length -= req->iov[req->iovcnt].iov_len;
827 	req->iovcnt++;
828 
829 	return length;
830 }
831 
832 static int
833 nvmf_request_get_buffers(struct spdk_nvmf_request *req,
834 			 struct spdk_nvmf_transport_poll_group *group,
835 			 struct spdk_nvmf_transport *transport,
836 			 uint32_t length, uint32_t io_unit_size,
837 			 set_buffer_callback cb_func)
838 {
839 	uint32_t num_buffers;
840 	uint32_t i = 0;
841 	void *buffer;
842 
843 	/* If the number of buffers is too large, then we know the I/O is larger than allowed.
844 	 *  Fail it.
845 	 */
846 	num_buffers = SPDK_CEIL_DIV(length, io_unit_size);
847 	if (num_buffers > NVMF_REQ_MAX_BUFFERS) {
848 		return -EINVAL;
849 	}
850 
851 	while (i < num_buffers) {
852 		buffer = spdk_iobuf_get(&group->buf_cache, spdk_min(io_unit_size, length), NULL, NULL);
853 		if (buffer == NULL) {
854 			return -ENOMEM;
855 		}
856 		length = cb_func(req, buffer, length, io_unit_size);
857 		i++;
858 	}
859 
860 	assert(length == 0);
861 
862 	return 0;
863 }
864 
865 int
866 spdk_nvmf_request_get_buffers(struct spdk_nvmf_request *req,
867 			      struct spdk_nvmf_transport_poll_group *group,
868 			      struct spdk_nvmf_transport *transport,
869 			      uint32_t length)
870 {
871 	int rc;
872 
873 	req->iovcnt = 0;
874 	rc = nvmf_request_get_buffers(req, group, transport, length,
875 				      transport->opts.io_unit_size,
876 				      nvmf_request_set_buffer);
877 	if (!rc) {
878 		req->data_from_pool = true;
879 	} else if (rc == -ENOMEM) {
880 		spdk_nvmf_request_free_buffers(req, group, transport);
881 		return rc;
882 	}
883 
884 	return rc;
885 }
886 
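/*
 * Worked example of the splitting above, with illustrative numbers: for a
 * request of length 70000 bytes and io_unit_size = 8192,
 * SPDK_CEIL_DIV(70000, 8192) = 9 buffers are taken from the iobuf channel and
 * the iovec ends up as eight 8192-byte entries followed by one 4464-byte
 * entry (8 * 8192 + 4464 = 70000).
 */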
887 static int
888 nvmf_request_set_stripped_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
889 				 uint32_t io_unit_size)
890 {
891 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
892 
893 	data->iov[data->iovcnt].iov_base = buf;
894 	data->iov[data->iovcnt].iov_len  = spdk_min(length, io_unit_size);
895 	length -= data->iov[data->iovcnt].iov_len;
896 	data->iovcnt++;
897 
898 	return length;
899 }
900 
901 void
902 nvmf_request_free_stripped_buffers(struct spdk_nvmf_request *req,
903 				   struct spdk_nvmf_transport_poll_group *group,
904 				   struct spdk_nvmf_transport *transport)
905 {
906 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
907 	uint32_t i;
908 
909 	for (i = 0; i < data->iovcnt; i++) {
910 		spdk_iobuf_put(&group->buf_cache, data->iov[i].iov_base, data->iov[i].iov_len);
911 	}
912 	free(data);
913 	req->stripped_data = NULL;
914 }
915 
916 int
917 nvmf_request_get_stripped_buffers(struct spdk_nvmf_request *req,
918 				  struct spdk_nvmf_transport_poll_group *group,
919 				  struct spdk_nvmf_transport *transport,
920 				  uint32_t length)
921 {
922 	uint32_t block_size = req->dif.dif_ctx.block_size;
923 	uint32_t data_block_size = block_size - req->dif.dif_ctx.md_size;
924 	uint32_t io_unit_size = transport->opts.io_unit_size / block_size * data_block_size;
925 	struct spdk_nvmf_stripped_data *data;
926 	uint32_t i;
927 	int rc;
928 
929 	/* Data blocks must be block aligned */
930 	for (i = 0; i < req->iovcnt; i++) {
931 		if (req->iov[i].iov_len % block_size) {
932 			return -EINVAL;
933 		}
934 	}
935 
936 	data = calloc(1, sizeof(*data));
937 	if (data == NULL) {
938 		SPDK_ERRLOG("Unable to allocate memory for stripped_data.\n");
939 		return -ENOMEM;
940 	}
941 	req->stripped_data = data;
942 	req->stripped_data->iovcnt = 0;
943 
944 	rc = nvmf_request_get_buffers(req, group, transport, length, io_unit_size,
945 				      nvmf_request_set_stripped_buffer);
946 	if (rc == -ENOMEM) {
947 		nvmf_request_free_stripped_buffers(req, group, transport);
948 		return rc;
949 	}
950 	return rc;
951 }
952
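/*
 * Worked example of the stripped-buffer sizing above, with illustrative
 * numbers: for a 512 + 8 format (block_size = 520, md_size = 8,
 * data_block_size = 512) and a transport io_unit_size of 8192, each stripped
 * buffer holds io_unit_size / block_size * data_block_size =
 * 8192 / 520 * 512 = 15 * 512 = 7680 bytes of metadata-free data.
 */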