xref: /spdk/lib/nvmf/rdma.c (revision f86f10757912918b8ba7b4b3bfdab1cd4c2d180c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <infiniband/verbs.h>
37 #include <rdma/rdma_cma.h>
38 #include <rdma/rdma_verbs.h>
39 
40 #include "nvmf_internal.h"
41 #include "transport.h"
42 
43 #include "spdk/assert.h"
44 #include "spdk/io_channel.h"
45 #include "spdk/nvmf.h"
46 #include "spdk/nvmf_spec.h"
47 #include "spdk/string.h"
48 #include "spdk/trace.h"
49 #include "spdk/util.h"
50 
51 #include "spdk_internal/log.h"
52 
53 /*
54  RDMA Connection Resource Defaults
55  */
56 #define NVMF_DEFAULT_TX_SGE		1
57 #define NVMF_DEFAULT_RX_SGE		2
58 
59 /* The AIO backend requires block size aligned data buffers;
60  * a data buffer padded and aligned to 4KiB should work for most devices.
61  */
62 #define SHIFT_4KB			12
63 #define NVMF_DATA_BUFFER_ALIGNMENT	(1 << SHIFT_4KB)
64 #define NVMF_DATA_BUFFER_MASK		(NVMF_DATA_BUFFER_ALIGNMENT - 1)
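/* Buffers taken from the transport's data_buf_pool are allocated with an
 * extra NVMF_DATA_BUFFER_ALIGNMENT bytes so that the usable address can be
 * rounded up to the next 4KiB boundary, roughly:
 *
 *   data = (void *)(((uintptr_t)buf + NVMF_DATA_BUFFER_MASK) & ~NVMF_DATA_BUFFER_MASK);
 *
 * See spdk_nvmf_rdma_request_parse_sgl() for the actual use.
 */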
65 
66 enum spdk_nvmf_rdma_request_state {
67 	/* The request is not currently in use */
68 	RDMA_REQUEST_STATE_FREE = 0,
69 
70 	/* Initial state when request first received */
71 	RDMA_REQUEST_STATE_NEW,
72 
73 	/* The request is queued until a data buffer is available. */
74 	RDMA_REQUEST_STATE_NEED_BUFFER,
75 
76 	/* The request is waiting on RDMA queue depth availability
77 	 * to transfer data from the host to the controller.
78 	 */
79 	RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,
80 
81 	/* The request is currently transferring data from the host to the controller. */
82 	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
83 
84 	/* The request is ready to execute at the block device */
85 	RDMA_REQUEST_STATE_READY_TO_EXECUTE,
86 
87 	/* The request is currently executing at the block device */
88 	RDMA_REQUEST_STATE_EXECUTING,
89 
90 	/* The request finished executing at the block device */
91 	RDMA_REQUEST_STATE_EXECUTED,
92 
93 	/* The request is waiting on RDMA queue depth availability
94 	 * to transfer data from the controller to the host.
95 	 */
96 	RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,
97 
98 	/* The request is ready to send a completion */
99 	RDMA_REQUEST_STATE_READY_TO_COMPLETE,
100 
101 	/* The request currently has a completion outstanding */
102 	RDMA_REQUEST_STATE_COMPLETING,
103 
104 	/* The request completed and can be marked free. */
105 	RDMA_REQUEST_STATE_COMPLETED,
106 };
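/* Typical request lifecycle, driven by spdk_nvmf_rdma_request_process():
 *
 *   FREE -> NEW -> NEED_BUFFER
 *        -> [TRANSFER_PENDING_HOST_TO_CONTROLLER -> TRANSFERRING_HOST_TO_CONTROLLER]
 *        -> READY_TO_EXECUTE -> EXECUTING -> EXECUTED
 *        -> [TRANSFER_PENDING_CONTROLLER_TO_HOST]
 *        -> READY_TO_COMPLETE -> COMPLETING -> COMPLETED -> FREE
 *
 * The bracketed transfer states are skipped when a request carries no data
 * or its data arrived in the capsule.
 */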
107 
108 /* This structure holds commands as they are received off the wire.
109  * It must be dynamically paired with a full request object
110  * (spdk_nvmf_rdma_request) to service a request. It is separate
111  * from the request because RDMA does not appear to order
112  * completions, so occasionally we'll get a new incoming
113  * command when there aren't any free request objects.
114  */
115 struct spdk_nvmf_rdma_recv {
116 	struct ibv_recv_wr		wr;
117 	struct ibv_sge			sgl[NVMF_DEFAULT_RX_SGE];
118 
119 	/* In-capsule data buffer */
120 	uint8_t				*buf;
121 
122 	TAILQ_ENTRY(spdk_nvmf_rdma_recv) link;
123 };
124 
125 struct spdk_nvmf_rdma_request {
126 	struct spdk_nvmf_request		req;
127 	void					*data_from_pool;
128 
129 	enum spdk_nvmf_rdma_request_state	state;
130 
131 	struct spdk_nvmf_rdma_recv		*recv;
132 
133 	struct {
134 		struct	ibv_send_wr		wr;
135 		struct	ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
136 	} rsp;
137 
138 	struct {
139 		struct ibv_send_wr		wr;
140 		struct ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
141 	} data;
142 
143 	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
144 };
145 
146 struct spdk_nvmf_rdma_qpair {
147 	struct spdk_nvmf_qpair			qpair;
148 
149 	struct spdk_nvmf_rdma_port		*port;
150 	struct spdk_nvmf_rdma_poller		*poller;
151 
152 	struct rdma_cm_id			*cm_id;
153 	struct ibv_cq				*cq;
154 
155 	/* The maximum number of I/O outstanding on this connection at one time */
156 	uint16_t				max_queue_depth;
157 
158 	/* The maximum number of active RDMA READ and WRITE operations at one time */
159 	uint16_t				max_rw_depth;
160 
161 	/* The current number of I/O outstanding on this connection. This number
162 	 * includes all I/O from the time the capsule is first received until it is
163 	 * completed.
164 	 */
165 	uint16_t				cur_queue_depth;
166 
167 	/* The number of RDMA READ and WRITE requests that are outstanding */
168 	uint16_t				cur_rdma_rw_depth;
169 
170 	/* Receives that are waiting for a request object */
171 	TAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
172 
173 	/* Requests that are not in use */
174 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
175 
176 	/* Requests that are waiting to perform an RDMA READ or WRITE */
177 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
178 
179 	/* Array of size "max_queue_depth" containing RDMA requests. */
180 	struct spdk_nvmf_rdma_request		*reqs;
181 
182 	/* Array of size "max_queue_depth" containing RDMA recvs. */
183 	struct spdk_nvmf_rdma_recv		*recvs;
184 
185 	/* Array of size "max_queue_depth" containing 64 byte capsules
186 	 * used for receive.
187 	 */
188 	union nvmf_h2c_msg			*cmds;
189 	struct ibv_mr				*cmds_mr;
190 
191 	/* Array of size "max_queue_depth" containing 16 byte completions
192 	 * to be sent back to the user.
193 	 */
194 	union nvmf_c2h_msg			*cpls;
195 	struct ibv_mr				*cpls_mr;
196 
197 	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
198 	 * buffers to be used for in capsule data.
199 	 */
200 	void					*bufs;
201 	struct ibv_mr				*bufs_mr;
202 
203 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
204 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	pending_link;
205 
206 	/* Mgmt channel */
207 	struct spdk_io_channel			*mgmt_channel;
208 	struct spdk_nvmf_rdma_mgmt_channel	*ch;
209 	struct spdk_thread                      *thread;
210 };
211 
212 struct spdk_nvmf_rdma_poller {
213 	struct spdk_nvmf_rdma_device		*device;
214 	struct spdk_nvmf_rdma_poll_group	*group;
215 
216 	TAILQ_HEAD(, spdk_nvmf_rdma_qpair)	qpairs;
217 
218 	TAILQ_ENTRY(spdk_nvmf_rdma_poller)	link;
219 };
220 
221 struct spdk_nvmf_rdma_poll_group {
222 	struct spdk_nvmf_transport_poll_group	group;
223 
224 	TAILQ_HEAD(, spdk_nvmf_rdma_poller)	pollers;
225 };
226 
227 /* Assuming rdma_cm uses just one protection domain per ibv_context. */
228 struct spdk_nvmf_rdma_device {
229 	struct ibv_device_attr			attr;
230 	struct ibv_context			*context;
231 
232 	struct spdk_mem_map			*map;
233 	struct ibv_pd				*pd;
234 
235 	TAILQ_ENTRY(spdk_nvmf_rdma_device)	link;
236 };
237 
238 struct spdk_nvmf_rdma_port {
239 	struct spdk_nvme_transport_id		trid;
240 	struct rdma_cm_id			*id;
241 	struct spdk_nvmf_rdma_device		*device;
242 	uint32_t				ref;
243 	TAILQ_ENTRY(spdk_nvmf_rdma_port)	link;
244 };
245 
246 struct spdk_nvmf_rdma_transport {
247 	struct spdk_nvmf_transport	transport;
248 
249 	struct rdma_event_channel	*event_channel;
250 
251 	struct spdk_mempool		*data_buf_pool;
252 
253 	pthread_mutex_t 		lock;
254 
255 	uint16_t 			max_queue_depth;
256 	uint32_t 			max_io_size;
257 	uint32_t 			in_capsule_data_size;
258 
259 	TAILQ_HEAD(, spdk_nvmf_rdma_device)	devices;
260 	TAILQ_HEAD(, spdk_nvmf_rdma_port)	ports;
261 };
262 
263 struct spdk_nvmf_rdma_mgmt_channel {
264 	/* Requests that are waiting to obtain a data buffer */
265 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
266 };
267 
268 static int
269 spdk_nvmf_rdma_mgmt_channel_create(void *io_device, void *ctx_buf)
270 {
271 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
272 
273 	TAILQ_INIT(&ch->pending_data_buf_queue);
274 	return 0;
275 }
276 
277 static void
278 spdk_nvmf_rdma_mgmt_channel_destroy(void *io_device, void *ctx_buf)
279 {
280 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
281 
282 	if (!TAILQ_EMPTY(&ch->pending_data_buf_queue)) {
283 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
284 	}
285 }
286 
287 static void
288 spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
289 {
290 	if (rqpair->poller) {
291 		TAILQ_REMOVE(&rqpair->poller->qpairs, rqpair, link);
292 	}
293 
294 	if (rqpair->cmds_mr) {
295 		ibv_dereg_mr(rqpair->cmds_mr);
296 	}
297 
298 	if (rqpair->cpls_mr) {
299 		ibv_dereg_mr(rqpair->cpls_mr);
300 	}
301 
302 	if (rqpair->bufs_mr) {
303 		ibv_dereg_mr(rqpair->bufs_mr);
304 	}
305 
306 	if (rqpair->cm_id) {
307 		rdma_destroy_qp(rqpair->cm_id);
308 		rdma_destroy_id(rqpair->cm_id);
309 	}
310 
311 	if (rqpair->cq) {
312 		ibv_destroy_cq(rqpair->cq);
313 	}
314 
315 	if (rqpair->mgmt_channel) {
316 		spdk_put_io_channel(rqpair->mgmt_channel);
317 	}
318 
319 	/* Free all memory */
320 	spdk_dma_free(rqpair->cmds);
321 	spdk_dma_free(rqpair->cpls);
322 	spdk_dma_free(rqpair->bufs);
323 	free(rqpair->reqs);
324 	free(rqpair->recvs);
325 	free(rqpair);
326 }
327 
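/* Allocate the per-connection verbs resources: a completion queue sized at
 * three times the negotiated queue depth, an RC queue pair, registered
 * memory for commands, completions and in-capsule data, one pre-posted
 * RECV per queue slot, and the template SEND/RDMA work requests for each
 * request object, all of which start out on the free_queue.
 */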
328 static int
329 spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
330 {
331 	struct spdk_nvmf_rdma_transport *rtransport;
332 	struct spdk_nvmf_rdma_qpair	*rqpair;
333 	int				rc, i;
334 	struct ibv_qp_init_attr		attr;
335 	struct spdk_nvmf_rdma_recv	*rdma_recv;
336 	struct spdk_nvmf_rdma_request	*rdma_req;
337 	char buf[64];
338 
339 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
340 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
341 
342 	rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->max_queue_depth * 3, rqpair, NULL, 0);
343 	if (!rqpair->cq) {
344 		spdk_strerror_r(errno, buf, sizeof(buf));
345 		SPDK_ERRLOG("Unable to create completion queue\n");
346 		SPDK_ERRLOG("Errno %d: %s\n", errno, buf);
347 		rdma_destroy_id(rqpair->cm_id);
348 		spdk_nvmf_rdma_qpair_destroy(rqpair);
349 		return -1;
350 	}
351 
352 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
353 	attr.qp_type		= IBV_QPT_RC;
354 	attr.send_cq		= rqpair->cq;
355 	attr.recv_cq		= rqpair->cq;
356 	attr.cap.max_send_wr	= rqpair->max_queue_depth * 2; /* SEND, READ, and WRITE operations */
357 	attr.cap.max_recv_wr	= rqpair->max_queue_depth; /* RECV operations */
358 	attr.cap.max_send_sge	= NVMF_DEFAULT_TX_SGE;
359 	attr.cap.max_recv_sge	= NVMF_DEFAULT_RX_SGE;
360 
361 	rc = rdma_create_qp(rqpair->cm_id, NULL, &attr);
362 	if (rc) {
363 		spdk_strerror_r(errno, buf, sizeof(buf));
364 		SPDK_ERRLOG("rdma_create_qp failed\n");
365 		SPDK_ERRLOG("Errno %d: %s\n", errno, buf);
366 		rdma_destroy_id(rqpair->cm_id);
367 		spdk_nvmf_rdma_qpair_destroy(rqpair);
368 		return -1;
369 	}
370 
371 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "New RDMA Connection: %p\n", qpair);
372 
373 	rqpair->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->reqs));
374 	rqpair->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->recvs));
375 	rqpair->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cmds),
376 					0x1000, NULL);
377 	rqpair->cpls = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cpls),
378 					0x1000, NULL);
379 	rqpair->bufs = spdk_dma_zmalloc(rqpair->max_queue_depth * rtransport->in_capsule_data_size,
380 					0x1000, NULL);
381 	if (!rqpair->reqs || !rqpair->recvs || !rqpair->cmds ||
382 	    !rqpair->cpls || !rqpair->bufs) {
383 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
384 		spdk_nvmf_rdma_qpair_destroy(rqpair);
385 		return -1;
386 	}
387 
388 	rqpair->cmds_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cmds,
389 				     rqpair->max_queue_depth * sizeof(*rqpair->cmds),
390 				     IBV_ACCESS_LOCAL_WRITE);
391 	rqpair->cpls_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cpls,
392 				     rqpair->max_queue_depth * sizeof(*rqpair->cpls),
393 				     0);
394 	rqpair->bufs_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->bufs,
395 				     rqpair->max_queue_depth * rtransport->in_capsule_data_size,
396 				     IBV_ACCESS_LOCAL_WRITE |
397 				     IBV_ACCESS_REMOTE_WRITE);
398 	if (!rqpair->cmds_mr || !rqpair->cpls_mr || !rqpair->bufs_mr) {
399 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
400 		spdk_nvmf_rdma_qpair_destroy(rqpair);
401 		return -1;
402 	}
403 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
404 		      rqpair->cmds, rqpair->max_queue_depth * sizeof(*rqpair->cmds), rqpair->cmds_mr->lkey);
405 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
406 		      rqpair->cpls, rqpair->max_queue_depth * sizeof(*rqpair->cpls), rqpair->cpls_mr->lkey);
407 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
408 		      rqpair->bufs, rqpair->max_queue_depth * rtransport->in_capsule_data_size, rqpair->bufs_mr->lkey);
409 
410 	for (i = 0; i < rqpair->max_queue_depth; i++) {
411 		struct ibv_recv_wr *bad_wr = NULL;
412 
413 		rdma_recv = &rqpair->recvs[i];
414 
415 		/* Set up memory to receive commands */
416 		rdma_recv->buf = (void *)((uintptr_t)rqpair->bufs + (i * rtransport->in_capsule_data_size));
417 
418 		rdma_recv->sgl[0].addr = (uintptr_t)&rqpair->cmds[i];
419 		rdma_recv->sgl[0].length = sizeof(rqpair->cmds[i]);
420 		rdma_recv->sgl[0].lkey = rqpair->cmds_mr->lkey;
421 
422 		rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
423 		rdma_recv->sgl[1].length = rtransport->in_capsule_data_size;
424 		rdma_recv->sgl[1].lkey = rqpair->bufs_mr->lkey;
425 
426 		rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
427 		rdma_recv->wr.sg_list = rdma_recv->sgl;
428 		rdma_recv->wr.num_sge = SPDK_COUNTOF(rdma_recv->sgl);
429 
430 		rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
431 		if (rc) {
432 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
433 			spdk_nvmf_rdma_qpair_destroy(rqpair);
434 			return -1;
435 		}
436 	}
437 
438 	for (i = 0; i < rqpair->max_queue_depth; i++) {
439 		rdma_req = &rqpair->reqs[i];
440 
441 		rdma_req->req.qpair = &rqpair->qpair;
442 		rdma_req->req.cmd = NULL;
443 
444 		/* Set up memory to send responses */
445 		rdma_req->req.rsp = &rqpair->cpls[i];
446 
447 		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rqpair->cpls[i];
448 		rdma_req->rsp.sgl[0].length = sizeof(rqpair->cpls[i]);
449 		rdma_req->rsp.sgl[0].lkey = rqpair->cpls_mr->lkey;
450 
451 		rdma_req->rsp.wr.wr_id = (uintptr_t)rdma_req;
452 		rdma_req->rsp.wr.next = NULL;
453 		rdma_req->rsp.wr.opcode = IBV_WR_SEND;
454 		rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
455 		rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
456 		rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
457 
458 		/* Set up memory for data buffers */
459 		rdma_req->data.wr.wr_id = (uint64_t)rdma_req;
460 		rdma_req->data.wr.next = NULL;
461 		rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
462 		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
463 		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
464 
465 		TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
466 	}
467 
468 	return 0;
469 }
470 
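/* Post an RDMA READ to pull the command's data from the host into the
 * request's data buffer. The caller has already verified that
 * cur_rdma_rw_depth is below max_rw_depth; it is incremented here.
 */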
471 static int
472 request_transfer_in(struct spdk_nvmf_request *req)
473 {
474 	int				rc;
475 	struct spdk_nvmf_rdma_request	*rdma_req;
476 	struct spdk_nvmf_qpair 		*qpair;
477 	struct spdk_nvmf_rdma_qpair 	*rqpair;
478 	struct ibv_send_wr		*bad_wr = NULL;
479 
480 	qpair = req->qpair;
481 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
482 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
483 
484 	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
485 
486 	rqpair->cur_rdma_rw_depth++;
487 
488 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);
489 	spdk_trace_record(TRACE_RDMA_READ_START, 0, 0, (uintptr_t)req, 0);
490 
491 	rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
492 	rdma_req->data.wr.next = NULL;
493 	rc = ibv_post_send(rqpair->cm_id->qp, &rdma_req->data.wr, &bad_wr);
494 	if (rc) {
495 		SPDK_ERRLOG("Unable to transfer data from host to target\n");
496 		return -1;
497 	}
498 
499 	return 0;
500 }
501 
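/* Complete a request back to the host: advance sq_head, re-post the RECV
 * that carried the command, then post the response SEND, chained behind an
 * RDMA WRITE when the command succeeded and has data to return to the host.
 */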
502 static int
503 request_transfer_out(struct spdk_nvmf_request *req)
504 {
505 	int 				rc;
506 	struct spdk_nvmf_rdma_request	*rdma_req;
507 	struct spdk_nvmf_qpair		*qpair;
508 	struct spdk_nvmf_rdma_qpair 	*rqpair;
509 	struct spdk_nvme_cpl		*rsp;
510 	struct ibv_recv_wr		*bad_recv_wr = NULL;
511 	struct ibv_send_wr		*send_wr, *bad_send_wr = NULL;
512 
513 	qpair = req->qpair;
514 	rsp = &req->rsp->nvme_cpl;
515 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
516 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
517 
518 	/* Advance our sq_head pointer */
519 	if (qpair->sq_head == qpair->sq_head_max) {
520 		qpair->sq_head = 0;
521 	} else {
522 		qpair->sq_head++;
523 	}
524 	rsp->sqhd = qpair->sq_head;
525 
526 	/* Post the capsule to the recv buffer */
527 	assert(rdma_req->recv != NULL);
528 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
529 		      rqpair);
530 	rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
531 	if (rc) {
532 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
533 		return rc;
534 	}
535 	rdma_req->recv = NULL;
536 
537 	/* Build the response which consists of an optional
538 	 * RDMA WRITE to transfer data, plus an RDMA SEND
539 	 * containing the response.
540 	 */
541 	send_wr = &rdma_req->rsp.wr;
542 
543 	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
544 	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
545 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, qpair);
546 		spdk_trace_record(TRACE_RDMA_WRITE_START, 0, 0, (uintptr_t)req, 0);
547 
548 		rqpair->cur_rdma_rw_depth++;
549 		rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;
550 
551 		rdma_req->data.wr.next = send_wr;
552 		send_wr = &rdma_req->data.wr;
553 	}
554 
555 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, qpair);
556 	spdk_trace_record(TRACE_NVMF_IO_COMPLETE, 0, 0, (uintptr_t)req, 0);
557 
558 	/* Send the completion */
559 	rc = ibv_post_send(rqpair->cm_id->qp, send_wr, &bad_send_wr);
560 	if (rc) {
561 		SPDK_ERRLOG("Unable to send response capsule\n");
562 	}
563 
564 	return rc;
565 }
566 
567 static int
568 spdk_nvmf_rdma_event_accept(struct rdma_cm_id *id, struct spdk_nvmf_rdma_qpair *rqpair)
569 {
570 	struct spdk_nvmf_rdma_accept_private_data	accept_data;
571 	struct rdma_conn_param				ctrlr_event_data = {};
572 	int						rc;
573 
574 	accept_data.recfmt = 0;
575 	accept_data.crqsize = rqpair->max_queue_depth;
576 
577 	ctrlr_event_data.private_data = &accept_data;
578 	ctrlr_event_data.private_data_len = sizeof(accept_data);
579 	if (id->ps == RDMA_PS_TCP) {
580 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
581 		ctrlr_event_data.initiator_depth = rqpair->max_rw_depth;
582 	}
583 
584 	rc = rdma_accept(id, &ctrlr_event_data);
585 	if (rc) {
586 		SPDK_ERRLOG("Error %d on rdma_accept\n", errno);
587 	} else {
588 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Sent back the accept\n");
589 	}
590 
591 	return rc;
592 }
593 
594 static void
595 spdk_nvmf_rdma_event_reject(struct rdma_cm_id *id, enum spdk_nvmf_rdma_transport_error error)
596 {
597 	struct spdk_nvmf_rdma_reject_private_data	rej_data;
598 
599 	rej_data.recfmt = 0;
600 	rej_data.sts = error;
601 
602 	rdma_reject(id, &rej_data, sizeof(rej_data));
603 }
604 
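/* Handle an RDMA_CM connect request: validate the NVMe-oF private data,
 * negotiate the queue depth and RDMA read/write depth (the minimum of the
 * target limit, the local device limits, the initiator's advertised depth,
 * and the host's HRQSIZE and HSQSIZE + 1), then allocate the rqpair and
 * hand it to cb_fn. The verbs resources are created later, when the qpair
 * is added to a poll group.
 */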
605 static int
606 nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event,
607 		  new_qpair_fn cb_fn)
608 {
609 	struct spdk_nvmf_rdma_transport *rtransport;
610 	struct spdk_nvmf_rdma_qpair	*rqpair = NULL;
611 	struct spdk_nvmf_rdma_port 	*port;
612 	struct rdma_conn_param		*rdma_param = NULL;
613 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
614 	uint16_t			max_queue_depth;
615 	uint16_t			max_rw_depth;
616 
617 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
618 
619 	assert(event->id != NULL); /* Impossible. Can't even reject the connection. */
620 	assert(event->id->verbs != NULL); /* Impossible. No way to handle this. */
621 
622 	rdma_param = &event->param.conn;
623 	if (rdma_param->private_data == NULL ||
624 	    rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) {
625 		SPDK_ERRLOG("connect request: no private data provided\n");
626 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_PRIVATE_DATA_LENGTH);
627 		return -1;
628 	}
629 
630 	private_data = rdma_param->private_data;
631 	if (private_data->recfmt != 0) {
632 		SPDK_ERRLOG("Received RDMA private data with RECFMT != 0\n");
633 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_RECFMT);
634 		return -1;
635 	}
636 
637 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
638 		      event->id->verbs->device->name, event->id->verbs->device->dev_name);
639 
640 	port = event->listen_id->context;
641 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
642 		      event->listen_id, event->listen_id->verbs, port);
643 
644 	/* Figure out the supported queue depth. This is a multi-step process
645 	 * that takes into account hardware maximums, host provided values,
646 	 * and our target's internal memory limits */
647 
648 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Calculating Queue Depth\n");
649 
650 	/* Start with the maximum queue depth allowed by the target */
651 	max_queue_depth = rtransport->max_queue_depth;
652 	max_rw_depth = rtransport->max_queue_depth;
653 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Target Max Queue Depth: %d\n", rtransport->max_queue_depth);
654 
655 	/* Next check the local NIC's hardware limitations */
656 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
657 		      "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
658 		      port->device->attr.max_qp_wr, port->device->attr.max_qp_rd_atom);
659 	max_queue_depth = spdk_min(max_queue_depth, port->device->attr.max_qp_wr);
660 	max_rw_depth = spdk_min(max_rw_depth, port->device->attr.max_qp_rd_atom);
661 
662 	/* Next check the remote NIC's hardware limitations */
663 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
664 		      "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
665 		      rdma_param->initiator_depth, rdma_param->responder_resources);
666 	if (rdma_param->initiator_depth > 0) {
667 		max_rw_depth = spdk_min(max_rw_depth, rdma_param->initiator_depth);
668 	}
669 
670 	/* Finally check for the host software requested values, which are
671 	 * optional. */
672 	if (rdma_param->private_data != NULL &&
673 	    rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
674 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
675 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
676 		max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize);
677 		max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1);
678 	}
679 
680 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
681 		      max_queue_depth, max_rw_depth);
682 
683 	rqpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair));
684 	if (rqpair == NULL) {
685 		SPDK_ERRLOG("Could not allocate new connection.\n");
686 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
687 		return -1;
688 	}
689 
690 	rqpair->port = port;
691 	rqpair->max_queue_depth = max_queue_depth;
692 	rqpair->max_rw_depth = max_rw_depth;
693 	rqpair->cm_id = event->id;
694 	rqpair->qpair.transport = transport;
695 	TAILQ_INIT(&rqpair->incoming_queue);
696 	TAILQ_INIT(&rqpair->free_queue);
697 	TAILQ_INIT(&rqpair->pending_rdma_rw_queue);
698 
699 	event->id->context = &rqpair->qpair;
700 
701 	cb_fn(&rqpair->qpair);
702 
703 	return 0;
704 }
705 
706 static void
707 nvmf_rdma_handle_disconnect(void *ctx)
708 {
709 	struct spdk_nvmf_qpair 		*qpair = ctx;
710 	struct spdk_nvmf_ctrlr		*ctrlr;
711 	struct spdk_nvmf_rdma_qpair 	*rqpair;
712 
713 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
714 
715 	ctrlr = qpair->ctrlr;
716 	if (ctrlr == NULL) {
717 		/* No ctrlr has been established yet, so destroy
718 		 * the connection.
719 		 */
720 		spdk_nvmf_rdma_qpair_destroy(rqpair);
721 		return;
722 	}
723 
724 	spdk_nvmf_ctrlr_disconnect(qpair);
725 }
726 
727 static int
728 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
729 {
730 	struct spdk_nvmf_qpair	*qpair;
731 	struct spdk_io_channel 	*ch;
732 
733 	if (evt->id == NULL) {
734 		SPDK_ERRLOG("disconnect request: missing cm_id\n");
735 		return -1;
736 	}
737 
738 	qpair = evt->id->context;
739 	if (qpair == NULL) {
740 		SPDK_ERRLOG("disconnect request: no active connection\n");
741 		return -1;
742 	}
743 	/* ack the disconnect event before rdma_destroy_id */
744 	rdma_ack_cm_event(evt);
745 
746 	ch = spdk_io_channel_from_ctx(qpair->group);
747 	spdk_thread_send_msg(spdk_io_channel_get_thread(ch), nvmf_rdma_handle_disconnect, qpair);
748 
749 	return 0;
750 }
751 
752 #ifdef DEBUG
753 static const char *CM_EVENT_STR[] = {
754 	"RDMA_CM_EVENT_ADDR_RESOLVED",
755 	"RDMA_CM_EVENT_ADDR_ERROR",
756 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
757 	"RDMA_CM_EVENT_ROUTE_ERROR",
758 	"RDMA_CM_EVENT_CONNECT_REQUEST",
759 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
760 	"RDMA_CM_EVENT_CONNECT_ERROR",
761 	"RDMA_CM_EVENT_UNREACHABLE",
762 	"RDMA_CM_EVENT_REJECTED",
763 	"RDMA_CM_EVENT_ESTABLISHED",
764 	"RDMA_CM_EVENT_DISCONNECTED",
765 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
766 	"RDMA_CM_EVENT_MULTICAST_JOIN",
767 	"RDMA_CM_EVENT_MULTICAST_ERROR",
768 	"RDMA_CM_EVENT_ADDR_CHANGE",
769 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
770 };
771 #endif /* DEBUG */
772 
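/* Memory map callback: when SPDK registers a new memory region, create an
 * ibv_mr covering it and store the ibv_mr pointer as the map translation so
 * that lkey lookups reduce to a single spdk_mem_map_translate() call; when
 * the region is unregistered, look the ibv_mr back up and deregister it.
 */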
773 static int
774 spdk_nvmf_rdma_mem_notify(void *cb_ctx, struct spdk_mem_map *map,
775 			  enum spdk_mem_map_notify_action action,
776 			  void *vaddr, size_t size)
777 {
778 	struct spdk_nvmf_rdma_device *device = cb_ctx;
779 	struct ibv_pd *pd = device->pd;
780 	struct ibv_mr *mr;
781 
782 	switch (action) {
783 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
784 		mr = ibv_reg_mr(pd, vaddr, size,
785 				IBV_ACCESS_LOCAL_WRITE |
786 				IBV_ACCESS_REMOTE_READ |
787 				IBV_ACCESS_REMOTE_WRITE);
788 		if (mr == NULL) {
789 			SPDK_ERRLOG("ibv_reg_mr() failed\n");
790 			return -1;
791 		} else {
792 			spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
793 		}
794 		break;
795 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
796 		mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr);
797 		spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
798 		if (mr) {
799 			ibv_dereg_mr(mr);
800 		}
801 		break;
802 	}
803 
804 	return 0;
805 }
806 
807 typedef enum spdk_nvme_data_transfer spdk_nvme_data_transfer_t;
808 
809 static spdk_nvme_data_transfer_t
810 spdk_nvmf_rdma_request_get_xfer(struct spdk_nvmf_rdma_request *rdma_req)
811 {
812 	enum spdk_nvme_data_transfer xfer;
813 	struct spdk_nvme_cmd *cmd = &rdma_req->req.cmd->nvme_cmd;
814 	struct spdk_nvme_sgl_descriptor *sgl = &cmd->dptr.sgl1;
815 
816 	/* Figure out data transfer direction */
817 	if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
818 		xfer = spdk_nvme_opc_get_data_transfer(rdma_req->req.cmd->nvmf_cmd.fctype);
819 	} else {
820 		xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
821 
822 		/* Some admin commands are special cases */
823 		if ((rdma_req->req.qpair->qid == 0) &&
824 		    ((cmd->opc == SPDK_NVME_OPC_GET_FEATURES) ||
825 		     (cmd->opc == SPDK_NVME_OPC_SET_FEATURES))) {
826 			switch (cmd->cdw10 & 0xff) {
827 			case SPDK_NVME_FEAT_LBA_RANGE_TYPE:
828 			case SPDK_NVME_FEAT_AUTONOMOUS_POWER_STATE_TRANSITION:
829 			case SPDK_NVME_FEAT_HOST_IDENTIFIER:
830 				break;
831 			default:
832 				xfer = SPDK_NVME_DATA_NONE;
833 			}
834 		}
835 	}
836 
837 	if (xfer == SPDK_NVME_DATA_NONE) {
838 		return xfer;
839 	}
840 
841 	/* Even for commands that may transfer data, they could have specified 0 length.
842 	 * We want those to show up with xfer SPDK_NVME_DATA_NONE.
843 	 */
844 	switch (sgl->generic.type) {
845 	case SPDK_NVME_SGL_TYPE_DATA_BLOCK:
846 	case SPDK_NVME_SGL_TYPE_BIT_BUCKET:
847 	case SPDK_NVME_SGL_TYPE_SEGMENT:
848 	case SPDK_NVME_SGL_TYPE_LAST_SEGMENT:
849 		if (sgl->unkeyed.length == 0) {
850 			xfer = SPDK_NVME_DATA_NONE;
851 		}
852 		break;
853 	case SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK:
854 		if (sgl->keyed.length == 0) {
855 			xfer = SPDK_NVME_DATA_NONE;
856 		}
857 		break;
858 	}
859 
860 	return xfer;
861 }
862 
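/* Parse the command's SGL descriptor. Keyed data block SGLs take a buffer
 * from the shared data pool (rounded up to 4KiB alignment) and fill in the
 * RDMA data work request with the host's rkey and remote address. Unkeyed
 * data block SGLs with the offset subtype point req.data at the in-capsule
 * data received with the command. Returns 0 on success (with req.data left
 * NULL if no pool buffer was available) and -1 with an NVMe status code set
 * on an invalid SGL.
 */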
863 static int
864 spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
865 				 struct spdk_nvmf_rdma_device *device,
866 				 struct spdk_nvmf_rdma_request *rdma_req)
867 {
868 	struct spdk_nvme_cmd			*cmd;
869 	struct spdk_nvme_cpl			*rsp;
870 	struct spdk_nvme_sgl_descriptor		*sgl;
871 
872 	cmd = &rdma_req->req.cmd->nvme_cmd;
873 	rsp = &rdma_req->req.rsp->nvme_cpl;
874 	sgl = &cmd->dptr.sgl1;
875 
876 	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
877 	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
878 	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
879 		if (sgl->keyed.length > rtransport->max_io_size) {
880 			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
881 				    sgl->keyed.length, rtransport->max_io_size);
882 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
883 			return -1;
884 		}
885 
886 		rdma_req->req.length = sgl->keyed.length;
887 		rdma_req->data_from_pool = spdk_mempool_get(rtransport->data_buf_pool);
888 		if (!rdma_req->data_from_pool) {
889 			/* No available buffers. Queue this request up. */
890 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "No available large data buffers. Queueing request %p\n", rdma_req);
891 			return 0;
892 		}
893 		/* AIO backend requires block size aligned data buffers,
894 		 * 4KiB aligned data buffer should work for most devices.
895 		 */
896 		rdma_req->req.data = (void *)((uintptr_t)(rdma_req->data_from_pool + NVMF_DATA_BUFFER_MASK)
897 					      & ~NVMF_DATA_BUFFER_MASK);
898 		rdma_req->data.sgl[0].addr = (uintptr_t)rdma_req->req.data;
899 		rdma_req->data.sgl[0].length = sgl->keyed.length;
900 		rdma_req->data.sgl[0].lkey = ((struct ibv_mr *)spdk_mem_map_translate(device->map,
901 					      (uint64_t)rdma_req->req.data))->lkey;
902 		rdma_req->data.wr.wr.rdma.rkey = sgl->keyed.key;
903 		rdma_req->data.wr.wr.rdma.remote_addr = sgl->address;
904 
905 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p took buffer from central pool\n", rdma_req);
906 
907 		return 0;
908 	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
909 		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
910 		uint64_t offset = sgl->address;
911 		uint32_t max_len = rtransport->in_capsule_data_size;
912 
913 		SPDK_DEBUGLOG(SPDK_LOG_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
914 			      offset, sgl->unkeyed.length);
915 
916 		if (offset > max_len) {
917 			SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
918 				    offset, max_len);
919 			rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
920 			return -1;
921 		}
922 		max_len -= (uint32_t)offset;
923 
924 		if (sgl->unkeyed.length > max_len) {
925 			SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
926 				    sgl->unkeyed.length, max_len);
927 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
928 			return -1;
929 		}
930 
931 		rdma_req->req.data = rdma_req->recv->buf + offset;
932 		rdma_req->data_from_pool = NULL;
933 		rdma_req->req.length = sgl->unkeyed.length;
934 		return 0;
935 	}
936 
937 	SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
938 		    sgl->generic.type, sgl->generic.subtype);
939 	rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
940 	return -1;
941 }
942 
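/* Advance a request through as many state transitions as possible in one
 * call. Returns true if the state changed at least once. States noted below
 * as requiring external code to kick them are advanced elsewhere, from
 * completion handling or the request execution callback.
 */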
943 static bool
944 spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
945 			       struct spdk_nvmf_rdma_request *rdma_req)
946 {
947 	struct spdk_nvmf_rdma_qpair	*rqpair;
948 	struct spdk_nvmf_rdma_device	*device;
949 	struct spdk_nvme_cpl		*rsp = &rdma_req->req.rsp->nvme_cpl;
950 	int				rc;
951 	struct spdk_nvmf_rdma_recv	*rdma_recv;
952 	enum spdk_nvmf_rdma_request_state prev_state;
953 	bool				progress = false;
954 
955 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
956 	device = rqpair->port->device;
957 
958 	assert(rdma_req->state != RDMA_REQUEST_STATE_FREE);
959 
960 	/* The loop here is to allow for several back-to-back state changes. */
961 	do {
962 		prev_state = rdma_req->state;
963 
964 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p entering state %d\n", rdma_req, prev_state);
965 
966 		switch (rdma_req->state) {
967 		case RDMA_REQUEST_STATE_FREE:
968 			/* Some external code must kick a request into RDMA_REQUEST_STATE_NEW
969 			 * to escape this state. */
970 			break;
971 		case RDMA_REQUEST_STATE_NEW:
972 			rqpair->cur_queue_depth++;
973 			rdma_recv = rdma_req->recv;
974 
975 			/* The first element of the SGL is the NVMe command */
976 			rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
977 			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));
978 
979 			TAILQ_REMOVE(&rqpair->incoming_queue, rdma_recv, link);
980 			TAILQ_REMOVE(&rqpair->free_queue, rdma_req, link);
981 
982 			/* The next state transition depends on the data transfer needs of this request. */
983 			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);
984 
985 			/* If no data to transfer, ready to execute. */
986 			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
987 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
988 				break;
989 			}
990 
991 			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
992 			TAILQ_INSERT_TAIL(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
993 			break;
994 		case RDMA_REQUEST_STATE_NEED_BUFFER:
995 			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);
996 
997 			if (rdma_req != TAILQ_FIRST(&rqpair->ch->pending_data_buf_queue)) {
998 				/* This request needs to wait in line to obtain a buffer */
999 				break;
1000 			}
1001 
1002 			/* Try to get a data buffer */
1003 			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
1004 			if (rc < 0) {
1005 				TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1006 				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1007 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1008 				break;
1009 			}
1010 
1011 			if (!rdma_req->req.data) {
1012 				/* No buffers available. */
1013 				break;
1014 			}
1015 
1016 			TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1017 
1018 			/* If data is transferring from host to controller and the data didn't
1019 			 * arrive using in capsule data, we need to do a transfer from the host.
1020 			 */
1021 			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool != NULL) {
1022 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER;
1023 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1024 				break;
1025 			}
1026 
1027 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1028 			break;
1029 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER:
1030 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1031 				/* This request needs to wait in line to perform RDMA */
1032 				break;
1033 			}
1034 
1035 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1036 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1037 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
1038 				rc = request_transfer_in(&rdma_req->req);
1039 				if (rc) {
1040 					rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1041 					rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1042 				}
1043 			}
1044 			break;
1045 		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
1046 			/* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE
1047 			 * to escape this state. */
1048 			break;
1049 		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
1050 			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
1051 			spdk_nvmf_request_exec(&rdma_req->req);
1052 			break;
1053 		case RDMA_REQUEST_STATE_EXECUTING:
1054 			/* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED
1055 			 * to escape this state. */
1056 			break;
1057 		case RDMA_REQUEST_STATE_EXECUTED:
1058 			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1059 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST;
1060 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1061 			} else {
1062 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1063 			}
1064 			break;
1065 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST:
1066 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1067 				/* This request needs to wait in line to perform RDMA */
1068 				break;
1069 			}
1070 
1071 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1072 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1073 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1074 			}
1075 			break;
1076 		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
1077 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETING;
1078 
1079 			rc = request_transfer_out(&rdma_req->req);
1080 			assert(rc == 0); /* No good way to handle this currently */
1081 			break;
1082 		case RDMA_REQUEST_STATE_COMPLETING:
1083 			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
1084 			 * to escape this state. */
1085 			break;
1086 		case RDMA_REQUEST_STATE_COMPLETED:
1087 			assert(rqpair->cur_queue_depth > 0);
1088 			rqpair->cur_queue_depth--;
1089 
1090 			if (rdma_req->data_from_pool) {
1091 				/* Put the buffer back in the pool */
1092 				spdk_mempool_put(rtransport->data_buf_pool, rdma_req->data_from_pool);
1093 				rdma_req->data_from_pool = NULL;
1094 			}
1095 			rdma_req->req.length = 0;
1096 			rdma_req->req.data = NULL;
1097 			rdma_req->state = RDMA_REQUEST_STATE_FREE;
1098 			TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
1099 			break;
1100 		}
1101 
1102 		if (rdma_req->state != prev_state) {
1103 			progress = true;
1104 		}
1105 	} while (rdma_req->state != prev_state);
1106 
1107 	return progress;
1108 }
1109 
1110 /* Public API callbacks begin here */
1111 
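/* Create the RDMA transport: a non-blocking rdma_cm event channel, a data
 * buffer pool holding four buffers per queue slot (each max_io_size plus
 * alignment padding), the management-channel io_device, and one
 * spdk_nvmf_rdma_device per verbs context reported by rdma_get_devices().
 * Protection domains and memory maps are created lazily when a listener
 * first uses a device.
 */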
1112 static struct spdk_nvmf_transport *
1113 spdk_nvmf_rdma_create(struct spdk_nvmf_tgt *tgt)
1114 {
1115 	int rc;
1116 	struct spdk_nvmf_rdma_transport *rtransport;
1117 	struct spdk_nvmf_rdma_device	*device, *tmp;
1118 	struct ibv_context		**contexts;
1119 	uint32_t			i;
1120 	char				buf[64];
1121 	int				flag;
1122 
1123 	rtransport = calloc(1, sizeof(*rtransport));
1124 	if (!rtransport) {
1125 		return NULL;
1126 	}
1127 
1128 	pthread_mutex_init(&rtransport->lock, NULL);
1129 	TAILQ_INIT(&rtransport->devices);
1130 	TAILQ_INIT(&rtransport->ports);
1131 
1132 	rtransport->transport.tgt = tgt;
1133 	rtransport->transport.ops = &spdk_nvmf_transport_rdma;
1134 
1135 	SPDK_NOTICELOG("*** RDMA Transport Init ***\n");
1136 
1137 	rtransport->max_queue_depth = tgt->opts.max_queue_depth;
1138 	rtransport->max_io_size = tgt->opts.max_io_size;
1139 	rtransport->in_capsule_data_size = tgt->opts.in_capsule_data_size;
1140 
1141 	rtransport->event_channel = rdma_create_event_channel();
1142 	if (rtransport->event_channel == NULL) {
1143 		spdk_strerror_r(errno, buf, sizeof(buf));
1144 		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", buf);
1145 		free(rtransport);
1146 		return NULL;
1147 	}
1148 
1149 	flag = fcntl(rtransport->event_channel->fd, F_GETFL);
1150 	if (fcntl(rtransport->event_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
1151 		spdk_strerror_r(errno, buf, sizeof(buf));
1152 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
1153 			    rtransport->event_channel->fd, buf);
		rdma_destroy_event_channel(rtransport->event_channel);
1154 		free(rtransport);
1155 		return NULL;
1156 	}
1157 
1158 	rtransport->data_buf_pool = spdk_mempool_create("spdk_nvmf_rdma",
1159 				    rtransport->max_queue_depth * 4, /* The 4 is arbitrarily chosen. Needs to be configurable. */
1160 				    rtransport->max_io_size + NVMF_DATA_BUFFER_ALIGNMENT,
1161 				    SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
1162 				    SPDK_ENV_SOCKET_ID_ANY);
1163 	if (!rtransport->data_buf_pool) {
1164 		SPDK_ERRLOG("Unable to allocate data buffer pool for transport\n");
		rdma_destroy_event_channel(rtransport->event_channel);
1165 		free(rtransport);
1166 		return NULL;
1167 	}
1168 
1169 	spdk_io_device_register(rtransport, spdk_nvmf_rdma_mgmt_channel_create,
1170 				spdk_nvmf_rdma_mgmt_channel_destroy,
1171 				sizeof(struct spdk_nvmf_rdma_mgmt_channel));
1172 
1173 	contexts = rdma_get_devices(NULL);
1174 	i = 0;
1175 	rc = 0;
1176 	while (contexts[i] != NULL) {
1177 		device = calloc(1, sizeof(*device));
1178 		if (!device) {
1179 			SPDK_ERRLOG("Unable to allocate memory for RDMA devices.\n");
1180 			rc = -ENOMEM;
1181 			break;
1182 		}
1183 		device->context = contexts[i];
1184 		rc = ibv_query_device(device->context, &device->attr);
1185 		if (rc < 0) {
1186 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1187 			free(device);
1188 			break;
1189 
1190 		}
1191 
1192 		device->pd = NULL;
1193 		device->map = NULL;
1194 
1195 		TAILQ_INSERT_TAIL(&rtransport->devices, device, link);
1196 		i++;
1197 	}
1198 
1199 	if (rc < 0) {
1200 		TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) {
1201 			TAILQ_REMOVE(&rtransport->devices, device, link);
1202 			free(device);
1203 		}
1204 		spdk_mempool_free(rtransport->data_buf_pool);
1205 		rdma_destroy_event_channel(rtransport->event_channel);
1206 		free(rtransport);
1207 		rdma_free_devices(contexts);
1208 		return NULL;
1209 	}
1210 
1211 	rdma_free_devices(contexts);
1212 
1213 	return &rtransport->transport;
1214 }
1215 
1216 static int
1217 spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
1218 {
1219 	struct spdk_nvmf_rdma_transport	*rtransport;
1220 	struct spdk_nvmf_rdma_port	*port, *port_tmp;
1221 	struct spdk_nvmf_rdma_device	*device, *device_tmp;
1222 
1223 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1224 
1225 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, port_tmp) {
1226 		TAILQ_REMOVE(&rtransport->ports, port, link);
1227 		rdma_destroy_id(port->id);
1228 		free(port);
1229 	}
1230 
1231 	if (rtransport->event_channel != NULL) {
1232 		rdma_destroy_event_channel(rtransport->event_channel);
1233 	}
1234 
1235 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) {
1236 		TAILQ_REMOVE(&rtransport->devices, device, link);
1237 		if (device->map) {
1238 			spdk_mem_map_free(&device->map);
1239 		}
1240 		free(device);
1241 	}
1242 
1243 	if (spdk_mempool_count(rtransport->data_buf_pool) != (rtransport->max_queue_depth * 4)) {
1244 		SPDK_ERRLOG("transport buffer pool count is %zu but should be %u\n",
1245 			    spdk_mempool_count(rtransport->data_buf_pool),
1246 			    rtransport->max_queue_depth * 4);
1247 	}
1248 
1249 	spdk_mempool_free(rtransport->data_buf_pool);
1250 	spdk_io_device_unregister(rtransport, NULL);
1251 	free(rtransport);
1252 
1253 	return 0;
1254 }
1255 
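/* Start listening on the given transport ID. If a port for the same address
 * already exists, its reference count is incremented; otherwise a new
 * rdma_cm id is created, bound and placed into listen mode, the port is
 * matched to its spdk_nvmf_rdma_device, and the device's protection domain
 * and memory map are set up on first use.
 */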
1256 static int
1257 spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
1258 		      const struct spdk_nvme_transport_id *trid)
1259 {
1260 	struct spdk_nvmf_rdma_transport *rtransport;
1261 	struct spdk_nvmf_rdma_device	*device;
1262 	struct spdk_nvmf_rdma_port 	*port_tmp, *port;
1263 	struct sockaddr_in saddr;
1264 	int rc;
1265 
1266 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1267 
1268 	port = calloc(1, sizeof(*port));
1269 	if (!port) {
1270 		return -ENOMEM;
1271 	}
1272 
1273 	/* Selectively copy the trid. Things like NQN don't matter here - that
1274 	 * mapping is enforced elsewhere.
1275 	 */
1276 	port->trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1277 	port->trid.adrfam = trid->adrfam;
1278 	snprintf(port->trid.traddr, sizeof(port->trid.traddr), "%s", trid->traddr);
1279 	snprintf(port->trid.trsvcid, sizeof(port->trid.trsvcid), "%s", trid->trsvcid);
1280 
1281 	pthread_mutex_lock(&rtransport->lock);
1282 	assert(rtransport->event_channel != NULL);
1283 	TAILQ_FOREACH(port_tmp, &rtransport->ports, link) {
1284 		if (spdk_nvme_transport_id_compare(&port_tmp->trid, &port->trid) == 0) {
1285 			port_tmp->ref++;
1286 			free(port);
1287 			/* Already listening at this address */
1288 			pthread_mutex_unlock(&rtransport->lock);
1289 			return 0;
1290 		}
1291 	}
1292 
1293 	rc = rdma_create_id(rtransport->event_channel, &port->id, port, RDMA_PS_TCP);
1294 	if (rc < 0) {
1295 		SPDK_ERRLOG("rdma_create_id() failed\n");
1296 		free(port);
1297 		pthread_mutex_unlock(&rtransport->lock);
1298 		return rc;
1299 	}
1300 
1301 	memset(&saddr, 0, sizeof(saddr));
1302 	saddr.sin_family = AF_INET;
1303 	saddr.sin_addr.s_addr = inet_addr(port->trid.traddr);
1304 	saddr.sin_port = htons((uint16_t)strtoul(port->trid.trsvcid, NULL, 10));
1305 	rc = rdma_bind_addr(port->id, (struct sockaddr *)&saddr);
1306 	if (rc < 0) {
1307 		SPDK_ERRLOG("rdma_bind_addr() failed\n");
1308 		rdma_destroy_id(port->id);
1309 		free(port);
1310 		pthread_mutex_unlock(&rtransport->lock);
1311 		return rc;
1312 	}
1313 
1314 	rc = rdma_listen(port->id, 10); /* 10 = backlog */
1315 	if (rc < 0) {
1316 		SPDK_ERRLOG("rdma_listen() failed\n");
1317 		rdma_destroy_id(port->id);
1318 		free(port);
1319 		pthread_mutex_unlock(&rtransport->lock);
1320 		return rc;
1321 	}
1322 
1323 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1324 		if (device->context == port->id->verbs) {
1325 			port->device = device;
1326 			break;
1327 		}
1328 	}
1329 	if (!port->device) {
1330 		SPDK_ERRLOG("Accepted a connection with verbs %p, but unable to find a corresponding device.\n",
1331 			    port->id->verbs);
1332 		rdma_destroy_id(port->id);
1333 		free(port);
1334 		pthread_mutex_unlock(&rtransport->lock);
1335 		return -EINVAL;
1336 	}
1337 
1338 	if (!device->map) {
1339 		device->pd = port->id->pd;
1340 		device->map = spdk_mem_map_alloc(0, spdk_nvmf_rdma_mem_notify, device);
1341 		if (!device->map) {
1342 			SPDK_ERRLOG("Unable to allocate memory map for listen address\n");
			rdma_destroy_id(port->id);
			free(port);
			pthread_mutex_unlock(&rtransport->lock);
1343 			return -1;
1344 		}
1345 	} else {
1346 		assert(device->pd == port->id->pd);
1347 	}
1348 
1349 	SPDK_NOTICELOG("*** NVMf Target Listening on %s port %d ***\n",
1350 		       port->trid.traddr, ntohs(rdma_get_src_port(port->id)));
1351 
1352 	port->ref = 1;
1353 
1354 	TAILQ_INSERT_TAIL(&rtransport->ports, port, link);
1355 	pthread_mutex_unlock(&rtransport->lock);
1356 
1357 	return 0;
1358 }
1359 
1360 static int
1361 spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
1362 			   const struct spdk_nvme_transport_id *_trid)
1363 {
1364 	struct spdk_nvmf_rdma_transport *rtransport;
1365 	struct spdk_nvmf_rdma_port *port, *tmp;
1366 	struct spdk_nvme_transport_id trid = {};
1367 
1368 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1369 
1370 	/* Selectively copy the trid. Things like NQN don't matter here - that
1371 	 * mapping is enforced elsewhere.
1372 	 */
1373 	trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1374 	trid.adrfam = _trid->adrfam;
1375 	snprintf(trid.traddr, sizeof(port->trid.traddr), "%s", _trid->traddr);
1376 	snprintf(trid.trsvcid, sizeof(port->trid.trsvcid), "%s", _trid->trsvcid);
1377 
1378 	pthread_mutex_lock(&rtransport->lock);
1379 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, tmp) {
1380 		if (spdk_nvme_transport_id_compare(&port->trid, &trid) == 0) {
1381 			assert(port->ref > 0);
1382 			port->ref--;
1383 			if (port->ref == 0) {
1384 				TAILQ_REMOVE(&rtransport->ports, port, link);
1385 				rdma_destroy_id(port->id);
1386 				free(port);
1387 			}
1388 			break;
1389 		}
1390 	}
1391 
1392 	pthread_mutex_unlock(&rtransport->lock);
1393 	return 0;
1394 }
1395 
1396 static int
1397 spdk_nvmf_rdma_qpair_poll(struct spdk_nvmf_rdma_transport *rtransport,
1398 			  struct spdk_nvmf_rdma_qpair *rqpair);
1399 
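/* Drain the non-blocking rdma_cm event channel, dispatching connect
 * requests to nvmf_rdma_connect() and disconnect-style events to
 * nvmf_rdma_disconnect(), and acknowledging each event.
 */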
1400 static void
1401 spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport, new_qpair_fn cb_fn)
1402 {
1403 	struct spdk_nvmf_rdma_transport *rtransport;
1404 	struct rdma_cm_event		*event;
1405 	int				rc;
1406 	char buf[64];
1407 
1408 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1409 
1410 	if (rtransport->event_channel == NULL) {
1411 		return;
1412 	}
1413 
1414 	while (1) {
1415 		rc = rdma_get_cm_event(rtransport->event_channel, &event);
1416 		if (rc == 0) {
1417 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
1418 
1419 			switch (event->event) {
1420 			case RDMA_CM_EVENT_CONNECT_REQUEST:
1421 				rc = nvmf_rdma_connect(transport, event, cb_fn);
1422 				if (rc < 0) {
1423 					SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
1424 					break;
1425 				}
1426 				break;
1427 			case RDMA_CM_EVENT_ESTABLISHED:
1428 				break;
1429 			case RDMA_CM_EVENT_ADDR_CHANGE:
1430 			case RDMA_CM_EVENT_DISCONNECTED:
1431 			case RDMA_CM_EVENT_DEVICE_REMOVAL:
1432 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1433 				rc = nvmf_rdma_disconnect(event);
1434 				if (rc < 0) {
1435 					SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
1436 					break;
1437 				}
1438 				continue;
1439 			default:
1440 				SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
1441 				break;
1442 			}
1443 
1444 			rdma_ack_cm_event(event);
1445 		} else {
1446 			if (errno != EAGAIN && errno != EWOULDBLOCK) {
1447 				spdk_strerror_r(errno, buf, sizeof(buf));
1448 				SPDK_ERRLOG("Acceptor Event Error: %s\n", buf);
1449 			}
1450 			break;
1451 		}
1452 	}
1453 }
1454 
1455 static void
1456 spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
1457 			struct spdk_nvme_transport_id *trid,
1458 			struct spdk_nvmf_discovery_log_page_entry *entry)
1459 {
1460 	entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
1461 	entry->adrfam = trid->adrfam;
1462 	entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
1463 
1464 	spdk_strcpy_pad(entry->trsvcid, trid->trsvcid, sizeof(entry->trsvcid), ' ');
1465 	spdk_strcpy_pad(entry->traddr, trid->traddr, sizeof(entry->traddr), ' ');
1466 
1467 	entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
1468 	entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
1469 	entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
1470 }
1471 
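/* Create a poll group with one poller per RDMA device that is already in
 * use (i.e. has a memory map because a listener references it). Devices
 * without listeners are skipped.
 */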
1472 static struct spdk_nvmf_transport_poll_group *
1473 spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
1474 {
1475 	struct spdk_nvmf_rdma_transport		*rtransport;
1476 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1477 	struct spdk_nvmf_rdma_poller		*poller;
1478 	struct spdk_nvmf_rdma_device		*device;
1479 
1480 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1481 
1482 	rgroup = calloc(1, sizeof(*rgroup));
1483 	if (!rgroup) {
1484 		return NULL;
1485 	}
1486 
1487 	TAILQ_INIT(&rgroup->pollers);
1488 
1489 	pthread_mutex_lock(&rtransport->lock);
1490 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1491 		if (device->map == NULL) {
1492 			/*
1493 			 * The device is not in use (no listeners),
1494 			 * so no protection domain has been constructed.
1495 			 * Skip it.
1496 			 */
1497 			SPDK_NOTICELOG("Skipping unused RDMA device when creating poll group.\n");
1498 			continue;
1499 		}
1500 
1501 		poller = calloc(1, sizeof(*poller));
1502 		if (!poller) {
1503 			SPDK_ERRLOG("Unable to allocate memory for new RDMA poller\n");
1504 			free(rgroup);
1505 			pthread_mutex_unlock(&rtransport->lock);
1506 			return NULL;
1507 		}
1508 
1509 		poller->device = device;
1510 		poller->group = rgroup;
1511 
1512 		TAILQ_INIT(&poller->qpairs);
1513 
1514 		TAILQ_INSERT_TAIL(&rgroup->pollers, poller, link);
1515 	}
1516 
1517 	pthread_mutex_unlock(&rtransport->lock);
1518 	return &rgroup->group;
1519 }
1520 
1521 static void
1522 spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
1523 {
1524 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1525 	struct spdk_nvmf_rdma_poller		*poller, *tmp;
1526 
1527 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1528 
1529 	if (!rgroup) {
1530 		return;
1531 	}
1532 
1533 	TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tmp) {
1534 		TAILQ_REMOVE(&rgroup->pollers, poller, link);
1535 		free(poller);
1536 	}
1537 
1538 	free(rgroup);
1539 }
1540 
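/* Attach a newly accepted qpair to the poller that owns its device,
 * allocate its verbs resources, take a reference on the management channel
 * used for buffer-wait queueing, and finally send the rdma_cm accept back
 * to the host.
 */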
1541 static int
1542 spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
1543 			      struct spdk_nvmf_qpair *qpair)
1544 {
1545 	struct spdk_nvmf_rdma_transport		*rtransport;
1546 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1547 	struct spdk_nvmf_rdma_qpair 		*rqpair;
1548 	struct spdk_nvmf_rdma_device 		*device;
1549 	struct spdk_nvmf_rdma_poller		*poller;
1550 	int					rc;
1551 
1552 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
1553 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1554 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1555 
1556 	device = rqpair->port->device;
1557 
1558 	if (device->pd != rqpair->cm_id->pd) {
1559 		SPDK_ERRLOG("Mismatched protection domains\n");
1560 		return -1;
1561 	}
1562 
1563 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1564 		if (poller->device == device) {
1565 			break;
1566 		}
1567 	}
1568 
1569 	if (!poller) {
1570 		SPDK_ERRLOG("No poller found for device.\n");
1571 		return -1;
1572 	}
1573 
1574 	TAILQ_INSERT_TAIL(&poller->qpairs, rqpair, link);
1575 	rqpair->poller = poller;
1576 
1577 	rc = spdk_nvmf_rdma_qpair_initialize(qpair);
	if (rc < 0) {
		/* The qpair was already destroyed inside the initialize path. */
		return -1;
	}
1578 
1579 	rqpair->mgmt_channel = spdk_get_io_channel(rtransport);
1580 	if (!rqpair->mgmt_channel) {
1581 		spdk_nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
1582 		spdk_nvmf_rdma_qpair_destroy(rqpair);
1583 		return -1;
1584 	}
1585 
1586 	rqpair->ch = spdk_io_channel_get_ctx(rqpair->mgmt_channel);
1587 	assert(rqpair->ch != NULL);
1588 
1589 	rc = spdk_nvmf_rdma_event_accept(rqpair->cm_id, rqpair);
1590 	if (rc) {
1591 		/* Try to reject, but we probably can't */
1592 		spdk_nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
1593 		spdk_nvmf_rdma_qpair_destroy(rqpair);
1594 		return -1;
1595 	}
1596 
1597 	return 0;
1598 }
1599 
1600 static int
1601 spdk_nvmf_rdma_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
1602 				 struct spdk_nvmf_qpair *qpair)
1603 {
1604 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1605 	struct spdk_nvmf_rdma_qpair 		*rqpair;
1606 	struct spdk_nvmf_rdma_device 		*device;
1607 	struct spdk_nvmf_rdma_poller		*poller;
1608 	struct spdk_nvmf_rdma_qpair		*rq, *trq;
1609 
1610 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1611 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1612 
1613 	device = rqpair->port->device;
1614 
1615 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1616 		if (poller->device == device) {
1617 			break;
1618 		}
1619 	}
1620 
1621 	if (!poller) {
1622 		SPDK_ERRLOG("No poller found for device.\n");
1623 		return -1;
1624 	}
1625 
1626 	TAILQ_FOREACH_SAFE(rq, &poller->qpairs, link, trq) {
1627 		if (rq == rqpair) {
1628 			TAILQ_REMOVE(&poller->qpairs, rqpair, link);
1629 			break;
1630 		}
1631 	}
1632 
1633 	if (rq == NULL) {
1634 		SPDK_ERRLOG("RDMA qpair cannot be removed from group (not in group).\n");
1635 		return -1;
1636 	}
1637 
1638 	return 0;
1639 }
1640 
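/* Transport callback invoked when the target has finished executing a request.
 * Mark the request as executed and run the state machine so the data transfer
 * and/or completion can be posted back to the host.
 */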
1641 static int
1642 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
1643 {
1644 	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
1645 			struct spdk_nvmf_rdma_transport, transport);
1646 	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
1647 
1648 	rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
1649 	spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1650 
1651 	return 0;
1652 }
1653 
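/* Transport callback used to tear down a single queue pair. */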
1654 static void
1655 spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
1656 {
1657 	spdk_nvmf_rdma_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair));
1658 }
1659 
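/* Retry requests on this qpair that were previously blocked on a resource.
 * Requests waiting for RDMA read/write queue depth are serviced first, then
 * requests waiting for a data buffer, and finally newly received commands that
 * still need a free request object.
 */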
1660 static void
1661 spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport,
1662 				     struct spdk_nvmf_rdma_qpair *rqpair)
1663 {
1664 	struct spdk_nvmf_rdma_recv	*rdma_recv, *recv_tmp;
1665 	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;
1666 
1667 	/* We process I/O in the pending_rdma_rw queue at the highest priority. */
1668 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_rw_queue, link, req_tmp) {
1669 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1670 			break;
1671 		}
1672 	}
1673 
1674 	/* The second highest priority is I/O waiting on memory buffers. */
1675 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->ch->pending_data_buf_queue, link, req_tmp) {
1676 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1677 			break;
1678 		}
1679 	}
1680 
1681 	/* The lowest priority is processing newly received commands */
1682 	TAILQ_FOREACH_SAFE(rdma_recv, &rqpair->incoming_queue, link, recv_tmp) {
1683 		rdma_req = TAILQ_FIRST(&rqpair->free_queue);
1684 		if (rdma_req == NULL) {
1685 			/* Need to wait for more SEND completions */
1686 			break;
1687 		}
1688 
1689 		rdma_req->recv = rdma_recv;
1690 		rdma_req->state = RDMA_REQUEST_STATE_NEW;
1691 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1692 			break;
1693 		}
1694 	}
1695 }
1696 
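/* Recover the spdk_nvmf_rdma_request stored in a work completion's wr_id and
 * assert that it lies within this qpair's request array.
 */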
1697 static struct spdk_nvmf_rdma_request *
1698 get_rdma_req_from_wc(struct spdk_nvmf_rdma_qpair *rqpair,
1699 		     struct ibv_wc *wc)
1700 {
1701 	struct spdk_nvmf_rdma_request *rdma_req;
1702 
1703 	rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
1704 	assert(rdma_req != NULL);
1705 	assert(rdma_req - rqpair->reqs >= 0);
1706 	assert(rdma_req - rqpair->reqs < (ptrdiff_t)rqpair->max_queue_depth);
1707 
1708 	return rdma_req;
1709 }
1710 
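/* Recover the spdk_nvmf_rdma_recv element stored in a receive completion's
 * wr_id, asserting that at least a full capsule command was received and that
 * the element lies within this qpair's receive array.
 */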
1711 static struct spdk_nvmf_rdma_recv *
1712 get_rdma_recv_from_wc(struct spdk_nvmf_rdma_qpair *rqpair,
1713 		      struct ibv_wc *wc)
1714 {
1715 	struct spdk_nvmf_rdma_recv *rdma_recv;
1716 
1717 	assert(wc->byte_len >= sizeof(struct spdk_nvmf_capsule_cmd));
1718 
1719 	rdma_recv = (struct spdk_nvmf_rdma_recv *)wc->wr_id;
1720 	assert(rdma_recv != NULL);
1721 	assert(rdma_recv - rqpair->recvs >= 0);
1722 	assert(rdma_recv - rqpair->recvs < (ptrdiff_t)rqpair->max_queue_depth);
1723 
1724 	return rdma_recv;
1725 }
1726 
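/* Drain up to 32 completions from this qpair's completion queue. Send, RDMA
 * read/write, and receive completions each advance the request state machine,
 * and pending work is retried after each successful completion since resources
 * may have been freed. Returns the number of requests fully completed (send
 * completions), or -1 if polling failed or any completion carried an error
 * status.
 */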
1727 static int
1728 spdk_nvmf_rdma_qpair_poll(struct spdk_nvmf_rdma_transport *rtransport,
1729 			  struct spdk_nvmf_rdma_qpair *rqpair)
1730 {
1731 	struct ibv_wc wc[32];
1732 	struct spdk_nvmf_rdma_request	*rdma_req;
1733 	struct spdk_nvmf_rdma_recv	*rdma_recv;
1734 	int reaped, i;
1735 	int count = 0;
1736 	bool error = false;
1737 	char buf[64];
1738 
1739 	/* Poll for completing operations. */
1740 	reaped = ibv_poll_cq(rqpair->cq, 32, wc);
1741 	if (reaped < 0) {
1742 		spdk_strerror_r(errno, buf, sizeof(buf));
1743 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1744 			    errno, buf);
1745 		return -1;
1746 	}
1747 
1748 	for (i = 0; i < reaped; i++) {
1749 		if (wc[i].status) {
1750 			SPDK_ERRLOG("CQ error on CQ %p, Request 0x%lx (%d): %s\n",
1751 				    rqpair->cq, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1752 			error = true;
1753 			continue;
1754 		}
1755 
1756 		switch (wc[i].opcode) {
1757 		case IBV_WC_SEND:
1758 			rdma_req = get_rdma_req_from_wc(rqpair, &wc[i]);
1759 
1760 			assert(rdma_req->state == RDMA_REQUEST_STATE_COMPLETING);
1761 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1762 
1763 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1764 
1765 			count++;
1766 
1767 			/* Try to process other queued requests */
1768 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1769 			break;
1770 
1771 		case IBV_WC_RDMA_WRITE:
1772 			rqpair->cur_rdma_rw_depth--;
1773 
1774 			/* Try to process other queued requests */
1775 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1776 			break;
1777 
1778 		case IBV_WC_RDMA_READ:
1779 			rdma_req = get_rdma_req_from_wc(rqpair, &wc[i]);
1780 
1781 			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
1782 			rqpair->cur_rdma_rw_depth--;
1783 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1784 
1785 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1786 
1787 			/* Try to process other queued requests */
1788 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1789 			break;
1790 
1791 		case IBV_WC_RECV:
1792 			rdma_recv = get_rdma_recv_from_wc(rqpair, &wc[i]);
1793 
1794 			TAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
1795 
1796 			/* Try to process other queued requests */
1797 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1798 			break;
1799 
1800 		default:
1801 			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
1802 			continue;
1803 		}
1804 	}
1805 
1806 	if (error == true) {
1807 		return -1;
1808 	}
1809 
1810 	return count;
1811 }
1812 
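/* Transport callback that polls every qpair on every poller in this group and
 * returns the total number of completed requests, or a negative value on the
 * first polling error.
 */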
1813 static int
1814 spdk_nvmf_rdma_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
1815 {
1816 	struct spdk_nvmf_rdma_transport *rtransport;
1817 	struct spdk_nvmf_rdma_poll_group *rgroup;
1818 	struct spdk_nvmf_rdma_poller	*rpoller;
1819 	struct spdk_nvmf_rdma_qpair	*rqpair;
1820 	int				count, rc;
1821 
1822 	rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport);
1823 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1824 
1825 	count = 0;
1826 	TAILQ_FOREACH(rpoller, &rgroup->pollers, link) {
1827 		TAILQ_FOREACH(rqpair, &rpoller->qpairs, link) {
1828 			rc = spdk_nvmf_rdma_qpair_poll(rtransport, rqpair);
1829 			if (rc < 0) {
1830 				return rc;
1831 			}
1832 			count += rc;
1833 		}
1834 	}
1835 
1836 	return count;
1837 }
1838 
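/* A qpair is idle when it has no outstanding commands and no RDMA read/write
 * operations in flight.
 */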
1839 static bool
1840 spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
1841 {
1842 	struct spdk_nvmf_rdma_qpair *rqpair;
1843 
1844 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1845 
1846 	if (rqpair->cur_queue_depth == 0 && rqpair->cur_rdma_rw_depth == 0) {
1847 		return true;
1848 	}
1849 	return false;
1850 }
1851 
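/* Function table through which the RDMA transport is registered with the
 * generic NVMe-oF target transport layer.
 */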
1852 const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
1853 	.type = SPDK_NVME_TRANSPORT_RDMA,
1854 	.create = spdk_nvmf_rdma_create,
1855 	.destroy = spdk_nvmf_rdma_destroy,
1856 
1857 	.listen = spdk_nvmf_rdma_listen,
1858 	.stop_listen = spdk_nvmf_rdma_stop_listen,
1859 	.accept = spdk_nvmf_rdma_accept,
1860 
1861 	.listener_discover = spdk_nvmf_rdma_discover,
1862 
1863 	.poll_group_create = spdk_nvmf_rdma_poll_group_create,
1864 	.poll_group_destroy = spdk_nvmf_rdma_poll_group_destroy,
1865 	.poll_group_add = spdk_nvmf_rdma_poll_group_add,
1866 	.poll_group_remove = spdk_nvmf_rdma_poll_group_remove,
1867 	.poll_group_poll = spdk_nvmf_rdma_poll_group_poll,
1868 
1869 	.req_complete = spdk_nvmf_rdma_request_complete,
1870 
1871 	.qpair_fini = spdk_nvmf_rdma_close_qpair,
1872 	.qpair_is_idle = spdk_nvmf_rdma_qpair_is_idle,
1873 
1874 };
1875 
1876 SPDK_LOG_REGISTER_COMPONENT("rdma", SPDK_LOG_RDMA)
1877