xref: /spdk/lib/nvme/nvme_rdma.c (revision f93b6fb0a4ebcee203e7c44c9e170c20bbce96cc)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
35  * NVMe over RDMA transport
36  */
37 
38 #include "spdk/stdinc.h"
39 
40 #include <infiniband/verbs.h>
41 #include <rdma/rdma_cma.h>
42 #include <rdma/rdma_verbs.h>
43 
44 #include "spdk/assert.h"
45 #include "spdk/log.h"
46 #include "spdk/trace.h"
47 #include "spdk/event.h"
48 #include "spdk/queue.h"
49 #include "spdk/nvme.h"
50 #include "spdk/nvmf_spec.h"
51 #include "spdk/string.h"
52 #include "spdk/endian.h"
53 #include "spdk/likely.h"
54 
55 #include "nvme_internal.h"
56 
57 #define NVME_RDMA_TIME_OUT_IN_MS 2000
58 #define NVME_RDMA_RW_BUFFER_SIZE 131072
59 
60 /*
61  * NVME RDMA qpair Resource Defaults
62  */
63 #define NVME_RDMA_DEFAULT_TX_SGE		2
64 #define NVME_RDMA_DEFAULT_RX_SGE		1
65 
66 
67 /* Max number of NVMe-oF SGL descriptors supported by the host */
68 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
69 struct spdk_nvmf_cmd {
70 	struct spdk_nvme_cmd cmd;
71 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
72 };
73 
74 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
75 
76 /* Mapping from virtual address to ibv_mr pointer for a protection domain */
77 struct spdk_nvme_rdma_mr_map {
78 	struct ibv_pd				*pd;
79 	struct spdk_mem_map			*map;
80 	uint64_t				ref;
81 	LIST_ENTRY(spdk_nvme_rdma_mr_map)	link;
82 };
83 
84 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
85 struct nvme_rdma_ctrlr {
86 	struct spdk_nvme_ctrlr			ctrlr;
87 
88 	struct ibv_pd				*pd;
89 };
90 
91 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
92 struct nvme_rdma_qpair {
93 	struct spdk_nvme_qpair			qpair;
94 
95 	struct rdma_cm_id			*cm_id;
96 
97 	struct ibv_cq				*cq;
98 
99 	struct	spdk_nvme_rdma_req		*rdma_reqs;
100 
101 	uint32_t				max_send_sge;
102 
103 	uint32_t				max_recv_sge;
104 
105 	uint16_t				num_entries;
106 
107 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
108 	struct ibv_sge				*rsp_sgls;
109 	struct spdk_nvme_cpl			*rsps;
110 
111 	struct ibv_recv_wr			*rsp_recv_wrs;
112 
113 	/* Memory region describing all rsps for this qpair */
114 	struct ibv_mr				*rsp_mr;
115 
116 	/*
117 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
118 	 * Indexed by rdma_req->id.
119 	 */
120 	struct spdk_nvmf_cmd			*cmds;
121 
122 	/* Memory region describing all cmds for this qpair */
123 	struct ibv_mr				*cmd_mr;
124 
125 	struct spdk_nvme_rdma_mr_map		*mr_map;
126 
127 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
128 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
129 
130 	/* Placed at the end of the struct since it is not used frequently */
131 	struct rdma_event_channel		*cm_channel;
132 };
133 
134 struct spdk_nvme_rdma_req {
135 	int					id;
136 
137 	struct ibv_send_wr			send_wr;
138 
139 	struct nvme_request			*req;
140 
141 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
142 
143 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
144 
145 	bool					request_ready_to_put;
146 };
147 
148 static const char *rdma_cm_event_str[] = {
149 	"RDMA_CM_EVENT_ADDR_RESOLVED",
150 	"RDMA_CM_EVENT_ADDR_ERROR",
151 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
152 	"RDMA_CM_EVENT_ROUTE_ERROR",
153 	"RDMA_CM_EVENT_CONNECT_REQUEST",
154 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
155 	"RDMA_CM_EVENT_CONNECT_ERROR",
156 	"RDMA_CM_EVENT_UNREACHABLE",
157 	"RDMA_CM_EVENT_REJECTED",
158 	"RDMA_CM_EVENT_ESTABLISHED",
159 	"RDMA_CM_EVENT_DISCONNECTED",
160 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
161 	"RDMA_CM_EVENT_MULTICAST_JOIN",
162 	"RDMA_CM_EVENT_MULTICAST_ERROR",
163 	"RDMA_CM_EVENT_ADDR_CHANGE",
164 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
165 };
166 
167 static LIST_HEAD(, spdk_nvme_rdma_mr_map) g_rdma_mr_maps = LIST_HEAD_INITIALIZER(&g_rdma_mr_maps);
168 static pthread_mutex_t g_rdma_mr_maps_mutex = PTHREAD_MUTEX_INITIALIZER;
169 
170 static int nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair);
171 
172 static inline struct nvme_rdma_qpair *
173 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
174 {
175 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
176 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
177 }
178 
179 static inline struct nvme_rdma_ctrlr *
180 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
181 {
182 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
183 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
184 }
185 
186 static struct spdk_nvme_rdma_req *
187 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
188 {
189 	struct spdk_nvme_rdma_req *rdma_req;
190 
191 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
192 	if (rdma_req) {
193 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
194 		TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
195 	}
196 
197 	return rdma_req;
198 }
199 
200 static void
201 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
202 {
203 	rdma_req->request_ready_to_put = false;
204 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
205 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
206 }
207 
208 static void
209 nvme_rdma_req_complete(struct nvme_request *req,
210 		       struct spdk_nvme_cpl *rsp)
211 {
212 	nvme_complete_request(req->cb_fn, req->cb_arg, req, rsp);
213 	nvme_free_request(req);
214 }
215 
216 static const char *
217 nvme_rdma_cm_event_str_get(uint32_t event)
218 {
219 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
220 		return rdma_cm_event_str[event];
221 	} else {
222 		return "Undefined";
223 	}
224 }
225 
226 static struct rdma_cm_event *
227 nvme_rdma_get_event(struct rdma_event_channel *channel,
228 		    enum rdma_cm_event_type evt)
229 {
230 	struct rdma_cm_event	*event;
231 	int			rc;
232 
233 	rc = rdma_get_cm_event(channel, &event);
234 	if (rc < 0) {
235 		SPDK_ERRLOG("Failed to get event from CM event channel. Error %d (%s)\n",
236 			    errno, spdk_strerror(errno));
237 		return NULL;
238 	}
239 
240 	if (event->event != evt) {
241 		SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
242 			    nvme_rdma_cm_event_str_get(evt),
243 			    nvme_rdma_cm_event_str_get(event->event), event->event, event->status);
244 		rdma_ack_cm_event(event);
245 		return NULL;
246 	}
247 
248 	return event;
249 }
250 
251 static int
252 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
253 {
254 	int			rc;
255 	struct ibv_qp_init_attr	attr;
256 	struct ibv_device_attr	dev_attr;
257 	struct nvme_rdma_ctrlr	*rctrlr;
258 
259 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
260 	if (rc != 0) {
261 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
262 		return -1;
263 	}
264 
265 	rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
266 	if (!rqpair->cq) {
267 		SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
268 		return -1;
269 	}
270 
271 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
272 	if (g_nvme_hooks.get_ibv_pd) {
273 		rctrlr->pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
274 	} else {
275 		rctrlr->pd = NULL;
276 	}
277 
278 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
279 	attr.qp_type		= IBV_QPT_RC;
280 	attr.send_cq		= rqpair->cq;
281 	attr.recv_cq		= rqpair->cq;
282 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
283 	attr.cap.max_recv_wr	= rqpair->num_entries; /* RECV operations */
284 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
285 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
286 
287 	rc = rdma_create_qp(rqpair->cm_id, rctrlr->pd, &attr);
288 
289 	if (rc) {
290 		SPDK_ERRLOG("rdma_create_qp failed\n");
291 		return -1;
292 	}
293 
294 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
295 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
296 	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
297 
298 	rctrlr->pd = rqpair->cm_id->qp->pd;
299 
300 	rqpair->cm_id->context = &rqpair->qpair;
301 
302 	return 0;
303 }
304 
305 #define nvme_rdma_trace_ibv_sge(sg_list) \
306 	if (sg_list) { \
307 		SPDK_DEBUGLOG(SPDK_LOG_NVME, "local addr %p length 0x%x lkey 0x%x\n", \
308 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
309 	}
310 
311 static int
312 nvme_rdma_post_recv(struct nvme_rdma_qpair *rqpair, uint16_t rsp_idx)
313 {
314 	struct ibv_recv_wr *wr, *bad_wr = NULL;
315 	int rc;
316 
317 	wr = &rqpair->rsp_recv_wrs[rsp_idx];
318 	nvme_rdma_trace_ibv_sge(wr->sg_list);
319 
320 	rc = ibv_post_recv(rqpair->cm_id->qp, wr, &bad_wr);
321 	if (rc) {
322 		SPDK_ERRLOG("Failure posting rdma recv, rc = 0x%x\n", rc);
323 	}
324 
325 	return rc;
326 }
327 
328 static void
329 nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
330 {
331 	if (rqpair->rsp_mr && rdma_dereg_mr(rqpair->rsp_mr)) {
332 		SPDK_ERRLOG("Unable to de-register rsp_mr\n");
333 	}
334 	rqpair->rsp_mr = NULL;
335 
336 	free(rqpair->rsps);
337 	rqpair->rsps = NULL;
338 	free(rqpair->rsp_sgls);
339 	rqpair->rsp_sgls = NULL;
340 	free(rqpair->rsp_recv_wrs);
341 	rqpair->rsp_recv_wrs = NULL;
342 }
343 
344 static int
345 nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
346 {
347 	uint16_t i;
348 
349 	rqpair->rsp_mr = NULL;
350 	rqpair->rsps = NULL;
351 	rqpair->rsp_recv_wrs = NULL;
352 
353 	rqpair->rsp_sgls = calloc(rqpair->num_entries, sizeof(*rqpair->rsp_sgls));
354 	if (!rqpair->rsp_sgls) {
355 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
356 		goto fail;
357 	}
358 
359 	rqpair->rsp_recv_wrs = calloc(rqpair->num_entries,
360 				      sizeof(*rqpair->rsp_recv_wrs));
361 	if (!rqpair->rsp_recv_wrs) {
362 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
363 		goto fail;
364 	}
365 
366 	rqpair->rsps = calloc(rqpair->num_entries, sizeof(*rqpair->rsps));
367 	if (!rqpair->rsps) {
368 		SPDK_ERRLOG("can not allocate rdma rsps\n");
369 		goto fail;
370 	}
371 
372 	rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
373 				       rqpair->num_entries * sizeof(*rqpair->rsps));
374 	if (rqpair->rsp_mr == NULL) {
375 		SPDK_ERRLOG("Unable to register rsp_mr\n");
376 		goto fail;
377 	}
378 
379 	for (i = 0; i < rqpair->num_entries; i++) {
380 		struct ibv_sge *rsp_sgl = &rqpair->rsp_sgls[i];
381 
382 		rsp_sgl->addr = (uint64_t)&rqpair->rsps[i];
383 		rsp_sgl->length = sizeof(rqpair->rsps[i]);
384 		rsp_sgl->lkey = rqpair->rsp_mr->lkey;
385 
386 		rqpair->rsp_recv_wrs[i].wr_id = i;
387 		rqpair->rsp_recv_wrs[i].next = NULL;
388 		rqpair->rsp_recv_wrs[i].sg_list = rsp_sgl;
389 		rqpair->rsp_recv_wrs[i].num_sge = 1;
390 
391 		if (nvme_rdma_post_recv(rqpair, i)) {
392 			SPDK_ERRLOG("Unable to post connection rx desc\n");
393 			goto fail;
394 		}
395 	}
396 
397 	return 0;
398 
399 fail:
400 	nvme_rdma_free_rsps(rqpair);
401 	return -ENOMEM;
402 }
403 
404 static void
405 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
406 {
407 	if (!rqpair->rdma_reqs) {
408 		return;
409 	}
410 
411 	if (rqpair->cmd_mr && rdma_dereg_mr(rqpair->cmd_mr)) {
412 		SPDK_ERRLOG("Unable to de-register cmd_mr\n");
413 	}
414 	rqpair->cmd_mr = NULL;
415 
416 	free(rqpair->cmds);
417 	rqpair->cmds = NULL;
418 
419 	free(rqpair->rdma_reqs);
420 	rqpair->rdma_reqs = NULL;
421 }
422 
423 static int
424 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
425 {
426 	int i;
427 
428 	rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
429 	if (rqpair->rdma_reqs == NULL) {
430 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
431 		goto fail;
432 	}
433 
434 	rqpair->cmds = calloc(rqpair->num_entries, sizeof(*rqpair->cmds));
435 	if (!rqpair->cmds) {
436 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
437 		goto fail;
438 	}
439 
440 	rqpair->cmd_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->cmds,
441 				       rqpair->num_entries * sizeof(*rqpair->cmds));
442 	if (!rqpair->cmd_mr) {
443 		SPDK_ERRLOG("Unable to register cmd_mr\n");
444 		goto fail;
445 	}
446 
447 	TAILQ_INIT(&rqpair->free_reqs);
448 	TAILQ_INIT(&rqpair->outstanding_reqs);
449 	for (i = 0; i < rqpair->num_entries; i++) {
450 		struct spdk_nvme_rdma_req	*rdma_req;
451 		struct spdk_nvmf_cmd		*cmd;
452 
453 		rdma_req = &rqpair->rdma_reqs[i];
454 		cmd = &rqpair->cmds[i];
455 
456 		rdma_req->id = i;
457 
458 		/* The first RDMA sgl element will always point
459 		 * at this data structure. Depending on whether
460 		 * an NVMe-oF SGL is required, the length of
461 		 * this element may change. */
462 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
463 		rdma_req->send_sgl[0].lkey = rqpair->cmd_mr->lkey;
464 
465 		rdma_req->send_wr.wr_id = (uint64_t)rdma_req;
466 		rdma_req->send_wr.next = NULL;
467 		rdma_req->send_wr.opcode = IBV_WR_SEND;
468 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
469 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
470 		rdma_req->send_wr.imm_data = 0;
471 
472 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
473 	}
474 
475 	return 0;
476 
477 fail:
478 	nvme_rdma_free_reqs(rqpair);
479 	return -ENOMEM;
480 }
481 
482 static int
483 nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx)
484 {
485 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
486 	struct spdk_nvme_rdma_req *rdma_req;
487 	struct spdk_nvme_cpl *rsp;
488 	struct nvme_request *req;
489 
490 	assert(rsp_idx < rqpair->num_entries);
491 	rsp = &rqpair->rsps[rsp_idx];
492 	rdma_req = &rqpair->rdma_reqs[rsp->cid];
493 
494 	req = rdma_req->req;
495 	nvme_rdma_req_complete(req, rsp);
496 
497 	if (rdma_req->request_ready_to_put) {
498 		nvme_rdma_req_put(rqpair, rdma_req);
499 	} else {
500 		rdma_req->request_ready_to_put = true;
501 	}
502 
503 	if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
504 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
505 		return -1;
506 	}
507 
508 	if (!STAILQ_EMPTY(&qpair->queued_req) && !qpair->ctrlr->is_resetting) {
509 		req = STAILQ_FIRST(&qpair->queued_req);
510 		STAILQ_REMOVE_HEAD(&qpair->queued_req, stailq);
511 		nvme_qpair_submit_request(qpair, req);
512 	}
513 
514 	return 0;
515 }
516 
517 static int
518 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
519 		       struct sockaddr *src_addr,
520 		       struct sockaddr *dst_addr,
521 		       struct rdma_event_channel *cm_channel)
522 {
523 	int ret;
524 	struct rdma_cm_event *event;
525 
526 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
527 				NVME_RDMA_TIME_OUT_IN_MS);
528 	if (ret) {
529 		SPDK_ERRLOG("rdma_resolve_addr, %d\n", errno);
530 		return ret;
531 	}
532 
533 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
534 	if (event == NULL) {
535 		SPDK_ERRLOG("RDMA address resolution error\n");
536 		return -1;
537 	}
538 	rdma_ack_cm_event(event);
539 
540 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
541 	if (ret) {
542 		SPDK_ERRLOG("rdma_resolve_route\n");
543 		return ret;
544 	}
545 
546 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
547 	if (event == NULL) {
548 		SPDK_ERRLOG("RDMA route resolution error\n");
549 		return -1;
550 	}
551 	rdma_ack_cm_event(event);
552 
553 	return 0;
554 }
555 
556 static int
557 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
558 {
559 	struct rdma_conn_param				param = {};
560 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
561 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
562 	struct ibv_device_attr				attr;
563 	int						ret;
564 	struct rdma_cm_event				*event;
565 	struct spdk_nvme_ctrlr				*ctrlr;
566 
567 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
568 	if (ret != 0) {
569 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
570 		return ret;
571 	}
572 
573 	param.responder_resources = spdk_min(rqpair->num_entries, attr.max_qp_rd_atom);
574 
575 	ctrlr = rqpair->qpair.ctrlr;
576 	if (!ctrlr) {
577 		return -1;
578 	}
579 
580 	request_data.qid = rqpair->qpair.id;
581 	request_data.hrqsize = rqpair->num_entries;
582 	request_data.hsqsize = rqpair->num_entries - 1;
583 	request_data.cntlid = ctrlr->cntlid;
584 
585 	param.private_data = &request_data;
586 	param.private_data_len = sizeof(request_data);
587 	param.retry_count = 7;
588 	param.rnr_retry_count = 7;
589 
590 	ret = rdma_connect(rqpair->cm_id, &param);
591 	if (ret) {
592 		SPDK_ERRLOG("nvme rdma connect error\n");
593 		return ret;
594 	}
595 
596 	event = nvme_rdma_get_event(rqpair->cm_channel, RDMA_CM_EVENT_ESTABLISHED);
597 	if (event == NULL) {
598 		SPDK_ERRLOG("RDMA connect error\n");
599 		return -1;
600 	}
601 
602 	accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
603 	if (accept_data == NULL) {
604 		rdma_ack_cm_event(event);
605 		SPDK_ERRLOG("NVMe-oF target did not return accept data\n");
606 		return -1;
607 	}
608 
609 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "Requested queue depth %d. Actually got queue depth %d.\n",
610 		      rqpair->num_entries, accept_data->crqsize);
611 
612 	rqpair->num_entries = spdk_min(rqpair->num_entries, accept_data->crqsize);
613 
614 	rdma_ack_cm_event(event);
615 
616 	return 0;
617 }
618 
619 static int
620 nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr, const char *service)
621 {
622 	struct addrinfo *res;
623 	struct addrinfo hints;
624 	int ret;
625 
626 	memset(&hints, 0, sizeof(hints));
627 	hints.ai_family = family;
628 	hints.ai_socktype = SOCK_STREAM;
629 	hints.ai_protocol = 0;
630 
631 	ret = getaddrinfo(addr, service, &hints, &res);
632 	if (ret) {
633 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(ret), ret);
634 		return ret;
635 	}
636 
637 	if (res->ai_addrlen > sizeof(*sa)) {
638 		SPDK_ERRLOG("getaddrinfo() ai_addrlen %zu too large\n", (size_t)res->ai_addrlen);
639 		ret = EINVAL;
640 	} else {
641 		memcpy(sa, res->ai_addr, res->ai_addrlen);
642 	}
643 
644 	freeaddrinfo(res);
645 	return ret;
646 }
647 
648 static int
649 nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
650 			enum spdk_mem_map_notify_action action,
651 			void *vaddr, size_t size)
652 {
653 	struct ibv_pd *pd = cb_ctx;
654 	struct ibv_mr *mr;
655 	int rc;
656 
657 	switch (action) {
658 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
659 		if (!g_nvme_hooks.get_rkey) {
660 			mr = ibv_reg_mr(pd, vaddr, size,
661 					IBV_ACCESS_LOCAL_WRITE |
662 					IBV_ACCESS_REMOTE_READ |
663 					IBV_ACCESS_REMOTE_WRITE);
664 			if (mr == NULL) {
665 				SPDK_ERRLOG("ibv_reg_mr() failed\n");
666 				return -EFAULT;
667 			} else {
668 				rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
669 			}
670 		} else {
671 			rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size,
672 							  g_nvme_hooks.get_rkey(pd, vaddr, size));
673 		}
674 		break;
675 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
676 		if (!g_nvme_hooks.get_rkey) {
677 			mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
678 			if (mr) {
679 				ibv_dereg_mr(mr);
680 			}
681 		}
682 		rc = spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
683 		break;
684 	default:
685 		SPDK_UNREACHABLE();
686 	}
687 
688 	return rc;
689 }
690 
691 static int
692 nvme_rdma_check_contiguous_entries(uint64_t addr_1, uint64_t addr_2)
693 {
694 	/* Two contiguous mappings will point to the same address which is the start of the RDMA MR. */
695 	return addr_1 == addr_2;
696 }
697 
698 static int
699 nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
700 {
701 	struct ibv_pd *pd = rqpair->cm_id->qp->pd;
702 	struct spdk_nvme_rdma_mr_map *mr_map;
703 	const struct spdk_mem_map_ops nvme_rdma_map_ops = {
704 		.notify_cb = nvme_rdma_mr_map_notify,
705 		.are_contiguous = nvme_rdma_check_contiguous_entries
706 	};
707 
708 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
709 
710 	/* Look up existing mem map registration for this pd */
711 	LIST_FOREACH(mr_map, &g_rdma_mr_maps, link) {
712 		if (mr_map->pd == pd) {
713 			mr_map->ref++;
714 			rqpair->mr_map = mr_map;
715 			pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
716 			return 0;
717 		}
718 	}
719 
720 	mr_map = calloc(1, sizeof(*mr_map));
721 	if (mr_map == NULL) {
722 		SPDK_ERRLOG("calloc() failed\n");
723 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
724 		return -1;
725 	}
726 
727 	mr_map->ref = 1;
728 	mr_map->pd = pd;
729 	mr_map->map = spdk_mem_map_alloc((uint64_t)NULL, &nvme_rdma_map_ops, pd);
730 	if (mr_map->map == NULL) {
731 		SPDK_ERRLOG("spdk_mem_map_alloc() failed\n");
732 		free(mr_map);
733 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
734 		return -1;
735 	}
736 
737 	rqpair->mr_map = mr_map;
738 	LIST_INSERT_HEAD(&g_rdma_mr_maps, mr_map, link);
739 
740 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
741 
742 	return 0;
743 }
744 
745 static void
746 nvme_rdma_unregister_mem(struct nvme_rdma_qpair *rqpair)
747 {
748 	struct spdk_nvme_rdma_mr_map *mr_map;
749 
750 	mr_map = rqpair->mr_map;
751 	rqpair->mr_map = NULL;
752 
753 	if (mr_map == NULL) {
754 		return;
755 	}
756 
757 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
758 
759 	assert(mr_map->ref > 0);
760 	mr_map->ref--;
761 	if (mr_map->ref == 0) {
762 		LIST_REMOVE(mr_map, link);
763 		spdk_mem_map_free(&mr_map->map);
764 		free(mr_map);
765 	}
766 
767 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
768 }
769 
770 static int
771 nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
772 {
773 	struct sockaddr_storage dst_addr;
774 	struct sockaddr_storage src_addr;
775 	bool src_addr_specified;
776 	int rc;
777 	struct spdk_nvme_ctrlr *ctrlr;
778 	int family;
779 
780 	rqpair->cm_channel = rdma_create_event_channel();
781 	if (rqpair->cm_channel == NULL) {
782 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
783 		return -1;
784 	}
785 
786 	ctrlr = rqpair->qpair.ctrlr;
787 
788 	switch (ctrlr->trid.adrfam) {
789 	case SPDK_NVMF_ADRFAM_IPV4:
790 		family = AF_INET;
791 		break;
792 	case SPDK_NVMF_ADRFAM_IPV6:
793 		family = AF_INET6;
794 		break;
795 	default:
796 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
797 		return -1;
798 	}
799 
800 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
801 
802 	memset(&dst_addr, 0, sizeof(dst_addr));
803 
804 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "trsvcid is %s\n", ctrlr->trid.trsvcid);
805 	rc = nvme_rdma_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
806 	if (rc != 0) {
807 		SPDK_ERRLOG("dst_addr nvme_rdma_parse_addr() failed\n");
808 		return -1;
809 	}
810 
811 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
812 		memset(&src_addr, 0, sizeof(src_addr));
813 		rc = nvme_rdma_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid);
814 		if (rc != 0) {
815 			SPDK_ERRLOG("src_addr nvme_rdma_parse_addr() failed\n");
816 			return -1;
817 		}
818 		src_addr_specified = true;
819 	} else {
820 		src_addr_specified = false;
821 	}
822 
823 	rc = rdma_create_id(rqpair->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
824 	if (rc < 0) {
825 		SPDK_ERRLOG("rdma_create_id() failed\n");
826 		return -1;
827 	}
828 
829 	rc = nvme_rdma_resolve_addr(rqpair,
830 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
831 				    (struct sockaddr *)&dst_addr, rqpair->cm_channel);
832 	if (rc < 0) {
833 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
834 		return -1;
835 	}
836 
837 	rc = nvme_rdma_qpair_init(rqpair);
838 	if (rc < 0) {
839 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
840 		return -1;
841 	}
842 
843 	rc = nvme_rdma_connect(rqpair);
844 	if (rc != 0) {
845 		SPDK_ERRLOG("Unable to connect the rqpair\n");
846 		return -1;
847 	}
848 
849 	rc = nvme_rdma_alloc_reqs(rqpair);
850 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
851 	if (rc) {
852 		SPDK_ERRLOG("Unable to allocate rqpair  RDMA requests\n");
853 		return -1;
854 	}
855 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests allocated\n");
856 
857 	rc = nvme_rdma_alloc_rsps(rqpair);
858 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
859 	if (rc < 0) {
860 		SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
861 		return -1;
862 	}
863 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses allocated\n");
864 
865 	rc = nvme_rdma_register_mem(rqpair);
866 	if (rc < 0) {
867 		SPDK_ERRLOG("Unable to register memory for RDMA\n");
868 		return -1;
869 	}
870 
871 	rc = nvme_fabric_qpair_connect(&rqpair->qpair, rqpair->num_entries);
872 	if (rc < 0) {
873 		SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
874 		return -1;
875 	}
876 
877 	return 0;
878 }
879 
880 /*
881  * Build SGL describing empty payload.
882  */
883 static int
884 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
885 {
886 	struct nvme_request *req = rdma_req->req;
887 
888 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
889 
890 	/* The first element of this SGL is pointing at an
891 	 * spdk_nvmf_cmd object. For this particular command,
892 	 * we only need the first 64 bytes corresponding to
893 	 * the NVMe command. */
894 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
895 
896 	/* The RDMA SGL needs one element describing the NVMe command. */
897 	rdma_req->send_wr.num_sge = 1;
898 
899 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
900 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
901 	req->cmd.dptr.sgl1.keyed.length = 0;
902 	req->cmd.dptr.sgl1.keyed.key = 0;
903 	req->cmd.dptr.sgl1.address = 0;
904 
905 	return 0;
906 }
907 
908 /*
909  * Build inline SGL describing contiguous payload buffer.
910  */
911 static int
912 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
913 				      struct spdk_nvme_rdma_req *rdma_req)
914 {
915 	struct nvme_request *req = rdma_req->req;
916 	struct ibv_mr *mr;
917 	void *payload;
918 	uint64_t requested_size;
919 
920 	payload = req->payload.contig_or_cb_arg + req->payload_offset;
921 	assert(req->payload_size != 0);
922 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
923 
924 	requested_size = req->payload_size;
925 
926 	if (!g_nvme_hooks.get_rkey) {
927 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
928 				(uint64_t)payload, &requested_size);
929 
930 		if (mr == NULL || requested_size < req->payload_size) {
931 			if (mr) {
932 				SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
933 			}
934 			return -EINVAL;
935 		}
936 		rdma_req->send_sgl[1].lkey = mr->lkey;
937 	} else {
938 		rdma_req->send_sgl[1].lkey = spdk_mem_map_translate(rqpair->mr_map->map,
939 					     (uint64_t)payload,
940 					     &requested_size);
941 
942 	}
943 
944 	/* The first element of this SGL is pointing at an
945 	 * spdk_nvmf_cmd object. For this particular command,
946 	 * we only need the first 64 bytes corresponding to
947 	 * the NVMe command. */
948 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
949 
950 	rdma_req->send_sgl[1].addr = (uint64_t)payload;
951 	rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
952 
953 	/* The RDMA SGL contains two elements. The first describes
954 	 * the NVMe command and the second describes the data
955 	 * payload. */
956 	rdma_req->send_wr.num_sge = 2;
957 
958 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
959 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
960 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
961 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
962 	/* Inline only supported for icdoff == 0 currently.  This function will
963 	 * not get called for controllers with other values. */
964 	req->cmd.dptr.sgl1.address = (uint64_t)0;
965 
966 	return 0;
967 }
968 
969 /*
970  * Build SGL describing contiguous payload buffer.
971  */
972 static int
973 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
974 			       struct spdk_nvme_rdma_req *rdma_req)
975 {
976 	struct nvme_request *req = rdma_req->req;
977 	void *payload = req->payload.contig_or_cb_arg + req->payload_offset;
978 	struct ibv_mr *mr;
979 	uint64_t requested_size;
980 
981 	assert(req->payload_size != 0);
982 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
983 
984 	requested_size = req->payload_size;
985 	if (!g_nvme_hooks.get_rkey) {
986 
987 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)payload,
988 				&requested_size);
989 		if (mr == NULL) {
990 			return -1;
991 		}
992 		req->cmd.dptr.sgl1.keyed.key = mr->rkey;
993 	} else {
994 		req->cmd.dptr.sgl1.keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
995 					       (uint64_t)payload,
996 					       &requested_size);
997 	}
998 
999 	if (requested_size < req->payload_size) {
1000 		SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1001 		return -1;
1002 	}
1003 
1004 	/* The first element of this SGL is pointing at an
1005 	 * spdk_nvmf_cmd object. For this particular command,
1006 	 * we only need the first 64 bytes corresponding to
1007 	 * the NVMe command. */
1008 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1009 
1010 	/* The RDMA SGL needs one element describing the NVMe command. */
1011 	rdma_req->send_wr.num_sge = 1;
1012 
1013 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1014 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1015 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1016 	req->cmd.dptr.sgl1.keyed.length = req->payload_size;
1017 	req->cmd.dptr.sgl1.address = (uint64_t)payload;
1018 
1019 	return 0;
1020 }
1021 
1022 /*
1023  * Build SGL describing scattered payload buffer.
1024  */
1025 static int
1026 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1027 			    struct spdk_nvme_rdma_req *rdma_req)
1028 {
1029 	struct nvme_request *req = rdma_req->req;
1030 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1031 	struct ibv_mr *mr = NULL;
1032 	void *virt_addr;
1033 	uint64_t remaining_size, mr_length;
1034 	uint32_t sge_length;
1035 	int rc, max_num_sgl, num_sgl_desc;
1036 
1037 	assert(req->payload_size != 0);
1038 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1039 	assert(req->payload.reset_sgl_fn != NULL);
1040 	assert(req->payload.next_sge_fn != NULL);
1041 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1042 
1043 	max_num_sgl = req->qpair->ctrlr->max_sges;
1044 
1045 	remaining_size = req->payload_size;
1046 	num_sgl_desc = 0;
1047 	do {
1048 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &sge_length);
1049 		if (rc) {
1050 			return -1;
1051 		}
1052 
1053 		sge_length = spdk_min(remaining_size, sge_length);
1054 		mr_length = sge_length;
1055 
1056 		if (!g_nvme_hooks.get_rkey) {
1057 			mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
1058 					(uint64_t)virt_addr,
1059 					&mr_length);
1060 			if (mr == NULL) {
1061 				return -1;
1062 			}
1063 			cmd->sgl[num_sgl_desc].keyed.key = mr->rkey;
1064 		} else {
1065 			cmd->sgl[num_sgl_desc].keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
1066 							   (uint64_t)virt_addr,
1067 							   &mr_length);
1068 		}
1069 
1070 		if (mr_length < sge_length) {
1071 			SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1072 			return -1;
1073 		}
1074 
1075 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1076 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1077 		cmd->sgl[num_sgl_desc].keyed.length = sge_length;
1078 		cmd->sgl[num_sgl_desc].address = (uint64_t)virt_addr;
1079 
1080 		remaining_size -= sge_length;
1081 		num_sgl_desc++;
1082 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1083 
1084 
1085 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1086 	if (remaining_size > 0) {
1087 		return -1;
1088 	}
1089 
1090 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1091 
1092 	/* The RDMA SGL needs one element describing some portion
1093 	 * of the spdk_nvmf_cmd structure. */
1094 	rdma_req->send_wr.num_sge = 1;
1095 
1096 	/*
1097 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1098 	 * as a data block descriptor.
1099 	 */
1100 	if (num_sgl_desc == 1) {
1101 		/* The first element of this SGL is pointing at an
1102 		 * spdk_nvmf_cmd object. For this particular command,
1103 		 * we only need the first 64 bytes corresponding to
1104 		 * the NVMe command. */
1105 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1106 
1107 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1108 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1109 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1110 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1111 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1112 	} else {
1113 		/*
1114 		 * Otherwise, The SGL descriptor embedded in the command must point to the list of
1115 		 * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
1116 		 */
1117 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + sizeof(struct
1118 					       spdk_nvme_sgl_descriptor) * num_sgl_desc;
1119 
1120 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1121 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1122 		req->cmd.dptr.sgl1.unkeyed.length = num_sgl_desc * sizeof(struct spdk_nvme_sgl_descriptor);
1123 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1124 	}
1125 
1126 	return 0;
1127 }
1128 
1129 /*
1130  * Build inline SGL describing sgl payload buffer.
1131  */
1132 static int
1133 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1134 				   struct spdk_nvme_rdma_req *rdma_req)
1135 {
1136 	struct nvme_request *req = rdma_req->req;
1137 	struct ibv_mr *mr;
1138 	uint32_t length;
1139 	uint64_t requested_size;
1140 	uint32_t remaining_payload;
1141 	void *virt_addr;
1142 	int rc, i;
1143 
1144 	assert(req->payload_size != 0);
1145 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1146 	assert(req->payload.reset_sgl_fn != NULL);
1147 	assert(req->payload.next_sge_fn != NULL);
1148 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1149 
1150 	remaining_payload = req->payload_size;
1151 	rdma_req->send_wr.num_sge = 1;
1152 
1153 	do {
1154 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &length);
1155 		if (rc) {
1156 			return -1;
1157 		}
1158 
1159 		if (length > remaining_payload) {
1160 			length = remaining_payload;
1161 		}
1162 
1163 		requested_size = length;
1164 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
1165 				&requested_size);
1166 		if (mr == NULL || requested_size < length) {
1167 			for (i = 1; i < rdma_req->send_wr.num_sge; i++) {
1168 				rdma_req->send_sgl[i].addr = 0;
1169 				rdma_req->send_sgl[i].length = 0;
1170 				rdma_req->send_sgl[i].lkey = 0;
1171 			}
1172 
1173 			if (mr) {
1174 				SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1175 			}
1176 			return -1;
1177 		}
1178 
1179 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].addr = (uint64_t)virt_addr;
1180 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].length = length;
1181 		rdma_req->send_sgl[rdma_req->send_wr.num_sge].lkey = mr->lkey;
1182 		rdma_req->send_wr.num_sge++;
1183 
1184 		remaining_payload -= length;
1185 	} while (remaining_payload && rdma_req->send_wr.num_sge < (int64_t)rqpair->max_send_sge);
1186 
1187 	if (remaining_payload) {
1188 		SPDK_ERRLOG("Unable to prepare request. Too many SGL elements\n");
1189 		return -1;
1190 	}
1191 
1192 	/* The first element of this SGL is pointing at an
1193 	 * spdk_nvmf_cmd object. For this particular command,
1194 	 * we only need the first 64 bytes corresponding to
1195 	 * the NVMe command. */
1196 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1197 
1198 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1199 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1200 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1201 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
1202 	/* Inline only supported for icdoff == 0 currently.  This function will
1203 	 * not get called for controllers with other values. */
1204 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1205 
1206 	return 0;
1207 }
1208 
1209 static inline unsigned int
1210 nvme_rdma_icdsz_bytes(struct spdk_nvme_ctrlr *ctrlr)
1211 {
1212 	return (ctrlr->cdata.nvmf_specific.ioccsz * 16 - sizeof(struct spdk_nvme_cmd));
1213 }
1214 
1215 static int
1216 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1217 		   struct spdk_nvme_rdma_req *rdma_req)
1218 {
1219 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1220 	int rc;
1221 
1222 	rdma_req->req = req;
1223 	req->cmd.cid = rdma_req->id;
1224 
1225 	if (req->payload_size == 0) {
1226 		rc = nvme_rdma_build_null_request(rdma_req);
1227 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG) {
1228 		/*
1229 		 * Check if icdoff is non zero, to avoid interop conflicts with
1230 		 * targets with non-zero icdoff.  Both SPDK and the Linux kernel
1231 		 * targets use icdoff = 0.  For targets with non-zero icdoff, we
1232 		 * will currently just not use inline data for now.
1233 		 */
1234 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1235 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1236 		    (ctrlr->cdata.nvmf_specific.icdoff == 0)) {
1237 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1238 		} else {
1239 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1240 		}
1241 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL) {
1242 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1243 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1244 		    ctrlr->cdata.nvmf_specific.icdoff == 0) {
1245 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1246 		} else {
1247 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1248 		}
1249 	} else {
1250 		rc = -1;
1251 	}
1252 
1253 	if (rc) {
1254 		return rc;
1255 	}
1256 
1257 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1258 	return 0;
1259 }
1260 
1261 static struct spdk_nvme_qpair *
1262 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1263 			     uint16_t qid, uint32_t qsize,
1264 			     enum spdk_nvme_qprio qprio,
1265 			     uint32_t num_requests)
1266 {
1267 	struct nvme_rdma_qpair *rqpair;
1268 	struct spdk_nvme_qpair *qpair;
1269 	int rc;
1270 
1271 	rqpair = calloc(1, sizeof(struct nvme_rdma_qpair));
1272 	if (!rqpair) {
1273 		SPDK_ERRLOG("failed to get create rqpair\n");
1274 		return NULL;
1275 	}
1276 
1277 	rqpair->num_entries = qsize;
1278 
1279 	qpair = &rqpair->qpair;
1280 
1281 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests);
1282 	if (rc != 0) {
1283 		return NULL;
1284 	}
1285 
1286 	rc = nvme_rdma_qpair_connect(rqpair);
1287 	if (rc < 0) {
1288 		nvme_rdma_qpair_destroy(qpair);
1289 		return NULL;
1290 	}
1291 
1292 	return qpair;
1293 }
1294 
1295 static int
1296 nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
1297 {
1298 	struct nvme_rdma_qpair *rqpair;
1299 
1300 	if (!qpair) {
1301 		return -1;
1302 	}
1303 	nvme_rdma_qpair_fail(qpair);
1304 	nvme_qpair_deinit(qpair);
1305 
1306 	rqpair = nvme_rdma_qpair(qpair);
1307 
1308 	nvme_rdma_unregister_mem(rqpair);
1309 	nvme_rdma_free_reqs(rqpair);
1310 	nvme_rdma_free_rsps(rqpair);
1311 
1312 	if (rqpair->cm_id) {
1313 		if (rqpair->cm_id->qp) {
1314 			rdma_destroy_qp(rqpair->cm_id);
1315 		}
1316 		rdma_destroy_id(rqpair->cm_id);
1317 	}
1318 
1319 	if (rqpair->cq) {
1320 		ibv_destroy_cq(rqpair->cq);
1321 	}
1322 
1323 	if (rqpair->cm_channel) {
1324 		rdma_destroy_event_channel(rqpair->cm_channel);
1325 	}
1326 
1327 	free(rqpair);
1328 
1329 	return 0;
1330 }
1331 
1332 struct spdk_nvme_qpair *
1333 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
1334 				const struct spdk_nvme_io_qpair_opts *opts)
1335 {
1336 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
1337 					    opts->io_queue_requests);
1338 }
1339 
1340 int
1341 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
1342 {
1343 	/* do nothing here */
1344 	return 0;
1345 }
1346 
1347 /* This function must only be called while holding g_spdk_nvme_driver->lock */
1348 int
1349 nvme_rdma_ctrlr_scan(struct spdk_nvme_probe_ctx *probe_ctx,
1350 		     bool direct_connect)
1351 {
1352 	struct spdk_nvme_ctrlr_opts discovery_opts;
1353 	struct spdk_nvme_ctrlr *discovery_ctrlr;
1354 	union spdk_nvme_cc_register cc;
1355 	int rc;
1356 	struct nvme_completion_poll_status status;
1357 
1358 	if (strcmp(probe_ctx->trid.subnqn, SPDK_NVMF_DISCOVERY_NQN) != 0) {
1359 		/* It is not a discovery_ctrlr info and try to directly connect it */
1360 		rc = nvme_ctrlr_probe(&probe_ctx->trid, probe_ctx, NULL);
1361 		return rc;
1362 	}
1363 
1364 	spdk_nvme_ctrlr_get_default_ctrlr_opts(&discovery_opts, sizeof(discovery_opts));
1365 	/* For discovery_ctrlr set the timeout to 0 */
1366 	discovery_opts.keep_alive_timeout_ms = 0;
1367 
1368 	discovery_ctrlr = nvme_rdma_ctrlr_construct(&probe_ctx->trid, &discovery_opts, NULL);
1369 	if (discovery_ctrlr == NULL) {
1370 		return -1;
1371 	}
1372 
1373 	/* TODO: this should be using the normal NVMe controller initialization process */
1374 	cc.raw = 0;
1375 	cc.bits.en = 1;
1376 	cc.bits.iosqes = 6; /* SQ entry size == 64 == 2^6 */
1377 	cc.bits.iocqes = 4; /* CQ entry size == 16 == 2^4 */
1378 	rc = nvme_transport_ctrlr_set_reg_4(discovery_ctrlr, offsetof(struct spdk_nvme_registers, cc.raw),
1379 					    cc.raw);
1380 	if (rc < 0) {
1381 		SPDK_ERRLOG("Failed to set cc\n");
1382 		nvme_ctrlr_destruct(discovery_ctrlr);
1383 		return -1;
1384 	}
1385 
1386 	/* Direct attach through spdk_nvme_connect() API */
1387 	if (direct_connect == true) {
1388 		/* get the cdata info */
1389 		rc = nvme_ctrlr_cmd_identify(discovery_ctrlr, SPDK_NVME_IDENTIFY_CTRLR, 0, 0,
1390 					     &discovery_ctrlr->cdata, sizeof(discovery_ctrlr->cdata),
1391 					     nvme_completion_poll_cb, &status);
1392 		if (rc != 0) {
1393 			SPDK_ERRLOG("Failed to identify cdata\n");
1394 			return rc;
1395 		}
1396 
1397 		if (spdk_nvme_wait_for_completion(discovery_ctrlr->adminq, &status)) {
1398 			SPDK_ERRLOG("nvme_identify_controller failed!\n");
1399 			return -ENXIO;
1400 		}
1401 
1402 		/* Set the ready state to skip the normal init process */
1403 		discovery_ctrlr->state = NVME_CTRLR_STATE_READY;
1404 		nvme_ctrlr_connected(probe_ctx, discovery_ctrlr);
1405 		nvme_ctrlr_add_process(discovery_ctrlr, 0);
1406 		return 0;
1407 	}
1408 
1409 	rc = nvme_fabric_ctrlr_discover(discovery_ctrlr, probe_ctx);
1410 	nvme_ctrlr_destruct(discovery_ctrlr);
1411 	return rc;
1412 }
1413 
1414 struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
1415 		const struct spdk_nvme_ctrlr_opts *opts,
1416 		void *devhandle)
1417 {
1418 	struct nvme_rdma_ctrlr *rctrlr;
1419 	union spdk_nvme_cap_register cap;
1420 	union spdk_nvme_vs_register vs;
1421 	int rc;
1422 
1423 	rctrlr = calloc(1, sizeof(struct nvme_rdma_ctrlr));
1424 	if (rctrlr == NULL) {
1425 		SPDK_ERRLOG("could not allocate ctrlr\n");
1426 		return NULL;
1427 	}
1428 
1429 	rctrlr->ctrlr.trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1430 	rctrlr->ctrlr.opts = *opts;
1431 	memcpy(&rctrlr->ctrlr.trid, trid, sizeof(rctrlr->ctrlr.trid));
1432 
1433 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
1434 	if (rc != 0) {
1435 		free(rctrlr);
1436 		return NULL;
1437 	}
1438 
1439 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
1440 			       SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES, 0, SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES);
1441 	if (!rctrlr->ctrlr.adminq) {
1442 		SPDK_ERRLOG("failed to create admin qpair\n");
1443 		nvme_rdma_ctrlr_destruct(&rctrlr->ctrlr);
1444 		return NULL;
1445 	}
1446 
1447 	if (nvme_ctrlr_get_cap(&rctrlr->ctrlr, &cap)) {
1448 		SPDK_ERRLOG("get_cap() failed\n");
1449 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1450 		return NULL;
1451 	}
1452 
1453 	if (nvme_ctrlr_get_vs(&rctrlr->ctrlr, &vs)) {
1454 		SPDK_ERRLOG("get_vs() failed\n");
1455 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1456 		return NULL;
1457 	}
1458 
1459 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
1460 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
1461 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1462 		return NULL;
1463 	}
1464 
1465 	nvme_ctrlr_init_cap(&rctrlr->ctrlr, &cap, &vs);
1466 
1467 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "successfully initialized the nvmf ctrlr\n");
1468 	return &rctrlr->ctrlr;
1469 }
1470 
1471 int
1472 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
1473 {
1474 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
1475 
1476 	if (ctrlr->adminq) {
1477 		nvme_rdma_qpair_destroy(ctrlr->adminq);
1478 	}
1479 
1480 	nvme_ctrlr_destruct_finish(ctrlr);
1481 
1482 	free(rctrlr);
1483 
1484 	return 0;
1485 }
1486 
1487 int
1488 nvme_rdma_ctrlr_set_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t value)
1489 {
1490 	return nvme_fabric_ctrlr_set_reg_4(ctrlr, offset, value);
1491 }
1492 
1493 int
1494 nvme_rdma_ctrlr_set_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t value)
1495 {
1496 	return nvme_fabric_ctrlr_set_reg_8(ctrlr, offset, value);
1497 }
1498 
1499 int
1500 nvme_rdma_ctrlr_get_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t *value)
1501 {
1502 	return nvme_fabric_ctrlr_get_reg_4(ctrlr, offset, value);
1503 }
1504 
1505 int
1506 nvme_rdma_ctrlr_get_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t *value)
1507 {
1508 	return nvme_fabric_ctrlr_get_reg_8(ctrlr, offset, value);
1509 }
1510 
1511 int
1512 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
1513 			       struct nvme_request *req)
1514 {
1515 	struct nvme_rdma_qpair *rqpair;
1516 	struct spdk_nvme_rdma_req *rdma_req;
1517 	struct ibv_send_wr *wr, *bad_wr = NULL;
1518 	int rc;
1519 
1520 	rqpair = nvme_rdma_qpair(qpair);
1521 	assert(rqpair != NULL);
1522 	assert(req != NULL);
1523 
1524 	rdma_req = nvme_rdma_req_get(rqpair);
1525 	if (!rdma_req) {
1526 		/*
1527 		 * No rdma_req is available.  Queue the request to be processed later.
1528 		 */
1529 		STAILQ_INSERT_TAIL(&qpair->queued_req, req, stailq);
1530 		return 0;
1531 	}
1532 
1533 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
1534 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
1535 		nvme_rdma_req_put(rqpair, rdma_req);
1536 		return -1;
1537 	}
1538 
1539 	wr = &rdma_req->send_wr;
1540 
1541 	nvme_rdma_trace_ibv_sge(wr->sg_list);
1542 
1543 	rc = ibv_post_send(rqpair->cm_id->qp, wr, &bad_wr);
1544 	if (rc) {
1545 		SPDK_ERRLOG("Failure posting rdma send for NVMf completion: %d (%s)\n", rc, spdk_strerror(rc));
1546 	}
1547 
1548 	return rc;
1549 }
1550 
1551 int
1552 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1553 {
1554 	return nvme_rdma_qpair_destroy(qpair);
1555 }
1556 
1557 int
1558 nvme_rdma_ctrlr_reinit_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1559 {
1560 	return nvme_rdma_qpair_connect(nvme_rdma_qpair(qpair));
1561 }
1562 
1563 int
1564 nvme_rdma_qpair_enable(struct spdk_nvme_qpair *qpair)
1565 {
1566 	/* Currently, doing nothing here */
1567 	return 0;
1568 }
1569 
1570 int
1571 nvme_rdma_qpair_disable(struct spdk_nvme_qpair *qpair)
1572 {
1573 	/* Currently, doing nothing here */
1574 	return 0;
1575 }
1576 
1577 int
1578 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
1579 {
1580 	/* Currently, doing nothing here */
1581 	return 0;
1582 }
1583 
1584 int
1585 nvme_rdma_qpair_fail(struct spdk_nvme_qpair *qpair)
1586 {
1587 	/*
1588 	 * If the qpair is really failed, the connection is broken
1589 	 * and we need to flush back all I/O
1590 	 */
1591 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1592 	struct nvme_request *req;
1593 	struct spdk_nvme_cpl cpl;
1594 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1595 
1596 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
1597 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
1598 
1599 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1600 		assert(rdma_req->req != NULL);
1601 		req = rdma_req->req;
1602 
1603 		nvme_rdma_req_complete(req, &cpl);
1604 		nvme_rdma_req_put(rqpair, rdma_req);
1605 	}
1606 
1607 	return 0;
1608 }
1609 
1610 static void
1611 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
1612 {
1613 	uint64_t t02;
1614 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1615 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1616 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
1617 	struct spdk_nvme_ctrlr_process *active_proc;
1618 
1619 	/* Don't check timeouts during controller initialization. */
1620 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
1621 		return;
1622 	}
1623 
1624 	if (nvme_qpair_is_admin_queue(qpair)) {
1625 		active_proc = spdk_nvme_ctrlr_get_current_process(ctrlr);
1626 	} else {
1627 		active_proc = qpair->active_proc;
1628 	}
1629 
1630 	/* Only check timeouts if the current process has a timeout callback. */
1631 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
1632 		return;
1633 	}
1634 
1635 	t02 = spdk_get_ticks();
1636 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1637 		assert(rdma_req->req != NULL);
1638 
1639 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
1640 			/*
1641 			 * The requests are in order, so as soon as one has not timed out,
1642 			 * stop iterating.
1643 			 */
1644 			break;
1645 		}
1646 	}
1647 }
1648 
1649 #define MAX_COMPLETIONS_PER_POLL 128
1650 
1651 int
1652 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
1653 				    uint32_t max_completions)
1654 {
1655 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
1656 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
1657 	int				i, rc, batch_size;
1658 	uint32_t			reaped;
1659 	struct ibv_cq			*cq;
1660 	struct spdk_nvme_rdma_req	*rdma_req;
1661 
1662 	if (max_completions == 0) {
1663 		max_completions = rqpair->num_entries;
1664 	} else {
1665 		max_completions = spdk_min(max_completions, rqpair->num_entries);
1666 	}
1667 
1668 	cq = rqpair->cq;
1669 
1670 	reaped = 0;
1671 	do {
1672 		batch_size = spdk_min((max_completions - reaped),
1673 				      MAX_COMPLETIONS_PER_POLL);
1674 		rc = ibv_poll_cq(cq, batch_size, wc);
1675 		if (rc < 0) {
1676 			SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1677 				    errno, spdk_strerror(errno));
1678 			return -1;
1679 		} else if (rc == 0) {
1680 			/* Ran out of completions */
1681 			break;
1682 		}
1683 
1684 		for (i = 0; i < rc; i++) {
1685 			if (wc[i].status) {
1686 				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
1687 					    qpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1688 				return -1;
1689 			}
1690 
1691 			switch (wc[i].opcode) {
1692 			case IBV_WC_RECV:
1693 				SPDK_DEBUGLOG(SPDK_LOG_NVME, "CQ recv completion\n");
1694 
1695 				reaped++;
1696 
1697 				if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
1698 					SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
1699 					return -1;
1700 				}
1701 
1702 				if (nvme_rdma_recv(rqpair, wc[i].wr_id)) {
1703 					SPDK_ERRLOG("nvme_rdma_recv processing failure\n");
1704 					return -1;
1705 				}
1706 				break;
1707 
1708 			case IBV_WC_SEND:
1709 				rdma_req = (struct spdk_nvme_rdma_req *)wc[i].wr_id;
1710 
1711 				if (rdma_req->request_ready_to_put) {
1712 					nvme_rdma_req_put(rqpair, rdma_req);
1713 				} else {
1714 					rdma_req->request_ready_to_put = true;
1715 				}
1716 				break;
1717 
1718 			default:
1719 				SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", wc[i].opcode);
1720 				return -1;
1721 			}
1722 		}
1723 	} while (reaped < max_completions);
1724 
1725 	if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
1726 		nvme_rdma_qpair_check_timeout(qpair);
1727 	}
1728 
1729 	return reaped;
1730 }
1731 
1732 uint32_t
1733 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
1734 {
1735 	/* Todo, which should get from the NVMF target */
1736 	return NVME_RDMA_RW_BUFFER_SIZE;
1737 }
1738 
1739 uint16_t
1740 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
1741 {
1742 	return spdk_min(ctrlr->cdata.nvmf_specific.msdbd, NVME_RDMA_MAX_SGL_DESCRIPTORS);
1743 }
1744 
1745 void *
1746 nvme_rdma_ctrlr_alloc_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, size_t size)
1747 {
1748 	return NULL;
1749 }
1750 
1751 int
1752 nvme_rdma_ctrlr_free_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, void *buf, size_t size)
1753 {
1754 	return 0;
1755 }
1756 
1757 void
1758 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
1759 {
1760 	g_nvme_hooks = *hooks;
1761 }
1762