xref: /spdk/lib/nvmf/transport.c (revision aac967c0d312ef9076b316afd934926f687e5336)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2018-2019, 2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "nvmf_internal.h"
10 #include "transport.h"
11 
12 #include "spdk/config.h"
13 #include "spdk/log.h"
14 #include "spdk/nvmf.h"
15 #include "spdk/nvmf_transport.h"
16 #include "spdk/queue.h"
17 #include "spdk/util.h"
18 #include "spdk_internal/usdt.h"
19 
20 #define NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS 120000
21 
22 struct nvmf_transport_ops_list_element {
23 	struct spdk_nvmf_transport_ops			ops;
24 	TAILQ_ENTRY(nvmf_transport_ops_list_element)	link;
25 };
26 
27 TAILQ_HEAD(nvmf_transport_ops_list, nvmf_transport_ops_list_element)
28 g_spdk_nvmf_transport_ops = TAILQ_HEAD_INITIALIZER(g_spdk_nvmf_transport_ops);
29 
30 static inline const struct spdk_nvmf_transport_ops *
31 nvmf_get_transport_ops(const char *transport_name)
32 {
33 	struct nvmf_transport_ops_list_element *ops;
34 	TAILQ_FOREACH(ops, &g_spdk_nvmf_transport_ops, link) {
35 		if (strcasecmp(transport_name, ops->ops.name) == 0) {
36 			return &ops->ops;
37 		}
38 	}
39 	return NULL;
40 }
41 
42 void
43 spdk_nvmf_transport_register(const struct spdk_nvmf_transport_ops *ops)
44 {
45 	struct nvmf_transport_ops_list_element *new_ops;
46 
47 	if (nvmf_get_transport_ops(ops->name) != NULL) {
48 		SPDK_ERRLOG("Double registering nvmf transport type %s.\n", ops->name);
49 		assert(false);
50 		return;
51 	}
52 
53 	new_ops = calloc(1, sizeof(*new_ops));
54 	if (new_ops == NULL) {
55 		SPDK_ERRLOG("Unable to allocate memory to register new transport type %s.\n", ops->name);
56 		assert(false);
57 		return;
58 	}
59 
60 	new_ops->ops = *ops;
61 
62 	TAILQ_INSERT_TAIL(&g_spdk_nvmf_transport_ops, new_ops, link);
63 }
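
/*
 * Illustrative sketch (added commentary, not part of the original source): a
 * transport module normally registers its ops table once at load time, e.g.
 * from a constructor.  The names below (g_example_ops, example_register and
 * the example_* callbacks) are hypothetical and only show how
 * spdk_nvmf_transport_register() is intended to be called.
 *
 *   static const struct spdk_nvmf_transport_ops g_example_ops = {
 *           .name = "EXAMPLE",
 *           .type = SPDK_NVME_TRANSPORT_CUSTOM,
 *           .opts_init = example_opts_init,
 *           .create = example_create,
 *           .destroy = example_destroy,
 *   };
 *
 *   static void __attribute__((constructor)) example_register(void)
 *   {
 *           spdk_nvmf_transport_register(&g_example_ops);
 *   }
 */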
64 
65 const struct spdk_nvmf_transport_opts *
66 spdk_nvmf_get_transport_opts(struct spdk_nvmf_transport *transport)
67 {
68 	return &transport->opts;
69 }
70 
71 void
72 nvmf_transport_dump_opts(struct spdk_nvmf_transport *transport, struct spdk_json_write_ctx *w,
73 			 bool named)
74 {
75 	const struct spdk_nvmf_transport_opts *opts = spdk_nvmf_get_transport_opts(transport);
76 
77 	named ? spdk_json_write_named_object_begin(w, "params") : spdk_json_write_object_begin(w);
78 
79 	spdk_json_write_named_string(w, "trtype", spdk_nvmf_get_transport_name(transport));
80 	spdk_json_write_named_uint32(w, "max_queue_depth", opts->max_queue_depth);
81 	spdk_json_write_named_uint32(w, "max_io_qpairs_per_ctrlr", opts->max_qpairs_per_ctrlr - 1);
82 	spdk_json_write_named_uint32(w, "in_capsule_data_size", opts->in_capsule_data_size);
83 	spdk_json_write_named_uint32(w, "max_io_size", opts->max_io_size);
84 	spdk_json_write_named_uint32(w, "io_unit_size", opts->io_unit_size);
85 	spdk_json_write_named_uint32(w, "max_aq_depth", opts->max_aq_depth);
86 	spdk_json_write_named_uint32(w, "num_shared_buffers", opts->num_shared_buffers);
87 	spdk_json_write_named_uint32(w, "buf_cache_size", opts->buf_cache_size);
88 	spdk_json_write_named_bool(w, "dif_insert_or_strip", opts->dif_insert_or_strip);
89 	spdk_json_write_named_bool(w, "zcopy", opts->zcopy);
90 
91 	if (transport->ops->dump_opts) {
92 		transport->ops->dump_opts(transport, w);
93 	}
94 
95 	spdk_json_write_named_uint32(w, "abort_timeout_sec", opts->abort_timeout_sec);
96 	spdk_json_write_named_uint32(w, "ack_timeout", opts->ack_timeout);
97 	spdk_json_write_named_uint32(w, "data_wr_pool_size", opts->data_wr_pool_size);
98 	spdk_json_write_object_end(w);
99 }
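
/*
 * For reference (added commentary; values are illustrative, not taken from the
 * original source), the object emitted above has roughly this shape when
 * 'named' is true:
 *
 *   "params": {
 *     "trtype": "TCP",
 *     "max_queue_depth": 128,
 *     "max_io_qpairs_per_ctrlr": 127,
 *     ...
 *     "data_wr_pool_size": 0
 *   }
 *
 * Transport-specific keys, if any, are inserted by the transport's dump_opts
 * callback before the final abort_timeout_sec/ack_timeout/data_wr_pool_size
 * fields.
 */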
100 
101 void
102 nvmf_transport_listen_dump_trid(const struct spdk_nvme_transport_id *trid,
103 				struct spdk_json_write_ctx *w)
104 {
105 	const char *adrfam = spdk_nvme_transport_id_adrfam_str(trid->adrfam);
106 
107 	spdk_json_write_named_string(w, "trtype", trid->trstring);
108 	spdk_json_write_named_string(w, "adrfam", adrfam ? adrfam : "unknown");
109 	spdk_json_write_named_string(w, "traddr", trid->traddr);
110 	spdk_json_write_named_string(w, "trsvcid", trid->trsvcid);
111 }
112 
113 spdk_nvme_transport_type_t
114 spdk_nvmf_get_transport_type(struct spdk_nvmf_transport *transport)
115 {
116 	return transport->ops->type;
117 }
118 
119 const char *
120 spdk_nvmf_get_transport_name(struct spdk_nvmf_transport *transport)
121 {
122 	return transport->ops->name;
123 }
124 
125 static void
126 nvmf_transport_opts_copy(struct spdk_nvmf_transport_opts *opts,
127 			 struct spdk_nvmf_transport_opts *opts_src,
128 			 size_t opts_size)
129 {
130 	assert(opts);
131 	assert(opts_src);
132 
133 	opts->opts_size = opts_size;
134 
135 #define SET_FIELD(field) \
136 	if (offsetof(struct spdk_nvmf_transport_opts, field) + sizeof(opts->field) <= opts_size) { \
137 		opts->field = opts_src->field; \
138 	} \
139 
140 	SET_FIELD(max_queue_depth);
141 	SET_FIELD(max_qpairs_per_ctrlr);
142 	SET_FIELD(in_capsule_data_size);
143 	SET_FIELD(max_io_size);
144 	SET_FIELD(io_unit_size);
145 	SET_FIELD(max_aq_depth);
146 	SET_FIELD(buf_cache_size);
147 	SET_FIELD(num_shared_buffers);
148 	SET_FIELD(dif_insert_or_strip);
149 	SET_FIELD(abort_timeout_sec);
150 	SET_FIELD(association_timeout);
151 	SET_FIELD(transport_specific);
152 	SET_FIELD(acceptor_poll_rate);
153 	SET_FIELD(zcopy);
154 	SET_FIELD(ack_timeout);
155 	SET_FIELD(data_wr_pool_size);
156 
157 	/* Do not remove this statement. Update it whenever you add a new field,
158 	 * and do not forget to add a SET_FIELD statement for the new field. */
159 	SPDK_STATIC_ASSERT(sizeof(struct spdk_nvmf_transport_opts) == 72, "Incorrect size");
160 
161 #undef SET_FIELD
163 }
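
/*
 * Added note: copying only the fields whose offset + size fit within the
 * caller-supplied opts_size is what keeps this structure ABI-compatible.  A
 * caller compiled against an older, smaller options struct passes a smaller
 * opts_size, and any newer trailing fields are simply left untouched rather
 * than read or written out of bounds.
 */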
164 
165 struct nvmf_transport_create_ctx {
166 	const struct spdk_nvmf_transport_ops *ops;
167 	struct spdk_nvmf_transport_opts opts;
168 	void *cb_arg;
169 	spdk_nvmf_transport_create_done_cb cb_fn;
170 };
171 
172 static bool
173 nvmf_transport_use_iobuf(struct spdk_nvmf_transport *transport)
174 {
175 	return transport->opts.num_shared_buffers || transport->opts.buf_cache_size;
176 }
177 
178 static void
179 nvmf_transport_create_async_done(void *cb_arg, struct spdk_nvmf_transport *transport)
180 {
181 	struct nvmf_transport_create_ctx *ctx = cb_arg;
182 	int chars_written;
183 
184 	if (!transport) {
185 		SPDK_ERRLOG("Failed to create transport.\n");
186 		goto err;
187 	}
188 
189 	pthread_mutex_init(&transport->mutex, NULL);
190 	TAILQ_INIT(&transport->listeners);
191 	transport->ops = ctx->ops;
192 	transport->opts = ctx->opts;
193 	chars_written = snprintf(transport->iobuf_name, MAX_MEMPOOL_NAME_LENGTH, "%s_%s", "nvmf",
194 				 transport->ops->name);
195 	if (chars_written < 0) {
196 		SPDK_ERRLOG("Unable to generate transport data buffer pool name.\n");
197 		goto err;
198 	}
199 
200 	if (nvmf_transport_use_iobuf(transport)) {
201 		spdk_iobuf_register_module(transport->iobuf_name);
202 	}
203 
204 	ctx->cb_fn(ctx->cb_arg, transport);
205 	free(ctx);
206 	return;
207 
208 err:
209 	if (transport) {
210 		transport->ops->destroy(transport, NULL, NULL);
211 	}
212 
213 	ctx->cb_fn(ctx->cb_arg, NULL);
214 	free(ctx);
215 }
216 
217 static void
218 _nvmf_transport_create_done(void *ctx)
219 {
220 	struct nvmf_transport_create_ctx *_ctx = (struct nvmf_transport_create_ctx *)ctx;
221 
222 	nvmf_transport_create_async_done(_ctx, _ctx->ops->create(&_ctx->opts));
223 }
224 
225 static int
226 nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
227 		      spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg, bool sync)
228 {
229 	struct nvmf_transport_create_ctx *ctx;
230 	struct spdk_iobuf_opts opts_iobuf = {};
231 	int rc;
232 	uint64_t count;
233 
234 	ctx = calloc(1, sizeof(*ctx));
235 	if (!ctx) {
236 		return -ENOMEM;
237 	}
238 
239 	if (!opts) {
240 		SPDK_ERRLOG("opts should not be NULL\n");
241 		goto err;
242 	}
243 
244 	if (!opts->opts_size) {
245 		SPDK_ERRLOG("The opts_size field in the opts structure should not be zero\n");
246 		goto err;
247 	}
248 
249 	ctx->ops = nvmf_get_transport_ops(transport_name);
250 	if (!ctx->ops) {
251 		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
252 		goto err;
253 	}
254 
255 	nvmf_transport_opts_copy(&ctx->opts, opts, opts->opts_size);
256 	if (ctx->opts.max_io_size != 0 && (!spdk_u32_is_pow2(ctx->opts.max_io_size) ||
257 					   ctx->opts.max_io_size < 8192)) {
258 		SPDK_ERRLOG("max_io_size %u must be a power of 2 and greater than or equal to 8KB\n",
259 			    ctx->opts.max_io_size);
260 		goto err;
261 	}
262 
263 	if (ctx->opts.max_aq_depth < SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE) {
264 		SPDK_ERRLOG("max_aq_depth %u is less than the minimum defined by the NVMe-oF spec, using the minimum value\n",
265 			    ctx->opts.max_aq_depth);
266 		ctx->opts.max_aq_depth = SPDK_NVMF_MIN_ADMIN_MAX_SQ_SIZE;
267 	}
268 
269 	spdk_iobuf_get_opts(&opts_iobuf, sizeof(opts_iobuf));
270 	if (ctx->opts.io_unit_size == 0) {
271 		SPDK_ERRLOG("io_unit_size cannot be 0\n");
272 		goto err;
273 	}
274 	if (ctx->opts.io_unit_size > opts_iobuf.large_bufsize) {
275 		SPDK_ERRLOG("io_unit_size %u is larger than iobuf pool large buffer size %u\n",
276 			    ctx->opts.io_unit_size, opts_iobuf.large_bufsize);
277 		goto err;
278 	}
279 
280 	if (ctx->opts.io_unit_size <= opts_iobuf.small_bufsize) {
281 		/* We'll be using the small buffer pool only */
282 		count = opts_iobuf.small_pool_count;
283 	} else {
284 		count = spdk_min(opts_iobuf.small_pool_count, opts_iobuf.large_pool_count);
285 	}
286 
287 	if (ctx->opts.num_shared_buffers > count) {
288 		SPDK_WARNLOG("The num_shared_buffers value (%u) is larger than the available iobuf"
289 			     " pool size (%" PRIu64 "). Please increase the iobuf pool sizes.\n",
290 			     ctx->opts.num_shared_buffers, count);
291 	}
292 
293 	ctx->cb_fn = cb_fn;
294 	ctx->cb_arg = cb_arg;
295 
296 	/* Prefer the synchronous create callback when the transport provides one. */
297 	if (ctx->ops->create) {
298 		if (sync) {
299 			_nvmf_transport_create_done(ctx);
300 			return 0;
301 		}
302 
303 		rc = spdk_thread_send_msg(spdk_get_thread(), _nvmf_transport_create_done, ctx);
304 		if (rc) {
305 			goto err;
306 		}
307 
308 		return 0;
309 	}
310 
311 	assert(ctx->ops->create_async);
312 	rc = ctx->ops->create_async(&ctx->opts, nvmf_transport_create_async_done, ctx);
313 	if (rc) {
314 		SPDK_ERRLOG("Unable to create new transport of type %s\n", transport_name);
315 		goto err;
316 	}
317 
318 	return 0;
319 err:
320 	free(ctx);
321 	return -1;
322 }
323 
324 int
325 spdk_nvmf_transport_create_async(const char *transport_name, struct spdk_nvmf_transport_opts *opts,
326 				 spdk_nvmf_transport_create_done_cb cb_fn, void *cb_arg)
327 {
328 	return nvmf_transport_create(transport_name, opts, cb_fn, cb_arg, false);
329 }
330 
331 static void
332 nvmf_transport_create_sync_done(void *cb_arg, struct spdk_nvmf_transport *transport)
333 {
334 	struct spdk_nvmf_transport **_transport = cb_arg;
335 
336 	*_transport = transport;
337 }
338 
339 struct spdk_nvmf_transport *
340 spdk_nvmf_transport_create(const char *transport_name, struct spdk_nvmf_transport_opts *opts)
341 {
342 	struct spdk_nvmf_transport *transport = NULL;
343 
344 	/* The current implementation supports only the synchronous create operation. */
345 	assert(nvmf_get_transport_ops(transport_name) && nvmf_get_transport_ops(transport_name)->create);
346 
347 	nvmf_transport_create(transport_name, opts, nvmf_transport_create_sync_done, &transport, true);
348 	return transport;
349 }
350 
351 struct spdk_nvmf_transport *
352 spdk_nvmf_transport_get_first(struct spdk_nvmf_tgt *tgt)
353 {
354 	return TAILQ_FIRST(&tgt->transports);
355 }
356 
357 struct spdk_nvmf_transport *
358 spdk_nvmf_transport_get_next(struct spdk_nvmf_transport *transport)
359 {
360 	return TAILQ_NEXT(transport, link);
361 }
362 
363 int
364 spdk_nvmf_transport_destroy(struct spdk_nvmf_transport *transport,
365 			    spdk_nvmf_transport_destroy_done_cb cb_fn, void *cb_arg)
366 {
367 	struct spdk_nvmf_listener *listener, *listener_tmp;
368 
369 	TAILQ_FOREACH_SAFE(listener, &transport->listeners, link, listener_tmp) {
370 		TAILQ_REMOVE(&transport->listeners, listener, link);
371 		transport->ops->stop_listen(transport, &listener->trid);
372 		free(listener);
373 	}
374 
375 	if (nvmf_transport_use_iobuf(transport)) {
376 		spdk_iobuf_unregister_module(transport->iobuf_name);
377 	}
378 
379 	pthread_mutex_destroy(&transport->mutex);
380 	return transport->ops->destroy(transport, cb_fn, cb_arg);
381 }
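
/*
 * Added note: destruction mirrors creation.  Any listeners that are still
 * registered are stopped and freed first, the iobuf module is unregistered if
 * it was in use, and only then is the transport-specific destroy callback
 * invoked (which may complete asynchronously via cb_fn).
 */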
382 
383 struct spdk_nvmf_listener *
384 nvmf_transport_find_listener(struct spdk_nvmf_transport *transport,
385 			     const struct spdk_nvme_transport_id *trid)
386 {
387 	struct spdk_nvmf_listener *listener;
388 
389 	TAILQ_FOREACH(listener, &transport->listeners, link) {
390 		if (spdk_nvme_transport_id_compare(&listener->trid, trid) == 0) {
391 			return listener;
392 		}
393 	}
394 
395 	return NULL;
396 }
397 
398 int
399 spdk_nvmf_transport_listen(struct spdk_nvmf_transport *transport,
400 			   const struct spdk_nvme_transport_id *trid, struct spdk_nvmf_listen_opts *opts)
401 {
402 	struct spdk_nvmf_listener *listener;
403 	int rc;
404 
405 	listener = nvmf_transport_find_listener(transport, trid);
406 	if (!listener) {
407 		listener = calloc(1, sizeof(*listener));
408 		if (!listener) {
409 			return -ENOMEM;
410 		}
411 
412 		listener->ref = 1;
413 		listener->trid = *trid;
414 		listener->sock_impl = opts->sock_impl;
415 		TAILQ_INSERT_TAIL(&transport->listeners, listener, link);
416 		pthread_mutex_lock(&transport->mutex);
417 		rc = transport->ops->listen(transport, &listener->trid, opts);
418 		pthread_mutex_unlock(&transport->mutex);
419 		if (rc != 0) {
420 			TAILQ_REMOVE(&transport->listeners, listener, link);
421 			free(listener);
422 		}
423 		return rc;
424 	}
425 
426 	if (opts->sock_impl && strncmp(opts->sock_impl, listener->sock_impl, strlen(listener->sock_impl))) {
427 		SPDK_ERRLOG("opts->sock_impl: '%s' doesn't match listener->sock_impl: '%s'\n", opts->sock_impl,
428 			    listener->sock_impl);
429 		return -EINVAL;
430 	}
431 
432 	++listener->ref;
433 
434 	return 0;
435 }
436 
437 int
438 spdk_nvmf_transport_stop_listen(struct spdk_nvmf_transport *transport,
439 				const struct spdk_nvme_transport_id *trid)
440 {
441 	struct spdk_nvmf_listener *listener;
442 
443 	listener = nvmf_transport_find_listener(transport, trid);
444 	if (!listener) {
445 		return -ENOENT;
446 	}
447 
448 	if (--listener->ref == 0) {
449 		TAILQ_REMOVE(&transport->listeners, listener, link);
450 		pthread_mutex_lock(&transport->mutex);
451 		transport->ops->stop_listen(transport, trid);
452 		pthread_mutex_unlock(&transport->mutex);
453 		free(listener);
454 	}
455 
456 	return 0;
457 }
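
/*
 * Added note: listeners are reference counted per trid.  A repeated
 * spdk_nvmf_transport_listen() on an already-listening trid only bumps
 * listener->ref; the transport's stop_listen callback runs (and the listener
 * is freed) only when the last reference is dropped above.
 */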
458 
459 struct nvmf_stop_listen_ctx {
460 	struct spdk_nvmf_transport *transport;
461 	struct spdk_nvme_transport_id trid;
462 	struct spdk_nvmf_subsystem *subsystem;
463 	spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn;
464 	void *cb_arg;
465 };
466 
467 static void
468 nvmf_stop_listen_fini(struct spdk_io_channel_iter *i, int status)
469 {
470 	struct nvmf_stop_listen_ctx *ctx;
471 	struct spdk_nvmf_transport *transport;
472 	int rc = status;
473 
474 	ctx = spdk_io_channel_iter_get_ctx(i);
475 	transport = ctx->transport;
476 	assert(transport != NULL);
477 
478 	rc = spdk_nvmf_transport_stop_listen(transport, &ctx->trid);
479 	if (rc) {
480 		SPDK_ERRLOG("Failed to stop listening on address '%s'\n", ctx->trid.traddr);
481 	}
482 
483 	if (ctx->cb_fn) {
484 		ctx->cb_fn(ctx->cb_arg, rc);
485 	}
486 	free(ctx);
487 }
488 
489 static void nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i);
490 
491 static void
492 nvmf_stop_listen_disconnect_qpairs_msg(void *ctx)
493 {
494 	nvmf_stop_listen_disconnect_qpairs((struct spdk_io_channel_iter *)ctx);
495 }
496 
497 static void
498 nvmf_stop_listen_disconnect_qpairs(struct spdk_io_channel_iter *i)
499 {
500 	struct nvmf_stop_listen_ctx *ctx;
501 	struct spdk_nvmf_poll_group *group;
502 	struct spdk_io_channel *ch;
503 	struct spdk_nvmf_qpair *qpair, *tmp_qpair;
504 	struct spdk_nvme_transport_id tmp_trid;
505 	bool qpair_found = false;
506 
507 	ctx = spdk_io_channel_iter_get_ctx(i);
508 	ch = spdk_io_channel_iter_get_channel(i);
509 	group = spdk_io_channel_get_ctx(ch);
510 
511 	TAILQ_FOREACH_SAFE(qpair, &group->qpairs, link, tmp_qpair) {
512 		if (spdk_nvmf_qpair_get_listen_trid(qpair, &tmp_trid)) {
513 			continue;
514 		}
515 
516 		/* Skip qpairs that don't match the listen trid and subsystem pointer.  If
517 		 * ctx->subsystem is NULL, disconnect all qpairs that match the listen trid. */
519 		if (!spdk_nvme_transport_id_compare(&ctx->trid, &tmp_trid)) {
520 			if (ctx->subsystem == NULL ||
521 			    (qpair->ctrlr != NULL && ctx->subsystem == qpair->ctrlr->subsys)) {
522 				spdk_nvmf_qpair_disconnect(qpair);
523 				qpair_found = true;
524 			}
525 		}
526 	}
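	/* Qpair disconnects are asynchronous.  If any matching qpairs were found,
	 * re-run this function on the same thread (via a message to self) until
	 * they have all been torn down, and only then continue the channel
	 * iteration. */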
527 	if (qpair_found) {
528 		spdk_thread_send_msg(spdk_get_thread(), nvmf_stop_listen_disconnect_qpairs_msg, i);
529 		return;
530 	}
531 
532 	spdk_for_each_channel_continue(i, 0);
533 }
534 
535 int
536 spdk_nvmf_transport_stop_listen_async(struct spdk_nvmf_transport *transport,
537 				      const struct spdk_nvme_transport_id *trid,
538 				      struct spdk_nvmf_subsystem *subsystem,
539 				      spdk_nvmf_tgt_subsystem_listen_done_fn cb_fn,
540 				      void *cb_arg)
541 {
542 	struct nvmf_stop_listen_ctx *ctx;
543 
544 	if (trid->subnqn[0] != '\0') {
545 		SPDK_ERRLOG("subnqn should be empty, use subsystem pointer instead\n");
546 		return -EINVAL;
547 	}
548 
549 	ctx = calloc(1, sizeof(struct nvmf_stop_listen_ctx));
550 	if (ctx == NULL) {
551 		return -ENOMEM;
552 	}
553 
554 	ctx->trid = *trid;
555 	ctx->subsystem = subsystem;
556 	ctx->transport = transport;
557 	ctx->cb_fn = cb_fn;
558 	ctx->cb_arg = cb_arg;
559 
560 	spdk_for_each_channel(transport->tgt, nvmf_stop_listen_disconnect_qpairs, ctx,
561 			      nvmf_stop_listen_fini);
562 
563 	return 0;
564 }
565 
566 void
567 nvmf_transport_listener_discover(struct spdk_nvmf_transport *transport,
568 				 struct spdk_nvme_transport_id *trid,
569 				 struct spdk_nvmf_discovery_log_page_entry *entry)
570 {
571 	transport->ops->listener_discover(transport, trid, entry);
572 }
573 
574 static int
575 nvmf_tgroup_poll(void *arg)
576 {
577 	struct spdk_nvmf_transport_poll_group *tgroup = arg;
578 	int rc;
579 
580 	rc = nvmf_transport_poll_group_poll(tgroup);
581 	return rc == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
582 }
583 
584 static void
585 nvmf_transport_poll_group_create_poller(struct spdk_nvmf_transport_poll_group *tgroup)
586 {
587 	char poller_name[SPDK_NVMF_TRSTRING_MAX_LEN + 32];
588 
589 	snprintf(poller_name, sizeof(poller_name), "nvmf_%s", tgroup->transport->ops->name);
590 	tgroup->poller = spdk_poller_register_named(nvmf_tgroup_poll, tgroup, 0, poller_name);
591 	spdk_poller_register_interrupt(tgroup->poller, NULL, NULL);
592 }
593 
594 struct spdk_nvmf_transport_poll_group *
595 nvmf_transport_poll_group_create(struct spdk_nvmf_transport *transport,
596 				 struct spdk_nvmf_poll_group *group)
597 {
598 	struct spdk_nvmf_transport_poll_group *tgroup;
599 	struct spdk_iobuf_opts opts_iobuf = {};
600 	uint32_t buf_cache_size, small_cache_size, large_cache_size;
601 	int rc;
602 
603 	pthread_mutex_lock(&transport->mutex);
604 	tgroup = transport->ops->poll_group_create(transport, group);
605 	pthread_mutex_unlock(&transport->mutex);
606 	if (!tgroup) {
607 		return NULL;
608 	}
609 	tgroup->transport = transport;
610 	nvmf_transport_poll_group_create_poller(tgroup);
611 
612 	STAILQ_INIT(&tgroup->pending_buf_queue);
613 
614 	if (!nvmf_transport_use_iobuf(transport)) {
615 		/* We aren't going to allocate any shared buffers or cache, so just return now. */
616 		return tgroup;
617 	}
618 
619 	buf_cache_size = transport->opts.buf_cache_size;
620 
621 	/* buf_cache_size of UINT32_MAX means the value should be calculated dynamically
622 	 * based on the number of buffers in the shared pool and the number of poll groups
623 	 * that are sharing them.  We allocate 75% of the pool for the cache, and then
624 	 * divide that by the number of poll groups to determine the buf_cache_size for this
625 	 * poll group.
626 	 */
627 	if (buf_cache_size == UINT32_MAX) {
628 		uint32_t num_shared_buffers = transport->opts.num_shared_buffers;
629 
630 		/* Theoretically the nvmf library can dynamically add poll groups to
631 		 * the target after transports have already been created.  We aren't
632 		 * going to try to handle this case efficiently; just do enough
633 		 * here to ensure we don't divide by zero.
634 		 */
635 		uint16_t num_poll_groups = group->tgt->num_poll_groups ? : spdk_env_get_core_count();
636 
637 		buf_cache_size = (num_shared_buffers * 3 / 4) / num_poll_groups;
638 	}
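	/* Worked example (added commentary): with num_shared_buffers = 4096 and
	 * 8 poll groups, buf_cache_size becomes (4096 * 3 / 4) / 8 = 384 buffers
	 * cached by this poll group. */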
639 
640 	spdk_iobuf_get_opts(&opts_iobuf, sizeof(opts_iobuf));
641 	small_cache_size = buf_cache_size;
642 	if (transport->opts.io_unit_size <= opts_iobuf.small_bufsize) {
643 		large_cache_size = 0;
644 	} else {
645 		large_cache_size = buf_cache_size;
646 	}
647 
648 	tgroup->buf_cache = calloc(1, sizeof(*tgroup->buf_cache));
649 	if (!tgroup->buf_cache) {
650 		SPDK_ERRLOG("Unable to allocate an iobuf channel in the poll group.\n");
651 		goto err;
652 	}
653 
654 	rc = spdk_iobuf_channel_init(tgroup->buf_cache, transport->iobuf_name, small_cache_size,
655 				     large_cache_size);
656 	if (rc != 0) {
657 		SPDK_ERRLOG("Unable to reserve the full number of buffers for the pg buffer cache.\n");
658 		rc = spdk_iobuf_channel_init(tgroup->buf_cache, transport->iobuf_name, 0, 0);
659 		if (rc != 0) {
660 			SPDK_ERRLOG("Unable to create an iobuf channel in the poll group.\n");
661 			goto err;
662 		}
663 	}
664 
665 	return tgroup;
666 err:
667 	transport->ops->poll_group_destroy(tgroup);
668 	return NULL;
669 }
670 
671 struct spdk_nvmf_transport_poll_group *
672 nvmf_transport_get_optimal_poll_group(struct spdk_nvmf_transport *transport,
673 				      struct spdk_nvmf_qpair *qpair)
674 {
675 	struct spdk_nvmf_transport_poll_group *tgroup;
676 
677 	if (transport->ops->get_optimal_poll_group) {
678 		pthread_mutex_lock(&transport->mutex);
679 		tgroup = transport->ops->get_optimal_poll_group(qpair);
680 		pthread_mutex_unlock(&transport->mutex);
681 
682 		return tgroup;
683 	} else {
684 		return NULL;
685 	}
686 }
687 
688 void
689 nvmf_transport_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
690 {
691 	struct spdk_nvmf_transport *transport;
692 	struct spdk_iobuf_channel *ch = NULL;
693 
694 	transport = group->transport;
695 
696 	spdk_poller_unregister(&group->poller);
697 
698 	if (!STAILQ_EMPTY(&group->pending_buf_queue)) {
699 		SPDK_ERRLOG("Pending I/O list wasn't empty on poll group destruction\n");
700 	}
701 
702 	if (nvmf_transport_use_iobuf(transport)) {
703 		/* The call to poll_group_destroy not only frees the group memory, but also
704 		 * releases any remaining buffers. Cache the channel pointer so we can still
705 		 * release its resources after the group has been freed. */
706 		ch = group->buf_cache;
707 	}
708 
709 	pthread_mutex_lock(&transport->mutex);
710 	transport->ops->poll_group_destroy(group);
711 	pthread_mutex_unlock(&transport->mutex);
712 
713 	if (nvmf_transport_use_iobuf(transport)) {
714 		spdk_iobuf_channel_fini(ch);
715 		free(ch);
716 	}
717 }
718 
719 void
720 nvmf_transport_poll_group_pause(struct spdk_nvmf_transport_poll_group *tgroup)
721 {
722 	spdk_poller_unregister(&tgroup->poller);
723 }
724 
725 void
726 nvmf_transport_poll_group_resume(struct spdk_nvmf_transport_poll_group *tgroup)
727 {
728 	nvmf_transport_poll_group_create_poller(tgroup);
729 }
730 
731 int
732 nvmf_transport_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
733 			      struct spdk_nvmf_qpair *qpair)
734 {
735 	if (qpair->transport) {
736 		assert(qpair->transport == group->transport);
737 		if (qpair->transport != group->transport) {
738 			return -1;
739 		}
740 	} else {
741 		qpair->transport = group->transport;
742 	}
743 
744 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_add, qpair, qpair->qid,
745 			   spdk_thread_get_id(group->group->thread));
746 
747 	return group->transport->ops->poll_group_add(group, qpair);
748 }
749 
750 int
751 nvmf_transport_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
752 				 struct spdk_nvmf_qpair *qpair)
753 {
754 	int rc = ENOTSUP;
755 
756 	SPDK_DTRACE_PROBE3(nvmf_transport_poll_group_remove, qpair, qpair->qid,
757 			   spdk_thread_get_id(group->group->thread));
758 
759 	assert(qpair->transport == group->transport);
760 	if (group->transport->ops->poll_group_remove) {
761 		rc = group->transport->ops->poll_group_remove(group, qpair);
762 	}
763 
764 	return rc;
765 }
766 
767 int
768 nvmf_transport_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
769 {
770 	return group->transport->ops->poll_group_poll(group);
771 }
772 
773 int
774 nvmf_transport_req_free(struct spdk_nvmf_request *req)
775 {
776 	return req->qpair->transport->ops->req_free(req);
777 }
778 
779 int
780 nvmf_transport_req_complete(struct spdk_nvmf_request *req)
781 {
782 	return req->qpair->transport->ops->req_complete(req);
783 }
784 
785 void
786 nvmf_transport_qpair_fini(struct spdk_nvmf_qpair *qpair,
787 			  spdk_nvmf_transport_qpair_fini_cb cb_fn,
788 			  void *cb_arg)
789 {
790 	SPDK_DTRACE_PROBE1(nvmf_transport_qpair_fini, qpair);
791 
792 	qpair->transport->ops->qpair_fini(qpair, cb_fn, cb_arg);
793 }
794 
795 int
796 nvmf_transport_qpair_get_peer_trid(struct spdk_nvmf_qpair *qpair,
797 				   struct spdk_nvme_transport_id *trid)
798 {
799 	return qpair->transport->ops->qpair_get_peer_trid(qpair, trid);
800 }
801 
802 int
803 nvmf_transport_qpair_get_local_trid(struct spdk_nvmf_qpair *qpair,
804 				    struct spdk_nvme_transport_id *trid)
805 {
806 	return qpair->transport->ops->qpair_get_local_trid(qpair, trid);
807 }
808 
809 int
810 nvmf_transport_qpair_get_listen_trid(struct spdk_nvmf_qpair *qpair,
811 				     struct spdk_nvme_transport_id *trid)
812 {
813 	return qpair->transport->ops->qpair_get_listen_trid(qpair, trid);
814 }
815 
816 void
817 nvmf_transport_qpair_abort_request(struct spdk_nvmf_qpair *qpair,
818 				   struct spdk_nvmf_request *req)
819 {
820 	if (qpair->transport->ops->qpair_abort_request) {
821 		qpair->transport->ops->qpair_abort_request(qpair, req);
822 	}
823 }
824 
825 bool
826 spdk_nvmf_transport_opts_init(const char *transport_name,
827 			      struct spdk_nvmf_transport_opts *opts, size_t opts_size)
828 {
829 	const struct spdk_nvmf_transport_ops *ops;
830 	struct spdk_nvmf_transport_opts opts_local = {};
831 
832 	ops = nvmf_get_transport_ops(transport_name);
833 	if (!ops) {
834 		SPDK_ERRLOG("Transport type %s unavailable.\n", transport_name);
835 		return false;
836 	}
837 
838 	if (!opts) {
839 		SPDK_ERRLOG("opts should not be NULL\n");
840 		return false;
841 	}
842 
843 	if (!opts_size) {
844 		SPDK_ERRLOG("opts_size should not be zero\n");
845 		return false;
846 	}
847 
848 	opts_local.association_timeout = NVMF_TRANSPORT_DEFAULT_ASSOCIATION_TIMEOUT_IN_MS;
849 	opts_local.acceptor_poll_rate = SPDK_NVMF_DEFAULT_ACCEPT_POLL_RATE_US;
850 	opts_local.disable_command_passthru = false;
851 	ops->opts_init(&opts_local);
852 
853 	nvmf_transport_opts_copy(opts, &opts_local, opts_size);
854 
855 	return true;
856 }
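
/*
 * Illustrative usage (added commentary; the "TCP" transport name, the queue
 * depth value and create_done_cb/cb_arg are hypothetical):
 *
 *   struct spdk_nvmf_transport_opts opts;
 *
 *   if (!spdk_nvmf_transport_opts_init("TCP", &opts, sizeof(opts))) {
 *           return -EINVAL;
 *   }
 *   opts.max_queue_depth = 128;
 *   spdk_nvmf_transport_create_async("TCP", &opts, create_done_cb, cb_arg);
 *
 * opts_init() fills in the transport's defaults (plus the generic association
 * timeout and acceptor poll rate set above), and the caller then overrides
 * selected fields before handing the struct to one of the create functions.
 */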
857 
858 void
859 spdk_nvmf_request_free_buffers(struct spdk_nvmf_request *req,
860 			       struct spdk_nvmf_transport_poll_group *group,
861 			       struct spdk_nvmf_transport *transport)
862 {
863 	uint32_t i;
864 
865 	for (i = 0; i < req->iovcnt; i++) {
866 		spdk_iobuf_put(group->buf_cache, req->iov[i].iov_base, req->iov[i].iov_len);
867 		req->iov[i].iov_base = NULL;
868 		req->iov[i].iov_len = 0;
869 	}
870 	req->iovcnt = 0;
871 	req->data_from_pool = false;
872 }
873 
874 static int
875 nvmf_request_set_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
876 			uint32_t io_unit_size)
877 {
878 	req->iov[req->iovcnt].iov_base = buf;
879 	req->iov[req->iovcnt].iov_len  = spdk_min(length, io_unit_size);
880 	length -= req->iov[req->iovcnt].iov_len;
881 	req->iovcnt++;
882 
883 	return length;
884 }
885 
886 static int
887 nvmf_request_set_stripped_buffer(struct spdk_nvmf_request *req, void *buf, uint32_t length,
888 				 uint32_t io_unit_size)
889 {
890 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
891 
892 	data->iov[data->iovcnt].iov_base = buf;
893 	data->iov[data->iovcnt].iov_len  = spdk_min(length, io_unit_size);
894 	length -= data->iov[data->iovcnt].iov_len;
895 	data->iovcnt++;
896 
897 	return length;
898 }
899 
900 static void nvmf_request_iobuf_get_cb(struct spdk_iobuf_entry *entry, void *buf);
901 
902 static int
903 nvmf_request_get_buffers(struct spdk_nvmf_request *req,
904 			 struct spdk_nvmf_transport_poll_group *group,
905 			 struct spdk_nvmf_transport *transport,
906 			 uint32_t length, uint32_t io_unit_size,
907 			 bool stripped_buffers)
908 {
909 	struct spdk_iobuf_entry *entry = NULL;
910 	uint32_t num_buffers;
911 	uint32_t i = 0;
912 	void *buffer;
913 
914 	/* If the number of buffers is too large, then we know the I/O is larger than allowed.
915 	 *  Fail it.
916 	 */
917 	num_buffers = SPDK_CEIL_DIV(length, io_unit_size);
918 	if (spdk_unlikely(num_buffers > NVMF_REQ_MAX_BUFFERS)) {
919 		return -EINVAL;
920 	}
921 
922 	/* Use iobuf queuing only if transport supports it */
923 	if (transport->ops->req_get_buffers_done != NULL) {
924 		entry = &req->iobuf.entry;
925 	}
926 
927 	while (i < num_buffers) {
928 		buffer = spdk_iobuf_get(group->buf_cache, spdk_min(io_unit_size, length), entry,
929 					nvmf_request_iobuf_get_cb);
930 		if (spdk_unlikely(buffer == NULL)) {
931 			req->iobuf.remaining_length = length;
932 			return -ENOMEM;
933 		}
934 		if (stripped_buffers) {
935 			length = nvmf_request_set_stripped_buffer(req, buffer, length, io_unit_size);
936 		} else {
937 			length = nvmf_request_set_buffer(req, buffer, length, io_unit_size);
938 		}
939 		i++;
940 	}
941 
942 	assert(length == 0);
943 	req->data_from_pool = true;
944 
945 	return 0;
946 }
947 
948 static void
949 nvmf_request_iobuf_get_cb(struct spdk_iobuf_entry *entry, void *buf)
950 {
951 	struct spdk_nvmf_request *req = SPDK_CONTAINEROF(entry, struct spdk_nvmf_request, iobuf.entry);
952 	struct spdk_nvmf_transport *transport = req->qpair->transport;
953 	struct spdk_nvmf_poll_group *group = req->qpair->group;
954 	struct spdk_nvmf_transport_poll_group *tgroup = nvmf_get_transport_poll_group(group, transport);
955 	uint32_t length = req->iobuf.remaining_length;
956 	uint32_t io_unit_size = transport->opts.io_unit_size;
957 	int rc;
958 
959 	assert(tgroup != NULL);
960 
961 	length = nvmf_request_set_buffer(req, buf, length, io_unit_size);
962 	rc = nvmf_request_get_buffers(req, tgroup, transport, length, io_unit_size, false);
963 	if (rc == 0) {
964 		transport->ops->req_get_buffers_done(req);
965 	}
966 }
967 
968 int
969 spdk_nvmf_request_get_buffers(struct spdk_nvmf_request *req,
970 			      struct spdk_nvmf_transport_poll_group *group,
971 			      struct spdk_nvmf_transport *transport,
972 			      uint32_t length)
973 {
974 	int rc;
975 
976 	assert(nvmf_transport_use_iobuf(transport));
977 
978 	req->iovcnt = 0;
979 	rc = nvmf_request_get_buffers(req, group, transport, length, transport->opts.io_unit_size, false);
980 	if (spdk_unlikely(rc == -ENOMEM && transport->ops->req_get_buffers_done == NULL)) {
981 		spdk_nvmf_request_free_buffers(req, group, transport);
982 	}
983 
984 	return rc;
985 }
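
/*
 * Added note: -ENOMEM from this function is not necessarily fatal.  If the
 * transport implements req_get_buffers_done, the request has been queued on
 * the iobuf channel (see nvmf_request_iobuf_get_cb above) and the remaining
 * buffers are filled in once they become available; otherwise the partial
 * allocation is released here and the caller is expected to retry later.
 */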
986 
987 static int
988 nvmf_request_get_buffers_abort_cb(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
989 				  void *cb_ctx)
990 {
991 	struct spdk_nvmf_request *req, *req_to_abort = cb_ctx;
992 
993 	req = SPDK_CONTAINEROF(entry, struct spdk_nvmf_request, iobuf.entry);
994 	if (req != req_to_abort) {
995 		return 0;
996 	}
997 
998 	spdk_iobuf_entry_abort(ch, entry, spdk_min(req->iobuf.remaining_length,
999 			       req->qpair->transport->opts.io_unit_size));
1000 	return 1;
1001 }
1002 
1003 bool
1004 nvmf_request_get_buffers_abort(struct spdk_nvmf_request *req)
1005 {
1006 	struct spdk_nvmf_transport_poll_group *tgroup = nvmf_get_transport_poll_group(req->qpair->group,
1007 			req->qpair->transport);
1008 	int rc;
1009 
1010 	assert(tgroup != NULL);
1011 
1012 	rc = spdk_iobuf_for_each_entry(tgroup->buf_cache, nvmf_request_get_buffers_abort_cb, req);
1013 	return rc == 1;
1014 }
1015 
1016 void
1017 nvmf_request_free_stripped_buffers(struct spdk_nvmf_request *req,
1018 				   struct spdk_nvmf_transport_poll_group *group,
1019 				   struct spdk_nvmf_transport *transport)
1020 {
1021 	struct spdk_nvmf_stripped_data *data = req->stripped_data;
1022 	uint32_t i;
1023 
1024 	for (i = 0; i < data->iovcnt; i++) {
1025 		spdk_iobuf_put(group->buf_cache, data->iov[i].iov_base, data->iov[i].iov_len);
1026 	}
1027 	free(data);
1028 	req->stripped_data = NULL;
1029 }
1030 
1031 int
1032 nvmf_request_get_stripped_buffers(struct spdk_nvmf_request *req,
1033 				  struct spdk_nvmf_transport_poll_group *group,
1034 				  struct spdk_nvmf_transport *transport,
1035 				  uint32_t length)
1036 {
1037 	uint32_t block_size = req->dif.dif_ctx.block_size;
1038 	uint32_t data_block_size = block_size - req->dif.dif_ctx.md_size;
1039 	uint32_t io_unit_size = transport->opts.io_unit_size / block_size * data_block_size;
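	/* Example (added commentary): with a 4096-byte data block plus 8 bytes of
	 * metadata, block_size = 4104 and data_block_size = 4096, so a transport
	 * io_unit_size of 8208 yields 8208 / 4104 * 4096 = 8192 bytes of usable
	 * data per stripped buffer. */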
1040 	struct spdk_nvmf_stripped_data *data;
1041 	uint32_t i;
1042 	int rc;
1043 
1044 	/* We don't support iobuf queueing with stripped buffers yet */
1045 	assert(transport->ops->req_get_buffers_done == NULL);
1046 
1047 	/* Data blocks must be block aligned */
1048 	for (i = 0; i < req->iovcnt; i++) {
1049 		if (req->iov[i].iov_len % block_size) {
1050 			return -EINVAL;
1051 		}
1052 	}
1053 
1054 	data = calloc(1, sizeof(*data));
1055 	if (data == NULL) {
1056 		SPDK_ERRLOG("Unable to allocate memory for stripped_data.\n");
1057 		return -ENOMEM;
1058 	}
1059 	req->stripped_data = data;
1060 	req->stripped_data->iovcnt = 0;
1061 
1062 	rc = nvmf_request_get_buffers(req, group, transport, length, io_unit_size, true);
1063 	if (rc == -ENOMEM) {
1064 		nvmf_request_free_stripped_buffers(req, group, transport);
1065 		return rc;
1066 	}
1067 	return rc;
1068 }
1069