xref: /spdk/lib/nvme/nvme_rdma.c (revision d39963a9da590629d3a9bc6e3011e433229555c7)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2019-2021 Mellanox Technologies LTD. All rights reserved.
6  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 /*
36  * NVMe over RDMA transport
37  */
38 
39 #include "spdk/stdinc.h"
40 
41 #include "spdk/assert.h"
42 #include "spdk/dma.h"
43 #include "spdk/log.h"
44 #include "spdk/trace.h"
45 #include "spdk/queue.h"
46 #include "spdk/nvme.h"
47 #include "spdk/nvmf_spec.h"
48 #include "spdk/string.h"
49 #include "spdk/endian.h"
50 #include "spdk/likely.h"
51 #include "spdk/config.h"
52 
53 #include "nvme_internal.h"
54 #include "spdk_internal/rdma.h"
55 
/* Timeout value in milliseconds. NOTE(review): the consumer is outside this excerpt - confirm usage. */
#define NVME_RDMA_TIME_OUT_IN_MS 2000
/* 131072 bytes (128 KiB). NOTE(review): the consumer is outside this excerpt - confirm usage. */
#define NVME_RDMA_RW_BUFFER_SIZE 131072

/*
 * NVME RDMA qpair Resource Defaults
 *
 * These are the upper bounds requested for the number of scatter-gather
 * elements per send / receive work request when the RDMA qp is created.
 */
#define NVME_RDMA_DEFAULT_TX_SGE		2
#define NVME_RDMA_DEFAULT_RX_SGE		1

/* Max number of NVMe-oF SGL descriptors supported by the host */
#define NVME_RDMA_MAX_SGL_DESCRIPTORS		16

/* number of STAILQ entries for holding pending RDMA CM events. */
#define NVME_RDMA_NUM_CM_EVENTS			256

/* CM event processing timeout */
#define NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US	1000000

/* The default size for a shared rdma completion queue. */
#define DEFAULT_NVME_RDMA_CQ_SIZE		4096

/*
 * In the special case of a stale connection we don't expose a mechanism
 * for the user to retry the connection so we need to handle it internally.
 */
#define NVME_RDMA_STALE_CONN_RETRY_MAX		5
#define NVME_RDMA_STALE_CONN_RETRY_DELAY_US	10000

/*
 * Maximum value of transport_retry_count used by RDMA controller
 */
#define NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT	7

/*
 * Maximum value of transport_ack_timeout used by RDMA controller
 */
#define NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT	31

/*
 * Number of microseconds to wait until the lingering qpair becomes quiet.
 */
#define NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US	1000000ull

/*
 * The max length of keyed SGL data block (3 bytes)
 */
#define NVME_RDMA_MAX_KEYED_SGL_LENGTH ((1u << 24u) - 1)
103 
/*
 * Number of completion queue entries budgeted for one qpair: twice its queue
 * depth, since a qpair is created with queue_depth send work requests and
 * queue_depth receive work requests.
 * The argument is parenthesized so that expressions such as (a + b) expand
 * correctly.
 */
#define WC_PER_QPAIR(queue_depth)	((queue_depth) * 2)
105 
/*
 * Evaluates to non-zero when _rqpair currently owns an RDMA qp whose queue
 * pair number equals qpn. _rqpair is evaluated more than once, so it must be
 * free of side effects.
 */
#define NVME_RDMA_POLL_GROUP_CHECK_QPN(_rqpair, qpn)				\
	((_rqpair)->rdma_qp && (_rqpair)->rdma_qp->qp->qp_num == (qpn))
108 
/*
 * Reference-counted memory domain associated with one protection domain.
 * Instances live on the global g_memory_domains list and are shared by every
 * caller of nvme_rdma_get_memory_domain() that passes the same pd.
 */
struct nvme_rdma_memory_domain {
	/* Linkage in g_memory_domains. */
	TAILQ_ENTRY(nvme_rdma_memory_domain) link;
	/* Number of outstanding get() calls not yet matched by a put(). */
	uint32_t ref;
	/* Protection domain used as the lookup key. */
	struct ibv_pd *pd;
	/* Handle filled in by spdk_memory_domain_create(). */
	struct spdk_memory_domain *domain;
	/* RDMA context handed to spdk_memory_domain_create() as user_ctx. */
	struct spdk_memory_domain_rdma_ctx rdma_ctx;
};
116 
/*
 * Kind of work request a struct nvme_rdma_wr belongs to. Responses are tagged
 * RDMA_WR_TYPE_RECV and requests RDMA_WR_TYPE_SEND when they are set up.
 */
enum nvme_rdma_wr_type {
	RDMA_WR_TYPE_RECV,
	RDMA_WR_TYPE_SEND,
};
121 
/*
 * Tag embedded in request and response objects. The address of this member is
 * stored in the work request's wr_id field.
 */
struct nvme_rdma_wr {
	/* Using this instead of the enum allows this struct to only occupy one byte. */
	uint8_t	type;
};
126 
/*
 * One command slot: an NVMe command followed by room for up to
 * NVME_RDMA_MAX_SGL_DESCRIPTORS SGL descriptors.
 */
struct spdk_nvmf_cmd {
	struct spdk_nvme_cmd cmd;
	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
};
131 
/*
 * Optional user hooks, all unset by default. This file consults get_ibv_pd,
 * get_rkey and put_rkey; when get_rkey is set, memory regions are obtained
 * through the hooks instead of rdma_reg_msgs()/rdma_dereg_mr().
 */
struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
133 
/*
 * STAILQ wrapper for cm events. Entries move between a controller's
 * free_cm_events and pending_cm_events lists.
 */
struct nvme_rdma_cm_event_entry {
	/* The deferred CM event; meaningful only while the entry is pending. */
	struct rdma_cm_event			*evt;
	STAILQ_ENTRY(nvme_rdma_cm_event_entry)	link;
};
139 
/* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
struct nvme_rdma_ctrlr {
	/* Generic controller; nvme_rdma_ctrlr() recovers this struct from it. */
	struct spdk_nvme_ctrlr			ctrlr;

	/* Protection domain; updated by nvme_rdma_qpair_init() from the created qp. */
	struct ibv_pd				*pd;

	uint16_t				max_sge;

	/* Event channel drained by nvme_rdma_poll_events(). */
	struct rdma_event_channel		*cm_channel;

	/* Events received while their qpair still had an unprocessed event. */
	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	pending_cm_events;

	/* Unused wrappers available for deferring further events. */
	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	free_cm_events;

	/* NOTE(review): presumably the backing array for the two lists above - confirm at the allocation site. */
	struct nvme_rdma_cm_event_entry		*cm_events;
};
156 
/*
 * Per-poller counters. The counters are updated outside this excerpt;
 * rdma_stats is handed to the RDMA qp at creation time for qpairs that use
 * this poller.
 */
struct nvme_rdma_poller_stats {
	uint64_t polls;
	uint64_t idle_polls;
	uint64_t queued_requests;
	uint64_t completions;
	struct spdk_rdma_qp_stats rdma_stats;
};
164 
/*
 * One shared completion queue of a poll group, keyed by RDMA device.
 */
struct nvme_rdma_poller {
	/* Device context; matched against cm_id->verbs when a qpair picks a CQ. */
	struct ibv_context		*device;
	/* Completion queue shared by the qpairs attached to this poller. */
	struct ibv_cq			*cq;
	/* Sum of the completion entries required by the attached qpairs. */
	int				required_num_wc;
	/* Current size of cq as last set through ibv_resize_cq(). */
	int				current_num_wc;
	struct nvme_rdma_poller_stats	stats;
	STAILQ_ENTRY(nvme_rdma_poller)	link;
};
173 
/* RDMA transport extension of a transport poll group. */
struct nvme_rdma_poll_group {
	/* Generic poll group; nvme_rdma_poll_group() recovers this struct from it. */
	struct spdk_nvme_transport_poll_group		group;
	/* Pollers searched by device when a qpair needs a completion queue. */
	STAILQ_HEAD(, nvme_rdma_poller)			pollers;
	uint32_t					num_pollers;
};
179 
/*
 * Memory regions
 *
 * Which member is valid depends on g_nvme_hooks.get_rkey: without the hook,
 * mr holds the result of rdma_reg_msgs(); with it, key holds the value
 * returned by the hook.
 */
union nvme_rdma_mr {
	struct ibv_mr	*mr;
	uint64_t	key;
};
185 
/*
 * Lifecycle states of an RDMA qpair.
 * NOTE(review): the state transitions are implemented outside this excerpt.
 */
enum nvme_rdma_qpair_state {
	NVME_RDMA_QPAIR_STATE_INVALID = 0,
	NVME_RDMA_QPAIR_STATE_STALE_CONN,
	NVME_RDMA_QPAIR_STATE_INITIALIZING,
	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND,
	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL,
	NVME_RDMA_QPAIR_STATE_RUNNING,
	NVME_RDMA_QPAIR_STATE_EXITING,
	NVME_RDMA_QPAIR_STATE_LINGERING,
	NVME_RDMA_QPAIR_STATE_EXITED,
};
197 
struct nvme_rdma_qpair;

/*
 * Callback run by nvme_rdma_process_event_poll() once waiting for a CM event
 * has finished. ret is 0 on success or a negative errno describing why the
 * expected event was not obtained or could not be processed. The callback's
 * return value becomes the return value of nvme_rdma_process_event_poll().
 */
typedef int (*nvme_rdma_cm_event_cb)(struct nvme_rdma_qpair *rqpair, int ret);
201 
/* NVMe RDMA qpair extensions for spdk_nvme_qpair */
struct nvme_rdma_qpair {
	/* Generic qpair; nvme_rdma_qpair() recovers this struct from it. */
	struct spdk_nvme_qpair			qpair;

	/* RDMA queue pair created by spdk_rdma_qp_create() in nvme_rdma_qpair_init(). */
	struct spdk_rdma_qp			*rdma_qp;
	/* CM identifier; its context pointer is set to this qpair during init. */
	struct rdma_cm_id			*cm_id;
	/* Completion queue: the poller's shared CQ, or one created for this qpair. */
	struct ibv_cq				*cq;

	/* Array of num_entries request trackers. */
	struct	spdk_nvme_rdma_req		*rdma_reqs;

	uint32_t				max_send_sge;

	uint32_t				max_recv_sge;

	/* Queue depth; sizes every per-qpair array and the work request limits. */
	uint16_t				num_entries;

	/* When false, queued work requests are flushed to the qp immediately. */
	bool					delay_cmd_submit;

	uint32_t				num_completions;

	/* Parallel arrays of response buffers + response SGLs of size num_entries */
	struct ibv_sge				*rsp_sgls;
	struct spdk_nvme_rdma_rsp		*rsps;

	struct ibv_recv_wr			*rsp_recv_wrs;

	/* Memory region describing all rsps for this qpair */
	union nvme_rdma_mr			rsp_mr;

	/*
	 * Array of num_entries NVMe commands registered as RDMA message buffers.
	 * Indexed by rdma_req->id.
	 */
	struct spdk_nvmf_cmd			*cmds;

	/* Memory region describing all cmds for this qpair */
	union nvme_rdma_mr			cmd_mr;

	struct spdk_rdma_mem_map		*mr_map;

	/* Request trackers available to nvme_rdma_req_get(). */
	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
	/* Request trackers handed out and not yet completed. */
	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;

	/* Shared memory domain for the qp's protection domain. */
	struct nvme_rdma_memory_domain		*memory_domain;

	/* Counts of outstanding send and recv objects */
	uint16_t				current_num_recvs;
	uint16_t				current_num_sends;

	/* Placed at the end of the struct since it is not used frequently */
	/* CM event waiting to be processed for this qpair, or NULL. */
	struct rdma_cm_event			*evt;
	/* Poller supplying cq when the qpair belongs to a poll group, else NULL. */
	struct nvme_rdma_poller			*poller;

	/* Tick value after which waiting for the expected CM event stops. */
	uint64_t				evt_timeout_ticks;
	/* Callback invoked when the wait for the expected CM event ends. */
	nvme_rdma_cm_event_cb			evt_cb;
	/* CM event type the qpair is currently waiting for. */
	enum rdma_cm_event_type			expected_evt_type;

	enum nvme_rdma_qpair_state		state;

	bool					in_connect_poll;

	uint8_t					stale_conn_retry_count;
};
265 
/*
 * Bit flags recording which completions have been seen.
 * NOTE(review): presumably stored in spdk_nvme_rdma_req.completion_flags -
 * the code that sets them is outside this excerpt.
 */
enum NVME_RDMA_COMPLETION_FLAGS {
	NVME_RDMA_SEND_COMPLETED = 1u << 0,
	NVME_RDMA_RECV_COMPLETED = 1u << 1,
};
270 
/* Tracker for one in-flight NVMe request on an RDMA qpair. */
struct spdk_nvme_rdma_req {
	/* Index of this tracker in rqpair->rdma_reqs and of its command in rqpair->cmds. */
	uint16_t				id;
	/* Cleared when the tracker is returned to the free list. */
	uint16_t				completion_flags: 2;
	uint16_t				reserved: 14;
	/* if completion of RDMA_RECV received before RDMA_SEND, we will complete nvme request
	 * during processing of RDMA_SEND. To complete the request we must know the index
	 * of nvme_cpl received in RDMA_RECV, so store it in this field */
	uint16_t				rsp_idx;

	/* Tagged RDMA_WR_TYPE_SEND; its address is used as send_wr.wr_id. */
	struct nvme_rdma_wr			rdma_wr;

	struct ibv_send_wr			send_wr;

	/* The NVMe request being carried, or NULL while the tracker is free. */
	struct nvme_request			*req;

	/* Scatter-gather list for send_wr; element 0 points at the command slot. */
	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];

	/* Linkage in the qpair's free_reqs or outstanding_reqs list. */
	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
};
290 
/* Receive buffer for one NVMe completion plus bookkeeping. */
struct spdk_nvme_rdma_rsp {
	/* Completion entry; this is the part described by the receive SGE. */
	struct spdk_nvme_cpl	cpl;
	/* Owning qpair. */
	struct nvme_rdma_qpair	*rqpair;
	/* Index of this response in rqpair->rsps. */
	uint16_t		idx;
	/* Tagged RDMA_WR_TYPE_RECV; its address is used as the recv wr_id. */
	struct nvme_rdma_wr	rdma_wr;
};
297 
/*
 * Address/length pair together with local and remote keys.
 * NOTE(review): the code that fills and consumes this context is outside
 * this excerpt - confirm which fields are inputs and which are outputs.
 */
struct nvme_rdma_memory_translation_ctx {
	void *addr;
	size_t length;
	uint32_t lkey;
	uint32_t rkey;
};
304 
/*
 * Printable names of RDMA CM event types, indexed by the numeric event value
 * (see nvme_rdma_cm_event_str_get()). Both the strings and the table itself
 * are read-only.
 */
static const char *const rdma_cm_event_str[] = {
	"RDMA_CM_EVENT_ADDR_RESOLVED",
	"RDMA_CM_EVENT_ADDR_ERROR",
	"RDMA_CM_EVENT_ROUTE_RESOLVED",
	"RDMA_CM_EVENT_ROUTE_ERROR",
	"RDMA_CM_EVENT_CONNECT_REQUEST",
	"RDMA_CM_EVENT_CONNECT_RESPONSE",
	"RDMA_CM_EVENT_CONNECT_ERROR",
	"RDMA_CM_EVENT_UNREACHABLE",
	"RDMA_CM_EVENT_REJECTED",
	"RDMA_CM_EVENT_ESTABLISHED",
	"RDMA_CM_EVENT_DISCONNECTED",
	"RDMA_CM_EVENT_DEVICE_REMOVAL",
	"RDMA_CM_EVENT_MULTICAST_JOIN",
	"RDMA_CM_EVENT_MULTICAST_ERROR",
	"RDMA_CM_EVENT_ADDR_CHANGE",
	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
};
323 
/*
 * Forward declaration; the definition is outside this excerpt.
 * NOTE(review): presumably returns the qpair of the group whose RDMA queue
 * pair number is qp_num - confirm against the definition.
 */
struct nvme_rdma_qpair *nvme_rdma_poll_group_get_qpair_by_id(struct nvme_rdma_poll_group *group,
		uint32_t qp_num);
326 
/* All memory domains created by nvme_rdma_get_memory_domain(), one per protection domain. */
static TAILQ_HEAD(, nvme_rdma_memory_domain) g_memory_domains = TAILQ_HEAD_INITIALIZER(
			g_memory_domains);
/* Held while g_memory_domains or any entry's reference count is read or modified. */
static pthread_mutex_t g_memory_domains_lock = PTHREAD_MUTEX_INITIALIZER;
330 
331 static struct nvme_rdma_memory_domain *
332 nvme_rdma_get_memory_domain(struct ibv_pd *pd)
333 {
334 	struct nvme_rdma_memory_domain *domain = NULL;
335 	struct spdk_memory_domain_ctx ctx;
336 	int rc;
337 
338 	pthread_mutex_lock(&g_memory_domains_lock);
339 
340 	TAILQ_FOREACH(domain, &g_memory_domains, link) {
341 		if (domain->pd == pd) {
342 			domain->ref++;
343 			pthread_mutex_unlock(&g_memory_domains_lock);
344 			return domain;
345 		}
346 	}
347 
348 	domain = calloc(1, sizeof(*domain));
349 	if (!domain) {
350 		SPDK_ERRLOG("Memory allocation failed\n");
351 		pthread_mutex_unlock(&g_memory_domains_lock);
352 		return NULL;
353 	}
354 
355 	domain->rdma_ctx.size = sizeof(domain->rdma_ctx);
356 	domain->rdma_ctx.ibv_pd = pd;
357 	ctx.size = sizeof(ctx);
358 	ctx.user_ctx = &domain->rdma_ctx;
359 
360 	rc = spdk_memory_domain_create(&domain->domain, SPDK_DMA_DEVICE_TYPE_RDMA, &ctx,
361 				       SPDK_RDMA_DMA_DEVICE);
362 	if (rc) {
363 		SPDK_ERRLOG("Failed to create memory domain\n");
364 		free(domain);
365 		pthread_mutex_unlock(&g_memory_domains_lock);
366 		return NULL;
367 	}
368 
369 	domain->pd = pd;
370 	domain->ref = 1;
371 	TAILQ_INSERT_TAIL(&g_memory_domains, domain, link);
372 
373 	pthread_mutex_unlock(&g_memory_domains_lock);
374 
375 	return domain;
376 }
377 
378 static void
379 nvme_rdma_put_memory_domain(struct nvme_rdma_memory_domain *device)
380 {
381 	if (!device) {
382 		return;
383 	}
384 
385 	pthread_mutex_lock(&g_memory_domains_lock);
386 
387 	assert(device->ref > 0);
388 
389 	device->ref--;
390 
391 	if (device->ref == 0) {
392 		spdk_memory_domain_destroy(device->domain);
393 		TAILQ_REMOVE(&g_memory_domains, device, link);
394 		free(device);
395 	}
396 
397 	pthread_mutex_unlock(&g_memory_domains_lock);
398 }
399 
400 static inline void *
401 nvme_rdma_calloc(size_t nmemb, size_t size)
402 {
403 	if (!nmemb || !size) {
404 		return NULL;
405 	}
406 
407 	return spdk_zmalloc(nmemb * size, 0, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
408 }
409 
/*
 * Release a buffer obtained from nvme_rdma_calloc() by passing it to
 * spdk_free().
 */
static inline void
nvme_rdma_free(void *buf)
{
	spdk_free(buf);
}
415 
/* Forward declaration; the definition appears later in the file, outside this excerpt. */
static int nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr,
		struct spdk_nvme_qpair *qpair);
418 
419 static inline struct nvme_rdma_qpair *
420 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
421 {
422 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
423 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
424 }
425 
426 static inline struct nvme_rdma_poll_group *
427 nvme_rdma_poll_group(struct spdk_nvme_transport_poll_group *group)
428 {
429 	return (SPDK_CONTAINEROF(group, struct nvme_rdma_poll_group, group));
430 }
431 
432 static inline struct nvme_rdma_ctrlr *
433 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
434 {
435 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
436 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
437 }
438 
439 static struct spdk_nvme_rdma_req *
440 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
441 {
442 	struct spdk_nvme_rdma_req *rdma_req;
443 
444 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
445 	if (rdma_req) {
446 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
447 		TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
448 	}
449 
450 	return rdma_req;
451 }
452 
453 static void
454 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
455 {
456 	rdma_req->completion_flags = 0;
457 	rdma_req->req = NULL;
458 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
459 }
460 
461 static void
462 nvme_rdma_req_complete(struct spdk_nvme_rdma_req *rdma_req,
463 		       struct spdk_nvme_cpl *rsp)
464 {
465 	struct nvme_request *req = rdma_req->req;
466 	struct nvme_rdma_qpair *rqpair;
467 
468 	assert(req != NULL);
469 
470 	rqpair = nvme_rdma_qpair(req->qpair);
471 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
472 
473 	nvme_complete_request(req->cb_fn, req->cb_arg, req->qpair, req, rsp);
474 	nvme_free_request(req);
475 }
476 
477 static const char *
478 nvme_rdma_cm_event_str_get(uint32_t event)
479 {
480 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
481 		return rdma_cm_event_str[event];
482 	} else {
483 		return "Undefined";
484 	}
485 }
486 
487 
488 static int
489 nvme_rdma_qpair_process_cm_event(struct nvme_rdma_qpair *rqpair)
490 {
491 	struct rdma_cm_event				*event = rqpair->evt;
492 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
493 	int						rc = 0;
494 
495 	if (event) {
496 		switch (event->event) {
497 		case RDMA_CM_EVENT_ADDR_RESOLVED:
498 		case RDMA_CM_EVENT_ADDR_ERROR:
499 		case RDMA_CM_EVENT_ROUTE_RESOLVED:
500 		case RDMA_CM_EVENT_ROUTE_ERROR:
501 			break;
502 		case RDMA_CM_EVENT_CONNECT_REQUEST:
503 			break;
504 		case RDMA_CM_EVENT_CONNECT_ERROR:
505 			break;
506 		case RDMA_CM_EVENT_UNREACHABLE:
507 		case RDMA_CM_EVENT_REJECTED:
508 			break;
509 		case RDMA_CM_EVENT_CONNECT_RESPONSE:
510 			rc = spdk_rdma_qp_complete_connect(rqpair->rdma_qp);
511 		/* fall through */
512 		case RDMA_CM_EVENT_ESTABLISHED:
513 			accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
514 			if (accept_data == NULL) {
515 				rc = -1;
516 			} else {
517 				SPDK_DEBUGLOG(nvme, "Requested queue depth %d. Target receive queue depth %d.\n",
518 					      rqpair->num_entries + 1, accept_data->crqsize);
519 			}
520 			break;
521 		case RDMA_CM_EVENT_DISCONNECTED:
522 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
523 			break;
524 		case RDMA_CM_EVENT_DEVICE_REMOVAL:
525 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
526 			break;
527 		case RDMA_CM_EVENT_MULTICAST_JOIN:
528 		case RDMA_CM_EVENT_MULTICAST_ERROR:
529 			break;
530 		case RDMA_CM_EVENT_ADDR_CHANGE:
531 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
532 			break;
533 		case RDMA_CM_EVENT_TIMEWAIT_EXIT:
534 			break;
535 		default:
536 			SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
537 			break;
538 		}
539 		rqpair->evt = NULL;
540 		rdma_ack_cm_event(event);
541 	}
542 
543 	return rc;
544 }
545 
546 /*
547  * This function must be called under the nvme controller's lock
548  * because it touches global controller variables. The lock is taken
549  * by the generic transport code before invoking a few of the functions
550  * in this file: nvme_rdma_ctrlr_connect_qpair, nvme_rdma_ctrlr_delete_io_qpair,
551  * and conditionally nvme_rdma_qpair_process_completions when it is calling
552  * completions on the admin qpair. When adding a new call to this function, please
553  * verify that it is in a situation where it falls under the lock.
554  */
555 static int
556 nvme_rdma_poll_events(struct nvme_rdma_ctrlr *rctrlr)
557 {
558 	struct nvme_rdma_cm_event_entry	*entry, *tmp;
559 	struct nvme_rdma_qpair		*event_qpair;
560 	struct rdma_cm_event		*event;
561 	struct rdma_event_channel	*channel = rctrlr->cm_channel;
562 
563 	STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
564 		event_qpair = entry->evt->id->context;
565 		if (event_qpair->evt == NULL) {
566 			event_qpair->evt = entry->evt;
567 			STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
568 			STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
569 		}
570 	}
571 
572 	while (rdma_get_cm_event(channel, &event) == 0) {
573 		event_qpair = event->id->context;
574 		if (event_qpair->evt == NULL) {
575 			event_qpair->evt = event;
576 		} else {
577 			assert(rctrlr == nvme_rdma_ctrlr(event_qpair->qpair.ctrlr));
578 			entry = STAILQ_FIRST(&rctrlr->free_cm_events);
579 			if (entry == NULL) {
580 				rdma_ack_cm_event(event);
581 				return -ENOMEM;
582 			}
583 			STAILQ_REMOVE(&rctrlr->free_cm_events, entry, nvme_rdma_cm_event_entry, link);
584 			entry->evt = event;
585 			STAILQ_INSERT_TAIL(&rctrlr->pending_cm_events, entry, link);
586 		}
587 	}
588 
589 	/* rdma_get_cm_event() returns -1 on error. If an error occurs, errno
590 	 * will be set to indicate the failure reason. So return negated errno here.
591 	 */
592 	return -errno;
593 }
594 
595 static int
596 nvme_rdma_validate_cm_event(enum rdma_cm_event_type expected_evt_type,
597 			    struct rdma_cm_event *reaped_evt)
598 {
599 	int rc = -EBADMSG;
600 
601 	if (expected_evt_type == reaped_evt->event) {
602 		return 0;
603 	}
604 
605 	switch (expected_evt_type) {
606 	case RDMA_CM_EVENT_ESTABLISHED:
607 		/*
608 		 * There is an enum ib_cm_rej_reason in the kernel headers that sets 10 as
609 		 * IB_CM_REJ_STALE_CONN. I can't find the corresponding userspace but we get
610 		 * the same values here.
611 		 */
612 		if (reaped_evt->event == RDMA_CM_EVENT_REJECTED && reaped_evt->status == 10) {
613 			rc = -ESTALE;
614 		} else if (reaped_evt->event == RDMA_CM_EVENT_CONNECT_RESPONSE) {
615 			/*
616 			 *  If we are using a qpair which is not created using rdma cm API
617 			 *  then we will receive RDMA_CM_EVENT_CONNECT_RESPONSE instead of
618 			 *  RDMA_CM_EVENT_ESTABLISHED.
619 			 */
620 			return 0;
621 		}
622 		break;
623 	default:
624 		break;
625 	}
626 
627 	SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
628 		    nvme_rdma_cm_event_str_get(expected_evt_type),
629 		    nvme_rdma_cm_event_str_get(reaped_evt->event), reaped_evt->event,
630 		    reaped_evt->status);
631 	return rc;
632 }
633 
634 static int
635 nvme_rdma_process_event_start(struct nvme_rdma_qpair *rqpair,
636 			      enum rdma_cm_event_type evt,
637 			      nvme_rdma_cm_event_cb evt_cb)
638 {
639 	int	rc;
640 
641 	assert(evt_cb != NULL);
642 
643 	if (rqpair->evt != NULL) {
644 		rc = nvme_rdma_qpair_process_cm_event(rqpair);
645 		if (rc) {
646 			return rc;
647 		}
648 	}
649 
650 	rqpair->expected_evt_type = evt;
651 	rqpair->evt_cb = evt_cb;
652 	rqpair->evt_timeout_ticks = (NVME_RDMA_QPAIR_CM_EVENT_TIMEOUT_US * spdk_get_ticks_hz()) /
653 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
654 
655 	return 0;
656 }
657 
658 static int
659 nvme_rdma_process_event_poll(struct nvme_rdma_qpair *rqpair)
660 {
661 	struct nvme_rdma_ctrlr	*rctrlr;
662 	int	rc = 0, rc2;
663 
664 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
665 	assert(rctrlr != NULL);
666 
667 	if (!rqpair->evt && spdk_get_ticks() < rqpair->evt_timeout_ticks) {
668 		rc = nvme_rdma_poll_events(rctrlr);
669 		if (rc == -EAGAIN || rc == -EWOULDBLOCK) {
670 			return rc;
671 		}
672 	}
673 
674 	if (rqpair->evt == NULL) {
675 		rc = -EADDRNOTAVAIL;
676 		goto exit;
677 	}
678 
679 	rc = nvme_rdma_validate_cm_event(rqpair->expected_evt_type, rqpair->evt);
680 
681 	rc2 = nvme_rdma_qpair_process_cm_event(rqpair);
682 	/* bad message takes precedence over the other error codes from processing the event. */
683 	rc = rc == 0 ? rc2 : rc;
684 
685 exit:
686 	assert(rqpair->evt_cb != NULL);
687 	return rqpair->evt_cb(rqpair, rc);
688 }
689 
690 static int
691 nvme_rdma_resize_cq(struct nvme_rdma_qpair *rqpair, struct nvme_rdma_poller *poller)
692 {
693 	int	current_num_wc, required_num_wc;
694 
695 	required_num_wc = poller->required_num_wc + WC_PER_QPAIR(rqpair->num_entries);
696 	current_num_wc = poller->current_num_wc;
697 	if (current_num_wc < required_num_wc) {
698 		current_num_wc = spdk_max(current_num_wc * 2, required_num_wc);
699 	}
700 
701 	if (poller->current_num_wc != current_num_wc) {
702 		SPDK_DEBUGLOG(nvme, "Resize RDMA CQ from %d to %d\n", poller->current_num_wc,
703 			      current_num_wc);
704 		if (ibv_resize_cq(poller->cq, current_num_wc)) {
705 			SPDK_ERRLOG("RDMA CQ resize failed: errno %d: %s\n", errno, spdk_strerror(errno));
706 			return -1;
707 		}
708 
709 		poller->current_num_wc = current_num_wc;
710 	}
711 
712 	poller->required_num_wc = required_num_wc;
713 	return 0;
714 }
715 
716 static int
717 nvme_rdma_poll_group_set_cq(struct spdk_nvme_qpair *qpair)
718 {
719 	struct nvme_rdma_qpair          *rqpair = nvme_rdma_qpair(qpair);
720 	struct nvme_rdma_poll_group     *group = nvme_rdma_poll_group(qpair->poll_group);
721 	struct nvme_rdma_poller         *poller;
722 
723 	assert(rqpair->cq == NULL);
724 
725 	STAILQ_FOREACH(poller, &group->pollers, link) {
726 		if (poller->device == rqpair->cm_id->verbs) {
727 			if (nvme_rdma_resize_cq(rqpair, poller)) {
728 				return -EPROTO;
729 			}
730 			rqpair->cq = poller->cq;
731 			rqpair->poller = poller;
732 			break;
733 		}
734 	}
735 
736 	if (rqpair->cq == NULL) {
737 		SPDK_ERRLOG("Unable to find a cq for qpair %p on poll group %p\n", qpair, qpair->poll_group);
738 		return -EINVAL;
739 	}
740 
741 	return 0;
742 }
743 
/*
 * Create the RDMA resources of a qpair: completion queue, RDMA queue pair
 * and memory domain.
 *
 * The qpair's cm_id must already be valid. On success the qpair's rdma_qp,
 * cq, memory_domain, max_send_sge and max_recv_sge are set, the send/recv
 * counters are reset, the controller's pd is updated to the qp's protection
 * domain and cm_id->context points back at the qpair.
 *
 * Returns 0 on success and -1 on any failure. Resources created before a
 * failure are not released here.
 * NOTE(review): presumably the caller tears the qpair down on failure - confirm.
 */
static int
nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
{
	int			rc;
	struct spdk_rdma_qp_init_attr	attr = {};
	struct ibv_device_attr	dev_attr;
	struct nvme_rdma_ctrlr	*rctrlr;

	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
		return -1;
	}

	/* In a poll group the qpair shares the poller's CQ; otherwise it gets its own. */
	if (rqpair->qpair.poll_group) {
		assert(!rqpair->cq);
		rc = nvme_rdma_poll_group_set_cq(&rqpair->qpair);
		if (rc) {
			SPDK_ERRLOG("Unable to activate the rdmaqpair.\n");
			return -1;
		}
		assert(rqpair->cq);
	} else {
		/* Sized for one send and one recv completion per queue entry. */
		rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
		if (!rqpair->cq) {
			SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
			return -1;
		}
	}

	/* A user hook may supply the protection domain; NULL is passed to qp creation otherwise. */
	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
	if (g_nvme_hooks.get_ibv_pd) {
		rctrlr->pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
	} else {
		rctrlr->pd = NULL;
	}

	attr.pd =		rctrlr->pd;
	/* Statistics are only collected for qpairs that belong to a poller. */
	attr.stats =		rqpair->poller ? &rqpair->poller->stats.rdma_stats : NULL;
	attr.send_cq		= rqpair->cq;
	attr.recv_cq		= rqpair->cq;
	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
	attr.cap.max_recv_wr	= rqpair->num_entries; /* RECV operations */
	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);

	rqpair->rdma_qp = spdk_rdma_qp_create(rqpair->cm_id, &attr);

	if (!rqpair->rdma_qp) {
		return -1;
	}

	/* Memory domains are shared per protection domain, so key on the qp's actual pd. */
	rqpair->memory_domain = nvme_rdma_get_memory_domain(rqpair->rdma_qp->qp->pd);
	if (!rqpair->memory_domain) {
		SPDK_ERRLOG("Failed to get memory domain\n");
		return -1;
	}

	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
	rqpair->current_num_recvs = 0;
	rqpair->current_num_sends = 0;

	/* Record the protection domain the qp actually ended up with. */
	rctrlr->pd = rqpair->rdma_qp->qp->pd;

	/* Lets CM event handling map an event's cm_id back to this qpair. */
	rqpair->cm_id->context = rqpair;

	return 0;
}
814 
815 static inline int
816 nvme_rdma_qpair_submit_sends(struct nvme_rdma_qpair *rqpair)
817 {
818 	struct ibv_send_wr *bad_send_wr = NULL;
819 	int rc;
820 
821 	rc = spdk_rdma_qp_flush_send_wrs(rqpair->rdma_qp, &bad_send_wr);
822 
823 	if (spdk_unlikely(rc)) {
824 		SPDK_ERRLOG("Failed to post WRs on send queue, errno %d (%s), bad_wr %p\n",
825 			    rc, spdk_strerror(rc), bad_send_wr);
826 		while (bad_send_wr != NULL) {
827 			assert(rqpair->current_num_sends > 0);
828 			rqpair->current_num_sends--;
829 			bad_send_wr = bad_send_wr->next;
830 		}
831 		return rc;
832 	}
833 
834 	return 0;
835 }
836 
837 static inline int
838 nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair)
839 {
840 	struct ibv_recv_wr *bad_recv_wr;
841 	int rc = 0;
842 
843 	rc = spdk_rdma_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr);
844 	if (spdk_unlikely(rc)) {
845 		SPDK_ERRLOG("Failed to post WRs on receive queue, errno %d (%s), bad_wr %p\n",
846 			    rc, spdk_strerror(rc), bad_recv_wr);
847 		while (bad_recv_wr != NULL) {
848 			assert(rqpair->current_num_sends > 0);
849 			rqpair->current_num_recvs--;
850 			bad_recv_wr = bad_recv_wr->next;
851 		}
852 	}
853 
854 	return rc;
855 }
856 
857 /* Append the given send wr structure to the qpair's outstanding sends list. */
858 /* This function accepts only a single wr. */
859 static inline int
860 nvme_rdma_qpair_queue_send_wr(struct nvme_rdma_qpair *rqpair, struct ibv_send_wr *wr)
861 {
862 	assert(wr->next == NULL);
863 
864 	assert(rqpair->current_num_sends < rqpair->num_entries);
865 
866 	rqpair->current_num_sends++;
867 	spdk_rdma_qp_queue_send_wrs(rqpair->rdma_qp, wr);
868 
869 	if (!rqpair->delay_cmd_submit) {
870 		return nvme_rdma_qpair_submit_sends(rqpair);
871 	}
872 
873 	return 0;
874 }
875 
876 /* Append the given recv wr structure to the qpair's outstanding recvs list. */
877 /* This function accepts only a single wr. */
878 static inline int
879 nvme_rdma_qpair_queue_recv_wr(struct nvme_rdma_qpair *rqpair, struct ibv_recv_wr *wr)
880 {
881 
882 	assert(wr->next == NULL);
883 	assert(rqpair->current_num_recvs < rqpair->num_entries);
884 
885 	rqpair->current_num_recvs++;
886 	spdk_rdma_qp_queue_recv_wrs(rqpair->rdma_qp, wr);
887 
888 	if (!rqpair->delay_cmd_submit) {
889 		return nvme_rdma_qpair_submit_recvs(rqpair);
890 	}
891 
892 	return 0;
893 }
894 
/*
 * Debug-log the address, length and lkey of the scatter-gather element that
 * sg_list points to; does nothing when sg_list is NULL.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement and
 * cannot capture a following "else". Use it with a trailing semicolon.
 */
#define nvme_rdma_trace_ibv_sge(sg_list) \
	do { \
		if (sg_list) { \
			SPDK_DEBUGLOG(nvme, "local addr %p length 0x%x lkey 0x%x\n", \
				      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
		} \
	} while (0)
900 
901 static int
902 nvme_rdma_post_recv(struct nvme_rdma_qpair *rqpair, uint16_t rsp_idx)
903 {
904 	struct ibv_recv_wr *wr;
905 
906 	wr = &rqpair->rsp_recv_wrs[rsp_idx];
907 	wr->next = NULL;
908 	nvme_rdma_trace_ibv_sge(wr->sg_list);
909 	return nvme_rdma_qpair_queue_recv_wr(rqpair, wr);
910 }
911 
912 static int
913 nvme_rdma_reg_mr(struct rdma_cm_id *cm_id, union nvme_rdma_mr *mr, void *mem, size_t length)
914 {
915 	if (!g_nvme_hooks.get_rkey) {
916 		mr->mr = rdma_reg_msgs(cm_id, mem, length);
917 		if (mr->mr == NULL) {
918 			SPDK_ERRLOG("Unable to register mr: %s (%d)\n",
919 				    spdk_strerror(errno), errno);
920 			return -1;
921 		}
922 	} else {
923 		mr->key = g_nvme_hooks.get_rkey(cm_id->pd, mem, length);
924 	}
925 
926 	return 0;
927 }
928 
929 static void
930 nvme_rdma_dereg_mr(union nvme_rdma_mr *mr)
931 {
932 	if (!g_nvme_hooks.get_rkey) {
933 		if (mr->mr && rdma_dereg_mr(mr->mr)) {
934 			SPDK_ERRLOG("Unable to de-register mr\n");
935 		}
936 	} else {
937 		if (mr->key) {
938 			g_nvme_hooks.put_rkey(mr->key);
939 		}
940 	}
941 	memset(mr, 0, sizeof(*mr));
942 }
943 
944 static uint32_t
945 nvme_rdma_mr_get_lkey(union nvme_rdma_mr *mr)
946 {
947 	uint32_t lkey;
948 
949 	if (!g_nvme_hooks.get_rkey) {
950 		lkey = mr->mr->lkey;
951 	} else {
952 		lkey = *((uint64_t *) mr->key);
953 	}
954 
955 	return lkey;
956 }
957 
/*
 * Deregister the memory region covering the qpair's response buffers.
 * The buffers themselves are not freed here.
 */
static void
nvme_rdma_unregister_rsps(struct nvme_rdma_qpair *rqpair)
{
	nvme_rdma_dereg_mr(&rqpair->rsp_mr);
}
963 
964 static void
965 nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
966 {
967 	nvme_rdma_free(rqpair->rsps);
968 	rqpair->rsps = NULL;
969 	nvme_rdma_free(rqpair->rsp_sgls);
970 	rqpair->rsp_sgls = NULL;
971 	nvme_rdma_free(rqpair->rsp_recv_wrs);
972 	rqpair->rsp_recv_wrs = NULL;
973 }
974 
975 static int
976 nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
977 {
978 	rqpair->rsps = NULL;
979 	rqpair->rsp_recv_wrs = NULL;
980 
981 	rqpair->rsp_sgls = nvme_rdma_calloc(rqpair->num_entries, sizeof(*rqpair->rsp_sgls));
982 	if (!rqpair->rsp_sgls) {
983 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
984 		goto fail;
985 	}
986 
987 	rqpair->rsp_recv_wrs = nvme_rdma_calloc(rqpair->num_entries, sizeof(*rqpair->rsp_recv_wrs));
988 	if (!rqpair->rsp_recv_wrs) {
989 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
990 		goto fail;
991 	}
992 
993 	rqpair->rsps = nvme_rdma_calloc(rqpair->num_entries, sizeof(*rqpair->rsps));
994 	if (!rqpair->rsps) {
995 		SPDK_ERRLOG("can not allocate rdma rsps\n");
996 		goto fail;
997 	}
998 
999 	return 0;
1000 fail:
1001 	nvme_rdma_free_rsps(rqpair);
1002 	return -ENOMEM;
1003 }
1004 
1005 static int
1006 nvme_rdma_register_rsps(struct nvme_rdma_qpair *rqpair)
1007 {
1008 	uint16_t i;
1009 	int rc;
1010 	uint32_t lkey;
1011 
1012 	rc = nvme_rdma_reg_mr(rqpair->cm_id, &rqpair->rsp_mr,
1013 			      rqpair->rsps, rqpair->num_entries * sizeof(*rqpair->rsps));
1014 
1015 	if (rc < 0) {
1016 		goto fail;
1017 	}
1018 
1019 	lkey = nvme_rdma_mr_get_lkey(&rqpair->rsp_mr);
1020 
1021 	for (i = 0; i < rqpair->num_entries; i++) {
1022 		struct ibv_sge *rsp_sgl = &rqpair->rsp_sgls[i];
1023 		struct spdk_nvme_rdma_rsp *rsp = &rqpair->rsps[i];
1024 
1025 		rsp->rqpair = rqpair;
1026 		rsp->rdma_wr.type = RDMA_WR_TYPE_RECV;
1027 		rsp->idx = i;
1028 		rsp_sgl->addr = (uint64_t)&rqpair->rsps[i];
1029 		rsp_sgl->length = sizeof(struct spdk_nvme_cpl);
1030 		rsp_sgl->lkey = lkey;
1031 
1032 		rqpair->rsp_recv_wrs[i].wr_id = (uint64_t)&rsp->rdma_wr;
1033 		rqpair->rsp_recv_wrs[i].next = NULL;
1034 		rqpair->rsp_recv_wrs[i].sg_list = rsp_sgl;
1035 		rqpair->rsp_recv_wrs[i].num_sge = 1;
1036 
1037 		rc = nvme_rdma_post_recv(rqpair, i);
1038 		if (rc) {
1039 			goto fail;
1040 		}
1041 	}
1042 
1043 	rc = nvme_rdma_qpair_submit_recvs(rqpair);
1044 	if (rc) {
1045 		goto fail;
1046 	}
1047 
1048 	return 0;
1049 
1050 fail:
1051 	nvme_rdma_unregister_rsps(rqpair);
1052 	return rc;
1053 }
1054 
1055 static void
1056 nvme_rdma_unregister_reqs(struct nvme_rdma_qpair *rqpair)
1057 {
1058 	nvme_rdma_dereg_mr(&rqpair->cmd_mr);
1059 }
1060 
1061 static void
1062 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
1063 {
1064 	if (!rqpair->rdma_reqs) {
1065 		return;
1066 	}
1067 
1068 	nvme_rdma_free(rqpair->cmds);
1069 	rqpair->cmds = NULL;
1070 
1071 	nvme_rdma_free(rqpair->rdma_reqs);
1072 	rqpair->rdma_reqs = NULL;
1073 }
1074 
1075 static int
1076 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
1077 {
1078 	uint16_t i;
1079 
1080 	rqpair->rdma_reqs = nvme_rdma_calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
1081 	if (rqpair->rdma_reqs == NULL) {
1082 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
1083 		goto fail;
1084 	}
1085 
1086 	rqpair->cmds = nvme_rdma_calloc(rqpair->num_entries, sizeof(*rqpair->cmds));
1087 	if (!rqpair->cmds) {
1088 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
1089 		goto fail;
1090 	}
1091 
1092 
1093 	TAILQ_INIT(&rqpair->free_reqs);
1094 	TAILQ_INIT(&rqpair->outstanding_reqs);
1095 	for (i = 0; i < rqpair->num_entries; i++) {
1096 		struct spdk_nvme_rdma_req	*rdma_req;
1097 		struct spdk_nvmf_cmd		*cmd;
1098 
1099 		rdma_req = &rqpair->rdma_reqs[i];
1100 		rdma_req->rdma_wr.type = RDMA_WR_TYPE_SEND;
1101 		cmd = &rqpair->cmds[i];
1102 
1103 		rdma_req->id = i;
1104 
1105 		/* The first RDMA sgl element will always point
1106 		 * at this data structure. Depending on whether
1107 		 * an NVMe-oF SGL is required, the length of
1108 		 * this element may change. */
1109 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
1110 		rdma_req->send_wr.wr_id = (uint64_t)&rdma_req->rdma_wr;
1111 		rdma_req->send_wr.next = NULL;
1112 		rdma_req->send_wr.opcode = IBV_WR_SEND;
1113 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
1114 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
1115 		rdma_req->send_wr.imm_data = 0;
1116 
1117 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
1118 	}
1119 
1120 	return 0;
1121 fail:
1122 	nvme_rdma_free_reqs(rqpair);
1123 	return -ENOMEM;
1124 }
1125 
1126 static int
1127 nvme_rdma_register_reqs(struct nvme_rdma_qpair *rqpair)
1128 {
1129 	int i;
1130 	int rc;
1131 	uint32_t lkey;
1132 
1133 	rc = nvme_rdma_reg_mr(rqpair->cm_id, &rqpair->cmd_mr,
1134 			      rqpair->cmds, rqpair->num_entries * sizeof(*rqpair->cmds));
1135 
1136 	if (rc < 0) {
1137 		goto fail;
1138 	}
1139 
1140 	lkey = nvme_rdma_mr_get_lkey(&rqpair->cmd_mr);
1141 
1142 	for (i = 0; i < rqpair->num_entries; i++) {
1143 		rqpair->rdma_reqs[i].send_sgl[0].lkey = lkey;
1144 	}
1145 
1146 	return 0;
1147 
1148 fail:
1149 	nvme_rdma_unregister_reqs(rqpair);
1150 	return -ENOMEM;
1151 }
1152 
1153 static int nvme_rdma_connect(struct nvme_rdma_qpair *rqpair);
1154 
/*
 * Handler passed to nvme_rdma_process_event_start() for
 * RDMA_CM_EVENT_ROUTE_RESOLVED. A non-zero ret means route resolution failed.
 * Otherwise the queue pair is initialized and the RDMA connect is started.
 *
 * Returns -1 on failure, else the result of nvme_rdma_connect().
 */
static int
nvme_rdma_route_resolved(struct nvme_rdma_qpair *rqpair, int ret)
{
	if (ret != 0) {
		SPDK_ERRLOG("RDMA route resolution error\n");
		return -1;
	}

	if (nvme_rdma_qpair_init(rqpair) < 0) {
		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
		return -1;
	}

	return nvme_rdma_connect(rqpair);
}
1171 
1172 static int
1173 nvme_rdma_addr_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1174 {
1175 	if (ret) {
1176 		SPDK_ERRLOG("RDMA address resolution error\n");
1177 		return -1;
1178 	}
1179 
1180 	if (rqpair->qpair.ctrlr->opts.transport_ack_timeout != SPDK_NVME_TRANSPORT_ACK_TIMEOUT_DISABLED) {
1181 #ifdef SPDK_CONFIG_RDMA_SET_ACK_TIMEOUT
1182 		uint8_t timeout = rqpair->qpair.ctrlr->opts.transport_ack_timeout;
1183 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID,
1184 				      RDMA_OPTION_ID_ACK_TIMEOUT,
1185 				      &timeout, sizeof(timeout));
1186 		if (ret) {
1187 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_ACK_TIMEOUT %d, ret %d\n", timeout, ret);
1188 		}
1189 #else
1190 		SPDK_DEBUGLOG(nvme, "transport_ack_timeout is not supported\n");
1191 #endif
1192 	}
1193 
1194 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
1195 	if (ret) {
1196 		SPDK_ERRLOG("rdma_resolve_route\n");
1197 		return ret;
1198 	}
1199 
1200 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ROUTE_RESOLVED,
1201 					     nvme_rdma_route_resolved);
1202 }
1203 
1204 static int
1205 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
1206 		       struct sockaddr *src_addr,
1207 		       struct sockaddr *dst_addr)
1208 {
1209 	int ret;
1210 
1211 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
1212 				NVME_RDMA_TIME_OUT_IN_MS);
1213 	if (ret) {
1214 		SPDK_ERRLOG("rdma_resolve_addr, %d\n", errno);
1215 		return ret;
1216 	}
1217 
1218 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ADDR_RESOLVED,
1219 					     nvme_rdma_addr_resolved);
1220 }
1221 
1222 static int nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair);
1223 
1224 static int
1225 nvme_rdma_connect_established(struct nvme_rdma_qpair *rqpair, int ret)
1226 {
1227 	if (ret == -ESTALE) {
1228 		return nvme_rdma_stale_conn_retry(rqpair);
1229 	} else if (ret) {
1230 		SPDK_ERRLOG("RDMA connect error %d\n", ret);
1231 		return ret;
1232 	}
1233 
1234 	ret = nvme_rdma_register_reqs(rqpair);
1235 	SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1236 	if (ret) {
1237 		SPDK_ERRLOG("Unable to register rqpair RDMA requests\n");
1238 		return -1;
1239 	}
1240 	SPDK_DEBUGLOG(nvme, "RDMA requests registered\n");
1241 
1242 	ret = nvme_rdma_register_rsps(rqpair);
1243 	SPDK_DEBUGLOG(nvme, "rc =%d\n", ret);
1244 	if (ret < 0) {
1245 		SPDK_ERRLOG("Unable to register rqpair RDMA responses\n");
1246 		return -1;
1247 	}
1248 	SPDK_DEBUGLOG(nvme, "RDMA responses registered\n");
1249 
1250 	rqpair->mr_map = spdk_rdma_create_mem_map(rqpair->rdma_qp->qp->pd, &g_nvme_hooks,
1251 			 SPDK_RDMA_MEMORY_MAP_ROLE_INITIATOR);
1252 	if (!rqpair->mr_map) {
1253 		SPDK_ERRLOG("Unable to register RDMA memory translation map\n");
1254 		return -1;
1255 	}
1256 
1257 	rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND;
1258 
1259 	return 0;
1260 }
1261 
1262 static int
1263 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
1264 {
1265 	struct rdma_conn_param				param = {};
1266 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
1267 	struct ibv_device_attr				attr;
1268 	int						ret;
1269 	struct spdk_nvme_ctrlr				*ctrlr;
1270 
1271 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
1272 	if (ret != 0) {
1273 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1274 		return ret;
1275 	}
1276 
1277 	param.responder_resources = attr.max_qp_rd_atom;
1278 
1279 	ctrlr = rqpair->qpair.ctrlr;
1280 	if (!ctrlr) {
1281 		return -1;
1282 	}
1283 
1284 	request_data.qid = rqpair->qpair.id;
1285 	request_data.hrqsize = rqpair->num_entries + 1;
1286 	request_data.hsqsize = rqpair->num_entries;
1287 	request_data.cntlid = ctrlr->cntlid;
1288 
1289 	param.private_data = &request_data;
1290 	param.private_data_len = sizeof(request_data);
1291 	param.retry_count = ctrlr->opts.transport_retry_count;
1292 	param.rnr_retry_count = 7;
1293 
1294 	/* Fields below are ignored by rdma cm if qpair has been
1295 	 * created using rdma cm API. */
1296 	param.srq = 0;
1297 	param.qp_num = rqpair->rdma_qp->qp->qp_num;
1298 
1299 	ret = rdma_connect(rqpair->cm_id, &param);
1300 	if (ret) {
1301 		SPDK_ERRLOG("nvme rdma connect error\n");
1302 		return ret;
1303 	}
1304 
1305 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ESTABLISHED,
1306 					     nvme_rdma_connect_established);
1307 }
1308 
/*
 * Resolve addr/service with getaddrinfo() for the given address family and
 * copy the first result into *sa.
 *
 * Returns 0 on success, the getaddrinfo() error code if the lookup fails, or
 * EINVAL if the resolved address does not fit into *sa.
 */
static int
nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr, const char *service)
{
	struct addrinfo hints;
	struct addrinfo *result;
	int rc;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = family;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = 0;

	rc = getaddrinfo(addr, service, &hints, &result);
	if (rc != 0) {
		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(rc), rc);
		return rc;
	}

	if (result->ai_addrlen <= sizeof(*sa)) {
		memcpy(sa, result->ai_addr, result->ai_addrlen);
	} else {
		SPDK_ERRLOG("getaddrinfo() ai_addrlen %zu too large\n", (size_t)result->ai_addrlen);
		rc = EINVAL;
	}

	/* The result list is released on both paths. */
	freeaddrinfo(result);
	return rc;
}
1337 
1338 static int
1339 nvme_rdma_ctrlr_connect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1340 {
1341 	struct sockaddr_storage dst_addr;
1342 	struct sockaddr_storage src_addr;
1343 	bool src_addr_specified;
1344 	int rc;
1345 	struct nvme_rdma_ctrlr *rctrlr;
1346 	struct nvme_rdma_qpair *rqpair;
1347 	int family;
1348 
1349 	rqpair = nvme_rdma_qpair(qpair);
1350 	rctrlr = nvme_rdma_ctrlr(ctrlr);
1351 	assert(rctrlr != NULL);
1352 
1353 	switch (ctrlr->trid.adrfam) {
1354 	case SPDK_NVMF_ADRFAM_IPV4:
1355 		family = AF_INET;
1356 		break;
1357 	case SPDK_NVMF_ADRFAM_IPV6:
1358 		family = AF_INET6;
1359 		break;
1360 	default:
1361 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
1362 		return -1;
1363 	}
1364 
1365 	SPDK_DEBUGLOG(nvme, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
1366 
1367 	memset(&dst_addr, 0, sizeof(dst_addr));
1368 
1369 	SPDK_DEBUGLOG(nvme, "trsvcid is %s\n", ctrlr->trid.trsvcid);
1370 	rc = nvme_rdma_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
1371 	if (rc != 0) {
1372 		SPDK_ERRLOG("dst_addr nvme_rdma_parse_addr() failed\n");
1373 		return -1;
1374 	}
1375 
1376 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
1377 		memset(&src_addr, 0, sizeof(src_addr));
1378 		rc = nvme_rdma_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid);
1379 		if (rc != 0) {
1380 			SPDK_ERRLOG("src_addr nvme_rdma_parse_addr() failed\n");
1381 			return -1;
1382 		}
1383 		src_addr_specified = true;
1384 	} else {
1385 		src_addr_specified = false;
1386 	}
1387 
1388 	rc = rdma_create_id(rctrlr->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
1389 	if (rc < 0) {
1390 		SPDK_ERRLOG("rdma_create_id() failed\n");
1391 		return -1;
1392 	}
1393 
1394 	rc = nvme_rdma_resolve_addr(rqpair,
1395 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
1396 				    (struct sockaddr *)&dst_addr);
1397 	if (rc < 0) {
1398 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
1399 		return -1;
1400 	}
1401 
1402 	rqpair->state = NVME_RDMA_QPAIR_STATE_INITIALIZING;
1403 
1404 	return 0;
1405 }
1406 
1407 static int
1408 nvme_rdma_stale_conn_reconnect(struct nvme_rdma_qpair *rqpair)
1409 {
1410 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1411 
1412 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks) {
1413 		return -EAGAIN;
1414 	}
1415 
1416 	return nvme_rdma_ctrlr_connect_qpair(qpair->ctrlr, qpair);
1417 }
1418 
1419 static int
1420 nvme_rdma_ctrlr_connect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr,
1421 				   struct spdk_nvme_qpair *qpair)
1422 {
1423 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1424 	int rc;
1425 
1426 	if (rqpair->in_connect_poll) {
1427 		return -EAGAIN;
1428 	}
1429 
1430 	rqpair->in_connect_poll = true;
1431 
1432 	switch (rqpair->state) {
1433 	case NVME_RDMA_QPAIR_STATE_INVALID:
1434 		rc = -EAGAIN;
1435 		break;
1436 
1437 	case NVME_RDMA_QPAIR_STATE_INITIALIZING:
1438 	case NVME_RDMA_QPAIR_STATE_EXITING:
1439 		if (!nvme_qpair_is_admin_queue(qpair)) {
1440 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
1441 		}
1442 
1443 		rc = nvme_rdma_process_event_poll(rqpair);
1444 
1445 		if (!nvme_qpair_is_admin_queue(qpair)) {
1446 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
1447 		}
1448 
1449 		if (rc == 0) {
1450 			rc = -EAGAIN;
1451 		}
1452 		rqpair->in_connect_poll = false;
1453 
1454 		return rc;
1455 
1456 	case NVME_RDMA_QPAIR_STATE_STALE_CONN:
1457 		rc = nvme_rdma_stale_conn_reconnect(rqpair);
1458 		if (rc == 0) {
1459 			rc = -EAGAIN;
1460 		}
1461 		break;
1462 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND:
1463 		rc = nvme_fabric_qpair_connect_async(qpair, rqpair->num_entries + 1);
1464 		if (rc == 0) {
1465 			rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL;
1466 			rc = -EAGAIN;
1467 		} else {
1468 			SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
1469 		}
1470 		break;
1471 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL:
1472 		rc = nvme_fabric_qpair_connect_poll(qpair);
1473 		if (rc == 0) {
1474 			rqpair->state = NVME_RDMA_QPAIR_STATE_RUNNING;
1475 			nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTED);
1476 		} else if (rc != -EAGAIN) {
1477 			SPDK_ERRLOG("Failed to poll NVMe-oF Fabric CONNECT command\n");
1478 		}
1479 		break;
1480 	case NVME_RDMA_QPAIR_STATE_RUNNING:
1481 		rc = 0;
1482 		break;
1483 	default:
1484 		assert(false);
1485 		rc = -EINVAL;
1486 		break;
1487 	}
1488 
1489 	rqpair->in_connect_poll = false;
1490 
1491 	return rc;
1492 }
1493 
1494 static inline int
1495 nvme_rdma_get_memory_translation(struct nvme_request *req, struct nvme_rdma_qpair *rqpair,
1496 				 struct nvme_rdma_memory_translation_ctx *_ctx)
1497 {
1498 	struct spdk_memory_domain_translation_ctx ctx;
1499 	struct spdk_memory_domain_translation_result dma_translation = {.iov_count = 0};
1500 	struct spdk_rdma_memory_translation rdma_translation;
1501 	int rc;
1502 
1503 	assert(req);
1504 	assert(rqpair);
1505 	assert(_ctx);
1506 
1507 	if (req->payload.opts && req->payload.opts->memory_domain) {
1508 		ctx.size = sizeof(struct spdk_memory_domain_translation_ctx);
1509 		ctx.rdma.ibv_qp = rqpair->rdma_qp->qp;
1510 		dma_translation.size = sizeof(struct spdk_memory_domain_translation_result);
1511 
1512 		rc = spdk_memory_domain_translate_data(req->payload.opts->memory_domain,
1513 						       req->payload.opts->memory_domain_ctx,
1514 						       rqpair->memory_domain->domain, &ctx, _ctx->addr,
1515 						       _ctx->length, &dma_translation);
1516 		if (spdk_unlikely(rc) || dma_translation.iov_count != 1) {
1517 			SPDK_ERRLOG("DMA memory translation failed, rc %d, iov count %u\n", rc, dma_translation.iov_count);
1518 			return rc;
1519 		}
1520 
1521 		_ctx->lkey = dma_translation.rdma.lkey;
1522 		_ctx->rkey = dma_translation.rdma.rkey;
1523 		_ctx->addr = dma_translation.iov.iov_base;
1524 		_ctx->length = dma_translation.iov.iov_len;
1525 	} else {
1526 		rc = spdk_rdma_get_translation(rqpair->mr_map, _ctx->addr, _ctx->length, &rdma_translation);
1527 		if (spdk_unlikely(rc)) {
1528 			SPDK_ERRLOG("RDMA memory translation failed, rc %d\n", rc);
1529 			return rc;
1530 		}
1531 		if (rdma_translation.translation_type == SPDK_RDMA_TRANSLATION_MR) {
1532 			_ctx->lkey = rdma_translation.mr_or_key.mr->lkey;
1533 			_ctx->rkey = rdma_translation.mr_or_key.mr->rkey;
1534 		} else {
1535 			_ctx->lkey = _ctx->rkey = (uint32_t)rdma_translation.mr_or_key.key;
1536 		}
1537 	}
1538 
1539 	return 0;
1540 }
1541 
1542 
1543 /*
1544  * Build SGL describing empty payload.
1545  */
1546 static int
1547 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
1548 {
1549 	struct nvme_request *req = rdma_req->req;
1550 
1551 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1552 
1553 	/* The first element of this SGL is pointing at an
1554 	 * spdk_nvmf_cmd object. For this particular command,
1555 	 * we only need the first 64 bytes corresponding to
1556 	 * the NVMe command. */
1557 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1558 
1559 	/* The RDMA SGL needs one element describing the NVMe command. */
1560 	rdma_req->send_wr.num_sge = 1;
1561 
1562 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1563 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1564 	req->cmd.dptr.sgl1.keyed.length = 0;
1565 	req->cmd.dptr.sgl1.keyed.key = 0;
1566 	req->cmd.dptr.sgl1.address = 0;
1567 
1568 	return 0;
1569 }
1570 
1571 /*
1572  * Build inline SGL describing contiguous payload buffer.
1573  */
1574 static int
1575 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
1576 				      struct spdk_nvme_rdma_req *rdma_req)
1577 {
1578 	struct nvme_request *req = rdma_req->req;
1579 	struct nvme_rdma_memory_translation_ctx ctx = {
1580 		.addr = req->payload.contig_or_cb_arg + req->payload_offset,
1581 		.length = req->payload_size
1582 	};
1583 	int rc;
1584 
1585 	assert(ctx.length != 0);
1586 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1587 
1588 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1589 	if (spdk_unlikely(rc)) {
1590 		return -1;
1591 	}
1592 
1593 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1594 
1595 	/* The first element of this SGL is pointing at an
1596 	 * spdk_nvmf_cmd object. For this particular command,
1597 	 * we only need the first 64 bytes corresponding to
1598 	 * the NVMe command. */
1599 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1600 
1601 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1602 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1603 
1604 	/* The RDMA SGL contains two elements. The first describes
1605 	 * the NVMe command and the second describes the data
1606 	 * payload. */
1607 	rdma_req->send_wr.num_sge = 2;
1608 
1609 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1610 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1611 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1612 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1613 	/* Inline only supported for icdoff == 0 currently.  This function will
1614 	 * not get called for controllers with other values. */
1615 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1616 
1617 	return 0;
1618 }
1619 
1620 /*
1621  * Build SGL describing contiguous payload buffer.
1622  */
1623 static int
1624 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
1625 			       struct spdk_nvme_rdma_req *rdma_req)
1626 {
1627 	struct nvme_request *req = rdma_req->req;
1628 	struct nvme_rdma_memory_translation_ctx ctx = {
1629 		.addr = req->payload.contig_or_cb_arg + req->payload_offset,
1630 		.length = req->payload_size
1631 	};
1632 	int rc;
1633 
1634 	assert(req->payload_size != 0);
1635 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1636 
1637 	if (spdk_unlikely(req->payload_size > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1638 		SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1639 			    req->payload_size, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1640 		return -1;
1641 	}
1642 
1643 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1644 	if (spdk_unlikely(rc)) {
1645 		return -1;
1646 	}
1647 
1648 	req->cmd.dptr.sgl1.keyed.key = ctx.rkey;
1649 
1650 	/* The first element of this SGL is pointing at an
1651 	 * spdk_nvmf_cmd object. For this particular command,
1652 	 * we only need the first 64 bytes corresponding to
1653 	 * the NVMe command. */
1654 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1655 
1656 	/* The RDMA SGL needs one element describing the NVMe command. */
1657 	rdma_req->send_wr.num_sge = 1;
1658 
1659 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1660 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1661 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1662 	req->cmd.dptr.sgl1.keyed.length = (uint32_t)ctx.length;
1663 	req->cmd.dptr.sgl1.address = (uint64_t)ctx.addr;
1664 
1665 	return 0;
1666 }
1667 
1668 /*
1669  * Build SGL describing scattered payload buffer.
1670  */
1671 static int
1672 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1673 			    struct spdk_nvme_rdma_req *rdma_req)
1674 {
1675 	struct nvme_request *req = rdma_req->req;
1676 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1677 	struct nvme_rdma_memory_translation_ctx ctx;
1678 	uint32_t remaining_size;
1679 	uint32_t sge_length;
1680 	int rc, max_num_sgl, num_sgl_desc;
1681 
1682 	assert(req->payload_size != 0);
1683 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1684 	assert(req->payload.reset_sgl_fn != NULL);
1685 	assert(req->payload.next_sge_fn != NULL);
1686 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1687 
1688 	max_num_sgl = req->qpair->ctrlr->max_sges;
1689 
1690 	remaining_size = req->payload_size;
1691 	num_sgl_desc = 0;
1692 	do {
1693 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &sge_length);
1694 		if (rc) {
1695 			return -1;
1696 		}
1697 
1698 		sge_length = spdk_min(remaining_size, sge_length);
1699 
1700 		if (spdk_unlikely(sge_length > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1701 			SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1702 				    sge_length, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1703 			return -1;
1704 		}
1705 		ctx.length = sge_length;
1706 		rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1707 		if (spdk_unlikely(rc)) {
1708 			return -1;
1709 		}
1710 
1711 		cmd->sgl[num_sgl_desc].keyed.key = ctx.rkey;
1712 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1713 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1714 		cmd->sgl[num_sgl_desc].keyed.length = (uint32_t)ctx.length;
1715 		cmd->sgl[num_sgl_desc].address = (uint64_t)ctx.addr;
1716 
1717 		remaining_size -= ctx.length;
1718 		num_sgl_desc++;
1719 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1720 
1721 
1722 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1723 	if (remaining_size > 0) {
1724 		return -1;
1725 	}
1726 
1727 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1728 
1729 	/* The RDMA SGL needs one element describing some portion
1730 	 * of the spdk_nvmf_cmd structure. */
1731 	rdma_req->send_wr.num_sge = 1;
1732 
1733 	/*
1734 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1735 	 * as a data block descriptor.
1736 	 */
1737 	if (num_sgl_desc == 1) {
1738 		/* The first element of this SGL is pointing at an
1739 		 * spdk_nvmf_cmd object. For this particular command,
1740 		 * we only need the first 64 bytes corresponding to
1741 		 * the NVMe command. */
1742 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1743 
1744 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1745 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1746 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1747 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1748 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1749 	} else {
1750 		/*
1751 		 * Otherwise, The SGL descriptor embedded in the command must point to the list of
1752 		 * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
1753 		 */
1754 		uint32_t descriptors_size = sizeof(struct spdk_nvme_sgl_descriptor) * num_sgl_desc;
1755 
1756 		if (spdk_unlikely(descriptors_size > rqpair->qpair.ctrlr->ioccsz_bytes)) {
1757 			SPDK_ERRLOG("Size of SGL descriptors (%u) exceeds ICD (%u)\n",
1758 				    descriptors_size, rqpair->qpair.ctrlr->ioccsz_bytes);
1759 			return -1;
1760 		}
1761 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + descriptors_size;
1762 
1763 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1764 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1765 		req->cmd.dptr.sgl1.unkeyed.length = descriptors_size;
1766 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1767 	}
1768 
1769 	return 0;
1770 }
1771 
1772 /*
1773  * Build inline SGL describing sgl payload buffer.
1774  */
1775 static int
1776 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1777 				   struct spdk_nvme_rdma_req *rdma_req)
1778 {
1779 	struct nvme_request *req = rdma_req->req;
1780 	struct nvme_rdma_memory_translation_ctx ctx;
1781 	uint32_t length;
1782 	int rc;
1783 
1784 	assert(req->payload_size != 0);
1785 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1786 	assert(req->payload.reset_sgl_fn != NULL);
1787 	assert(req->payload.next_sge_fn != NULL);
1788 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1789 
1790 	rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &length);
1791 	if (rc) {
1792 		return -1;
1793 	}
1794 
1795 	if (length < req->payload_size) {
1796 		SPDK_DEBUGLOG(nvme, "Inline SGL request split so sending separately.\n");
1797 		return nvme_rdma_build_sgl_request(rqpair, rdma_req);
1798 	}
1799 
1800 	if (length > req->payload_size) {
1801 		length = req->payload_size;
1802 	}
1803 
1804 	ctx.length = length;
1805 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1806 	if (spdk_unlikely(rc)) {
1807 		return -1;
1808 	}
1809 
1810 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1811 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1812 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1813 
1814 	rdma_req->send_wr.num_sge = 2;
1815 
1816 	/* The first element of this SGL is pointing at an
1817 	 * spdk_nvmf_cmd object. For this particular command,
1818 	 * we only need the first 64 bytes corresponding to
1819 	 * the NVMe command. */
1820 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1821 
1822 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1823 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1824 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1825 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1826 	/* Inline only supported for icdoff == 0 currently.  This function will
1827 	 * not get called for controllers with other values. */
1828 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1829 
1830 	return 0;
1831 }
1832 
1833 static int
1834 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1835 		   struct spdk_nvme_rdma_req *rdma_req)
1836 {
1837 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1838 	enum nvme_payload_type payload_type;
1839 	bool icd_supported;
1840 	int rc;
1841 
1842 	assert(rdma_req->req == NULL);
1843 	rdma_req->req = req;
1844 	req->cmd.cid = rdma_req->id;
1845 	payload_type = nvme_payload_type(&req->payload);
1846 	/*
1847 	 * Check if icdoff is non zero, to avoid interop conflicts with
1848 	 * targets with non-zero icdoff.  Both SPDK and the Linux kernel
1849 	 * targets use icdoff = 0.  For targets with non-zero icdoff, we
1850 	 * will currently just not use inline data for now.
1851 	 */
1852 	icd_supported = spdk_nvme_opc_get_data_transfer(req->cmd.opc) == SPDK_NVME_DATA_HOST_TO_CONTROLLER
1853 			&& req->payload_size <= ctrlr->ioccsz_bytes && ctrlr->icdoff == 0;
1854 
1855 	if (req->payload_size == 0) {
1856 		rc = nvme_rdma_build_null_request(rdma_req);
1857 	} else if (payload_type == NVME_PAYLOAD_TYPE_CONTIG) {
1858 		if (icd_supported) {
1859 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1860 		} else {
1861 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1862 		}
1863 	} else if (payload_type == NVME_PAYLOAD_TYPE_SGL) {
1864 		if (icd_supported) {
1865 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1866 		} else {
1867 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1868 		}
1869 	} else {
1870 		rc = -1;
1871 	}
1872 
1873 	if (rc) {
1874 		rdma_req->req = NULL;
1875 		return rc;
1876 	}
1877 
1878 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1879 	return 0;
1880 }
1881 
1882 static struct spdk_nvme_qpair *
1883 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1884 			     uint16_t qid, uint32_t qsize,
1885 			     enum spdk_nvme_qprio qprio,
1886 			     uint32_t num_requests,
1887 			     bool delay_cmd_submit,
1888 			     bool async)
1889 {
1890 	struct nvme_rdma_qpair *rqpair;
1891 	struct spdk_nvme_qpair *qpair;
1892 	int rc;
1893 
1894 	if (qsize < SPDK_NVME_QUEUE_MIN_ENTRIES) {
1895 		SPDK_ERRLOG("Failed to create qpair with size %u. Minimum queue size is %d.\n",
1896 			    qsize, SPDK_NVME_QUEUE_MIN_ENTRIES);
1897 		return NULL;
1898 	}
1899 
1900 	rqpair = nvme_rdma_calloc(1, sizeof(struct nvme_rdma_qpair));
1901 	if (!rqpair) {
1902 		SPDK_ERRLOG("failed to get create rqpair\n");
1903 		return NULL;
1904 	}
1905 
1906 	/* Set num_entries one less than queue size. According to NVMe
1907 	 * and NVMe-oF specs we can not submit queue size requests,
1908 	 * one slot shall always remain empty.
1909 	 */
1910 	rqpair->num_entries = qsize - 1;
1911 	rqpair->delay_cmd_submit = delay_cmd_submit;
1912 	rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID;
1913 	qpair = &rqpair->qpair;
1914 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async);
1915 	if (rc != 0) {
1916 		nvme_rdma_free(rqpair);
1917 		return NULL;
1918 	}
1919 
1920 	rc = nvme_rdma_alloc_reqs(rqpair);
1921 	SPDK_DEBUGLOG(nvme, "rc =%d\n", rc);
1922 	if (rc) {
1923 		SPDK_ERRLOG("Unable to allocate rqpair RDMA requests\n");
1924 		nvme_rdma_free(rqpair);
1925 		return NULL;
1926 	}
1927 	SPDK_DEBUGLOG(nvme, "RDMA requests allocated\n");
1928 
1929 	rc = nvme_rdma_alloc_rsps(rqpair);
1930 	SPDK_DEBUGLOG(nvme, "rc =%d\n", rc);
1931 	if (rc < 0) {
1932 		SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
1933 		nvme_rdma_free_reqs(rqpair);
1934 		nvme_rdma_free(rqpair);
1935 		return NULL;
1936 	}
1937 	SPDK_DEBUGLOG(nvme, "RDMA responses allocated\n");
1938 
1939 	return qpair;
1940 }
1941 
1942 static void
1943 nvme_rdma_qpair_destroy(struct nvme_rdma_qpair *rqpair)
1944 {
1945 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1946 	struct nvme_rdma_ctrlr *rctrlr;
1947 	struct nvme_rdma_cm_event_entry *entry, *tmp;
1948 
1949 	spdk_rdma_free_mem_map(&rqpair->mr_map);
1950 	nvme_rdma_unregister_reqs(rqpair);
1951 	nvme_rdma_unregister_rsps(rqpair);
1952 
1953 	if (rqpair->evt) {
1954 		rdma_ack_cm_event(rqpair->evt);
1955 		rqpair->evt = NULL;
1956 	}
1957 
1958 	/*
1959 	 * This works because we have the controller lock both in
1960 	 * this function and in the function where we add new events.
1961 	 */
1962 	if (qpair->ctrlr != NULL) {
1963 		rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
1964 		STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
1965 			if (entry->evt->id->context == rqpair) {
1966 				STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
1967 				rdma_ack_cm_event(entry->evt);
1968 				STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
1969 			}
1970 		}
1971 	}
1972 
1973 	if (rqpair->cm_id) {
1974 		if (rqpair->rdma_qp) {
1975 			spdk_rdma_qp_destroy(rqpair->rdma_qp);
1976 			rqpair->rdma_qp = NULL;
1977 		}
1978 
1979 		rdma_destroy_id(rqpair->cm_id);
1980 		rqpair->cm_id = NULL;
1981 	}
1982 
1983 	if (rqpair->cq) {
1984 		ibv_destroy_cq(rqpair->cq);
1985 		rqpair->cq = NULL;
1986 	}
1987 }
1988 
1989 static void nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr);
1990 
1991 static int
1992 nvme_rdma_qpair_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
1993 {
1994 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1995 
1996 	nvme_rdma_qpair_destroy(rqpair);
1997 
1998 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, 0);
1999 
2000 	if (ret) {
2001 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
2002 		goto quiet;
2003 	}
2004 
2005 	if (qpair->poll_group == NULL) {
2006 		/* If poll group is not used, cq is already destroyed. So complete
2007 		 * disconnecting qpair immediately.
2008 		 */
2009 		goto quiet;
2010 	}
2011 
2012 	if (rqpair->current_num_sends != 0 || rqpair->current_num_recvs != 0) {
2013 		rqpair->state = NVME_RDMA_QPAIR_STATE_LINGERING;
2014 		rqpair->evt_timeout_ticks = (NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US * spdk_get_ticks_hz()) /
2015 					    SPDK_SEC_TO_USEC + spdk_get_ticks();
2016 
2017 		return -EAGAIN;
2018 	}
2019 
2020 quiet:
2021 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
2022 
2023 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
2024 
2025 	return 0;
2026 }
2027 
2028 static int
2029 nvme_rdma_qpair_wait_until_quiet(struct nvme_rdma_qpair *rqpair)
2030 {
2031 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks &&
2032 	    (rqpair->current_num_sends != 0 || rqpair->current_num_recvs != 0)) {
2033 		return -EAGAIN;
2034 	}
2035 
2036 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
2037 
2038 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
2039 
2040 	return 0;
2041 }
2042 
2043 static void
2044 _nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair,
2045 				  nvme_rdma_cm_event_cb disconnected_qpair_cb)
2046 {
2047 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2048 	int rc;
2049 
2050 	assert(disconnected_qpair_cb != NULL);
2051 
2052 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITING;
2053 
2054 	if (rqpair->cm_id) {
2055 		if (rqpair->rdma_qp) {
2056 			rc = spdk_rdma_qp_disconnect(rqpair->rdma_qp);
2057 			if ((qpair->ctrlr != NULL) && (rc == 0)) {
2058 				rc = nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_DISCONNECTED,
2059 								   disconnected_qpair_cb);
2060 				if (rc == 0) {
2061 					return;
2062 				}
2063 			}
2064 		}
2065 	}
2066 
2067 	disconnected_qpair_cb(rqpair, 0);
2068 }
2069 
2070 static int
2071 nvme_rdma_ctrlr_disconnect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2072 {
2073 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2074 	int rc;
2075 
2076 	switch (rqpair->state) {
2077 	case NVME_RDMA_QPAIR_STATE_EXITING:
2078 		if (!nvme_qpair_is_admin_queue(qpair)) {
2079 			nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
2080 		}
2081 
2082 		rc = nvme_rdma_process_event_poll(rqpair);
2083 
2084 		if (!nvme_qpair_is_admin_queue(qpair)) {
2085 			nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
2086 		}
2087 		break;
2088 
2089 	case NVME_RDMA_QPAIR_STATE_LINGERING:
2090 		rc = nvme_rdma_qpair_wait_until_quiet(rqpair);
2091 		break;
2092 	case NVME_RDMA_QPAIR_STATE_EXITED:
2093 		rc = 0;
2094 		break;
2095 
2096 	default:
2097 		assert(false);
2098 		rc = -EAGAIN;
2099 		break;
2100 	}
2101 
2102 	return rc;
2103 }
2104 
2105 static void
2106 nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2107 {
2108 	int rc;
2109 
2110 	_nvme_rdma_ctrlr_disconnect_qpair(ctrlr, qpair, nvme_rdma_qpair_disconnected);
2111 
2112 	/* If the qpair is in a poll group, disconnected_qpair_cb has to be called
2113 	 * asynchronously after the qpair is actually disconnected. Hence let
2114 	 * poll_group_process_completions() poll the qpair until then.
2115 	 *
2116 	 * If the qpair is not in a poll group, poll the qpair until it is actually
2117 	 * disconnected here.
2118 	 */
2119 	if (qpair->async || qpair->poll_group != NULL) {
2120 		return;
2121 	}
2122 
2123 	while (1) {
2124 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(ctrlr, qpair);
2125 		if (rc != -EAGAIN) {
2126 			break;
2127 		}
2128 	}
2129 }
2130 
2131 static int
2132 nvme_rdma_stale_conn_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
2133 {
2134 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2135 
2136 	if (ret) {
2137 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
2138 	}
2139 
2140 	nvme_rdma_qpair_destroy(rqpair);
2141 
2142 	qpair->last_transport_failure_reason = qpair->transport_failure_reason;
2143 	qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_NONE;
2144 
2145 	rqpair->state = NVME_RDMA_QPAIR_STATE_STALE_CONN;
2146 	rqpair->evt_timeout_ticks = (NVME_RDMA_STALE_CONN_RETRY_DELAY_US * spdk_get_ticks_hz()) /
2147 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
2148 
2149 	return 0;
2150 }
2151 
2152 static int
2153 nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair)
2154 {
2155 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2156 
2157 	if (rqpair->stale_conn_retry_count >= NVME_RDMA_STALE_CONN_RETRY_MAX) {
2158 		SPDK_ERRLOG("Retry failed %d times, give up stale connection to qpair (cntlid:%u, qid:%u).\n",
2159 			    NVME_RDMA_STALE_CONN_RETRY_MAX, qpair->ctrlr->cntlid, qpair->id);
2160 		return -ESTALE;
2161 	}
2162 
2163 	rqpair->stale_conn_retry_count++;
2164 
2165 	SPDK_NOTICELOG("%d times, retry stale connnection to qpair (cntlid:%u, qid:%u).\n",
2166 		       rqpair->stale_conn_retry_count, qpair->ctrlr->cntlid, qpair->id);
2167 
2168 	if (qpair->poll_group) {
2169 		rqpair->cq = NULL;
2170 	}
2171 
2172 	_nvme_rdma_ctrlr_disconnect_qpair(qpair->ctrlr, qpair, nvme_rdma_stale_conn_disconnected);
2173 
2174 	return 0;
2175 }
2176 
2177 static int
2178 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2179 {
2180 	struct nvme_rdma_qpair *rqpair;
2181 
2182 	assert(qpair != NULL);
2183 	rqpair = nvme_rdma_qpair(qpair);
2184 
2185 	if (rqpair->state != NVME_RDMA_QPAIR_STATE_EXITED) {
2186 		int rc __attribute__((unused));
2187 
2188 		/* qpair was removed from the poll group while the disconnect is not finished.
2189 		 * Destroy rdma resources forcefully. */
2190 		rc = nvme_rdma_qpair_disconnected(rqpair, 0);
2191 		assert(rc == 0);
2192 	}
2193 
2194 	nvme_rdma_qpair_abort_reqs(qpair, 0);
2195 	nvme_qpair_deinit(qpair);
2196 
2197 	nvme_rdma_put_memory_domain(rqpair->memory_domain);
2198 
2199 	nvme_rdma_free_reqs(rqpair);
2200 	nvme_rdma_free_rsps(rqpair);
2201 	nvme_rdma_free(rqpair);
2202 
2203 	return 0;
2204 }
2205 
2206 static struct spdk_nvme_qpair *
2207 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
2208 				const struct spdk_nvme_io_qpair_opts *opts)
2209 {
2210 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
2211 					    opts->io_queue_requests,
2212 					    opts->delay_cmd_submit,
2213 					    opts->async_mode);
2214 }
2215 
/* Controller enable hook for the RDMA transport: performs no work and always returns 0. */
static int
nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
{
	/* Nothing to do for this transport. */
	return 0;
}
2222 
/* Forward declaration; the definition appears further down in this file. */
static int nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr);
2224 
2225 static struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
2226 		const struct spdk_nvme_ctrlr_opts *opts,
2227 		void *devhandle)
2228 {
2229 	struct nvme_rdma_ctrlr *rctrlr;
2230 	struct ibv_context **contexts;
2231 	struct ibv_device_attr dev_attr;
2232 	int i, flag, rc;
2233 
2234 	rctrlr = nvme_rdma_calloc(1, sizeof(struct nvme_rdma_ctrlr));
2235 	if (rctrlr == NULL) {
2236 		SPDK_ERRLOG("could not allocate ctrlr\n");
2237 		return NULL;
2238 	}
2239 
2240 	rctrlr->ctrlr.opts = *opts;
2241 	rctrlr->ctrlr.trid = *trid;
2242 
2243 	if (opts->transport_retry_count > NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT) {
2244 		SPDK_NOTICELOG("transport_retry_count exceeds max value %d, use max value\n",
2245 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT);
2246 		rctrlr->ctrlr.opts.transport_retry_count = NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT;
2247 	}
2248 
2249 	if (opts->transport_ack_timeout > NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT) {
2250 		SPDK_NOTICELOG("transport_ack_timeout exceeds max value %d, use max value\n",
2251 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT);
2252 		rctrlr->ctrlr.opts.transport_ack_timeout = NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT;
2253 	}
2254 
2255 	contexts = rdma_get_devices(NULL);
2256 	if (contexts == NULL) {
2257 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
2258 		nvme_rdma_free(rctrlr);
2259 		return NULL;
2260 	}
2261 
2262 	i = 0;
2263 	rctrlr->max_sge = NVME_RDMA_MAX_SGL_DESCRIPTORS;
2264 
2265 	while (contexts[i] != NULL) {
2266 		rc = ibv_query_device(contexts[i], &dev_attr);
2267 		if (rc < 0) {
2268 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
2269 			rdma_free_devices(contexts);
2270 			nvme_rdma_free(rctrlr);
2271 			return NULL;
2272 		}
2273 		rctrlr->max_sge = spdk_min(rctrlr->max_sge, (uint16_t)dev_attr.max_sge);
2274 		i++;
2275 	}
2276 
2277 	rdma_free_devices(contexts);
2278 
2279 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
2280 	if (rc != 0) {
2281 		nvme_rdma_free(rctrlr);
2282 		return NULL;
2283 	}
2284 
2285 	STAILQ_INIT(&rctrlr->pending_cm_events);
2286 	STAILQ_INIT(&rctrlr->free_cm_events);
2287 	rctrlr->cm_events = nvme_rdma_calloc(NVME_RDMA_NUM_CM_EVENTS, sizeof(*rctrlr->cm_events));
2288 	if (rctrlr->cm_events == NULL) {
2289 		SPDK_ERRLOG("unable to allocate buffers to hold CM events.\n");
2290 		goto destruct_ctrlr;
2291 	}
2292 
2293 	for (i = 0; i < NVME_RDMA_NUM_CM_EVENTS; i++) {
2294 		STAILQ_INSERT_TAIL(&rctrlr->free_cm_events, &rctrlr->cm_events[i], link);
2295 	}
2296 
2297 	rctrlr->cm_channel = rdma_create_event_channel();
2298 	if (rctrlr->cm_channel == NULL) {
2299 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
2300 		goto destruct_ctrlr;
2301 	}
2302 
2303 	flag = fcntl(rctrlr->cm_channel->fd, F_GETFL);
2304 	if (fcntl(rctrlr->cm_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
2305 		SPDK_ERRLOG("Cannot set event channel to non blocking\n");
2306 		goto destruct_ctrlr;
2307 	}
2308 
2309 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
2310 			       rctrlr->ctrlr.opts.admin_queue_size, 0,
2311 			       rctrlr->ctrlr.opts.admin_queue_size, false, true);
2312 	if (!rctrlr->ctrlr.adminq) {
2313 		SPDK_ERRLOG("failed to create admin qpair\n");
2314 		goto destruct_ctrlr;
2315 	}
2316 
2317 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
2318 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
2319 		goto destruct_ctrlr;
2320 	}
2321 
2322 	SPDK_DEBUGLOG(nvme, "successfully initialized the nvmf ctrlr\n");
2323 	return &rctrlr->ctrlr;
2324 
2325 destruct_ctrlr:
2326 	nvme_ctrlr_destruct(&rctrlr->ctrlr);
2327 	return NULL;
2328 }
2329 
2330 static int
2331 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
2332 {
2333 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2334 	struct nvme_rdma_cm_event_entry *entry;
2335 
2336 	if (ctrlr->adminq) {
2337 		nvme_rdma_ctrlr_delete_io_qpair(ctrlr, ctrlr->adminq);
2338 	}
2339 
2340 	STAILQ_FOREACH(entry, &rctrlr->pending_cm_events, link) {
2341 		rdma_ack_cm_event(entry->evt);
2342 	}
2343 
2344 	STAILQ_INIT(&rctrlr->free_cm_events);
2345 	STAILQ_INIT(&rctrlr->pending_cm_events);
2346 	nvme_rdma_free(rctrlr->cm_events);
2347 
2348 	if (rctrlr->cm_channel) {
2349 		rdma_destroy_event_channel(rctrlr->cm_channel);
2350 		rctrlr->cm_channel = NULL;
2351 	}
2352 
2353 	nvme_ctrlr_destruct_finish(ctrlr);
2354 
2355 	nvme_rdma_free(rctrlr);
2356 
2357 	return 0;
2358 }
2359 
2360 static int
2361 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
2362 			       struct nvme_request *req)
2363 {
2364 	struct nvme_rdma_qpair *rqpair;
2365 	struct spdk_nvme_rdma_req *rdma_req;
2366 	struct ibv_send_wr *wr;
2367 
2368 	rqpair = nvme_rdma_qpair(qpair);
2369 	assert(rqpair != NULL);
2370 	assert(req != NULL);
2371 
2372 	rdma_req = nvme_rdma_req_get(rqpair);
2373 	if (spdk_unlikely(!rdma_req)) {
2374 		if (rqpair->poller) {
2375 			rqpair->poller->stats.queued_requests++;
2376 		}
2377 		/* Inform the upper layer to try again later. */
2378 		return -EAGAIN;
2379 	}
2380 
2381 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
2382 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
2383 		TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
2384 		nvme_rdma_req_put(rqpair, rdma_req);
2385 		return -1;
2386 	}
2387 
2388 	wr = &rdma_req->send_wr;
2389 	wr->next = NULL;
2390 	nvme_rdma_trace_ibv_sge(wr->sg_list);
2391 	return nvme_rdma_qpair_queue_send_wr(rqpair, wr);
2392 }
2393 
/* Qpair reset hook for the RDMA transport: performs no work and always returns 0. */
static int
nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
{
	/* Currently, doing nothing here */
	return 0;
}
2400 
2401 static void
2402 nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr)
2403 {
2404 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2405 	struct spdk_nvme_cpl cpl;
2406 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2407 
2408 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2409 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2410 	cpl.status.dnr = dnr;
2411 
2412 	/*
2413 	 * We cannot abort requests at the RDMA layer without
2414 	 * unregistering them. If we do, we can still get error
2415 	 * free completions on the shared completion queue.
2416 	 */
2417 	if (nvme_qpair_get_state(qpair) > NVME_QPAIR_DISCONNECTING &&
2418 	    nvme_qpair_get_state(qpair) != NVME_QPAIR_DESTROYING) {
2419 		nvme_ctrlr_disconnect_qpair(qpair);
2420 	}
2421 
2422 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2423 		nvme_rdma_req_complete(rdma_req, &cpl);
2424 		nvme_rdma_req_put(rqpair, rdma_req);
2425 	}
2426 }
2427 
2428 static void
2429 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
2430 {
2431 	uint64_t t02;
2432 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2433 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2434 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
2435 	struct spdk_nvme_ctrlr_process *active_proc;
2436 
2437 	/* Don't check timeouts during controller initialization. */
2438 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
2439 		return;
2440 	}
2441 
2442 	if (nvme_qpair_is_admin_queue(qpair)) {
2443 		active_proc = nvme_ctrlr_get_current_process(ctrlr);
2444 	} else {
2445 		active_proc = qpair->active_proc;
2446 	}
2447 
2448 	/* Only check timeouts if the current process has a timeout callback. */
2449 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
2450 		return;
2451 	}
2452 
2453 	t02 = spdk_get_ticks();
2454 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2455 		assert(rdma_req->req != NULL);
2456 
2457 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
2458 			/*
2459 			 * The requests are in order, so as soon as one has not timed out,
2460 			 * stop iterating.
2461 			 */
2462 			break;
2463 		}
2464 	}
2465 }
2466 
2467 static inline int
2468 nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
2469 {
2470 	nvme_rdma_req_complete(rdma_req, &rqpair->rsps[rdma_req->rsp_idx].cpl);
2471 	nvme_rdma_req_put(rqpair, rdma_req);
2472 	return nvme_rdma_post_recv(rqpair, rdma_req->rsp_idx);
2473 }
2474 
/* Size of the work completion array handed to one ibv_poll_cq() call; also used as the cap on a single poll batch. */
#define MAX_COMPLETIONS_PER_POLL 128
2476 
2477 static void
2478 nvme_rdma_fail_qpair(struct spdk_nvme_qpair *qpair, int failure_reason)
2479 {
2480 	if (failure_reason == IBV_WC_RETRY_EXC_ERR) {
2481 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
2482 	} else if (qpair->transport_failure_reason == SPDK_NVME_QPAIR_FAILURE_NONE) {
2483 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_UNKNOWN;
2484 	}
2485 
2486 	nvme_ctrlr_disconnect_qpair(qpair);
2487 }
2488 
2489 static inline void
2490 nvme_rdma_log_wc_status(struct nvme_rdma_qpair *rqpair, struct ibv_wc *wc)
2491 {
2492 	struct nvme_rdma_wr *rdma_wr = (struct nvme_rdma_wr *)wc->wr_id;
2493 
2494 	if (wc->status == IBV_WC_WR_FLUSH_ERR) {
2495 		/* If qpair is in ERR state, we will receive completions for all posted and not completed
2496 		 * Work Requests with IBV_WC_WR_FLUSH_ERR status. Don't log an error in that case */
2497 		SPDK_DEBUGLOG(nvme, "WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2498 			      rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2499 			      ibv_wc_status_str(wc->status));
2500 	} else {
2501 		SPDK_ERRLOG("WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2502 			    rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2503 			    ibv_wc_status_str(wc->status));
2504 	}
2505 }
2506 
2507 static inline bool
2508 nvme_rdma_is_rxe_device(struct ibv_device_attr *dev_attr)
2509 {
2510 	return dev_attr->vendor_id == SPDK_RDMA_RXE_VENDOR_ID_OLD ||
2511 	       dev_attr->vendor_id == SPDK_RDMA_RXE_VENDOR_ID_NEW;
2512 }
2513 
2514 static int
2515 nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
2516 				 struct nvme_rdma_poll_group *group,
2517 				 struct nvme_rdma_qpair *rdma_qpair,
2518 				 uint64_t *rdma_completions)
2519 {
2520 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
2521 	struct nvme_rdma_qpair		*rqpair;
2522 	struct spdk_nvme_rdma_req	*rdma_req;
2523 	struct spdk_nvme_rdma_rsp	*rdma_rsp;
2524 	struct nvme_rdma_wr		*rdma_wr;
2525 	uint32_t			reaped = 0;
2526 	int				completion_rc = 0;
2527 	int				rc, i;
2528 
2529 	rc = ibv_poll_cq(cq, batch_size, wc);
2530 	if (rc < 0) {
2531 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
2532 			    errno, spdk_strerror(errno));
2533 		return -ECANCELED;
2534 	} else if (rc == 0) {
2535 		return 0;
2536 	}
2537 
2538 	for (i = 0; i < rc; i++) {
2539 		rdma_wr = (struct nvme_rdma_wr *)wc[i].wr_id;
2540 		switch (rdma_wr->type) {
2541 		case RDMA_WR_TYPE_RECV:
2542 			rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
2543 			rqpair = rdma_rsp->rqpair;
2544 			assert(rqpair->current_num_recvs > 0);
2545 			rqpair->current_num_recvs--;
2546 
2547 			if (wc[i].status) {
2548 				nvme_rdma_log_wc_status(rqpair, &wc[i]);
2549 				nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2550 				completion_rc = -ENXIO;
2551 				continue;
2552 			}
2553 
2554 			SPDK_DEBUGLOG(nvme, "CQ recv completion\n");
2555 
2556 			if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
2557 				SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
2558 				nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2559 				completion_rc = -ENXIO;
2560 				continue;
2561 			}
2562 			rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
2563 			rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
2564 			rdma_req->rsp_idx = rdma_rsp->idx;
2565 
2566 			if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) != 0) {
2567 				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
2568 					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2569 					nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2570 					completion_rc = -ENXIO;
2571 					continue;
2572 				}
2573 				reaped++;
2574 				rqpair->num_completions++;
2575 			}
2576 			break;
2577 
2578 		case RDMA_WR_TYPE_SEND:
2579 			rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_req, rdma_wr);
2580 
2581 			/* If we are flushing I/O */
2582 			if (wc[i].status) {
2583 				rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL;
2584 				if (!rqpair) {
2585 					rqpair = rdma_qpair != NULL ? rdma_qpair : nvme_rdma_poll_group_get_qpair_by_id(group,
2586 							wc[i].qp_num);
2587 				}
2588 				if (!rqpair) {
2589 					/* When poll_group is used, several qpairs share the same CQ and it is possible to
2590 					 * receive a completion with error (e.g. IBV_WC_WR_FLUSH_ERR) for already disconnected qpair
2591 					 * That happens due to qpair is destroyed while there are submitted but not completed send/receive
2592 					 * Work Requests */
2593 					assert(group);
2594 					continue;
2595 				}
2596 				assert(rqpair->current_num_sends > 0);
2597 				rqpair->current_num_sends--;
2598 				nvme_rdma_log_wc_status(rqpair, &wc[i]);
2599 				nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2600 				completion_rc = -ENXIO;
2601 				continue;
2602 			}
2603 
2604 			if (spdk_unlikely(rdma_req->req == NULL)) {
2605 				struct ibv_device_attr dev_attr;
2606 				int query_status;
2607 
2608 				/* Bug in Soft Roce - we may receive a completion without error status when qpair is disconnected/destroyed.
2609 				 * As sanity check - log an error if we use a real HW (it should never happen) */
2610 				query_status = ibv_query_device(cq->context, &dev_attr);
2611 				if (query_status == 0) {
2612 					if (!nvme_rdma_is_rxe_device(&dev_attr)) {
2613 						SPDK_ERRLOG("Received malformed completion: request 0x%"PRIx64" type %d\n", wc->wr_id,
2614 							    rdma_wr->type);
2615 						assert(0);
2616 					}
2617 				} else {
2618 					SPDK_ERRLOG("Failed to query ib device\n");
2619 					assert(0);
2620 				}
2621 				continue;
2622 			}
2623 
2624 			rqpair = nvme_rdma_qpair(rdma_req->req->qpair);
2625 			rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
2626 			rqpair->current_num_sends--;
2627 
2628 			if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) != 0) {
2629 				if (spdk_unlikely(nvme_rdma_request_ready(rqpair, rdma_req))) {
2630 					SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2631 					nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2632 					completion_rc = -ENXIO;
2633 					continue;
2634 				}
2635 				reaped++;
2636 				rqpair->num_completions++;
2637 			}
2638 			break;
2639 
2640 		default:
2641 			SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", rdma_wr->type);
2642 			return -ECANCELED;
2643 		}
2644 	}
2645 
2646 	*rdma_completions += rc;
2647 
2648 	if (completion_rc) {
2649 		return completion_rc;
2650 	}
2651 
2652 	return reaped;
2653 }
2654 
/*
 * No-op disconnected-qpair callback. nvme_rdma_qpair_process_completions()
 * passes it to spdk_nvme_poll_group_process_completions().
 */
static void
dummy_disconnected_qpair_cb(struct spdk_nvme_qpair *qpair, void *poll_group_ctx)
{

}
2660 
/*
 * Process completions for a single qpair.
 *
 * A qpair that belongs to a poll group is handled by polling the whole group.
 * Otherwise max_completions is limited to the number of queue entries (0
 * means "as many as the queue holds"), the connect/disconnect state machine
 * is advanced, and the qpair's own cq is polled in batches of at most
 * MAX_COMPLETIONS_PER_POLL. Afterwards queued sends and receives are
 * submitted and, if enabled, request timeouts are checked.
 *
 * Returns the number of completed requests, 0 while the connection is still
 * being initialized, or -ENXIO when the qpair is disconnecting or has failed.
 */
static int
nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
				    uint32_t max_completions)
{
	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
	struct nvme_rdma_ctrlr		*rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
	int				rc = 0, batch_size;
	struct ibv_cq			*cq;
	uint64_t			rdma_completions = 0;

	/*
	 * This is used during the connection phase. It's possible that we are still reaping error completions
	 * from other qpairs so we need to call the poll group function. Also, it's more correct since the cq
	 * is shared.
	 */
	if (qpair->poll_group != NULL) {
		return spdk_nvme_poll_group_process_completions(qpair->poll_group->group, max_completions,
				dummy_disconnected_qpair_cb);
	}

	/* Never reap more than the queue can hold; 0 selects the queue size. */
	if (max_completions == 0) {
		max_completions = rqpair->num_entries;
	} else {
		max_completions = spdk_min(max_completions, rqpair->num_entries);
	}

	switch (nvme_qpair_get_state(qpair)) {
	case NVME_QPAIR_CONNECTING:
		rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
		if (rc == 0) {
			/* Once the connection is completed, we can submit queued requests */
			nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
		} else if (rc != -EAGAIN) {
			SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
			goto failed;
		} else if (rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING) {
			/* Still connecting and not past INITIALIZING: skip polling the cq for now. */
			return 0;
		}
		break;

	case NVME_QPAIR_DISCONNECTING:
		/* Keep advancing the disconnect; no completions are reaped in this state. */
		nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
		return -ENXIO;

	default:
		/* Only the admin queue polls the controller for new CM events. */
		if (nvme_qpair_is_admin_queue(qpair)) {
			nvme_rdma_poll_events(rctrlr);
		}
		nvme_rdma_qpair_process_cm_event(rqpair);
		break;
	}

	/* The state handling above may have recorded a transport failure. */
	if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
		goto failed;
	}

	cq = rqpair->cq;

	rqpair->num_completions = 0;
	do {
		batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
		rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL, rqpair, &rdma_completions);

		if (rc == 0) {
			/* The cq is empty: stop polling. */
			break;
		} else if (rc == -ECANCELED) {
			/* Handle the case where we fail to poll the cq. */
			goto failed;
		} else if (rc == -ENXIO) {
			/* The completion handling reported -ENXIO; pass it on without failing the qpair again here. */
			return rc;
		}
	} while (rqpair->num_completions < max_completions);

	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
			  nvme_rdma_qpair_submit_recvs(rqpair))) {
		goto failed;
	}

	if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
		nvme_rdma_qpair_check_timeout(qpair);
	}

	return rqpair->num_completions;

failed:
	nvme_rdma_fail_qpair(qpair, 0);
	return -ENXIO;
}
2749 
/* Maximum transfer size hook for the RDMA transport: always returns UINT32_MAX. */
static uint32_t
nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
{
	/* max_mr_size by ibv_query_device indicates the largest value that we can
	 * set for a registered memory region.  It is independent from the actual
	 * I/O size and is very likely to be larger than 2 MiB which is the
	 * granularity we currently register memory regions.  Hence return
	 * UINT32_MAX here and let the generic layer use the controller data to
	 * moderate this value.
	 */
	return UINT32_MAX;
}
2762 
2763 static uint16_t
2764 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
2765 {
2766 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2767 	uint32_t max_sge = rctrlr->max_sge;
2768 	uint32_t max_in_capsule_sge = (ctrlr->cdata.nvmf_specific.ioccsz * 16 -
2769 				       sizeof(struct spdk_nvme_cmd)) /
2770 				      sizeof(struct spdk_nvme_sgl_descriptor);
2771 
2772 	/* Max SGE is limited by capsule size */
2773 	max_sge = spdk_min(max_sge, max_in_capsule_sge);
2774 	/* Max SGE may be limited by MSDBD */
2775 	if (ctrlr->cdata.nvmf_specific.msdbd != 0) {
2776 		max_sge = spdk_min(max_sge, ctrlr->cdata.nvmf_specific.msdbd);
2777 	}
2778 
2779 	/* Max SGE can't be less than 1 */
2780 	max_sge = spdk_max(1, max_sge);
2781 	return max_sge;
2782 }
2783 
2784 static int
2785 nvme_rdma_qpair_iterate_requests(struct spdk_nvme_qpair *qpair,
2786 				 int (*iter_fn)(struct nvme_request *req, void *arg),
2787 				 void *arg)
2788 {
2789 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2790 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2791 	int rc;
2792 
2793 	assert(iter_fn != NULL);
2794 
2795 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2796 		assert(rdma_req->req != NULL);
2797 
2798 		rc = iter_fn(rdma_req->req, arg);
2799 		if (rc != 0) {
2800 			return rc;
2801 		}
2802 	}
2803 
2804 	return 0;
2805 }
2806 
2807 static void
2808 nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
2809 {
2810 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2811 	struct spdk_nvme_cpl cpl;
2812 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2813 
2814 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2815 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2816 
2817 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2818 		assert(rdma_req->req != NULL);
2819 
2820 		if (rdma_req->req->cmd.opc != SPDK_NVME_OPC_ASYNC_EVENT_REQUEST) {
2821 			continue;
2822 		}
2823 
2824 		nvme_rdma_req_complete(rdma_req, &cpl);
2825 		nvme_rdma_req_put(rqpair, rdma_req);
2826 	}
2827 }
2828 
2829 static int
2830 nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context *ctx)
2831 {
2832 	struct nvme_rdma_poller *poller;
2833 
2834 	poller = calloc(1, sizeof(*poller));
2835 	if (poller == NULL) {
2836 		SPDK_ERRLOG("Unable to allocate poller.\n");
2837 		return -ENOMEM;
2838 	}
2839 
2840 	poller->device = ctx;
2841 	poller->cq = ibv_create_cq(poller->device, DEFAULT_NVME_RDMA_CQ_SIZE, group, NULL, 0);
2842 
2843 	if (poller->cq == NULL) {
2844 		free(poller);
2845 		return -EINVAL;
2846 	}
2847 
2848 	STAILQ_INSERT_HEAD(&group->pollers, poller, link);
2849 	group->num_pollers++;
2850 	poller->current_num_wc = DEFAULT_NVME_RDMA_CQ_SIZE;
2851 	poller->required_num_wc = 0;
2852 	return 0;
2853 }
2854 
2855 static void
2856 nvme_rdma_poll_group_free_pollers(struct nvme_rdma_poll_group *group)
2857 {
2858 	struct nvme_rdma_poller	*poller, *tmp_poller;
2859 
2860 	STAILQ_FOREACH_SAFE(poller, &group->pollers, link, tmp_poller) {
2861 		if (poller->cq) {
2862 			ibv_destroy_cq(poller->cq);
2863 		}
2864 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
2865 		free(poller);
2866 	}
2867 }
2868 
2869 static struct spdk_nvme_transport_poll_group *
2870 nvme_rdma_poll_group_create(void)
2871 {
2872 	struct nvme_rdma_poll_group	*group;
2873 	struct ibv_context		**contexts;
2874 	int i = 0;
2875 
2876 	group = calloc(1, sizeof(*group));
2877 	if (group == NULL) {
2878 		SPDK_ERRLOG("Unable to allocate poll group.\n");
2879 		return NULL;
2880 	}
2881 
2882 	STAILQ_INIT(&group->pollers);
2883 
2884 	contexts = rdma_get_devices(NULL);
2885 	if (contexts == NULL) {
2886 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
2887 		free(group);
2888 		return NULL;
2889 	}
2890 
2891 	while (contexts[i] != NULL) {
2892 		if (nvme_rdma_poller_create(group, contexts[i])) {
2893 			nvme_rdma_poll_group_free_pollers(group);
2894 			free(group);
2895 			rdma_free_devices(contexts);
2896 			return NULL;
2897 		}
2898 		i++;
2899 	}
2900 
2901 	rdma_free_devices(contexts);
2902 
2903 	return &group->group;
2904 }
2905 
2906 struct nvme_rdma_qpair *
2907 nvme_rdma_poll_group_get_qpair_by_id(struct nvme_rdma_poll_group *group, uint32_t qp_num)
2908 {
2909 	struct spdk_nvme_qpair *qpair;
2910 	struct nvme_rdma_qpair *rqpair;
2911 
2912 	STAILQ_FOREACH(qpair, &group->group.disconnected_qpairs, poll_group_stailq) {
2913 		rqpair = nvme_rdma_qpair(qpair);
2914 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, qp_num)) {
2915 			return rqpair;
2916 		}
2917 	}
2918 
2919 	STAILQ_FOREACH(qpair, &group->group.connected_qpairs, poll_group_stailq) {
2920 		rqpair = nvme_rdma_qpair(qpair);
2921 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, qp_num)) {
2922 			return rqpair;
2923 		}
2924 	}
2925 
2926 	return NULL;
2927 }
2928 
2929 
/* Poll group connect hook for the RDMA transport: performs no work and always returns 0. */
static int
nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
{
	return 0;
}
2935 
2936 static int
2937 nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
2938 {
2939 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
2940 
2941 	rqpair->cq = NULL;
2942 
2943 	return 0;
2944 }
2945 
/* Poll group add hook for the RDMA transport: performs no work and always returns 0. */
static int
nvme_rdma_poll_group_add(struct spdk_nvme_transport_poll_group *tgroup,
			 struct spdk_nvme_qpair *qpair)
{
	return 0;
}
2952 
/*
 * Poll group remove hook for the RDMA transport. Nothing is released here;
 * the function only asserts that the qpair is on the group's list of
 * disconnected qpairs and returns 0.
 */
static int
nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
			    struct spdk_nvme_qpair *qpair)
{
	assert(qpair->poll_group_tailq_head == &tgroup->disconnected_qpairs);

	return 0;
}
2961 
/*
 * Process completions for every qpair of a poll group.
 *
 * Steps, in order:
 *  1. Advance the disconnect of each disconnected qpair and call
 *     disconnected_qpair_cb for those whose disconnect has finished.
 *  2. For each connected qpair, advance a pending connect or process its CM
 *     event; qpairs with a transport failure are failed and not counted.
 *  3. Split the completion budget (completions_per_qpair for each counted
 *     qpair, MAX_COMPLETIONS_PER_POLL if 0 was passed) evenly over the
 *     pollers and poll each poller's cq until its share is used up or the
 *     cq is empty.
 *  4. For each connected qpair past the INITIALIZING state, check timeouts
 *     if enabled, submit queued sends and receives and resubmit queued
 *     requests when completions freed up room.
 *
 * Returns the total number of completed requests, -ENXIO when a qpair with a
 * transport failure was found in step 2, or -EIO as soon as polling a cq
 * reports -ECANCELED.
 */
static int64_t
nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup,
		uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
{
	struct spdk_nvme_qpair			*qpair, *tmp_qpair;
	struct nvme_rdma_qpair			*rqpair;
	struct nvme_rdma_poll_group		*group;
	struct nvme_rdma_poller			*poller;
	int					num_qpairs = 0, batch_size, rc, rc2 = 0;
	int64_t					total_completions = 0;
	uint64_t				completions_allowed = 0;
	uint64_t				completions_per_poller = 0;
	uint64_t				poller_completions = 0;
	uint64_t				rdma_completions;

	if (completions_per_qpair == 0) {
		completions_per_qpair = MAX_COMPLETIONS_PER_POLL;
	}

	group = nvme_rdma_poll_group(tgroup);
	/* Step 1: drive pending disconnects; report the ones that have finished. */
	STAILQ_FOREACH_SAFE(qpair, &tgroup->disconnected_qpairs, poll_group_stailq, tmp_qpair) {
		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
		if (rc == 0) {
			disconnected_qpair_cb(qpair, tgroup->group->ctx);
		}
	}

	/* Step 2: connection handling and counting of the qpairs that take part in polling. */
	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
		rqpair = nvme_rdma_qpair(qpair);
		rqpair->num_completions = 0;

		if (spdk_unlikely(nvme_qpair_get_state(qpair) == NVME_QPAIR_CONNECTING)) {
			rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
			if (rc == 0) {
				/* Once the connection is completed, we can submit queued requests */
				nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
			} else if (rc != -EAGAIN) {
				SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
				nvme_rdma_fail_qpair(qpair, 0);
				continue;
			}
		} else {
			nvme_rdma_qpair_process_cm_event(rqpair);
		}

		if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
			/* Remember the failure for the return value, but keep processing the other qpairs. */
			rc2 = -ENXIO;
			nvme_rdma_fail_qpair(qpair, 0);
			continue;
		}
		num_qpairs++;
	}

	/* Step 3: every poller gets an equal share of the budget, at least 1.
	 * NOTE(review): this divides by group->num_pollers - confirm a poll group
	 * can never exist without any poller. */
	completions_allowed = completions_per_qpair * num_qpairs;
	completions_per_poller = spdk_max(completions_allowed / group->num_pollers, 1);

	STAILQ_FOREACH(poller, &group->pollers, link) {
		poller_completions = 0;
		rdma_completions = 0;
		do {
			poller->stats.polls++;
			batch_size = spdk_min((completions_per_poller - poller_completions), MAX_COMPLETIONS_PER_POLL);
			rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, group, NULL, &rdma_completions);
			if (rc <= 0) {
				if (rc == -ECANCELED) {
					/* Polling the cq failed: give up on the whole group for this call. */
					return -EIO;
				} else if (rc == 0) {
					poller->stats.idle_polls++;
				}
				/* Empty cq or another error: move on to the next poller. */
				break;
			}

			poller_completions += rc;
		} while (poller_completions < completions_per_poller);
		total_completions += poller_completions;
		poller->stats.completions += rdma_completions;
	}

	/* Step 4: per-qpair follow-up work after the cqs have been polled. */
	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
		rqpair = nvme_rdma_qpair(qpair);

		/* Skip qpairs that have not progressed past the INITIALIZING state. */
		if (spdk_unlikely(rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING)) {
			continue;
		}

		if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
			nvme_rdma_qpair_check_timeout(qpair);
		}

		nvme_rdma_qpair_submit_sends(rqpair);
		nvme_rdma_qpair_submit_recvs(rqpair);
		if (rqpair->num_completions > 0) {
			nvme_qpair_resubmit_requests(qpair, rqpair->num_completions);
		}
	}

	return rc2 != 0 ? rc2 : total_completions;
}
3060 
3061 static int
3062 nvme_rdma_poll_group_destroy(struct spdk_nvme_transport_poll_group *tgroup)
3063 {
3064 	struct nvme_rdma_poll_group	*group = nvme_rdma_poll_group(tgroup);
3065 
3066 	if (!STAILQ_EMPTY(&tgroup->connected_qpairs) || !STAILQ_EMPTY(&tgroup->disconnected_qpairs)) {
3067 		return -EBUSY;
3068 	}
3069 
3070 	nvme_rdma_poll_group_free_pollers(group);
3071 	free(group);
3072 
3073 	return 0;
3074 }
3075 
3076 static int
3077 nvme_rdma_poll_group_get_stats(struct spdk_nvme_transport_poll_group *tgroup,
3078 			       struct spdk_nvme_transport_poll_group_stat **_stats)
3079 {
3080 	struct nvme_rdma_poll_group *group;
3081 	struct spdk_nvme_transport_poll_group_stat *stats;
3082 	struct spdk_nvme_rdma_device_stat *device_stat;
3083 	struct nvme_rdma_poller *poller;
3084 	uint32_t i = 0;
3085 
3086 	if (tgroup == NULL || _stats == NULL) {
3087 		SPDK_ERRLOG("Invalid stats or group pointer\n");
3088 		return -EINVAL;
3089 	}
3090 
3091 	group = nvme_rdma_poll_group(tgroup);
3092 	stats = calloc(1, sizeof(*stats));
3093 	if (!stats) {
3094 		SPDK_ERRLOG("Can't allocate memory for RDMA stats\n");
3095 		return -ENOMEM;
3096 	}
3097 	stats->trtype = SPDK_NVME_TRANSPORT_RDMA;
3098 	stats->rdma.num_devices = group->num_pollers;
3099 	stats->rdma.device_stats = calloc(stats->rdma.num_devices, sizeof(*stats->rdma.device_stats));
3100 	if (!stats->rdma.device_stats) {
3101 		SPDK_ERRLOG("Can't allocate memory for RDMA device stats\n");
3102 		free(stats);
3103 		return -ENOMEM;
3104 	}
3105 
3106 	STAILQ_FOREACH(poller, &group->pollers, link) {
3107 		device_stat = &stats->rdma.device_stats[i];
3108 		device_stat->name = poller->device->device->name;
3109 		device_stat->polls = poller->stats.polls;
3110 		device_stat->idle_polls = poller->stats.idle_polls;
3111 		device_stat->completions = poller->stats.completions;
3112 		device_stat->queued_requests = poller->stats.queued_requests;
3113 		device_stat->total_send_wrs = poller->stats.rdma_stats.send.num_submitted_wrs;
3114 		device_stat->send_doorbell_updates = poller->stats.rdma_stats.send.doorbell_updates;
3115 		device_stat->total_recv_wrs = poller->stats.rdma_stats.recv.num_submitted_wrs;
3116 		device_stat->recv_doorbell_updates = poller->stats.rdma_stats.recv.doorbell_updates;
3117 		i++;
3118 	}
3119 
3120 	*_stats = stats;
3121 
3122 	return 0;
3123 }
3124 
3125 static void
3126 nvme_rdma_poll_group_free_stats(struct spdk_nvme_transport_poll_group *tgroup,
3127 				struct spdk_nvme_transport_poll_group_stat *stats)
3128 {
3129 	if (stats) {
3130 		free(stats->rdma.device_stats);
3131 	}
3132 	free(stats);
3133 }
3134 
3135 static int
3136 nvme_rdma_ctrlr_get_memory_domains(const struct spdk_nvme_ctrlr *ctrlr,
3137 				   struct spdk_memory_domain **domains, int array_size)
3138 {
3139 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(ctrlr->adminq);
3140 
3141 	if (domains && array_size > 0) {
3142 		domains[0] = rqpair->memory_domain->domain;
3143 	}
3144 
3145 	return 1;
3146 }
3147 
3148 void
3149 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
3150 {
3151 	g_nvme_hooks = *hooks;
3152 }
3153 
3154 const struct spdk_nvme_transport_ops rdma_ops = {
3155 	.name = "RDMA",
3156 	.type = SPDK_NVME_TRANSPORT_RDMA,
3157 	.ctrlr_construct = nvme_rdma_ctrlr_construct,
3158 	.ctrlr_scan = nvme_fabric_ctrlr_scan,
3159 	.ctrlr_destruct = nvme_rdma_ctrlr_destruct,
3160 	.ctrlr_enable = nvme_rdma_ctrlr_enable,
3161 
3162 	.ctrlr_set_reg_4 = nvme_fabric_ctrlr_set_reg_4,
3163 	.ctrlr_set_reg_8 = nvme_fabric_ctrlr_set_reg_8,
3164 	.ctrlr_get_reg_4 = nvme_fabric_ctrlr_get_reg_4,
3165 	.ctrlr_get_reg_8 = nvme_fabric_ctrlr_get_reg_8,
3166 	.ctrlr_set_reg_4_async = nvme_fabric_ctrlr_set_reg_4_async,
3167 	.ctrlr_set_reg_8_async = nvme_fabric_ctrlr_set_reg_8_async,
3168 	.ctrlr_get_reg_4_async = nvme_fabric_ctrlr_get_reg_4_async,
3169 	.ctrlr_get_reg_8_async = nvme_fabric_ctrlr_get_reg_8_async,
3170 
3171 	.ctrlr_get_max_xfer_size = nvme_rdma_ctrlr_get_max_xfer_size,
3172 	.ctrlr_get_max_sges = nvme_rdma_ctrlr_get_max_sges,
3173 
3174 	.ctrlr_create_io_qpair = nvme_rdma_ctrlr_create_io_qpair,
3175 	.ctrlr_delete_io_qpair = nvme_rdma_ctrlr_delete_io_qpair,
3176 	.ctrlr_connect_qpair = nvme_rdma_ctrlr_connect_qpair,
3177 	.ctrlr_disconnect_qpair = nvme_rdma_ctrlr_disconnect_qpair,
3178 
3179 	.ctrlr_get_memory_domains = nvme_rdma_ctrlr_get_memory_domains,
3180 
3181 	.qpair_abort_reqs = nvme_rdma_qpair_abort_reqs,
3182 	.qpair_reset = nvme_rdma_qpair_reset,
3183 	.qpair_submit_request = nvme_rdma_qpair_submit_request,
3184 	.qpair_process_completions = nvme_rdma_qpair_process_completions,
3185 	.qpair_iterate_requests = nvme_rdma_qpair_iterate_requests,
3186 	.admin_qpair_abort_aers = nvme_rdma_admin_qpair_abort_aers,
3187 
3188 	.poll_group_create = nvme_rdma_poll_group_create,
3189 	.poll_group_connect_qpair = nvme_rdma_poll_group_connect_qpair,
3190 	.poll_group_disconnect_qpair = nvme_rdma_poll_group_disconnect_qpair,
3191 	.poll_group_add = nvme_rdma_poll_group_add,
3192 	.poll_group_remove = nvme_rdma_poll_group_remove,
3193 	.poll_group_process_completions = nvme_rdma_poll_group_process_completions,
3194 	.poll_group_destroy = nvme_rdma_poll_group_destroy,
3195 	.poll_group_get_stats = nvme_rdma_poll_group_get_stats,
3196 	.poll_group_free_stats = nvme_rdma_poll_group_free_stats,
3197 };
3198 
3199 SPDK_NVME_TRANSPORT_REGISTER(rdma, &rdma_ops);
3200