xref: /spdk/lib/nvmf/rdma.c (revision 22898a91b9b6f289933db19b0175821cfb7e7820)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <infiniband/verbs.h>
37 #include <rdma/rdma_cma.h>
38 #include <rdma/rdma_verbs.h>
39 
40 #include "nvmf_internal.h"
41 #include "transport.h"
42 
43 #include "spdk/assert.h"
44 #include "spdk/io_channel.h"
45 #include "spdk/nvmf.h"
46 #include "spdk/nvmf_spec.h"
47 #include "spdk/string.h"
48 #include "spdk/trace.h"
49 #include "spdk/util.h"
50 
51 #include "spdk_internal/log.h"
52 
53 /*
54  RDMA Connection Resource Defaults
55  */
56 #define NVMF_DEFAULT_TX_SGE		1
57 #define NVMF_DEFAULT_RX_SGE		2
58 
59 /* The RDMA completion queue size */
60 #define NVMF_RDMA_CQ_SIZE	4096
61 
62 /* The AIO backend requires block-size-aligned data buffers. Over-allocating
63  * each buffer and aligning it to 4KiB should satisfy most devices.
64  */
65 #define SHIFT_4KB			12
66 #define NVMF_DATA_BUFFER_ALIGNMENT	(1 << SHIFT_4KB)
67 #define NVMF_DATA_BUFFER_MASK		(NVMF_DATA_BUFFER_ALIGNMENT - 1)
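/* Buffers from the transport's data_buf_pool are over-allocated by
 * NVMF_DATA_BUFFER_ALIGNMENT, and the address handed to the backend is rounded
 * up to the next 4KiB boundary:
 *   data = (buf + NVMF_DATA_BUFFER_MASK) & ~NVMF_DATA_BUFFER_MASK
 * (see spdk_nvmf_rdma_request_parse_sgl()).
 */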
68 
69 enum spdk_nvmf_rdma_request_state {
70 	/* The request is not currently in use */
71 	RDMA_REQUEST_STATE_FREE = 0,
72 
73 	/* Initial state when a request is first received */
74 	RDMA_REQUEST_STATE_NEW,
75 
76 	/* The request is queued until a data buffer is available. */
77 	RDMA_REQUEST_STATE_NEED_BUFFER,
78 
79 	/* The request is waiting on RDMA queue depth availability
80 	 * to transfer data from the host to the controller.
81 	 */
82 	RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,
83 
84 	/* The request is currently transferring data from the host to the controller. */
85 	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
86 
87 	/* The request is ready to execute at the block device */
88 	RDMA_REQUEST_STATE_READY_TO_EXECUTE,
89 
90 	/* The request is currently executing at the block device */
91 	RDMA_REQUEST_STATE_EXECUTING,
92 
93 	/* The request finished executing at the block device */
94 	RDMA_REQUEST_STATE_EXECUTED,
95 
96 	/* The request is waiting on RDMA queue depth availability
97 	 * to transfer data from the controller to the host.
98 	 */
99 	RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,
100 
101 	/* The request is ready to send a completion */
102 	RDMA_REQUEST_STATE_READY_TO_COMPLETE,
103 
104 	/* The request currently has a completion outstanding */
105 	RDMA_REQUEST_STATE_COMPLETING,
106 
107 	/* The request completed and can be marked free. */
108 	RDMA_REQUEST_STATE_COMPLETED,
109 };
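/*
 * State transitions are driven by spdk_nvmf_rdma_request_process(). A
 * host-to-controller (write) command typically moves through:
 *   FREE -> NEW -> NEED_BUFFER -> TRANSFER_PENDING_HOST_TO_CONTROLLER ->
 *   TRANSFERRING_HOST_TO_CONTROLLER -> READY_TO_EXECUTE -> EXECUTING ->
 *   EXECUTED -> READY_TO_COMPLETE -> COMPLETING -> COMPLETED -> FREE
 * A controller-to-host (read) command skips the host-to-controller transfer
 * states and instead passes through TRANSFER_PENDING_CONTROLLER_TO_HOST after
 * EXECUTED; the RDMA WRITE is chained in front of the completion SEND.
 * Commands with no data, or with in-capsule data, skip the transfer states
 * entirely.
 */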
110 
111 /* This structure holds commands as they are received off the wire.
112  * It must be dynamically paired with a full request object
113  * (spdk_nvmf_rdma_request) to service a request. It is separate
114  * from the request because RDMA does not appear to order
115  * completions, so occasionally we'll get a new incoming
116  * command when there aren't any free request objects.
117  */
118 struct spdk_nvmf_rdma_recv {
119 	struct ibv_recv_wr		wr;
120 	struct ibv_sge			sgl[NVMF_DEFAULT_RX_SGE];
121 
122 	struct spdk_nvmf_rdma_qpair	*qpair;
123 
124 	/* In-capsule data buffer */
125 	uint8_t				*buf;
126 
127 	TAILQ_ENTRY(spdk_nvmf_rdma_recv) link;
128 };
129 
130 struct spdk_nvmf_rdma_request {
131 	struct spdk_nvmf_request		req;
132 	void					*data_from_pool;
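	/* Buffer taken from the transport's data_buf_pool, or NULL if the data
	 * arrived in the capsule. When set, req.data points to the 4KiB-aligned
	 * offset inside this buffer.
	 */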
133 
134 	enum spdk_nvmf_rdma_request_state	state;
135 
136 	struct spdk_nvmf_rdma_recv		*recv;
137 
138 	struct {
139 		struct	ibv_send_wr		wr;
140 		struct	ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
141 	} rsp;
142 
143 	struct {
144 		struct ibv_send_wr		wr;
145 		struct ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
146 	} data;
147 
148 	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
149 };
150 
151 struct spdk_nvmf_rdma_qpair {
152 	struct spdk_nvmf_qpair			qpair;
153 
154 	struct spdk_nvmf_rdma_port		*port;
155 	struct spdk_nvmf_rdma_poller		*poller;
156 
157 	struct rdma_cm_id			*cm_id;
158 
159 	/* The maximum number of I/O outstanding on this connection at one time */
160 	uint16_t				max_queue_depth;
161 
162 	/* The maximum number of active RDMA READ and WRITE operations at one time */
163 	uint16_t				max_rw_depth;
164 
165 	/* The current number of I/O outstanding on this connection. This number
166 	 * includes all I/O from the time the capsule is first received until it is
167 	 * completed.
168 	 */
169 	uint16_t				cur_queue_depth;
170 
171 	/* The number of RDMA READ and WRITE requests that are outstanding */
172 	uint16_t				cur_rdma_rw_depth;
173 
174 	/* Receives that are waiting for a request object */
175 	TAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
176 
177 	/* Requests that are not in use */
178 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
179 
180 	/* Requests that are waiting to perform an RDMA READ or WRITE */
181 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
182 
183 	/* Array of size "max_queue_depth" containing RDMA requests. */
184 	struct spdk_nvmf_rdma_request		*reqs;
185 
186 	/* Array of size "max_queue_depth" containing RDMA recvs. */
187 	struct spdk_nvmf_rdma_recv		*recvs;
188 
189 	/* Array of size "max_queue_depth" containing 64 byte capsules
190 	 * used for receive.
191 	 */
192 	union nvmf_h2c_msg			*cmds;
193 	struct ibv_mr				*cmds_mr;
194 
195 	/* Array of size "max_queue_depth" containing 16 byte completions
196 	 * to be sent back to the user.
197 	 */
198 	union nvmf_c2h_msg			*cpls;
199 	struct ibv_mr				*cpls_mr;
200 
201 	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
202 	 * buffers to be used for in capsule data.
203 	 */
204 	void					*bufs;
205 	struct ibv_mr				*bufs_mr;
206 
207 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
208 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	pending_link;
209 
210 	/* Mgmt channel */
211 	struct spdk_io_channel			*mgmt_channel;
212 	struct spdk_nvmf_rdma_mgmt_channel	*ch;
213 };
214 
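/* One poller exists per (poll group, RDMA device) pair. All qpairs in the poll
 * group that were accepted on the same device share this poller's completion
 * queue.
 */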
215 struct spdk_nvmf_rdma_poller {
216 	struct spdk_nvmf_rdma_device		*device;
217 	struct spdk_nvmf_rdma_poll_group	*group;
218 
219 	struct ibv_cq				*cq;
220 
221 	TAILQ_HEAD(, spdk_nvmf_rdma_qpair)	qpairs;
222 
223 	TAILQ_ENTRY(spdk_nvmf_rdma_poller)	link;
224 };
225 
226 struct spdk_nvmf_rdma_poll_group {
227 	struct spdk_nvmf_transport_poll_group	group;
228 
229 	TAILQ_HEAD(, spdk_nvmf_rdma_poller)	pollers;
230 };
231 
232 /* Assuming rdma_cm uses just one protection domain per ibv_context. */
233 struct spdk_nvmf_rdma_device {
234 	struct ibv_device_attr			attr;
235 	struct ibv_context			*context;
236 
237 	struct spdk_mem_map			*map;
238 	struct ibv_pd				*pd;
239 
240 	TAILQ_ENTRY(spdk_nvmf_rdma_device)	link;
241 };
242 
243 struct spdk_nvmf_rdma_port {
244 	struct spdk_nvme_transport_id		trid;
245 	struct rdma_cm_id			*id;
246 	struct spdk_nvmf_rdma_device		*device;
247 	uint32_t				ref;
248 	TAILQ_ENTRY(spdk_nvmf_rdma_port)	link;
249 };
250 
251 struct spdk_nvmf_rdma_transport {
252 	struct spdk_nvmf_transport	transport;
253 
254 	struct rdma_event_channel	*event_channel;
255 
256 	struct spdk_mempool		*data_buf_pool;
257 
258 	pthread_mutex_t			lock;
259 
260 	uint16_t			max_queue_depth;
261 	uint32_t			max_io_size;
262 	uint32_t			in_capsule_data_size;
263 
264 	TAILQ_HEAD(, spdk_nvmf_rdma_device)	devices;
265 	TAILQ_HEAD(, spdk_nvmf_rdma_port)	ports;
266 };
267 
268 struct spdk_nvmf_rdma_mgmt_channel {
269 	/* Requests that are waiting to obtain a data buffer */
270 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
271 };
272 
273 static int
274 spdk_nvmf_rdma_mgmt_channel_create(void *io_device, void *ctx_buf)
275 {
276 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
277 
278 	TAILQ_INIT(&ch->pending_data_buf_queue);
279 	return 0;
280 }
281 
282 static void
283 spdk_nvmf_rdma_mgmt_channel_destroy(void *io_device, void *ctx_buf)
284 {
285 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
286 
287 	if (!TAILQ_EMPTY(&ch->pending_data_buf_queue)) {
288 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
289 	}
290 }
291 
292 static void
293 spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
294 {
295 	if (rqpair->poller) {
296 		TAILQ_REMOVE(&rqpair->poller->qpairs, rqpair, link);
297 	}
298 
299 	if (rqpair->cmds_mr) {
300 		ibv_dereg_mr(rqpair->cmds_mr);
301 	}
302 
303 	if (rqpair->cpls_mr) {
304 		ibv_dereg_mr(rqpair->cpls_mr);
305 	}
306 
307 	if (rqpair->bufs_mr) {
308 		ibv_dereg_mr(rqpair->bufs_mr);
309 	}
310 
311 	if (rqpair->cm_id) {
312 		rdma_destroy_qp(rqpair->cm_id);
313 		rdma_destroy_id(rqpair->cm_id);
314 	}
315 
316 	if (rqpair->mgmt_channel) {
317 		spdk_put_io_channel(rqpair->mgmt_channel);
318 	}
319 
320 	/* Free all memory */
321 	spdk_dma_free(rqpair->cmds);
322 	spdk_dma_free(rqpair->cpls);
323 	spdk_dma_free(rqpair->bufs);
324 	free(rqpair->reqs);
325 	free(rqpair->recvs);
326 	free(rqpair);
327 }
328 
329 static int
330 spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
331 {
332 	struct spdk_nvmf_rdma_transport *rtransport;
333 	struct spdk_nvmf_rdma_qpair	*rqpair;
334 	int				rc, i;
335 	struct ibv_qp_init_attr		attr;
336 	struct spdk_nvmf_rdma_recv	*rdma_recv;
337 	struct spdk_nvmf_rdma_request	*rdma_req;
338 
339 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
340 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
341 
342 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
343 	attr.qp_type		= IBV_QPT_RC;
344 	attr.send_cq		= rqpair->poller->cq;
345 	attr.recv_cq		= rqpair->poller->cq;
346 	attr.cap.max_send_wr	= rqpair->max_queue_depth * 2; /* SEND, READ, and WRITE operations */
347 	attr.cap.max_recv_wr	= rqpair->max_queue_depth; /* RECV operations */
348 	attr.cap.max_send_sge	= NVMF_DEFAULT_TX_SGE;
349 	attr.cap.max_recv_sge	= NVMF_DEFAULT_RX_SGE;
350 
351 	rc = rdma_create_qp(rqpair->cm_id, NULL, &attr);
352 	if (rc) {
353 		SPDK_ERRLOG("rdma_create_qp failed: errno %d: %s\n", errno, spdk_strerror(errno));
354 		rdma_destroy_id(rqpair->cm_id);
355 		rqpair->cm_id = NULL;
356 		spdk_nvmf_rdma_qpair_destroy(rqpair);
357 		return -1;
358 	}
359 
360 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "New RDMA Connection: %p\n", qpair);
361 
362 	rqpair->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->reqs));
363 	rqpair->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->recvs));
364 	rqpair->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cmds),
365 					0x1000, NULL);
366 	rqpair->cpls = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cpls),
367 					0x1000, NULL);
368 	rqpair->bufs = spdk_dma_zmalloc(rqpair->max_queue_depth * rtransport->in_capsule_data_size,
369 					0x1000, NULL);
370 	if (!rqpair->reqs || !rqpair->recvs || !rqpair->cmds ||
371 	    !rqpair->cpls || !rqpair->bufs) {
372 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
373 		spdk_nvmf_rdma_qpair_destroy(rqpair);
374 		return -1;
375 	}
376 
377 	rqpair->cmds_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cmds,
378 				     rqpair->max_queue_depth * sizeof(*rqpair->cmds),
379 				     IBV_ACCESS_LOCAL_WRITE);
380 	rqpair->cpls_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cpls,
381 				     rqpair->max_queue_depth * sizeof(*rqpair->cpls),
382 				     0);
383 	rqpair->bufs_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->bufs,
384 				     rqpair->max_queue_depth * rtransport->in_capsule_data_size,
385 				     IBV_ACCESS_LOCAL_WRITE |
386 				     IBV_ACCESS_REMOTE_WRITE);
387 	if (!rqpair->cmds_mr || !rqpair->cpls_mr || !rqpair->bufs_mr) {
388 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
389 		spdk_nvmf_rdma_qpair_destroy(rqpair);
390 		return -1;
391 	}
392 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
393 		      rqpair->cmds, rqpair->max_queue_depth * sizeof(*rqpair->cmds), rqpair->cmds_mr->lkey);
394 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
395 		      rqpair->cpls, rqpair->max_queue_depth * sizeof(*rqpair->cpls), rqpair->cpls_mr->lkey);
396 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
397 		      rqpair->bufs, rqpair->max_queue_depth * rtransport->in_capsule_data_size, rqpair->bufs_mr->lkey);
398 
399 	for (i = 0; i < rqpair->max_queue_depth; i++) {
400 		struct ibv_recv_wr *bad_wr = NULL;
401 
402 		rdma_recv = &rqpair->recvs[i];
403 		rdma_recv->qpair = rqpair;
404 
405 		/* Set up memory to receive commands */
406 		rdma_recv->buf = (void *)((uintptr_t)rqpair->bufs + (i * rtransport->in_capsule_data_size));
407 
408 		rdma_recv->sgl[0].addr = (uintptr_t)&rqpair->cmds[i];
409 		rdma_recv->sgl[0].length = sizeof(rqpair->cmds[i]);
410 		rdma_recv->sgl[0].lkey = rqpair->cmds_mr->lkey;
411 
412 		rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
413 		rdma_recv->sgl[1].length = rtransport->in_capsule_data_size;
414 		rdma_recv->sgl[1].lkey = rqpair->bufs_mr->lkey;
415 
416 		rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
417 		rdma_recv->wr.sg_list = rdma_recv->sgl;
418 		rdma_recv->wr.num_sge = SPDK_COUNTOF(rdma_recv->sgl);
419 
420 		rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
421 		if (rc) {
422 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
423 			spdk_nvmf_rdma_qpair_destroy(rqpair);
424 			return -1;
425 		}
426 	}
427 
428 	for (i = 0; i < rqpair->max_queue_depth; i++) {
429 		rdma_req = &rqpair->reqs[i];
430 
431 		rdma_req->req.qpair = &rqpair->qpair;
432 		rdma_req->req.cmd = NULL;
433 
434 		/* Set up memory to send responses */
435 		rdma_req->req.rsp = &rqpair->cpls[i];
436 
437 		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rqpair->cpls[i];
438 		rdma_req->rsp.sgl[0].length = sizeof(rqpair->cpls[i]);
439 		rdma_req->rsp.sgl[0].lkey = rqpair->cpls_mr->lkey;
440 
441 		rdma_req->rsp.wr.wr_id = (uintptr_t)rdma_req;
442 		rdma_req->rsp.wr.next = NULL;
443 		rdma_req->rsp.wr.opcode = IBV_WR_SEND;
444 		rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
445 		rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
446 		rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
447 
448 		/* Set up memory for data buffers */
449 		rdma_req->data.wr.wr_id = (uintptr_t)rdma_req;
450 		rdma_req->data.wr.next = NULL;
451 		rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
452 		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
453 		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
454 
455 		TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
456 	}
457 
458 	return 0;
459 }
460 
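/* Post an RDMA READ to pull the command's data from the host into the target's
 * data buffer. Increments cur_rdma_rw_depth; the counter is rolled back if the
 * post fails.
 */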
461 static int
462 request_transfer_in(struct spdk_nvmf_request *req)
463 {
464 	int				rc;
465 	struct spdk_nvmf_rdma_request	*rdma_req;
466 	struct spdk_nvmf_qpair		*qpair;
467 	struct spdk_nvmf_rdma_qpair	*rqpair;
468 	struct ibv_send_wr		*bad_wr = NULL;
469 
470 	qpair = req->qpair;
471 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
472 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
473 
474 	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
475 
476 	rqpair->cur_rdma_rw_depth++;
477 
478 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);
479 	spdk_trace_record(TRACE_RDMA_READ_START, 0, 0, (uintptr_t)req, 0);
480 
481 	rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
482 	rdma_req->data.wr.next = NULL;
483 	rc = ibv_post_send(rqpair->cm_id->qp, &rdma_req->data.wr, &bad_wr);
484 	if (rc) {
485 		SPDK_ERRLOG("Unable to transfer data from host to target\n");
486 
487 		/* Decrement r/w counter back since data transfer
488 		 * has not started.
489 		 */
490 		rqpair->cur_rdma_rw_depth--;
491 		return -1;
492 	}
493 
494 	return 0;
495 }
496 
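/* Send the response capsule back to the host. For successful
 * controller-to-host transfers, an RDMA WRITE carrying the data is chained in
 * front of the SEND. The receive buffer that carried the command is re-posted
 * first.
 */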
497 static int
498 request_transfer_out(struct spdk_nvmf_request *req)
499 {
500 	int				rc;
501 	struct spdk_nvmf_rdma_request	*rdma_req;
502 	struct spdk_nvmf_qpair		*qpair;
503 	struct spdk_nvmf_rdma_qpair	*rqpair;
504 	struct spdk_nvme_cpl		*rsp;
505 	struct ibv_recv_wr		*bad_recv_wr = NULL;
506 	struct ibv_send_wr		*send_wr, *bad_send_wr = NULL;
507 
508 	qpair = req->qpair;
509 	rsp = &req->rsp->nvme_cpl;
510 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
511 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
512 
513 	/* Advance our sq_head pointer */
514 	if (qpair->sq_head == qpair->sq_head_max) {
515 		qpair->sq_head = 0;
516 	} else {
517 		qpair->sq_head++;
518 	}
519 	rsp->sqhd = qpair->sq_head;
520 
521 	/* Post the capsule to the recv buffer */
522 	assert(rdma_req->recv != NULL);
523 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
524 		      rqpair);
525 	rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
526 	if (rc) {
527 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
528 		return rc;
529 	}
530 	rdma_req->recv = NULL;
531 
532 	/* Build the response which consists of an optional
533 	 * RDMA WRITE to transfer data, plus an RDMA SEND
534 	 * containing the response.
535 	 */
536 	send_wr = &rdma_req->rsp.wr;
537 
538 	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
539 	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
540 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, qpair);
541 		spdk_trace_record(TRACE_RDMA_WRITE_START, 0, 0, (uintptr_t)req, 0);
542 
543 		rqpair->cur_rdma_rw_depth++;
544 		rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;
545 
546 		rdma_req->data.wr.next = send_wr;
547 		send_wr = &rdma_req->data.wr;
548 	}
549 
550 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, qpair);
551 	spdk_trace_record(TRACE_NVMF_IO_COMPLETE, 0, 0, (uintptr_t)req, 0);
552 
553 	/* Send the completion */
554 	rc = ibv_post_send(rqpair->cm_id->qp, send_wr, &bad_send_wr);
555 	if (rc) {
556 		SPDK_ERRLOG("Unable to send response capsule\n");
557 
558 		if (rdma_req->data.wr.opcode == IBV_WR_RDMA_WRITE) {
559 			/* Decrement r/w counter back since data transfer
560 			 * has not started.
561 			 */
562 			rqpair->cur_rdma_rw_depth--;
563 		}
564 	}
565 
566 	return rc;
567 }
568 
569 static int
570 spdk_nvmf_rdma_event_accept(struct rdma_cm_id *id, struct spdk_nvmf_rdma_qpair *rqpair)
571 {
572 	struct spdk_nvmf_rdma_accept_private_data	accept_data;
573 	struct rdma_conn_param				ctrlr_event_data = {};
574 	int						rc;
575 
576 	accept_data.recfmt = 0;
577 	accept_data.crqsize = rqpair->max_queue_depth;
578 
579 	ctrlr_event_data.private_data = &accept_data;
580 	ctrlr_event_data.private_data_len = sizeof(accept_data);
581 	if (id->ps == RDMA_PS_TCP) {
582 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
583 		ctrlr_event_data.initiator_depth = rqpair->max_rw_depth;
584 	}
585 
586 	rc = rdma_accept(id, &ctrlr_event_data);
587 	if (rc) {
588 		SPDK_ERRLOG("Error %d on rdma_accept\n", errno);
589 	} else {
590 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Sent back the accept\n");
591 	}
592 
593 	return rc;
594 }
595 
596 static void
597 spdk_nvmf_rdma_event_reject(struct rdma_cm_id *id, enum spdk_nvmf_rdma_transport_error error)
598 {
599 	struct spdk_nvmf_rdma_reject_private_data	rej_data;
600 
601 	rej_data.recfmt = 0;
602 	rej_data.sts = error;
603 
604 	rdma_reject(id, &rej_data, sizeof(rej_data));
605 }
606 
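/* Handle an RDMA_CM_EVENT_CONNECT_REQUEST: validate the NVMe-oF private data,
 * negotiate queue and read/write depths, allocate the qpair, and hand it to
 * the core via cb_fn. rdma_accept() is deferred until the qpair is added to a
 * poll group (see spdk_nvmf_rdma_poll_group_add()).
 */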
607 static int
608 nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event,
609 		  new_qpair_fn cb_fn)
610 {
611 	struct spdk_nvmf_rdma_transport *rtransport;
612 	struct spdk_nvmf_rdma_qpair	*rqpair = NULL;
613 	struct spdk_nvmf_rdma_port	*port;
614 	struct rdma_conn_param		*rdma_param = NULL;
615 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
616 	uint16_t			max_queue_depth;
617 	uint16_t			max_rw_depth;
618 
619 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
620 
621 	assert(event->id != NULL); /* Impossible. Can't even reject the connection. */
622 	assert(event->id->verbs != NULL); /* Impossible. No way to handle this. */
623 
624 	rdma_param = &event->param.conn;
625 	if (rdma_param->private_data == NULL ||
626 	    rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) {
627 		SPDK_ERRLOG("connect request: no private data provided\n");
628 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_PRIVATE_DATA_LENGTH);
629 		return -1;
630 	}
631 
632 	private_data = rdma_param->private_data;
633 	if (private_data->recfmt != 0) {
634 		SPDK_ERRLOG("Received RDMA private data with RECFMT != 0\n");
635 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_RECFMT);
636 		return -1;
637 	}
638 
639 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
640 		      event->id->verbs->device->name, event->id->verbs->device->dev_name);
641 
642 	port = event->listen_id->context;
643 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
644 		      event->listen_id, event->listen_id->verbs, port);
645 
646 	/* Figure out the supported queue depth. This is a multi-step process
647 	 * that takes into account hardware maximums, host provided values,
648 	 * and our target's internal memory limits */
649 
650 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Calculating Queue Depth\n");
651 
652 	/* Start with the maximum queue depth allowed by the target */
653 	max_queue_depth = rtransport->max_queue_depth;
654 	max_rw_depth = rtransport->max_queue_depth;
655 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Target Max Queue Depth: %d\n", rtransport->max_queue_depth);
656 
657 	/* Next check the local NIC's hardware limitations */
658 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
659 		      "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
660 		      port->device->attr.max_qp_wr, port->device->attr.max_qp_rd_atom);
661 	max_queue_depth = spdk_min(max_queue_depth, port->device->attr.max_qp_wr);
662 	max_rw_depth = spdk_min(max_rw_depth, port->device->attr.max_qp_rd_atom);
663 
664 	/* Next check the remote NIC's hardware limitations */
665 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
666 		      "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
667 		      rdma_param->initiator_depth, rdma_param->responder_resources);
668 	if (rdma_param->initiator_depth > 0) {
669 		max_rw_depth = spdk_min(max_rw_depth, rdma_param->initiator_depth);
670 	}
671 
672 	/* Finally check for the host software requested values, which are
673 	 * optional. */
674 	if (rdma_param->private_data != NULL &&
675 	    rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
676 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
677 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
678 		max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize);
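		/* HSQSIZE is a 0's-based value per the NVMe over Fabrics spec,
		 * so convert it to an entry count by adding 1.
		 */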
679 		max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1);
680 	}
681 
682 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
683 		      max_queue_depth, max_rw_depth);
684 
685 	rqpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair));
686 	if (rqpair == NULL) {
687 		SPDK_ERRLOG("Could not allocate new connection.\n");
688 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
689 		return -1;
690 	}
691 
692 	rqpair->port = port;
693 	rqpair->max_queue_depth = max_queue_depth;
694 	rqpair->max_rw_depth = max_rw_depth;
695 	rqpair->cm_id = event->id;
696 	rqpair->qpair.transport = transport;
697 	TAILQ_INIT(&rqpair->incoming_queue);
698 	TAILQ_INIT(&rqpair->free_queue);
699 	TAILQ_INIT(&rqpair->pending_rdma_rw_queue);
700 
701 	event->id->context = &rqpair->qpair;
702 
703 	cb_fn(&rqpair->qpair);
704 
705 	return 0;
706 }
707 
708 static void
709 nvmf_rdma_handle_disconnect(void *ctx)
710 {
711 	struct spdk_nvmf_qpair		*qpair = ctx;
712 	struct spdk_nvmf_ctrlr		*ctrlr;
713 	struct spdk_nvmf_rdma_qpair	*rqpair;
714 
715 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
716 
717 	ctrlr = qpair->ctrlr;
718 	if (ctrlr == NULL) {
719 		/* No ctrlr has been established yet, so destroy
720 		 * the connection.
721 		 */
722 		spdk_nvmf_rdma_qpair_destroy(rqpair);
723 		return;
724 	}
725 
726 	spdk_nvmf_ctrlr_disconnect(qpair);
727 }
728 
729 static int
730 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
731 {
732 	struct spdk_nvmf_qpair	*qpair;
733 	struct spdk_io_channel	*ch;
734 
735 	if (evt->id == NULL) {
736 		SPDK_ERRLOG("disconnect request: missing cm_id\n");
737 		return -1;
738 	}
739 
740 	qpair = evt->id->context;
741 	if (qpair == NULL) {
742 		SPDK_ERRLOG("disconnect request: no active connection\n");
743 		return -1;
744 	}
745 	/* ack the disconnect event before rdma_destroy_id */
746 	rdma_ack_cm_event(evt);
747 
748 	ch = spdk_io_channel_from_ctx(qpair->group);
749 	spdk_thread_send_msg(spdk_io_channel_get_thread(ch), nvmf_rdma_handle_disconnect, qpair);
750 
751 	return 0;
752 }
753 
754 #ifdef DEBUG
755 static const char *CM_EVENT_STR[] = {
756 	"RDMA_CM_EVENT_ADDR_RESOLVED",
757 	"RDMA_CM_EVENT_ADDR_ERROR",
758 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
759 	"RDMA_CM_EVENT_ROUTE_ERROR",
760 	"RDMA_CM_EVENT_CONNECT_REQUEST",
761 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
762 	"RDMA_CM_EVENT_CONNECT_ERROR",
763 	"RDMA_CM_EVENT_UNREACHABLE",
764 	"RDMA_CM_EVENT_REJECTED",
765 	"RDMA_CM_EVENT_ESTABLISHED",
766 	"RDMA_CM_EVENT_DISCONNECTED",
767 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
768 	"RDMA_CM_EVENT_MULTICAST_JOIN",
769 	"RDMA_CM_EVENT_MULTICAST_ERROR",
770 	"RDMA_CM_EVENT_ADDR_CHANGE",
771 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
772 };
773 #endif /* DEBUG */
774 
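/* spdk_mem_map notification callback: register newly added memory regions with
 * the device's protection domain so their ibv_mr (and lkey) can be looked up
 * when building RDMA work requests, and deregister them on removal.
 */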
775 static int
776 spdk_nvmf_rdma_mem_notify(void *cb_ctx, struct spdk_mem_map *map,
777 			  enum spdk_mem_map_notify_action action,
778 			  void *vaddr, size_t size)
779 {
780 	struct spdk_nvmf_rdma_device *device = cb_ctx;
781 	struct ibv_pd *pd = device->pd;
782 	struct ibv_mr *mr;
783 
784 	switch (action) {
785 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
786 		mr = ibv_reg_mr(pd, vaddr, size,
787 				IBV_ACCESS_LOCAL_WRITE |
788 				IBV_ACCESS_REMOTE_READ |
789 				IBV_ACCESS_REMOTE_WRITE);
790 		if (mr == NULL) {
791 			SPDK_ERRLOG("ibv_reg_mr() failed\n");
792 			return -1;
793 		} else {
794 			spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
795 		}
796 		break;
797 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
798 		mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr);
799 		spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
800 		if (mr) {
801 			ibv_dereg_mr(mr);
802 		}
803 		break;
804 	}
805 
806 	return 0;
807 }
808 
809 typedef enum spdk_nvme_data_transfer spdk_nvme_data_transfer_t;
810 
811 static spdk_nvme_data_transfer_t
812 spdk_nvmf_rdma_request_get_xfer(struct spdk_nvmf_rdma_request *rdma_req)
813 {
814 	enum spdk_nvme_data_transfer xfer;
815 	struct spdk_nvme_cmd *cmd = &rdma_req->req.cmd->nvme_cmd;
816 	struct spdk_nvme_sgl_descriptor *sgl = &cmd->dptr.sgl1;
817 
818 	/* Figure out data transfer direction */
819 	if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
820 		xfer = spdk_nvme_opc_get_data_transfer(rdma_req->req.cmd->nvmf_cmd.fctype);
821 	} else {
822 		xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
823 
824 		/* Some admin commands are special cases */
825 		if ((rdma_req->req.qpair->qid == 0) &&
826 		    ((cmd->opc == SPDK_NVME_OPC_GET_FEATURES) ||
827 		     (cmd->opc == SPDK_NVME_OPC_SET_FEATURES))) {
828 			switch (cmd->cdw10 & 0xff) {
829 			case SPDK_NVME_FEAT_LBA_RANGE_TYPE:
830 			case SPDK_NVME_FEAT_AUTONOMOUS_POWER_STATE_TRANSITION:
831 			case SPDK_NVME_FEAT_HOST_IDENTIFIER:
832 				break;
833 			default:
834 				xfer = SPDK_NVME_DATA_NONE;
835 			}
836 		}
837 	}
838 
839 	if (xfer == SPDK_NVME_DATA_NONE) {
840 		return xfer;
841 	}
842 
843 	/* Even for commands that may transfer data, they could have specified 0 length.
844 	 * We want those to show up with xfer SPDK_NVME_DATA_NONE.
845 	 */
846 	switch (sgl->generic.type) {
847 	case SPDK_NVME_SGL_TYPE_DATA_BLOCK:
848 	case SPDK_NVME_SGL_TYPE_BIT_BUCKET:
849 	case SPDK_NVME_SGL_TYPE_SEGMENT:
850 	case SPDK_NVME_SGL_TYPE_LAST_SEGMENT:
851 		if (sgl->unkeyed.length == 0) {
852 			xfer = SPDK_NVME_DATA_NONE;
853 		}
854 		break;
855 	case SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK:
856 		if (sgl->keyed.length == 0) {
857 			xfer = SPDK_NVME_DATA_NONE;
858 		}
859 		break;
860 	}
861 
862 	return xfer;
863 }
864 
865 static int
866 spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
867 				 struct spdk_nvmf_rdma_device *device,
868 				 struct spdk_nvmf_rdma_request *rdma_req)
869 {
870 	struct spdk_nvme_cmd			*cmd;
871 	struct spdk_nvme_cpl			*rsp;
872 	struct spdk_nvme_sgl_descriptor		*sgl;
873 
874 	cmd = &rdma_req->req.cmd->nvme_cmd;
875 	rsp = &rdma_req->req.rsp->nvme_cpl;
876 	sgl = &cmd->dptr.sgl1;
877 
878 	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
879 	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
880 	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
881 		if (sgl->keyed.length > rtransport->max_io_size) {
882 			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
883 				    sgl->keyed.length, rtransport->max_io_size);
884 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
885 			return -1;
886 		}
887 
888 		rdma_req->req.length = sgl->keyed.length;
889 		rdma_req->data_from_pool = spdk_mempool_get(rtransport->data_buf_pool);
890 		if (!rdma_req->data_from_pool) {
891 			/* No available buffers. Queue this request up. */
892 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "No available large data buffers. Queueing request %p\n", rdma_req);
893 			return 0;
894 		}
895 		/* The AIO backend requires block-size-aligned data buffers;
896 		 * aligning the buffer start to 4KiB should satisfy most devices.
897 		 */
898 		rdma_req->req.data = (void *)((uintptr_t)(rdma_req->data_from_pool + NVMF_DATA_BUFFER_MASK)
899 					      & ~NVMF_DATA_BUFFER_MASK);
900 		rdma_req->data.sgl[0].addr = (uintptr_t)rdma_req->req.data;
901 		rdma_req->data.sgl[0].length = sgl->keyed.length;
902 		rdma_req->data.sgl[0].lkey = ((struct ibv_mr *)spdk_mem_map_translate(device->map,
903 					      (uint64_t)rdma_req->req.data))->lkey;
904 		rdma_req->data.wr.wr.rdma.rkey = sgl->keyed.key;
905 		rdma_req->data.wr.wr.rdma.remote_addr = sgl->address;
906 
907 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p took buffer from central pool\n", rdma_req);
908 
909 		return 0;
910 	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
911 		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
912 		uint64_t offset = sgl->address;
913 		uint32_t max_len = rtransport->in_capsule_data_size;
914 
915 		SPDK_DEBUGLOG(SPDK_LOG_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
916 			      offset, sgl->unkeyed.length);
917 
918 		if (offset > max_len) {
919 			SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
920 				    offset, max_len);
921 			rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
922 			return -1;
923 		}
924 		max_len -= (uint32_t)offset;
925 
926 		if (sgl->unkeyed.length > max_len) {
927 			SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
928 				    sgl->unkeyed.length, max_len);
929 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
930 			return -1;
931 		}
932 
933 		rdma_req->req.data = rdma_req->recv->buf + offset;
934 		rdma_req->data_from_pool = NULL;
935 		rdma_req->req.length = sgl->unkeyed.length;
936 		return 0;
937 	}
938 
939 	SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
940 		    sgl->generic.type, sgl->generic.subtype);
941 	rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
942 	return -1;
943 }
944 
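/* Advance a request through the state machine until no further progress can be
 * made. Returns true if the request changed state at least once.
 */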
945 static bool
946 spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
947 			       struct spdk_nvmf_rdma_request *rdma_req)
948 {
949 	struct spdk_nvmf_rdma_qpair	*rqpair;
950 	struct spdk_nvmf_rdma_device	*device;
951 	struct spdk_nvme_cpl		*rsp = &rdma_req->req.rsp->nvme_cpl;
952 	int				rc;
953 	struct spdk_nvmf_rdma_recv	*rdma_recv;
954 	enum spdk_nvmf_rdma_request_state prev_state;
955 	bool				progress = false;
956 
957 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
958 	device = rqpair->port->device;
959 
960 	assert(rdma_req->state != RDMA_REQUEST_STATE_FREE);
961 
962 	/* The loop here is to allow for several back-to-back state changes. */
963 	do {
964 		prev_state = rdma_req->state;
965 
966 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p entering state %d\n", rdma_req, prev_state);
967 
968 		switch (rdma_req->state) {
969 		case RDMA_REQUEST_STATE_FREE:
970 			/* Some external code must kick a request into RDMA_REQUEST_STATE_NEW
971 			 * to escape this state. */
972 			break;
973 		case RDMA_REQUEST_STATE_NEW:
974 			rqpair->cur_queue_depth++;
975 			rdma_recv = rdma_req->recv;
976 
977 			/* The first element of the SGL is the NVMe command */
978 			rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
979 			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));
980 
981 			TAILQ_REMOVE(&rqpair->incoming_queue, rdma_recv, link);
982 			TAILQ_REMOVE(&rqpair->free_queue, rdma_req, link);
983 
984 			/* The next state transition depends on the data transfer needs of this request. */
985 			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);
986 
987 			/* If no data to transfer, ready to execute. */
988 			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
989 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
990 				break;
991 			}
992 
993 			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
994 			TAILQ_INSERT_TAIL(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
995 			break;
996 		case RDMA_REQUEST_STATE_NEED_BUFFER:
997 			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);
998 
999 			if (rdma_req != TAILQ_FIRST(&rqpair->ch->pending_data_buf_queue)) {
1000 				/* This request needs to wait in line to obtain a buffer */
1001 				break;
1002 			}
1003 
1004 			/* Try to get a data buffer */
1005 			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
1006 			if (rc < 0) {
1007 				TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1008 				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1009 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1010 				break;
1011 			}
1012 
1013 			if (!rdma_req->req.data) {
1014 				/* No buffers available. */
1015 				break;
1016 			}
1017 
1018 			TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1019 
1020 			/* If data is transferring from host to controller and it did not
1021 			 * arrive in the capsule, we need to perform an RDMA READ from the host.
1022 			 */
1023 			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool != NULL) {
1024 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER;
1025 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1026 				break;
1027 			}
1028 
1029 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1030 			break;
1031 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER:
1032 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1033 				/* This request needs to wait in line to perform RDMA */
1034 				break;
1035 			}
1036 
1037 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1038 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1039 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
1040 				rc = request_transfer_in(&rdma_req->req);
1041 				if (rc) {
1042 					rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1043 					rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1044 				}
1045 			}
1046 			break;
1047 		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
1048 			/* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE
1049 			 * to escape this state. */
1050 			break;
1051 		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
1052 			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
1053 			spdk_nvmf_request_exec(&rdma_req->req);
1054 			break;
1055 		case RDMA_REQUEST_STATE_EXECUTING:
1056 			/* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED
1057 			 * to escape this state. */
1058 			break;
1059 		case RDMA_REQUEST_STATE_EXECUTED:
1060 			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1061 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST;
1062 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1063 			} else {
1064 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1065 			}
1066 			break;
1067 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST:
1068 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1069 				/* This request needs to wait in line to perform RDMA */
1070 				break;
1071 			}
1072 
1073 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1074 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1075 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1076 			}
1077 			break;
1078 		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
1079 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETING;
1080 
1081 			rc = request_transfer_out(&rdma_req->req);
1082 			assert(rc == 0); /* No good way to handle this currently */
1083 			break;
1084 		case RDMA_REQUEST_STATE_COMPLETING:
1085 			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
1086 			 * to escape this state. */
1087 			break;
1088 		case RDMA_REQUEST_STATE_COMPLETED:
1089 			assert(rqpair->cur_queue_depth > 0);
1090 			rqpair->cur_queue_depth--;
1091 
1092 			if (rdma_req->data_from_pool) {
1093 				/* Put the buffer back in the pool */
1094 				spdk_mempool_put(rtransport->data_buf_pool, rdma_req->data_from_pool);
1095 				rdma_req->data_from_pool = NULL;
1096 			}
1097 			rdma_req->req.length = 0;
1098 			rdma_req->req.data = NULL;
1099 			rdma_req->state = RDMA_REQUEST_STATE_FREE;
1100 			TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
1101 			break;
1102 		}
1103 
1104 		if (rdma_req->state != prev_state) {
1105 			progress = true;
1106 		}
1107 	} while (rdma_req->state != prev_state);
1108 
1109 	return progress;
1110 }
1111 
1112 /* Public API callbacks begin here */
1113 
1114 static struct spdk_nvmf_transport *
1115 spdk_nvmf_rdma_create(struct spdk_nvmf_tgt *tgt)
1116 {
1117 	int rc;
1118 	struct spdk_nvmf_rdma_transport *rtransport;
1119 	struct spdk_nvmf_rdma_device	*device, *tmp;
1120 	struct ibv_context		**contexts;
1121 	uint32_t			i;
1122 	int				flag;
1123 
1124 	rtransport = calloc(1, sizeof(*rtransport));
1125 	if (!rtransport) {
1126 		return NULL;
1127 	}
1128 
1129 	pthread_mutex_init(&rtransport->lock, NULL);
1130 	TAILQ_INIT(&rtransport->devices);
1131 	TAILQ_INIT(&rtransport->ports);
1132 
1133 	rtransport->transport.tgt = tgt;
1134 	rtransport->transport.ops = &spdk_nvmf_transport_rdma;
1135 
1136 	SPDK_INFOLOG(SPDK_LOG_RDMA, "*** RDMA Transport Init ***\n");
1137 
1138 	rtransport->max_queue_depth = tgt->opts.max_queue_depth;
1139 	rtransport->max_io_size = tgt->opts.max_io_size;
1140 	rtransport->in_capsule_data_size = tgt->opts.in_capsule_data_size;
1141 
1142 	rtransport->event_channel = rdma_create_event_channel();
1143 	if (rtransport->event_channel == NULL) {
1144 		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", spdk_strerror(errno));
1145 		free(rtransport);
1146 		return NULL;
1147 	}
1148 
1149 	flag = fcntl(rtransport->event_channel->fd, F_GETFL);
1150 	if (fcntl(rtransport->event_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
1151 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
1152 			    rtransport->event_channel->fd, spdk_strerror(errno));
		rdma_destroy_event_channel(rtransport->event_channel);
1153 		free(rtransport);
1154 		return NULL;
1155 	}
1156 
1157 	rtransport->data_buf_pool = spdk_mempool_create("spdk_nvmf_rdma",
1158 				    rtransport->max_queue_depth * 4, /* The 4 is arbitrarily chosen. Needs to be configurable. */
1159 				    rtransport->max_io_size + NVMF_DATA_BUFFER_ALIGNMENT,
1160 				    SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
1161 				    SPDK_ENV_SOCKET_ID_ANY);
1162 	if (!rtransport->data_buf_pool) {
1163 		SPDK_ERRLOG("Unable to allocate buffer pool for RDMA transport\n");
		rdma_destroy_event_channel(rtransport->event_channel);
1164 		free(rtransport);
1165 		return NULL;
1166 	}
1167 
1168 	spdk_io_device_register(rtransport, spdk_nvmf_rdma_mgmt_channel_create,
1169 				spdk_nvmf_rdma_mgmt_channel_destroy,
1170 				sizeof(struct spdk_nvmf_rdma_mgmt_channel));
1171 
1172 	contexts = rdma_get_devices(NULL);
1173 	i = 0;
1174 	rc = 0;
1175 	while (contexts[i] != NULL) {
1176 		device = calloc(1, sizeof(*device));
1177 		if (!device) {
1178 			SPDK_ERRLOG("Unable to allocate memory for RDMA devices.\n");
1179 			rc = -ENOMEM;
1180 			break;
1181 		}
1182 		device->context = contexts[i];
1183 		rc = ibv_query_device(device->context, &device->attr);
1184 		if (rc < 0) {
1185 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1186 			free(device);
1187 			break;
1188 
1189 		}
1190 
1191 		device->pd = NULL;
1192 		device->map = NULL;
1193 
1194 		TAILQ_INSERT_TAIL(&rtransport->devices, device, link);
1195 		i++;
1196 	}
1197 
1198 	if (rc < 0) {
1199 		TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) {
1200 			TAILQ_REMOVE(&rtransport->devices, device, link);
1201 			free(device);
1202 		}
1203 		spdk_mempool_free(rtransport->data_buf_pool);
1204 		rdma_destroy_event_channel(rtransport->event_channel);
1205 		free(rtransport);
1206 		rdma_free_devices(contexts);
1207 		return NULL;
1208 	}
1209 
1210 	rdma_free_devices(contexts);
1211 
1212 	return &rtransport->transport;
1213 }
1214 
1215 static int
1216 spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
1217 {
1218 	struct spdk_nvmf_rdma_transport	*rtransport;
1219 	struct spdk_nvmf_rdma_port	*port, *port_tmp;
1220 	struct spdk_nvmf_rdma_device	*device, *device_tmp;
1221 
1222 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1223 
1224 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, port_tmp) {
1225 		TAILQ_REMOVE(&rtransport->ports, port, link);
1226 		rdma_destroy_id(port->id);
1227 		free(port);
1228 	}
1229 
1230 	if (rtransport->event_channel != NULL) {
1231 		rdma_destroy_event_channel(rtransport->event_channel);
1232 	}
1233 
1234 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) {
1235 		TAILQ_REMOVE(&rtransport->devices, device, link);
1236 		if (device->map) {
1237 			spdk_mem_map_free(&device->map);
1238 		}
1239 		free(device);
1240 	}
1241 
1242 	if (spdk_mempool_count(rtransport->data_buf_pool) != (rtransport->max_queue_depth * 4)) {
1243 		SPDK_ERRLOG("transport buffer pool count is %zu but should be %u\n",
1244 			    spdk_mempool_count(rtransport->data_buf_pool),
1245 			    rtransport->max_queue_depth * 4);
1246 	}
1247 
1248 	spdk_mempool_free(rtransport->data_buf_pool);
1249 	spdk_io_device_unregister(rtransport, NULL);
1250 	free(rtransport);
1251 
1252 	return 0;
1253 }
1254 
1255 static int
1256 spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
1257 		      const struct spdk_nvme_transport_id *trid)
1258 {
1259 	struct spdk_nvmf_rdma_transport	*rtransport;
1260 	struct spdk_nvmf_rdma_device	*device;
1261 	struct spdk_nvmf_rdma_port	*port_tmp, *port;
1262 	struct addrinfo			*res;
1263 	struct addrinfo			hints;
1264 	int				family;
1265 	int				rc;
1266 
1267 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1268 
1269 	port = calloc(1, sizeof(*port));
1270 	if (!port) {
1271 		return -ENOMEM;
1272 	}
1273 
1274 	/* Selectively copy the trid. Things like NQN don't matter here - that
1275 	 * mapping is enforced elsewhere.
1276 	 */
1277 	port->trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1278 	port->trid.adrfam = trid->adrfam;
1279 	snprintf(port->trid.traddr, sizeof(port->trid.traddr), "%s", trid->traddr);
1280 	snprintf(port->trid.trsvcid, sizeof(port->trid.trsvcid), "%s", trid->trsvcid);
1281 
1282 	pthread_mutex_lock(&rtransport->lock);
1283 	assert(rtransport->event_channel != NULL);
1284 	TAILQ_FOREACH(port_tmp, &rtransport->ports, link) {
1285 		if (spdk_nvme_transport_id_compare(&port_tmp->trid, &port->trid) == 0) {
1286 			port_tmp->ref++;
1287 			free(port);
1288 			/* Already listening at this address */
1289 			pthread_mutex_unlock(&rtransport->lock);
1290 			return 0;
1291 		}
1292 	}
1293 
1294 	rc = rdma_create_id(rtransport->event_channel, &port->id, port, RDMA_PS_TCP);
1295 	if (rc < 0) {
1296 		SPDK_ERRLOG("rdma_create_id() failed\n");
1297 		free(port);
1298 		pthread_mutex_unlock(&rtransport->lock);
1299 		return rc;
1300 	}
1301 
1302 	switch (port->trid.adrfam) {
1303 	case SPDK_NVMF_ADRFAM_IPV4:
1304 		family = AF_INET;
1305 		break;
1306 	case SPDK_NVMF_ADRFAM_IPV6:
1307 		family = AF_INET6;
1308 		break;
1309 	default:
1310 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", port->trid.adrfam);
1311 		free(port);
1312 		pthread_mutex_unlock(&rtransport->lock);
1313 		return -EINVAL;
1314 	}
1315 
1316 	memset(&hints, 0, sizeof(hints));
1317 	hints.ai_family = family;
1318 	hints.ai_socktype = SOCK_STREAM;
1319 	hints.ai_protocol = 0;
1320 
1321 	rc = getaddrinfo(port->trid.traddr, port->trid.trsvcid, &hints, &res);
1322 	if (rc) {
1323 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(rc), rc);
1324 		free(port);
1325 		pthread_mutex_unlock(&rtransport->lock);
1326 		return -EINVAL;
1327 	}
1328 
1329 	rc = rdma_bind_addr(port->id, res->ai_addr);
1330 	freeaddrinfo(res);
1331 
1332 	if (rc < 0) {
1333 		SPDK_ERRLOG("rdma_bind_addr() failed\n");
1334 		rdma_destroy_id(port->id);
1335 		free(port);
1336 		pthread_mutex_unlock(&rtransport->lock);
1337 		return rc;
1338 	}
1339 
1340 	rc = rdma_listen(port->id, 10); /* 10 = backlog */
1341 	if (rc < 0) {
1342 		SPDK_ERRLOG("rdma_listen() failed\n");
1343 		rdma_destroy_id(port->id);
1344 		free(port);
1345 		pthread_mutex_unlock(&rtransport->lock);
1346 		return rc;
1347 	}
1348 
1349 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1350 		if (device->context == port->id->verbs) {
1351 			port->device = device;
1352 			break;
1353 		}
1354 	}
1355 	if (!port->device) {
1356 		SPDK_ERRLOG("Accepted a connection with verbs %p, but unable to find a corresponding device.\n",
1357 			    port->id->verbs);
1358 		rdma_destroy_id(port->id);
1359 		free(port);
1360 		pthread_mutex_unlock(&rtransport->lock);
1361 		return -EINVAL;
1362 	}
1363 
1364 	if (!device->map) {
1365 		device->pd = port->id->pd;
1366 		device->map = spdk_mem_map_alloc(0, spdk_nvmf_rdma_mem_notify, device);
1367 		if (!device->map) {
1368 			SPDK_ERRLOG("Unable to allocate memory map for new poll group\n");
			rdma_destroy_id(port->id);
			free(port);
			pthread_mutex_unlock(&rtransport->lock);
1369 			return -1;
1370 		}
1371 	} else {
1372 		assert(device->pd == port->id->pd);
1373 	}
1374 
1375 	SPDK_INFOLOG(SPDK_LOG_RDMA, "*** NVMf Target Listening on %s port %d ***\n",
1376 		     port->trid.traddr, ntohs(rdma_get_src_port(port->id)));
1377 
1378 	port->ref = 1;
1379 
1380 	TAILQ_INSERT_TAIL(&rtransport->ports, port, link);
1381 	pthread_mutex_unlock(&rtransport->lock);
1382 
1383 	return 0;
1384 }
1385 
1386 static int
1387 spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
1388 			   const struct spdk_nvme_transport_id *_trid)
1389 {
1390 	struct spdk_nvmf_rdma_transport *rtransport;
1391 	struct spdk_nvmf_rdma_port *port, *tmp;
1392 	struct spdk_nvme_transport_id trid = {};
1393 
1394 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1395 
1396 	/* Selectively copy the trid. Things like NQN don't matter here - that
1397 	 * mapping is enforced elsewhere.
1398 	 */
1399 	trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1400 	trid.adrfam = _trid->adrfam;
1401 	snprintf(trid.traddr, sizeof(trid.traddr), "%s", _trid->traddr);
1402 	snprintf(trid.trsvcid, sizeof(trid.trsvcid), "%s", _trid->trsvcid);
1403 
1404 	pthread_mutex_lock(&rtransport->lock);
1405 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, tmp) {
1406 		if (spdk_nvme_transport_id_compare(&port->trid, &trid) == 0) {
1407 			assert(port->ref > 0);
1408 			port->ref--;
1409 			if (port->ref == 0) {
1410 				TAILQ_REMOVE(&rtransport->ports, port, link);
1411 				rdma_destroy_id(port->id);
1412 				free(port);
1413 			}
1414 			break;
1415 		}
1416 	}
1417 
1418 	pthread_mutex_unlock(&rtransport->lock);
1419 	return 0;
1420 }
1421 
1422 static void
1423 spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport, new_qpair_fn cb_fn)
1424 {
1425 	struct spdk_nvmf_rdma_transport *rtransport;
1426 	struct rdma_cm_event		*event;
1427 	int				rc;
1428 
1429 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1430 
1431 	if (rtransport->event_channel == NULL) {
1432 		return;
1433 	}
1434 
1435 	while (1) {
1436 		rc = rdma_get_cm_event(rtransport->event_channel, &event);
1437 		if (rc == 0) {
1438 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
1439 
1440 			switch (event->event) {
1441 			case RDMA_CM_EVENT_CONNECT_REQUEST:
1442 				rc = nvmf_rdma_connect(transport, event, cb_fn);
1443 				if (rc < 0) {
1444 					SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
1445 					break;
1446 				}
1447 				break;
1448 			case RDMA_CM_EVENT_ESTABLISHED:
1449 				break;
1450 			case RDMA_CM_EVENT_ADDR_CHANGE:
1451 			case RDMA_CM_EVENT_DISCONNECTED:
1452 			case RDMA_CM_EVENT_DEVICE_REMOVAL:
1453 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1454 				rc = nvmf_rdma_disconnect(event);
1455 				if (rc < 0) {
1456 					SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
1457 					break;
1458 				}
1459 				continue;
1460 			default:
1461 				SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
1462 				break;
1463 			}
1464 
1465 			rdma_ack_cm_event(event);
1466 		} else {
1467 			if (errno != EAGAIN && errno != EWOULDBLOCK) {
1468 				SPDK_ERRLOG("Acceptor Event Error: %s\n", spdk_strerror(errno));
1469 			}
1470 			break;
1471 		}
1472 	}
1473 }
1474 
1475 static void
1476 spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
1477 			struct spdk_nvme_transport_id *trid,
1478 			struct spdk_nvmf_discovery_log_page_entry *entry)
1479 {
1480 	entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
1481 	entry->adrfam = trid->adrfam;
1482 	entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
1483 
1484 	spdk_strcpy_pad(entry->trsvcid, trid->trsvcid, sizeof(entry->trsvcid), ' ');
1485 	spdk_strcpy_pad(entry->traddr, trid->traddr, sizeof(entry->traddr), ' ');
1486 
1487 	entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
1488 	entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
1489 	entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
1490 }
1491 
1492 static struct spdk_nvmf_transport_poll_group *
1493 spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
1494 {
1495 	struct spdk_nvmf_rdma_transport		*rtransport;
1496 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1497 	struct spdk_nvmf_rdma_poller		*poller;
1498 	struct spdk_nvmf_rdma_device		*device;
1499 
1500 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1501 
1502 	rgroup = calloc(1, sizeof(*rgroup));
1503 	if (!rgroup) {
1504 		return NULL;
1505 	}
1506 
1507 	TAILQ_INIT(&rgroup->pollers);
1508 
1509 	pthread_mutex_lock(&rtransport->lock);
1510 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1511 		if (device->map == NULL) {
1512 			/*
1513 			 * The device is not in use (no listeners),
1514 			 * so no protection domain has been constructed.
1515 			 * Skip it.
1516 			 */
1517 			SPDK_NOTICELOG("Skipping unused RDMA device when creating poll group.\n");
1518 			continue;
1519 		}
1520 
1521 		poller = calloc(1, sizeof(*poller));
1522 		if (!poller) {
1523 			SPDK_ERRLOG("Unable to allocate memory for new RDMA poller\n");
1524 			free(rgroup);
1525 			pthread_mutex_unlock(&rtransport->lock);
1526 			return NULL;
1527 		}
1528 
1529 		poller->device = device;
1530 		poller->group = rgroup;
1531 
1532 		TAILQ_INIT(&poller->qpairs);
1533 
1534 		poller->cq = ibv_create_cq(device->context, NVMF_RDMA_CQ_SIZE, poller, NULL, 0);
1535 		if (!poller->cq) {
1536 			SPDK_ERRLOG("Unable to create completion queue\n");
1537 			free(poller);
1538 			free(rgroup);
1539 			pthread_mutex_unlock(&rtransport->lock);
1540 			return NULL;
1541 		}
1542 
1543 		TAILQ_INSERT_TAIL(&rgroup->pollers, poller, link);
1544 	}
1545 
1546 	pthread_mutex_unlock(&rtransport->lock);
1547 	return &rgroup->group;
1548 }
1549 
1550 static void
1551 spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
1552 {
1553 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1554 	struct spdk_nvmf_rdma_poller		*poller, *tmp;
1555 
1556 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1557 
1558 	if (!rgroup) {
1559 		return;
1560 	}
1561 
1562 	TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tmp) {
1563 		TAILQ_REMOVE(&rgroup->pollers, poller, link);
1564 
1565 		if (poller->cq) {
1566 			ibv_destroy_cq(poller->cq);
1567 		}
1568 
1569 		free(poller);
1570 	}
1571 
1572 	free(rgroup);
1573 }
1574 
1575 static int
1576 spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
1577 			      struct spdk_nvmf_qpair *qpair)
1578 {
1579 	struct spdk_nvmf_rdma_transport		*rtransport;
1580 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1581 	struct spdk_nvmf_rdma_qpair		*rqpair;
1582 	struct spdk_nvmf_rdma_device		*device;
1583 	struct spdk_nvmf_rdma_poller		*poller;
1584 	int					rc;
1585 
1586 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
1587 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1588 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1589 
1590 	device = rqpair->port->device;
1591 
1592 	if (device->pd != rqpair->cm_id->pd) {
1593 		SPDK_ERRLOG("Mismatched protection domains\n");
1594 		return -1;
1595 	}
1596 
1597 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1598 		if (poller->device == device) {
1599 			break;
1600 		}
1601 	}
1602 
1603 	if (!poller) {
1604 		SPDK_ERRLOG("No poller found for device.\n");
1605 		return -1;
1606 	}
1607 
1608 	TAILQ_INSERT_TAIL(&poller->qpairs, rqpair, link);
1609 	rqpair->poller = poller;
1610 
1611 	rc = spdk_nvmf_rdma_qpair_initialize(qpair);
1612 	if (rc < 0) {
1613 		SPDK_ERRLOG("Failed to initialize nvmf_rdma_qpair with qpair=%p\n", qpair);
1614 		return -1;
1615 	}
1616 
1617 	rqpair->mgmt_channel = spdk_get_io_channel(rtransport);
1618 	if (!rqpair->mgmt_channel) {
1619 		spdk_nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
1620 		spdk_nvmf_rdma_qpair_destroy(rqpair);
1621 		return -1;
1622 	}
1623 
1624 	rqpair->ch = spdk_io_channel_get_ctx(rqpair->mgmt_channel);
1625 	assert(rqpair->ch != NULL);
1626 
1627 	rc = spdk_nvmf_rdma_event_accept(rqpair->cm_id, rqpair);
1628 	if (rc) {
1629 		/* Try to reject, but we probably can't */
1630 		spdk_nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
1631 		spdk_nvmf_rdma_qpair_destroy(rqpair);
1632 		return -1;
1633 	}
1634 
1635 	return 0;
1636 }
1637 
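/*
 * Detach a qpair from its poll group by finding the poller for the
 * qpair's device and unlinking the qpair from that poller's list.  The
 * qpair itself is left intact.
 */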
1638 static int
1639 spdk_nvmf_rdma_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
1640 				 struct spdk_nvmf_qpair *qpair)
1641 {
1642 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1643 	struct spdk_nvmf_rdma_qpair		*rqpair;
1644 	struct spdk_nvmf_rdma_device		*device;
1645 	struct spdk_nvmf_rdma_poller		*poller;
1646 	struct spdk_nvmf_rdma_qpair		*rq, *trq;
1647 
1648 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1649 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1650 
1651 	device = rqpair->port->device;
1652 
1653 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1654 		if (poller->device == device) {
1655 			break;
1656 		}
1657 	}
1658 
1659 	if (!poller) {
1660 		SPDK_ERRLOG("No poller found for device.\n");
1661 		return -1;
1662 	}
1663 
1664 	TAILQ_FOREACH_SAFE(rq, &poller->qpairs, link, trq) {
1665 		if (rq == rqpair) {
1666 			TAILQ_REMOVE(&poller->qpairs, rqpair, link);
1667 			break;
1668 		}
1669 	}
1670 
1671 	if (rq == NULL) {
1672 		SPDK_ERRLOG("RDMA qpair cannot be removed from group (not in group).\n");
1673 		return -1;
1674 	}
1675 
1676 	return 0;
1677 }
1678 
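/*
 * Transport callback invoked when the target has finished executing a
 * request.  Mark the request as EXECUTED and feed it back into
 * spdk_nvmf_rdma_request_process() so the state machine can continue
 * toward posting the completion.
 */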
1679 static int
1680 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
1681 {
1682 	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
1683 			struct spdk_nvmf_rdma_transport, transport);
1684 	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
1685 
1686 	rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
1687 	spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1688 
1689 	return 0;
1690 }
1691 
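/* qpair_fini callback: immediately destroy the RDMA qpair and release its resources. */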
1692 static void
1693 spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
1694 {
1695 	spdk_nvmf_rdma_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair));
1696 }
1697 
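/*
 * Retry queued work on a qpair in priority order: first requests waiting
 * for RDMA read/write queue depth, then requests waiting for a data
 * buffer, and finally newly received commands, each of which must first
 * be paired with a free request structure.  Every loop stops as soon as
 * spdk_nvmf_rdma_request_process() returns false for a request.
 */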
1698 static void
1699 spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport,
1700 				     struct spdk_nvmf_rdma_qpair *rqpair)
1701 {
1702 	struct spdk_nvmf_rdma_recv	*rdma_recv, *recv_tmp;
1703 	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;
1704 
1705 	/* We process I/O in the pending_rdma_rw queue at the highest priority. */
1706 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_rw_queue, link, req_tmp) {
1707 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1708 			break;
1709 		}
1710 	}
1711 
1712 	/* The second highest priority is I/O waiting on memory buffers. */
1713 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->ch->pending_data_buf_queue, link, req_tmp) {
1714 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1715 			break;
1716 		}
1717 	}
1718 
1719 	/* The lowest priority is processing newly received commands */
1720 	TAILQ_FOREACH_SAFE(rdma_recv, &rqpair->incoming_queue, link, recv_tmp) {
1721 		rdma_req = TAILQ_FIRST(&rqpair->free_queue);
1722 		if (rdma_req == NULL) {
1723 			/* Need to wait for more SEND completions */
1724 			break;
1725 		}
1726 
1727 		rdma_req->recv = rdma_recv;
1728 		rdma_req->state = RDMA_REQUEST_STATE_NEW;
1729 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1730 			break;
1731 		}
1732 	}
1733 }
1734 
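/*
 * The wr_id of send and RDMA data work completions carries a pointer to
 * the owning spdk_nvmf_rdma_request; recover it here.  In debug builds,
 * also verify that the pointer lies within the qpair's request array.
 */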
1735 static struct spdk_nvmf_rdma_request *
1736 get_rdma_req_from_wc(struct ibv_wc *wc)
1737 {
1738 	struct spdk_nvmf_rdma_request *rdma_req;
1739 
1740 	rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
1741 	assert(rdma_req != NULL);
1742 
1743 #ifdef DEBUG
1744 	struct spdk_nvmf_rdma_qpair *rqpair;
1745 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1746 
1747 	assert(rdma_req - rqpair->reqs >= 0);
1748 	assert(rdma_req - rqpair->reqs < (ptrdiff_t)rqpair->max_queue_depth);
1749 #endif
1750 
1751 	return rdma_req;
1752 }
1753 
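/*
 * The wr_id of a receive work completion carries a pointer to the
 * spdk_nvmf_rdma_recv for the incoming capsule; recover it here, asserting
 * that at least a full capsule command was received.  In debug builds,
 * also verify that the pointer lies within the qpair's recv array.
 */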
1754 static struct spdk_nvmf_rdma_recv *
1755 get_rdma_recv_from_wc(struct ibv_wc *wc)
1756 {
1757 	struct spdk_nvmf_rdma_recv *rdma_recv;
1758 
1759 	assert(wc->byte_len >= sizeof(struct spdk_nvmf_capsule_cmd));
1760 
1761 	rdma_recv = (struct spdk_nvmf_rdma_recv *)wc->wr_id;
1762 	assert(rdma_recv != NULL);
1763 
1764 #ifdef DEBUG
1765 	struct spdk_nvmf_rdma_qpair *rqpair = rdma_recv->qpair;
1766 
1767 	assert(rdma_recv - rqpair->recvs >= 0);
1768 	assert(rdma_recv - rqpair->recvs < (ptrdiff_t)rqpair->max_queue_depth);
1769 #endif
1770 
1771 	return rdma_recv;
1772 }
1773 
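/*
 * Reap up to 32 work completions from this poller's completion queue and
 * advance the affected requests:
 *  - IBV_WC_SEND: the response has been sent, so the request moves from
 *    COMPLETING to COMPLETED and is counted as a finished request.
 *  - IBV_WC_RDMA_WRITE: a controller-to-host data transfer finished;
 *    only the outstanding RDMA read/write depth is decremented.
 *  - IBV_WC_RDMA_READ: a host-to-controller data transfer finished, so
 *    the request becomes READY_TO_EXECUTE.
 *  - IBV_WC_RECV: a new capsule arrived and is queued on the qpair's
 *    incoming queue.
 * After each completion, queued work on that qpair is retried.  Returns
 * the number of completed requests, or -1 if polling failed or any
 * completion carried an error status.
 */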
1774 static int
1775 spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
1776 			   struct spdk_nvmf_rdma_poller *rpoller)
1777 {
1778 	struct ibv_wc wc[32];
1779 	struct spdk_nvmf_rdma_request	*rdma_req;
1780 	struct spdk_nvmf_rdma_recv	*rdma_recv;
1781 	struct spdk_nvmf_rdma_qpair	*rqpair;
1782 	int reaped, i;
1783 	int count = 0;
1784 	bool error = false;
1785 
1786 	/* Poll for completing operations. */
1787 	reaped = ibv_poll_cq(rpoller->cq, 32, wc);
1788 	if (reaped < 0) {
1789 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1790 			    errno, spdk_strerror(errno));
1791 		return -1;
1792 	}
1793 
1794 	for (i = 0; i < reaped; i++) {
1795 		if (wc[i].status) {
			SPDK_ERRLOG("CQ error on CQ %p, Request 0x%" PRIx64 " (%d): %s\n",
				    rpoller->cq, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1798 			error = true;
1799 			continue;
1800 		}
1801 
1802 		switch (wc[i].opcode) {
1803 		case IBV_WC_SEND:
1804 			rdma_req = get_rdma_req_from_wc(&wc[i]);
1805 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1806 
1807 			assert(rdma_req->state == RDMA_REQUEST_STATE_COMPLETING);
1808 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1809 
1810 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1811 
1812 			count++;
1813 
1814 			/* Try to process other queued requests */
1815 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1816 			break;
1817 
1818 		case IBV_WC_RDMA_WRITE:
1819 			rdma_req = get_rdma_req_from_wc(&wc[i]);
1820 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1821 
1822 			rqpair->cur_rdma_rw_depth--;
1823 
1824 			/* Try to process other queued requests */
1825 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1826 			break;
1827 
1828 		case IBV_WC_RDMA_READ:
1829 			rdma_req = get_rdma_req_from_wc(&wc[i]);
1830 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1831 
1832 			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
1833 			rqpair->cur_rdma_rw_depth--;
1834 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1835 
1836 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1837 
1838 			/* Try to process other queued requests */
1839 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1840 			break;
1841 
1842 		case IBV_WC_RECV:
1843 			rdma_recv = get_rdma_recv_from_wc(&wc[i]);
1844 			rqpair = rdma_recv->qpair;
1845 
1846 			TAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
1847 
1848 			/* Try to process other queued requests */
1849 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1850 			break;
1851 
1852 		default:
1853 			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
1854 			continue;
1855 		}
1856 	}
1857 
1858 	if (error == true) {
1859 		return -1;
1860 	}
1861 
1862 	return count;
1863 }
1864 
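/*
 * Poll every per-device poller in the group, returning the total number
 * of completed requests or the first error encountered.
 */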
1865 static int
1866 spdk_nvmf_rdma_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
1867 {
1868 	struct spdk_nvmf_rdma_transport *rtransport;
1869 	struct spdk_nvmf_rdma_poll_group *rgroup;
1870 	struct spdk_nvmf_rdma_poller	*rpoller;
1871 	int				count, rc;
1872 
1873 	rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport);
1874 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1875 
1876 	count = 0;
1877 	TAILQ_FOREACH(rpoller, &rgroup->pollers, link) {
1878 		rc = spdk_nvmf_rdma_poller_poll(rtransport, rpoller);
1879 		if (rc < 0) {
1880 			return rc;
1881 		}
1882 		count += rc;
1883 	}
1884 
1885 	return count;
1886 }
1887 
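/*
 * A qpair is idle when it has no outstanding commands and no RDMA
 * read/write operations in flight.
 */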
1888 static bool
1889 spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
1890 {
1891 	struct spdk_nvmf_rdma_qpair *rqpair;
1892 
1893 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1894 
1895 	if (rqpair->cur_queue_depth == 0 && rqpair->cur_rdma_rw_depth == 0) {
1896 		return true;
1897 	}
1898 	return false;
1899 }
1900 
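/*
 * Transport operations table for RDMA; the generic NVMe-oF transport
 * layer presumably dispatches into the RDMA implementation through
 * tables like this one when the type is SPDK_NVME_TRANSPORT_RDMA.
 */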
1901 const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
1902 	.type = SPDK_NVME_TRANSPORT_RDMA,
1903 	.create = spdk_nvmf_rdma_create,
1904 	.destroy = spdk_nvmf_rdma_destroy,
1905 
1906 	.listen = spdk_nvmf_rdma_listen,
1907 	.stop_listen = spdk_nvmf_rdma_stop_listen,
1908 	.accept = spdk_nvmf_rdma_accept,
1909 
1910 	.listener_discover = spdk_nvmf_rdma_discover,
1911 
1912 	.poll_group_create = spdk_nvmf_rdma_poll_group_create,
1913 	.poll_group_destroy = spdk_nvmf_rdma_poll_group_destroy,
1914 	.poll_group_add = spdk_nvmf_rdma_poll_group_add,
1915 	.poll_group_remove = spdk_nvmf_rdma_poll_group_remove,
1916 	.poll_group_poll = spdk_nvmf_rdma_poll_group_poll,
1917 
1918 	.req_complete = spdk_nvmf_rdma_request_complete,
1919 
1920 	.qpair_fini = spdk_nvmf_rdma_close_qpair,
1921 	.qpair_is_idle = spdk_nvmf_rdma_qpair_is_idle,
1922 
1923 };
1924 
1925 SPDK_LOG_REGISTER_COMPONENT("rdma", SPDK_LOG_RDMA)
1926