xref: /spdk/lib/nvme/nvme_rdma.c (revision 5469bd2d12b6f3fa914098168ea9ba8f214ca3ec)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation. All rights reserved.
3  *   Copyright (c) 2019-2021 Mellanox Technologies LTD. All rights reserved.
4  *   Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 /*
8  * NVMe over RDMA transport
9  */
10 
11 #include "spdk/stdinc.h"
12 
13 #include "spdk/assert.h"
14 #include "spdk/dma.h"
15 #include "spdk/log.h"
16 #include "spdk/trace.h"
17 #include "spdk/queue.h"
18 #include "spdk/nvme.h"
19 #include "spdk/nvmf_spec.h"
20 #include "spdk/string.h"
21 #include "spdk/endian.h"
22 #include "spdk/likely.h"
23 #include "spdk/config.h"
24 
25 #include "nvme_internal.h"
26 #include "spdk_internal/rdma_provider.h"
27 #include "spdk_internal/rdma_utils.h"
28 
29 #define NVME_RDMA_TIME_OUT_IN_MS 2000
30 #define NVME_RDMA_RW_BUFFER_SIZE 131072
31 
32 /*
33  * NVME RDMA qpair Resource Defaults
34  */
35 #define NVME_RDMA_DEFAULT_TX_SGE		2
36 #define NVME_RDMA_DEFAULT_RX_SGE		1
37 
38 /* Max number of NVMe-oF SGL descriptors supported by the host */
39 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
40 
41 /* Number of STAILQ entries for holding pending RDMA CM events. */
42 #define NVME_RDMA_NUM_CM_EVENTS			256
43 
44 /* The default size for a shared rdma completion queue. */
45 #define DEFAULT_NVME_RDMA_CQ_SIZE		4096
46 
47 /*
48  * In the special case of a stale connection, we don't expose a mechanism
49  * for the user to retry the connection, so we need to handle it internally.
50  */
51 #define NVME_RDMA_STALE_CONN_RETRY_MAX		5
52 #define NVME_RDMA_STALE_CONN_RETRY_DELAY_US	10000
53 
54 /*
55  * Maximum value of transport_retry_count used by RDMA controller
56  */
57 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT	7
58 
59 /*
60  * Maximum value of transport_ack_timeout used by RDMA controller
61  */
62 #define NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT	31
63 
64 /*
65  * Number of microseconds to wait until the lingering qpair becomes quiet.
66  */
67 #define NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US	1000000ull
68 
69 /*
70  * The max length of keyed SGL data block (3 bytes)
71  */
72 #define NVME_RDMA_MAX_KEYED_SGL_LENGTH ((1u << 24u) - 1)
73 
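/* Each request consumes up to two work completions: one for its send and one for the matching recv. */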
74 #define WC_PER_QPAIR(queue_depth)	(queue_depth * 2)
75 
76 #define NVME_RDMA_POLL_GROUP_CHECK_QPN(_rqpair, qpn)				\
77 	((_rqpair)->rdma_qp && (_rqpair)->rdma_qp->qp->qp_num == (qpn))
78 
79 enum nvme_rdma_wr_type {
80 	RDMA_WR_TYPE_RECV,
81 	RDMA_WR_TYPE_SEND,
82 };
83 
84 struct nvme_rdma_wr {
85 	/* Using this instead of the enum allows this struct to only occupy one byte. */
86 	uint8_t	type;
87 };
88 
89 struct spdk_nvmf_cmd {
90 	struct spdk_nvme_cmd cmd;
91 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
92 };
93 
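/* Optional application-supplied hooks (e.g. get_ibv_pd) used when selecting a protection domain and building memory maps; empty by default. */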
94 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
95 
96 /* STAILQ wrapper for cm events. */
97 struct nvme_rdma_cm_event_entry {
98 	struct rdma_cm_event			*evt;
99 	STAILQ_ENTRY(nvme_rdma_cm_event_entry)	link;
100 };
101 
102 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
103 struct nvme_rdma_ctrlr {
104 	struct spdk_nvme_ctrlr			ctrlr;
105 
106 	uint16_t				max_sge;
107 
108 	struct rdma_event_channel		*cm_channel;
109 
110 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	pending_cm_events;
111 
112 	STAILQ_HEAD(, nvme_rdma_cm_event_entry)	free_cm_events;
113 
114 	struct nvme_rdma_cm_event_entry		*cm_events;
115 };
116 
117 struct nvme_rdma_poller_stats {
118 	uint64_t polls;
119 	uint64_t idle_polls;
120 	uint64_t queued_requests;
121 	uint64_t completions;
122 	struct spdk_rdma_provider_qp_stats rdma_stats;
123 };
124 
125 struct nvme_rdma_poll_group;
126 struct nvme_rdma_rsps;
127 
128 struct nvme_rdma_poller {
129 	struct ibv_context		*device;
130 	struct ibv_cq			*cq;
131 	struct spdk_rdma_provider_srq	*srq;
132 	struct nvme_rdma_rsps		*rsps;
133 	struct ibv_pd			*pd;
134 	struct spdk_rdma_utils_mem_map	*mr_map;
135 	uint32_t			refcnt;
136 	int				required_num_wc;
137 	int				current_num_wc;
138 	struct nvme_rdma_poller_stats	stats;
139 	struct nvme_rdma_poll_group	*group;
140 	STAILQ_ENTRY(nvme_rdma_poller)	link;
141 };
142 
143 struct nvme_rdma_qpair;
144 
145 struct nvme_rdma_poll_group {
146 	struct spdk_nvme_transport_poll_group		group;
147 	STAILQ_HEAD(, nvme_rdma_poller)			pollers;
148 	uint32_t					num_pollers;
149 	TAILQ_HEAD(, nvme_rdma_qpair)			connecting_qpairs;
150 	TAILQ_HEAD(, nvme_rdma_qpair)			active_qpairs;
151 };
152 
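/*
 * Qpair connection state machine: RDMA CM address/route resolution and connect run while
 * INITIALIZING, followed by the fabric CONNECT send/poll, optional in-band authentication,
 * and finally RUNNING. STALE_CONN triggers a delayed reconnect; teardown moves through
 * EXITING and LINGERING to EXITED.
 */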
153 enum nvme_rdma_qpair_state {
154 	NVME_RDMA_QPAIR_STATE_INVALID = 0,
155 	NVME_RDMA_QPAIR_STATE_STALE_CONN,
156 	NVME_RDMA_QPAIR_STATE_INITIALIZING,
157 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND,
158 	NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL,
159 	NVME_RDMA_QPAIR_STATE_AUTHENTICATING,
160 	NVME_RDMA_QPAIR_STATE_RUNNING,
161 	NVME_RDMA_QPAIR_STATE_EXITING,
162 	NVME_RDMA_QPAIR_STATE_LINGERING,
163 	NVME_RDMA_QPAIR_STATE_EXITED,
164 };
165 
166 typedef int (*nvme_rdma_cm_event_cb)(struct nvme_rdma_qpair *rqpair, int ret);
167 
168 struct nvme_rdma_rsp_opts {
169 	uint16_t				num_entries;
170 	struct nvme_rdma_qpair			*rqpair;
171 	struct spdk_rdma_provider_srq		*srq;
172 	struct spdk_rdma_utils_mem_map		*mr_map;
173 };
174 
175 struct nvme_rdma_rsps {
176 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
177 	struct ibv_sge				*rsp_sgls;
178 	struct spdk_nvme_rdma_rsp		*rsps;
179 
180 	struct ibv_recv_wr			*rsp_recv_wrs;
181 
182 	/* Count of outstanding recv objects */
183 	uint16_t				current_num_recvs;
184 
185 	uint16_t				num_entries;
186 };
187 
188 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
189 struct nvme_rdma_qpair {
190 	struct spdk_nvme_qpair			qpair;
191 
192 	struct spdk_rdma_provider_qp		*rdma_qp;
193 	struct rdma_cm_id			*cm_id;
194 	struct ibv_cq				*cq;
195 	struct spdk_rdma_provider_srq		*srq;
196 
197 	struct	spdk_nvme_rdma_req		*rdma_reqs;
198 
199 	uint32_t				max_send_sge;
200 
201 	uint16_t				num_entries;
202 
203 	bool					delay_cmd_submit;
204 
205 	uint32_t				num_completions;
206 	uint32_t				num_outstanding_reqs;
207 
208 	struct nvme_rdma_rsps			*rsps;
209 
210 	/*
211 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
212 	 * Indexed by rdma_req->id.
213 	 */
214 	struct spdk_nvmf_cmd			*cmds;
215 
216 	struct spdk_rdma_utils_mem_map		*mr_map;
217 
218 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
219 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
220 
221 	/* Count of outstanding send objects */
222 	uint16_t				current_num_sends;
223 
224 	TAILQ_ENTRY(nvme_rdma_qpair)		link_active;
225 
226 	/* Fields below are placed at the end of the struct since they are not used frequently */
227 	struct rdma_cm_event			*evt;
228 	struct nvme_rdma_poller			*poller;
229 
230 	uint64_t				evt_timeout_ticks;
231 	nvme_rdma_cm_event_cb			evt_cb;
232 	enum rdma_cm_event_type			expected_evt_type;
233 
234 	enum nvme_rdma_qpair_state		state;
235 
236 	bool					in_connect_poll;
237 
238 	uint8_t					stale_conn_retry_count;
239 	bool					need_destroy;
240 	TAILQ_ENTRY(nvme_rdma_qpair)		link_connecting;
241 };
242 
243 enum NVME_RDMA_COMPLETION_FLAGS {
244 	NVME_RDMA_SEND_COMPLETED = 1u << 0,
245 	NVME_RDMA_RECV_COMPLETED = 1u << 1,
246 };
247 
248 struct spdk_nvme_rdma_req {
249 	uint16_t				id;
250 	uint16_t				completion_flags: 2;
251 	uint16_t				in_progress_accel: 1;
252 	uint16_t				reserved: 13;
253 	/* If the completion of RDMA_RECV is received before RDMA_SEND, we will complete the nvme
254 	 * request during processing of RDMA_SEND. To complete the request we must know the response
255 	 * received in RDMA_RECV, so store it in this field. */
256 	struct spdk_nvme_rdma_rsp		*rdma_rsp;
257 
258 	struct nvme_rdma_wr			rdma_wr;
259 
260 	struct ibv_send_wr			send_wr;
261 
262 	struct nvme_request			*req;
263 
264 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
265 
266 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
267 
268 	/* Fields below are not used in regular IO path, keep them last */
269 	spdk_memory_domain_data_cpl_cb		transfer_cpl_cb;
270 	void					*transfer_cpl_cb_arg;
271 	/* The accel sequence API works with iovec pointers, so we need to store the result of the next_sge callback */
272 	struct iovec				iovs[NVME_RDMA_MAX_SGL_DESCRIPTORS];
273 };
274 
275 struct spdk_nvme_rdma_rsp {
276 	struct spdk_nvme_cpl	cpl;
277 	struct nvme_rdma_qpair	*rqpair;
278 	struct ibv_recv_wr	*recv_wr;
279 	struct nvme_rdma_wr	rdma_wr;
280 };
281 
282 struct nvme_rdma_memory_translation_ctx {
283 	void *addr;
284 	size_t length;
285 	uint32_t lkey;
286 	uint32_t rkey;
287 };
288 
289 static const char *rdma_cm_event_str[] = {
290 	"RDMA_CM_EVENT_ADDR_RESOLVED",
291 	"RDMA_CM_EVENT_ADDR_ERROR",
292 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
293 	"RDMA_CM_EVENT_ROUTE_ERROR",
294 	"RDMA_CM_EVENT_CONNECT_REQUEST",
295 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
296 	"RDMA_CM_EVENT_CONNECT_ERROR",
297 	"RDMA_CM_EVENT_UNREACHABLE",
298 	"RDMA_CM_EVENT_REJECTED",
299 	"RDMA_CM_EVENT_ESTABLISHED",
300 	"RDMA_CM_EVENT_DISCONNECTED",
301 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
302 	"RDMA_CM_EVENT_MULTICAST_JOIN",
303 	"RDMA_CM_EVENT_MULTICAST_ERROR",
304 	"RDMA_CM_EVENT_ADDR_CHANGE",
305 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
306 };
307 
308 static struct nvme_rdma_poller *nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group,
309 		struct ibv_context *device);
310 static void nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group,
311 		struct nvme_rdma_poller *poller);
312 
313 static int nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr,
314 		struct spdk_nvme_qpair *qpair);
315 
316 static inline int nvme_rdma_memory_domain_transfer_data(struct spdk_memory_domain *dst_domain,
317 		void *dst_domain_ctx,
318 		struct iovec *dst_iov, uint32_t dst_iovcnt,
319 		struct spdk_memory_domain *src_domain, void *src_domain_ctx,
320 		struct iovec *src_iov, uint32_t src_iovcnt,
321 		struct spdk_memory_domain_translation_result *translation,
322 		spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg);
323 
324 static inline int _nvme_rdma_qpair_submit_request(struct nvme_rdma_qpair *rqpair,
325 		struct spdk_nvme_rdma_req *rdma_req);
326 
327 static inline struct nvme_rdma_qpair *
328 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
329 {
330 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
331 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
332 }
333 
334 static inline struct nvme_rdma_poll_group *
335 nvme_rdma_poll_group(struct spdk_nvme_transport_poll_group *group)
336 {
337 	return (SPDK_CONTAINEROF(group, struct nvme_rdma_poll_group, group));
338 }
339 
340 static inline struct nvme_rdma_ctrlr *
341 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
342 {
343 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
344 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
345 }
346 
347 static inline struct spdk_nvme_rdma_req *
348 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
349 {
350 	struct spdk_nvme_rdma_req *rdma_req;
351 
352 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
353 	if (spdk_likely(rdma_req)) {
354 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
355 	}
356 
357 	return rdma_req;
358 }
359 
360 static inline void
361 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
362 {
363 	rdma_req->completion_flags = 0;
364 	rdma_req->req = NULL;
365 	rdma_req->rdma_rsp = NULL;
366 	assert(rdma_req->transfer_cpl_cb == NULL);
367 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
368 }
369 
370 static inline void
371 nvme_rdma_finish_data_transfer(struct spdk_nvme_rdma_req *rdma_req, int rc)
372 {
373 	spdk_memory_domain_data_cpl_cb cb = rdma_req->transfer_cpl_cb;
374 
375 	SPDK_DEBUGLOG(nvme, "req %p, finish data transfer, rc %d\n", rdma_req, rc);
376 	rdma_req->transfer_cpl_cb = NULL;
377 	assert(cb);
378 	cb(rdma_req->transfer_cpl_cb_arg, rc);
379 }
380 
381 static void
382 nvme_rdma_req_complete(struct spdk_nvme_rdma_req *rdma_req,
383 		       struct spdk_nvme_cpl *rsp,
384 		       bool print_on_error)
385 {
386 	struct nvme_request *req = rdma_req->req;
387 	struct nvme_rdma_qpair *rqpair;
388 	struct spdk_nvme_qpair *qpair;
389 	bool error, print_error;
390 
391 	assert(req != NULL);
392 
393 	qpair = req->qpair;
394 	rqpair = nvme_rdma_qpair(qpair);
395 
396 	error = spdk_nvme_cpl_is_error(rsp);
397 	print_error = error && print_on_error && !qpair->ctrlr->opts.disable_error_logging;
398 
399 	if (print_error) {
400 		spdk_nvme_qpair_print_command(qpair, &req->cmd);
401 	}
402 
403 	if (print_error || SPDK_DEBUGLOG_FLAG_ENABLED("nvme")) {
404 		spdk_nvme_qpair_print_completion(qpair, rsp);
405 	}
406 
407 	assert(rqpair->num_outstanding_reqs > 0);
408 	rqpair->num_outstanding_reqs--;
409 
410 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
411 
412 	nvme_complete_request(req->cb_fn, req->cb_arg, qpair, req, rsp);
413 	nvme_rdma_req_put(rqpair, rdma_req);
414 }
415 
416 static const char *
417 nvme_rdma_cm_event_str_get(uint32_t event)
418 {
419 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
420 		return rdma_cm_event_str[event];
421 	} else {
422 		return "Undefined";
423 	}
424 }
425 
426 
427 static int
428 nvme_rdma_qpair_process_cm_event(struct nvme_rdma_qpair *rqpair)
429 {
430 	struct rdma_cm_event				*event = rqpair->evt;
431 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
432 	int						rc = 0;
433 
434 	if (event) {
435 		switch (event->event) {
436 		case RDMA_CM_EVENT_ADDR_RESOLVED:
437 		case RDMA_CM_EVENT_ADDR_ERROR:
438 		case RDMA_CM_EVENT_ROUTE_RESOLVED:
439 		case RDMA_CM_EVENT_ROUTE_ERROR:
440 			break;
441 		case RDMA_CM_EVENT_CONNECT_REQUEST:
442 			break;
443 		case RDMA_CM_EVENT_CONNECT_ERROR:
444 			break;
445 		case RDMA_CM_EVENT_UNREACHABLE:
446 		case RDMA_CM_EVENT_REJECTED:
447 			break;
448 		case RDMA_CM_EVENT_CONNECT_RESPONSE:
449 			rc = spdk_rdma_provider_qp_complete_connect(rqpair->rdma_qp);
450 		/* fall through */
451 		case RDMA_CM_EVENT_ESTABLISHED:
452 			accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
453 			if (accept_data == NULL) {
454 				rc = -1;
455 			} else {
456 				SPDK_DEBUGLOG(nvme, "Requested queue depth %d. Target receive queue depth %d.\n",
457 					      rqpair->num_entries + 1, accept_data->crqsize);
458 			}
459 			break;
460 		case RDMA_CM_EVENT_DISCONNECTED:
461 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
462 			break;
463 		case RDMA_CM_EVENT_DEVICE_REMOVAL:
464 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
465 			rqpair->need_destroy = true;
466 			break;
467 		case RDMA_CM_EVENT_MULTICAST_JOIN:
468 		case RDMA_CM_EVENT_MULTICAST_ERROR:
469 			break;
470 		case RDMA_CM_EVENT_ADDR_CHANGE:
471 			rqpair->qpair.transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_LOCAL;
472 			break;
473 		case RDMA_CM_EVENT_TIMEWAIT_EXIT:
474 			break;
475 		default:
476 			SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
477 			break;
478 		}
479 		rqpair->evt = NULL;
480 		rdma_ack_cm_event(event);
481 	}
482 
483 	return rc;
484 }
485 
486 /*
487  * This function must be called under the nvme controller's lock
488  * because it touches global controller variables. The lock is taken
489  * by the generic transport code before invoking a few of the functions
490  * in this file: nvme_rdma_ctrlr_connect_qpair, nvme_rdma_ctrlr_delete_io_qpair,
491  * and conditionally nvme_rdma_qpair_process_completions when it is calling
492  * completions on the admin qpair. When adding a new call to this function, please
493  * verify that it is in a situation where it falls under the lock.
494  */
495 static int
496 nvme_rdma_poll_events(struct nvme_rdma_ctrlr *rctrlr)
497 {
498 	struct nvme_rdma_cm_event_entry	*entry, *tmp;
499 	struct nvme_rdma_qpair		*event_qpair;
500 	struct rdma_cm_event		*event;
501 	struct rdma_event_channel	*channel = rctrlr->cm_channel;
502 
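	/* First retry delivery of events queued earlier because their qpair already had an event pending. */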
503 	STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
504 		event_qpair = entry->evt->id->context;
505 		if (event_qpair->evt == NULL) {
506 			event_qpair->evt = entry->evt;
507 			STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
508 			STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
509 		}
510 	}
511 
512 	while (rdma_get_cm_event(channel, &event) == 0) {
513 		event_qpair = event->id->context;
514 		if (event_qpair->evt == NULL) {
515 			event_qpair->evt = event;
516 		} else {
517 			assert(rctrlr == nvme_rdma_ctrlr(event_qpair->qpair.ctrlr));
518 			entry = STAILQ_FIRST(&rctrlr->free_cm_events);
519 			if (entry == NULL) {
520 				rdma_ack_cm_event(event);
521 				return -ENOMEM;
522 			}
523 			STAILQ_REMOVE(&rctrlr->free_cm_events, entry, nvme_rdma_cm_event_entry, link);
524 			entry->evt = event;
525 			STAILQ_INSERT_TAIL(&rctrlr->pending_cm_events, entry, link);
526 		}
527 	}
528 
529 	/* rdma_get_cm_event() returns -1 on error. If an error occurs, errno
530 	 * will be set to indicate the failure reason. So return negated errno here.
531 	 */
532 	return -errno;
533 }
534 
535 static int
536 nvme_rdma_validate_cm_event(enum rdma_cm_event_type expected_evt_type,
537 			    struct rdma_cm_event *reaped_evt)
538 {
539 	int rc = -EBADMSG;
540 
541 	if (expected_evt_type == reaped_evt->event) {
542 		return 0;
543 	}
544 
545 	switch (expected_evt_type) {
546 	case RDMA_CM_EVENT_ESTABLISHED:
547 		/*
548 		 * There is an enum ib_cm_rej_reason in the kernel headers that defines 10 as
549 		 * IB_CM_REJ_STALE_CONN. There is no corresponding userspace definition, but we get
550 		 * the same value here.
551 		 */
552 		if (reaped_evt->event == RDMA_CM_EVENT_REJECTED && reaped_evt->status == 10) {
553 			rc = -ESTALE;
554 		} else if (reaped_evt->event == RDMA_CM_EVENT_CONNECT_RESPONSE) {
555 			/*
556 			 * If we are using a qpair that was not created with the rdma cm API,
557 			 * we will receive RDMA_CM_EVENT_CONNECT_RESPONSE instead of
558 			 * RDMA_CM_EVENT_ESTABLISHED.
559 			 */
560 			return 0;
561 		}
562 		break;
563 	default:
564 		break;
565 	}
566 
567 	SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
568 		    nvme_rdma_cm_event_str_get(expected_evt_type),
569 		    nvme_rdma_cm_event_str_get(reaped_evt->event), reaped_evt->event,
570 		    reaped_evt->status);
571 	return rc;
572 }
573 
574 static int
575 nvme_rdma_process_event_start(struct nvme_rdma_qpair *rqpair,
576 			      enum rdma_cm_event_type evt,
577 			      nvme_rdma_cm_event_cb evt_cb)
578 {
579 	int	rc;
580 
581 	assert(evt_cb != NULL);
582 
583 	if (rqpair->evt != NULL) {
584 		rc = nvme_rdma_qpair_process_cm_event(rqpair);
585 		if (rc) {
586 			return rc;
587 		}
588 	}
589 
590 	rqpair->expected_evt_type = evt;
591 	rqpair->evt_cb = evt_cb;
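	/* Convert the configured CM event timeout from milliseconds to ticks and record the absolute deadline. */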
592 	rqpair->evt_timeout_ticks = (g_spdk_nvme_transport_opts.rdma_cm_event_timeout_ms * 1000 *
593 				     spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC + spdk_get_ticks();
594 
595 	return 0;
596 }
597 
598 static int
599 nvme_rdma_process_event_poll(struct nvme_rdma_qpair *rqpair)
600 {
601 	struct nvme_rdma_ctrlr	*rctrlr;
602 	int	rc = 0, rc2;
603 
604 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
605 	assert(rctrlr != NULL);
606 
607 	if (!rqpair->evt && spdk_get_ticks() < rqpair->evt_timeout_ticks) {
608 		rc = nvme_rdma_poll_events(rctrlr);
609 		if (rc == -EAGAIN || rc == -EWOULDBLOCK) {
610 			return rc;
611 		}
612 	}
613 
614 	if (rqpair->evt == NULL) {
615 		rc = -EADDRNOTAVAIL;
616 		goto exit;
617 	}
618 
619 	rc = nvme_rdma_validate_cm_event(rqpair->expected_evt_type, rqpair->evt);
620 
621 	rc2 = nvme_rdma_qpair_process_cm_event(rqpair);
622 	/* A bad message takes precedence over any other error code from processing the event. */
623 	rc = rc == 0 ? rc2 : rc;
624 
625 exit:
626 	assert(rqpair->evt_cb != NULL);
627 	return rqpair->evt_cb(rqpair, rc);
628 }
629 
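/*
 * Grow the poller's completion queue so it can also hold this qpair's work completions.
 * The CQ at least doubles when it grows, but never exceeds rdma_max_cq_size when that
 * limit is set.
 */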
630 static int
631 nvme_rdma_resize_cq(struct nvme_rdma_qpair *rqpair, struct nvme_rdma_poller *poller)
632 {
633 	int	current_num_wc, required_num_wc;
634 	int	max_cq_size;
635 
636 	required_num_wc = poller->required_num_wc + WC_PER_QPAIR(rqpair->num_entries);
637 	current_num_wc = poller->current_num_wc;
638 	if (current_num_wc < required_num_wc) {
639 		current_num_wc = spdk_max(current_num_wc * 2, required_num_wc);
640 	}
641 
642 	max_cq_size = g_spdk_nvme_transport_opts.rdma_max_cq_size;
643 	if (max_cq_size != 0 && current_num_wc > max_cq_size) {
644 		current_num_wc = max_cq_size;
645 	}
646 
647 	if (poller->current_num_wc != current_num_wc) {
648 		SPDK_DEBUGLOG(nvme, "Resize RDMA CQ from %d to %d\n", poller->current_num_wc,
649 			      current_num_wc);
650 		if (ibv_resize_cq(poller->cq, current_num_wc)) {
651 			SPDK_ERRLOG("RDMA CQ resize failed: errno %d: %s\n", errno, spdk_strerror(errno));
652 			return -1;
653 		}
654 
655 		poller->current_num_wc = current_num_wc;
656 	}
657 
658 	poller->required_num_wc = required_num_wc;
659 	return 0;
660 }
661 
662 static int
663 nvme_rdma_qpair_set_poller(struct spdk_nvme_qpair *qpair)
664 {
665 	struct nvme_rdma_qpair          *rqpair = nvme_rdma_qpair(qpair);
666 	struct nvme_rdma_poll_group     *group = nvme_rdma_poll_group(qpair->poll_group);
667 	struct nvme_rdma_poller         *poller;
668 
669 	assert(rqpair->cq == NULL);
670 
671 	poller = nvme_rdma_poll_group_get_poller(group, rqpair->cm_id->verbs);
672 	if (!poller) {
673 		SPDK_ERRLOG("Unable to find a cq for qpair %p on poll group %p\n", qpair, qpair->poll_group);
674 		return -EINVAL;
675 	}
676 
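	/* With a shared receive queue the CQ is presumably sized up front for the SRQ, so only the non-SRQ case resizes the CQ per qpair. */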
677 	if (!poller->srq) {
678 		if (nvme_rdma_resize_cq(rqpair, poller)) {
679 			nvme_rdma_poll_group_put_poller(group, poller);
680 			return -EPROTO;
681 		}
682 	}
683 
684 	rqpair->cq = poller->cq;
685 	rqpair->srq = poller->srq;
686 	if (rqpair->srq) {
687 		rqpair->rsps = poller->rsps;
688 	}
689 	rqpair->poller = poller;
690 	return 0;
691 }
692 
693 static int
694 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
695 {
696 	int			rc;
697 	struct spdk_rdma_provider_qp_init_attr	attr = {};
698 	struct ibv_device_attr	dev_attr;
699 	struct nvme_rdma_ctrlr	*rctrlr;
700 	uint32_t num_cqe, max_num_cqe;
701 
702 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
703 	if (rc != 0) {
704 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
705 		return -1;
706 	}
707 
708 	if (rqpair->qpair.poll_group) {
709 		assert(!rqpair->cq);
710 		rc = nvme_rdma_qpair_set_poller(&rqpair->qpair);
711 		if (rc) {
712 			SPDK_ERRLOG("Unable to activate the RDMA qpair.\n");
713 			return -1;
714 		}
715 		assert(rqpair->cq);
716 	} else {
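		/* Size the CQ for one send and one recv completion per request, capped by rdma_max_cq_size if configured. */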
717 		num_cqe = rqpair->num_entries * 2;
718 		max_num_cqe = g_spdk_nvme_transport_opts.rdma_max_cq_size;
719 		if (max_num_cqe != 0 && num_cqe > max_num_cqe) {
720 			num_cqe = max_num_cqe;
721 		}
722 		rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, num_cqe, rqpair, NULL, 0);
723 		if (!rqpair->cq) {
724 			SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
725 			return -1;
726 		}
727 	}
728 
729 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
730 	if (g_nvme_hooks.get_ibv_pd) {
731 		attr.pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
732 	} else {
733 		attr.pd = spdk_rdma_utils_get_pd(rqpair->cm_id->verbs);
734 	}
735 
736 	attr.stats		= rqpair->poller ? &rqpair->poller->stats.rdma_stats : NULL;
737 	attr.send_cq		= rqpair->cq;
738 	attr.recv_cq		= rqpair->cq;
739 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
740 	if (rqpair->srq) {
741 		attr.srq	= rqpair->srq->srq;
742 	} else {
743 		attr.cap.max_recv_wr = rqpair->num_entries; /* RECV operations */
744 	}
745 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
746 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
747 	attr.domain_transfer	= spdk_rdma_provider_accel_sequence_supported() ?
748 				  nvme_rdma_memory_domain_transfer_data : NULL;
749 
750 	rqpair->rdma_qp = spdk_rdma_provider_qp_create(rqpair->cm_id, &attr);
751 
752 	if (!rqpair->rdma_qp) {
753 		return -1;
754 	}
755 
756 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper values. */
757 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
758 	rqpair->current_num_sends = 0;
759 
760 	rqpair->cm_id->context = rqpair;
761 
762 	return 0;
763 }
764 
765 static void
766 nvme_rdma_reset_failed_sends(struct nvme_rdma_qpair *rqpair,
767 			     struct ibv_send_wr *bad_send_wr)
768 {
769 	while (bad_send_wr != NULL) {
770 		assert(rqpair->current_num_sends > 0);
771 		rqpair->current_num_sends--;
772 		bad_send_wr = bad_send_wr->next;
773 	}
774 }
775 
776 static void
777 nvme_rdma_reset_failed_recvs(struct nvme_rdma_rsps *rsps,
778 			     struct ibv_recv_wr *bad_recv_wr, int rc)
779 {
780 	SPDK_ERRLOG("Failed to post WRs on receive queue, errno %d (%s), bad_wr %p\n",
781 		    rc, spdk_strerror(rc), bad_recv_wr);
782 	while (bad_recv_wr != NULL) {
783 		assert(rsps->current_num_recvs > 0);
784 		rsps->current_num_recvs--;
785 		bad_recv_wr = bad_recv_wr->next;
786 	}
787 }
788 
789 static inline int
790 nvme_rdma_qpair_submit_sends(struct nvme_rdma_qpair *rqpair)
791 {
792 	struct ibv_send_wr *bad_send_wr = NULL;
793 	int rc;
794 
795 	rc = spdk_rdma_provider_qp_flush_send_wrs(rqpair->rdma_qp, &bad_send_wr);
796 
797 	if (spdk_unlikely(rc)) {
798 		SPDK_ERRLOG("Failed to post WRs on send queue, errno %d (%s), bad_wr %p\n",
799 			    rc, spdk_strerror(rc), bad_send_wr);
800 		nvme_rdma_reset_failed_sends(rqpair, bad_send_wr);
801 	}
802 
803 	return rc;
804 }
805 
806 static inline int
807 nvme_rdma_qpair_submit_recvs(struct nvme_rdma_qpair *rqpair)
808 {
809 	struct ibv_recv_wr *bad_recv_wr;
810 	int rc = 0;
811 
812 	rc = spdk_rdma_provider_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr);
813 	if (spdk_unlikely(rc)) {
814 		nvme_rdma_reset_failed_recvs(rqpair->rsps, bad_recv_wr, rc);
815 	}
816 
817 	return rc;
818 }
819 
820 static inline int
821 nvme_rdma_poller_submit_recvs(struct nvme_rdma_poller *poller)
822 {
823 	struct ibv_recv_wr *bad_recv_wr;
824 	int rc;
825 
826 	rc = spdk_rdma_provider_srq_flush_recv_wrs(poller->srq, &bad_recv_wr);
827 	if (spdk_unlikely(rc)) {
828 		nvme_rdma_reset_failed_recvs(poller->rsps, bad_recv_wr, rc);
829 	}
830 
831 	return rc;
832 }
833 
834 #define nvme_rdma_trace_ibv_sge(sg_list) \
835 	if (sg_list) { \
836 		SPDK_DEBUGLOG(nvme, "local addr %p length 0x%x lkey 0x%x\n", \
837 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
838 	}
839 
840 static void
841 nvme_rdma_free_rsps(struct nvme_rdma_rsps *rsps)
842 {
843 	if (!rsps) {
844 		return;
845 	}
846 
847 	spdk_free(rsps->rsps);
848 	spdk_free(rsps->rsp_sgls);
849 	spdk_free(rsps->rsp_recv_wrs);
850 	spdk_free(rsps);
851 }
852 
853 static struct nvme_rdma_rsps *
854 nvme_rdma_create_rsps(struct nvme_rdma_rsp_opts *opts)
855 {
856 	struct nvme_rdma_rsps *rsps;
857 	struct spdk_rdma_utils_memory_translation translation;
858 	uint16_t i;
859 	int rc;
860 
861 	rsps = spdk_zmalloc(sizeof(*rsps), 0, NULL, SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
862 	if (!rsps) {
863 		SPDK_ERRLOG("Failed to allocate rsps object\n");
864 		return NULL;
865 	}
866 
867 	rsps->rsp_sgls = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_sgls), 0, NULL,
868 				      SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
869 	if (!rsps->rsp_sgls) {
870 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
871 		goto fail;
872 	}
873 
874 	rsps->rsp_recv_wrs = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsp_recv_wrs), 0, NULL,
875 					  SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
876 	if (!rsps->rsp_recv_wrs) {
877 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
878 		goto fail;
879 	}
880 
881 	rsps->rsps = spdk_zmalloc(opts->num_entries * sizeof(*rsps->rsps), 0, NULL,
882 				  SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
883 	if (!rsps->rsps) {
884 		SPDK_ERRLOG("Failed to allocate RDMA rsps\n");
885 		goto fail;
886 	}
887 
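	/* Build one response buffer, SGE and recv WR per entry, and queue each recv on either the qpair or the shared SRQ. */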
888 	for (i = 0; i < opts->num_entries; i++) {
889 		struct ibv_sge *rsp_sgl = &rsps->rsp_sgls[i];
890 		struct spdk_nvme_rdma_rsp *rsp = &rsps->rsps[i];
891 		struct ibv_recv_wr *recv_wr = &rsps->rsp_recv_wrs[i];
892 
893 		rsp->rqpair = opts->rqpair;
894 		rsp->rdma_wr.type = RDMA_WR_TYPE_RECV;
895 		rsp->recv_wr = recv_wr;
896 		rsp_sgl->addr = (uint64_t)rsp;
897 		rsp_sgl->length = sizeof(struct spdk_nvme_cpl);
898 		rc = spdk_rdma_utils_get_translation(opts->mr_map, rsp, sizeof(*rsp), &translation);
899 		if (rc) {
900 			goto fail;
901 		}
902 		rsp_sgl->lkey = spdk_rdma_utils_memory_translation_get_lkey(&translation);
903 
904 		recv_wr->wr_id = (uint64_t)&rsp->rdma_wr;
905 		recv_wr->next = NULL;
906 		recv_wr->sg_list = rsp_sgl;
907 		recv_wr->num_sge = 1;
908 
909 		nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
910 
911 		if (opts->rqpair) {
912 			spdk_rdma_provider_qp_queue_recv_wrs(opts->rqpair->rdma_qp, recv_wr);
913 		} else {
914 			spdk_rdma_provider_srq_queue_recv_wrs(opts->srq, recv_wr);
915 		}
916 	}
917 
918 	rsps->num_entries = opts->num_entries;
919 	rsps->current_num_recvs = opts->num_entries;
920 
921 	return rsps;
922 fail:
923 	nvme_rdma_free_rsps(rsps);
924 	return NULL;
925 }
926 
927 static void
928 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
929 {
930 	if (!rqpair->rdma_reqs) {
931 		return;
932 	}
933 
934 	spdk_free(rqpair->cmds);
935 	rqpair->cmds = NULL;
936 
937 	spdk_free(rqpair->rdma_reqs);
938 	rqpair->rdma_reqs = NULL;
939 }
940 
941 static int
942 nvme_rdma_create_reqs(struct nvme_rdma_qpair *rqpair)
943 {
944 	struct spdk_rdma_utils_memory_translation translation;
945 	uint16_t i;
946 	int rc;
947 
948 	assert(!rqpair->rdma_reqs);
949 	rqpair->rdma_reqs = spdk_zmalloc(rqpair->num_entries * sizeof(struct spdk_nvme_rdma_req), 0, NULL,
950 					 SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
951 	if (rqpair->rdma_reqs == NULL) {
952 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
953 		goto fail;
954 	}
955 
956 	assert(!rqpair->cmds);
957 	rqpair->cmds = spdk_zmalloc(rqpair->num_entries * sizeof(*rqpair->cmds), 0, NULL,
958 				    SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
959 	if (!rqpair->cmds) {
960 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
961 		goto fail;
962 	}
963 
964 	TAILQ_INIT(&rqpair->free_reqs);
965 	TAILQ_INIT(&rqpair->outstanding_reqs);
966 	for (i = 0; i < rqpair->num_entries; i++) {
967 		struct spdk_nvme_rdma_req	*rdma_req;
968 		struct spdk_nvmf_cmd		*cmd;
969 
970 		rdma_req = &rqpair->rdma_reqs[i];
971 		rdma_req->rdma_wr.type = RDMA_WR_TYPE_SEND;
972 		cmd = &rqpair->cmds[i];
973 
974 		rdma_req->id = i;
975 
976 		rc = spdk_rdma_utils_get_translation(rqpair->mr_map, cmd, sizeof(*cmd), &translation);
977 		if (rc) {
978 			goto fail;
979 		}
980 		rdma_req->send_sgl[0].lkey = spdk_rdma_utils_memory_translation_get_lkey(&translation);
981 
982 		/* The first RDMA sgl element will always point
983 		 * at this data structure. Depending on whether
984 		 * an NVMe-oF SGL is required, the length of
985 		 * this element may change. */
986 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
987 		rdma_req->send_wr.wr_id = (uint64_t)&rdma_req->rdma_wr;
988 		rdma_req->send_wr.next = NULL;
989 		rdma_req->send_wr.opcode = IBV_WR_SEND;
990 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
991 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
992 		rdma_req->send_wr.imm_data = 0;
993 
994 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
995 	}
996 
997 	return 0;
998 fail:
999 	nvme_rdma_free_reqs(rqpair);
1000 	return -ENOMEM;
1001 }
1002 
1003 static int nvme_rdma_connect(struct nvme_rdma_qpair *rqpair);
1004 
1005 static int
1006 nvme_rdma_route_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1007 {
1008 	if (ret) {
1009 		SPDK_ERRLOG("RDMA route resolution error\n");
1010 		return -1;
1011 	}
1012 
1013 	ret = nvme_rdma_qpair_init(rqpair);
1014 	if (ret < 0) {
1015 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
1016 		return -1;
1017 	}
1018 
1019 	return nvme_rdma_connect(rqpair);
1020 }
1021 
1022 static int
1023 nvme_rdma_addr_resolved(struct nvme_rdma_qpair *rqpair, int ret)
1024 {
1025 	if (ret) {
1026 		SPDK_ERRLOG("RDMA address resolution error\n");
1027 		return -1;
1028 	}
1029 
1030 	if (rqpair->qpair.ctrlr->opts.transport_ack_timeout != SPDK_NVME_TRANSPORT_ACK_TIMEOUT_DISABLED) {
1031 #ifdef SPDK_CONFIG_RDMA_SET_ACK_TIMEOUT
1032 		uint8_t timeout = rqpair->qpair.ctrlr->opts.transport_ack_timeout;
1033 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID,
1034 				      RDMA_OPTION_ID_ACK_TIMEOUT,
1035 				      &timeout, sizeof(timeout));
1036 		if (ret) {
1037 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_ACK_TIMEOUT %d, ret %d\n", timeout, ret);
1038 		}
1039 #else
1040 		SPDK_DEBUGLOG(nvme, "transport_ack_timeout is not supported\n");
1041 #endif
1042 	}
1043 
1044 	if (rqpair->qpair.ctrlr->opts.transport_tos != SPDK_NVME_TRANSPORT_TOS_DISABLED) {
1045 #ifdef SPDK_CONFIG_RDMA_SET_TOS
1046 		uint8_t tos = rqpair->qpair.ctrlr->opts.transport_tos;
1047 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS, &tos, sizeof(tos));
1048 		if (ret) {
1049 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_TOS %u, ret %d\n", tos, ret);
1050 		}
1051 #else
1052 		SPDK_DEBUGLOG(nvme, "transport_tos is not supported\n");
1053 #endif
1054 	}
1055 
1056 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
1057 	if (ret) {
1058 		SPDK_ERRLOG("rdma_resolve_route() failed, ret %d\n", ret);
1059 		return ret;
1060 	}
1061 
1062 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ROUTE_RESOLVED,
1063 					     nvme_rdma_route_resolved);
1064 }
1065 
1066 static int
1067 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
1068 		       struct sockaddr *src_addr,
1069 		       struct sockaddr *dst_addr)
1070 {
1071 	int ret;
1072 
1073 	if (src_addr) {
1074 		int reuse = 1;
1075 
1076 		ret = rdma_set_option(rqpair->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
1077 				      &reuse, sizeof(reuse));
1078 		if (ret) {
1079 			SPDK_NOTICELOG("Can't apply RDMA_OPTION_ID_REUSEADDR %d, ret %d\n",
1080 				       reuse, ret);
1081 			/* It is likely that rdma_resolve_addr() will then fail with EADDRINUSE, but
1082 			 * we may be missing something, so rely on rdma_resolve_addr() to report the error.
1083 			 */
1084 		}
1085 	}
1086 
1087 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
1088 				NVME_RDMA_TIME_OUT_IN_MS);
1089 	if (ret) {
1090 		SPDK_ERRLOG("rdma_resolve_addr() failed, errno %d: %s\n", errno, spdk_strerror(errno));
1091 		return ret;
1092 	}
1093 
1094 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ADDR_RESOLVED,
1095 					     nvme_rdma_addr_resolved);
1096 }
1097 
1098 static int nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair);
1099 
1100 static int
1101 nvme_rdma_connect_established(struct nvme_rdma_qpair *rqpair, int ret)
1102 {
1103 	struct nvme_rdma_rsp_opts opts = {};
1104 
1105 	if (ret == -ESTALE) {
1106 		return nvme_rdma_stale_conn_retry(rqpair);
1107 	} else if (ret) {
1108 		SPDK_ERRLOG("RDMA connect error %d\n", ret);
1109 		return ret;
1110 	}
1111 
1112 	assert(!rqpair->mr_map);
1113 	rqpair->mr_map = spdk_rdma_utils_create_mem_map(rqpair->rdma_qp->qp->pd, &g_nvme_hooks,
1114 			 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE);
1115 	if (!rqpair->mr_map) {
1116 		SPDK_ERRLOG("Unable to register RDMA memory translation map\n");
1117 		return -1;
1118 	}
1119 
1120 	ret = nvme_rdma_create_reqs(rqpair);
1121 	SPDK_DEBUGLOG(nvme, "rc = %d\n", ret);
1122 	if (ret) {
1123 		SPDK_ERRLOG("Unable to create rqpair RDMA requests\n");
1124 		return -1;
1125 	}
1126 	SPDK_DEBUGLOG(nvme, "RDMA requests created\n");
1127 
1128 	if (!rqpair->srq) {
1129 		opts.num_entries = rqpair->num_entries;
1130 		opts.rqpair = rqpair;
1131 		opts.srq = NULL;
1132 		opts.mr_map = rqpair->mr_map;
1133 
1134 		assert(!rqpair->rsps);
1135 		rqpair->rsps = nvme_rdma_create_rsps(&opts);
1136 		if (!rqpair->rsps) {
1137 			SPDK_ERRLOG("Unable to create rqpair RDMA responses\n");
1138 			return -1;
1139 		}
1140 		SPDK_DEBUGLOG(nvme, "RDMA responses created\n");
1141 
1142 		ret = nvme_rdma_qpair_submit_recvs(rqpair);
1143 		SPDK_DEBUGLOG(nvme, "rc = %d\n", ret);
1144 		if (ret) {
1145 			SPDK_ERRLOG("Unable to submit rqpair RDMA responses\n");
1146 			return -1;
1147 		}
1148 		SPDK_DEBUGLOG(nvme, "RDMA responses submitted\n");
1149 	}
1150 
1151 	rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND;
1152 
1153 	return 0;
1154 }
1155 
1156 static int
1157 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
1158 {
1159 	struct rdma_conn_param				param = {};
1160 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
1161 	struct ibv_device_attr				attr;
1162 	int						ret;
1163 	struct spdk_nvme_ctrlr				*ctrlr;
1164 
1165 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
1166 	if (ret != 0) {
1167 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1168 		return ret;
1169 	}
1170 
1171 	param.responder_resources = attr.max_qp_rd_atom;
1172 
1173 	ctrlr = rqpair->qpair.ctrlr;
1174 	if (!ctrlr) {
1175 		return -1;
1176 	}
1177 
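	/* Private data carried in the RDMA CM connect request per the NVMe-oF RDMA transport binding; hsqsize is a 0's based queue size. */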
1178 	request_data.qid = rqpair->qpair.id;
1179 	request_data.hrqsize = rqpair->num_entries + 1;
1180 	request_data.hsqsize = rqpair->num_entries;
1181 	request_data.cntlid = ctrlr->cntlid;
1182 
1183 	param.private_data = &request_data;
1184 	param.private_data_len = sizeof(request_data);
1185 	param.retry_count = ctrlr->opts.transport_retry_count;
1186 	param.rnr_retry_count = 7;
1187 
1188 	/* Fields below are ignored by rdma cm if the qpair has been
1189 	 * created using the rdma cm API. */
1190 	param.srq = 0;
1191 	param.qp_num = rqpair->rdma_qp->qp->qp_num;
1192 
1193 	ret = rdma_connect(rqpair->cm_id, &param);
1194 	if (ret) {
1195 		SPDK_ERRLOG("nvme rdma connect error\n");
1196 		return ret;
1197 	}
1198 
1199 	ctrlr->numa.id_valid = 1;
1200 	ctrlr->numa.id = spdk_rdma_cm_id_get_numa_id(rqpair->cm_id);
1201 
1202 	return nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ESTABLISHED,
1203 					     nvme_rdma_connect_established);
1204 }
1205 
1206 static int
1207 nvme_rdma_ctrlr_connect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1208 {
1209 	struct sockaddr_storage dst_addr;
1210 	struct sockaddr_storage src_addr;
1211 	bool src_addr_specified;
1212 	long int port, src_port = 0;
1213 	int rc;
1214 	struct nvme_rdma_ctrlr *rctrlr;
1215 	struct nvme_rdma_qpair *rqpair;
1216 	struct nvme_rdma_poll_group *group;
1217 	int family;
1218 
1219 	rqpair = nvme_rdma_qpair(qpair);
1220 	rctrlr = nvme_rdma_ctrlr(ctrlr);
1221 	assert(rctrlr != NULL);
1222 
1223 	switch (ctrlr->trid.adrfam) {
1224 	case SPDK_NVMF_ADRFAM_IPV4:
1225 		family = AF_INET;
1226 		break;
1227 	case SPDK_NVMF_ADRFAM_IPV6:
1228 		family = AF_INET6;
1229 		break;
1230 	default:
1231 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
1232 		return -1;
1233 	}
1234 
1235 	SPDK_DEBUGLOG(nvme, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
1236 
1237 	memset(&dst_addr, 0, sizeof(dst_addr));
1238 
1239 	SPDK_DEBUGLOG(nvme, "trsvcid is %s\n", ctrlr->trid.trsvcid);
1240 	rc = nvme_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid, &port);
1241 	if (rc != 0) {
1242 		SPDK_ERRLOG("dst_addr nvme_parse_addr() failed\n");
1243 		return -1;
1244 	}
1245 
1246 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
1247 		memset(&src_addr, 0, sizeof(src_addr));
1248 		rc = nvme_parse_addr(&src_addr, family,
1249 				     ctrlr->opts.src_addr[0] ? ctrlr->opts.src_addr : NULL,
1250 				     ctrlr->opts.src_svcid[0] ? ctrlr->opts.src_svcid : NULL,
1251 				     &src_port);
1252 		if (rc != 0) {
1253 			SPDK_ERRLOG("src_addr nvme_parse_addr() failed\n");
1254 			return -1;
1255 		}
1256 		src_addr_specified = true;
1257 	} else {
1258 		src_addr_specified = false;
1259 	}
1260 
1261 	rc = rdma_create_id(rctrlr->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
1262 	if (rc < 0) {
1263 		SPDK_ERRLOG("rdma_create_id() failed\n");
1264 		return -1;
1265 	}
1266 
1267 	rc = nvme_rdma_resolve_addr(rqpair,
1268 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
1269 				    (struct sockaddr *)&dst_addr);
1270 	if (rc < 0) {
1271 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
1272 		return -1;
1273 	}
1274 
1275 	rqpair->state = NVME_RDMA_QPAIR_STATE_INITIALIZING;
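	/* Connection establishment continues asynchronously from here; nvme_rdma_ctrlr_connect_qpair_poll() drives the remaining CM events and the fabric CONNECT. */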
1276 
1277 	if (qpair->poll_group != NULL) {
1278 		group = nvme_rdma_poll_group(qpair->poll_group);
1279 		TAILQ_INSERT_TAIL(&group->connecting_qpairs, rqpair, link_connecting);
1280 	}
1281 
1282 	return 0;
1283 }
1284 
1285 static int
1286 nvme_rdma_stale_conn_reconnect(struct nvme_rdma_qpair *rqpair)
1287 {
1288 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
1289 
1290 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks) {
1291 		return -EAGAIN;
1292 	}
1293 
1294 	return nvme_rdma_ctrlr_connect_qpair(qpair->ctrlr, qpair);
1295 }
1296 
1297 static int
1298 nvme_rdma_ctrlr_connect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr,
1299 				   struct spdk_nvme_qpair *qpair)
1300 {
1301 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1302 	int rc;
1303 
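	/* This poll function is not re-entrant; bail out if we are already inside it for this qpair. */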
1304 	if (rqpair->in_connect_poll) {
1305 		return -EAGAIN;
1306 	}
1307 
1308 	rqpair->in_connect_poll = true;
1309 
1310 	switch (rqpair->state) {
1311 	case NVME_RDMA_QPAIR_STATE_INVALID:
1312 		rc = -EAGAIN;
1313 		break;
1314 
1315 	case NVME_RDMA_QPAIR_STATE_INITIALIZING:
1316 	case NVME_RDMA_QPAIR_STATE_EXITING:
1317 		if (!nvme_qpair_is_admin_queue(qpair)) {
1318 			nvme_ctrlr_lock(ctrlr);
1319 		}
1320 
1321 		rc = nvme_rdma_process_event_poll(rqpair);
1322 
1323 		if (!nvme_qpair_is_admin_queue(qpair)) {
1324 			nvme_ctrlr_unlock(ctrlr);
1325 		}
1326 
1327 		if (rc == 0) {
1328 			rc = -EAGAIN;
1329 		}
1330 		rqpair->in_connect_poll = false;
1331 
1332 		return rc;
1333 
1334 	case NVME_RDMA_QPAIR_STATE_STALE_CONN:
1335 		rc = nvme_rdma_stale_conn_reconnect(rqpair);
1336 		if (rc == 0) {
1337 			rc = -EAGAIN;
1338 		}
1339 		break;
1340 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_SEND:
1341 		rc = nvme_fabric_qpair_connect_async(qpair, rqpair->num_entries + 1);
1342 		if (rc == 0) {
1343 			rqpair->state = NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL;
1344 			rc = -EAGAIN;
1345 		} else {
1346 			SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
1347 		}
1348 		break;
1349 	case NVME_RDMA_QPAIR_STATE_FABRIC_CONNECT_POLL:
1350 		rc = nvme_fabric_qpair_connect_poll(qpair);
1351 		if (rc == 0) {
1352 			if (nvme_fabric_qpair_auth_required(qpair)) {
1353 				rc = nvme_fabric_qpair_authenticate_async(qpair);
1354 				if (rc == 0) {
1355 					rqpair->state = NVME_RDMA_QPAIR_STATE_AUTHENTICATING;
1356 					rc = -EAGAIN;
1357 				}
1358 			} else {
1359 				rqpair->state = NVME_RDMA_QPAIR_STATE_RUNNING;
1360 				nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTED);
1361 			}
1362 		} else if (rc != -EAGAIN) {
1363 			SPDK_ERRLOG("Failed to poll NVMe-oF Fabric CONNECT command\n");
1364 		}
1365 		break;
1366 	case NVME_RDMA_QPAIR_STATE_AUTHENTICATING:
1367 		rc = nvme_fabric_qpair_authenticate_poll(qpair);
1368 		if (rc == 0) {
1369 			rqpair->state = NVME_RDMA_QPAIR_STATE_RUNNING;
1370 			nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTED);
1371 		}
1372 		break;
1373 	case NVME_RDMA_QPAIR_STATE_RUNNING:
1374 		rc = 0;
1375 		break;
1376 	default:
1377 		assert(false);
1378 		rc = -EINVAL;
1379 		break;
1380 	}
1381 
1382 	rqpair->in_connect_poll = false;
1383 
1384 	return rc;
1385 }
1386 
1387 static inline int
1388 nvme_rdma_get_memory_translation(struct nvme_request *req, struct nvme_rdma_qpair *rqpair,
1389 				 struct nvme_rdma_memory_translation_ctx *_ctx)
1390 {
1391 	struct spdk_memory_domain_translation_ctx ctx;
1392 	struct spdk_memory_domain_translation_result dma_translation = {.iov_count = 0};
1393 	struct spdk_rdma_utils_memory_translation rdma_translation;
1394 	int rc;
1395 
1396 	assert(req);
1397 	assert(rqpair);
1398 	assert(_ctx);
1399 
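	/* A payload that carries a memory domain is translated through that domain; otherwise the qpair's registered memory map provides the lkey/rkey. */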
1400 	if (req->payload.opts && req->payload.opts->memory_domain) {
1401 		ctx.size = sizeof(struct spdk_memory_domain_translation_ctx);
1402 		ctx.rdma.ibv_qp = rqpair->rdma_qp->qp;
1403 		dma_translation.size = sizeof(struct spdk_memory_domain_translation_result);
1404 
1405 		rc = spdk_memory_domain_translate_data(req->payload.opts->memory_domain,
1406 						       req->payload.opts->memory_domain_ctx,
1407 						       rqpair->rdma_qp->domain, &ctx, _ctx->addr,
1408 						       _ctx->length, &dma_translation);
1409 		if (spdk_unlikely(rc) || dma_translation.iov_count != 1) {
1410 			SPDK_ERRLOG("DMA memory translation failed, rc %d, iov count %u\n", rc, dma_translation.iov_count);
1411 			return rc;
1412 		}
1413 
1414 		_ctx->lkey = dma_translation.rdma.lkey;
1415 		_ctx->rkey = dma_translation.rdma.rkey;
1416 		_ctx->addr = dma_translation.iov.iov_base;
1417 		_ctx->length = dma_translation.iov.iov_len;
1418 	} else {
1419 		rc = spdk_rdma_utils_get_translation(rqpair->mr_map, _ctx->addr, _ctx->length, &rdma_translation);
1420 		if (spdk_unlikely(rc)) {
1421 			SPDK_ERRLOG("RDMA memory translation failed, rc %d\n", rc);
1422 			return rc;
1423 		}
1424 		if (rdma_translation.translation_type == SPDK_RDMA_UTILS_TRANSLATION_MR) {
1425 			_ctx->lkey = rdma_translation.mr_or_key.mr->lkey;
1426 			_ctx->rkey = rdma_translation.mr_or_key.mr->rkey;
1427 		} else {
1428 			_ctx->lkey = _ctx->rkey = (uint32_t)rdma_translation.mr_or_key.key;
1429 		}
1430 	}
1431 
1432 	return 0;
1433 }
1434 
1435 
1436 /*
1437  * Build SGL describing empty payload.
1438  */
1439 static int
1440 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
1441 {
1442 	struct nvme_request *req = rdma_req->req;
1443 
1444 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1445 
1446 	/* The first element of this SGL is pointing at an
1447 	 * spdk_nvmf_cmd object. For this particular command,
1448 	 * we only need the first 64 bytes corresponding to
1449 	 * the NVMe command. */
1450 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1451 
1452 	/* The RDMA SGL needs one element describing the NVMe command. */
1453 	rdma_req->send_wr.num_sge = 1;
1454 
1455 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1456 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1457 	req->cmd.dptr.sgl1.keyed.length = 0;
1458 	req->cmd.dptr.sgl1.keyed.key = 0;
1459 	req->cmd.dptr.sgl1.address = 0;
1460 
1461 	return 0;
1462 }
1463 
1464 static inline void
1465 nvme_rdma_configure_contig_inline_request(struct spdk_nvme_rdma_req *rdma_req,
1466 		struct nvme_request *req, struct nvme_rdma_memory_translation_ctx *ctx)
1467 {
1468 	rdma_req->send_sgl[1].lkey = ctx->lkey;
1469 
1470 	/* The first element of this SGL is pointing at an
1471 	 * spdk_nvmf_cmd object. For this particular command,
1472 	 * we only need the first 64 bytes corresponding to
1473 	 * the NVMe command. */
1474 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1475 
1476 	rdma_req->send_sgl[1].addr = (uint64_t)ctx->addr;
1477 	rdma_req->send_sgl[1].length = (uint32_t)ctx->length;
1478 
1479 	/* The RDMA SGL contains two elements. The first describes
1480 	 * the NVMe command and the second describes the data
1481 	 * payload. */
1482 	rdma_req->send_wr.num_sge = 2;
1483 
1484 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1485 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1486 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1487 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx->length;
1488 	/* Inline only supported for icdoff == 0 currently.  This function will
1489 	 * not get called for controllers with other values. */
1490 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1491 }
1492 
1493 /*
1494  * Build inline SGL describing contiguous payload buffer.
1495  */
1496 static inline int
1497 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
1498 				      struct spdk_nvme_rdma_req *rdma_req)
1499 {
1500 	struct nvme_request *req = rdma_req->req;
1501 	struct nvme_rdma_memory_translation_ctx ctx = {
1502 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1503 		.length = req->payload_size
1504 	};
1505 	int rc;
1506 
1507 	assert(ctx.length != 0);
1508 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1509 
1510 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1511 	if (spdk_unlikely(rc)) {
1512 		return -1;
1513 	}
1514 
1515 	nvme_rdma_configure_contig_inline_request(rdma_req, req, &ctx);
1516 
1517 	return 0;
1518 }
1519 
1520 static inline void
1521 nvme_rdma_configure_contig_request(struct spdk_nvme_rdma_req *rdma_req, struct nvme_request *req,
1522 				   struct nvme_rdma_memory_translation_ctx *ctx)
1523 {
1524 	req->cmd.dptr.sgl1.keyed.key = ctx->rkey;
1525 
1526 	/* The first element of this SGL is pointing at an
1527 	 * spdk_nvmf_cmd object. For this particular command,
1528 	 * we only need the first 64 bytes corresponding to
1529 	 * the NVMe command. */
1530 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1531 
1532 	/* The RDMA SGL needs one element describing the NVMe command. */
1533 	rdma_req->send_wr.num_sge = 1;
1534 
1535 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1536 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1537 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1538 	req->cmd.dptr.sgl1.keyed.length = (uint32_t)ctx->length;
1539 	req->cmd.dptr.sgl1.address = (uint64_t)ctx->addr;
1540 }
1541 
1542 /*
1543  * Build SGL describing contiguous payload buffer.
1544  */
1545 static inline int
1546 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
1547 			       struct spdk_nvme_rdma_req *rdma_req)
1548 {
1549 	struct nvme_request *req = rdma_req->req;
1550 	struct nvme_rdma_memory_translation_ctx ctx = {
1551 		.addr = (uint8_t *)req->payload.contig_or_cb_arg + req->payload_offset,
1552 		.length = req->payload_size
1553 	};
1554 	int rc;
1555 
1556 	assert(req->payload_size != 0);
1557 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1558 
1559 	if (spdk_unlikely(req->payload_size > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1560 		SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1561 			    req->payload_size, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1562 		return -1;
1563 	}
1564 
1565 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1566 	if (spdk_unlikely(rc)) {
1567 		return -1;
1568 	}
1569 
1570 	nvme_rdma_configure_contig_request(rdma_req, req, &ctx);
1571 
1572 	return 0;
1573 }
1574 
1575 /*
1576  * Build SGL describing scattered payload buffer.
1577  */
1578 static inline int
1579 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1580 			    struct spdk_nvme_rdma_req *rdma_req)
1581 {
1582 	struct nvme_request *req = rdma_req->req;
1583 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1584 	struct nvme_rdma_memory_translation_ctx ctx;
1585 	uint32_t remaining_size;
1586 	uint32_t sge_length;
1587 	int rc, max_num_sgl, num_sgl_desc;
1588 
1589 	assert(req->payload_size != 0);
1590 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1591 	assert(req->payload.reset_sgl_fn != NULL);
1592 	assert(req->payload.next_sge_fn != NULL);
1593 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1594 
1595 	max_num_sgl = req->qpair->ctrlr->max_sges;
1596 
1597 	remaining_size = req->payload_size;
1598 	num_sgl_desc = 0;
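	/* Walk the payload one SGE at a time and translate each into a keyed SGL descriptor, up to the controller's max_sges. */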
1599 	do {
1600 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &sge_length);
1601 		if (spdk_unlikely(rc)) {
1602 			return -1;
1603 		}
1604 
1605 		sge_length = spdk_min(remaining_size, sge_length);
1606 
1607 		if (spdk_unlikely(sge_length > NVME_RDMA_MAX_KEYED_SGL_LENGTH)) {
1608 			SPDK_ERRLOG("SGL length %u exceeds max keyed SGL block size %u\n",
1609 				    sge_length, NVME_RDMA_MAX_KEYED_SGL_LENGTH);
1610 			return -1;
1611 		}
1612 		ctx.length = sge_length;
1613 		rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1614 		if (spdk_unlikely(rc)) {
1615 			return -1;
1616 		}
1617 
1618 		cmd->sgl[num_sgl_desc].keyed.key = ctx.rkey;
1619 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1620 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1621 		cmd->sgl[num_sgl_desc].keyed.length = (uint32_t)ctx.length;
1622 		cmd->sgl[num_sgl_desc].address = (uint64_t)ctx.addr;
1623 
1624 		remaining_size -= ctx.length;
1625 		num_sgl_desc++;
1626 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1627 
1628 
1629 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1630 	if (spdk_unlikely(remaining_size > 0)) {
1631 		return -1;
1632 	}
1633 
1634 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1635 
1636 	/* The RDMA SGL needs one element describing some portion
1637 	 * of the spdk_nvmf_cmd structure. */
1638 	rdma_req->send_wr.num_sge = 1;
1639 
1640 	/*
1641 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1642 	 * as a data block descriptor.
1643 	 */
1644 	if (num_sgl_desc == 1) {
1645 		/* The first element of this SGL is pointing at an
1646 		 * spdk_nvmf_cmd object. For this particular command,
1647 		 * we only need the first 64 bytes corresponding to
1648 		 * the NVMe command. */
1649 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1650 
1651 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1652 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1653 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1654 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1655 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1656 	} else {
1657 		/*
1658 		 * Otherwise, the SGL descriptor embedded in the command must point to the list of
1659 		 * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
1660 		 */
1661 		uint32_t descriptors_size = sizeof(struct spdk_nvme_sgl_descriptor) * num_sgl_desc;
1662 
1663 		if (spdk_unlikely(descriptors_size > rqpair->qpair.ctrlr->ioccsz_bytes)) {
1664 			SPDK_ERRLOG("Size of SGL descriptors (%u) exceeds ICD (%u)\n",
1665 				    descriptors_size, rqpair->qpair.ctrlr->ioccsz_bytes);
1666 			return -1;
1667 		}
1668 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + descriptors_size;
1669 
1670 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1671 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1672 		req->cmd.dptr.sgl1.unkeyed.length = descriptors_size;
1673 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1674 	}
1675 
1676 	return 0;
1677 }
1678 
1679 /*
1680  * Build inline SGL describing sgl payload buffer.
1681  */
1682 static inline int
1683 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1684 				   struct spdk_nvme_rdma_req *rdma_req)
1685 {
1686 	struct nvme_request *req = rdma_req->req;
1687 	struct nvme_rdma_memory_translation_ctx ctx;
1688 	uint32_t length;
1689 	int rc;
1690 
1691 	assert(req->payload_size != 0);
1692 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1693 	assert(req->payload.reset_sgl_fn != NULL);
1694 	assert(req->payload.next_sge_fn != NULL);
1695 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1696 
1697 	rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &ctx.addr, &length);
1698 	if (spdk_unlikely(rc)) {
1699 		return -1;
1700 	}
1701 
1702 	if (length < req->payload_size) {
1703 		SPDK_DEBUGLOG(nvme, "Payload spans multiple SGEs, falling back to a non-inline SGL request.\n");
1704 		return nvme_rdma_build_sgl_request(rqpair, rdma_req);
1705 	}
1706 
1707 	if (length > req->payload_size) {
1708 		length = req->payload_size;
1709 	}
1710 
1711 	ctx.length = length;
1712 	rc = nvme_rdma_get_memory_translation(req, rqpair, &ctx);
1713 	if (spdk_unlikely(rc)) {
1714 		return -1;
1715 	}
1716 
1717 	rdma_req->send_sgl[1].addr = (uint64_t)ctx.addr;
1718 	rdma_req->send_sgl[1].length = (uint32_t)ctx.length;
1719 	rdma_req->send_sgl[1].lkey = ctx.lkey;
1720 
1721 	rdma_req->send_wr.num_sge = 2;
1722 
1723 	/* The first element of this SGL is pointing at an
1724 	 * spdk_nvmf_cmd object. For this particular command,
1725 	 * we only need the first 64 bytes corresponding to
1726 	 * the NVMe command. */
1727 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1728 
1729 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1730 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1731 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1732 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)ctx.length;
1733 	/* Inline only supported for icdoff == 0 currently.  This function will
1734 	 * not get called for controllers with other values. */
1735 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1736 
1737 	return 0;
1738 }
1739 
1740 static inline int
1741 nvme_rdma_accel_append_copy(struct spdk_nvme_poll_group *pg, void **seq,
1742 			    struct spdk_memory_domain *rdma_domain, struct spdk_nvme_rdma_req *rdma_req,
1743 			    struct iovec *iovs, uint32_t iovcnt,
1744 			    struct spdk_memory_domain *src_domain, void *src_domain_ctx)
1745 {
1746 	return pg->accel_fn_table.append_copy(pg->ctx, seq, iovs, iovcnt, rdma_domain, rdma_req, iovs,
1747 					      iovcnt, src_domain, src_domain_ctx, NULL, NULL);
1748 }
1749 
1750 static inline void
1751 nvme_rdma_accel_reverse(struct spdk_nvme_poll_group *pg, void *seq)
1752 {
1753 	pg->accel_fn_table.reverse_sequence(seq);
1754 }
1755 
1756 static inline void
1757 nvme_rdma_accel_finish(struct spdk_nvme_poll_group *pg, void *seq,
1758 		       spdk_nvme_accel_completion_cb cb_fn, void *cb_arg)
1759 {
1760 	pg->accel_fn_table.finish_sequence(seq, cb_fn, cb_arg);
1761 }
1762 
1763 static inline void
1764 nvme_rdma_accel_completion_cb(void *cb_arg, int status)
1765 {
1766 	struct spdk_nvme_rdma_req *rdma_req = cb_arg;
1767 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(rdma_req->req->qpair);
1768 	struct spdk_nvme_cpl cpl;
1769 	enum spdk_nvme_generic_command_status_code sc;
1770 	uint16_t dnr = 0;
1771 
1772 	rdma_req->in_progress_accel = 0;
1773 	rdma_req->req->accel_sequence = NULL;
1774 	SPDK_DEBUGLOG(nvme, "rdma_req %p qpair %p, accel completion rc %d\n", rdma_req, rqpair, status);
1775 
1776 	/* The nvme_rdma driver may fail a data transfer with a WC_FLUSH error completion, which is expected.
1777 	 * To prevent false errors from accel, first check if the qpair is in the process of disconnecting. */
1778 	if (spdk_unlikely(!spdk_nvme_qpair_is_connected(&rqpair->qpair))) {
1779 		SPDK_DEBUGLOG(nvme, "qpair %p, req %p accel cpl in disconnecting, outstanding %u\n",
1780 			      rqpair, rdma_req, rqpair->qpair.num_outstanding_reqs);
1781 		sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
1782 		goto fail_req;
1783 	}
1784 	if (spdk_unlikely(status)) {
1785 		SPDK_ERRLOG("qpair %p, req %p, accel sequence status %d\n", rdma_req->req->qpair, rdma_req, status);
1786 		sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1787 		/* Something went wrong, let the upper layer know that a retry is not desired */
1788 		dnr = 1;
1789 		goto fail_req;
1790 	}
1791 
1792 	nvme_rdma_req_complete(rdma_req, &rdma_req->rdma_rsp->cpl, true);
1793 	return;
1794 
1795 fail_req:
1796 	memset(&cpl, 0, sizeof(cpl));
1797 	cpl.status.sc = sc;
1798 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
1799 	cpl.status.dnr = dnr;
1800 	nvme_rdma_req_complete(rdma_req, &cpl, true);
1801 }
1802 
1803 static inline int
1804 nvme_rdma_apply_accel_sequence(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1805 			       struct spdk_nvme_rdma_req *rdma_req)
1806 {
1807 	struct spdk_nvme_poll_group *pg = rqpair->qpair.poll_group->group;
1808 	struct spdk_memory_domain *src_domain;
1809 	void *src_domain_ctx;
1810 	void *accel_seq = req->accel_sequence;
1811 	uint32_t iovcnt = 0;
1812 	int rc;
1813 
1814 	SPDK_DEBUGLOG(nvme, "req %p, start accel seq %p\n", rdma_req, accel_seq);
1815 	if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL) {
1816 		void *addr;
1817 		uint32_t sge_length, payload_size;
1818 
1819 		payload_size = req->payload_size;
1820 		assert(payload_size);
1821 		req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1822 		do {
1823 			rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &addr, &sge_length);
1824 			if (spdk_unlikely(rc)) {
1825 				return -1;
1826 			}
1827 			sge_length = spdk_min(payload_size, sge_length);
1828 			rdma_req->iovs[iovcnt].iov_base = addr;
1829 			rdma_req->iovs[iovcnt].iov_len = sge_length;
1830 			iovcnt++;
1831 			payload_size -= sge_length;
1832 		} while (payload_size && iovcnt < NVME_RDMA_MAX_SGL_DESCRIPTORS);
1833 
1834 		if (spdk_unlikely(payload_size)) {
1835 			SPDK_ERRLOG("not enough iovs to handle req %p, remaining len %u\n", rdma_req, payload_size);
1836 			return -E2BIG;
1837 		}
1838 	} else {
1839 		rdma_req->iovs[iovcnt].iov_base = req->payload.contig_or_cb_arg;
1840 		rdma_req->iovs[iovcnt].iov_len = req->payload_size;
1841 		iovcnt = 1;
1842 	}
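	/* Select the source memory domain for the appended copy: if an accel sequence already
	 * exists, the copy's source is this qpair's RDMA memory domain (with the rdma_req as its
	 * context); otherwise it is the memory domain supplied with the request, if any. */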
1843 	if (req->payload.opts && req->payload.opts->memory_domain) {
1844 		if (accel_seq) {
1845 			src_domain = rqpair->rdma_qp->domain;
1846 			src_domain_ctx = rdma_req;
1847 		} else {
1848 			src_domain = req->payload.opts->memory_domain;
1849 			src_domain_ctx = req->payload.opts->memory_domain_ctx;
1850 		}
1851 	} else {
1852 		src_domain = NULL;
1853 		src_domain_ctx = NULL;
1854 	}
1855 
1856 	rc = nvme_rdma_accel_append_copy(pg, &accel_seq, rqpair->rdma_qp->domain, rdma_req, rdma_req->iovs,
1857 					 iovcnt, src_domain, src_domain_ctx);
1858 	if (spdk_unlikely(rc)) {
1859 		return rc;
1860 	}
1861 
1862 	if (spdk_nvme_opc_get_data_transfer(req->cmd.opc) == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1863 		nvme_rdma_accel_reverse(pg, accel_seq);
1864 	}
1865 
1866 	rdma_req->in_progress_accel = 1;
1867 	TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
1868 	rqpair->num_outstanding_reqs++;
1869 
1870 	SPDK_DEBUGLOG(nvme, "req %p, finish accel seq %p\n", rdma_req, accel_seq);
1871 	nvme_rdma_accel_finish(pg, accel_seq, nvme_rdma_accel_completion_cb, rdma_req);
1872 
1873 	return 0;
1874 }
1875 
1876 static inline int
1877 nvme_rdma_memory_domain_transfer_data(struct spdk_memory_domain *dst_domain, void *dst_domain_ctx,
1878 				      struct iovec *dst_iov, uint32_t dst_iovcnt,
1879 				      struct spdk_memory_domain *src_domain, void *src_domain_ctx,
1880 				      struct iovec *src_iov, uint32_t src_iovcnt,
1881 				      struct spdk_memory_domain_translation_result *translation,
1882 				      spdk_memory_domain_data_cpl_cb cpl_cb, void *cpl_cb_arg)
1883 {
1884 	struct nvme_rdma_memory_translation_ctx ctx;
1885 	struct spdk_nvme_rdma_req *rdma_req = dst_domain_ctx;
1886 	struct nvme_request *req = rdma_req->req;
1887 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(rdma_req->req->qpair);
1888 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1889 	bool icd_supported;
1890 
1891 	assert(dst_domain == rqpair->rdma_qp->domain);
1892 	assert(src_domain);
1893 	assert(spdk_memory_domain_get_dma_device_type(src_domain) == SPDK_DMA_DEVICE_TYPE_RDMA);
1894 	/* We expect an "in-place" operation */
1895 	assert(dst_iov == src_iov);
1896 	assert(dst_iovcnt == src_iovcnt);
1897 
1898 	if (spdk_unlikely(!src_domain ||
1899 			  spdk_memory_domain_get_dma_device_type(src_domain) != SPDK_DMA_DEVICE_TYPE_RDMA)) {
1900 		SPDK_ERRLOG("Unexpected source memory domain %p, type %d\n", src_domain,
1901 			    src_domain ? (int)spdk_memory_domain_get_dma_device_type(src_domain) : -1);
1902 		return -ENOTSUP;
1903 	}
1904 	if (spdk_unlikely(dst_iovcnt != 1 || !translation || translation->iov_count != 1)) {
1905 		SPDK_ERRLOG("Unexpected iovcnt %u or missing translation, rdma_req %p\n", dst_iovcnt, rdma_req);
1906 		return -ENOTSUP;
1907 	}
1908 	ctx.addr = translation->iov.iov_base;
1909 	ctx.length = translation->iov.iov_len;
1910 	ctx.lkey = translation->rdma.lkey;
1911 	ctx.rkey = translation->rdma.rkey;
1912 
1913 	SPDK_DEBUGLOG(nvme, "req %p, addr %p, len %zu, key %u\n", rdma_req, ctx.addr, ctx.length, ctx.rkey);
1914 	icd_supported = spdk_nvme_opc_get_data_transfer(req->cmd.opc) == SPDK_NVME_DATA_HOST_TO_CONTROLLER
1915 			&& req->payload_size <= ctrlr->ioccsz_bytes && ctrlr->icdoff == 0;
1916 
1917 	/* We expect that the result of the accel sequence is a memory key which describes a virtually
1918 	 * contiguous address space. That means we prepare a contiguous request even if the original payload was scattered. */
1919 	if (icd_supported) {
1920 		nvme_rdma_configure_contig_inline_request(rdma_req, req, &ctx);
1921 	} else {
1922 		nvme_rdma_configure_contig_request(rdma_req, req, &ctx);
1923 	}
1924 	rdma_req->transfer_cpl_cb = cpl_cb;
1925 	rdma_req->transfer_cpl_cb_arg = cpl_cb_arg;
1926 
1927 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1928 
1929 	return _nvme_rdma_qpair_submit_request(rqpair, rdma_req);
1930 }
1931 
1932 static inline int
1933 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
1934 {
1935 	struct nvme_request *req = rdma_req->req;
1936 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1937 	enum nvme_payload_type payload_type;
1938 	bool icd_supported;
1939 	int rc = -1;
1940 
1941 	payload_type = nvme_payload_type(&req->payload);
1942 	/*
1943 	 * Check if icdoff is non-zero, to avoid interop conflicts with
1944 	 * targets with non-zero icdoff.  Both SPDK and the Linux kernel
1945 	 * targets use icdoff = 0.  For targets with non-zero icdoff, we
1946 	 * currently do not use inline data.
1947 	 */
1948 	icd_supported = spdk_nvme_opc_get_data_transfer(req->cmd.opc) == SPDK_NVME_DATA_HOST_TO_CONTROLLER
1949 			&& req->payload_size <= ctrlr->ioccsz_bytes && ctrlr->icdoff == 0;
1950 
1951 	if (spdk_unlikely(req->payload_size == 0)) {
1952 		rc = nvme_rdma_build_null_request(rdma_req);
1953 	} else if (payload_type == NVME_PAYLOAD_TYPE_CONTIG) {
1954 		if (icd_supported) {
1955 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1956 		} else {
1957 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1958 		}
1959 	} else if (payload_type == NVME_PAYLOAD_TYPE_SGL) {
1960 		if (icd_supported) {
1961 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1962 		} else {
1963 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1964 		}
1965 	}
1966 
1967 	if (spdk_unlikely(rc)) {
1968 		return rc;
1969 	}
1970 
1971 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1972 	return 0;
1973 }
1974 
1975 static struct spdk_nvme_qpair *
1976 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1977 			     uint16_t qid, uint32_t qsize,
1978 			     enum spdk_nvme_qprio qprio,
1979 			     uint32_t num_requests,
1980 			     bool delay_cmd_submit,
1981 			     bool async)
1982 {
1983 	struct nvme_rdma_qpair *rqpair;
1984 	struct spdk_nvme_qpair *qpair;
1985 	int rc;
1986 
1987 	if (qsize < SPDK_NVME_QUEUE_MIN_ENTRIES) {
1988 		SPDK_ERRLOG("Failed to create qpair with size %u. Minimum queue size is %d.\n",
1989 			    qsize, SPDK_NVME_QUEUE_MIN_ENTRIES);
1990 		return NULL;
1991 	}
1992 
1993 	rqpair = spdk_zmalloc(sizeof(struct nvme_rdma_qpair), 0, NULL, SPDK_ENV_NUMA_ID_ANY,
1994 			      SPDK_MALLOC_DMA);
1995 	if (!rqpair) {
1996 		SPDK_ERRLOG("failed to allocate rqpair\n");
1997 		return NULL;
1998 	}
1999 
2000 	/* Set num_entries to one less than the queue size. According to the NVMe
2001 	 * and NVMe-oF specs we cannot submit queue-size requests;
2002 	 * one slot shall always remain empty.
2003 	 */
2004 	rqpair->num_entries = qsize - 1;
2005 	rqpair->delay_cmd_submit = delay_cmd_submit;
2006 	rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID;
2007 	qpair = &rqpair->qpair;
2008 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async);
2009 	if (rc != 0) {
2010 		spdk_free(rqpair);
2011 		return NULL;
2012 	}
2013 
2014 	return qpair;
2015 }
2016 
2017 static void
2018 nvme_rdma_qpair_destroy(struct nvme_rdma_qpair *rqpair)
2019 {
2020 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2021 	struct nvme_rdma_ctrlr *rctrlr;
2022 	struct nvme_rdma_cm_event_entry *entry, *tmp;
2023 
2024 	spdk_rdma_utils_free_mem_map(&rqpair->mr_map);
2025 
2026 	if (rqpair->evt) {
2027 		rdma_ack_cm_event(rqpair->evt);
2028 		rqpair->evt = NULL;
2029 	}
2030 
2031 	/*
2032 	 * This works because we have the controller lock both in
2033 	 * this function and in the function where we add new events.
2034 	 */
2035 	if (qpair->ctrlr != NULL) {
2036 		rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
2037 		STAILQ_FOREACH_SAFE(entry, &rctrlr->pending_cm_events, link, tmp) {
2038 			if (entry->evt->id->context == rqpair) {
2039 				STAILQ_REMOVE(&rctrlr->pending_cm_events, entry, nvme_rdma_cm_event_entry, link);
2040 				rdma_ack_cm_event(entry->evt);
2041 				STAILQ_INSERT_HEAD(&rctrlr->free_cm_events, entry, link);
2042 			}
2043 		}
2044 	}
2045 
2046 	if (rqpair->cm_id) {
2047 		if (rqpair->rdma_qp) {
2048 			spdk_rdma_utils_put_pd(rqpair->rdma_qp->qp->pd);
2049 			spdk_rdma_provider_qp_destroy(rqpair->rdma_qp);
2050 			rqpair->rdma_qp = NULL;
2051 		}
2052 	}
2053 
2054 	if (rqpair->poller) {
2055 		struct nvme_rdma_poll_group     *group;
2056 
2057 		assert(qpair->poll_group);
2058 		group = nvme_rdma_poll_group(qpair->poll_group);
2059 
2060 		nvme_rdma_poll_group_put_poller(group, rqpair->poller);
2061 
2062 		rqpair->poller = NULL;
2063 		rqpair->cq = NULL;
2064 		if (rqpair->srq) {
2065 			rqpair->srq = NULL;
2066 			rqpair->rsps = NULL;
2067 		}
2068 	} else if (rqpair->cq) {
2069 		ibv_destroy_cq(rqpair->cq);
2070 		rqpair->cq = NULL;
2071 	}
2072 
2073 	nvme_rdma_free_reqs(rqpair);
2074 	nvme_rdma_free_rsps(rqpair->rsps);
2075 	rqpair->rsps = NULL;
2076 
2077 	/* Destroy cm_id last so the CMA device will not be freed before we destroy the cq. */
2078 	if (rqpair->cm_id) {
2079 		rdma_destroy_id(rqpair->cm_id);
2080 		rqpair->cm_id = NULL;
2081 	}
2082 }
2083 
2084 static void nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr);
2085 
2086 static void
2087 nvme_rdma_qpair_flush_send_wrs(struct nvme_rdma_qpair *rqpair)
2088 {
2089 	struct ibv_send_wr *bad_wr = NULL;
2090 	int rc;
2091 
2092 	rc = spdk_rdma_provider_qp_flush_send_wrs(rqpair->rdma_qp, &bad_wr);
2093 	if (rc) {
2094 		nvme_rdma_reset_failed_sends(rqpair, bad_wr);
2095 	}
2096 }
2097 
2098 static int
2099 nvme_rdma_qpair_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
2100 {
2101 	if (ret) {
2102 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
2103 		goto quiet;
2104 	}
2105 
2106 	if (rqpair->poller == NULL) {
2107 		/* If a poller is not used, the cq is not shared,
2108 		 * so complete disconnecting the qpair immediately.
2109 		 */
2110 		goto quiet;
2111 	}
2112 
2113 	if (rqpair->rsps == NULL) {
2114 		goto quiet;
2115 	}
2116 
2117 	nvme_rdma_qpair_flush_send_wrs(rqpair);
2118 
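	/* Keep the qpair in the LINGERING state while sends, recvs or accel operations are still
	 * outstanding, so that their completions can drain before the qpair is destroyed. */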
2119 	if (rqpair->need_destroy ||
2120 	    (rqpair->current_num_sends != 0 ||
2121 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0)) ||
2122 	    ((rqpair->qpair.ctrlr->flags & SPDK_NVME_CTRLR_ACCEL_SEQUENCE_SUPPORTED) &&
2123 	     (!TAILQ_EMPTY(&rqpair->outstanding_reqs)))) {
2124 		rqpair->state = NVME_RDMA_QPAIR_STATE_LINGERING;
2125 		rqpair->evt_timeout_ticks = (NVME_RDMA_DISCONNECTED_QPAIR_TIMEOUT_US * spdk_get_ticks_hz()) /
2126 					    SPDK_SEC_TO_USEC + spdk_get_ticks();
2127 
2128 		return -EAGAIN;
2129 	}
2130 
2131 quiet:
2132 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
2133 
2134 	nvme_rdma_qpair_abort_reqs(&rqpair->qpair, rqpair->qpair.abort_dnr);
2135 	nvme_rdma_qpair_destroy(rqpair);
2136 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
2137 
2138 	return 0;
2139 }
2140 
2141 static int
2142 nvme_rdma_qpair_wait_until_quiet(struct nvme_rdma_qpair *rqpair)
2143 {
2144 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2145 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
2146 
2147 	if (spdk_get_ticks() < rqpair->evt_timeout_ticks &&
2148 	    (rqpair->current_num_sends != 0 ||
2149 	     (!rqpair->srq && rqpair->rsps->current_num_recvs != 0))) {
2150 		return -EAGAIN;
2151 	}
2152 
2153 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITED;
2154 	nvme_rdma_qpair_abort_reqs(qpair, qpair->abort_dnr);
2155 	if (!nvme_qpair_is_admin_queue(qpair)) {
2156 		nvme_robust_mutex_lock(&ctrlr->ctrlr_lock);
2157 	}
2158 	nvme_rdma_qpair_destroy(rqpair);
2159 	if (!nvme_qpair_is_admin_queue(qpair)) {
2160 		nvme_robust_mutex_unlock(&ctrlr->ctrlr_lock);
2161 	}
2162 	nvme_transport_ctrlr_disconnect_qpair_done(&rqpair->qpair);
2163 
2164 	return 0;
2165 }
2166 
2167 static void
2168 _nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair,
2169 				  nvme_rdma_cm_event_cb disconnected_qpair_cb)
2170 {
2171 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2172 	int rc;
2173 
2174 	assert(disconnected_qpair_cb != NULL);
2175 
2176 	rqpair->state = NVME_RDMA_QPAIR_STATE_EXITING;
2177 
2178 	if (rqpair->cm_id) {
2179 		if (rqpair->rdma_qp) {
2180 			rc = spdk_rdma_provider_qp_disconnect(rqpair->rdma_qp);
2181 			if ((qpair->ctrlr != NULL) && (rc == 0)) {
2182 				rc = nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_DISCONNECTED,
2183 								   disconnected_qpair_cb);
2184 				if (rc == 0) {
2185 					return;
2186 				}
2187 			}
2188 		}
2189 	}
2190 
2191 	disconnected_qpair_cb(rqpair, 0);
2192 }
2193 
2194 static int
2195 nvme_rdma_ctrlr_disconnect_qpair_poll(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2196 {
2197 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2198 	int rc;
2199 
2200 	switch (rqpair->state) {
2201 	case NVME_RDMA_QPAIR_STATE_EXITING:
2202 		if (!nvme_qpair_is_admin_queue(qpair)) {
2203 			nvme_ctrlr_lock(ctrlr);
2204 		}
2205 
2206 		rc = nvme_rdma_process_event_poll(rqpair);
2207 
2208 		if (!nvme_qpair_is_admin_queue(qpair)) {
2209 			nvme_ctrlr_unlock(ctrlr);
2210 		}
2211 		break;
2212 
2213 	case NVME_RDMA_QPAIR_STATE_LINGERING:
2214 		rc = nvme_rdma_qpair_wait_until_quiet(rqpair);
2215 		break;
2216 	case NVME_RDMA_QPAIR_STATE_EXITED:
2217 		rc = 0;
2218 		break;
2219 
2220 	default:
2221 		assert(false);
2222 		rc = -EAGAIN;
2223 		break;
2224 	}
2225 
2226 	return rc;
2227 }
2228 
2229 static void
2230 nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2231 {
2232 	int rc;
2233 
2234 	_nvme_rdma_ctrlr_disconnect_qpair(ctrlr, qpair, nvme_rdma_qpair_disconnected);
2235 
2236 	/* If the async mode is disabled, poll the qpair until it is actually disconnected.
2237 	 * It is ensured that poll_group_process_completions() calls disconnected_qpair_cb
2238 	 * for any disconnected qpair. Hence, we do not have to check if the qpair is in
2239 	 * a poll group or not.
2240 	 * At the same time, if the qpair is being destroyed, i.e. this function is called by
2241 	 * spdk_nvme_ctrlr_free_io_qpair, then we need to wait until the qpair is disconnected;
2242 	 * otherwise we may leak some resources.
2243 	 */
2244 	if (qpair->async && !qpair->destroy_in_progress) {
2245 		return;
2246 	}
2247 
2248 	while (1) {
2249 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(ctrlr, qpair);
2250 		if (rc != -EAGAIN) {
2251 			break;
2252 		}
2253 	}
2254 }
2255 
2256 static int
2257 nvme_rdma_stale_conn_disconnected(struct nvme_rdma_qpair *rqpair, int ret)
2258 {
2259 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2260 
2261 	if (ret) {
2262 		SPDK_DEBUGLOG(nvme, "Target did not respond to qpair disconnect.\n");
2263 	}
2264 
2265 	nvme_rdma_qpair_destroy(rqpair);
2266 
2267 	qpair->last_transport_failure_reason = qpair->transport_failure_reason;
2268 	qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_NONE;
2269 
2270 	rqpair->state = NVME_RDMA_QPAIR_STATE_STALE_CONN;
2271 	rqpair->evt_timeout_ticks = (NVME_RDMA_STALE_CONN_RETRY_DELAY_US * spdk_get_ticks_hz()) /
2272 				    SPDK_SEC_TO_USEC + spdk_get_ticks();
2273 
2274 	return 0;
2275 }
2276 
2277 static int
2278 nvme_rdma_stale_conn_retry(struct nvme_rdma_qpair *rqpair)
2279 {
2280 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2281 
2282 	if (rqpair->stale_conn_retry_count >= NVME_RDMA_STALE_CONN_RETRY_MAX) {
2283 		SPDK_ERRLOG("Retry failed %d times, giving up on stale connection to qpair (cntlid:%u, qid:%u).\n",
2284 			    NVME_RDMA_STALE_CONN_RETRY_MAX, qpair->ctrlr->cntlid, qpair->id);
2285 		return -ESTALE;
2286 	}
2287 
2288 	rqpair->stale_conn_retry_count++;
2289 
2290 	SPDK_NOTICELOG("Retrying stale connection (attempt %d) to qpair (cntlid:%u, qid:%u).\n",
2291 		       rqpair->stale_conn_retry_count, qpair->ctrlr->cntlid, qpair->id);
2292 
2293 	_nvme_rdma_ctrlr_disconnect_qpair(qpair->ctrlr, qpair, nvme_rdma_stale_conn_disconnected);
2294 
2295 	return 0;
2296 }
2297 
2298 static int
2299 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
2300 {
2301 	struct nvme_rdma_qpair *rqpair;
2302 
2303 	assert(qpair != NULL);
2304 	rqpair = nvme_rdma_qpair(qpair);
2305 
2306 	if (rqpair->state != NVME_RDMA_QPAIR_STATE_EXITED) {
2307 		int rc __attribute__((unused));
2308 
2309 		/* qpair was removed from the poll group while the disconnect is not finished.
2310 		/* The qpair was removed from the poll group before the disconnect finished.
2311 		 * Destroy RDMA resources forcefully. */
2312 		assert(rc == 0);
2313 	}
2314 
2315 	nvme_rdma_qpair_abort_reqs(qpair, qpair->abort_dnr);
2316 	nvme_qpair_deinit(qpair);
2317 
2318 	spdk_free(rqpair);
2319 
2320 	return 0;
2321 }
2322 
2323 static struct spdk_nvme_qpair *
2324 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
2325 				const struct spdk_nvme_io_qpair_opts *opts)
2326 {
2327 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
2328 					    opts->io_queue_requests,
2329 					    opts->delay_cmd_submit,
2330 					    opts->async_mode);
2331 }
2332 
2333 static int
2334 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
2335 {
2336 	/* do nothing here */
2337 	return 0;
2338 }
2339 
2340 static int nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr);
2341 
2342 /* We have to use the typedef in the function declaration to appease astyle. */
2343 typedef struct spdk_nvme_ctrlr spdk_nvme_ctrlr_t;
2344 
2345 static spdk_nvme_ctrlr_t *
2346 nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
2347 			  const struct spdk_nvme_ctrlr_opts *opts,
2348 			  void *devhandle)
2349 {
2350 	struct nvme_rdma_ctrlr *rctrlr;
2351 	struct ibv_context **contexts;
2352 	struct ibv_device_attr dev_attr;
2353 	int i, flag, rc;
2354 
2355 	rctrlr = spdk_zmalloc(sizeof(struct nvme_rdma_ctrlr), 0, NULL, SPDK_ENV_NUMA_ID_ANY,
2356 			      SPDK_MALLOC_DMA);
2357 	if (rctrlr == NULL) {
2358 		SPDK_ERRLOG("could not allocate ctrlr\n");
2359 		return NULL;
2360 	}
2361 
2362 	rctrlr->ctrlr.opts = *opts;
2363 	rctrlr->ctrlr.trid = *trid;
2364 
2365 	if (opts->transport_retry_count > NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT) {
2366 		SPDK_NOTICELOG("transport_retry_count exceeds max value %d, use max value\n",
2367 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT);
2368 		rctrlr->ctrlr.opts.transport_retry_count = NVME_RDMA_CTRLR_MAX_TRANSPORT_RETRY_COUNT;
2369 	}
2370 
2371 	if (opts->transport_ack_timeout > NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT) {
2372 		SPDK_NOTICELOG("transport_ack_timeout exceeds max value %d, use max value\n",
2373 			       NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT);
2374 		rctrlr->ctrlr.opts.transport_ack_timeout = NVME_RDMA_CTRLR_MAX_TRANSPORT_ACK_TIMEOUT;
2375 	}
2376 
2377 	contexts = rdma_get_devices(NULL);
2378 	if (contexts == NULL) {
2379 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
2380 		spdk_free(rctrlr);
2381 		return NULL;
2382 	}
2383 
2384 	i = 0;
2385 	rctrlr->max_sge = NVME_RDMA_MAX_SGL_DESCRIPTORS;
2386 
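	/* Clamp max_sge to the smallest value reported by any RDMA device, since the device
	 * that will actually carry the connection is not known at this point. */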
2387 	while (contexts[i] != NULL) {
2388 		rc = ibv_query_device(contexts[i], &dev_attr);
2389 		if (rc < 0) {
2390 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
2391 			rdma_free_devices(contexts);
2392 			spdk_free(rctrlr);
2393 			return NULL;
2394 		}
2395 		rctrlr->max_sge = spdk_min(rctrlr->max_sge, (uint16_t)dev_attr.max_sge);
2396 		i++;
2397 	}
2398 
2399 	rdma_free_devices(contexts);
2400 
2401 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
2402 	if (rc != 0) {
2403 		spdk_free(rctrlr);
2404 		return NULL;
2405 	}
2406 
2407 	STAILQ_INIT(&rctrlr->pending_cm_events);
2408 	STAILQ_INIT(&rctrlr->free_cm_events);
2409 	rctrlr->cm_events = spdk_zmalloc(NVME_RDMA_NUM_CM_EVENTS * sizeof(*rctrlr->cm_events), 0, NULL,
2410 					 SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
2411 	if (rctrlr->cm_events == NULL) {
2412 		SPDK_ERRLOG("unable to allocate buffers to hold CM events.\n");
2413 		goto destruct_ctrlr;
2414 	}
2415 
2416 	for (i = 0; i < NVME_RDMA_NUM_CM_EVENTS; i++) {
2417 		STAILQ_INSERT_TAIL(&rctrlr->free_cm_events, &rctrlr->cm_events[i], link);
2418 	}
2419 
2420 	rctrlr->cm_channel = rdma_create_event_channel();
2421 	if (rctrlr->cm_channel == NULL) {
2422 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
2423 		goto destruct_ctrlr;
2424 	}
2425 
2426 	flag = fcntl(rctrlr->cm_channel->fd, F_GETFL);
2427 	if (fcntl(rctrlr->cm_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
2428 		SPDK_ERRLOG("Cannot set event channel to non-blocking\n");
2429 		goto destruct_ctrlr;
2430 	}
2431 
2432 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
2433 			       rctrlr->ctrlr.opts.admin_queue_size, 0,
2434 			       rctrlr->ctrlr.opts.admin_queue_size, false, true);
2435 	if (!rctrlr->ctrlr.adminq) {
2436 		SPDK_ERRLOG("failed to create admin qpair\n");
2437 		goto destruct_ctrlr;
2438 	}
2439 	if (spdk_rdma_provider_accel_sequence_supported()) {
2440 		rctrlr->ctrlr.flags |= SPDK_NVME_CTRLR_ACCEL_SEQUENCE_SUPPORTED;
2441 	}
2442 
2443 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
2444 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
2445 		goto destruct_ctrlr;
2446 	}
2447 
2448 	SPDK_DEBUGLOG(nvme, "successfully initialized the nvmf ctrlr\n");
2449 	return &rctrlr->ctrlr;
2450 
2451 destruct_ctrlr:
2452 	nvme_ctrlr_destruct(&rctrlr->ctrlr);
2453 	return NULL;
2454 }
2455 
2456 static int
2457 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
2458 {
2459 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
2460 	struct nvme_rdma_cm_event_entry *entry;
2461 
2462 	if (ctrlr->adminq) {
2463 		nvme_rdma_ctrlr_delete_io_qpair(ctrlr, ctrlr->adminq);
2464 	}
2465 
2466 	STAILQ_FOREACH(entry, &rctrlr->pending_cm_events, link) {
2467 		rdma_ack_cm_event(entry->evt);
2468 	}
2469 
2470 	STAILQ_INIT(&rctrlr->free_cm_events);
2471 	STAILQ_INIT(&rctrlr->pending_cm_events);
2472 	spdk_free(rctrlr->cm_events);
2473 
2474 	if (rctrlr->cm_channel) {
2475 		rdma_destroy_event_channel(rctrlr->cm_channel);
2476 		rctrlr->cm_channel = NULL;
2477 	}
2478 
2479 	nvme_ctrlr_destruct_finish(ctrlr);
2480 
2481 	spdk_free(rctrlr);
2482 
2483 	return 0;
2484 }
2485 
2486 static inline int
2487 _nvme_rdma_qpair_submit_request(struct nvme_rdma_qpair *rqpair,
2488 				struct spdk_nvme_rdma_req *rdma_req)
2489 {
2490 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
2491 	struct ibv_send_wr *wr;
2492 	struct nvme_rdma_poll_group *group;
2493 
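	/* Add the qpair to its poll group's active list (if not already on it) so that
	 * nvme_rdma_qpair_process_submits() flushes its queued WRs and resubmits requests. */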
2494 	if (!rqpair->link_active.tqe_prev && qpair->poll_group) {
2495 		group = nvme_rdma_poll_group(qpair->poll_group);
2496 		TAILQ_INSERT_TAIL(&group->active_qpairs, rqpair, link_active);
2497 	}
2498 	assert(rqpair->current_num_sends < rqpair->num_entries);
2499 	rqpair->current_num_sends++;
2500 
2501 	wr = &rdma_req->send_wr;
2502 	wr->next = NULL;
2503 	nvme_rdma_trace_ibv_sge(wr->sg_list);
2504 
2505 	spdk_rdma_provider_qp_queue_send_wrs(rqpair->rdma_qp, wr);
2506 
2507 	if (!rqpair->delay_cmd_submit) {
2508 		return nvme_rdma_qpair_submit_sends(rqpair);
2509 	}
2510 
2511 	return 0;
2512 }
2513 
2514 static int
2515 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
2516 			       struct nvme_request *req)
2517 {
2518 	struct nvme_rdma_qpair *rqpair;
2519 	struct spdk_nvme_rdma_req *rdma_req;
2520 	int rc;
2521 
2522 	rqpair = nvme_rdma_qpair(qpair);
2523 	assert(rqpair != NULL);
2524 	assert(req != NULL);
2525 
2526 	rdma_req = nvme_rdma_req_get(rqpair);
2527 	if (spdk_unlikely(!rdma_req)) {
2528 		if (rqpair->poller) {
2529 			rqpair->poller->stats.queued_requests++;
2530 		}
2531 		/* Inform the upper layer to try again later. */
2532 		return -EAGAIN;
2533 	}
2534 
2535 	assert(rdma_req->req == NULL);
2536 	rdma_req->req = req;
2537 	req->cmd.cid = rdma_req->id;
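	/* If an accel sequence is attached to the request, run the sequence first instead of
	 * building and sending the capsule right away. */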
2538 	if (req->accel_sequence) {
2539 		assert(spdk_rdma_provider_accel_sequence_supported());
2540 		assert(rqpair->qpair.poll_group->group);
2541 		assert(rqpair->qpair.poll_group->group->accel_fn_table.append_copy);
2542 		assert(rqpair->qpair.poll_group->group->accel_fn_table.reverse_sequence);
2543 		assert(rqpair->qpair.poll_group->group->accel_fn_table.finish_sequence);
2544 
2545 		rc = nvme_rdma_apply_accel_sequence(rqpair, req, rdma_req);
2546 		if (spdk_unlikely(rc)) {
2547 			SPDK_ERRLOG("failed to apply accel seq, rqpair %p, req %p, rc %d\n", rqpair, rdma_req, rc);
2548 			nvme_rdma_req_put(rqpair, rdma_req);
2549 			return rc;
2550 		}
2551 		/* Capsule will be sent in data_transfer callback */
2552 		return 0;
2553 	}
2554 
2555 	rc = nvme_rdma_req_init(rqpair, rdma_req);
2556 	if (spdk_unlikely(rc)) {
2557 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
2558 		nvme_rdma_req_put(rqpair, rdma_req);
2559 		return -1;
2560 	}
2561 
2562 	TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
2563 	rqpair->num_outstanding_reqs++;
2564 
2565 	return _nvme_rdma_qpair_submit_request(rqpair, rdma_req);
2566 }
2567 
2568 static int
2569 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
2570 {
2571 	/* Currently, doing nothing here */
2572 	return 0;
2573 }
2574 
2575 static void
2576 nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr)
2577 {
2578 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2579 	struct spdk_nvme_cpl cpl;
2580 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2581 
2582 	cpl.sqid = qpair->id;
2583 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
2584 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
2585 	cpl.status.dnr = dnr;
2586 
2587 	/*
2588 	 * We cannot abort requests at the RDMA layer without
2589 	 * unregistering them. If we do, we can still get
2590 	 * error-free completions on the shared completion queue.
2591 	 */
2592 	if (nvme_qpair_get_state(qpair) > NVME_QPAIR_DISCONNECTING &&
2593 	    nvme_qpair_get_state(qpair) != NVME_QPAIR_DESTROYING) {
2594 		nvme_ctrlr_disconnect_qpair(qpair);
2595 	}
2596 
2597 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2598 		if (rdma_req->in_progress_accel) {
2599 			/* We should wait for accel completion */
2600 			continue;
2601 		}
2602 		nvme_rdma_req_complete(rdma_req, &cpl, true);
2603 	}
2604 }
2605 
2606 static void
2607 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
2608 {
2609 	uint64_t t02;
2610 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
2611 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
2612 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
2613 	struct spdk_nvme_ctrlr_process *active_proc;
2614 
2615 	/* Don't check timeouts during controller initialization. */
2616 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
2617 		return;
2618 	}
2619 
2620 	if (nvme_qpair_is_admin_queue(qpair)) {
2621 		active_proc = nvme_ctrlr_get_current_process(ctrlr);
2622 	} else {
2623 		active_proc = qpair->active_proc;
2624 	}
2625 
2626 	/* Only check timeouts if the current process has a timeout callback. */
2627 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
2628 		return;
2629 	}
2630 
2631 	t02 = spdk_get_ticks();
2632 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
2633 		assert(rdma_req->req != NULL);
2634 
2635 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
2636 			/*
2637 			 * The requests are in order, so as soon as one has not timed out,
2638 			 * stop iterating.
2639 			 */
2640 			break;
2641 		}
2642 	}
2643 }
2644 
2645 static inline void
2646 nvme_rdma_request_ready(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
2647 {
2648 	struct spdk_nvme_rdma_rsp *rdma_rsp = rdma_req->rdma_rsp;
2649 	struct ibv_recv_wr *recv_wr = rdma_rsp->recv_wr;
2650 
2651 	if (rdma_req->transfer_cpl_cb) {
2652 		int rc = 0;
2653 
2654 		if (spdk_unlikely(spdk_nvme_cpl_is_error(&rdma_rsp->cpl))) {
2655 			SPDK_WARNLOG("req %p, error cpl sct %d, sc %d\n", rdma_req, rdma_rsp->cpl.status.sct,
2656 				     rdma_rsp->cpl.status.sc);
2657 			rc = -EIO;
2658 		}
2659 		nvme_rdma_finish_data_transfer(rdma_req, rc);
2660 	} else {
2661 		nvme_rdma_req_complete(rdma_req, &rdma_rsp->cpl, true);
2662 	}
2663 
2664 	if (spdk_unlikely(rqpair->state >= NVME_RDMA_QPAIR_STATE_EXITING && !rqpair->srq)) {
2665 		/* Skip re-posting the recv WR if we are in the disconnection process. We may never get
2666 		 * a WC for it and may end up stuck in the LINGERING state until the timeout. */
2667 		return;
2668 	}
2669 
2670 	assert(rqpair->rsps->current_num_recvs < rqpair->rsps->num_entries);
2671 	rqpair->rsps->current_num_recvs++;
2672 
2673 	recv_wr->next = NULL;
2674 	nvme_rdma_trace_ibv_sge(recv_wr->sg_list);
2675 
2676 	if (!rqpair->srq) {
2677 		spdk_rdma_provider_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr);
2678 	} else {
2679 		spdk_rdma_provider_srq_queue_recv_wrs(rqpair->srq, recv_wr);
2680 	}
2681 }
2682 
2683 #define MAX_COMPLETIONS_PER_POLL 128
2684 
2685 static void
2686 nvme_rdma_fail_qpair(struct spdk_nvme_qpair *qpair, int failure_reason)
2687 {
2688 	if (failure_reason == IBV_WC_RETRY_EXC_ERR) {
2689 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_REMOTE;
2690 	} else if (qpair->transport_failure_reason == SPDK_NVME_QPAIR_FAILURE_NONE) {
2691 		qpair->transport_failure_reason = SPDK_NVME_QPAIR_FAILURE_UNKNOWN;
2692 	}
2693 
2694 	nvme_ctrlr_disconnect_qpair(qpair);
2695 }
2696 
2697 static struct nvme_rdma_qpair *
2698 get_rdma_qpair_from_wc(struct nvme_rdma_poll_group *group, struct ibv_wc *wc)
2699 {
2700 	struct spdk_nvme_qpair *qpair;
2701 	struct nvme_rdma_qpair *rqpair;
2702 
2703 	STAILQ_FOREACH(qpair, &group->group.connected_qpairs, poll_group_stailq) {
2704 		rqpair = nvme_rdma_qpair(qpair);
2705 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2706 			return rqpair;
2707 		}
2708 	}
2709 
2710 	STAILQ_FOREACH(qpair, &group->group.disconnected_qpairs, poll_group_stailq) {
2711 		rqpair = nvme_rdma_qpair(qpair);
2712 		if (NVME_RDMA_POLL_GROUP_CHECK_QPN(rqpair, wc->qp_num)) {
2713 			return rqpair;
2714 		}
2715 	}
2716 
2717 	return NULL;
2718 }
2719 
2720 static inline void
2721 nvme_rdma_log_wc_status(struct nvme_rdma_qpair *rqpair, struct ibv_wc *wc)
2722 {
2723 	struct nvme_rdma_wr *rdma_wr = (struct nvme_rdma_wr *)wc->wr_id;
2724 
2725 	if (wc->status == IBV_WC_WR_FLUSH_ERR) {
2726 		/* If the qpair is in the ERR state, we will receive completions for all posted but not completed
2727 		 * Work Requests with IBV_WC_WR_FLUSH_ERR status. Don't log an error in that case. */
2728 		SPDK_DEBUGLOG(nvme, "WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2729 			      rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2730 			      ibv_wc_status_str(wc->status));
2731 	} else {
2732 		SPDK_ERRLOG("WC error, qid %u, qp state %d, request 0x%lu type %d, status: (%d): %s\n",
2733 			    rqpair->qpair.id, rqpair->qpair.state, wc->wr_id, rdma_wr->type, wc->status,
2734 			    ibv_wc_status_str(wc->status));
2735 	}
2736 }
2737 
2738 static inline int
2739 nvme_rdma_process_recv_completion(struct nvme_rdma_poller *poller, struct ibv_wc *wc,
2740 				  struct nvme_rdma_wr *rdma_wr)
2741 {
2742 	struct nvme_rdma_qpair		*rqpair;
2743 	struct spdk_nvme_rdma_req	*rdma_req;
2744 	struct spdk_nvme_rdma_rsp	*rdma_rsp;
2745 
2746 	rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
2747 
2748 	if (poller && poller->srq) {
2749 		rqpair = get_rdma_qpair_from_wc(poller->group, wc);
2750 		if (spdk_unlikely(!rqpair)) {
2751 			/* Since we do not handle the LAST_WQE_REACHED event, we do not know when
2752 			 * a Receive Queue in a QP that is associated with an SRQ is flushed.
2753 			 * We may get a WC for an already destroyed QP.
2754 			 *
2755 			 * However, for the SRQ, this is not an error. Hence, just re-post the
2756 			 * receive request to the SRQ to reuse for other QPs, and return 0.
2757 			 */
2758 			spdk_rdma_provider_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2759 			return 0;
2760 		}
2761 	} else {
2762 		rqpair = rdma_rsp->rqpair;
2763 		if (spdk_unlikely(!rqpair)) {
2764 			/* TODO: Fix forceful QP destroy when not in async mode.
2765 			 * The CQ itself did not cause any error. Hence, return 0 for now.
2766 			 */
2767 			SPDK_WARNLOG("QP might be already destroyed.\n");
2768 			return 0;
2769 		}
2770 	}
2771 
2772 
2773 	assert(rqpair->rsps->current_num_recvs > 0);
2774 	rqpair->rsps->current_num_recvs--;
2775 
2776 	if (spdk_unlikely(wc->status)) {
2777 		nvme_rdma_log_wc_status(rqpair, wc);
2778 		goto err_wc;
2779 	}
2780 
2781 	SPDK_DEBUGLOG(nvme, "CQ recv completion\n");
2782 
2783 	if (spdk_unlikely(wc->byte_len < sizeof(struct spdk_nvme_cpl))) {
2784 		SPDK_ERRLOG("recv length %u less than expected response size\n", wc->byte_len);
2785 		goto err_wc;
2786 	}
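	/* The CID in the completion equals rdma_req->id assigned at submission time, so it maps
	 * directly to the index in the rdma_reqs array. */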
2787 	rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
2788 	rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
2789 	rdma_req->rdma_rsp = rdma_rsp;
2790 
2791 	if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) == 0) {
2792 		return 0;
2793 	}
2794 
2795 	rqpair->num_completions++;
2796 
2797 	nvme_rdma_request_ready(rqpair, rdma_req);
2798 
2799 	if (!rqpair->delay_cmd_submit) {
2800 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2801 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2802 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2803 			return -ENXIO;
2804 		}
2805 	}
2806 
2807 	return 1;
2808 
2809 err_wc:
2810 	nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2811 	if (poller && poller->srq) {
2812 		spdk_rdma_provider_srq_queue_recv_wrs(poller->srq, rdma_rsp->recv_wr);
2813 	}
2814 	rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
2815 	if (rdma_req->transfer_cpl_cb) {
2816 		nvme_rdma_finish_data_transfer(rdma_req, -ENXIO);
2817 	}
2818 	return -ENXIO;
2819 }
2820 
2821 static inline int
2822 nvme_rdma_process_send_completion(struct nvme_rdma_poller *poller,
2823 				  struct nvme_rdma_qpair *rdma_qpair,
2824 				  struct ibv_wc *wc, struct nvme_rdma_wr *rdma_wr)
2825 {
2826 	struct nvme_rdma_qpair		*rqpair;
2827 	struct spdk_nvme_rdma_req	*rdma_req;
2828 
2829 	rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_req, rdma_wr);
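	/* Resolve the owning qpair: prefer the request's qpair; otherwise fall back to the qpair
	 * passed by the caller or, with a shared CQ, look it up by the WC's QP number. */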
2830 	rqpair = rdma_req->req ? nvme_rdma_qpair(rdma_req->req->qpair) : NULL;
2831 	if (spdk_unlikely(!rqpair)) {
2832 		rqpair = rdma_qpair != NULL ? rdma_qpair : get_rdma_qpair_from_wc(poller->group, wc);
2833 	}
2834 
2835 	/* If we are flushing I/O */
2836 	if (spdk_unlikely(wc->status)) {
2837 		if (!rqpair) {
2838 			/* When poll_group is used, several qpairs share the same CQ and it is possible to
2839 			 * receive a completion with an error (e.g. IBV_WC_WR_FLUSH_ERR) for an already disconnected qpair.
2840 			 * That happens because the qpair is destroyed while there are submitted but not completed send/receive
2841 			 * Work Requests. */
2842 			assert(poller);
2843 			return 0;
2844 		}
2845 		assert(rqpair->current_num_sends > 0);
2846 		rqpair->current_num_sends--;
2847 		nvme_rdma_log_wc_status(rqpair, wc);
2848 		nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2849 		if (rdma_req->rdma_rsp && poller && poller->srq) {
2850 			spdk_rdma_provider_srq_queue_recv_wrs(poller->srq, rdma_req->rdma_rsp->recv_wr);
2851 		}
2852 		if (rdma_req->transfer_cpl_cb) {
2853 			nvme_rdma_finish_data_transfer(rdma_req, -ENXIO);
2854 		}
2855 		return -ENXIO;
2856 	}
2857 
2858 	/* We do not support Soft-RoCE anymore. Other than Soft-RoCE's bug, we should not
2859 	 * receive a completion without error status after the qpair is disconnected/destroyed.
2860 	 */
2861 	if (spdk_unlikely(rdma_req->req == NULL)) {
2862 		/*
2863 		 * Some InfiniBand drivers do not guarantee the previous assumption after we
2864 		 * receive an RDMA_CM_EVENT_DEVICE_REMOVAL event.
2865 		 */
2866 		SPDK_ERRLOG("Received malformed completion: request 0x%"PRIx64" type %d\n", wc->wr_id,
2867 			    rdma_wr->type);
2868 		if (!rqpair || !rqpair->need_destroy) {
2869 			assert(0);
2870 		}
2871 		return -ENXIO;
2872 	}
2873 
2874 	rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
2875 	assert(rqpair->current_num_sends > 0);
2876 	rqpair->current_num_sends--;
2877 
2878 	if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) == 0) {
2879 		return 0;
2880 	}
2881 
2882 	rqpair->num_completions++;
2883 
2884 	nvme_rdma_request_ready(rqpair, rdma_req);
2885 
2886 	if (!rqpair->delay_cmd_submit) {
2887 		if (spdk_unlikely(nvme_rdma_qpair_submit_recvs(rqpair))) {
2888 			SPDK_ERRLOG("Unable to re-post rx descriptor\n");
2889 			nvme_rdma_fail_qpair(&rqpair->qpair, 0);
2890 			return -ENXIO;
2891 		}
2892 	}
2893 
2894 	return 1;
2895 }
2896 
2897 static inline int
2898 nvme_rdma_cq_process_completions(struct ibv_cq *cq, uint32_t batch_size,
2899 				 struct nvme_rdma_poller *poller,
2900 				 struct nvme_rdma_qpair *rdma_qpair,
2901 				 uint64_t *rdma_completions)
2902 {
2903 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
2904 	struct nvme_rdma_wr		*rdma_wr;
2905 	uint32_t			reaped = 0;
2906 	int				completion_rc = 0;
2907 	int				rc, _rc, i;
2908 
2909 	rc = ibv_poll_cq(cq, batch_size, wc);
2910 	if (spdk_unlikely(rc < 0)) {
2911 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
2912 			    errno, spdk_strerror(errno));
2913 		return -ECANCELED;
2914 	} else if (rc == 0) {
2915 		return 0;
2916 	}
2917 
2918 	for (i = 0; i < rc; i++) {
2919 		rdma_wr = (struct nvme_rdma_wr *)wc[i].wr_id;
2920 		switch (rdma_wr->type) {
2921 		case RDMA_WR_TYPE_RECV:
2922 			_rc = nvme_rdma_process_recv_completion(poller, &wc[i], rdma_wr);
2923 			break;
2924 
2925 		case RDMA_WR_TYPE_SEND:
2926 			_rc = nvme_rdma_process_send_completion(poller, rdma_qpair, &wc[i], rdma_wr);
2927 			break;
2928 
2929 		default:
2930 			SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", rdma_wr->type);
2931 			return -ECANCELED;
2932 		}
2933 		if (spdk_likely(_rc >= 0)) {
2934 			reaped += _rc;
2935 		} else {
2936 			completion_rc = _rc;
2937 		}
2938 	}
2939 
2940 	*rdma_completions += rc;
2941 
2942 	if (spdk_unlikely(completion_rc)) {
2943 		return completion_rc;
2944 	}
2945 
2946 	return reaped;
2947 }
2948 
2949 static void
2950 dummy_disconnected_qpair_cb(struct spdk_nvme_qpair *qpair, void *poll_group_ctx)
2951 {
2952 
2953 }
2954 
2955 static int
2956 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
2957 				    uint32_t max_completions)
2958 {
2959 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
2960 	struct nvme_rdma_ctrlr		*rctrlr = nvme_rdma_ctrlr(qpair->ctrlr);
2961 	int				rc = 0, batch_size;
2962 	struct ibv_cq			*cq;
2963 	uint64_t			rdma_completions = 0;
2964 
2965 	/*
2966 	 * This is used during the connection phase. It's possible that we are still reaping error completions
2967 	 * from other qpairs so we need to call the poll group function. Also, it's more correct since the cq
2968 	 * is shared.
2969 	 */
2970 	if (qpair->poll_group != NULL) {
2971 		return spdk_nvme_poll_group_process_completions(qpair->poll_group->group, max_completions,
2972 				dummy_disconnected_qpair_cb);
2973 	}
2974 
2975 	if (max_completions == 0) {
2976 		max_completions = rqpair->num_entries;
2977 	} else {
2978 		max_completions = spdk_min(max_completions, rqpair->num_entries);
2979 	}
2980 
2981 	switch (nvme_qpair_get_state(qpair)) {
2982 	case NVME_QPAIR_CONNECTING:
2983 		rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
2984 		if (rc == 0) {
2985 			/* Once the connection is completed, we can submit queued requests */
2986 			nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
2987 		} else if (rc != -EAGAIN) {
2988 			SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
2989 			goto failed;
2990 		} else if (rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING) {
2991 			return 0;
2992 		}
2993 		break;
2994 
2995 	case NVME_QPAIR_DISCONNECTING:
2996 		nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
2997 		return -ENXIO;
2998 
2999 	default:
3000 		if (nvme_qpair_is_admin_queue(qpair)) {
3001 			nvme_rdma_poll_events(rctrlr);
3002 		}
3003 		nvme_rdma_qpair_process_cm_event(rqpair);
3004 		break;
3005 	}
3006 
3007 	if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
3008 		goto failed;
3009 	}
3010 
3011 	cq = rqpair->cq;
3012 
3013 	rqpair->num_completions = 0;
3014 	do {
3015 		batch_size = spdk_min((max_completions - rqpair->num_completions), MAX_COMPLETIONS_PER_POLL);
3016 		rc = nvme_rdma_cq_process_completions(cq, batch_size, NULL, rqpair, &rdma_completions);
3017 
3018 		if (rc == 0) {
3019 			break;
3020 			/* Handle the case where we fail to poll the cq. */
3021 		} else if (rc == -ECANCELED) {
3022 			goto failed;
3023 		} else if (rc == -ENXIO) {
3024 			return rc;
3025 		}
3026 	} while (rqpair->num_completions < max_completions);
3027 
3028 	if (spdk_unlikely(nvme_rdma_qpair_submit_sends(rqpair) ||
3029 			  nvme_rdma_qpair_submit_recvs(rqpair))) {
3030 		goto failed;
3031 	}
3032 
3033 	if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
3034 		nvme_rdma_qpair_check_timeout(qpair);
3035 	}
3036 
3037 	return rqpair->num_completions;
3038 
3039 failed:
3040 	nvme_rdma_fail_qpair(qpair, 0);
3041 	return -ENXIO;
3042 }
3043 
3044 static uint32_t
3045 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
3046 {
3047 	/* max_mr_size reported by ibv_query_device indicates the largest value that we can
3048 	 * set for a registered memory region.  It is independent of the actual
3049 	 * I/O size and is very likely to be larger than 2 MiB, which is the
3050 	 * granularity at which we currently register memory regions.  Hence return
3051 	 * UINT32_MAX here and let the generic layer use the controller data to
3052 	 * moderate this value.
3053 	 */
3054 	return UINT32_MAX;
3055 }
3056 
3057 static uint16_t
3058 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
3059 {
3060 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
3061 	uint32_t max_sge = rctrlr->max_sge;
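	/* ioccsz is reported in 16-byte units; subtract the 64-byte command to get the room
	 * available for in-capsule SGL descriptors. */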
3062 	uint32_t max_in_capsule_sge = (ctrlr->cdata.nvmf_specific.ioccsz * 16 -
3063 				       sizeof(struct spdk_nvme_cmd)) /
3064 				      sizeof(struct spdk_nvme_sgl_descriptor);
3065 
3066 	/* Max SGE is limited by capsule size */
3067 	max_sge = spdk_min(max_sge, max_in_capsule_sge);
3068 	/* Max SGE may be limited by MSDBD */
3069 	if (ctrlr->cdata.nvmf_specific.msdbd != 0) {
3070 		max_sge = spdk_min(max_sge, ctrlr->cdata.nvmf_specific.msdbd);
3071 	}
3072 
3073 	/* Max SGE can't be less than 1 */
3074 	max_sge = spdk_max(1, max_sge);
3075 	return max_sge;
3076 }
3077 
3078 static int
3079 nvme_rdma_qpair_iterate_requests(struct spdk_nvme_qpair *qpair,
3080 				 int (*iter_fn)(struct nvme_request *req, void *arg),
3081 				 void *arg)
3082 {
3083 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
3084 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
3085 	int rc;
3086 
3087 	assert(iter_fn != NULL);
3088 
3089 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
3090 		assert(rdma_req->req != NULL);
3091 
3092 		rc = iter_fn(rdma_req->req, arg);
3093 		if (rc != 0) {
3094 			return rc;
3095 		}
3096 	}
3097 
3098 	return 0;
3099 }
3100 
3101 static int
3102 nvme_rdma_qpair_authenticate(struct spdk_nvme_qpair *qpair)
3103 {
3104 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
3105 	int rc;
3106 
3107 	/* If the qpair is still connecting, it'll be forced to authenticate later on */
3108 	if (rqpair->state < NVME_RDMA_QPAIR_STATE_RUNNING) {
3109 		return 0;
3110 	} else if (rqpair->state != NVME_RDMA_QPAIR_STATE_RUNNING) {
3111 		return -ENOTCONN;
3112 	}
3113 
3114 	rc = nvme_fabric_qpair_authenticate_async(qpair);
3115 	if (rc == 0) {
3116 		nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTING);
3117 		rqpair->state = NVME_RDMA_QPAIR_STATE_AUTHENTICATING;
3118 	}
3119 
3120 	return rc;
3121 }
3122 
3123 static void
3124 nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
3125 {
3126 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
3127 	struct spdk_nvme_cpl cpl;
3128 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
3129 
3130 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
3131 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
3132 
3133 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
3134 		assert(rdma_req->req != NULL);
3135 
3136 		if (rdma_req->req->cmd.opc != SPDK_NVME_OPC_ASYNC_EVENT_REQUEST) {
3137 			continue;
3138 		}
3139 
3140 		nvme_rdma_req_complete(rdma_req, &cpl, false);
3141 	}
3142 }
3143 
3144 static void
3145 nvme_rdma_poller_destroy(struct nvme_rdma_poller *poller)
3146 {
3147 	if (poller->cq) {
3148 		ibv_destroy_cq(poller->cq);
3149 	}
3150 	if (poller->rsps) {
3151 		nvme_rdma_free_rsps(poller->rsps);
3152 	}
3153 	if (poller->srq) {
3154 		spdk_rdma_provider_srq_destroy(poller->srq);
3155 	}
3156 	if (poller->mr_map) {
3157 		spdk_rdma_utils_free_mem_map(&poller->mr_map);
3158 	}
3159 	if (poller->pd) {
3160 		spdk_rdma_utils_put_pd(poller->pd);
3161 	}
3162 	free(poller);
3163 }
3164 
3165 static struct nvme_rdma_poller *
3166 nvme_rdma_poller_create(struct nvme_rdma_poll_group *group, struct ibv_context *ctx)
3167 {
3168 	struct nvme_rdma_poller *poller;
3169 	struct ibv_device_attr dev_attr;
3170 	struct spdk_rdma_provider_srq_init_attr srq_init_attr = {};
3171 	struct nvme_rdma_rsp_opts opts;
3172 	int num_cqe, max_num_cqe;
3173 	int rc;
3174 
3175 	poller = calloc(1, sizeof(*poller));
3176 	if (poller == NULL) {
3177 		SPDK_ERRLOG("Unable to allocate poller.\n");
3178 		return NULL;
3179 	}
3180 
3181 	poller->group = group;
3182 	poller->device = ctx;
3183 
3184 	if (g_spdk_nvme_transport_opts.rdma_srq_size != 0) {
3185 		rc = ibv_query_device(ctx, &dev_attr);
3186 		if (rc) {
3187 			SPDK_ERRLOG("Unable to query RDMA device.\n");
3188 			goto fail;
3189 		}
3190 
3191 		poller->pd = spdk_rdma_utils_get_pd(ctx);
3192 		if (poller->pd == NULL) {
3193 			SPDK_ERRLOG("Unable to get PD.\n");
3194 			goto fail;
3195 		}
3196 
3197 		poller->mr_map = spdk_rdma_utils_create_mem_map(poller->pd, &g_nvme_hooks,
3198 				 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE);
3199 		if (poller->mr_map == NULL) {
3200 			SPDK_ERRLOG("Unable to create memory map.\n");
3201 			goto fail;
3202 		}
3203 
3204 		srq_init_attr.stats = &poller->stats.rdma_stats.recv;
3205 		srq_init_attr.pd = poller->pd;
3206 		srq_init_attr.srq_init_attr.attr.max_wr = spdk_min((uint32_t)dev_attr.max_srq_wr,
3207 				g_spdk_nvme_transport_opts.rdma_srq_size);
3208 		srq_init_attr.srq_init_attr.attr.max_sge = spdk_min(dev_attr.max_sge,
3209 				NVME_RDMA_DEFAULT_RX_SGE);
3210 
3211 		poller->srq = spdk_rdma_provider_srq_create(&srq_init_attr);
3212 		if (poller->srq == NULL) {
3213 			SPDK_ERRLOG("Unable to create SRQ.\n");
3214 			goto fail;
3215 		}
3216 
3217 		opts.num_entries = g_spdk_nvme_transport_opts.rdma_srq_size;
3218 		opts.rqpair = NULL;
3219 		opts.srq = poller->srq;
3220 		opts.mr_map = poller->mr_map;
3221 
3222 		poller->rsps = nvme_rdma_create_rsps(&opts);
3223 		if (poller->rsps == NULL) {
3224 			SPDK_ERRLOG("Unable to create poller RDMA responses.\n");
3225 			goto fail;
3226 		}
3227 
3228 		rc = nvme_rdma_poller_submit_recvs(poller);
3229 		if (rc) {
3230 			SPDK_ERRLOG("Unable to submit poller RDMA responses.\n");
3231 			goto fail;
3232 		}
3233 
3234 		/*
3235 		 * When using an SRQ, fix the size of the completion queue at startup.
3236 		 * The initiator posts only send and recv WRs. Hence, the multiplier is 2.
3237 		 * (The target also posts data WRs. Hence, its multiplier is 3.)
3238 		 */
3239 		num_cqe = g_spdk_nvme_transport_opts.rdma_srq_size * 2;
3240 	} else {
3241 		num_cqe = DEFAULT_NVME_RDMA_CQ_SIZE;
3242 	}
3243 
3244 	max_num_cqe = g_spdk_nvme_transport_opts.rdma_max_cq_size;
3245 	if (max_num_cqe != 0 && num_cqe > max_num_cqe) {
3246 		num_cqe = max_num_cqe;
3247 	}
3248 
3249 	poller->cq = ibv_create_cq(poller->device, num_cqe, group, NULL, 0);
3250 
3251 	if (poller->cq == NULL) {
3252 		SPDK_ERRLOG("Unable to create CQ, errno %d.\n", errno);
3253 		goto fail;
3254 	}
3255 
3256 	STAILQ_INSERT_HEAD(&group->pollers, poller, link);
3257 	group->num_pollers++;
3258 	poller->current_num_wc = num_cqe;
3259 	poller->required_num_wc = 0;
3260 	return poller;
3261 
3262 fail:
3263 	nvme_rdma_poller_destroy(poller);
3264 	return NULL;
3265 }
3266 
3267 static void
3268 nvme_rdma_poll_group_free_pollers(struct nvme_rdma_poll_group *group)
3269 {
3270 	struct nvme_rdma_poller	*poller, *tmp_poller;
3271 
3272 	STAILQ_FOREACH_SAFE(poller, &group->pollers, link, tmp_poller) {
3273 		assert(poller->refcnt == 0);
3274 		if (poller->refcnt) {
3275 			SPDK_WARNLOG("Destroying poller with non-zero ref count: poller %p, refcnt %d\n",
3276 				     poller, poller->refcnt);
3277 		}
3278 
3279 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
3280 		nvme_rdma_poller_destroy(poller);
3281 	}
3282 }
3283 
3284 static struct nvme_rdma_poller *
3285 nvme_rdma_poll_group_get_poller(struct nvme_rdma_poll_group *group, struct ibv_context *device)
3286 {
3287 	struct nvme_rdma_poller *poller = NULL;
3288 
3289 	STAILQ_FOREACH(poller, &group->pollers, link) {
3290 		if (poller->device == device) {
3291 			break;
3292 		}
3293 	}
3294 
3295 	if (!poller) {
3296 		poller = nvme_rdma_poller_create(group, device);
3297 		if (!poller) {
3298 			SPDK_ERRLOG("Failed to create a poller for device %p\n", device);
3299 			return NULL;
3300 		}
3301 	}
3302 
3303 	poller->refcnt++;
3304 	return poller;
3305 }
3306 
3307 static void
3308 nvme_rdma_poll_group_put_poller(struct nvme_rdma_poll_group *group, struct nvme_rdma_poller *poller)
3309 {
3310 	assert(poller->refcnt > 0);
3311 	if (--poller->refcnt == 0) {
3312 		STAILQ_REMOVE(&group->pollers, poller, nvme_rdma_poller, link);
3313 		group->num_pollers--;
3314 		nvme_rdma_poller_destroy(poller);
3315 	}
3316 }
3317 
3318 static struct spdk_nvme_transport_poll_group *
3319 nvme_rdma_poll_group_create(void)
3320 {
3321 	struct nvme_rdma_poll_group	*group;
3322 
3323 	group = calloc(1, sizeof(*group));
3324 	if (group == NULL) {
3325 		SPDK_ERRLOG("Unable to allocate poll group.\n");
3326 		return NULL;
3327 	}
3328 
3329 	STAILQ_INIT(&group->pollers);
3330 	TAILQ_INIT(&group->connecting_qpairs);
3331 	TAILQ_INIT(&group->active_qpairs);
3332 	return &group->group;
3333 }
3334 
3335 static int
3336 nvme_rdma_poll_group_connect_qpair(struct spdk_nvme_qpair *qpair)
3337 {
3338 	return 0;
3339 }
3340 
3341 static int
3342 nvme_rdma_poll_group_disconnect_qpair(struct spdk_nvme_qpair *qpair)
3343 {
3344 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
3345 	struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);
3346 
3347 	if (rqpair->link_connecting.tqe_prev) {
3348 		TAILQ_REMOVE(&group->connecting_qpairs, rqpair, link_connecting);
3349 		/* We use the prev pointer to check whether the qpair is in the connecting list.
3350 		 * TAILQ_REMOVE doesn't clear it, so we do it manually.
3351 		 */
3352 		rqpair->link_connecting.tqe_prev = NULL;
3353 	}
3354 
3355 	return 0;
3356 }
3357 
3358 static int
3359 nvme_rdma_poll_group_add(struct spdk_nvme_transport_poll_group *tgroup,
3360 			 struct spdk_nvme_qpair *qpair)
3361 {
3362 	return 0;
3363 }
3364 
3365 static int
3366 nvme_rdma_poll_group_remove(struct spdk_nvme_transport_poll_group *tgroup,
3367 			    struct spdk_nvme_qpair *qpair)
3368 {
3369 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
3370 	struct nvme_rdma_poll_group *group = nvme_rdma_poll_group(qpair->poll_group);
3371 
3372 	if (rqpair->link_active.tqe_prev) {
3373 		TAILQ_REMOVE(&group->active_qpairs, rqpair, link_active);
3374 		rqpair->link_active.tqe_prev = NULL;
3375 	}
3376 
3377 	return 0;
3378 }
3379 
3380 static inline void
3381 nvme_rdma_qpair_process_submits(struct nvme_rdma_poll_group *group,
3382 				struct nvme_rdma_qpair *rqpair)
3383 {
3384 	struct spdk_nvme_qpair	*qpair = &rqpair->qpair;
3385 
3386 	assert(rqpair->link_active.tqe_prev != NULL);
3387 
3388 	if (spdk_unlikely(rqpair->state <= NVME_RDMA_QPAIR_STATE_INITIALIZING ||
3389 			  rqpair->state >= NVME_RDMA_QPAIR_STATE_EXITING)) {
3390 		return;
3391 	}
3392 
3393 	if (spdk_unlikely(qpair->ctrlr->timeout_enabled)) {
3394 		nvme_rdma_qpair_check_timeout(qpair);
3395 	}
3396 
3397 	nvme_rdma_qpair_submit_sends(rqpair);
3398 	if (!rqpair->srq) {
3399 		nvme_rdma_qpair_submit_recvs(rqpair);
3400 	}
3401 	if (rqpair->num_completions > 0) {
3402 		nvme_qpair_resubmit_requests(qpair, rqpair->num_completions);
3403 		rqpair->num_completions = 0;
3404 	}
3405 
3406 	if (rqpair->num_outstanding_reqs == 0 && STAILQ_EMPTY(&qpair->queued_req)) {
3407 		TAILQ_REMOVE(&group->active_qpairs, rqpair, link_active);
3408 		/* We use the prev pointer to check whether the qpair is in the active list.
3409 		 * TAILQ_REMOVE doesn't clear it, so we do it manually.
3410 		 */
3411 		rqpair->link_active.tqe_prev = NULL;
3412 	}
3413 }
3414 
3415 static int64_t
3416 nvme_rdma_poll_group_process_completions(struct spdk_nvme_transport_poll_group *tgroup,
3417 		uint32_t completions_per_qpair, spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
3418 {
3419 	struct spdk_nvme_qpair			*qpair, *tmp_qpair;
3420 	struct nvme_rdma_qpair			*rqpair, *tmp_rqpair;
3421 	struct nvme_rdma_poll_group		*group;
3422 	struct nvme_rdma_poller			*poller;
3423 	int					batch_size, rc, rc2 = 0;
3424 	int64_t					total_completions = 0;
3425 	uint64_t				completions_allowed = 0;
3426 	uint64_t				completions_per_poller = 0;
3427 	uint64_t				poller_completions = 0;
3428 	uint64_t				rdma_completions;
3429 
3430 	if (completions_per_qpair == 0) {
3431 		completions_per_qpair = MAX_COMPLETIONS_PER_POLL;
3432 	}
3433 
3434 	group = nvme_rdma_poll_group(tgroup);
3435 
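	/* Poll disconnect progress for qpairs being torn down; once polling reports
	 * completion (rc == 0), notify the upper layer via disconnected_qpair_cb.
	 */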
3436 	STAILQ_FOREACH_SAFE(qpair, &tgroup->disconnected_qpairs, poll_group_stailq, tmp_qpair) {
3437 		rc = nvme_rdma_ctrlr_disconnect_qpair_poll(qpair->ctrlr, qpair);
3438 		if (rc == 0) {
3439 			disconnected_qpair_cb(qpair, tgroup->group->ctx);
3440 		}
3441 	}
3442 
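	/* Advance the asynchronous connect sequence for qpairs that are still
	 * connecting; -EAGAIN means the connect is still in progress, any other
	 * non-zero return code is treated as a failure.
	 */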
3443 	TAILQ_FOREACH_SAFE(rqpair, &group->connecting_qpairs, link_connecting, tmp_rqpair) {
3444 		qpair = &rqpair->qpair;
3445 
3446 		rc = nvme_rdma_ctrlr_connect_qpair_poll(qpair->ctrlr, qpair);
3447 		if (rc != -EAGAIN) {
3448 			TAILQ_REMOVE(&group->connecting_qpairs, rqpair, link_connecting);
3449 			/* We use the prev pointer to check whether the qpair is in the connecting list.
3450 			 * TAILQ_REMOVE does not clear it, so we clear it manually.
3451 			 */
3452 			rqpair->link_connecting.tqe_prev = NULL;
3453 
3454 			if (rc == 0) {
3455 				/* Once the connection is completed, we can submit queued requests. */
3456 				nvme_qpair_resubmit_requests(qpair, rqpair->num_entries);
3457 			} else {
3458 				SPDK_ERRLOG("Failed to connect rqpair=%p\n", rqpair);
3459 				nvme_rdma_fail_qpair(qpair, 0);
3460 			}
3461 		}
3462 	}
3463 
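	/* Process pending RDMA CM events on connected qpairs (skipped while a qpair
	 * is still connecting) and fail any qpair that has recorded a transport failure.
	 */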
3464 	STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, poll_group_stailq, tmp_qpair) {
3465 		rqpair = nvme_rdma_qpair(qpair);
3466 
3467 		if (spdk_likely(nvme_qpair_get_state(qpair) != NVME_QPAIR_CONNECTING)) {
3468 			nvme_rdma_qpair_process_cm_event(rqpair);
3469 		}
3470 
3471 		if (spdk_unlikely(qpair->transport_failure_reason != SPDK_NVME_QPAIR_FAILURE_NONE)) {
3472 			rc2 = -ENXIO;
3473 			nvme_rdma_fail_qpair(qpair, 0);
3474 		}
3475 	}
3476 
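	/* Split the overall completion budget evenly across the pollers (one poller
	 * per RDMA device), allowing each poller at least one completion.
	 */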
3477 	completions_allowed = completions_per_qpair * tgroup->num_connected_qpairs;
3478 	if (spdk_likely(group->num_pollers)) {
3479 		completions_per_poller = spdk_max(completions_allowed / group->num_pollers, 1);
3480 	}
3481 
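	/* Poll each device CQ until its share of the budget is consumed or the CQ
	 * goes idle; refill the shared receive queue afterwards when one is in use.
	 */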
3482 	STAILQ_FOREACH(poller, &group->pollers, link) {
3483 		poller_completions = 0;
3484 		rdma_completions = 0;
3485 		do {
3486 			poller->stats.polls++;
3487 			batch_size = spdk_min((completions_per_poller - poller_completions), MAX_COMPLETIONS_PER_POLL);
3488 			rc = nvme_rdma_cq_process_completions(poller->cq, batch_size, poller, NULL, &rdma_completions);
3489 			if (rc <= 0) {
3490 				if (rc == -ECANCELED) {
3491 					return -EIO;
3492 				} else if (rc == 0) {
3493 					poller->stats.idle_polls++;
3494 				}
3495 				break;
3496 			}
3497 
3498 			poller_completions += rc;
3499 		} while (poller_completions < completions_per_poller);
3500 		total_completions += poller_completions;
3501 		poller->stats.completions += rdma_completions;
3502 		if (poller->srq) {
3503 			nvme_rdma_poller_submit_recvs(poller);
3504 		}
3505 	}
3506 
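	/* Flush deferred send/recv work requests and resubmit queued requests on
	 * qpairs that still have outstanding work.
	 */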
3507 	TAILQ_FOREACH_SAFE(rqpair, &group->active_qpairs, link_active, tmp_rqpair) {
3508 		nvme_rdma_qpair_process_submits(group, rqpair);
3509 	}
3510 
3511 	return rc2 != 0 ? rc2 : total_completions;
3512 }
3513 
3514 /*
3515  * Placeholder: disconnected qpairs will be handled here once interrupt support is added.
3516  */
3517 static void
3518 nvme_rdma_poll_group_check_disconnected_qpairs(struct spdk_nvme_transport_poll_group *tgroup,
3519 		spdk_nvme_disconnected_qpair_cb disconnected_qpair_cb)
3520 {
3521 }
3522 
3523 static int
3524 nvme_rdma_poll_group_destroy(struct spdk_nvme_transport_poll_group *tgroup)
3525 {
3526 	struct nvme_rdma_poll_group	*group = nvme_rdma_poll_group(tgroup);
3527 
3528 	if (!STAILQ_EMPTY(&tgroup->connected_qpairs) || !STAILQ_EMPTY(&tgroup->disconnected_qpairs)) {
3529 		return -EBUSY;
3530 	}
3531 
3532 	nvme_rdma_poll_group_free_pollers(group);
3533 	free(group);
3534 
3535 	return 0;
3536 }
3537 
3538 static int
3539 nvme_rdma_poll_group_get_stats(struct spdk_nvme_transport_poll_group *tgroup,
3540 			       struct spdk_nvme_transport_poll_group_stat **_stats)
3541 {
3542 	struct nvme_rdma_poll_group *group;
3543 	struct spdk_nvme_transport_poll_group_stat *stats;
3544 	struct spdk_nvme_rdma_device_stat *device_stat;
3545 	struct nvme_rdma_poller *poller;
3546 	uint32_t i = 0;
3547 
3548 	if (tgroup == NULL || _stats == NULL) {
3549 		SPDK_ERRLOG("Invalid stats or group pointer\n");
3550 		return -EINVAL;
3551 	}
3552 
3553 	group = nvme_rdma_poll_group(tgroup);
3554 	stats = calloc(1, sizeof(*stats));
3555 	if (!stats) {
3556 		SPDK_ERRLOG("Can't allocate memory for RDMA stats\n");
3557 		return -ENOMEM;
3558 	}
3559 	stats->trtype = SPDK_NVME_TRANSPORT_RDMA;
3560 	stats->rdma.num_devices = group->num_pollers;
3561 
3562 	if (stats->rdma.num_devices == 0) {
3563 		*_stats = stats;
3564 		return 0;
3565 	}
3566 
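	/* One device_stat entry is reported per poller, i.e. per RDMA device used by the group. */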
3567 	stats->rdma.device_stats = calloc(stats->rdma.num_devices, sizeof(*stats->rdma.device_stats));
3568 	if (!stats->rdma.device_stats) {
3569 		SPDK_ERRLOG("Can't allocate memory for RDMA device stats\n");
3570 		free(stats);
3571 		return -ENOMEM;
3572 	}
3573 
3574 	STAILQ_FOREACH(poller, &group->pollers, link) {
3575 		device_stat = &stats->rdma.device_stats[i];
3576 		device_stat->name = poller->device->device->name;
3577 		device_stat->polls = poller->stats.polls;
3578 		device_stat->idle_polls = poller->stats.idle_polls;
3579 		device_stat->completions = poller->stats.completions;
3580 		device_stat->queued_requests = poller->stats.queued_requests;
3581 		device_stat->total_send_wrs = poller->stats.rdma_stats.send.num_submitted_wrs;
3582 		device_stat->send_doorbell_updates = poller->stats.rdma_stats.send.doorbell_updates;
3583 		device_stat->total_recv_wrs = poller->stats.rdma_stats.recv.num_submitted_wrs;
3584 		device_stat->recv_doorbell_updates = poller->stats.rdma_stats.recv.doorbell_updates;
3585 		i++;
3586 	}
3587 
3588 	*_stats = stats;
3589 
3590 	return 0;
3591 }
3592 
3593 static void
3594 nvme_rdma_poll_group_free_stats(struct spdk_nvme_transport_poll_group *tgroup,
3595 				struct spdk_nvme_transport_poll_group_stat *stats)
3596 {
3597 	if (stats) {
3598 		free(stats->rdma.device_stats);
3599 	}
3600 	free(stats);
3601 }
3602 
3603 static int
3604 nvme_rdma_ctrlr_get_memory_domains(const struct spdk_nvme_ctrlr *ctrlr,
3605 				   struct spdk_memory_domain **domains, int array_size)
3606 {
3607 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(ctrlr->adminq);
3608 
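	/* The RDMA transport exposes a single memory domain, taken from the admin qpair's RDMA QP. */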
3609 	if (domains && array_size > 0) {
3610 		domains[0] = rqpair->rdma_qp->domain;
3611 	}
3612 
3613 	return 1;
3614 }
3615 
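/* Install caller-provided RDMA hooks by copying them into the global g_nvme_hooks. */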
3616 void
3617 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
3618 {
3619 	g_nvme_hooks = *hooks;
3620 }
3621 
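/* Transport operations table for RDMA, registered with the NVMe driver core via
 * SPDK_NVME_TRANSPORT_REGISTER() below.
 */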
3622 const struct spdk_nvme_transport_ops rdma_ops = {
3623 	.name = "RDMA",
3624 	.type = SPDK_NVME_TRANSPORT_RDMA,
3625 	.ctrlr_construct = nvme_rdma_ctrlr_construct,
3626 	.ctrlr_scan = nvme_fabric_ctrlr_scan,
3627 	.ctrlr_destruct = nvme_rdma_ctrlr_destruct,
3628 	.ctrlr_enable = nvme_rdma_ctrlr_enable,
3629 
3630 	.ctrlr_set_reg_4 = nvme_fabric_ctrlr_set_reg_4,
3631 	.ctrlr_set_reg_8 = nvme_fabric_ctrlr_set_reg_8,
3632 	.ctrlr_get_reg_4 = nvme_fabric_ctrlr_get_reg_4,
3633 	.ctrlr_get_reg_8 = nvme_fabric_ctrlr_get_reg_8,
3634 	.ctrlr_set_reg_4_async = nvme_fabric_ctrlr_set_reg_4_async,
3635 	.ctrlr_set_reg_8_async = nvme_fabric_ctrlr_set_reg_8_async,
3636 	.ctrlr_get_reg_4_async = nvme_fabric_ctrlr_get_reg_4_async,
3637 	.ctrlr_get_reg_8_async = nvme_fabric_ctrlr_get_reg_8_async,
3638 
3639 	.ctrlr_get_max_xfer_size = nvme_rdma_ctrlr_get_max_xfer_size,
3640 	.ctrlr_get_max_sges = nvme_rdma_ctrlr_get_max_sges,
3641 
3642 	.ctrlr_create_io_qpair = nvme_rdma_ctrlr_create_io_qpair,
3643 	.ctrlr_delete_io_qpair = nvme_rdma_ctrlr_delete_io_qpair,
3644 	.ctrlr_connect_qpair = nvme_rdma_ctrlr_connect_qpair,
3645 	.ctrlr_disconnect_qpair = nvme_rdma_ctrlr_disconnect_qpair,
3646 
3647 	.ctrlr_get_memory_domains = nvme_rdma_ctrlr_get_memory_domains,
3648 
3649 	.qpair_abort_reqs = nvme_rdma_qpair_abort_reqs,
3650 	.qpair_reset = nvme_rdma_qpair_reset,
3651 	.qpair_submit_request = nvme_rdma_qpair_submit_request,
3652 	.qpair_process_completions = nvme_rdma_qpair_process_completions,
3653 	.qpair_iterate_requests = nvme_rdma_qpair_iterate_requests,
3654 	.qpair_authenticate = nvme_rdma_qpair_authenticate,
3655 	.admin_qpair_abort_aers = nvme_rdma_admin_qpair_abort_aers,
3656 
3657 	.poll_group_create = nvme_rdma_poll_group_create,
3658 	.poll_group_connect_qpair = nvme_rdma_poll_group_connect_qpair,
3659 	.poll_group_disconnect_qpair = nvme_rdma_poll_group_disconnect_qpair,
3660 	.poll_group_add = nvme_rdma_poll_group_add,
3661 	.poll_group_remove = nvme_rdma_poll_group_remove,
3662 	.poll_group_process_completions = nvme_rdma_poll_group_process_completions,
3663 	.poll_group_check_disconnected_qpairs = nvme_rdma_poll_group_check_disconnected_qpairs,
3664 	.poll_group_destroy = nvme_rdma_poll_group_destroy,
3665 	.poll_group_get_stats = nvme_rdma_poll_group_get_stats,
3666 	.poll_group_free_stats = nvme_rdma_poll_group_free_stats,
3667 };
3668 
3669 SPDK_NVME_TRANSPORT_REGISTER(rdma, &rdma_ops);
3670