xref: /spdk/lib/nvme/nvme_rdma.c (revision 407e88fd2ab020d753e33014cf759353a9901b51)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
35  * NVMe over RDMA transport
36  */
37 
38 #include "spdk/stdinc.h"
39 
40 #include <infiniband/verbs.h>
41 #include <rdma/rdma_cma.h>
42 #include <rdma/rdma_verbs.h>
43 
44 #include "spdk/assert.h"
45 #include "spdk/log.h"
46 #include "spdk/trace.h"
47 #include "spdk/event.h"
48 #include "spdk/queue.h"
49 #include "spdk/nvme.h"
50 #include "spdk/nvmf_spec.h"
51 #include "spdk/string.h"
52 #include "spdk/endian.h"
53 #include "spdk/likely.h"
54 
55 #include "nvme_internal.h"
56 
57 #define NVME_RDMA_TIME_OUT_IN_MS 2000
58 #define NVME_RDMA_RW_BUFFER_SIZE 131072
59 
60 /*
61  * NVME RDMA qpair Resource Defaults
62  */
63 #define NVME_RDMA_DEFAULT_TX_SGE		2
64 #define NVME_RDMA_DEFAULT_RX_SGE		1
65 
66 
67 /* Max number of NVMe-oF SGL descriptors supported by the host */
68 #define NVME_RDMA_MAX_SGL_DESCRIPTORS		16
69 struct spdk_nvmf_cmd {
70 	struct spdk_nvme_cmd cmd;
71 	struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
72 };
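/*
 * Editorial note: struct spdk_nvme_cmd is 64 bytes and each spdk_nvme_sgl_descriptor
 * is 16 bytes, so a fully populated spdk_nvmf_cmd is expected to span
 * 64 + 16 * 16 = 320 bytes. A compile-time guard for that assumption could look
 * like the following sketch (not part of the original source):
 *
 *     SPDK_STATIC_ASSERT(sizeof(struct spdk_nvmf_cmd) ==
 *                        sizeof(struct spdk_nvme_cmd) +
 *                        NVME_RDMA_MAX_SGL_DESCRIPTORS *
 *                        sizeof(struct spdk_nvme_sgl_descriptor),
 *                        "Unexpected spdk_nvmf_cmd layout");
 */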
73 
74 struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
75 
76 /* Mapping from virtual address to ibv_mr pointer for a protection domain */
77 struct spdk_nvme_rdma_mr_map {
78 	struct ibv_pd				*pd;
79 	struct spdk_mem_map			*map;
80 	uint64_t				ref;
81 	LIST_ENTRY(spdk_nvme_rdma_mr_map)	link;
82 };
83 
84 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
85 struct nvme_rdma_ctrlr {
86 	struct spdk_nvme_ctrlr			ctrlr;
87 
88 	struct ibv_pd				*pd;
89 
90 	uint16_t				max_sge;
91 };
92 
93 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
94 struct nvme_rdma_qpair {
95 	struct spdk_nvme_qpair			qpair;
96 
97 	struct rdma_cm_id			*cm_id;
98 
99 	struct ibv_cq				*cq;
100 
101 	struct	spdk_nvme_rdma_req		*rdma_reqs;
102 
103 	uint32_t				max_send_sge;
104 
105 	uint32_t				max_recv_sge;
106 
107 	uint16_t				num_entries;
108 
109 	/* Parallel arrays of response buffers + response SGLs of size num_entries */
110 	struct ibv_sge				*rsp_sgls;
111 	struct spdk_nvme_cpl			*rsps;
112 
113 	struct ibv_recv_wr			*rsp_recv_wrs;
114 
115 	/* Memory region describing all rsps for this qpair */
116 	struct ibv_mr				*rsp_mr;
117 
118 	/*
119 	 * Array of num_entries NVMe commands registered as RDMA message buffers.
120 	 * Indexed by rdma_req->id.
121 	 */
122 	struct spdk_nvmf_cmd			*cmds;
123 
124 	/* Memory region describing all cmds for this qpair */
125 	struct ibv_mr				*cmd_mr;
126 
127 	struct spdk_nvme_rdma_mr_map		*mr_map;
128 
129 	TAILQ_HEAD(, spdk_nvme_rdma_req)	free_reqs;
130 	TAILQ_HEAD(, spdk_nvme_rdma_req)	outstanding_reqs;
131 
132 	/* Placed at the end of the struct since it is not used frequently */
133 	struct rdma_event_channel		*cm_channel;
134 };
135 
136 struct spdk_nvme_rdma_req {
137 	int					id;
138 
139 	struct ibv_send_wr			send_wr;
140 
141 	struct nvme_request			*req;
142 
143 	struct ibv_sge				send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
144 
145 	TAILQ_ENTRY(spdk_nvme_rdma_req)		link;
146 
147 	bool					request_ready_to_put;
148 };
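/*
 * Editorial note on request_ready_to_put: every command produces two completions
 * on the shared CQ -- a SEND completion for the command capsule and a RECV
 * completion carrying the NVMe response. Whichever completion is processed first
 * only sets this flag; the second one calls nvme_rdma_req_put() and returns the
 * request to free_reqs, so the registered command buffer is never recycled while
 * its SEND is still outstanding.
 */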
149 
150 static const char *rdma_cm_event_str[] = {
151 	"RDMA_CM_EVENT_ADDR_RESOLVED",
152 	"RDMA_CM_EVENT_ADDR_ERROR",
153 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
154 	"RDMA_CM_EVENT_ROUTE_ERROR",
155 	"RDMA_CM_EVENT_CONNECT_REQUEST",
156 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
157 	"RDMA_CM_EVENT_CONNECT_ERROR",
158 	"RDMA_CM_EVENT_UNREACHABLE",
159 	"RDMA_CM_EVENT_REJECTED",
160 	"RDMA_CM_EVENT_ESTABLISHED",
161 	"RDMA_CM_EVENT_DISCONNECTED",
162 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
163 	"RDMA_CM_EVENT_MULTICAST_JOIN",
164 	"RDMA_CM_EVENT_MULTICAST_ERROR",
165 	"RDMA_CM_EVENT_ADDR_CHANGE",
166 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
167 };
168 
169 static LIST_HEAD(, spdk_nvme_rdma_mr_map) g_rdma_mr_maps = LIST_HEAD_INITIALIZER(&g_rdma_mr_maps);
170 static pthread_mutex_t g_rdma_mr_maps_mutex = PTHREAD_MUTEX_INITIALIZER;
171 
172 static int nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair);
173 
174 static inline struct nvme_rdma_qpair *
175 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
176 {
177 	assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
178 	return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
179 }
180 
181 static inline struct nvme_rdma_ctrlr *
182 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
183 {
184 	assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
185 	return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
186 }
187 
188 static struct spdk_nvme_rdma_req *
189 nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
190 {
191 	struct spdk_nvme_rdma_req *rdma_req;
192 
193 	rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
194 	if (rdma_req) {
195 		TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
196 		TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
197 	}
198 
199 	return rdma_req;
200 }
201 
202 static void
203 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
204 {
205 	rdma_req->request_ready_to_put = false;
206 	TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
207 	TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
208 }
209 
210 static void
211 nvme_rdma_req_complete(struct nvme_request *req,
212 		       struct spdk_nvme_cpl *rsp)
213 {
214 	nvme_complete_request(req->cb_fn, req->cb_arg, req->qpair, req, rsp);
215 	nvme_free_request(req);
216 }
217 
218 static const char *
219 nvme_rdma_cm_event_str_get(uint32_t event)
220 {
221 	if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
222 		return rdma_cm_event_str[event];
223 	} else {
224 		return "Undefined";
225 	}
226 }
227 
228 static struct rdma_cm_event *
229 nvme_rdma_get_event(struct rdma_event_channel *channel,
230 		    enum rdma_cm_event_type evt)
231 {
232 	struct rdma_cm_event	*event;
233 	int			rc;
234 
235 	rc = rdma_get_cm_event(channel, &event);
236 	if (rc < 0) {
237 		SPDK_ERRLOG("Failed to get event from CM event channel. Error %d (%s)\n",
238 			    errno, spdk_strerror(errno));
239 		return NULL;
240 	}
241 
242 	if (event->event != evt) {
243 		SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
244 			    nvme_rdma_cm_event_str_get(evt),
245 			    nvme_rdma_cm_event_str_get(event->event), event->event, event->status);
246 		rdma_ack_cm_event(event);
247 		return NULL;
248 	}
249 
250 	return event;
251 }
252 
253 static int
254 nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
255 {
256 	int			rc;
257 	struct ibv_qp_init_attr	attr;
258 	struct ibv_device_attr	dev_attr;
259 	struct nvme_rdma_ctrlr	*rctrlr;
260 
261 	rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
262 	if (rc != 0) {
263 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
264 		return -1;
265 	}
266 
267 	rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
268 	if (!rqpair->cq) {
269 		SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
270 		return -1;
271 	}
272 
273 	rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
274 	if (g_nvme_hooks.get_ibv_pd) {
275 		rctrlr->pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
276 	} else {
277 		rctrlr->pd = NULL;
278 	}
279 
280 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
281 	attr.qp_type		= IBV_QPT_RC;
282 	attr.send_cq		= rqpair->cq;
283 	attr.recv_cq		= rqpair->cq;
284 	attr.cap.max_send_wr	= rqpair->num_entries; /* SEND operations */
285 	attr.cap.max_recv_wr	= rqpair->num_entries; /* RECV operations */
286 	attr.cap.max_send_sge	= spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
287 	attr.cap.max_recv_sge	= spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
288 
289 	rc = rdma_create_qp(rqpair->cm_id, rctrlr->pd, &attr);
290 
291 	if (rc) {
292 		SPDK_ERRLOG("rdma_create_qp failed\n");
293 		return -1;
294 	}
295 
296 	/* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
297 	rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
298 	rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
299 
300 	rctrlr->pd = rqpair->cm_id->qp->pd;
301 
302 	rqpair->cm_id->context = &rqpair->qpair;
303 
304 	return 0;
305 }
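/*
 * Editorial note: the completion queue above is sized at num_entries * 2 because
 * each queued command consumes two CQ slots (one SEND completion, one RECV
 * completion), while max_send_wr and max_recv_wr only need num_entries each.
 * rdma_create_qp() may clamp attr.cap to what the device supports, which is why
 * the attained max_send_sge/max_recv_sge values are re-read after the call.
 */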
306 
307 #define nvme_rdma_trace_ibv_sge(sg_list) \
308 	if (sg_list) { \
309 		SPDK_DEBUGLOG(SPDK_LOG_NVME, "local addr %p length 0x%x lkey 0x%x\n", \
310 			      (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
311 	}
312 
313 static int
314 nvme_rdma_post_recv(struct nvme_rdma_qpair *rqpair, uint16_t rsp_idx)
315 {
316 	struct ibv_recv_wr *wr, *bad_wr = NULL;
317 	int rc;
318 
319 	wr = &rqpair->rsp_recv_wrs[rsp_idx];
320 	nvme_rdma_trace_ibv_sge(wr->sg_list);
321 
322 	rc = ibv_post_recv(rqpair->cm_id->qp, wr, &bad_wr);
323 	if (rc) {
324 		SPDK_ERRLOG("Failure posting rdma recv, rc = 0x%x\n", rc);
325 	}
326 
327 	return rc;
328 }
329 
330 static void
331 nvme_rdma_unregister_rsps(struct nvme_rdma_qpair *rqpair)
332 {
333 	if (rqpair->rsp_mr && rdma_dereg_mr(rqpair->rsp_mr)) {
334 		SPDK_ERRLOG("Unable to de-register rsp_mr\n");
335 	}
336 	rqpair->rsp_mr = NULL;
337 }
338 
339 static void
340 nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
341 {
342 	free(rqpair->rsps);
343 	rqpair->rsps = NULL;
344 	free(rqpair->rsp_sgls);
345 	rqpair->rsp_sgls = NULL;
346 	free(rqpair->rsp_recv_wrs);
347 	rqpair->rsp_recv_wrs = NULL;
348 }
349 
350 static int
351 nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
352 {
353 	rqpair->rsps = NULL;
354 	rqpair->rsp_recv_wrs = NULL;
355 
356 	rqpair->rsp_sgls = calloc(rqpair->num_entries, sizeof(*rqpair->rsp_sgls));
357 	if (!rqpair->rsp_sgls) {
358 		SPDK_ERRLOG("Failed to allocate rsp_sgls\n");
359 		goto fail;
360 	}
361 
362 	rqpair->rsp_recv_wrs = calloc(rqpair->num_entries,
363 				      sizeof(*rqpair->rsp_recv_wrs));
364 	if (!rqpair->rsp_recv_wrs) {
365 		SPDK_ERRLOG("Failed to allocate rsp_recv_wrs\n");
366 		goto fail;
367 	}
368 
369 	rqpair->rsps = calloc(rqpair->num_entries, sizeof(*rqpair->rsps));
370 	if (!rqpair->rsps) {
371 		SPDK_ERRLOG("Failed to allocate rdma rsps\n");
372 		goto fail;
373 	}
374 
375 	return 0;
376 fail:
377 	nvme_rdma_free_rsps(rqpair);
378 	return -ENOMEM;
379 }
380 
381 static int
382 nvme_rdma_register_rsps(struct nvme_rdma_qpair *rqpair)
383 {
384 	int i;
385 
386 	rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
387 				       rqpair->num_entries * sizeof(*rqpair->rsps));
388 	if (rqpair->rsp_mr == NULL) {
389 		SPDK_ERRLOG("Unable to register rsp_mr\n");
390 		goto fail;
391 	}
392 
393 	for (i = 0; i < rqpair->num_entries; i++) {
394 		struct ibv_sge *rsp_sgl = &rqpair->rsp_sgls[i];
395 
396 		rsp_sgl->addr = (uint64_t)&rqpair->rsps[i];
397 		rsp_sgl->length = sizeof(rqpair->rsps[i]);
398 		rsp_sgl->lkey = rqpair->rsp_mr->lkey;
399 
400 		rqpair->rsp_recv_wrs[i].wr_id = i;
401 		rqpair->rsp_recv_wrs[i].next = NULL;
402 		rqpair->rsp_recv_wrs[i].sg_list = rsp_sgl;
403 		rqpair->rsp_recv_wrs[i].num_sge = 1;
404 
405 		if (nvme_rdma_post_recv(rqpair, i)) {
406 			SPDK_ERRLOG("Unable to post connection rx desc\n");
407 			goto fail;
408 		}
409 	}
410 
411 	return 0;
412 
413 fail:
414 	nvme_rdma_unregister_rsps(rqpair);
415 	return -ENOMEM;
416 }
417 
418 static void
419 nvme_rdma_unregister_reqs(struct nvme_rdma_qpair *rqpair)
420 {
421 	if (rqpair->cmd_mr && rdma_dereg_mr(rqpair->cmd_mr)) {
422 		SPDK_ERRLOG("Unable to de-register cmd_mr\n");
423 	}
424 	rqpair->cmd_mr = NULL;
425 }
426 
427 static void
428 nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
429 {
430 	if (!rqpair->rdma_reqs) {
431 		return;
432 	}
433 
434 	free(rqpair->cmds);
435 	rqpair->cmds = NULL;
436 
437 	free(rqpair->rdma_reqs);
438 	rqpair->rdma_reqs = NULL;
439 }
440 
441 static int
442 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
443 {
444 	rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
445 	if (rqpair->rdma_reqs == NULL) {
446 		SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
447 		goto fail;
448 	}
449 
450 	rqpair->cmds = calloc(rqpair->num_entries, sizeof(*rqpair->cmds));
451 	if (!rqpair->cmds) {
452 		SPDK_ERRLOG("Failed to allocate RDMA cmds\n");
453 		goto fail;
454 	}
455 
456 	return 0;
457 fail:
458 	nvme_rdma_free_reqs(rqpair);
459 	return -ENOMEM;
460 }
461 
462 static int
463 nvme_rdma_register_reqs(struct nvme_rdma_qpair *rqpair)
464 {
465 	int i;
466 
467 	rqpair->cmd_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->cmds,
468 				       rqpair->num_entries * sizeof(*rqpair->cmds));
469 	if (!rqpair->cmd_mr) {
470 		SPDK_ERRLOG("Unable to register cmd_mr\n");
471 		goto fail;
472 	}
473 
474 	TAILQ_INIT(&rqpair->free_reqs);
475 	TAILQ_INIT(&rqpair->outstanding_reqs);
476 	for (i = 0; i < rqpair->num_entries; i++) {
477 		struct spdk_nvme_rdma_req	*rdma_req;
478 		struct spdk_nvmf_cmd		*cmd;
479 
480 		rdma_req = &rqpair->rdma_reqs[i];
481 		cmd = &rqpair->cmds[i];
482 
483 		rdma_req->id = i;
484 
485 		/* The first RDMA sgl element will always point
486 		 * at this data structure. Depending on whether
487 		 * an NVMe-oF SGL is required, the length of
488 		 * this element may change. */
489 		rdma_req->send_sgl[0].addr = (uint64_t)cmd;
490 		rdma_req->send_sgl[0].lkey = rqpair->cmd_mr->lkey;
491 
492 		rdma_req->send_wr.wr_id = (uint64_t)rdma_req;
493 		rdma_req->send_wr.next = NULL;
494 		rdma_req->send_wr.opcode = IBV_WR_SEND;
495 		rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
496 		rdma_req->send_wr.sg_list = rdma_req->send_sgl;
497 		rdma_req->send_wr.imm_data = 0;
498 
499 		TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
500 	}
501 
502 	return 0;
503 
504 fail:
505 	nvme_rdma_unregister_reqs(rqpair);
506 	return -ENOMEM;
507 }
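/*
 * Editorial note: send_sgl[0] always points at the request's pre-registered
 * spdk_nvmf_cmd; its length is set per command later (64 bytes for a bare
 * command, more when in-capsule SGL descriptors follow). send_sgl[1], when used,
 * carries inline (in-capsule) data, which is why NVME_RDMA_DEFAULT_TX_SGE is 2.
 * The SEND wr_id holds the rdma_req pointer so completions can be matched back
 * to the request in nvme_rdma_qpair_process_completions().
 */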
508 
509 static int
510 nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx)
511 {
512 	struct spdk_nvme_qpair *qpair = &rqpair->qpair;
513 	struct spdk_nvme_rdma_req *rdma_req;
514 	struct spdk_nvme_cpl *rsp;
515 	struct nvme_request *req;
516 
517 	assert(rsp_idx < rqpair->num_entries);
518 	rsp = &rqpair->rsps[rsp_idx];
519 	rdma_req = &rqpair->rdma_reqs[rsp->cid];
520 
521 	req = rdma_req->req;
522 	nvme_rdma_req_complete(req, rsp);
523 
524 	if (rdma_req->request_ready_to_put) {
525 		nvme_rdma_req_put(rqpair, rdma_req);
526 	} else {
527 		rdma_req->request_ready_to_put = true;
528 	}
529 
530 	if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
531 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
532 		return -1;
533 	}
534 
535 	if (!STAILQ_EMPTY(&qpair->queued_req) && !qpair->ctrlr->is_resetting) {
536 		req = STAILQ_FIRST(&qpair->queued_req);
537 		STAILQ_REMOVE_HEAD(&qpair->queued_req, stailq);
538 		nvme_qpair_submit_request(qpair, req);
539 	}
540 
541 	return 0;
542 }
543 
544 static int
545 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
546 		       struct sockaddr *src_addr,
547 		       struct sockaddr *dst_addr,
548 		       struct rdma_event_channel *cm_channel)
549 {
550 	int ret;
551 	struct rdma_cm_event *event;
552 
553 	ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
554 				NVME_RDMA_TIME_OUT_IN_MS);
555 	if (ret) {
556 		SPDK_ERRLOG("rdma_resolve_addr() failed: %s (%d)\n", spdk_strerror(errno), errno);
557 		return ret;
558 	}
559 
560 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
561 	if (event == NULL) {
562 		SPDK_ERRLOG("RDMA address resolution error\n");
563 		return -1;
564 	}
565 	rdma_ack_cm_event(event);
566 
567 	ret = rdma_resolve_route(rqpair->cm_id, NVME_RDMA_TIME_OUT_IN_MS);
568 	if (ret) {
569 		SPDK_ERRLOG("rdma_resolve_route() failed\n");
570 		return ret;
571 	}
572 
573 	event = nvme_rdma_get_event(cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
574 	if (event == NULL) {
575 		SPDK_ERRLOG("RDMA route resolution error\n");
576 		return -1;
577 	}
578 	rdma_ack_cm_event(event);
579 
580 	return 0;
581 }
582 
583 static int
584 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
585 {
586 	struct rdma_conn_param				param = {};
587 	struct spdk_nvmf_rdma_request_private_data	request_data = {};
588 	struct spdk_nvmf_rdma_accept_private_data	*accept_data;
589 	struct ibv_device_attr				attr;
590 	int						ret;
591 	struct rdma_cm_event				*event;
592 	struct spdk_nvme_ctrlr				*ctrlr;
593 
594 	ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
595 	if (ret != 0) {
596 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
597 		return ret;
598 	}
599 
600 	param.responder_resources = spdk_min(rqpair->num_entries, attr.max_qp_rd_atom);
601 
602 	ctrlr = rqpair->qpair.ctrlr;
603 	if (!ctrlr) {
604 		return -1;
605 	}
606 
607 	request_data.qid = rqpair->qpair.id;
608 	request_data.hrqsize = rqpair->num_entries;
609 	request_data.hsqsize = rqpair->num_entries - 1;
610 	request_data.cntlid = ctrlr->cntlid;
611 
612 	param.private_data = &request_data;
613 	param.private_data_len = sizeof(request_data);
614 	param.retry_count = 7;
615 	param.rnr_retry_count = 7;
616 
617 	ret = rdma_connect(rqpair->cm_id, &param);
618 	if (ret) {
619 		SPDK_ERRLOG("nvme rdma connect error\n");
620 		return ret;
621 	}
622 
623 	event = nvme_rdma_get_event(rqpair->cm_channel, RDMA_CM_EVENT_ESTABLISHED);
624 	if (event == NULL) {
625 		SPDK_ERRLOG("RDMA connect error\n");
626 		return -1;
627 	}
628 
629 	accept_data = (struct spdk_nvmf_rdma_accept_private_data *)event->param.conn.private_data;
630 	if (accept_data == NULL) {
631 		rdma_ack_cm_event(event);
632 		SPDK_ERRLOG("NVMe-oF target did not return accept data\n");
633 		return -1;
634 	}
635 
636 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "Requested queue depth %d. Actually got queue depth %d.\n",
637 		      rqpair->num_entries, accept_data->crqsize);
638 
639 	rqpair->num_entries = spdk_min(rqpair->num_entries, accept_data->crqsize);
640 
641 	rdma_ack_cm_event(event);
642 
643 	return 0;
644 }
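/*
 * Editorial note: the RDMA CM private data carries the NVMe-oF RDMA connect
 * parameters. hrqsize is the host receive queue size and hsqsize is the host
 * send queue size expressed as a 0's-based value, hence num_entries - 1. The
 * target replies with crqsize in its accept private data, and the effective
 * queue depth is clamped above to the smaller of the two sides.
 */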
645 
646 static int
647 nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr, const char *service)
648 {
649 	struct addrinfo *res;
650 	struct addrinfo hints;
651 	int ret;
652 
653 	memset(&hints, 0, sizeof(hints));
654 	hints.ai_family = family;
655 	hints.ai_socktype = SOCK_STREAM;
656 	hints.ai_protocol = 0;
657 
658 	ret = getaddrinfo(addr, service, &hints, &res);
659 	if (ret) {
660 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(ret), ret);
661 		return ret;
662 	}
663 
664 	if (res->ai_addrlen > sizeof(*sa)) {
665 		SPDK_ERRLOG("getaddrinfo() ai_addrlen %zu too large\n", (size_t)res->ai_addrlen);
666 		ret = EINVAL;
667 	} else {
668 		memcpy(sa, res->ai_addr, res->ai_addrlen);
669 	}
670 
671 	freeaddrinfo(res);
672 	return ret;
673 }
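/*
 * Usage sketch (editorial, hypothetical values): the transport ID fields map
 * directly onto getaddrinfo() arguments, e.g. for an IPv4 target
 *
 *     struct sockaddr_storage dst;
 *     nvme_rdma_parse_addr(&dst, AF_INET, "192.168.1.10", "4420");
 *
 * where "4420" is the conventional NVMe-oF port. nvme_rdma_qpair_connect()
 * below derives these values from ctrlr->trid.traddr and ctrlr->trid.trsvcid.
 */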
674 
675 static int
676 nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
677 			enum spdk_mem_map_notify_action action,
678 			void *vaddr, size_t size)
679 {
680 	struct ibv_pd *pd = cb_ctx;
681 	struct ibv_mr *mr;
682 	int rc;
683 
684 	switch (action) {
685 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
686 		if (!g_nvme_hooks.get_rkey) {
687 			mr = ibv_reg_mr(pd, vaddr, size,
688 					IBV_ACCESS_LOCAL_WRITE |
689 					IBV_ACCESS_REMOTE_READ |
690 					IBV_ACCESS_REMOTE_WRITE);
691 			if (mr == NULL) {
692 				SPDK_ERRLOG("ibv_reg_mr() failed\n");
693 				return -EFAULT;
694 			} else {
695 				rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
696 			}
697 		} else {
698 			rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size,
699 							  g_nvme_hooks.get_rkey(pd, vaddr, size));
700 		}
701 		break;
702 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
703 		if (!g_nvme_hooks.get_rkey) {
704 			mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
705 			if (mr) {
706 				ibv_dereg_mr(mr);
707 			}
708 		}
709 		rc = spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
710 		break;
711 	default:
712 		SPDK_UNREACHABLE();
713 	}
714 
715 	return rc;
716 }
717 
718 static int
719 nvme_rdma_check_contiguous_entries(uint64_t addr_1, uint64_t addr_2)
720 {
721 	/* Two contiguous mappings will point to the same address which is the start of the RDMA MR. */
722 	/* Two contiguous mappings will point to the same address, which is the start of the RDMA MR. */
723 }
724 
725 static int
726 nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
727 {
728 	struct ibv_pd *pd = rqpair->cm_id->qp->pd;
729 	struct spdk_nvme_rdma_mr_map *mr_map;
730 	const struct spdk_mem_map_ops nvme_rdma_map_ops = {
731 		.notify_cb = nvme_rdma_mr_map_notify,
732 		.are_contiguous = nvme_rdma_check_contiguous_entries
733 	};
734 
735 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
736 
737 	/* Look up existing mem map registration for this pd */
738 	LIST_FOREACH(mr_map, &g_rdma_mr_maps, link) {
739 		if (mr_map->pd == pd) {
740 			mr_map->ref++;
741 			rqpair->mr_map = mr_map;
742 			pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
743 			return 0;
744 		}
745 	}
746 
747 	mr_map = calloc(1, sizeof(*mr_map));
748 	if (mr_map == NULL) {
749 		SPDK_ERRLOG("calloc() failed\n");
750 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
751 		return -1;
752 	}
753 
754 	mr_map->ref = 1;
755 	mr_map->pd = pd;
756 	mr_map->map = spdk_mem_map_alloc((uint64_t)NULL, &nvme_rdma_map_ops, pd);
757 	if (mr_map->map == NULL) {
758 		SPDK_ERRLOG("spdk_mem_map_alloc() failed\n");
759 		free(mr_map);
760 		pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
761 		return -1;
762 	}
763 
764 	rqpair->mr_map = mr_map;
765 	LIST_INSERT_HEAD(&g_rdma_mr_maps, mr_map, link);
766 
767 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
768 
769 	return 0;
770 }
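/*
 * Usage sketch (editorial): once the map is installed, data buffers are resolved
 * to their memory region at submission time, roughly as follows:
 *
 *     uint64_t size = req->payload_size;
 *     struct ibv_mr *mr = (struct ibv_mr *)spdk_mem_map_translate(
 *             rqpair->mr_map->map, (uint64_t)payload, &size);
 *     // mr->lkey feeds local SGEs, mr->rkey feeds keyed NVMe-oF SGLs
 *
 * If size comes back smaller than the payload, the buffer straddles multiple
 * registrations and the request is rejected (see the nvme_rdma_build_*_request
 * helpers below). When g_nvme_hooks.get_rkey is set, the translation stores the
 * rkey directly instead of an ibv_mr pointer.
 */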
771 
772 static void
773 nvme_rdma_unregister_mem(struct nvme_rdma_qpair *rqpair)
774 {
775 	struct spdk_nvme_rdma_mr_map *mr_map;
776 
777 	mr_map = rqpair->mr_map;
778 	rqpair->mr_map = NULL;
779 
780 	if (mr_map == NULL) {
781 		return;
782 	}
783 
784 	pthread_mutex_lock(&g_rdma_mr_maps_mutex);
785 
786 	assert(mr_map->ref > 0);
787 	mr_map->ref--;
788 	if (mr_map->ref == 0) {
789 		LIST_REMOVE(mr_map, link);
790 		spdk_mem_map_free(&mr_map->map);
791 		free(mr_map);
792 	}
793 
794 	pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
795 }
796 
797 static int
798 nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
799 {
800 	struct sockaddr_storage dst_addr;
801 	struct sockaddr_storage src_addr;
802 	bool src_addr_specified;
803 	int rc;
804 	struct spdk_nvme_ctrlr *ctrlr;
805 	int family;
806 
807 	rqpair->cm_channel = rdma_create_event_channel();
808 	if (rqpair->cm_channel == NULL) {
809 		SPDK_ERRLOG("rdma_create_event_channel() failed\n");
810 		return -1;
811 	}
812 
813 	ctrlr = rqpair->qpair.ctrlr;
814 
815 	switch (ctrlr->trid.adrfam) {
816 	case SPDK_NVMF_ADRFAM_IPV4:
817 		family = AF_INET;
818 		break;
819 	case SPDK_NVMF_ADRFAM_IPV6:
820 		family = AF_INET6;
821 		break;
822 	default:
823 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", ctrlr->trid.adrfam);
824 		return -1;
825 	}
826 
827 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
828 
829 	memset(&dst_addr, 0, sizeof(dst_addr));
830 
831 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "trsvcid is %s\n", ctrlr->trid.trsvcid);
832 	rc = nvme_rdma_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
833 	if (rc != 0) {
834 		SPDK_ERRLOG("dst_addr nvme_rdma_parse_addr() failed\n");
835 		return -1;
836 	}
837 
838 	if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
839 		memset(&src_addr, 0, sizeof(src_addr));
840 		rc = nvme_rdma_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid);
841 		if (rc != 0) {
842 			SPDK_ERRLOG("src_addr nvme_rdma_parse_addr() failed\n");
843 			return -1;
844 		}
845 		src_addr_specified = true;
846 	} else {
847 		src_addr_specified = false;
848 	}
849 
850 	rc = rdma_create_id(rqpair->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
851 	if (rc < 0) {
852 		SPDK_ERRLOG("rdma_create_id() failed\n");
853 		return -1;
854 	}
855 
856 	rc = nvme_rdma_resolve_addr(rqpair,
857 				    src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
858 				    (struct sockaddr *)&dst_addr, rqpair->cm_channel);
859 	if (rc < 0) {
860 		SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
861 		return -1;
862 	}
863 
864 	rc = nvme_rdma_qpair_init(rqpair);
865 	if (rc < 0) {
866 		SPDK_ERRLOG("nvme_rdma_qpair_init() failed\n");
867 		return -1;
868 	}
869 
870 	rc = nvme_rdma_connect(rqpair);
871 	if (rc != 0) {
872 		SPDK_ERRLOG("Unable to connect the rqpair\n");
873 		return -1;
874 	}
875 
876 	rc = nvme_rdma_register_reqs(rqpair);
877 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
878 	if (rc) {
879 		SPDK_ERRLOG("Unable to register rqpair RDMA requests\n");
880 		return -1;
881 	}
882 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests registered\n");
883 
884 	rc = nvme_rdma_register_rsps(rqpair);
885 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
886 	if (rc < 0) {
887 		SPDK_ERRLOG("Unable to register rqpair RDMA responses\n");
888 		return -1;
889 	}
890 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses registered\n");
891 
892 	rc = nvme_rdma_register_mem(rqpair);
893 	if (rc < 0) {
894 		SPDK_ERRLOG("Unable to register memory for RDMA\n");
895 		return -1;
896 	}
897 
898 	rc = nvme_fabric_qpair_connect(&rqpair->qpair, rqpair->num_entries);
899 	if (rc < 0) {
900 		SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
901 		return -1;
902 	}
903 
904 	return 0;
905 }
906 
907 /*
908  * Build SGL describing empty payload.
909  */
910 static int
911 nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
912 {
913 	struct nvme_request *req = rdma_req->req;
914 
915 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
916 
917 	/* The first element of this SGL is pointing at an
918 	 * spdk_nvmf_cmd object. For this particular command,
919 	 * we only need the first 64 bytes corresponding to
920 	 * the NVMe command. */
921 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
922 
923 	/* The RDMA SGL needs one element describing the NVMe command. */
924 	rdma_req->send_wr.num_sge = 1;
925 
926 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
927 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
928 	req->cmd.dptr.sgl1.keyed.length = 0;
929 	req->cmd.dptr.sgl1.keyed.key = 0;
930 	req->cmd.dptr.sgl1.address = 0;
931 
932 	return 0;
933 }
934 
935 /*
936  * Build inline SGL describing contiguous payload buffer.
937  */
938 static int
939 nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
940 				      struct spdk_nvme_rdma_req *rdma_req)
941 {
942 	struct nvme_request *req = rdma_req->req;
943 	struct ibv_mr *mr;
944 	void *payload;
945 	uint64_t requested_size;
946 
947 	payload = req->payload.contig_or_cb_arg + req->payload_offset;
948 	assert(req->payload_size != 0);
949 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
950 
951 	requested_size = req->payload_size;
952 
953 	if (!g_nvme_hooks.get_rkey) {
954 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
955 				(uint64_t)payload, &requested_size);
956 
957 		if (mr == NULL || requested_size < req->payload_size) {
958 			if (mr) {
959 				SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
960 			}
961 			return -EINVAL;
962 		}
963 		rdma_req->send_sgl[1].lkey = mr->lkey;
964 	} else {
965 		rdma_req->send_sgl[1].lkey = spdk_mem_map_translate(rqpair->mr_map->map,
966 					     (uint64_t)payload,
967 					     &requested_size);
968 
969 	}
970 
971 	/* The first element of this SGL is pointing at an
972 	 * spdk_nvmf_cmd object. For this particular command,
973 	 * we only need the first 64 bytes corresponding to
974 	 * the NVMe command. */
975 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
976 
977 	rdma_req->send_sgl[1].addr = (uint64_t)payload;
978 	rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
979 
980 	/* The RDMA SGL contains two elements. The first describes
981 	 * the NVMe command and the second describes the data
982 	 * payload. */
983 	rdma_req->send_wr.num_sge = 2;
984 
985 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
986 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
987 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
988 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
989 	/* Inline only supported for icdoff == 0 currently.  This function will
990 	 * not get called for controllers with other values. */
991 	req->cmd.dptr.sgl1.address = (uint64_t)0;
992 
993 	return 0;
994 }
995 
996 /*
997  * Build SGL describing contiguous payload buffer.
998  */
999 static int
1000 nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
1001 			       struct spdk_nvme_rdma_req *rdma_req)
1002 {
1003 	struct nvme_request *req = rdma_req->req;
1004 	void *payload = req->payload.contig_or_cb_arg + req->payload_offset;
1005 	struct ibv_mr *mr;
1006 	uint64_t requested_size;
1007 
1008 	assert(req->payload_size != 0);
1009 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
1010 
1011 	requested_size = req->payload_size;
1012 	if (!g_nvme_hooks.get_rkey) {
1013 
1014 		mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)payload,
1015 				&requested_size);
1016 		if (mr == NULL) {
1017 			return -1;
1018 		}
1019 		req->cmd.dptr.sgl1.keyed.key = mr->rkey;
1020 	} else {
1021 		req->cmd.dptr.sgl1.keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
1022 					       (uint64_t)payload,
1023 					       &requested_size);
1024 	}
1025 
1026 	if (requested_size < req->payload_size) {
1027 		SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1028 		return -1;
1029 	}
1030 
1031 	/* The first element of this SGL is pointing at an
1032 	 * spdk_nvmf_cmd object. For this particular command,
1033 	 * we only need the first 64 bytes corresponding to
1034 	 * the NVMe command. */
1035 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1036 
1037 	/* The RDMA SGL needs one element describing the NVMe command. */
1038 	rdma_req->send_wr.num_sge = 1;
1039 
1040 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1041 	req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1042 	req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1043 	req->cmd.dptr.sgl1.keyed.length = req->payload_size;
1044 	req->cmd.dptr.sgl1.address = (uint64_t)payload;
1045 
1046 	return 0;
1047 }
1048 
1049 /*
1050  * Build SGL describing scattered payload buffer.
1051  */
1052 static int
1053 nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
1054 			    struct spdk_nvme_rdma_req *rdma_req)
1055 {
1056 	struct nvme_request *req = rdma_req->req;
1057 	struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
1058 	struct ibv_mr *mr = NULL;
1059 	void *virt_addr;
1060 	uint64_t remaining_size, mr_length;
1061 	uint32_t sge_length;
1062 	int rc, max_num_sgl, num_sgl_desc;
1063 
1064 	assert(req->payload_size != 0);
1065 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1066 	assert(req->payload.reset_sgl_fn != NULL);
1067 	assert(req->payload.next_sge_fn != NULL);
1068 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1069 
1070 	max_num_sgl = req->qpair->ctrlr->max_sges;
1071 
1072 	remaining_size = req->payload_size;
1073 	num_sgl_desc = 0;
1074 	do {
1075 		rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &sge_length);
1076 		if (rc) {
1077 			return -1;
1078 		}
1079 
1080 		sge_length = spdk_min(remaining_size, sge_length);
1081 		mr_length = sge_length;
1082 
1083 		if (!g_nvme_hooks.get_rkey) {
1084 			mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
1085 					(uint64_t)virt_addr,
1086 					&mr_length);
1087 			if (mr == NULL) {
1088 				return -1;
1089 			}
1090 			cmd->sgl[num_sgl_desc].keyed.key = mr->rkey;
1091 		} else {
1092 			cmd->sgl[num_sgl_desc].keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
1093 							   (uint64_t)virt_addr,
1094 							   &mr_length);
1095 		}
1096 
1097 		if (mr_length < sge_length) {
1098 			SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1099 			return -1;
1100 		}
1101 
1102 		cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
1103 		cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
1104 		cmd->sgl[num_sgl_desc].keyed.length = sge_length;
1105 		cmd->sgl[num_sgl_desc].address = (uint64_t)virt_addr;
1106 
1107 		remaining_size -= sge_length;
1108 		num_sgl_desc++;
1109 	} while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
1110 
1111 
1112 	/* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
1113 	if (remaining_size > 0) {
1114 		return -1;
1115 	}
1116 
1117 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1118 
1119 	/* The RDMA SGL needs one element describing some portion
1120 	 * of the spdk_nvmf_cmd structure. */
1121 	rdma_req->send_wr.num_sge = 1;
1122 
1123 	/*
1124 	 * If only one SGL descriptor is required, it can be embedded directly in the command
1125 	 * as a data block descriptor.
1126 	 */
1127 	if (num_sgl_desc == 1) {
1128 		/* The first element of this SGL is pointing at an
1129 		 * spdk_nvmf_cmd object. For this particular command,
1130 		 * we only need the first 64 bytes corresponding to
1131 		 * the NVMe command. */
1132 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1133 
1134 		req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
1135 		req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
1136 		req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
1137 		req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
1138 		req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
1139 	} else {
1140 		/*
1141 		 * Otherwise, the SGL descriptor embedded in the command must point to the list of
1142 		 * SGL descriptors used to describe the operation. In that case it is a Last Segment descriptor.
1143 		 */
1144 		rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + sizeof(struct
1145 					       spdk_nvme_sgl_descriptor) * num_sgl_desc;
1146 
1147 		req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
1148 		req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1149 		req->cmd.dptr.sgl1.unkeyed.length = num_sgl_desc * sizeof(struct spdk_nvme_sgl_descriptor);
1150 		req->cmd.dptr.sgl1.address = (uint64_t)0;
1151 	}
1152 
1153 	return 0;
1154 }
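/*
 * Editorial summary of the two cases above: with a single descriptor, SGL1 in
 * the command is a keyed data block and only the 64-byte command goes on the
 * wire. With N > 1 descriptors, SGL1 becomes a Last Segment descriptor of length
 * N * 16 at capsule offset 0, and the SEND payload grows to 64 + N * 16 bytes so
 * the descriptor list travels in-capsule right behind the command.
 */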
1155 
1156 /*
1157  * Build inline SGL describing sgl payload buffer.
1158  */
1159 static int
1160 nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
1161 				   struct spdk_nvme_rdma_req *rdma_req)
1162 {
1163 	struct nvme_request *req = rdma_req->req;
1164 	struct ibv_mr *mr;
1165 	uint32_t length;
1166 	uint64_t requested_size;
1167 	void *virt_addr;
1168 	int rc, i;
1169 
1170 	assert(req->payload_size != 0);
1171 	assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
1172 	assert(req->payload.reset_sgl_fn != NULL);
1173 	assert(req->payload.next_sge_fn != NULL);
1174 	req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
1175 
1176 	rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &length);
1177 	if (rc) {
1178 		return -1;
1179 	}
1180 
1181 	if (length < req->payload_size) {
1182 		SPDK_DEBUGLOG(SPDK_LOG_NVME, "Inline SGL request split so sending separately.\n");
1183 		return nvme_rdma_build_sgl_request(rqpair, rdma_req);
1184 	}
1185 
1186 	if (length > req->payload_size) {
1187 		length = req->payload_size;
1188 	}
1189 
1190 	requested_size = length;
1191 	mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
1192 			&requested_size);
1193 	if (mr == NULL || requested_size < length) {
1194 		for (i = 1; i < rdma_req->send_wr.num_sge; i++) {
1195 			rdma_req->send_sgl[i].addr = 0;
1196 			rdma_req->send_sgl[i].length = 0;
1197 			rdma_req->send_sgl[i].lkey = 0;
1198 		}
1199 
1200 		if (mr) {
1201 			SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1202 		}
1203 		return -1;
1204 	}
1205 
1206 	rdma_req->send_sgl[1].addr = (uint64_t)virt_addr;
1207 	rdma_req->send_sgl[1].length = length;
1208 	rdma_req->send_sgl[1].lkey = mr->lkey;
1209 
1210 	rdma_req->send_wr.num_sge = 2;
1211 
1212 	/* The first element of this SGL is pointing at an
1213 	 * spdk_nvmf_cmd object. For this particular command,
1214 	 * we only need the first 64 bytes corresponding to
1215 	 * the NVMe command. */
1216 	rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
1217 
1218 	req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
1219 	req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
1220 	req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
1221 	req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
1222 	/* Inline only supported for icdoff == 0 currently.  This function will
1223 	 * not get called for controllers with other values. */
1224 	req->cmd.dptr.sgl1.address = (uint64_t)0;
1225 
1226 	return 0;
1227 }
1228 
1229 static inline unsigned int
1230 nvme_rdma_icdsz_bytes(struct spdk_nvme_ctrlr *ctrlr)
1231 {
1232 	return (ctrlr->cdata.nvmf_specific.ioccsz * 16 - sizeof(struct spdk_nvme_cmd));
1233 }
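/*
 * Worked example (editorial): ioccsz is the I/O command capsule size in 16-byte
 * units, so a controller advertising ioccsz = 260 allows
 * 260 * 16 - 64 = 4096 bytes of in-capsule data per command. The minimum legal
 * value, ioccsz = 4, leaves 0 bytes and effectively disables inline writes.
 */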
1234 
1235 static int
1236 nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
1237 		   struct spdk_nvme_rdma_req *rdma_req)
1238 {
1239 	struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
1240 	int rc;
1241 
1242 	rdma_req->req = req;
1243 	req->cmd.cid = rdma_req->id;
1244 
1245 	if (req->payload_size == 0) {
1246 		rc = nvme_rdma_build_null_request(rdma_req);
1247 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG) {
1248 		/*
1249 		 * Check if icdoff is non zero, to avoid interop conflicts with
1250 		 * targets with non-zero icdoff.  Both SPDK and the Linux kernel
1251 		 * targets use icdoff = 0.  For targets with non-zero icdoff, we
1252 		 * will currently just not use inline data for now.
1253 		 */
1254 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1255 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1256 		    (ctrlr->cdata.nvmf_specific.icdoff == 0)) {
1257 			rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
1258 		} else {
1259 			rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
1260 		}
1261 	} else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL) {
1262 		if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
1263 		    req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
1264 		    ctrlr->cdata.nvmf_specific.icdoff == 0) {
1265 			rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
1266 		} else {
1267 			rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
1268 		}
1269 	} else {
1270 		rc = -1;
1271 	}
1272 
1273 	if (rc) {
1274 		return rc;
1275 	}
1276 
1277 	memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
1278 	return 0;
1279 }
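/*
 * Editorial note: the dispatch above selects one of the builders -- null
 * (no payload), contiguous inline, contiguous keyed, or SGL (inline or keyed) --
 * based on payload type, payload size versus the in-capsule data limit, opcode
 * (only writes are inlined), and icdoff. The chosen builder fills
 * req->cmd.dptr.sgl1 plus the ibv send SGL, and the finished command is then
 * copied into the registered cmds[] slot for the RDMA SEND.
 */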
1280 
1281 static struct spdk_nvme_qpair *
1282 nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
1283 			     uint16_t qid, uint32_t qsize,
1284 			     enum spdk_nvme_qprio qprio,
1285 			     uint32_t num_requests)
1286 {
1287 	struct nvme_rdma_qpair *rqpair;
1288 	struct spdk_nvme_qpair *qpair;
1289 	int rc;
1290 
1291 	rqpair = calloc(1, sizeof(struct nvme_rdma_qpair));
1292 	if (!rqpair) {
1293 		SPDK_ERRLOG("failed to allocate rqpair\n");
1294 		return NULL;
1295 	}
1296 
1297 	rqpair->num_entries = qsize;
1298 
1299 	qpair = &rqpair->qpair;
1300 
1301 	rc = nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests);
1302 	if (rc != 0) {
1303 		return NULL;
1304 	}
1305 
1306 	rc = nvme_rdma_alloc_reqs(rqpair);
1307 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
1308 	if (rc) {
1309 		SPDK_ERRLOG("Unable to allocate rqpair RDMA requests\n");
1310 		return NULL;
1311 	}
1312 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests allocated\n");
1313 
1314 	rc = nvme_rdma_alloc_rsps(rqpair);
1315 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
1316 	if (rc < 0) {
1317 		SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
1318 		return NULL;
1319 	}
1320 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses allocated\n");
1321 
1322 	rc = nvme_rdma_qpair_connect(rqpair);
1323 	if (rc < 0) {
1324 		nvme_rdma_qpair_destroy(qpair);
1325 		return NULL;
1326 	}
1327 
1328 	return qpair;
1329 }
1330 
1331 static void
1332 nvme_rdma_qpair_disconnect(struct spdk_nvme_qpair *qpair)
1333 {
1334 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1335 
1336 	nvme_rdma_unregister_mem(rqpair);
1337 	nvme_rdma_unregister_reqs(rqpair);
1338 	nvme_rdma_unregister_rsps(rqpair);
1339 
1340 	if (rqpair->cm_id) {
1341 		if (rqpair->cm_id->qp) {
1342 			rdma_destroy_qp(rqpair->cm_id);
1343 		}
1344 		rdma_destroy_id(rqpair->cm_id);
1345 	}
1346 
1347 	if (rqpair->cq) {
1348 		ibv_destroy_cq(rqpair->cq);
1349 	}
1350 
1351 	if (rqpair->cm_channel) {
1352 		rdma_destroy_event_channel(rqpair->cm_channel);
1353 	}
1354 }
1355 
1356 static int
1357 nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
1358 {
1359 	struct nvme_rdma_qpair *rqpair;
1360 
1361 	if (!qpair) {
1362 		return -1;
1363 	}
1364 	nvme_rdma_qpair_disconnect(qpair);
1365 	nvme_rdma_qpair_abort_reqs(qpair, 1);
1366 	nvme_qpair_deinit(qpair);
1367 
1368 	rqpair = nvme_rdma_qpair(qpair);
1369 
1370 	nvme_rdma_free_reqs(rqpair);
1371 	nvme_rdma_free_rsps(rqpair);
1372 	free(rqpair);
1373 
1374 	return 0;
1375 }
1376 
1377 struct spdk_nvme_qpair *
1378 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
1379 				const struct spdk_nvme_io_qpair_opts *opts)
1380 {
1381 	return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
1382 					    opts->io_queue_requests);
1383 }
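/*
 * Usage sketch (editorial): applications do not call this directly; the generic
 * NVMe API routes here for RDMA controllers. Something along these lines is
 * assumed:
 *
 *     struct spdk_nvme_io_qpair_opts opts;
 *     spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
 *     struct spdk_nvme_qpair *qpair =
 *             spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
 */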
1384 
1385 int
1386 nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
1387 {
1388 	/* do nothing here */
1389 	return 0;
1390 }
1391 
1392 /* This function must only be called while holding g_spdk_nvme_driver->lock */
1393 int
1394 nvme_rdma_ctrlr_scan(struct spdk_nvme_probe_ctx *probe_ctx,
1395 		     bool direct_connect)
1396 {
1397 	struct spdk_nvme_ctrlr_opts discovery_opts;
1398 	struct spdk_nvme_ctrlr *discovery_ctrlr;
1399 	union spdk_nvme_cc_register cc;
1400 	int rc;
1401 	struct nvme_completion_poll_status status;
1402 
1403 	if (strcmp(probe_ctx->trid.subnqn, SPDK_NVMF_DISCOVERY_NQN) != 0) {
1404 		/* This is not a discovery controller; try to connect to it directly. */
1405 		rc = nvme_ctrlr_probe(&probe_ctx->trid, probe_ctx, NULL);
1406 		return rc;
1407 	}
1408 
1409 	spdk_nvme_ctrlr_get_default_ctrlr_opts(&discovery_opts, sizeof(discovery_opts));
1410 	/* For discovery_ctrlr set the timeout to 0 */
1411 	/* For the discovery controller, disable the keep alive timeout. */
1412 
1413 	discovery_ctrlr = nvme_rdma_ctrlr_construct(&probe_ctx->trid, &discovery_opts, NULL);
1414 	if (discovery_ctrlr == NULL) {
1415 		return -1;
1416 	}
1417 
1418 	/* TODO: this should be using the normal NVMe controller initialization process */
1419 	cc.raw = 0;
1420 	cc.bits.en = 1;
1421 	cc.bits.iosqes = 6; /* SQ entry size == 64 == 2^6 */
1422 	cc.bits.iocqes = 4; /* CQ entry size == 16 == 2^4 */
1423 	rc = nvme_transport_ctrlr_set_reg_4(discovery_ctrlr, offsetof(struct spdk_nvme_registers, cc.raw),
1424 					    cc.raw);
1425 	if (rc < 0) {
1426 		SPDK_ERRLOG("Failed to set cc\n");
1427 		nvme_ctrlr_destruct(discovery_ctrlr);
1428 		return -1;
1429 	}
1430 
1431 	/* Direct attach through spdk_nvme_connect() API */
1432 	if (direct_connect == true) {
1433 		/* get the cdata info */
1434 		rc = nvme_ctrlr_cmd_identify(discovery_ctrlr, SPDK_NVME_IDENTIFY_CTRLR, 0, 0,
1435 					     &discovery_ctrlr->cdata, sizeof(discovery_ctrlr->cdata),
1436 					     nvme_completion_poll_cb, &status);
1437 		if (rc != 0) {
1438 			SPDK_ERRLOG("Failed to identify cdata\n");
1439 			return rc;
1440 		}
1441 
1442 		if (spdk_nvme_wait_for_completion(discovery_ctrlr->adminq, &status)) {
1443 			SPDK_ERRLOG("nvme_identify_controller failed!\n");
1444 			return -ENXIO;
1445 		}
1446 
1447 		/* Set the ready state to skip the normal init process */
1448 		discovery_ctrlr->state = NVME_CTRLR_STATE_READY;
1449 		nvme_ctrlr_connected(probe_ctx, discovery_ctrlr);
1450 		nvme_ctrlr_add_process(discovery_ctrlr, 0);
1451 		return 0;
1452 	}
1453 
1454 	rc = nvme_fabric_ctrlr_discover(discovery_ctrlr, probe_ctx);
1455 	nvme_ctrlr_destruct(discovery_ctrlr);
1456 	return rc;
1457 }
1458 
1459 struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
1460 		const struct spdk_nvme_ctrlr_opts *opts,
1461 		void *devhandle)
1462 {
1463 	struct nvme_rdma_ctrlr *rctrlr;
1464 	union spdk_nvme_cap_register cap;
1465 	union spdk_nvme_vs_register vs;
1466 	struct ibv_context **contexts;
1467 	struct ibv_device_attr dev_attr;
1468 	int i, rc;
1469 
1470 	rctrlr = calloc(1, sizeof(struct nvme_rdma_ctrlr));
1471 	if (rctrlr == NULL) {
1472 		SPDK_ERRLOG("could not allocate ctrlr\n");
1473 		return NULL;
1474 	}
1475 
1476 	rctrlr->ctrlr.trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1477 	rctrlr->ctrlr.opts = *opts;
1478 	memcpy(&rctrlr->ctrlr.trid, trid, sizeof(rctrlr->ctrlr.trid));
1479 
1480 	contexts = rdma_get_devices(NULL);
1481 	if (contexts == NULL) {
1482 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
1483 		free(rctrlr);
1484 		return NULL;
1485 	}
1486 
1487 	i = 0;
1488 	rctrlr->max_sge = NVME_RDMA_MAX_SGL_DESCRIPTORS;
1489 
1490 	while (contexts[i] != NULL) {
1491 		rc = ibv_query_device(contexts[i], &dev_attr);
1492 		if (rc < 0) {
1493 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1494 			rdma_free_devices(contexts);
1495 			free(rctrlr);
1496 			return NULL;
1497 		}
1498 		rctrlr->max_sge = spdk_min(rctrlr->max_sge, (uint16_t)dev_attr.max_sge);
1499 		i++;
1500 	}
1501 
1502 	rdma_free_devices(contexts);
1503 
1504 	rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
1505 	if (rc != 0) {
1506 		free(rctrlr);
1507 		return NULL;
1508 	}
1509 
1510 	rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, 0,
1511 			       SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES, 0, SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES);
1512 	if (!rctrlr->ctrlr.adminq) {
1513 		SPDK_ERRLOG("failed to create admin qpair\n");
1514 		nvme_rdma_ctrlr_destruct(&rctrlr->ctrlr);
1515 		return NULL;
1516 	}
1517 
1518 	if (nvme_ctrlr_get_cap(&rctrlr->ctrlr, &cap)) {
1519 		SPDK_ERRLOG("get_cap() failed\n");
1520 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1521 		return NULL;
1522 	}
1523 
1524 	if (nvme_ctrlr_get_vs(&rctrlr->ctrlr, &vs)) {
1525 		SPDK_ERRLOG("get_vs() failed\n");
1526 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1527 		return NULL;
1528 	}
1529 
1530 	if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
1531 		SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
1532 		nvme_ctrlr_destruct(&rctrlr->ctrlr);
1533 		return NULL;
1534 	}
1535 
1536 	nvme_ctrlr_init_cap(&rctrlr->ctrlr, &cap, &vs);
1537 
1538 	SPDK_DEBUGLOG(SPDK_LOG_NVME, "successfully initialized the nvmf ctrlr\n");
1539 	return &rctrlr->ctrlr;
1540 }
1541 
1542 int
1543 nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
1544 {
1545 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
1546 
1547 	if (ctrlr->adminq) {
1548 		nvme_rdma_qpair_destroy(ctrlr->adminq);
1549 	}
1550 
1551 	nvme_ctrlr_destruct_finish(ctrlr);
1552 
1553 	free(rctrlr);
1554 
1555 	return 0;
1556 }
1557 
1558 int
1559 nvme_rdma_ctrlr_set_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t value)
1560 {
1561 	return nvme_fabric_ctrlr_set_reg_4(ctrlr, offset, value);
1562 }
1563 
1564 int
1565 nvme_rdma_ctrlr_set_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t value)
1566 {
1567 	return nvme_fabric_ctrlr_set_reg_8(ctrlr, offset, value);
1568 }
1569 
1570 int
1571 nvme_rdma_ctrlr_get_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t *value)
1572 {
1573 	return nvme_fabric_ctrlr_get_reg_4(ctrlr, offset, value);
1574 }
1575 
1576 int
1577 nvme_rdma_ctrlr_get_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t *value)
1578 {
1579 	return nvme_fabric_ctrlr_get_reg_8(ctrlr, offset, value);
1580 }
1581 
1582 int
1583 nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
1584 			       struct nvme_request *req)
1585 {
1586 	struct nvme_rdma_qpair *rqpair;
1587 	struct spdk_nvme_rdma_req *rdma_req;
1588 	struct ibv_send_wr *wr, *bad_wr = NULL;
1589 	int rc;
1590 
1591 	rqpair = nvme_rdma_qpair(qpair);
1592 	assert(rqpair != NULL);
1593 	assert(req != NULL);
1594 
1595 	rdma_req = nvme_rdma_req_get(rqpair);
1596 	if (!rdma_req) {
1597 		/*
1598 		 * No rdma_req is available, so queue the request to be
1599 		 *  processed later.
1600 		 */
1601 		STAILQ_INSERT_TAIL(&qpair->queued_req, req, stailq);
1602 		return 0;
1603 	}
1604 
1605 	if (nvme_rdma_req_init(rqpair, req, rdma_req)) {
1606 		SPDK_ERRLOG("nvme_rdma_req_init() failed\n");
1607 		nvme_rdma_req_put(rqpair, rdma_req);
1608 		return -1;
1609 	}
1610 
1611 	wr = &rdma_req->send_wr;
1612 
1613 	nvme_rdma_trace_ibv_sge(wr->sg_list);
1614 
1615 	rc = ibv_post_send(rqpair->cm_id->qp, wr, &bad_wr);
1616 	if (rc) {
1617 		SPDK_ERRLOG("Failure posting rdma send for NVMf completion: %d (%s)\n", rc, spdk_strerror(rc));
1618 	}
1619 
1620 	return rc;
1621 }
1622 
1623 int
1624 nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1625 {
1626 	return nvme_rdma_qpair_destroy(qpair);
1627 }
1628 
1629 int
1630 nvme_rdma_ctrlr_connect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1631 {
1632 	return nvme_rdma_qpair_connect(nvme_rdma_qpair(qpair));
1633 }
1634 
1635 void
1636 nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
1637 {
1638 	nvme_rdma_qpair_disconnect(qpair);
1639 }
1640 
1641 int
1642 nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
1643 {
1644 	/* Currently, doing nothing here */
1645 	return 0;
1646 }
1647 
1648 void
1649 nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr)
1650 {
1651 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1652 	struct nvme_request *req;
1653 	struct spdk_nvme_cpl cpl;
1654 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1655 
1656 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
1657 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
1658 	cpl.status.dnr = dnr;
1659 
1660 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1661 		assert(rdma_req->req != NULL);
1662 		req = rdma_req->req;
1663 
1664 		nvme_rdma_req_complete(req, &cpl);
1665 		nvme_rdma_req_put(rqpair, rdma_req);
1666 	}
1667 }
1668 
1669 static void
1670 nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
1671 {
1672 	uint64_t t02;
1673 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1674 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1675 	struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
1676 	struct spdk_nvme_ctrlr_process *active_proc;
1677 
1678 	/* Don't check timeouts during controller initialization. */
1679 	if (ctrlr->state != NVME_CTRLR_STATE_READY) {
1680 		return;
1681 	}
1682 
1683 	if (nvme_qpair_is_admin_queue(qpair)) {
1684 		active_proc = spdk_nvme_ctrlr_get_current_process(ctrlr);
1685 	} else {
1686 		active_proc = qpair->active_proc;
1687 	}
1688 
1689 	/* Only check timeouts if the current process has a timeout callback. */
1690 	if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
1691 		return;
1692 	}
1693 
1694 	t02 = spdk_get_ticks();
1695 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1696 		assert(rdma_req->req != NULL);
1697 
1698 		if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
1699 			/*
1700 			 * The requests are in order, so as soon as one has not timed out,
1701 			 * stop iterating.
1702 			 */
1703 			break;
1704 		}
1705 	}
1706 }
1707 
1708 #define MAX_COMPLETIONS_PER_POLL 128
1709 
1710 int
1711 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
1712 				    uint32_t max_completions)
1713 {
1714 	struct nvme_rdma_qpair		*rqpair = nvme_rdma_qpair(qpair);
1715 	struct ibv_wc			wc[MAX_COMPLETIONS_PER_POLL];
1716 	int				i, rc, batch_size;
1717 	uint32_t			reaped;
1718 	struct ibv_cq			*cq;
1719 	struct spdk_nvme_rdma_req	*rdma_req;
1720 
1721 	if (max_completions == 0) {
1722 		max_completions = rqpair->num_entries;
1723 	} else {
1724 		max_completions = spdk_min(max_completions, rqpair->num_entries);
1725 	}
1726 
1727 	cq = rqpair->cq;
1728 
1729 	reaped = 0;
1730 	do {
1731 		batch_size = spdk_min((max_completions - reaped),
1732 				      MAX_COMPLETIONS_PER_POLL);
1733 		rc = ibv_poll_cq(cq, batch_size, wc);
1734 		if (rc < 0) {
1735 			SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1736 				    errno, spdk_strerror(errno));
1737 			return -1;
1738 		} else if (rc == 0) {
1739 			/* Ran out of completions */
1740 			break;
1741 		}
1742 
1743 		for (i = 0; i < rc; i++) {
1744 			if (wc[i].status) {
1745 				SPDK_ERRLOG("CQ error on Queue Pair %p, Response Index %lu (%d): %s\n",
1746 					    qpair, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1747 				return -1;
1748 			}
1749 
1750 			switch (wc[i].opcode) {
1751 			case IBV_WC_RECV:
1752 				SPDK_DEBUGLOG(SPDK_LOG_NVME, "CQ recv completion\n");
1753 
1754 				reaped++;
1755 
1756 				if (wc[i].byte_len < sizeof(struct spdk_nvme_cpl)) {
1757 					SPDK_ERRLOG("recv length %u less than expected response size\n", wc[i].byte_len);
1758 					return -1;
1759 				}
1760 
1761 				if (nvme_rdma_recv(rqpair, wc[i].wr_id)) {
1762 					SPDK_ERRLOG("nvme_rdma_recv processing failure\n");
1763 					return -1;
1764 				}
1765 				break;
1766 
1767 			case IBV_WC_SEND:
1768 				rdma_req = (struct spdk_nvme_rdma_req *)wc[i].wr_id;
1769 
1770 				if (rdma_req->request_ready_to_put) {
1771 					nvme_rdma_req_put(rqpair, rdma_req);
1772 				} else {
1773 					rdma_req->request_ready_to_put = true;
1774 				}
1775 				break;
1776 
1777 			default:
1778 				SPDK_ERRLOG("Received an unexpected opcode on the CQ: %d\n", wc[i].opcode);
1779 				return -1;
1780 			}
1781 		}
1782 	} while (reaped < max_completions);
1783 
1784 	if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
1785 		nvme_rdma_qpair_check_timeout(qpair);
1786 	}
1787 
1788 	return reaped;
1789 }
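/*
 * Usage sketch (editorial): completions are reaped only when the application
 * polls the qpair, typically from its reactor loop:
 *
 *     int rc = spdk_nvme_qpair_process_completions(qpair, 0);
 *     // rc < 0 indicates a transport error, otherwise the number of
 *     // completed requests
 *
 * Passing 0 lets this transport drain up to num_entries completions per call.
 * Only RECV completions count toward the return value here; the matching SEND
 * completion just flips request_ready_to_put.
 */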
1790 
1791 uint32_t
1792 nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
1793 {
1794 	/* max_mr_size by ibv_query_device indicates the largest value that we can
1795 	 * set for a registered memory region.  It is independent from the actual
1796 	 * I/O size and is very likely to be larger than 2 MiB which is the
1797 	 * granularity we currently register memory regions.  Hence return
1798 	 * UINT32_MAX here and let the generic layer use the controller data to
1799 	 * moderate this value.
1800 	 */
1801 	return UINT32_MAX;
1802 }
1803 
1804 uint16_t
1805 nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
1806 {
1807 	struct nvme_rdma_ctrlr *rctrlr = nvme_rdma_ctrlr(ctrlr);
1808 
1809 	return rctrlr->max_sge;
1810 }
1811 
1812 volatile struct spdk_nvme_registers *
1813 nvme_rdma_ctrlr_get_registers(struct spdk_nvme_ctrlr *ctrlr)
1814 {
1815 	return NULL;
1816 }
1817 
1818 void *
1819 nvme_rdma_ctrlr_alloc_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, size_t size)
1820 {
1821 	return NULL;
1822 }
1823 
1824 int
1825 nvme_rdma_ctrlr_free_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, void *buf, size_t size)
1826 {
1827 	return 0;
1828 }
1829 
1830 void
1831 nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
1832 {
1833 	struct spdk_nvme_rdma_req *rdma_req, *tmp;
1834 	struct nvme_request *req;
1835 	struct spdk_nvme_cpl cpl;
1836 	struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
1837 
1838 	cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
1839 	cpl.status.sct = SPDK_NVME_SCT_GENERIC;
1840 
1841 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
1842 		if (rdma_req->req->cmd.opc != SPDK_NVME_OPC_ASYNC_EVENT_REQUEST) {
1843 			continue;
1844 		}
1845 		assert(rdma_req->req != NULL);
1846 		req = rdma_req->req;
1847 
1848 		nvme_rdma_req_complete(req, &cpl);
1849 		nvme_rdma_req_put(rqpair, rdma_req);
1850 	}
1851 }
1852 
1853 void
1854 spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
1855 {
1856 	g_nvme_hooks = *hooks;
1857 }
1858