xref: /spdk/lib/nvmf/rdma.c (revision d92f0f75caf311608f5f0e19d4b3db349609b4e8)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <infiniband/verbs.h>
37 #include <rdma/rdma_cma.h>
38 #include <rdma/rdma_verbs.h>
39 
40 #include "nvmf_internal.h"
41 #include "request.h"
42 #include "ctrlr.h"
43 #include "subsystem.h"
44 #include "transport.h"
45 
46 #include "spdk/assert.h"
47 #include "spdk/nvmf.h"
48 #include "spdk/nvmf_spec.h"
49 #include "spdk/string.h"
50 #include "spdk/trace.h"
51 #include "spdk/util.h"
52 #include "spdk/likely.h"
53 
54 #include "spdk_internal/log.h"
55 
56 /*
57  RDMA Connection Resource Defaults
58  */
59 #define NVMF_DEFAULT_TX_SGE		1
60 #define NVMF_DEFAULT_RX_SGE		2
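/* A response SEND and an RDMA READ/WRITE each use a single SGE; a RECV uses two
 * SGEs: one for the 64-byte command capsule and one for the in-capsule data buffer.
 */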
61 
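/* Typical request lifecycle: FREE -> NEW -> (NEED_BUFFER) -> READY_TO_EXECUTE ->
 * EXECUTING -> EXECUTED -> READY_TO_COMPLETE -> COMPLETING -> COMPLETED -> FREE.
 * The TRANSFER_PENDING/TRANSFERRING states are inserted when an RDMA READ or
 * WRITE is needed to move data between the host and the controller.
 */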
62 enum spdk_nvmf_rdma_request_state {
63 	/* The request is not currently in use */
64 	RDMA_REQUEST_STATE_FREE = 0,
65 
66 	/* Initial state when request first received */
67 	RDMA_REQUEST_STATE_NEW,
68 
69 	/* The request is queued until a data buffer is available. */
70 	RDMA_REQUEST_STATE_NEED_BUFFER,
71 
72 	/* The request is waiting on RDMA queue depth availability
73 	 * to transfer data from the host to the controller.
74 	 */
75 	RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,
76 
77 	/* The request is currently transferring data from the host to the controller. */
78 	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
79 
80 	/* The request is ready to execute at the block device */
81 	RDMA_REQUEST_STATE_READY_TO_EXECUTE,
82 
83 	/* The request is currently executing at the block device */
84 	RDMA_REQUEST_STATE_EXECUTING,
85 
86 	/* The request finished executing at the block device */
87 	RDMA_REQUEST_STATE_EXECUTED,
88 
89 	/* The request is waiting on RDMA queue depth availability
90 	 * to transfer data from the controller to the host.
91 	 */
92 	RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,
93 
94 	/* The request is ready to send a completion */
95 	RDMA_REQUEST_STATE_READY_TO_COMPLETE,
96 
97 	/* The request currently has a completion outstanding */
98 	RDMA_REQUEST_STATE_COMPLETING,
99 
100 	/* The request completed and can be marked free. */
101 	RDMA_REQUEST_STATE_COMPLETED,
102 };
103 
104 /* This structure holds commands as they are received off the wire.
105  * It must be dynamically paired with a full request object
106  * (spdk_nvmf_rdma_request) to service a request. It is separate
107  * from the request because RDMA does not guarantee ordering
108  * between SEND and RECV completions, so occasionally a new
109  * incoming command arrives while there are no free request objects.
110  */
111 struct spdk_nvmf_rdma_recv {
112 	struct ibv_recv_wr		wr;
113 	struct ibv_sge			sgl[NVMF_DEFAULT_RX_SGE];
114 
115 	/* In-capsule data buffer */
116 	uint8_t				*buf;
117 
118 	TAILQ_ENTRY(spdk_nvmf_rdma_recv) link;
119 };
120 
121 struct spdk_nvmf_rdma_request {
122 	struct spdk_nvmf_request		req;
123 	bool					data_from_pool;
124 
125 	enum spdk_nvmf_rdma_request_state	state;
126 
127 	struct spdk_nvmf_rdma_recv		*recv;
128 
129 	struct {
130 		struct	ibv_send_wr		wr;
131 		struct	ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
132 	} rsp;
133 
134 	struct {
135 		struct ibv_send_wr		wr;
136 		struct ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
137 	} data;
138 
139 	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
140 };
141 
142 struct spdk_nvmf_rdma_qpair {
143 	struct spdk_nvmf_qpair			qpair;
144 
145 	struct spdk_nvmf_rdma_port		*port;
146 
147 	struct rdma_cm_id			*cm_id;
148 	struct ibv_cq				*cq;
149 
150 	/* The maximum number of I/O outstanding on this connection at one time */
151 	uint16_t				max_queue_depth;
152 
153 	/* The maximum number of active RDMA READ and WRITE operations at one time */
154 	uint16_t				max_rw_depth;
155 
156 	/* The current number of I/O outstanding on this connection. This number
157 	 * includes all I/O from the time the capsule is first received until it is
158 	 * completed.
159 	 */
160 	uint16_t				cur_queue_depth;
161 
162 	/* The number of RDMA READ and WRITE requests that are outstanding */
163 	uint16_t				cur_rdma_rw_depth;
164 
165 	/* Receives that are waiting for a request object */
166 	TAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
167 
168 	/* Requests that are not in use */
169 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
170 
171 	/* Requests that are waiting to obtain a data buffer */
172 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
173 
174 	/* Requests that are waiting to perform an RDMA READ or WRITE */
175 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
176 
177 	/* Array of size "max_queue_depth" containing RDMA requests. */
178 	struct spdk_nvmf_rdma_request		*reqs;
179 
180 	/* Array of size "max_queue_depth" containing RDMA recvs. */
181 	struct spdk_nvmf_rdma_recv		*recvs;
182 
183 	/* Array of size "max_queue_depth" containing 64 byte capsules
184 	 * used for receive.
185 	 */
186 	union nvmf_h2c_msg			*cmds;
187 	struct ibv_mr				*cmds_mr;
188 
189 	/* Array of size "max_queue_depth" containing 16 byte completions
190 	 * to be sent back to the user.
191 	 */
192 	union nvmf_c2h_msg			*cpls;
193 	struct ibv_mr				*cpls_mr;
194 
195 	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
196 	 * buffers to be used for in capsule data.
197 	 */
198 	void					*bufs;
199 	struct ibv_mr				*bufs_mr;
200 
201 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
202 };
203 
204 /* List of RDMA connections that have not yet received a CONNECT capsule */
205 static TAILQ_HEAD(, spdk_nvmf_rdma_qpair) g_pending_conns = TAILQ_HEAD_INITIALIZER(g_pending_conns);
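/* spdk_nvmf_rdma_accept() polls these qpairs directly until their first request,
 * which is assumed to be the CONNECT capsule, has been processed.
 */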
206 
207 struct spdk_nvmf_rdma_poll_group {
208 	struct spdk_nvmf_poll_group		group;
209 
210 	struct spdk_nvmf_rdma_device		*device;
211 };
212 
213 /* Assuming rdma_cm uses just one protection domain per ibv_context. */
214 struct spdk_nvmf_rdma_device {
215 	struct ibv_device_attr			attr;
216 	struct ibv_context			*context;
217 
218 	struct spdk_mem_map			*map;
219 	struct ibv_pd				*pd;
220 
221 	TAILQ_ENTRY(spdk_nvmf_rdma_device)	link;
222 };
223 
224 struct spdk_nvmf_rdma_port {
225 	struct spdk_nvme_transport_id		trid;
226 	struct rdma_cm_id			*id;
227 	struct spdk_nvmf_rdma_device		*device;
228 	uint32_t				ref;
229 	TAILQ_ENTRY(spdk_nvmf_rdma_port)	link;
230 };
231 
232 struct spdk_nvmf_rdma_transport {
233 	struct spdk_nvmf_transport	transport;
234 
235 	struct rdma_event_channel	*event_channel;
236 
237 	struct spdk_mempool		*data_buf_pool;
238 
239 	pthread_mutex_t 		lock;
240 
241 	uint16_t 			max_queue_depth;
242 	uint32_t 			max_io_size;
243 	uint32_t 			in_capsule_data_size;
244 
245 	TAILQ_HEAD(, spdk_nvmf_rdma_device)	devices;
246 	TAILQ_HEAD(, spdk_nvmf_rdma_port)	ports;
247 };
248 
249 static void
250 spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rdma_qpair)
251 {
252 	if (rdma_qpair->cmds_mr) {
253 		ibv_dereg_mr(rdma_qpair->cmds_mr);
254 	}
255 
256 	if (rdma_qpair->cpls_mr) {
257 		ibv_dereg_mr(rdma_qpair->cpls_mr);
258 	}
259 
260 	if (rdma_qpair->bufs_mr) {
261 		ibv_dereg_mr(rdma_qpair->bufs_mr);
262 	}
263 
264 	if (rdma_qpair->cm_id) {
265 		rdma_destroy_qp(rdma_qpair->cm_id);
266 		rdma_destroy_id(rdma_qpair->cm_id);
267 	}
268 
269 	if (rdma_qpair->cq) {
270 		ibv_destroy_cq(rdma_qpair->cq);
271 	}
272 
273 	/* Free all memory */
274 	spdk_dma_free(rdma_qpair->cmds);
275 	spdk_dma_free(rdma_qpair->cpls);
276 	spdk_dma_free(rdma_qpair->bufs);
277 	free(rdma_qpair->reqs);
278 	free(rdma_qpair);
279 }
280 
281 static struct spdk_nvmf_rdma_qpair *
282 spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
283 			    struct spdk_nvmf_rdma_port *port,
284 			    struct rdma_cm_id *id,
285 			    uint16_t max_queue_depth, uint16_t max_rw_depth, uint32_t subsystem_id)
286 {
287 	struct spdk_nvmf_rdma_transport *rtransport;
288 	struct spdk_nvmf_rdma_qpair	*rdma_qpair;
289 	struct spdk_nvmf_qpair		*qpair;
290 	int				rc, i;
291 	struct ibv_qp_init_attr		attr;
292 	struct spdk_nvmf_rdma_recv	*rdma_recv;
293 	struct spdk_nvmf_rdma_request	*rdma_req;
294 	char buf[64];
295 
296 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
297 
298 	rdma_qpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair));
299 	if (rdma_qpair == NULL) {
300 		SPDK_ERRLOG("Could not allocate new connection.\n");
301 		return NULL;
302 	}
303 
304 	rdma_qpair->port = port;
305 	rdma_qpair->max_queue_depth = max_queue_depth;
306 	rdma_qpair->max_rw_depth = max_rw_depth;
307 	TAILQ_INIT(&rdma_qpair->incoming_queue);
308 	TAILQ_INIT(&rdma_qpair->free_queue);
309 	TAILQ_INIT(&rdma_qpair->pending_data_buf_queue);
310 	TAILQ_INIT(&rdma_qpair->pending_rdma_rw_queue);
311 
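	/* Size the completion queue for up to three completions per request:
	 * a RECV for the command, an optional RDMA READ or WRITE for the data,
	 * and a SEND for the response.
	 */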
312 	rdma_qpair->cq = ibv_create_cq(id->verbs, max_queue_depth * 3, rdma_qpair, NULL, 0);
313 	if (!rdma_qpair->cq) {
314 		spdk_strerror_r(errno, buf, sizeof(buf));
315 		SPDK_ERRLOG("Unable to create completion queue\n");
316 		SPDK_ERRLOG("Errno %d: %s\n", errno, buf);
317 		rdma_destroy_id(id);
318 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
319 		return NULL;
320 	}
321 
322 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
323 	attr.qp_type		= IBV_QPT_RC;
324 	attr.send_cq		= rdma_qpair->cq;
325 	attr.recv_cq		= rdma_qpair->cq;
326 	attr.cap.max_send_wr	= max_queue_depth * 2; /* One SEND plus one RDMA READ or WRITE per request */
327 	attr.cap.max_recv_wr	= max_queue_depth; /* RECV operations */
328 	attr.cap.max_send_sge	= NVMF_DEFAULT_TX_SGE;
329 	attr.cap.max_recv_sge	= NVMF_DEFAULT_RX_SGE;
330 
331 	rc = rdma_create_qp(id, NULL, &attr);
332 	if (rc) {
333 		spdk_strerror_r(errno, buf, sizeof(buf));
334 		SPDK_ERRLOG("rdma_create_qp failed\n");
335 		SPDK_ERRLOG("Errno %d: %s\n", errno, buf);
336 		rdma_destroy_id(id);
337 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
338 		return NULL;
339 	}
340 
341 	qpair = &rdma_qpair->qpair;
342 	qpair->transport = transport;
343 	id->context = qpair;
344 	rdma_qpair->cm_id = id;
345 
346 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "New RDMA Connection: %p\n", qpair);
347 
348 	rdma_qpair->reqs = calloc(max_queue_depth, sizeof(*rdma_qpair->reqs));
349 	rdma_qpair->recvs = calloc(max_queue_depth, sizeof(*rdma_qpair->recvs));
350 	rdma_qpair->cmds = spdk_dma_zmalloc(max_queue_depth * sizeof(*rdma_qpair->cmds),
351 					    0x1000, NULL);
352 	rdma_qpair->cpls = spdk_dma_zmalloc(max_queue_depth * sizeof(*rdma_qpair->cpls),
353 					    0x1000, NULL);
354 	rdma_qpair->bufs = spdk_dma_zmalloc(max_queue_depth * rtransport->in_capsule_data_size,
355 					    0x1000, NULL);
356 	if (!rdma_qpair->reqs || !rdma_qpair->recvs || !rdma_qpair->cmds ||
357 	    !rdma_qpair->cpls || !rdma_qpair->bufs) {
358 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
359 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
360 		return NULL;
361 	}
362 
363 	rdma_qpair->cmds_mr = ibv_reg_mr(id->pd, rdma_qpair->cmds,
364 					 max_queue_depth * sizeof(*rdma_qpair->cmds),
365 					 IBV_ACCESS_LOCAL_WRITE);
366 	rdma_qpair->cpls_mr = ibv_reg_mr(id->pd, rdma_qpair->cpls,
367 					 max_queue_depth * sizeof(*rdma_qpair->cpls),
368 					 0);
369 	rdma_qpair->bufs_mr = ibv_reg_mr(id->pd, rdma_qpair->bufs,
370 					 max_queue_depth * rtransport->in_capsule_data_size,
371 					 IBV_ACCESS_LOCAL_WRITE |
372 					 IBV_ACCESS_REMOTE_WRITE);
373 	if (!rdma_qpair->cmds_mr || !rdma_qpair->cpls_mr || !rdma_qpair->bufs_mr) {
374 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
375 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
376 		return NULL;
377 	}
378 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
379 		      rdma_qpair->cmds, max_queue_depth * sizeof(*rdma_qpair->cmds), rdma_qpair->cmds_mr->lkey);
380 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
381 		      rdma_qpair->cpls, max_queue_depth * sizeof(*rdma_qpair->cpls), rdma_qpair->cpls_mr->lkey);
382 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
383 		      rdma_qpair->bufs, max_queue_depth * rtransport->in_capsule_data_size, rdma_qpair->bufs_mr->lkey);
384 
385 	for (i = 0; i < max_queue_depth; i++) {
386 		struct ibv_recv_wr *bad_wr = NULL;
387 
388 		rdma_recv = &rdma_qpair->recvs[i];
389 
390 		/* Set up memory to receive commands */
391 		rdma_recv->buf = (void *)((uintptr_t)rdma_qpair->bufs + (i * rtransport->in_capsule_data_size));
392 
393 		rdma_recv->sgl[0].addr = (uintptr_t)&rdma_qpair->cmds[i];
394 		rdma_recv->sgl[0].length = sizeof(rdma_qpair->cmds[i]);
395 		rdma_recv->sgl[0].lkey = rdma_qpair->cmds_mr->lkey;
396 
397 		rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
398 		rdma_recv->sgl[1].length = rtransport->in_capsule_data_size;
399 		rdma_recv->sgl[1].lkey = rdma_qpair->bufs_mr->lkey;
400 
401 		rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
402 		rdma_recv->wr.sg_list = rdma_recv->sgl;
403 		rdma_recv->wr.num_sge = SPDK_COUNTOF(rdma_recv->sgl);
404 
405 		rc = ibv_post_recv(rdma_qpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
406 		if (rc) {
407 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
408 			spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
409 			return NULL;
410 		}
411 	}
412 
413 	for (i = 0; i < max_queue_depth; i++) {
414 		rdma_req = &rdma_qpair->reqs[i];
415 
416 		rdma_req->req.qpair = &rdma_qpair->qpair;
417 		rdma_req->req.cmd = NULL;
418 
419 		/* Set up memory to send responses */
420 		rdma_req->req.rsp = &rdma_qpair->cpls[i];
421 
422 		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rdma_qpair->cpls[i];
423 		rdma_req->rsp.sgl[0].length = sizeof(rdma_qpair->cpls[i]);
424 		rdma_req->rsp.sgl[0].lkey = rdma_qpair->cpls_mr->lkey;
425 
426 		rdma_req->rsp.wr.wr_id = (uintptr_t)rdma_req;
427 		rdma_req->rsp.wr.next = NULL;
428 		rdma_req->rsp.wr.opcode = IBV_WR_SEND;
429 		rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
430 		rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
431 		rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
432 
433 		/* Set up memory for data buffers */
434 		rdma_req->data.wr.wr_id = (uintptr_t)rdma_req;
435 		rdma_req->data.wr.next = NULL;
436 		rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
437 		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
438 		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
439 
440 		TAILQ_INSERT_TAIL(&rdma_qpair->free_queue, rdma_req, link);
441 	}
442 
443 	return rdma_qpair;
444 }
445 
446 static int
447 request_transfer_in(struct spdk_nvmf_request *req)
448 {
449 	int				rc;
450 	struct spdk_nvmf_rdma_request	*rdma_req;
451 	struct spdk_nvmf_qpair 		*qpair;
452 	struct spdk_nvmf_rdma_qpair 	*rdma_qpair;
453 	struct ibv_send_wr		*bad_wr = NULL;
454 
455 	qpair = req->qpair;
456 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
457 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
458 
459 	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
460 
461 	rdma_qpair->cur_rdma_rw_depth++;
462 
463 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);
464 	spdk_trace_record(TRACE_RDMA_READ_START, 0, 0, (uintptr_t)req, 0);
465 
466 	rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
467 	rdma_req->data.wr.next = NULL;
468 	rc = ibv_post_send(rdma_qpair->cm_id->qp, &rdma_req->data.wr, &bad_wr);
469 	if (rc) {
470 		SPDK_ERRLOG("Unable to transfer data from host to target\n");
471 		return -1;
472 	}
473 
474 	return 0;
475 }
476 
477 static int
478 request_transfer_out(struct spdk_nvmf_request *req)
479 {
480 	int 				rc;
481 	struct spdk_nvmf_rdma_request	*rdma_req;
482 	struct spdk_nvmf_qpair		*qpair;
483 	struct spdk_nvmf_rdma_qpair 	*rdma_qpair;
484 	struct spdk_nvme_cpl		*rsp;
485 	struct ibv_recv_wr		*bad_recv_wr = NULL;
486 	struct ibv_send_wr		*send_wr, *bad_send_wr = NULL;
487 
488 	qpair = req->qpair;
489 	rsp = &req->rsp->nvme_cpl;
490 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
491 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
492 
493 	/* Advance our sq_head pointer and report it in the completion so the host can reclaim SQ slots */
494 	if (qpair->sq_head == qpair->sq_head_max) {
495 		qpair->sq_head = 0;
496 	} else {
497 		qpair->sq_head++;
498 	}
499 	rsp->sqhd = qpair->sq_head;
500 
501 	/* Post the capsule to the recv buffer */
502 	assert(rdma_req->recv != NULL);
503 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
504 		      rdma_qpair);
505 	rc = ibv_post_recv(rdma_qpair->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
506 	if (rc) {
507 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
508 		return rc;
509 	}
510 	rdma_req->recv = NULL;
511 
512 	/* Build the response which consists of an optional
513 	 * RDMA WRITE to transfer data, plus an RDMA SEND
514 	 * containing the response.
515 	 */
516 	send_wr = &rdma_req->rsp.wr;
517 
518 	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
519 	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
520 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, qpair);
521 		spdk_trace_record(TRACE_RDMA_WRITE_START, 0, 0, (uintptr_t)req, 0);
522 
523 		rdma_qpair->cur_rdma_rw_depth++;
524 		rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;
525 
526 		rdma_req->data.wr.next = send_wr;
527 		send_wr = &rdma_req->data.wr;
528 	}
529 
530 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, qpair);
531 	spdk_trace_record(TRACE_NVMF_IO_COMPLETE, 0, 0, (uintptr_t)req, 0);
532 
533 	/* Send the completion */
534 	rc = ibv_post_send(rdma_qpair->cm_id->qp, send_wr, &bad_send_wr);
535 	if (rc) {
536 		SPDK_ERRLOG("Unable to send response capsule\n");
537 	}
538 
539 	return rc;
540 }
541 
542 static int
543 nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event)
544 {
545 	struct spdk_nvmf_rdma_transport *rtransport;
546 	struct spdk_nvmf_rdma_qpair	*rdma_qpair = NULL;
547 	struct spdk_nvmf_rdma_port 	*port;
548 	struct rdma_conn_param		*rdma_param = NULL;
549 	struct rdma_conn_param		ctrlr_event_data;
550 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
551 	struct spdk_nvmf_rdma_accept_private_data accept_data;
552 	uint16_t			sts = 0;
553 	uint16_t			max_queue_depth;
554 	uint16_t			max_rw_depth;
555 	uint32_t			subsystem_id = 0;
556 	int 				rc;
557 
558 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
559 
560 	if (event->id == NULL) {
561 		SPDK_ERRLOG("connect request: missing cm_id\n");
562 		goto err0;
563 	}
564 
565 	if (event->id->verbs == NULL) {
566 		SPDK_ERRLOG("connect request: missing cm_id ibv_context\n");
567 		goto err0;
568 	}
569 
570 	rdma_param = &event->param.conn;
571 	if (rdma_param->private_data == NULL ||
572 	    rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) {
573 		SPDK_ERRLOG("connect request: no private data provided\n");
574 		goto err0;
575 	}
576 	private_data = rdma_param->private_data;
577 
578 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
579 		      event->id->verbs->device->name, event->id->verbs->device->dev_name);
580 
581 	port = event->listen_id->context;
582 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
583 		      event->listen_id, event->listen_id->verbs, port);
584 
585 	/* Figure out the supported queue depth. This is a multi-step process
586 	 * that takes into account hardware maximums, host provided values,
587 	 * and our target's internal memory limits */
588 
589 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Calculating Queue Depth\n");
590 
591 	/* Start with the maximum queue depth allowed by the target */
592 	max_queue_depth = rtransport->max_queue_depth;
593 	max_rw_depth = rtransport->max_queue_depth;
594 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Target Max Queue Depth: %d\n", rtransport->max_queue_depth);
595 
596 	/* Next check the local NIC's hardware limitations */
597 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA,
598 		      "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
599 		      port->device->attr.max_qp_wr, port->device->attr.max_qp_rd_atom);
600 	max_queue_depth = spdk_min(max_queue_depth, port->device->attr.max_qp_wr);
601 	max_rw_depth = spdk_min(max_rw_depth, port->device->attr.max_qp_rd_atom);
602 
603 	/* Next check the remote NIC's hardware limitations */
604 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA,
605 		      "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
606 		      rdma_param->initiator_depth, rdma_param->responder_resources);
607 	if (rdma_param->initiator_depth > 0) {
608 		max_rw_depth = spdk_min(max_rw_depth, rdma_param->initiator_depth);
609 	}
610 
611 	/* Finally check for the host software requested values, which are
612 	 * optional. */
613 	if (rdma_param->private_data != NULL &&
614 	    rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
615 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
616 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
617 		max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize);
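		/* hsqsize is a 0's based value, so add 1 before comparing it to our 1-based depth. */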
618 		max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1);
619 	}
620 
621 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
622 		      max_queue_depth, max_rw_depth);
623 
624 	/* Init the NVMf rdma transport connection */
625 	rdma_qpair = spdk_nvmf_rdma_qpair_create(transport, port, event->id, max_queue_depth,
626 			max_rw_depth, subsystem_id);
627 	if (rdma_qpair == NULL) {
628 		SPDK_ERRLOG("Error on nvmf connection creation\n");
629 		goto err1;
630 	}
631 
632 	accept_data.recfmt = 0;
633 	accept_data.crqsize = max_queue_depth;
634 	ctrlr_event_data = *rdma_param;
635 	ctrlr_event_data.private_data = &accept_data;
636 	ctrlr_event_data.private_data_len = sizeof(accept_data);
637 	if (event->id->ps == RDMA_PS_TCP) {
638 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
639 		ctrlr_event_data.initiator_depth = max_rw_depth;
640 	}
641 
642 	rc = rdma_accept(event->id, &ctrlr_event_data);
643 	if (rc) {
644 		SPDK_ERRLOG("Error %d on rdma_accept\n", errno);
645 		goto err2;
646 	}
647 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Sent back the accept\n");
648 
649 	/* Add this RDMA connection to the global list until a CONNECT capsule
650 	 * is received. */
651 	TAILQ_INSERT_TAIL(&g_pending_conns, rdma_qpair, link);
652 
653 	return 0;
654 
655 err2:
656 	spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
657 
658 err1: {
659 		struct spdk_nvmf_rdma_reject_private_data rej_data;
660 
661 		rej_data.status.sc = sts;
662 		rdma_reject(event->id, &rej_data, sizeof(rej_data));
663 	}
664 err0:
665 	return -1;
666 }
667 
668 static int
669 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
670 {
671 	struct spdk_nvmf_qpair		*qpair;
672 	struct spdk_nvmf_ctrlr		*ctrlr;
673 	struct spdk_nvmf_subsystem	*subsystem;
674 	struct spdk_nvmf_rdma_qpair 	*rdma_qpair;
675 	struct spdk_nvmf_rdma_qpair	*r, *t;
676 
677 	if (evt->id == NULL) {
678 		SPDK_ERRLOG("disconnect request: missing cm_id\n");
679 		return -1;
680 	}
681 
682 	qpair = evt->id->context;
683 	if (qpair == NULL) {
684 		SPDK_ERRLOG("disconnect request: no active connection\n");
685 		return -1;
686 	}
687 	/* ack the disconnect event before rdma_destroy_id */
688 	rdma_ack_cm_event(evt);
689 
690 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
691 
692 	/* The connection may still be in this pending list when a disconnect
693 	 * event arrives. Search for it and remove it if it is found.
694 	 */
695 	TAILQ_FOREACH_SAFE(r, &g_pending_conns, link, t) {
696 		if (r == rdma_qpair) {
697 			SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Received disconnect for qpair %p before first SEND ack\n",
698 				      rdma_qpair);
699 			TAILQ_REMOVE(&g_pending_conns, rdma_qpair, link);
700 			break;
701 		}
702 	}
703 
704 	ctrlr = qpair->ctrlr;
705 	if (ctrlr == NULL) {
706 		/* No ctrlr has been established yet, so destroy
707 		 * the connection immediately.
708 		 */
709 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
710 		return 0;
711 	}
712 
713 	subsystem = ctrlr->subsys;
714 
715 	subsystem->disconnect_cb(subsystem->cb_ctx, qpair);
716 
717 	return 0;
718 }
719 
720 #ifdef DEBUG
721 static const char *CM_EVENT_STR[] = {
722 	"RDMA_CM_EVENT_ADDR_RESOLVED",
723 	"RDMA_CM_EVENT_ADDR_ERROR",
724 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
725 	"RDMA_CM_EVENT_ROUTE_ERROR",
726 	"RDMA_CM_EVENT_CONNECT_REQUEST",
727 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
728 	"RDMA_CM_EVENT_CONNECT_ERROR",
729 	"RDMA_CM_EVENT_UNREACHABLE",
730 	"RDMA_CM_EVENT_REJECTED",
731 	"RDMA_CM_EVENT_ESTABLISHED",
732 	"RDMA_CM_EVENT_DISCONNECTED",
733 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
734 	"RDMA_CM_EVENT_MULTICAST_JOIN",
735 	"RDMA_CM_EVENT_MULTICAST_ERROR",
736 	"RDMA_CM_EVENT_ADDR_CHANGE",
737 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
738 };
739 #endif /* DEBUG */
740 
741 static int
742 spdk_nvmf_rdma_mem_notify(void *cb_ctx, struct spdk_mem_map *map,
743 			  enum spdk_mem_map_notify_action action,
744 			  void *vaddr, size_t size)
745 {
746 	struct spdk_nvmf_rdma_device *device = cb_ctx;
747 	struct ibv_pd *pd = device->pd;
748 	struct ibv_mr *mr;
749 
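	/* Register newly added memory with this device's protection domain so that
	 * buffers from the shared data pool can be used directly in RDMA SGEs, and
	 * drop the registration when the memory is removed from the map.
	 */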
750 	switch (action) {
751 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
752 		mr = ibv_reg_mr(pd, vaddr, size,
753 				IBV_ACCESS_LOCAL_WRITE |
754 				IBV_ACCESS_REMOTE_READ |
755 				IBV_ACCESS_REMOTE_WRITE);
756 		if (mr == NULL) {
757 			SPDK_ERRLOG("ibv_reg_mr() failed\n");
758 			return -1;
759 		} else {
760 			spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
761 		}
762 		break;
763 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
764 		mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr);
765 		spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
766 		if (mr) {
767 			ibv_dereg_mr(mr);
768 		}
769 		break;
770 	}
771 
772 	return 0;
773 }
774 
775 typedef enum spdk_nvme_data_transfer spdk_nvme_data_transfer_t;
776 
777 static spdk_nvme_data_transfer_t
778 spdk_nvmf_rdma_request_get_xfer(struct spdk_nvmf_rdma_request *rdma_req)
779 {
780 	enum spdk_nvme_data_transfer xfer;
781 	struct spdk_nvme_cmd *cmd = &rdma_req->req.cmd->nvme_cmd;
782 	struct spdk_nvme_sgl_descriptor *sgl = &cmd->dptr.sgl1;
783 
784 	/* Figure out data transfer direction */
785 	if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
786 		xfer = spdk_nvme_opc_get_data_transfer(rdma_req->req.cmd->nvmf_cmd.fctype);
787 	} else {
788 		xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
789 
790 		/* Some admin commands are special cases */
791 		if ((rdma_req->req.qpair->qid == 0) &&
792 		    ((cmd->opc == SPDK_NVME_OPC_GET_FEATURES) ||
793 		     (cmd->opc == SPDK_NVME_OPC_SET_FEATURES))) {
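			/* Only these feature identifiers carry a data buffer; every other
			 * Get/Set Features command is forced to transfer no data.
			 */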
794 			switch (cmd->cdw10 & 0xff) {
795 			case SPDK_NVME_FEAT_LBA_RANGE_TYPE:
796 			case SPDK_NVME_FEAT_AUTONOMOUS_POWER_STATE_TRANSITION:
797 			case SPDK_NVME_FEAT_HOST_IDENTIFIER:
798 				break;
799 			default:
800 				xfer = SPDK_NVME_DATA_NONE;
801 			}
802 		}
803 	}
804 
805 	if (xfer == SPDK_NVME_DATA_NONE) {
806 		return xfer;
807 	}
808 
809 	/* Even for commands that may transfer data, they could have specified 0 length.
810 	 * We want those to show up with xfer SPDK_NVME_DATA_NONE.
811 	 */
812 	switch (sgl->generic.type) {
813 	case SPDK_NVME_SGL_TYPE_DATA_BLOCK:
814 	case SPDK_NVME_SGL_TYPE_BIT_BUCKET:
815 	case SPDK_NVME_SGL_TYPE_SEGMENT:
816 	case SPDK_NVME_SGL_TYPE_LAST_SEGMENT:
817 		if (sgl->unkeyed.length == 0) {
818 			xfer = SPDK_NVME_DATA_NONE;
819 		}
820 		break;
821 	case SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK:
822 		if (sgl->keyed.length == 0) {
823 			xfer = SPDK_NVME_DATA_NONE;
824 		}
825 		break;
826 	}
827 
828 	return xfer;
829 }
830 
831 static int
832 spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
833 				 struct spdk_nvmf_rdma_device *device,
834 				 struct spdk_nvmf_rdma_request *rdma_req)
835 {
836 	struct spdk_nvme_cmd			*cmd;
837 	struct spdk_nvme_cpl			*rsp;
838 	struct spdk_nvme_sgl_descriptor		*sgl;
839 
840 	cmd = &rdma_req->req.cmd->nvme_cmd;
841 	rsp = &rdma_req->req.rsp->nvme_cpl;
842 	sgl = &cmd->dptr.sgl1;
843 
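	/* A keyed data block SGL describes a buffer in host memory that must be moved
	 * with RDMA READ/WRITE, while a data block SGL with the offset subtype refers
	 * to in-capsule data that already arrived alongside the command.
	 */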
844 	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
845 	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
846 	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
847 		if (sgl->keyed.length > rtransport->max_io_size) {
848 			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
849 				    sgl->keyed.length, rtransport->max_io_size);
850 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
851 			return -1;
852 		}
853 
854 		rdma_req->req.length = sgl->keyed.length;
855 		rdma_req->req.data = spdk_mempool_get(rtransport->data_buf_pool);
856 		if (!rdma_req->req.data) {
857 			/* No available buffers. Queue this request up. */
858 			SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "No available large data buffers. Queueing request %p\n", rdma_req);
859 			return 0;
860 		}
861 
862 		rdma_req->data_from_pool = true;
863 		rdma_req->data.sgl[0].addr = (uintptr_t)rdma_req->req.data;
864 		rdma_req->data.sgl[0].length = sgl->keyed.length;
865 		rdma_req->data.sgl[0].lkey = ((struct ibv_mr *)spdk_mem_map_translate(device->map,
866 					      (uint64_t)rdma_req->req.data))->lkey;
867 		rdma_req->data.wr.wr.rdma.rkey = sgl->keyed.key;
868 		rdma_req->data.wr.wr.rdma.remote_addr = sgl->address;
869 
870 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Request %p took buffer from central pool\n", rdma_req);
871 
872 		return 0;
873 	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
874 		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
875 		uint64_t offset = sgl->address;
876 		uint32_t max_len = rtransport->in_capsule_data_size;
877 
878 		SPDK_DEBUGLOG(SPDK_TRACE_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
879 			      offset, sgl->unkeyed.length);
880 
881 		if (offset > max_len) {
882 			SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
883 				    offset, max_len);
884 			rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
885 			return -1;
886 		}
887 		max_len -= (uint32_t)offset;
888 
889 		if (sgl->unkeyed.length > max_len) {
890 			SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
891 				    sgl->unkeyed.length, max_len);
892 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
893 			return -1;
894 		}
895 
896 		rdma_req->req.data = rdma_req->recv->buf + offset;
897 		rdma_req->data_from_pool = false;
898 		rdma_req->req.length = sgl->unkeyed.length;
899 		return 0;
900 	}
901 
902 	SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
903 		    sgl->generic.type, sgl->generic.subtype);
904 	rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
905 	return -1;
906 }
907 
908 static bool
909 spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
910 			       struct spdk_nvmf_rdma_request *rdma_req)
911 {
912 	struct spdk_nvmf_rdma_qpair	*rqpair;
913 	struct spdk_nvmf_rdma_device	*device;
914 	struct spdk_nvme_cpl		*rsp = &rdma_req->req.rsp->nvme_cpl;
915 	int				rc;
916 	struct spdk_nvmf_rdma_recv	*rdma_recv;
917 	enum spdk_nvmf_rdma_request_state prev_state;
918 	bool				progress = false;
919 
920 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
921 	device = rqpair->port->device;
922 
923 	assert(rdma_req->state != RDMA_REQUEST_STATE_FREE);
924 
925 	/* The loop here is to allow for several back-to-back state changes. */
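	/* For example, a command with no data payload moves from NEW straight through
	 * READY_TO_EXECUTE to EXECUTING within a single call.
	 */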
926 	do {
927 		prev_state = rdma_req->state;
928 
929 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Request %p entering state %d\n", rdma_req, prev_state);
930 
931 		switch (rdma_req->state) {
932 		case RDMA_REQUEST_STATE_FREE:
933 			/* Some external code must kick a request into RDMA_REQUEST_STATE_NEW
934 			 * to escape this state. */
935 			break;
936 		case RDMA_REQUEST_STATE_NEW:
937 			rqpair->cur_queue_depth++;
938 			rdma_recv = rdma_req->recv;
939 
940 			/* The first element of the SGL is the NVMe command */
941 			rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
942 			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));
943 
944 			TAILQ_REMOVE(&rqpair->incoming_queue, rdma_recv, link);
945 			TAILQ_REMOVE(&rqpair->free_queue, rdma_req, link);
946 
947 			/* The next state transition depends on the data transfer needs of this request. */
948 			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);
949 
950 			/* If no data to transfer, ready to execute. */
951 			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
952 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
953 				break;
954 			}
955 
956 			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
957 			TAILQ_INSERT_TAIL(&rqpair->pending_data_buf_queue, rdma_req, link);
958 			break;
959 		case RDMA_REQUEST_STATE_NEED_BUFFER:
960 			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);
961 
962 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_data_buf_queue)) {
963 				/* This request needs to wait in line to obtain a buffer */
964 				break;
965 			}
966 
967 			TAILQ_REMOVE(&rqpair->pending_data_buf_queue, rdma_req, link);
968 
969 			/* Try to get a data buffer */
970 			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
971 			if (rc < 0) {
972 				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
973 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
974 				break;
975 			}
976 
977 			if (!rdma_req->req.data) {
978 				/* No buffers available. Put this request back at the head of
979 				 * the queue. */
980 				TAILQ_INSERT_HEAD(&rqpair->pending_data_buf_queue, rdma_req, link);
981 				break;
982 			}
983 
984 			/* If data is transferring from host to controller and the data didn't
985 			 * arrive as in-capsule data, we need to do an RDMA READ from the host.
986 			 */
987 			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool) {
988 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER;
989 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
990 				break;
991 			}
992 
993 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
994 			break;
995 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER:
996 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
997 				/* This request needs to wait in line to perform RDMA */
998 				break;
999 			}
1000 
1001 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1002 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1003 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
1004 				rc = request_transfer_in(&rdma_req->req);
1005 				if (rc) {
1006 					rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1007 					rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1008 				}
1009 			}
1010 			break;
1011 		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
1012 			/* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE
1013 			 * to escape this state. */
1014 			break;
1015 		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
1016 			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
1017 			spdk_nvmf_request_exec(&rdma_req->req);
1018 			break;
1019 		case RDMA_REQUEST_STATE_EXECUTING:
1020 			/* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED
1021 			 * to escape this state. */
1022 			break;
1023 		case RDMA_REQUEST_STATE_EXECUTED:
1024 			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1025 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST;
1026 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1027 			} else {
1028 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1029 			}
1030 			break;
1031 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST:
1032 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1033 				/* This request needs to wait in line to perform RDMA */
1034 				break;
1035 			}
1036 
1037 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1038 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1039 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1040 			}
1041 			break;
1042 		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
1043 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETING;
1044 
1045 			rc = request_transfer_out(&rdma_req->req);
1046 			assert(rc == 0); /* No good way to handle this currently */
1047 			break;
1048 		case RDMA_REQUEST_STATE_COMPLETING:
1049 			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
1050 			 * to escape this state. */
1051 			break;
1052 		case RDMA_REQUEST_STATE_COMPLETED:
1053 			assert(rqpair->cur_queue_depth > 0);
1054 			rqpair->cur_queue_depth--;
1055 
1056 			if (rdma_req->data_from_pool) {
1057 				/* Put the buffer back in the pool */
1058 				spdk_mempool_put(rtransport->data_buf_pool, rdma_req->req.data);
1059 				rdma_req->data_from_pool = false;
1060 			}
1061 			rdma_req->req.length = 0;
1062 			rdma_req->req.data = NULL;
1063 			rdma_req->state = RDMA_REQUEST_STATE_FREE;
1064 			TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
1065 			break;
1066 		}
1067 
1068 		if (rdma_req->state != prev_state) {
1069 			progress = true;
1070 		}
1071 	} while (rdma_req->state != prev_state);
1072 
1073 	return progress;
1074 }
1075 
1076 /* Public API callbacks begin here */
1077 
1078 static struct spdk_nvmf_transport *
1079 spdk_nvmf_rdma_create(struct spdk_nvmf_tgt *tgt)
1080 {
1081 	int rc;
1082 	struct spdk_nvmf_rdma_transport *rtransport;
1083 	struct spdk_nvmf_rdma_device	*device, *tmp;
1084 	struct ibv_context		**contexts;
1085 	uint32_t			i;
1086 	char				buf[64];
1087 
1088 	rtransport = calloc(1, sizeof(*rtransport));
1089 	if (!rtransport) {
1090 		return NULL;
1091 	}
1092 
1093 	pthread_mutex_init(&rtransport->lock, NULL);
1094 	TAILQ_INIT(&rtransport->devices);
1095 	TAILQ_INIT(&rtransport->ports);
1096 
1097 	rtransport->transport.tgt = tgt;
1098 	rtransport->transport.ops = &spdk_nvmf_transport_rdma;
1099 
1100 	SPDK_NOTICELOG("*** RDMA Transport Init ***\n");
1101 
1102 	rtransport->max_queue_depth = tgt->opts.max_queue_depth;
1103 	rtransport->max_io_size = tgt->opts.max_io_size;
1104 	rtransport->in_capsule_data_size = tgt->opts.in_capsule_data_size;
1105 
1106 	rtransport->event_channel = rdma_create_event_channel();
1107 	if (rtransport->event_channel == NULL) {
1108 		spdk_strerror_r(errno, buf, sizeof(buf));
1109 		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", buf);
1110 		free(rtransport);
1111 		return NULL;
1112 	}
1113 
1114 	rc = fcntl(rtransport->event_channel->fd, F_SETFL, O_NONBLOCK);
1115 	if (rc < 0) {
1116 		SPDK_ERRLOG("fcntl to set fd to non-blocking failed\n");
		rdma_destroy_event_channel(rtransport->event_channel);
1117 		free(rtransport);
1118 		return NULL;
1119 	}
1120 
1121 	rtransport->data_buf_pool = spdk_mempool_create("spdk_nvmf_rdma",
1122 				    rtransport->max_queue_depth * 4, /* The 4 is arbitrarily chosen. Needs to be configurable. */
1123 				    rtransport->max_io_size,
1124 				    SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
1125 				    SPDK_ENV_SOCKET_ID_ANY);
1126 	if (!rtransport->data_buf_pool) {
1127 		SPDK_ERRLOG("Unable to allocate buffer pool for poll group\n");
1128 		free(rtransport);
1129 		return NULL;
1130 	}
1131 
1132 	contexts = rdma_get_devices(NULL);
1133 	i = 0;
1134 	rc = 0;
1135 	while (contexts[i] != NULL) {
1136 		device = calloc(1, sizeof(*device));
1137 		if (!device) {
1138 			SPDK_ERRLOG("Unable to allocate memory for RDMA devices.\n");
1139 			rc = -ENOMEM;
1140 			break;
1141 		}
1142 		device->context = contexts[i];
1143 		rc = ibv_query_device(device->context, &device->attr);
1144 		if (rc != 0) { /* ibv_query_device() returns 0 on success or an errno value */
1145 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1146 			free(device);
1147 			break;
1148 
1149 		}
1150 
1151 		device->pd = NULL;
1152 		device->map = NULL;
1153 
1154 		TAILQ_INSERT_TAIL(&rtransport->devices, device, link);
1155 		i++;
1156 	}
1157 
1158 	if (rc != 0) {
1159 		TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) {
1160 			TAILQ_REMOVE(&rtransport->devices, device, link);
1161 			free(device);
1162 		}
1163 		spdk_mempool_free(rtransport->data_buf_pool);
1164 		rdma_destroy_event_channel(rtransport->event_channel);
1165 		free(rtransport);
1166 		rdma_free_devices(contexts);
1167 		return NULL;
1168 	}
1169 
1170 	rdma_free_devices(contexts);
1171 
1172 	return &rtransport->transport;
1173 }
1174 
1175 static int
1176 spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
1177 {
1178 	struct spdk_nvmf_rdma_transport	*rtransport;
1179 	struct spdk_nvmf_rdma_device	*device, *device_tmp;
1180 
1181 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1182 
1183 	assert(TAILQ_EMPTY(&rtransport->ports));
1184 	if (rtransport->event_channel != NULL) {
1185 		rdma_destroy_event_channel(rtransport->event_channel);
1186 	}
1187 
1188 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) {
1189 		TAILQ_REMOVE(&rtransport->devices, device, link);
1190 		if (device->map) {
1191 			spdk_mem_map_free(&device->map);
1192 		}
1193 		free(device);
1194 	}
1195 
1196 	spdk_mempool_free(rtransport->data_buf_pool);
1197 	free(rtransport);
1198 
1199 	return 0;
1200 }
1201 
1202 static int
1203 spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
1204 		      const struct spdk_nvme_transport_id *trid)
1205 {
1206 	struct spdk_nvmf_rdma_transport *rtransport;
1207 	struct spdk_nvmf_rdma_device	*device;
1208 	struct spdk_nvmf_rdma_port 	*port_tmp, *port;
1209 	struct sockaddr_in saddr;
1210 	int rc;
1211 
1212 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1213 
1214 	port = calloc(1, sizeof(*port));
1215 	if (!port) {
1216 		return -ENOMEM;
1217 	}
1218 
1219 	/* Selectively copy the trid. Things like NQN don't matter here - that
1220 	 * mapping is enforced elsewhere.
1221 	 */
1222 	port->trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1223 	port->trid.adrfam = trid->adrfam;
1224 	snprintf(port->trid.traddr, sizeof(port->trid.traddr), "%s", trid->traddr);
1225 	snprintf(port->trid.trsvcid, sizeof(port->trid.trsvcid), "%s", trid->trsvcid);
1226 
1227 	pthread_mutex_lock(&rtransport->lock);
1228 	assert(rtransport->event_channel != NULL);
1229 	TAILQ_FOREACH(port_tmp, &rtransport->ports, link) {
1230 		if (spdk_nvme_transport_id_compare(&port_tmp->trid, &port->trid) == 0) {
1231 			port_tmp->ref++;
1232 			free(port);
1233 			/* Already listening at this address */
1234 			pthread_mutex_unlock(&rtransport->lock);
1235 			return 0;
1236 		}
1237 	}
1238 
1239 	rc = rdma_create_id(rtransport->event_channel, &port->id, port, RDMA_PS_TCP);
1240 	if (rc < 0) {
1241 		SPDK_ERRLOG("rdma_create_id() failed\n");
1242 		free(port);
1243 		pthread_mutex_unlock(&rtransport->lock);
1244 		return rc;
1245 	}
1246 
1247 	memset(&saddr, 0, sizeof(saddr));
1248 	saddr.sin_family = AF_INET;
1249 	saddr.sin_addr.s_addr = inet_addr(port->trid.traddr);
1250 	saddr.sin_port = htons((uint16_t)strtoul(port->trid.trsvcid, NULL, 10));
1251 	rc = rdma_bind_addr(port->id, (struct sockaddr *)&saddr);
1252 	if (rc < 0) {
1253 		SPDK_ERRLOG("rdma_bind_addr() failed\n");
1254 		rdma_destroy_id(port->id);
1255 		free(port);
1256 		pthread_mutex_unlock(&rtransport->lock);
1257 		return rc;
1258 	}
1259 
1260 	rc = rdma_listen(port->id, 10); /* 10 = backlog */
1261 	if (rc < 0) {
1262 		SPDK_ERRLOG("rdma_listen() failed\n");
1263 		rdma_destroy_id(port->id);
1264 		free(port);
1265 		pthread_mutex_unlock(&rtransport->lock);
1266 		return rc;
1267 	}
1268 
1269 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1270 		if (device->context == port->id->verbs) {
1271 			port->device = device;
1272 			break;
1273 		}
1274 	}
1275 	if (!port->device) {
1276 		SPDK_ERRLOG("Accepted a connection with verbs %p, but unable to find a corresponding device.\n",
1277 			    port->id->verbs);
1278 		rdma_destroy_id(port->id);
1279 		free(port);
1280 		pthread_mutex_unlock(&rtransport->lock);
1281 		return -EINVAL;
1282 	}
1283 
1284 	if (!device->map) {
1285 		device->pd = port->id->pd;
1286 		device->map = spdk_mem_map_alloc(0, spdk_nvmf_rdma_mem_notify, device);
1287 		if (!device->map) {
1288 			SPDK_ERRLOG("Unable to allocate memory map for new poll group\n");
			rdma_destroy_id(port->id);
			free(port);
			pthread_mutex_unlock(&rtransport->lock);
1289 			return -1;
1290 		}
1291 	} else {
1292 		assert(device->pd == port->id->pd);
1293 	}
1294 
1295 	SPDK_NOTICELOG("*** NVMf Target Listening on %s port %d ***\n",
1296 		       port->trid.traddr, ntohs(rdma_get_src_port(port->id)));
1297 
1298 	port->ref = 1;
1299 
1300 	TAILQ_INSERT_TAIL(&rtransport->ports, port, link);
1301 	pthread_mutex_unlock(&rtransport->lock);
1302 
1303 	return 0;
1304 }
1305 
1306 static int
1307 spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
1308 			   const struct spdk_nvme_transport_id *_trid)
1309 {
1310 	struct spdk_nvmf_rdma_transport *rtransport;
1311 	struct spdk_nvmf_rdma_port *port, *tmp;
1312 	struct spdk_nvme_transport_id trid = {};
1313 
1314 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1315 
1316 	/* Selectively copy the trid. Things like NQN don't matter here - that
1317 	 * mapping is enforced elsewhere.
1318 	 */
1319 	trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1320 	trid.adrfam = _trid->adrfam;
1321 	snprintf(trid.traddr, sizeof(trid.traddr), "%s", _trid->traddr);
1322 	snprintf(trid.trsvcid, sizeof(trid.trsvcid), "%s", _trid->trsvcid);
1323 
1324 	pthread_mutex_lock(&rtransport->lock);
1325 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, tmp) {
1326 		if (spdk_nvme_transport_id_compare(&port->trid, &trid) == 0) {
1327 			assert(port->ref > 0);
1328 			port->ref--;
1329 			if (port->ref == 0) {
1330 				TAILQ_REMOVE(&rtransport->ports, port, link);
1331 				rdma_destroy_id(port->id);
1332 				free(port);
1333 			}
1334 			break;
1335 		}
1336 	}
1337 
1338 	pthread_mutex_unlock(&rtransport->lock);
1339 	return 0;
1340 }
1341 
1342 static int
1343 spdk_nvmf_rdma_poll(struct spdk_nvmf_qpair *qpair);
1344 
1345 static void
1346 spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport)
1347 {
1348 	struct spdk_nvmf_rdma_transport *rtransport;
1349 	struct rdma_cm_event		*event;
1350 	int				rc;
1351 	struct spdk_nvmf_rdma_qpair	*rdma_qpair, *tmp;
1352 	char buf[64];
1353 
1354 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1355 
1356 	if (rtransport->event_channel == NULL) {
1357 		return;
1358 	}
1359 
1360 	/* Process pending connections for incoming capsules. The only capsule
1361 	 * this should ever find is a CONNECT request. */
1362 	TAILQ_FOREACH_SAFE(rdma_qpair, &g_pending_conns, link, tmp) {
1363 		rc = spdk_nvmf_rdma_poll(&rdma_qpair->qpair);
1364 		if (rc < 0) {
1365 			TAILQ_REMOVE(&g_pending_conns, rdma_qpair, link);
1366 			spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
1367 		} else if (rc > 0) {
1368 			/* At least one request was processed which is assumed to be
1369 			 * a CONNECT. Remove this connection from our list. */
1370 			TAILQ_REMOVE(&g_pending_conns, rdma_qpair, link);
1371 		}
1372 	}
1373 
1374 	while (1) {
1375 		rc = rdma_get_cm_event(rtransport->event_channel, &event);
1376 		if (rc == 0) {
1377 			SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
1378 
1379 			switch (event->event) {
1380 			case RDMA_CM_EVENT_CONNECT_REQUEST:
1381 				rc = nvmf_rdma_connect(transport, event);
1382 				if (rc < 0) {
1383 					SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
1384 					break;
1385 				}
1386 				break;
1387 			case RDMA_CM_EVENT_ESTABLISHED:
1388 				break;
1389 			case RDMA_CM_EVENT_ADDR_CHANGE:
1390 			case RDMA_CM_EVENT_DISCONNECTED:
1391 			case RDMA_CM_EVENT_DEVICE_REMOVAL:
1392 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1393 				rc = nvmf_rdma_disconnect(event);
1394 				if (rc < 0) {
1395 					SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
1396 					break;
1397 				}
1398 				continue;
1399 			default:
1400 				SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
1401 				break;
1402 			}
1403 
1404 			rdma_ack_cm_event(event);
1405 		} else {
1406 			if (errno != EAGAIN && errno != EWOULDBLOCK) {
1407 				spdk_strerror_r(errno, buf, sizeof(buf));
1408 				SPDK_ERRLOG("Acceptor Event Error: %s\n", buf);
1409 			}
1410 			break;
1411 		}
1412 	}
1413 }
1414 
1415 static void
1416 spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
1417 			struct spdk_nvmf_listen_addr *port,
1418 			struct spdk_nvmf_discovery_log_page_entry *entry)
1419 {
1420 	entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
1421 	entry->adrfam = port->trid.adrfam;
1422 	entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
1423 
1424 	spdk_strcpy_pad(entry->trsvcid, port->trid.trsvcid, sizeof(entry->trsvcid), ' ');
1425 	spdk_strcpy_pad(entry->traddr, port->trid.traddr, sizeof(entry->traddr), ' ');
1426 
1427 	entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
1428 	entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
1429 	entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
1430 }
1431 
1432 static struct spdk_nvmf_poll_group *
1433 spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
1434 {
1435 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1436 
1437 	rgroup = calloc(1, sizeof(*rgroup));
1438 	if (!rgroup) {
1439 		return NULL;
1440 	}
1441 
1442 	return &rgroup->group;
1443 }
1444 
1445 static void
1446 spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_poll_group *group)
1447 {
1448 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1449 
1450 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1451 
1452 	if (!rgroup) {
1453 		return;
1454 	}
1455 
1456 	free(rgroup);
1457 }
1458 
1459 static int
1460 spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_poll_group *group,
1461 			      struct spdk_nvmf_qpair *qpair)
1462 {
1463 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1464 	struct spdk_nvmf_rdma_qpair 		*rdma_qpair;
1465 	struct spdk_nvmf_rdma_transport		*rtransport;
1466 	struct spdk_nvmf_rdma_device 		*device;
1467 
1468 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1469 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1470 	rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport);
1471 
1472 	if (rgroup->device != NULL) {
1473 		if (rgroup->device->context != rdma_qpair->cm_id->verbs) {
1474 			SPDK_ERRLOG("Attempted to add a qpair to a poll group with mismatched RDMA devices.\n");
1475 			return -1;
1476 		}
1477 
1478 		if (rgroup->device->pd != rdma_qpair->cm_id->pd) {
1479 			SPDK_ERRLOG("Mismatched protection domains\n");
1480 			return -1;
1481 		}
1482 
1483 		return 0;
1484 	}
1485 
1486 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1487 		if (device->context == rdma_qpair->cm_id->verbs) {
1488 			break;
1489 		}
1490 	}
1491 	if (!device) {
1492 		SPDK_ERRLOG("Attempted to add a qpair with an unknown device\n");
1493 		return -EINVAL;
1494 	}
1495 
1496 	rgroup->device = device;
1497 
1498 	return 0;
1499 }
1500 
1501 static int
1502 spdk_nvmf_rdma_poll_group_remove(struct spdk_nvmf_poll_group *group,
1503 				 struct spdk_nvmf_qpair *qpair)
1504 {
1505 	return 0;
1506 }
1507 
1508 static int
1509 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
1510 {
1511 	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
1512 			struct spdk_nvmf_rdma_transport, transport);
1513 	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
1514 
1515 	rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
1516 	spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1517 
1518 	return 0;
1519 }
1520 
1521 static void
1522 spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
1523 {
1524 	spdk_nvmf_rdma_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair));
1525 }
1526 
1527 static void
1528 spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport,
1529 				     struct spdk_nvmf_rdma_qpair *rqpair)
1530 {
1531 	struct spdk_nvmf_rdma_recv	*rdma_recv, *recv_tmp;
1532 	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;
1533 
1534 	/* We process I/O in the pending_rdma_rw queue at the highest priority. */
1535 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_rw_queue, link, req_tmp) {
1536 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1537 			break;
1538 		}
1539 	}
1540 
1541 	/* The second highest priority is I/O waiting on memory buffers. */
1542 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_data_buf_queue, link, req_tmp) {
1543 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1544 			break;
1545 		}
1546 	}
1547 
1548 	/* The lowest priority is processing newly received commands */
1549 	TAILQ_FOREACH_SAFE(rdma_recv, &rqpair->incoming_queue, link, recv_tmp) {
1550 		rdma_req = TAILQ_FIRST(&rqpair->free_queue);
1551 		if (rdma_req == NULL) {
1552 			/* Need to wait for more SEND completions */
1553 			break;
1554 		}
1555 
1556 		rdma_req->recv = rdma_recv;
1557 		rdma_req->state = RDMA_REQUEST_STATE_NEW;
1558 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1559 			break;
1560 		}
1561 	}
1562 }
1563 
1564 static struct spdk_nvmf_rdma_request *
1565 get_rdma_req_from_wc(struct spdk_nvmf_rdma_qpair *rdma_qpair,
1566 		     struct ibv_wc *wc)
1567 {
1568 	struct spdk_nvmf_rdma_request *rdma_req;
1569 
1570 	rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
1571 	assert(rdma_req != NULL);
1572 	assert(rdma_req - rdma_qpair->reqs >= 0);
1573 	assert(rdma_req - rdma_qpair->reqs < (ptrdiff_t)rdma_qpair->max_queue_depth);
1574 
1575 	return rdma_req;
1576 }
1577 
1578 static struct spdk_nvmf_rdma_recv *
1579 get_rdma_recv_from_wc(struct spdk_nvmf_rdma_qpair *rdma_qpair,
1580 		      struct ibv_wc *wc)
1581 {
1582 	struct spdk_nvmf_rdma_recv *rdma_recv;
1583 
1584 	assert(wc->byte_len >= sizeof(struct spdk_nvmf_capsule_cmd));
1585 
1586 	rdma_recv = (struct spdk_nvmf_rdma_recv *)wc->wr_id;
1587 	assert(rdma_recv != NULL);
1588 	assert(rdma_recv - rdma_qpair->recvs >= 0);
1589 	assert(rdma_recv - rdma_qpair->recvs < (ptrdiff_t)rdma_qpair->max_queue_depth);
1590 
1591 	return rdma_recv;
1592 }
1593 
1594 static int
1595 spdk_nvmf_rdma_poll(struct spdk_nvmf_qpair *qpair)
1596 {
1597 	struct ibv_wc wc[32];
1598 	struct spdk_nvmf_rdma_transport *rtransport;
1599 	struct spdk_nvmf_rdma_qpair	*rdma_qpair;
1600 	struct spdk_nvmf_rdma_request	*rdma_req;
1601 	struct spdk_nvmf_rdma_recv	*rdma_recv;
1602 	int reaped, i;
1603 	int count = 0;
1604 	bool error = false;
1605 	char buf[64];
1606 
1607 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
1608 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1609 
1610 	/* Poll for completing operations. */
1611 	reaped = ibv_poll_cq(rdma_qpair->cq, 32, wc);
1612 	if (reaped < 0) {
1613 		spdk_strerror_r(errno, buf, sizeof(buf));
1614 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1615 			    errno, buf);
1616 		return -1;
1617 	}
1618 
1619 	for (i = 0; i < reaped; i++) {
1620 		if (wc[i].status) {
1621 			SPDK_ERRLOG("CQ error on CQ %p, Request 0x%lx (%d): %s\n",
1622 				    rdma_qpair->cq, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1623 			error = true;
1624 			continue;
1625 		}
1626 
1627 		switch (wc[i].opcode) {
1628 		case IBV_WC_SEND:
1629 			rdma_req = get_rdma_req_from_wc(rdma_qpair, &wc[i]);
1630 
1631 			assert(rdma_req->state == RDMA_REQUEST_STATE_COMPLETING);
1632 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1633 
1634 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1635 
1636 			count++;
1637 
1638 			/* Try to process other queued requests */
1639 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
1640 			break;
1641 
1642 		case IBV_WC_RDMA_WRITE:
1643 			rdma_qpair->cur_rdma_rw_depth--;
1644 
1645 			/* Try to process other queued requests */
1646 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
1647 			break;
1648 
1649 		case IBV_WC_RDMA_READ:
1650 			rdma_req = get_rdma_req_from_wc(rdma_qpair, &wc[i]);
1651 
1652 			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
1653 			rdma_qpair->cur_rdma_rw_depth--;
1654 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1655 
1656 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1657 
1658 			/* Try to process other queued requests */
1659 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
1660 			break;
1661 
1662 		case IBV_WC_RECV:
1663 			rdma_recv = get_rdma_recv_from_wc(rdma_qpair, &wc[i]);
1664 
1665 			TAILQ_INSERT_TAIL(&rdma_qpair->incoming_queue, rdma_recv, link);
1666 
1667 			/* Try to process other queued requests */
1668 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rdma_qpair);
1669 			break;
1670 
1671 		default:
1672 			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
1673 			continue;
1674 		}
1675 	}
1676 
1677 	if (error == true) {
1678 		return -1;
1679 	}
1680 
1681 	return count;
1682 }
1683 
1684 static bool
1685 spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
1686 {
1687 	struct spdk_nvmf_rdma_qpair *rdma_qpair;
1688 
1689 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1690 
1691 	if (rdma_qpair->cur_queue_depth == 0 && rdma_qpair->cur_rdma_rw_depth == 0) {
1692 		return true;
1693 	}
1694 	return false;
1695 }
1696 
1697 const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
1698 	.type = SPDK_NVME_TRANSPORT_RDMA,
1699 	.create = spdk_nvmf_rdma_create,
1700 	.destroy = spdk_nvmf_rdma_destroy,
1701 
1702 	.listen = spdk_nvmf_rdma_listen,
1703 	.stop_listen = spdk_nvmf_rdma_stop_listen,
1704 	.accept = spdk_nvmf_rdma_accept,
1705 
1706 	.listen_addr_discover = spdk_nvmf_rdma_discover,
1707 
1708 	.poll_group_create = spdk_nvmf_rdma_poll_group_create,
1709 	.poll_group_destroy = spdk_nvmf_rdma_poll_group_destroy,
1710 	.poll_group_add = spdk_nvmf_rdma_poll_group_add,
1711 	.poll_group_remove = spdk_nvmf_rdma_poll_group_remove,
1712 
1713 	.req_complete = spdk_nvmf_rdma_request_complete,
1714 
1715 	.qpair_fini = spdk_nvmf_rdma_close_qpair,
1716 	.qpair_poll = spdk_nvmf_rdma_poll,
1717 	.qpair_is_idle = spdk_nvmf_rdma_qpair_is_idle,
1718 
1719 };
1720 
1721 SPDK_LOG_REGISTER_TRACE_FLAG("rdma", SPDK_TRACE_RDMA)
1722