xref: /spdk/lib/nvme/nvme_rdma.c (revision 60982c759db49b4f4579f16e3b24df0725ba4b94)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2019-2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 /*
8  * NVMe over RDMA transport
9  */
10 
11 #include "spdk/stdinc.h"
12 
13 #include "spdk/assert.h"
14 #include "spdk/dma.h"
15 #include "spdk/log.h"
16 #include "spdk/trace.h"
17 #include "spdk/queue.h"
18 #include "spdk/nvme.h"
19 #include "spdk/nvmf_spec.h"
20 #include "spdk/string.h"
21 #include "spdk/endian.h"
22 #include "spdk/likely.h"
23 #include "spdk/config.h"
24 
25 #include "nvme_internal.h"
26 #include "spdk_internal/rdma.h"
27 
28 #define NVME_RDMA_TIME_OUT_IN_MS 2000
29 #define NVME_RDMA_RW_BUFFER_SIZE 131072
30 
31 /*
32  * NVME RDMA qpair Resource Defaults
33  */
34 #define NVME_RDMA_DEFAULT_TX_SGE		2
35 #define NVME_RDMA_DEFAULT_RX_SGE		1
36 
37 /* Max number of NVMe-oF SGL descriptors supported by the host */
38 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
39 
40 /* Number of STAILQ entries for holding pending RDMA CM events. */
41 #define NVME_RDMA_NUM_CM_EVENTS			256
42 
43 /* CM event processing timeout */
44 #define NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US	1000000
45 
46 /* The default size for a shared rdma completion queue. */
47 #define DEFAULT_NVME_RDMA_CQ_SIZE		4096
48 
49 /*
50  * In the special case of a stale connection, we don't expose a mechanism
51  * for the user to retry the connection, so we need to handle it internally.
52  */
53 #define NVME_RDMA_STALE_CONN_RETRY_MAX		5
54 #define NVME_RDMA_STALE_CONN_RETRY_DELAY_US	10000
55 
56 /*
57  * Maximum value of transport_retry_count used by RDMA controller
58  */
59 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT	7
60 
61 /*
62  * Maximum value of transport_ack_timeout used by RDMA controller
63  */
64 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT	31
65 
66 /*
67  * Number of microseconds to wait until the lingering qpair becomes quiet.
68  */
69 #define NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US	1000000ull
70 
71 /*
72  * The max length of keyed SGL data block (3 bytes)
73  */
74 #define NVME_RDMA_MAX_KEYED_SGL_LENGTH ((1u << 24u) - 1)
75 
76 #define WC_PER_QPAIR(queue_depth)	(queue_depth * 2)
77 
78 #define NVME_RDMA_POLL_GROUP_CHECK_QPN(_rqpair, qpn)				\
79 	((_rqpair)->rdma_qp && (_rqpair)->rdma_qp->qp->qp_num == (qpn))
80 
81 struct nvme_rdma_memory_domain {
82 	TAILQ_ENTRY(nvme_rdma_memory_domain) link;
83 	uint32_t ref;
84 	struct ibv_pd *pd;
85 	struct spdk_memory_domain *domain;
86 	struct spdk_memory_domain_rdma_ctx rdma_ctx;
87 };
88 
89 enum nvme_rdma_wr_type {
90 	RDMA_WR_TYPE_RECV,
91 	RDMA_WR_TYPE_SEND,
92 };
93 
94 struct nvme_rdma_wr {
95 	/* Using this instead of the enum allows this struct to only occupy one byte. */
96 	uint8_t	type;
97 };
98 
99 struct spdk_nvmf_cmd {
100 	struct spdk_nvme_cmd cmd;
101 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
102 };
103 
104 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
105 
106 /* STAILQ wrapper for cm events. */
107 struct nvme_rdma_cm_event_entry {
108 	struct rdma_cm_event			*evt;
109 	STAILQ_ENTRY(nvme_rdma_cm_event_entry)	link;
110 };
111 
112 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
113 struct nvme_rdma_ctrlr {
114 	struct spdk_nvme_ctrlr			ctrlr;
115 
116 	uint16_t				max_sge;
117 
118 	struct rdma_event_channel		*cm_channel;
119 
120 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	pending_cm_events;
121 
122 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	free_cm_events;
123 
124 	struct nvme_rdma_cm_event_entry		*cm_events;
125 };
126 
127 struct nvme_rdma_poller_stats {
128 	uint64_t polls;
129 	uint64_t idle_polls;
130 	uint64_t queued_requests;
131 	uint64_t completions;
132 	struct spdk_rdma_qp_stats rdma_stats;
133 };
134 
135 struct nvme_rdma_poll_group;
136 struct nvme_rdma_rsps;
137 
138 struct nvme_rdma_poller {
139 	struct ibv_context		*device;
140 	struct ibv_cq			*cq;
141 	struct spdk_rdma_srq		*srq;
142 	struct nvme_rdma_rsps		*rsps;
143 	struct ibv_pd			*pd;
144 	struct spdk_rdma_mem_map	*mr_map;
145 	uint32_t			refcnt;
146 	int				required_num_wc;
147 	int				current_num_wc;
148 	struct nvme_rdma_poller_stats	stats;
149 	struct nvme_rdma_poll_group	*group;
150 	STAILQ_ENTRY(nvme_rdma_poller)	link;
151 };
152 
153 struct nvme_rdma_poll_group {
154 	struct spdk_nvme_transport_poll_group		group;
155 	STAILQ_HEAD(, nvme_rdma_poller)			pollers;
156 	uint32_t					num_pollers;
157 };
158 
159 enum nvme_rdma_qpair_state {
160 	NVME_RDMA_QPAIR_STATE_INVALID = 0,
161 	NVME_RDMA_QPAIR_STATE_STALE_CONN,
162 	NVME_RDMA_QPAIR_STATE_INITIALIZING,
163 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND,
164 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL,
165 	NVME_RDMA_QPAIR_STATE_RUNNING,
166 	NVME_RDMA_QPAIR_STATE_EXITING,
167 	NVME_RDMA_QPAIR_STATE_LINGERING,
168 	NVME_RDMA_QPAIR_STATE_EXITED,
169 };
170 
171 struct nvme_rdma_qpair;
172 
173 typedef int (*nvme_rdma_cm_event_cb)(struct nvme_rdma_qpair *rqpair, int ret);
174 
175 struct nvme_rdma_rsp_opts {
176 	uint16_t				num_entries;
177 	struct nvme_rdma_qpair			*rqpair;
178 	struct spdk_rdma_srq			*srq;
179 	struct spdk_rdma_mem_map		*mr_map;
180 };
181 
182 struct nvme_rdma_rsps {
183 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
184 	struct ibv_sge				*rsp_sgls;
185 	struct spdk_nvme_rdma_rsp		*rsps;
186 
187 	struct ibv_recv_wr			*rsp_recv_wrs;
188 
189 	/* Count of outstanding recv objects */
190 	uint16_t				current_num_recvs;
191 
192 	uint16_t				num_entries;
193 };
194 
195 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
196 struct nvme_rdma_qpair {
197 	struct spdk_nvme_qpair			qpair;
198 
199 	struct spdk_rdma_qp			*rdma_qp;
200 	struct rdma_cm_id			*cm_id;
201 	struct ibv_cq				*cq;
202 	struct spdk_rdma_srq			*srq;
203 
204 	struct	spdk_nvme_rdma_req		*rdma_reqs;
205 
206 	uint32_t				max_send_sge;
207 
208 	uint32_t				max_recv_sge;
209 
210 	uint16_t				num_entries;
211 
212 	bool					delay_cmd_submit;
213 
214 	uint32_t				num_completions;
215 
216 	struct nvme_rdma_rsps			*rsps;
217 
218 	/*
219 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
220 	 * Indexed by rdma_req->id.
221 	 */
222 	struct spdk_nvmf_cmd			*cmds;
223 
224 	struct spdk_rdma_mem_map		*mr_map;
225 
226 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
227 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
228 
229 	struct nvme_rdma_memory_domain		*memory_domain;
230 
231 	/* Count of outstanding send objects */
232 	uint16_t				current_num_sends;
233 
234 	/* Placed at the end of the struct since it is not used frequently */
235 	struct rdma_cm_event			*evt;
236 	struct nvme_rdma_poller			*poller;
237 
238 	uint64_t				evt_timeout_ticks;
239 	nvme_rdma_cm_event_cb			evt_cb;
240 	enum rdma_cm_event_type			expected_evt_type;
241 
242 	enum nvme_rdma_qpair_state		state;
243 
244 	bool					in_connect_poll;
245 
246 	uint8_t					stale_conn_retry_count;
247 	bool					need_destroy;
248 };
249 
250 enum NVME_RDMA_COMPLETION_FLAGS {
251 	NVME_RDMA_SEND_COMPLETED = 1u << 0,
252 	NVME_RDMA_RECV_COMPLETED = 1u << 1,
253 };
254 
255 struct spdk_nvme_rdma_req {
256 	uint16_t				id;
257 	uint16_t				completion_flags: 2;
258 	uint16_t				reserved: 14;
259 	/* If the RDMA_RECV completion arrives before the RDMA_SEND completion, the nvme request
260 	 * is completed while processing the RDMA_SEND completion. Completing the request requires
261 	 * the response received in RDMA_RECV, so store it in this field. */
262 	struct spdk_nvme_rdma_rsp		*rdma_rsp;
263 
264 	struct nvme_rdma_wr			rdma_wr;
265 
266 	struct ibv_send_wr			send_wr;
267 
268 	struct nvme_request			*req;
269 
270 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
271 
272 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
273 };
274 
275 struct spdk_nvme_rdma_rsp {
276 	struct spdk_nvme_cpl	cpl;
277 	struct nvme_rdma_qpair	*rqpair;
278 	struct ibv_recv_wr	*recv_wr;
279 	struct nvme_rdma_wr	rdma_wr;
280 };
281 
282 struct nvme_rdma_memory_translation_ctx {
283 	void *addr;
284 	size_t length;
285 	uint32_t lkey;
286 	uint32_t rkey;
287 };
288 
289 static const char *rdma_cm_event_str[] = {
290 	"RDMA_CM_EVENT_ADDR_RESOLVED",
291 	"RDMA_CM_EVENT_ADDR_ERROR",
292 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
293 	"RDMA_CM_EVENT_ROUTE_ERROR",
294 	"RDMA_CM_EVENT_CONNECT_REQUEST",
295 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
296 	"RDMA_CM_EVENT_CONNECT_ERROR",
297 	"RDMA_CM_EVENT_UNREACHABLE",
298 	"RDMA_CM_EVENT_REJECTED",
299 	"RDMA_CM_EVENT_ESTABLISHED",
300 	"RDMA_CM_EVENT_DISCONNECTED",
301 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
302 	"RDMA_CM_EVENT_MULTICAST_JOIN",
303 	"RDMA_CM_EVENT_MULTICAST_ERROR",
304 	"RDMA_CM_EVENT_ADDR_CHANGE",
305 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
306 };
307 
308 static struct nvme_rdma_poller *nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group,
309 		struct ibv_context *device);
310 static void nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group,
311 		struct nvme_rdma_poller *poller);
312 
313 static TAILQ_HEAD(, nvme_rdma_memory_domain) g_memory_domains = TAILQ_HEAD_INITIALIZER(
314 			g_memory_domains);
315 static pthread_mutex_t g_memory_domains_lock = PTHREAD_MUTEX_INITIALIZER;
316 
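/*
 * Look up (or lazily create) the reference-counted SPDK memory domain that
 * wraps the given protection domain. Serialized by g_memory_domains_lock.
 */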
317 static struct nvme_rdma_memory_domain *
318 nvme_rdma_get_memory_domain(struct ibv_pd *pd)
319 {
320 	struct nvme_rdma_memory_domain *domain = NULL;
321 	struct spdk_memory_domain_ctx ctx;
322 	int rc;
323 
324 	pthread_mutex_lock(&g_memory_domains_lock);
325 
326 	TAILQ_FOREACH(domain, &g_memory_domains, link) {
327 		if (domain->pd == pd) {
328 			domain->ref++;
329 			pthread_mutex_unlock(&g_memory_domains_lock);
330 			return domain;
331 		}
332 	}
333 
334 	domain = calloc(1, sizeof(*domain));
335 	if (!domain) {
336 		SPDK_ERRLOG("Memory allocation failed\n");
337 		pthread_mutex_unlock(&g_memory_domains_lock);
338 		return NULL;
339 	}
340 
341 	domain->rdma_ctx.size = sizeof(domain->rdma_ctx);
342 	domain->rdma_ctx.ibv_pd = pd;
343 	ctx.size = sizeof(ctx);
344 	ctx.user_ctx = &domain->rdma_ctx;
345 
346 	rc = spdk_memory_domain_create(&domain->domain, SPDK_DMA_DEVICE_TYPE_RDMA, &ctx,
347 				       SPDK_RDMA_DMA_DEVICE);
348 	if (rc) {
349 		SPDK_ERRLOG("Failed to create memory domain\n");
350 		free(domain);
351 		pthread_mutex_unlock(&g_memory_domains_lock);
352 		return NULL;
353 	}
354 
355 	domain->pd = pd;
356 	domain->ref = 1;
357 	TAILQ_INSERT_TAIL(&g_memory_domains, domain, link);
358 
359 	pthread_mutex_unlock(&g_memory_domains_lock);
360 
361 	return domain;
362 }
363 
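/* Drop a reference on the memory domain; destroy and free it when the last reference goes away. */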
364 static void
365 nvme_rdma_put_memory_domain(struct nvme_rdma_memory_domain *device)
366 {
367 	if (!device) {
368 		return;
369 	}
370 
371 	pthread_mutex_lock(&g_memory_domains_lock);
372 
373 	assert(device->ref > 0);
374 
375 	device->ref--;
376 
377 	if (device->ref == 0) {
378 		spdk_memory_domain_destroy(device->domain);
379 		TAILQ_REMOVE(&g_memory_domains, device, link);
380 		free(device);
381 	}
382 
383 	pthread_mutex_unlock(&g_memory_domains_lock);
384 }
385 
386 static int nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr,
387 		struct spdk_nvme_qpair *qpair);
388 
389 static inline struct nvme_rdma_qpair *
390 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
391 {
392 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
393 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
394 }
395 
396 static inline struct nvme_rdma_poll_group *
397 nvme_rdma_poll_group(struct spdk_nvme_transport_poll_group *group)
398 {
399 	return (SPDK_CONTAINEROF(group, struct nvme_rdma_poll_group, group));
400 }
401 
402 static inline struct nvme_rdma_ctrlr *
403 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
404 {
405 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
406 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
407 }
408 
409 static struct spdk_nvme_rdma_req *
410 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
411 {
412 	struct spdk_nvme_rdma_req *rdma_req;
413 
414 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
415 	if (rdma_req) {
416 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
417 		TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
418 	}
419 
420 	return rdma_req;
421 }
422 
423 static void
424 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
425 {
426 	rdma_req->completion_flags = 0;
427 	rdma_req->req = NULL;
428 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
429 }
430 
431 static void
432 nvme_rdma_req_complete(struct spdk_nvme_rdma_req *rdma_req,
433 		       struct spdk_nvme_cpl *rsp,
434 		       bool print_on_error)
435 {
436 	struct nvme_request *req = rdma_req->req;
437 	struct nvme_rdma_qpair *rqpair;
438 	struct spdk_nvme_qpair *qpair;
439 	bool error, print_error;
440 
441 	assert(req != NULL);
442 
443 	qpair = req->qpair;
444 	rqpair = nvme_rdma_qpair(qpair);
445 
446 	error = spdk_nvme_cpl_is_error(rsp);
447 	print_error = error && print_on_error && !qpair->ctrlr->opts.disable_error_logging;
448 
449 	if (print_error) {
450 		spdk_nvme_qpair_print_command(qpair, &req->cmd);
451 	}
452 
453 	if (print_error || SPDK_DEBUGLOG_FLAG_ENABLED("nvme")) {
454 		spdk_nvme_qpair_print_completion(qpair, rsp);
455 	}
456 
457 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
458 
459 	nvme_complete_request(req->cb_fn, req->cb_arg, qpair, req, rsp);
460 	nvme_free_request(req);
461 	nvme_rdma_req_put(rqpair, rdma_req);
462 }
463 
464 static const char *
465 nvme_rdma_cm_event_str_get(uint32_t event)
466 {
467 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
468 		return rdma_cm_event_str[event];
469 	} else {
470 		return "Undefined";
471 	}
472 }
473 
474 
475 static int
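/*
 * Consume the CM event stashed on the qpair (if any): finish the connection for
 * RDMA_CM_EVENT_CONNECT_RESPONSE, record a transport failure reason for
 * disconnect/device-removal style events, and acknowledge the event back to
 * the rdma cm layer.
 */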
476 nvme_rdma_qpair_process_cm_event(struct nvme_rdma_qpair *rqpair)
477 {
478 	struct rdma_cm_event				*event = rqpair->evt;
479 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
480 	int						rc = 0;
481 
482 	if (event) {
483 		switch (event->event) {
484 		case RDMA_CM_EVENT_ADDR_RESOLVED:
485 		case RDMA_CM_EVENT_ADDR_ERROR:
486 		case RDMA_CM_EVENT_ROUTE_RESOLVED:
487 		case RDMA_CM_EVENT_ROUTE_ERROR:
488 			break;
489 		case RDMA_CM_EVENT_CONNECT_REQUEST:
490 			break;
491 		case RDMA_CM_EVENT_CONNECT_ERROR:
492 			break;
493 		case RDMA_CM_EVENT_UNREACHABLE:
494 		case RDMA_CM_EVENT_REJECTED:
495 			break;
496 		case RDMA_CM_EVENT_CONNECT_RESPONSE:
497 			rc = spdk_rdma_qp_complete_connect(rqpair->rdma_qp);
498 		/* fall through */
499 		case RDMA_CM_EVENT_ESTABLISHED:
500 			accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
501 			if (accept_data == NULL) {
502 				rc = -1;
503 			} else {
504 				SPDK_DEBUGLOG(nvme, "Requested queue depth %d. Target receive queue depth %d.\n",
505 					      rqpair->num_entries + 1, accept_data->crqsize);
506 			}
507 			break;
508 		case RDMA_CM_EVENT_DISCONNECTED:
509 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
510 			break;
511 		case RDMA_CM_EVENT_DEVICE_REMOVAL:
512 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
513 			rqpair->need_destroy = true;
514 			break;
515 		case RDMA_CM_EVENT_MULTICAST_JOIN:
516 		case RDMA_CM_EVENT_MULTICAST_ERROR:
517 			break;
518 		case RDMA_CM_EVENT_ADDR_CHANGE:
519 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
520 			break;
521 		case RDMA_CM_EVENT_TIMEWAIT_EXIT:
522 			break;
523 		default:
524 			SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
525 			break;
526 		}
527 		rqpair->evt = NULL;
528 		rdma_ack_cm_event(event);
529 	}
530 
531 	return rc;
532 }
533 
534 /*
535  * This function must be called under the nvme controller's lock
536  * because it touches global controller variables. The lock is taken
537  * by the generic transport code before invoking a few of the functions
538  * in this file: nvme_rdma_ctrlr_connect_qpair, nvme_rdma_ctrlr_delete_io_qpair,
539  * and conditionally nvme_rdma_qpair_process_completions when it is processing
540  * completions on the admin qpair. When adding a new call to this function, please
541  * verify that the call is made while the controller lock is held.
542  */
543 static int
544 nvme_rdma_poll_events(struct nvme_rdma_ctrlr *rctrlr)
545 {
546 	struct nvme_rdma_cm_event_entry	*entry, *tmp;
547 	struct nvme_rdma_qpair		*event_qpair;
548 	struct rdma_cm_event		*event;
549 	struct rdma_event_channel	*channel = rctrlr->cm_channel;
550 
551 	STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
552 		event_qpair = entry->evt->id->context;
553 		if (event_qpair->evt == NULL) {
554 			event_qpair->evt = entry->evt;
555 			STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
556 			STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
557 		}
558 	}
559 
560 	while (rdma_get_cm_event(channel, &event) == 0) {
561 		event_qpair = event->id->context;
562 		if (event_qpair->evt == NULL) {
563 			event_qpair->evt = event;
564 		} else {
565 			assert(rctrlr == nvme_rdma_ctrlr(event_qpair->qpair.ctrlr));
566 			entry = STAILQ_FIRST(&rctrlr->free_cm_events);
567 			if (entry == NULL) {
568 				rdma_ack_cm_event(event);
569 				return -ENOMEM;
570 			}
571 			STAILQ_REMOVE(&rctrlr->free_cm_events, entry, nvme_rdma_cm_event_entry, link);
572 			entry->evt = event;
573 			STAILQ_INSERT_TAIL(&rctrlr->pending_cm_events, entry, link);
574 		}
575 	}
576 
577 	/* rdma_get_cm_event() returns -1 on error. If an error occurs, errno
578 	 * will be set to indicate the failure reason. So return negated errno here.
579 	 */
580 	return -errno;
581 }
582 
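/*
 * Check that the reaped CM event matches what the qpair is waiting for.
 * Returns 0 when the event is acceptable, -ESTALE for a stale-connection
 * rejection and -EBADMSG for any other mismatch.
 */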
583 static int
584 nvme_rdma_validate_cm_event(enum rdma_cm_event_type expected_evt_type,
585 			    struct rdma_cm_event *reaped_evt)
586 {
587 	int rc = -EBADMSG;
588 
589 	if (expected_evt_type == reaped_evt->event) {
590 		return 0;
591 	}
592 
593 	switch (expected_evt_type) {
594 	case RDMA_CM_EVENT_ESTABLISHED:
595 		/*
596 		 * The kernel headers define enum ib_cm_rej_reason with IB_CM_REJ_STALE_CONN
597 		 * set to 10. There is no obvious userspace counterpart, but the same value
598 		 * is reported here.
599 		 */
600 		if (reaped_evt->event == RDMA_CM_EVENT_REJECTED && reaped_evt->status == 10) {
601 			rc = -ESTALE;
602 		} else if (reaped_evt->event == RDMA_CM_EVENT_CONNECT_RESPONSE) {
603 			/*
604 			 * If the qpair was not created through the rdma cm API, we receive
605 			 * RDMA_CM_EVENT_CONNECT_RESPONSE instead of
606 			 * RDMA_CM_EVENT_ESTABLISHED.
607 			 */
608 			return 0;
609 		}
610 		break;
611 	default:
612 		break;
613 	}
614 
615 	SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
616 		    nvme_rdma_cm_event_str_get(expected_evt_type),
617 		    nvme_rdma_cm_event_str_get(reaped_evt->event), reaped_evt->event,
618 		    reaped_evt->status);
619 	return rc;
620 }
621 
622 static int
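/*
 * Arm the qpair to wait for a CM event of the given type: after draining any
 * previously stashed event, record the expected event type, the completion
 * callback and an absolute timeout in ticks. The event itself is reaped later
 * by nvme_rdma_process_event_poll().
 */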
623 nvme_rdma_process_event_start(struct nvme_rdma_qpair *rqpair,
624 			      enum rdma_cm_event_type evt,
625 			      nvme_rdma_cm_event_cb evt_cb)
626 {
627 	int	rc;
628 
629 	assert(evt_cb != NULL);
630 
631 	if (rqpair->evt != NULL) {
632 		rc = nvme_rdma_qpair_process_cm_event(rqpair);
633 		if (rc) {
634 			return rc;
635 		}
636 	}
637 
638 	rqpair->expected_evt_type = evt;
639 	rqpair->evt_cb = evt_cb;
640 	rqpair->evt_timeout_ticks = (NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US * spdk_get_ticks_hz()) /
641 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
642 
643 	return 0;
644 }
645 
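/*
 * Poll the controller's CM event channel for the event armed by
 * nvme_rdma_process_event_start(), validate it and invoke the registered
 * callback with the result (or with -EADDRNOTAVAIL on timeout).
 */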
646 static int
647 nvme_rdma_process_event_poll(struct nvme_rdma_qpair *rqpair)
648 {
649 	struct nvme_rdma_ctrlr	*rctrlr;
650 	int	rc = 0, rc2;
651 
652 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
653 	assert(rctrlr != NULL);
654 
655 	if (!rqpair->evt && spdk_get_ticks() < rqpair->evt_timeout_ticks) {
656 		rc = nvme_rdma_poll_events(rctrlr);
657 		if (rc == -EAGAIN || rc == -EWOULDBLOCK) {
658 			return rc;
659 		}
660 	}
661 
662 	if (rqpair->evt == NULL) {
663 		rc = -EADDRNOTAVAIL;
664 		goto exit;
665 	}
666 
667 	rc = nvme_rdma_validate_cm_event(rqpair->expected_evt_type, rqpair->evt);
668 
669 	rc2 = nvme_rdma_qpair_process_cm_event(rqpair);
670 	/* bad message takes precedence over the other error codes from processing the event. */
671 	rc = rc == 0 ? rc2 : rc;
672 
673 exit:
674 	assert(rqpair->evt_cb != NULL);
675 	return rqpair->evt_cb(rqpair, rc);
676 }
677 
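/*
 * Grow the poller's shared completion queue so it has room for this qpair's
 * work completions, doubling the current size when that exceeds the strict
 * requirement to limit the number of resize calls.
 */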
678 static int
679 nvme_rdma_resize_cq(struct nvme_rdma_qpair *rqpair, struct nvme_rdma_poller *poller)
680 {
681 	int	current_num_wc, required_num_wc;
682 
683 	required_num_wc = poller->required_num_wc + WC_PER_QPAIR(rqpair->num_entries);
684 	current_num_wc = poller->current_num_wc;
685 	if (current_num_wc < required_num_wc) {
686 		current_num_wc = spdk_max(current_num_wc * 2, required_num_wc);
687 	}
688 
689 	if (poller->current_num_wc != current_num_wc) {
690 		SPDK_DEBUGLOG(nvme, "Resize RDMA CQ from %d to %d\n", poller->current_num_wc,
691 			      current_num_wc);
692 		if (ibv_resize_cq(poller->cq, current_num_wc)) {
693 			SPDK_ERRLOG("RDMA CQ resize failed: errno %d: %s\n", errno, spdk_strerror(errno));
694 			return -1;
695 		}
696 
697 		poller->current_num_wc = current_num_wc;
698 	}
699 
700 	poller->required_num_wc = required_num_wc;
701 	return 0;
702 }
703 
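/*
 * Attach the qpair to a CQ poller obtained from its poll group; when the
 * poller uses a shared receive queue, the qpair also inherits the poller's
 * SRQ and response buffers.
 */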
704 static int
705 nvme_rdma_qpair_set_poller(struct spdk_nvme_qpair *qpair)
706 {
707 	struct nvme_rdma_qpair          *rqpair = nvme_rdma_qpair(qpair);
708 	struct nvme_rdma_poll_group     *group = nvme_rdma_poll_group(qpair->poll_group);
709 	struct nvme_rdma_poller         *poller;
710 
711 	assert(rqpair->cq == NULL);
712 
713 	poller = nvme_rdma_poll_group_get_poller(group, rqpair->cm_id->verbs);
714 	if (!poller) {
715 		SPDK_ERRLOG("Unable to find a cq for qpair %p on poll group %p\n", qpair, qpair->poll_group);
716 		return -EINVAL;
717 	}
718 
719 	if (!poller->srq) {
720 		if (nvme_rdma_resize_cq(rqpair, poller)) {
721 			nvme_rdma_poll_group_put_poller(group, poller);
722 			return -EPROTO;
723 		}
724 	}
725 
726 	rqpair->cq = poller->cq;
727 	rqpair->srq = poller->srq;
728 	if (rqpair->srq) {
729 		rqpair->rsps = poller->rsps;
730 	}
731 	rqpair->poller = poller;
732 	return 0;
733 }
734 
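/*
 * Create the qpair's RDMA resources: a completion queue (either a dedicated
 * one or a poller CQ borrowed from the poll group), the protection domain,
 * the rdma_qp itself and its memory domain, then record the negotiated SGE
 * limits.
 */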
735 static int
736 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
737 {
738 	int			rc;
739 	struct spdk_rdma_qp_init_attr	attr = {};
740 	struct ibv_device_attr	dev_attr;
741 	struct nvme_rdma_ctrlr	*rctrlr;
742 
743 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
744 	if (rc != 0) {
745 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
746 		return -1;
747 	}
748 
749 	if (rqpair->qpair.poll_group) {
750 		assert(!rqpair->cq);
751 		rc = nvme_rdma_qpair_set_poller(&rqpair->qpair);
752 		if (rc) {
753 			SPDK_ERRLOG("Unable to activate the rdmaqpair.\n");
754 			return -1;
755 		}
756 		assert(rqpair->cq);
757 	} else {
758 		rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
759 		if (!rqpair->cq) {
760 			SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
761 			return -1;
762 		}
763 	}
764 
765 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
766 	if (g_nvme_hooks.get_ibv_pd) {
767 		attr.pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
768 	} else {
769 		attr.pd = spdk_rdma_get_pd(rqpair->cm_id->verbs);
770 	}
771 
772 	attr.stats =		rqpair->poller ? &rqpair->poller->stats.rdma_stats : NULL;
773 	attr.send_cq		= rqpair->cq;
774 	attr.recv_cq		= rqpair->cq;
775 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
776 	if (rqpair->srq) {
777 		attr.srq	= rqpair->srq->srq;
778 	} else {
779 		attr.cap.max_recv_wr = rqpair->num_entries; /* RECV operations */
780 	}
781 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
782 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
783 
784 	rqpair->rdma_qp = spdk_rdma_qp_create(rqpair->cm_id, &attr);
785 
786 	if (!rqpair->rdma_qp) {
787 		return -1;
788 	}
789 
790 	rqpair->memory_domain = nvme_rdma_get_memory_domain(rqpair->rdma_qp->qp->pd);
791 	if (!rqpair->memory_domain) {
792 		SPDK_ERRLOG("Failed to get memory domain\n");
793 		return -1;
794 	}
795 
796 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
797 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
798 	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
799 	rqpair->current_num_sends = 0;
800 
801 	rqpair->cm_id->context = rqpair;
802 
803 	return 0;
804 }
805 
806 static void
807 nvme_rdma_reset_failed_sends(struct nvme_rdma_qpair *rqpair,
808 			     struct ibv_send_wr *bad_send_wr, int rc)
809 {
810 	SPDK_ERRLOG("Failed to post WRs on send queue, errno %d (%s), bad_wr %p\n",
811 		    rc, spdk_strerror(rc), bad_send_wr);
812 	while (bad_send_wr != NULL) {
813 		assert(rqpair->current_num_sends > 0);
814 		rqpair->current_num_sends--;
815 		bad_send_wr = bad_send_wr->next;
816 	}
817 }
818 
819 static void
820 nvme_rdma_reset_failed_recvs(struct nvme_rdma_rsps *rsps,
821 			     struct ibv_recv_wr *bad_recv_wr, int rc)
822 {
823 	SPDK_ERRLOG("Failed to post WRs on receive queue, errno %d (%s), bad_wr %p\n",
824 		    rc, spdk_strerror(rc), bad_recv_wr);
825 	while (bad_recv_wr != NULL) {
826 		assert(rsps->current_num_recvs > 0);
827 		rsps->current_num_recvs--;
828 		bad_recv_wr = bad_recv_wr->next;
829 	}
830 }
831 
832 static inline int
833 nvme_rdma_qpair_submit_sends(struct nvme_rdma_qpair *rqpair)
834 {
835 	struct ibv_send_wr *bad_send_wr = NULL;
836 	int rc;
837 
838 	rc = spdk_rdma_qp_flush_send_wrs(rqpair->rdma_qp, &bad_send_wr);
839 
840 	if (spdk_unlikely(rc)) {
841 		nvme_rdma_reset_failed_sends(rqpair, bad_send_wr, rc);
842 	}
843 
844 	return rc;
845 }
846 
847 static inline int
848 nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair)
849 {
850 	struct ibv_recv_wr *bad_recv_wr;
851 	int rc = 0;
852 
853 	rc = spdk_rdma_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr);
854 	if (spdk_unlikely(rc)) {
855 		nvme_rdma_reset_failed_recvs(rqpair->rsps, bad_recv_wr, rc);
856 	}
857 
858 	return rc;
859 }
860 
861 static inline int
862 nvme_rdma_poller_submit_recvs(struct nvme_rdma_poller *poller)
863 {
864 	struct ibv_recv_wr *bad_recv_wr;
865 	int rc;
866 
867 	rc = spdk_rdma_srq_flush_recv_wrs(poller->srq, &bad_recv_wr);
868 	if (spdk_unlikely(rc)) {
869 		nvme_rdma_reset_failed_recvs(poller->rsps, bad_recv_wr, rc);
870 	}
871 
872 	return rc;
873 }
874 
875 #define nvme_rdma_trace_ibv_sge(sg_list) \
876 	if (sg_list) { \
877 		SPDK_DEBUGLOG(nvme, "local addr %p length 0x%x lkey 0x%x\n", \
878 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
879 	}
880 
881 static void
882 nvme_rdma_free_rsps(struct nvme_rdma_rsps *rsps)
883 {
884 	if (!rsps) {
885 		return;
886 	}
887 
888 	spdk_free(rsps->rsps);
889 	spdk_free(rsps->rsp_sgls);
890 	spdk_free(rsps->rsp_recv_wrs);
891 	spdk_free(rsps);
892 }
893 
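/*
 * Allocate the response buffers, SGEs and receive work requests for a qpair
 * (or shared receive queue) and queue an initial receive for each entry.
 */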
894 static struct nvme_rdma_rsps *
895 nvme_rdma_create_rsps(struct nvme_rdma_rsp_opts *opts)
896 {
897 	struct nvme_rdma_rsps *rsps;
898 	struct spdk_rdma_memory_translation translation;
899 	uint16_t i;
900 	int rc;
901 
902 	rsps = spdk_zmalloc(sizeof(*rsps), 0, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
903 	if (!rsps) {
904 		SPDK_ERRLOG("Failed to allocate rsps object\n");
905 		return NULL;
906 	}
907 
908 	rsps->rsp_sgls = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_sgls), 0, NULL,
909 				      SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
910 	if (!rsps->rsp_sgls) {
911 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
912 		goto fail;
913 	}
914 
915 	rsps->rsp_recv_wrs = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_recv_wrs), 0, NULL,
916 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
917 	if (!rsps->rsp_recv_wrs) {
918 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
919 		goto fail;
920 	}
921 
922 	rsps->rsps = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsps), 0, NULL,
923 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
924 	if (!rsps->rsps) {
925 		SPDK_ERRLOG("can not allocate rdma rsps\n");
926 		goto fail;
927 	}
928 
929 	for (i = 0; i < opts->num_entries; i++) {
930 		struct ibv_sge *rsp_sgl = &rsps->rsp_sgls[i];
931 		struct spdk_nvme_rdma_rsp *rsp = &rsps->rsps[i];
932 		struct ibv_recv_wr *recv_wr = &rsps->rsp_recv_wrs[i];
933 
934 		rsp->rqpair = opts->rqpair;
935 		rsp->rdma_wr.type = RDMA_WR_TYPE_RECV;
936 		rsp->recv_wr = recv_wr;
937 		rsp_sgl->addr = (uint64_t)rsp;
938 		rsp_sgl->length = sizeof(struct spdk_nvme_cpl);
939 		rc = spdk_rdma_get_translation(opts->mr_map, rsp, sizeof(*rsp), &translation);
940 		if (rc) {
941 			goto fail;
942 		}
943 		rsp_sgl->lkey = spdk_rdma_memory_translation_get_lkey(&translation);
944 
945 		recv_wr->wr_id = (uint64_t)&rsp->rdma_wr;
946 		recv_wr->next = NULL;
947 		recv_wr->sg_list = rsp_sgl;
948 		recv_wr->num_sge = 1;
949 
950 		nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
951 
952 		if (opts->rqpair) {
953 			spdk_rdma_qp_queue_recv_wrs(opts->rqpair->rdma_qp, recv_wr);
954 		} else {
955 			spdk_rdma_srq_queue_recv_wrs(opts->srq, recv_wr);
956 		}
957 	}
958 
959 	rsps->num_entries = opts->num_entries;
960 	rsps->current_num_recvs = opts->num_entries;
961 
962 	return rsps;
963 fail:
964 	nvme_rdma_free_rsps(rsps);
965 	return NULL;
966 }
967 
968 static void
969 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
970 {
971 	if (!rqpair->rdma_reqs) {
972 		return;
973 	}
974 
975 	spdk_free(rqpair->cmds);
976 	rqpair->cmds = NULL;
977 
978 	spdk_free(rqpair->rdma_reqs);
979 	rqpair->rdma_reqs = NULL;
980 }
981 
982 static int
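/*
 * Allocate the per-qpair request trackers and the NVMe command buffers they
 * point at, pre-fill the constant parts of each send work request and put
 * every request on the free list.
 */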
983 nvme_rdma_create_reqs(struct nvme_rdma_qpair *rqpair)
984 {
985 	struct spdk_rdma_memory_translation translation;
986 	uint16_t i;
987 	int rc;
988 
989 	assert(!rqpair->rdma_reqs);
990 	rqpair->rdma_reqs = spdk_zmalloc(rqpair->num_entries * sizeof(struct spdk_nvme_rdma_req), 0, NULL,
991 					 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
992 	if (rqpair->rdma_reqs == NULL) {
993 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
994 		goto fail;
995 	}
996 
997 	assert(!rqpair->cmds);
998 	rqpair->cmds = spdk_zmalloc(rqpair->num_entries * sizeof(*rqpair->cmds), 0, NULL,
999 				    SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1000 	if (!rqpair->cmds) {
1001 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
1002 		goto fail;
1003 	}
1004 
1005 	TAILQ_INIT(&rqpair->free_reqs);
1006 	TAILQ_INIT(&rqpair->outstanding_reqs);
1007 	for (i = 0; i < rqpair->num_entries; i++) {
1008 		struct spdk_nvme_rdma_req	*rdma_req;
1009 		struct spdk_nvmf_cmd		*cmd;
1010 
1011 		rdma_req = &rqpair->rdma_reqs[i];
1012 		rdma_req->rdma_wr.type = RDMA_WR_TYPE_SEND;
1013 		cmd = &rqpair->cmds[i];
1014 
1015 		rdma_req->id = i;
1016 
1017 		rc = spdk_rdma_get_translation(rqpair->mr_map, cmd, sizeof(*cmd), &translation);
1018 		if (rc) {
1019 			goto fail;
1020 		}
1021 		rdma_req->send_sgl[0].lkey = spdk_rdma_memory_translation_get_lkey(&translation);
1022 
1023 		/* The first RDMA sgl element will always point
1024 		 * at this data structure. Depending on whether
1025 		 * an NVMe-oF SGL is required, the length of
1026 		 * this element may change. */
1027 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
1028 		rdma_req->send_wr.wr_id = (uint64_t)&rdma_req->rdma_wr;
1029 		rdma_req->send_wr.next = NULL;
1030 		rdma_req->send_wr.opcode = IBV_WR_SEND;
1031 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
1032 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
1033 		rdma_req->send_wr.imm_data = 0;
1034 
1035 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
1036 	}
1037 
1038 	return 0;
1039 fail:
1040 	nvme_rdma_free_reqs(rqpair);
1041 	return -ENOMEM;
1042 }
1043 
1044 static int nvme_rdma_connect(struct nvme_rdma_qpair *rqpair);
1045 
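/*
 * CM callback for RDMA_CM_EVENT_ROUTE_RESOLVED: create the qpair's RDMA
 * resources and issue the connect request.
 */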
1046 static int
1047 nvme_rdma_route_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1048 {
1049 	if (ret) {
1050 		SPDK_ERRLOG("RDMA route resolution error\n");
1051 		return -1;
1052 	}
1053 
1054 	ret = nvme_rdma_qpair_init(rqpair);
1055 	if (ret < 0) {
1056 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
1057 		return -1;
1058 	}
1059 
1060 	return nvme_rdma_connect(rqpair);
1061 }
1062 
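/*
 * CM callback for RDMA_CM_EVENT_ADDR_RESOLVED: apply the optional transport
 * ack timeout and type-of-service settings, then start route resolution.
 */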
1063 static int
1064 nvme_rdma_addr_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1065 {
1066 	if (ret) {
1067 		SPDK_ERRLOG("RDMA address resolution error\n");
1068 		return -1;
1069 	}
1070 
1071 	if (rqpair->qpair.ctrlr->opts.transport_ack_timeout != SPDK_NVME_TRANSPORT_ACK_TIMEOUT_DISABLED) {
1072 #ifdef SPDK_CONFIG_RDMA_SET_ACK_TIMEOUT
1073 		uint8_t timeout = rqpair->qpair.ctrlr->opts.transport_ack_timeout;
1074 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID,
1075 				      RDMA_OPTION_ID_ACK_TIMEOUT,
1076 				      &timeout, sizeof(timeout));
1077 		if (ret) {
1078 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_ACK_TIMEOUT %d, ret %d\n", timeout, ret);
1079 		}
1080 #else
1081 		SPDK_DEBUGLOG(nvme, "transport_ack_timeout is not supported\n");
1082 #endif
1083 	}
1084 
1085 	if (rqpair->qpair.ctrlr->opts.transport_tos != SPDK_NVME_TRANSPORT_TOS_DISABLED) {
1086 #ifdef SPDK_CONFIG_RDMA_SET_TOS
1087 		uint8_t tos = rqpair->qpair.ctrlr->opts.transport_tos;
1088 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS, &tos, sizeof(tos));
1089 		if (ret) {
1090 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_TOS %u, ret %d\n", tos, ret);
1091 		}
1092 #else
1093 		SPDK_DEBUGLOG(nvme, "transport_tos is not supported\n");
1094 #endif
1095 	}
1096 
1097 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
1098 	if (ret) {
1099 		SPDK_ERRLOG("rdma_resolve_route\n");
1100 		return ret;
1101 	}
1102 
1103 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ROUTE_RESOLVED,
1104 					     nvme_rdma_route_resolved);
1105 }
1106 
1107 static int
1108 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
1109 		       struct sockaddr *src_addr,
1110 		       struct sockaddr *dst_addr)
1111 {
1112 	int ret;
1113 
1114 	if (src_addr) {
1115 		int reuse = 1;
1116 
1117 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
1118 				      &reuse, sizeof(reuse));
1119 		if (ret) {
1120 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_REUSEADDR %d, ret %d\n",
1121 				       reuse, ret);
1122 			/* It is likely that rdma_resolve_addr() would return -EADDRINUSE, but
1123 			 * we may be missing something, so rely on rdma_resolve_addr() to report it.
1124 			 */
1125 		}
1126 	}
1127 
1128 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
1129 				NVME_RDMA_TIME_OUT_IN_MS);
1130 	if (ret) {
1131 		SPDK_ERRLOG("rdma_resolve_addr, %d\n", errno);
1132 		return ret;
1133 	}
1134 
1135 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ADDR_RESOLVED,
1136 					     nvme_rdma_addr_resolved);
1137 }
1138 
1139 static int nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair);
1140 
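/*
 * CM callback for RDMA_CM_EVENT_ESTABLISHED: create the memory translation
 * map, the request trackers and, when no shared receive queue is used, the
 * response buffers, then move the qpair to the fabrics CONNECT phase.
 */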
1141 static int
1142 nvme_rdma_connect_established(struct nvme_rdma_qpair *rqpair, int ret)
1143 {
1144 	struct nvme_rdma_rsp_opts opts = {};
1145 
1146 	if (ret == -ESTALE) {
1147 		return nvme_rdma_stale_conn_retry(rqpair);
1148 	} else if (ret) {
1149 		SPDK_ERRLOG("RDMA connect error %d\n", ret);
1150 		return ret;
1151 	}
1152 
1153 	assert(!rqpair->mr_map);
1154 	rqpair->mr_map = spdk_rdma_create_mem_map(rqpair->rdma_qp->qp->pd, &g_nvme_hooks,
1155 			 SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR);
1156 	if (!rqpair->mr_map) {
1157 		SPDK_ERRLOG("Unable to register RDMA memory translation map\n");
1158 		return -1;
1159 	}
1160 
1161 	ret = nvme_rdma_create_reqs(rqpair);
1162 	SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1163 	if (ret) {
1164 		SPDK_ERRLOG("Unable to create rqpair RDMA requests\n");
1165 		return -1;
1166 	}
1167 	SPDK_DEBUGLOG(nvme, "RDMA requests created\n");
1168 
1169 	if (!rqpair->srq) {
1170 		opts.num_entries = rqpair->num_entries;
1171 		opts.rqpair = rqpair;
1172 		opts.srq = NULL;
1173 		opts.mr_map = rqpair->mr_map;
1174 
1175 		assert(!rqpair->rsps);
1176 		rqpair->rsps = nvme_rdma_create_rsps(&opts);
1177 		if (!rqpair->rsps) {
1178 			SPDK_ERRLOG("Unable to create rqpair RDMA responses\n");
1179 			return -1;
1180 		}
1181 		SPDK_DEBUGLOG(nvme, "RDMA responses created\n");
1182 
1183 		ret = nvme_rdma_qpair_submit_recvs(rqpair);
1184 		SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1185 		if (ret) {
1186 			SPDK_ERRLOG("Unable to submit rqpair RDMA responses\n");
1187 			return -1;
1188 		}
1189 		SPDK_DEBUGLOG(nvme, "RDMA responses submitted\n");
1190 	}
1191 
1192 	rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND;
1193 
1194 	return 0;
1195 }
1196 
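/*
 * Issue the rdma_connect() request carrying the NVMe-oF private data (queue
 * id, submission/receive queue sizes and controller id), then arm the qpair
 * to wait for the connection to be established.
 */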
1197 static int
1198 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
1199 {
1200 	struct rdma_conn_param				param = {};
1201 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
1202 	struct ibv_device_attr				attr;
1203 	int						ret;
1204 	struct spdk_nvme_ctrlr				*ctrlr;
1205 
1206 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
1207 	if (ret != 0) {
1208 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1209 		return ret;
1210 	}
1211 
1212 	param.responder_resources = attr.max_qp_rd_atom;
1213 
1214 	ctrlr = rqpair->qpair.ctrlr;
1215 	if (!ctrlr) {
1216 		return -1;
1217 	}
1218 
1219 	request_data.qid = rqpair->qpair.id;
1220 	request_data.hrqsize = rqpair->num_entries + 1;
1221 	request_data.hsqsize = rqpair->num_entries;
1222 	request_data.cntlid = ctrlr->cntlid;
1223 
1224 	param.private_data = &request_data;
1225 	param.private_data_len = sizeof(request_data);
1226 	param.retry_count = ctrlr->opts.transport_retry_count;
1227 	param.rnr_retry_count = 7;
1228 
1229 	/* Fields below are ignored by rdma cm if the qpair has been
1230 	 * created using the rdma cm API. */
1231 	param.srq = 0;
1232 	param.qp_num = rqpair->rdma_qp->qp->qp_num;
1233 
1234 	ret = rdma_connect(rqpair->cm_id, &param);
1235 	if (ret) {
1236 		SPDK_ERRLOG("nvme rdma connect error\n");
1237 		return ret;
1238 	}
1239 
1240 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ESTABLISHED,
1241 					     nvme_rdma_connect_established);
1242 }
1243 
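/* Resolve a traddr/trsvcid pair into a sockaddr via getaddrinfo(). */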
1244 static int
1245 nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr, const char *service)
1246 {
1247 	struct addrinfo *res;
1248 	struct addrinfo hints;
1249 	int ret;
1250 
1251 	memset(&hints, 0, sizeof(hints));
1252 	hints.ai_family = family;
1253 	hints.ai_socktype = SOCK_STREAM;
1254 	hints.ai_protocol = 0;
1255 
1256 	ret = getaddrinfo(addr, service, &hints, &res);
1257 	if (ret) {
1258 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(ret), ret);
1259 		return -(abs(ret));
1260 	}
1261 
1262 	if (res->ai_addrlen > sizeof(*sa)) {
1263 		SPDK_ERRLOG("getaddrinfo() ai_addrlen %zu too large\n", (size_t)res->ai_addrlen);
1264 		ret = -EINVAL;
1265 	} else {
1266 		memcpy(sa, res->ai_addr, res->ai_addrlen);
1267 	}
1268 
1269 	freeaddrinfo(res);
1270 	return ret;
1271 }
1272 
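/*
 * Begin an asynchronous qpair connection: parse the destination (and optional
 * source) address, create the rdma cm id and start address resolution. The
 * remaining steps are driven from nvme_rdma_ctrlr_connect_qpair_poll().
 */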
1273 static int
1274 nvme_rdma_ctrlr_connect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1275 {
1276 	struct sockaddr_storage dst_addr;
1277 	struct sockaddr_storage src_addr;
1278 	bool src_addr_specified;
1279 	int rc;
1280 	struct nvme_rdma_ctrlr *rctrlr;
1281 	struct nvme_rdma_qpair *rqpair;
1282 	int family;
1283 
1284 	rqpair = nvme_rdma_qpair(qpair);
1285 	rctrlr = nvme_rdma_ctrlr(ctrlr);
1286 	assert(rctrlr != NULL);
1287 
1288 	switch (ctrlr->trid.adrfam) {
1289 	case SPDK_NVMF_ADRFAM_IPV4:
1290 		family = AF_INET;
1291 		break;
1292 	case SPDK_NVMF_ADRFAM_IPV6:
1293 		family = AF_INET6;
1294 		break;
1295 	default:
1296 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
1297 		return -1;
1298 	}
1299 
1300 	SPDK_DEBUGLOG(nvme, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
1301 
1302 	memset(&dst_addr, 0, sizeof(dst_addr));
1303 
1304 	SPDK_DEBUGLOG(nvme, "trsvcid is %s\n", ctrlr->trid.trsvcid);
1305 	rc = nvme_rdma_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
1306 	if (rc != 0) {
1307 		SPDK_ERRLOG("dst_addr nvme_rdma_parse_addr() failed\n");
1308 		return -1;
1309 	}
1310 
1311 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
1312 		memset(&src_addr, 0, sizeof(src_addr));
1313 		rc = nvme_rdma_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid);
1314 		if (rc != 0) {
1315 			SPDK_ERRLOG("src_addr nvme_rdma_parse_addr() failed\n");
1316 			return -1;
1317 		}
1318 		src_addr_specified = true;
1319 	} else {
1320 		src_addr_specified = false;
1321 	}
1322 
1323 	rc = rdma_create_id(rctrlr->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
1324 	if (rc < 0) {
1325 		SPDK_ERRLOG("rdma_create_id() failed\n");
1326 		return -1;
1327 	}
1328 
1329 	rc = nvme_rdma_resolve_addr(rqpair,
1330 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
1331 				    (struct sockaddr *)&dst_addr);
1332 	if (rc < 0) {
1333 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
1334 		return -1;
1335 	}
1336 
1337 	rqpair->state = NVME_RDMA_QPAIR_STATE_INITIALIZING;
1338 
1339 	return 0;
1340 }
1341 
1342 static int
1343 nvme_rdma_stale_conn_reconnect(struct nvme_rdma_qpair *rqpair)
1344 {
1345 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1346 
1347 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks) {
1348 		return -EAGAIN;
1349 	}
1350 
1351 	return nvme_rdma_ctrlr_connect_qpair(qpair->ctrlr, qpair);
1352 }
1353 
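/*
 * Drive the qpair's connection state machine: poll CM events while
 * initializing, retry stale connections and send/poll the fabrics CONNECT
 * command. Returns -EAGAIN while the connection is still in progress.
 */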
1354 static int
1355 nvme_rdma_ctrlr_connect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr,
1356 				   struct spdk_nvme_qpair *qpair)
1357 {
1358 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1359 	int rc;
1360 
1361 	if (rqpair->in_connect_poll) {
1362 		return -EAGAIN;
1363 	}
1364 
1365 	rqpair->in_connect_poll = true;
1366 
1367 	switch (rqpair->state) {
1368 	case NVME_RDMA_QPAIR_STATE_INVALID:
1369 		rc = -EAGAIN;
1370 		break;
1371 
1372 	case NVME_RDMA_QPAIR_STATE_INITIALIZING:
1373 	case NVME_RDMA_QPAIR_STATE_EXITING:
1374 		if (!nvme_qpair_is_admin_queue(qpair)) {
1375 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
1376 		}
1377 
1378 		rc = nvme_rdma_process_event_poll(rqpair);
1379 
1380 		if (!nvme_qpair_is_admin_queue(qpair)) {
1381 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
1382 		}
1383 
1384 		if (rc == 0) {
1385 			rc = -EAGAIN;
1386 		}
1387 		rqpair->in_connect_poll = false;
1388 
1389 		return rc;
1390 
1391 	case NVME_RDMA_QPAIR_STATE_STALE_CONN:
1392 		rc = nvme_rdma_stale_conn_reconnect(rqpair);
1393 		if (rc == 0) {
1394 			rc = -EAGAIN;
1395 		}
1396 		break;
1397 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND:
1398 		rc = nvme_fabric_qpair_connect_async(qpair, rqpair->num_entries + 1);
1399 		if (rc == 0) {
1400 			rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL;
1401 			rc = -EAGAIN;
1402 		} else {
1403 			SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
1404 		}
1405 		break;
1406 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL:
1407 		rc = nvme_fabric_qpair_connect_poll(qpair);
1408 		if (rc == 0) {
1409 			rqpair->state = NVME_RDMA_QPAIR_STATE_RUNNING;
1410 			nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTED);
1411 		} else if (rc != -EAGAIN) {
1412 			SPDK_ERRLOG("Failed to poll NVMe-oF Fabric CONNECT command\n");
1413 		}
1414 		break;
1415 	case NVME_RDMA_QPAIR_STATE_RUNNING:
1416 		rc = 0;
1417 		break;
1418 	default:
1419 		assert(false);
1420 		rc = -EINVAL;
1421 		break;
1422 	}
1423 
1424 	rqpair->in_connect_poll = false;
1425 
1426 	return rc;
1427 }
1428 
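/*
 * Translate a payload buffer into RDMA keys: when the request carries its own
 * memory domain, ask that domain for the translation; otherwise consult the
 * qpair's memory-region map.
 */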
1429 static inline int
1430 nvme_rdma_get_memory_translation(struct nvme_request *req, struct nvme_rdma_qpair *rqpair,
1431 				 struct nvme_rdma_memory_translation_ctx *_ctx)
1432 {
1433 	struct spdk_memory_domain_translation_ctx ctx;
1434 	struct spdk_memory_domain_translation_result dma_translation = {.iov_count = 0};
1435 	struct spdk_rdma_memory_translation rdma_translation;
1436 	int rc;
1437 
1438 	assert(req);
1439 	assert(rqpair);
1440 	assert(_ctx);
1441 
1442 	if (req->payload.opts && req->payload.opts->memory_domain) {
1443 		ctx.size = sizeof(struct spdk_memory_domain_translation_ctx);
1444 		ctx.rdma.ibv_qp = rqpair->rdma_qp->qp;
1445 		dma_translation.size = sizeof(struct spdk_memory_domain_translation_result);
1446 
1447 		rc = spdk_memory_domain_translate_data(req->payload.opts->memory_domain,
1448 						       req->payload.opts->memory_domain_ctx,
1449 						       rqpair->memory_domain->domain, &ctx, _ctx->addr,
1450 						       _ctx->length, &dma_translation);
1451 		if (spdk_unlikely(rc) || dma_translation.iov_count != 1) {
1452 			SPDK_ERRLOG("DMA memory translation failed, rc %d, iov count %u\n", rc, dma_translation.iov_count);
1453 			return rc;
1454 		}
1455 
1456 		_ctx->lkey = dma_translation.rdma.lkey;
1457 		_ctx->rkey = dma_translation.rdma.rkey;
1458 		_ctx->addr = dma_translation.iov.iov_base;
1459 		_ctx->length = dma_translation.iov.iov_len;
1460 	} else {
1461 		rc = spdk_rdma_get_translation(rqpair->mr_map, _ctx->addr, _ctx->length, &rdma_translation);
1462 		if (spdk_unlikely(rc)) {
1463 			SPDK_ERRLOG("RDMA memory translation failed, rc %d\n", rc);
1464 			return rc;
1465 		}
1466 		if (rdma_translation.translation_type == SPDK_RDMA_TRANSLATION_MR) {
1467 			_ctx->lkey = rdma_translation.mr_or_key.mr->lkey;
1468 			_ctx->rkey = rdma_translation.mr_or_key.mr->rkey;
1469 		} else {
1470 			_ctx->lkey = _ctx->rkey = (uint32_t)rdma_translation.mr_or_key.key;
1471 		}
1472 	}
1473 
1474 	return 0;
1475 }
1476 
1477 
1478 /*
1479  * Build SGL describing empty payload.
1480  */
1481 static int
1482 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
1483 {
1484 	struct nvme_request *req = rdma_req->req;
1485 
1486 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1487 
1488 	/* The first element of this SGL is pointing at an
1489 	 * spdk_nvmf_cmd object. For this particular command,
1490 	 * we only need the first 64 bytes corresponding to
1491 	 * the NVMe command. */
1492 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1493 
1494 	/* The RDMA SGL needs one element describing the NVMe command. */
1495 	rdma_req->send_wr.num_sge = 1;
1496 
1497 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1498 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1499 	req->cmd.dptr.sgl1.keyed.length = 0;
1500 	req->cmd.dptr.sgl1.keyed.key = 0;
1501 	req->cmd.dptr.sgl1.address = 0;
1502 
1503 	return 0;
1504 }
1505 
1506 /*
1507  * Build inline SGL describing contiguous payload buffer.
1508  */
1509 static int
1510 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
1511 				      struct spdk_nvme_rdma_req *rdma_req)
1512 {
1513 	struct nvme_request *req = rdma_req->req;
1514 	struct nvme_rdma_memory_translation_ctx ctx = {
1515 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1516 		.length = req->payload_size
1517 	};
1518 	int rc;
1519 
1520 	assert(ctx.length != 0);
1521 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1522 
1523 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1524 	if (spdk_unlikely(rc)) {
1525 		return -1;
1526 	}
1527 
1528 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1529 
1530 	/* The first element of this SGL is pointing at an
1531 	 * spdk_nvmf_cmd object. For this particular command,
1532 	 * we only need the first 64 bytes corresponding to
1533 	 * the NVMe command. */
1534 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1535 
1536 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1537 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1538 
1539 	/* The RDMA SGL contains two elements. The first describes
1540 	 * the NVMe command and the second describes the data
1541 	 * payload. */
1542 	rdma_req->send_wr.num_sge = 2;
1543 
1544 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1545 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1546 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1547 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1548 	/* Inline only supported for icdoff == 0 currently.  This function will
1549 	 * not get called for controllers with other values. */
1550 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1551 
1552 	return 0;
1553 }
1554 
1555 /*
1556  * Build SGL describing contiguous payload buffer.
1557  */
1558 static int
1559 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
1560 			       struct spdk_nvme_rdma_req *rdma_req)
1561 {
1562 	struct nvme_request *req = rdma_req->req;
1563 	struct nvme_rdma_memory_translation_ctx ctx = {
1564 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1565 		.length = req->payload_size
1566 	};
1567 	int rc;
1568 
1569 	assert(req->payload_size != 0);
1570 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1571 
1572 	if (spdk_unlikely(req->payload_size > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1573 		SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1574 			    req->payload_size, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1575 		return -1;
1576 	}
1577 
1578 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1579 	if (spdk_unlikely(rc)) {
1580 		return -1;
1581 	}
1582 
1583 	req->cmd.dptr.sgl1.keyed.key = ctx.rkey;
1584 
1585 	/* The first element of this SGL is pointing at an
1586 	 * spdk_nvmf_cmd object. For this particular command,
1587 	 * we only need the first 64 bytes corresponding to
1588 	 * the NVMe command. */
1589 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1590 
1591 	/* The RDMA SGL needs one element describing the NVMe command. */
1592 	rdma_req->send_wr.num_sge = 1;
1593 
1594 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1595 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1596 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1597 	req->cmd.dptr.sgl1.keyed.length = (uint32_t)ctx.length;
1598 	req->cmd.dptr.sgl1.address = (uint64_t)ctx.addr;
1599 
1600 	return 0;
1601 }
1602 
1603 /*
1604  * Build SGL describing scattered payload buffer.
1605  */
1606 static int
1607 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1608 			    struct spdk_nvme_rdma_req *rdma_req)
1609 {
1610 	struct nvme_request *req = rdma_req->req;
1611 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1612 	struct nvme_rdma_memory_translation_ctx ctx;
1613 	uint32_t remaining_size;
1614 	uint32_t sge_length;
1615 	int rc, max_num_sgl, num_sgl_desc;
1616 
1617 	assert(req->payload_size != 0);
1618 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1619 	assert(req->payload.reset_sgl_fn != NULL);
1620 	assert(req->payload.next_sge_fn != NULL);
1621 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1622 
1623 	max_num_sgl = req->qpair->ctrlr->max_sges;
1624 
1625 	remaining_size = req->payload_size;
1626 	num_sgl_desc = 0;
1627 	do {
1628 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &sge_length);
1629 		if (rc) {
1630 			return -1;
1631 		}
1632 
1633 		sge_length = spdk_min(remaining_size, sge_length);
1634 
1635 		if (spdk_unlikely(sge_length > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1636 			SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1637 				    sge_length, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1638 			return -1;
1639 		}
1640 		ctx.length = sge_length;
1641 		rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1642 		if (spdk_unlikely(rc)) {
1643 			return -1;
1644 		}
1645 
1646 		cmd->sgl[num_sgl_desc].keyed.key = ctx.rkey;
1647 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1648 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1649 		cmd->sgl[num_sgl_desc].keyed.length = (uint32_t)ctx.length;
1650 		cmd->sgl[num_sgl_desc].address = (uint64_t)ctx.addr;
1651 
1652 		remaining_size -= ctx.length;
1653 		num_sgl_desc++;
1654 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1655 
1656 
1657 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1658 	if (remaining_size > 0) {
1659 		return -1;
1660 	}
1661 
1662 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1663 
1664 	/* The RDMA SGL needs one element describing some portion
1665 	 * of the spdk_nvmf_cmd structure. */
1666 	rdma_req->send_wr.num_sge = 1;
1667 
1668 	/*
1669 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1670 	 * as a data block descriptor.
1671 	 */
1672 	if (num_sgl_desc == 1) {
1673 		/* The first element of this SGL is pointing at an
1674 		 * spdk_nvmf_cmd object. For this particular command,
1675 		 * we only need the first 64 bytes corresponding to
1676 		 * the NVMe command. */
1677 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1678 
1679 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1680 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1681 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1682 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1683 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1684 	} else {
1685 		/*
1686 		 * Otherwise, the SGL descriptor embedded in the command must point to the list of
1687 		 * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
1688 		 */
1689 		uint32_t descriptors_size = sizeof(struct spdk_nvme_sgl_descriptor) * num_sgl_desc;
1690 
1691 		if (spdk_unlikely(descriptors_size > rqpair->qpair.ctrlr->ioccsz_bytes)) {
1692 			SPDK_ERRLOG("Size of SGL descriptors (%u) exceeds ICD (%u)\n",
1693 				    descriptors_size, rqpair->qpair.ctrlr->ioccsz_bytes);
1694 			return -1;
1695 		}
1696 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + descriptors_size;
1697 
1698 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1699 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1700 		req->cmd.dptr.sgl1.unkeyed.length = descriptors_size;
1701 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1702 	}
1703 
1704 	return 0;
1705 }
1706 
1707 /*
1708  * Build inline SGL describing sgl payload buffer.
1709  */
1710 static int
1711 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1712 				   struct spdk_nvme_rdma_req *rdma_req)
1713 {
1714 	struct nvme_request *req = rdma_req->req;
1715 	struct nvme_rdma_memory_translation_ctx ctx;
1716 	uint32_t length;
1717 	int rc;
1718 
1719 	assert(req->payload_size != 0);
1720 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1721 	assert(req->payload.reset_sgl_fn != NULL);
1722 	assert(req->payload.next_sge_fn != NULL);
1723 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1724 
1725 	rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &length);
1726 	if (rc) {
1727 		return -1;
1728 	}
1729 
1730 	if (length < req->payload_size) {
1731 		SPDK_DEBUGLOG(nvme, "Inline SGL request split so sending separately.\n");
1732 		return nvme_rdma_build_sgl_request(rqpair, rdma_req);
1733 	}
1734 
1735 	if (length > req->payload_size) {
1736 		length = req->payload_size;
1737 	}
1738 
1739 	ctx.length = length;
1740 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1741 	if (spdk_unlikely(rc)) {
1742 		return -1;
1743 	}
1744 
1745 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1746 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1747 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1748 
1749 	rdma_req->send_wr.num_sge = 2;
1750 
1751 	/* The first element of this SGL is pointing at an
1752 	 * spdk_nvmf_cmd object. For this particular command,
1753 	 * we only need the first 64 bytes corresponding to
1754 	 * the NVMe command. */
1755 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1756 
1757 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1758 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1759 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1760 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1761 	/* Inline only supported for icdoff == 0 currently.  This function will
1762 	 * not get called for controllers with other values. */
1763 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1764 
1765 	return 0;
1766 }
1767 
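/*
 * Bind an nvme_request to an RDMA request tracker and build the transport SGL,
 * choosing between null, contiguous and scattered payloads and between inline
 * and keyed data depending on transfer direction and in-capsule data limits.
 */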
1768 static int
1769 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1770 		   struct spdk_nvme_rdma_req *rdma_req)
1771 {
1772 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1773 	enum nvme_payload_type payload_type;
1774 	bool icd_supported;
1775 	int rc;
1776 
1777 	assert(rdma_req->req == NULL);
1778 	rdma_req->req = req;
1779 	req->cmd.cid = rdma_req->id;
1780 	payload_type = nvme_payload_type(&req->payload);
1781 	/*
1782 	 * Check whether icdoff is non-zero, to avoid interop conflicts with
1783 	 * targets that use a non-zero icdoff.  Both SPDK and the Linux kernel
1784 	 * targets use icdoff = 0.  For targets with a non-zero icdoff, we
1785 	 * currently do not use inline data.
1786 	 */
1787 	icd_supported = spdk_nvme_opc_get_data_transfer(req->cmd.opc) == SPDK_NVME_DATA_HOST_TO_CONTROLLER
1788 			&& req->payload_size <= ctrlr->ioccsz_bytes && ctrlr->icdoff == 0;
1789 
1790 	if (req->payload_size == 0) {
1791 		rc = nvme_rdma_build_null_request(rdma_req);
1792 	} else if (payload_type == NVME_PAYLOAD_TYPE_CONTIG) {
1793 		if (icd_supported) {
1794 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1795 		} else {
1796 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1797 		}
1798 	} else if (payload_type == NVME_PAYLOAD_TYPE_SGL) {
1799 		if (icd_supported) {
1800 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1801 		} else {
1802 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1803 		}
1804 	} else {
1805 		rc = -1;
1806 	}
1807 
1808 	if (rc) {
1809 		rdma_req->req = NULL;
1810 		return rc;
1811 	}
1812 
1813 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1814 	return 0;
1815 }
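/*
 * Concrete example of the dispatch above (illustrative): a 4 KiB write
 * (host-to-controller data transfer) to a target reporting ioccsz_bytes >= 4096
 * and icdoff == 0 takes one of the *_inline_request() paths, while the same
 * 4 KiB read (controller-to-host) or any payload larger than ioccsz_bytes
 * falls back to the keyed-SGL *_request() paths.
 */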
1816 
1817 static struct spdk_nvme_qpair *
1818 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1819 			     uint16_t qid, uint32_t qsize,
1820 			     enum spdk_nvme_qprio qprio,
1821 			     uint32_t num_requests,
1822 			     bool delay_cmd_submit,
1823 			     bool async)
1824 {
1825 	struct nvme_rdma_qpair *rqpair;
1826 	struct spdk_nvme_qpair *qpair;
1827 	int rc;
1828 
1829 	if (qsize < SPDK_NVME_QUEUE_MIN_ENTRIES) {
1830 		SPDK_ERRLOG("Failed to create qpair with size %u. Minimum queue size is %d.\n",
1831 			    qsize, SPDK_NVME_QUEUE_MIN_ENTRIES);
1832 		return NULL;
1833 	}
1834 
1835 	rqpair = spdk_zmalloc(sizeof(struct nvme_rdma_qpair), 0, NULL, SPDK_ENV_SOCKET_ID_ANY,
1836 			      SPDK_MALLOC_DMA);
1837 	if (!rqpair) {
1838 		SPDK_ERRLOG("failed to allocate rqpair\n");
1839 		return NULL;
1840 	}
1841 
1842 	/* Set num_entries to one less than the queue size. According to the NVMe
1843 	 * and NVMe-oF specs we cannot submit a number of requests equal to the
1844 	 * queue size; one slot must always remain empty.
1845 	 */
1846 	rqpair->num_entries = qsize - 1;
1847 	rqpair->delay_cmd_submit = delay_cmd_submit;
1848 	rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID;
1849 	qpair = &rqpair->qpair;
1850 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async);
1851 	if (rc != 0) {
1852 		spdk_free(rqpair);
1853 		return NULL;
1854 	}
1855 
1856 	return qpair;
1857 }
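/*
 * Example of the sizing rule above (illustrative): a caller requesting
 * io_queue_size = 128 gets rqpair->num_entries = 127, because a completely
 * full queue cannot be distinguished from an empty one; one slot must stay
 * unused per the NVMe specification.
 */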
1858 
1859 static void
1860 nvme_rdma_qpair_destroy(struct nvme_rdma_qpair *rqpair)
1861 {
1862 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1863 	struct nvme_rdma_ctrlr *rctrlr;
1864 	struct nvme_rdma_cm_event_entry *entry, *tmp;
1865 
1866 	spdk_rdma_free_mem_map(&rqpair->mr_map);
1867 
1868 	if (rqpair->evt) {
1869 		rdma_ack_cm_event(rqpair->evt);
1870 		rqpair->evt = NULL;
1871 	}
1872 
1873 	/*
1874 	 * This works because we have the controller lock both in
1875 	 * this function and in the function where we add new events.
1876 	 */
1877 	if (qpair->ctrlr != NULL) {
1878 		rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
1879 		STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
1880 			if (entry->evt->id->context == rqpair) {
1881 				STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
1882 				rdma_ack_cm_event(entry->evt);
1883 				STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
1884 			}
1885 		}
1886 	}
1887 
1888 	if (rqpair->cm_id) {
1889 		if (rqpair->rdma_qp) {
1890 			spdk_rdma_put_pd(rqpair->rdma_qp->qp->pd);
1891 			spdk_rdma_qp_destroy(rqpair->rdma_qp);
1892 			rqpair->rdma_qp = NULL;
1893 		}
1894 	}
1895 
1896 	if (rqpair->poller) {
1897 		struct nvme_rdma_poll_group     *group;
1898 
1899 		assert(qpair->poll_group);
1900 		group = nvme_rdma_poll_group(qpair->poll_group);
1901 
1902 		nvme_rdma_poll_group_put_poller(group, rqpair->poller);
1903 
1904 		rqpair->poller = NULL;
1905 		rqpair->cq = NULL;
1906 		if (rqpair->srq) {
1907 			rqpair->srq = NULL;
1908 			rqpair->rsps = NULL;
1909 		}
1910 	} else if (rqpair->cq) {
1911 		ibv_destroy_cq(rqpair->cq);
1912 		rqpair->cq = NULL;
1913 	}
1914 
1915 	nvme_rdma_free_reqs(rqpair);
1916 	nvme_rdma_free_rsps(rqpair->rsps);
1917 	rqpair->rsps = NULL;
1918 
1919 	/* Destroy cm_id last so the CMA device is not freed before we destroy the CQ. */
1920 	if (rqpair->cm_id) {
1921 		rdma_destroy_id(rqpair->cm_id);
1922 		rqpair->cm_id = NULL;
1923 	}
1924 }
1925 
1926 static void nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr);
1927 
1928 static int
1929 nvme_rdma_qpair_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
1930 {
1931 	if (ret) {
1932 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
1933 		goto quiet;
1934 	}
1935 
1936 	if (rqpair->poller == NULL) {
1937 		/* If a poller is not used, the CQ is not shared,
1938 		 * so complete the qpair disconnect immediately.
1939 		 */
1940 		goto quiet;
1941 	}
1942 
1943 	if (rqpair->rsps == NULL) {
1944 		goto quiet;
1945 	}
1946 
1947 	if (rqpair->need_destroy ||
1948 	    (rqpair->current_num_sends != 0 ||
1949 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) {
1950 		rqpair->state = NVME_RDMA_QPAIR_STATE_LINGERING;
1951 		rqpair->evt_timeout_ticks = (NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US * spdk_get_ticks_hz()) /
1952 					    SPDK_SEC_TO_USEC + spdk_get_ticks();
1953 
1954 		return -EAGAIN;
1955 	}
1956 
1957 quiet:
1958 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
1959 
1960 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, 0);
1961 	nvme_rdma_qpair_destroy(rqpair);
1962 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
1963 
1964 	return 0;
1965 }
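/*
 * The lingering timeout above converts NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US
 * into ticks, e.g. assuming a 2 GHz tick rate (illustration only):
 *
 *     evt_timeout_ticks = (1000000 * 2000000000) / 1000000 + now
 *                       = now + 2000000000 ticks   // i.e. now + 1 second
 */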
1966 
1967 static int
1968 nvme_rdma_qpair_wait_until_quiet(struct nvme_rdma_qpair *rqpair)
1969 {
1970 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks &&
1971 	    (rqpair->current_num_sends != 0 ||
1972 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) {
1973 		return -EAGAIN;
1974 	}
1975 
1976 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
1977 
1978 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, 0);
1979 	nvme_rdma_qpair_destroy(rqpair);
1980 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
1981 
1982 	return 0;
1983 }
1984 
1985 static void
1986 _nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair,
1987 				  nvme_rdma_cm_event_cb disconnected_qpair_cb)
1988 {
1989 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1990 	int rc;
1991 
1992 	assert(disconnected_qpair_cb != NULL);
1993 
1994 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITING;
1995 
1996 	if (rqpair->cm_id) {
1997 		if (rqpair->rdma_qp) {
1998 			rc = spdk_rdma_qp_disconnect(rqpair->rdma_qp);
1999 			if ((qpair->ctrlr != NULL) && (rc == 0)) {
2000 				rc = nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_DISCONNECTED,
2001 								   disconnected_qpair_cb);
2002 				if (rc == 0) {
2003 					return;
2004 				}
2005 			}
2006 		}
2007 	}
2008 
2009 	disconnected_qpair_cb(rqpair, 0);
2010 }
2011 
2012 static int
2013 nvme_rdma_ctrlr_disconnect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2014 {
2015 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2016 	int rc;
2017 
2018 	switch (rqpair->state) {
2019 	case NVME_RDMA_QPAIR_STATE_EXITING:
2020 		if (!nvme_qpair_is_admin_queue(qpair)) {
2021 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
2022 		}
2023 
2024 		rc = nvme_rdma_process_event_poll(rqpair);
2025 
2026 		if (!nvme_qpair_is_admin_queue(qpair)) {
2027 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
2028 		}
2029 		break;
2030 
2031 	case NVME_RDMA_QPAIR_STATE_LINGERING:
2032 		rc = nvme_rdma_qpair_wait_until_quiet(rqpair);
2033 		break;
2034 	case NVME_RDMA_QPAIR_STATE_EXITED:
2035 		rc = 0;
2036 		break;
2037 
2038 	default:
2039 		assert(false);
2040 		rc = -EAGAIN;
2041 		break;
2042 	}
2043 
2044 	return rc;
2045 }
2046 
2047 static void
2048 nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2049 {
2050 	int rc;
2051 
2052 	_nvme_rdma_ctrlr_disconnect_qpair(ctrlr, qpair, nvme_rdma_qpair_disconnected);
2053 
2054 	/* If async mode is disabled, poll the qpair until it is actually disconnected.
2055 	 * poll_group_process_completions() is guaranteed to call disconnected_qpair_cb
2056 	 * for any disconnected qpair, so we do not have to check whether the qpair is
2057 	 * in a poll group or not.
2058 	 * However, if the qpair is being destroyed, i.e. this function is called from
2059 	 * spdk_nvme_ctrlr_free_io_qpair(), we need to wait until the qpair is disconnected;
2060 	 * otherwise we may leak resources.
2061 	 */
2062 	if (qpair->async && !qpair->destroy_in_progress) {
2063 		return;
2064 	}
2065 
2066 	while (1) {
2067 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(ctrlr, qpair);
2068 		if (rc != -EAGAIN) {
2069 			break;
2070 		}
2071 	}
2072 }
2073 
2074 static int
2075 nvme_rdma_stale_conn_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
2076 {
2077 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2078 
2079 	if (ret) {
2080 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
2081 	}
2082 
2083 	nvme_rdma_qpair_destroy(rqpair);
2084 
2085 	qpair->last_transport_failure_reason = qpair->transport_failure_reason;
2086 	qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_NONE;
2087 
2088 	rqpair->state = NVME_RDMA_QPAIR_STATE_STALE_CONN;
2089 	rqpair->evt_timeout_ticks = (NVME_RDMA_STALE_CONN_RETRY_DELAY_US * spdk_get_ticks_hz()) /
2090 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
2091 
2092 	return 0;
2093 }
2094 
2095 static int
2096 nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair)
2097 {
2098 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2099 
2100 	if (rqpair->stale_conn_retry_count >= NVME_RDMA_STALE_CONN_RETRY_MAX) {
2101 		SPDK_ERRLOG("Retry failed %d times, giving up on stale connection to qpair (cntlid:%u, qid:%u).\n",
2102 			    NVME_RDMA_STALE_CONN_RETRY_MAX, qpair->ctrlr->cntlid, qpair->id);
2103 		return -ESTALE;
2104 	}
2105 
2106 	rqpair->stale_conn_retry_count++;
2107 
2108 	SPDK_NOTICELOG("Retry %d: reconnecting stale connection to qpair (cntlid:%u, qid:%u).\n",
2109 		       rqpair->stale_conn_retry_count, qpair->ctrlr->cntlid, qpair->id);
2110 
2111 	_nvme_rdma_ctrlr_disconnect_qpair(qpair->ctrlr, qpair, nvme_rdma_stale_conn_disconnected);
2112 
2113 	return 0;
2114 }
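/*
 * Net effect of the stale-connection policy above (illustration): with
 * NVME_RDMA_STALE_CONN_RETRY_DELAY_US = 10000 and
 * NVME_RDMA_STALE_CONN_RETRY_MAX = 5, the host waits 10 ms between attempts
 * and gives up with -ESTALE after at most 5 reconnect attempts.
 */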
2115 
2116 static int
2117 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2118 {
2119 	struct nvme_rdma_qpair *rqpair;
2120 
2121 	assert(qpair != NULL);
2122 	rqpair = nvme_rdma_qpair(qpair);
2123 
2124 	if (rqpair->state != NVME_RDMA_QPAIR_STATE_EXITED) {
2125 		int rc __attribute__((unused));
2126 
2127 		/* The qpair was removed from the poll group before the disconnect finished.
2128 		 * Destroy the RDMA resources forcefully. */
2129 		rc = nvme_rdma_qpair_disconnected(rqpair, 0);
2130 		assert(rc == 0);
2131 	}
2132 
2133 	nvme_rdma_qpair_abort_reqs(qpair, 0);
2134 	nvme_qpair_deinit(qpair);
2135 
2136 	nvme_rdma_put_memory_domain(rqpair->memory_domain);
2137 
2138 	spdk_free(rqpair);
2139 
2140 	return 0;
2141 }
2142 
2143 static struct spdk_nvme_qpair *
2144 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
2145 				const struct spdk_nvme_io_qpair_opts *opts)
2146 {
2147 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
2148 					    opts->io_queue_requests,
2149 					    opts->delay_cmd_submit,
2150 					    opts->async_mode);
2151 }
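/*
 * The opts consumed here normally come from the public API, roughly as in the
 * following application-side sketch (illustrative, not part of this file):
 *
 *     struct spdk_nvme_io_qpair_opts opts;
 *     struct spdk_nvme_qpair *qpair;
 *
 *     spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
 *     opts.io_queue_size = 128;          // becomes qsize above
 *     opts.delay_cmd_submit = true;      // batch send doorbell updates
 *     qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
 */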
2152 
2153 static int
2154 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
2155 {
2156 	/* do nothing here */
2157 	return 0;
2158 }
2159 
2160 static int nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr);
2161 
2162 /* We have to use the typedef in the function declaration to appease astyle. */
2163 typedef struct spdk_nvme_ctrlr spdk_nvme_ctrlr_t;
2164 
2165 static spdk_nvme_ctrlr_t *
2166 nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
2167 			  const struct spdk_nvme_ctrlr_opts *opts,
2168 			  void *devhandle)
2169 {
2170 	struct nvme_rdma_ctrlr *rctrlr;
2171 	struct ibv_context **contexts;
2172 	struct ibv_device_attr dev_attr;
2173 	int i, flag, rc;
2174 
2175 	rctrlr = spdk_zmalloc(sizeof(struct nvme_rdma_ctrlr), 0, NULL, SPDK_ENV_SOCKET_ID_ANY,
2176 			      SPDK_MALLOC_DMA);
2177 	if (rctrlr == NULL) {
2178 		SPDK_ERRLOG("could not allocate ctrlr\n");
2179 		return NULL;
2180 	}
2181 
2182 	rctrlr->ctrlr.opts = *opts;
2183 	rctrlr->ctrlr.trid = *trid;
2184 
2185 	if (opts->transport_retry_count > NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT) {
2186 		SPDK_NOTICELOG("transport_retry_count exceeds max value %d, using the max value\n",
2187 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT);
2188 		rctrlr->ctrlr.opts.transport_retry_count = NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT;
2189 	}
2190 
2191 	if (opts->transport_ack_timeout > NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT) {
2192 		SPDK_NOTICELOG("transport_ack_timeout exceeds max value %d, using the max value\n",
2193 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT);
2194 		rctrlr->ctrlr.opts.transport_ack_timeout = NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT;
2195 	}
2196 
2197 	contexts = rdma_get_devices(NULL);
2198 	if (contexts == NULL) {
2199 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
2200 		spdk_free(rctrlr);
2201 		return NULL;
2202 	}
2203 
2204 	i = 0;
2205 	rctrlr->max_sge = NVME_RDMA_MAX_SGL_DESCRIPTORS;
2206 
2207 	while (contexts[i] != NULL) {
2208 		rc = ibv_query_device(contexts[i], &dev_attr);
2209 		if (rc < 0) {
2210 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
2211 			rdma_free_devices(contexts);
2212 			spdk_free(rctrlr);
2213 			return NULL;
2214 		}
2215 		rctrlr->max_sge = spdk_min(rctrlr->max_sge, (uint16_t)dev_attr.max_sge);
2216 		i++;
2217 	}
2218 
2219 	rdma_free_devices(contexts);
2220 
2221 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
2222 	if (rc != 0) {
2223 		spdk_free(rctrlr);
2224 		return NULL;
2225 	}
2226 
2227 	STAILQ_INIT(&rctrlr->pending_cm_events);
2228 	STAILQ_INIT(&rctrlr->free_cm_events);
2229 	rctrlr->cm_events = spdk_zmalloc(NVME_RDMA_NUM_CM_EVENTS * sizeof(*rctrlr->cm_events), 0, NULL,
2230 					 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2231 	if (rctrlr->cm_events == NULL) {
2232 		SPDK_ERRLOG("unable to allocate buffers to hold CM events.\n");
2233 		goto destruct_ctrlr;
2234 	}
2235 
2236 	for (i = 0; i < NVME_RDMA_NUM_CM_EVENTS; i++) {
2237 		STAILQ_INSERT_TAIL(&rctrlr->free_cm_events, &rctrlr->cm_events[i], link);
2238 	}
2239 
2240 	rctrlr->cm_channel = rdma_create_event_channel();
2241 	if (rctrlr->cm_channel == NULL) {
2242 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
2243 		goto destruct_ctrlr;
2244 	}
2245 
2246 	flag = fcntl(rctrlr->cm_channel->fd, F_GETFL);
2247 	if (fcntl(rctrlr->cm_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
2248 		SPDK_ERRLOG("Cannot set event channel to non-blocking\n");
2249 		goto destruct_ctrlr;
2250 	}
2251 
2252 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
2253 			       rctrlr->ctrlr.opts.admin_queue_size, 0,
2254 			       rctrlr->ctrlr.opts.admin_queue_size, false, true);
2255 	if (!rctrlr->ctrlr.adminq) {
2256 		SPDK_ERRLOG("failed to create admin qpair\n");
2257 		goto destruct_ctrlr;
2258 	}
2259 
2260 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
2261 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
2262 		goto destruct_ctrlr;
2263 	}
2264 
2265 	SPDK_DEBUGLOG(nvme, "successfully initialized the nvmf ctrlr\n");
2266 	return &rctrlr->ctrlr;
2267 
2268 destruct_ctrlr:
2269 	nvme_ctrlr_destruct(&rctrlr->ctrlr);
2270 	return NULL;
2271 }
2272 
2273 static int
2274 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
2275 {
2276 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2277 	struct nvme_rdma_cm_event_entry *entry;
2278 
2279 	if (ctrlr->adminq) {
2280 		nvme_rdma_ctrlr_delete_io_qpair(ctrlr, ctrlr->adminq);
2281 	}
2282 
2283 	STAILQ_FOREACH(entry, &rctrlr->pending_cm_events, link) {
2284 		rdma_ack_cm_event(entry->evt);
2285 	}
2286 
2287 	STAILQ_INIT(&rctrlr->free_cm_events);
2288 	STAILQ_INIT(&rctrlr->pending_cm_events);
2289 	spdk_free(rctrlr->cm_events);
2290 
2291 	if (rctrlr->cm_channel) {
2292 		rdma_destroy_event_channel(rctrlr->cm_channel);
2293 		rctrlr->cm_channel = NULL;
2294 	}
2295 
2296 	nvme_ctrlr_destruct_finish(ctrlr);
2297 
2298 	spdk_free(rctrlr);
2299 
2300 	return 0;
2301 }
2302 
2303 static int
2304 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
2305 			       struct nvme_request *req)
2306 {
2307 	struct nvme_rdma_qpair *rqpair;
2308 	struct spdk_nvme_rdma_req *rdma_req;
2309 	struct ibv_send_wr *wr;
2310 
2311 	rqpair = nvme_rdma_qpair(qpair);
2312 	assert(rqpair != NULL);
2313 	assert(req != NULL);
2314 
2315 	rdma_req = nvme_rdma_req_get(rqpair);
2316 	if (spdk_unlikely(!rdma_req)) {
2317 		if (rqpair->poller) {
2318 			rqpair->poller->stats.queued_requests++;
2319 		}
2320 		/* Inform the upper layer to try again later. */
2321 		return -EAGAIN;
2322 	}
2323 
2324 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
2325 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
2326 		TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
2327 		nvme_rdma_req_put(rqpair, rdma_req);
2328 		return -1;
2329 	}
2330 
2331 	assert(rqpair->current_num_sends < rqpair->num_entries);
2332 	rqpair->current_num_sends++;
2333 
2334 	wr = &rdma_req->send_wr;
2335 	wr->next = NULL;
2336 	nvme_rdma_trace_ibv_sge(wr->sg_list);
2337 
2338 	spdk_rdma_qp_queue_send_wrs(rqpair->rdma_qp, wr);
2339 
2340 	if (!rqpair->delay_cmd_submit) {
2341 		return nvme_rdma_qpair_submit_sends(rqpair);
2342 	}
2343 
2344 	return 0;
2345 }
2346 
2347 static int
2348 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
2349 {
2350 	/* Currently, doing nothing here */
2351 	return 0;
2352 }
2353 
2354 static void
2355 nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr)
2356 {
2357 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2358 	struct spdk_nvme_cpl cpl;
2359 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2360 
2361 	cpl.sqid = qpair->id;
2362 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2363 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2364 	cpl.status.dnr = dnr;
2365 
2366 	/*
2367 	 * We cannot abort requests at the RDMA layer without
2368 	 * unregistering them. If we do, we can still get error-free
2369 	 * completions for them on the shared completion queue.
2370 	 */
2371 	if (nvme_qpair_get_state(qpair) > NVME_QPAIR_DISCONNECTING &&
2372 	    nvme_qpair_get_state(qpair) != NVME_QPAIR_DESTROYING) {
2373 		nvme_ctrlr_disconnect_qpair(qpair);
2374 	}
2375 
2376 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2377 		nvme_rdma_req_complete(rdma_req, &cpl, true);
2378 	}
2379 }
2380 
2381 static void
2382 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
2383 {
2384 	uint64_t t02;
2385 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2386 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2387 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
2388 	struct spdk_nvme_ctrlr_process *active_proc;
2389 
2390 	/* Don't check timeouts during controller initialization. */
2391 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
2392 		return;
2393 	}
2394 
2395 	if (nvme_qpair_is_admin_queue(qpair)) {
2396 		active_proc = nvme_ctrlr_get_current_process(ctrlr);
2397 	} else {
2398 		active_proc = qpair->active_proc;
2399 	}
2400 
2401 	/* Only check timeouts if the current process has a timeout callback. */
2402 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
2403 		return;
2404 	}
2405 
2406 	t02 = spdk_get_ticks();
2407 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2408 		assert(rdma_req->req != NULL);
2409 
2410 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
2411 			/*
2412 			 * The requests are in order, so as soon as one has not timed out,
2413 			 * stop iterating.
2414 			 */
2415 			break;
2416 		}
2417 	}
2418 }
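/*
 * The timeout scan above only runs when the application has registered a
 * timeout callback, e.g. (sketch; the exact argument list of
 * spdk_nvme_ctrlr_register_timeout_callback varies between SPDK releases):
 *
 *     static void
 *     my_timeout_cb(void *cb_arg, struct spdk_nvme_ctrlr *ctrlr,
 *                   struct spdk_nvme_qpair *qpair, uint16_t cid)
 *     {
 *             // e.g. abort the command identified by cid, or reset the controller
 *     }
 *
 *     spdk_nvme_ctrlr_register_timeout_callback(ctrlr, io_timeout_us,
 *                                               admin_timeout_us,
 *                                               my_timeout_cb, NULL);
 */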
2419 
2420 static inline void
2421 nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
2422 {
2423 	struct spdk_nvme_rdma_rsp *rdma_rsp = rdma_req->rdma_rsp;
2424 	struct ibv_recv_wr *recv_wr = rdma_rsp->recv_wr;
2425 
2426 	nvme_rdma_req_complete(rdma_req, &rdma_rsp->cpl, true);
2427 
2428 	assert(rqpair->rsps->current_num_recvs < rqpair->rsps->num_entries);
2429 	rqpair->rsps->current_num_recvs++;
2430 
2431 	recv_wr->next = NULL;
2432 	nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
2433 
2434 	if (!rqpair->srq) {
2435 		spdk_rdma_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr);
2436 	} else {
2437 		spdk_rdma_srq_queue_recv_wrs(rqpair->srq, recv_wr);
2438 	}
2439 }
2440 
2441 #define MAX_COMPLETIONS_PER_POLL 128
2442 
2443 static void
2444 nvme_rdma_fail_qpair(struct spdk_nvme_qpair *qpair, int failure_reason)
2445 {
2446 	if (failure_reason == IBV_WC_RETRY_EXC_ERR) {
2447 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
2448 	} else if (qpair->transport_failure_reason == SPDK_NVME_QPAIR_FAILURE_NONE) {
2449 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_UNKNOWN;
2450 	}
2451 
2452 	nvme_ctrlr_disconnect_qpair(qpair);
2453 }
2454 
2455 static struct nvme_rdma_qpair *
2456 get_rdma_qpair_from_wc(struct nvme_rdma_poll_group *group, struct ibv_wc *wc)
2457 {
2458 	struct spdk_nvme_qpair *qpair;
2459 	struct nvme_rdma_qpair *rqpair;
2460 
2461 	STAILQ_FOREACH(qpair, &group->group.connected_qpairs, poll_group_stailq) {
2462 		rqpair = nvme_rdma_qpair(qpair);
2463 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2464 			return rqpair;
2465 		}
2466 	}
2467 
2468 	STAILQ_FOREACH(qpair, &group->group.disconnected_qpairs, poll_group_stailq) {
2469 		rqpair = nvme_rdma_qpair(qpair);
2470 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2471 			return rqpair;
2472 		}
2473 	}
2474 
2475 	return NULL;
2476 }
2477 
2478 static inline void
2479 nvme_rdma_log_wc_status(struct nvme_rdma_qpair *rqpair, struct ibv_wc *wc)
2480 {
2481 	struct nvme_rdma_wr *rdma_wr = (struct nvme_rdma_wr *)wc->wr_id;
2482 
2483 	if (wc->status == IBV_WC_WR_FLUSH_ERR) {
2484 		/* If the qpair is in the ERR state, we will receive completions with IBV_WC_WR_FLUSH_ERR status
2485 		 * for all posted but not yet completed Work Requests. Don't log an error in that case. */
2486 		SPDK_DEBUGLOG(nvme, "WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2487 			      rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2488 			      ibv_wc_status_str(wc->status));
2489 	} else {
2490 		SPDK_ERRLOG("WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2491 			    rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2492 			    ibv_wc_status_str(wc->status));
2493 	}
2494 }
2495 
2496 static inline int
2497 nvme_rdma_process_recv_completion(struct nvme_rdma_poller *poller, struct ibv_wc *wc,
2498 				  struct nvme_rdma_wr *rdma_wr)
2499 {
2500 	struct nvme_rdma_qpair		*rqpair;
2501 	struct spdk_nvme_rdma_req	*rdma_req;
2502 	struct spdk_nvme_rdma_rsp	*rdma_rsp;
2503 
2504 	rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
2505 
2506 	if (poller && poller->srq) {
2507 		rqpair = get_rdma_qpair_from_wc(poller->group, wc);
2508 		if (spdk_unlikely(!rqpair)) {
2509 			/* Since we do not handle the LAST_WQE_REACHED event, we do not know when
2510 			 * the Receive Queue of a QP that is associated with an SRQ is flushed.
2511 			 * We may get a WC for an already destroyed QP.
2512 			 *
2513 			 * However, for the SRQ this is not an error. Hence, just re-post the
2514 			 * receive request to the SRQ so it can be reused by other QPs, and return 0.
2515 			 */
2516 			spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2517 			return 0;
2518 		}
2519 	} else {
2520 		rqpair = rdma_rsp->rqpair;
2521 		if (spdk_unlikely(!rqpair)) {
2522 			/* TODO: Fix the forceful QP destroy when async mode is not used.
2523 			 * The CQ itself did not report an error. Hence, return 0 for now.
2524 			 */
2525 			SPDK_WARNLOG("QP might already be destroyed.\n");
2526 			return 0;
2527 		}
2528 	}
2529 
2530 
2531 	assert(rqpair->rsps->current_num_recvs > 0);
2532 	rqpair->rsps->current_num_recvs--;
2533 
2534 	if (wc->status) {
2535 		nvme_rdma_log_wc_status(rqpair, wc);
2536 		goto err_wc;
2537 	}
2538 
2539 	SPDK_DEBUGLOG(nvme, "CQ recv completion\n");
2540 
2541 	if (wc->byte_len < sizeof(struct spdk_nvme_cpl)) {
2542 		SPDK_ERRLOG("recv length %u is less than the expected response size\n", wc->byte_len);
2543 		goto err_wc;
2544 	}
2545 	rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
2546 	rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
2547 	rdma_req->rdma_rsp = rdma_rsp;
2548 
2549 	if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) == 0) {
2550 		return 0;
2551 	}
2552 
2553 	nvme_rdma_request_ready(rqpair, rdma_req);
2554 
2555 	if (!rqpair->delay_cmd_submit) {
2556 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2557 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2558 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2559 			return -ENXIO;
2560 		}
2561 	}
2562 
2563 	rqpair->num_completions++;
2564 	return 1;
2565 
2566 err_wc:
2567 	nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2568 	if (poller && poller->srq) {
2569 		spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2570 	}
2571 	return -ENXIO;
2572 }
2573 
2574 static inline int
2575 nvme_rdma_process_send_completion(struct nvme_rdma_poller *poller,
2576 				  struct nvme_rdma_qpair *rdma_qpair,
2577 				  struct ibv_wc *wc, struct nvme_rdma_wr *rdma_wr)
2578 {
2579 	struct nvme_rdma_qpair		*rqpair;
2580 	struct spdk_nvme_rdma_req	*rdma_req;
2581 
2582 	rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_req, rdma_wr);
2583 	rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL;
2584 	if (!rqpair) {
2585 		rqpair = rdma_qpair != NULL ? rdma_qpair : get_rdma_qpair_from_wc(poller->group, wc);
2586 	}
2587 
2588 	/* If we are flushing I/O */
2589 	if (wc->status) {
2590 		if (!rqpair) {
2591 			/* When a poll_group is used, several qpairs share the same CQ and it is possible to
2592 			 * receive a completion with an error (e.g. IBV_WC_WR_FLUSH_ERR) for an already
2593 			 * disconnected qpair. That happens because the qpair is destroyed while there are
2594 			 * submitted but not yet completed send/receive Work Requests. */
2595 			assert(poller);
2596 			return 0;
2597 		}
2598 		assert(rqpair->current_num_sends > 0);
2599 		rqpair->current_num_sends--;
2600 		nvme_rdma_log_wc_status(rqpair, wc);
2601 		nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2602 		if (rdma_req->rdma_rsp && poller && poller->srq) {
2603 			spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_req->rdma_rsp->recv_wr);
2604 		}
2605 		return -ENXIO;
2606 	}
2607 
2608 	/* We no longer support Soft-RoCE. Aside from Soft-RoCE's bug, we should not
2609 	 * receive a completion with a success status after the qpair is disconnected/destroyed.
2610 	 */
2611 	if (spdk_unlikely(rdma_req->req == NULL)) {
2612 		/*
2613 		 * Some InfiniBand drivers do not guarantee the previous assumption after we
2614 		 * have received an RDMA_CM_EVENT_DEVICE_REMOVAL event.
2615 		 */
2616 		SPDK_ERRLOG("Received malformed completion: request 0x%"PRIx64" type %d\n", wc->wr_id,
2617 			    rdma_wr->type);
2618 		if (!rqpair || !rqpair->need_destroy) {
2619 			assert(0);
2620 		}
2621 		return -ENXIO;
2622 	}
2623 
2624 	rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
2625 	assert(rqpair->current_num_sends > 0);
2626 	rqpair->current_num_sends--;
2627 
2628 	if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) == 0) {
2629 		return 0;
2630 	}
2631 
2632 	nvme_rdma_request_ready(rqpair, rdma_req);
2633 
2634 	if (!rqpair->delay_cmd_submit) {
2635 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2636 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2637 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2638 			return -ENXIO;
2639 		}
2640 	}
2641 
2642 	rqpair->num_completions++;
2643 	return 1;
2644 }
2645 
2646 static int
2647 nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
2648 				 struct nvme_rdma_poller *poller,
2649 				 struct nvme_rdma_qpair *rdma_qpair,
2650 				 uint64_t *rdma_completions)
2651 {
2652 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
2653 	struct nvme_rdma_wr		*rdma_wr;
2654 	uint32_t			reaped = 0;
2655 	int				completion_rc = 0;
2656 	int				rc, _rc, i;
2657 
2658 	rc = ibv_poll_cq(cq, batch_size, wc);
2659 	if (rc < 0) {
2660 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
2661 			    errno, spdk_strerror(errno));
2662 		return -ECANCELED;
2663 	} else if (rc == 0) {
2664 		return 0;
2665 	}
2666 
2667 	for (i = 0; i < rc; i++) {
2668 		rdma_wr = (struct nvme_rdma_wr *)wc[i].wr_id;
2669 		switch (rdma_wr->type) {
2670 		case RDMA_WR_TYPE_RECV:
2671 			_rc = nvme_rdma_process_recv_completion(poller, &wc[i], rdma_wr);
2672 			break;
2673 
2674 		case RDMA_WR_TYPE_SEND:
2675 			_rc = nvme_rdma_process_send_completion(poller, rdma_qpair, &wc[i], rdma_wr);
2676 			break;
2677 
2678 		default:
2679 			SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", rdma_wr->type);
2680 			return -ECANCELED;
2681 		}
2682 		if (spdk_likely(_rc >= 0)) {
2683 			reaped += _rc;
2684 		} else {
2685 			completion_rc = _rc;
2686 		}
2687 	}
2688 
2689 	*rdma_completions += rc;
2690 
2691 	if (completion_rc) {
2692 		return completion_rc;
2693 	}
2694 
2695 	return reaped;
2696 }
2697 
2698 static void
2699 dummy_disconnected_qpair_cb(struct spdk_nvme_qpair *qpair, void *poll_group_ctx)
2700 {
2701 
2702 }
2703 
2704 static int
2705 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
2706 				    uint32_t max_completions)
2707 {
2708 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
2709 	struct nvme_rdma_ctrlr		*rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
2710 	int				rc = 0, batch_size;
2711 	struct ibv_cq			*cq;
2712 	uint64_t			rdma_completions = 0;
2713 
2714 	/*
2715 	 * This is used during the connection phase. It's possible that we are still reaping error completions
2716 	 * from other qpairs so we need to call the poll group function. Also, it's more correct since the cq
2717 	 * is shared.
2718 	 */
2719 	if (qpair->poll_group != NULL) {
2720 		return spdk_nvme_poll_group_process_completions(qpair->poll_group->group, max_completions,
2721 				dummy_disconnected_qpair_cb);
2722 	}
2723 
2724 	if (max_completions == 0) {
2725 		max_completions = rqpair->num_entries;
2726 	} else {
2727 		max_completions = spdk_min(max_completions, rqpair->num_entries);
2728 	}
2729 
2730 	switch (nvme_qpair_get_state(qpair)) {
2731 	case NVME_QPAIR_CONNECTING:
2732 		rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
2733 		if (rc == 0) {
2734 			/* Once the connection is completed, we can submit queued requests */
2735 			nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
2736 		} else if (rc != -EAGAIN) {
2737 			SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
2738 			goto failed;
2739 		} else if (rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING) {
2740 			return 0;
2741 		}
2742 		break;
2743 
2744 	case NVME_QPAIR_DISCONNECTING:
2745 		nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
2746 		return -ENXIO;
2747 
2748 	default:
2749 		if (nvme_qpair_is_admin_queue(qpair)) {
2750 			nvme_rdma_poll_events(rctrlr);
2751 		}
2752 		nvme_rdma_qpair_process_cm_event(rqpair);
2753 		break;
2754 	}
2755 
2756 	if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
2757 		goto failed;
2758 	}
2759 
2760 	cq = rqpair->cq;
2761 
2762 	rqpair->num_completions = 0;
2763 	do {
2764 		batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
2765 		rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL, rqpair, &rdma_completions);
2766 
2767 		if (rc == 0) {
2768 			break;
2769 			/* Handle the case where we fail to poll the cq. */
2770 		} else if (rc == -ECANCELED) {
2771 			goto failed;
2772 		} else if (rc == -ENXIO) {
2773 			return rc;
2774 		}
2775 	} while (rqpair->num_completions < max_completions);
2776 
2777 	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
2778 			  nvme_rdma_qpair_submit_recvs(rqpair))) {
2779 		goto failed;
2780 	}
2781 
2782 	if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
2783 		nvme_rdma_qpair_check_timeout(qpair);
2784 	}
2785 
2786 	return rqpair->num_completions;
2787 
2788 failed:
2789 	nvme_rdma_fail_qpair(qpair, 0);
2790 	return -ENXIO;
2791 }
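/*
 * When the qpair is not in a poll group, this is the function that runs when
 * an application polls for completions, e.g. (illustrative usage):
 *
 *     int32_t n;
 *
 *     do {
 *             // max_completions == 0 means "drain up to the transport default"
 *             n = spdk_nvme_qpair_process_completions(qpair, 0);
 *     } while (n == 0 && !io_done);
 */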
2792 
2793 static uint32_t
2794 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
2795 {
2796 	/* The max_mr_size reported by ibv_query_device indicates the largest value that we
2797 	 * can set for a registered memory region.  It is independent of the actual
2798 	 * I/O size and is very likely to be larger than 2 MiB, which is the
2799 	 * granularity at which we currently register memory regions.  Hence return
2800 	 * UINT32_MAX here and let the generic layer use the controller data to
2801 	 * moderate this value.
2802 	 */
2803 	return UINT32_MAX;
2804 }
2805 
2806 static uint16_t
2807 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
2808 {
2809 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2810 	uint32_t max_sge = rctrlr->max_sge;
2811 	uint32_t max_in_capsule_sge = (ctrlr->cdata.nvmf_specific.ioccsz * 16 -
2812 				       sizeof(struct spdk_nvme_cmd)) /
2813 				      sizeof(struct spdk_nvme_sgl_descriptor);
2814 
2815 	/* Max SGE is limited by capsule size */
2816 	max_sge = spdk_min(max_sge, max_in_capsule_sge);
2817 	/* Max SGE may be limited by MSDBD */
2818 	if (ctrlr->cdata.nvmf_specific.msdbd != 0) {
2819 		max_sge = spdk_min(max_sge, ctrlr->cdata.nvmf_specific.msdbd);
2820 	}
2821 
2822 	/* Max SGE can't be less than 1 */
2823 	max_sge = spdk_max(1, max_sge);
2824 	return max_sge;
2825 }
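/*
 * Worked example of the limits above (illustrative): a controller reporting
 * ioccsz = 16 has a 16 * 16 = 256 byte capsule, so
 * max_in_capsule_sge = (256 - 64) / 16 = 12; the result is further clamped by
 * the device max_sge and, if non-zero, by the controller's MSDBD value.
 */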
2826 
2827 static int
2828 nvme_rdma_qpair_iterate_requests(struct spdk_nvme_qpair *qpair,
2829 				 int (*iter_fn)(struct nvme_request *req, void *arg),
2830 				 void *arg)
2831 {
2832 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2833 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2834 	int rc;
2835 
2836 	assert(iter_fn != NULL);
2837 
2838 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2839 		assert(rdma_req->req != NULL);
2840 
2841 		rc = iter_fn(rdma_req->req, arg);
2842 		if (rc != 0) {
2843 			return rc;
2844 		}
2845 	}
2846 
2847 	return 0;
2848 }
2849 
2850 static void
2851 nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
2852 {
2853 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2854 	struct spdk_nvme_cpl cpl;
2855 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2856 
2857 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2858 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2859 
2860 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2861 		assert(rdma_req->req != NULL);
2862 
2863 		if (rdma_req->req->cmd.opc != SPDK_NVME_OPC_ASYNC_EVENT_REQUEST) {
2864 			continue;
2865 		}
2866 
2867 		nvme_rdma_req_complete(rdma_req, &cpl, false);
2868 	}
2869 }
2870 
2871 static void
2872 nvme_rdma_poller_destroy(struct nvme_rdma_poller *poller)
2873 {
2874 	if (poller->cq) {
2875 		ibv_destroy_cq(poller->cq);
2876 	}
2877 	if (poller->rsps) {
2878 		nvme_rdma_free_rsps(poller->rsps);
2879 	}
2880 	if (poller->srq) {
2881 		spdk_rdma_srq_destroy(poller->srq);
2882 	}
2883 	if (poller->mr_map) {
2884 		spdk_rdma_free_mem_map(&poller->mr_map);
2885 	}
2886 	if (poller->pd) {
2887 		spdk_rdma_put_pd(poller->pd);
2888 	}
2889 	free(poller);
2890 }
2891 
2892 static struct nvme_rdma_poller *
2893 nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context *ctx)
2894 {
2895 	struct nvme_rdma_poller *poller;
2896 	struct ibv_device_attr dev_attr;
2897 	struct spdk_rdma_srq_init_attr srq_init_attr = {};
2898 	struct nvme_rdma_rsp_opts opts;
2899 	int num_cqe;
2900 	int rc;
2901 
2902 	poller = calloc(1, sizeof(*poller));
2903 	if (poller == NULL) {
2904 		SPDK_ERRLOG("Unable to allocate poller.\n");
2905 		return NULL;
2906 	}
2907 
2908 	poller->group = group;
2909 	poller->device = ctx;
2910 
2911 	if (g_spdk_nvme_transport_opts.rdma_srq_size != 0) {
2912 		rc = ibv_query_device(ctx, &dev_attr);
2913 		if (rc) {
2914 			SPDK_ERRLOG("Unable to query RDMA device.\n");
2915 			goto fail;
2916 		}
2917 
2918 		poller->pd = spdk_rdma_get_pd(ctx);
2919 		if (poller->pd == NULL) {
2920 			SPDK_ERRLOG("Unable to get PD.\n");
2921 			goto fail;
2922 		}
2923 
2924 		poller->mr_map = spdk_rdma_create_mem_map(poller->pd, &g_nvme_hooks,
2925 				 SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR);
2926 		if (poller->mr_map == NULL) {
2927 			SPDK_ERRLOG("Unable to create memory map.\n");
2928 			goto fail;
2929 		}
2930 
2931 		srq_init_attr.stats = &poller->stats.rdma_stats.recv;
2932 		srq_init_attr.pd = poller->pd;
2933 		srq_init_attr.srq_init_attr.attr.max_wr = spdk_min((uint32_t)dev_attr.max_srq_wr,
2934 				g_spdk_nvme_transport_opts.rdma_srq_size);
2935 		srq_init_attr.srq_init_attr.attr.max_sge = spdk_min(dev_attr.max_sge,
2936 				NVME_RDMA_DEFAULT_RX_SGE);
2937 
2938 		poller->srq = spdk_rdma_srq_create(&srq_init_attr);
2939 		if (poller->srq == NULL) {
2940 			SPDK_ERRLOG("Unable to create SRQ.\n");
2941 			goto fail;
2942 		}
2943 
2944 		opts.num_entries = g_spdk_nvme_transport_opts.rdma_srq_size;
2945 		opts.rqpair = NULL;
2946 		opts.srq = poller->srq;
2947 		opts.mr_map = poller->mr_map;
2948 
2949 		poller->rsps = nvme_rdma_create_rsps(&opts);
2950 		if (poller->rsps == NULL) {
2951 			SPDK_ERRLOG("Unable to create poller RDMA responses.\n");
2952 			goto fail;
2953 		}
2954 
2955 		rc = nvme_rdma_poller_submit_recvs(poller);
2956 		if (rc) {
2957 			SPDK_ERRLOG("Unable to submit poller RDMA responses.\n");
2958 			goto fail;
2959 		}
2960 
2961 		/*
2962 		 * When using an SRQ, fix the size of the completion queue at startup.
2963 		 * The initiator only posts send and recv WRs. Hence, the multiplier is 2.
2964 		 * (The target also posts data WRs. Hence, its multiplier is 3.)
2965 		 */
2966 		num_cqe = g_spdk_nvme_transport_opts.rdma_srq_size * 2;
2967 	} else {
2968 		num_cqe = DEFAULT_NVME_RDMA_CQ_SIZE;
2969 	}
2970 
2971 	poller->cq = ibv_create_cq(poller->device, num_cqe, group, NULL, 0);
2972 
2973 	if (poller->cq == NULL) {
2974 		SPDK_ERRLOG("Unable to create CQ, errno %d.\n", errno);
2975 		goto fail;
2976 	}
2977 
2978 	STAILQ_INSERT_HEAD(&group->pollers, poller, link);
2979 	group->num_pollers++;
2980 	poller->current_num_wc = num_cqe;
2981 	poller->required_num_wc = 0;
2982 	return poller;
2983 
2984 fail:
2985 	nvme_rdma_poller_destroy(poller);
2986 	return NULL;
2987 }
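/*
 * CQ sizing example for the SRQ path above (illustrative): with
 * rdma_srq_size = 4096 the poller creates a CQ with 2 * 4096 = 8192 entries,
 * covering every recv WR posted to the SRQ plus one send WR per in-flight
 * request. Without an SRQ the CQ starts at DEFAULT_NVME_RDMA_CQ_SIZE and may
 * be resized later as qpairs are attached to the poller.
 */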
2988 
2989 static void
2990 nvme_rdma_poll_group_free_pollers(struct nvme_rdma_poll_group *group)
2991 {
2992 	struct nvme_rdma_poller	*poller, *tmp_poller;
2993 
2994 	STAILQ_FOREACH_SAFE(poller, &group->pollers, link, tmp_poller) {
2995 		assert(poller->refcnt == 0);
2996 		if (poller->refcnt) {
2997 			SPDK_WARNLOG("Destroying poller with non-zero ref count: poller %p, refcnt %d\n",
2998 				     poller, poller->refcnt);
2999 		}
3000 
3001 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
3002 		nvme_rdma_poller_destroy(poller);
3003 	}
3004 }
3005 
3006 static struct nvme_rdma_poller *
3007 nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group, struct ibv_context *device)
3008 {
3009 	struct nvme_rdma_poller *poller = NULL;
3010 
3011 	STAILQ_FOREACH(poller, &group->pollers, link) {
3012 		if (poller->device == device) {
3013 			break;
3014 		}
3015 	}
3016 
3017 	if (!poller) {
3018 		poller = nvme_rdma_poller_create(group, device);
3019 		if (!poller) {
3020 			SPDK_ERRLOG("Failed to create a poller for device %p\n", device);
3021 			return NULL;
3022 		}
3023 	}
3024 
3025 	poller->refcnt++;
3026 	return poller;
3027 }
3028 
3029 static void
3030 nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group, struct nvme_rdma_poller *poller)
3031 {
3032 	assert(poller->refcnt > 0);
3033 	if (--poller->refcnt == 0) {
3034 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
3035 		group->num_pollers--;
3036 		nvme_rdma_poller_destroy(poller);
3037 	}
3038 }
3039 
3040 static struct spdk_nvme_transport_poll_group *
3041 nvme_rdma_poll_group_create(void)
3042 {
3043 	struct nvme_rdma_poll_group	*group;
3044 
3045 	group = calloc(1, sizeof(*group));
3046 	if (group == NULL) {
3047 		SPDK_ERRLOG("Unable to allocate poll group.\n");
3048 		return NULL;
3049 	}
3050 
3051 	STAILQ_INIT(&group->pollers);
3052 	return &group->group;
3053 }
3054 
3055 static int
3056 nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
3057 {
3058 	return 0;
3059 }
3060 
3061 static int
3062 nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
3063 {
3064 	return 0;
3065 }
3066 
3067 static int
3068 nvme_rdma_poll_group_add(struct spdk_nvme_transport_poll_group *tgroup,
3069 			 struct spdk_nvme_qpair *qpair)
3070 {
3071 	return 0;
3072 }
3073 
3074 static int
3075 nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
3076 			    struct spdk_nvme_qpair *qpair)
3077 {
3078 	return 0;
3079 }
3080 
3081 static int64_t
3082 nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup,
3083 		uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
3084 {
3085 	struct spdk_nvme_qpair			*qpair, *tmp_qpair;
3086 	struct nvme_rdma_qpair			*rqpair;
3087 	struct nvme_rdma_poll_group		*group;
3088 	struct nvme_rdma_poller			*poller;
3089 	int					num_qpairs = 0, batch_size, rc, rc2 = 0;
3090 	int64_t					total_completions = 0;
3091 	uint64_t				completions_allowed = 0;
3092 	uint64_t				completions_per_poller = 0;
3093 	uint64_t				poller_completions = 0;
3094 	uint64_t				rdma_completions;
3095 
3096 	if (completions_per_qpair == 0) {
3097 		completions_per_qpair = MAX_COMPLETIONS_PER_POLL;
3098 	}
3099 
3100 	group = nvme_rdma_poll_group(tgroup);
3101 	STAILQ_FOREACH_SAFE(qpair, &tgroup->disconnected_qpairs, poll_group_stailq, tmp_qpair) {
3102 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
3103 		if (rc == 0) {
3104 			disconnected_qpair_cb(qpair, tgroup->group->ctx);
3105 		}
3106 	}
3107 
3108 	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
3109 		rqpair = nvme_rdma_qpair(qpair);
3110 		rqpair->num_completions = 0;
3111 
3112 		if (spdk_unlikely(nvme_qpair_get_state(qpair) == NVME_QPAIR_CONNECTING)) {
3113 			rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
3114 			if (rc == 0) {
3115 				/* Once the connection is completed, we can submit queued requests */
3116 				nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
3117 			} else if (rc != -EAGAIN) {
3118 				SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
3119 				nvme_rdma_fail_qpair(qpair, 0);
3120 				continue;
3121 			}
3122 		} else {
3123 			nvme_rdma_qpair_process_cm_event(rqpair);
3124 		}
3125 
3126 		if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
3127 			rc2 = -ENXIO;
3128 			nvme_rdma_fail_qpair(qpair, 0);
3129 			continue;
3130 		}
3131 		num_qpairs++;
3132 	}
3133 
3134 	completions_allowed = completions_per_qpair * num_qpairs;
3135 	if (group->num_pollers) {
3136 		completions_per_poller = spdk_max(completions_allowed / group->num_pollers, 1);
3137 	}
3138 
3139 	STAILQ_FOREACH(poller, &group->pollers, link) {
3140 		poller_completions = 0;
3141 		rdma_completions = 0;
3142 		do {
3143 			poller->stats.polls++;
3144 			batch_size = spdk_min((completions_per_poller - poller_completions), MAX_COMPLETIONS_PER_POLL);
3145 			rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, poller, NULL, &rdma_completions);
3146 			if (rc <= 0) {
3147 				if (rc == -ECANCELED) {
3148 					return -EIO;
3149 				} else if (rc == 0) {
3150 					poller->stats.idle_polls++;
3151 				}
3152 				break;
3153 			}
3154 
3155 			poller_completions += rc;
3156 		} while (poller_completions < completions_per_poller);
3157 		total_completions += poller_completions;
3158 		poller->stats.completions += rdma_completions;
3159 		if (poller->srq) {
3160 			nvme_rdma_poller_submit_recvs(poller);
3161 		}
3162 	}
3163 
3164 	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
3165 		rqpair = nvme_rdma_qpair(qpair);
3166 
3167 		if (spdk_unlikely(rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING)) {
3168 			continue;
3169 		}
3170 
3171 		if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
3172 			nvme_rdma_qpair_check_timeout(qpair);
3173 		}
3174 
3175 		nvme_rdma_qpair_submit_sends(rqpair);
3176 		if (!rqpair->srq) {
3177 			nvme_rdma_qpair_submit_recvs(rqpair);
3178 		}
3179 		if (rqpair->num_completions > 0) {
3180 			nvme_qpair_resubmit_requests(qpair, rqpair->num_completions);
3181 		}
3182 	}
3183 
3184 	return rc2 != 0 ? rc2 : total_completions;
3185 }
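/*
 * Budget math used above, for illustration: with the default
 * completions_per_qpair of MAX_COMPLETIONS_PER_POLL (128), 8 connected qpairs
 * and 2 pollers (one per RDMA device), completions_allowed = 128 * 8 = 1024
 * and each poller may reap up to spdk_max(1024 / 2, 1) = 512 completions per
 * call, drained in batches of at most 128 WCs per ibv_poll_cq().
 */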
3186 
3187 static int
3188 nvme_rdma_poll_group_destroy(struct spdk_nvme_transport_poll_group *tgroup)
3189 {
3190 	struct nvme_rdma_poll_group	*group = nvme_rdma_poll_group(tgroup);
3191 
3192 	if (!STAILQ_EMPTY(&tgroup->connected_qpairs) || !STAILQ_EMPTY(&tgroup->disconnected_qpairs)) {
3193 		return -EBUSY;
3194 	}
3195 
3196 	nvme_rdma_poll_group_free_pollers(group);
3197 	free(group);
3198 
3199 	return 0;
3200 }
3201 
3202 static int
3203 nvme_rdma_poll_group_get_stats(struct spdk_nvme_transport_poll_group *tgroup,
3204 			       struct spdk_nvme_transport_poll_group_stat **_stats)
3205 {
3206 	struct nvme_rdma_poll_group *group;
3207 	struct spdk_nvme_transport_poll_group_stat *stats;
3208 	struct spdk_nvme_rdma_device_stat *device_stat;
3209 	struct nvme_rdma_poller *poller;
3210 	uint32_t i = 0;
3211 
3212 	if (tgroup == NULL || _stats == NULL) {
3213 		SPDK_ERRLOG("Invalid stats or group pointer\n");
3214 		return -EINVAL;
3215 	}
3216 
3217 	group = nvme_rdma_poll_group(tgroup);
3218 	stats = calloc(1, sizeof(*stats));
3219 	if (!stats) {
3220 		SPDK_ERRLOG("Can't allocate memory for RDMA stats\n");
3221 		return -ENOMEM;
3222 	}
3223 	stats->trtype = SPDK_NVME_TRANSPORT_RDMA;
3224 	stats->rdma.num_devices = group->num_pollers;
3225 
3226 	if (stats->rdma.num_devices == 0) {
3227 		*_stats = stats;
3228 		return 0;
3229 	}
3230 
3231 	stats->rdma.device_stats = calloc(stats->rdma.num_devices, sizeof(*stats->rdma.device_stats));
3232 	if (!stats->rdma.device_stats) {
3233 		SPDK_ERRLOG("Can't allocate memory for RDMA device stats\n");
3234 		free(stats);
3235 		return -ENOMEM;
3236 	}
3237 
3238 	STAILQ_FOREACH(poller, &group->pollers, link) {
3239 		device_stat = &stats->rdma.device_stats[i];
3240 		device_stat->name = poller->device->device->name;
3241 		device_stat->polls = poller->stats.polls;
3242 		device_stat->idle_polls = poller->stats.idle_polls;
3243 		device_stat->completions = poller->stats.completions;
3244 		device_stat->queued_requests = poller->stats.queued_requests;
3245 		device_stat->total_send_wrs = poller->stats.rdma_stats.send.num_submitted_wrs;
3246 		device_stat->send_doorbell_updates = poller->stats.rdma_stats.send.doorbell_updates;
3247 		device_stat->total_recv_wrs = poller->stats.rdma_stats.recv.num_submitted_wrs;
3248 		device_stat->recv_doorbell_updates = poller->stats.rdma_stats.recv.doorbell_updates;
3249 		i++;
3250 	}
3251 
3252 	*_stats = stats;
3253 
3254 	return 0;
3255 }
3256 
3257 static void
3258 nvme_rdma_poll_group_free_stats(struct spdk_nvme_transport_poll_group *tgroup,
3259 				struct spdk_nvme_transport_poll_group_stat *stats)
3260 {
3261 	if (stats) {
3262 		free(stats->rdma.device_stats);
3263 	}
3264 	free(stats);
3265 }
3266 
3267 static int
3268 nvme_rdma_ctrlr_get_memory_domains(const struct spdk_nvme_ctrlr *ctrlr,
3269 				   struct spdk_memory_domain **domains, int array_size)
3270 {
3271 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(ctrlr->adminq);
3272 
3273 	if (domains && array_size > 0) {
3274 		domains[0] = rqpair->memory_domain->domain;
3275 	}
3276 
3277 	return 1;
3278 }
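/*
 * Typical use of the matching public API (illustrative sketch):
 *
 *     struct spdk_memory_domain *domains[1];
 *     int n = spdk_nvme_ctrlr_get_memory_domains(ctrlr, NULL, 0);  // query count, 1 for RDMA
 *
 *     if (n == 1) {
 *             spdk_nvme_ctrlr_get_memory_domains(ctrlr, domains, 1);
 *     }
 */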
3279 
3280 void
3281 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
3282 {
3283 	g_nvme_hooks = *hooks;
3284 }
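/*
 * Applications that manage protection domains and memory keys themselves can
 * install hooks before creating any controller, roughly as follows (sketch;
 * only fields actually defined in struct spdk_nvme_rdma_hooks apply):
 *
 *     static struct spdk_nvme_rdma_hooks my_hooks = {
 *             .get_ibv_pd = my_get_ibv_pd,  // ibv_pd to use for a given trid/verbs context
 *             .get_rkey   = my_get_rkey,    // remote key for a registered buffer
 *     };
 *
 *     spdk_nvme_rdma_init_hooks(&my_hooks);
 */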
3285 
3286 const struct spdk_nvme_transport_ops rdma_ops = {
3287 	.name = "RDMA",
3288 	.type = SPDK_NVME_TRANSPORT_RDMA,
3289 	.ctrlr_construct = nvme_rdma_ctrlr_construct,
3290 	.ctrlr_scan = nvme_fabric_ctrlr_scan,
3291 	.ctrlr_destruct = nvme_rdma_ctrlr_destruct,
3292 	.ctrlr_enable = nvme_rdma_ctrlr_enable,
3293 
3294 	.ctrlr_set_reg_4 = nvme_fabric_ctrlr_set_reg_4,
3295 	.ctrlr_set_reg_8 = nvme_fabric_ctrlr_set_reg_8,
3296 	.ctrlr_get_reg_4 = nvme_fabric_ctrlr_get_reg_4,
3297 	.ctrlr_get_reg_8 = nvme_fabric_ctrlr_get_reg_8,
3298 	.ctrlr_set_reg_4_async = nvme_fabric_ctrlr_set_reg_4_async,
3299 	.ctrlr_set_reg_8_async = nvme_fabric_ctrlr_set_reg_8_async,
3300 	.ctrlr_get_reg_4_async = nvme_fabric_ctrlr_get_reg_4_async,
3301 	.ctrlr_get_reg_8_async = nvme_fabric_ctrlr_get_reg_8_async,
3302 
3303 	.ctrlr_get_max_xfer_size = nvme_rdma_ctrlr_get_max_xfer_size,
3304 	.ctrlr_get_max_sges = nvme_rdma_ctrlr_get_max_sges,
3305 
3306 	.ctrlr_create_io_qpair = nvme_rdma_ctrlr_create_io_qpair,
3307 	.ctrlr_delete_io_qpair = nvme_rdma_ctrlr_delete_io_qpair,
3308 	.ctrlr_connect_qpair = nvme_rdma_ctrlr_connect_qpair,
3309 	.ctrlr_disconnect_qpair = nvme_rdma_ctrlr_disconnect_qpair,
3310 
3311 	.ctrlr_get_memory_domains = nvme_rdma_ctrlr_get_memory_domains,
3312 
3313 	.qpair_abort_reqs = nvme_rdma_qpair_abort_reqs,
3314 	.qpair_reset = nvme_rdma_qpair_reset,
3315 	.qpair_submit_request = nvme_rdma_qpair_submit_request,
3316 	.qpair_process_completions = nvme_rdma_qpair_process_completions,
3317 	.qpair_iterate_requests = nvme_rdma_qpair_iterate_requests,
3318 	.admin_qpair_abort_aers = nvme_rdma_admin_qpair_abort_aers,
3319 
3320 	.poll_group_create = nvme_rdma_poll_group_create,
3321 	.poll_group_connect_qpair = nvme_rdma_poll_group_connect_qpair,
3322 	.poll_group_disconnect_qpair = nvme_rdma_poll_group_disconnect_qpair,
3323 	.poll_group_add = nvme_rdma_poll_group_add,
3324 	.poll_group_remove = nvme_rdma_poll_group_remove,
3325 	.poll_group_process_completions = nvme_rdma_poll_group_process_completions,
3326 	.poll_group_destroy = nvme_rdma_poll_group_destroy,
3327 	.poll_group_get_stats = nvme_rdma_poll_group_get_stats,
3328 	.poll_group_free_stats = nvme_rdma_poll_group_free_stats,
3329 };
3330 
3331 SPDK_NVME_TRANSPORT_REGISTER(rdma, &rdma_ops);
3332