xref: /spdk/lib/nvme/nvme_rdma.c (revision cf151d60e64ad49fcad18dc82ca4f02500b6f07d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2019-2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 /*
8  * NVMe over RDMA transport
9  */
10 
11 #include "spdk/stdinc.h"
12 
13 #include "spdk/assert.h"
14 #include "spdk/dma.h"
15 #include "spdk/log.h"
16 #include "spdk/trace.h"
17 #include "spdk/queue.h"
18 #include "spdk/nvme.h"
19 #include "spdk/nvmf_spec.h"
20 #include "spdk/string.h"
21 #include "spdk/endian.h"
22 #include "spdk/likely.h"
23 #include "spdk/config.h"
24 
25 #include "nvme_internal.h"
26 #include "spdk_internal/rdma_provider.h"
27 
28 #define NVME_RDMA_TIME_OUT_IN_MS 2000
29 #define NVME_RDMA_RW_BUFFER_SIZE 131072
30 
31 /*
32  * NVME RDMA qpair Resource Defaults
33  */
34 #define NVME_RDMA_DEFAULT_TX_SGE		2
35 #define NVME_RDMA_DEFAULT_RX_SGE		1
36 
37 /* Max number of NVMe-oF SGL descriptors supported by the host */
38 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
39 
40 /* number of STAILQ entries for holding pending RDMA CM events. */
41 #define NVME_RDMA_NUM_CM_EVENTS			256
42 
43 /* The default size for a shared rdma completion queue. */
44 #define DEFAULT_NVME_RDMA_CQ_SIZE		4096
45 
46 /*
47  * In the special case of a stale connection we don't expose a mechanism
48  * for the user to retry the connection so we need to handle it internally.
49  */
50 #define NVME_RDMA_STALE_CONN_RETRY_MAX		5
51 #define NVME_RDMA_STALE_CONN_RETRY_DELAY_US	10000
52 
53 /*
54  * Maximum value of transport_retry_count used by RDMA controller
55  */
56 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT	7
57 
58 /*
59  * Maximum value of transport_ack_timeout used by RDMA controller
60  */
61 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT	31
62 
63 /*
64  * Number of microseconds to wait until the lingering qpair becomes quiet.
65  */
66 #define NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US	1000000ull
67 
68 /*
69  * The max length of keyed SGL data block (3 bytes)
70  */
71 #define NVME_RDMA_MAX_KEYED_SGL_LENGTH ((1u << 24u) - 1)
72 
73 #define WC_PER_QPAIR(queue_depth)	(queue_depth * 2)
74 
75 #define NVME_RDMA_POLL_GROUP_CHECK_QPN(_rqpair, qpn)				\
76 	((_rqpair)->rdma_qp && (_rqpair)->rdma_qp->qp->qp_num == (qpn))	\
77 
78 struct nvme_rdma_memory_domain {
79 	TAILQ_ENTRY(nvme_rdma_memory_domain) link;
80 	uint32_t ref;
81 	struct ibv_pd *pd;
82 	struct spdk_memory_domain *domain;
83 	struct spdk_memory_domain_rdma_ctx rdma_ctx;
84 };
85 
86 enum nvme_rdma_wr_type {
87 	RDMA_WR_TYPE_RECV,
88 	RDMA_WR_TYPE_SEND,
89 };
90 
91 struct nvme_rdma_wr {
92 	/* Using this instead of the enum allows this struct to only occupy one byte. */
93 	uint8_t	type;
94 };
95 
96 struct spdk_nvmf_cmd {
97 	struct spdk_nvme_cmd cmd;
98 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
99 };
100 
101 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
102 
103 /* STAILQ wrapper for cm events. */
104 struct nvme_rdma_cm_event_entry {
105 	struct rdma_cm_event			*evt;
106 	STAILQ_ENTRY(nvme_rdma_cm_event_entry)	link;
107 };
108 
109 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
110 struct nvme_rdma_ctrlr {
111 	struct spdk_nvme_ctrlr			ctrlr;
112 
113 	uint16_t				max_sge;
114 
115 	struct rdma_event_channel		*cm_channel;
116 
117 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	pending_cm_events;
118 
119 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	free_cm_events;
120 
121 	struct nvme_rdma_cm_event_entry		*cm_events;
122 };
123 
124 struct nvme_rdma_poller_stats {
125 	uint64_t polls;
126 	uint64_t idle_polls;
127 	uint64_t queued_requests;
128 	uint64_t completions;
129 	struct spdk_rdma_provider_qp_stats rdma_stats;
130 };
131 
132 struct nvme_rdma_poll_group;
133 struct nvme_rdma_rsps;
134 
135 struct nvme_rdma_poller {
136 	struct ibv_context		*device;
137 	struct ibv_cq			*cq;
138 	struct spdk_rdma_provider_srq	*srq;
139 	struct nvme_rdma_rsps		*rsps;
140 	struct ibv_pd			*pd;
141 	struct spdk_rdma_mem_map	*mr_map;
142 	uint32_t			refcnt;
143 	int				required_num_wc;
144 	int				current_num_wc;
145 	struct nvme_rdma_poller_stats	stats;
146 	struct nvme_rdma_poll_group	*group;
147 	STAILQ_ENTRY(nvme_rdma_poller)	link;
148 };
149 
150 struct nvme_rdma_qpair;
151 
152 struct nvme_rdma_poll_group {
153 	struct spdk_nvme_transport_poll_group		group;
154 	STAILQ_HEAD(, nvme_rdma_poller)			pollers;
155 	uint32_t					num_pollers;
156 	TAILQ_HEAD(, nvme_rdma_qpair)			connecting_qpairs;
157 	TAILQ_HEAD(, nvme_rdma_qpair)			active_qpairs;
158 };
159 
160 enum nvme_rdma_qpair_state {
161 	NVME_RDMA_QPAIR_STATE_INVALID = 0,
162 	NVME_RDMA_QPAIR_STATE_STALE_CONN,
163 	NVME_RDMA_QPAIR_STATE_INITIALIZING,
164 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND,
165 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL,
166 	NVME_RDMA_QPAIR_STATE_RUNNING,
167 	NVME_RDMA_QPAIR_STATE_EXITING,
168 	NVME_RDMA_QPAIR_STATE_LINGERING,
169 	NVME_RDMA_QPAIR_STATE_EXITED,
170 };
171 
172 typedef int (*nvme_rdma_cm_event_cb)(struct nvme_rdma_qpair *rqpair, int ret);
173 
174 struct nvme_rdma_rsp_opts {
175 	uint16_t				num_entries;
176 	struct nvme_rdma_qpair			*rqpair;
177 	struct spdk_rdma_provider_srq		*srq;
178 	struct spdk_rdma_mem_map		*mr_map;
179 };
180 
181 struct nvme_rdma_rsps {
182 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
183 	struct ibv_sge				*rsp_sgls;
184 	struct spdk_nvme_rdma_rsp		*rsps;
185 
186 	struct ibv_recv_wr			*rsp_recv_wrs;
187 
188 	/* Count of outstanding recv objects */
189 	uint16_t				current_num_recvs;
190 
191 	uint16_t				num_entries;
192 };
193 
194 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
195 struct nvme_rdma_qpair {
196 	struct spdk_nvme_qpair			qpair;
197 
198 	struct spdk_rdma_provider_qp		*rdma_qp;
199 	struct rdma_cm_id			*cm_id;
200 	struct ibv_cq				*cq;
201 	struct spdk_rdma_provider_srq		*srq;
202 
203 	struct	spdk_nvme_rdma_req		*rdma_reqs;
204 
205 	uint32_t				max_send_sge;
206 
207 	uint32_t				max_recv_sge;
208 
209 	uint16_t				num_entries;
210 
211 	bool					delay_cmd_submit;
212 
213 	uint32_t				num_completions;
214 	uint32_t				num_outstanding_reqs;
215 
216 	struct nvme_rdma_rsps			*rsps;
217 
218 	/*
219 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
220 	 * Indexed by rdma_req->id.
221 	 */
222 	struct spdk_nvmf_cmd			*cmds;
223 
224 	struct spdk_rdma_mem_map		*mr_map;
225 
226 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
227 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
228 
229 	struct nvme_rdma_memory_domain		*memory_domain;
230 
231 	/* Count of outstanding send objects */
232 	uint16_t				current_num_sends;
233 
234 	TAILQ_ENTRY(nvme_rdma_qpair)		link_active;
235 
236 	/* Placed at the end of the struct since it is not used frequently */
237 	struct rdma_cm_event			*evt;
238 	struct nvme_rdma_poller			*poller;
239 
240 	uint64_t				evt_timeout_ticks;
241 	nvme_rdma_cm_event_cb			evt_cb;
242 	enum rdma_cm_event_type			expected_evt_type;
243 
244 	enum nvme_rdma_qpair_state		state;
245 
246 	bool					in_connect_poll;
247 
248 	uint8_t					stale_conn_retry_count;
249 	bool					need_destroy;
250 
251 	TAILQ_ENTRY(nvme_rdma_qpair)		link_connecting;
252 };
253 
254 enum NVME_RDMA_COMPLETION_FLAGS {
255 	NVME_RDMA_SEND_COMPLETED = 1u << 0,
256 	NVME_RDMA_RECV_COMPLETED = 1u << 1,
257 };
258 
259 struct spdk_nvme_rdma_req {
260 	uint16_t				id;
261 	uint16_t				completion_flags: 2;
262 	uint16_t				reserved: 14;
263 	/* if completion of RDMA_RECV received before RDMA_SEND, we will complete nvme request
264 	 * during processing of RDMA_SEND. To complete the request we must know the response
265 	 * received in RDMA_RECV, so store it in this field */
266 	struct spdk_nvme_rdma_rsp		*rdma_rsp;
267 
268 	struct nvme_rdma_wr			rdma_wr;
269 
270 	struct ibv_send_wr			send_wr;
271 
272 	struct nvme_request			*req;
273 
274 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
275 
276 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
277 };
278 
279 struct spdk_nvme_rdma_rsp {
280 	struct spdk_nvme_cpl	cpl;
281 	struct nvme_rdma_qpair	*rqpair;
282 	struct ibv_recv_wr	*recv_wr;
283 	struct nvme_rdma_wr	rdma_wr;
284 };
285 
286 struct nvme_rdma_memory_translation_ctx {
287 	void *addr;
288 	size_t length;
289 	uint32_t lkey;
290 	uint32_t rkey;
291 };
292 
293 static const char *rdma_cm_event_str[] = {
294 	"RDMA_CM_EVENT_ADDR_RESOLVED",
295 	"RDMA_CM_EVENT_ADDR_ERROR",
296 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
297 	"RDMA_CM_EVENT_ROUTE_ERROR",
298 	"RDMA_CM_EVENT_CONNECT_REQUEST",
299 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
300 	"RDMA_CM_EVENT_CONNECT_ERROR",
301 	"RDMA_CM_EVENT_UNREACHABLE",
302 	"RDMA_CM_EVENT_REJECTED",
303 	"RDMA_CM_EVENT_ESTABLISHED",
304 	"RDMA_CM_EVENT_DISCONNECTED",
305 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
306 	"RDMA_CM_EVENT_MULTICAST_JOIN",
307 	"RDMA_CM_EVENT_MULTICAST_ERROR",
308 	"RDMA_CM_EVENT_ADDR_CHANGE",
309 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
310 };
311 
312 static struct nvme_rdma_poller *nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group,
313 		struct ibv_context *device);
314 static void nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group,
315 		struct nvme_rdma_poller *poller);
316 
317 static TAILQ_HEAD(, nvme_rdma_memory_domain) g_memory_domains = TAILQ_HEAD_INITIALIZER(
318 			g_memory_domains);
319 static pthread_mutex_t g_memory_domains_lock = PTHREAD_MUTEX_INITIALIZER;
320 
321 static struct nvme_rdma_memory_domain *
322 nvme_rdma_get_memory_domain(struct ibv_pd *pd)
323 {
324 	struct nvme_rdma_memory_domain *domain = NULL;
325 	struct spdk_memory_domain_ctx ctx;
326 	int rc;
327 
328 	pthread_mutex_lock(&g_memory_domains_lock);
329 
330 	TAILQ_FOREACH(domain, &g_memory_domains, link) {
331 		if (domain->pd == pd) {
332 			domain->ref++;
333 			pthread_mutex_unlock(&g_memory_domains_lock);
334 			return domain;
335 		}
336 	}
337 
338 	domain = calloc(1, sizeof(*domain));
339 	if (!domain) {
340 		SPDK_ERRLOG("Memory allocation failed\n");
341 		pthread_mutex_unlock(&g_memory_domains_lock);
342 		return NULL;
343 	}
344 
345 	domain->rdma_ctx.size = sizeof(domain->rdma_ctx);
346 	domain->rdma_ctx.ibv_pd = pd;
347 	ctx.size = sizeof(ctx);
348 	ctx.user_ctx = &domain->rdma_ctx;
349 
350 	rc = spdk_memory_domain_create(&domain->domain, SPDK_DMA_DEVICE_TYPE_RDMA, &ctx,
351 				       SPDK_RDMA_DMA_DEVICE);
352 	if (rc) {
353 		SPDK_ERRLOG("Failed to create memory domain\n");
354 		free(domain);
355 		pthread_mutex_unlock(&g_memory_domains_lock);
356 		return NULL;
357 	}
358 
359 	domain->pd = pd;
360 	domain->ref = 1;
361 	TAILQ_INSERT_TAIL(&g_memory_domains, domain, link);
362 
363 	pthread_mutex_unlock(&g_memory_domains_lock);
364 
365 	return domain;
366 }
367 
368 static void
369 nvme_rdma_put_memory_domain(struct nvme_rdma_memory_domain *device)
370 {
371 	if (!device) {
372 		return;
373 	}
374 
375 	pthread_mutex_lock(&g_memory_domains_lock);
376 
377 	assert(device->ref > 0);
378 
379 	device->ref--;
380 
381 	if (device->ref == 0) {
382 		spdk_memory_domain_destroy(device->domain);
383 		TAILQ_REMOVE(&g_memory_domains, device, link);
384 		free(device);
385 	}
386 
387 	pthread_mutex_unlock(&g_memory_domains_lock);
388 }
389 
390 static int nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr,
391 		struct spdk_nvme_qpair *qpair);
392 
393 static inline struct nvme_rdma_qpair *
394 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
395 {
396 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
397 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
398 }
399 
400 static inline struct nvme_rdma_poll_group *
401 nvme_rdma_poll_group(struct spdk_nvme_transport_poll_group *group)
402 {
403 	return (SPDK_CONTAINEROF(group, struct nvme_rdma_poll_group, group));
404 }
405 
406 static inline struct nvme_rdma_ctrlr *
407 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
408 {
409 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
410 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
411 }
412 
413 static struct spdk_nvme_rdma_req *
414 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
415 {
416 	struct spdk_nvme_rdma_req *rdma_req;
417 
418 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
419 	if (rdma_req) {
420 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
421 	}
422 
423 	return rdma_req;
424 }
425 
426 static void
427 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
428 {
429 	rdma_req->completion_flags = 0;
430 	rdma_req->req = NULL;
431 	rdma_req->rdma_rsp = NULL;
432 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
433 }
434 
435 static void
436 nvme_rdma_req_complete(struct spdk_nvme_rdma_req *rdma_req,
437 		       struct spdk_nvme_cpl *rsp,
438 		       bool print_on_error)
439 {
440 	struct nvme_request *req = rdma_req->req;
441 	struct nvme_rdma_qpair *rqpair;
442 	struct spdk_nvme_qpair *qpair;
443 	bool error, print_error;
444 
445 	assert(req != NULL);
446 
447 	qpair = req->qpair;
448 	rqpair = nvme_rdma_qpair(qpair);
449 
450 	error = spdk_nvme_cpl_is_error(rsp);
451 	print_error = error && print_on_error && !qpair->ctrlr->opts.disable_error_logging;
452 
453 	if (print_error) {
454 		spdk_nvme_qpair_print_command(qpair, &req->cmd);
455 	}
456 
457 	if (print_error || SPDK_DEBUGLOG_FLAG_ENABLED("nvme")) {
458 		spdk_nvme_qpair_print_completion(qpair, rsp);
459 	}
460 
461 	assert(rqpair->num_outstanding_reqs > 0);
462 	rqpair->num_outstanding_reqs--;
463 
464 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
465 
466 	nvme_complete_request(req->cb_fn, req->cb_arg, qpair, req, rsp);
467 	nvme_rdma_req_put(rqpair, rdma_req);
468 }
469 
470 static const char *
471 nvme_rdma_cm_event_str_get(uint32_t event)
472 {
473 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
474 		return rdma_cm_event_str[event];
475 	} else {
476 		return "Undefined";
477 	}
478 }
479 
480 
481 static int
482 nvme_rdma_qpair_process_cm_event(struct nvme_rdma_qpair *rqpair)
483 {
484 	struct rdma_cm_event				*event = rqpair->evt;
485 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
486 	int						rc = 0;
487 
488 	if (event) {
489 		switch (event->event) {
490 		case RDMA_CM_EVENT_ADDR_RESOLVED:
491 		case RDMA_CM_EVENT_ADDR_ERROR:
492 		case RDMA_CM_EVENT_ROUTE_RESOLVED:
493 		case RDMA_CM_EVENT_ROUTE_ERROR:
494 			break;
495 		case RDMA_CM_EVENT_CONNECT_REQUEST:
496 			break;
497 		case RDMA_CM_EVENT_CONNECT_ERROR:
498 			break;
499 		case RDMA_CM_EVENT_UNREACHABLE:
500 		case RDMA_CM_EVENT_REJECTED:
501 			break;
502 		case RDMA_CM_EVENT_CONNECT_RESPONSE:
503 			rc = spdk_rdma_provider_qp_complete_connect(rqpair->rdma_qp);
504 		/* fall through */
505 		case RDMA_CM_EVENT_ESTABLISHED:
506 			accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
507 			if (accept_data == NULL) {
508 				rc = -1;
509 			} else {
510 				SPDK_DEBUGLOG(nvme, "Requested queue depth %d. Target receive queue depth %d.\n",
511 					      rqpair->num_entries + 1, accept_data->crqsize);
512 			}
513 			break;
514 		case RDMA_CM_EVENT_DISCONNECTED:
515 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
516 			break;
517 		case RDMA_CM_EVENT_DEVICE_REMOVAL:
518 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
519 			rqpair->need_destroy = true;
520 			break;
521 		case RDMA_CM_EVENT_MULTICAST_JOIN:
522 		case RDMA_CM_EVENT_MULTICAST_ERROR:
523 			break;
524 		case RDMA_CM_EVENT_ADDR_CHANGE:
525 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
526 			break;
527 		case RDMA_CM_EVENT_TIMEWAIT_EXIT:
528 			break;
529 		default:
530 			SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
531 			break;
532 		}
533 		rqpair->evt = NULL;
534 		rdma_ack_cm_event(event);
535 	}
536 
537 	return rc;
538 }
539 
540 /*
541  * This function must be called under the nvme controller's lock
542  * because it touches global controller variables. The lock is taken
543  * by the generic transport code before invoking a few of the functions
544  * in this file: nvme_rdma_ctrlr_connect_qpair, nvme_rdma_ctrlr_delete_io_qpair,
545  * and conditionally nvme_rdma_qpair_process_completions when it is calling
546  * completions on the admin qpair. When adding a new call to this function, please
547  * verify that it is in a situation where it falls under the lock.
548  */
549 static int
550 nvme_rdma_poll_events(struct nvme_rdma_ctrlr *rctrlr)
551 {
552 	struct nvme_rdma_cm_event_entry	*entry, *tmp;
553 	struct nvme_rdma_qpair		*event_qpair;
554 	struct rdma_cm_event		*event;
555 	struct rdma_event_channel	*channel = rctrlr->cm_channel;
556 
557 	STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
558 		event_qpair = entry->evt->id->context;
559 		if (event_qpair->evt == NULL) {
560 			event_qpair->evt = entry->evt;
561 			STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
562 			STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
563 		}
564 	}
565 
566 	while (rdma_get_cm_event(channel, &event) == 0) {
567 		event_qpair = event->id->context;
568 		if (event_qpair->evt == NULL) {
569 			event_qpair->evt = event;
570 		} else {
571 			assert(rctrlr == nvme_rdma_ctrlr(event_qpair->qpair.ctrlr));
572 			entry = STAILQ_FIRST(&rctrlr->free_cm_events);
573 			if (entry == NULL) {
574 				rdma_ack_cm_event(event);
575 				return -ENOMEM;
576 			}
577 			STAILQ_REMOVE(&rctrlr->free_cm_events, entry, nvme_rdma_cm_event_entry, link);
578 			entry->evt = event;
579 			STAILQ_INSERT_TAIL(&rctrlr->pending_cm_events, entry, link);
580 		}
581 	}
582 
583 	/* rdma_get_cm_event() returns -1 on error. If an error occurs, errno
584 	 * will be set to indicate the failure reason. So return negated errno here.
585 	 */
586 	return -errno;
587 }
588 
589 static int
590 nvme_rdma_validate_cm_event(enum rdma_cm_event_type expected_evt_type,
591 			    struct rdma_cm_event *reaped_evt)
592 {
593 	int rc = -EBADMSG;
594 
595 	if (expected_evt_type == reaped_evt->event) {
596 		return 0;
597 	}
598 
599 	switch (expected_evt_type) {
600 	case RDMA_CM_EVENT_ESTABLISHED:
601 		/*
602 		 * There is an enum ib_cm_rej_reason in the kernel headers that sets 10 as
603 		 * IB_CM_REJ_STALE_CONN. I can't find the corresponding userspace but we get
604 		 * the same values here.
605 		 */
606 		if (reaped_evt->event == RDMA_CM_EVENT_REJECTED && reaped_evt->status == 10) {
607 			rc = -ESTALE;
608 		} else if (reaped_evt->event == RDMA_CM_EVENT_CONNECT_RESPONSE) {
609 			/*
610 			 *  If we are using a qpair which is not created using rdma cm API
611 			 *  then we will receive RDMA_CM_EVENT_CONNECT_RESPONSE instead of
612 			 *  RDMA_CM_EVENT_ESTABLISHED.
613 			 */
614 			return 0;
615 		}
616 		break;
617 	default:
618 		break;
619 	}
620 
621 	SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
622 		    nvme_rdma_cm_event_str_get(expected_evt_type),
623 		    nvme_rdma_cm_event_str_get(reaped_evt->event), reaped_evt->event,
624 		    reaped_evt->status);
625 	return rc;
626 }
627 
628 static int
629 nvme_rdma_process_event_start(struct nvme_rdma_qpair *rqpair,
630 			      enum rdma_cm_event_type evt,
631 			      nvme_rdma_cm_event_cb evt_cb)
632 {
633 	int	rc;
634 
635 	assert(evt_cb != NULL);
636 
637 	if (rqpair->evt != NULL) {
638 		rc = nvme_rdma_qpair_process_cm_event(rqpair);
639 		if (rc) {
640 			return rc;
641 		}
642 	}
643 
644 	rqpair->expected_evt_type = evt;
645 	rqpair->evt_cb = evt_cb;
646 	rqpair->evt_timeout_ticks = (g_spdk_nvme_transport_opts.rdma_cm_event_timeout_ms * 1000 *
647 				     spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC + spdk_get_ticks();
648 
649 	return 0;
650 }
651 
652 static int
653 nvme_rdma_process_event_poll(struct nvme_rdma_qpair *rqpair)
654 {
655 	struct nvme_rdma_ctrlr	*rctrlr;
656 	int	rc = 0, rc2;
657 
658 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
659 	assert(rctrlr != NULL);
660 
661 	if (!rqpair->evt && spdk_get_ticks() < rqpair->evt_timeout_ticks) {
662 		rc = nvme_rdma_poll_events(rctrlr);
663 		if (rc == -EAGAIN || rc == -EWOULDBLOCK) {
664 			return rc;
665 		}
666 	}
667 
668 	if (rqpair->evt == NULL) {
669 		rc = -EADDRNOTAVAIL;
670 		goto exit;
671 	}
672 
673 	rc = nvme_rdma_validate_cm_event(rqpair->expected_evt_type, rqpair->evt);
674 
675 	rc2 = nvme_rdma_qpair_process_cm_event(rqpair);
676 	/* bad message takes precedence over the other error codes from processing the event. */
677 	rc = rc == 0 ? rc2 : rc;
678 
679 exit:
680 	assert(rqpair->evt_cb != NULL);
681 	return rqpair->evt_cb(rqpair, rc);
682 }
683 
684 static int
685 nvme_rdma_resize_cq(struct nvme_rdma_qpair *rqpair, struct nvme_rdma_poller *poller)
686 {
687 	int	current_num_wc, required_num_wc;
688 	int	max_cq_size;
689 
690 	required_num_wc = poller->required_num_wc + WC_PER_QPAIR(rqpair->num_entries);
691 	current_num_wc = poller->current_num_wc;
692 	if (current_num_wc < required_num_wc) {
693 		current_num_wc = spdk_max(current_num_wc * 2, required_num_wc);
694 	}
695 
696 	max_cq_size = g_spdk_nvme_transport_opts.rdma_max_cq_size;
697 	if (max_cq_size != 0 && current_num_wc > max_cq_size) {
698 		current_num_wc = max_cq_size;
699 	}
700 
701 	if (poller->current_num_wc != current_num_wc) {
702 		SPDK_DEBUGLOG(nvme, "Resize RDMA CQ from %d to %d\n", poller->current_num_wc,
703 			      current_num_wc);
704 		if (ibv_resize_cq(poller->cq, current_num_wc)) {
705 			SPDK_ERRLOG("RDMA CQ resize failed: errno %d: %s\n", errno, spdk_strerror(errno));
706 			return -1;
707 		}
708 
709 		poller->current_num_wc = current_num_wc;
710 	}
711 
712 	poller->required_num_wc = required_num_wc;
713 	return 0;
714 }
715 
716 static int
717 nvme_rdma_qpair_set_poller(struct spdk_nvme_qpair *qpair)
718 {
719 	struct nvme_rdma_qpair          *rqpair = nvme_rdma_qpair(qpair);
720 	struct nvme_rdma_poll_group     *group = nvme_rdma_poll_group(qpair->poll_group);
721 	struct nvme_rdma_poller         *poller;
722 
723 	assert(rqpair->cq == NULL);
724 
725 	poller = nvme_rdma_poll_group_get_poller(group, rqpair->cm_id->verbs);
726 	if (!poller) {
727 		SPDK_ERRLOG("Unable to find a cq for qpair %p on poll group %p\n", qpair, qpair->poll_group);
728 		return -EINVAL;
729 	}
730 
731 	if (!poller->srq) {
732 		if (nvme_rdma_resize_cq(rqpair, poller)) {
733 			nvme_rdma_poll_group_put_poller(group, poller);
734 			return -EPROTO;
735 		}
736 	}
737 
738 	rqpair->cq = poller->cq;
739 	rqpair->srq = poller->srq;
740 	if (rqpair->srq) {
741 		rqpair->rsps = poller->rsps;
742 	}
743 	rqpair->poller = poller;
744 	return 0;
745 }
746 
747 static int
748 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
749 {
750 	int			rc;
751 	struct spdk_rdma_provider_qp_init_attr	attr = {};
752 	struct ibv_device_attr	dev_attr;
753 	struct nvme_rdma_ctrlr	*rctrlr;
754 	uint32_t num_cqe, max_num_cqe;
755 
756 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
757 	if (rc != 0) {
758 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
759 		return -1;
760 	}
761 
762 	if (rqpair->qpair.poll_group) {
763 		assert(!rqpair->cq);
764 		rc = nvme_rdma_qpair_set_poller(&rqpair->qpair);
765 		if (rc) {
766 			SPDK_ERRLOG("Unable to activate the rdmaqpair.\n");
767 			return -1;
768 		}
769 		assert(rqpair->cq);
770 	} else {
771 		num_cqe = rqpair->num_entries * 2;
772 		max_num_cqe = g_spdk_nvme_transport_opts.rdma_max_cq_size;
773 		if (max_num_cqe != 0 && num_cqe > max_num_cqe) {
774 			num_cqe = max_num_cqe;
775 		}
776 		rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, num_cqe, rqpair, NULL, 0);
777 		if (!rqpair->cq) {
778 			SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
779 			return -1;
780 		}
781 	}
782 
783 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
784 	if (g_nvme_hooks.get_ibv_pd) {
785 		attr.pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
786 	} else {
787 		attr.pd = spdk_rdma_get_pd(rqpair->cm_id->verbs);
788 	}
789 
790 	attr.stats =		rqpair->poller ? &rqpair->poller->stats.rdma_stats : NULL;
791 	attr.send_cq		= rqpair->cq;
792 	attr.recv_cq		= rqpair->cq;
793 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
794 	if (rqpair->srq) {
795 		attr.srq	= rqpair->srq->srq;
796 	} else {
797 		attr.cap.max_recv_wr = rqpair->num_entries; /* RECV operations */
798 	}
799 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
800 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
801 
802 	rqpair->rdma_qp = spdk_rdma_provider_qp_create(rqpair->cm_id, &attr);
803 
804 	if (!rqpair->rdma_qp) {
805 		return -1;
806 	}
807 
808 	rqpair->memory_domain = nvme_rdma_get_memory_domain(rqpair->rdma_qp->qp->pd);
809 	if (!rqpair->memory_domain) {
810 		SPDK_ERRLOG("Failed to get memory domain\n");
811 		return -1;
812 	}
813 
814 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
815 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
816 	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
817 	rqpair->current_num_sends = 0;
818 
819 	rqpair->cm_id->context = rqpair;
820 
821 	return 0;
822 }
823 
824 static void
825 nvme_rdma_reset_failed_sends(struct nvme_rdma_qpair *rqpair,
826 			     struct ibv_send_wr *bad_send_wr, int rc)
827 {
828 	SPDK_ERRLOG("Failed to post WRs on send queue, errno %d (%s), bad_wr %p\n",
829 		    rc, spdk_strerror(rc), bad_send_wr);
830 	while (bad_send_wr != NULL) {
831 		assert(rqpair->current_num_sends > 0);
832 		rqpair->current_num_sends--;
833 		bad_send_wr = bad_send_wr->next;
834 	}
835 }
836 
837 static void
838 nvme_rdma_reset_failed_recvs(struct nvme_rdma_rsps *rsps,
839 			     struct ibv_recv_wr *bad_recv_wr, int rc)
840 {
841 	SPDK_ERRLOG("Failed to post WRs on receive queue, errno %d (%s), bad_wr %p\n",
842 		    rc, spdk_strerror(rc), bad_recv_wr);
843 	while (bad_recv_wr != NULL) {
844 		assert(rsps->current_num_recvs > 0);
845 		rsps->current_num_recvs--;
846 		bad_recv_wr = bad_recv_wr->next;
847 	}
848 }
849 
850 static inline int
851 nvme_rdma_qpair_submit_sends(struct nvme_rdma_qpair *rqpair)
852 {
853 	struct ibv_send_wr *bad_send_wr = NULL;
854 	int rc;
855 
856 	rc = spdk_rdma_provider_qp_flush_send_wrs(rqpair->rdma_qp, &bad_send_wr);
857 
858 	if (spdk_unlikely(rc)) {
859 		nvme_rdma_reset_failed_sends(rqpair, bad_send_wr, rc);
860 	}
861 
862 	return rc;
863 }
864 
865 static inline int
866 nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair)
867 {
868 	struct ibv_recv_wr *bad_recv_wr;
869 	int rc = 0;
870 
871 	rc = spdk_rdma_provider_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr);
872 	if (spdk_unlikely(rc)) {
873 		nvme_rdma_reset_failed_recvs(rqpair->rsps, bad_recv_wr, rc);
874 	}
875 
876 	return rc;
877 }
878 
879 static inline int
880 nvme_rdma_poller_submit_recvs(struct nvme_rdma_poller *poller)
881 {
882 	struct ibv_recv_wr *bad_recv_wr;
883 	int rc;
884 
885 	rc = spdk_rdma_provider_srq_flush_recv_wrs(poller->srq, &bad_recv_wr);
886 	if (spdk_unlikely(rc)) {
887 		nvme_rdma_reset_failed_recvs(poller->rsps, bad_recv_wr, rc);
888 	}
889 
890 	return rc;
891 }
892 
893 #define nvme_rdma_trace_ibv_sge(sg_list) \
894 	if (sg_list) { \
895 		SPDK_DEBUGLOG(nvme, "local addr %p length 0x%x lkey 0x%x\n", \
896 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
897 	}
898 
899 static void
900 nvme_rdma_free_rsps(struct nvme_rdma_rsps *rsps)
901 {
902 	if (!rsps) {
903 		return;
904 	}
905 
906 	spdk_free(rsps->rsps);
907 	spdk_free(rsps->rsp_sgls);
908 	spdk_free(rsps->rsp_recv_wrs);
909 	spdk_free(rsps);
910 }
911 
912 static struct nvme_rdma_rsps *
913 nvme_rdma_create_rsps(struct nvme_rdma_rsp_opts *opts)
914 {
915 	struct nvme_rdma_rsps *rsps;
916 	struct spdk_rdma_memory_translation translation;
917 	uint16_t i;
918 	int rc;
919 
920 	rsps = spdk_zmalloc(sizeof(*rsps), 0, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
921 	if (!rsps) {
922 		SPDK_ERRLOG("Failed to allocate rsps object\n");
923 		return NULL;
924 	}
925 
926 	rsps->rsp_sgls = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_sgls), 0, NULL,
927 				      SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
928 	if (!rsps->rsp_sgls) {
929 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
930 		goto fail;
931 	}
932 
933 	rsps->rsp_recv_wrs = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_recv_wrs), 0, NULL,
934 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
935 	if (!rsps->rsp_recv_wrs) {
936 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
937 		goto fail;
938 	}
939 
940 	rsps->rsps = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsps), 0, NULL,
941 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
942 	if (!rsps->rsps) {
943 		SPDK_ERRLOG("can not allocate rdma rsps\n");
944 		goto fail;
945 	}
946 
947 	for (i = 0; i < opts->num_entries; i++) {
948 		struct ibv_sge *rsp_sgl = &rsps->rsp_sgls[i];
949 		struct spdk_nvme_rdma_rsp *rsp = &rsps->rsps[i];
950 		struct ibv_recv_wr *recv_wr = &rsps->rsp_recv_wrs[i];
951 
952 		rsp->rqpair = opts->rqpair;
953 		rsp->rdma_wr.type = RDMA_WR_TYPE_RECV;
954 		rsp->recv_wr = recv_wr;
955 		rsp_sgl->addr = (uint64_t)rsp;
956 		rsp_sgl->length = sizeof(struct spdk_nvme_cpl);
957 		rc = spdk_rdma_get_translation(opts->mr_map, rsp, sizeof(*rsp), &translation);
958 		if (rc) {
959 			goto fail;
960 		}
961 		rsp_sgl->lkey = spdk_rdma_memory_translation_get_lkey(&translation);
962 
963 		recv_wr->wr_id = (uint64_t)&rsp->rdma_wr;
964 		recv_wr->next = NULL;
965 		recv_wr->sg_list = rsp_sgl;
966 		recv_wr->num_sge = 1;
967 
968 		nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
969 
970 		if (opts->rqpair) {
971 			spdk_rdma_provider_qp_queue_recv_wrs(opts->rqpair->rdma_qp, recv_wr);
972 		} else {
973 			spdk_rdma_provider_srq_queue_recv_wrs(opts->srq, recv_wr);
974 		}
975 	}
976 
977 	rsps->num_entries = opts->num_entries;
978 	rsps->current_num_recvs = opts->num_entries;
979 
980 	return rsps;
981 fail:
982 	nvme_rdma_free_rsps(rsps);
983 	return NULL;
984 }
985 
986 static void
987 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
988 {
989 	if (!rqpair->rdma_reqs) {
990 		return;
991 	}
992 
993 	spdk_free(rqpair->cmds);
994 	rqpair->cmds = NULL;
995 
996 	spdk_free(rqpair->rdma_reqs);
997 	rqpair->rdma_reqs = NULL;
998 }
999 
1000 static int
1001 nvme_rdma_create_reqs(struct nvme_rdma_qpair *rqpair)
1002 {
1003 	struct spdk_rdma_memory_translation translation;
1004 	uint16_t i;
1005 	int rc;
1006 
1007 	assert(!rqpair->rdma_reqs);
1008 	rqpair->rdma_reqs = spdk_zmalloc(rqpair->num_entries * sizeof(struct spdk_nvme_rdma_req), 0, NULL,
1009 					 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1010 	if (rqpair->rdma_reqs == NULL) {
1011 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
1012 		goto fail;
1013 	}
1014 
1015 	assert(!rqpair->cmds);
1016 	rqpair->cmds = spdk_zmalloc(rqpair->num_entries * sizeof(*rqpair->cmds), 0, NULL,
1017 				    SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1018 	if (!rqpair->cmds) {
1019 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
1020 		goto fail;
1021 	}
1022 
1023 	TAILQ_INIT(&rqpair->free_reqs);
1024 	TAILQ_INIT(&rqpair->outstanding_reqs);
1025 	for (i = 0; i < rqpair->num_entries; i++) {
1026 		struct spdk_nvme_rdma_req	*rdma_req;
1027 		struct spdk_nvmf_cmd		*cmd;
1028 
1029 		rdma_req = &rqpair->rdma_reqs[i];
1030 		rdma_req->rdma_wr.type = RDMA_WR_TYPE_SEND;
1031 		cmd = &rqpair->cmds[i];
1032 
1033 		rdma_req->id = i;
1034 
1035 		rc = spdk_rdma_get_translation(rqpair->mr_map, cmd, sizeof(*cmd), &translation);
1036 		if (rc) {
1037 			goto fail;
1038 		}
1039 		rdma_req->send_sgl[0].lkey = spdk_rdma_memory_translation_get_lkey(&translation);
1040 
1041 		/* The first RDMA sgl element will always point
1042 		 * at this data structure. Depending on whether
1043 		 * an NVMe-oF SGL is required, the length of
1044 		 * this element may change. */
1045 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
1046 		rdma_req->send_wr.wr_id = (uint64_t)&rdma_req->rdma_wr;
1047 		rdma_req->send_wr.next = NULL;
1048 		rdma_req->send_wr.opcode = IBV_WR_SEND;
1049 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
1050 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
1051 		rdma_req->send_wr.imm_data = 0;
1052 
1053 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
1054 	}
1055 
1056 	return 0;
1057 fail:
1058 	nvme_rdma_free_reqs(rqpair);
1059 	return -ENOMEM;
1060 }
1061 
1062 static int nvme_rdma_connect(struct nvme_rdma_qpair *rqpair);
1063 
1064 static int
1065 nvme_rdma_route_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1066 {
1067 	if (ret) {
1068 		SPDK_ERRLOG("RDMA route resolution error\n");
1069 		return -1;
1070 	}
1071 
1072 	ret = nvme_rdma_qpair_init(rqpair);
1073 	if (ret < 0) {
1074 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
1075 		return -1;
1076 	}
1077 
1078 	return nvme_rdma_connect(rqpair);
1079 }
1080 
1081 static int
1082 nvme_rdma_addr_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1083 {
1084 	if (ret) {
1085 		SPDK_ERRLOG("RDMA address resolution error\n");
1086 		return -1;
1087 	}
1088 
1089 	if (rqpair->qpair.ctrlr->opts.transport_ack_timeout != SPDK_NVME_TRANSPORT_ACK_TIMEOUT_DISABLED) {
1090 #ifdef SPDK_CONFIG_RDMA_SET_ACK_TIMEOUT
1091 		uint8_t timeout = rqpair->qpair.ctrlr->opts.transport_ack_timeout;
1092 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID,
1093 				      RDMA_OPTION_ID_ACK_TIMEOUT,
1094 				      &timeout, sizeof(timeout));
1095 		if (ret) {
1096 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_ACK_TIMEOUT %d, ret %d\n", timeout, ret);
1097 		}
1098 #else
1099 		SPDK_DEBUGLOG(nvme, "transport_ack_timeout is not supported\n");
1100 #endif
1101 	}
1102 
1103 	if (rqpair->qpair.ctrlr->opts.transport_tos != SPDK_NVME_TRANSPORT_TOS_DISABLED) {
1104 #ifdef SPDK_CONFIG_RDMA_SET_TOS
1105 		uint8_t tos = rqpair->qpair.ctrlr->opts.transport_tos;
1106 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS, &tos, sizeof(tos));
1107 		if (ret) {
1108 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_TOS %u, ret %d\n", tos, ret);
1109 		}
1110 #else
1111 		SPDK_DEBUGLOG(nvme, "transport_tos is not supported\n");
1112 #endif
1113 	}
1114 
1115 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
1116 	if (ret) {
1117 		SPDK_ERRLOG("rdma_resolve_route\n");
1118 		return ret;
1119 	}
1120 
1121 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ROUTE_RESOLVED,
1122 					     nvme_rdma_route_resolved);
1123 }
1124 
1125 static int
1126 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
1127 		       struct sockaddr *src_addr,
1128 		       struct sockaddr *dst_addr)
1129 {
1130 	int ret;
1131 
1132 	if (src_addr) {
1133 		int reuse = 1;
1134 
1135 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
1136 				      &reuse, sizeof(reuse));
1137 		if (ret) {
1138 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_REUSEADDR %d, ret %d\n",
1139 				       reuse, ret);
1140 			/* It is likely that rdma_resolve_addr() returns -EADDRINUSE, but
1141 			 * we may missing something. We rely on rdma_resolve_addr().
1142 			 */
1143 		}
1144 	}
1145 
1146 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
1147 				NVME_RDMA_TIME_OUT_IN_MS);
1148 	if (ret) {
1149 		SPDK_ERRLOG("rdma_resolve_addr, %d\n", errno);
1150 		return ret;
1151 	}
1152 
1153 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ADDR_RESOLVED,
1154 					     nvme_rdma_addr_resolved);
1155 }
1156 
1157 static int nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair);
1158 
1159 static int
1160 nvme_rdma_connect_established(struct nvme_rdma_qpair *rqpair, int ret)
1161 {
1162 	struct nvme_rdma_rsp_opts opts = {};
1163 
1164 	if (ret == -ESTALE) {
1165 		return nvme_rdma_stale_conn_retry(rqpair);
1166 	} else if (ret) {
1167 		SPDK_ERRLOG("RDMA connect error %d\n", ret);
1168 		return ret;
1169 	}
1170 
1171 	assert(!rqpair->mr_map);
1172 	rqpair->mr_map = spdk_rdma_create_mem_map(rqpair->rdma_qp->qp->pd, &g_nvme_hooks,
1173 			 SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR);
1174 	if (!rqpair->mr_map) {
1175 		SPDK_ERRLOG("Unable to register RDMA memory translation map\n");
1176 		return -1;
1177 	}
1178 
1179 	ret = nvme_rdma_create_reqs(rqpair);
1180 	SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1181 	if (ret) {
1182 		SPDK_ERRLOG("Unable to create rqpair RDMA requests\n");
1183 		return -1;
1184 	}
1185 	SPDK_DEBUGLOG(nvme, "RDMA requests created\n");
1186 
1187 	if (!rqpair->srq) {
1188 		opts.num_entries = rqpair->num_entries;
1189 		opts.rqpair = rqpair;
1190 		opts.srq = NULL;
1191 		opts.mr_map = rqpair->mr_map;
1192 
1193 		assert(!rqpair->rsps);
1194 		rqpair->rsps = nvme_rdma_create_rsps(&opts);
1195 		if (!rqpair->rsps) {
1196 			SPDK_ERRLOG("Unable to create rqpair RDMA responses\n");
1197 			return -1;
1198 		}
1199 		SPDK_DEBUGLOG(nvme, "RDMA responses created\n");
1200 
1201 		ret = nvme_rdma_qpair_submit_recvs(rqpair);
1202 		SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1203 		if (ret) {
1204 			SPDK_ERRLOG("Unable to submit rqpair RDMA responses\n");
1205 			return -1;
1206 		}
1207 		SPDK_DEBUGLOG(nvme, "RDMA responses submitted\n");
1208 	}
1209 
1210 	rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND;
1211 
1212 	return 0;
1213 }
1214 
1215 static int
1216 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
1217 {
1218 	struct rdma_conn_param				param = {};
1219 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
1220 	struct ibv_device_attr				attr;
1221 	int						ret;
1222 	struct spdk_nvme_ctrlr				*ctrlr;
1223 
1224 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
1225 	if (ret != 0) {
1226 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1227 		return ret;
1228 	}
1229 
1230 	param.responder_resources = attr.max_qp_rd_atom;
1231 
1232 	ctrlr = rqpair->qpair.ctrlr;
1233 	if (!ctrlr) {
1234 		return -1;
1235 	}
1236 
1237 	request_data.qid = rqpair->qpair.id;
1238 	request_data.hrqsize = rqpair->num_entries + 1;
1239 	request_data.hsqsize = rqpair->num_entries;
1240 	request_data.cntlid = ctrlr->cntlid;
1241 
1242 	param.private_data = &request_data;
1243 	param.private_data_len = sizeof(request_data);
1244 	param.retry_count = ctrlr->opts.transport_retry_count;
1245 	param.rnr_retry_count = 7;
1246 
1247 	/* Fields below are ignored by rdma cm if qpair has been
1248 	 * created using rdma cm API. */
1249 	param.srq = 0;
1250 	param.qp_num = rqpair->rdma_qp->qp->qp_num;
1251 
1252 	ret = rdma_connect(rqpair->cm_id, &param);
1253 	if (ret) {
1254 		SPDK_ERRLOG("nvme rdma connect error\n");
1255 		return ret;
1256 	}
1257 
1258 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ESTABLISHED,
1259 					     nvme_rdma_connect_established);
1260 }
1261 
1262 static int
1263 nvme_rdma_ctrlr_connect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1264 {
1265 	struct sockaddr_storage dst_addr;
1266 	struct sockaddr_storage src_addr;
1267 	bool src_addr_specified;
1268 	long int port, src_port;
1269 	int rc;
1270 	struct nvme_rdma_ctrlr *rctrlr;
1271 	struct nvme_rdma_qpair *rqpair;
1272 	struct nvme_rdma_poll_group *group;
1273 	int family;
1274 
1275 	rqpair = nvme_rdma_qpair(qpair);
1276 	rctrlr = nvme_rdma_ctrlr(ctrlr);
1277 	assert(rctrlr != NULL);
1278 
1279 	switch (ctrlr->trid.adrfam) {
1280 	case SPDK_NVMF_ADRFAM_IPV4:
1281 		family = AF_INET;
1282 		break;
1283 	case SPDK_NVMF_ADRFAM_IPV6:
1284 		family = AF_INET6;
1285 		break;
1286 	default:
1287 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
1288 		return -1;
1289 	}
1290 
1291 	SPDK_DEBUGLOG(nvme, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
1292 
1293 	memset(&dst_addr, 0, sizeof(dst_addr));
1294 
1295 	SPDK_DEBUGLOG(nvme, "trsvcid is %s\n", ctrlr->trid.trsvcid);
1296 	rc = nvme_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid, &port);
1297 	if (rc != 0) {
1298 		SPDK_ERRLOG("dst_addr nvme_parse_addr() failed\n");
1299 		return -1;
1300 	}
1301 
1302 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
1303 		memset(&src_addr, 0, sizeof(src_addr));
1304 		rc = nvme_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid, &src_port);
1305 		if (rc != 0) {
1306 			SPDK_ERRLOG("src_addr nvme_parse_addr() failed\n");
1307 			return -1;
1308 		}
1309 		src_addr_specified = true;
1310 	} else {
1311 		src_addr_specified = false;
1312 	}
1313 
1314 	rc = rdma_create_id(rctrlr->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
1315 	if (rc < 0) {
1316 		SPDK_ERRLOG("rdma_create_id() failed\n");
1317 		return -1;
1318 	}
1319 
1320 	rc = nvme_rdma_resolve_addr(rqpair,
1321 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
1322 				    (struct sockaddr *)&dst_addr);
1323 	if (rc < 0) {
1324 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
1325 		return -1;
1326 	}
1327 
1328 	rqpair->state = NVME_RDMA_QPAIR_STATE_INITIALIZING;
1329 
1330 	if (qpair->poll_group != NULL) {
1331 		group = nvme_rdma_poll_group(qpair->poll_group);
1332 		TAILQ_INSERT_TAIL(&group->connecting_qpairs, rqpair, link_connecting);
1333 	}
1334 
1335 	return 0;
1336 }
1337 
1338 static int
1339 nvme_rdma_stale_conn_reconnect(struct nvme_rdma_qpair *rqpair)
1340 {
1341 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1342 
1343 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks) {
1344 		return -EAGAIN;
1345 	}
1346 
1347 	return nvme_rdma_ctrlr_connect_qpair(qpair->ctrlr, qpair);
1348 }
1349 
1350 static int
1351 nvme_rdma_ctrlr_connect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr,
1352 				   struct spdk_nvme_qpair *qpair)
1353 {
1354 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1355 	int rc;
1356 
1357 	if (rqpair->in_connect_poll) {
1358 		return -EAGAIN;
1359 	}
1360 
1361 	rqpair->in_connect_poll = true;
1362 
1363 	switch (rqpair->state) {
1364 	case NVME_RDMA_QPAIR_STATE_INVALID:
1365 		rc = -EAGAIN;
1366 		break;
1367 
1368 	case NVME_RDMA_QPAIR_STATE_INITIALIZING:
1369 	case NVME_RDMA_QPAIR_STATE_EXITING:
1370 		if (!nvme_qpair_is_admin_queue(qpair)) {
1371 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
1372 		}
1373 
1374 		rc = nvme_rdma_process_event_poll(rqpair);
1375 
1376 		if (!nvme_qpair_is_admin_queue(qpair)) {
1377 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
1378 		}
1379 
1380 		if (rc == 0) {
1381 			rc = -EAGAIN;
1382 		}
1383 		rqpair->in_connect_poll = false;
1384 
1385 		return rc;
1386 
1387 	case NVME_RDMA_QPAIR_STATE_STALE_CONN:
1388 		rc = nvme_rdma_stale_conn_reconnect(rqpair);
1389 		if (rc == 0) {
1390 			rc = -EAGAIN;
1391 		}
1392 		break;
1393 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND:
1394 		rc = nvme_fabric_qpair_connect_async(qpair, rqpair->num_entries + 1);
1395 		if (rc == 0) {
1396 			rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL;
1397 			rc = -EAGAIN;
1398 		} else {
1399 			SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
1400 		}
1401 		break;
1402 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL:
1403 		rc = nvme_fabric_qpair_connect_poll(qpair);
1404 		if (rc == 0) {
1405 			rqpair->state = NVME_RDMA_QPAIR_STATE_RUNNING;
1406 			nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTED);
1407 		} else if (rc != -EAGAIN) {
1408 			SPDK_ERRLOG("Failed to poll NVMe-oF Fabric CONNECT command\n");
1409 		}
1410 		break;
1411 	case NVME_RDMA_QPAIR_STATE_RUNNING:
1412 		rc = 0;
1413 		break;
1414 	default:
1415 		assert(false);
1416 		rc = -EINVAL;
1417 		break;
1418 	}
1419 
1420 	rqpair->in_connect_poll = false;
1421 
1422 	return rc;
1423 }
1424 
1425 static inline int
1426 nvme_rdma_get_memory_translation(struct nvme_request *req, struct nvme_rdma_qpair *rqpair,
1427 				 struct nvme_rdma_memory_translation_ctx *_ctx)
1428 {
1429 	struct spdk_memory_domain_translation_ctx ctx;
1430 	struct spdk_memory_domain_translation_result dma_translation = {.iov_count = 0};
1431 	struct spdk_rdma_memory_translation rdma_translation;
1432 	int rc;
1433 
1434 	assert(req);
1435 	assert(rqpair);
1436 	assert(_ctx);
1437 
1438 	if (req->payload.opts && req->payload.opts->memory_domain) {
1439 		ctx.size = sizeof(struct spdk_memory_domain_translation_ctx);
1440 		ctx.rdma.ibv_qp = rqpair->rdma_qp->qp;
1441 		dma_translation.size = sizeof(struct spdk_memory_domain_translation_result);
1442 
1443 		rc = spdk_memory_domain_translate_data(req->payload.opts->memory_domain,
1444 						       req->payload.opts->memory_domain_ctx,
1445 						       rqpair->memory_domain->domain, &ctx, _ctx->addr,
1446 						       _ctx->length, &dma_translation);
1447 		if (spdk_unlikely(rc) || dma_translation.iov_count != 1) {
1448 			SPDK_ERRLOG("DMA memory translation failed, rc %d, iov count %u\n", rc, dma_translation.iov_count);
1449 			return rc;
1450 		}
1451 
1452 		_ctx->lkey = dma_translation.rdma.lkey;
1453 		_ctx->rkey = dma_translation.rdma.rkey;
1454 		_ctx->addr = dma_translation.iov.iov_base;
1455 		_ctx->length = dma_translation.iov.iov_len;
1456 	} else {
1457 		rc = spdk_rdma_get_translation(rqpair->mr_map, _ctx->addr, _ctx->length, &rdma_translation);
1458 		if (spdk_unlikely(rc)) {
1459 			SPDK_ERRLOG("RDMA memory translation failed, rc %d\n", rc);
1460 			return rc;
1461 		}
1462 		if (rdma_translation.translation_type == SPDK_RDMA_TRANSLATION_MR) {
1463 			_ctx->lkey = rdma_translation.mr_or_key.mr->lkey;
1464 			_ctx->rkey = rdma_translation.mr_or_key.mr->rkey;
1465 		} else {
1466 			_ctx->lkey = _ctx->rkey = (uint32_t)rdma_translation.mr_or_key.key;
1467 		}
1468 	}
1469 
1470 	return 0;
1471 }
1472 
1473 
1474 /*
1475  * Build SGL describing empty payload.
1476  */
1477 static int
1478 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
1479 {
1480 	struct nvme_request *req = rdma_req->req;
1481 
1482 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1483 
1484 	/* The first element of this SGL is pointing at an
1485 	 * spdk_nvmf_cmd object. For this particular command,
1486 	 * we only need the first 64 bytes corresponding to
1487 	 * the NVMe command. */
1488 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1489 
1490 	/* The RDMA SGL needs one element describing the NVMe command. */
1491 	rdma_req->send_wr.num_sge = 1;
1492 
1493 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1494 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1495 	req->cmd.dptr.sgl1.keyed.length = 0;
1496 	req->cmd.dptr.sgl1.keyed.key = 0;
1497 	req->cmd.dptr.sgl1.address = 0;
1498 
1499 	return 0;
1500 }
1501 
1502 /*
1503  * Build inline SGL describing contiguous payload buffer.
1504  */
1505 static int
1506 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
1507 				      struct spdk_nvme_rdma_req *rdma_req)
1508 {
1509 	struct nvme_request *req = rdma_req->req;
1510 	struct nvme_rdma_memory_translation_ctx ctx = {
1511 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1512 		.length = req->payload_size
1513 	};
1514 	int rc;
1515 
1516 	assert(ctx.length != 0);
1517 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1518 
1519 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1520 	if (spdk_unlikely(rc)) {
1521 		return -1;
1522 	}
1523 
1524 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1525 
1526 	/* The first element of this SGL is pointing at an
1527 	 * spdk_nvmf_cmd object. For this particular command,
1528 	 * we only need the first 64 bytes corresponding to
1529 	 * the NVMe command. */
1530 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1531 
1532 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1533 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1534 
1535 	/* The RDMA SGL contains two elements. The first describes
1536 	 * the NVMe command and the second describes the data
1537 	 * payload. */
1538 	rdma_req->send_wr.num_sge = 2;
1539 
1540 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1541 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1542 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1543 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1544 	/* Inline only supported for icdoff == 0 currently.  This function will
1545 	 * not get called for controllers with other values. */
1546 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1547 
1548 	return 0;
1549 }
1550 
1551 /*
1552  * Build SGL describing contiguous payload buffer.
1553  */
1554 static int
1555 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
1556 			       struct spdk_nvme_rdma_req *rdma_req)
1557 {
1558 	struct nvme_request *req = rdma_req->req;
1559 	struct nvme_rdma_memory_translation_ctx ctx = {
1560 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1561 		.length = req->payload_size
1562 	};
1563 	int rc;
1564 
1565 	assert(req->payload_size != 0);
1566 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1567 
1568 	if (spdk_unlikely(req->payload_size > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1569 		SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1570 			    req->payload_size, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1571 		return -1;
1572 	}
1573 
1574 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1575 	if (spdk_unlikely(rc)) {
1576 		return -1;
1577 	}
1578 
1579 	req->cmd.dptr.sgl1.keyed.key = ctx.rkey;
1580 
1581 	/* The first element of this SGL is pointing at an
1582 	 * spdk_nvmf_cmd object. For this particular command,
1583 	 * we only need the first 64 bytes corresponding to
1584 	 * the NVMe command. */
1585 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1586 
1587 	/* The RDMA SGL needs one element describing the NVMe command. */
1588 	rdma_req->send_wr.num_sge = 1;
1589 
1590 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1591 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1592 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1593 	req->cmd.dptr.sgl1.keyed.length = (uint32_t)ctx.length;
1594 	req->cmd.dptr.sgl1.address = (uint64_t)ctx.addr;
1595 
1596 	return 0;
1597 }
1598 
1599 /*
1600  * Build SGL describing scattered payload buffer.
1601  */
1602 static int
1603 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1604 			    struct spdk_nvme_rdma_req *rdma_req)
1605 {
1606 	struct nvme_request *req = rdma_req->req;
1607 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1608 	struct nvme_rdma_memory_translation_ctx ctx;
1609 	uint32_t remaining_size;
1610 	uint32_t sge_length;
1611 	int rc, max_num_sgl, num_sgl_desc;
1612 
1613 	assert(req->payload_size != 0);
1614 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1615 	assert(req->payload.reset_sgl_fn != NULL);
1616 	assert(req->payload.next_sge_fn != NULL);
1617 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1618 
1619 	max_num_sgl = req->qpair->ctrlr->max_sges;
1620 
1621 	remaining_size = req->payload_size;
1622 	num_sgl_desc = 0;
1623 	do {
1624 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &sge_length);
1625 		if (rc) {
1626 			return -1;
1627 		}
1628 
1629 		sge_length = spdk_min(remaining_size, sge_length);
1630 
1631 		if (spdk_unlikely(sge_length > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1632 			SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1633 				    sge_length, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1634 			return -1;
1635 		}
1636 		ctx.length = sge_length;
1637 		rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1638 		if (spdk_unlikely(rc)) {
1639 			return -1;
1640 		}
1641 
1642 		cmd->sgl[num_sgl_desc].keyed.key = ctx.rkey;
1643 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1644 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1645 		cmd->sgl[num_sgl_desc].keyed.length = (uint32_t)ctx.length;
1646 		cmd->sgl[num_sgl_desc].address = (uint64_t)ctx.addr;
1647 
1648 		remaining_size -= ctx.length;
1649 		num_sgl_desc++;
1650 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1651 
1652 
1653 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1654 	if (remaining_size > 0) {
1655 		return -1;
1656 	}
1657 
1658 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1659 
1660 	/* The RDMA SGL needs one element describing some portion
1661 	 * of the spdk_nvmf_cmd structure. */
1662 	rdma_req->send_wr.num_sge = 1;
1663 
1664 	/*
1665 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1666 	 * as a data block descriptor.
1667 	 */
1668 	if (num_sgl_desc == 1) {
1669 		/* The first element of this SGL is pointing at an
1670 		 * spdk_nvmf_cmd object. For this particular command,
1671 		 * we only need the first 64 bytes corresponding to
1672 		 * the NVMe command. */
1673 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1674 
1675 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1676 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1677 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1678 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1679 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1680 	} else {
1681 		/*
1682 		 * Otherwise, The SGL descriptor embedded in the command must point to the list of
1683 		 * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
1684 		 */
1685 		uint32_t descriptors_size = sizeof(struct spdk_nvme_sgl_descriptor) * num_sgl_desc;
1686 
1687 		if (spdk_unlikely(descriptors_size > rqpair->qpair.ctrlr->ioccsz_bytes)) {
1688 			SPDK_ERRLOG("Size of SGL descriptors (%u) exceeds ICD (%u)\n",
1689 				    descriptors_size, rqpair->qpair.ctrlr->ioccsz_bytes);
1690 			return -1;
1691 		}
1692 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + descriptors_size;
1693 
1694 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1695 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1696 		req->cmd.dptr.sgl1.unkeyed.length = descriptors_size;
1697 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1698 	}
1699 
1700 	return 0;
1701 }
1702 
1703 /*
1704  * Build inline SGL describing sgl payload buffer.
1705  */
1706 static int
1707 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1708 				   struct spdk_nvme_rdma_req *rdma_req)
1709 {
1710 	struct nvme_request *req = rdma_req->req;
1711 	struct nvme_rdma_memory_translation_ctx ctx;
1712 	uint32_t length;
1713 	int rc;
1714 
1715 	assert(req->payload_size != 0);
1716 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1717 	assert(req->payload.reset_sgl_fn != NULL);
1718 	assert(req->payload.next_sge_fn != NULL);
1719 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1720 
1721 	rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &length);
1722 	if (rc) {
1723 		return -1;
1724 	}
1725 
1726 	if (length < req->payload_size) {
1727 		SPDK_DEBUGLOG(nvme, "Inline SGL request split so sending separately.\n");
1728 		return nvme_rdma_build_sgl_request(rqpair, rdma_req);
1729 	}
1730 
1731 	if (length > req->payload_size) {
1732 		length = req->payload_size;
1733 	}
1734 
1735 	ctx.length = length;
1736 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1737 	if (spdk_unlikely(rc)) {
1738 		return -1;
1739 	}
1740 
1741 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1742 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1743 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1744 
1745 	rdma_req->send_wr.num_sge = 2;
1746 
1747 	/* The first element of this SGL is pointing at an
1748 	 * spdk_nvmf_cmd object. For this particular command,
1749 	 * we only need the first 64 bytes corresponding to
1750 	 * the NVMe command. */
1751 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1752 
1753 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1754 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1755 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1756 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1757 	/* Inline only supported for icdoff == 0 currently.  This function will
1758 	 * not get called for controllers with other values. */
1759 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1760 
1761 	return 0;
1762 }
1763 
1764 static int
1765 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1766 		   struct spdk_nvme_rdma_req *rdma_req)
1767 {
1768 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1769 	enum nvme_payload_type payload_type;
1770 	bool icd_supported;
1771 	int rc;
1772 
1773 	assert(rdma_req->req == NULL);
1774 	rdma_req->req = req;
1775 	req->cmd.cid = rdma_req->id;
1776 	payload_type = nvme_payload_type(&req->payload);
1777 	/*
1778 	 * Check if icdoff is non zero, to avoid interop conflicts with
1779 	 * targets with non-zero icdoff.  Both SPDK and the Linux kernel
1780 	 * targets use icdoff = 0.  For targets with non-zero icdoff, we
1781 	 * will currently just not use inline data for now.
1782 	 */
1783 	icd_supported = spdk_nvme_opc_get_data_transfer(req->cmd.opc) == SPDK_NVME_DATA_HOST_TO_CONTROLLER
1784 			&& req->payload_size <= ctrlr->ioccsz_bytes && ctrlr->icdoff == 0;
1785 
1786 	if (req->payload_size == 0) {
1787 		rc = nvme_rdma_build_null_request(rdma_req);
1788 	} else if (payload_type == NVME_PAYLOAD_TYPE_CONTIG) {
1789 		if (icd_supported) {
1790 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1791 		} else {
1792 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1793 		}
1794 	} else if (payload_type == NVME_PAYLOAD_TYPE_SGL) {
1795 		if (icd_supported) {
1796 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1797 		} else {
1798 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1799 		}
1800 	} else {
1801 		rc = -1;
1802 	}
1803 
1804 	if (rc) {
1805 		rdma_req->req = NULL;
1806 		return rc;
1807 	}
1808 
1809 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1810 	return 0;
1811 }
1812 
1813 static struct spdk_nvme_qpair *
1814 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1815 			     uint16_t qid, uint32_t qsize,
1816 			     enum spdk_nvme_qprio qprio,
1817 			     uint32_t num_requests,
1818 			     bool delay_cmd_submit,
1819 			     bool async)
1820 {
1821 	struct nvme_rdma_qpair *rqpair;
1822 	struct spdk_nvme_qpair *qpair;
1823 	int rc;
1824 
1825 	if (qsize < SPDK_NVME_QUEUE_MIN_ENTRIES) {
1826 		SPDK_ERRLOG("Failed to create qpair with size %u. Minimum queue size is %d.\n",
1827 			    qsize, SPDK_NVME_QUEUE_MIN_ENTRIES);
1828 		return NULL;
1829 	}
1830 
1831 	rqpair = spdk_zmalloc(sizeof(struct nvme_rdma_qpair), 0, NULL, SPDK_ENV_SOCKET_ID_ANY,
1832 			      SPDK_MALLOC_DMA);
1833 	if (!rqpair) {
1834 		SPDK_ERRLOG("failed to get create rqpair\n");
1835 		return NULL;
1836 	}
1837 
1838 	/* Set num_entries one less than queue size. According to NVMe
1839 	 * and NVMe-oF specs we can not submit queue size requests,
1840 	 * one slot shall always remain empty.
1841 	 */
1842 	rqpair->num_entries = qsize - 1;
1843 	rqpair->delay_cmd_submit = delay_cmd_submit;
1844 	rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID;
1845 	qpair = &rqpair->qpair;
1846 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async);
1847 	if (rc != 0) {
1848 		spdk_free(rqpair);
1849 		return NULL;
1850 	}
1851 
1852 	return qpair;
1853 }
1854 
1855 static void
1856 nvme_rdma_qpair_destroy(struct nvme_rdma_qpair *rqpair)
1857 {
1858 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1859 	struct nvme_rdma_ctrlr *rctrlr;
1860 	struct nvme_rdma_cm_event_entry *entry, *tmp;
1861 
1862 	spdk_rdma_free_mem_map(&rqpair->mr_map);
1863 
1864 	if (rqpair->evt) {
1865 		rdma_ack_cm_event(rqpair->evt);
1866 		rqpair->evt = NULL;
1867 	}
1868 
1869 	/*
1870 	 * This works because we have the controller lock both in
1871 	 * this function and in the function where we add new events.
1872 	 */
1873 	if (qpair->ctrlr != NULL) {
1874 		rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
1875 		STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
1876 			if (entry->evt->id->context == rqpair) {
1877 				STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
1878 				rdma_ack_cm_event(entry->evt);
1879 				STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
1880 			}
1881 		}
1882 	}
1883 
1884 	if (rqpair->cm_id) {
1885 		if (rqpair->rdma_qp) {
1886 			spdk_rdma_put_pd(rqpair->rdma_qp->qp->pd);
1887 			spdk_rdma_provider_qp_destroy(rqpair->rdma_qp);
1888 			rqpair->rdma_qp = NULL;
1889 		}
1890 	}
1891 
1892 	if (rqpair->poller) {
1893 		struct nvme_rdma_poll_group     *group;
1894 
1895 		assert(qpair->poll_group);
1896 		group = nvme_rdma_poll_group(qpair->poll_group);
1897 
1898 		nvme_rdma_poll_group_put_poller(group, rqpair->poller);
1899 
1900 		rqpair->poller = NULL;
1901 		rqpair->cq = NULL;
1902 		if (rqpair->srq) {
1903 			rqpair->srq = NULL;
1904 			rqpair->rsps = NULL;
1905 		}
1906 	} else if (rqpair->cq) {
1907 		ibv_destroy_cq(rqpair->cq);
1908 		rqpair->cq = NULL;
1909 	}
1910 
1911 	nvme_rdma_free_reqs(rqpair);
1912 	nvme_rdma_free_rsps(rqpair->rsps);
1913 	rqpair->rsps = NULL;
1914 
1915 	/* destroy cm_id last so cma device will not be freed before we destroy the cq. */
1916 	if (rqpair->cm_id) {
1917 		rdma_destroy_id(rqpair->cm_id);
1918 		rqpair->cm_id = NULL;
1919 	}
1920 }
1921 
1922 static void nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr);
1923 
1924 static int
1925 nvme_rdma_qpair_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
1926 {
1927 	if (ret) {
1928 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
1929 		goto quiet;
1930 	}
1931 
1932 	if (rqpair->poller == NULL) {
1933 		/* If poller is not used, cq is not shared.
1934 		 * So complete disconnecting qpair immediately.
1935 		 */
1936 		goto quiet;
1937 	}
1938 
1939 	if (rqpair->rsps == NULL) {
1940 		goto quiet;
1941 	}
1942 
1943 	if (rqpair->need_destroy ||
1944 	    (rqpair->current_num_sends != 0 ||
1945 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) {
1946 		rqpair->state = NVME_RDMA_QPAIR_STATE_LINGERING;
1947 		rqpair->evt_timeout_ticks = (NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US * spdk_get_ticks_hz()) /
1948 					    SPDK_SEC_TO_USEC + spdk_get_ticks();
1949 
1950 		return -EAGAIN;
1951 	}
1952 
1953 quiet:
1954 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
1955 
1956 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, 0);
1957 	nvme_rdma_qpair_destroy(rqpair);
1958 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
1959 
1960 	return 0;
1961 }
1962 
1963 static int
1964 nvme_rdma_qpair_wait_until_quiet(struct nvme_rdma_qpair *rqpair)
1965 {
1966 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1967 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
1968 
1969 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks &&
1970 	    (rqpair->current_num_sends != 0 ||
1971 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) {
1972 		return -EAGAIN;
1973 	}
1974 
1975 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
1976 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, 0);
1977 	if (!nvme_qpair_is_admin_queue(qpair)) {
1978 		nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
1979 	}
1980 	nvme_rdma_qpair_destroy(rqpair);
1981 	if (!nvme_qpair_is_admin_queue(qpair)) {
1982 		nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
1983 	}
1984 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
1985 
1986 	return 0;
1987 }
1988 
1989 static void
1990 _nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair,
1991 				  nvme_rdma_cm_event_cb disconnected_qpair_cb)
1992 {
1993 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1994 	int rc;
1995 
1996 	assert(disconnected_qpair_cb != NULL);
1997 
1998 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITING;
1999 
2000 	if (rqpair->cm_id) {
2001 		if (rqpair->rdma_qp) {
2002 			rc = spdk_rdma_provider_qp_disconnect(rqpair->rdma_qp);
2003 			if ((qpair->ctrlr != NULL) && (rc == 0)) {
2004 				rc = nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_DISCONNECTED,
2005 								   disconnected_qpair_cb);
2006 				if (rc == 0) {
2007 					return;
2008 				}
2009 			}
2010 		}
2011 	}
2012 
2013 	disconnected_qpair_cb(rqpair, 0);
2014 }
2015 
2016 static int
2017 nvme_rdma_ctrlr_disconnect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2018 {
2019 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2020 	int rc;
2021 
2022 	switch (rqpair->state) {
2023 	case NVME_RDMA_QPAIR_STATE_EXITING:
2024 		if (!nvme_qpair_is_admin_queue(qpair)) {
2025 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
2026 		}
2027 
2028 		rc = nvme_rdma_process_event_poll(rqpair);
2029 
2030 		if (!nvme_qpair_is_admin_queue(qpair)) {
2031 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
2032 		}
2033 		break;
2034 
2035 	case NVME_RDMA_QPAIR_STATE_LINGERING:
2036 		rc = nvme_rdma_qpair_wait_until_quiet(rqpair);
2037 		break;
2038 	case NVME_RDMA_QPAIR_STATE_EXITED:
2039 		rc = 0;
2040 		break;
2041 
2042 	default:
2043 		assert(false);
2044 		rc = -EAGAIN;
2045 		break;
2046 	}
2047 
2048 	return rc;
2049 }
2050 
2051 static void
2052 nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2053 {
2054 	int rc;
2055 
2056 	_nvme_rdma_ctrlr_disconnect_qpair(ctrlr, qpair, nvme_rdma_qpair_disconnected);
2057 
2058 	/* If the async mode is disabled, poll the qpair until it is actually disconnected.
2059 	 * It is ensured that poll_group_process_completions() calls disconnected_qpair_cb
2060 	 * for any disconnected qpair. Hence, we do not have to check if the qpair is in
2061 	 * a poll group or not.
2062 	 * At the same time, if the qpair is being destroyed, i.e. this function is called by
2063 	 * spdk_nvme_ctrlr_free_io_qpair then we need to wait until qpair is disconnected, otherwise
2064 	 * we may leak some resources.
2065 	 */
2066 	if (qpair->async && !qpair->destroy_in_progress) {
2067 		return;
2068 	}
2069 
2070 	while (1) {
2071 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(ctrlr, qpair);
2072 		if (rc != -EAGAIN) {
2073 			break;
2074 		}
2075 	}
2076 }
2077 
2078 static int
2079 nvme_rdma_stale_conn_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
2080 {
2081 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2082 
2083 	if (ret) {
2084 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
2085 	}
2086 
2087 	nvme_rdma_qpair_destroy(rqpair);
2088 
2089 	qpair->last_transport_failure_reason = qpair->transport_failure_reason;
2090 	qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_NONE;
2091 
2092 	rqpair->state = NVME_RDMA_QPAIR_STATE_STALE_CONN;
2093 	rqpair->evt_timeout_ticks = (NVME_RDMA_STALE_CONN_RETRY_DELAY_US * spdk_get_ticks_hz()) /
2094 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
2095 
2096 	return 0;
2097 }
2098 
2099 static int
2100 nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair)
2101 {
2102 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2103 
2104 	if (rqpair->stale_conn_retry_count >= NVME_RDMA_STALE_CONN_RETRY_MAX) {
2105 		SPDK_ERRLOG("Retry failed %d times, give up stale connection to qpair (cntlid:%u, qid:%u).\n",
2106 			    NVME_RDMA_STALE_CONN_RETRY_MAX, qpair->ctrlr->cntlid, qpair->id);
2107 		return -ESTALE;
2108 	}
2109 
2110 	rqpair->stale_conn_retry_count++;
2111 
2112 	SPDK_NOTICELOG("%d times, retry stale connection to qpair (cntlid:%u, qid:%u).\n",
2113 		       rqpair->stale_conn_retry_count, qpair->ctrlr->cntlid, qpair->id);
2114 
2115 	_nvme_rdma_ctrlr_disconnect_qpair(qpair->ctrlr, qpair, nvme_rdma_stale_conn_disconnected);
2116 
2117 	return 0;
2118 }
2119 
2120 static int
2121 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2122 {
2123 	struct nvme_rdma_qpair *rqpair;
2124 
2125 	assert(qpair != NULL);
2126 	rqpair = nvme_rdma_qpair(qpair);
2127 
2128 	if (rqpair->state != NVME_RDMA_QPAIR_STATE_EXITED) {
2129 		int rc __attribute__((unused));
2130 
2131 		/* qpair was removed from the poll group while the disconnect is not finished.
2132 		 * Destroy rdma resources forcefully. */
2133 		rc = nvme_rdma_qpair_disconnected(rqpair, 0);
2134 		assert(rc == 0);
2135 	}
2136 
2137 	nvme_rdma_qpair_abort_reqs(qpair, 0);
2138 	nvme_qpair_deinit(qpair);
2139 
2140 	nvme_rdma_put_memory_domain(rqpair->memory_domain);
2141 
2142 	spdk_free(rqpair);
2143 
2144 	return 0;
2145 }
2146 
2147 static struct spdk_nvme_qpair *
2148 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
2149 				const struct spdk_nvme_io_qpair_opts *opts)
2150 {
2151 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
2152 					    opts->io_queue_requests,
2153 					    opts->delay_cmd_submit,
2154 					    opts->async_mode);
2155 }
2156 
2157 static int
2158 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
2159 {
2160 	/* do nothing here */
2161 	return 0;
2162 }
2163 
2164 static int nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr);
2165 
2166 /* We have to use the typedef in the function declaration to appease astyle. */
2167 typedef struct spdk_nvme_ctrlr spdk_nvme_ctrlr_t;
2168 
2169 static spdk_nvme_ctrlr_t *
2170 nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
2171 			  const struct spdk_nvme_ctrlr_opts *opts,
2172 			  void *devhandle)
2173 {
2174 	struct nvme_rdma_ctrlr *rctrlr;
2175 	struct ibv_context **contexts;
2176 	struct ibv_device_attr dev_attr;
2177 	int i, flag, rc;
2178 
2179 	rctrlr = spdk_zmalloc(sizeof(struct nvme_rdma_ctrlr), 0, NULL, SPDK_ENV_SOCKET_ID_ANY,
2180 			      SPDK_MALLOC_DMA);
2181 	if (rctrlr == NULL) {
2182 		SPDK_ERRLOG("could not allocate ctrlr\n");
2183 		return NULL;
2184 	}
2185 
2186 	rctrlr->ctrlr.opts = *opts;
2187 	rctrlr->ctrlr.trid = *trid;
2188 
2189 	if (opts->transport_retry_count > NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT) {
2190 		SPDK_NOTICELOG("transport_retry_count exceeds max value %d, use max value\n",
2191 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT);
2192 		rctrlr->ctrlr.opts.transport_retry_count = NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT;
2193 	}
2194 
2195 	if (opts->transport_ack_timeout > NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT) {
2196 		SPDK_NOTICELOG("transport_ack_timeout exceeds max value %d, use max value\n",
2197 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT);
2198 		rctrlr->ctrlr.opts.transport_ack_timeout = NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT;
2199 	}
2200 
2201 	contexts = rdma_get_devices(NULL);
2202 	if (contexts == NULL) {
2203 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
2204 		spdk_free(rctrlr);
2205 		return NULL;
2206 	}
2207 
2208 	i = 0;
2209 	rctrlr->max_sge = NVME_RDMA_MAX_SGL_DESCRIPTORS;
2210 
2211 	while (contexts[i] != NULL) {
2212 		rc = ibv_query_device(contexts[i], &dev_attr);
2213 		if (rc < 0) {
2214 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
2215 			rdma_free_devices(contexts);
2216 			spdk_free(rctrlr);
2217 			return NULL;
2218 		}
2219 		rctrlr->max_sge = spdk_min(rctrlr->max_sge, (uint16_t)dev_attr.max_sge);
2220 		i++;
2221 	}
2222 
2223 	rdma_free_devices(contexts);
2224 
2225 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
2226 	if (rc != 0) {
2227 		spdk_free(rctrlr);
2228 		return NULL;
2229 	}
2230 
2231 	STAILQ_INIT(&rctrlr->pending_cm_events);
2232 	STAILQ_INIT(&rctrlr->free_cm_events);
2233 	rctrlr->cm_events = spdk_zmalloc(NVME_RDMA_NUM_CM_EVENTS * sizeof(*rctrlr->cm_events), 0, NULL,
2234 					 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2235 	if (rctrlr->cm_events == NULL) {
2236 		SPDK_ERRLOG("unable to allocate buffers to hold CM events.\n");
2237 		goto destruct_ctrlr;
2238 	}
2239 
2240 	for (i = 0; i < NVME_RDMA_NUM_CM_EVENTS; i++) {
2241 		STAILQ_INSERT_TAIL(&rctrlr->free_cm_events, &rctrlr->cm_events[i], link);
2242 	}
2243 
2244 	rctrlr->cm_channel = rdma_create_event_channel();
2245 	if (rctrlr->cm_channel == NULL) {
2246 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
2247 		goto destruct_ctrlr;
2248 	}
2249 
2250 	flag = fcntl(rctrlr->cm_channel->fd, F_GETFL);
2251 	if (fcntl(rctrlr->cm_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
2252 		SPDK_ERRLOG("Cannot set event channel to non blocking\n");
2253 		goto destruct_ctrlr;
2254 	}
2255 
2256 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
2257 			       rctrlr->ctrlr.opts.admin_queue_size, 0,
2258 			       rctrlr->ctrlr.opts.admin_queue_size, false, true);
2259 	if (!rctrlr->ctrlr.adminq) {
2260 		SPDK_ERRLOG("failed to create admin qpair\n");
2261 		goto destruct_ctrlr;
2262 	}
2263 
2264 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
2265 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
2266 		goto destruct_ctrlr;
2267 	}
2268 
2269 	SPDK_DEBUGLOG(nvme, "successfully initialized the nvmf ctrlr\n");
2270 	return &rctrlr->ctrlr;
2271 
2272 destruct_ctrlr:
2273 	nvme_ctrlr_destruct(&rctrlr->ctrlr);
2274 	return NULL;
2275 }
2276 
2277 static int
2278 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
2279 {
2280 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2281 	struct nvme_rdma_cm_event_entry *entry;
2282 
2283 	if (ctrlr->adminq) {
2284 		nvme_rdma_ctrlr_delete_io_qpair(ctrlr, ctrlr->adminq);
2285 	}
2286 
2287 	STAILQ_FOREACH(entry, &rctrlr->pending_cm_events, link) {
2288 		rdma_ack_cm_event(entry->evt);
2289 	}
2290 
2291 	STAILQ_INIT(&rctrlr->free_cm_events);
2292 	STAILQ_INIT(&rctrlr->pending_cm_events);
2293 	spdk_free(rctrlr->cm_events);
2294 
2295 	if (rctrlr->cm_channel) {
2296 		rdma_destroy_event_channel(rctrlr->cm_channel);
2297 		rctrlr->cm_channel = NULL;
2298 	}
2299 
2300 	nvme_ctrlr_destruct_finish(ctrlr);
2301 
2302 	spdk_free(rctrlr);
2303 
2304 	return 0;
2305 }
2306 
2307 static int
2308 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
2309 			       struct nvme_request *req)
2310 {
2311 	struct nvme_rdma_qpair *rqpair;
2312 	struct spdk_nvme_rdma_req *rdma_req;
2313 	struct ibv_send_wr *wr;
2314 	struct nvme_rdma_poll_group *group;
2315 
2316 	rqpair = nvme_rdma_qpair(qpair);
2317 	assert(rqpair != NULL);
2318 	assert(req != NULL);
2319 
2320 	rdma_req = nvme_rdma_req_get(rqpair);
2321 	if (spdk_unlikely(!rdma_req)) {
2322 		if (rqpair->poller) {
2323 			rqpair->poller->stats.queued_requests++;
2324 		}
2325 		/* Inform the upper layer to try again later. */
2326 		return -EAGAIN;
2327 	}
2328 
2329 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
2330 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
2331 		nvme_rdma_req_put(rqpair, rdma_req);
2332 		return -1;
2333 	}
2334 
2335 	TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
2336 
2337 	if (!rqpair->link_active.tqe_prev && qpair->poll_group) {
2338 		group = nvme_rdma_poll_group(qpair->poll_group);
2339 		TAILQ_INSERT_TAIL(&group->active_qpairs, rqpair, link_active);
2340 	}
2341 	rqpair->num_outstanding_reqs++;
2342 
2343 	assert(rqpair->current_num_sends < rqpair->num_entries);
2344 	rqpair->current_num_sends++;
2345 
2346 	wr = &rdma_req->send_wr;
2347 	wr->next = NULL;
2348 	nvme_rdma_trace_ibv_sge(wr->sg_list);
2349 
2350 	spdk_rdma_provider_qp_queue_send_wrs(rqpair->rdma_qp, wr);
2351 
2352 	if (!rqpair->delay_cmd_submit) {
2353 		return nvme_rdma_qpair_submit_sends(rqpair);
2354 	}
2355 
2356 	return 0;
2357 }
2358 
2359 static int
2360 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
2361 {
2362 	/* Currently, doing nothing here */
2363 	return 0;
2364 }
2365 
2366 static void
2367 nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr)
2368 {
2369 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2370 	struct spdk_nvme_cpl cpl;
2371 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2372 
2373 	cpl.sqid = qpair->id;
2374 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2375 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2376 	cpl.status.dnr = dnr;
2377 
2378 	/*
2379 	 * We cannot abort requests at the RDMA layer without
2380 	 * unregistering them. If we do, we can still get error
2381 	 * free completions on the shared completion queue.
2382 	 */
2383 	if (nvme_qpair_get_state(qpair) > NVME_QPAIR_DISCONNECTING &&
2384 	    nvme_qpair_get_state(qpair) != NVME_QPAIR_DESTROYING) {
2385 		nvme_ctrlr_disconnect_qpair(qpair);
2386 	}
2387 
2388 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2389 		nvme_rdma_req_complete(rdma_req, &cpl, true);
2390 	}
2391 }
2392 
2393 static void
2394 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
2395 {
2396 	uint64_t t02;
2397 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2398 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2399 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
2400 	struct spdk_nvme_ctrlr_process *active_proc;
2401 
2402 	/* Don't check timeouts during controller initialization. */
2403 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
2404 		return;
2405 	}
2406 
2407 	if (nvme_qpair_is_admin_queue(qpair)) {
2408 		active_proc = nvme_ctrlr_get_current_process(ctrlr);
2409 	} else {
2410 		active_proc = qpair->active_proc;
2411 	}
2412 
2413 	/* Only check timeouts if the current process has a timeout callback. */
2414 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
2415 		return;
2416 	}
2417 
2418 	t02 = spdk_get_ticks();
2419 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2420 		assert(rdma_req->req != NULL);
2421 
2422 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
2423 			/*
2424 			 * The requests are in order, so as soon as one has not timed out,
2425 			 * stop iterating.
2426 			 */
2427 			break;
2428 		}
2429 	}
2430 }
2431 
2432 static inline void
2433 nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
2434 {
2435 	struct spdk_nvme_rdma_rsp *rdma_rsp = rdma_req->rdma_rsp;
2436 	struct ibv_recv_wr *recv_wr = rdma_rsp->recv_wr;
2437 
2438 	nvme_rdma_req_complete(rdma_req, &rdma_rsp->cpl, true);
2439 
2440 	assert(rqpair->rsps->current_num_recvs < rqpair->rsps->num_entries);
2441 	rqpair->rsps->current_num_recvs++;
2442 
2443 	recv_wr->next = NULL;
2444 	nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
2445 
2446 	if (!rqpair->srq) {
2447 		spdk_rdma_provider_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr);
2448 	} else {
2449 		spdk_rdma_provider_srq_queue_recv_wrs(rqpair->srq, recv_wr);
2450 	}
2451 }
2452 
2453 #define MAX_COMPLETIONS_PER_POLL 128
2454 
2455 static void
2456 nvme_rdma_fail_qpair(struct spdk_nvme_qpair *qpair, int failure_reason)
2457 {
2458 	if (failure_reason == IBV_WC_RETRY_EXC_ERR) {
2459 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
2460 	} else if (qpair->transport_failure_reason == SPDK_NVME_QPAIR_FAILURE_NONE) {
2461 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_UNKNOWN;
2462 	}
2463 
2464 	nvme_ctrlr_disconnect_qpair(qpair);
2465 }
2466 
2467 static struct nvme_rdma_qpair *
2468 get_rdma_qpair_from_wc(struct nvme_rdma_poll_group *group, struct ibv_wc *wc)
2469 {
2470 	struct spdk_nvme_qpair *qpair;
2471 	struct nvme_rdma_qpair *rqpair;
2472 
2473 	STAILQ_FOREACH(qpair, &group->group.connected_qpairs, poll_group_stailq) {
2474 		rqpair = nvme_rdma_qpair(qpair);
2475 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2476 			return rqpair;
2477 		}
2478 	}
2479 
2480 	STAILQ_FOREACH(qpair, &group->group.disconnected_qpairs, poll_group_stailq) {
2481 		rqpair = nvme_rdma_qpair(qpair);
2482 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2483 			return rqpair;
2484 		}
2485 	}
2486 
2487 	return NULL;
2488 }
2489 
2490 static inline void
2491 nvme_rdma_log_wc_status(struct nvme_rdma_qpair *rqpair, struct ibv_wc *wc)
2492 {
2493 	struct nvme_rdma_wr *rdma_wr = (struct nvme_rdma_wr *)wc->wr_id;
2494 
2495 	if (wc->status == IBV_WC_WR_FLUSH_ERR) {
2496 		/* If qpair is in ERR state, we will receive completions for all posted and not completed
2497 		 * Work Requests with IBV_WC_WR_FLUSH_ERR status. Don't log an error in that case */
2498 		SPDK_DEBUGLOG(nvme, "WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2499 			      rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2500 			      ibv_wc_status_str(wc->status));
2501 	} else {
2502 		SPDK_ERRLOG("WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2503 			    rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2504 			    ibv_wc_status_str(wc->status));
2505 	}
2506 }
2507 
2508 static inline int
2509 nvme_rdma_process_recv_completion(struct nvme_rdma_poller *poller, struct ibv_wc *wc,
2510 				  struct nvme_rdma_wr *rdma_wr)
2511 {
2512 	struct nvme_rdma_qpair		*rqpair;
2513 	struct spdk_nvme_rdma_req	*rdma_req;
2514 	struct spdk_nvme_rdma_rsp	*rdma_rsp;
2515 
2516 	rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
2517 
2518 	if (poller && poller->srq) {
2519 		rqpair = get_rdma_qpair_from_wc(poller->group, wc);
2520 		if (spdk_unlikely(!rqpair)) {
2521 			/* Since we do not handle the LAST_WQE_REACHED event, we do not know when
2522 			 * a Receive Queue in a QP, that is associated with an SRQ, is flushed.
2523 			 * We may get a WC for a already destroyed QP.
2524 			 *
2525 			 * However, for the SRQ, this is not any error. Hence, just re-post the
2526 			 * receive request to the SRQ to reuse for other QPs, and return 0.
2527 			 */
2528 			spdk_rdma_provider_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2529 			return 0;
2530 		}
2531 	} else {
2532 		rqpair = rdma_rsp->rqpair;
2533 		if (spdk_unlikely(!rqpair)) {
2534 			/* TODO: Fix forceful QP destroy when it is not async mode.
2535 			 * CQ itself did not cause any error. Hence, return 0 for now.
2536 			 */
2537 			SPDK_WARNLOG("QP might be already destroyed.\n");
2538 			return 0;
2539 		}
2540 	}
2541 
2542 
2543 	assert(rqpair->rsps->current_num_recvs > 0);
2544 	rqpair->rsps->current_num_recvs--;
2545 
2546 	if (wc->status) {
2547 		nvme_rdma_log_wc_status(rqpair, wc);
2548 		goto err_wc;
2549 	}
2550 
2551 	SPDK_DEBUGLOG(nvme, "CQ recv completion\n");
2552 
2553 	if (wc->byte_len < sizeof(struct spdk_nvme_cpl)) {
2554 		SPDK_ERRLOG("recv length %u less than expected response size\n", wc->byte_len);
2555 		goto err_wc;
2556 	}
2557 	rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
2558 	rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
2559 	rdma_req->rdma_rsp = rdma_rsp;
2560 
2561 	if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) == 0) {
2562 		return 0;
2563 	}
2564 
2565 	rqpair->num_completions++;
2566 
2567 	nvme_rdma_request_ready(rqpair, rdma_req);
2568 
2569 	if (!rqpair->delay_cmd_submit) {
2570 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2571 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2572 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2573 			return -ENXIO;
2574 		}
2575 	}
2576 
2577 	return 1;
2578 
2579 err_wc:
2580 	nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2581 	if (poller && poller->srq) {
2582 		spdk_rdma_provider_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2583 	}
2584 	return -ENXIO;
2585 }
2586 
2587 static inline int
2588 nvme_rdma_process_send_completion(struct nvme_rdma_poller *poller,
2589 				  struct nvme_rdma_qpair *rdma_qpair,
2590 				  struct ibv_wc *wc, struct nvme_rdma_wr *rdma_wr)
2591 {
2592 	struct nvme_rdma_qpair		*rqpair;
2593 	struct spdk_nvme_rdma_req	*rdma_req;
2594 
2595 	rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_req, rdma_wr);
2596 	rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL;
2597 	if (!rqpair) {
2598 		rqpair = rdma_qpair != NULL ? rdma_qpair : get_rdma_qpair_from_wc(poller->group, wc);
2599 	}
2600 
2601 	/* If we are flushing I/O */
2602 	if (wc->status) {
2603 		if (!rqpair) {
2604 			/* When poll_group is used, several qpairs share the same CQ and it is possible to
2605 			 * receive a completion with error (e.g. IBV_WC_WR_FLUSH_ERR) for already disconnected qpair
2606 			 * That happens due to qpair is destroyed while there are submitted but not completed send/receive
2607 			 * Work Requests */
2608 			assert(poller);
2609 			return 0;
2610 		}
2611 		assert(rqpair->current_num_sends > 0);
2612 		rqpair->current_num_sends--;
2613 		nvme_rdma_log_wc_status(rqpair, wc);
2614 		nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2615 		if (rdma_req->rdma_rsp && poller && poller->srq) {
2616 			spdk_rdma_provider_srq_queue_recv_wrs(poller->srq, rdma_req->rdma_rsp->recv_wr);
2617 		}
2618 		return -ENXIO;
2619 	}
2620 
2621 	/* We do not support Soft Roce anymore. Other than Soft Roce's bug, we should not
2622 	 * receive a completion without error status after qpair is disconnected/destroyed.
2623 	 */
2624 	if (spdk_unlikely(rdma_req->req == NULL)) {
2625 		/*
2626 		 * Some infiniband drivers do not guarantee the previous assumption after we
2627 		 * received a RDMA_CM_EVENT_DEVICE_REMOVAL event.
2628 		 */
2629 		SPDK_ERRLOG("Received malformed completion: request 0x%"PRIx64" type %d\n", wc->wr_id,
2630 			    rdma_wr->type);
2631 		if (!rqpair || !rqpair->need_destroy) {
2632 			assert(0);
2633 		}
2634 		return -ENXIO;
2635 	}
2636 
2637 	rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
2638 	assert(rqpair->current_num_sends > 0);
2639 	rqpair->current_num_sends--;
2640 
2641 	if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) == 0) {
2642 		return 0;
2643 	}
2644 
2645 	rqpair->num_completions++;
2646 
2647 	nvme_rdma_request_ready(rqpair, rdma_req);
2648 
2649 	if (!rqpair->delay_cmd_submit) {
2650 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2651 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2652 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2653 			return -ENXIO;
2654 		}
2655 	}
2656 
2657 	return 1;
2658 }
2659 
2660 static int
2661 nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
2662 				 struct nvme_rdma_poller *poller,
2663 				 struct nvme_rdma_qpair *rdma_qpair,
2664 				 uint64_t *rdma_completions)
2665 {
2666 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
2667 	struct nvme_rdma_wr		*rdma_wr;
2668 	uint32_t			reaped = 0;
2669 	int				completion_rc = 0;
2670 	int				rc, _rc, i;
2671 
2672 	rc = ibv_poll_cq(cq, batch_size, wc);
2673 	if (rc < 0) {
2674 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
2675 			    errno, spdk_strerror(errno));
2676 		return -ECANCELED;
2677 	} else if (rc == 0) {
2678 		return 0;
2679 	}
2680 
2681 	for (i = 0; i < rc; i++) {
2682 		rdma_wr = (struct nvme_rdma_wr *)wc[i].wr_id;
2683 		switch (rdma_wr->type) {
2684 		case RDMA_WR_TYPE_RECV:
2685 			_rc = nvme_rdma_process_recv_completion(poller, &wc[i], rdma_wr);
2686 			break;
2687 
2688 		case RDMA_WR_TYPE_SEND:
2689 			_rc = nvme_rdma_process_send_completion(poller, rdma_qpair, &wc[i], rdma_wr);
2690 			break;
2691 
2692 		default:
2693 			SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", rdma_wr->type);
2694 			return -ECANCELED;
2695 		}
2696 		if (spdk_likely(_rc >= 0)) {
2697 			reaped += _rc;
2698 		} else {
2699 			completion_rc = _rc;
2700 		}
2701 	}
2702 
2703 	*rdma_completions += rc;
2704 
2705 	if (completion_rc) {
2706 		return completion_rc;
2707 	}
2708 
2709 	return reaped;
2710 }
2711 
2712 static void
2713 dummy_disconnected_qpair_cb(struct spdk_nvme_qpair *qpair, void *poll_group_ctx)
2714 {
2715 
2716 }
2717 
2718 static int
2719 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
2720 				    uint32_t max_completions)
2721 {
2722 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
2723 	struct nvme_rdma_ctrlr		*rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
2724 	int				rc = 0, batch_size;
2725 	struct ibv_cq			*cq;
2726 	uint64_t			rdma_completions = 0;
2727 
2728 	/*
2729 	 * This is used during the connection phase. It's possible that we are still reaping error completions
2730 	 * from other qpairs so we need to call the poll group function. Also, it's more correct since the cq
2731 	 * is shared.
2732 	 */
2733 	if (qpair->poll_group != NULL) {
2734 		return spdk_nvme_poll_group_process_completions(qpair->poll_group->group, max_completions,
2735 				dummy_disconnected_qpair_cb);
2736 	}
2737 
2738 	if (max_completions == 0) {
2739 		max_completions = rqpair->num_entries;
2740 	} else {
2741 		max_completions = spdk_min(max_completions, rqpair->num_entries);
2742 	}
2743 
2744 	switch (nvme_qpair_get_state(qpair)) {
2745 	case NVME_QPAIR_CONNECTING:
2746 		rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
2747 		if (rc == 0) {
2748 			/* Once the connection is completed, we can submit queued requests */
2749 			nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
2750 		} else if (rc != -EAGAIN) {
2751 			SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
2752 			goto failed;
2753 		} else if (rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING) {
2754 			return 0;
2755 		}
2756 		break;
2757 
2758 	case NVME_QPAIR_DISCONNECTING:
2759 		nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
2760 		return -ENXIO;
2761 
2762 	default:
2763 		if (nvme_qpair_is_admin_queue(qpair)) {
2764 			nvme_rdma_poll_events(rctrlr);
2765 		}
2766 		nvme_rdma_qpair_process_cm_event(rqpair);
2767 		break;
2768 	}
2769 
2770 	if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
2771 		goto failed;
2772 	}
2773 
2774 	cq = rqpair->cq;
2775 
2776 	rqpair->num_completions = 0;
2777 	do {
2778 		batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
2779 		rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL, rqpair, &rdma_completions);
2780 
2781 		if (rc == 0) {
2782 			break;
2783 			/* Handle the case where we fail to poll the cq. */
2784 		} else if (rc == -ECANCELED) {
2785 			goto failed;
2786 		} else if (rc == -ENXIO) {
2787 			return rc;
2788 		}
2789 	} while (rqpair->num_completions < max_completions);
2790 
2791 	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
2792 			  nvme_rdma_qpair_submit_recvs(rqpair))) {
2793 		goto failed;
2794 	}
2795 
2796 	if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
2797 		nvme_rdma_qpair_check_timeout(qpair);
2798 	}
2799 
2800 	return rqpair->num_completions;
2801 
2802 failed:
2803 	nvme_rdma_fail_qpair(qpair, 0);
2804 	return -ENXIO;
2805 }
2806 
2807 static uint32_t
2808 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
2809 {
2810 	/* max_mr_size by ibv_query_device indicates the largest value that we can
2811 	 * set for a registered memory region.  It is independent from the actual
2812 	 * I/O size and is very likely to be larger than 2 MiB which is the
2813 	 * granularity we currently register memory regions.  Hence return
2814 	 * UINT32_MAX here and let the generic layer use the controller data to
2815 	 * moderate this value.
2816 	 */
2817 	return UINT32_MAX;
2818 }
2819 
2820 static uint16_t
2821 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
2822 {
2823 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2824 	uint32_t max_sge = rctrlr->max_sge;
2825 	uint32_t max_in_capsule_sge = (ctrlr->cdata.nvmf_specific.ioccsz * 16 -
2826 				       sizeof(struct spdk_nvme_cmd)) /
2827 				      sizeof(struct spdk_nvme_sgl_descriptor);
2828 
2829 	/* Max SGE is limited by capsule size */
2830 	max_sge = spdk_min(max_sge, max_in_capsule_sge);
2831 	/* Max SGE may be limited by MSDBD */
2832 	if (ctrlr->cdata.nvmf_specific.msdbd != 0) {
2833 		max_sge = spdk_min(max_sge, ctrlr->cdata.nvmf_specific.msdbd);
2834 	}
2835 
2836 	/* Max SGE can't be less than 1 */
2837 	max_sge = spdk_max(1, max_sge);
2838 	return max_sge;
2839 }
2840 
2841 static int
2842 nvme_rdma_qpair_iterate_requests(struct spdk_nvme_qpair *qpair,
2843 				 int (*iter_fn)(struct nvme_request *req, void *arg),
2844 				 void *arg)
2845 {
2846 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2847 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2848 	int rc;
2849 
2850 	assert(iter_fn != NULL);
2851 
2852 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2853 		assert(rdma_req->req != NULL);
2854 
2855 		rc = iter_fn(rdma_req->req, arg);
2856 		if (rc != 0) {
2857 			return rc;
2858 		}
2859 	}
2860 
2861 	return 0;
2862 }
2863 
2864 static void
2865 nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
2866 {
2867 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2868 	struct spdk_nvme_cpl cpl;
2869 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2870 
2871 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2872 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2873 
2874 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2875 		assert(rdma_req->req != NULL);
2876 
2877 		if (rdma_req->req->cmd.opc != SPDK_NVME_OPC_ASYNC_EVENT_REQUEST) {
2878 			continue;
2879 		}
2880 
2881 		nvme_rdma_req_complete(rdma_req, &cpl, false);
2882 	}
2883 }
2884 
2885 static void
2886 nvme_rdma_poller_destroy(struct nvme_rdma_poller *poller)
2887 {
2888 	if (poller->cq) {
2889 		ibv_destroy_cq(poller->cq);
2890 	}
2891 	if (poller->rsps) {
2892 		nvme_rdma_free_rsps(poller->rsps);
2893 	}
2894 	if (poller->srq) {
2895 		spdk_rdma_provider_srq_destroy(poller->srq);
2896 	}
2897 	if (poller->mr_map) {
2898 		spdk_rdma_free_mem_map(&poller->mr_map);
2899 	}
2900 	if (poller->pd) {
2901 		spdk_rdma_put_pd(poller->pd);
2902 	}
2903 	free(poller);
2904 }
2905 
2906 static struct nvme_rdma_poller *
2907 nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context *ctx)
2908 {
2909 	struct nvme_rdma_poller *poller;
2910 	struct ibv_device_attr dev_attr;
2911 	struct spdk_rdma_provider_srq_init_attr srq_init_attr = {};
2912 	struct nvme_rdma_rsp_opts opts;
2913 	int num_cqe, max_num_cqe;
2914 	int rc;
2915 
2916 	poller = calloc(1, sizeof(*poller));
2917 	if (poller == NULL) {
2918 		SPDK_ERRLOG("Unable to allocate poller.\n");
2919 		return NULL;
2920 	}
2921 
2922 	poller->group = group;
2923 	poller->device = ctx;
2924 
2925 	if (g_spdk_nvme_transport_opts.rdma_srq_size != 0) {
2926 		rc = ibv_query_device(ctx, &dev_attr);
2927 		if (rc) {
2928 			SPDK_ERRLOG("Unable to query RDMA device.\n");
2929 			goto fail;
2930 		}
2931 
2932 		poller->pd = spdk_rdma_get_pd(ctx);
2933 		if (poller->pd == NULL) {
2934 			SPDK_ERRLOG("Unable to get PD.\n");
2935 			goto fail;
2936 		}
2937 
2938 		poller->mr_map = spdk_rdma_create_mem_map(poller->pd, &g_nvme_hooks,
2939 				 SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR);
2940 		if (poller->mr_map == NULL) {
2941 			SPDK_ERRLOG("Unable to create memory map.\n");
2942 			goto fail;
2943 		}
2944 
2945 		srq_init_attr.stats = &poller->stats.rdma_stats.recv;
2946 		srq_init_attr.pd = poller->pd;
2947 		srq_init_attr.srq_init_attr.attr.max_wr = spdk_min((uint32_t)dev_attr.max_srq_wr,
2948 				g_spdk_nvme_transport_opts.rdma_srq_size);
2949 		srq_init_attr.srq_init_attr.attr.max_sge = spdk_min(dev_attr.max_sge,
2950 				NVME_RDMA_DEFAULT_RX_SGE);
2951 
2952 		poller->srq = spdk_rdma_provider_srq_create(&srq_init_attr);
2953 		if (poller->srq == NULL) {
2954 			SPDK_ERRLOG("Unable to create SRQ.\n");
2955 			goto fail;
2956 		}
2957 
2958 		opts.num_entries = g_spdk_nvme_transport_opts.rdma_srq_size;
2959 		opts.rqpair = NULL;
2960 		opts.srq = poller->srq;
2961 		opts.mr_map = poller->mr_map;
2962 
2963 		poller->rsps = nvme_rdma_create_rsps(&opts);
2964 		if (poller->rsps == NULL) {
2965 			SPDK_ERRLOG("Unable to create poller RDMA responses.\n");
2966 			goto fail;
2967 		}
2968 
2969 		rc = nvme_rdma_poller_submit_recvs(poller);
2970 		if (rc) {
2971 			SPDK_ERRLOG("Unable to submit poller RDMA responses.\n");
2972 			goto fail;
2973 		}
2974 
2975 		/*
2976 		 * When using an srq, fix the size of the completion queue at startup.
2977 		 * The initiator sends only send and recv WRs. Hence, the multiplier is 2.
2978 		 * (The target sends also data WRs. Hence, the multiplier is 3.)
2979 		 */
2980 		num_cqe = g_spdk_nvme_transport_opts.rdma_srq_size * 2;
2981 	} else {
2982 		num_cqe = DEFAULT_NVME_RDMA_CQ_SIZE;
2983 	}
2984 
2985 	max_num_cqe = g_spdk_nvme_transport_opts.rdma_max_cq_size;
2986 	if (max_num_cqe != 0 && num_cqe > max_num_cqe) {
2987 		num_cqe = max_num_cqe;
2988 	}
2989 
2990 	poller->cq = ibv_create_cq(poller->device, num_cqe, group, NULL, 0);
2991 
2992 	if (poller->cq == NULL) {
2993 		SPDK_ERRLOG("Unable to create CQ, errno %d.\n", errno);
2994 		goto fail;
2995 	}
2996 
2997 	STAILQ_INSERT_HEAD(&group->pollers, poller, link);
2998 	group->num_pollers++;
2999 	poller->current_num_wc = num_cqe;
3000 	poller->required_num_wc = 0;
3001 	return poller;
3002 
3003 fail:
3004 	nvme_rdma_poller_destroy(poller);
3005 	return NULL;
3006 }
3007 
3008 static void
3009 nvme_rdma_poll_group_free_pollers(struct nvme_rdma_poll_group *group)
3010 {
3011 	struct nvme_rdma_poller	*poller, *tmp_poller;
3012 
3013 	STAILQ_FOREACH_SAFE(poller, &group->pollers, link, tmp_poller) {
3014 		assert(poller->refcnt == 0);
3015 		if (poller->refcnt) {
3016 			SPDK_WARNLOG("Destroying poller with non-zero ref count: poller %p, refcnt %d\n",
3017 				     poller, poller->refcnt);
3018 		}
3019 
3020 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
3021 		nvme_rdma_poller_destroy(poller);
3022 	}
3023 }
3024 
3025 static struct nvme_rdma_poller *
3026 nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group, struct ibv_context *device)
3027 {
3028 	struct nvme_rdma_poller *poller = NULL;
3029 
3030 	STAILQ_FOREACH(poller, &group->pollers, link) {
3031 		if (poller->device == device) {
3032 			break;
3033 		}
3034 	}
3035 
3036 	if (!poller) {
3037 		poller = nvme_rdma_poller_create(group, device);
3038 		if (!poller) {
3039 			SPDK_ERRLOG("Failed to create a poller for device %p\n", device);
3040 			return NULL;
3041 		}
3042 	}
3043 
3044 	poller->refcnt++;
3045 	return poller;
3046 }
3047 
3048 static void
3049 nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group, struct nvme_rdma_poller *poller)
3050 {
3051 	assert(poller->refcnt > 0);
3052 	if (--poller->refcnt == 0) {
3053 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
3054 		group->num_pollers--;
3055 		nvme_rdma_poller_destroy(poller);
3056 	}
3057 }
3058 
3059 static struct spdk_nvme_transport_poll_group *
3060 nvme_rdma_poll_group_create(void)
3061 {
3062 	struct nvme_rdma_poll_group	*group;
3063 
3064 	group = calloc(1, sizeof(*group));
3065 	if (group == NULL) {
3066 		SPDK_ERRLOG("Unable to allocate poll group.\n");
3067 		return NULL;
3068 	}
3069 
3070 	STAILQ_INIT(&group->pollers);
3071 	TAILQ_INIT(&group->connecting_qpairs);
3072 	TAILQ_INIT(&group->active_qpairs);
3073 	return &group->group;
3074 }
3075 
3076 static int
3077 nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
3078 {
3079 	return 0;
3080 }
3081 
3082 static int
3083 nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
3084 {
3085 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
3086 	struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);
3087 
3088 	if (rqpair->link_connecting.tqe_prev) {
3089 		TAILQ_REMOVE(&group->connecting_qpairs, rqpair, link_connecting);
3090 		/* We use prev pointer to check if qpair is in connecting list or not .
3091 		 * TAILQ_REMOVE doesn't do it. So, we do it manually.
3092 		 */
3093 		rqpair->link_connecting.tqe_prev = NULL;
3094 	}
3095 
3096 	return 0;
3097 }
3098 
3099 static int
3100 nvme_rdma_poll_group_add(struct spdk_nvme_transport_poll_group *tgroup,
3101 			 struct spdk_nvme_qpair *qpair)
3102 {
3103 	return 0;
3104 }
3105 
3106 static int
3107 nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
3108 			    struct spdk_nvme_qpair *qpair)
3109 {
3110 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
3111 	struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);
3112 
3113 	if (rqpair->link_active.tqe_prev) {
3114 		TAILQ_REMOVE(&group->active_qpairs, rqpair, link_active);
3115 		rqpair->link_active.tqe_prev = NULL;
3116 	}
3117 
3118 	return 0;
3119 }
3120 
3121 static inline void
3122 nvme_rdma_qpair_process_submits(struct nvme_rdma_poll_group *group,
3123 				struct nvme_rdma_qpair *rqpair)
3124 {
3125 	struct spdk_nvme_qpair	*qpair = &rqpair->qpair;
3126 
3127 	assert(rqpair->link_active.tqe_prev != NULL);
3128 
3129 	if (spdk_unlikely(rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING ||
3130 			  rqpair->state >= NVME_RDMA_QPAIR_STATE_EXITING)) {
3131 		return;
3132 	}
3133 
3134 	if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
3135 		nvme_rdma_qpair_check_timeout(qpair);
3136 	}
3137 
3138 	nvme_rdma_qpair_submit_sends(rqpair);
3139 	if (!rqpair->srq) {
3140 		nvme_rdma_qpair_submit_recvs(rqpair);
3141 	}
3142 	if (rqpair->num_completions > 0) {
3143 		nvme_qpair_resubmit_requests(qpair, rqpair->num_completions);
3144 		rqpair->num_completions = 0;
3145 	}
3146 
3147 	if (rqpair->num_outstanding_reqs == 0 && STAILQ_EMPTY(&qpair->queued_req)) {
3148 		TAILQ_REMOVE(&group->active_qpairs, rqpair, link_active);
3149 		/* We use prev pointer to check if qpair is in active list or not.
3150 		 * TAILQ_REMOVE doesn't do it. So, we do it manually.
3151 		 */
3152 		rqpair->link_active.tqe_prev = NULL;
3153 	}
3154 }
3155 
3156 static int64_t
3157 nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup,
3158 		uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
3159 {
3160 	struct spdk_nvme_qpair			*qpair, *tmp_qpair;
3161 	struct nvme_rdma_qpair			*rqpair, *tmp_rqpair;
3162 	struct nvme_rdma_poll_group		*group;
3163 	struct nvme_rdma_poller			*poller;
3164 	int					batch_size, rc, rc2 = 0;
3165 	int64_t					total_completions = 0;
3166 	uint64_t				completions_allowed = 0;
3167 	uint64_t				completions_per_poller = 0;
3168 	uint64_t				poller_completions = 0;
3169 	uint64_t				rdma_completions;
3170 
3171 	if (completions_per_qpair == 0) {
3172 		completions_per_qpair = MAX_COMPLETIONS_PER_POLL;
3173 	}
3174 
3175 	group = nvme_rdma_poll_group(tgroup);
3176 
3177 	STAILQ_FOREACH_SAFE(qpair, &tgroup->disconnected_qpairs, poll_group_stailq, tmp_qpair) {
3178 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
3179 		if (rc == 0) {
3180 			disconnected_qpair_cb(qpair, tgroup->group->ctx);
3181 		}
3182 	}
3183 
3184 	TAILQ_FOREACH_SAFE(rqpair, &group->connecting_qpairs, link_connecting, tmp_rqpair) {
3185 		qpair = &rqpair->qpair;
3186 
3187 		rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
3188 		if (rc == 0 || rc != -EAGAIN) {
3189 			TAILQ_REMOVE(&group->connecting_qpairs, rqpair, link_connecting);
3190 			/* We use prev pointer to check if qpair is in connecting list or not.
3191 			 * TAILQ_REMOVE does not do it. So, we do it manually.
3192 			 */
3193 			rqpair->link_connecting.tqe_prev = NULL;
3194 
3195 			if (rc == 0) {
3196 				/* Once the connection is completed, we can submit queued requests */
3197 				nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
3198 			} else if (rc != -EAGAIN) {
3199 				SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
3200 				nvme_rdma_fail_qpair(qpair, 0);
3201 			}
3202 		}
3203 	}
3204 
3205 	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
3206 		rqpair = nvme_rdma_qpair(qpair);
3207 
3208 		if (spdk_likely(nvme_qpair_get_state(qpair) != NVME_QPAIR_CONNECTING)) {
3209 			nvme_rdma_qpair_process_cm_event(rqpair);
3210 		}
3211 
3212 		if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
3213 			rc2 = -ENXIO;
3214 			nvme_rdma_fail_qpair(qpair, 0);
3215 		}
3216 	}
3217 
3218 	completions_allowed = completions_per_qpair * tgroup->num_connected_qpairs;
3219 	if (group->num_pollers) {
3220 		completions_per_poller = spdk_max(completions_allowed / group->num_pollers, 1);
3221 	}
3222 
3223 	STAILQ_FOREACH(poller, &group->pollers, link) {
3224 		poller_completions = 0;
3225 		rdma_completions = 0;
3226 		do {
3227 			poller->stats.polls++;
3228 			batch_size = spdk_min((completions_per_poller - poller_completions), MAX_COMPLETIONS_PER_POLL);
3229 			rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, poller, NULL, &rdma_completions);
3230 			if (rc <= 0) {
3231 				if (rc == -ECANCELED) {
3232 					return -EIO;
3233 				} else if (rc == 0) {
3234 					poller->stats.idle_polls++;
3235 				}
3236 				break;
3237 			}
3238 
3239 			poller_completions += rc;
3240 		} while (poller_completions < completions_per_poller);
3241 		total_completions += poller_completions;
3242 		poller->stats.completions += rdma_completions;
3243 		if (poller->srq) {
3244 			nvme_rdma_poller_submit_recvs(poller);
3245 		}
3246 	}
3247 
3248 	TAILQ_FOREACH_SAFE(rqpair, &group->active_qpairs, link_active, tmp_rqpair) {
3249 		nvme_rdma_qpair_process_submits(group, rqpair);
3250 	}
3251 
3252 	return rc2 != 0 ? rc2 : total_completions;
3253 }
3254 
3255 static int
3256 nvme_rdma_poll_group_destroy(struct spdk_nvme_transport_poll_group *tgroup)
3257 {
3258 	struct nvme_rdma_poll_group	*group = nvme_rdma_poll_group(tgroup);
3259 
3260 	if (!STAILQ_EMPTY(&tgroup->connected_qpairs) || !STAILQ_EMPTY(&tgroup->disconnected_qpairs)) {
3261 		return -EBUSY;
3262 	}
3263 
3264 	nvme_rdma_poll_group_free_pollers(group);
3265 	free(group);
3266 
3267 	return 0;
3268 }
3269 
3270 static int
3271 nvme_rdma_poll_group_get_stats(struct spdk_nvme_transport_poll_group *tgroup,
3272 			       struct spdk_nvme_transport_poll_group_stat **_stats)
3273 {
3274 	struct nvme_rdma_poll_group *group;
3275 	struct spdk_nvme_transport_poll_group_stat *stats;
3276 	struct spdk_nvme_rdma_device_stat *device_stat;
3277 	struct nvme_rdma_poller *poller;
3278 	uint32_t i = 0;
3279 
3280 	if (tgroup == NULL || _stats == NULL) {
3281 		SPDK_ERRLOG("Invalid stats or group pointer\n");
3282 		return -EINVAL;
3283 	}
3284 
3285 	group = nvme_rdma_poll_group(tgroup);
3286 	stats = calloc(1, sizeof(*stats));
3287 	if (!stats) {
3288 		SPDK_ERRLOG("Can't allocate memory for RDMA stats\n");
3289 		return -ENOMEM;
3290 	}
3291 	stats->trtype = SPDK_NVME_TRANSPORT_RDMA;
3292 	stats->rdma.num_devices = group->num_pollers;
3293 
3294 	if (stats->rdma.num_devices == 0) {
3295 		*_stats = stats;
3296 		return 0;
3297 	}
3298 
3299 	stats->rdma.device_stats = calloc(stats->rdma.num_devices, sizeof(*stats->rdma.device_stats));
3300 	if (!stats->rdma.device_stats) {
3301 		SPDK_ERRLOG("Can't allocate memory for RDMA device stats\n");
3302 		free(stats);
3303 		return -ENOMEM;
3304 	}
3305 
3306 	STAILQ_FOREACH(poller, &group->pollers, link) {
3307 		device_stat = &stats->rdma.device_stats[i];
3308 		device_stat->name = poller->device->device->name;
3309 		device_stat->polls = poller->stats.polls;
3310 		device_stat->idle_polls = poller->stats.idle_polls;
3311 		device_stat->completions = poller->stats.completions;
3312 		device_stat->queued_requests = poller->stats.queued_requests;
3313 		device_stat->total_send_wrs = poller->stats.rdma_stats.send.num_submitted_wrs;
3314 		device_stat->send_doorbell_updates = poller->stats.rdma_stats.send.doorbell_updates;
3315 		device_stat->total_recv_wrs = poller->stats.rdma_stats.recv.num_submitted_wrs;
3316 		device_stat->recv_doorbell_updates = poller->stats.rdma_stats.recv.doorbell_updates;
3317 		i++;
3318 	}
3319 
3320 	*_stats = stats;
3321 
3322 	return 0;
3323 }
3324 
3325 static void
3326 nvme_rdma_poll_group_free_stats(struct spdk_nvme_transport_poll_group *tgroup,
3327 				struct spdk_nvme_transport_poll_group_stat *stats)
3328 {
3329 	if (stats) {
3330 		free(stats->rdma.device_stats);
3331 	}
3332 	free(stats);
3333 }
3334 
3335 static int
3336 nvme_rdma_ctrlr_get_memory_domains(const struct spdk_nvme_ctrlr *ctrlr,
3337 				   struct spdk_memory_domain **domains, int array_size)
3338 {
3339 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(ctrlr->adminq);
3340 
3341 	if (domains && array_size > 0) {
3342 		domains[0] = rqpair->memory_domain->domain;
3343 	}
3344 
3345 	return 1;
3346 }
3347 
3348 void
3349 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
3350 {
3351 	g_nvme_hooks = *hooks;
3352 }
3353 
3354 const struct spdk_nvme_transport_ops rdma_ops = {
3355 	.name = "RDMA",
3356 	.type = SPDK_NVME_TRANSPORT_RDMA,
3357 	.ctrlr_construct = nvme_rdma_ctrlr_construct,
3358 	.ctrlr_scan = nvme_fabric_ctrlr_scan,
3359 	.ctrlr_destruct = nvme_rdma_ctrlr_destruct,
3360 	.ctrlr_enable = nvme_rdma_ctrlr_enable,
3361 
3362 	.ctrlr_set_reg_4 = nvme_fabric_ctrlr_set_reg_4,
3363 	.ctrlr_set_reg_8 = nvme_fabric_ctrlr_set_reg_8,
3364 	.ctrlr_get_reg_4 = nvme_fabric_ctrlr_get_reg_4,
3365 	.ctrlr_get_reg_8 = nvme_fabric_ctrlr_get_reg_8,
3366 	.ctrlr_set_reg_4_async = nvme_fabric_ctrlr_set_reg_4_async,
3367 	.ctrlr_set_reg_8_async = nvme_fabric_ctrlr_set_reg_8_async,
3368 	.ctrlr_get_reg_4_async = nvme_fabric_ctrlr_get_reg_4_async,
3369 	.ctrlr_get_reg_8_async = nvme_fabric_ctrlr_get_reg_8_async,
3370 
3371 	.ctrlr_get_max_xfer_size = nvme_rdma_ctrlr_get_max_xfer_size,
3372 	.ctrlr_get_max_sges = nvme_rdma_ctrlr_get_max_sges,
3373 
3374 	.ctrlr_create_io_qpair = nvme_rdma_ctrlr_create_io_qpair,
3375 	.ctrlr_delete_io_qpair = nvme_rdma_ctrlr_delete_io_qpair,
3376 	.ctrlr_connect_qpair = nvme_rdma_ctrlr_connect_qpair,
3377 	.ctrlr_disconnect_qpair = nvme_rdma_ctrlr_disconnect_qpair,
3378 
3379 	.ctrlr_get_memory_domains = nvme_rdma_ctrlr_get_memory_domains,
3380 
3381 	.qpair_abort_reqs = nvme_rdma_qpair_abort_reqs,
3382 	.qpair_reset = nvme_rdma_qpair_reset,
3383 	.qpair_submit_request = nvme_rdma_qpair_submit_request,
3384 	.qpair_process_completions = nvme_rdma_qpair_process_completions,
3385 	.qpair_iterate_requests = nvme_rdma_qpair_iterate_requests,
3386 	.admin_qpair_abort_aers = nvme_rdma_admin_qpair_abort_aers,
3387 
3388 	.poll_group_create = nvme_rdma_poll_group_create,
3389 	.poll_group_connect_qpair = nvme_rdma_poll_group_connect_qpair,
3390 	.poll_group_disconnect_qpair = nvme_rdma_poll_group_disconnect_qpair,
3391 	.poll_group_add = nvme_rdma_poll_group_add,
3392 	.poll_group_remove = nvme_rdma_poll_group_remove,
3393 	.poll_group_process_completions = nvme_rdma_poll_group_process_completions,
3394 	.poll_group_destroy = nvme_rdma_poll_group_destroy,
3395 	.poll_group_get_stats = nvme_rdma_poll_group_get_stats,
3396 	.poll_group_free_stats = nvme_rdma_poll_group_free_stats,
3397 };
3398 
3399 SPDK_NVME_TRANSPORT_REGISTER(rdma, &rdma_ops);
3400