xref: /spdk/lib/nvmf/rdma.c (revision 97f3104bc7b8f305409105d19b7e6d6d87d56da5)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <infiniband/verbs.h>
37 #include <rdma/rdma_cma.h>
38 #include <rdma/rdma_verbs.h>
39 
40 #include "nvmf_internal.h"
41 #include "transport.h"
42 
43 #include "spdk/assert.h"
44 #include "spdk/io_channel.h"
45 #include "spdk/nvmf.h"
46 #include "spdk/nvmf_spec.h"
47 #include "spdk/string.h"
48 #include "spdk/trace.h"
49 #include "spdk/util.h"
50 
51 #include "spdk_internal/log.h"
52 
53 /*
54  RDMA Connection Resouce Defaults
55  */
56 #define NVMF_DEFAULT_TX_SGE		1
57 #define NVMF_DEFAULT_RX_SGE		2
58 
59 enum spdk_nvmf_rdma_request_state {
60 	/* The request is not currently in use */
61 	RDMA_REQUEST_STATE_FREE = 0,
62 
63 	/* Initial state when request first received */
64 	RDMA_REQUEST_STATE_NEW,
65 
66 	/* The request is queued until a data buffer is available. */
67 	RDMA_REQUEST_STATE_NEED_BUFFER,
68 
69 	/* The request is waiting on RDMA queue depth availability
70 	 * to transfer data from the host to the controller.
71 	 */
72 	RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER,
73 
74 	/* The request is currently transferring data from the host to the controller. */
75 	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
76 
77 	/* The request is ready to execute at the block device */
78 	RDMA_REQUEST_STATE_READY_TO_EXECUTE,
79 
80 	/* The request is currently executing at the block device */
81 	RDMA_REQUEST_STATE_EXECUTING,
82 
83 	/* The request finished executing at the block device */
84 	RDMA_REQUEST_STATE_EXECUTED,
85 
86 	/* The request is waiting on RDMA queue depth availability
87 	 * to transfer data from the controller to the host.
88 	 */
89 	RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST,
90 
91 	/* The request is ready to send a completion */
92 	RDMA_REQUEST_STATE_READY_TO_COMPLETE,
93 
94 	/* The request currently has a completion outstanding */
95 	RDMA_REQUEST_STATE_COMPLETING,
96 
97 	/* The request completed and can be marked free. */
98 	RDMA_REQUEST_STATE_COMPLETED,
99 };
100 
101 /* This structure holds commands as they are received off the wire.
102  * It must be dynamically paired with a full request object
103  * (spdk_nvmf_rdma_request) to service a request. It is separate
104  * from the request because RDMA does not appear to order
105  * completions, so occasionally we'll get a new incoming
106  * command when there aren't any free request objects.
107  */
108 struct spdk_nvmf_rdma_recv {
109 	struct ibv_recv_wr		wr;
110 	struct ibv_sge			sgl[NVMF_DEFAULT_RX_SGE];
111 
112 	/* In-capsule data buffer */
113 	uint8_t				*buf;
114 
115 	TAILQ_ENTRY(spdk_nvmf_rdma_recv) link;
116 };
117 
118 struct spdk_nvmf_rdma_request {
119 	struct spdk_nvmf_request		req;
120 	bool					data_from_pool;
121 
122 	enum spdk_nvmf_rdma_request_state	state;
123 
124 	struct spdk_nvmf_rdma_recv		*recv;
125 
126 	struct {
127 		struct	ibv_send_wr		wr;
128 		struct	ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
129 	} rsp;
130 
131 	struct {
132 		struct ibv_send_wr		wr;
133 		struct ibv_sge			sgl[NVMF_DEFAULT_TX_SGE];
134 	} data;
135 
136 	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
137 };
138 
139 struct spdk_nvmf_rdma_qpair {
140 	struct spdk_nvmf_qpair			qpair;
141 
142 	struct spdk_nvmf_rdma_port		*port;
143 
144 	struct rdma_cm_id			*cm_id;
145 	struct ibv_cq				*cq;
146 
147 	/* The maximum number of I/O outstanding on this connection at one time */
148 	uint16_t				max_queue_depth;
149 
150 	/* The maximum number of active RDMA READ and WRITE operations at one time */
151 	uint16_t				max_rw_depth;
152 
153 	/* The current number of I/O outstanding on this connection. This number
154 	 * includes all I/O from the time the capsule is first received until it is
155 	 * completed.
156 	 */
157 	uint16_t				cur_queue_depth;
158 
159 	/* The number of RDMA READ and WRITE requests that are outstanding */
160 	uint16_t				cur_rdma_rw_depth;
161 
162 	/* Receives that are waiting for a request object */
163 	TAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
164 
165 	/* Requests that are not in use */
166 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
167 
168 	/* Requests that are waiting to perform an RDMA READ or WRITE */
169 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
170 
171 	/* Array of size "max_queue_depth" containing RDMA requests. */
172 	struct spdk_nvmf_rdma_request		*reqs;
173 
174 	/* Array of size "max_queue_depth" containing RDMA recvs. */
175 	struct spdk_nvmf_rdma_recv		*recvs;
176 
177 	/* Array of size "max_queue_depth" containing 64 byte capsules
178 	 * used for receive.
179 	 */
180 	union nvmf_h2c_msg			*cmds;
181 	struct ibv_mr				*cmds_mr;
182 
183 	/* Array of size "max_queue_depth" containing 16 byte completions
184 	 * to be sent back to the user.
185 	 */
186 	union nvmf_c2h_msg			*cpls;
187 	struct ibv_mr				*cpls_mr;
188 
189 	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
190 	 * buffers to be used for in capsule data.
191 	 */
192 	void					*bufs;
193 	struct ibv_mr				*bufs_mr;
194 
195 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
196 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	pending_link;
197 
198 	/* Mgmt channel */
199 	struct spdk_io_channel			*mgmt_channel;
200 	struct spdk_nvmf_rdma_mgmt_channel	*ch;
201 	struct spdk_thread                      *thread;
202 };
203 
204 /* List of RDMA connections that have not yet received a CONNECT capsule */
205 static TAILQ_HEAD(, spdk_nvmf_rdma_qpair) g_pending_conns = TAILQ_HEAD_INITIALIZER(g_pending_conns);
206 
207 struct spdk_nvmf_rdma_poller {
208 	struct spdk_nvmf_rdma_device		*device;
209 	struct spdk_nvmf_rdma_poll_group	*group;
210 
211 	TAILQ_HEAD(, spdk_nvmf_rdma_qpair)	qpairs;
212 
213 	TAILQ_ENTRY(spdk_nvmf_rdma_poller)	link;
214 };
215 
216 struct spdk_nvmf_rdma_poll_group {
217 	struct spdk_nvmf_transport_poll_group	group;
218 
219 	TAILQ_HEAD(, spdk_nvmf_rdma_poller)	pollers;
220 };
221 
222 /* Assuming rdma_cm uses just one protection domain per ibv_context. */
223 struct spdk_nvmf_rdma_device {
224 	struct ibv_device_attr			attr;
225 	struct ibv_context			*context;
226 
227 	struct spdk_mem_map			*map;
228 	struct ibv_pd				*pd;
229 
230 	TAILQ_ENTRY(spdk_nvmf_rdma_device)	link;
231 };
232 
233 struct spdk_nvmf_rdma_port {
234 	struct spdk_nvme_transport_id		trid;
235 	struct rdma_cm_id			*id;
236 	struct spdk_nvmf_rdma_device		*device;
237 	uint32_t				ref;
238 	TAILQ_ENTRY(spdk_nvmf_rdma_port)	link;
239 };
240 
241 struct spdk_nvmf_rdma_transport {
242 	struct spdk_nvmf_transport	transport;
243 
244 	struct rdma_event_channel	*event_channel;
245 
246 	struct spdk_mempool		*data_buf_pool;
247 
248 	pthread_mutex_t 		lock;
249 
250 	uint16_t 			max_queue_depth;
251 	uint32_t 			max_io_size;
252 	uint32_t 			in_capsule_data_size;
253 
254 	TAILQ_HEAD(, spdk_nvmf_rdma_device)	devices;
255 	TAILQ_HEAD(, spdk_nvmf_rdma_port)	ports;
256 };
257 
258 struct spdk_nvmf_rdma_mgmt_channel {
259 	/* Requests that are waiting to obtain a data buffer */
260 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
261 };
262 
263 static int
264 spdk_nvmf_rdma_mgmt_channel_create(void *io_device, void *ctx_buf)
265 {
266 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
267 
268 	TAILQ_INIT(&ch->pending_data_buf_queue);
269 	return 0;
270 }
271 
272 static void
273 spdk_nvmf_rdma_mgmt_channel_destroy(void *io_device, void *ctx_buf)
274 {
275 	struct spdk_nvmf_rdma_mgmt_channel *ch = ctx_buf;
276 
277 	if (!TAILQ_EMPTY(&ch->pending_data_buf_queue)) {
278 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
279 	}
280 }
281 
282 static int
283 spdk_nvmf_rdma_qpair_allocate_channel(struct spdk_nvmf_rdma_qpair *rdma_qpair,
284 				      struct spdk_nvmf_rdma_transport *rtransport)
285 {
286 	rdma_qpair->mgmt_channel = spdk_get_io_channel(rtransport);
287 	if (!rdma_qpair->mgmt_channel) {
288 		return -1;
289 	}
290 
291 	rdma_qpair->thread = spdk_get_thread();
292 	rdma_qpair->ch = spdk_io_channel_get_ctx(rdma_qpair->mgmt_channel);
293 	assert(rdma_qpair->ch != NULL);
294 	return 0;
295 }
296 
297 static void
298 spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rdma_qpair)
299 {
300 	if (rdma_qpair->cmds_mr) {
301 		ibv_dereg_mr(rdma_qpair->cmds_mr);
302 	}
303 
304 	if (rdma_qpair->cpls_mr) {
305 		ibv_dereg_mr(rdma_qpair->cpls_mr);
306 	}
307 
308 	if (rdma_qpair->bufs_mr) {
309 		ibv_dereg_mr(rdma_qpair->bufs_mr);
310 	}
311 
312 	if (rdma_qpair->cm_id) {
313 		rdma_destroy_qp(rdma_qpair->cm_id);
314 		rdma_destroy_id(rdma_qpair->cm_id);
315 	}
316 
317 	if (rdma_qpair->cq) {
318 		ibv_destroy_cq(rdma_qpair->cq);
319 	}
320 
321 	spdk_put_io_channel(rdma_qpair->mgmt_channel);
322 	/* Free all memory */
323 	spdk_dma_free(rdma_qpair->cmds);
324 	spdk_dma_free(rdma_qpair->cpls);
325 	spdk_dma_free(rdma_qpair->bufs);
326 	free(rdma_qpair->reqs);
327 	free(rdma_qpair->recvs);
328 	free(rdma_qpair);
329 }
330 
331 static struct spdk_nvmf_rdma_qpair *
332 spdk_nvmf_rdma_qpair_create(struct spdk_nvmf_transport *transport,
333 			    struct spdk_nvmf_rdma_port *port,
334 			    struct rdma_cm_id *id,
335 			    uint16_t max_queue_depth, uint16_t max_rw_depth, uint32_t subsystem_id)
336 {
337 	struct spdk_nvmf_rdma_transport *rtransport;
338 	struct spdk_nvmf_rdma_qpair	*rdma_qpair;
339 	struct spdk_nvmf_qpair		*qpair;
340 	int				rc, i;
341 	struct ibv_qp_init_attr		attr;
342 	struct spdk_nvmf_rdma_recv	*rdma_recv;
343 	struct spdk_nvmf_rdma_request	*rdma_req;
344 	char buf[64];
345 
346 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
347 
348 	rdma_qpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair));
349 	if (rdma_qpair == NULL) {
350 		SPDK_ERRLOG("Could not allocate new connection.\n");
351 		return NULL;
352 	}
353 
354 	rdma_qpair->port = port;
355 	rdma_qpair->max_queue_depth = max_queue_depth;
356 	rdma_qpair->max_rw_depth = max_rw_depth;
357 	TAILQ_INIT(&rdma_qpair->incoming_queue);
358 	TAILQ_INIT(&rdma_qpair->free_queue);
359 	TAILQ_INIT(&rdma_qpair->pending_rdma_rw_queue);
360 
361 	rdma_qpair->cq = ibv_create_cq(id->verbs, max_queue_depth * 3, rdma_qpair, NULL, 0);
362 	if (!rdma_qpair->cq) {
363 		spdk_strerror_r(errno, buf, sizeof(buf));
364 		SPDK_ERRLOG("Unable to create completion queue\n");
365 		SPDK_ERRLOG("Errno %d: %s\n", errno, buf);
366 		rdma_destroy_id(id);
367 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
368 		return NULL;
369 	}
370 
371 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
372 	attr.qp_type		= IBV_QPT_RC;
373 	attr.send_cq		= rdma_qpair->cq;
374 	attr.recv_cq		= rdma_qpair->cq;
375 	attr.cap.max_send_wr	= max_queue_depth * 2; /* SEND, READ, and WRITE operations */
376 	attr.cap.max_recv_wr	= max_queue_depth; /* RECV operations */
377 	attr.cap.max_send_sge	= NVMF_DEFAULT_TX_SGE;
378 	attr.cap.max_recv_sge	= NVMF_DEFAULT_RX_SGE;
379 
380 	rc = rdma_create_qp(id, NULL, &attr);
381 	if (rc) {
382 		spdk_strerror_r(errno, buf, sizeof(buf));
383 		SPDK_ERRLOG("rdma_create_qp failed\n");
384 		SPDK_ERRLOG("Errno %d: %s\n", errno, buf);
385 		rdma_destroy_id(id);
386 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
387 		return NULL;
388 	}
389 
390 	qpair = &rdma_qpair->qpair;
391 	qpair->transport = transport;
392 	id->context = qpair;
393 	rdma_qpair->cm_id = id;
394 
395 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "New RDMA Connection: %p\n", qpair);
396 
397 	rdma_qpair->reqs = calloc(max_queue_depth, sizeof(*rdma_qpair->reqs));
398 	rdma_qpair->recvs = calloc(max_queue_depth, sizeof(*rdma_qpair->recvs));
399 	rdma_qpair->cmds = spdk_dma_zmalloc(max_queue_depth * sizeof(*rdma_qpair->cmds),
400 					    0x1000, NULL);
401 	rdma_qpair->cpls = spdk_dma_zmalloc(max_queue_depth * sizeof(*rdma_qpair->cpls),
402 					    0x1000, NULL);
403 	rdma_qpair->bufs = spdk_dma_zmalloc(max_queue_depth * rtransport->in_capsule_data_size,
404 					    0x1000, NULL);
405 	if (!rdma_qpair->reqs || !rdma_qpair->recvs || !rdma_qpair->cmds ||
406 	    !rdma_qpair->cpls || !rdma_qpair->bufs) {
407 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
408 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
409 		return NULL;
410 	}
411 
412 	rdma_qpair->cmds_mr = ibv_reg_mr(id->pd, rdma_qpair->cmds,
413 					 max_queue_depth * sizeof(*rdma_qpair->cmds),
414 					 IBV_ACCESS_LOCAL_WRITE);
415 	rdma_qpair->cpls_mr = ibv_reg_mr(id->pd, rdma_qpair->cpls,
416 					 max_queue_depth * sizeof(*rdma_qpair->cpls),
417 					 0);
418 	rdma_qpair->bufs_mr = ibv_reg_mr(id->pd, rdma_qpair->bufs,
419 					 max_queue_depth * rtransport->in_capsule_data_size,
420 					 IBV_ACCESS_LOCAL_WRITE |
421 					 IBV_ACCESS_REMOTE_WRITE);
422 	if (!rdma_qpair->cmds_mr || !rdma_qpair->cpls_mr || !rdma_qpair->bufs_mr) {
423 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
424 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
425 		return NULL;
426 	}
427 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
428 		      rdma_qpair->cmds, max_queue_depth * sizeof(*rdma_qpair->cmds), rdma_qpair->cmds_mr->lkey);
429 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
430 		      rdma_qpair->cpls, max_queue_depth * sizeof(*rdma_qpair->cpls), rdma_qpair->cpls_mr->lkey);
431 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
432 		      rdma_qpair->bufs, max_queue_depth * rtransport->in_capsule_data_size, rdma_qpair->bufs_mr->lkey);
433 
434 	for (i = 0; i < max_queue_depth; i++) {
435 		struct ibv_recv_wr *bad_wr = NULL;
436 
437 		rdma_recv = &rdma_qpair->recvs[i];
438 
439 		/* Set up memory to receive commands */
440 		rdma_recv->buf = (void *)((uintptr_t)rdma_qpair->bufs + (i * rtransport->in_capsule_data_size));
441 
442 		rdma_recv->sgl[0].addr = (uintptr_t)&rdma_qpair->cmds[i];
443 		rdma_recv->sgl[0].length = sizeof(rdma_qpair->cmds[i]);
444 		rdma_recv->sgl[0].lkey = rdma_qpair->cmds_mr->lkey;
445 
446 		rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
447 		rdma_recv->sgl[1].length = rtransport->in_capsule_data_size;
448 		rdma_recv->sgl[1].lkey = rdma_qpair->bufs_mr->lkey;
449 
450 		rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
451 		rdma_recv->wr.sg_list = rdma_recv->sgl;
452 		rdma_recv->wr.num_sge = SPDK_COUNTOF(rdma_recv->sgl);
453 
454 		rc = ibv_post_recv(rdma_qpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
455 		if (rc) {
456 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
457 			spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
458 			return NULL;
459 		}
460 	}
461 
462 	for (i = 0; i < max_queue_depth; i++) {
463 		rdma_req = &rdma_qpair->reqs[i];
464 
465 		rdma_req->req.qpair = &rdma_qpair->qpair;
466 		rdma_req->req.cmd = NULL;
467 
468 		/* Set up memory to send responses */
469 		rdma_req->req.rsp = &rdma_qpair->cpls[i];
470 
471 		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rdma_qpair->cpls[i];
472 		rdma_req->rsp.sgl[0].length = sizeof(rdma_qpair->cpls[i]);
473 		rdma_req->rsp.sgl[0].lkey = rdma_qpair->cpls_mr->lkey;
474 
475 		rdma_req->rsp.wr.wr_id = (uintptr_t)rdma_req;
476 		rdma_req->rsp.wr.next = NULL;
477 		rdma_req->rsp.wr.opcode = IBV_WR_SEND;
478 		rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
479 		rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
480 		rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
481 
482 		/* Set up memory for data buffers */
483 		rdma_req->data.wr.wr_id = (uint64_t)rdma_req;
484 		rdma_req->data.wr.next = NULL;
485 		rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
486 		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
487 		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
488 
489 		TAILQ_INSERT_TAIL(&rdma_qpair->free_queue, rdma_req, link);
490 	}
491 
492 	return rdma_qpair;
493 }
494 
495 static int
496 request_transfer_in(struct spdk_nvmf_request *req)
497 {
498 	int				rc;
499 	struct spdk_nvmf_rdma_request	*rdma_req;
500 	struct spdk_nvmf_qpair 		*qpair;
501 	struct spdk_nvmf_rdma_qpair 	*rdma_qpair;
502 	struct ibv_send_wr		*bad_wr = NULL;
503 
504 	qpair = req->qpair;
505 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
506 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
507 
508 	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
509 
510 	rdma_qpair->cur_rdma_rw_depth++;
511 
512 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);
513 	spdk_trace_record(TRACE_RDMA_READ_START, 0, 0, (uintptr_t)req, 0);
514 
515 	rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
516 	rdma_req->data.wr.next = NULL;
517 	rc = ibv_post_send(rdma_qpair->cm_id->qp, &rdma_req->data.wr, &bad_wr);
518 	if (rc) {
519 		SPDK_ERRLOG("Unable to transfer data from host to target\n");
520 		return -1;
521 	}
522 
523 	return 0;
524 }
525 
526 static int
527 request_transfer_out(struct spdk_nvmf_request *req)
528 {
529 	int 				rc;
530 	struct spdk_nvmf_rdma_request	*rdma_req;
531 	struct spdk_nvmf_qpair		*qpair;
532 	struct spdk_nvmf_rdma_qpair 	*rdma_qpair;
533 	struct spdk_nvme_cpl		*rsp;
534 	struct ibv_recv_wr		*bad_recv_wr = NULL;
535 	struct ibv_send_wr		*send_wr, *bad_send_wr = NULL;
536 
537 	qpair = req->qpair;
538 	rsp = &req->rsp->nvme_cpl;
539 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
540 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
541 
542 	/* Advance our sq_head pointer */
543 	if (qpair->sq_head == qpair->sq_head_max) {
544 		qpair->sq_head = 0;
545 	} else {
546 		qpair->sq_head++;
547 	}
548 	rsp->sqhd = qpair->sq_head;
549 
550 	/* Post the capsule to the recv buffer */
551 	assert(rdma_req->recv != NULL);
552 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
553 		      rdma_qpair);
554 	rc = ibv_post_recv(rdma_qpair->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
555 	if (rc) {
556 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
557 		return rc;
558 	}
559 	rdma_req->recv = NULL;
560 
561 	/* Build the response which consists of an optional
562 	 * RDMA WRITE to transfer data, plus an RDMA SEND
563 	 * containing the response.
564 	 */
565 	send_wr = &rdma_req->rsp.wr;
566 
567 	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
568 	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
569 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, qpair);
570 		spdk_trace_record(TRACE_RDMA_WRITE_START, 0, 0, (uintptr_t)req, 0);
571 
572 		rdma_qpair->cur_rdma_rw_depth++;
573 		rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;
574 
575 		rdma_req->data.wr.next = send_wr;
576 		send_wr = &rdma_req->data.wr;
577 	}
578 
579 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, qpair);
580 	spdk_trace_record(TRACE_NVMF_IO_COMPLETE, 0, 0, (uintptr_t)req, 0);
581 
582 	/* Send the completion */
583 	rc = ibv_post_send(rdma_qpair->cm_id->qp, send_wr, &bad_send_wr);
584 	if (rc) {
585 		SPDK_ERRLOG("Unable to send response capsule\n");
586 	}
587 
588 	return rc;
589 }
590 
591 static int
592 nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event)
593 {
594 	struct spdk_nvmf_rdma_transport *rtransport;
595 	struct spdk_nvmf_rdma_qpair	*rdma_qpair = NULL;
596 	struct spdk_nvmf_rdma_port 	*port;
597 	struct rdma_conn_param		*rdma_param = NULL;
598 	struct rdma_conn_param		ctrlr_event_data;
599 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
600 	struct spdk_nvmf_rdma_accept_private_data accept_data;
601 	uint16_t			sts = 0;
602 	uint16_t			max_queue_depth;
603 	uint16_t			max_rw_depth;
604 	uint32_t			subsystem_id = 0;
605 	int 				rc;
606 
607 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
608 
609 	if (event->id == NULL) {
610 		SPDK_ERRLOG("connect request: missing cm_id\n");
611 		goto err0;
612 	}
613 
614 	if (event->id->verbs == NULL) {
615 		SPDK_ERRLOG("connect request: missing cm_id ibv_context\n");
616 		goto err0;
617 	}
618 
619 	rdma_param = &event->param.conn;
620 	if (rdma_param->private_data == NULL ||
621 	    rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) {
622 		SPDK_ERRLOG("connect request: no private data provided\n");
623 		goto err0;
624 	}
625 	private_data = rdma_param->private_data;
626 
627 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
628 		      event->id->verbs->device->name, event->id->verbs->device->dev_name);
629 
630 	port = event->listen_id->context;
631 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
632 		      event->listen_id, event->listen_id->verbs, port);
633 
634 	/* Figure out the supported queue depth. This is a multi-step process
635 	 * that takes into account hardware maximums, host provided values,
636 	 * and our target's internal memory limits */
637 
638 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Calculating Queue Depth\n");
639 
640 	/* Start with the maximum queue depth allowed by the target */
641 	max_queue_depth = rtransport->max_queue_depth;
642 	max_rw_depth = rtransport->max_queue_depth;
643 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Target Max Queue Depth: %d\n", rtransport->max_queue_depth);
644 
645 	/* Next check the local NIC's hardware limitations */
646 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA,
647 		      "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
648 		      port->device->attr.max_qp_wr, port->device->attr.max_qp_rd_atom);
649 	max_queue_depth = spdk_min(max_queue_depth, port->device->attr.max_qp_wr);
650 	max_rw_depth = spdk_min(max_rw_depth, port->device->attr.max_qp_rd_atom);
651 
652 	/* Next check the remote NIC's hardware limitations */
653 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA,
654 		      "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
655 		      rdma_param->initiator_depth, rdma_param->responder_resources);
656 	if (rdma_param->initiator_depth > 0) {
657 		max_rw_depth = spdk_min(max_rw_depth, rdma_param->initiator_depth);
658 	}
659 
660 	/* Finally check for the host software requested values, which are
661 	 * optional. */
662 	if (rdma_param->private_data != NULL &&
663 	    rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
664 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
665 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
666 		max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize);
667 		max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1);
668 	}
669 
670 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
671 		      max_queue_depth, max_rw_depth);
672 
673 	/* Init the NVMf rdma transport connection */
674 	rdma_qpair = spdk_nvmf_rdma_qpair_create(transport, port, event->id, max_queue_depth,
675 			max_rw_depth, subsystem_id);
676 	if (rdma_qpair == NULL) {
677 		SPDK_ERRLOG("Error on nvmf connection creation\n");
678 		goto err1;
679 	}
680 
681 	accept_data.recfmt = 0;
682 	accept_data.crqsize = max_queue_depth;
683 	ctrlr_event_data = *rdma_param;
684 	ctrlr_event_data.private_data = &accept_data;
685 	ctrlr_event_data.private_data_len = sizeof(accept_data);
686 	if (event->id->ps == RDMA_PS_TCP) {
687 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
688 		ctrlr_event_data.initiator_depth = max_rw_depth;
689 	}
690 
691 	rc = rdma_accept(event->id, &ctrlr_event_data);
692 	if (rc) {
693 		SPDK_ERRLOG("Error %d on rdma_accept\n", errno);
694 		goto err2;
695 	}
696 	SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Sent back the accept\n");
697 
698 	/* Add this RDMA connection to the global list until a CONNECT capsule
699 	 * is received. */
700 	TAILQ_INSERT_TAIL(&g_pending_conns, rdma_qpair, pending_link);
701 
702 	rc = spdk_nvmf_rdma_qpair_allocate_channel(rdma_qpair, rtransport);
703 	if (rc) {
704 		goto err2;
705 	}
706 
707 	return 0;
708 
709 err2:
710 	spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
711 
712 err1: {
713 		struct spdk_nvmf_rdma_reject_private_data rej_data;
714 
715 		rej_data.status.sc = sts;
716 		rdma_reject(event->id, &ctrlr_event_data, sizeof(rej_data));
717 	}
718 err0:
719 	return -1;
720 }
721 
722 static void
723 nvmf_rdma_handle_disconnect(void *ctx)
724 {
725 	struct spdk_nvmf_qpair *qpair = ctx;
726 
727 	spdk_nvmf_ctrlr_disconnect(qpair);
728 }
729 
730 static int
731 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
732 {
733 	struct spdk_nvmf_qpair		*qpair;
734 	struct spdk_nvmf_ctrlr		*ctrlr;
735 	struct spdk_nvmf_rdma_qpair 	*rdma_qpair;
736 	struct spdk_nvmf_rdma_qpair	*r, *t;
737 
738 	if (evt->id == NULL) {
739 		SPDK_ERRLOG("disconnect request: missing cm_id\n");
740 		return -1;
741 	}
742 
743 	qpair = evt->id->context;
744 	if (qpair == NULL) {
745 		SPDK_ERRLOG("disconnect request: no active connection\n");
746 		return -1;
747 	}
748 	/* ack the disconnect event before rdma_destroy_id */
749 	rdma_ack_cm_event(evt);
750 
751 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
752 
753 	/* The connection may still be in this pending list when a disconnect
754 	 * event arrives. Search for it and remove it if it is found.
755 	 */
756 	TAILQ_FOREACH_SAFE(r, &g_pending_conns, pending_link, t) {
757 		if (r == rdma_qpair) {
758 			SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Received disconnect for qpair %p before first SEND ack\n",
759 				      rdma_qpair);
760 			TAILQ_REMOVE(&g_pending_conns, rdma_qpair, pending_link);
761 			break;
762 		}
763 	}
764 
765 	ctrlr = qpair->ctrlr;
766 	if (ctrlr == NULL) {
767 		/* No ctrlr has been established yet, so destroy
768 		 * the connection immediately.
769 		 */
770 		spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
771 		return 0;
772 	}
773 
774 	spdk_thread_send_msg(qpair->thread, nvmf_rdma_handle_disconnect, qpair);
775 
776 	return 0;
777 }
778 
779 #ifdef DEBUG
780 static const char *CM_EVENT_STR[] = {
781 	"RDMA_CM_EVENT_ADDR_RESOLVED",
782 	"RDMA_CM_EVENT_ADDR_ERROR",
783 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
784 	"RDMA_CM_EVENT_ROUTE_ERROR",
785 	"RDMA_CM_EVENT_CONNECT_REQUEST",
786 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
787 	"RDMA_CM_EVENT_CONNECT_ERROR",
788 	"RDMA_CM_EVENT_UNREACHABLE",
789 	"RDMA_CM_EVENT_REJECTED",
790 	"RDMA_CM_EVENT_ESTABLISHED",
791 	"RDMA_CM_EVENT_DISCONNECTED",
792 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
793 	"RDMA_CM_EVENT_MULTICAST_JOIN",
794 	"RDMA_CM_EVENT_MULTICAST_ERROR",
795 	"RDMA_CM_EVENT_ADDR_CHANGE",
796 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
797 };
798 #endif /* DEBUG */
799 
800 static int
801 spdk_nvmf_rdma_mem_notify(void *cb_ctx, struct spdk_mem_map *map,
802 			  enum spdk_mem_map_notify_action action,
803 			  void *vaddr, size_t size)
804 {
805 	struct spdk_nvmf_rdma_device *device = cb_ctx;
806 	struct ibv_pd *pd = device->pd;
807 	struct ibv_mr *mr;
808 
809 	switch (action) {
810 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
811 		mr = ibv_reg_mr(pd, vaddr, size,
812 				IBV_ACCESS_LOCAL_WRITE |
813 				IBV_ACCESS_REMOTE_READ |
814 				IBV_ACCESS_REMOTE_WRITE);
815 		if (mr == NULL) {
816 			SPDK_ERRLOG("ibv_reg_mr() failed\n");
817 			return -1;
818 		} else {
819 			spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
820 		}
821 		break;
822 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
823 		mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr);
824 		spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
825 		if (mr) {
826 			ibv_dereg_mr(mr);
827 		}
828 		break;
829 	}
830 
831 	return 0;
832 }
833 
834 typedef enum spdk_nvme_data_transfer spdk_nvme_data_transfer_t;
835 
836 static spdk_nvme_data_transfer_t
837 spdk_nvmf_rdma_request_get_xfer(struct spdk_nvmf_rdma_request *rdma_req)
838 {
839 	enum spdk_nvme_data_transfer xfer;
840 	struct spdk_nvme_cmd *cmd = &rdma_req->req.cmd->nvme_cmd;
841 	struct spdk_nvme_sgl_descriptor *sgl = &cmd->dptr.sgl1;
842 
843 	/* Figure out data transfer direction */
844 	if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
845 		xfer = spdk_nvme_opc_get_data_transfer(rdma_req->req.cmd->nvmf_cmd.fctype);
846 	} else {
847 		xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
848 
849 		/* Some admin commands are special cases */
850 		if ((rdma_req->req.qpair->qid == 0) &&
851 		    ((cmd->opc == SPDK_NVME_OPC_GET_FEATURES) ||
852 		     (cmd->opc == SPDK_NVME_OPC_SET_FEATURES))) {
853 			switch (cmd->cdw10 & 0xff) {
854 			case SPDK_NVME_FEAT_LBA_RANGE_TYPE:
855 			case SPDK_NVME_FEAT_AUTONOMOUS_POWER_STATE_TRANSITION:
856 			case SPDK_NVME_FEAT_HOST_IDENTIFIER:
857 				break;
858 			default:
859 				xfer = SPDK_NVME_DATA_NONE;
860 			}
861 		}
862 	}
863 
864 	if (xfer == SPDK_NVME_DATA_NONE) {
865 		return xfer;
866 	}
867 
868 	/* Even for commands that may transfer data, they could have specified 0 length.
869 	 * We want those to show up with xfer SPDK_NVME_DATA_NONE.
870 	 */
871 	switch (sgl->generic.type) {
872 	case SPDK_NVME_SGL_TYPE_DATA_BLOCK:
873 	case SPDK_NVME_SGL_TYPE_BIT_BUCKET:
874 	case SPDK_NVME_SGL_TYPE_SEGMENT:
875 	case SPDK_NVME_SGL_TYPE_LAST_SEGMENT:
876 		if (sgl->unkeyed.length == 0) {
877 			xfer = SPDK_NVME_DATA_NONE;
878 		}
879 		break;
880 	case SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK:
881 		if (sgl->keyed.length == 0) {
882 			xfer = SPDK_NVME_DATA_NONE;
883 		}
884 		break;
885 	}
886 
887 	return xfer;
888 }
889 
890 static int
891 spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
892 				 struct spdk_nvmf_rdma_device *device,
893 				 struct spdk_nvmf_rdma_request *rdma_req)
894 {
895 	struct spdk_nvme_cmd			*cmd;
896 	struct spdk_nvme_cpl			*rsp;
897 	struct spdk_nvme_sgl_descriptor		*sgl;
898 
899 	cmd = &rdma_req->req.cmd->nvme_cmd;
900 	rsp = &rdma_req->req.rsp->nvme_cpl;
901 	sgl = &cmd->dptr.sgl1;
902 
903 	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
904 	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
905 	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
906 		if (sgl->keyed.length > rtransport->max_io_size) {
907 			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
908 				    sgl->keyed.length, rtransport->max_io_size);
909 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
910 			return -1;
911 		}
912 
913 		rdma_req->req.length = sgl->keyed.length;
914 		rdma_req->req.data = spdk_mempool_get(rtransport->data_buf_pool);
915 		if (!rdma_req->req.data) {
916 			/* No available buffers. Queue this request up. */
917 			SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "No available large data buffers. Queueing request %p\n", rdma_req);
918 			return 0;
919 		}
920 
921 		rdma_req->data_from_pool = true;
922 		rdma_req->data.sgl[0].addr = (uintptr_t)rdma_req->req.data;
923 		rdma_req->data.sgl[0].length = sgl->keyed.length;
924 		rdma_req->data.sgl[0].lkey = ((struct ibv_mr *)spdk_mem_map_translate(device->map,
925 					      (uint64_t)rdma_req->req.data))->lkey;
926 		rdma_req->data.wr.wr.rdma.rkey = sgl->keyed.key;
927 		rdma_req->data.wr.wr.rdma.remote_addr = sgl->address;
928 
929 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Request %p took buffer from central pool\n", rdma_req);
930 
931 		return 0;
932 	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
933 		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
934 		uint64_t offset = sgl->address;
935 		uint32_t max_len = rtransport->in_capsule_data_size;
936 
937 		SPDK_DEBUGLOG(SPDK_TRACE_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
938 			      offset, sgl->unkeyed.length);
939 
940 		if (offset > max_len) {
941 			SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
942 				    offset, max_len);
943 			rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
944 			return -1;
945 		}
946 		max_len -= (uint32_t)offset;
947 
948 		if (sgl->unkeyed.length > max_len) {
949 			SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
950 				    sgl->unkeyed.length, max_len);
951 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
952 			return -1;
953 		}
954 
955 		rdma_req->req.data = rdma_req->recv->buf + offset;
956 		rdma_req->data_from_pool = false;
957 		rdma_req->req.length = sgl->unkeyed.length;
958 		return 0;
959 	}
960 
961 	SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
962 		    sgl->generic.type, sgl->generic.subtype);
963 	rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
964 	return -1;
965 }
966 
967 static bool
968 spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
969 			       struct spdk_nvmf_rdma_request *rdma_req)
970 {
971 	struct spdk_nvmf_rdma_qpair	*rqpair;
972 	struct spdk_nvmf_rdma_device	*device;
973 	struct spdk_nvme_cpl		*rsp = &rdma_req->req.rsp->nvme_cpl;
974 	int				rc;
975 	struct spdk_nvmf_rdma_recv	*rdma_recv;
976 	enum spdk_nvmf_rdma_request_state prev_state;
977 	bool				progress = false;
978 
979 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
980 	device = rqpair->port->device;
981 
982 	assert(rdma_req->state != RDMA_REQUEST_STATE_FREE);
983 
984 	/* The loop here is to allow for several back-to-back state changes. */
985 	do {
986 		prev_state = rdma_req->state;
987 
988 		SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Request %p entering state %d\n", rdma_req, prev_state);
989 
990 		switch (rdma_req->state) {
991 		case RDMA_REQUEST_STATE_FREE:
992 			/* Some external code must kick a request into RDMA_REQUEST_STATE_NEW
993 			 * to escape this state. */
994 			break;
995 		case RDMA_REQUEST_STATE_NEW:
996 			rqpair->cur_queue_depth++;
997 			rdma_recv = rdma_req->recv;
998 
999 			/* The first element of the SGL is the NVMe command */
1000 			rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
1001 			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));
1002 
1003 			TAILQ_REMOVE(&rqpair->incoming_queue, rdma_recv, link);
1004 			TAILQ_REMOVE(&rqpair->free_queue, rdma_req, link);
1005 
1006 			/* The next state transition depends on the data transfer needs of this request. */
1007 			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);
1008 
1009 			/* If no data to transfer, ready to execute. */
1010 			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
1011 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1012 				break;
1013 			}
1014 
1015 			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
1016 			TAILQ_INSERT_TAIL(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1017 			break;
1018 		case RDMA_REQUEST_STATE_NEED_BUFFER:
1019 			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);
1020 
1021 			if (rdma_req != TAILQ_FIRST(&rqpair->ch->pending_data_buf_queue)) {
1022 				/* This request needs to wait in line to obtain a buffer */
1023 				break;
1024 			}
1025 
1026 			/* Try to get a data buffer */
1027 			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
1028 			if (rc < 0) {
1029 				TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1030 				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1031 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1032 				break;
1033 			}
1034 
1035 			if (!rdma_req->req.data) {
1036 				/* No buffers available. */
1037 				break;
1038 			}
1039 
1040 			TAILQ_REMOVE(&rqpair->ch->pending_data_buf_queue, rdma_req, link);
1041 
1042 			/* If data is transferring from host to controller and the data didn't
1043 			 * arrive using in capsule data, we need to do a transfer from the host.
1044 			 */
1045 			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool) {
1046 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER;
1047 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1048 				break;
1049 			}
1050 
1051 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1052 			break;
1053 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_HOST_TO_CONTROLLER:
1054 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1055 				/* This request needs to wait in line to perform RDMA */
1056 				break;
1057 			}
1058 
1059 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1060 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1061 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
1062 				rc = request_transfer_in(&rdma_req->req);
1063 				if (rc) {
1064 					rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1065 					rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1066 				}
1067 			}
1068 			break;
1069 		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
1070 			/* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE
1071 			 * to escape this state. */
1072 			break;
1073 		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
1074 			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
1075 			spdk_nvmf_request_exec(&rdma_req->req);
1076 			break;
1077 		case RDMA_REQUEST_STATE_EXECUTING:
1078 			/* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED
1079 			 * to escape this state. */
1080 			break;
1081 		case RDMA_REQUEST_STATE_EXECUTED:
1082 			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1083 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST;
1084 				TAILQ_INSERT_TAIL(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1085 			} else {
1086 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1087 			}
1088 			break;
1089 		case RDMA_REQUEST_STATE_TRANSFER_PENDING_CONTROLLER_TO_HOST:
1090 			if (rdma_req != TAILQ_FIRST(&rqpair->pending_rdma_rw_queue)) {
1091 				/* This request needs to wait in line to perform RDMA */
1092 				break;
1093 			}
1094 
1095 			if (rqpair->cur_rdma_rw_depth < rqpair->max_rw_depth) {
1096 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1097 				TAILQ_REMOVE(&rqpair->pending_rdma_rw_queue, rdma_req, link);
1098 			}
1099 			break;
1100 		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
1101 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETING;
1102 
1103 			rc = request_transfer_out(&rdma_req->req);
1104 			assert(rc == 0); /* No good way to handle this currently */
1105 			break;
1106 		case RDMA_REQUEST_STATE_COMPLETING:
1107 			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
1108 			 * to escape this state. */
1109 			break;
1110 		case RDMA_REQUEST_STATE_COMPLETED:
1111 			assert(rqpair->cur_queue_depth > 0);
1112 			rqpair->cur_queue_depth--;
1113 
1114 			if (rdma_req->data_from_pool) {
1115 				/* Put the buffer back in the pool */
1116 				spdk_mempool_put(rtransport->data_buf_pool, rdma_req->req.data);
1117 				rdma_req->data_from_pool = false;
1118 			}
1119 			rdma_req->req.length = 0;
1120 			rdma_req->req.data = NULL;
1121 			rdma_req->state = RDMA_REQUEST_STATE_FREE;
1122 			TAILQ_INSERT_TAIL(&rqpair->free_queue, rdma_req, link);
1123 			break;
1124 		}
1125 
1126 		if (rdma_req->state != prev_state) {
1127 			progress = true;
1128 		}
1129 	} while (rdma_req->state != prev_state);
1130 
1131 	return progress;
1132 }
1133 
1134 /* Public API callbacks begin here */
1135 
1136 static struct spdk_nvmf_transport *
1137 spdk_nvmf_rdma_create(struct spdk_nvmf_tgt *tgt)
1138 {
1139 	int rc;
1140 	struct spdk_nvmf_rdma_transport *rtransport;
1141 	struct spdk_nvmf_rdma_device	*device, *tmp;
1142 	struct ibv_context		**contexts;
1143 	uint32_t			i;
1144 	char				buf[64];
1145 
1146 	rtransport = calloc(1, sizeof(*rtransport));
1147 	if (!rtransport) {
1148 		return NULL;
1149 	}
1150 
1151 	pthread_mutex_init(&rtransport->lock, NULL);
1152 	TAILQ_INIT(&rtransport->devices);
1153 	TAILQ_INIT(&rtransport->ports);
1154 
1155 	rtransport->transport.tgt = tgt;
1156 	rtransport->transport.ops = &spdk_nvmf_transport_rdma;
1157 
1158 	SPDK_NOTICELOG("*** RDMA Transport Init ***\n");
1159 
1160 	rtransport->max_queue_depth = tgt->opts.max_queue_depth;
1161 	rtransport->max_io_size = tgt->opts.max_io_size;
1162 	rtransport->in_capsule_data_size = tgt->opts.in_capsule_data_size;
1163 
1164 	rtransport->event_channel = rdma_create_event_channel();
1165 	if (rtransport->event_channel == NULL) {
1166 		spdk_strerror_r(errno, buf, sizeof(buf));
1167 		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", buf);
1168 		free(rtransport);
1169 		return NULL;
1170 	}
1171 
1172 	rc = fcntl(rtransport->event_channel->fd, F_SETFL, O_NONBLOCK);
1173 	if (rc < 0) {
1174 		SPDK_ERRLOG("fcntl to set fd to non-blocking failed\n");
1175 		free(rtransport);
1176 		return NULL;
1177 	}
1178 
1179 	rtransport->data_buf_pool = spdk_mempool_create("spdk_nvmf_rdma",
1180 				    rtransport->max_queue_depth * 4, /* The 4 is arbitrarily chosen. Needs to be configurable. */
1181 				    rtransport->max_io_size,
1182 				    SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
1183 				    SPDK_ENV_SOCKET_ID_ANY);
1184 	if (!rtransport->data_buf_pool) {
1185 		SPDK_ERRLOG("Unable to allocate buffer pool for poll group\n");
1186 		free(rtransport);
1187 		return NULL;
1188 	}
1189 
1190 	spdk_io_device_register(rtransport, spdk_nvmf_rdma_mgmt_channel_create,
1191 				spdk_nvmf_rdma_mgmt_channel_destroy,
1192 				sizeof(struct spdk_nvmf_rdma_mgmt_channel));
1193 
1194 	contexts = rdma_get_devices(NULL);
1195 	i = 0;
1196 	rc = 0;
1197 	while (contexts[i] != NULL) {
1198 		device = calloc(1, sizeof(*device));
1199 		if (!device) {
1200 			SPDK_ERRLOG("Unable to allocate memory for RDMA devices.\n");
1201 			rc = -ENOMEM;
1202 			break;
1203 		}
1204 		device->context = contexts[i];
1205 		rc = ibv_query_device(device->context, &device->attr);
1206 		if (rc < 0) {
1207 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1208 			free(device);
1209 			break;
1210 
1211 		}
1212 
1213 		device->pd = NULL;
1214 		device->map = NULL;
1215 
1216 		TAILQ_INSERT_TAIL(&rtransport->devices, device, link);
1217 		i++;
1218 	}
1219 
1220 	if (rc < 0) {
1221 		TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) {
1222 			TAILQ_REMOVE(&rtransport->devices, device, link);
1223 			free(device);
1224 		}
1225 		spdk_mempool_free(rtransport->data_buf_pool);
1226 		rdma_destroy_event_channel(rtransport->event_channel);
1227 		free(rtransport);
1228 		rdma_free_devices(contexts);
1229 		return NULL;
1230 	}
1231 
1232 	rdma_free_devices(contexts);
1233 
1234 	return &rtransport->transport;
1235 }
1236 
1237 static int
1238 spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
1239 {
1240 	struct spdk_nvmf_rdma_transport	*rtransport;
1241 	struct spdk_nvmf_rdma_port	*port, *port_tmp;
1242 	struct spdk_nvmf_rdma_device	*device, *device_tmp;
1243 
1244 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1245 
1246 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, port_tmp) {
1247 		TAILQ_REMOVE(&rtransport->ports, port, link);
1248 		rdma_destroy_id(port->id);
1249 		free(port);
1250 	}
1251 
1252 	if (rtransport->event_channel != NULL) {
1253 		rdma_destroy_event_channel(rtransport->event_channel);
1254 	}
1255 
1256 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) {
1257 		TAILQ_REMOVE(&rtransport->devices, device, link);
1258 		if (device->map) {
1259 			spdk_mem_map_free(&device->map);
1260 		}
1261 		free(device);
1262 	}
1263 
1264 	if (spdk_mempool_count(rtransport->data_buf_pool) != (rtransport->max_queue_depth * 4)) {
1265 		SPDK_ERRLOG("transport buffer pool count is %zu but should be %u\n",
1266 			    spdk_mempool_count(rtransport->data_buf_pool),
1267 			    rtransport->max_queue_depth * 4);
1268 	}
1269 
1270 	spdk_mempool_free(rtransport->data_buf_pool);
1271 	spdk_io_device_unregister(rtransport, NULL);
1272 	free(rtransport);
1273 
1274 	return 0;
1275 }
1276 
1277 static int
1278 spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
1279 		      const struct spdk_nvme_transport_id *trid)
1280 {
1281 	struct spdk_nvmf_rdma_transport *rtransport;
1282 	struct spdk_nvmf_rdma_device	*device;
1283 	struct spdk_nvmf_rdma_port 	*port_tmp, *port;
1284 	struct sockaddr_in saddr;
1285 	int rc;
1286 
1287 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1288 
1289 	port = calloc(1, sizeof(*port));
1290 	if (!port) {
1291 		return -ENOMEM;
1292 	}
1293 
1294 	/* Selectively copy the trid. Things like NQN don't matter here - that
1295 	 * mapping is enforced elsewhere.
1296 	 */
1297 	port->trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1298 	port->trid.adrfam = trid->adrfam;
1299 	snprintf(port->trid.traddr, sizeof(port->trid.traddr), "%s", trid->traddr);
1300 	snprintf(port->trid.trsvcid, sizeof(port->trid.trsvcid), "%s", trid->trsvcid);
1301 
1302 	pthread_mutex_lock(&rtransport->lock);
1303 	assert(rtransport->event_channel != NULL);
1304 	TAILQ_FOREACH(port_tmp, &rtransport->ports, link) {
1305 		if (spdk_nvme_transport_id_compare(&port_tmp->trid, &port->trid) == 0) {
1306 			port_tmp->ref++;
1307 			free(port);
1308 			/* Already listening at this address */
1309 			pthread_mutex_unlock(&rtransport->lock);
1310 			return 0;
1311 		}
1312 	}
1313 
1314 	rc = rdma_create_id(rtransport->event_channel, &port->id, port, RDMA_PS_TCP);
1315 	if (rc < 0) {
1316 		SPDK_ERRLOG("rdma_create_id() failed\n");
1317 		free(port);
1318 		pthread_mutex_unlock(&rtransport->lock);
1319 		return rc;
1320 	}
1321 
1322 	memset(&saddr, 0, sizeof(saddr));
1323 	saddr.sin_family = AF_INET;
1324 	saddr.sin_addr.s_addr = inet_addr(port->trid.traddr);
1325 	saddr.sin_port = htons((uint16_t)strtoul(port->trid.trsvcid, NULL, 10));
1326 	rc = rdma_bind_addr(port->id, (struct sockaddr *)&saddr);
1327 	if (rc < 0) {
1328 		SPDK_ERRLOG("rdma_bind_addr() failed\n");
1329 		rdma_destroy_id(port->id);
1330 		free(port);
1331 		pthread_mutex_unlock(&rtransport->lock);
1332 		return rc;
1333 	}
1334 
1335 	rc = rdma_listen(port->id, 10); /* 10 = backlog */
1336 	if (rc < 0) {
1337 		SPDK_ERRLOG("rdma_listen() failed\n");
1338 		rdma_destroy_id(port->id);
1339 		free(port);
1340 		pthread_mutex_unlock(&rtransport->lock);
1341 		return rc;
1342 	}
1343 
1344 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1345 		if (device->context == port->id->verbs) {
1346 			port->device = device;
1347 			break;
1348 		}
1349 	}
1350 	if (!port->device) {
1351 		SPDK_ERRLOG("Accepted a connection with verbs %p, but unable to find a corresponding device.\n",
1352 			    port->id->verbs);
1353 		rdma_destroy_id(port->id);
1354 		free(port);
1355 		pthread_mutex_unlock(&rtransport->lock);
1356 		return -EINVAL;
1357 	}
1358 
1359 	if (!device->map) {
1360 		device->pd = port->id->pd;
1361 		device->map = spdk_mem_map_alloc(0, spdk_nvmf_rdma_mem_notify, device);
1362 		if (!device->map) {
1363 			SPDK_ERRLOG("Unable to allocate memory map for new poll group\n");
1364 			return -1;
1365 		}
1366 	} else {
1367 		assert(device->pd == port->id->pd);
1368 	}
1369 
1370 	SPDK_NOTICELOG("*** NVMf Target Listening on %s port %d ***\n",
1371 		       port->trid.traddr, ntohs(rdma_get_src_port(port->id)));
1372 
1373 	port->ref = 1;
1374 
1375 	TAILQ_INSERT_TAIL(&rtransport->ports, port, link);
1376 	pthread_mutex_unlock(&rtransport->lock);
1377 
1378 	return 0;
1379 }
1380 
1381 static int
1382 spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
1383 			   const struct spdk_nvme_transport_id *_trid)
1384 {
1385 	struct spdk_nvmf_rdma_transport *rtransport;
1386 	struct spdk_nvmf_rdma_port *port, *tmp;
1387 	struct spdk_nvme_transport_id trid = {};
1388 
1389 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1390 
1391 	/* Selectively copy the trid. Things like NQN don't matter here - that
1392 	 * mapping is enforced elsewhere.
1393 	 */
1394 	trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1395 	trid.adrfam = _trid->adrfam;
1396 	snprintf(trid.traddr, sizeof(port->trid.traddr), "%s", _trid->traddr);
1397 	snprintf(trid.trsvcid, sizeof(port->trid.trsvcid), "%s", _trid->trsvcid);
1398 
1399 	pthread_mutex_lock(&rtransport->lock);
1400 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, tmp) {
1401 		if (spdk_nvme_transport_id_compare(&port->trid, &trid) == 0) {
1402 			assert(port->ref > 0);
1403 			port->ref--;
1404 			if (port->ref == 0) {
1405 				TAILQ_REMOVE(&rtransport->ports, port, link);
1406 				rdma_destroy_id(port->id);
1407 				free(port);
1408 			}
1409 			break;
1410 		}
1411 	}
1412 
1413 	pthread_mutex_unlock(&rtransport->lock);
1414 	return 0;
1415 }
1416 
1417 static int
1418 spdk_nvmf_rdma_qpair_poll(struct spdk_nvmf_rdma_transport *rtransport,
1419 			  struct spdk_nvmf_rdma_qpair *rqpair);
1420 
1421 static void
1422 spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport)
1423 {
1424 	struct spdk_nvmf_rdma_transport *rtransport;
1425 	struct rdma_cm_event		*event;
1426 	int				rc;
1427 	struct spdk_nvmf_rdma_qpair	*rdma_qpair, *tmp;
1428 	char buf[64];
1429 
1430 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1431 
1432 	if (rtransport->event_channel == NULL) {
1433 		return;
1434 	}
1435 
1436 	/* Process pending connections for incoming capsules. The only capsule
1437 	 * this should ever find is a CONNECT request. */
1438 	TAILQ_FOREACH_SAFE(rdma_qpair, &g_pending_conns, pending_link, tmp) {
1439 		rc = spdk_nvmf_rdma_qpair_poll(rtransport, rdma_qpair);
1440 		if (rc < 0) {
1441 			TAILQ_REMOVE(&g_pending_conns, rdma_qpair, pending_link);
1442 			spdk_nvmf_rdma_qpair_destroy(rdma_qpair);
1443 		} else if (rc > 0) {
1444 			spdk_put_io_channel(rdma_qpair->mgmt_channel);
1445 			rdma_qpair->mgmt_channel = NULL;
1446 			/* At least one request was processed which is assumed to be
1447 			 * a CONNECT. Remove this connection from our list. */
1448 			TAILQ_REMOVE(&g_pending_conns, rdma_qpair, pending_link);
1449 		}
1450 	}
1451 
1452 	while (1) {
1453 		rc = rdma_get_cm_event(rtransport->event_channel, &event);
1454 		if (rc == 0) {
1455 			SPDK_DEBUGLOG(SPDK_TRACE_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
1456 
1457 			switch (event->event) {
1458 			case RDMA_CM_EVENT_CONNECT_REQUEST:
1459 				rc = nvmf_rdma_connect(transport, event);
1460 				if (rc < 0) {
1461 					SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
1462 					break;
1463 				}
1464 				break;
1465 			case RDMA_CM_EVENT_ESTABLISHED:
1466 				break;
1467 			case RDMA_CM_EVENT_ADDR_CHANGE:
1468 			case RDMA_CM_EVENT_DISCONNECTED:
1469 			case RDMA_CM_EVENT_DEVICE_REMOVAL:
1470 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1471 				rc = nvmf_rdma_disconnect(event);
1472 				if (rc < 0) {
1473 					SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
1474 					break;
1475 				}
1476 				continue;
1477 			default:
1478 				SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
1479 				break;
1480 			}
1481 
1482 			rdma_ack_cm_event(event);
1483 		} else {
1484 			if (errno != EAGAIN && errno != EWOULDBLOCK) {
1485 				spdk_strerror_r(errno, buf, sizeof(buf));
1486 				SPDK_ERRLOG("Acceptor Event Error: %s\n", buf);
1487 			}
1488 			break;
1489 		}
1490 	}
1491 }
1492 
1493 static void
1494 spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
1495 			struct spdk_nvme_transport_id *trid,
1496 			struct spdk_nvmf_discovery_log_page_entry *entry)
1497 {
1498 	entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
1499 	entry->adrfam = trid->adrfam;
1500 	entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
1501 
1502 	spdk_strcpy_pad(entry->trsvcid, trid->trsvcid, sizeof(entry->trsvcid), ' ');
1503 	spdk_strcpy_pad(entry->traddr, trid->traddr, sizeof(entry->traddr), ' ');
1504 
1505 	entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
1506 	entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
1507 	entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
1508 }
1509 
1510 static struct spdk_nvmf_transport_poll_group *
1511 spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
1512 {
1513 	struct spdk_nvmf_rdma_transport		*rtransport;
1514 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1515 	struct spdk_nvmf_rdma_poller		*poller;
1516 	struct spdk_nvmf_rdma_device		*device;
1517 
1518 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1519 
1520 	rgroup = calloc(1, sizeof(*rgroup));
1521 	if (!rgroup) {
1522 		return NULL;
1523 	}
1524 
1525 	TAILQ_INIT(&rgroup->pollers);
1526 
1527 	pthread_mutex_lock(&rtransport->lock);
1528 	TAILQ_FOREACH(device, &rtransport->devices, link) {
1529 		if (device->map == NULL) {
1530 			/*
1531 			 * The device is not in use (no listeners),
1532 			 * so no protection domain has been constructed.
1533 			 * Skip it.
1534 			 */
1535 			SPDK_NOTICELOG("Skipping unused RDMA device when creating poll group.\n");
1536 			continue;
1537 		}
1538 
1539 		poller = calloc(1, sizeof(*poller));
1540 		if (!poller) {
1541 			SPDK_ERRLOG("Unable to allocate memory for new RDMA poller\n");
1542 			free(rgroup);
1543 			pthread_mutex_unlock(&rtransport->lock);
1544 			return NULL;
1545 		}
1546 
1547 		poller->device = device;
1548 		poller->group = rgroup;
1549 
1550 		TAILQ_INIT(&poller->qpairs);
1551 
1552 		TAILQ_INSERT_TAIL(&rgroup->pollers, poller, link);
1553 	}
1554 
1555 	pthread_mutex_unlock(&rtransport->lock);
1556 	return &rgroup->group;
1557 }
1558 
1559 static void
1560 spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
1561 {
1562 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1563 	struct spdk_nvmf_rdma_poller		*poller, *tmp;
1564 
1565 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1566 
1567 	if (!rgroup) {
1568 		return;
1569 	}
1570 
1571 	TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tmp) {
1572 		TAILQ_REMOVE(&rgroup->pollers, poller, link);
1573 		free(poller);
1574 	}
1575 
1576 	free(rgroup);
1577 }
1578 
1579 static int
1580 spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
1581 			      struct spdk_nvmf_qpair *qpair)
1582 {
1583 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1584 	struct spdk_nvmf_rdma_qpair 		*rqpair;
1585 	struct spdk_nvmf_rdma_device 		*device;
1586 	struct spdk_nvmf_rdma_poller		*poller;
1587 
1588 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1589 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1590 
1591 	device = rqpair->port->device;
1592 
1593 	if (device->pd != rqpair->cm_id->pd) {
1594 		SPDK_ERRLOG("Mismatched protection domains\n");
1595 		return -1;
1596 	}
1597 
1598 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1599 		if (poller->device == device) {
1600 			break;
1601 		}
1602 	}
1603 
1604 	if (!poller) {
1605 		SPDK_ERRLOG("No poller found for device.\n");
1606 		return -1;
1607 	}
1608 
1609 	TAILQ_INSERT_TAIL(&poller->qpairs, rqpair, link);
1610 
1611 	return 0;
1612 }
1613 
1614 static int
1615 spdk_nvmf_rdma_poll_group_remove(struct spdk_nvmf_transport_poll_group *group,
1616 				 struct spdk_nvmf_qpair *qpair)
1617 {
1618 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1619 	struct spdk_nvmf_rdma_qpair 		*rqpair;
1620 	struct spdk_nvmf_rdma_device 		*device;
1621 	struct spdk_nvmf_rdma_poller		*poller;
1622 	struct spdk_nvmf_rdma_qpair		*rq, *trq;
1623 
1624 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1625 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1626 
1627 	device = rqpair->port->device;
1628 
1629 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
1630 		if (poller->device == device) {
1631 			break;
1632 		}
1633 	}
1634 
1635 	if (!poller) {
1636 		SPDK_ERRLOG("No poller found for device.\n");
1637 		return -1;
1638 	}
1639 
1640 	TAILQ_FOREACH_SAFE(rq, &poller->qpairs, link, trq) {
1641 		if (rq == rqpair) {
1642 			TAILQ_REMOVE(&poller->qpairs, rqpair, link);
1643 			break;
1644 		}
1645 	}
1646 
1647 	if (rq == NULL) {
1648 		SPDK_ERRLOG("RDMA qpair cannot be removed from group (not in group).\n");
1649 		return -1;
1650 	}
1651 
1652 	return 0;
1653 }
1654 
1655 static int
1656 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
1657 {
1658 	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
1659 			struct spdk_nvmf_rdma_transport, transport);
1660 	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
1661 
1662 	rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
1663 	spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1664 
1665 	return 0;
1666 }
1667 
1668 static void
1669 spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
1670 {
1671 	spdk_nvmf_rdma_qpair_destroy(SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair));
1672 }
1673 
1674 static void
1675 spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport,
1676 				     struct spdk_nvmf_rdma_qpair *rqpair)
1677 {
1678 	struct spdk_nvmf_rdma_recv	*rdma_recv, *recv_tmp;
1679 	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;
1680 
1681 	/* We process I/O in the pending_rdma_rw queue at the highest priority. */
1682 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_rw_queue, link, req_tmp) {
1683 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1684 			break;
1685 		}
1686 	}
1687 
1688 	/* The second highest priority is I/O waiting on memory buffers. */
1689 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->ch->pending_data_buf_queue, link, req_tmp) {
1690 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1691 			break;
1692 		}
1693 	}
1694 
1695 	/* The lowest priority is processing newly received commands */
1696 	TAILQ_FOREACH_SAFE(rdma_recv, &rqpair->incoming_queue, link, recv_tmp) {
1697 		rdma_req = TAILQ_FIRST(&rqpair->free_queue);
1698 		if (rdma_req == NULL) {
1699 			/* Need to wait for more SEND completions */
1700 			break;
1701 		}
1702 
1703 		rdma_req->recv = rdma_recv;
1704 		rdma_req->state = RDMA_REQUEST_STATE_NEW;
1705 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
1706 			break;
1707 		}
1708 	}
1709 }
1710 
1711 static struct spdk_nvmf_rdma_request *
1712 get_rdma_req_from_wc(struct spdk_nvmf_rdma_qpair *rdma_qpair,
1713 		     struct ibv_wc *wc)
1714 {
1715 	struct spdk_nvmf_rdma_request *rdma_req;
1716 
1717 	rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
1718 	assert(rdma_req != NULL);
1719 	assert(rdma_req - rdma_qpair->reqs >= 0);
1720 	assert(rdma_req - rdma_qpair->reqs < (ptrdiff_t)rdma_qpair->max_queue_depth);
1721 
1722 	return rdma_req;
1723 }
1724 
1725 static struct spdk_nvmf_rdma_recv *
1726 get_rdma_recv_from_wc(struct spdk_nvmf_rdma_qpair *rdma_qpair,
1727 		      struct ibv_wc *wc)
1728 {
1729 	struct spdk_nvmf_rdma_recv *rdma_recv;
1730 
1731 	assert(wc->byte_len >= sizeof(struct spdk_nvmf_capsule_cmd));
1732 
1733 	rdma_recv = (struct spdk_nvmf_rdma_recv *)wc->wr_id;
1734 	assert(rdma_recv != NULL);
1735 	assert(rdma_recv - rdma_qpair->recvs >= 0);
1736 	assert(rdma_recv - rdma_qpair->recvs < (ptrdiff_t)rdma_qpair->max_queue_depth);
1737 
1738 	return rdma_recv;
1739 }
1740 
1741 static int
1742 spdk_nvmf_rdma_qpair_poll(struct spdk_nvmf_rdma_transport *rtransport,
1743 			  struct spdk_nvmf_rdma_qpair *rqpair)
1744 {
1745 	struct ibv_wc wc[32];
1746 	struct spdk_nvmf_rdma_request	*rdma_req;
1747 	struct spdk_nvmf_rdma_recv	*rdma_recv;
1748 	int reaped, i;
1749 	int count = 0;
1750 	bool error = false;
1751 	char buf[64];
1752 
1753 	/* reset the mgmt_channel and thread info of qpair */
1754 	if (rqpair->mgmt_channel != NULL) {
1755 		if (rqpair->thread != spdk_get_thread()) {
1756 			return 0;
1757 		}
1758 	} else if (spdk_nvmf_rdma_qpair_allocate_channel(rqpair, rtransport)) {
1759 		return -1;
1760 	}
1761 
1762 	/* Poll for completing operations. */
1763 	reaped = ibv_poll_cq(rqpair->cq, 32, wc);
1764 	if (reaped < 0) {
1765 		spdk_strerror_r(errno, buf, sizeof(buf));
1766 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1767 			    errno, buf);
1768 		return -1;
1769 	}
1770 
1771 	for (i = 0; i < reaped; i++) {
1772 		if (wc[i].status) {
1773 			SPDK_ERRLOG("CQ error on CQ %p, Request 0x%lu (%d): %s\n",
1774 				    rqpair->cq, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1775 			error = true;
1776 			continue;
1777 		}
1778 
1779 		switch (wc[i].opcode) {
1780 		case IBV_WC_SEND:
1781 			rdma_req = get_rdma_req_from_wc(rqpair, &wc[i]);
1782 
1783 			assert(rdma_req->state == RDMA_REQUEST_STATE_COMPLETING);
1784 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1785 
1786 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1787 
1788 			count++;
1789 
1790 			/* Try to process other queued requests */
1791 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1792 			break;
1793 
1794 		case IBV_WC_RDMA_WRITE:
1795 			rqpair->cur_rdma_rw_depth--;
1796 
1797 			/* Try to process other queued requests */
1798 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1799 			break;
1800 
1801 		case IBV_WC_RDMA_READ:
1802 			rdma_req = get_rdma_req_from_wc(rqpair, &wc[i]);
1803 
1804 			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
1805 			rqpair->cur_rdma_rw_depth--;
1806 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1807 
1808 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
1809 
1810 			/* Try to process other queued requests */
1811 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1812 			break;
1813 
1814 		case IBV_WC_RECV:
1815 			rdma_recv = get_rdma_recv_from_wc(rqpair, &wc[i]);
1816 
1817 			TAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
1818 
1819 			/* Try to process other queued requests */
1820 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair);
1821 			break;
1822 
1823 		default:
1824 			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
1825 			continue;
1826 		}
1827 	}
1828 
1829 	if (error == true) {
1830 		return -1;
1831 	}
1832 
1833 	return count;
1834 }
1835 
1836 static int
1837 spdk_nvmf_rdma_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
1838 {
1839 	struct spdk_nvmf_rdma_transport *rtransport;
1840 	struct spdk_nvmf_rdma_poll_group *rgroup;
1841 	struct spdk_nvmf_rdma_poller	*rpoller;
1842 	struct spdk_nvmf_rdma_qpair	*rqpair;
1843 	int				count, rc;
1844 
1845 	rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport);
1846 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
1847 
1848 	count = 0;
1849 	TAILQ_FOREACH(rpoller, &rgroup->pollers, link) {
1850 		TAILQ_FOREACH(rqpair, &rpoller->qpairs, link) {
1851 			rc = spdk_nvmf_rdma_qpair_poll(rtransport, rqpair);
1852 			if (rc < 0) {
1853 				return rc;
1854 			}
1855 			count += rc;
1856 		}
1857 	}
1858 
1859 	return count;
1860 }
1861 
1862 static bool
1863 spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
1864 {
1865 	struct spdk_nvmf_rdma_qpair *rdma_qpair;
1866 
1867 	rdma_qpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
1868 
1869 	if (rdma_qpair->cur_queue_depth == 0 && rdma_qpair->cur_rdma_rw_depth == 0) {
1870 		return true;
1871 	}
1872 	return false;
1873 }
1874 
1875 const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
1876 	.type = SPDK_NVME_TRANSPORT_RDMA,
1877 	.create = spdk_nvmf_rdma_create,
1878 	.destroy = spdk_nvmf_rdma_destroy,
1879 
1880 	.listen = spdk_nvmf_rdma_listen,
1881 	.stop_listen = spdk_nvmf_rdma_stop_listen,
1882 	.accept = spdk_nvmf_rdma_accept,
1883 
1884 	.listener_discover = spdk_nvmf_rdma_discover,
1885 
1886 	.poll_group_create = spdk_nvmf_rdma_poll_group_create,
1887 	.poll_group_destroy = spdk_nvmf_rdma_poll_group_destroy,
1888 	.poll_group_add = spdk_nvmf_rdma_poll_group_add,
1889 	.poll_group_remove = spdk_nvmf_rdma_poll_group_remove,
1890 	.poll_group_poll = spdk_nvmf_rdma_poll_group_poll,
1891 
1892 	.req_complete = spdk_nvmf_rdma_request_complete,
1893 
1894 	.qpair_fini = spdk_nvmf_rdma_close_qpair,
1895 	.qpair_is_idle = spdk_nvmf_rdma_qpair_is_idle,
1896 
1897 };
1898 
1899 SPDK_LOG_REGISTER_TRACE_FLAG("rdma", SPDK_TRACE_RDMA)
1900