xref: /spdk/lib/nvme/nvme_rdma.c (revision 4790c4ef7e626e751480fd9c4be4bb730aaab4d7)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2019-2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 /*
8  * NVMe over RDMA transport
9  */
10 
11 #include "spdk/stdinc.h"
12 
13 #include "spdk/assert.h"
14 #include "spdk/dma.h"
15 #include "spdk/log.h"
16 #include "spdk/trace.h"
17 #include "spdk/queue.h"
18 #include "spdk/nvme.h"
19 #include "spdk/nvmf_spec.h"
20 #include "spdk/string.h"
21 #include "spdk/endian.h"
22 #include "spdk/likely.h"
23 #include "spdk/config.h"
24 
25 #include "nvme_internal.h"
26 #include "spdk_internal/rdma.h"
27 
28 #define NVME_RDMA_TIME_OUT_IN_MS 2000
29 #define NVME_RDMA_RW_BUFFER_SIZE 131072
30 
31 /*
32  * NVME RDMA qpair Resource Defaults
33  */
34 #define NVME_RDMA_DEFAULT_TX_SGE		2
35 #define NVME_RDMA_DEFAULT_RX_SGE		1
36 
37 /* Max number of NVMe-oF SGL descriptors supported by the host */
38 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
39 
40 /* number of STAILQ entries for holding pending RDMA CM events. */
41 #define NVME_RDMA_NUM_CM_EVENTS			256
42 
43 /* CM event processing timeout */
44 #define NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US	1000000
45 
46 /* The default size for a shared rdma completion queue. */
47 #define DEFAULT_NVME_RDMA_CQ_SIZE		4096
48 
49 /*
50  * In the special case of a stale connection we don't expose a mechanism
51  * for the user to retry the connection so we need to handle it internally.
52  */
53 #define NVME_RDMA_STALE_CONN_RETRY_MAX		5
54 #define NVME_RDMA_STALE_CONN_RETRY_DELAY_US	10000
55 
56 /*
57  * Maximum value of transport_retry_count used by RDMA controller
58  */
59 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT	7
60 
61 /*
62  * Maximum value of transport_ack_timeout used by RDMA controller
63  */
64 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT	31
65 
66 /*
67  * Number of microseconds to wait until the lingering qpair becomes quiet.
68  */
69 #define NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US	1000000ull
70 
71 /*
72  * The max length of keyed SGL data block (3 bytes)
73  */
74 #define NVME_RDMA_MAX_KEYED_SGL_LENGTH ((1u << 24u) - 1)
75 
76 #define WC_PER_QPAIR(queue_depth)	(queue_depth * 2)
77 
78 #define NVME_RDMA_POLL_GROUP_CHECK_QPN(_rqpair, qpn)				\
79 	((_rqpair)->rdma_qp && (_rqpair)->rdma_qp->qp->qp_num == (qpn))	\
80 
81 struct nvme_rdma_memory_domain {
82 	TAILQ_ENTRY(nvme_rdma_memory_domain) link;
83 	uint32_t ref;
84 	struct ibv_pd *pd;
85 	struct spdk_memory_domain *domain;
86 	struct spdk_memory_domain_rdma_ctx rdma_ctx;
87 };
88 
89 enum nvme_rdma_wr_type {
90 	RDMA_WR_TYPE_RECV,
91 	RDMA_WR_TYPE_SEND,
92 };
93 
94 struct nvme_rdma_wr {
95 	/* Using this instead of the enum allows this struct to only occupy one byte. */
96 	uint8_t	type;
97 };
98 
99 struct spdk_nvmf_cmd {
100 	struct spdk_nvme_cmd cmd;
101 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
102 };
103 
104 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
105 
106 /* STAILQ wrapper for cm events. */
107 struct nvme_rdma_cm_event_entry {
108 	struct rdma_cm_event			*evt;
109 	STAILQ_ENTRY(nvme_rdma_cm_event_entry)	link;
110 };
111 
112 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
113 struct nvme_rdma_ctrlr {
114 	struct spdk_nvme_ctrlr			ctrlr;
115 
116 	uint16_t				max_sge;
117 
118 	struct rdma_event_channel		*cm_channel;
119 
120 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	pending_cm_events;
121 
122 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	free_cm_events;
123 
124 	struct nvme_rdma_cm_event_entry		*cm_events;
125 };
126 
127 struct nvme_rdma_poller_stats {
128 	uint64_t polls;
129 	uint64_t idle_polls;
130 	uint64_t queued_requests;
131 	uint64_t completions;
132 	struct spdk_rdma_qp_stats rdma_stats;
133 };
134 
135 struct nvme_rdma_poll_group;
136 struct nvme_rdma_rsps;
137 
138 struct nvme_rdma_poller {
139 	struct ibv_context		*device;
140 	struct ibv_cq			*cq;
141 	struct spdk_rdma_srq		*srq;
142 	struct nvme_rdma_rsps		*rsps;
143 	struct ibv_pd			*pd;
144 	struct spdk_rdma_mem_map	*mr_map;
145 	uint32_t			refcnt;
146 	int				required_num_wc;
147 	int				current_num_wc;
148 	struct nvme_rdma_poller_stats	stats;
149 	struct nvme_rdma_poll_group	*group;
150 	STAILQ_ENTRY(nvme_rdma_poller)	link;
151 };
152 
153 struct nvme_rdma_poll_group {
154 	struct spdk_nvme_transport_poll_group		group;
155 	STAILQ_HEAD(, nvme_rdma_poller)			pollers;
156 	uint32_t					num_pollers;
157 };
158 
159 enum nvme_rdma_qpair_state {
160 	NVME_RDMA_QPAIR_STATE_INVALID = 0,
161 	NVME_RDMA_QPAIR_STATE_STALE_CONN,
162 	NVME_RDMA_QPAIR_STATE_INITIALIZING,
163 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND,
164 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL,
165 	NVME_RDMA_QPAIR_STATE_RUNNING,
166 	NVME_RDMA_QPAIR_STATE_EXITING,
167 	NVME_RDMA_QPAIR_STATE_LINGERING,
168 	NVME_RDMA_QPAIR_STATE_EXITED,
169 };
170 
171 struct nvme_rdma_qpair;
172 
173 typedef int (*nvme_rdma_cm_event_cb)(struct nvme_rdma_qpair *rqpair, int ret);
174 
175 struct nvme_rdma_rsp_opts {
176 	uint16_t				num_entries;
177 	struct nvme_rdma_qpair			*rqpair;
178 	struct spdk_rdma_srq			*srq;
179 	struct spdk_rdma_mem_map		*mr_map;
180 };
181 
182 struct nvme_rdma_rsps {
183 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
184 	struct ibv_sge				*rsp_sgls;
185 	struct spdk_nvme_rdma_rsp		*rsps;
186 
187 	struct ibv_recv_wr			*rsp_recv_wrs;
188 
189 	/* Count of outstanding recv objects */
190 	uint16_t				current_num_recvs;
191 
192 	uint16_t				num_entries;
193 };
194 
195 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
196 struct nvme_rdma_qpair {
197 	struct spdk_nvme_qpair			qpair;
198 
199 	struct spdk_rdma_qp			*rdma_qp;
200 	struct rdma_cm_id			*cm_id;
201 	struct ibv_cq				*cq;
202 	struct spdk_rdma_srq			*srq;
203 
204 	struct	spdk_nvme_rdma_req		*rdma_reqs;
205 
206 	uint32_t				max_send_sge;
207 
208 	uint32_t				max_recv_sge;
209 
210 	uint16_t				num_entries;
211 
212 	bool					delay_cmd_submit;
213 
214 	uint32_t				num_completions;
215 
216 	struct nvme_rdma_rsps			*rsps;
217 
218 	/*
219 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
220 	 * Indexed by rdma_req->id.
221 	 */
222 	struct spdk_nvmf_cmd			*cmds;
223 
224 	struct spdk_rdma_mem_map		*mr_map;
225 
226 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
227 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
228 
229 	struct nvme_rdma_memory_domain		*memory_domain;
230 
231 	/* Count of outstanding send objects */
232 	uint16_t				current_num_sends;
233 
234 	/* Placed at the end of the struct since it is not used frequently */
235 	struct rdma_cm_event			*evt;
236 	struct nvme_rdma_poller			*poller;
237 
238 	uint64_t				evt_timeout_ticks;
239 	nvme_rdma_cm_event_cb			evt_cb;
240 	enum rdma_cm_event_type			expected_evt_type;
241 
242 	enum nvme_rdma_qpair_state		state;
243 
244 	bool					in_connect_poll;
245 
246 	uint8_t					stale_conn_retry_count;
247 	bool					need_destroy;
248 };
249 
250 enum NVME_RDMA_COMPLETION_FLAGS {
251 	NVME_RDMA_SEND_COMPLETED = 1u << 0,
252 	NVME_RDMA_RECV_COMPLETED = 1u << 1,
253 };
254 
255 struct spdk_nvme_rdma_req {
256 	uint16_t				id;
257 	uint16_t				completion_flags: 2;
258 	uint16_t				reserved: 14;
259 	/* if completion of RDMA_RECV received before RDMA_SEND, we will complete nvme request
260 	 * during processing of RDMA_SEND. To complete the request we must know the response
261 	 * received in RDMA_RECV, so store it in this field */
262 	struct spdk_nvme_rdma_rsp		*rdma_rsp;
263 
264 	struct nvme_rdma_wr			rdma_wr;
265 
266 	struct ibv_send_wr			send_wr;
267 
268 	struct nvme_request			*req;
269 
270 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
271 
272 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
273 };
274 
275 struct spdk_nvme_rdma_rsp {
276 	struct spdk_nvme_cpl	cpl;
277 	struct nvme_rdma_qpair	*rqpair;
278 	struct ibv_recv_wr	*recv_wr;
279 	struct nvme_rdma_wr	rdma_wr;
280 };
281 
282 struct nvme_rdma_memory_translation_ctx {
283 	void *addr;
284 	size_t length;
285 	uint32_t lkey;
286 	uint32_t rkey;
287 };
288 
289 static const char *rdma_cm_event_str[] = {
290 	"RDMA_CM_EVENT_ADDR_RESOLVED",
291 	"RDMA_CM_EVENT_ADDR_ERROR",
292 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
293 	"RDMA_CM_EVENT_ROUTE_ERROR",
294 	"RDMA_CM_EVENT_CONNECT_REQUEST",
295 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
296 	"RDMA_CM_EVENT_CONNECT_ERROR",
297 	"RDMA_CM_EVENT_UNREACHABLE",
298 	"RDMA_CM_EVENT_REJECTED",
299 	"RDMA_CM_EVENT_ESTABLISHED",
300 	"RDMA_CM_EVENT_DISCONNECTED",
301 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
302 	"RDMA_CM_EVENT_MULTICAST_JOIN",
303 	"RDMA_CM_EVENT_MULTICAST_ERROR",
304 	"RDMA_CM_EVENT_ADDR_CHANGE",
305 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
306 };
307 
308 static struct nvme_rdma_poller *nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group,
309 		struct ibv_context *device);
310 static void nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group,
311 		struct nvme_rdma_poller *poller);
312 
313 static TAILQ_HEAD(, nvme_rdma_memory_domain) g_memory_domains = TAILQ_HEAD_INITIALIZER(
314 			g_memory_domains);
315 static pthread_mutex_t g_memory_domains_lock = PTHREAD_MUTEX_INITIALIZER;
316 
317 static struct nvme_rdma_memory_domain *
318 nvme_rdma_get_memory_domain(struct ibv_pd *pd)
319 {
320 	struct nvme_rdma_memory_domain *domain = NULL;
321 	struct spdk_memory_domain_ctx ctx;
322 	int rc;
323 
324 	pthread_mutex_lock(&g_memory_domains_lock);
325 
326 	TAILQ_FOREACH(domain, &g_memory_domains, link) {
327 		if (domain->pd == pd) {
328 			domain->ref++;
329 			pthread_mutex_unlock(&g_memory_domains_lock);
330 			return domain;
331 		}
332 	}
333 
334 	domain = calloc(1, sizeof(*domain));
335 	if (!domain) {
336 		SPDK_ERRLOG("Memory allocation failed\n");
337 		pthread_mutex_unlock(&g_memory_domains_lock);
338 		return NULL;
339 	}
340 
341 	domain->rdma_ctx.size = sizeof(domain->rdma_ctx);
342 	domain->rdma_ctx.ibv_pd = pd;
343 	ctx.size = sizeof(ctx);
344 	ctx.user_ctx = &domain->rdma_ctx;
345 
346 	rc = spdk_memory_domain_create(&domain->domain, SPDK_DMA_DEVICE_TYPE_RDMA, &ctx,
347 				       SPDK_RDMA_DMA_DEVICE);
348 	if (rc) {
349 		SPDK_ERRLOG("Failed to create memory domain\n");
350 		free(domain);
351 		pthread_mutex_unlock(&g_memory_domains_lock);
352 		return NULL;
353 	}
354 
355 	domain->pd = pd;
356 	domain->ref = 1;
357 	TAILQ_INSERT_TAIL(&g_memory_domains, domain, link);
358 
359 	pthread_mutex_unlock(&g_memory_domains_lock);
360 
361 	return domain;
362 }
363 
364 static void
365 nvme_rdma_put_memory_domain(struct nvme_rdma_memory_domain *device)
366 {
367 	if (!device) {
368 		return;
369 	}
370 
371 	pthread_mutex_lock(&g_memory_domains_lock);
372 
373 	assert(device->ref > 0);
374 
375 	device->ref--;
376 
377 	if (device->ref == 0) {
378 		spdk_memory_domain_destroy(device->domain);
379 		TAILQ_REMOVE(&g_memory_domains, device, link);
380 		free(device);
381 	}
382 
383 	pthread_mutex_unlock(&g_memory_domains_lock);
384 }
385 
386 static int nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr,
387 		struct spdk_nvme_qpair *qpair);
388 
389 static inline struct nvme_rdma_qpair *
390 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
391 {
392 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
393 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
394 }
395 
396 static inline struct nvme_rdma_poll_group *
397 nvme_rdma_poll_group(struct spdk_nvme_transport_poll_group *group)
398 {
399 	return (SPDK_CONTAINEROF(group, struct nvme_rdma_poll_group, group));
400 }
401 
402 static inline struct nvme_rdma_ctrlr *
403 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
404 {
405 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
406 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
407 }
408 
409 static struct spdk_nvme_rdma_req *
410 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
411 {
412 	struct spdk_nvme_rdma_req *rdma_req;
413 
414 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
415 	if (rdma_req) {
416 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
417 		TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
418 	}
419 
420 	return rdma_req;
421 }
422 
423 static void
424 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
425 {
426 	rdma_req->completion_flags = 0;
427 	rdma_req->req = NULL;
428 	rdma_req->rdma_rsp = NULL;
429 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
430 }
431 
432 static void
433 nvme_rdma_req_complete(struct spdk_nvme_rdma_req *rdma_req,
434 		       struct spdk_nvme_cpl *rsp,
435 		       bool print_on_error)
436 {
437 	struct nvme_request *req = rdma_req->req;
438 	struct nvme_rdma_qpair *rqpair;
439 	struct spdk_nvme_qpair *qpair;
440 	bool error, print_error;
441 
442 	assert(req != NULL);
443 
444 	qpair = req->qpair;
445 	rqpair = nvme_rdma_qpair(qpair);
446 
447 	error = spdk_nvme_cpl_is_error(rsp);
448 	print_error = error && print_on_error && !qpair->ctrlr->opts.disable_error_logging;
449 
450 	if (print_error) {
451 		spdk_nvme_qpair_print_command(qpair, &req->cmd);
452 	}
453 
454 	if (print_error || SPDK_DEBUGLOG_FLAG_ENABLED("nvme")) {
455 		spdk_nvme_qpair_print_completion(qpair, rsp);
456 	}
457 
458 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
459 
460 	nvme_complete_request(req->cb_fn, req->cb_arg, qpair, req, rsp);
461 	nvme_rdma_req_put(rqpair, rdma_req);
462 }
463 
464 static const char *
465 nvme_rdma_cm_event_str_get(uint32_t event)
466 {
467 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
468 		return rdma_cm_event_str[event];
469 	} else {
470 		return "Undefined";
471 	}
472 }
473 
474 
475 static int
476 nvme_rdma_qpair_process_cm_event(struct nvme_rdma_qpair *rqpair)
477 {
478 	struct rdma_cm_event				*event = rqpair->evt;
479 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
480 	int						rc = 0;
481 
482 	if (event) {
483 		switch (event->event) {
484 		case RDMA_CM_EVENT_ADDR_RESOLVED:
485 		case RDMA_CM_EVENT_ADDR_ERROR:
486 		case RDMA_CM_EVENT_ROUTE_RESOLVED:
487 		case RDMA_CM_EVENT_ROUTE_ERROR:
488 			break;
489 		case RDMA_CM_EVENT_CONNECT_REQUEST:
490 			break;
491 		case RDMA_CM_EVENT_CONNECT_ERROR:
492 			break;
493 		case RDMA_CM_EVENT_UNREACHABLE:
494 		case RDMA_CM_EVENT_REJECTED:
495 			break;
496 		case RDMA_CM_EVENT_CONNECT_RESPONSE:
497 			rc = spdk_rdma_qp_complete_connect(rqpair->rdma_qp);
498 		/* fall through */
499 		case RDMA_CM_EVENT_ESTABLISHED:
500 			accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
501 			if (accept_data == NULL) {
502 				rc = -1;
503 			} else {
504 				SPDK_DEBUGLOG(nvme, "Requested queue depth %d. Target receive queue depth %d.\n",
505 					      rqpair->num_entries + 1, accept_data->crqsize);
506 			}
507 			break;
508 		case RDMA_CM_EVENT_DISCONNECTED:
509 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
510 			break;
511 		case RDMA_CM_EVENT_DEVICE_REMOVAL:
512 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
513 			rqpair->need_destroy = true;
514 			break;
515 		case RDMA_CM_EVENT_MULTICAST_JOIN:
516 		case RDMA_CM_EVENT_MULTICAST_ERROR:
517 			break;
518 		case RDMA_CM_EVENT_ADDR_CHANGE:
519 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
520 			break;
521 		case RDMA_CM_EVENT_TIMEWAIT_EXIT:
522 			break;
523 		default:
524 			SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
525 			break;
526 		}
527 		rqpair->evt = NULL;
528 		rdma_ack_cm_event(event);
529 	}
530 
531 	return rc;
532 }
533 
534 /*
535  * This function must be called under the nvme controller's lock
536  * because it touches global controller variables. The lock is taken
537  * by the generic transport code before invoking a few of the functions
538  * in this file: nvme_rdma_ctrlr_connect_qpair, nvme_rdma_ctrlr_delete_io_qpair,
539  * and conditionally nvme_rdma_qpair_process_completions when it is calling
540  * completions on the admin qpair. When adding a new call to this function, please
541  * verify that it is in a situation where it falls under the lock.
542  */
543 static int
544 nvme_rdma_poll_events(struct nvme_rdma_ctrlr *rctrlr)
545 {
546 	struct nvme_rdma_cm_event_entry	*entry, *tmp;
547 	struct nvme_rdma_qpair		*event_qpair;
548 	struct rdma_cm_event		*event;
549 	struct rdma_event_channel	*channel = rctrlr->cm_channel;
550 
551 	STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
552 		event_qpair = entry->evt->id->context;
553 		if (event_qpair->evt == NULL) {
554 			event_qpair->evt = entry->evt;
555 			STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
556 			STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
557 		}
558 	}
559 
560 	while (rdma_get_cm_event(channel, &event) == 0) {
561 		event_qpair = event->id->context;
562 		if (event_qpair->evt == NULL) {
563 			event_qpair->evt = event;
564 		} else {
565 			assert(rctrlr == nvme_rdma_ctrlr(event_qpair->qpair.ctrlr));
566 			entry = STAILQ_FIRST(&rctrlr->free_cm_events);
567 			if (entry == NULL) {
568 				rdma_ack_cm_event(event);
569 				return -ENOMEM;
570 			}
571 			STAILQ_REMOVE(&rctrlr->free_cm_events, entry, nvme_rdma_cm_event_entry, link);
572 			entry->evt = event;
573 			STAILQ_INSERT_TAIL(&rctrlr->pending_cm_events, entry, link);
574 		}
575 	}
576 
577 	/* rdma_get_cm_event() returns -1 on error. If an error occurs, errno
578 	 * will be set to indicate the failure reason. So return negated errno here.
579 	 */
580 	return -errno;
581 }
582 
583 static int
584 nvme_rdma_validate_cm_event(enum rdma_cm_event_type expected_evt_type,
585 			    struct rdma_cm_event *reaped_evt)
586 {
587 	int rc = -EBADMSG;
588 
589 	if (expected_evt_type == reaped_evt->event) {
590 		return 0;
591 	}
592 
593 	switch (expected_evt_type) {
594 	case RDMA_CM_EVENT_ESTABLISHED:
595 		/*
596 		 * There is an enum ib_cm_rej_reason in the kernel headers that sets 10 as
597 		 * IB_CM_REJ_STALE_CONN. I can't find the corresponding userspace but we get
598 		 * the same values here.
599 		 */
600 		if (reaped_evt->event == RDMA_CM_EVENT_REJECTED && reaped_evt->status == 10) {
601 			rc = -ESTALE;
602 		} else if (reaped_evt->event == RDMA_CM_EVENT_CONNECT_RESPONSE) {
603 			/*
604 			 *  If we are using a qpair which is not created using rdma cm API
605 			 *  then we will receive RDMA_CM_EVENT_CONNECT_RESPONSE instead of
606 			 *  RDMA_CM_EVENT_ESTABLISHED.
607 			 */
608 			return 0;
609 		}
610 		break;
611 	default:
612 		break;
613 	}
614 
615 	SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
616 		    nvme_rdma_cm_event_str_get(expected_evt_type),
617 		    nvme_rdma_cm_event_str_get(reaped_evt->event), reaped_evt->event,
618 		    reaped_evt->status);
619 	return rc;
620 }
621 
622 static int
623 nvme_rdma_process_event_start(struct nvme_rdma_qpair *rqpair,
624 			      enum rdma_cm_event_type evt,
625 			      nvme_rdma_cm_event_cb evt_cb)
626 {
627 	int	rc;
628 
629 	assert(evt_cb != NULL);
630 
631 	if (rqpair->evt != NULL) {
632 		rc = nvme_rdma_qpair_process_cm_event(rqpair);
633 		if (rc) {
634 			return rc;
635 		}
636 	}
637 
638 	rqpair->expected_evt_type = evt;
639 	rqpair->evt_cb = evt_cb;
640 	rqpair->evt_timeout_ticks = (NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US * spdk_get_ticks_hz()) /
641 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
642 
643 	return 0;
644 }
645 
646 static int
647 nvme_rdma_process_event_poll(struct nvme_rdma_qpair *rqpair)
648 {
649 	struct nvme_rdma_ctrlr	*rctrlr;
650 	int	rc = 0, rc2;
651 
652 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
653 	assert(rctrlr != NULL);
654 
655 	if (!rqpair->evt && spdk_get_ticks() < rqpair->evt_timeout_ticks) {
656 		rc = nvme_rdma_poll_events(rctrlr);
657 		if (rc == -EAGAIN || rc == -EWOULDBLOCK) {
658 			return rc;
659 		}
660 	}
661 
662 	if (rqpair->evt == NULL) {
663 		rc = -EADDRNOTAVAIL;
664 		goto exit;
665 	}
666 
667 	rc = nvme_rdma_validate_cm_event(rqpair->expected_evt_type, rqpair->evt);
668 
669 	rc2 = nvme_rdma_qpair_process_cm_event(rqpair);
670 	/* bad message takes precedence over the other error codes from processing the event. */
671 	rc = rc == 0 ? rc2 : rc;
672 
673 exit:
674 	assert(rqpair->evt_cb != NULL);
675 	return rqpair->evt_cb(rqpair, rc);
676 }
677 
678 static int
679 nvme_rdma_resize_cq(struct nvme_rdma_qpair *rqpair, struct nvme_rdma_poller *poller)
680 {
681 	int	current_num_wc, required_num_wc;
682 	int	max_cq_size;
683 
684 	required_num_wc = poller->required_num_wc + WC_PER_QPAIR(rqpair->num_entries);
685 	current_num_wc = poller->current_num_wc;
686 	if (current_num_wc < required_num_wc) {
687 		current_num_wc = spdk_max(current_num_wc * 2, required_num_wc);
688 	}
689 
690 	max_cq_size = g_spdk_nvme_transport_opts.rdma_max_cq_size;
691 	if (max_cq_size != 0 && current_num_wc > max_cq_size) {
692 		current_num_wc = max_cq_size;
693 	}
694 
695 	if (poller->current_num_wc != current_num_wc) {
696 		SPDK_DEBUGLOG(nvme, "Resize RDMA CQ from %d to %d\n", poller->current_num_wc,
697 			      current_num_wc);
698 		if (ibv_resize_cq(poller->cq, current_num_wc)) {
699 			SPDK_ERRLOG("RDMA CQ resize failed: errno %d: %s\n", errno, spdk_strerror(errno));
700 			return -1;
701 		}
702 
703 		poller->current_num_wc = current_num_wc;
704 	}
705 
706 	poller->required_num_wc = required_num_wc;
707 	return 0;
708 }
709 
710 static int
711 nvme_rdma_qpair_set_poller(struct spdk_nvme_qpair *qpair)
712 {
713 	struct nvme_rdma_qpair          *rqpair = nvme_rdma_qpair(qpair);
714 	struct nvme_rdma_poll_group     *group = nvme_rdma_poll_group(qpair->poll_group);
715 	struct nvme_rdma_poller         *poller;
716 
717 	assert(rqpair->cq == NULL);
718 
719 	poller = nvme_rdma_poll_group_get_poller(group, rqpair->cm_id->verbs);
720 	if (!poller) {
721 		SPDK_ERRLOG("Unable to find a cq for qpair %p on poll group %p\n", qpair, qpair->poll_group);
722 		return -EINVAL;
723 	}
724 
725 	if (!poller->srq) {
726 		if (nvme_rdma_resize_cq(rqpair, poller)) {
727 			nvme_rdma_poll_group_put_poller(group, poller);
728 			return -EPROTO;
729 		}
730 	}
731 
732 	rqpair->cq = poller->cq;
733 	rqpair->srq = poller->srq;
734 	if (rqpair->srq) {
735 		rqpair->rsps = poller->rsps;
736 	}
737 	rqpair->poller = poller;
738 	return 0;
739 }
740 
741 static int
742 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
743 {
744 	int			rc;
745 	struct spdk_rdma_qp_init_attr	attr = {};
746 	struct ibv_device_attr	dev_attr;
747 	struct nvme_rdma_ctrlr	*rctrlr;
748 	uint32_t num_cqe, max_num_cqe;
749 
750 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
751 	if (rc != 0) {
752 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
753 		return -1;
754 	}
755 
756 	if (rqpair->qpair.poll_group) {
757 		assert(!rqpair->cq);
758 		rc = nvme_rdma_qpair_set_poller(&rqpair->qpair);
759 		if (rc) {
760 			SPDK_ERRLOG("Unable to activate the rdmaqpair.\n");
761 			return -1;
762 		}
763 		assert(rqpair->cq);
764 	} else {
765 		num_cqe = rqpair->num_entries * 2;
766 		max_num_cqe = g_spdk_nvme_transport_opts.rdma_max_cq_size;
767 		if (max_num_cqe != 0 && num_cqe > max_num_cqe) {
768 			num_cqe = max_num_cqe;
769 		}
770 		rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, num_cqe, rqpair, NULL, 0);
771 		if (!rqpair->cq) {
772 			SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
773 			return -1;
774 		}
775 	}
776 
777 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
778 	if (g_nvme_hooks.get_ibv_pd) {
779 		attr.pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
780 	} else {
781 		attr.pd = spdk_rdma_get_pd(rqpair->cm_id->verbs);
782 	}
783 
784 	attr.stats =		rqpair->poller ? &rqpair->poller->stats.rdma_stats : NULL;
785 	attr.send_cq		= rqpair->cq;
786 	attr.recv_cq		= rqpair->cq;
787 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
788 	if (rqpair->srq) {
789 		attr.srq	= rqpair->srq->srq;
790 	} else {
791 		attr.cap.max_recv_wr = rqpair->num_entries; /* RECV operations */
792 	}
793 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
794 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
795 
796 	rqpair->rdma_qp = spdk_rdma_qp_create(rqpair->cm_id, &attr);
797 
798 	if (!rqpair->rdma_qp) {
799 		return -1;
800 	}
801 
802 	rqpair->memory_domain = nvme_rdma_get_memory_domain(rqpair->rdma_qp->qp->pd);
803 	if (!rqpair->memory_domain) {
804 		SPDK_ERRLOG("Failed to get memory domain\n");
805 		return -1;
806 	}
807 
808 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
809 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
810 	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
811 	rqpair->current_num_sends = 0;
812 
813 	rqpair->cm_id->context = rqpair;
814 
815 	return 0;
816 }
817 
818 static void
819 nvme_rdma_reset_failed_sends(struct nvme_rdma_qpair *rqpair,
820 			     struct ibv_send_wr *bad_send_wr, int rc)
821 {
822 	SPDK_ERRLOG("Failed to post WRs on send queue, errno %d (%s), bad_wr %p\n",
823 		    rc, spdk_strerror(rc), bad_send_wr);
824 	while (bad_send_wr != NULL) {
825 		assert(rqpair->current_num_sends > 0);
826 		rqpair->current_num_sends--;
827 		bad_send_wr = bad_send_wr->next;
828 	}
829 }
830 
831 static void
832 nvme_rdma_reset_failed_recvs(struct nvme_rdma_rsps *rsps,
833 			     struct ibv_recv_wr *bad_recv_wr, int rc)
834 {
835 	SPDK_ERRLOG("Failed to post WRs on receive queue, errno %d (%s), bad_wr %p\n",
836 		    rc, spdk_strerror(rc), bad_recv_wr);
837 	while (bad_recv_wr != NULL) {
838 		assert(rsps->current_num_recvs > 0);
839 		rsps->current_num_recvs--;
840 		bad_recv_wr = bad_recv_wr->next;
841 	}
842 }
843 
844 static inline int
845 nvme_rdma_qpair_submit_sends(struct nvme_rdma_qpair *rqpair)
846 {
847 	struct ibv_send_wr *bad_send_wr = NULL;
848 	int rc;
849 
850 	rc = spdk_rdma_qp_flush_send_wrs(rqpair->rdma_qp, &bad_send_wr);
851 
852 	if (spdk_unlikely(rc)) {
853 		nvme_rdma_reset_failed_sends(rqpair, bad_send_wr, rc);
854 	}
855 
856 	return rc;
857 }
858 
859 static inline int
860 nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair)
861 {
862 	struct ibv_recv_wr *bad_recv_wr;
863 	int rc = 0;
864 
865 	rc = spdk_rdma_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr);
866 	if (spdk_unlikely(rc)) {
867 		nvme_rdma_reset_failed_recvs(rqpair->rsps, bad_recv_wr, rc);
868 	}
869 
870 	return rc;
871 }
872 
873 static inline int
874 nvme_rdma_poller_submit_recvs(struct nvme_rdma_poller *poller)
875 {
876 	struct ibv_recv_wr *bad_recv_wr;
877 	int rc;
878 
879 	rc = spdk_rdma_srq_flush_recv_wrs(poller->srq, &bad_recv_wr);
880 	if (spdk_unlikely(rc)) {
881 		nvme_rdma_reset_failed_recvs(poller->rsps, bad_recv_wr, rc);
882 	}
883 
884 	return rc;
885 }
886 
887 #define nvme_rdma_trace_ibv_sge(sg_list) \
888 	if (sg_list) { \
889 		SPDK_DEBUGLOG(nvme, "local addr %p length 0x%x lkey 0x%x\n", \
890 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
891 	}
892 
893 static void
894 nvme_rdma_free_rsps(struct nvme_rdma_rsps *rsps)
895 {
896 	if (!rsps) {
897 		return;
898 	}
899 
900 	spdk_free(rsps->rsps);
901 	spdk_free(rsps->rsp_sgls);
902 	spdk_free(rsps->rsp_recv_wrs);
903 	spdk_free(rsps);
904 }
905 
906 static struct nvme_rdma_rsps *
907 nvme_rdma_create_rsps(struct nvme_rdma_rsp_opts *opts)
908 {
909 	struct nvme_rdma_rsps *rsps;
910 	struct spdk_rdma_memory_translation translation;
911 	uint16_t i;
912 	int rc;
913 
914 	rsps = spdk_zmalloc(sizeof(*rsps), 0, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
915 	if (!rsps) {
916 		SPDK_ERRLOG("Failed to allocate rsps object\n");
917 		return NULL;
918 	}
919 
920 	rsps->rsp_sgls = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_sgls), 0, NULL,
921 				      SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
922 	if (!rsps->rsp_sgls) {
923 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
924 		goto fail;
925 	}
926 
927 	rsps->rsp_recv_wrs = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_recv_wrs), 0, NULL,
928 					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
929 	if (!rsps->rsp_recv_wrs) {
930 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
931 		goto fail;
932 	}
933 
934 	rsps->rsps = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsps), 0, NULL,
935 				  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
936 	if (!rsps->rsps) {
937 		SPDK_ERRLOG("can not allocate rdma rsps\n");
938 		goto fail;
939 	}
940 
941 	for (i = 0; i < opts->num_entries; i++) {
942 		struct ibv_sge *rsp_sgl = &rsps->rsp_sgls[i];
943 		struct spdk_nvme_rdma_rsp *rsp = &rsps->rsps[i];
944 		struct ibv_recv_wr *recv_wr = &rsps->rsp_recv_wrs[i];
945 
946 		rsp->rqpair = opts->rqpair;
947 		rsp->rdma_wr.type = RDMA_WR_TYPE_RECV;
948 		rsp->recv_wr = recv_wr;
949 		rsp_sgl->addr = (uint64_t)rsp;
950 		rsp_sgl->length = sizeof(struct spdk_nvme_cpl);
951 		rc = spdk_rdma_get_translation(opts->mr_map, rsp, sizeof(*rsp), &translation);
952 		if (rc) {
953 			goto fail;
954 		}
955 		rsp_sgl->lkey = spdk_rdma_memory_translation_get_lkey(&translation);
956 
957 		recv_wr->wr_id = (uint64_t)&rsp->rdma_wr;
958 		recv_wr->next = NULL;
959 		recv_wr->sg_list = rsp_sgl;
960 		recv_wr->num_sge = 1;
961 
962 		nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
963 
964 		if (opts->rqpair) {
965 			spdk_rdma_qp_queue_recv_wrs(opts->rqpair->rdma_qp, recv_wr);
966 		} else {
967 			spdk_rdma_srq_queue_recv_wrs(opts->srq, recv_wr);
968 		}
969 	}
970 
971 	rsps->num_entries = opts->num_entries;
972 	rsps->current_num_recvs = opts->num_entries;
973 
974 	return rsps;
975 fail:
976 	nvme_rdma_free_rsps(rsps);
977 	return NULL;
978 }
979 
980 static void
981 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
982 {
983 	if (!rqpair->rdma_reqs) {
984 		return;
985 	}
986 
987 	spdk_free(rqpair->cmds);
988 	rqpair->cmds = NULL;
989 
990 	spdk_free(rqpair->rdma_reqs);
991 	rqpair->rdma_reqs = NULL;
992 }
993 
994 static int
995 nvme_rdma_create_reqs(struct nvme_rdma_qpair *rqpair)
996 {
997 	struct spdk_rdma_memory_translation translation;
998 	uint16_t i;
999 	int rc;
1000 
1001 	assert(!rqpair->rdma_reqs);
1002 	rqpair->rdma_reqs = spdk_zmalloc(rqpair->num_entries * sizeof(struct spdk_nvme_rdma_req), 0, NULL,
1003 					 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1004 	if (rqpair->rdma_reqs == NULL) {
1005 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
1006 		goto fail;
1007 	}
1008 
1009 	assert(!rqpair->cmds);
1010 	rqpair->cmds = spdk_zmalloc(rqpair->num_entries * sizeof(*rqpair->cmds), 0, NULL,
1011 				    SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
1012 	if (!rqpair->cmds) {
1013 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
1014 		goto fail;
1015 	}
1016 
1017 	TAILQ_INIT(&rqpair->free_reqs);
1018 	TAILQ_INIT(&rqpair->outstanding_reqs);
1019 	for (i = 0; i < rqpair->num_entries; i++) {
1020 		struct spdk_nvme_rdma_req	*rdma_req;
1021 		struct spdk_nvmf_cmd		*cmd;
1022 
1023 		rdma_req = &rqpair->rdma_reqs[i];
1024 		rdma_req->rdma_wr.type = RDMA_WR_TYPE_SEND;
1025 		cmd = &rqpair->cmds[i];
1026 
1027 		rdma_req->id = i;
1028 
1029 		rc = spdk_rdma_get_translation(rqpair->mr_map, cmd, sizeof(*cmd), &translation);
1030 		if (rc) {
1031 			goto fail;
1032 		}
1033 		rdma_req->send_sgl[0].lkey = spdk_rdma_memory_translation_get_lkey(&translation);
1034 
1035 		/* The first RDMA sgl element will always point
1036 		 * at this data structure. Depending on whether
1037 		 * an NVMe-oF SGL is required, the length of
1038 		 * this element may change. */
1039 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
1040 		rdma_req->send_wr.wr_id = (uint64_t)&rdma_req->rdma_wr;
1041 		rdma_req->send_wr.next = NULL;
1042 		rdma_req->send_wr.opcode = IBV_WR_SEND;
1043 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
1044 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
1045 		rdma_req->send_wr.imm_data = 0;
1046 
1047 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
1048 	}
1049 
1050 	return 0;
1051 fail:
1052 	nvme_rdma_free_reqs(rqpair);
1053 	return -ENOMEM;
1054 }
1055 
1056 static int nvme_rdma_connect(struct nvme_rdma_qpair *rqpair);
1057 
1058 static int
1059 nvme_rdma_route_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1060 {
1061 	if (ret) {
1062 		SPDK_ERRLOG("RDMA route resolution error\n");
1063 		return -1;
1064 	}
1065 
1066 	ret = nvme_rdma_qpair_init(rqpair);
1067 	if (ret < 0) {
1068 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
1069 		return -1;
1070 	}
1071 
1072 	return nvme_rdma_connect(rqpair);
1073 }
1074 
1075 static int
1076 nvme_rdma_addr_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1077 {
1078 	if (ret) {
1079 		SPDK_ERRLOG("RDMA address resolution error\n");
1080 		return -1;
1081 	}
1082 
1083 	if (rqpair->qpair.ctrlr->opts.transport_ack_timeout != SPDK_NVME_TRANSPORT_ACK_TIMEOUT_DISABLED) {
1084 #ifdef SPDK_CONFIG_RDMA_SET_ACK_TIMEOUT
1085 		uint8_t timeout = rqpair->qpair.ctrlr->opts.transport_ack_timeout;
1086 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID,
1087 				      RDMA_OPTION_ID_ACK_TIMEOUT,
1088 				      &timeout, sizeof(timeout));
1089 		if (ret) {
1090 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_ACK_TIMEOUT %d, ret %d\n", timeout, ret);
1091 		}
1092 #else
1093 		SPDK_DEBUGLOG(nvme, "transport_ack_timeout is not supported\n");
1094 #endif
1095 	}
1096 
1097 	if (rqpair->qpair.ctrlr->opts.transport_tos != SPDK_NVME_TRANSPORT_TOS_DISABLED) {
1098 #ifdef SPDK_CONFIG_RDMA_SET_TOS
1099 		uint8_t tos = rqpair->qpair.ctrlr->opts.transport_tos;
1100 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS, &tos, sizeof(tos));
1101 		if (ret) {
1102 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_TOS %u, ret %d\n", tos, ret);
1103 		}
1104 #else
1105 		SPDK_DEBUGLOG(nvme, "transport_tos is not supported\n");
1106 #endif
1107 	}
1108 
1109 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
1110 	if (ret) {
1111 		SPDK_ERRLOG("rdma_resolve_route\n");
1112 		return ret;
1113 	}
1114 
1115 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ROUTE_RESOLVED,
1116 					     nvme_rdma_route_resolved);
1117 }
1118 
1119 static int
1120 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
1121 		       struct sockaddr *src_addr,
1122 		       struct sockaddr *dst_addr)
1123 {
1124 	int ret;
1125 
1126 	if (src_addr) {
1127 		int reuse = 1;
1128 
1129 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
1130 				      &reuse, sizeof(reuse));
1131 		if (ret) {
1132 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_REUSEADDR %d, ret %d\n",
1133 				       reuse, ret);
1134 			/* It is likely that rdma_resolve_addr() returns -EADDRINUSE, but
1135 			 * we may missing something. We rely on rdma_resolve_addr().
1136 			 */
1137 		}
1138 	}
1139 
1140 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
1141 				NVME_RDMA_TIME_OUT_IN_MS);
1142 	if (ret) {
1143 		SPDK_ERRLOG("rdma_resolve_addr, %d\n", errno);
1144 		return ret;
1145 	}
1146 
1147 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ADDR_RESOLVED,
1148 					     nvme_rdma_addr_resolved);
1149 }
1150 
1151 static int nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair);
1152 
1153 static int
1154 nvme_rdma_connect_established(struct nvme_rdma_qpair *rqpair, int ret)
1155 {
1156 	struct nvme_rdma_rsp_opts opts = {};
1157 
1158 	if (ret == -ESTALE) {
1159 		return nvme_rdma_stale_conn_retry(rqpair);
1160 	} else if (ret) {
1161 		SPDK_ERRLOG("RDMA connect error %d\n", ret);
1162 		return ret;
1163 	}
1164 
1165 	assert(!rqpair->mr_map);
1166 	rqpair->mr_map = spdk_rdma_create_mem_map(rqpair->rdma_qp->qp->pd, &g_nvme_hooks,
1167 			 SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR);
1168 	if (!rqpair->mr_map) {
1169 		SPDK_ERRLOG("Unable to register RDMA memory translation map\n");
1170 		return -1;
1171 	}
1172 
1173 	ret = nvme_rdma_create_reqs(rqpair);
1174 	SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1175 	if (ret) {
1176 		SPDK_ERRLOG("Unable to create rqpair RDMA requests\n");
1177 		return -1;
1178 	}
1179 	SPDK_DEBUGLOG(nvme, "RDMA requests created\n");
1180 
1181 	if (!rqpair->srq) {
1182 		opts.num_entries = rqpair->num_entries;
1183 		opts.rqpair = rqpair;
1184 		opts.srq = NULL;
1185 		opts.mr_map = rqpair->mr_map;
1186 
1187 		assert(!rqpair->rsps);
1188 		rqpair->rsps = nvme_rdma_create_rsps(&opts);
1189 		if (!rqpair->rsps) {
1190 			SPDK_ERRLOG("Unable to create rqpair RDMA responses\n");
1191 			return -1;
1192 		}
1193 		SPDK_DEBUGLOG(nvme, "RDMA responses created\n");
1194 
1195 		ret = nvme_rdma_qpair_submit_recvs(rqpair);
1196 		SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1197 		if (ret) {
1198 			SPDK_ERRLOG("Unable to submit rqpair RDMA responses\n");
1199 			return -1;
1200 		}
1201 		SPDK_DEBUGLOG(nvme, "RDMA responses submitted\n");
1202 	}
1203 
1204 	rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND;
1205 
1206 	return 0;
1207 }
1208 
1209 static int
1210 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
1211 {
1212 	struct rdma_conn_param				param = {};
1213 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
1214 	struct ibv_device_attr				attr;
1215 	int						ret;
1216 	struct spdk_nvme_ctrlr				*ctrlr;
1217 
1218 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
1219 	if (ret != 0) {
1220 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1221 		return ret;
1222 	}
1223 
1224 	param.responder_resources = attr.max_qp_rd_atom;
1225 
1226 	ctrlr = rqpair->qpair.ctrlr;
1227 	if (!ctrlr) {
1228 		return -1;
1229 	}
1230 
1231 	request_data.qid = rqpair->qpair.id;
1232 	request_data.hrqsize = rqpair->num_entries + 1;
1233 	request_data.hsqsize = rqpair->num_entries;
1234 	request_data.cntlid = ctrlr->cntlid;
1235 
1236 	param.private_data = &request_data;
1237 	param.private_data_len = sizeof(request_data);
1238 	param.retry_count = ctrlr->opts.transport_retry_count;
1239 	param.rnr_retry_count = 7;
1240 
1241 	/* Fields below are ignored by rdma cm if qpair has been
1242 	 * created using rdma cm API. */
1243 	param.srq = 0;
1244 	param.qp_num = rqpair->rdma_qp->qp->qp_num;
1245 
1246 	ret = rdma_connect(rqpair->cm_id, &param);
1247 	if (ret) {
1248 		SPDK_ERRLOG("nvme rdma connect error\n");
1249 		return ret;
1250 	}
1251 
1252 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ESTABLISHED,
1253 					     nvme_rdma_connect_established);
1254 }
1255 
1256 static int
1257 nvme_rdma_ctrlr_connect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1258 {
1259 	struct sockaddr_storage dst_addr;
1260 	struct sockaddr_storage src_addr;
1261 	bool src_addr_specified;
1262 	long int port, src_port;
1263 	int rc;
1264 	struct nvme_rdma_ctrlr *rctrlr;
1265 	struct nvme_rdma_qpair *rqpair;
1266 	int family;
1267 
1268 	rqpair = nvme_rdma_qpair(qpair);
1269 	rctrlr = nvme_rdma_ctrlr(ctrlr);
1270 	assert(rctrlr != NULL);
1271 
1272 	switch (ctrlr->trid.adrfam) {
1273 	case SPDK_NVMF_ADRFAM_IPV4:
1274 		family = AF_INET;
1275 		break;
1276 	case SPDK_NVMF_ADRFAM_IPV6:
1277 		family = AF_INET6;
1278 		break;
1279 	default:
1280 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
1281 		return -1;
1282 	}
1283 
1284 	SPDK_DEBUGLOG(nvme, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
1285 
1286 	memset(&dst_addr, 0, sizeof(dst_addr));
1287 
1288 	SPDK_DEBUGLOG(nvme, "trsvcid is %s\n", ctrlr->trid.trsvcid);
1289 	rc = nvme_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid, &port);
1290 	if (rc != 0) {
1291 		SPDK_ERRLOG("dst_addr nvme_parse_addr() failed\n");
1292 		return -1;
1293 	}
1294 
1295 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
1296 		memset(&src_addr, 0, sizeof(src_addr));
1297 		rc = nvme_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid, &src_port);
1298 		if (rc != 0) {
1299 			SPDK_ERRLOG("src_addr nvme_parse_addr() failed\n");
1300 			return -1;
1301 		}
1302 		src_addr_specified = true;
1303 	} else {
1304 		src_addr_specified = false;
1305 	}
1306 
1307 	rc = rdma_create_id(rctrlr->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
1308 	if (rc < 0) {
1309 		SPDK_ERRLOG("rdma_create_id() failed\n");
1310 		return -1;
1311 	}
1312 
1313 	rc = nvme_rdma_resolve_addr(rqpair,
1314 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
1315 				    (struct sockaddr *)&dst_addr);
1316 	if (rc < 0) {
1317 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
1318 		return -1;
1319 	}
1320 
1321 	rqpair->state = NVME_RDMA_QPAIR_STATE_INITIALIZING;
1322 
1323 	return 0;
1324 }
1325 
1326 static int
1327 nvme_rdma_stale_conn_reconnect(struct nvme_rdma_qpair *rqpair)
1328 {
1329 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1330 
1331 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks) {
1332 		return -EAGAIN;
1333 	}
1334 
1335 	return nvme_rdma_ctrlr_connect_qpair(qpair->ctrlr, qpair);
1336 }
1337 
1338 static int
1339 nvme_rdma_ctrlr_connect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr,
1340 				   struct spdk_nvme_qpair *qpair)
1341 {
1342 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1343 	int rc;
1344 
1345 	if (rqpair->in_connect_poll) {
1346 		return -EAGAIN;
1347 	}
1348 
1349 	rqpair->in_connect_poll = true;
1350 
1351 	switch (rqpair->state) {
1352 	case NVME_RDMA_QPAIR_STATE_INVALID:
1353 		rc = -EAGAIN;
1354 		break;
1355 
1356 	case NVME_RDMA_QPAIR_STATE_INITIALIZING:
1357 	case NVME_RDMA_QPAIR_STATE_EXITING:
1358 		if (!nvme_qpair_is_admin_queue(qpair)) {
1359 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
1360 		}
1361 
1362 		rc = nvme_rdma_process_event_poll(rqpair);
1363 
1364 		if (!nvme_qpair_is_admin_queue(qpair)) {
1365 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
1366 		}
1367 
1368 		if (rc == 0) {
1369 			rc = -EAGAIN;
1370 		}
1371 		rqpair->in_connect_poll = false;
1372 
1373 		return rc;
1374 
1375 	case NVME_RDMA_QPAIR_STATE_STALE_CONN:
1376 		rc = nvme_rdma_stale_conn_reconnect(rqpair);
1377 		if (rc == 0) {
1378 			rc = -EAGAIN;
1379 		}
1380 		break;
1381 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND:
1382 		rc = nvme_fabric_qpair_connect_async(qpair, rqpair->num_entries + 1);
1383 		if (rc == 0) {
1384 			rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL;
1385 			rc = -EAGAIN;
1386 		} else {
1387 			SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
1388 		}
1389 		break;
1390 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL:
1391 		rc = nvme_fabric_qpair_connect_poll(qpair);
1392 		if (rc == 0) {
1393 			rqpair->state = NVME_RDMA_QPAIR_STATE_RUNNING;
1394 			nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTED);
1395 		} else if (rc != -EAGAIN) {
1396 			SPDK_ERRLOG("Failed to poll NVMe-oF Fabric CONNECT command\n");
1397 		}
1398 		break;
1399 	case NVME_RDMA_QPAIR_STATE_RUNNING:
1400 		rc = 0;
1401 		break;
1402 	default:
1403 		assert(false);
1404 		rc = -EINVAL;
1405 		break;
1406 	}
1407 
1408 	rqpair->in_connect_poll = false;
1409 
1410 	return rc;
1411 }
1412 
1413 static inline int
1414 nvme_rdma_get_memory_translation(struct nvme_request *req, struct nvme_rdma_qpair *rqpair,
1415 				 struct nvme_rdma_memory_translation_ctx *_ctx)
1416 {
1417 	struct spdk_memory_domain_translation_ctx ctx;
1418 	struct spdk_memory_domain_translation_result dma_translation = {.iov_count = 0};
1419 	struct spdk_rdma_memory_translation rdma_translation;
1420 	int rc;
1421 
1422 	assert(req);
1423 	assert(rqpair);
1424 	assert(_ctx);
1425 
1426 	if (req->payload.opts && req->payload.opts->memory_domain) {
1427 		ctx.size = sizeof(struct spdk_memory_domain_translation_ctx);
1428 		ctx.rdma.ibv_qp = rqpair->rdma_qp->qp;
1429 		dma_translation.size = sizeof(struct spdk_memory_domain_translation_result);
1430 
1431 		rc = spdk_memory_domain_translate_data(req->payload.opts->memory_domain,
1432 						       req->payload.opts->memory_domain_ctx,
1433 						       rqpair->memory_domain->domain, &ctx, _ctx->addr,
1434 						       _ctx->length, &dma_translation);
1435 		if (spdk_unlikely(rc) || dma_translation.iov_count != 1) {
1436 			SPDK_ERRLOG("DMA memory translation failed, rc %d, iov count %u\n", rc, dma_translation.iov_count);
1437 			return rc;
1438 		}
1439 
1440 		_ctx->lkey = dma_translation.rdma.lkey;
1441 		_ctx->rkey = dma_translation.rdma.rkey;
1442 		_ctx->addr = dma_translation.iov.iov_base;
1443 		_ctx->length = dma_translation.iov.iov_len;
1444 	} else {
1445 		rc = spdk_rdma_get_translation(rqpair->mr_map, _ctx->addr, _ctx->length, &rdma_translation);
1446 		if (spdk_unlikely(rc)) {
1447 			SPDK_ERRLOG("RDMA memory translation failed, rc %d\n", rc);
1448 			return rc;
1449 		}
1450 		if (rdma_translation.translation_type == SPDK_RDMA_TRANSLATION_MR) {
1451 			_ctx->lkey = rdma_translation.mr_or_key.mr->lkey;
1452 			_ctx->rkey = rdma_translation.mr_or_key.mr->rkey;
1453 		} else {
1454 			_ctx->lkey = _ctx->rkey = (uint32_t)rdma_translation.mr_or_key.key;
1455 		}
1456 	}
1457 
1458 	return 0;
1459 }
1460 
1461 
1462 /*
1463  * Build SGL describing empty payload.
1464  */
1465 static int
1466 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
1467 {
1468 	struct nvme_request *req = rdma_req->req;
1469 
1470 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1471 
1472 	/* The first element of this SGL is pointing at an
1473 	 * spdk_nvmf_cmd object. For this particular command,
1474 	 * we only need the first 64 bytes corresponding to
1475 	 * the NVMe command. */
1476 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1477 
1478 	/* The RDMA SGL needs one element describing the NVMe command. */
1479 	rdma_req->send_wr.num_sge = 1;
1480 
1481 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1482 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1483 	req->cmd.dptr.sgl1.keyed.length = 0;
1484 	req->cmd.dptr.sgl1.keyed.key = 0;
1485 	req->cmd.dptr.sgl1.address = 0;
1486 
1487 	return 0;
1488 }
1489 
1490 /*
1491  * Build inline SGL describing contiguous payload buffer.
1492  */
1493 static int
1494 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
1495 				      struct spdk_nvme_rdma_req *rdma_req)
1496 {
1497 	struct nvme_request *req = rdma_req->req;
1498 	struct nvme_rdma_memory_translation_ctx ctx = {
1499 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1500 		.length = req->payload_size
1501 	};
1502 	int rc;
1503 
1504 	assert(ctx.length != 0);
1505 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1506 
1507 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1508 	if (spdk_unlikely(rc)) {
1509 		return -1;
1510 	}
1511 
1512 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1513 
1514 	/* The first element of this SGL is pointing at an
1515 	 * spdk_nvmf_cmd object. For this particular command,
1516 	 * we only need the first 64 bytes corresponding to
1517 	 * the NVMe command. */
1518 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1519 
1520 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1521 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1522 
1523 	/* The RDMA SGL contains two elements. The first describes
1524 	 * the NVMe command and the second describes the data
1525 	 * payload. */
1526 	rdma_req->send_wr.num_sge = 2;
1527 
1528 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1529 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1530 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1531 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1532 	/* Inline only supported for icdoff == 0 currently.  This function will
1533 	 * not get called for controllers with other values. */
1534 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1535 
1536 	return 0;
1537 }
1538 
1539 /*
1540  * Build SGL describing contiguous payload buffer.
1541  */
1542 static int
1543 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
1544 			       struct spdk_nvme_rdma_req *rdma_req)
1545 {
1546 	struct nvme_request *req = rdma_req->req;
1547 	struct nvme_rdma_memory_translation_ctx ctx = {
1548 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1549 		.length = req->payload_size
1550 	};
1551 	int rc;
1552 
1553 	assert(req->payload_size != 0);
1554 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1555 
1556 	if (spdk_unlikely(req->payload_size > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1557 		SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1558 			    req->payload_size, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1559 		return -1;
1560 	}
1561 
1562 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1563 	if (spdk_unlikely(rc)) {
1564 		return -1;
1565 	}
1566 
1567 	req->cmd.dptr.sgl1.keyed.key = ctx.rkey;
1568 
1569 	/* The first element of this SGL is pointing at an
1570 	 * spdk_nvmf_cmd object. For this particular command,
1571 	 * we only need the first 64 bytes corresponding to
1572 	 * the NVMe command. */
1573 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1574 
1575 	/* The RDMA SGL needs one element describing the NVMe command. */
1576 	rdma_req->send_wr.num_sge = 1;
1577 
1578 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1579 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1580 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1581 	req->cmd.dptr.sgl1.keyed.length = (uint32_t)ctx.length;
1582 	req->cmd.dptr.sgl1.address = (uint64_t)ctx.addr;
1583 
1584 	return 0;
1585 }
1586 
1587 /*
1588  * Build SGL describing scattered payload buffer.
1589  */
1590 static int
1591 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1592 			    struct spdk_nvme_rdma_req *rdma_req)
1593 {
1594 	struct nvme_request *req = rdma_req->req;
1595 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1596 	struct nvme_rdma_memory_translation_ctx ctx;
1597 	uint32_t remaining_size;
1598 	uint32_t sge_length;
1599 	int rc, max_num_sgl, num_sgl_desc;
1600 
1601 	assert(req->payload_size != 0);
1602 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1603 	assert(req->payload.reset_sgl_fn != NULL);
1604 	assert(req->payload.next_sge_fn != NULL);
1605 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1606 
1607 	max_num_sgl = req->qpair->ctrlr->max_sges;
1608 
1609 	remaining_size = req->payload_size;
1610 	num_sgl_desc = 0;
1611 	do {
1612 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &sge_length);
1613 		if (rc) {
1614 			return -1;
1615 		}
1616 
1617 		sge_length = spdk_min(remaining_size, sge_length);
1618 
1619 		if (spdk_unlikely(sge_length > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1620 			SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1621 				    sge_length, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1622 			return -1;
1623 		}
1624 		ctx.length = sge_length;
1625 		rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1626 		if (spdk_unlikely(rc)) {
1627 			return -1;
1628 		}
1629 
1630 		cmd->sgl[num_sgl_desc].keyed.key = ctx.rkey;
1631 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1632 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1633 		cmd->sgl[num_sgl_desc].keyed.length = (uint32_t)ctx.length;
1634 		cmd->sgl[num_sgl_desc].address = (uint64_t)ctx.addr;
1635 
1636 		remaining_size -= ctx.length;
1637 		num_sgl_desc++;
1638 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1639 
1640 
1641 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1642 	if (remaining_size > 0) {
1643 		return -1;
1644 	}
1645 
1646 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1647 
1648 	/* The RDMA SGL needs one element describing some portion
1649 	 * of the spdk_nvmf_cmd structure. */
1650 	rdma_req->send_wr.num_sge = 1;
1651 
1652 	/*
1653 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1654 	 * as a data block descriptor.
1655 	 */
1656 	if (num_sgl_desc == 1) {
1657 		/* The first element of this SGL is pointing at an
1658 		 * spdk_nvmf_cmd object. For this particular command,
1659 		 * we only need the first 64 bytes corresponding to
1660 		 * the NVMe command. */
1661 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1662 
1663 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1664 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1665 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1666 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1667 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1668 	} else {
1669 		/*
1670 		 * Otherwise, The SGL descriptor embedded in the command must point to the list of
1671 		 * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
1672 		 */
1673 		uint32_t descriptors_size = sizeof(struct spdk_nvme_sgl_descriptor) * num_sgl_desc;
1674 
1675 		if (spdk_unlikely(descriptors_size > rqpair->qpair.ctrlr->ioccsz_bytes)) {
1676 			SPDK_ERRLOG("Size of SGL descriptors (%u) exceeds ICD (%u)\n",
1677 				    descriptors_size, rqpair->qpair.ctrlr->ioccsz_bytes);
1678 			return -1;
1679 		}
1680 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + descriptors_size;
1681 
1682 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1683 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1684 		req->cmd.dptr.sgl1.unkeyed.length = descriptors_size;
1685 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1686 	}
1687 
1688 	return 0;
1689 }
1690 
1691 /*
1692  * Build inline SGL describing sgl payload buffer.
1693  */
1694 static int
1695 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1696 				   struct spdk_nvme_rdma_req *rdma_req)
1697 {
1698 	struct nvme_request *req = rdma_req->req;
1699 	struct nvme_rdma_memory_translation_ctx ctx;
1700 	uint32_t length;
1701 	int rc;
1702 
1703 	assert(req->payload_size != 0);
1704 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1705 	assert(req->payload.reset_sgl_fn != NULL);
1706 	assert(req->payload.next_sge_fn != NULL);
1707 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1708 
1709 	rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &length);
1710 	if (rc) {
1711 		return -1;
1712 	}
1713 
1714 	if (length < req->payload_size) {
1715 		SPDK_DEBUGLOG(nvme, "Inline SGL request split so sending separately.\n");
1716 		return nvme_rdma_build_sgl_request(rqpair, rdma_req);
1717 	}
1718 
1719 	if (length > req->payload_size) {
1720 		length = req->payload_size;
1721 	}
1722 
1723 	ctx.length = length;
1724 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1725 	if (spdk_unlikely(rc)) {
1726 		return -1;
1727 	}
1728 
1729 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1730 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1731 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1732 
1733 	rdma_req->send_wr.num_sge = 2;
1734 
1735 	/* The first element of this SGL is pointing at an
1736 	 * spdk_nvmf_cmd object. For this particular command,
1737 	 * we only need the first 64 bytes corresponding to
1738 	 * the NVMe command. */
1739 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1740 
1741 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1742 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1743 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1744 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1745 	/* Inline only supported for icdoff == 0 currently.  This function will
1746 	 * not get called for controllers with other values. */
1747 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1748 
1749 	return 0;
1750 }
1751 
1752 static int
1753 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1754 		   struct spdk_nvme_rdma_req *rdma_req)
1755 {
1756 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1757 	enum nvme_payload_type payload_type;
1758 	bool icd_supported;
1759 	int rc;
1760 
1761 	assert(rdma_req->req == NULL);
1762 	rdma_req->req = req;
1763 	req->cmd.cid = rdma_req->id;
1764 	payload_type = nvme_payload_type(&req->payload);
1765 	/*
1766 	 * Check if icdoff is non zero, to avoid interop conflicts with
1767 	 * targets with non-zero icdoff.  Both SPDK and the Linux kernel
1768 	 * targets use icdoff = 0.  For targets with non-zero icdoff, we
1769 	 * will currently just not use inline data for now.
1770 	 */
1771 	icd_supported = spdk_nvme_opc_get_data_transfer(req->cmd.opc) == SPDK_NVME_DATA_HOST_TO_CONTROLLER
1772 			&& req->payload_size <= ctrlr->ioccsz_bytes && ctrlr->icdoff == 0;
1773 
1774 	if (req->payload_size == 0) {
1775 		rc = nvme_rdma_build_null_request(rdma_req);
1776 	} else if (payload_type == NVME_PAYLOAD_TYPE_CONTIG) {
1777 		if (icd_supported) {
1778 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1779 		} else {
1780 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1781 		}
1782 	} else if (payload_type == NVME_PAYLOAD_TYPE_SGL) {
1783 		if (icd_supported) {
1784 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1785 		} else {
1786 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1787 		}
1788 	} else {
1789 		rc = -1;
1790 	}
1791 
1792 	if (rc) {
1793 		rdma_req->req = NULL;
1794 		return rc;
1795 	}
1796 
1797 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1798 	return 0;
1799 }
1800 
1801 static struct spdk_nvme_qpair *
1802 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1803 			     uint16_t qid, uint32_t qsize,
1804 			     enum spdk_nvme_qprio qprio,
1805 			     uint32_t num_requests,
1806 			     bool delay_cmd_submit,
1807 			     bool async)
1808 {
1809 	struct nvme_rdma_qpair *rqpair;
1810 	struct spdk_nvme_qpair *qpair;
1811 	int rc;
1812 
1813 	if (qsize < SPDK_NVME_QUEUE_MIN_ENTRIES) {
1814 		SPDK_ERRLOG("Failed to create qpair with size %u. Minimum queue size is %d.\n",
1815 			    qsize, SPDK_NVME_QUEUE_MIN_ENTRIES);
1816 		return NULL;
1817 	}
1818 
1819 	rqpair = spdk_zmalloc(sizeof(struct nvme_rdma_qpair), 0, NULL, SPDK_ENV_SOCKET_ID_ANY,
1820 			      SPDK_MALLOC_DMA);
1821 	if (!rqpair) {
1822 		SPDK_ERRLOG("failed to get create rqpair\n");
1823 		return NULL;
1824 	}
1825 
1826 	/* Set num_entries one less than queue size. According to NVMe
1827 	 * and NVMe-oF specs we can not submit queue size requests,
1828 	 * one slot shall always remain empty.
1829 	 */
1830 	rqpair->num_entries = qsize - 1;
1831 	rqpair->delay_cmd_submit = delay_cmd_submit;
1832 	rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID;
1833 	qpair = &rqpair->qpair;
1834 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async);
1835 	if (rc != 0) {
1836 		spdk_free(rqpair);
1837 		return NULL;
1838 	}
1839 
1840 	return qpair;
1841 }
1842 
1843 static void
1844 nvme_rdma_qpair_destroy(struct nvme_rdma_qpair *rqpair)
1845 {
1846 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1847 	struct nvme_rdma_ctrlr *rctrlr;
1848 	struct nvme_rdma_cm_event_entry *entry, *tmp;
1849 
1850 	spdk_rdma_free_mem_map(&rqpair->mr_map);
1851 
1852 	if (rqpair->evt) {
1853 		rdma_ack_cm_event(rqpair->evt);
1854 		rqpair->evt = NULL;
1855 	}
1856 
1857 	/*
1858 	 * This works because we have the controller lock both in
1859 	 * this function and in the function where we add new events.
1860 	 */
1861 	if (qpair->ctrlr != NULL) {
1862 		rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
1863 		STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
1864 			if (entry->evt->id->context == rqpair) {
1865 				STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
1866 				rdma_ack_cm_event(entry->evt);
1867 				STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
1868 			}
1869 		}
1870 	}
1871 
1872 	if (rqpair->cm_id) {
1873 		if (rqpair->rdma_qp) {
1874 			spdk_rdma_put_pd(rqpair->rdma_qp->qp->pd);
1875 			spdk_rdma_qp_destroy(rqpair->rdma_qp);
1876 			rqpair->rdma_qp = NULL;
1877 		}
1878 	}
1879 
1880 	if (rqpair->poller) {
1881 		struct nvme_rdma_poll_group     *group;
1882 
1883 		assert(qpair->poll_group);
1884 		group = nvme_rdma_poll_group(qpair->poll_group);
1885 
1886 		nvme_rdma_poll_group_put_poller(group, rqpair->poller);
1887 
1888 		rqpair->poller = NULL;
1889 		rqpair->cq = NULL;
1890 		if (rqpair->srq) {
1891 			rqpair->srq = NULL;
1892 			rqpair->rsps = NULL;
1893 		}
1894 	} else if (rqpair->cq) {
1895 		ibv_destroy_cq(rqpair->cq);
1896 		rqpair->cq = NULL;
1897 	}
1898 
1899 	nvme_rdma_free_reqs(rqpair);
1900 	nvme_rdma_free_rsps(rqpair->rsps);
1901 	rqpair->rsps = NULL;
1902 
1903 	/* destroy cm_id last so cma device will not be freed before we destroy the cq. */
1904 	if (rqpair->cm_id) {
1905 		rdma_destroy_id(rqpair->cm_id);
1906 		rqpair->cm_id = NULL;
1907 	}
1908 }
1909 
1910 static void nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr);
1911 
1912 static int
1913 nvme_rdma_qpair_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
1914 {
1915 	if (ret) {
1916 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
1917 		goto quiet;
1918 	}
1919 
1920 	if (rqpair->poller == NULL) {
1921 		/* If poller is not used, cq is not shared.
1922 		 * So complete disconnecting qpair immediately.
1923 		 */
1924 		goto quiet;
1925 	}
1926 
1927 	if (rqpair->rsps == NULL) {
1928 		goto quiet;
1929 	}
1930 
1931 	if (rqpair->need_destroy ||
1932 	    (rqpair->current_num_sends != 0 ||
1933 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) {
1934 		rqpair->state = NVME_RDMA_QPAIR_STATE_LINGERING;
1935 		rqpair->evt_timeout_ticks = (NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US * spdk_get_ticks_hz()) /
1936 					    SPDK_SEC_TO_USEC + spdk_get_ticks();
1937 
1938 		return -EAGAIN;
1939 	}
1940 
1941 quiet:
1942 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
1943 
1944 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, 0);
1945 	nvme_rdma_qpair_destroy(rqpair);
1946 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
1947 
1948 	return 0;
1949 }
1950 
1951 static int
1952 nvme_rdma_qpair_wait_until_quiet(struct nvme_rdma_qpair *rqpair)
1953 {
1954 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks &&
1955 	    (rqpair->current_num_sends != 0 ||
1956 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) {
1957 		return -EAGAIN;
1958 	}
1959 
1960 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
1961 
1962 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, 0);
1963 	nvme_rdma_qpair_destroy(rqpair);
1964 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
1965 
1966 	return 0;
1967 }
1968 
1969 static void
1970 _nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair,
1971 				  nvme_rdma_cm_event_cb disconnected_qpair_cb)
1972 {
1973 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1974 	int rc;
1975 
1976 	assert(disconnected_qpair_cb != NULL);
1977 
1978 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITING;
1979 
1980 	if (rqpair->cm_id) {
1981 		if (rqpair->rdma_qp) {
1982 			rc = spdk_rdma_qp_disconnect(rqpair->rdma_qp);
1983 			if ((qpair->ctrlr != NULL) && (rc == 0)) {
1984 				rc = nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_DISCONNECTED,
1985 								   disconnected_qpair_cb);
1986 				if (rc == 0) {
1987 					return;
1988 				}
1989 			}
1990 		}
1991 	}
1992 
1993 	disconnected_qpair_cb(rqpair, 0);
1994 }
1995 
1996 static int
1997 nvme_rdma_ctrlr_disconnect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1998 {
1999 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2000 	int rc;
2001 
2002 	switch (rqpair->state) {
2003 	case NVME_RDMA_QPAIR_STATE_EXITING:
2004 		if (!nvme_qpair_is_admin_queue(qpair)) {
2005 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
2006 		}
2007 
2008 		rc = nvme_rdma_process_event_poll(rqpair);
2009 
2010 		if (!nvme_qpair_is_admin_queue(qpair)) {
2011 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
2012 		}
2013 		break;
2014 
2015 	case NVME_RDMA_QPAIR_STATE_LINGERING:
2016 		rc = nvme_rdma_qpair_wait_until_quiet(rqpair);
2017 		break;
2018 	case NVME_RDMA_QPAIR_STATE_EXITED:
2019 		rc = 0;
2020 		break;
2021 
2022 	default:
2023 		assert(false);
2024 		rc = -EAGAIN;
2025 		break;
2026 	}
2027 
2028 	return rc;
2029 }
2030 
2031 static void
2032 nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2033 {
2034 	int rc;
2035 
2036 	_nvme_rdma_ctrlr_disconnect_qpair(ctrlr, qpair, nvme_rdma_qpair_disconnected);
2037 
2038 	/* If the async mode is disabled, poll the qpair until it is actually disconnected.
2039 	 * It is ensured that poll_group_process_completions() calls disconnected_qpair_cb
2040 	 * for any disconnected qpair. Hence, we do not have to check if the qpair is in
2041 	 * a poll group or not.
2042 	 * At the same time, if the qpair is being destroyed, i.e. this function is called by
2043 	 * spdk_nvme_ctrlr_free_io_qpair then we need to wait until qpair is disconnected, otherwise
2044 	 * we may leak some resources.
2045 	 */
2046 	if (qpair->async && !qpair->destroy_in_progress) {
2047 		return;
2048 	}
2049 
2050 	while (1) {
2051 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(ctrlr, qpair);
2052 		if (rc != -EAGAIN) {
2053 			break;
2054 		}
2055 	}
2056 }
2057 
2058 static int
2059 nvme_rdma_stale_conn_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
2060 {
2061 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2062 
2063 	if (ret) {
2064 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
2065 	}
2066 
2067 	nvme_rdma_qpair_destroy(rqpair);
2068 
2069 	qpair->last_transport_failure_reason = qpair->transport_failure_reason;
2070 	qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_NONE;
2071 
2072 	rqpair->state = NVME_RDMA_QPAIR_STATE_STALE_CONN;
2073 	rqpair->evt_timeout_ticks = (NVME_RDMA_STALE_CONN_RETRY_DELAY_US * spdk_get_ticks_hz()) /
2074 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
2075 
2076 	return 0;
2077 }
2078 
2079 static int
2080 nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair)
2081 {
2082 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2083 
2084 	if (rqpair->stale_conn_retry_count >= NVME_RDMA_STALE_CONN_RETRY_MAX) {
2085 		SPDK_ERRLOG("Retry failed %d times, give up stale connection to qpair (cntlid:%u, qid:%u).\n",
2086 			    NVME_RDMA_STALE_CONN_RETRY_MAX, qpair->ctrlr->cntlid, qpair->id);
2087 		return -ESTALE;
2088 	}
2089 
2090 	rqpair->stale_conn_retry_count++;
2091 
2092 	SPDK_NOTICELOG("%d times, retry stale connection to qpair (cntlid:%u, qid:%u).\n",
2093 		       rqpair->stale_conn_retry_count, qpair->ctrlr->cntlid, qpair->id);
2094 
2095 	_nvme_rdma_ctrlr_disconnect_qpair(qpair->ctrlr, qpair, nvme_rdma_stale_conn_disconnected);
2096 
2097 	return 0;
2098 }
2099 
2100 static int
2101 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2102 {
2103 	struct nvme_rdma_qpair *rqpair;
2104 
2105 	assert(qpair != NULL);
2106 	rqpair = nvme_rdma_qpair(qpair);
2107 
2108 	if (rqpair->state != NVME_RDMA_QPAIR_STATE_EXITED) {
2109 		int rc __attribute__((unused));
2110 
2111 		/* qpair was removed from the poll group while the disconnect is not finished.
2112 		 * Destroy rdma resources forcefully. */
2113 		rc = nvme_rdma_qpair_disconnected(rqpair, 0);
2114 		assert(rc == 0);
2115 	}
2116 
2117 	nvme_rdma_qpair_abort_reqs(qpair, 0);
2118 	nvme_qpair_deinit(qpair);
2119 
2120 	nvme_rdma_put_memory_domain(rqpair->memory_domain);
2121 
2122 	spdk_free(rqpair);
2123 
2124 	return 0;
2125 }
2126 
2127 static struct spdk_nvme_qpair *
2128 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
2129 				const struct spdk_nvme_io_qpair_opts *opts)
2130 {
2131 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
2132 					    opts->io_queue_requests,
2133 					    opts->delay_cmd_submit,
2134 					    opts->async_mode);
2135 }
2136 
2137 static int
2138 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
2139 {
2140 	/* do nothing here */
2141 	return 0;
2142 }
2143 
2144 static int nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr);
2145 
2146 /* We have to use the typedef in the function declaration to appease astyle. */
2147 typedef struct spdk_nvme_ctrlr spdk_nvme_ctrlr_t;
2148 
2149 static spdk_nvme_ctrlr_t *
2150 nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
2151 			  const struct spdk_nvme_ctrlr_opts *opts,
2152 			  void *devhandle)
2153 {
2154 	struct nvme_rdma_ctrlr *rctrlr;
2155 	struct ibv_context **contexts;
2156 	struct ibv_device_attr dev_attr;
2157 	int i, flag, rc;
2158 
2159 	rctrlr = spdk_zmalloc(sizeof(struct nvme_rdma_ctrlr), 0, NULL, SPDK_ENV_SOCKET_ID_ANY,
2160 			      SPDK_MALLOC_DMA);
2161 	if (rctrlr == NULL) {
2162 		SPDK_ERRLOG("could not allocate ctrlr\n");
2163 		return NULL;
2164 	}
2165 
2166 	rctrlr->ctrlr.opts = *opts;
2167 	rctrlr->ctrlr.trid = *trid;
2168 
2169 	if (opts->transport_retry_count > NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT) {
2170 		SPDK_NOTICELOG("transport_retry_count exceeds max value %d, use max value\n",
2171 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT);
2172 		rctrlr->ctrlr.opts.transport_retry_count = NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT;
2173 	}
2174 
2175 	if (opts->transport_ack_timeout > NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT) {
2176 		SPDK_NOTICELOG("transport_ack_timeout exceeds max value %d, use max value\n",
2177 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT);
2178 		rctrlr->ctrlr.opts.transport_ack_timeout = NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT;
2179 	}
2180 
2181 	contexts = rdma_get_devices(NULL);
2182 	if (contexts == NULL) {
2183 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
2184 		spdk_free(rctrlr);
2185 		return NULL;
2186 	}
2187 
2188 	i = 0;
2189 	rctrlr->max_sge = NVME_RDMA_MAX_SGL_DESCRIPTORS;
2190 
2191 	while (contexts[i] != NULL) {
2192 		rc = ibv_query_device(contexts[i], &dev_attr);
2193 		if (rc < 0) {
2194 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
2195 			rdma_free_devices(contexts);
2196 			spdk_free(rctrlr);
2197 			return NULL;
2198 		}
2199 		rctrlr->max_sge = spdk_min(rctrlr->max_sge, (uint16_t)dev_attr.max_sge);
2200 		i++;
2201 	}
2202 
2203 	rdma_free_devices(contexts);
2204 
2205 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
2206 	if (rc != 0) {
2207 		spdk_free(rctrlr);
2208 		return NULL;
2209 	}
2210 
2211 	STAILQ_INIT(&rctrlr->pending_cm_events);
2212 	STAILQ_INIT(&rctrlr->free_cm_events);
2213 	rctrlr->cm_events = spdk_zmalloc(NVME_RDMA_NUM_CM_EVENTS * sizeof(*rctrlr->cm_events), 0, NULL,
2214 					 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
2215 	if (rctrlr->cm_events == NULL) {
2216 		SPDK_ERRLOG("unable to allocate buffers to hold CM events.\n");
2217 		goto destruct_ctrlr;
2218 	}
2219 
2220 	for (i = 0; i < NVME_RDMA_NUM_CM_EVENTS; i++) {
2221 		STAILQ_INSERT_TAIL(&rctrlr->free_cm_events, &rctrlr->cm_events[i], link);
2222 	}
2223 
2224 	rctrlr->cm_channel = rdma_create_event_channel();
2225 	if (rctrlr->cm_channel == NULL) {
2226 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
2227 		goto destruct_ctrlr;
2228 	}
2229 
2230 	flag = fcntl(rctrlr->cm_channel->fd, F_GETFL);
2231 	if (fcntl(rctrlr->cm_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
2232 		SPDK_ERRLOG("Cannot set event channel to non blocking\n");
2233 		goto destruct_ctrlr;
2234 	}
2235 
2236 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
2237 			       rctrlr->ctrlr.opts.admin_queue_size, 0,
2238 			       rctrlr->ctrlr.opts.admin_queue_size, false, true);
2239 	if (!rctrlr->ctrlr.adminq) {
2240 		SPDK_ERRLOG("failed to create admin qpair\n");
2241 		goto destruct_ctrlr;
2242 	}
2243 
2244 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
2245 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
2246 		goto destruct_ctrlr;
2247 	}
2248 
2249 	SPDK_DEBUGLOG(nvme, "successfully initialized the nvmf ctrlr\n");
2250 	return &rctrlr->ctrlr;
2251 
2252 destruct_ctrlr:
2253 	nvme_ctrlr_destruct(&rctrlr->ctrlr);
2254 	return NULL;
2255 }
2256 
2257 static int
2258 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
2259 {
2260 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2261 	struct nvme_rdma_cm_event_entry *entry;
2262 
2263 	if (ctrlr->adminq) {
2264 		nvme_rdma_ctrlr_delete_io_qpair(ctrlr, ctrlr->adminq);
2265 	}
2266 
2267 	STAILQ_FOREACH(entry, &rctrlr->pending_cm_events, link) {
2268 		rdma_ack_cm_event(entry->evt);
2269 	}
2270 
2271 	STAILQ_INIT(&rctrlr->free_cm_events);
2272 	STAILQ_INIT(&rctrlr->pending_cm_events);
2273 	spdk_free(rctrlr->cm_events);
2274 
2275 	if (rctrlr->cm_channel) {
2276 		rdma_destroy_event_channel(rctrlr->cm_channel);
2277 		rctrlr->cm_channel = NULL;
2278 	}
2279 
2280 	nvme_ctrlr_destruct_finish(ctrlr);
2281 
2282 	spdk_free(rctrlr);
2283 
2284 	return 0;
2285 }
2286 
2287 static int
2288 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
2289 			       struct nvme_request *req)
2290 {
2291 	struct nvme_rdma_qpair *rqpair;
2292 	struct spdk_nvme_rdma_req *rdma_req;
2293 	struct ibv_send_wr *wr;
2294 
2295 	rqpair = nvme_rdma_qpair(qpair);
2296 	assert(rqpair != NULL);
2297 	assert(req != NULL);
2298 
2299 	rdma_req = nvme_rdma_req_get(rqpair);
2300 	if (spdk_unlikely(!rdma_req)) {
2301 		if (rqpair->poller) {
2302 			rqpair->poller->stats.queued_requests++;
2303 		}
2304 		/* Inform the upper layer to try again later. */
2305 		return -EAGAIN;
2306 	}
2307 
2308 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
2309 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
2310 		TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
2311 		nvme_rdma_req_put(rqpair, rdma_req);
2312 		return -1;
2313 	}
2314 
2315 	assert(rqpair->current_num_sends < rqpair->num_entries);
2316 	rqpair->current_num_sends++;
2317 
2318 	wr = &rdma_req->send_wr;
2319 	wr->next = NULL;
2320 	nvme_rdma_trace_ibv_sge(wr->sg_list);
2321 
2322 	spdk_rdma_qp_queue_send_wrs(rqpair->rdma_qp, wr);
2323 
2324 	if (!rqpair->delay_cmd_submit) {
2325 		return nvme_rdma_qpair_submit_sends(rqpair);
2326 	}
2327 
2328 	return 0;
2329 }
2330 
2331 static int
2332 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
2333 {
2334 	/* Currently, doing nothing here */
2335 	return 0;
2336 }
2337 
2338 static void
2339 nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr)
2340 {
2341 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2342 	struct spdk_nvme_cpl cpl;
2343 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2344 
2345 	cpl.sqid = qpair->id;
2346 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2347 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2348 	cpl.status.dnr = dnr;
2349 
2350 	/*
2351 	 * We cannot abort requests at the RDMA layer without
2352 	 * unregistering them. If we do, we can still get error
2353 	 * free completions on the shared completion queue.
2354 	 */
2355 	if (nvme_qpair_get_state(qpair) > NVME_QPAIR_DISCONNECTING &&
2356 	    nvme_qpair_get_state(qpair) != NVME_QPAIR_DESTROYING) {
2357 		nvme_ctrlr_disconnect_qpair(qpair);
2358 	}
2359 
2360 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2361 		nvme_rdma_req_complete(rdma_req, &cpl, true);
2362 	}
2363 }
2364 
2365 static void
2366 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
2367 {
2368 	uint64_t t02;
2369 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2370 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2371 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
2372 	struct spdk_nvme_ctrlr_process *active_proc;
2373 
2374 	/* Don't check timeouts during controller initialization. */
2375 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
2376 		return;
2377 	}
2378 
2379 	if (nvme_qpair_is_admin_queue(qpair)) {
2380 		active_proc = nvme_ctrlr_get_current_process(ctrlr);
2381 	} else {
2382 		active_proc = qpair->active_proc;
2383 	}
2384 
2385 	/* Only check timeouts if the current process has a timeout callback. */
2386 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
2387 		return;
2388 	}
2389 
2390 	t02 = spdk_get_ticks();
2391 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2392 		assert(rdma_req->req != NULL);
2393 
2394 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
2395 			/*
2396 			 * The requests are in order, so as soon as one has not timed out,
2397 			 * stop iterating.
2398 			 */
2399 			break;
2400 		}
2401 	}
2402 }
2403 
2404 static inline void
2405 nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
2406 {
2407 	struct spdk_nvme_rdma_rsp *rdma_rsp = rdma_req->rdma_rsp;
2408 	struct ibv_recv_wr *recv_wr = rdma_rsp->recv_wr;
2409 
2410 	nvme_rdma_req_complete(rdma_req, &rdma_rsp->cpl, true);
2411 
2412 	assert(rqpair->rsps->current_num_recvs < rqpair->rsps->num_entries);
2413 	rqpair->rsps->current_num_recvs++;
2414 
2415 	recv_wr->next = NULL;
2416 	nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
2417 
2418 	if (!rqpair->srq) {
2419 		spdk_rdma_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr);
2420 	} else {
2421 		spdk_rdma_srq_queue_recv_wrs(rqpair->srq, recv_wr);
2422 	}
2423 }
2424 
2425 #define MAX_COMPLETIONS_PER_POLL 128
2426 
2427 static void
2428 nvme_rdma_fail_qpair(struct spdk_nvme_qpair *qpair, int failure_reason)
2429 {
2430 	if (failure_reason == IBV_WC_RETRY_EXC_ERR) {
2431 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
2432 	} else if (qpair->transport_failure_reason == SPDK_NVME_QPAIR_FAILURE_NONE) {
2433 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_UNKNOWN;
2434 	}
2435 
2436 	nvme_ctrlr_disconnect_qpair(qpair);
2437 }
2438 
2439 static struct nvme_rdma_qpair *
2440 get_rdma_qpair_from_wc(struct nvme_rdma_poll_group *group, struct ibv_wc *wc)
2441 {
2442 	struct spdk_nvme_qpair *qpair;
2443 	struct nvme_rdma_qpair *rqpair;
2444 
2445 	STAILQ_FOREACH(qpair, &group->group.connected_qpairs, poll_group_stailq) {
2446 		rqpair = nvme_rdma_qpair(qpair);
2447 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2448 			return rqpair;
2449 		}
2450 	}
2451 
2452 	STAILQ_FOREACH(qpair, &group->group.disconnected_qpairs, poll_group_stailq) {
2453 		rqpair = nvme_rdma_qpair(qpair);
2454 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2455 			return rqpair;
2456 		}
2457 	}
2458 
2459 	return NULL;
2460 }
2461 
2462 static inline void
2463 nvme_rdma_log_wc_status(struct nvme_rdma_qpair *rqpair, struct ibv_wc *wc)
2464 {
2465 	struct nvme_rdma_wr *rdma_wr = (struct nvme_rdma_wr *)wc->wr_id;
2466 
2467 	if (wc->status == IBV_WC_WR_FLUSH_ERR) {
2468 		/* If qpair is in ERR state, we will receive completions for all posted and not completed
2469 		 * Work Requests with IBV_WC_WR_FLUSH_ERR status. Don't log an error in that case */
2470 		SPDK_DEBUGLOG(nvme, "WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2471 			      rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2472 			      ibv_wc_status_str(wc->status));
2473 	} else {
2474 		SPDK_ERRLOG("WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2475 			    rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2476 			    ibv_wc_status_str(wc->status));
2477 	}
2478 }
2479 
2480 static inline int
2481 nvme_rdma_process_recv_completion(struct nvme_rdma_poller *poller, struct ibv_wc *wc,
2482 				  struct nvme_rdma_wr *rdma_wr)
2483 {
2484 	struct nvme_rdma_qpair		*rqpair;
2485 	struct spdk_nvme_rdma_req	*rdma_req;
2486 	struct spdk_nvme_rdma_rsp	*rdma_rsp;
2487 
2488 	rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
2489 
2490 	if (poller && poller->srq) {
2491 		rqpair = get_rdma_qpair_from_wc(poller->group, wc);
2492 		if (spdk_unlikely(!rqpair)) {
2493 			/* Since we do not handle the LAST_WQE_REACHED event, we do not know when
2494 			 * a Receive Queue in a QP, that is associated with an SRQ, is flushed.
2495 			 * We may get a WC for a already destroyed QP.
2496 			 *
2497 			 * However, for the SRQ, this is not any error. Hence, just re-post the
2498 			 * receive request to the SRQ to reuse for other QPs, and return 0.
2499 			 */
2500 			spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2501 			return 0;
2502 		}
2503 	} else {
2504 		rqpair = rdma_rsp->rqpair;
2505 		if (spdk_unlikely(!rqpair)) {
2506 			/* TODO: Fix forceful QP destroy when it is not async mode.
2507 			 * CQ itself did not cause any error. Hence, return 0 for now.
2508 			 */
2509 			SPDK_WARNLOG("QP might be already destroyed.\n");
2510 			return 0;
2511 		}
2512 	}
2513 
2514 
2515 	assert(rqpair->rsps->current_num_recvs > 0);
2516 	rqpair->rsps->current_num_recvs--;
2517 
2518 	if (wc->status) {
2519 		nvme_rdma_log_wc_status(rqpair, wc);
2520 		goto err_wc;
2521 	}
2522 
2523 	SPDK_DEBUGLOG(nvme, "CQ recv completion\n");
2524 
2525 	if (wc->byte_len < sizeof(struct spdk_nvme_cpl)) {
2526 		SPDK_ERRLOG("recv length %u less than expected response size\n", wc->byte_len);
2527 		goto err_wc;
2528 	}
2529 	rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
2530 	rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
2531 	rdma_req->rdma_rsp = rdma_rsp;
2532 
2533 	if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) == 0) {
2534 		return 0;
2535 	}
2536 
2537 	nvme_rdma_request_ready(rqpair, rdma_req);
2538 
2539 	if (!rqpair->delay_cmd_submit) {
2540 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2541 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2542 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2543 			return -ENXIO;
2544 		}
2545 	}
2546 
2547 	rqpair->num_completions++;
2548 	return 1;
2549 
2550 err_wc:
2551 	nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2552 	if (poller && poller->srq) {
2553 		spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2554 	}
2555 	return -ENXIO;
2556 }
2557 
2558 static inline int
2559 nvme_rdma_process_send_completion(struct nvme_rdma_poller *poller,
2560 				  struct nvme_rdma_qpair *rdma_qpair,
2561 				  struct ibv_wc *wc, struct nvme_rdma_wr *rdma_wr)
2562 {
2563 	struct nvme_rdma_qpair		*rqpair;
2564 	struct spdk_nvme_rdma_req	*rdma_req;
2565 
2566 	rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_req, rdma_wr);
2567 	rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL;
2568 	if (!rqpair) {
2569 		rqpair = rdma_qpair != NULL ? rdma_qpair : get_rdma_qpair_from_wc(poller->group, wc);
2570 	}
2571 
2572 	/* If we are flushing I/O */
2573 	if (wc->status) {
2574 		if (!rqpair) {
2575 			/* When poll_group is used, several qpairs share the same CQ and it is possible to
2576 			 * receive a completion with error (e.g. IBV_WC_WR_FLUSH_ERR) for already disconnected qpair
2577 			 * That happens due to qpair is destroyed while there are submitted but not completed send/receive
2578 			 * Work Requests */
2579 			assert(poller);
2580 			return 0;
2581 		}
2582 		assert(rqpair->current_num_sends > 0);
2583 		rqpair->current_num_sends--;
2584 		nvme_rdma_log_wc_status(rqpair, wc);
2585 		nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2586 		if (rdma_req->rdma_rsp && poller && poller->srq) {
2587 			spdk_rdma_srq_queue_recv_wrs(poller->srq, rdma_req->rdma_rsp->recv_wr);
2588 		}
2589 		return -ENXIO;
2590 	}
2591 
2592 	/* We do not support Soft Roce anymore. Other than Soft Roce's bug, we should not
2593 	 * receive a completion without error status after qpair is disconnected/destroyed.
2594 	 */
2595 	if (spdk_unlikely(rdma_req->req == NULL)) {
2596 		/*
2597 		 * Some infiniband drivers do not guarantee the previous assumption after we
2598 		 * received a RDMA_CM_EVENT_DEVICE_REMOVAL event.
2599 		 */
2600 		SPDK_ERRLOG("Received malformed completion: request 0x%"PRIx64" type %d\n", wc->wr_id,
2601 			    rdma_wr->type);
2602 		if (!rqpair || !rqpair->need_destroy) {
2603 			assert(0);
2604 		}
2605 		return -ENXIO;
2606 	}
2607 
2608 	rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
2609 	assert(rqpair->current_num_sends > 0);
2610 	rqpair->current_num_sends--;
2611 
2612 	if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) == 0) {
2613 		return 0;
2614 	}
2615 
2616 	nvme_rdma_request_ready(rqpair, rdma_req);
2617 
2618 	if (!rqpair->delay_cmd_submit) {
2619 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2620 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2621 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2622 			return -ENXIO;
2623 		}
2624 	}
2625 
2626 	rqpair->num_completions++;
2627 	return 1;
2628 }
2629 
2630 static int
2631 nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
2632 				 struct nvme_rdma_poller *poller,
2633 				 struct nvme_rdma_qpair *rdma_qpair,
2634 				 uint64_t *rdma_completions)
2635 {
2636 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
2637 	struct nvme_rdma_wr		*rdma_wr;
2638 	uint32_t			reaped = 0;
2639 	int				completion_rc = 0;
2640 	int				rc, _rc, i;
2641 
2642 	rc = ibv_poll_cq(cq, batch_size, wc);
2643 	if (rc < 0) {
2644 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
2645 			    errno, spdk_strerror(errno));
2646 		return -ECANCELED;
2647 	} else if (rc == 0) {
2648 		return 0;
2649 	}
2650 
2651 	for (i = 0; i < rc; i++) {
2652 		rdma_wr = (struct nvme_rdma_wr *)wc[i].wr_id;
2653 		switch (rdma_wr->type) {
2654 		case RDMA_WR_TYPE_RECV:
2655 			_rc = nvme_rdma_process_recv_completion(poller, &wc[i], rdma_wr);
2656 			break;
2657 
2658 		case RDMA_WR_TYPE_SEND:
2659 			_rc = nvme_rdma_process_send_completion(poller, rdma_qpair, &wc[i], rdma_wr);
2660 			break;
2661 
2662 		default:
2663 			SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", rdma_wr->type);
2664 			return -ECANCELED;
2665 		}
2666 		if (spdk_likely(_rc >= 0)) {
2667 			reaped += _rc;
2668 		} else {
2669 			completion_rc = _rc;
2670 		}
2671 	}
2672 
2673 	*rdma_completions += rc;
2674 
2675 	if (completion_rc) {
2676 		return completion_rc;
2677 	}
2678 
2679 	return reaped;
2680 }
2681 
2682 static void
2683 dummy_disconnected_qpair_cb(struct spdk_nvme_qpair *qpair, void *poll_group_ctx)
2684 {
2685 
2686 }
2687 
2688 static int
2689 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
2690 				    uint32_t max_completions)
2691 {
2692 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
2693 	struct nvme_rdma_ctrlr		*rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
2694 	int				rc = 0, batch_size;
2695 	struct ibv_cq			*cq;
2696 	uint64_t			rdma_completions = 0;
2697 
2698 	/*
2699 	 * This is used during the connection phase. It's possible that we are still reaping error completions
2700 	 * from other qpairs so we need to call the poll group function. Also, it's more correct since the cq
2701 	 * is shared.
2702 	 */
2703 	if (qpair->poll_group != NULL) {
2704 		return spdk_nvme_poll_group_process_completions(qpair->poll_group->group, max_completions,
2705 				dummy_disconnected_qpair_cb);
2706 	}
2707 
2708 	if (max_completions == 0) {
2709 		max_completions = rqpair->num_entries;
2710 	} else {
2711 		max_completions = spdk_min(max_completions, rqpair->num_entries);
2712 	}
2713 
2714 	switch (nvme_qpair_get_state(qpair)) {
2715 	case NVME_QPAIR_CONNECTING:
2716 		rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
2717 		if (rc == 0) {
2718 			/* Once the connection is completed, we can submit queued requests */
2719 			nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
2720 		} else if (rc != -EAGAIN) {
2721 			SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
2722 			goto failed;
2723 		} else if (rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING) {
2724 			return 0;
2725 		}
2726 		break;
2727 
2728 	case NVME_QPAIR_DISCONNECTING:
2729 		nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
2730 		return -ENXIO;
2731 
2732 	default:
2733 		if (nvme_qpair_is_admin_queue(qpair)) {
2734 			nvme_rdma_poll_events(rctrlr);
2735 		}
2736 		nvme_rdma_qpair_process_cm_event(rqpair);
2737 		break;
2738 	}
2739 
2740 	if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
2741 		goto failed;
2742 	}
2743 
2744 	cq = rqpair->cq;
2745 
2746 	rqpair->num_completions = 0;
2747 	do {
2748 		batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
2749 		rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL, rqpair, &rdma_completions);
2750 
2751 		if (rc == 0) {
2752 			break;
2753 			/* Handle the case where we fail to poll the cq. */
2754 		} else if (rc == -ECANCELED) {
2755 			goto failed;
2756 		} else if (rc == -ENXIO) {
2757 			return rc;
2758 		}
2759 	} while (rqpair->num_completions < max_completions);
2760 
2761 	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
2762 			  nvme_rdma_qpair_submit_recvs(rqpair))) {
2763 		goto failed;
2764 	}
2765 
2766 	if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
2767 		nvme_rdma_qpair_check_timeout(qpair);
2768 	}
2769 
2770 	return rqpair->num_completions;
2771 
2772 failed:
2773 	nvme_rdma_fail_qpair(qpair, 0);
2774 	return -ENXIO;
2775 }
2776 
2777 static uint32_t
2778 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
2779 {
2780 	/* max_mr_size by ibv_query_device indicates the largest value that we can
2781 	 * set for a registered memory region.  It is independent from the actual
2782 	 * I/O size and is very likely to be larger than 2 MiB which is the
2783 	 * granularity we currently register memory regions.  Hence return
2784 	 * UINT32_MAX here and let the generic layer use the controller data to
2785 	 * moderate this value.
2786 	 */
2787 	return UINT32_MAX;
2788 }
2789 
2790 static uint16_t
2791 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
2792 {
2793 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2794 	uint32_t max_sge = rctrlr->max_sge;
2795 	uint32_t max_in_capsule_sge = (ctrlr->cdata.nvmf_specific.ioccsz * 16 -
2796 				       sizeof(struct spdk_nvme_cmd)) /
2797 				      sizeof(struct spdk_nvme_sgl_descriptor);
2798 
2799 	/* Max SGE is limited by capsule size */
2800 	max_sge = spdk_min(max_sge, max_in_capsule_sge);
2801 	/* Max SGE may be limited by MSDBD */
2802 	if (ctrlr->cdata.nvmf_specific.msdbd != 0) {
2803 		max_sge = spdk_min(max_sge, ctrlr->cdata.nvmf_specific.msdbd);
2804 	}
2805 
2806 	/* Max SGE can't be less than 1 */
2807 	max_sge = spdk_max(1, max_sge);
2808 	return max_sge;
2809 }
2810 
2811 static int
2812 nvme_rdma_qpair_iterate_requests(struct spdk_nvme_qpair *qpair,
2813 				 int (*iter_fn)(struct nvme_request *req, void *arg),
2814 				 void *arg)
2815 {
2816 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2817 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2818 	int rc;
2819 
2820 	assert(iter_fn != NULL);
2821 
2822 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2823 		assert(rdma_req->req != NULL);
2824 
2825 		rc = iter_fn(rdma_req->req, arg);
2826 		if (rc != 0) {
2827 			return rc;
2828 		}
2829 	}
2830 
2831 	return 0;
2832 }
2833 
2834 static void
2835 nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
2836 {
2837 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2838 	struct spdk_nvme_cpl cpl;
2839 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2840 
2841 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2842 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2843 
2844 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2845 		assert(rdma_req->req != NULL);
2846 
2847 		if (rdma_req->req->cmd.opc != SPDK_NVME_OPC_ASYNC_EVENT_REQUEST) {
2848 			continue;
2849 		}
2850 
2851 		nvme_rdma_req_complete(rdma_req, &cpl, false);
2852 	}
2853 }
2854 
2855 static void
2856 nvme_rdma_poller_destroy(struct nvme_rdma_poller *poller)
2857 {
2858 	if (poller->cq) {
2859 		ibv_destroy_cq(poller->cq);
2860 	}
2861 	if (poller->rsps) {
2862 		nvme_rdma_free_rsps(poller->rsps);
2863 	}
2864 	if (poller->srq) {
2865 		spdk_rdma_srq_destroy(poller->srq);
2866 	}
2867 	if (poller->mr_map) {
2868 		spdk_rdma_free_mem_map(&poller->mr_map);
2869 	}
2870 	if (poller->pd) {
2871 		spdk_rdma_put_pd(poller->pd);
2872 	}
2873 	free(poller);
2874 }
2875 
2876 static struct nvme_rdma_poller *
2877 nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context *ctx)
2878 {
2879 	struct nvme_rdma_poller *poller;
2880 	struct ibv_device_attr dev_attr;
2881 	struct spdk_rdma_srq_init_attr srq_init_attr = {};
2882 	struct nvme_rdma_rsp_opts opts;
2883 	int num_cqe, max_num_cqe;
2884 	int rc;
2885 
2886 	poller = calloc(1, sizeof(*poller));
2887 	if (poller == NULL) {
2888 		SPDK_ERRLOG("Unable to allocate poller.\n");
2889 		return NULL;
2890 	}
2891 
2892 	poller->group = group;
2893 	poller->device = ctx;
2894 
2895 	if (g_spdk_nvme_transport_opts.rdma_srq_size != 0) {
2896 		rc = ibv_query_device(ctx, &dev_attr);
2897 		if (rc) {
2898 			SPDK_ERRLOG("Unable to query RDMA device.\n");
2899 			goto fail;
2900 		}
2901 
2902 		poller->pd = spdk_rdma_get_pd(ctx);
2903 		if (poller->pd == NULL) {
2904 			SPDK_ERRLOG("Unable to get PD.\n");
2905 			goto fail;
2906 		}
2907 
2908 		poller->mr_map = spdk_rdma_create_mem_map(poller->pd, &g_nvme_hooks,
2909 				 SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR);
2910 		if (poller->mr_map == NULL) {
2911 			SPDK_ERRLOG("Unable to create memory map.\n");
2912 			goto fail;
2913 		}
2914 
2915 		srq_init_attr.stats = &poller->stats.rdma_stats.recv;
2916 		srq_init_attr.pd = poller->pd;
2917 		srq_init_attr.srq_init_attr.attr.max_wr = spdk_min((uint32_t)dev_attr.max_srq_wr,
2918 				g_spdk_nvme_transport_opts.rdma_srq_size);
2919 		srq_init_attr.srq_init_attr.attr.max_sge = spdk_min(dev_attr.max_sge,
2920 				NVME_RDMA_DEFAULT_RX_SGE);
2921 
2922 		poller->srq = spdk_rdma_srq_create(&srq_init_attr);
2923 		if (poller->srq == NULL) {
2924 			SPDK_ERRLOG("Unable to create SRQ.\n");
2925 			goto fail;
2926 		}
2927 
2928 		opts.num_entries = g_spdk_nvme_transport_opts.rdma_srq_size;
2929 		opts.rqpair = NULL;
2930 		opts.srq = poller->srq;
2931 		opts.mr_map = poller->mr_map;
2932 
2933 		poller->rsps = nvme_rdma_create_rsps(&opts);
2934 		if (poller->rsps == NULL) {
2935 			SPDK_ERRLOG("Unable to create poller RDMA responses.\n");
2936 			goto fail;
2937 		}
2938 
2939 		rc = nvme_rdma_poller_submit_recvs(poller);
2940 		if (rc) {
2941 			SPDK_ERRLOG("Unable to submit poller RDMA responses.\n");
2942 			goto fail;
2943 		}
2944 
2945 		/*
2946 		 * When using an srq, fix the size of the completion queue at startup.
2947 		 * The initiator sends only send and recv WRs. Hence, the multiplier is 2.
2948 		 * (The target sends also data WRs. Hence, the multiplier is 3.)
2949 		 */
2950 		num_cqe = g_spdk_nvme_transport_opts.rdma_srq_size * 2;
2951 	} else {
2952 		num_cqe = DEFAULT_NVME_RDMA_CQ_SIZE;
2953 	}
2954 
2955 	max_num_cqe = g_spdk_nvme_transport_opts.rdma_max_cq_size;
2956 	if (max_num_cqe != 0 && num_cqe > max_num_cqe) {
2957 		num_cqe = max_num_cqe;
2958 	}
2959 
2960 	poller->cq = ibv_create_cq(poller->device, num_cqe, group, NULL, 0);
2961 
2962 	if (poller->cq == NULL) {
2963 		SPDK_ERRLOG("Unable to create CQ, errno %d.\n", errno);
2964 		goto fail;
2965 	}
2966 
2967 	STAILQ_INSERT_HEAD(&group->pollers, poller, link);
2968 	group->num_pollers++;
2969 	poller->current_num_wc = num_cqe;
2970 	poller->required_num_wc = 0;
2971 	return poller;
2972 
2973 fail:
2974 	nvme_rdma_poller_destroy(poller);
2975 	return NULL;
2976 }
2977 
2978 static void
2979 nvme_rdma_poll_group_free_pollers(struct nvme_rdma_poll_group *group)
2980 {
2981 	struct nvme_rdma_poller	*poller, *tmp_poller;
2982 
2983 	STAILQ_FOREACH_SAFE(poller, &group->pollers, link, tmp_poller) {
2984 		assert(poller->refcnt == 0);
2985 		if (poller->refcnt) {
2986 			SPDK_WARNLOG("Destroying poller with non-zero ref count: poller %p, refcnt %d\n",
2987 				     poller, poller->refcnt);
2988 		}
2989 
2990 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
2991 		nvme_rdma_poller_destroy(poller);
2992 	}
2993 }
2994 
2995 static struct nvme_rdma_poller *
2996 nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group, struct ibv_context *device)
2997 {
2998 	struct nvme_rdma_poller *poller = NULL;
2999 
3000 	STAILQ_FOREACH(poller, &group->pollers, link) {
3001 		if (poller->device == device) {
3002 			break;
3003 		}
3004 	}
3005 
3006 	if (!poller) {
3007 		poller = nvme_rdma_poller_create(group, device);
3008 		if (!poller) {
3009 			SPDK_ERRLOG("Failed to create a poller for device %p\n", device);
3010 			return NULL;
3011 		}
3012 	}
3013 
3014 	poller->refcnt++;
3015 	return poller;
3016 }
3017 
3018 static void
3019 nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group, struct nvme_rdma_poller *poller)
3020 {
3021 	assert(poller->refcnt > 0);
3022 	if (--poller->refcnt == 0) {
3023 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
3024 		group->num_pollers--;
3025 		nvme_rdma_poller_destroy(poller);
3026 	}
3027 }
3028 
3029 static struct spdk_nvme_transport_poll_group *
3030 nvme_rdma_poll_group_create(void)
3031 {
3032 	struct nvme_rdma_poll_group	*group;
3033 
3034 	group = calloc(1, sizeof(*group));
3035 	if (group == NULL) {
3036 		SPDK_ERRLOG("Unable to allocate poll group.\n");
3037 		return NULL;
3038 	}
3039 
3040 	STAILQ_INIT(&group->pollers);
3041 	return &group->group;
3042 }
3043 
3044 static int
3045 nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
3046 {
3047 	return 0;
3048 }
3049 
3050 static int
3051 nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
3052 {
3053 	return 0;
3054 }
3055 
3056 static int
3057 nvme_rdma_poll_group_add(struct spdk_nvme_transport_poll_group *tgroup,
3058 			 struct spdk_nvme_qpair *qpair)
3059 {
3060 	return 0;
3061 }
3062 
3063 static int
3064 nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
3065 			    struct spdk_nvme_qpair *qpair)
3066 {
3067 	return 0;
3068 }
3069 
3070 static int64_t
3071 nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup,
3072 		uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
3073 {
3074 	struct spdk_nvme_qpair			*qpair, *tmp_qpair;
3075 	struct nvme_rdma_qpair			*rqpair;
3076 	struct nvme_rdma_poll_group		*group;
3077 	struct nvme_rdma_poller			*poller;
3078 	int					num_qpairs = 0, batch_size, rc, rc2 = 0;
3079 	int64_t					total_completions = 0;
3080 	uint64_t				completions_allowed = 0;
3081 	uint64_t				completions_per_poller = 0;
3082 	uint64_t				poller_completions = 0;
3083 	uint64_t				rdma_completions;
3084 
3085 	if (completions_per_qpair == 0) {
3086 		completions_per_qpair = MAX_COMPLETIONS_PER_POLL;
3087 	}
3088 
3089 	group = nvme_rdma_poll_group(tgroup);
3090 	STAILQ_FOREACH_SAFE(qpair, &tgroup->disconnected_qpairs, poll_group_stailq, tmp_qpair) {
3091 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
3092 		if (rc == 0) {
3093 			disconnected_qpair_cb(qpair, tgroup->group->ctx);
3094 		}
3095 	}
3096 
3097 	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
3098 		rqpair = nvme_rdma_qpair(qpair);
3099 		rqpair->num_completions = 0;
3100 
3101 		if (spdk_unlikely(nvme_qpair_get_state(qpair) == NVME_QPAIR_CONNECTING)) {
3102 			rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
3103 			if (rc == 0) {
3104 				/* Once the connection is completed, we can submit queued requests */
3105 				nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
3106 			} else if (rc != -EAGAIN) {
3107 				SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
3108 				nvme_rdma_fail_qpair(qpair, 0);
3109 				continue;
3110 			}
3111 		} else {
3112 			nvme_rdma_qpair_process_cm_event(rqpair);
3113 		}
3114 
3115 		if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
3116 			rc2 = -ENXIO;
3117 			nvme_rdma_fail_qpair(qpair, 0);
3118 			continue;
3119 		}
3120 		num_qpairs++;
3121 	}
3122 
3123 	completions_allowed = completions_per_qpair * num_qpairs;
3124 	if (group->num_pollers) {
3125 		completions_per_poller = spdk_max(completions_allowed / group->num_pollers, 1);
3126 	}
3127 
3128 	STAILQ_FOREACH(poller, &group->pollers, link) {
3129 		poller_completions = 0;
3130 		rdma_completions = 0;
3131 		do {
3132 			poller->stats.polls++;
3133 			batch_size = spdk_min((completions_per_poller - poller_completions), MAX_COMPLETIONS_PER_POLL);
3134 			rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, poller, NULL, &rdma_completions);
3135 			if (rc <= 0) {
3136 				if (rc == -ECANCELED) {
3137 					return -EIO;
3138 				} else if (rc == 0) {
3139 					poller->stats.idle_polls++;
3140 				}
3141 				break;
3142 			}
3143 
3144 			poller_completions += rc;
3145 		} while (poller_completions < completions_per_poller);
3146 		total_completions += poller_completions;
3147 		poller->stats.completions += rdma_completions;
3148 		if (poller->srq) {
3149 			nvme_rdma_poller_submit_recvs(poller);
3150 		}
3151 	}
3152 
3153 	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
3154 		rqpair = nvme_rdma_qpair(qpair);
3155 
3156 		if (spdk_unlikely(rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING)) {
3157 			continue;
3158 		}
3159 
3160 		if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
3161 			nvme_rdma_qpair_check_timeout(qpair);
3162 		}
3163 
3164 		nvme_rdma_qpair_submit_sends(rqpair);
3165 		if (!rqpair->srq) {
3166 			nvme_rdma_qpair_submit_recvs(rqpair);
3167 		}
3168 		if (rqpair->num_completions > 0) {
3169 			nvme_qpair_resubmit_requests(qpair, rqpair->num_completions);
3170 		}
3171 	}
3172 
3173 	return rc2 != 0 ? rc2 : total_completions;
3174 }
3175 
3176 static int
3177 nvme_rdma_poll_group_destroy(struct spdk_nvme_transport_poll_group *tgroup)
3178 {
3179 	struct nvme_rdma_poll_group	*group = nvme_rdma_poll_group(tgroup);
3180 
3181 	if (!STAILQ_EMPTY(&tgroup->connected_qpairs) || !STAILQ_EMPTY(&tgroup->disconnected_qpairs)) {
3182 		return -EBUSY;
3183 	}
3184 
3185 	nvme_rdma_poll_group_free_pollers(group);
3186 	free(group);
3187 
3188 	return 0;
3189 }
3190 
3191 static int
3192 nvme_rdma_poll_group_get_stats(struct spdk_nvme_transport_poll_group *tgroup,
3193 			       struct spdk_nvme_transport_poll_group_stat **_stats)
3194 {
3195 	struct nvme_rdma_poll_group *group;
3196 	struct spdk_nvme_transport_poll_group_stat *stats;
3197 	struct spdk_nvme_rdma_device_stat *device_stat;
3198 	struct nvme_rdma_poller *poller;
3199 	uint32_t i = 0;
3200 
3201 	if (tgroup == NULL || _stats == NULL) {
3202 		SPDK_ERRLOG("Invalid stats or group pointer\n");
3203 		return -EINVAL;
3204 	}
3205 
3206 	group = nvme_rdma_poll_group(tgroup);
3207 	stats = calloc(1, sizeof(*stats));
3208 	if (!stats) {
3209 		SPDK_ERRLOG("Can't allocate memory for RDMA stats\n");
3210 		return -ENOMEM;
3211 	}
3212 	stats->trtype = SPDK_NVME_TRANSPORT_RDMA;
3213 	stats->rdma.num_devices = group->num_pollers;
3214 
3215 	if (stats->rdma.num_devices == 0) {
3216 		*_stats = stats;
3217 		return 0;
3218 	}
3219 
3220 	stats->rdma.device_stats = calloc(stats->rdma.num_devices, sizeof(*stats->rdma.device_stats));
3221 	if (!stats->rdma.device_stats) {
3222 		SPDK_ERRLOG("Can't allocate memory for RDMA device stats\n");
3223 		free(stats);
3224 		return -ENOMEM;
3225 	}
3226 
3227 	STAILQ_FOREACH(poller, &group->pollers, link) {
3228 		device_stat = &stats->rdma.device_stats[i];
3229 		device_stat->name = poller->device->device->name;
3230 		device_stat->polls = poller->stats.polls;
3231 		device_stat->idle_polls = poller->stats.idle_polls;
3232 		device_stat->completions = poller->stats.completions;
3233 		device_stat->queued_requests = poller->stats.queued_requests;
3234 		device_stat->total_send_wrs = poller->stats.rdma_stats.send.num_submitted_wrs;
3235 		device_stat->send_doorbell_updates = poller->stats.rdma_stats.send.doorbell_updates;
3236 		device_stat->total_recv_wrs = poller->stats.rdma_stats.recv.num_submitted_wrs;
3237 		device_stat->recv_doorbell_updates = poller->stats.rdma_stats.recv.doorbell_updates;
3238 		i++;
3239 	}
3240 
3241 	*_stats = stats;
3242 
3243 	return 0;
3244 }
3245 
3246 static void
3247 nvme_rdma_poll_group_free_stats(struct spdk_nvme_transport_poll_group *tgroup,
3248 				struct spdk_nvme_transport_poll_group_stat *stats)
3249 {
3250 	if (stats) {
3251 		free(stats->rdma.device_stats);
3252 	}
3253 	free(stats);
3254 }
3255 
3256 static int
3257 nvme_rdma_ctrlr_get_memory_domains(const struct spdk_nvme_ctrlr *ctrlr,
3258 				   struct spdk_memory_domain **domains, int array_size)
3259 {
3260 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(ctrlr->adminq);
3261 
3262 	if (domains && array_size > 0) {
3263 		domains[0] = rqpair->memory_domain->domain;
3264 	}
3265 
3266 	return 1;
3267 }
3268 
3269 void
3270 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
3271 {
3272 	g_nvme_hooks = *hooks;
3273 }
3274 
3275 const struct spdk_nvme_transport_ops rdma_ops = {
3276 	.name = "RDMA",
3277 	.type = SPDK_NVME_TRANSPORT_RDMA,
3278 	.ctrlr_construct = nvme_rdma_ctrlr_construct,
3279 	.ctrlr_scan = nvme_fabric_ctrlr_scan,
3280 	.ctrlr_destruct = nvme_rdma_ctrlr_destruct,
3281 	.ctrlr_enable = nvme_rdma_ctrlr_enable,
3282 
3283 	.ctrlr_set_reg_4 = nvme_fabric_ctrlr_set_reg_4,
3284 	.ctrlr_set_reg_8 = nvme_fabric_ctrlr_set_reg_8,
3285 	.ctrlr_get_reg_4 = nvme_fabric_ctrlr_get_reg_4,
3286 	.ctrlr_get_reg_8 = nvme_fabric_ctrlr_get_reg_8,
3287 	.ctrlr_set_reg_4_async = nvme_fabric_ctrlr_set_reg_4_async,
3288 	.ctrlr_set_reg_8_async = nvme_fabric_ctrlr_set_reg_8_async,
3289 	.ctrlr_get_reg_4_async = nvme_fabric_ctrlr_get_reg_4_async,
3290 	.ctrlr_get_reg_8_async = nvme_fabric_ctrlr_get_reg_8_async,
3291 
3292 	.ctrlr_get_max_xfer_size = nvme_rdma_ctrlr_get_max_xfer_size,
3293 	.ctrlr_get_max_sges = nvme_rdma_ctrlr_get_max_sges,
3294 
3295 	.ctrlr_create_io_qpair = nvme_rdma_ctrlr_create_io_qpair,
3296 	.ctrlr_delete_io_qpair = nvme_rdma_ctrlr_delete_io_qpair,
3297 	.ctrlr_connect_qpair = nvme_rdma_ctrlr_connect_qpair,
3298 	.ctrlr_disconnect_qpair = nvme_rdma_ctrlr_disconnect_qpair,
3299 
3300 	.ctrlr_get_memory_domains = nvme_rdma_ctrlr_get_memory_domains,
3301 
3302 	.qpair_abort_reqs = nvme_rdma_qpair_abort_reqs,
3303 	.qpair_reset = nvme_rdma_qpair_reset,
3304 	.qpair_submit_request = nvme_rdma_qpair_submit_request,
3305 	.qpair_process_completions = nvme_rdma_qpair_process_completions,
3306 	.qpair_iterate_requests = nvme_rdma_qpair_iterate_requests,
3307 	.admin_qpair_abort_aers = nvme_rdma_admin_qpair_abort_aers,
3308 
3309 	.poll_group_create = nvme_rdma_poll_group_create,
3310 	.poll_group_connect_qpair = nvme_rdma_poll_group_connect_qpair,
3311 	.poll_group_disconnect_qpair = nvme_rdma_poll_group_disconnect_qpair,
3312 	.poll_group_add = nvme_rdma_poll_group_add,
3313 	.poll_group_remove = nvme_rdma_poll_group_remove,
3314 	.poll_group_process_completions = nvme_rdma_poll_group_process_completions,
3315 	.poll_group_destroy = nvme_rdma_poll_group_destroy,
3316 	.poll_group_get_stats = nvme_rdma_poll_group_get_stats,
3317 	.poll_group_free_stats = nvme_rdma_poll_group_free_stats,
3318 };
3319 
3320 SPDK_NVME_TRANSPORT_REGISTER(rdma, &rdma_ops);
3321