xref: /spdk/lib/nvme/nvme_rdma.c (revision b78e763c1af2ace4c19d2932065a43357e3f5d3e)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
35  * NVMe over RDMA transport
36  */
37 
38 #include "spdk/stdinc.h"
39 
40 #include <infiniband/verbs.h>
41 #include <rdma/rdma_cma.h>
42 #include <rdma/rdma_verbs.h>
43 
44 #include "spdk/assert.h"
45 #include "spdk/log.h"
46 #include "spdk/trace.h"
47 #include "spdk/event.h"
48 #include "spdk/queue.h"
49 #include "spdk/nvme.h"
50 #include "spdk/nvmf_spec.h"
51 #include "spdk/string.h"
52 #include "spdk/endian.h"
53 #include "spdk/likely.h"
54 
55 #include "nvme_internal.h"
56 
57 #define NVME_RDMA_TIME_OUT_IN_MS 2000
58 #define NVME_RDMA_RW_BUFFER_SIZE 131072
59 
60 /*
61  * NVME RDMA qpair Resource Defaults
62  */
63 #define NVME_RDMA_DEFAULT_TX_SGE		2
64 #define NVME_RDMA_DEFAULT_RX_SGE		1
65 
66 
67 /* Max number of NVMe-oF SGL descriptors supported by the host */
68 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
69 struct spdk_nvmf_cmd {
70 	struct spdk_nvme_cmd cmd;
71 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
72 };
73 
74 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
75 
76 /* Mapping from virtual address to ibv_mr pointer for a protection domain */
77 struct spdk_nvme_rdma_mr_map {
78 	struct ibv_pd				*pd;
79 	struct spdk_mem_map			*map;
80 	uint64_t				ref;
81 	LIST_ENTRY(spdk_nvme_rdma_mr_map)	link;
82 };
83 
84 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
85 struct nvme_rdma_ctrlr {
86 	struct spdk_nvme_ctrlr			ctrlr;
87 
88 	struct ibv_pd				*pd;
89 };
90 
91 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
92 struct nvme_rdma_qpair {
93 	struct spdk_nvme_qpair			qpair;
94 
95 	struct rdma_cm_id			*cm_id;
96 
97 	struct ibv_cq				*cq;
98 
99 	struct	spdk_nvme_rdma_req		*rdma_reqs;
100 
101 	uint32_t				max_send_sge;
102 
103 	uint32_t				max_recv_sge;
104 
105 	uint16_t				num_entries;
106 
107 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
108 	struct ibv_sge				*rsp_sgls;
109 	struct spdk_nvme_cpl			*rsps;
110 
111 	struct ibv_recv_wr			*rsp_recv_wrs;
112 
113 	/* Memory region describing all rsps for this qpair */
114 	struct ibv_mr				*rsp_mr;
115 
116 	/*
117 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
118 	 * Indexed by rdma_req->id.
119 	 */
120 	struct spdk_nvmf_cmd			*cmds;
121 
122 	/* Memory region describing all cmds for this qpair */
123 	struct ibv_mr				*cmd_mr;
124 
125 	struct spdk_nvme_rdma_mr_map		*mr_map;
126 
127 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
128 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
129 
130 	/* Placed at the end of the struct since it is not used frequently */
131 	struct rdma_event_channel		*cm_channel;
132 };
133 
134 struct spdk_nvme_rdma_req {
135 	int					id;
136 
137 	struct ibv_send_wr			send_wr;
138 
139 	struct nvme_request			*req;
140 
141 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
142 
143 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
144 };
145 
146 static const char *rdma_cm_event_str[] = {
147 	"RDMA_CM_EVENT_ADDR_RESOLVED",
148 	"RDMA_CM_EVENT_ADDR_ERROR",
149 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
150 	"RDMA_CM_EVENT_ROUTE_ERROR",
151 	"RDMA_CM_EVENT_CONNECT_REQUEST",
152 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
153 	"RDMA_CM_EVENT_CONNECT_ERROR",
154 	"RDMA_CM_EVENT_UNREACHABLE",
155 	"RDMA_CM_EVENT_REJECTED",
156 	"RDMA_CM_EVENT_ESTABLISHED",
157 	"RDMA_CM_EVENT_DISCONNECTED",
158 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
159 	"RDMA_CM_EVENT_MULTICAST_JOIN",
160 	"RDMA_CM_EVENT_MULTICAST_ERROR",
161 	"RDMA_CM_EVENT_ADDR_CHANGE",
162 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
163 };
164 
165 static LIST_HEAD(, spdk_nvme_rdma_mr_map) g_rdma_mr_maps = LIST_HEAD_INITIALIZER(&g_rdma_mr_maps);
166 static pthread_mutex_t g_rdma_mr_maps_mutex = PTHREAD_MUTEX_INITIALIZER;
167 
168 static int nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair);
169 
170 static inline struct nvme_rdma_qpair *
171 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
172 {
173 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
174 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
175 }
176 
177 static inline struct nvme_rdma_ctrlr *
178 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
179 {
180 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
181 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
182 }
183 
184 static struct spdk_nvme_rdma_req *
185 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
186 {
187 	struct spdk_nvme_rdma_req *rdma_req;
188 
189 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
190 	if (rdma_req) {
191 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
192 		TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
193 	}
194 
195 	return rdma_req;
196 }
197 
198 static void
199 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
200 {
201 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
202 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
203 }
204 
205 static void
206 nvme_rdma_req_complete(struct nvme_request *req,
207 		       struct spdk_nvme_cpl *rsp)
208 {
209 	nvme_complete_request(req, rsp);
210 	nvme_free_request(req);
211 }
212 
213 static const char *
214 nvme_rdma_cm_event_str_get(uint32_t event)
215 {
216 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
217 		return rdma_cm_event_str[event];
218 	} else {
219 		return "Undefined";
220 	}
221 }
222 
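/*
 * Wait for a specific CM event on the given event channel.  Any other
 * event is logged, acknowledged and treated as a failure (NULL return).
 * On success the event is returned to the caller, who is responsible
 * for calling rdma_ack_cm_event() on it.
 */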
223 static struct rdma_cm_event *
224 nvme_rdma_get_event(struct rdma_event_channel *channel,
225 		    enum rdma_cm_event_type evt)
226 {
227 	struct rdma_cm_event	*event;
228 	int			rc;
229 
230 	rc = rdma_get_cm_event(channel, &event);
231 	if (rc < 0) {
232 		SPDK_ERRLOG("Failed to get event from CM event channel. Error %d (%s)\n",
233 			    errno, spdk_strerror(errno));
234 		return NULL;
235 	}
236 
237 	if (event->event != evt) {
238 		SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
239 			    nvme_rdma_cm_event_str_get(evt),
240 			    nvme_rdma_cm_event_str_get(event->event), event->event, event->status);
241 		rdma_ack_cm_event(event);
242 		return NULL;
243 	}
244 
245 	return event;
246 }
247 
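/*
 * Create the completion queue and RC queue pair backing this qpair.
 * The CQ is sized at num_entries * 2 so that send and recv completions
 * for a full queue both fit.  The protection domain comes from the
 * get_ibv_pd hook when one is registered; otherwise rdma_create_qp()
 * picks a default PD, which is then saved in the controller.
 */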
248 static int
249 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
250 {
251 	int			rc;
252 	struct ibv_qp_init_attr	attr;
253 	struct ibv_device_attr	dev_attr;
254 	struct nvme_rdma_ctrlr	*rctrlr;
255 
256 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
257 	if (rc != 0) {
258 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
259 		return -1;
260 	}
261 
262 	rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
263 	if (!rqpair->cq) {
264 		SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
265 		return -1;
266 	}
267 
268 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
269 	if (g_nvme_hooks.get_ibv_pd) {
270 		rctrlr->pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
271 	} else {
272 		rctrlr->pd = NULL;
273 	}
274 
275 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
276 	attr.qp_type		= IBV_QPT_RC;
277 	attr.send_cq		= rqpair->cq;
278 	attr.recv_cq		= rqpair->cq;
279 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
280 	attr.cap.max_recv_wr	= rqpair->num_entries; /* RECV operations */
281 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
282 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
283 
284 	rc = rdma_create_qp(rqpair->cm_id, rctrlr->pd, &attr);
285 
286 	if (rc) {
287 		SPDK_ERRLOG("rdma_create_qp failed\n");
288 		return -1;
289 	}
290 
291 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
292 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
293 	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
294 
295 	rctrlr->pd = rqpair->cm_id->qp->pd;
296 
297 	rqpair->cm_id->context = &rqpair->qpair;
298 
299 	return 0;
300 }
301 
302 #define nvme_rdma_trace_ibv_sge(sg_list) \
303 	if (sg_list) { \
304 		SPDK_DEBUGLOG(SPDK_LOG_NVME, "local addr %p length 0x%x lkey 0x%x\n", \
305 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
306 	}
307 
308 static int
309 nvme_rdma_post_recv(struct nvme_rdma_qpair *rqpair, uint16_t rsp_idx)
310 {
311 	struct ibv_recv_wr *wr, *bad_wr = NULL;
312 	int rc;
313 
314 	wr = &rqpair->rsp_recv_wrs[rsp_idx];
315 	nvme_rdma_trace_ibv_sge(wr->sg_list);
316 
317 	rc = ibv_post_recv(rqpair->cm_id->qp, wr, &bad_wr);
318 	if (rc) {
319 		SPDK_ERRLOG("Failure posting rdma recv, rc = 0x%x\n", rc);
320 	}
321 
322 	return rc;
323 }
324 
325 static void
326 nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
327 {
328 	if (rqpair->rsp_mr && rdma_dereg_mr(rqpair->rsp_mr)) {
329 		SPDK_ERRLOG("Unable to de-register rsp_mr\n");
330 	}
331 	rqpair->rsp_mr = NULL;
332 
333 	free(rqpair->rsps);
334 	rqpair->rsps = NULL;
335 	free(rqpair->rsp_sgls);
336 	rqpair->rsp_sgls = NULL;
337 	free(rqpair->rsp_recv_wrs);
338 	rqpair->rsp_recv_wrs = NULL;
339 }
340 
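/*
 * Allocate the parallel response arrays (completions, SGEs and recv
 * work requests), register the completion buffer with rdma_reg_msgs()
 * and post one recv per entry.  On any failure everything allocated
 * here is released and -ENOMEM is returned.
 */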
341 static int
342 nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
343 {
344 	uint16_t i;
345 
346 	rqpair->rsp_mr = NULL;
347 	rqpair->rsps = NULL;
348 	rqpair->rsp_recv_wrs = NULL;
349 
350 	rqpair->rsp_sgls = calloc(rqpair->num_entries, sizeof(*rqpair->rsp_sgls));
351 	if (!rqpair->rsp_sgls) {
352 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
353 		goto fail;
354 	}
355 
356 	rqpair->rsp_recv_wrs = calloc(rqpair->num_entries,
357 				      sizeof(*rqpair->rsp_recv_wrs));
358 	if (!rqpair->rsp_recv_wrs) {
359 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
360 		goto fail;
361 	}
362 
363 	rqpair->rsps = calloc(rqpair->num_entries, sizeof(*rqpair->rsps));
364 	if (!rqpair->rsps) {
365 		SPDK_ERRLOG("Failed to allocate rdma rsps\n");
366 		goto fail;
367 	}
368 
369 	rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
370 				       rqpair->num_entries * sizeof(*rqpair->rsps));
371 	if (rqpair->rsp_mr == NULL) {
372 		SPDK_ERRLOG("Unable to register rsp_mr\n");
373 		goto fail;
374 	}
375 
376 	for (i = 0; i < rqpair->num_entries; i++) {
377 		struct ibv_sge *rsp_sgl = &rqpair->rsp_sgls[i];
378 
379 		rsp_sgl->addr = (uint64_t)&rqpair->rsps[i];
380 		rsp_sgl->length = sizeof(rqpair->rsps[i]);
381 		rsp_sgl->lkey = rqpair->rsp_mr->lkey;
382 
383 		rqpair->rsp_recv_wrs[i].wr_id = i;
384 		rqpair->rsp_recv_wrs[i].next = NULL;
385 		rqpair->rsp_recv_wrs[i].sg_list = rsp_sgl;
386 		rqpair->rsp_recv_wrs[i].num_sge = 1;
387 
388 		if (nvme_rdma_post_recv(rqpair, i)) {
389 			SPDK_ERRLOG("Unable to post connection rx desc\n");
390 			goto fail;
391 		}
392 	}
393 
394 	return 0;
395 
396 fail:
397 	nvme_rdma_free_rsps(rqpair);
398 	return -ENOMEM;
399 }
400 
401 static void
402 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
403 {
404 	if (!rqpair->rdma_reqs) {
405 		return;
406 	}
407 
408 	if (rqpair->cmd_mr && rdma_dereg_mr(rqpair->cmd_mr)) {
409 		SPDK_ERRLOG("Unable to de-register cmd_mr\n");
410 	}
411 	rqpair->cmd_mr = NULL;
412 
413 	free(rqpair->cmds);
414 	rqpair->cmds = NULL;
415 
416 	free(rqpair->rdma_reqs);
417 	rqpair->rdma_reqs = NULL;
418 }
419 
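/*
 * Allocate the request trackers and the registered command capsules.
 * Each tracker's send work request and first SGE are pre-initialized
 * to point at its capsule; the length of that SGE is adjusted later
 * depending on whether additional NVMe-oF SGL descriptors are sent.
 * All trackers start out on the free list.
 */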
420 static int
421 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
422 {
423 	int i;
424 
425 	rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
426 	if (rqpair->rdma_reqs == NULL) {
427 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
428 		goto fail;
429 	}
430 
431 	rqpair->cmds = calloc(rqpair->num_entries, sizeof(*rqpair->cmds));
432 	if (!rqpair->cmds) {
433 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
434 		goto fail;
435 	}
436 
437 	rqpair->cmd_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->cmds,
438 				       rqpair->num_entries * sizeof(*rqpair->cmds));
439 	if (!rqpair->cmd_mr) {
440 		SPDK_ERRLOG("Unable to register cmd_mr\n");
441 		goto fail;
442 	}
443 
444 	TAILQ_INIT(&rqpair->free_reqs);
445 	TAILQ_INIT(&rqpair->outstanding_reqs);
446 	for (i = 0; i < rqpair->num_entries; i++) {
447 		struct spdk_nvme_rdma_req	*rdma_req;
448 		struct spdk_nvmf_cmd		*cmd;
449 
450 		rdma_req = &rqpair->rdma_reqs[i];
451 		cmd = &rqpair->cmds[i];
452 
453 		rdma_req->id = i;
454 
455 		/* The first RDMA sgl element will always point
456 		 * at this data structure. Depending on whether
457 		 * an NVMe-oF SGL is required, the length of
458 		 * this element may change. */
459 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
460 		rdma_req->send_sgl[0].lkey = rqpair->cmd_mr->lkey;
461 
462 		rdma_req->send_wr.wr_id = (uint64_t)rdma_req;
463 		rdma_req->send_wr.next = NULL;
464 		rdma_req->send_wr.opcode = IBV_WR_SEND;
465 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
466 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
467 		rdma_req->send_wr.imm_data = 0;
468 
469 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
470 	}
471 
472 	return 0;
473 
474 fail:
475 	nvme_rdma_free_reqs(rqpair);
476 	return -ENOMEM;
477 }
478 
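/*
 * Handle one received NVMe completion: the completion's CID indexes
 * directly into rdma_reqs (the CID is assigned from rdma_req->id at
 * submit time), so the matching request can be completed and returned
 * to the free list.  The recv buffer is re-posted and, if requests are
 * queued waiting for a free tracker, one of them is resubmitted.
 */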
479 static int
480 nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx)
481 {
482 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
483 	struct spdk_nvme_rdma_req *rdma_req;
484 	struct spdk_nvme_cpl *rsp;
485 	struct nvme_request *req;
486 
487 	assert(rsp_idx < rqpair->num_entries);
488 	rsp = &rqpair->rsps[rsp_idx];
489 	rdma_req = &rqpair->rdma_reqs[rsp->cid];
490 
491 	req = rdma_req->req;
492 	nvme_rdma_req_complete(req, rsp);
493 
494 	nvme_rdma_req_put(rqpair, rdma_req);
495 	if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
496 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
497 		return -1;
498 	}
499 
500 	if (!STAILQ_EMPTY(&qpair->queued_req) && !qpair->ctrlr->is_resetting) {
501 		req = STAILQ_FIRST(&qpair->queued_req);
502 		STAILQ_REMOVE_HEAD(&qpair->queued_req, stailq);
503 		nvme_qpair_submit_request(qpair, req);
504 	}
505 
506 	return 0;
507 }
508 
509 static int
510 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
511 		       struct sockaddr *src_addr,
512 		       struct sockaddr *dst_addr,
513 		       struct rdma_event_channel *cm_channel)
514 {
515 	int ret;
516 	struct rdma_cm_event *event;
517 
518 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
519 				NVME_RDMA_TIME_OUT_IN_MS);
520 	if (ret) {
521 		SPDK_ERRLOG("rdma_resolve_addr() failed, errno %d: %s\n", errno, spdk_strerror(errno));
522 		return ret;
523 	}
524 
525 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
526 	if (event == NULL) {
527 		SPDK_ERRLOG("RDMA address resolution error\n");
528 		return -1;
529 	}
530 	rdma_ack_cm_event(event);
531 
532 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
533 	if (ret) {
534 		SPDK_ERRLOG("rdma_resolve_route() failed, errno %d: %s\n", errno, spdk_strerror(errno));
535 		return ret;
536 	}
537 
538 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
539 	if (event == NULL) {
540 		SPDK_ERRLOG("RDMA route resolution error\n");
541 		return -1;
542 	}
543 	rdma_ack_cm_event(event);
544 
545 	return 0;
546 }
547 
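/*
 * Perform the RDMA CM connect for this qpair.  The NVMe-oF RDMA
 * private data (queue ID, host receive/send queue sizes and CNTLID)
 * is carried in the connect request; once RDMA_CM_EVENT_ESTABLISHED
 * arrives, the queue depth is clamped to the CRQSIZE advertised by
 * the target in its accept private data.
 */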
548 static int
549 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
550 {
551 	struct rdma_conn_param				param = {};
552 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
553 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
554 	struct ibv_device_attr				attr;
555 	int						ret;
556 	struct rdma_cm_event				*event;
557 	struct spdk_nvme_ctrlr				*ctrlr;
558 
559 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
560 	if (ret != 0) {
561 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
562 		return ret;
563 	}
564 
565 	param.responder_resources = spdk_min(rqpair->num_entries, attr.max_qp_rd_atom);
566 
567 	ctrlr = rqpair->qpair.ctrlr;
568 	if (!ctrlr) {
569 		return -1;
570 	}
571 
572 	request_data.qid = rqpair->qpair.id;
573 	request_data.hrqsize = rqpair->num_entries;
574 	request_data.hsqsize = rqpair->num_entries - 1;
575 	request_data.cntlid = ctrlr->cntlid;
576 
577 	param.private_data = &request_data;
578 	param.private_data_len = sizeof(request_data);
579 	param.retry_count = 7;
580 	param.rnr_retry_count = 7;
581 
582 	ret = rdma_connect(rqpair->cm_id, &param);
583 	if (ret) {
584 		SPDK_ERRLOG("nvme rdma connect error\n");
585 		return ret;
586 	}
587 
588 	event = nvme_rdma_get_event(rqpair->cm_channel, RDMA_CM_EVENT_ESTABLISHED);
589 	if (event == NULL) {
590 		SPDK_ERRLOG("RDMA connect error\n");
591 		return -1;
592 	}
593 
594 	accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
595 	if (accept_data == NULL) {
596 		rdma_ack_cm_event(event);
597 		SPDK_ERRLOG("NVMe-oF target did not return accept data\n");
598 		return -1;
599 	}
600 
601 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "Requested queue depth %d. Actually got queue depth %d.\n",
602 		      rqpair->num_entries, accept_data->crqsize);
603 
604 	rqpair->num_entries = spdk_min(rqpair->num_entries, accept_data->crqsize);
605 
606 	rdma_ack_cm_event(event);
607 
608 	return 0;
609 }
610 
611 static int
612 nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr, const char *service)
613 {
614 	struct addrinfo *res;
615 	struct addrinfo hints;
616 	int ret;
617 
618 	memset(&hints, 0, sizeof(hints));
619 	hints.ai_family = family;
620 	hints.ai_socktype = SOCK_STREAM;
621 	hints.ai_protocol = 0;
622 
623 	ret = getaddrinfo(addr, service, &hints, &res);
624 	if (ret) {
625 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(ret), ret);
626 		return ret;
627 	}
628 
629 	if (res->ai_addrlen > sizeof(*sa)) {
630 		SPDK_ERRLOG("getaddrinfo() ai_addrlen %zu too large\n", (size_t)res->ai_addrlen);
631 		ret = EINVAL;
632 	} else {
633 		memcpy(sa, res->ai_addr, res->ai_addrlen);
634 	}
635 
636 	freeaddrinfo(res);
637 	return ret;
638 }
639 
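/*
 * spdk_mem_map notify callback.  When memory is registered with SPDK,
 * either register it with the protection domain via ibv_reg_mr() and
 * store the resulting struct ibv_mr * as the translation, or, when a
 * get_rkey hook is provided, store the key returned by the hook.  On
 * unregister, the MR (if owned here) is deregistered and the
 * translation is cleared.
 */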
640 static int
641 nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
642 			enum spdk_mem_map_notify_action action,
643 			void *vaddr, size_t size)
644 {
645 	struct ibv_pd *pd = cb_ctx;
646 	struct ibv_mr *mr;
647 	int rc;
648 
649 	switch (action) {
650 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
651 		if (!g_nvme_hooks.get_rkey) {
652 			mr = ibv_reg_mr(pd, vaddr, size,
653 					IBV_ACCESS_LOCAL_WRITE |
654 					IBV_ACCESS_REMOTE_READ |
655 					IBV_ACCESS_REMOTE_WRITE);
656 			if (mr == NULL) {
657 				SPDK_ERRLOG("ibv_reg_mr() failed\n");
658 				return -EFAULT;
659 			} else {
660 				rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
661 			}
662 		} else {
663 			rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size,
664 							  g_nvme_hooks.get_rkey(pd, vaddr, size));
665 		}
666 		break;
667 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
668 		if (!g_nvme_hooks.get_rkey) {
669 			mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
670 			if (mr) {
671 				ibv_dereg_mr(mr);
672 			}
673 		}
674 		rc = spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
675 		break;
676 	default:
677 		SPDK_UNREACHABLE();
678 	}
679 
680 	return rc;
681 }
682 
683 static int
684 nvme_rdma_check_contiguous_entries(uint64_t addr_1, uint64_t addr_2)
685 {
686 	/* Two contiguous mappings will point to the same address which is the start of the RDMA MR. */
687 	return addr_1 == addr_2;
688 }
689 
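/*
 * Memory registrations are shared per protection domain.  Look up an
 * existing map for this qpair's PD under g_rdma_mr_maps_mutex and take
 * a reference, or create a new reference-counted map wired to
 * nvme_rdma_mr_map_notify().
 */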
690 static int
691 nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
692 {
693 	struct ibv_pd *pd = rqpair->cm_id->qp->pd;
694 	struct spdk_nvme_rdma_mr_map *mr_map;
695 	const struct spdk_mem_map_ops nvme_rdma_map_ops = {
696 		.notify_cb = nvme_rdma_mr_map_notify,
697 		.are_contiguous = nvme_rdma_check_contiguous_entries
698 	};
699 
700 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
701 
702 	/* Look up existing mem map registration for this pd */
703 	LIST_FOREACH(mr_map, &g_rdma_mr_maps, link) {
704 		if (mr_map->pd == pd) {
705 			mr_map->ref++;
706 			rqpair->mr_map = mr_map;
707 			pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
708 			return 0;
709 		}
710 	}
711 
712 	mr_map = calloc(1, sizeof(*mr_map));
713 	if (mr_map == NULL) {
714 		SPDK_ERRLOG("calloc() failed\n");
715 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
716 		return -1;
717 	}
718 
719 	mr_map->ref = 1;
720 	mr_map->pd = pd;
721 	mr_map->map = spdk_mem_map_alloc((uint64_t)NULL, &nvme_rdma_map_ops, pd);
722 	if (mr_map->map == NULL) {
723 		SPDK_ERRLOG("spdk_mem_map_alloc() failed\n");
724 		free(mr_map);
725 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
726 		return -1;
727 	}
728 
729 	rqpair->mr_map = mr_map;
730 	LIST_INSERT_HEAD(&g_rdma_mr_maps, mr_map, link);
731 
732 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
733 
734 	return 0;
735 }
736 
737 static void
738 nvme_rdma_unregister_mem(struct nvme_rdma_qpair *rqpair)
739 {
740 	struct spdk_nvme_rdma_mr_map *mr_map;
741 
742 	mr_map = rqpair->mr_map;
743 	rqpair->mr_map = NULL;
744 
745 	if (mr_map == NULL) {
746 		return;
747 	}
748 
749 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
750 
751 	assert(mr_map->ref > 0);
752 	mr_map->ref--;
753 	if (mr_map->ref == 0) {
754 		LIST_REMOVE(mr_map, link);
755 		spdk_mem_map_free(&mr_map->map);
756 		free(mr_map);
757 	}
758 
759 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
760 }
761 
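/*
 * Full connect sequence for a qpair: create the CM event channel and
 * CM ID, resolve the (optional) source and the destination address,
 * create the CQ/QP, perform the RDMA-level connect, allocate the
 * command and response resources, register memory against the QP's
 * protection domain and finally issue the NVMe-oF Fabrics CONNECT
 * command.  On failure the caller is expected to destroy the qpair.
 */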
762 static int
763 nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
764 {
765 	struct sockaddr_storage dst_addr;
766 	struct sockaddr_storage src_addr;
767 	bool src_addr_specified;
768 	int rc;
769 	struct spdk_nvme_ctrlr *ctrlr;
770 	int family;
771 
772 	rqpair->cm_channel = rdma_create_event_channel();
773 	if (rqpair->cm_channel == NULL) {
774 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
775 		return -1;
776 	}
777 
778 	ctrlr = rqpair->qpair.ctrlr;
779 
780 	switch (ctrlr->trid.adrfam) {
781 	case SPDK_NVMF_ADRFAM_IPV4:
782 		family = AF_INET;
783 		break;
784 	case SPDK_NVMF_ADRFAM_IPV6:
785 		family = AF_INET6;
786 		break;
787 	default:
788 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
789 		return -1;
790 	}
791 
792 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
793 
794 	memset(&dst_addr, 0, sizeof(dst_addr));
795 
796 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "trsvcid is %s\n", ctrlr->trid.trsvcid);
797 	rc = nvme_rdma_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
798 	if (rc != 0) {
799 		SPDK_ERRLOG("dst_addr nvme_rdma_parse_addr() failed\n");
800 		return -1;
801 	}
802 
803 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
804 		memset(&src_addr, 0, sizeof(src_addr));
805 		rc = nvme_rdma_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid);
806 		if (rc != 0) {
807 			SPDK_ERRLOG("src_addr nvme_rdma_parse_addr() failed\n");
808 			return -1;
809 		}
810 		src_addr_specified = true;
811 	} else {
812 		src_addr_specified = false;
813 	}
814 
815 	rc = rdma_create_id(rqpair->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
816 	if (rc < 0) {
817 		SPDK_ERRLOG("rdma_create_id() failed\n");
818 		return -1;
819 	}
820 
821 	rc = nvme_rdma_resolve_addr(rqpair,
822 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
823 				    (struct sockaddr *)&dst_addr, rqpair->cm_channel);
824 	if (rc < 0) {
825 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
826 		return -1;
827 	}
828 
829 	rc = nvme_rdma_qpair_init(rqpair);
830 	if (rc < 0) {
831 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
832 		return -1;
833 	}
834 
835 	rc = nvme_rdma_connect(rqpair);
836 	if (rc != 0) {
837 		SPDK_ERRLOG("Unable to connect the rqpair\n");
838 		return -1;
839 	}
840 
841 	rc = nvme_rdma_alloc_reqs(rqpair);
842 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc = %d\n", rc);
843 	if (rc) {
844 		SPDK_ERRLOG("Unable to allocate rqpair RDMA requests\n");
845 		return -1;
846 	}
847 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests allocated\n");
848 
849 	rc = nvme_rdma_alloc_rsps(rqpair);
850 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc = %d\n", rc);
851 	if (rc < 0) {
852 		SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
853 		return -1;
854 	}
855 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses allocated\n");
856 
857 	rc = nvme_rdma_register_mem(rqpair);
858 	if (rc < 0) {
859 		SPDK_ERRLOG("Unable to register memory for RDMA\n");
860 		return -1;
861 	}
862 
863 	rc = nvme_fabric_qpair_connect(&rqpair->qpair, rqpair->num_entries);
864 	if (rc < 0) {
865 		SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
866 		return -1;
867 	}
868 
869 	return 0;
870 }
871 
872 /*
873  * Build SGL describing empty payload.
874  */
875 static int
876 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
877 {
878 	struct nvme_request *req = rdma_req->req;
879 
880 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
881 
882 	/* The first element of this SGL is pointing at an
883 	 * spdk_nvmf_cmd object. For this particular command,
884 	 * we only need the first 64 bytes corresponding to
885 	 * the NVMe command. */
886 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
887 
888 	/* The RDMA SGL needs one element describing the NVMe command. */
889 	rdma_req->send_wr.num_sge = 1;
890 
891 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
892 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
893 	req->cmd.dptr.sgl1.keyed.length = 0;
894 	req->cmd.dptr.sgl1.keyed.key = 0;
895 	req->cmd.dptr.sgl1.address = 0;
896 
897 	return 0;
898 }
899 
900 /*
901  * Build inline SGL describing contiguous payload buffer.
902  */
903 static int
904 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
905 				      struct spdk_nvme_rdma_req *rdma_req)
906 {
907 	struct nvme_request *req = rdma_req->req;
908 	struct ibv_mr *mr;
909 	void *payload;
910 	uint64_t requested_size;
911 
912 	payload = req->payload.contig_or_cb_arg + req->payload_offset;
913 	assert(req->payload_size != 0);
914 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
915 
916 	requested_size = req->payload_size;
917 	mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
918 			(uint64_t)payload, &requested_size);
919 
920 	if (mr == NULL || requested_size < req->payload_size) {
921 		if (mr) {
922 			SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
923 		}
924 		return -EINVAL;
925 	}
926 
927 	/* The first element of this SGL is pointing at an
928 	 * spdk_nvmf_cmd object. For this particular command,
929 	 * we only need the first 64 bytes corresponding to
930 	 * the NVMe command. */
931 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
932 
933 	rdma_req->send_sgl[1].addr = (uint64_t)payload;
934 	rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
935 	rdma_req->send_sgl[1].lkey = mr->lkey;
936 
937 	/* The RDMA SGL contains two elements. The first describes
938 	 * the NVMe command and the second describes the data
939 	 * payload. */
940 	rdma_req->send_wr.num_sge = 2;
941 
942 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
943 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
944 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
945 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
946 	/* Inline only supported for icdoff == 0 currently.  This function will
947 	 * not get called for controllers with other values. */
948 	req->cmd.dptr.sgl1.address = (uint64_t)0;
949 
950 	return 0;
951 }
952 
953 /*
954  * Build SGL describing contiguous payload buffer.
955  */
956 static int
957 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
958 			       struct spdk_nvme_rdma_req *rdma_req)
959 {
960 	struct nvme_request *req = rdma_req->req;
961 	void *payload = req->payload.contig_or_cb_arg + req->payload_offset;
962 	struct ibv_mr *mr;
963 	uint64_t requested_size;
964 
965 	assert(req->payload_size != 0);
966 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
967 
968 	requested_size = req->payload_size;
969 	if (!g_nvme_hooks.get_rkey) {
970 
971 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)payload,
972 				&requested_size);
973 		if (mr == NULL) {
974 			return -1;
975 		}
976 		req->cmd.dptr.sgl1.keyed.key = mr->rkey;
977 	} else {
978 		req->cmd.dptr.sgl1.keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
979 					       (uint64_t)payload,
980 					       &requested_size);
981 	}
982 
983 	if (requested_size < req->payload_size) {
984 		SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
985 		return -1;
986 	}
987 
988 	/* The first element of this SGL is pointing at an
989 	 * spdk_nvmf_cmd object. For this particular command,
990 	 * we only need the first 64 bytes corresponding to
991 	 * the NVMe command. */
992 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
993 
994 	/* The RDMA SGL needs one element describing the NVMe command. */
995 	rdma_req->send_wr.num_sge = 1;
996 
997 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
998 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
999 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1000 	req->cmd.dptr.sgl1.keyed.length = req->payload_size;
1001 	req->cmd.dptr.sgl1.address = (uint64_t)payload;
1002 
1003 	return 0;
1004 }
1005 
1006 /*
1007  * Build SGL describing scattered payload buffer.
1008  */
1009 static int
1010 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1011 			    struct spdk_nvme_rdma_req *rdma_req)
1012 {
1013 	struct nvme_request *req = rdma_req->req;
1014 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1015 	struct ibv_mr *mr = NULL;
1016 	void *virt_addr;
1017 	uint64_t remaining_size, mr_length;
1018 	uint32_t sge_length;
1019 	int rc, max_num_sgl, num_sgl_desc;
1020 
1021 	assert(req->payload_size != 0);
1022 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1023 	assert(req->payload.reset_sgl_fn != NULL);
1024 	assert(req->payload.next_sge_fn != NULL);
1025 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1026 
1027 	max_num_sgl = req->qpair->ctrlr->max_sges;
1028 
1029 	remaining_size = req->payload_size;
1030 	num_sgl_desc = 0;
1031 	do {
1032 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &sge_length);
1033 		if (rc) {
1034 			return -1;
1035 		}
1036 
1037 		sge_length = spdk_min(remaining_size, sge_length);
1038 		mr_length = sge_length;
1039 
1040 		if (!g_nvme_hooks.get_rkey) {
1041 			mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
1042 					(uint64_t)virt_addr,
1043 					&mr_length);
1044 			if (mr == NULL) {
1045 				return -1;
1046 			}
1047 			cmd->sgl[num_sgl_desc].keyed.key = mr->rkey;
1048 		} else {
1049 			cmd->sgl[num_sgl_desc].keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
1050 							   (uint64_t)virt_addr,
1051 							   &mr_length);
1052 		}
1053 
1054 		if (mr_length < sge_length) {
1055 			SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1056 			return -1;
1057 		}
1058 
1059 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1060 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1061 		cmd->sgl[num_sgl_desc].keyed.length = sge_length;
1062 		cmd->sgl[num_sgl_desc].address = (uint64_t)virt_addr;
1063 
1064 		remaining_size -= sge_length;
1065 		num_sgl_desc++;
1066 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1067 
1068 
1069 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1070 	if (remaining_size > 0) {
1071 		return -1;
1072 	}
1073 
1074 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1075 
1076 	/* The RDMA SGL needs one element describing some portion
1077 	 * of the spdk_nvmf_cmd structure. */
1078 	rdma_req->send_wr.num_sge = 1;
1079 
1080 	/*
1081 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1082 	 * as a data block descriptor.
1083 	 */
1084 	if (num_sgl_desc == 1) {
1085 		/* The first element of this SGL is pointing at an
1086 		 * spdk_nvmf_cmd object. For this particular command,
1087 		 * we only need the first 64 bytes corresponding to
1088 		 * the NVMe command. */
1089 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1090 
1091 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1092 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1093 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1094 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1095 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1096 	} else {
1097 		/*
1098 		 * Otherwise, the SGL descriptor embedded in the command must point to the list of
1099 		 * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
1100 		 */
1101 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + sizeof(struct
1102 					       spdk_nvme_sgl_descriptor) * num_sgl_desc;
1103 
1104 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1105 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1106 		req->cmd.dptr.sgl1.unkeyed.length = num_sgl_desc * sizeof(struct spdk_nvme_sgl_descriptor);
1107 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1108 	}
1109 
1110 	return 0;
1111 }
1112 
1113 /*
1114  * Build inline SGL describing sgl payload buffer.
1115  */
1116 static int
1117 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1118 				   struct spdk_nvme_rdma_req *rdma_req)
1119 {
1120 	struct nvme_request *req = rdma_req->req;
1121 	struct ibv_mr *mr;
1122 	uint32_t length;
1123 	uint64_t requested_size;
1124 	uint32_t remaining_payload;
1125 	void *virt_addr;
1126 	int rc, i;
1127 
1128 	assert(req->payload_size != 0);
1129 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1130 	assert(req->payload.reset_sgl_fn != NULL);
1131 	assert(req->payload.next_sge_fn != NULL);
1132 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1133 
1134 	remaining_payload = req->payload_size;
1135 	rdma_req->send_wr.num_sge = 1;
1136 
1137 	do {
1138 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &length);
1139 		if (rc) {
1140 			return -1;
1141 		}
1142 
1143 		assert(length <= remaining_payload);
1144 
1145 		requested_size = length;
1146 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
1147 				&requested_size);
1148 		if (mr == NULL || requested_size < length) {
1149 			for (i = 1; i < rdma_req->send_wr.num_sge; i++) {
1150 				rdma_req->send_sgl[i].addr = 0;
1151 				rdma_req->send_sgl[i].length = 0;
1152 				rdma_req->send_sgl[i].lkey = 0;
1153 			}
1154 
1155 			if (mr) {
1156 				SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1157 			}
1158 			return -1;
1159 		}
1160 
1161 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].addr = (uint64_t)virt_addr;
1162 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].length = length;
1163 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].lkey = mr->lkey;
1164 		rdma_req->send_wr.num_sge++;
1165 
1166 		remaining_payload -= length;
1167 	} while (remaining_payload && rdma_req->send_wr.num_sge < (int64_t)rqpair->max_send_sge);
1168 
1169 	if (remaining_payload) {
1170 		SPDK_ERRLOG("Unable to prepare request. Too many SGL elements\n");
1171 		return -1;
1172 	}
1173 
1174 	/* The first element of this SGL is pointing at an
1175 	 * spdk_nvmf_cmd object. For this particular command,
1176 	 * we only need the first 64 bytes corresponding to
1177 	 * the NVMe command. */
1178 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1179 
1180 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1181 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1182 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1183 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
1184 	/* Inline only supported for icdoff == 0 currently.  This function will
1185 	 * not get called for controllers with other values. */
1186 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1187 
1188 	return 0;
1189 }
1190 
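/*
 * In-capsule data capacity in bytes.  IOCCSZ is reported in 16-byte
 * units and covers the 64-byte command itself, hence the subtraction.
 * As a worked example (assuming a target reporting ioccsz == 260):
 * 260 * 16 - 64 = 4096 bytes of in-capsule data.
 */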
1191 static inline unsigned int
1192 nvme_rdma_icdsz_bytes(struct spdk_nvme_ctrlr *ctrlr)
1193 {
1194 	return (ctrlr->cdata.nvmf_specific.ioccsz * 16 - sizeof(struct spdk_nvme_cmd));
1195 }
1196 
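/*
 * Fill out the command capsule for a request.  Zero-length payloads get
 * a null SGL.  Writes that fit within the in-capsule data size are sent
 * inline (only when the target reports icdoff == 0); everything else is
 * described with keyed remote SGL descriptors, using either the single
 * descriptor embedded in the command or a list of descriptors appended
 * to the capsule.  The finished command is copied into the
 * pre-registered capsule indexed by rdma_req->id.
 */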
1197 static int
1198 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1199 		   struct spdk_nvme_rdma_req *rdma_req)
1200 {
1201 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1202 	int rc;
1203 
1204 	rdma_req->req = req;
1205 	req->cmd.cid = rdma_req->id;
1206 
1207 	if (req->payload_size == 0) {
1208 		rc = nvme_rdma_build_null_request(rdma_req);
1209 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG) {
1210 		/*
1211 		 * Check if icdoff is non-zero to avoid interop conflicts with
1212 		 * targets that use a non-zero icdoff.  Both SPDK and the Linux
1213 		 * kernel targets use icdoff = 0.  For targets with a non-zero
1214 		 * icdoff, simply do not use inline data for now.
1215 		 */
1216 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1217 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1218 		    (ctrlr->cdata.nvmf_specific.icdoff == 0)) {
1219 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1220 		} else {
1221 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1222 		}
1223 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL) {
1224 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1225 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1226 		    ctrlr->cdata.nvmf_specific.icdoff == 0) {
1227 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1228 		} else {
1229 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1230 		}
1231 	} else {
1232 		rc = -1;
1233 	}
1234 
1235 	if (rc) {
1236 		return rc;
1237 	}
1238 
1239 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1240 	return 0;
1241 }
1242 
1243 static struct spdk_nvme_qpair *
1244 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1245 			     uint16_t qid, uint32_t qsize,
1246 			     enum spdk_nvme_qprio qprio,
1247 			     uint32_t num_requests)
1248 {
1249 	struct nvme_rdma_qpair *rqpair;
1250 	struct spdk_nvme_qpair *qpair;
1251 	int rc;
1252 
1253 	rqpair = calloc(1, sizeof(struct nvme_rdma_qpair));
1254 	if (!rqpair) {
1255 		SPDK_ERRLOG("failed to allocate rqpair\n");
1256 		return NULL;
1257 	}
1258 
1259 	rqpair->num_entries = qsize;
1260 
1261 	qpair = &rqpair->qpair;
1262 
1263 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests);
1264 	if (rc != 0) {
		free(rqpair);
1265 		return NULL;
1266 	}
1267 
1268 	rc = nvme_rdma_qpair_connect(rqpair);
1269 	if (rc < 0) {
1270 		nvme_rdma_qpair_destroy(qpair);
1271 		return NULL;
1272 	}
1273 
1274 	return qpair;
1275 }
1276 
1277 static int
1278 nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
1279 {
1280 	struct nvme_rdma_qpair *rqpair;
1281 
1282 	if (!qpair) {
1283 		return -1;
1284 	}
1285 	nvme_rdma_qpair_fail(qpair);
1286 	nvme_qpair_deinit(qpair);
1287 
1288 	rqpair = nvme_rdma_qpair(qpair);
1289 
1290 	nvme_rdma_unregister_mem(rqpair);
1291 	nvme_rdma_free_reqs(rqpair);
1292 	nvme_rdma_free_rsps(rqpair);
1293 
1294 	if (rqpair->cm_id) {
1295 		if (rqpair->cm_id->qp) {
1296 			rdma_destroy_qp(rqpair->cm_id);
1297 		}
1298 		rdma_destroy_id(rqpair->cm_id);
1299 	}
1300 
1301 	if (rqpair->cq) {
1302 		ibv_destroy_cq(rqpair->cq);
1303 	}
1304 
1305 	if (rqpair->cm_channel) {
1306 		rdma_destroy_event_channel(rqpair->cm_channel);
1307 	}
1308 
1309 	free(rqpair);
1310 
1311 	return 0;
1312 }
1313 
1314 struct spdk_nvme_qpair *
1315 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
1316 				const struct spdk_nvme_io_qpair_opts *opts)
1317 {
1318 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
1319 					    opts->io_queue_requests);
1320 }
1321 
1322 int
1323 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
1324 {
1325 	/* do nothing here */
1326 	return 0;
1327 }
1328 
1329 /* This function must only be called while holding g_spdk_nvme_driver->lock */
1330 int
1331 nvme_rdma_ctrlr_scan(const struct spdk_nvme_transport_id *discovery_trid,
1332 		     void *cb_ctx,
1333 		     spdk_nvme_probe_cb probe_cb,
1334 		     spdk_nvme_remove_cb remove_cb,
1335 		     bool direct_connect)
1336 {
1337 	struct spdk_nvme_ctrlr_opts discovery_opts;
1338 	struct spdk_nvme_ctrlr *discovery_ctrlr;
1339 	union spdk_nvme_cc_register cc;
1340 	int rc;
1341 	struct nvme_completion_poll_status status;
1342 
1343 	if (strcmp(discovery_trid->subnqn, SPDK_NVMF_DISCOVERY_NQN) != 0) {
1344 		/* This is not a discovery controller; try to connect to it directly. */
1345 		rc = nvme_ctrlr_probe(discovery_trid, NULL, probe_cb, cb_ctx);
1346 		return rc;
1347 	}
1348 
1349 	spdk_nvme_ctrlr_get_default_ctrlr_opts(&discovery_opts, sizeof(discovery_opts));
1350 	/* For discovery_ctrlr set the timeout to 0 */
1351 	discovery_opts.keep_alive_timeout_ms = 0;
1352 
1353 	discovery_ctrlr = nvme_rdma_ctrlr_construct(discovery_trid, &discovery_opts, NULL);
1354 	if (discovery_ctrlr == NULL) {
1355 		return -1;
1356 	}
1357 
1358 	/* TODO: this should be using the normal NVMe controller initialization process */
1359 	cc.raw = 0;
1360 	cc.bits.en = 1;
1361 	cc.bits.iosqes = 6; /* SQ entry size == 64 == 2^6 */
1362 	cc.bits.iocqes = 4; /* CQ entry size == 16 == 2^4 */
1363 	rc = nvme_transport_ctrlr_set_reg_4(discovery_ctrlr, offsetof(struct spdk_nvme_registers, cc.raw),
1364 					    cc.raw);
1365 	if (rc < 0) {
1366 		SPDK_ERRLOG("Failed to set cc\n");
1367 		nvme_ctrlr_destruct(discovery_ctrlr);
1368 		return -1;
1369 	}
1370 
1371 	/* get the cdata info */
1372 	rc = nvme_ctrlr_cmd_identify(discovery_ctrlr, SPDK_NVME_IDENTIFY_CTRLR, 0, 0,
1373 				     &discovery_ctrlr->cdata, sizeof(discovery_ctrlr->cdata),
1374 				     nvme_completion_poll_cb, &status);
1375 	if (rc != 0) {
1376 		SPDK_ERRLOG("Failed to identify cdata\n");
		nvme_ctrlr_destruct(discovery_ctrlr);
1377 		return rc;
1378 	}
1379 
1380 	if (spdk_nvme_wait_for_completion(discovery_ctrlr->adminq, &status)) {
1381 		SPDK_ERRLOG("nvme_identify_controller failed!\n");
		nvme_ctrlr_destruct(discovery_ctrlr);
1382 		return -ENXIO;
1383 	}
1384 
1385 	/* Direct attach through spdk_nvme_connect() API */
1386 	if (direct_connect == true) {
1387 		/* Set the ready state to skip the normal init process */
1388 		discovery_ctrlr->state = NVME_CTRLR_STATE_READY;
1389 		nvme_ctrlr_connected(discovery_ctrlr);
1390 		nvme_ctrlr_add_process(discovery_ctrlr, 0);
1391 		return 0;
1392 	}
1393 
1394 	rc = nvme_fabric_ctrlr_discover(discovery_ctrlr, cb_ctx, probe_cb);
1395 	nvme_ctrlr_destruct(discovery_ctrlr);
1396 	return rc;
1397 }
1398 
1399 struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
1400 		const struct spdk_nvme_ctrlr_opts *opts,
1401 		void *devhandle)
1402 {
1403 	struct nvme_rdma_ctrlr *rctrlr;
1404 	union spdk_nvme_cap_register cap;
1405 	union spdk_nvme_vs_register vs;
1406 	int rc;
1407 
1408 	rctrlr = calloc(1, sizeof(struct nvme_rdma_ctrlr));
1409 	if (rctrlr == NULL) {
1410 		SPDK_ERRLOG("could not allocate ctrlr\n");
1411 		return NULL;
1412 	}
1413 
1414 	rctrlr->ctrlr.trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1415 	rctrlr->ctrlr.opts = *opts;
1416 	memcpy(&rctrlr->ctrlr.trid, trid, sizeof(rctrlr->ctrlr.trid));
1417 
1418 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
1419 	if (rc != 0) {
1420 		free(rctrlr);
1421 		return NULL;
1422 	}
1423 
1424 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
1425 			       SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES, 0, SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES);
1426 	if (!rctrlr->ctrlr.adminq) {
1427 		SPDK_ERRLOG("failed to create admin qpair\n");
1428 		nvme_rdma_ctrlr_destruct(&rctrlr->ctrlr);
1429 		return NULL;
1430 	}
1431 
1432 	if (nvme_ctrlr_get_cap(&rctrlr->ctrlr, &cap)) {
1433 		SPDK_ERRLOG("get_cap() failed\n");
1434 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1435 		return NULL;
1436 	}
1437 
1438 	if (nvme_ctrlr_get_vs(&rctrlr->ctrlr, &vs)) {
1439 		SPDK_ERRLOG("get_vs() failed\n");
1440 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1441 		return NULL;
1442 	}
1443 
1444 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
1445 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
1446 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1447 		return NULL;
1448 	}
1449 
1450 	nvme_ctrlr_init_cap(&rctrlr->ctrlr, &cap, &vs);
1451 
1452 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "successfully initialized the nvmf ctrlr\n");
1453 	return &rctrlr->ctrlr;
1454 }
1455 
1456 int
1457 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
1458 {
1459 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
1460 
1461 	if (ctrlr->adminq) {
1462 		nvme_rdma_qpair_destroy(ctrlr->adminq);
1463 	}
1464 
1465 	nvme_ctrlr_destruct_finish(ctrlr);
1466 
1467 	free(rctrlr);
1468 
1469 	return 0;
1470 }
1471 
1472 int
1473 nvme_rdma_ctrlr_set_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t value)
1474 {
1475 	return nvme_fabric_ctrlr_set_reg_4(ctrlr, offset, value);
1476 }
1477 
1478 int
1479 nvme_rdma_ctrlr_set_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t value)
1480 {
1481 	return nvme_fabric_ctrlr_set_reg_8(ctrlr, offset, value);
1482 }
1483 
1484 int
1485 nvme_rdma_ctrlr_get_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t *value)
1486 {
1487 	return nvme_fabric_ctrlr_get_reg_4(ctrlr, offset, value);
1488 }
1489 
1490 int
1491 nvme_rdma_ctrlr_get_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t *value)
1492 {
1493 	return nvme_fabric_ctrlr_get_reg_8(ctrlr, offset, value);
1494 }
1495 
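/*
 * Submit a request on this qpair: take a tracker off the free list (or
 * queue the request until one is returned by a completion), build the
 * command capsule and post the send work request.  The completion is
 * matched back to the tracker by CID in nvme_rdma_recv().
 */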
1496 int
1497 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
1498 			       struct nvme_request *req)
1499 {
1500 	struct nvme_rdma_qpair *rqpair;
1501 	struct spdk_nvme_rdma_req *rdma_req;
1502 	struct ibv_send_wr *wr, *bad_wr = NULL;
1503 	int rc;
1504 
1505 	rqpair = nvme_rdma_qpair(qpair);
1506 	assert(rqpair != NULL);
1507 	assert(req != NULL);
1508 
1509 	rdma_req = nvme_rdma_req_get(rqpair);
1510 	if (!rdma_req) {
1511 		/*
1512 		 * No rdma_req is available.  Queue the request to be processed later.
1513 		 */
1514 		STAILQ_INSERT_TAIL(&qpair->queued_req, req, stailq);
1515 		return 0;
1516 	}
1517 
1518 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
1519 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
1520 		nvme_rdma_req_put(rqpair, rdma_req);
1521 		return -1;
1522 	}
1523 
1524 	req->timed_out = false;
1525 	if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
1526 		req->submit_tick = spdk_get_ticks();
1527 	} else {
1528 		req->submit_tick = 0;
1529 	}
1530 
1531 	wr = &rdma_req->send_wr;
1532 
1533 	nvme_rdma_trace_ibv_sge(wr->sg_list);
1534 
1535 	rc = ibv_post_send(rqpair->cm_id->qp, wr, &bad_wr);
1536 	if (rc) {
1537 		SPDK_ERRLOG("Failure posting rdma send for NVMe-oF command, rc = %d (%s)\n", rc, spdk_strerror(rc));
1538 	}
1539 
1540 	return rc;
1541 }
1542 
1543 int
1544 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1545 {
1546 	return nvme_rdma_qpair_destroy(qpair);
1547 }
1548 
1549 int
1550 nvme_rdma_ctrlr_reinit_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1551 {
1552 	return nvme_rdma_qpair_connect(nvme_rdma_qpair(qpair));
1553 }
1554 
1555 int
1556 nvme_rdma_qpair_enable(struct spdk_nvme_qpair *qpair)
1557 {
1558 	/* Currently, doing nothing here */
1559 	return 0;
1560 }
1561 
1562 int
1563 nvme_rdma_qpair_disable(struct spdk_nvme_qpair *qpair)
1564 {
1565 	/* Currently, doing nothing here */
1566 	return 0;
1567 }
1568 
1569 int
1570 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
1571 {
1572 	/* Currently, doing nothing here */
1573 	return 0;
1574 }
1575 
1576 int
1577 nvme_rdma_qpair_fail(struct spdk_nvme_qpair *qpair)
1578 {
1579 	/*
1580 	 * If the qpair is really failed, the connection is broken
1581 	 * and we need to flush back all I/O
1582 	 */
1583 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1584 	struct nvme_request *req;
1585 	struct spdk_nvme_cpl cpl = {};
1586 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1587 
1588 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
1589 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
1590 
1591 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1592 		assert(rdma_req->req != NULL);
1593 		req = rdma_req->req;
1594 
1595 		nvme_rdma_req_complete(req, &cpl);
1596 		nvme_rdma_req_put(rqpair, rdma_req);
1597 	}
1598 
1599 	return 0;
1600 }
1601 
1602 static void
1603 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
1604 {
1605 	uint64_t t02;
1606 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1607 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1608 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
1609 	struct spdk_nvme_ctrlr_process *active_proc;
1610 
1611 	/* Don't check timeouts during controller initialization. */
1612 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
1613 		return;
1614 	}
1615 
1616 	if (nvme_qpair_is_admin_queue(qpair)) {
1617 		active_proc = spdk_nvme_ctrlr_get_current_process(ctrlr);
1618 	} else {
1619 		active_proc = qpair->active_proc;
1620 	}
1621 
1622 	/* Only check timeouts if the current process has a timeout callback. */
1623 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
1624 		return;
1625 	}
1626 
1627 	t02 = spdk_get_ticks();
1628 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1629 		assert(rdma_req->req != NULL);
1630 
1631 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
1632 			/*
1633 			 * The requests are in order, so as soon as one has not timed out,
1634 			 * stop iterating.
1635 			 */
1636 			break;
1637 		}
1638 	}
1639 }
1640 
1641 #define MAX_COMPLETIONS_PER_POLL 128
1642 
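/*
 * Poll the completion queue in batches of up to MAX_COMPLETIONS_PER_POLL.
 * Only recv completions (i.e. NVMe completions) count toward
 * max_completions; send completions are consumed silently.  Returns the
 * number of NVMe completions processed, or -1 on a CQ or transport error.
 */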
1643 int
1644 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
1645 				    uint32_t max_completions)
1646 {
1647 	struct nvme_rdma_qpair	*rqpair = nvme_rdma_qpair(qpair);
1648 	struct ibv_wc		wc[MAX_COMPLETIONS_PER_POLL];
1649 	int			i, rc, batch_size;
1650 	uint32_t		reaped;
1651 	struct ibv_cq		*cq;
1652 
1653 	if (max_completions == 0) {
1654 		max_completions = rqpair->num_entries;
1655 	} else {
1656 		max_completions = spdk_min(max_completions, rqpair->num_entries);
1657 	}
1658 
1659 	cq = rqpair->cq;
1660 
1661 	reaped = 0;
1662 	do {
1663 		batch_size = spdk_min((max_completions - reaped),
1664 				      MAX_COMPLETIONS_PER_POLL);
1665 		rc = ibv_poll_cq(cq, batch_size, wc);
1666 		if (rc < 0) {
1667 			SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1668 				    errno, spdk_strerror(errno));
1669 			return -1;
1670 		} else if (rc == 0) {
1671 			/* Ran out of completions */
1672 			break;
1673 		}
1674 
1675 		for (i = 0; i < rc; i++) {
1676 			if (wc[i].status) {
1677 				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
1678 					    qpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1679 				return -1;
1680 			}
1681 
1682 			switch (wc[i].opcode) {
1683 			case IBV_WC_RECV:
1684 				SPDK_DEBUGLOG(SPDK_LOG_NVME, "CQ recv completion\n");
1685 
1686 				reaped++;
1687 
1688 				if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
1689 					SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
1690 					return -1;
1691 				}
1692 
1693 				if (nvme_rdma_recv(rqpair, wc[i].wr_id)) {
1694 					SPDK_ERRLOG("nvme_rdma_recv processing failure\n");
1695 					return -1;
1696 				}
1697 				break;
1698 
1699 			case IBV_WC_SEND:
1700 				break;
1701 
1702 			default:
1703 				SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", wc[i].opcode);
1704 				return -1;
1705 			}
1706 		}
1707 	} while (reaped < max_completions);
1708 
1709 	if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
1710 		nvme_rdma_qpair_check_timeout(qpair);
1711 	}
1712 
1713 	return reaped;
1714 }
1715 
1716 uint32_t
1717 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
1718 {
1719 	/* TODO: this value should be retrieved from the NVMe-oF target. */
1720 	return NVME_RDMA_RW_BUFFER_SIZE;
1721 }
1722 
1723 uint16_t
1724 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
1725 {
1726 	return spdk_min(ctrlr->cdata.nvmf_specific.msdbd, NVME_RDMA_MAX_SGL_DESCRIPTORS);
1727 }
1728 
1729 void *
1730 nvme_rdma_ctrlr_alloc_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, size_t size)
1731 {
1732 	return NULL;
1733 }
1734 
1735 int
1736 nvme_rdma_ctrlr_free_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, void *buf, size_t size)
1737 {
1738 	return 0;
1739 }
1740 
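/*
 * Install the global RDMA hooks.  A minimal usage sketch with
 * hypothetical application-side names; only the get_ibv_pd and get_rkey
 * fields are consumed by this file.  An application that manages its own
 * protection domains and memory keys would install the hooks before
 * creating any controllers:
 *
 *	static struct spdk_nvme_rdma_hooks my_hooks = {
 *		.get_ibv_pd = my_get_ibv_pd,
 *		.get_rkey = my_get_rkey,
 *	};
 *
 *	spdk_nvme_rdma_init_hooks(&my_hooks);
 */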
1741 void
1742 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
1743 {
1744 	g_nvme_hooks = *hooks;
1745 }
1746