xref: /spdk/lib/nvme/nvme_rdma.c (revision fa2d95b3fe66e7f5c543eaef89fa00d4eaa0e6e7)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
35  * NVMe over RDMA transport
36  */
37 
38 #include "spdk/stdinc.h"
39 
40 #include <infiniband/verbs.h>
41 #include <rdma/rdma_cma.h>
42 #include <rdma/rdma_verbs.h>
43 
44 #include "spdk/assert.h"
45 #include "spdk/log.h"
46 #include "spdk/trace.h"
47 #include "spdk/event.h"
48 #include "spdk/queue.h"
49 #include "spdk/nvme.h"
50 #include "spdk/nvmf_spec.h"
51 #include "spdk/string.h"
52 #include "spdk/endian.h"
53 #include "spdk/likely.h"
54 
55 #include "nvme_internal.h"
56 
57 #define NVME_RDMA_TIME_OUT_IN_MS 2000
58 #define NVME_RDMA_RW_BUFFER_SIZE 131072
59 
60 /*
61  * NVME RDMA qpair Resource Defaults
62  */
63 #define NVME_RDMA_DEFAULT_TX_SGE		2
64 #define NVME_RDMA_DEFAULT_RX_SGE		1
65 
66 
67 /* Max number of NVMe-oF SGL descriptors supported by the host */
68 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
69 struct spdk_nvmf_cmd {
70 	struct spdk_nvme_cmd cmd;
71 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
72 };
73 
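/*
 * Optional hooks supplied by the application (via spdk_nvme_rdma_init_hooks())
 * so it can provide its own protection domain and memory keys instead of having
 * this transport register memory itself.
 */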
74 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
75 
76 /* Mapping from virtual address to ibv_mr pointer for a protection domain */
77 struct spdk_nvme_rdma_mr_map {
78 	struct ibv_pd				*pd;
79 	struct spdk_mem_map			*map;
80 	uint64_t				ref;
81 	LIST_ENTRY(spdk_nvme_rdma_mr_map)	link;
82 };
83 
84 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
85 struct nvme_rdma_ctrlr {
86 	struct spdk_nvme_ctrlr			ctrlr;
87 
88 	struct ibv_pd				*pd;
89 };
90 
91 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
92 struct nvme_rdma_qpair {
93 	struct spdk_nvme_qpair			qpair;
94 
95 	struct rdma_cm_id			*cm_id;
96 
97 	struct ibv_cq				*cq;
98 
99 	struct spdk_nvme_rdma_req		*rdma_reqs;
100 
101 	uint32_t				max_send_sge;
102 
103 	uint32_t				max_recv_sge;
104 
105 	uint16_t				num_entries;
106 
107 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
108 	struct ibv_sge				*rsp_sgls;
109 	struct spdk_nvme_cpl			*rsps;
110 
111 	struct ibv_recv_wr			*rsp_recv_wrs;
112 
113 	/* Memory region describing all rsps for this qpair */
114 	struct ibv_mr				*rsp_mr;
115 
116 	/*
117 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
118 	 * Indexed by rdma_req->id.
119 	 */
120 	struct spdk_nvmf_cmd			*cmds;
121 
122 	/* Memory region describing all cmds for this qpair */
123 	struct ibv_mr				*cmd_mr;
124 
125 	struct spdk_nvme_rdma_mr_map		*mr_map;
126 
127 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
128 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
129 
130 	/* Placed at the end of the struct since it is not used frequently */
131 	struct rdma_event_channel		*cm_channel;
132 };
133 
134 struct spdk_nvme_rdma_req {
135 	int					id;
136 
137 	struct ibv_send_wr			send_wr;
138 
139 	struct nvme_request			*req;
140 
141 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
142 
143 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
144 
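	/*
	 * A request may only be recycled after both its RDMA send completion and
	 * the matching NVMe completion (RECV) have been seen. This flag is set by
	 * whichever of the two arrives first.
	 */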
145 	bool					request_ready_to_put;
146 };
147 
148 static const char *rdma_cm_event_str[] = {
149 	"RDMA_CM_EVENT_ADDR_RESOLVED",
150 	"RDMA_CM_EVENT_ADDR_ERROR",
151 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
152 	"RDMA_CM_EVENT_ROUTE_ERROR",
153 	"RDMA_CM_EVENT_CONNECT_REQUEST",
154 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
155 	"RDMA_CM_EVENT_CONNECT_ERROR",
156 	"RDMA_CM_EVENT_UNREACHABLE",
157 	"RDMA_CM_EVENT_REJECTED",
158 	"RDMA_CM_EVENT_ESTABLISHED",
159 	"RDMA_CM_EVENT_DISCONNECTED",
160 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
161 	"RDMA_CM_EVENT_MULTICAST_JOIN",
162 	"RDMA_CM_EVENT_MULTICAST_ERROR",
163 	"RDMA_CM_EVENT_ADDR_CHANGE",
164 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
165 };
166 
167 static LIST_HEAD(, spdk_nvme_rdma_mr_map) g_rdma_mr_maps = LIST_HEAD_INITIALIZER(&g_rdma_mr_maps);
168 static pthread_mutex_t g_rdma_mr_maps_mutex = PTHREAD_MUTEX_INITIALIZER;
169 
170 static int nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair);
171 
172 static inline struct nvme_rdma_qpair *
173 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
174 {
175 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
176 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
177 }
178 
179 static inline struct nvme_rdma_ctrlr *
180 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
181 {
182 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
183 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
184 }
185 
186 static struct spdk_nvme_rdma_req *
187 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
188 {
189 	struct spdk_nvme_rdma_req *rdma_req;
190 
191 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
192 	if (rdma_req) {
193 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
194 		TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
195 	}
196 
197 	return rdma_req;
198 }
199 
200 static void
201 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
202 {
203 	rdma_req->request_ready_to_put = false;
204 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
205 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
206 }
207 
208 static void
209 nvme_rdma_req_complete(struct nvme_request *req,
210 		       struct spdk_nvme_cpl *rsp)
211 {
212 	nvme_complete_request(req, rsp);
213 	nvme_free_request(req);
214 }
215 
216 static const char *
217 nvme_rdma_cm_event_str_get(uint32_t event)
218 {
219 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
220 		return rdma_cm_event_str[event];
221 	} else {
222 		return "Undefined";
223 	}
224 }
225 
226 static struct rdma_cm_event *
227 nvme_rdma_get_event(struct rdma_event_channel *channel,
228 		    enum rdma_cm_event_type evt)
229 {
230 	struct rdma_cm_event	*event;
231 	int			rc;
232 
233 	rc = rdma_get_cm_event(channel, &event);
234 	if (rc < 0) {
235 		SPDK_ERRLOG("Failed to get event from CM event channel. Error %d (%s)\n",
236 			    errno, spdk_strerror(errno));
237 		return NULL;
238 	}
239 
240 	if (event->event != evt) {
241 		SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
242 			    nvme_rdma_cm_event_str_get(evt),
243 			    nvme_rdma_cm_event_str_get(event->event), event->event, event->status);
244 		rdma_ack_cm_event(event);
245 		return NULL;
246 	}
247 
248 	return event;
249 }
250 
251 static int
252 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
253 {
254 	int			rc;
255 	struct ibv_qp_init_attr	attr;
256 	struct ibv_device_attr	dev_attr;
257 	struct nvme_rdma_ctrlr	*rctrlr;
258 
259 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
260 	if (rc != 0) {
261 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
262 		return -1;
263 	}
264 
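	/*
	 * A single CQ is shared by the send and receive queues created below, so
	 * size it for num_entries SEND completions plus num_entries RECV completions.
	 */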
265 	rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
266 	if (!rqpair->cq) {
267 		SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
268 		return -1;
269 	}
270 
271 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
272 	if (g_nvme_hooks.get_ibv_pd) {
273 		rctrlr->pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
274 	} else {
275 		rctrlr->pd = NULL;
276 	}
277 
278 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
279 	attr.qp_type		= IBV_QPT_RC;
280 	attr.send_cq		= rqpair->cq;
281 	attr.recv_cq		= rqpair->cq;
282 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
283 	attr.cap.max_recv_wr	= rqpair->num_entries; /* RECV operations */
284 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
285 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
286 
287 	rc = rdma_create_qp(rqpair->cm_id, rctrlr->pd, &attr);
288 
289 	if (rc) {
290 		SPDK_ERRLOG("rdma_create_qp failed\n");
291 		return -1;
292 	}
293 
294 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
295 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
296 	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
297 
298 	rctrlr->pd = rqpair->cm_id->qp->pd;
299 
300 	rqpair->cm_id->context = &rqpair->qpair;
301 
302 	return 0;
303 }
304 
305 #define nvme_rdma_trace_ibv_sge(sg_list) \
306 	if (sg_list) { \
307 		SPDK_DEBUGLOG(SPDK_LOG_NVME, "local addr %p length 0x%x lkey 0x%x\n", \
308 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
309 	}
310 
311 static int
312 nvme_rdma_post_recv(struct nvme_rdma_qpair *rqpair, uint16_t rsp_idx)
313 {
314 	struct ibv_recv_wr *wr, *bad_wr = NULL;
315 	int rc;
316 
317 	wr = &rqpair->rsp_recv_wrs[rsp_idx];
318 	nvme_rdma_trace_ibv_sge(wr->sg_list);
319 
320 	rc = ibv_post_recv(rqpair->cm_id->qp, wr, &bad_wr);
321 	if (rc) {
322 		SPDK_ERRLOG("Failure posting RDMA recv, rc = %d\n", rc);
323 	}
324 
325 	return rc;
326 }
327 
328 static void
329 nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
330 {
331 	if (rqpair->rsp_mr && rdma_dereg_mr(rqpair->rsp_mr)) {
332 		SPDK_ERRLOG("Unable to de-register rsp_mr\n");
333 	}
334 	rqpair->rsp_mr = NULL;
335 
336 	free(rqpair->rsps);
337 	rqpair->rsps = NULL;
338 	free(rqpair->rsp_sgls);
339 	rqpair->rsp_sgls = NULL;
340 	free(rqpair->rsp_recv_wrs);
341 	rqpair->rsp_recv_wrs = NULL;
342 }
343 
344 static int
345 nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
346 {
347 	uint16_t i;
348 
349 	rqpair->rsp_mr = NULL;
350 	rqpair->rsps = NULL;
351 	rqpair->rsp_recv_wrs = NULL;
352 
353 	rqpair->rsp_sgls = calloc(rqpair->num_entries, sizeof(*rqpair->rsp_sgls));
354 	if (!rqpair->rsp_sgls) {
355 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
356 		goto fail;
357 	}
358 
359 	rqpair->rsp_recv_wrs = calloc(rqpair->num_entries,
360 				      sizeof(*rqpair->rsp_recv_wrs));
361 	if (!rqpair->rsp_recv_wrs) {
362 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
363 		goto fail;
364 	}
365 
366 	rqpair->rsps = calloc(rqpair->num_entries, sizeof(*rqpair->rsps));
367 	if (!rqpair->rsps) {
368 		SPDK_ERRLOG("Failed to allocate RDMA rsps\n");
369 		goto fail;
370 	}
371 
372 	rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
373 				       rqpair->num_entries * sizeof(*rqpair->rsps));
374 	if (rqpair->rsp_mr == NULL) {
375 		SPDK_ERRLOG("Unable to register rsp_mr\n");
376 		goto fail;
377 	}
378 
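	/*
	 * Describe each response buffer with its own SGE and recv work request,
	 * and post all of them up front so the queue can accept completions as
	 * soon as the connection is established.
	 */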
379 	for (i = 0; i < rqpair->num_entries; i++) {
380 		struct ibv_sge *rsp_sgl = &rqpair->rsp_sgls[i];
381 
382 		rsp_sgl->addr = (uint64_t)&rqpair->rsps[i];
383 		rsp_sgl->length = sizeof(rqpair->rsps[i]);
384 		rsp_sgl->lkey = rqpair->rsp_mr->lkey;
385 
386 		rqpair->rsp_recv_wrs[i].wr_id = i;
387 		rqpair->rsp_recv_wrs[i].next = NULL;
388 		rqpair->rsp_recv_wrs[i].sg_list = rsp_sgl;
389 		rqpair->rsp_recv_wrs[i].num_sge = 1;
390 
391 		if (nvme_rdma_post_recv(rqpair, i)) {
392 			SPDK_ERRLOG("Unable to post connection rx desc\n");
393 			goto fail;
394 		}
395 	}
396 
397 	return 0;
398 
399 fail:
400 	nvme_rdma_free_rsps(rqpair);
401 	return -ENOMEM;
402 }
403 
404 static void
405 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
406 {
407 	if (!rqpair->rdma_reqs) {
408 		return;
409 	}
410 
411 	if (rqpair->cmd_mr && rdma_dereg_mr(rqpair->cmd_mr)) {
412 		SPDK_ERRLOG("Unable to de-register cmd_mr\n");
413 	}
414 	rqpair->cmd_mr = NULL;
415 
416 	free(rqpair->cmds);
417 	rqpair->cmds = NULL;
418 
419 	free(rqpair->rdma_reqs);
420 	rqpair->rdma_reqs = NULL;
421 }
422 
423 static int
424 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
425 {
426 	int i;
427 
428 	rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
429 	if (rqpair->rdma_reqs == NULL) {
430 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
431 		goto fail;
432 	}
433 
434 	rqpair->cmds = calloc(rqpair->num_entries, sizeof(*rqpair->cmds));
435 	if (!rqpair->cmds) {
436 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
437 		goto fail;
438 	}
439 
440 	rqpair->cmd_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->cmds,
441 				       rqpair->num_entries * sizeof(*rqpair->cmds));
442 	if (!rqpair->cmd_mr) {
443 		SPDK_ERRLOG("Unable to register cmd_mr\n");
444 		goto fail;
445 	}
446 
447 	TAILQ_INIT(&rqpair->free_reqs);
448 	TAILQ_INIT(&rqpair->outstanding_reqs);
449 	for (i = 0; i < rqpair->num_entries; i++) {
450 		struct spdk_nvme_rdma_req	*rdma_req;
451 		struct spdk_nvmf_cmd		*cmd;
452 
453 		rdma_req = &rqpair->rdma_reqs[i];
454 		cmd = &rqpair->cmds[i];
455 
456 		rdma_req->id = i;
457 
458 		/* The first RDMA sgl element will always point
459 		 * at this data structure. Depending on whether
460 		 * an NVMe-oF SGL is required, the length of
461 		 * this element may change. */
462 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
463 		rdma_req->send_sgl[0].lkey = rqpair->cmd_mr->lkey;
464 
465 		rdma_req->send_wr.wr_id = (uint64_t)rdma_req;
466 		rdma_req->send_wr.next = NULL;
467 		rdma_req->send_wr.opcode = IBV_WR_SEND;
468 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
469 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
470 		rdma_req->send_wr.imm_data = 0;
471 
472 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
473 	}
474 
475 	return 0;
476 
477 fail:
478 	nvme_rdma_free_reqs(rqpair);
479 	return -ENOMEM;
480 }
481 
482 static int
483 nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx)
484 {
485 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
486 	struct spdk_nvme_rdma_req *rdma_req;
487 	struct spdk_nvme_cpl *rsp;
488 	struct nvme_request *req;
489 
490 	assert(rsp_idx < rqpair->num_entries);
491 	rsp = &rqpair->rsps[rsp_idx];
492 	rdma_req = &rqpair->rdma_reqs[rsp->cid];
493 
494 	req = rdma_req->req;
495 	nvme_rdma_req_complete(req, rsp);
496 
497 	if (rdma_req->request_ready_to_put) {
498 		nvme_rdma_req_put(rqpair, rdma_req);
499 	} else {
500 		rdma_req->request_ready_to_put = true;
501 	}
502 
503 	if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
504 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
505 		return -1;
506 	}
507 
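	/*
	 * Completing a request freed an rdma_req slot, so resubmit a previously
	 * queued request if one is waiting (unless the controller is resetting).
	 */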
508 	if (!STAILQ_EMPTY(&qpair->queued_req) && !qpair->ctrlr->is_resetting) {
509 		req = STAILQ_FIRST(&qpair->queued_req);
510 		STAILQ_REMOVE_HEAD(&qpair->queued_req, stailq);
511 		nvme_qpair_submit_request(qpair, req);
512 	}
513 
514 	return 0;
515 }
516 
517 static int
518 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
519 		       struct sockaddr *src_addr,
520 		       struct sockaddr *dst_addr,
521 		       struct rdma_event_channel *cm_channel)
522 {
523 	int ret;
524 	struct rdma_cm_event *event;
525 
526 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
527 				NVME_RDMA_TIME_OUT_IN_MS);
528 	if (ret) {
529 		SPDK_ERRLOG("rdma_resolve_addr() failed, errno %d\n", errno);
530 		return ret;
531 	}
532 
533 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
534 	if (event == NULL) {
535 		SPDK_ERRLOG("RDMA address resolution error\n");
536 		return -1;
537 	}
538 	rdma_ack_cm_event(event);
539 
540 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
541 	if (ret) {
542 		SPDK_ERRLOG("rdma_resolve_route() failed\n");
543 		return ret;
544 	}
545 
546 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
547 	if (event == NULL) {
548 		SPDK_ERRLOG("RDMA route resolution error\n");
549 		return -1;
550 	}
551 	rdma_ack_cm_event(event);
552 
553 	return 0;
554 }
555 
556 static int
557 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
558 {
559 	struct rdma_conn_param				param = {};
560 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
561 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
562 	struct ibv_device_attr				attr;
563 	int						ret;
564 	struct rdma_cm_event				*event;
565 	struct spdk_nvme_ctrlr				*ctrlr;
566 
567 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
568 	if (ret != 0) {
569 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
570 		return ret;
571 	}
572 
573 	param.responder_resources = spdk_min(rqpair->num_entries, attr.max_qp_rd_atom);
574 
575 	ctrlr = rqpair->qpair.ctrlr;
576 	if (!ctrlr) {
577 		return -1;
578 	}
579 
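	/*
	 * Private data carried in the RDMA CM connect request per the NVMe-oF
	 * RDMA transport binding. hsqsize is sent as a 0-based value, hence the - 1.
	 */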
580 	request_data.qid = rqpair->qpair.id;
581 	request_data.hrqsize = rqpair->num_entries;
582 	request_data.hsqsize = rqpair->num_entries - 1;
583 	request_data.cntlid = ctrlr->cntlid;
584 
585 	param.private_data = &request_data;
586 	param.private_data_len = sizeof(request_data);
587 	param.retry_count = 7;
588 	param.rnr_retry_count = 7;
589 
590 	ret = rdma_connect(rqpair->cm_id, &param);
591 	if (ret) {
592 		SPDK_ERRLOG("rdma_connect() failed\n");
593 		return ret;
594 	}
595 
596 	event = nvme_rdma_get_event(rqpair->cm_channel, RDMA_CM_EVENT_ESTABLISHED);
597 	if (event == NULL) {
598 		SPDK_ERRLOG("RDMA connect error\n");
599 		return -1;
600 	}
601 
602 	accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
603 	if (accept_data == NULL) {
604 		rdma_ack_cm_event(event);
605 		SPDK_ERRLOG("NVMe-oF target did not return accept data\n");
606 		return -1;
607 	}
608 
609 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "Requested queue depth %d. Actually got queue depth %d.\n",
610 		      rqpair->num_entries, accept_data->crqsize);
611 
612 	rqpair->num_entries = spdk_min(rqpair->num_entries, accept_data->crqsize);
613 
614 	rdma_ack_cm_event(event);
615 
616 	return 0;
617 }
618 
619 static int
620 nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr, const char *service)
621 {
622 	struct addrinfo *res;
623 	struct addrinfo hints;
624 	int ret;
625 
626 	memset(&hints, 0, sizeof(hints));
627 	hints.ai_family = family;
628 	hints.ai_socktype = SOCK_STREAM;
629 	hints.ai_protocol = 0;
630 
631 	ret = getaddrinfo(addr, service, &hints, &res);
632 	if (ret) {
633 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(ret), ret);
634 		return ret;
635 	}
636 
637 	if (res->ai_addrlen > sizeof(*sa)) {
638 		SPDK_ERRLOG("getaddrinfo() ai_addrlen %zu too large\n", (size_t)res->ai_addrlen);
639 		ret = EINVAL;
640 	} else {
641 		memcpy(sa, res->ai_addr, res->ai_addrlen);
642 	}
643 
644 	freeaddrinfo(res);
645 	return ret;
646 }
647 
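/*
 * Memory map notification callback: when a region is registered with SPDK, an
 * ibv_mr (or an rkey obtained from the application hook) is created and stored
 * as that region's translation; when the region is unregistered, the MR is
 * destroyed and the translation cleared.
 */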
648 static int
649 nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
650 			enum spdk_mem_map_notify_action action,
651 			void *vaddr, size_t size)
652 {
653 	struct ibv_pd *pd = cb_ctx;
654 	struct ibv_mr *mr;
655 	int rc;
656 
657 	switch (action) {
658 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
659 		if (!g_nvme_hooks.get_rkey) {
660 			mr = ibv_reg_mr(pd, vaddr, size,
661 					IBV_ACCESS_LOCAL_WRITE |
662 					IBV_ACCESS_REMOTE_READ |
663 					IBV_ACCESS_REMOTE_WRITE);
664 			if (mr == NULL) {
665 				SPDK_ERRLOG("ibv_reg_mr() failed\n");
666 				return -EFAULT;
667 			} else {
668 				rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
669 			}
670 		} else {
671 			rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size,
672 							  g_nvme_hooks.get_rkey(pd, vaddr, size));
673 		}
674 		break;
675 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
676 		if (!g_nvme_hooks.get_rkey) {
677 			mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
678 			if (mr) {
679 				ibv_dereg_mr(mr);
680 			}
681 		}
682 		rc = spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
683 		break;
684 	default:
685 		SPDK_UNREACHABLE();
686 	}
687 
688 	return rc;
689 }
690 
691 static int
692 nvme_rdma_check_contiguous_entries(uint64_t addr_1, uint64_t addr_2)
693 {
694 	/* Two contiguous mappings will point to the same address which is the start of the RDMA MR. */
695 	return addr_1 == addr_2;
696 }
697 
698 static int
699 nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
700 {
701 	struct ibv_pd *pd = rqpair->cm_id->qp->pd;
702 	struct spdk_nvme_rdma_mr_map *mr_map;
703 	const struct spdk_mem_map_ops nvme_rdma_map_ops = {
704 		.notify_cb = nvme_rdma_mr_map_notify,
705 		.are_contiguous = nvme_rdma_check_contiguous_entries
706 	};
707 
708 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
709 
710 	/* Look up existing mem map registration for this pd */
711 	LIST_FOREACH(mr_map, &g_rdma_mr_maps, link) {
712 		if (mr_map->pd == pd) {
713 			mr_map->ref++;
714 			rqpair->mr_map = mr_map;
715 			pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
716 			return 0;
717 		}
718 	}
719 
720 	mr_map = calloc(1, sizeof(*mr_map));
721 	if (mr_map == NULL) {
722 		SPDK_ERRLOG("calloc() failed\n");
723 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
724 		return -1;
725 	}
726 
727 	mr_map->ref = 1;
728 	mr_map->pd = pd;
729 	mr_map->map = spdk_mem_map_alloc((uint64_t)NULL, &nvme_rdma_map_ops, pd);
730 	if (mr_map->map == NULL) {
731 		SPDK_ERRLOG("spdk_mem_map_alloc() failed\n");
732 		free(mr_map);
733 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
734 		return -1;
735 	}
736 
737 	rqpair->mr_map = mr_map;
738 	LIST_INSERT_HEAD(&g_rdma_mr_maps, mr_map, link);
739 
740 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
741 
742 	return 0;
743 }
744 
745 static void
746 nvme_rdma_unregister_mem(struct nvme_rdma_qpair *rqpair)
747 {
748 	struct spdk_nvme_rdma_mr_map *mr_map;
749 
750 	mr_map = rqpair->mr_map;
751 	rqpair->mr_map = NULL;
752 
753 	if (mr_map == NULL) {
754 		return;
755 	}
756 
757 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
758 
759 	assert(mr_map->ref > 0);
760 	mr_map->ref--;
761 	if (mr_map->ref == 0) {
762 		LIST_REMOVE(mr_map, link);
763 		spdk_mem_map_free(&mr_map->map);
764 		free(mr_map);
765 	}
766 
767 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
768 }
769 
770 static int
771 nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
772 {
773 	struct sockaddr_storage dst_addr;
774 	struct sockaddr_storage src_addr;
775 	bool src_addr_specified;
776 	int rc;
777 	struct spdk_nvme_ctrlr *ctrlr;
778 	int family;
779 
780 	rqpair->cm_channel = rdma_create_event_channel();
781 	if (rqpair->cm_channel == NULL) {
782 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
783 		return -1;
784 	}
785 
786 	ctrlr = rqpair->qpair.ctrlr;
787 
788 	switch (ctrlr->trid.adrfam) {
789 	case SPDK_NVMF_ADRFAM_IPV4:
790 		family = AF_INET;
791 		break;
792 	case SPDK_NVMF_ADRFAM_IPV6:
793 		family = AF_INET6;
794 		break;
795 	default:
796 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
797 		return -1;
798 	}
799 
800 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
801 
802 	memset(&dst_addr, 0, sizeof(dst_addr));
803 
804 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "trsvcid is %s\n", ctrlr->trid.trsvcid);
805 	rc = nvme_rdma_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
806 	if (rc != 0) {
807 		SPDK_ERRLOG("dst_addr nvme_rdma_parse_addr() failed\n");
808 		return -1;
809 	}
810 
811 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
812 		memset(&src_addr, 0, sizeof(src_addr));
813 		rc = nvme_rdma_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid);
814 		if (rc != 0) {
815 			SPDK_ERRLOG("src_addr nvme_rdma_parse_addr() failed\n");
816 			return -1;
817 		}
818 		src_addr_specified = true;
819 	} else {
820 		src_addr_specified = false;
821 	}
822 
823 	rc = rdma_create_id(rqpair->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
824 	if (rc < 0) {
825 		SPDK_ERRLOG("rdma_create_id() failed\n");
826 		return -1;
827 	}
828 
829 	rc = nvme_rdma_resolve_addr(rqpair,
830 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
831 				    (struct sockaddr *)&dst_addr, rqpair->cm_channel);
832 	if (rc < 0) {
833 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
834 		return -1;
835 	}
836 
837 	rc = nvme_rdma_qpair_init(rqpair);
838 	if (rc < 0) {
839 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
840 		return -1;
841 	}
842 
843 	rc = nvme_rdma_connect(rqpair);
844 	if (rc != 0) {
845 		SPDK_ERRLOG("Unable to connect the rqpair\n");
846 		return -1;
847 	}
848 
849 	rc = nvme_rdma_alloc_reqs(rqpair);
850 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc = %d\n", rc);
851 	if (rc) {
852 		SPDK_ERRLOG("Unable to allocate rqpair RDMA requests\n");
853 		return -1;
854 	}
855 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests allocated\n");
856 
857 	rc = nvme_rdma_alloc_rsps(rqpair);
858 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc = %d\n", rc);
859 	if (rc < 0) {
860 		SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
861 		return -1;
862 	}
863 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses allocated\n");
864 
865 	rc = nvme_rdma_register_mem(rqpair);
866 	if (rc < 0) {
867 		SPDK_ERRLOG("Unable to register memory for RDMA\n");
868 		return -1;
869 	}
870 
871 	rc = nvme_fabric_qpair_connect(&rqpair->qpair, rqpair->num_entries);
872 	if (rc < 0) {
873 		SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
874 		return -1;
875 	}
876 
877 	return 0;
878 }
879 
880 /*
881  * Build SGL describing empty payload.
882  */
883 static int
884 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
885 {
886 	struct nvme_request *req = rdma_req->req;
887 
888 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
889 
890 	/* The first element of this SGL is pointing at an
891 	 * spdk_nvmf_cmd object. For this particular command,
892 	 * we only need the first 64 bytes corresponding to
893 	 * the NVMe command. */
894 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
895 
896 	/* The RDMA SGL needs one element describing the NVMe command. */
897 	rdma_req->send_wr.num_sge = 1;
898 
899 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
900 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
901 	req->cmd.dptr.sgl1.keyed.length = 0;
902 	req->cmd.dptr.sgl1.keyed.key = 0;
903 	req->cmd.dptr.sgl1.address = 0;
904 
905 	return 0;
906 }
907 
908 /*
909  * Build inline SGL describing contiguous payload buffer.
910  */
911 static int
912 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
913 				      struct spdk_nvme_rdma_req *rdma_req)
914 {
915 	struct nvme_request *req = rdma_req->req;
916 	struct ibv_mr *mr;
917 	void *payload;
918 	uint64_t requested_size;
919 
920 	payload = req->payload.contig_or_cb_arg + req->payload_offset;
921 	assert(req->payload_size != 0);
922 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
923 
924 	requested_size = req->payload_size;
925 
926 	if (!g_nvme_hooks.get_rkey) {
927 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
928 				(uint64_t)payload, &requested_size);
929 
930 		if (mr == NULL || requested_size < req->payload_size) {
931 			if (mr) {
932 				SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
933 			}
934 			return -EINVAL;
935 		}
936 		rdma_req->send_sgl[1].lkey = mr->lkey;
937 	} else {
938 		rdma_req->send_sgl[1].lkey = spdk_mem_map_translate(rqpair->mr_map->map,
939 					     (uint64_t)payload,
940 					     &requested_size);
941 
942 	}
943 
944 	/* The first element of this SGL is pointing at an
945 	 * spdk_nvmf_cmd object. For this particular command,
946 	 * we only need the first 64 bytes corresponding to
947 	 * the NVMe command. */
948 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
949 
950 	rdma_req->send_sgl[1].addr = (uint64_t)payload;
951 	rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
952 
953 	/* The RDMA SGL contains two elements. The first describes
954 	 * the NVMe command and the second describes the data
955 	 * payload. */
956 	rdma_req->send_wr.num_sge = 2;
957 
958 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
959 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
960 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
961 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
962 	/* Inline only supported for icdoff == 0 currently.  This function will
963 	 * not get called for controllers with other values. */
964 	req->cmd.dptr.sgl1.address = (uint64_t)0;
965 
966 	return 0;
967 }
968 
969 /*
970  * Build SGL describing contiguous payload buffer.
971  */
972 static int
973 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
974 			       struct spdk_nvme_rdma_req *rdma_req)
975 {
976 	struct nvme_request *req = rdma_req->req;
977 	void *payload = req->payload.contig_or_cb_arg + req->payload_offset;
978 	struct ibv_mr *mr;
979 	uint64_t requested_size;
980 
981 	assert(req->payload_size != 0);
982 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
983 
984 	requested_size = req->payload_size;
985 	if (!g_nvme_hooks.get_rkey) {
986 
987 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)payload,
988 				&requested_size);
989 		if (mr == NULL) {
990 			return -1;
991 		}
992 		req->cmd.dptr.sgl1.keyed.key = mr->rkey;
993 	} else {
994 		req->cmd.dptr.sgl1.keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
995 					       (uint64_t)payload,
996 					       &requested_size);
997 	}
998 
999 	if (requested_size < req->payload_size) {
1000 		SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1001 		return -1;
1002 	}
1003 
1004 	/* The first element of this SGL is pointing at an
1005 	 * spdk_nvmf_cmd object. For this particular command,
1006 	 * we only need the first 64 bytes corresponding to
1007 	 * the NVMe command. */
1008 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1009 
1010 	/* The RDMA SGL needs one element describing the NVMe command. */
1011 	rdma_req->send_wr.num_sge = 1;
1012 
1013 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1014 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1015 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1016 	req->cmd.dptr.sgl1.keyed.length = req->payload_size;
1017 	req->cmd.dptr.sgl1.address = (uint64_t)payload;
1018 
1019 	return 0;
1020 }
1021 
1022 /*
1023  * Build SGL describing scattered payload buffer.
1024  */
1025 static int
1026 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1027 			    struct spdk_nvme_rdma_req *rdma_req)
1028 {
1029 	struct nvme_request *req = rdma_req->req;
1030 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1031 	struct ibv_mr *mr = NULL;
1032 	void *virt_addr;
1033 	uint64_t remaining_size, mr_length;
1034 	uint32_t sge_length;
1035 	int rc, max_num_sgl, num_sgl_desc;
1036 
1037 	assert(req->payload_size != 0);
1038 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1039 	assert(req->payload.reset_sgl_fn != NULL);
1040 	assert(req->payload.next_sge_fn != NULL);
1041 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1042 
1043 	max_num_sgl = req->qpair->ctrlr->max_sges;
1044 
1045 	remaining_size = req->payload_size;
1046 	num_sgl_desc = 0;
1047 	do {
1048 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &sge_length);
1049 		if (rc) {
1050 			return -1;
1051 		}
1052 
1053 		sge_length = spdk_min(remaining_size, sge_length);
1054 		mr_length = sge_length;
1055 
1056 		if (!g_nvme_hooks.get_rkey) {
1057 			mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
1058 					(uint64_t)virt_addr,
1059 					&mr_length);
1060 			if (mr == NULL) {
1061 				return -1;
1062 			}
1063 			cmd->sgl[num_sgl_desc].keyed.key = mr->rkey;
1064 		} else {
1065 			cmd->sgl[num_sgl_desc].keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
1066 							   (uint64_t)virt_addr,
1067 							   &mr_length);
1068 		}
1069 
1070 		if (mr_length < sge_length) {
1071 			SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1072 			return -1;
1073 		}
1074 
1075 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1076 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1077 		cmd->sgl[num_sgl_desc].keyed.length = sge_length;
1078 		cmd->sgl[num_sgl_desc].address = (uint64_t)virt_addr;
1079 
1080 		remaining_size -= sge_length;
1081 		num_sgl_desc++;
1082 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1083 
1084 
1085 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1086 	if (remaining_size > 0) {
1087 		return -1;
1088 	}
1089 
1090 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1091 
1092 	/* The RDMA SGL needs one element describing some portion
1093 	 * of the spdk_nvmf_cmd structure. */
1094 	rdma_req->send_wr.num_sge = 1;
1095 
1096 	/*
1097 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1098 	 * as a data block descriptor.
1099 	 */
1100 	if (num_sgl_desc == 1) {
1101 		/* The first element of this SGL is pointing at an
1102 		 * spdk_nvmf_cmd object. For this particular command,
1103 		 * we only need the first 64 bytes corresponding to
1104 		 * the NVMe command. */
1105 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1106 
1107 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1108 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1109 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1110 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1111 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1112 	} else {
1113 		/*
1114 		 * Otherwise, the SGL descriptor embedded in the command must point to the list of
1115 		 * SGL descriptors used to describe the operation; in that case it is a Last Segment descriptor.
1116 		 */
1117 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + sizeof(struct
1118 					       spdk_nvme_sgl_descriptor) * num_sgl_desc;
1119 
1120 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1121 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1122 		req->cmd.dptr.sgl1.unkeyed.length = num_sgl_desc * sizeof(struct spdk_nvme_sgl_descriptor);
1123 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1124 	}
1125 
1126 	return 0;
1127 }
1128 
1129 /*
1130  * Build inline SGL describing sgl payload buffer.
1131  */
1132 static int
1133 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1134 				   struct spdk_nvme_rdma_req *rdma_req)
1135 {
1136 	struct nvme_request *req = rdma_req->req;
1137 	struct ibv_mr *mr;
1138 	uint32_t length;
1139 	uint64_t requested_size;
1140 	uint32_t remaining_payload;
1141 	void *virt_addr;
1142 	int rc, i;
1143 
1144 	assert(req->payload_size != 0);
1145 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1146 	assert(req->payload.reset_sgl_fn != NULL);
1147 	assert(req->payload.next_sge_fn != NULL);
1148 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1149 
1150 	remaining_payload = req->payload_size;
1151 	rdma_req->send_wr.num_sge = 1;
1152 
1153 	do {
1154 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &length);
1155 		if (rc) {
1156 			return -1;
1157 		}
1158 
1159 		assert(length <= remaining_payload);
1160 
1161 		requested_size = length;
1162 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
1163 				&requested_size);
1164 		if (mr == NULL || requested_size < length) {
1165 			for (i = 1; i < rdma_req->send_wr.num_sge; i++) {
1166 				rdma_req->send_sgl[i].addr = 0;
1167 				rdma_req->send_sgl[i].length = 0;
1168 				rdma_req->send_sgl[i].lkey = 0;
1169 			}
1170 
1171 			if (mr) {
1172 				SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1173 			}
1174 			return -1;
1175 		}
1176 
1177 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].addr = (uint64_t)virt_addr;
1178 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].length = length;
1179 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].lkey = mr->lkey;
1180 		rdma_req->send_wr.num_sge++;
1181 
1182 		remaining_payload -= length;
1183 	} while (remaining_payload && rdma_req->send_wr.num_sge < (int64_t)rqpair->max_send_sge);
1184 
1185 	if (remaining_payload) {
1186 		SPDK_ERRLOG("Unable to prepare request. Too many SGL elements\n");
1187 		return -1;
1188 	}
1189 
1190 	/* The first element of this SGL is pointing at an
1191 	 * spdk_nvmf_cmd object. For this particular command,
1192 	 * we only need the first 64 bytes corresponding to
1193 	 * the NVMe command. */
1194 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1195 
1196 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1197 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1198 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1199 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
1200 	/* Inline only supported for icdoff == 0 currently.  This function will
1201 	 * not get called for controllers with other values. */
1202 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1203 
1204 	return 0;
1205 }
1206 
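/*
 * Bytes of in-capsule data the controller accepts: IOCCSZ is reported in
 * 16-byte units and includes the 64-byte command itself, so subtract it.
 */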
1207 static inline unsigned int
1208 nvme_rdma_icdsz_bytes(struct spdk_nvme_ctrlr *ctrlr)
1209 {
1210 	return (ctrlr->cdata.nvmf_specific.ioccsz * 16 - sizeof(struct spdk_nvme_cmd));
1211 }
1212 
1213 static int
1214 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1215 		   struct spdk_nvme_rdma_req *rdma_req)
1216 {
1217 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1218 	int rc;
1219 
1220 	rdma_req->req = req;
1221 	req->cmd.cid = rdma_req->id;
1222 
1223 	if (req->payload_size == 0) {
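	/*
	 * Pick the SGL building strategy: no payload, contiguous buffer, or
	 * scattered buffer. Writes small enough to fit in the capsule use the
	 * inline variants when the target reports icdoff == 0 (see below).
	 */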
1224 		rc = nvme_rdma_build_null_request(rdma_req);
1225 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG) {
1226 		/*
1227 		 * Check whether ICDOFF is non-zero to avoid interop problems with
1228 		 * targets that use a non-zero ICDOFF.  Both the SPDK and Linux kernel
1229 		 * targets use ICDOFF = 0.  For targets with a non-zero ICDOFF, simply
1230 		 * do not use inline data.
1231 		 */
1232 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1233 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1234 		    (ctrlr->cdata.nvmf_specific.icdoff == 0)) {
1235 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1236 		} else {
1237 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1238 		}
1239 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL) {
1240 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1241 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1242 		    ctrlr->cdata.nvmf_specific.icdoff == 0) {
1243 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1244 		} else {
1245 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1246 		}
1247 	} else {
1248 		rc = -1;
1249 	}
1250 
1251 	if (rc) {
1252 		return rc;
1253 	}
1254 
1255 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1256 	return 0;
1257 }
1258 
1259 static struct spdk_nvme_qpair *
1260 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1261 			     uint16_t qid, uint32_t qsize,
1262 			     enum spdk_nvme_qprio qprio,
1263 			     uint32_t num_requests)
1264 {
1265 	struct nvme_rdma_qpair *rqpair;
1266 	struct spdk_nvme_qpair *qpair;
1267 	int rc;
1268 
1269 	rqpair = calloc(1, sizeof(struct nvme_rdma_qpair));
1270 	if (!rqpair) {
1271 		SPDK_ERRLOG("failed to allocate rqpair\n");
1272 		return NULL;
1273 	}
1274 
1275 	rqpair->num_entries = qsize;
1276 
1277 	qpair = &rqpair->qpair;
1278 
1279 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests);
1280 	if (rc != 0) {
1281 		return NULL;
1282 	}
1283 
1284 	rc = nvme_rdma_qpair_connect(rqpair);
1285 	if (rc < 0) {
1286 		nvme_rdma_qpair_destroy(qpair);
1287 		return NULL;
1288 	}
1289 
1290 	return qpair;
1291 }
1292 
1293 static int
1294 nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
1295 {
1296 	struct nvme_rdma_qpair *rqpair;
1297 
1298 	if (!qpair) {
1299 		return -1;
1300 	}
1301 	nvme_rdma_qpair_fail(qpair);
1302 	nvme_qpair_deinit(qpair);
1303 
1304 	rqpair = nvme_rdma_qpair(qpair);
1305 
1306 	nvme_rdma_unregister_mem(rqpair);
1307 	nvme_rdma_free_reqs(rqpair);
1308 	nvme_rdma_free_rsps(rqpair);
1309 
1310 	if (rqpair->cm_id) {
1311 		if (rqpair->cm_id->qp) {
1312 			rdma_destroy_qp(rqpair->cm_id);
1313 		}
1314 		rdma_destroy_id(rqpair->cm_id);
1315 	}
1316 
1317 	if (rqpair->cq) {
1318 		ibv_destroy_cq(rqpair->cq);
1319 	}
1320 
1321 	if (rqpair->cm_channel) {
1322 		rdma_destroy_event_channel(rqpair->cm_channel);
1323 	}
1324 
1325 	free(rqpair);
1326 
1327 	return 0;
1328 }
1329 
1330 struct spdk_nvme_qpair *
1331 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
1332 				const struct spdk_nvme_io_qpair_opts *opts)
1333 {
1334 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
1335 					    opts->io_queue_requests);
1336 }
1337 
1338 int
1339 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
1340 {
1341 	/* do nothing here */
1342 	return 0;
1343 }
1344 
1345 /* This function must only be called while holding g_spdk_nvme_driver->lock */
1346 int
1347 nvme_rdma_ctrlr_scan(struct spdk_nvme_probe_ctx *probe_ctx,
1348 		     bool direct_connect)
1349 {
1350 	struct spdk_nvme_ctrlr_opts discovery_opts;
1351 	struct spdk_nvme_ctrlr *discovery_ctrlr;
1352 	union spdk_nvme_cc_register cc;
1353 	int rc;
1354 	struct nvme_completion_poll_status status;
1355 
1356 	if (strcmp(probe_ctx->trid.subnqn, SPDK_NVMF_DISCOVERY_NQN) != 0) {
1357 		/* Not a discovery controller; try to connect to it directly. */
1358 		rc = nvme_ctrlr_probe(&probe_ctx->trid, probe_ctx, NULL);
1359 		return rc;
1360 	}
1361 
1362 	spdk_nvme_ctrlr_get_default_ctrlr_opts(&discovery_opts, sizeof(discovery_opts));
1363 	/* Disable the keep-alive timeout for the discovery controller */
1364 	discovery_opts.keep_alive_timeout_ms = 0;
1365 
1366 	discovery_ctrlr = nvme_rdma_ctrlr_construct(&probe_ctx->trid, &discovery_opts, NULL);
1367 	if (discovery_ctrlr == NULL) {
1368 		return -1;
1369 	}
1370 
1371 	/* TODO: this should be using the normal NVMe controller initialization process */
1372 	cc.raw = 0;
1373 	cc.bits.en = 1;
1374 	cc.bits.iosqes = 6; /* SQ entry size == 64 == 2^6 */
1375 	cc.bits.iocqes = 4; /* CQ entry size == 16 == 2^4 */
1376 	rc = nvme_transport_ctrlr_set_reg_4(discovery_ctrlr, offsetof(struct spdk_nvme_registers, cc.raw),
1377 					    cc.raw);
1378 	if (rc < 0) {
1379 		SPDK_ERRLOG("Failed to set cc\n");
1380 		nvme_ctrlr_destruct(discovery_ctrlr);
1381 		return -1;
1382 	}
1383 
1384 	/* Direct attach through spdk_nvme_connect() API */
1385 	if (direct_connect == true) {
1386 		/* get the cdata info */
1387 		rc = nvme_ctrlr_cmd_identify(discovery_ctrlr, SPDK_NVME_IDENTIFY_CTRLR, 0, 0,
1388 					     &discovery_ctrlr->cdata, sizeof(discovery_ctrlr->cdata),
1389 					     nvme_completion_poll_cb, &status);
1390 		if (rc != 0) {
1391 			SPDK_ERRLOG("Failed to submit Identify Controller command\n");
1392 			return rc;
1393 		}
1394 
1395 		if (spdk_nvme_wait_for_completion(discovery_ctrlr->adminq, &status)) {
1396 			SPDK_ERRLOG("nvme_identify_controller failed!\n");
1397 			return -ENXIO;
1398 		}
1399 
1400 		/* Set the ready state to skip the normal init process */
1401 		discovery_ctrlr->state = NVME_CTRLR_STATE_READY;
1402 		nvme_ctrlr_connected(probe_ctx, discovery_ctrlr);
1403 		nvme_ctrlr_add_process(discovery_ctrlr, 0);
1404 		return 0;
1405 	}
1406 
1407 	rc = nvme_fabric_ctrlr_discover(discovery_ctrlr, probe_ctx);
1408 	nvme_ctrlr_destruct(discovery_ctrlr);
1409 	return rc;
1410 }
1411 
1412 struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
1413 		const struct spdk_nvme_ctrlr_opts *opts,
1414 		void *devhandle)
1415 {
1416 	struct nvme_rdma_ctrlr *rctrlr;
1417 	union spdk_nvme_cap_register cap;
1418 	union spdk_nvme_vs_register vs;
1419 	int rc;
1420 
1421 	rctrlr = calloc(1, sizeof(struct nvme_rdma_ctrlr));
1422 	if (rctrlr == NULL) {
1423 		SPDK_ERRLOG("could not allocate ctrlr\n");
1424 		return NULL;
1425 	}
1426 
1427 	rctrlr->ctrlr.trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1428 	rctrlr->ctrlr.opts = *opts;
1429 	memcpy(&rctrlr->ctrlr.trid, trid, sizeof(rctrlr->ctrlr.trid));
1430 
1431 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
1432 	if (rc != 0) {
1433 		free(rctrlr);
1434 		return NULL;
1435 	}
1436 
1437 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
1438 			       SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES, 0, SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES);
1439 	if (!rctrlr->ctrlr.adminq) {
1440 		SPDK_ERRLOG("failed to create admin qpair\n");
1441 		nvme_rdma_ctrlr_destruct(&rctrlr->ctrlr);
1442 		return NULL;
1443 	}
1444 
1445 	if (nvme_ctrlr_get_cap(&rctrlr->ctrlr, &cap)) {
1446 		SPDK_ERRLOG("get_cap() failed\n");
1447 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1448 		return NULL;
1449 	}
1450 
1451 	if (nvme_ctrlr_get_vs(&rctrlr->ctrlr, &vs)) {
1452 		SPDK_ERRLOG("get_vs() failed\n");
1453 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1454 		return NULL;
1455 	}
1456 
1457 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
1458 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
1459 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1460 		return NULL;
1461 	}
1462 
1463 	nvme_ctrlr_init_cap(&rctrlr->ctrlr, &cap, &vs);
1464 
1465 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "successfully initialized the NVMe-oF ctrlr\n");
1466 	return &rctrlr->ctrlr;
1467 }
1468 
1469 int
1470 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
1471 {
1472 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
1473 
1474 	if (ctrlr->adminq) {
1475 		nvme_rdma_qpair_destroy(ctrlr->adminq);
1476 	}
1477 
1478 	nvme_ctrlr_destruct_finish(ctrlr);
1479 
1480 	free(rctrlr);
1481 
1482 	return 0;
1483 }
1484 
1485 int
1486 nvme_rdma_ctrlr_set_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t value)
1487 {
1488 	return nvme_fabric_ctrlr_set_reg_4(ctrlr, offset, value);
1489 }
1490 
1491 int
1492 nvme_rdma_ctrlr_set_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t value)
1493 {
1494 	return nvme_fabric_ctrlr_set_reg_8(ctrlr, offset, value);
1495 }
1496 
1497 int
1498 nvme_rdma_ctrlr_get_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t *value)
1499 {
1500 	return nvme_fabric_ctrlr_get_reg_4(ctrlr, offset, value);
1501 }
1502 
1503 int
1504 nvme_rdma_ctrlr_get_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t *value)
1505 {
1506 	return nvme_fabric_ctrlr_get_reg_8(ctrlr, offset, value);
1507 }
1508 
1509 int
1510 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
1511 			       struct nvme_request *req)
1512 {
1513 	struct nvme_rdma_qpair *rqpair;
1514 	struct spdk_nvme_rdma_req *rdma_req;
1515 	struct ibv_send_wr *wr, *bad_wr = NULL;
1516 	int rc;
1517 
1518 	rqpair = nvme_rdma_qpair(qpair);
1519 	assert(rqpair != NULL);
1520 	assert(req != NULL);
1521 
1522 	rdma_req = nvme_rdma_req_get(rqpair);
1523 	if (!rdma_req) {
1524 		/*
1525 		 * No rdma_req is available.  Queue the request to be processed later.
1526 		 */
1527 		STAILQ_INSERT_TAIL(&qpair->queued_req, req, stailq);
1528 		return 0;
1529 	}
1530 
1531 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
1532 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
1533 		nvme_rdma_req_put(rqpair, rdma_req);
1534 		return -1;
1535 	}
1536 
1537 	wr = &rdma_req->send_wr;
1538 
1539 	nvme_rdma_trace_ibv_sge(wr->sg_list);
1540 
1541 	rc = ibv_post_send(rqpair->cm_id->qp, wr, &bad_wr);
1542 	if (rc) {
1543 		SPDK_ERRLOG("Failure posting RDMA send: %d (%s)\n", rc, spdk_strerror(rc));
1544 	}
1545 
1546 	return rc;
1547 }
1548 
1549 int
1550 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1551 {
1552 	return nvme_rdma_qpair_destroy(qpair);
1553 }
1554 
1555 int
1556 nvme_rdma_ctrlr_reinit_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1557 {
1558 	return nvme_rdma_qpair_connect(nvme_rdma_qpair(qpair));
1559 }
1560 
1561 int
1562 nvme_rdma_qpair_enable(struct spdk_nvme_qpair *qpair)
1563 {
1564 	/* Currently, doing nothing here */
1565 	return 0;
1566 }
1567 
1568 int
1569 nvme_rdma_qpair_disable(struct spdk_nvme_qpair *qpair)
1570 {
1571 	/* Currently, doing nothing here */
1572 	return 0;
1573 }
1574 
1575 int
1576 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
1577 {
1578 	/* Currently, doing nothing here */
1579 	return 0;
1580 }
1581 
1582 int
1583 nvme_rdma_qpair_fail(struct spdk_nvme_qpair *qpair)
1584 {
1585 	/*
1586 	 * If the qpair is really failed, the connection is broken
1587 	 * and we need to flush back all I/O
1588 	 */
1589 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1590 	struct nvme_request *req;
1591 	struct spdk_nvme_cpl cpl;
1592 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1593 
1594 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
1595 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
1596 
1597 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1598 		assert(rdma_req->req != NULL);
1599 		req = rdma_req->req;
1600 
1601 		nvme_rdma_req_complete(req, &cpl);
1602 		nvme_rdma_req_put(rqpair, rdma_req);
1603 	}
1604 
1605 	return 0;
1606 }
1607 
1608 static void
1609 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
1610 {
1611 	uint64_t t02;
1612 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1613 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1614 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
1615 	struct spdk_nvme_ctrlr_process *active_proc;
1616 
1617 	/* Don't check timeouts during controller initialization. */
1618 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
1619 		return;
1620 	}
1621 
1622 	if (nvme_qpair_is_admin_queue(qpair)) {
1623 		active_proc = spdk_nvme_ctrlr_get_current_process(ctrlr);
1624 	} else {
1625 		active_proc = qpair->active_proc;
1626 	}
1627 
1628 	/* Only check timeouts if the current process has a timeout callback. */
1629 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
1630 		return;
1631 	}
1632 
1633 	t02 = spdk_get_ticks();
1634 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1635 		assert(rdma_req->req != NULL);
1636 
1637 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
1638 			/*
1639 			 * The requests are in order, so as soon as one has not timed out,
1640 			 * stop iterating.
1641 			 */
1642 			break;
1643 		}
1644 	}
1645 }
1646 
1647 #define MAX_COMPLETIONS_PER_POLL 128
1648 
1649 int
1650 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
1651 				    uint32_t max_completions)
1652 {
1653 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
1654 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
1655 	int				i, rc, batch_size;
1656 	uint32_t			reaped;
1657 	struct ibv_cq			*cq;
1658 	struct spdk_nvme_rdma_req	*rdma_req;
1659 
1660 	if (max_completions == 0) {
1661 		max_completions = rqpair->num_entries;
1662 	} else {
1663 		max_completions = spdk_min(max_completions, rqpair->num_entries);
1664 	}
1665 
1666 	cq = rqpair->cq;
1667 
1668 	reaped = 0;
1669 	do {
1670 		batch_size = spdk_min((max_completions - reaped),
1671 				      MAX_COMPLETIONS_PER_POLL);
1672 		rc = ibv_poll_cq(cq, batch_size, wc);
1673 		if (rc < 0) {
1674 			SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1675 				    errno, spdk_strerror(errno));
1676 			return -1;
1677 		} else if (rc == 0) {
1678 			/* Ran out of completions */
1679 			break;
1680 		}
1681 
1682 		for (i = 0; i < rc; i++) {
1683 			if (wc[i].status) {
1684 				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
1685 					    qpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1686 				return -1;
1687 			}
1688 
1689 			switch (wc[i].opcode) {
1690 			case IBV_WC_RECV:
1691 				SPDK_DEBUGLOG(SPDK_LOG_NVME, "CQ recv completion\n");
1692 
1693 				reaped++;
1694 
1695 				if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
1696 					SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
1697 					return -1;
1698 				}
1699 
1700 				if (nvme_rdma_recv(rqpair, wc[i].wr_id)) {
1701 					SPDK_ERRLOG("nvme_rdma_recv processing failure\n");
1702 					return -1;
1703 				}
1704 				break;
1705 
1706 			case IBV_WC_SEND:
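				/*
				 * Send completed. Recycle the request only if its response has
				 * already been received; otherwise mark it so that the recv
				 * path recycles it (see nvme_rdma_recv()).
				 */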
1707 				rdma_req = (struct spdk_nvme_rdma_req *)wc[i].wr_id;
1708 
1709 				if (rdma_req->request_ready_to_put) {
1710 					nvme_rdma_req_put(rqpair, rdma_req);
1711 				} else {
1712 					rdma_req->request_ready_to_put = true;
1713 				}
1714 				break;
1715 
1716 			default:
1717 				SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", wc[i].opcode);
1718 				return -1;
1719 			}
1720 		}
1721 	} while (reaped < max_completions);
1722 
1723 	if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
1724 		nvme_rdma_qpair_check_timeout(qpair);
1725 	}
1726 
1727 	return reaped;
1728 }
1729 
1730 uint32_t
1731 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
1732 {
1733 	/* TODO: this value should be obtained from the NVMe-oF target. */
1734 	return NVME_RDMA_RW_BUFFER_SIZE;
1735 }
1736 
1737 uint16_t
1738 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
1739 {
1740 	return spdk_min(ctrlr->cdata.nvmf_specific.msdbd, NVME_RDMA_MAX_SGL_DESCRIPTORS);
1741 }
1742 
1743 void *
1744 nvme_rdma_ctrlr_alloc_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, size_t size)
1745 {
1746 	return NULL;
1747 }
1748 
1749 int
1750 nvme_rdma_ctrlr_free_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, void *buf, size_t size)
1751 {
1752 	return 0;
1753 }
1754 
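/*
 * Illustrative usage (not part of this file): an application that manages its
 * own RDMA memory registration could install hooks before creating any
 * controllers. The callback names below are hypothetical.
 *
 *     struct spdk_nvme_rdma_hooks hooks = {
 *             .get_ibv_pd = my_get_ibv_pd,
 *             .get_rkey   = my_get_rkey,
 *     };
 *     spdk_nvme_rdma_init_hooks(&hooks);
 */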
1755 void
1756 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
1757 {
1758 	g_nvme_hooks = *hooks;
1759 }
1760