xref: /spdk/lib/nvmf/rdma.c (revision aac1f5f9349f33fccbc208f672115beeb2542d35)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <infiniband/verbs.h>
37 #include <rdma/rdma_cma.h>
38 #include <rdma/rdma_verbs.h>
39 
40 #include "nvmf_internal.h"
41 #include "transport.h"
42 
43 #include "spdk/assert.h"
44 #include "spdk/io_channel.h"
45 #include "spdk/nvmf.h"
46 #include "spdk/nvmf_spec.h"
47 #include "spdk/string.h"
48 #include "spdk/trace.h"
49 #include "spdk/util.h"
50 
51 #include "spdk_internal/log.h"
52 
53 /*
54  RDMA Connection Resource Defaults
55  */
56 #define NVMF_DEFAULT_TX_SGE		1
57 #define NVMF_DEFAULT_RX_SGE		2
58 
59 /* The RDMA completion queue size */
60 #define NVMF_RDMA_CQ_SIZE	4096
61 
62 /* The AIO backend requires block-size-aligned data buffers;
63  * an extra 4 KiB of alignment should work for most devices.
64  */
65 #define SHIFT_4KB			12
66 #define NVMF_DATA_BUFFER_ALIGNMENT	(1 << SHIFT_4KB)
67 #define NVMF_DATA_BUFFER_MASK		(NVMF_DATA_BUFFER_ALIGNMENT - 1)
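/* Buffers from the transport data pool are over-allocated by NVMF_DATA_BUFFER_ALIGNMENT
 * and the start address is rounded up to a 4 KiB boundary with
 * (ptr + NVMF_DATA_BUFFER_MASK) & ~NVMF_DATA_BUFFER_MASK in
 * spdk_nvmf_rdma_request_parse_sgl().
 */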
68 
69 enum spdk_nvmf_rdma_request_state {
70 	/* The request is not currently in use */
71 	RDMA_REQUEST_STATE_FREE = 0,
72 
73 	/* Initial state when a request is first received */
74 	RDMA_REQUEST_STATE_NEW,
75 
76 	/* The request is queued until a data buffer is available. */
77 	RDMA_REQUEST_STATE_NEED_BUFFER,
78 
79 	/* The request is waiting on RDMA queue depth availability
80 	 * to transfer data from the host to the controller.
81 	 */
82 	RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,
83 
84 	/* The request is currently transferring data from the host to the controller. */
85 	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
86 
87 	/* The request is ready to execute at the block device */
88 	RDMA_REQUEST_STATE_READY_TO_EXECUTE,
89 
90 	/* The request is currently executing at the block device */
91 	RDMA_REQUEST_STATE_EXECUTING,
92 
93 	/* The request finished executing at the block device */
94 	RDMA_REQUEST_STATE_EXECUTED,
95 
96 	/* The request is waiting on RDMA queue depth availability
97 	 * to transfer data from the controller to the host.
98 	 */
99 	RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,
100 
101 	/* The request is ready to send a completion */
102 	RDMA_REQUEST_STATE_READY_TO_COMPLETE,
103 
104 	/* The request currently has a completion outstanding */
105 	RDMA_REQUEST_STATE_COMPLETING,
106 
107 	/* The request completed and can be marked free. */
108 	RDMA_REQUEST_STATE_COMPLETED,
109 };
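/* Typical request life cycle, driven by spdk_nvmf_rdma_request_process():
 *   FREE -> NEW -> NEED_BUFFER -> TRANSFER_PENDING/TRANSFERRING (host to controller)
 *   -> READY_TO_EXECUTE -> EXECUTING -> EXECUTED
 *   -> TRANSFER_PENDING (controller to host) -> READY_TO_COMPLETE -> COMPLETING
 *   -> COMPLETED -> FREE
 * Requests that carry no data skip NEED_BUFFER entirely, and the data transfer
 * states are only visited in the direction the command actually moves data.
 */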
110 
111 /* This structure holds commands as they are received off the wire.
112  * It must be dynamically paired with a full request object
113  * (spdk_nvmf_rdma_request) to service a request. It is separate
114  * from the request because RDMA does not appear to order
115  * completions, so occasionally we'll get a new incoming
116  * command when there aren't any free request objects.
117  */
118 struct spdk_nvmf_rdma_recv {
119 	struct ibv_recv_wr		wr;
120 	struct ibv_sge			sgl[NVMF_DEFAULT_RX_SGE];
121 
122 	struct spdk_nvmf_rdma_qpair	*qpair;
123 
124 	/* In-capsule data buffer */
125 	uint8_t				*buf;
126 
127 	TAILQ_ENTRY(spdk_nvmf_rdma_recv) link;
128 };
129 
130 struct spdk_nvmf_rdma_request {
131 	struct spdk_nvmf_request		req;
132 	void					*data_from_pool;
133 
134 	enum spdk_nvmf_rdma_request_state	state;
135 
136 	struct spdk_nvmf_rdma_recv		*recv;
137 
138 	struct {
139 		struct	ibv_send_wr		wr;
140 		struct	ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
141 	} rsp;
142 
143 	struct {
144 		struct ibv_send_wr		wr;
145 		struct ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
146 	} data;
147 
148 	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
149 };
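/* rsp.wr is the SEND work request that returns the completion capsule to the host;
 * data.wr is the RDMA READ or WRITE work request that moves the payload. For
 * controller-to-host transfers the two are chained and posted together in
 * request_transfer_out().
 */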
150 
151 struct spdk_nvmf_rdma_qpair {
152 	struct spdk_nvmf_qpair			qpair;
153 
154 	struct spdk_nvmf_rdma_port		*port;
155 	struct spdk_nvmf_rdma_poller		*poller;
156 
157 	struct rdma_cm_id			*cm_id;
158 
159 	/* The maximum number of I/O outstanding on this connection at one time */
160 	uint16_t				max_queue_depth;
161 
162 	/* The maximum number of active RDMA READ and WRITE operations at one time */
163 	uint16_t				max_rw_depth;
164 
165 	/* The current number of I/O outstanding on this connection. This number
166 	 * includes all I/O from the time the capsule is first received until it is
167 	 * completed.
168 	 */
169 	uint16_t				cur_queue_depth;
170 
171 	/* The number of RDMA READ and WRITE requests that are outstanding */
172 	uint16_t				cur_rdma_rw_depth;
173 
174 	/* Receives that are waiting for a request object */
175 	TAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
176 
177 	/* Requests that are not in use */
178 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
179 
180 	/* Requests that are waiting to perform an RDMA READ or WRITE */
181 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
182 
183 	/* Array of size "max_queue_depth" containing RDMA requests. */
184 	struct spdk_nvmf_rdma_request		*reqs;
185 
186 	/* Array of size "max_queue_depth" containing RDMA recvs. */
187 	struct spdk_nvmf_rdma_recv		*recvs;
188 
189 	/* Array of size "max_queue_depth" containing 64 byte capsules
190 	 * used for receive.
191 	 */
192 	union nvmf_h2c_msg			*cmds;
193 	struct ibv_mr				*cmds_mr;
194 
195 	/* Array of size "max_queue_depth" containing 16 byte completions
196 	 * to be sent back to the user.
197 	 */
198 	union nvmf_c2h_msg			*cpls;
199 	struct ibv_mr				*cpls_mr;
200 
201 	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
202 	 * buffers to be used for in capsule data.
203 	 */
204 	void					*bufs;
205 	struct ibv_mr				*bufs_mr;
206 
207 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
208 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	pending_link;
209 
210 	/* Mgmt channel */
211 	struct spdk_io_channel			*mgmt_channel;
212 	struct spdk_nvmf_rdma_mgmt_channel	*ch;
213 };
214 
215 struct spdk_nvmf_rdma_poller {
216 	struct spdk_nvmf_rdma_device		*device;
217 	struct spdk_nvmf_rdma_poll_group	*group;
218 
219 	struct ibv_cq				*cq;
220 
221 	TAILQ_HEAD(, spdk_nvmf_rdma_qpair)	qpairs;
222 
223 	TAILQ_ENTRY(spdk_nvmf_rdma_poller)	link;
224 };
225 
226 struct spdk_nvmf_rdma_poll_group {
227 	struct spdk_nvmf_transport_poll_group	group;
228 
229 	TAILQ_HEAD(, spdk_nvmf_rdma_poller)	pollers;
230 };
231 
232 /* Assuming rdma_cm uses just one protection domain per ibv_context. */
233 struct spdk_nvmf_rdma_device {
234 	struct ibv_device_attr			attr;
235 	struct ibv_context			*context;
236 
237 	struct spdk_mem_map			*map;
238 	struct ibv_pd				*pd;
239 
240 	TAILQ_ENTRY(spdk_nvmf_rdma_device)	link;
241 };
242 
243 struct spdk_nvmf_rdma_port {
244 	struct spdk_nvme_transport_id		trid;
245 	struct rdma_cm_id			*id;
246 	struct spdk_nvmf_rdma_device		*device;
247 	uint32_t				ref;
248 	TAILQ_ENTRY(spdk_nvmf_rdma_port)	link;
249 };
250 
251 struct spdk_nvmf_rdma_transport {
252 	struct spdk_nvmf_transport	transport;
253 
254 	struct rdma_event_channel	*event_channel;
255 
256 	struct spdk_mempool		*data_buf_pool;
257 
258 	pthread_mutex_t			lock;
259 
260 	uint16_t			max_queue_depth;
261 	uint32_t			max_io_size;
262 	uint32_t			in_capsule_data_size;
263 
264 	TAILQ_HEAD(, spdk_nvmf_rdma_device)	devices;
265 	TAILQ_HEAD(, spdk_nvmf_rdma_port)	ports;
266 };
267 
268 struct spdk_nvmf_rdma_mgmt_channel {
269 	/* Requests that are waiting to obtain a data buffer */
270 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
271 };
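/* One mgmt channel exists per thread that gets an io_channel for the transport;
 * its pending_data_buf_queue holds requests waiting for a buffer from the shared
 * data_buf_pool, so the wait is serviced on the thread that owns the qpair.
 */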
272 
273 static int
274 spdk_nvmf_rdma_mgmt_channel_create(void *io_device, void *ctx_buf)
275 {
276 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
277 
278 	TAILQ_INIT(&ch->pending_data_buf_queue);
279 	return 0;
280 }
281 
282 static void
283 spdk_nvmf_rdma_mgmt_channel_destroy(void *io_device, void *ctx_buf)
284 {
285 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
286 
287 	if (!TAILQ_EMPTY(&ch->pending_data_buf_queue)) {
288 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
289 	}
290 }
291 
292 static void
293 spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
294 {
295 	if (rqpair->poller) {
296 		TAILQ_REMOVE(&rqpair->poller->qpairs, rqpair, link);
297 	}
298 
299 	if (rqpair->cmds_mr) {
300 		ibv_dereg_mr(rqpair->cmds_mr);
301 	}
302 
303 	if (rqpair->cpls_mr) {
304 		ibv_dereg_mr(rqpair->cpls_mr);
305 	}
306 
307 	if (rqpair->bufs_mr) {
308 		ibv_dereg_mr(rqpair->bufs_mr);
309 	}
310 
311 	if (rqpair->cm_id) {
312 		rdma_destroy_qp(rqpair->cm_id);
313 		rdma_destroy_id(rqpair->cm_id);
314 	}
315 
316 	if (rqpair->mgmt_channel) {
317 		spdk_put_io_channel(rqpair->mgmt_channel);
318 	}
319 
320 	/* Free all memory */
321 	spdk_dma_free(rqpair->cmds);
322 	spdk_dma_free(rqpair->cpls);
323 	spdk_dma_free(rqpair->bufs);
324 	free(rqpair->reqs);
325 	free(rqpair->recvs);
326 	free(rqpair);
327 }
328 
329 static int
330 spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
331 {
332 	struct spdk_nvmf_rdma_transport *rtransport;
333 	struct spdk_nvmf_rdma_qpair	*rqpair;
334 	int				rc, i;
335 	struct ibv_qp_init_attr		attr;
336 	struct spdk_nvmf_rdma_recv	*rdma_recv;
337 	struct spdk_nvmf_rdma_request	*rdma_req;
338 
339 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
340 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
341 
342 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
343 	attr.qp_type		= IBV_QPT_RC;
344 	attr.send_cq		= rqpair->poller->cq;
345 	attr.recv_cq		= rqpair->poller->cq;
346 	attr.cap.max_send_wr	= rqpair->max_queue_depth * 2; /* SEND, READ, and WRITE operations */
347 	attr.cap.max_recv_wr	= rqpair->max_queue_depth; /* RECV operations */
348 	attr.cap.max_send_sge	= NVMF_DEFAULT_TX_SGE;
349 	attr.cap.max_recv_sge	= NVMF_DEFAULT_RX_SGE;
350 
351 	rc = rdma_create_qp(rqpair->cm_id, NULL, &attr);
352 	if (rc) {
353 		SPDK_ERRLOG("rdma_create_qp failed: errno %d: %s\n", errno, spdk_strerror(errno));
354 		rdma_destroy_id(rqpair->cm_id);
355 		rqpair->cm_id = NULL;
356 		spdk_nvmf_rdma_qpair_destroy(rqpair);
357 		return -1;
358 	}
359 
360 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "New RDMA Connection: %p\n", qpair);
361 
362 	rqpair->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->reqs));
363 	rqpair->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->recvs));
364 	rqpair->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cmds),
365 					0x1000, NULL);
366 	rqpair->cpls = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cpls),
367 					0x1000, NULL);
368 	rqpair->bufs = spdk_dma_zmalloc(rqpair->max_queue_depth * rtransport->in_capsule_data_size,
369 					0x1000, NULL);
370 	if (!rqpair->reqs || !rqpair->recvs || !rqpair->cmds ||
371 	    !rqpair->cpls || !rqpair->bufs) {
372 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
373 		spdk_nvmf_rdma_qpair_destroy(rqpair);
374 		return -1;
375 	}
376 
377 	rqpair->cmds_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cmds,
378 				     rqpair->max_queue_depth * sizeof(*rqpair->cmds),
379 				     IBV_ACCESS_LOCAL_WRITE);
380 	rqpair->cpls_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cpls,
381 				     rqpair->max_queue_depth * sizeof(*rqpair->cpls),
382 				     0);
383 	rqpair->bufs_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->bufs,
384 				     rqpair->max_queue_depth * rtransport->in_capsule_data_size,
385 				     IBV_ACCESS_LOCAL_WRITE |
386 				     IBV_ACCESS_REMOTE_WRITE);
387 	if (!rqpair->cmds_mr || !rqpair->cpls_mr || !rqpair->bufs_mr) {
388 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
389 		spdk_nvmf_rdma_qpair_destroy(rqpair);
390 		return -1;
391 	}
392 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
393 		      rqpair->cmds, rqpair->max_queue_depth * sizeof(*rqpair->cmds), rqpair->cmds_mr->lkey);
394 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
395 		      rqpair->cpls, rqpair->max_queue_depth * sizeof(*rqpair->cpls), rqpair->cpls_mr->lkey);
396 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
397 		      rqpair->bufs, rqpair->max_queue_depth * rtransport->in_capsule_data_size, rqpair->bufs_mr->lkey);
398 
399 	for (i = 0; i < rqpair->max_queue_depth; i++) {
400 		struct ibv_recv_wr *bad_wr = NULL;
401 
402 		rdma_recv = &rqpair->recvs[i];
403 		rdma_recv->qpair = rqpair;
404 
405 		/* Set up memory to receive commands */
406 		rdma_recv->buf = (void *)((uintptr_t)rqpair->bufs + (i * rtransport->in_capsule_data_size));
407 
408 		rdma_recv->sgl[0].addr = (uintptr_t)&rqpair->cmds[i];
409 		rdma_recv->sgl[0].length = sizeof(rqpair->cmds[i]);
410 		rdma_recv->sgl[0].lkey = rqpair->cmds_mr->lkey;
411 
412 		rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
413 		rdma_recv->sgl[1].length = rtransport->in_capsule_data_size;
414 		rdma_recv->sgl[1].lkey = rqpair->bufs_mr->lkey;
415 
416 		rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
417 		rdma_recv->wr.sg_list = rdma_recv->sgl;
418 		rdma_recv->wr.num_sge = SPDK_COUNTOF(rdma_recv->sgl);
419 
420 		rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
421 		if (rc) {
422 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
423 			spdk_nvmf_rdma_qpair_destroy(rqpair);
424 			return -1;
425 		}
426 	}
427 
428 	for (i = 0; i < rqpair->max_queue_depth; i++) {
429 		rdma_req = &rqpair->reqs[i];
430 
431 		rdma_req->req.qpair = &rqpair->qpair;
432 		rdma_req->req.cmd = NULL;
433 
434 		/* Set up memory to send responses */
435 		rdma_req->req.rsp = &rqpair->cpls[i];
436 
437 		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rqpair->cpls[i];
438 		rdma_req->rsp.sgl[0].length = sizeof(rqpair->cpls[i]);
439 		rdma_req->rsp.sgl[0].lkey = rqpair->cpls_mr->lkey;
440 
441 		rdma_req->rsp.wr.wr_id = (uintptr_t)rdma_req;
442 		rdma_req->rsp.wr.next = NULL;
443 		rdma_req->rsp.wr.opcode = IBV_WR_SEND;
444 		rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
445 		rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
446 		rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
447 
448 		/* Set up memory for data buffers */
449 		rdma_req->data.wr.wr_id = (uint64_t)rdma_req;
450 		rdma_req->data.wr.next = NULL;
451 		rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
452 		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
453 		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
454 
455 		TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
456 	}
457 
458 	return 0;
459 }
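/* Each qpair owns "max_queue_depth" request and recv objects plus three registered
 * memory regions: command capsules (cmds), completion capsules (cpls) and in-capsule
 * data buffers (bufs). Every RECV is posted with a two-entry SGL so a single receive
 * lands both the command and any in-capsule data.
 */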
460 
461 static int
462 request_transfer_in(struct spdk_nvmf_request *req)
463 {
464 	int				rc;
465 	struct spdk_nvmf_rdma_request	*rdma_req;
466 	struct spdk_nvmf_qpair		*qpair;
467 	struct spdk_nvmf_rdma_qpair	*rqpair;
468 	struct ibv_send_wr		*bad_wr = NULL;
469 
470 	qpair = req->qpair;
471 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
472 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
473 
474 	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
475 
476 	rqpair->cur_rdma_rw_depth++;
477 
478 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);
479 	spdk_trace_record(TRACE_RDMA_READ_START, 0, 0, (uintptr_t)req, 0);
480 
481 	rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
482 	rdma_req->data.wr.next = NULL;
483 	rc = ibv_post_send(rqpair->cm_id->qp, &rdma_req->data.wr, &bad_wr);
484 	if (rc) {
485 		SPDK_ERRLOG("Unable to transfer data from host to target\n");
486 
487 		/* Decrement r/w counter back since data transfer
488 		 * has not started.
489 		 */
490 		rqpair->cur_rdma_rw_depth--;
491 		return -1;
492 	}
493 
494 	return 0;
495 }
496 
497 static int
498 request_transfer_out(struct spdk_nvmf_request *req)
499 {
500 	int				rc;
501 	struct spdk_nvmf_rdma_request	*rdma_req;
502 	struct spdk_nvmf_qpair		*qpair;
503 	struct spdk_nvmf_rdma_qpair	*rqpair;
504 	struct spdk_nvme_cpl		*rsp;
505 	struct ibv_recv_wr		*bad_recv_wr = NULL;
506 	struct ibv_send_wr		*send_wr, *bad_send_wr = NULL;
507 
508 	qpair = req->qpair;
509 	rsp = &req->rsp->nvme_cpl;
510 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
511 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
512 
513 	/* Advance our sq_head pointer */
514 	if (qpair->sq_head == qpair->sq_head_max) {
515 		qpair->sq_head = 0;
516 	} else {
517 		qpair->sq_head++;
518 	}
519 	rsp->sqhd = qpair->sq_head;
520 
521 	/* Post the capsule to the recv buffer */
522 	assert(rdma_req->recv != NULL);
523 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
524 		      rqpair);
525 	rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
526 	if (rc) {
527 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
528 		return rc;
529 	}
530 	rdma_req->recv = NULL;
531 
532 	/* Build the response which consists of an optional
533 	 * RDMA WRITE to transfer data, plus an RDMA SEND
534 	 * containing the response.
535 	 */
536 	send_wr = &rdma_req->rsp.wr;
537 
538 	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
539 	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
540 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, qpair);
541 		spdk_trace_record(TRACE_RDMA_WRITE_START, 0, 0, (uintptr_t)req, 0);
542 
543 		rqpair->cur_rdma_rw_depth++;
544 		rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;
545 
546 		rdma_req->data.wr.next = send_wr;
547 		send_wr = &rdma_req->data.wr;
548 	}
549 
550 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, qpair);
551 	spdk_trace_record(TRACE_NVMF_IO_COMPLETE, 0, 0, (uintptr_t)req, 0);
552 
553 	/* Send the completion */
554 	rc = ibv_post_send(rqpair->cm_id->qp, send_wr, &bad_send_wr);
555 	if (rc) {
556 		SPDK_ERRLOG("Unable to send response capsule\n");
557 
558 		if (rdma_req->data.wr.opcode == IBV_WR_RDMA_WRITE) {
559 			/* Decrement r/w counter back since data transfer
560 			 * has not started.
561 			 */
562 			rqpair->cur_rdma_rw_depth--;
563 		}
564 	}
565 
566 	return rc;
567 }
568 
569 static int
570 spdk_nvmf_rdma_event_accept(struct rdma_cm_id *id, struct spdk_nvmf_rdma_qpair *rqpair)
571 {
572 	struct spdk_nvmf_rdma_accept_private_data	accept_data;
573 	struct rdma_conn_param				ctrlr_event_data = {};
574 	int						rc;
575 
576 	accept_data.recfmt = 0;
577 	accept_data.crqsize = rqpair->max_queue_depth;
578 
579 	ctrlr_event_data.private_data = &accept_data;
580 	ctrlr_event_data.private_data_len = sizeof(accept_data);
581 	if (id->ps == RDMA_PS_TCP) {
582 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
583 		ctrlr_event_data.initiator_depth = rqpair->max_rw_depth;
584 	}
585 
586 	rc = rdma_accept(id, &ctrlr_event_data);
587 	if (rc) {
588 		SPDK_ERRLOG("Error %d on rdma_accept\n", errno);
589 	} else {
590 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Sent back the accept\n");
591 	}
592 
593 	return rc;
594 }
595 
596 static void
597 spdk_nvmf_rdma_event_reject(struct rdma_cm_id *id, enum spdk_nvmf_rdma_transport_error error)
598 {
599 	struct spdk_nvmf_rdma_reject_private_data	rej_data;
600 
601 	rej_data.recfmt = 0;
602 	rej_data.sts = error;
603 
604 	rdma_reject(id, &rej_data, sizeof(rej_data));
605 }
606 
607 static int
608 nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event,
609 		  new_qpair_fn cb_fn)
610 {
611 	struct spdk_nvmf_rdma_transport *rtransport;
612 	struct spdk_nvmf_rdma_qpair	*rqpair = NULL;
613 	struct spdk_nvmf_rdma_port	*port;
614 	struct rdma_conn_param		*rdma_param = NULL;
615 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
616 	uint16_t			max_queue_depth;
617 	uint16_t			max_rw_depth;
618 
619 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
620 
621 	assert(event->id != NULL); /* Impossible. Can't even reject the connection. */
622 	assert(event->id->verbs != NULL); /* Impossible. No way to handle this. */
623 
624 	rdma_param = &event->param.conn;
625 	if (rdma_param->private_data == NULL ||
626 	    rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) {
627 		SPDK_ERRLOG("connect request: no private data provided\n");
628 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_PRIVATE_DATA_LENGTH);
629 		return -1;
630 	}
631 
632 	private_data = rdma_param->private_data;
633 	if (private_data->recfmt != 0) {
634 		SPDK_ERRLOG("Received RDMA private data with RECFMT != 0\n");
635 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_RECFMT);
636 		return -1;
637 	}
638 
639 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
640 		      event->id->verbs->device->name, event->id->verbs->device->dev_name);
641 
642 	port = event->listen_id->context;
643 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
644 		      event->listen_id, event->listen_id->verbs, port);
645 
646 	/* Figure out the supported queue depth. This is a multi-step process
647 	 * that takes into account hardware maximums, host provided values,
648 	 * and our target's internal memory limits */
649 
650 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Calculating Queue Depth\n");
651 
652 	/* Start with the maximum queue depth allowed by the target */
653 	max_queue_depth = rtransport->max_queue_depth;
654 	max_rw_depth = rtransport->max_queue_depth;
655 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Target Max Queue Depth: %d\n", rtransport->max_queue_depth);
656 
657 	/* Next check the local NIC's hardware limitations */
658 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
659 		      "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
660 		      port->device->attr.max_qp_wr, port->device->attr.max_qp_rd_atom);
661 	max_queue_depth = spdk_min(max_queue_depth, port->device->attr.max_qp_wr);
662 	max_rw_depth = spdk_min(max_rw_depth, port->device->attr.max_qp_rd_atom);
663 
664 	/* Next check the remote NIC's hardware limitations */
665 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
666 		      "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
667 		      rdma_param->initiator_depth, rdma_param->responder_resources);
668 	if (rdma_param->initiator_depth > 0) {
669 		max_rw_depth = spdk_min(max_rw_depth, rdma_param->initiator_depth);
670 	}
671 
672 	/* Finally check for the host software requested values, which are
673 	 * optional. */
674 	if (rdma_param->private_data != NULL &&
675 	    rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
676 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
677 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
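		/* hsqsize is a 0's based value per the NVMe-oF spec, hence the + 1 below. */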
678 		max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize);
679 		max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1);
680 	}
681 
682 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
683 		      max_queue_depth, max_rw_depth);
684 
685 	rqpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair));
686 	if (rqpair == NULL) {
687 		SPDK_ERRLOG("Could not allocate new connection.\n");
688 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
689 		return -1;
690 	}
691 
692 	rqpair->port = port;
693 	rqpair->max_queue_depth = max_queue_depth;
694 	rqpair->max_rw_depth = max_rw_depth;
695 	rqpair->cm_id = event->id;
696 	rqpair->qpair.transport = transport;
697 	TAILQ_INIT(&rqpair->incoming_queue);
698 	TAILQ_INIT(&rqpair->free_queue);
699 	TAILQ_INIT(&rqpair->pending_rdma_rw_queue);
700 
701 	event->id->context = &rqpair->qpair;
702 
703 	cb_fn(&rqpair->qpair);
704 
705 	return 0;
706 }
707 
708 static void
709 nvmf_rdma_handle_disconnect(void *ctx)
710 {
711 	struct spdk_nvmf_qpair		*qpair = ctx;
712 	struct spdk_nvmf_ctrlr		*ctrlr;
713 	struct spdk_nvmf_rdma_qpair	*rqpair;
714 
715 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
716 
717 	ctrlr = qpair->ctrlr;
718 	if (ctrlr == NULL) {
719 		/* No ctrlr has been established yet, so destroy
720 		 * the connection.
721 		 */
722 		spdk_nvmf_rdma_qpair_destroy(rqpair);
723 		return;
724 	}
725 
726 	spdk_nvmf_ctrlr_disconnect(qpair);
727 }
728 
729 static int
730 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
731 {
732 	struct spdk_nvmf_qpair	*qpair;
733 	struct spdk_io_channel	*ch;
734 
735 	if (evt->id == NULL) {
736 		SPDK_ERRLOG("disconnect request: missing cm_id\n");
737 		return -1;
738 	}
739 
740 	qpair = evt->id->context;
741 	if (qpair == NULL) {
742 		SPDK_ERRLOG("disconnect request: no active connection\n");
743 		return -1;
744 	}
745 	/* ack the disconnect event before rdma_destroy_id */
746 	rdma_ack_cm_event(evt);
747 
748 	ch = spdk_io_channel_from_ctx(qpair->group);
749 	spdk_thread_send_msg(spdk_io_channel_get_thread(ch), nvmf_rdma_handle_disconnect, qpair);
750 
751 	return 0;
752 }
753 
754 #ifdef DEBUG
755 static const char *CM_EVENT_STR[] = {
756 	"RDMA_CM_EVENT_ADDR_RESOLVED",
757 	"RDMA_CM_EVENT_ADDR_ERROR",
758 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
759 	"RDMA_CM_EVENT_ROUTE_ERROR",
760 	"RDMA_CM_EVENT_CONNECT_REQUEST",
761 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
762 	"RDMA_CM_EVENT_CONNECT_ERROR",
763 	"RDMA_CM_EVENT_UNREACHABLE",
764 	"RDMA_CM_EVENT_REJECTED",
765 	"RDMA_CM_EVENT_ESTABLISHED",
766 	"RDMA_CM_EVENT_DISCONNECTED",
767 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
768 	"RDMA_CM_EVENT_MULTICAST_JOIN",
769 	"RDMA_CM_EVENT_MULTICAST_ERROR",
770 	"RDMA_CM_EVENT_ADDR_CHANGE",
771 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
772 };
773 #endif /* DEBUG */
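/* CM_EVENT_STR is indexed directly by the rdma_cm event code (see the acceptor below),
 * so its order must match librdmacm's enum rdma_cm_event_type.
 */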
774 
775 static int
776 spdk_nvmf_rdma_mem_notify(void *cb_ctx, struct spdk_mem_map *map,
777 			  enum spdk_mem_map_notify_action action,
778 			  void *vaddr, size_t size)
779 {
780 	struct spdk_nvmf_rdma_device *device = cb_ctx;
781 	struct ibv_pd *pd = device->pd;
782 	struct ibv_mr *mr;
783 
784 	switch (action) {
785 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
786 		mr = ibv_reg_mr(pd, vaddr, size,
787 				IBV_ACCESS_LOCAL_WRITE |
788 				IBV_ACCESS_REMOTE_READ |
789 				IBV_ACCESS_REMOTE_WRITE);
790 		if (mr == NULL) {
791 			SPDK_ERRLOG("ibv_reg_mr() failed\n");
792 			return -1;
793 		} else {
794 			spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
795 		}
796 		break;
797 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
798 		mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr);
799 		spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
800 		if (mr) {
801 			ibv_dereg_mr(mr);
802 		}
803 		break;
804 	}
805 
806 	return 0;
807 }
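/* The ibv_mr pointer itself is stored as the map translation for each registered
 * region, so spdk_mem_map_translate() can later return the MR (and therefore the
 * lkey) for any data buffer address; spdk_nvmf_rdma_request_parse_sgl() relies on
 * this when building the data SGL.
 */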
808 
809 typedef enum spdk_nvme_data_transfer spdk_nvme_data_transfer_t;
810 
811 static spdk_nvme_data_transfer_t
812 spdk_nvmf_rdma_request_get_xfer(struct spdk_nvmf_rdma_request *rdma_req)
813 {
814 	enum spdk_nvme_data_transfer xfer;
815 	struct spdk_nvme_cmd *cmd = &rdma_req->req.cmd->nvme_cmd;
816 	struct spdk_nvme_sgl_descriptor *sgl = &cmd->dptr.sgl1;
817 
818 	/* Figure out data transfer direction */
819 	if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
820 		xfer = spdk_nvme_opc_get_data_transfer(rdma_req->req.cmd->nvmf_cmd.fctype);
821 	} else {
822 		xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
823 
824 		/* Some admin commands are special cases */
825 		if ((rdma_req->req.qpair->qid == 0) &&
826 		    ((cmd->opc == SPDK_NVME_OPC_GET_FEATURES) ||
827 		     (cmd->opc == SPDK_NVME_OPC_SET_FEATURES))) {
828 			switch (cmd->cdw10 & 0xff) {
829 			case SPDK_NVME_FEAT_LBA_RANGE_TYPE:
830 			case SPDK_NVME_FEAT_AUTONOMOUS_POWER_STATE_TRANSITION:
831 			case SPDK_NVME_FEAT_HOST_IDENTIFIER:
832 				break;
833 			default:
834 				xfer = SPDK_NVME_DATA_NONE;
835 			}
836 		}
837 	}
838 
839 	if (xfer == SPDK_NVME_DATA_NONE) {
840 		return xfer;
841 	}
842 
843 	/* Even commands that may transfer data could specify a length of 0.
844 	 * We want those to show up with xfer SPDK_NVME_DATA_NONE.
845 	 */
846 	switch (sgl->generic.type) {
847 	case SPDK_NVME_SGL_TYPE_DATA_BLOCK:
848 	case SPDK_NVME_SGL_TYPE_BIT_BUCKET:
849 	case SPDK_NVME_SGL_TYPE_SEGMENT:
850 	case SPDK_NVME_SGL_TYPE_LAST_SEGMENT:
851 	case SPDK_NVME_SGL_TYPE_TRANSPORT_DATA_BLOCK:
852 		if (sgl->unkeyed.length == 0) {
853 			xfer = SPDK_NVME_DATA_NONE;
854 		}
855 		break;
856 	case SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK:
857 		if (sgl->keyed.length == 0) {
858 			xfer = SPDK_NVME_DATA_NONE;
859 		}
860 		break;
861 	}
862 
863 	return xfer;
864 }
865 
866 static int
867 spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
868 				 struct spdk_nvmf_rdma_device *device,
869 				 struct spdk_nvmf_rdma_request *rdma_req)
870 {
871 	struct spdk_nvme_cmd			*cmd;
872 	struct spdk_nvme_cpl			*rsp;
873 	struct spdk_nvme_sgl_descriptor		*sgl;
874 
875 	cmd = &rdma_req->req.cmd->nvme_cmd;
876 	rsp = &rdma_req->req.rsp->nvme_cpl;
877 	sgl = &cmd->dptr.sgl1;
878 
879 	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
880 	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
881 	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
882 		if (sgl->keyed.length > rtransport->max_io_size) {
883 			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
884 				    sgl->keyed.length, rtransport->max_io_size);
885 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
886 			return -1;
887 		}
888 
889 		rdma_req->req.length = sgl->keyed.length;
890 		rdma_req->data_from_pool = spdk_mempool_get(rtransport->data_buf_pool);
891 		if (!rdma_req->data_from_pool) {
892 			/* No available buffers. Queue this request up. */
893 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "No available large data buffers. Queueing request %p\n", rdma_req);
894 			return 0;
895 		}
896 		/* The AIO backend requires block-size-aligned data buffers;
897 		 * a 4 KiB-aligned buffer should work for most devices.
898 		 */
899 		rdma_req->req.data = (void *)((uintptr_t)(rdma_req->data_from_pool + NVMF_DATA_BUFFER_MASK)
900 					      & ~NVMF_DATA_BUFFER_MASK);
901 		rdma_req->data.sgl[0].addr = (uintptr_t)rdma_req->req.data;
902 		rdma_req->data.sgl[0].length = sgl->keyed.length;
903 		rdma_req->data.sgl[0].lkey = ((struct ibv_mr *)spdk_mem_map_translate(device->map,
904 					      (uint64_t)rdma_req->req.data))->lkey;
905 		rdma_req->data.wr.wr.rdma.rkey = sgl->keyed.key;
906 		rdma_req->data.wr.wr.rdma.remote_addr = sgl->address;
907 
908 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p took buffer from central pool\n", rdma_req);
909 
910 		return 0;
911 	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
912 		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
913 		uint64_t offset = sgl->address;
914 		uint32_t max_len = rtransport->in_capsule_data_size;
915 
916 		SPDK_DEBUGLOG(SPDK_LOG_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
917 			      offset, sgl->unkeyed.length);
918 
919 		if (offset > max_len) {
920 			SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
921 				    offset, max_len);
922 			rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
923 			return -1;
924 		}
925 		max_len -= (uint32_t)offset;
926 
927 		if (sgl->unkeyed.length > max_len) {
928 			SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
929 				    sgl->unkeyed.length, max_len);
930 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
931 			return -1;
932 		}
933 
934 		rdma_req->req.data = rdma_req->recv->buf + offset;
935 		rdma_req->data_from_pool = NULL;
936 		rdma_req->req.length = sgl->unkeyed.length;
937 		return 0;
938 	}
939 
940 	SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
941 		    sgl->generic.type, sgl->generic.subtype);
942 	rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
943 	return -1;
944 }
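/* Two SGL forms are handled above: a keyed data block, where the payload lives in
 * host memory and is moved by RDMA READ/WRITE into a buffer taken from the central
 * data_buf_pool, and an unkeyed data block with the offset subtype, where the
 * payload already arrived in the capsule and req.data simply points into the recv
 * buffer.
 */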
945 
946 static bool
947 spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
948 			       struct spdk_nvmf_rdma_request *rdma_req)
949 {
950 	struct spdk_nvmf_rdma_qpair	*rqpair;
951 	struct spdk_nvmf_rdma_device	*device;
952 	struct spdk_nvme_cpl		*rsp = &rdma_req->req.rsp->nvme_cpl;
953 	int				rc;
954 	struct spdk_nvmf_rdma_recv	*rdma_recv;
955 	enum spdk_nvmf_rdma_request_state prev_state;
956 	bool				progress = false;
957 
958 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
959 	device = rqpair->port->device;
960 
961 	assert(rdma_req->state != RDMA_REQUEST_STATE_FREE);
962 
963 	/* The loop here is to allow for several back-to-back state changes. */
964 	do {
965 		prev_state = rdma_req->state;
966 
967 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p entering state %d\n", rdma_req, prev_state);
968 
969 		switch (rdma_req->state) {
970 		case RDMA_REQUEST_STATE_FREE:
971 			/* Some external code must kick a request into RDMA_REQUEST_STATE_NEW
972 			 * to escape this state. */
973 			break;
974 		case RDMA_REQUEST_STATE_NEW:
975 			rqpair->cur_queue_depth++;
976 			rdma_recv = rdma_req->recv;
977 
978 			/* The first element of the SGL is the NVMe command */
979 			rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
980 			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));
981 
982 			TAILQ_REMOVE(&rqpair->incoming_queue, rdma_recv, link);
983 			TAILQ_REMOVE(&rqpair->free_queue, rdma_req, link);
984 
985 			/* The next state transition depends on the data transfer needs of this request. */
986 			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);
987 
988 			/* If no data to transfer, ready to execute. */
989 			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
990 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
991 				break;
992 			}
993 
994 			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
995 			TAILQ_INSERT_TAIL(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
996 			break;
997 		case RDMA_REQUEST_STATE_NEED_BUFFER:
998 			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);
999 
1000 			if (rdma_req != TAILQ_FIRST(&rqpair->ch->pending_data_buf_queue)) {
1001 				/* This request needs to wait in line to obtain a buffer */
1002 				break;
1003 			}
1004 
1005 			/* Try to get a data buffer */
1006 			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
1007 			if (rc < 0) {
1008 				TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1009 				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1010 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1011 				break;
1012 			}
1013 
1014 			if (!rdma_req->req.data) {
1015 				/* No buffers available. */
1016 				break;
1017 			}
1018 
1019 			TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1020 
1021 			/* If data is transferring from host to controller and the data didn't
1022 			 * arrive using in capsule data, we need to do a transfer from the host.
1023 			 */
1024 			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool != NULL) {
1025 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER;
1026 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1027 				break;
1028 			}
1029 
1030 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1031 			break;
1032 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER:
1033 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1034 				/* This request needs to wait in line to perform RDMA */
1035 				break;
1036 			}
1037 
1038 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1039 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1040 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
1041 				rc = request_transfer_in(&rdma_req->req);
1042 				if (rc) {
1043 					rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1044 					rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1045 				}
1046 			}
1047 			break;
1048 		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
1049 			/* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE
1050 			 * to escape this state. */
1051 			break;
1052 		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
1053 			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
1054 			spdk_nvmf_request_exec(&rdma_req->req);
1055 			break;
1056 		case RDMA_REQUEST_STATE_EXECUTING:
1057 			/* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED
1058 			 * to escape this state. */
1059 			break;
1060 		case RDMA_REQUEST_STATE_EXECUTED:
1061 			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1062 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST;
1063 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1064 			} else {
1065 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1066 			}
1067 			break;
1068 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST:
1069 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1070 				/* This request needs to wait in line to perform RDMA */
1071 				break;
1072 			}
1073 
1074 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1075 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1076 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1077 			}
1078 			break;
1079 		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
1080 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETING;
1081 
1082 			rc = request_transfer_out(&rdma_req->req);
1083 			assert(rc == 0); /* No good way to handle this currently */
1084 			break;
1085 		case RDMA_REQUEST_STATE_COMPLETING:
1086 			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
1087 			 * to escape this state. */
1088 			break;
1089 		case RDMA_REQUEST_STATE_COMPLETED:
1090 			assert(rqpair->cur_queue_depth > 0);
1091 			rqpair->cur_queue_depth--;
1092 
1093 			if (rdma_req->data_from_pool) {
1094 				/* Put the buffer back in the pool */
1095 				spdk_mempool_put(rtransport->data_buf_pool, rdma_req->data_from_pool);
1096 				rdma_req->data_from_pool = NULL;
1097 			}
1098 			rdma_req->req.length = 0;
1099 			rdma_req->req.data = NULL;
1100 			rdma_req->state = RDMA_REQUEST_STATE_FREE;
1101 			TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
1102 			break;
1103 		}
1104 
1105 		if (rdma_req->state != prev_state) {
1106 			progress = true;
1107 		}
1108 	} while (rdma_req->state != prev_state);
1109 
1110 	return progress;
1111 }
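/* The do/while above lets a request make several back-to-back state transitions in
 * a single call; the boolean return tells callers such as
 * spdk_nvmf_rdma_qpair_process_pending() to stop walking a queue once the request
 * at its head can make no further progress.
 */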
1112 
1113 /* Public API callbacks begin here */
1114 
1115 static struct spdk_nvmf_transport *
1116 spdk_nvmf_rdma_create(struct spdk_nvmf_tgt *tgt)
1117 {
1118 	int rc;
1119 	struct spdk_nvmf_rdma_transport *rtransport;
1120 	struct spdk_nvmf_rdma_device	*device, *tmp;
1121 	struct ibv_context		**contexts;
1122 	uint32_t			i;
1123 	int				flag;
1124 
1125 	rtransport = calloc(1, sizeof(*rtransport));
1126 	if (!rtransport) {
1127 		return NULL;
1128 	}
1129 
1130 	pthread_mutex_init(&rtransport->lock, NULL);
1131 	TAILQ_INIT(&rtransport->devices);
1132 	TAILQ_INIT(&rtransport->ports);
1133 
1134 	rtransport->transport.tgt = tgt;
1135 	rtransport->transport.ops = &spdk_nvmf_transport_rdma;
1136 
1137 	SPDK_INFOLOG(SPDK_LOG_RDMA, "*** RDMA Transport Init ***\n");
1138 
1139 	rtransport->max_queue_depth = tgt->opts.max_queue_depth;
1140 	rtransport->max_io_size = tgt->opts.max_io_size;
1141 	rtransport->in_capsule_data_size = tgt->opts.in_capsule_data_size;
1142 
1143 	rtransport->event_channel = rdma_create_event_channel();
1144 	if (rtransport->event_channel == NULL) {
1145 		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", spdk_strerror(errno));
1146 		free(rtransport);
1147 		return NULL;
1148 	}
1149 
1150 	flag = fcntl(rtransport->event_channel->fd, F_GETFL);
1151 	if (fcntl(rtransport->event_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
1152 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
1153 			    rtransport->event_channel->fd, spdk_strerror(errno));
		rdma_destroy_event_channel(rtransport->event_channel);
1154 		free(rtransport);
1155 		return NULL;
1156 	}
1157 
1158 	rtransport->data_buf_pool = spdk_mempool_create("spdk_nvmf_rdma",
1159 				    rtransport->max_queue_depth * 4, /* The 4 is arbitrarily chosen. Needs to be configurable. */
1160 				    rtransport->max_io_size + NVMF_DATA_BUFFER_ALIGNMENT,
1161 				    SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
1162 				    SPDK_ENV_SOCKET_ID_ANY);
1163 	if (!rtransport->data_buf_pool) {
1164 		SPDK_ERRLOG("Unable to allocate buffer pool for poll group\n");
		rdma_destroy_event_channel(rtransport->event_channel);
1165 		free(rtransport);
1166 		return NULL;
1167 	}
1168 
1169 	spdk_io_device_register(rtransport, spdk_nvmf_rdma_mgmt_channel_create,
1170 				spdk_nvmf_rdma_mgmt_channel_destroy,
1171 				sizeof(struct spdk_nvmf_rdma_mgmt_channel));
1172 
1173 	contexts = rdma_get_devices(NULL);
1174 	i = 0;
1175 	rc = 0;
1176 	while (contexts[i] != NULL) {
1177 		device = calloc(1, sizeof(*device));
1178 		if (!device) {
1179 			SPDK_ERRLOG("Unable to allocate memory for RDMA devices.\n");
1180 			rc = -ENOMEM;
1181 			break;
1182 		}
1183 		device->context = contexts[i];
1184 		rc = ibv_query_device(device->context, &device->attr);
1185 		if (rc < 0) {
1186 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1187 			free(device);
1188 			break;
1189 
1190 		}
1191 
1192 		device->pd = NULL;
1193 		device->map = NULL;
1194 
1195 		TAILQ_INSERT_TAIL(&rtransport->devices, device, link);
1196 		i++;
1197 	}
1198 
1199 	if (rc < 0) {
1200 		TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) {
1201 			TAILQ_REMOVE(&rtransport->devices, device, link);
1202 			free(device);
1203 		}
1204 		spdk_mempool_free(rtransport->data_buf_pool);
1205 		rdma_destroy_event_channel(rtransport->event_channel);
1206 		free(rtransport);
1207 		rdma_free_devices(contexts);
1208 		return NULL;
1209 	}
1210 
1211 	rdma_free_devices(contexts);
1212 
1213 	return &rtransport->transport;
1214 }
1215 
1216 static int
1217 spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
1218 {
1219 	struct spdk_nvmf_rdma_transport	*rtransport;
1220 	struct spdk_nvmf_rdma_port	*port, *port_tmp;
1221 	struct spdk_nvmf_rdma_device	*device, *device_tmp;
1222 
1223 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1224 
1225 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, port_tmp) {
1226 		TAILQ_REMOVE(&rtransport->ports, port, link);
1227 		rdma_destroy_id(port->id);
1228 		free(port);
1229 	}
1230 
1231 	if (rtransport->event_channel != NULL) {
1232 		rdma_destroy_event_channel(rtransport->event_channel);
1233 	}
1234 
1235 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) {
1236 		TAILQ_REMOVE(&rtransport->devices, device, link);
1237 		if (device->map) {
1238 			spdk_mem_map_free(&device->map);
1239 		}
1240 		free(device);
1241 	}
1242 
1243 	if (spdk_mempool_count(rtransport->data_buf_pool) != (rtransport->max_queue_depth * 4)) {
1244 		SPDK_ERRLOG("transport buffer pool count is %zu but should be %u\n",
1245 			    spdk_mempool_count(rtransport->data_buf_pool),
1246 			    rtransport->max_queue_depth * 4);
1247 	}
1248 
1249 	spdk_mempool_free(rtransport->data_buf_pool);
1250 	spdk_io_device_unregister(rtransport, NULL);
1251 	free(rtransport);
1252 
1253 	return 0;
1254 }
1255 
1256 static int
1257 spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
1258 		      const struct spdk_nvme_transport_id *trid)
1259 {
1260 	struct spdk_nvmf_rdma_transport	*rtransport;
1261 	struct spdk_nvmf_rdma_device	*device;
1262 	struct spdk_nvmf_rdma_port	*port_tmp, *port;
1263 	struct addrinfo			*res;
1264 	struct addrinfo			hints;
1265 	int				family;
1266 	int				rc;
1267 
1268 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1269 
1270 	port = calloc(1, sizeof(*port));
1271 	if (!port) {
1272 		return -ENOMEM;
1273 	}
1274 
1275 	/* Selectively copy the trid. Things like NQN don't matter here - that
1276 	 * mapping is enforced elsewhere.
1277 	 */
1278 	port->trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1279 	port->trid.adrfam = trid->adrfam;
1280 	snprintf(port->trid.traddr, sizeof(port->trid.traddr), "%s", trid->traddr);
1281 	snprintf(port->trid.trsvcid, sizeof(port->trid.trsvcid), "%s", trid->trsvcid);
1282 
1283 	pthread_mutex_lock(&rtransport->lock);
1284 	assert(rtransport->event_channel != NULL);
1285 	TAILQ_FOREACH(port_tmp, &rtransport->ports, link) {
1286 		if (spdk_nvme_transport_id_compare(&port_tmp->trid, &port->trid) == 0) {
1287 			port_tmp->ref++;
1288 			free(port);
1289 			/* Already listening at this address */
1290 			pthread_mutex_unlock(&rtransport->lock);
1291 			return 0;
1292 		}
1293 	}
1294 
1295 	rc = rdma_create_id(rtransport->event_channel, &port->id, port, RDMA_PS_TCP);
1296 	if (rc < 0) {
1297 		SPDK_ERRLOG("rdma_create_id() failed\n");
1298 		free(port);
1299 		pthread_mutex_unlock(&rtransport->lock);
1300 		return rc;
1301 	}
1302 
1303 	switch (port->trid.adrfam) {
1304 	case SPDK_NVMF_ADRFAM_IPV4:
1305 		family = AF_INET;
1306 		break;
1307 	case SPDK_NVMF_ADRFAM_IPV6:
1308 		family = AF_INET6;
1309 		break;
1310 	default:
1311 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", port->trid.adrfam);
1312 		free(port);
1313 		pthread_mutex_unlock(&rtransport->lock);
1314 		return -EINVAL;
1315 	}
1316 
1317 	memset(&hints, 0, sizeof(hints));
1318 	hints.ai_family = family;
1319 	hints.ai_socktype = SOCK_STREAM;
1320 	hints.ai_protocol = 0;
1321 
1322 	rc = getaddrinfo(port->trid.traddr, port->trid.trsvcid, &hints, &res);
1323 	if (rc) {
1324 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(rc), rc);
1325 		free(port);
1326 		pthread_mutex_unlock(&rtransport->lock);
1327 		return -EINVAL;
1328 	}
1329 
1330 	rc = rdma_bind_addr(port->id, res->ai_addr);
1331 	freeaddrinfo(res);
1332 
1333 	if (rc < 0) {
1334 		SPDK_ERRLOG("rdma_bind_addr() failed\n");
1335 		rdma_destroy_id(port->id);
1336 		free(port);
1337 		pthread_mutex_unlock(&rtransport->lock);
1338 		return rc;
1339 	}
1340 
1341 	rc = rdma_listen(port->id, 10); /* 10 = backlog */
1342 	if (rc < 0) {
1343 		SPDK_ERRLOG("rdma_listen() failed\n");
1344 		rdma_destroy_id(port->id);
1345 		free(port);
1346 		pthread_mutex_unlock(&rtransport->lock);
1347 		return rc;
1348 	}
1349 
1350 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1351 		if (device->context == port->id->verbs) {
1352 			port->device = device;
1353 			break;
1354 		}
1355 	}
1356 	if (!port->device) {
1357 		SPDK_ERRLOG("Accepted a connection with verbs %p, but unable to find a corresponding device.\n",
1358 			    port->id->verbs);
1359 		rdma_destroy_id(port->id);
1360 		free(port);
1361 		pthread_mutex_unlock(&rtransport->lock);
1362 		return -EINVAL;
1363 	}
1364 
1365 	if (!device->map) {
1366 		device->pd = port->id->pd;
1367 		device->map = spdk_mem_map_alloc(0, spdk_nvmf_rdma_mem_notify, device);
1368 		if (!device->map) {
1369 			SPDK_ERRLOG("Unable to allocate memory map for new poll group\n");
			rdma_destroy_id(port->id);
			free(port);
			pthread_mutex_unlock(&rtransport->lock);
1370 			return -1;
1371 		}
1372 	} else {
1373 		assert(device->pd == port->id->pd);
1374 	}
1375 
1376 	SPDK_INFOLOG(SPDK_LOG_RDMA, "*** NVMf Target Listening on %s port %d ***\n",
1377 		     port->trid.traddr, ntohs(rdma_get_src_port(port->id)));
1378 
1379 	port->ref = 1;
1380 
1381 	TAILQ_INSERT_TAIL(&rtransport->ports, port, link);
1382 	pthread_mutex_unlock(&rtransport->lock);
1383 
1384 	return 0;
1385 }
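/* The protection domain and memory map for a device are created lazily here, on the
 * first listen that uses that device. Poll group creation skips any device whose map
 * is still NULL for exactly this reason.
 */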
1386 
1387 static int
1388 spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
1389 			   const struct spdk_nvme_transport_id *_trid)
1390 {
1391 	struct spdk_nvmf_rdma_transport *rtransport;
1392 	struct spdk_nvmf_rdma_port *port, *tmp;
1393 	struct spdk_nvme_transport_id trid = {};
1394 
1395 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1396 
1397 	/* Selectively copy the trid. Things like NQN don't matter here - that
1398 	 * mapping is enforced elsewhere.
1399 	 */
1400 	trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1401 	trid.adrfam = _trid->adrfam;
1402 	snprintf(trid.traddr, sizeof(trid.traddr), "%s", _trid->traddr);
1403 	snprintf(trid.trsvcid, sizeof(trid.trsvcid), "%s", _trid->trsvcid);
1404 
1405 	pthread_mutex_lock(&rtransport->lock);
1406 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, tmp) {
1407 		if (spdk_nvme_transport_id_compare(&port->trid, &trid) == 0) {
1408 			assert(port->ref > 0);
1409 			port->ref--;
1410 			if (port->ref == 0) {
1411 				TAILQ_REMOVE(&rtransport->ports, port, link);
1412 				rdma_destroy_id(port->id);
1413 				free(port);
1414 			}
1415 			break;
1416 		}
1417 	}
1418 
1419 	pthread_mutex_unlock(&rtransport->lock);
1420 	return 0;
1421 }
1422 
1423 static void
1424 spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport, new_qpair_fn cb_fn)
1425 {
1426 	struct spdk_nvmf_rdma_transport *rtransport;
1427 	struct rdma_cm_event		*event;
1428 	int				rc;
1429 
1430 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1431 
1432 	if (rtransport->event_channel == NULL) {
1433 		return;
1434 	}
1435 
1436 	while (1) {
1437 		rc = rdma_get_cm_event(rtransport->event_channel, &event);
1438 		if (rc == 0) {
1439 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
1440 
1441 			switch (event->event) {
1442 			case RDMA_CM_EVENT_ADDR_RESOLVED:
1443 			case RDMA_CM_EVENT_ADDR_ERROR:
1444 			case RDMA_CM_EVENT_ROUTE_RESOLVED:
1445 			case RDMA_CM_EVENT_ROUTE_ERROR:
1446 				/* No action required. The target never attempts to resolve routes. */
1447 				break;
1448 			case RDMA_CM_EVENT_CONNECT_REQUEST:
1449 				rc = nvmf_rdma_connect(transport, event, cb_fn);
1450 				if (rc < 0) {
1451 					SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
1452 					break;
1453 				}
1454 				break;
1455 			case RDMA_CM_EVENT_CONNECT_RESPONSE:
1456 				/* The target never initiates a new connection. So this will not occur. */
1457 				break;
1458 			case RDMA_CM_EVENT_CONNECT_ERROR:
1459 				/* Can this happen? The docs say it can, but not sure what causes it. */
1460 				break;
1461 			case RDMA_CM_EVENT_UNREACHABLE:
1462 			case RDMA_CM_EVENT_REJECTED:
1463 				/* These only occur on the client side. */
1464 				break;
1465 			case RDMA_CM_EVENT_ESTABLISHED:
1466 				/* TODO: Should we be waiting for this event anywhere? */
1467 				break;
1468 			case RDMA_CM_EVENT_DISCONNECTED:
1469 			case RDMA_CM_EVENT_DEVICE_REMOVAL:
1470 				rc = nvmf_rdma_disconnect(event);
1471 				if (rc < 0) {
1472 					SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
1473 					break;
1474 				}
1475 				continue;
1476 			case RDMA_CM_EVENT_MULTICAST_JOIN:
1477 			case RDMA_CM_EVENT_MULTICAST_ERROR:
1478 				/* Multicast is not used */
1479 				break;
1480 			case RDMA_CM_EVENT_ADDR_CHANGE:
1481 				/* Not utilizing this event */
1482 				break;
1483 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1484 				/* For now, do nothing. The target never re-uses queue pairs. */
1485 				break;
1486 			default:
1487 				SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
1488 				break;
1489 			}
1490 
1491 			rdma_ack_cm_event(event);
1492 		} else {
1493 			if (errno != EAGAIN && errno != EWOULDBLOCK) {
1494 				SPDK_ERRLOG("Acceptor Event Error: %s\n", spdk_strerror(errno));
1495 			}
1496 			break;
1497 		}
1498 	}
1499 }
1500 
1501 static void
1502 spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
1503 			struct spdk_nvme_transport_id *trid,
1504 			struct spdk_nvmf_discovery_log_page_entry *entry)
1505 {
1506 	entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
1507 	entry->adrfam = trid->adrfam;
1508 	entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
1509 
1510 	spdk_strcpy_pad(entry->trsvcid, trid->trsvcid, sizeof(entry->trsvcid), ' ');
1511 	spdk_strcpy_pad(entry->traddr, trid->traddr, sizeof(entry->traddr), ' ');
1512 
1513 	entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
1514 	entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
1515 	entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
1516 }
1517 
1518 static struct spdk_nvmf_transport_poll_group *
1519 spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
1520 {
1521 	struct spdk_nvmf_rdma_transport		*rtransport;
1522 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1523 	struct spdk_nvmf_rdma_poller		*poller;
1524 	struct spdk_nvmf_rdma_device		*device;
1525 
1526 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1527 
1528 	rgroup = calloc(1, sizeof(*rgroup));
1529 	if (!rgroup) {
1530 		return NULL;
1531 	}
1532 
1533 	TAILQ_INIT(&rgroup->pollers);
1534 
1535 	pthread_mutex_lock(&rtransport->lock);
1536 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1537 		if (device->map == NULL) {
1538 			/*
1539 			 * The device is not in use (no listeners),
1540 			 * so no protection domain has been constructed.
1541 			 * Skip it.
1542 			 */
1543 			SPDK_NOTICELOG("Skipping unused RDMA device when creating poll group.\n");
1544 			continue;
1545 		}
1546 
1547 		poller = calloc(1, sizeof(*poller));
1548 		if (!poller) {
1549 			SPDK_ERRLOG("Unable to allocate memory for new RDMA poller\n");
1550 			free(rgroup);
1551 			pthread_mutex_unlock(&rtransport->lock);
1552 			return NULL;
1553 		}
1554 
1555 		poller->device = device;
1556 		poller->group = rgroup;
1557 
1558 		TAILQ_INIT(&poller->qpairs);
1559 
1560 		poller->cq = ibv_create_cq(device->context, NVMF_RDMA_CQ_SIZE, poller, NULL, 0);
1561 		if (!poller->cq) {
1562 			SPDK_ERRLOG("Unable to create completion queue\n");
1563 			free(poller);
1564 			free(rgroup);
1565 			pthread_mutex_unlock(&rtransport->lock);
1566 			return NULL;
1567 		}
1568 
1569 		TAILQ_INSERT_TAIL(&rgroup->pollers, poller, link);
1570 	}
1571 
1572 	pthread_mutex_unlock(&rtransport->lock);
1573 	return &rgroup->group;
1574 }
1575 
1576 static void
1577 spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
1578 {
1579 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1580 	struct spdk_nvmf_rdma_poller		*poller, *tmp;
1581 
1582 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1583 
1584 	if (!rgroup) {
1585 		return;
1586 	}
1587 
1588 	TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tmp) {
1589 		TAILQ_REMOVE(&rgroup->pollers, poller, link);
1590 
1591 		if (poller->cq) {
1592 			ibv_destroy_cq(poller->cq);
1593 		}
1594 
1595 		free(poller);
1596 	}
1597 
1598 	free(rgroup);
1599 }
1600 
1601 static int
1602 spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
1603 			      struct spdk_nvmf_qpair *qpair)
1604 {
1605 	struct spdk_nvmf_rdma_transport		*rtransport;
1606 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1607 	struct spdk_nvmf_rdma_qpair		*rqpair;
1608 	struct spdk_nvmf_rdma_device		*device;
1609 	struct spdk_nvmf_rdma_poller		*poller;
1610 	int					rc;
1611 
1612 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
1613 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1614 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1615 
1616 	device = rqpair->port->device;
1617 
1618 	if (device->pd != rqpair->cm_id->pd) {
1619 		SPDK_ERRLOG("Mismatched protection domains\n");
1620 		return -1;
1621 	}
1622 
1623 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1624 		if (poller->device == device) {
1625 			break;
1626 		}
1627 	}
1628 
1629 	if (!poller) {
1630 		SPDK_ERRLOG("No poller found for device.\n");
1631 		return -1;
1632 	}
1633 
1634 	TAILQ_INSERT_TAIL(&poller->qpairs, rqpair, link);
1635 	rqpair->poller = poller;
1636 
1637 	rc = spdk_nvmf_rdma_qpair_initialize(qpair);
1638 	if (rc < 0) {
1639 		SPDK_ERRLOG("Failed to initialize nvmf_rdma_qpair with qpair=%p\n", qpair);
1640 		return -1;
1641 	}
1642 
1643 	rqpair->mgmt_channel = spdk_get_io_channel(rtransport);
1644 	if (!rqpair->mgmt_channel) {
1645 		spdk_nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
1646 		spdk_nvmf_rdma_qpair_destroy(rqpair);
1647 		return -1;
1648 	}
1649 
1650 	rqpair->ch = spdk_io_channel_get_ctx(rqpair->mgmt_channel);
1651 	assert(rqpair->ch != NULL);
1652 
1653 	rc = spdk_nvmf_rdma_event_accept(rqpair->cm_id, rqpair);
1654 	if (rc) {
1655 		/* Try to reject, but we probably can't */
1656 		spdk_nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
1657 		spdk_nvmf_rdma_qpair_destroy(rqpair);
1658 		return -1;
1659 	}
1660 
1661 	return 0;
1662 }
1663 
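/*
 * Remove a qpair from a poll group by unlinking it from the poller that
 * owns its device. Fails if no poller exists for the device or the
 * qpair is not found in that poller's list.
 */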
1664 static int
1665 spdk_nvmf_rdma_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
1666 				 struct spdk_nvmf_qpair *qpair)
1667 {
1668 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1669 	struct spdk_nvmf_rdma_qpair		*rqpair;
1670 	struct spdk_nvmf_rdma_device		*device;
1671 	struct spdk_nvmf_rdma_poller		*poller;
1672 	struct spdk_nvmf_rdma_qpair		*rq, *trq;
1673 
1674 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1675 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1676 
1677 	device = rqpair->port->device;
1678 
1679 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1680 		if (poller->device == device) {
1681 			break;
1682 		}
1683 	}
1684 
1685 	if (!poller) {
1686 		SPDK_ERRLOG("No poller found for device.\n");
1687 		return -1;
1688 	}
1689 
1690 	TAILQ_FOREACH_SAFE(rq, &poller->qpairs, link, trq) {
1691 		if (rq == rqpair) {
1692 			TAILQ_REMOVE(&poller->qpairs, rqpair, link);
1693 			break;
1694 		}
1695 	}
1696 
1697 	if (rq == NULL) {
1698 		SPDK_ERRLOG("RDMA qpair cannot be removed from group (not in group).\n");
1699 		return -1;
1700 	}
1701 
1702 	return 0;
1703 }
1704 
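/*
 * Transport callback invoked when execution of a request has finished.
 * Mark the request as executed and run the request state machine to
 * continue processing (data transfer and/or response).
 */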
1705 static int
1706 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
1707 {
1708 	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
1709 			struct spdk_nvmf_rdma_transport, transport);
1710 	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
1711 
1712 	rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
1713 	spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1714 
1715 	return 0;
1716 }
1717 
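/* Transport callback to tear down a single qpair and its RDMA resources. */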
1718 static void
1719 spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
1720 {
1721 	spdk_nvmf_rdma_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair));
1722 }
1723 
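/*
 * Give queued work on this qpair a chance to make progress, in priority
 * order: pending RDMA read/write transfers first, then requests waiting
 * for data buffers, and finally newly received commands, which also
 * require a free request object.
 */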
1724 static void
1725 spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport,
1726 				     struct spdk_nvmf_rdma_qpair *rqpair)
1727 {
1728 	struct spdk_nvmf_rdma_recv	*rdma_recv, *recv_tmp;
1729 	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;
1730 
1731 	/* We process I/O in the pending_rdma_rw queue at the highest priority. */
1732 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_rw_queue, link, req_tmp) {
1733 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1734 			break;
1735 		}
1736 	}
1737 
1738 	/* The second highest priority is I/O waiting on memory buffers. */
1739 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->ch->pending_data_buf_queue, link, req_tmp) {
1740 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1741 			break;
1742 		}
1743 	}
1744 
1745 	/* The lowest priority is processing newly received commands */
1746 	TAILQ_FOREACH_SAFE(rdma_recv, &rqpair->incoming_queue, link, recv_tmp) {
1747 		rdma_req = TAILQ_FIRST(&rqpair->free_queue);
1748 		if (rdma_req == NULL) {
1749 			/* Need to wait for more SEND completions */
1750 			break;
1751 		}
1752 
1753 		rdma_req->recv = rdma_recv;
1754 		rdma_req->state = RDMA_REQUEST_STATE_NEW;
1755 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1756 			break;
1757 		}
1758 	}
1759 }
1760 
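/*
 * Translate a send/RDMA work completion back to its rdma_request via
 * the wr_id field. Debug builds verify that the pointer falls within
 * the qpair's request array.
 */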
1761 static struct spdk_nvmf_rdma_request *
1762 get_rdma_req_from_wc(struct ibv_wc *wc)
1763 {
1764 	struct spdk_nvmf_rdma_request *rdma_req;
1765 
1766 	rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
1767 	assert(rdma_req != NULL);
1768 
1769 #ifdef DEBUG
1770 	struct spdk_nvmf_rdma_qpair *rqpair;
1771 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1772 
1773 	assert(rdma_req - rqpair->reqs >= 0);
1774 	assert(rdma_req - rqpair->reqs < (ptrdiff_t)rqpair->max_queue_depth);
1775 #endif
1776 
1777 	return rdma_req;
1778 }
1779 
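/*
 * Translate a receive work completion back to its rdma_recv element via
 * the wr_id field. The completion must be at least as large as an
 * NVMe-oF capsule command; debug builds verify that the element falls
 * within the qpair's receive array.
 */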
1780 static struct spdk_nvmf_rdma_recv *
1781 get_rdma_recv_from_wc(struct ibv_wc *wc)
1782 {
1783 	struct spdk_nvmf_rdma_recv *rdma_recv;
1784 
1785 	assert(wc->byte_len >= sizeof(struct spdk_nvmf_capsule_cmd));
1786 
1787 	rdma_recv = (struct spdk_nvmf_rdma_recv *)wc->wr_id;
1788 	assert(rdma_recv != NULL);
1789 
1790 #ifdef DEBUG
1791 	struct spdk_nvmf_rdma_qpair *rqpair = rdma_recv->qpair;
1792 
1793 	assert(rdma_recv - rqpair->recvs >= 0);
1794 	assert(rdma_recv - rqpair->recvs < (ptrdiff_t)rqpair->max_queue_depth);
1795 #endif
1796 
1797 	return rdma_recv;
1798 }
1799 
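/*
 * Poll a single completion queue, reaping up to 32 completions at a
 * time. Sends retire completed responses, RDMA writes and reads release
 * RDMA read/write queue depth (reads also advance the request to the
 * ready-to-execute state), and receives queue new commands. After each
 * completion, other queued work on the qpair is given a chance to make
 * progress. Returns the number of completed requests, or -1 if the CQ
 * poll failed or any completion reported an error status.
 */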
1800 static int
1801 spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
1802 			   struct spdk_nvmf_rdma_poller *rpoller)
1803 {
1804 	struct ibv_wc wc[32];
1805 	struct spdk_nvmf_rdma_request	*rdma_req;
1806 	struct spdk_nvmf_rdma_recv	*rdma_recv;
1807 	struct spdk_nvmf_rdma_qpair	*rqpair;
1808 	int reaped, i;
1809 	int count = 0;
1810 	bool error = false;
1811 
1812 	/* Poll for completing operations. */
1813 	reaped = ibv_poll_cq(rpoller->cq, 32, wc);
1814 	if (reaped < 0) {
1815 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1816 			    errno, spdk_strerror(errno));
1817 		return -1;
1818 	}
1819 
1820 	for (i = 0; i < reaped; i++) {
1821 		if (wc[i].status) {
1822 			SPDK_ERRLOG("CQ error on CQ %p, Request 0x%lx (%d): %s\n",
1823 				    rpoller->cq, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1824 			error = true;
1825 			continue;
1826 		}
1827 
1828 		switch (wc[i].opcode) {
1829 		case IBV_WC_SEND:
1830 			rdma_req = get_rdma_req_from_wc(&wc[i]);
1831 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1832 
1833 			assert(rdma_req->state == RDMA_REQUEST_STATE_COMPLETING);
1834 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1835 
1836 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1837 
1838 			count++;
1839 
1840 			/* Try to process other queued requests */
1841 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1842 			break;
1843 
1844 		case IBV_WC_RDMA_WRITE:
1845 			rdma_req = get_rdma_req_from_wc(&wc[i]);
1846 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1847 
1848 			rqpair->cur_rdma_rw_depth--;
1849 
1850 			/* Try to process other queued requests */
1851 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1852 			break;
1853 
1854 		case IBV_WC_RDMA_READ:
1855 			rdma_req = get_rdma_req_from_wc(&wc[i]);
1856 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1857 
1858 			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
1859 			rqpair->cur_rdma_rw_depth--;
1860 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1861 
1862 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1863 
1864 			/* Try to process other queued requests */
1865 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1866 			break;
1867 
1868 		case IBV_WC_RECV:
1869 			rdma_recv = get_rdma_recv_from_wc(&wc[i]);
1870 			rqpair = rdma_recv->qpair;
1871 
1872 			TAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
1873 
1874 			/* Try to process other queued requests */
1875 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1876 			break;
1877 
1878 		default:
1879 			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
1880 			continue;
1881 		}
1882 	}
1883 
1884 	if (error) {
1885 		return -1;
1886 	}
1887 
1888 	return count;
1889 }
1890 
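/*
 * Poll every per-device poller in the group and return the aggregate
 * number of completed requests, or a negative value on error.
 */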
1891 static int
1892 spdk_nvmf_rdma_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
1893 {
1894 	struct spdk_nvmf_rdma_transport *rtransport;
1895 	struct spdk_nvmf_rdma_poll_group *rgroup;
1896 	struct spdk_nvmf_rdma_poller	*rpoller;
1897 	int				count, rc;
1898 
1899 	rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport);
1900 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1901 
1902 	count = 0;
1903 	TAILQ_FOREACH(rpoller, &rgroup->pollers, link) {
1904 		rc = spdk_nvmf_rdma_poller_poll(rtransport, rpoller);
1905 		if (rc < 0) {
1906 			return rc;
1907 		}
1908 		count += rc;
1909 	}
1910 
1911 	return count;
1912 }
1913 
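/*
 * A qpair is idle when it has no outstanding commands and no RDMA
 * read/write operations in flight.
 */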
1914 static bool
1915 spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
1916 {
1917 	struct spdk_nvmf_rdma_qpair *rqpair;
1918 
1919 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1920 
1921 	return rqpair->cur_queue_depth == 0 && rqpair->cur_rdma_rw_depth == 0;
1925 }
1926 
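/* Operations table that plugs this RDMA transport into the generic NVMe-oF transport layer. */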
1927 const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
1928 	.type = SPDK_NVME_TRANSPORT_RDMA,
1929 	.create = spdk_nvmf_rdma_create,
1930 	.destroy = spdk_nvmf_rdma_destroy,
1931 
1932 	.listen = spdk_nvmf_rdma_listen,
1933 	.stop_listen = spdk_nvmf_rdma_stop_listen,
1934 	.accept = spdk_nvmf_rdma_accept,
1935 
1936 	.listener_discover = spdk_nvmf_rdma_discover,
1937 
1938 	.poll_group_create = spdk_nvmf_rdma_poll_group_create,
1939 	.poll_group_destroy = spdk_nvmf_rdma_poll_group_destroy,
1940 	.poll_group_add = spdk_nvmf_rdma_poll_group_add,
1941 	.poll_group_remove = spdk_nvmf_rdma_poll_group_remove,
1942 	.poll_group_poll = spdk_nvmf_rdma_poll_group_poll,
1943 
1944 	.req_complete = spdk_nvmf_rdma_request_complete,
1945 
1946 	.qpair_fini = spdk_nvmf_rdma_close_qpair,
1947 	.qpair_is_idle = spdk_nvmf_rdma_qpair_is_idle,
1948 
1950 
1951 SPDK_LOG_REGISTER_COMPONENT("rdma", SPDK_LOG_RDMA)
1952