xref: /spdk/lib/nvmf/rdma.c (revision 6ff6f6d6f8fa6891737ed8b4e194dd1d3918c927)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation. All rights reserved.
5  *   Copyright (c) 2018 Mellanox Technologies LTD. All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <infiniband/verbs.h>
37 #include <rdma/rdma_cma.h>
38 #include <rdma/rdma_verbs.h>
39 
40 #include "nvmf_internal.h"
41 #include "transport.h"
42 
43 #include "spdk/config.h"
44 #include "spdk/assert.h"
45 #include "spdk/thread.h"
46 #include "spdk/nvmf.h"
47 #include "spdk/nvmf_spec.h"
48 #include "spdk/string.h"
49 #include "spdk/trace.h"
50 #include "spdk/util.h"
51 
52 #include "spdk_internal/log.h"
53 
54 struct spdk_nvme_rdma_hooks g_nvmf_hooks = {};
55 
56 /*
57  RDMA Connection Resource Defaults
58  */
59 #define NVMF_DEFAULT_TX_SGE		SPDK_NVMF_MAX_SGL_ENTRIES
60 #define NVMF_DEFAULT_RSP_SGE		1
61 #define NVMF_DEFAULT_RX_SGE		2
62 
63 /* The RDMA completion queue size */
64 #define DEFAULT_NVMF_RDMA_CQ_SIZE	4096
65 #define MAX_WR_PER_QP(queue_depth)	(queue_depth * 3 + 2)
66 
67 enum spdk_nvmf_rdma_request_state {
68 	/* The request is not currently in use */
69 	RDMA_REQUEST_STATE_FREE = 0,
70 
71 	/* Initial state when request first received */
72 	RDMA_REQUEST_STATE_NEW,
73 
74 	/* The request is queued until a data buffer is available. */
75 	RDMA_REQUEST_STATE_NEED_BUFFER,
76 
77 	/* The request is waiting on RDMA queue depth availability
78 	 * to transfer data from the host to the controller.
79 	 */
80 	RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING,
81 
82 	/* The request is currently transferring data from the host to the controller. */
83 	RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
84 
85 	/* The request is ready to execute at the block device */
86 	RDMA_REQUEST_STATE_READY_TO_EXECUTE,
87 
88 	/* The request is currently executing at the block device */
89 	RDMA_REQUEST_STATE_EXECUTING,
90 
91 	/* The request finished executing at the block device */
92 	RDMA_REQUEST_STATE_EXECUTED,
93 
94 	/* The request is waiting on RDMA queue depth availability
95 	 * to transfer data from the controller to the host.
96 	 */
97 	RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING,
98 
99 	/* The request is ready to send a completion */
100 	RDMA_REQUEST_STATE_READY_TO_COMPLETE,
101 
102 	/* The request is currently transferring data from the controller to the host. */
103 	RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST,
104 
105 	/* The request currently has an outstanding completion without an
106 	 * associated data transfer.
107 	 */
108 	RDMA_REQUEST_STATE_COMPLETING,
109 
110 	/* The request completed and can be marked free. */
111 	RDMA_REQUEST_STATE_COMPLETED,
112 
113 	/* Terminator */
114 	RDMA_REQUEST_NUM_STATES,
115 };
116 
117 #define OBJECT_NVMF_RDMA_IO				0x40
118 
119 #define TRACE_GROUP_NVMF_RDMA				0x4
120 #define TRACE_RDMA_REQUEST_STATE_NEW					SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x0)
121 #define TRACE_RDMA_REQUEST_STATE_NEED_BUFFER				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x1)
122 #define TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING	SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x2)
123 #define TRACE_RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER	SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x3)
124 #define TRACE_RDMA_REQUEST_STATE_READY_TO_EXECUTE			SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x4)
125 #define TRACE_RDMA_REQUEST_STATE_EXECUTING				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x5)
126 #define TRACE_RDMA_REQUEST_STATE_EXECUTED				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x6)
127 #define TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING		SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x7)
128 #define TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE			SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x8)
129 #define TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST	SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x9)
130 #define TRACE_RDMA_REQUEST_STATE_COMPLETING				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0xA)
131 #define TRACE_RDMA_REQUEST_STATE_COMPLETED				SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0xB)
132 #define TRACE_RDMA_QP_CREATE						SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0xC)
133 #define TRACE_RDMA_IBV_ASYNC_EVENT					SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0xD)
134 #define TRACE_RDMA_CM_ASYNC_EVENT					SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0xE)
135 #define TRACE_RDMA_QP_STATE_CHANGE					SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0xF)
136 #define TRACE_RDMA_QP_DISCONNECT					SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x10)
137 #define TRACE_RDMA_QP_DESTROY						SPDK_TPOINT_ID(TRACE_GROUP_NVMF_RDMA, 0x11)
138 
139 SPDK_TRACE_REGISTER_FN(nvmf_trace, "nvmf_rdma", TRACE_GROUP_NVMF_RDMA)
140 {
141 	spdk_trace_register_object(OBJECT_NVMF_RDMA_IO, 'r');
142 	spdk_trace_register_description("RDMA_REQ_NEW", "",
143 					TRACE_RDMA_REQUEST_STATE_NEW,
144 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 1, 1, "cmid:   ");
145 	spdk_trace_register_description("RDMA_REQ_NEED_BUFFER", "",
146 					TRACE_RDMA_REQUEST_STATE_NEED_BUFFER,
147 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
148 	spdk_trace_register_description("RDMA_REQ_TX_PENDING_C_TO_H", "",
149 					TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING,
150 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
151 	spdk_trace_register_description("RDMA_REQ_TX_PENDING_H_TO_C", "",
152 					TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING,
153 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
154 	spdk_trace_register_description("RDMA_REQ_TX_H_TO_C", "",
155 					TRACE_RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER,
156 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
157 	spdk_trace_register_description("RDMA_REQ_RDY_TO_EXECUTE", "",
158 					TRACE_RDMA_REQUEST_STATE_READY_TO_EXECUTE,
159 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
160 	spdk_trace_register_description("RDMA_REQ_EXECUTING", "",
161 					TRACE_RDMA_REQUEST_STATE_EXECUTING,
162 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
163 	spdk_trace_register_description("RDMA_REQ_EXECUTED", "",
164 					TRACE_RDMA_REQUEST_STATE_EXECUTED,
165 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
166 	spdk_trace_register_description("RDMA_REQ_RDY_TO_COMPLETE", "",
167 					TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE,
168 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
169 	spdk_trace_register_description("RDMA_REQ_COMPLETING_CONTROLLER_TO_HOST", "",
170 					TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST,
171 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
172 	spdk_trace_register_description("RDMA_REQ_COMPLETING_INCAPSULE", "",
173 					TRACE_RDMA_REQUEST_STATE_COMPLETING,
174 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
175 	spdk_trace_register_description("RDMA_REQ_COMPLETED", "",
176 					TRACE_RDMA_REQUEST_STATE_COMPLETED,
177 					OWNER_NONE, OBJECT_NVMF_RDMA_IO, 0, 1, "cmid:   ");
178 
179 	spdk_trace_register_description("RDMA_QP_CREATE", "", TRACE_RDMA_QP_CREATE,
180 					OWNER_NONE, OBJECT_NONE, 0, 0, "");
181 	spdk_trace_register_description("RDMA_IBV_ASYNC_EVENT", "", TRACE_RDMA_IBV_ASYNC_EVENT,
182 					OWNER_NONE, OBJECT_NONE, 0, 0, "type:   ");
183 	spdk_trace_register_description("RDMA_CM_ASYNC_EVENT", "", TRACE_RDMA_CM_ASYNC_EVENT,
184 					OWNER_NONE, OBJECT_NONE, 0, 0, "type:   ");
185 	spdk_trace_register_description("RDMA_QP_STATE_CHANGE", "", TRACE_RDMA_QP_STATE_CHANGE,
186 					OWNER_NONE, OBJECT_NONE, 0, 1, "state:  ");
187 	spdk_trace_register_description("RDMA_QP_DISCONNECT", "", TRACE_RDMA_QP_DISCONNECT,
188 					OWNER_NONE, OBJECT_NONE, 0, 0, "");
189 	spdk_trace_register_description("RDMA_QP_DESTROY", "", TRACE_RDMA_QP_DESTROY,
190 					OWNER_NONE, OBJECT_NONE, 0, 0, "");
191 }
192 
193 enum spdk_nvmf_rdma_wr_type {
194 	RDMA_WR_TYPE_RECV,
195 	RDMA_WR_TYPE_SEND,
196 	RDMA_WR_TYPE_DATA,
197 	RDMA_WR_TYPE_DRAIN_SEND,
198 	RDMA_WR_TYPE_DRAIN_RECV
199 };
200 
201 struct spdk_nvmf_rdma_wr {
202 	enum spdk_nvmf_rdma_wr_type	type;
203 };
204 
205 /* This structure holds commands as they are received off the wire.
206  * It must be dynamically paired with a full request object
207  * (spdk_nvmf_rdma_request) to service a request. It is separate
208  * from the request because RDMA does not appear to order
209  * completions, so occasionally we'll get a new incoming
210  * command when there aren't any free request objects.
211  */
212 struct spdk_nvmf_rdma_recv {
213 	struct ibv_recv_wr			wr;
214 	struct ibv_sge				sgl[NVMF_DEFAULT_RX_SGE];
215 
216 	struct spdk_nvmf_rdma_qpair		*qpair;
217 
218 	/* In-capsule data buffer */
219 	uint8_t					*buf;
220 
221 	struct spdk_nvmf_rdma_wr		rdma_wr;
222 
223 	STAILQ_ENTRY(spdk_nvmf_rdma_recv)	link;
224 };
225 
226 struct spdk_nvmf_rdma_request_data {
227 	struct spdk_nvmf_rdma_wr	rdma_wr;
228 	struct ibv_send_wr		wr;
229 	struct ibv_sge			sgl[SPDK_NVMF_MAX_SGL_ENTRIES];
230 	void				*buffers[SPDK_NVMF_MAX_SGL_ENTRIES];
231 };
232 
233 struct spdk_nvmf_rdma_request {
234 	struct spdk_nvmf_request		req;
235 	bool					data_from_pool;
236 
237 	enum spdk_nvmf_rdma_request_state	state;
238 
239 	struct spdk_nvmf_rdma_recv		*recv;
240 
241 	struct {
242 		struct spdk_nvmf_rdma_wr	rdma_wr;
243 		struct	ibv_send_wr		wr;
244 		struct	ibv_sge			sgl[NVMF_DEFAULT_RSP_SGE];
245 	} rsp;
246 
247 	struct spdk_nvmf_rdma_request_data	data;
248 
249 	uint32_t				num_outstanding_data_wr;
250 
251 	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
252 	STAILQ_ENTRY(spdk_nvmf_rdma_request)	state_link;
253 };
254 
255 enum spdk_nvmf_rdma_qpair_disconnect_flags {
256 	RDMA_QP_DISCONNECTING		= 1,
257 	RDMA_QP_RECV_DRAINED		= 1 << 1,
258 	RDMA_QP_SEND_DRAINED		= 1 << 2
259 };
260 
261 struct spdk_nvmf_rdma_qpair {
262 	struct spdk_nvmf_qpair			qpair;
263 
264 	struct spdk_nvmf_rdma_port		*port;
265 	struct spdk_nvmf_rdma_poller		*poller;
266 
267 	struct rdma_cm_id			*cm_id;
268 	struct rdma_cm_id			*listen_id;
269 
270 	/* The maximum number of I/O outstanding on this connection at one time */
271 	uint16_t				max_queue_depth;
272 
273 	/* The maximum number of active RDMA READ and ATOMIC operations at one time */
274 	uint16_t				max_read_depth;
275 
276 	/* The maximum number of RDMA SEND operations at one time */
277 	uint32_t				max_send_depth;
278 
279 	/* The current number of outstanding WRs from this qpair's
280 	 * recv queue. Should not exceed device->attr.max_queue_depth.
281 	 */
282 	uint16_t				current_recv_depth;
283 
284 	/* The current number of posted WRs from this qpair's
285 	 * send queue. Should not exceed max_send_depth.
286 	 */
287 	uint32_t				current_send_depth;
288 
289 	/* The current number of active RDMA READ operations */
290 	uint16_t				current_read_depth;
291 
292 	/* The maximum number of SGEs per WR on the send queue */
293 	uint32_t				max_send_sge;
294 
295 	/* The maximum number of SGEs per WR on the recv queue */
296 	uint32_t				max_recv_sge;
297 
298 	/* Receives that are waiting for a request object */
299 	STAILQ_HEAD(, spdk_nvmf_rdma_recv)	incoming_queue;
300 
301 	/* Queues to track requests in critical states */
302 	STAILQ_HEAD(, spdk_nvmf_rdma_request)	free_queue;
303 
304 	STAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_read_queue;
305 
306 	STAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_write_queue;
307 
308 	/* Number of requests not in the free state */
309 	uint32_t				qd;
310 
311 	/* Array of size "max_queue_depth" containing RDMA requests. */
312 	struct spdk_nvmf_rdma_request		*reqs;
313 
314 	/* Array of size "max_queue_depth" containing RDMA recvs. */
315 	struct spdk_nvmf_rdma_recv		*recvs;
316 
317 	/* Array of size "max_queue_depth" containing 64 byte capsules
318 	 * used for receive.
319 	 */
320 	union nvmf_h2c_msg			*cmds;
321 	struct ibv_mr				*cmds_mr;
322 
323 	/* Array of size "max_queue_depth" containing 16 byte completions
324 	 * to be sent back to the user.
325 	 */
326 	union nvmf_c2h_msg			*cpls;
327 	struct ibv_mr				*cpls_mr;
328 
329 	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
330 	 * buffers to be used for in capsule data.
331 	 */
332 	void					*bufs;
333 	struct ibv_mr				*bufs_mr;
334 
335 	TAILQ_ENTRY(spdk_nvmf_rdma_qpair)	link;
336 
337 	/* IBV queue pair attributes: they are used to manage
338 	 * qp state and recover from errors.
339 	 */
340 	struct ibv_qp_attr			ibv_attr;
341 
342 	uint32_t				disconnect_flags;
343 	struct spdk_nvmf_rdma_wr		drain_send_wr;
344 	struct spdk_nvmf_rdma_wr		drain_recv_wr;
345 
346 	/* There are several ways a disconnect can start on a qpair
347 	 * and they are not all mutually exclusive. It is important
348 	 * that we only initialize one of these paths.
349 	 */
350 	bool					disconnect_started;
351 };
352 
353 struct spdk_nvmf_rdma_poller {
354 	struct spdk_nvmf_rdma_device		*device;
355 	struct spdk_nvmf_rdma_poll_group	*group;
356 
357 	int					num_cqe;
358 	int					required_num_wr;
359 	struct ibv_cq				*cq;
360 
361 	TAILQ_HEAD(, spdk_nvmf_rdma_qpair)	qpairs;
362 
363 	TAILQ_ENTRY(spdk_nvmf_rdma_poller)	link;
364 };
365 
366 struct spdk_nvmf_rdma_poll_group {
367 	struct spdk_nvmf_transport_poll_group	group;
368 
369 	/* Requests that are waiting to obtain a data buffer */
370 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
371 
372 	TAILQ_HEAD(, spdk_nvmf_rdma_poller)	pollers;
373 };
374 
375 /* Assuming rdma_cm uses just one protection domain per ibv_context. */
376 struct spdk_nvmf_rdma_device {
377 	struct ibv_device_attr			attr;
378 	struct ibv_context			*context;
379 
380 	struct spdk_mem_map			*map;
381 	struct ibv_pd				*pd;
382 
383 	TAILQ_ENTRY(spdk_nvmf_rdma_device)	link;
384 };
385 
386 struct spdk_nvmf_rdma_port {
387 	struct spdk_nvme_transport_id		trid;
388 	struct rdma_cm_id			*id;
389 	struct spdk_nvmf_rdma_device		*device;
390 	uint32_t				ref;
391 	TAILQ_ENTRY(spdk_nvmf_rdma_port)	link;
392 };
393 
394 struct spdk_nvmf_rdma_transport {
395 	struct spdk_nvmf_transport	transport;
396 
397 	struct rdma_event_channel	*event_channel;
398 
399 	struct spdk_mempool		*data_wr_pool;
400 
401 	pthread_mutex_t			lock;
402 
403 	/* fields used to poll RDMA/IB events */
404 	nfds_t			npoll_fds;
405 	struct pollfd		*poll_fds;
406 
407 	TAILQ_HEAD(, spdk_nvmf_rdma_device)	devices;
408 	TAILQ_HEAD(, spdk_nvmf_rdma_port)	ports;
409 };
410 
411 static inline int
412 spdk_nvmf_rdma_check_ibv_state(enum ibv_qp_state state)
413 {
414 	switch (state) {
415 	case IBV_QPS_RESET:
416 	case IBV_QPS_INIT:
417 	case IBV_QPS_RTR:
418 	case IBV_QPS_RTS:
419 	case IBV_QPS_SQD:
420 	case IBV_QPS_SQE:
421 	case IBV_QPS_ERR:
422 		return 0;
423 	default:
424 		return -1;
425 	}
426 }
427 
428 static enum ibv_qp_state
429 spdk_nvmf_rdma_update_ibv_state(struct spdk_nvmf_rdma_qpair *rqpair) {
430 	enum ibv_qp_state old_state, new_state;
431 	struct ibv_qp_init_attr init_attr;
432 	int rc;
433 
434 	/* All the attributes needed for recovery */
435 	static int spdk_nvmf_ibv_attr_mask =
436 	IBV_QP_STATE |
437 	IBV_QP_PKEY_INDEX |
438 	IBV_QP_PORT |
439 	IBV_QP_ACCESS_FLAGS |
440 	IBV_QP_AV |
441 	IBV_QP_PATH_MTU |
442 	IBV_QP_DEST_QPN |
443 	IBV_QP_RQ_PSN |
444 	IBV_QP_MAX_DEST_RD_ATOMIC |
445 	IBV_QP_MIN_RNR_TIMER |
446 	IBV_QP_SQ_PSN |
447 	IBV_QP_TIMEOUT |
448 	IBV_QP_RETRY_CNT |
449 	IBV_QP_RNR_RETRY |
450 	IBV_QP_MAX_QP_RD_ATOMIC;
451 
452 	old_state = rqpair->ibv_attr.qp_state;
453 	rc = ibv_query_qp(rqpair->cm_id->qp, &rqpair->ibv_attr,
454 			  spdk_nvmf_ibv_attr_mask, &init_attr);
455 
456 	if (rc)
457 	{
458 		SPDK_ERRLOG("Failed to get updated RDMA queue pair state!\n");
459 		assert(false);
460 	}
461 
462 	new_state = rqpair->ibv_attr.qp_state;
463 
464 	rc = spdk_nvmf_rdma_check_ibv_state(new_state);
465 	if (rc)
466 	{
467 		SPDK_ERRLOG("QP#%d: bad state updated: %u, maybe hardware issue\n", rqpair->qpair.qid, new_state);
468 		/*
469 		 * IBV_QPS_UNKNOWN undefined if lib version smaller than libibverbs-1.1.8
470 		 * IBV_QPS_UNKNOWN is the enum element after IBV_QPS_ERR
471 		 */
472 		return IBV_QPS_ERR + 1;
473 	}
474 
475 	if (old_state != new_state)
476 	{
477 		spdk_trace_record(TRACE_RDMA_QP_STATE_CHANGE, 0, 0,
478 				  (uintptr_t)rqpair->cm_id, new_state);
479 	}
480 	return new_state;
481 }
482 
483 static const char *str_ibv_qp_state[] = {
484 	"IBV_QPS_RESET",
485 	"IBV_QPS_INIT",
486 	"IBV_QPS_RTR",
487 	"IBV_QPS_RTS",
488 	"IBV_QPS_SQD",
489 	"IBV_QPS_SQE",
490 	"IBV_QPS_ERR",
491 	"IBV_QPS_UNKNOWN"
492 };
493 
494 static int
495 spdk_nvmf_rdma_set_ibv_state(struct spdk_nvmf_rdma_qpair *rqpair,
496 			     enum ibv_qp_state new_state)
497 {
498 	int rc;
499 	enum ibv_qp_state state;
500 	static int attr_mask_rc[] = {
501 		[IBV_QPS_RESET] = IBV_QP_STATE,
502 		[IBV_QPS_INIT] = (IBV_QP_STATE |
503 				  IBV_QP_PKEY_INDEX |
504 				  IBV_QP_PORT |
505 				  IBV_QP_ACCESS_FLAGS),
506 		[IBV_QPS_RTR] = (IBV_QP_STATE |
507 				 IBV_QP_AV |
508 				 IBV_QP_PATH_MTU |
509 				 IBV_QP_DEST_QPN |
510 				 IBV_QP_RQ_PSN |
511 				 IBV_QP_MAX_DEST_RD_ATOMIC |
512 				 IBV_QP_MIN_RNR_TIMER),
513 		[IBV_QPS_RTS] = (IBV_QP_STATE |
514 				 IBV_QP_SQ_PSN |
515 				 IBV_QP_TIMEOUT |
516 				 IBV_QP_RETRY_CNT |
517 				 IBV_QP_RNR_RETRY |
518 				 IBV_QP_MAX_QP_RD_ATOMIC),
519 		[IBV_QPS_SQD] = IBV_QP_STATE,
520 		[IBV_QPS_SQE] = IBV_QP_STATE,
521 		[IBV_QPS_ERR] = IBV_QP_STATE,
522 	};
523 
524 	rc = spdk_nvmf_rdma_check_ibv_state(new_state);
525 	if (rc) {
526 		SPDK_ERRLOG("QP#%d: bad state requested: %u\n",
527 			    rqpair->qpair.qid, new_state);
528 		return rc;
529 	}
530 
531 	rqpair->ibv_attr.cur_qp_state = rqpair->ibv_attr.qp_state;
532 	rqpair->ibv_attr.qp_state = new_state;
533 	rqpair->ibv_attr.ah_attr.port_num = rqpair->ibv_attr.port_num;
534 
535 	rc = ibv_modify_qp(rqpair->cm_id->qp, &rqpair->ibv_attr,
536 			   attr_mask_rc[new_state]);
537 
538 	if (rc) {
539 		SPDK_ERRLOG("QP#%d: failed to set state to: %s, %d (%s)\n",
540 			    rqpair->qpair.qid, str_ibv_qp_state[new_state], errno, strerror(errno));
541 		return rc;
542 	}
543 
544 	state = spdk_nvmf_rdma_update_ibv_state(rqpair);
545 
546 	if (state != new_state) {
547 		SPDK_ERRLOG("QP#%d: expected state: %s, actual state: %s\n",
548 			    rqpair->qpair.qid, str_ibv_qp_state[new_state],
549 			    str_ibv_qp_state[state]);
550 		return -1;
551 	}
552 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "IBV QP#%u changed to: %s\n", rqpair->qpair.qid,
553 		      str_ibv_qp_state[state]);
554 	return 0;
555 }
556 
557 static void
558 nvmf_rdma_dump_request(struct spdk_nvmf_rdma_request *req)
559 {
560 	SPDK_ERRLOG("\t\tRequest Data From Pool: %d\n", req->data_from_pool);
561 	if (req->req.cmd) {
562 		SPDK_ERRLOG("\t\tRequest opcode: %d\n", req->req.cmd->nvmf_cmd.opcode);
563 	}
564 	if (req->recv) {
565 		SPDK_ERRLOG("\t\tRequest recv wr_id%lu\n", req->recv->wr.wr_id);
566 	}
567 }
568 
569 static void
570 nvmf_rdma_dump_qpair_contents(struct spdk_nvmf_rdma_qpair *rqpair)
571 {
572 	int i;
573 
574 	SPDK_ERRLOG("Dumping contents of queue pair (QID %d)\n", rqpair->qpair.qid);
575 	for (i = 0; i < rqpair->max_queue_depth; i++) {
576 		if (rqpair->reqs[i].state != RDMA_REQUEST_STATE_FREE) {
577 			nvmf_rdma_dump_request(&rqpair->reqs[i]);
578 		}
579 	}
580 }
581 
582 static void
583 spdk_nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair)
584 {
585 	spdk_trace_record(TRACE_RDMA_QP_DESTROY, 0, 0, (uintptr_t)rqpair->cm_id, 0);
586 
587 	if (rqpair->qd != 0) {
588 		nvmf_rdma_dump_qpair_contents(rqpair);
589 		SPDK_WARNLOG("Destroying qpair when queue depth is %d\n", rqpair->qd);
590 	}
591 
592 	if (rqpair->poller) {
593 		TAILQ_REMOVE(&rqpair->poller->qpairs, rqpair, link);
594 	}
595 
596 	if (rqpair->cmds_mr) {
597 		ibv_dereg_mr(rqpair->cmds_mr);
598 	}
599 
600 	if (rqpair->cpls_mr) {
601 		ibv_dereg_mr(rqpair->cpls_mr);
602 	}
603 
604 	if (rqpair->bufs_mr) {
605 		ibv_dereg_mr(rqpair->bufs_mr);
606 	}
607 
608 	if (rqpair->cm_id) {
609 		rdma_destroy_qp(rqpair->cm_id);
610 		rdma_destroy_id(rqpair->cm_id);
611 
612 		if (rqpair->poller) {
613 			rqpair->poller->required_num_wr -= MAX_WR_PER_QP(rqpair->max_queue_depth);
614 		}
615 	}
616 
617 	/* Free all memory */
618 	spdk_dma_free(rqpair->cmds);
619 	spdk_dma_free(rqpair->cpls);
620 	spdk_dma_free(rqpair->bufs);
621 	free(rqpair->reqs);
622 	free(rqpair->recvs);
623 	free(rqpair);
624 }
625 
626 static int
627 spdk_nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair)
628 {
629 	struct spdk_nvmf_rdma_transport *rtransport;
630 	struct spdk_nvmf_rdma_qpair	*rqpair;
631 	struct spdk_nvmf_rdma_poller	*rpoller;
632 	int				rc, i, num_cqe, required_num_wr;;
633 	struct spdk_nvmf_rdma_recv	*rdma_recv;
634 	struct spdk_nvmf_rdma_request	*rdma_req;
635 	struct spdk_nvmf_transport	*transport;
636 	struct spdk_nvmf_rdma_device	*device;
637 	struct ibv_qp_init_attr		ibv_init_attr;
638 
639 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
640 	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
641 	transport = &rtransport->transport;
642 	device = rqpair->port->device;
643 
644 	memset(&ibv_init_attr, 0, sizeof(struct ibv_qp_init_attr));
645 	ibv_init_attr.qp_context	= rqpair;
646 	ibv_init_attr.qp_type		= IBV_QPT_RC;
647 	ibv_init_attr.send_cq		= rqpair->poller->cq;
648 	ibv_init_attr.recv_cq		= rqpair->poller->cq;
649 	ibv_init_attr.cap.max_send_wr	= rqpair->max_queue_depth *
650 					  2 + 1; /* SEND, READ, and WRITE operations + dummy drain WR */
651 	ibv_init_attr.cap.max_recv_wr	= rqpair->max_queue_depth +
652 					  1; /* RECV operations + dummy drain WR */
653 	ibv_init_attr.cap.max_send_sge	= spdk_min(device->attr.max_sge, NVMF_DEFAULT_TX_SGE);
654 	ibv_init_attr.cap.max_recv_sge	= spdk_min(device->attr.max_sge, NVMF_DEFAULT_RX_SGE);
655 
656 	/* Enlarge CQ size dynamically */
657 	rpoller = rqpair->poller;
658 	required_num_wr = rpoller->required_num_wr + MAX_WR_PER_QP(rqpair->max_queue_depth);
659 	num_cqe = rpoller->num_cqe;
660 	if (num_cqe < required_num_wr) {
661 		num_cqe = spdk_max(num_cqe * 2, required_num_wr);
662 		num_cqe = spdk_min(num_cqe, device->attr.max_cqe);
663 	}
664 
665 	if (rpoller->num_cqe != num_cqe) {
666 		if (required_num_wr > device->attr.max_cqe) {
667 			SPDK_ERRLOG("RDMA CQE requirement (%d) exceeds device max_cqe limitation (%d)\n",
668 				    required_num_wr, device->attr.max_cqe);
669 			rdma_destroy_id(rqpair->cm_id);
670 			rqpair->cm_id = NULL;
671 			spdk_nvmf_rdma_qpair_destroy(rqpair);
672 			return -1;
673 		}
674 
675 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Resize RDMA CQ from %d to %d\n", rpoller->num_cqe, num_cqe);
676 		rc = ibv_resize_cq(rpoller->cq, num_cqe);
677 		if (rc) {
678 			SPDK_ERRLOG("RDMA CQ resize failed: errno %d: %s\n", errno, spdk_strerror(errno));
679 			rdma_destroy_id(rqpair->cm_id);
680 			rqpair->cm_id = NULL;
681 			spdk_nvmf_rdma_qpair_destroy(rqpair);
682 			return -1;
683 		}
684 
685 		rpoller->num_cqe = num_cqe;
686 	}
687 
688 	rc = rdma_create_qp(rqpair->cm_id, rqpair->port->device->pd, &ibv_init_attr);
689 	if (rc) {
690 		SPDK_ERRLOG("rdma_create_qp failed: errno %d: %s\n", errno, spdk_strerror(errno));
691 		rdma_destroy_id(rqpair->cm_id);
692 		rqpair->cm_id = NULL;
693 		spdk_nvmf_rdma_qpair_destroy(rqpair);
694 		return -1;
695 	}
696 
697 	rpoller->required_num_wr = required_num_wr;
698 
699 	rqpair->max_send_depth = spdk_min((uint32_t)(rqpair->max_queue_depth * 2 + 1),
700 					  ibv_init_attr.cap.max_send_wr);
701 	rqpair->max_send_sge = spdk_min(NVMF_DEFAULT_TX_SGE, ibv_init_attr.cap.max_send_sge);
702 	rqpair->max_recv_sge = spdk_min(NVMF_DEFAULT_RX_SGE, ibv_init_attr.cap.max_recv_sge);
703 	spdk_trace_record(TRACE_RDMA_QP_CREATE, 0, 0, (uintptr_t)rqpair->cm_id, 0);
704 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "New RDMA Connection: %p\n", qpair);
705 
706 	rqpair->reqs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->reqs));
707 	rqpair->recvs = calloc(rqpair->max_queue_depth, sizeof(*rqpair->recvs));
708 	rqpair->cmds = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cmds),
709 					0x1000, NULL);
710 	rqpair->cpls = spdk_dma_zmalloc(rqpair->max_queue_depth * sizeof(*rqpair->cpls),
711 					0x1000, NULL);
712 
713 
714 	if (transport->opts.in_capsule_data_size > 0) {
715 		rqpair->bufs = spdk_dma_zmalloc(rqpair->max_queue_depth *
716 						transport->opts.in_capsule_data_size,
717 						0x1000, NULL);
718 	}
719 
720 	if (!rqpair->reqs || !rqpair->recvs || !rqpair->cmds ||
721 	    !rqpair->cpls || (transport->opts.in_capsule_data_size && !rqpair->bufs)) {
722 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
723 		spdk_nvmf_rdma_qpair_destroy(rqpair);
724 		return -1;
725 	}
726 
727 	rqpair->cmds_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cmds,
728 				     rqpair->max_queue_depth * sizeof(*rqpair->cmds),
729 				     IBV_ACCESS_LOCAL_WRITE);
730 	rqpair->cpls_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->cpls,
731 				     rqpair->max_queue_depth * sizeof(*rqpair->cpls),
732 				     0);
733 
734 	if (transport->opts.in_capsule_data_size) {
735 		rqpair->bufs_mr = ibv_reg_mr(rqpair->cm_id->pd, rqpair->bufs,
736 					     rqpair->max_queue_depth *
737 					     transport->opts.in_capsule_data_size,
738 					     IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
739 	}
740 
741 	if (!rqpair->cmds_mr || !rqpair->cpls_mr || (transport->opts.in_capsule_data_size &&
742 			!rqpair->bufs_mr)) {
743 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
744 		spdk_nvmf_rdma_qpair_destroy(rqpair);
745 		return -1;
746 	}
747 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
748 		      rqpair->cmds, rqpair->max_queue_depth * sizeof(*rqpair->cmds), rqpair->cmds_mr->lkey);
749 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
750 		      rqpair->cpls, rqpair->max_queue_depth * sizeof(*rqpair->cpls), rqpair->cpls_mr->lkey);
751 	if (rqpair->bufs && rqpair->bufs_mr) {
752 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
753 			      rqpair->bufs, rqpair->max_queue_depth *
754 			      transport->opts.in_capsule_data_size, rqpair->bufs_mr->lkey);
755 	}
756 
757 	STAILQ_INIT(&rqpair->free_queue);
758 	STAILQ_INIT(&rqpair->pending_rdma_read_queue);
759 	STAILQ_INIT(&rqpair->pending_rdma_write_queue);
760 
761 	rqpair->current_recv_depth = rqpair->max_queue_depth;
762 	for (i = 0; i < rqpair->max_queue_depth; i++) {
763 		struct ibv_recv_wr *bad_wr = NULL;
764 
765 		rdma_recv = &rqpair->recvs[i];
766 		rdma_recv->qpair = rqpair;
767 
768 		/* Set up memory to receive commands */
769 		if (rqpair->bufs) {
770 			rdma_recv->buf = (void *)((uintptr_t)rqpair->bufs + (i *
771 						  transport->opts.in_capsule_data_size));
772 		}
773 
774 		rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV;
775 
776 		rdma_recv->sgl[0].addr = (uintptr_t)&rqpair->cmds[i];
777 		rdma_recv->sgl[0].length = sizeof(rqpair->cmds[i]);
778 		rdma_recv->sgl[0].lkey = rqpair->cmds_mr->lkey;
779 		rdma_recv->wr.num_sge = 1;
780 
781 		if (rdma_recv->buf && rqpair->bufs_mr) {
782 			rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
783 			rdma_recv->sgl[1].length = transport->opts.in_capsule_data_size;
784 			rdma_recv->sgl[1].lkey = rqpair->bufs_mr->lkey;
785 			rdma_recv->wr.num_sge++;
786 		}
787 
788 		rdma_recv->wr.wr_id = (uintptr_t)&rdma_recv->rdma_wr;
789 		rdma_recv->wr.sg_list = rdma_recv->sgl;
790 
791 		rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_recv->wr, &bad_wr);
792 		assert(rqpair->current_recv_depth > 0);
793 		rqpair->current_recv_depth--;
794 		if (rc) {
795 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
796 			spdk_nvmf_rdma_qpair_destroy(rqpair);
797 			return -1;
798 		}
799 	}
800 	assert(rqpair->current_recv_depth == 0);
801 
802 	for (i = 0; i < rqpair->max_queue_depth; i++) {
803 		rdma_req = &rqpair->reqs[i];
804 
805 		rdma_req->req.qpair = &rqpair->qpair;
806 		rdma_req->req.cmd = NULL;
807 
808 		/* Set up memory to send responses */
809 		rdma_req->req.rsp = &rqpair->cpls[i];
810 
811 		rdma_req->rsp.sgl[0].addr = (uintptr_t)&rqpair->cpls[i];
812 		rdma_req->rsp.sgl[0].length = sizeof(rqpair->cpls[i]);
813 		rdma_req->rsp.sgl[0].lkey = rqpair->cpls_mr->lkey;
814 
815 		rdma_req->rsp.rdma_wr.type = RDMA_WR_TYPE_SEND;
816 		rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp.rdma_wr;
817 		rdma_req->rsp.wr.next = NULL;
818 		rdma_req->rsp.wr.opcode = IBV_WR_SEND;
819 		rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
820 		rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
821 		rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
822 
823 		/* Set up memory for data buffers */
824 		rdma_req->data.rdma_wr.type = RDMA_WR_TYPE_DATA;
825 		rdma_req->data.wr.wr_id = (uintptr_t)&rdma_req->data.rdma_wr;
826 		rdma_req->data.wr.next = NULL;
827 		rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
828 		rdma_req->data.wr.sg_list = rdma_req->data.sgl;
829 		rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
830 
831 		/* Initialize request state to FREE */
832 		rdma_req->state = RDMA_REQUEST_STATE_FREE;
833 		STAILQ_INSERT_HEAD(&rqpair->free_queue, rdma_req, state_link);
834 	}
835 
836 	return 0;
837 }
838 
839 static int
840 request_transfer_in(struct spdk_nvmf_request *req)
841 {
842 	int				rc;
843 	struct spdk_nvmf_rdma_request	*rdma_req;
844 	struct spdk_nvmf_qpair		*qpair;
845 	struct spdk_nvmf_rdma_qpair	*rqpair;
846 	struct ibv_send_wr		*bad_wr = NULL;
847 
848 	qpair = req->qpair;
849 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
850 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
851 
852 	assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
853 	assert(rdma_req != NULL);
854 
855 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, qpair);
856 
857 	rc = ibv_post_send(rqpair->cm_id->qp, &rdma_req->data.wr, &bad_wr);
858 	if (rc) {
859 		SPDK_ERRLOG("Unable to transfer data from host to target\n");
860 		return -1;
861 	}
862 	rqpair->current_read_depth += rdma_req->num_outstanding_data_wr;
863 	rqpair->current_send_depth += rdma_req->num_outstanding_data_wr;
864 	return 0;
865 }
866 
867 static int
868 request_transfer_out(struct spdk_nvmf_request *req, int *data_posted)
869 {
870 	int				rc;
871 	struct spdk_nvmf_rdma_request	*rdma_req;
872 	struct spdk_nvmf_qpair		*qpair;
873 	struct spdk_nvmf_rdma_qpair	*rqpair;
874 	struct spdk_nvme_cpl		*rsp;
875 	struct ibv_recv_wr		*bad_recv_wr = NULL;
876 	struct ibv_send_wr		*send_wr, *bad_send_wr = NULL;
877 
878 	*data_posted = 0;
879 	qpair = req->qpair;
880 	rsp = &req->rsp->nvme_cpl;
881 	rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
882 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
883 
884 	/* Advance our sq_head pointer */
885 	if (qpair->sq_head == qpair->sq_head_max) {
886 		qpair->sq_head = 0;
887 	} else {
888 		qpair->sq_head++;
889 	}
890 	rsp->sqhd = qpair->sq_head;
891 
892 	/* Post the capsule to the recv buffer */
893 	assert(rdma_req->recv != NULL);
894 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
895 		      rqpair);
896 	rc = ibv_post_recv(rqpair->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
897 	if (rc) {
898 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
899 		return rc;
900 	}
901 	rdma_req->recv = NULL;
902 	assert(rqpair->current_recv_depth > 0);
903 	rqpair->current_recv_depth--;
904 
905 	/* Build the response which consists of an optional
906 	 * RDMA WRITE to transfer data, plus an RDMA SEND
907 	 * containing the response.
908 	 */
909 	send_wr = &rdma_req->rsp.wr;
910 
911 	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
912 	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
913 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, qpair);
914 		send_wr = &rdma_req->data.wr;
915 		*data_posted = 1;
916 	}
917 
918 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, qpair);
919 
920 	/* Send the completion */
921 	rc = ibv_post_send(rqpair->cm_id->qp, send_wr, &bad_send_wr);
922 	if (rc) {
923 		SPDK_ERRLOG("Unable to send response capsule\n");
924 		return rc;
925 	}
926 	/* +1 for the rsp wr */
927 	rqpair->current_send_depth += rdma_req->num_outstanding_data_wr + 1;
928 
929 	return 0;
930 }
931 
932 static int
933 spdk_nvmf_rdma_event_accept(struct rdma_cm_id *id, struct spdk_nvmf_rdma_qpair *rqpair)
934 {
935 	struct spdk_nvmf_rdma_accept_private_data	accept_data;
936 	struct rdma_conn_param				ctrlr_event_data = {};
937 	int						rc;
938 
939 	accept_data.recfmt = 0;
940 	accept_data.crqsize = rqpair->max_queue_depth;
941 
942 	ctrlr_event_data.private_data = &accept_data;
943 	ctrlr_event_data.private_data_len = sizeof(accept_data);
944 	if (id->ps == RDMA_PS_TCP) {
945 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
946 		ctrlr_event_data.initiator_depth = rqpair->max_read_depth;
947 	}
948 
949 	rc = rdma_accept(id, &ctrlr_event_data);
950 	if (rc) {
951 		SPDK_ERRLOG("Error %d on rdma_accept\n", errno);
952 	} else {
953 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Sent back the accept\n");
954 	}
955 
956 	return rc;
957 }
958 
959 static void
960 spdk_nvmf_rdma_event_reject(struct rdma_cm_id *id, enum spdk_nvmf_rdma_transport_error error)
961 {
962 	struct spdk_nvmf_rdma_reject_private_data	rej_data;
963 
964 	rej_data.recfmt = 0;
965 	rej_data.sts = error;
966 
967 	rdma_reject(id, &rej_data, sizeof(rej_data));
968 }
969 
970 static int
971 nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event,
972 		  new_qpair_fn cb_fn)
973 {
974 	struct spdk_nvmf_rdma_transport *rtransport;
975 	struct spdk_nvmf_rdma_qpair	*rqpair = NULL;
976 	struct spdk_nvmf_rdma_port	*port;
977 	struct rdma_conn_param		*rdma_param = NULL;
978 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
979 	uint16_t			max_queue_depth;
980 	uint16_t			max_read_depth;
981 
982 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
983 
984 	assert(event->id != NULL); /* Impossible. Can't even reject the connection. */
985 	assert(event->id->verbs != NULL); /* Impossible. No way to handle this. */
986 
987 	rdma_param = &event->param.conn;
988 	if (rdma_param->private_data == NULL ||
989 	    rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) {
990 		SPDK_ERRLOG("connect request: no private data provided\n");
991 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_PRIVATE_DATA_LENGTH);
992 		return -1;
993 	}
994 
995 	private_data = rdma_param->private_data;
996 	if (private_data->recfmt != 0) {
997 		SPDK_ERRLOG("Received RDMA private data with RECFMT != 0\n");
998 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_RECFMT);
999 		return -1;
1000 	}
1001 
1002 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
1003 		      event->id->verbs->device->name, event->id->verbs->device->dev_name);
1004 
1005 	port = event->listen_id->context;
1006 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
1007 		      event->listen_id, event->listen_id->verbs, port);
1008 
1009 	/* Figure out the supported queue depth. This is a multi-step process
1010 	 * that takes into account hardware maximums, host provided values,
1011 	 * and our target's internal memory limits */
1012 
1013 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Calculating Queue Depth\n");
1014 
1015 	/* Start with the maximum queue depth allowed by the target */
1016 	max_queue_depth = rtransport->transport.opts.max_queue_depth;
1017 	max_read_depth = rtransport->transport.opts.max_queue_depth;
1018 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Target Max Queue Depth: %d\n",
1019 		      rtransport->transport.opts.max_queue_depth);
1020 
1021 	/* Next check the local NIC's hardware limitations */
1022 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
1023 		      "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
1024 		      port->device->attr.max_qp_wr, port->device->attr.max_qp_rd_atom);
1025 	max_queue_depth = spdk_min(max_queue_depth, port->device->attr.max_qp_wr);
1026 	max_read_depth = spdk_min(max_read_depth, port->device->attr.max_qp_init_rd_atom);
1027 
1028 	/* Next check the remote NIC's hardware limitations */
1029 	SPDK_DEBUGLOG(SPDK_LOG_RDMA,
1030 		      "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
1031 		      rdma_param->initiator_depth, rdma_param->responder_resources);
1032 	if (rdma_param->initiator_depth > 0) {
1033 		max_read_depth = spdk_min(max_read_depth, rdma_param->initiator_depth);
1034 	}
1035 
1036 	/* Finally check for the host software requested values, which are
1037 	 * optional. */
1038 	if (rdma_param->private_data != NULL &&
1039 	    rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
1040 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
1041 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
1042 		max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize);
1043 		max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1);
1044 	}
1045 
1046 	SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
1047 		      max_queue_depth, max_read_depth);
1048 
1049 	rqpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair));
1050 	if (rqpair == NULL) {
1051 		SPDK_ERRLOG("Could not allocate new connection.\n");
1052 		spdk_nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
1053 		return -1;
1054 	}
1055 
1056 	rqpair->port = port;
1057 	rqpair->max_queue_depth = max_queue_depth;
1058 	rqpair->max_read_depth = max_read_depth;
1059 	rqpair->cm_id = event->id;
1060 	rqpair->listen_id = event->listen_id;
1061 	rqpair->qpair.transport = transport;
1062 	STAILQ_INIT(&rqpair->incoming_queue);
1063 	event->id->context = &rqpair->qpair;
1064 
1065 	cb_fn(&rqpair->qpair);
1066 
1067 	return 0;
1068 }
1069 
1070 static int
1071 spdk_nvmf_rdma_mem_notify(void *cb_ctx, struct spdk_mem_map *map,
1072 			  enum spdk_mem_map_notify_action action,
1073 			  void *vaddr, size_t size)
1074 {
1075 	struct ibv_pd *pd = cb_ctx;
1076 	struct ibv_mr *mr;
1077 
1078 	switch (action) {
1079 	case SPDK_MEM_MAP_NOTIFY_REGISTER:
1080 		if (!g_nvmf_hooks.get_rkey) {
1081 			mr = ibv_reg_mr(pd, vaddr, size,
1082 					IBV_ACCESS_LOCAL_WRITE |
1083 					IBV_ACCESS_REMOTE_READ |
1084 					IBV_ACCESS_REMOTE_WRITE);
1085 			if (mr == NULL) {
1086 				SPDK_ERRLOG("ibv_reg_mr() failed\n");
1087 				return -1;
1088 			} else {
1089 				spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
1090 			}
1091 		} else {
1092 			spdk_mem_map_set_translation(map, (uint64_t)vaddr, size,
1093 						     g_nvmf_hooks.get_rkey(pd, vaddr, size));
1094 		}
1095 		break;
1096 	case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
1097 		if (!g_nvmf_hooks.get_rkey) {
1098 			mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
1099 			spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
1100 			if (mr) {
1101 				ibv_dereg_mr(mr);
1102 			}
1103 		}
1104 		break;
1105 	}
1106 
1107 	return 0;
1108 }
1109 
1110 static int
1111 spdk_nvmf_rdma_check_contiguous_entries(uint64_t addr_1, uint64_t addr_2)
1112 {
1113 	/* Two contiguous mappings will point to the same address which is the start of the RDMA MR. */
1114 	return addr_1 == addr_2;
1115 }
1116 
1117 static void
1118 spdk_nvmf_rdma_request_free_buffers(struct spdk_nvmf_rdma_request *rdma_req,
1119 				    struct spdk_nvmf_transport_poll_group *group, struct spdk_nvmf_transport *transport)
1120 {
1121 	for (uint32_t i = 0; i < rdma_req->req.iovcnt; i++) {
1122 		if (group->buf_cache_count < group->buf_cache_size) {
1123 			STAILQ_INSERT_HEAD(&group->buf_cache,
1124 					   (struct spdk_nvmf_transport_pg_cache_buf *)rdma_req->data.buffers[i], link);
1125 			group->buf_cache_count++;
1126 		} else {
1127 			spdk_mempool_put(transport->data_buf_pool, rdma_req->data.buffers[i]);
1128 		}
1129 		rdma_req->req.iov[i].iov_base = NULL;
1130 		rdma_req->data.buffers[i] = NULL;
1131 		rdma_req->req.iov[i].iov_len = 0;
1132 
1133 	}
1134 	rdma_req->data_from_pool = false;
1135 }
1136 
1137 typedef enum spdk_nvme_data_transfer spdk_nvme_data_transfer_t;
1138 
1139 static spdk_nvme_data_transfer_t
1140 spdk_nvmf_rdma_request_get_xfer(struct spdk_nvmf_rdma_request *rdma_req)
1141 {
1142 	enum spdk_nvme_data_transfer xfer;
1143 	struct spdk_nvme_cmd *cmd = &rdma_req->req.cmd->nvme_cmd;
1144 	struct spdk_nvme_sgl_descriptor *sgl = &cmd->dptr.sgl1;
1145 
1146 #ifdef SPDK_CONFIG_RDMA_SEND_WITH_INVAL
1147 	rdma_req->rsp.wr.opcode = IBV_WR_SEND;
1148 	rdma_req->rsp.wr.imm_data = 0;
1149 #endif
1150 
1151 	/* Figure out data transfer direction */
1152 	if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
1153 		xfer = spdk_nvme_opc_get_data_transfer(rdma_req->req.cmd->nvmf_cmd.fctype);
1154 	} else {
1155 		xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
1156 
1157 		/* Some admin commands are special cases */
1158 		if ((rdma_req->req.qpair->qid == 0) &&
1159 		    ((cmd->opc == SPDK_NVME_OPC_GET_FEATURES) ||
1160 		     (cmd->opc == SPDK_NVME_OPC_SET_FEATURES))) {
1161 			switch (cmd->cdw10 & 0xff) {
1162 			case SPDK_NVME_FEAT_LBA_RANGE_TYPE:
1163 			case SPDK_NVME_FEAT_AUTONOMOUS_POWER_STATE_TRANSITION:
1164 			case SPDK_NVME_FEAT_HOST_IDENTIFIER:
1165 				break;
1166 			default:
1167 				xfer = SPDK_NVME_DATA_NONE;
1168 			}
1169 		}
1170 	}
1171 
1172 	if (xfer == SPDK_NVME_DATA_NONE) {
1173 		return xfer;
1174 	}
1175 
1176 	/* Even for commands that may transfer data, they could have specified 0 length.
1177 	 * We want those to show up with xfer SPDK_NVME_DATA_NONE.
1178 	 */
1179 	switch (sgl->generic.type) {
1180 	case SPDK_NVME_SGL_TYPE_DATA_BLOCK:
1181 	case SPDK_NVME_SGL_TYPE_BIT_BUCKET:
1182 	case SPDK_NVME_SGL_TYPE_SEGMENT:
1183 	case SPDK_NVME_SGL_TYPE_LAST_SEGMENT:
1184 	case SPDK_NVME_SGL_TYPE_TRANSPORT_DATA_BLOCK:
1185 		if (sgl->unkeyed.length == 0) {
1186 			xfer = SPDK_NVME_DATA_NONE;
1187 		}
1188 		break;
1189 	case SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK:
1190 		if (sgl->keyed.length == 0) {
1191 			xfer = SPDK_NVME_DATA_NONE;
1192 		}
1193 		break;
1194 	}
1195 
1196 	return xfer;
1197 }
1198 
1199 static int
1200 spdk_nvmf_rdma_request_fill_iovs(struct spdk_nvmf_rdma_transport *rtransport,
1201 				 struct spdk_nvmf_rdma_device *device,
1202 				 struct spdk_nvmf_rdma_request *rdma_req)
1203 {
1204 	struct spdk_nvmf_rdma_qpair		*rqpair;
1205 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1206 	void					*buf = NULL;
1207 	uint32_t				length = rdma_req->req.length;
1208 	uint64_t				translation_len;
1209 	uint32_t				i = 0;
1210 	int					rc = 0;
1211 
1212 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1213 	rgroup = rqpair->poller->group;
1214 	rdma_req->req.iovcnt = 0;
1215 	while (length) {
1216 		if (!(STAILQ_EMPTY(&rgroup->group.buf_cache))) {
1217 			rgroup->group.buf_cache_count--;
1218 			buf = STAILQ_FIRST(&rgroup->group.buf_cache);
1219 			STAILQ_REMOVE_HEAD(&rgroup->group.buf_cache, link);
1220 			assert(buf != NULL);
1221 		} else {
1222 			buf = spdk_mempool_get(rtransport->transport.data_buf_pool);
1223 			if (!buf) {
1224 				rc = -ENOMEM;
1225 				goto err_exit;
1226 			}
1227 		}
1228 
1229 		rdma_req->req.iov[i].iov_base = (void *)((uintptr_t)(buf + NVMF_DATA_BUFFER_MASK) &
1230 						~NVMF_DATA_BUFFER_MASK);
1231 		rdma_req->req.iov[i].iov_len  = spdk_min(length, rtransport->transport.opts.io_unit_size);
1232 		rdma_req->req.iovcnt++;
1233 		rdma_req->data.buffers[i] = buf;
1234 		rdma_req->data.wr.sg_list[i].addr = (uintptr_t)(rdma_req->req.iov[i].iov_base);
1235 		rdma_req->data.wr.sg_list[i].length = rdma_req->req.iov[i].iov_len;
1236 		translation_len = rdma_req->req.iov[i].iov_len;
1237 
1238 		if (!g_nvmf_hooks.get_rkey) {
1239 			rdma_req->data.wr.sg_list[i].lkey = ((struct ibv_mr *)spdk_mem_map_translate(device->map,
1240 							     (uint64_t)buf, &translation_len))->lkey;
1241 		} else {
1242 			rdma_req->data.wr.sg_list[i].lkey = spdk_mem_map_translate(device->map,
1243 							    (uint64_t)buf, &translation_len);
1244 		}
1245 
1246 		length -= rdma_req->req.iov[i].iov_len;
1247 
1248 		if (translation_len < rdma_req->req.iov[i].iov_len) {
1249 			SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
1250 			rc = -EINVAL;
1251 			goto err_exit;
1252 		}
1253 		i++;
1254 	}
1255 
1256 	assert(rdma_req->req.iovcnt <= rqpair->max_send_sge);
1257 
1258 	rdma_req->data_from_pool = true;
1259 
1260 	return rc;
1261 
1262 err_exit:
1263 	spdk_nvmf_rdma_request_free_buffers(rdma_req, &rgroup->group, &rtransport->transport);
1264 	while (i) {
1265 		i--;
1266 		rdma_req->data.wr.sg_list[i].addr = 0;
1267 		rdma_req->data.wr.sg_list[i].length = 0;
1268 		rdma_req->data.wr.sg_list[i].lkey = 0;
1269 	}
1270 	rdma_req->req.iovcnt = 0;
1271 	return rc;
1272 }
1273 
1274 static int
1275 spdk_nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport,
1276 				 struct spdk_nvmf_rdma_device *device,
1277 				 struct spdk_nvmf_rdma_request *rdma_req)
1278 {
1279 	struct spdk_nvme_cmd			*cmd;
1280 	struct spdk_nvme_cpl			*rsp;
1281 	struct spdk_nvme_sgl_descriptor		*sgl;
1282 
1283 	cmd = &rdma_req->req.cmd->nvme_cmd;
1284 	rsp = &rdma_req->req.rsp->nvme_cpl;
1285 	sgl = &cmd->dptr.sgl1;
1286 
1287 	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
1288 	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
1289 	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
1290 		if (sgl->keyed.length > rtransport->transport.opts.max_io_size) {
1291 			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
1292 				    sgl->keyed.length, rtransport->transport.opts.max_io_size);
1293 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
1294 			return -1;
1295 		}
1296 #ifdef SPDK_CONFIG_RDMA_SEND_WITH_INVAL
1297 		if ((device->attr.device_cap_flags & IBV_DEVICE_MEM_MGT_EXTENSIONS) != 0) {
1298 			if (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY) {
1299 				rdma_req->rsp.wr.opcode = IBV_WR_SEND_WITH_INV;
1300 				rdma_req->rsp.wr.imm_data = sgl->keyed.key;
1301 			}
1302 		}
1303 #endif
1304 
1305 		/* fill request length and populate iovs */
1306 		rdma_req->req.length = sgl->keyed.length;
1307 
1308 		if (spdk_nvmf_rdma_request_fill_iovs(rtransport, device, rdma_req) < 0) {
1309 			/* No available buffers. Queue this request up. */
1310 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "No available large data buffers. Queueing request %p\n", rdma_req);
1311 			return 0;
1312 		}
1313 
1314 		/* backward compatible */
1315 		rdma_req->req.data = rdma_req->req.iov[0].iov_base;
1316 
1317 		/* rdma wr specifics */
1318 		rdma_req->data.wr.num_sge = rdma_req->req.iovcnt;
1319 		rdma_req->data.wr.wr.rdma.rkey = sgl->keyed.key;
1320 		rdma_req->data.wr.wr.rdma.remote_addr = sgl->address;
1321 		if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1322 			rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;
1323 			rdma_req->data.wr.next = &rdma_req->rsp.wr;
1324 		} else if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
1325 			rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
1326 			rdma_req->data.wr.next = NULL;
1327 		}
1328 
1329 		/* set the number of outstanding data WRs for this request. */
1330 		rdma_req->num_outstanding_data_wr = 1;
1331 
1332 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p took %d buffer/s from central pool\n", rdma_req,
1333 			      rdma_req->req.iovcnt);
1334 
1335 		return 0;
1336 	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
1337 		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
1338 		uint64_t offset = sgl->address;
1339 		uint32_t max_len = rtransport->transport.opts.in_capsule_data_size;
1340 
1341 		SPDK_DEBUGLOG(SPDK_LOG_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
1342 			      offset, sgl->unkeyed.length);
1343 
1344 		if (offset > max_len) {
1345 			SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
1346 				    offset, max_len);
1347 			rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
1348 			return -1;
1349 		}
1350 		max_len -= (uint32_t)offset;
1351 
1352 		if (sgl->unkeyed.length > max_len) {
1353 			SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
1354 				    sgl->unkeyed.length, max_len);
1355 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
1356 			return -1;
1357 		}
1358 
1359 		rdma_req->num_outstanding_data_wr = 0;
1360 		rdma_req->req.data = rdma_req->recv->buf + offset;
1361 		rdma_req->data_from_pool = false;
1362 		rdma_req->req.length = sgl->unkeyed.length;
1363 
1364 		rdma_req->req.iov[0].iov_base = rdma_req->req.data;
1365 		rdma_req->req.iov[0].iov_len = rdma_req->req.length;
1366 		rdma_req->req.iovcnt = 1;
1367 
1368 		return 0;
1369 	}
1370 
1371 	SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
1372 		    sgl->generic.type, sgl->generic.subtype);
1373 	rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
1374 	return -1;
1375 }
1376 
1377 static void
1378 nvmf_rdma_request_free(struct spdk_nvmf_rdma_request *rdma_req,
1379 		       struct spdk_nvmf_rdma_transport	*rtransport)
1380 {
1381 	struct spdk_nvmf_rdma_qpair		*rqpair;
1382 	struct spdk_nvmf_rdma_poll_group	*rgroup;
1383 
1384 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1385 	if (rdma_req->data_from_pool) {
1386 		rgroup = rqpair->poller->group;
1387 
1388 		spdk_nvmf_rdma_request_free_buffers(rdma_req, &rgroup->group, &rtransport->transport);
1389 	}
1390 	rdma_req->num_outstanding_data_wr = 0;
1391 	rdma_req->req.length = 0;
1392 	rdma_req->req.iovcnt = 0;
1393 	rdma_req->req.data = NULL;
1394 	rqpair->qd--;
1395 	STAILQ_INSERT_HEAD(&rqpair->free_queue, rdma_req, state_link);
1396 	rdma_req->state = RDMA_REQUEST_STATE_FREE;
1397 }
1398 
1399 static bool
1400 spdk_nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport,
1401 			       struct spdk_nvmf_rdma_request *rdma_req)
1402 {
1403 	struct spdk_nvmf_rdma_qpair	*rqpair;
1404 	struct spdk_nvmf_rdma_device	*device;
1405 	struct spdk_nvmf_rdma_poll_group *rgroup;
1406 	struct spdk_nvme_cpl		*rsp = &rdma_req->req.rsp->nvme_cpl;
1407 	int				rc;
1408 	struct spdk_nvmf_rdma_recv	*rdma_recv;
1409 	enum spdk_nvmf_rdma_request_state prev_state;
1410 	bool				progress = false;
1411 	int				data_posted;
1412 
1413 	rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
1414 	device = rqpair->port->device;
1415 	rgroup = rqpair->poller->group;
1416 
1417 	assert(rdma_req->state != RDMA_REQUEST_STATE_FREE);
1418 
1419 	/* If the queue pair is in an error state, force the request to the completed state
1420 	 * to release resources. */
1421 	if (rqpair->ibv_attr.qp_state == IBV_QPS_ERR || rqpair->qpair.state != SPDK_NVMF_QPAIR_ACTIVE) {
1422 		if (rdma_req->state == RDMA_REQUEST_STATE_NEED_BUFFER) {
1423 			TAILQ_REMOVE(&rgroup->pending_data_buf_queue, rdma_req, link);
1424 		}
1425 		rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1426 	}
1427 
1428 	/* The loop here is to allow for several back-to-back state changes. */
1429 	do {
1430 		prev_state = rdma_req->state;
1431 
1432 		SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Request %p entering state %d\n", rdma_req, prev_state);
1433 
1434 		switch (rdma_req->state) {
1435 		case RDMA_REQUEST_STATE_FREE:
1436 			/* Some external code must kick a request into RDMA_REQUEST_STATE_NEW
1437 			 * to escape this state. */
1438 			break;
1439 		case RDMA_REQUEST_STATE_NEW:
1440 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_NEW, 0, 0,
1441 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1442 			rdma_recv = rdma_req->recv;
1443 
1444 			/* The first element of the SGL is the NVMe command */
1445 			rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
1446 			memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp));
1447 
1448 			if (rqpair->ibv_attr.qp_state == IBV_QPS_ERR  || rqpair->qpair.state != SPDK_NVMF_QPAIR_ACTIVE) {
1449 				rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1450 				break;
1451 			}
1452 
1453 			/* The next state transition depends on the data transfer needs of this request. */
1454 			rdma_req->req.xfer = spdk_nvmf_rdma_request_get_xfer(rdma_req);
1455 
1456 			/* If no data to transfer, ready to execute. */
1457 			if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) {
1458 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1459 				break;
1460 			}
1461 
1462 			rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER;
1463 			TAILQ_INSERT_TAIL(&rgroup->pending_data_buf_queue, rdma_req, link);
1464 			break;
1465 		case RDMA_REQUEST_STATE_NEED_BUFFER:
1466 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_NEED_BUFFER, 0, 0,
1467 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1468 
1469 			assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE);
1470 
1471 			if (rdma_req != TAILQ_FIRST(&rgroup->pending_data_buf_queue)) {
1472 				/* This request needs to wait in line to obtain a buffer */
1473 				break;
1474 			}
1475 
1476 			/* Try to get a data buffer */
1477 			rc = spdk_nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req);
1478 			if (rc < 0) {
1479 				TAILQ_REMOVE(&rgroup->pending_data_buf_queue, rdma_req, link);
1480 				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1481 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1482 				break;
1483 			}
1484 
1485 			if (!rdma_req->req.data) {
1486 				/* No buffers available. */
1487 				break;
1488 			}
1489 
1490 			TAILQ_REMOVE(&rgroup->pending_data_buf_queue, rdma_req, link);
1491 
1492 			/* If data is transferring from host to controller and the data didn't
1493 			 * arrive using in capsule data, we need to do a transfer from the host.
1494 			 */
1495 			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && rdma_req->data_from_pool) {
1496 				STAILQ_INSERT_TAIL(&rqpair->pending_rdma_read_queue, rdma_req, state_link);
1497 				rdma_req->state = RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING;
1498 				break;
1499 			}
1500 
1501 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
1502 			break;
1503 		case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING:
1504 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING, 0, 0,
1505 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1506 
1507 			if (rdma_req != STAILQ_FIRST(&rqpair->pending_rdma_read_queue)) {
1508 				/* This request needs to wait in line to perform RDMA */
1509 				break;
1510 			}
1511 			if (rqpair->current_send_depth + rdma_req->num_outstanding_data_wr > rqpair->max_send_depth
1512 			    || rqpair->current_read_depth + rdma_req->num_outstanding_data_wr > rqpair->max_read_depth) {
1513 				/* We can only have so many WRs outstanding. we have to wait until some finish. */
1514 				break;
1515 			}
1516 
1517 			/* We have already verified that this request is the head of the queue. */
1518 			STAILQ_REMOVE_HEAD(&rqpair->pending_rdma_read_queue, state_link);
1519 
1520 			rc = request_transfer_in(&rdma_req->req);
1521 			if (!rc) {
1522 				rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER;
1523 			} else {
1524 				rsp->status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1525 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1526 			}
1527 			break;
1528 		case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
1529 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER, 0, 0,
1530 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1531 			/* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE
1532 			 * to escape this state. */
1533 			break;
1534 		case RDMA_REQUEST_STATE_READY_TO_EXECUTE:
1535 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_READY_TO_EXECUTE, 0, 0,
1536 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1537 			rdma_req->state = RDMA_REQUEST_STATE_EXECUTING;
1538 			spdk_nvmf_request_exec(&rdma_req->req);
1539 			break;
1540 		case RDMA_REQUEST_STATE_EXECUTING:
1541 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_EXECUTING, 0, 0,
1542 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1543 			/* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED
1544 			 * to escape this state. */
1545 			break;
1546 		case RDMA_REQUEST_STATE_EXECUTED:
1547 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_EXECUTED, 0, 0,
1548 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1549 			if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1550 				STAILQ_INSERT_TAIL(&rqpair->pending_rdma_write_queue, rdma_req, state_link);
1551 				rdma_req->state = RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING;
1552 			} else {
1553 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1554 			}
1555 			break;
1556 		case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING:
1557 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING, 0, 0,
1558 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1559 
1560 			if (rdma_req != STAILQ_FIRST(&rqpair->pending_rdma_write_queue)) {
1561 				/* This request needs to wait in line to perform RDMA */
1562 				break;
1563 			}
1564 			if ((rqpair->current_send_depth + rdma_req->num_outstanding_data_wr + 1) >
1565 			    rqpair->max_send_depth) {
1566 				/* We can only have so many WRs outstanding. we have to wait until some finish.
1567 				 * +1 since each request has an additional wr in the resp. */
1568 				break;
1569 			}
1570 
1571 			/* We have already verified that this request is the head of the queue. */
1572 			STAILQ_REMOVE_HEAD(&rqpair->pending_rdma_write_queue, state_link);
1573 
1574 			/* The data transfer will be kicked off from
1575 			 * RDMA_REQUEST_STATE_READY_TO_COMPLETE state.
1576 			 */
1577 			rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE;
1578 			break;
1579 		case RDMA_REQUEST_STATE_READY_TO_COMPLETE:
1580 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE, 0, 0,
1581 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1582 			rc = request_transfer_out(&rdma_req->req, &data_posted);
1583 			assert(rc == 0); /* No good way to handle this currently */
1584 			if (rc) {
1585 				rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
1586 			} else {
1587 				rdma_req->state = data_posted ? RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST :
1588 						  RDMA_REQUEST_STATE_COMPLETING;
1589 			}
1590 			break;
1591 		case RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST:
1592 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST, 0, 0,
1593 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1594 			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
1595 			 * to escape this state. */
1596 			break;
1597 		case RDMA_REQUEST_STATE_COMPLETING:
1598 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_COMPLETING, 0, 0,
1599 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1600 			/* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED
1601 			 * to escape this state. */
1602 			break;
1603 		case RDMA_REQUEST_STATE_COMPLETED:
1604 			spdk_trace_record(TRACE_RDMA_REQUEST_STATE_COMPLETED, 0, 0,
1605 					  (uintptr_t)rdma_req, (uintptr_t)rqpair->cm_id);
1606 
1607 			nvmf_rdma_request_free(rdma_req, rtransport);
1608 			break;
1609 		case RDMA_REQUEST_NUM_STATES:
1610 		default:
1611 			assert(0);
1612 			break;
1613 		}
1614 
1615 		if (rdma_req->state != prev_state) {
1616 			progress = true;
1617 		}
1618 	} while (rdma_req->state != prev_state);
1619 
1620 	return progress;
1621 }
1622 
1623 /* Public API callbacks begin here */
1624 
1625 #define SPDK_NVMF_RDMA_DEFAULT_MAX_QUEUE_DEPTH 128
1626 #define SPDK_NVMF_RDMA_DEFAULT_AQ_DEPTH 128
1627 #define SPDK_NVMF_RDMA_DEFAULT_MAX_QPAIRS_PER_CTRLR 64
1628 #define SPDK_NVMF_RDMA_DEFAULT_IN_CAPSULE_DATA_SIZE 4096
1629 #define SPDK_NVMF_RDMA_DEFAULT_MAX_IO_SIZE 131072
1630 #define SPDK_NVMF_RDMA_MIN_IO_BUFFER_SIZE (SPDK_NVMF_RDMA_DEFAULT_MAX_IO_SIZE / SPDK_NVMF_MAX_SGL_ENTRIES)
1631 #define SPDK_NVMF_RDMA_DEFAULT_NUM_SHARED_BUFFERS 512
1632 #define SPDK_NVMF_RDMA_DEFAULT_BUFFER_CACHE_SIZE 32
1633 
1634 static void
1635 spdk_nvmf_rdma_opts_init(struct spdk_nvmf_transport_opts *opts)
1636 {
1637 	opts->max_queue_depth =		SPDK_NVMF_RDMA_DEFAULT_MAX_QUEUE_DEPTH;
1638 	opts->max_qpairs_per_ctrlr =	SPDK_NVMF_RDMA_DEFAULT_MAX_QPAIRS_PER_CTRLR;
1639 	opts->in_capsule_data_size =	SPDK_NVMF_RDMA_DEFAULT_IN_CAPSULE_DATA_SIZE;
1640 	opts->max_io_size =		SPDK_NVMF_RDMA_DEFAULT_MAX_IO_SIZE;
1641 	opts->io_unit_size =		SPDK_NVMF_RDMA_MIN_IO_BUFFER_SIZE;
1642 	opts->max_aq_depth =		SPDK_NVMF_RDMA_DEFAULT_AQ_DEPTH;
1643 	opts->num_shared_buffers =	SPDK_NVMF_RDMA_DEFAULT_NUM_SHARED_BUFFERS;
1644 	opts->buf_cache_size =		SPDK_NVMF_RDMA_DEFAULT_BUFFER_CACHE_SIZE;
1645 }
1646 
1647 static int spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport);
1648 
1649 static struct spdk_nvmf_transport *
1650 spdk_nvmf_rdma_create(struct spdk_nvmf_transport_opts *opts)
1651 {
1652 	int rc;
1653 	struct spdk_nvmf_rdma_transport *rtransport;
1654 	struct spdk_nvmf_rdma_device	*device, *tmp;
1655 	struct ibv_context		**contexts;
1656 	uint32_t			i;
1657 	int				flag;
1658 	uint32_t			sge_count;
1659 	uint32_t			min_shared_buffers;
1660 	int				max_device_sge = SPDK_NVMF_MAX_SGL_ENTRIES;
1661 
1662 	rtransport = calloc(1, sizeof(*rtransport));
1663 	if (!rtransport) {
1664 		return NULL;
1665 	}
1666 
1667 	if (pthread_mutex_init(&rtransport->lock, NULL)) {
1668 		SPDK_ERRLOG("pthread_mutex_init() failed\n");
1669 		free(rtransport);
1670 		return NULL;
1671 	}
1672 
1673 	TAILQ_INIT(&rtransport->devices);
1674 	TAILQ_INIT(&rtransport->ports);
1675 
1676 	rtransport->transport.ops = &spdk_nvmf_transport_rdma;
1677 
1678 	SPDK_INFOLOG(SPDK_LOG_RDMA, "*** RDMA Transport Init ***\n"
1679 		     "  Transport opts:  max_ioq_depth=%d, max_io_size=%d,\n"
1680 		     "  max_qpairs_per_ctrlr=%d, io_unit_size=%d,\n"
1681 		     "  in_capsule_data_size=%d, max_aq_depth=%d\n"
1682 		     "  num_shared_buffers=%d\n",
1683 		     opts->max_queue_depth,
1684 		     opts->max_io_size,
1685 		     opts->max_qpairs_per_ctrlr,
1686 		     opts->io_unit_size,
1687 		     opts->in_capsule_data_size,
1688 		     opts->max_aq_depth,
1689 		     opts->num_shared_buffers);
1690 
1691 	/* I/O unit size cannot be larger than max I/O size */
1692 	if (opts->io_unit_size > opts->max_io_size) {
1693 		opts->io_unit_size = opts->max_io_size;
1694 	}
1695 
1696 	if (opts->num_shared_buffers < (SPDK_NVMF_MAX_SGL_ENTRIES * 2)) {
1697 		SPDK_ERRLOG("The number of shared data buffers (%d) is less than"
1698 			    "the minimum number required to guarantee that forward progress can be made (%d)\n",
1699 			    opts->num_shared_buffers, (SPDK_NVMF_MAX_SGL_ENTRIES * 2));
1700 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1701 		return NULL;
1702 	}
1703 
1704 	min_shared_buffers = spdk_thread_get_count() * opts->buf_cache_size;
1705 	if (min_shared_buffers > opts->num_shared_buffers) {
1706 		SPDK_ERRLOG("There are not enough buffers to satisfy"
1707 			    "per-poll group caches for each thread. (%" PRIu32 ")"
1708 			    "supplied. (%" PRIu32 ") required\n", opts->num_shared_buffers, min_shared_buffers);
1709 		SPDK_ERRLOG("Please specify a larger number of shared buffers\n");
1710 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1711 		return NULL;
1712 	}
1713 
1714 	sge_count = opts->max_io_size / opts->io_unit_size;
1715 	if (sge_count > NVMF_DEFAULT_TX_SGE) {
1716 		SPDK_ERRLOG("Unsupported IO Unit size specified, %d bytes\n", opts->io_unit_size);
1717 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1718 		return NULL;
1719 	}
1720 
1721 	rtransport->event_channel = rdma_create_event_channel();
1722 	if (rtransport->event_channel == NULL) {
1723 		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", spdk_strerror(errno));
1724 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1725 		return NULL;
1726 	}
1727 
1728 	flag = fcntl(rtransport->event_channel->fd, F_GETFL);
1729 	if (fcntl(rtransport->event_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
1730 		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
1731 			    rtransport->event_channel->fd, spdk_strerror(errno));
1732 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1733 		return NULL;
1734 	}
1735 
1736 	rtransport->data_wr_pool = spdk_mempool_create("spdk_nvmf_rdma_wr_data",
1737 				   opts->max_queue_depth * SPDK_NVMF_MAX_SGL_ENTRIES,
1738 				   sizeof(struct spdk_nvmf_rdma_request_data),
1739 				   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
1740 				   SPDK_ENV_SOCKET_ID_ANY);
1741 	if (!rtransport->data_wr_pool) {
1742 		SPDK_ERRLOG("Unable to allocate work request pool for poll group\n");
1743 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1744 		return NULL;
1745 	}
1746 
1747 	contexts = rdma_get_devices(NULL);
1748 	if (contexts == NULL) {
1749 		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
1750 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1751 		return NULL;
1752 	}
1753 
1754 	i = 0;
1755 	rc = 0;
1756 	while (contexts[i] != NULL) {
1757 		device = calloc(1, sizeof(*device));
1758 		if (!device) {
1759 			SPDK_ERRLOG("Unable to allocate memory for RDMA devices.\n");
1760 			rc = -ENOMEM;
1761 			break;
1762 		}
1763 		device->context = contexts[i];
1764 		rc = ibv_query_device(device->context, &device->attr);
1765 		if (rc < 0) {
1766 			SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1767 			free(device);
1768 			break;
1769 
1770 		}
1771 
1772 		max_device_sge = spdk_min(max_device_sge, device->attr.max_sge);
1773 
1774 #ifdef SPDK_CONFIG_RDMA_SEND_WITH_INVAL
1775 		if ((device->attr.device_cap_flags & IBV_DEVICE_MEM_MGT_EXTENSIONS) == 0) {
1776 			SPDK_WARNLOG("The libibverbs on this system supports SEND_WITH_INVALIDATE,");
1777 			SPDK_WARNLOG("but the device with vendor ID %u does not.\n", device->attr.vendor_id);
1778 		}
1779 
1780 		/**
1781 		 * The vendor ID is assigned by the IEEE and an ID of 0 implies Soft-RoCE.
1782 		 * The Soft-RoCE RXE driver does not currently support send with invalidate,
1783 		 * but incorrectly reports that it does. There are changes making their way
1784 		 * through the kernel now that will enable this feature. When they are merged,
1785 		 * we can conditionally enable this feature.
1786 		 *
1787 		 * TODO: enable this for versions of the kernel rxe driver that support it.
1788 		 */
1789 		if (device->attr.vendor_id == 0) {
1790 			device->attr.device_cap_flags &= ~(IBV_DEVICE_MEM_MGT_EXTENSIONS);
1791 		}
1792 #endif
1793 
1794 		/* set up device context async ev fd as NON_BLOCKING */
1795 		flag = fcntl(device->context->async_fd, F_GETFL);
1796 		rc = fcntl(device->context->async_fd, F_SETFL, flag | O_NONBLOCK);
1797 		if (rc < 0) {
1798 			SPDK_ERRLOG("Failed to set context async fd to NONBLOCK.\n");
1799 			free(device);
1800 			break;
1801 		}
1802 
1803 		TAILQ_INSERT_TAIL(&rtransport->devices, device, link);
1804 		i++;
1805 	}
1806 	rdma_free_devices(contexts);
1807 
1808 	if (opts->io_unit_size * max_device_sge < opts->max_io_size) {
1809 		/* divide and round up. */
1810 		opts->io_unit_size = (opts->max_io_size + max_device_sge - 1) / max_device_sge;
1811 
1812 		/* round up to the nearest 4k. */
1813 		opts->io_unit_size = (opts->io_unit_size + NVMF_DATA_BUFFER_ALIGNMENT - 1) & ~NVMF_DATA_BUFFER_MASK;
1814 
1815 		opts->io_unit_size = spdk_max(opts->io_unit_size, SPDK_NVMF_RDMA_MIN_IO_BUFFER_SIZE);
1816 		SPDK_NOTICELOG("Adjusting the io unit size to fit the device's maximum I/O size. New I/O unit size %u\n",
1817 			       opts->io_unit_size);
1818 	}
1819 
1820 	if (rc < 0) {
1821 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1822 		return NULL;
1823 	}
1824 
1825 	/* Set up poll descriptor array to monitor events from RDMA and IB
1826 	 * in a single poll syscall
1827 	 */
1828 	rtransport->npoll_fds = i + 1;
1829 	i = 0;
1830 	rtransport->poll_fds = calloc(rtransport->npoll_fds, sizeof(struct pollfd));
1831 	if (rtransport->poll_fds == NULL) {
1832 		SPDK_ERRLOG("poll_fds allocation failed\n");
1833 		spdk_nvmf_rdma_destroy(&rtransport->transport);
1834 		return NULL;
1835 	}
1836 
1837 	rtransport->poll_fds[i].fd = rtransport->event_channel->fd;
1838 	rtransport->poll_fds[i++].events = POLLIN;
1839 
1840 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) {
1841 		rtransport->poll_fds[i].fd = device->context->async_fd;
1842 		rtransport->poll_fds[i++].events = POLLIN;
1843 	}
1844 
1845 	return &rtransport->transport;
1846 }
1847 
1848 static int
1849 spdk_nvmf_rdma_destroy(struct spdk_nvmf_transport *transport)
1850 {
1851 	struct spdk_nvmf_rdma_transport	*rtransport;
1852 	struct spdk_nvmf_rdma_port	*port, *port_tmp;
1853 	struct spdk_nvmf_rdma_device	*device, *device_tmp;
1854 
1855 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1856 
1857 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, port_tmp) {
1858 		TAILQ_REMOVE(&rtransport->ports, port, link);
1859 		rdma_destroy_id(port->id);
1860 		free(port);
1861 	}
1862 
1863 	if (rtransport->poll_fds != NULL) {
1864 		free(rtransport->poll_fds);
1865 	}
1866 
1867 	if (rtransport->event_channel != NULL) {
1868 		rdma_destroy_event_channel(rtransport->event_channel);
1869 	}
1870 
1871 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) {
1872 		TAILQ_REMOVE(&rtransport->devices, device, link);
1873 		if (device->map) {
1874 			spdk_mem_map_free(&device->map);
1875 		}
1876 		if (device->pd) {
1877 			if (!g_nvmf_hooks.get_ibv_pd) {
1878 				ibv_dealloc_pd(device->pd);
1879 			}
1880 		}
1881 		free(device);
1882 	}
1883 
1884 	if (rtransport->data_wr_pool != NULL) {
1885 		if (spdk_mempool_count(rtransport->data_wr_pool) !=
1886 		    (transport->opts.max_queue_depth * SPDK_NVMF_MAX_SGL_ENTRIES)) {
1887 			SPDK_ERRLOG("transport wr pool count is %zu but should be %u\n",
1888 				    spdk_mempool_count(rtransport->data_wr_pool),
1889 				    transport->opts.max_queue_depth * SPDK_NVMF_MAX_SGL_ENTRIES);
1890 		}
1891 	}
1892 
1893 	spdk_mempool_free(rtransport->data_wr_pool);
1894 	pthread_mutex_destroy(&rtransport->lock);
1895 	free(rtransport);
1896 
1897 	return 0;
1898 }
1899 
1900 static int
1901 spdk_nvmf_rdma_trid_from_cm_id(struct rdma_cm_id *id,
1902 			       struct spdk_nvme_transport_id *trid,
1903 			       bool peer);
1904 
1905 const struct spdk_mem_map_ops g_nvmf_rdma_map_ops = {
1906 	.notify_cb = spdk_nvmf_rdma_mem_notify,
1907 	.are_contiguous = spdk_nvmf_rdma_check_contiguous_entries
1908 };
1909 
1910 static int
1911 spdk_nvmf_rdma_listen(struct spdk_nvmf_transport *transport,
1912 		      const struct spdk_nvme_transport_id *trid)
1913 {
1914 	struct spdk_nvmf_rdma_transport	*rtransport;
1915 	struct spdk_nvmf_rdma_device	*device;
1916 	struct spdk_nvmf_rdma_port	*port_tmp, *port;
1917 	struct ibv_pd			*pd;
1918 	struct addrinfo			*res;
1919 	struct addrinfo			hints;
1920 	int				family;
1921 	int				rc;
1922 
1923 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
1924 
1925 	port = calloc(1, sizeof(*port));
1926 	if (!port) {
1927 		return -ENOMEM;
1928 	}
1929 
1930 	/* Selectively copy the trid. Things like NQN don't matter here - that
1931 	 * mapping is enforced elsewhere.
1932 	 */
1933 	port->trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
1934 	port->trid.adrfam = trid->adrfam;
1935 	snprintf(port->trid.traddr, sizeof(port->trid.traddr), "%s", trid->traddr);
1936 	snprintf(port->trid.trsvcid, sizeof(port->trid.trsvcid), "%s", trid->trsvcid);
1937 
1938 	pthread_mutex_lock(&rtransport->lock);
1939 	assert(rtransport->event_channel != NULL);
1940 	TAILQ_FOREACH(port_tmp, &rtransport->ports, link) {
1941 		if (spdk_nvme_transport_id_compare(&port_tmp->trid, &port->trid) == 0) {
1942 			port_tmp->ref++;
1943 			free(port);
1944 			/* Already listening at this address */
1945 			pthread_mutex_unlock(&rtransport->lock);
1946 			return 0;
1947 		}
1948 	}
1949 
1950 	rc = rdma_create_id(rtransport->event_channel, &port->id, port, RDMA_PS_TCP);
1951 	if (rc < 0) {
1952 		SPDK_ERRLOG("rdma_create_id() failed\n");
1953 		free(port);
1954 		pthread_mutex_unlock(&rtransport->lock);
1955 		return rc;
1956 	}
1957 
1958 	switch (port->trid.adrfam) {
1959 	case SPDK_NVMF_ADRFAM_IPV4:
1960 		family = AF_INET;
1961 		break;
1962 	case SPDK_NVMF_ADRFAM_IPV6:
1963 		family = AF_INET6;
1964 		break;
1965 	default:
1966 		SPDK_ERRLOG("Unhandled ADRFAM %d\n", port->trid.adrfam);
1967 		free(port);
1968 		pthread_mutex_unlock(&rtransport->lock);
1969 		return -EINVAL;
1970 	}
1971 
1972 	memset(&hints, 0, sizeof(hints));
1973 	hints.ai_family = family;
1974 	hints.ai_flags = AI_NUMERICSERV;
1975 	hints.ai_socktype = SOCK_STREAM;
1976 	hints.ai_protocol = 0;
1977 
1978 	rc = getaddrinfo(port->trid.traddr, port->trid.trsvcid, &hints, &res);
1979 	if (rc) {
1980 		SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(rc), rc);
1981 		free(port);
1982 		pthread_mutex_unlock(&rtransport->lock);
1983 		return -EINVAL;
1984 	}
1985 
1986 	rc = rdma_bind_addr(port->id, res->ai_addr);
1987 	freeaddrinfo(res);
1988 
1989 	if (rc < 0) {
1990 		SPDK_ERRLOG("rdma_bind_addr() failed\n");
1991 		rdma_destroy_id(port->id);
1992 		free(port);
1993 		pthread_mutex_unlock(&rtransport->lock);
1994 		return rc;
1995 	}
1996 
1997 	if (!port->id->verbs) {
1998 		SPDK_ERRLOG("ibv_context is null\n");
1999 		rdma_destroy_id(port->id);
2000 		free(port);
2001 		pthread_mutex_unlock(&rtransport->lock);
2002 		return -1;
2003 	}
2004 
2005 	rc = rdma_listen(port->id, 10); /* 10 = backlog */
2006 	if (rc < 0) {
2007 		SPDK_ERRLOG("rdma_listen() failed\n");
2008 		rdma_destroy_id(port->id);
2009 		free(port);
2010 		pthread_mutex_unlock(&rtransport->lock);
2011 		return rc;
2012 	}
2013 
2014 	TAILQ_FOREACH(device, &rtransport->devices, link) {
2015 		if (device->context == port->id->verbs) {
2016 			port->device = device;
2017 			break;
2018 		}
2019 	}
2020 	if (!port->device) {
2021 		SPDK_ERRLOG("Accepted a connection with verbs %p, but unable to find a corresponding device.\n",
2022 			    port->id->verbs);
2023 		rdma_destroy_id(port->id);
2024 		free(port);
2025 		pthread_mutex_unlock(&rtransport->lock);
2026 		return -EINVAL;
2027 	}
2028 
2029 	pd = NULL;
2030 	if (g_nvmf_hooks.get_ibv_pd) {
2031 		if (spdk_nvmf_rdma_trid_from_cm_id(port->id, &port->trid, 1) < 0) {
2032 			rdma_destroy_id(port->id);
2033 			free(port);
2034 			pthread_mutex_unlock(&rtransport->lock);
2035 			return -EINVAL;
2036 		}
2037 
2038 		pd = g_nvmf_hooks.get_ibv_pd(&port->trid, port->id->verbs);
2039 	}
2040 
2041 	if (device->pd == NULL) {
2042 		/* Haven't created a protection domain yet. */
2043 
2044 		if (!g_nvmf_hooks.get_ibv_pd) {
2045 			device->pd = ibv_alloc_pd(device->context);
2046 			if (!device->pd) {
2047 				SPDK_ERRLOG("Unable to allocate protection domain.\n");
2048 				rdma_destroy_id(port->id);
2049 				free(port);
2050 				pthread_mutex_unlock(&rtransport->lock);
2051 				return -ENOMEM;
2052 			}
2053 		} else {
2054 			device->pd = pd;
2055 		}
2056 
2057 		assert(device->map == NULL);
2058 
2059 		device->map = spdk_mem_map_alloc(0, &g_nvmf_rdma_map_ops, device->pd);
2060 		if (!device->map) {
2061 			SPDK_ERRLOG("Unable to allocate memory map for listen address\n");
2062 			if (!g_nvmf_hooks.get_ibv_pd) {
2063 				ibv_dealloc_pd(device->pd);
2064 			}
2065 			rdma_destroy_id(port->id);
2066 			free(port);
2067 			pthread_mutex_unlock(&rtransport->lock);
2068 			return -ENOMEM;
2069 		}
2070 	} else if (g_nvmf_hooks.get_ibv_pd) {
2071 		/* A protection domain exists for this device, but the user has
2072 		 * enabled hooks. Verify that they only supply one pd per device. */
2073 		if (device->pd != pd) {
2074 			SPDK_ERRLOG("The NVMe-oF target only supports one protection domain per device.\n");
2075 			rdma_destroy_id(port->id);
2076 			free(port);
2077 			pthread_mutex_unlock(&rtransport->lock);
2078 			return -EINVAL;
2079 		}
2080 	}
2081 
2082 	assert(device->map != NULL);
2083 	assert(device->pd != NULL);
2084 
2085 	SPDK_INFOLOG(SPDK_LOG_RDMA, "*** NVMf Target Listening on %s port %d ***\n",
2086 		     port->trid.traddr, ntohs(rdma_get_src_port(port->id)));
2087 
2088 	port->ref = 1;
2089 
2090 	TAILQ_INSERT_TAIL(&rtransport->ports, port, link);
2091 	pthread_mutex_unlock(&rtransport->lock);
2092 
2093 	return 0;
2094 }
2095 
2096 static int
2097 spdk_nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport,
2098 			   const struct spdk_nvme_transport_id *_trid)
2099 {
2100 	struct spdk_nvmf_rdma_transport *rtransport;
2101 	struct spdk_nvmf_rdma_port *port, *tmp;
2102 	struct spdk_nvme_transport_id trid = {};
2103 
2104 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
2105 
2106 	/* Selectively copy the trid. Things like NQN don't matter here - that
2107 	 * mapping is enforced elsewhere.
2108 	 */
2109 	trid.trtype = SPDK_NVME_TRANSPORT_RDMA;
2110 	trid.adrfam = _trid->adrfam;
2111 	snprintf(trid.traddr, sizeof(port->trid.traddr), "%s", _trid->traddr);
2112 	snprintf(trid.trsvcid, sizeof(port->trid.trsvcid), "%s", _trid->trsvcid);
2113 
2114 	pthread_mutex_lock(&rtransport->lock);
2115 	TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, tmp) {
2116 		if (spdk_nvme_transport_id_compare(&port->trid, &trid) == 0) {
2117 			assert(port->ref > 0);
2118 			port->ref--;
2119 			if (port->ref == 0) {
2120 				TAILQ_REMOVE(&rtransport->ports, port, link);
2121 				rdma_destroy_id(port->id);
2122 				free(port);
2123 			}
2124 			break;
2125 		}
2126 	}
2127 
2128 	pthread_mutex_unlock(&rtransport->lock);
2129 	return 0;
2130 }
2131 
2132 static bool
2133 spdk_nvmf_rdma_qpair_is_idle(struct spdk_nvmf_qpair *qpair)
2134 {
2135 	struct spdk_nvmf_rdma_qpair *rqpair;
2136 
2137 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
2138 
2139 	if (rqpair->qd == 0) {
2140 		return true;
2141 	}
2142 	return false;
2143 }
2144 
2145 static void
2146 spdk_nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport,
2147 				     struct spdk_nvmf_rdma_qpair *rqpair, bool drain)
2148 {
2149 	struct spdk_nvmf_rdma_request	*rdma_req, *req_tmp;
2150 
2151 	/* We process I/O in the data transfer pending queue at the highest priority. RDMA reads first */
2152 	STAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_read_queue, state_link, req_tmp) {
2153 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false && drain == false) {
2154 			break;
2155 		}
2156 	}
2157 
2158 	/* Then RDMA writes since reads have stronger restrictions than writes */
2159 	STAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_write_queue, state_link, req_tmp) {
2160 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false && drain == false) {
2161 			break;
2162 		}
2163 	}
2164 
2165 	/* The second highest priority is I/O waiting on memory buffers. */
2166 	TAILQ_FOREACH_SAFE(rdma_req, &rqpair->poller->group->pending_data_buf_queue, link,
2167 			   req_tmp) {
2168 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false && drain == false) {
2169 			break;
2170 		}
2171 	}
2172 
2173 	while (!STAILQ_EMPTY(&rqpair->free_queue) && !STAILQ_EMPTY(&rqpair->incoming_queue)) {
2174 
2175 		rdma_req = STAILQ_FIRST(&rqpair->free_queue);
2176 		STAILQ_REMOVE_HEAD(&rqpair->free_queue, state_link);
2177 		rdma_req->recv = STAILQ_FIRST(&rqpair->incoming_queue);
2178 		STAILQ_REMOVE_HEAD(&rqpair->incoming_queue, link);
2179 
2180 		rqpair->qd++;
2181 		rdma_req->state = RDMA_REQUEST_STATE_NEW;
2182 		if (spdk_nvmf_rdma_request_process(rtransport, rdma_req) == false) {
2183 			break;
2184 		}
2185 	}
2186 }
2187 
2188 static void
2189 _nvmf_rdma_qpair_disconnect(void *ctx)
2190 {
2191 	struct spdk_nvmf_qpair *qpair = ctx;
2192 
2193 	spdk_nvmf_qpair_disconnect(qpair, NULL, NULL);
2194 }
2195 
2196 static void
2197 _nvmf_rdma_try_disconnect(void *ctx)
2198 {
2199 	struct spdk_nvmf_qpair *qpair = ctx;
2200 	struct spdk_nvmf_poll_group *group;
2201 
2202 	/* Read the group out of the qpair. This is normally set and accessed only from
2203 	 * the thread that created the group. Here, we're not on that thread necessarily.
2204 	 * The data member qpair->group begins it's life as NULL and then is assigned to
2205 	 * a pointer and never changes. So fortunately reading this and checking for
2206 	 * non-NULL is thread safe in the x86_64 memory model. */
2207 	group = qpair->group;
2208 
2209 	if (group == NULL) {
2210 		/* The qpair hasn't been assigned to a group yet, so we can't
2211 		 * process a disconnect. Send a message to ourself and try again. */
2212 		spdk_thread_send_msg(spdk_get_thread(), _nvmf_rdma_try_disconnect, qpair);
2213 		return;
2214 	}
2215 
2216 	spdk_thread_send_msg(group->thread, _nvmf_rdma_qpair_disconnect, qpair);
2217 }
2218 
2219 static inline void
2220 spdk_nvmf_rdma_start_disconnect(struct spdk_nvmf_rdma_qpair *rqpair)
2221 {
2222 	if (__sync_bool_compare_and_swap(&rqpair->disconnect_started, false, true)) {
2223 		_nvmf_rdma_try_disconnect(&rqpair->qpair);
2224 	}
2225 }
2226 
2227 
2228 static int
2229 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
2230 {
2231 	struct spdk_nvmf_qpair		*qpair;
2232 	struct spdk_nvmf_rdma_qpair	*rqpair;
2233 
2234 	if (evt->id == NULL) {
2235 		SPDK_ERRLOG("disconnect request: missing cm_id\n");
2236 		return -1;
2237 	}
2238 
2239 	qpair = evt->id->context;
2240 	if (qpair == NULL) {
2241 		SPDK_ERRLOG("disconnect request: no active connection\n");
2242 		return -1;
2243 	}
2244 
2245 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
2246 
2247 	spdk_trace_record(TRACE_RDMA_QP_DISCONNECT, 0, 0, (uintptr_t)rqpair->cm_id, 0);
2248 
2249 	spdk_nvmf_rdma_update_ibv_state(rqpair);
2250 
2251 	spdk_nvmf_rdma_start_disconnect(rqpair);
2252 
2253 	return 0;
2254 }
2255 
2256 #ifdef DEBUG
2257 static const char *CM_EVENT_STR[] = {
2258 	"RDMA_CM_EVENT_ADDR_RESOLVED",
2259 	"RDMA_CM_EVENT_ADDR_ERROR",
2260 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
2261 	"RDMA_CM_EVENT_ROUTE_ERROR",
2262 	"RDMA_CM_EVENT_CONNECT_REQUEST",
2263 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
2264 	"RDMA_CM_EVENT_CONNECT_ERROR",
2265 	"RDMA_CM_EVENT_UNREACHABLE",
2266 	"RDMA_CM_EVENT_REJECTED",
2267 	"RDMA_CM_EVENT_ESTABLISHED",
2268 	"RDMA_CM_EVENT_DISCONNECTED",
2269 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
2270 	"RDMA_CM_EVENT_MULTICAST_JOIN",
2271 	"RDMA_CM_EVENT_MULTICAST_ERROR",
2272 	"RDMA_CM_EVENT_ADDR_CHANGE",
2273 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
2274 };
2275 #endif /* DEBUG */
2276 
2277 static void
2278 spdk_nvmf_process_cm_event(struct spdk_nvmf_transport *transport, new_qpair_fn cb_fn)
2279 {
2280 	struct spdk_nvmf_rdma_transport *rtransport;
2281 	struct rdma_cm_event		*event;
2282 	int				rc;
2283 
2284 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
2285 
2286 	if (rtransport->event_channel == NULL) {
2287 		return;
2288 	}
2289 
2290 	while (1) {
2291 		rc = rdma_get_cm_event(rtransport->event_channel, &event);
2292 		if (rc == 0) {
2293 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
2294 
2295 			spdk_trace_record(TRACE_RDMA_CM_ASYNC_EVENT, 0, 0, 0, event->event);
2296 
2297 			switch (event->event) {
2298 			case RDMA_CM_EVENT_ADDR_RESOLVED:
2299 			case RDMA_CM_EVENT_ADDR_ERROR:
2300 			case RDMA_CM_EVENT_ROUTE_RESOLVED:
2301 			case RDMA_CM_EVENT_ROUTE_ERROR:
2302 				/* No action required. The target never attempts to resolve routes. */
2303 				break;
2304 			case RDMA_CM_EVENT_CONNECT_REQUEST:
2305 				rc = nvmf_rdma_connect(transport, event, cb_fn);
2306 				if (rc < 0) {
2307 					SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
2308 					break;
2309 				}
2310 				break;
2311 			case RDMA_CM_EVENT_CONNECT_RESPONSE:
2312 				/* The target never initiates a new connection. So this will not occur. */
2313 				break;
2314 			case RDMA_CM_EVENT_CONNECT_ERROR:
2315 				/* Can this happen? The docs say it can, but not sure what causes it. */
2316 				break;
2317 			case RDMA_CM_EVENT_UNREACHABLE:
2318 			case RDMA_CM_EVENT_REJECTED:
2319 				/* These only occur on the client side. */
2320 				break;
2321 			case RDMA_CM_EVENT_ESTABLISHED:
2322 				/* TODO: Should we be waiting for this event anywhere? */
2323 				break;
2324 			case RDMA_CM_EVENT_DISCONNECTED:
2325 			case RDMA_CM_EVENT_DEVICE_REMOVAL:
2326 				rc = nvmf_rdma_disconnect(event);
2327 				if (rc < 0) {
2328 					SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
2329 					break;
2330 				}
2331 				break;
2332 			case RDMA_CM_EVENT_MULTICAST_JOIN:
2333 			case RDMA_CM_EVENT_MULTICAST_ERROR:
2334 				/* Multicast is not used */
2335 				break;
2336 			case RDMA_CM_EVENT_ADDR_CHANGE:
2337 				/* Not utilizing this event */
2338 				break;
2339 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
2340 				/* For now, do nothing. The target never re-uses queue pairs. */
2341 				break;
2342 			default:
2343 				SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
2344 				break;
2345 			}
2346 
2347 			rdma_ack_cm_event(event);
2348 		} else {
2349 			if (errno != EAGAIN && errno != EWOULDBLOCK) {
2350 				SPDK_ERRLOG("Acceptor Event Error: %s\n", spdk_strerror(errno));
2351 			}
2352 			break;
2353 		}
2354 	}
2355 }
2356 
2357 static void
2358 spdk_nvmf_process_ib_event(struct spdk_nvmf_rdma_device *device)
2359 {
2360 	int				rc;
2361 	struct spdk_nvmf_rdma_qpair	*rqpair;
2362 	struct ibv_async_event		event;
2363 	enum ibv_qp_state		state;
2364 
2365 	rc = ibv_get_async_event(device->context, &event);
2366 
2367 	if (rc) {
2368 		SPDK_ERRLOG("Failed to get async_event (%d): %s\n",
2369 			    errno, spdk_strerror(errno));
2370 		return;
2371 	}
2372 
2373 	SPDK_NOTICELOG("Async event: %s\n",
2374 		       ibv_event_type_str(event.event_type));
2375 
2376 	switch (event.event_type) {
2377 	case IBV_EVENT_QP_FATAL:
2378 		rqpair = event.element.qp->qp_context;
2379 		spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0,
2380 				  (uintptr_t)rqpair->cm_id, event.event_type);
2381 		spdk_nvmf_rdma_update_ibv_state(rqpair);
2382 		spdk_nvmf_rdma_start_disconnect(rqpair);
2383 		break;
2384 	case IBV_EVENT_QP_LAST_WQE_REACHED:
2385 		/* This event only occurs for shared receive queues, which are not currently supported. */
2386 		break;
2387 	case IBV_EVENT_SQ_DRAINED:
2388 		/* This event occurs frequently in both error and non-error states.
2389 		 * Check if the qpair is in an error state before sending a message.
2390 		 * Note that we're not on the correct thread to access the qpair, but
2391 		 * the operations that the below calls make all happen to be thread
2392 		 * safe. */
2393 		rqpair = event.element.qp->qp_context;
2394 		spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0,
2395 				  (uintptr_t)rqpair->cm_id, event.event_type);
2396 		state = spdk_nvmf_rdma_update_ibv_state(rqpair);
2397 		if (state == IBV_QPS_ERR) {
2398 			spdk_nvmf_rdma_start_disconnect(rqpair);
2399 		}
2400 		break;
2401 	case IBV_EVENT_QP_REQ_ERR:
2402 	case IBV_EVENT_QP_ACCESS_ERR:
2403 	case IBV_EVENT_COMM_EST:
2404 	case IBV_EVENT_PATH_MIG:
2405 	case IBV_EVENT_PATH_MIG_ERR:
2406 		rqpair = event.element.qp->qp_context;
2407 		spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0,
2408 				  (uintptr_t)rqpair->cm_id, event.event_type);
2409 		spdk_nvmf_rdma_update_ibv_state(rqpair);
2410 		break;
2411 	case IBV_EVENT_CQ_ERR:
2412 	case IBV_EVENT_DEVICE_FATAL:
2413 	case IBV_EVENT_PORT_ACTIVE:
2414 	case IBV_EVENT_PORT_ERR:
2415 	case IBV_EVENT_LID_CHANGE:
2416 	case IBV_EVENT_PKEY_CHANGE:
2417 	case IBV_EVENT_SM_CHANGE:
2418 	case IBV_EVENT_SRQ_ERR:
2419 	case IBV_EVENT_SRQ_LIMIT_REACHED:
2420 	case IBV_EVENT_CLIENT_REREGISTER:
2421 	case IBV_EVENT_GID_CHANGE:
2422 	default:
2423 		spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0, 0, event.event_type);
2424 		break;
2425 	}
2426 	ibv_ack_async_event(&event);
2427 }
2428 
2429 static void
2430 spdk_nvmf_rdma_accept(struct spdk_nvmf_transport *transport, new_qpair_fn cb_fn)
2431 {
2432 	int	nfds, i = 0;
2433 	struct spdk_nvmf_rdma_transport *rtransport;
2434 	struct spdk_nvmf_rdma_device *device, *tmp;
2435 
2436 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
2437 	nfds = poll(rtransport->poll_fds, rtransport->npoll_fds, 0);
2438 
2439 	if (nfds <= 0) {
2440 		return;
2441 	}
2442 
2443 	/* The first poll descriptor is RDMA CM event */
2444 	if (rtransport->poll_fds[i++].revents & POLLIN) {
2445 		spdk_nvmf_process_cm_event(transport, cb_fn);
2446 		nfds--;
2447 	}
2448 
2449 	if (nfds == 0) {
2450 		return;
2451 	}
2452 
2453 	/* Second and subsequent poll descriptors are IB async events */
2454 	TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) {
2455 		if (rtransport->poll_fds[i++].revents & POLLIN) {
2456 			spdk_nvmf_process_ib_event(device);
2457 			nfds--;
2458 		}
2459 	}
2460 	/* check all flagged fd's have been served */
2461 	assert(nfds == 0);
2462 }
2463 
2464 static void
2465 spdk_nvmf_rdma_discover(struct spdk_nvmf_transport *transport,
2466 			struct spdk_nvme_transport_id *trid,
2467 			struct spdk_nvmf_discovery_log_page_entry *entry)
2468 {
2469 	entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
2470 	entry->adrfam = trid->adrfam;
2471 	entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
2472 
2473 	spdk_strcpy_pad(entry->trsvcid, trid->trsvcid, sizeof(entry->trsvcid), ' ');
2474 	spdk_strcpy_pad(entry->traddr, trid->traddr, sizeof(entry->traddr), ' ');
2475 
2476 	entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
2477 	entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
2478 	entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
2479 }
2480 
2481 static struct spdk_nvmf_transport_poll_group *
2482 spdk_nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport)
2483 {
2484 	struct spdk_nvmf_rdma_transport		*rtransport;
2485 	struct spdk_nvmf_rdma_poll_group	*rgroup;
2486 	struct spdk_nvmf_rdma_poller		*poller, *tpoller;
2487 	struct spdk_nvmf_rdma_device		*device;
2488 
2489 	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
2490 
2491 	rgroup = calloc(1, sizeof(*rgroup));
2492 	if (!rgroup) {
2493 		return NULL;
2494 	}
2495 
2496 	TAILQ_INIT(&rgroup->pollers);
2497 	TAILQ_INIT(&rgroup->pending_data_buf_queue);
2498 
2499 	pthread_mutex_lock(&rtransport->lock);
2500 	TAILQ_FOREACH(device, &rtransport->devices, link) {
2501 		poller = calloc(1, sizeof(*poller));
2502 		if (!poller) {
2503 			SPDK_ERRLOG("Unable to allocate memory for new RDMA poller\n");
2504 			goto err_exit;
2505 		}
2506 
2507 		poller->device = device;
2508 		poller->group = rgroup;
2509 
2510 		TAILQ_INIT(&poller->qpairs);
2511 
2512 		poller->cq = ibv_create_cq(device->context, DEFAULT_NVMF_RDMA_CQ_SIZE, poller, NULL, 0);
2513 		if (!poller->cq) {
2514 			SPDK_ERRLOG("Unable to create completion queue\n");
2515 			free(poller);
2516 			goto err_exit;
2517 		}
2518 		poller->num_cqe = DEFAULT_NVMF_RDMA_CQ_SIZE;
2519 
2520 		TAILQ_INSERT_TAIL(&rgroup->pollers, poller, link);
2521 	}
2522 
2523 	pthread_mutex_unlock(&rtransport->lock);
2524 	return &rgroup->group;
2525 
2526 err_exit:
2527 	TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tpoller) {
2528 		TAILQ_REMOVE(&rgroup->pollers, poller, link);
2529 		if (poller->cq) {
2530 			ibv_destroy_cq(poller->cq);
2531 		}
2532 		free(poller);
2533 	}
2534 
2535 	free(rgroup);
2536 	pthread_mutex_unlock(&rtransport->lock);
2537 	return NULL;
2538 }
2539 
2540 static void
2541 spdk_nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group)
2542 {
2543 	struct spdk_nvmf_rdma_poll_group	*rgroup;
2544 	struct spdk_nvmf_rdma_poller		*poller, *tmp;
2545 	struct spdk_nvmf_rdma_qpair		*qpair, *tmp_qpair;
2546 
2547 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
2548 
2549 	if (!rgroup) {
2550 		return;
2551 	}
2552 
2553 	TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tmp) {
2554 		TAILQ_REMOVE(&rgroup->pollers, poller, link);
2555 
2556 		if (poller->cq) {
2557 			ibv_destroy_cq(poller->cq);
2558 		}
2559 		TAILQ_FOREACH_SAFE(qpair, &poller->qpairs, link, tmp_qpair) {
2560 			spdk_nvmf_rdma_qpair_destroy(qpair);
2561 		}
2562 
2563 		free(poller);
2564 	}
2565 
2566 	if (!TAILQ_EMPTY(&rgroup->pending_data_buf_queue)) {
2567 		SPDK_ERRLOG("Pending I/O list wasn't empty on poll group destruction\n");
2568 	}
2569 
2570 	free(rgroup);
2571 }
2572 
2573 static void
2574 spdk_nvmf_rdma_qpair_reject_connection(struct spdk_nvmf_rdma_qpair *rqpair)
2575 {
2576 	spdk_nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES);
2577 	spdk_nvmf_rdma_qpair_destroy(rqpair);
2578 }
2579 
2580 static int
2581 spdk_nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group,
2582 			      struct spdk_nvmf_qpair *qpair)
2583 {
2584 	struct spdk_nvmf_rdma_poll_group	*rgroup;
2585 	struct spdk_nvmf_rdma_qpair		*rqpair;
2586 	struct spdk_nvmf_rdma_device		*device;
2587 	struct spdk_nvmf_rdma_poller		*poller;
2588 	int					rc;
2589 
2590 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
2591 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
2592 
2593 	device = rqpair->port->device;
2594 
2595 	TAILQ_FOREACH(poller, &rgroup->pollers, link) {
2596 		if (poller->device == device) {
2597 			break;
2598 		}
2599 	}
2600 
2601 	if (!poller) {
2602 		SPDK_ERRLOG("No poller found for device.\n");
2603 		return -1;
2604 	}
2605 
2606 	TAILQ_INSERT_TAIL(&poller->qpairs, rqpair, link);
2607 	rqpair->poller = poller;
2608 
2609 	rc = spdk_nvmf_rdma_qpair_initialize(qpair);
2610 	if (rc < 0) {
2611 		SPDK_ERRLOG("Failed to initialize nvmf_rdma_qpair with qpair=%p\n", qpair);
2612 		return -1;
2613 	}
2614 
2615 	rc = spdk_nvmf_rdma_event_accept(rqpair->cm_id, rqpair);
2616 	if (rc) {
2617 		/* Try to reject, but we probably can't */
2618 		spdk_nvmf_rdma_qpair_reject_connection(rqpair);
2619 		return -1;
2620 	}
2621 
2622 	spdk_nvmf_rdma_update_ibv_state(rqpair);
2623 
2624 	return 0;
2625 }
2626 
2627 static int
2628 spdk_nvmf_rdma_request_free(struct spdk_nvmf_request *req)
2629 {
2630 	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req);
2631 	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
2632 			struct spdk_nvmf_rdma_transport, transport);
2633 
2634 	nvmf_rdma_request_free(rdma_req, rtransport);
2635 	return 0;
2636 }
2637 
2638 static int
2639 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
2640 {
2641 	struct spdk_nvmf_rdma_transport	*rtransport = SPDK_CONTAINEROF(req->qpair->transport,
2642 			struct spdk_nvmf_rdma_transport, transport);
2643 	struct spdk_nvmf_rdma_request	*rdma_req = SPDK_CONTAINEROF(req,
2644 			struct spdk_nvmf_rdma_request, req);
2645 	struct spdk_nvmf_rdma_qpair     *rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair,
2646 			struct spdk_nvmf_rdma_qpair, qpair);
2647 
2648 	if (rqpair->ibv_attr.qp_state != IBV_QPS_ERR) {
2649 		/* The connection is alive, so process the request as normal */
2650 		rdma_req->state = RDMA_REQUEST_STATE_EXECUTED;
2651 	} else {
2652 		/* The connection is dead. Move the request directly to the completed state. */
2653 		rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
2654 	}
2655 
2656 	spdk_nvmf_rdma_request_process(rtransport, rdma_req);
2657 
2658 	return 0;
2659 }
2660 
2661 static void
2662 spdk_nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair)
2663 {
2664 	struct spdk_nvmf_rdma_qpair *rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
2665 	struct ibv_recv_wr recv_wr = {};
2666 	struct ibv_recv_wr *bad_recv_wr;
2667 	struct ibv_send_wr send_wr = {};
2668 	struct ibv_send_wr *bad_send_wr;
2669 	int rc;
2670 
2671 	if (rqpair->disconnect_flags & RDMA_QP_DISCONNECTING) {
2672 		return;
2673 	}
2674 
2675 	rqpair->disconnect_flags |= RDMA_QP_DISCONNECTING;
2676 
2677 	/* This happens only when the qpair is disconnected before
2678 	 * it is added to the poll group. Since there is no poll group,
2679 	 * the RDMA qp has not been initialized yet and the RDMA CM
2680 	 * event has not yet been acknowledged, so we need to reject it.
2681 	 */
2682 	if (rqpair->qpair.state == SPDK_NVMF_QPAIR_UNINITIALIZED) {
2683 		spdk_nvmf_rdma_qpair_reject_connection(rqpair);
2684 		return;
2685 	}
2686 
2687 	if (rqpair->ibv_attr.qp_state != IBV_QPS_ERR) {
2688 		spdk_nvmf_rdma_set_ibv_state(rqpair, IBV_QPS_ERR);
2689 	}
2690 
2691 	rqpair->drain_recv_wr.type = RDMA_WR_TYPE_DRAIN_RECV;
2692 	recv_wr.wr_id = (uintptr_t)&rqpair->drain_recv_wr;
2693 	rc = ibv_post_recv(rqpair->cm_id->qp, &recv_wr, &bad_recv_wr);
2694 	if (rc) {
2695 		SPDK_ERRLOG("Failed to post dummy receive WR, errno %d\n", errno);
2696 		assert(false);
2697 		return;
2698 	}
2699 
2700 	rqpair->drain_send_wr.type = RDMA_WR_TYPE_DRAIN_SEND;
2701 	send_wr.wr_id = (uintptr_t)&rqpair->drain_send_wr;
2702 	send_wr.opcode = IBV_WR_SEND;
2703 	rc = ibv_post_send(rqpair->cm_id->qp, &send_wr, &bad_send_wr);
2704 	if (rc) {
2705 		SPDK_ERRLOG("Failed to post dummy send WR, errno %d\n", errno);
2706 		assert(false);
2707 		return;
2708 	}
2709 	rqpair->current_send_depth++;
2710 }
2711 
2712 #ifdef DEBUG
2713 static int
2714 spdk_nvmf_rdma_req_is_completing(struct spdk_nvmf_rdma_request *rdma_req)
2715 {
2716 	return rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST ||
2717 	       rdma_req->state == RDMA_REQUEST_STATE_COMPLETING;
2718 }
2719 #endif
2720 
2721 static int
2722 spdk_nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport,
2723 			   struct spdk_nvmf_rdma_poller *rpoller)
2724 {
2725 	struct ibv_wc wc[32];
2726 	struct spdk_nvmf_rdma_wr	*rdma_wr;
2727 	struct spdk_nvmf_rdma_request	*rdma_req;
2728 	struct spdk_nvmf_rdma_recv	*rdma_recv;
2729 	struct spdk_nvmf_rdma_qpair	*rqpair;
2730 	int reaped, i;
2731 	int count = 0;
2732 	bool error = false;
2733 
2734 	/* Poll for completing operations. */
2735 	reaped = ibv_poll_cq(rpoller->cq, 32, wc);
2736 	if (reaped < 0) {
2737 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
2738 			    errno, spdk_strerror(errno));
2739 		return -1;
2740 	}
2741 
2742 	for (i = 0; i < reaped; i++) {
2743 
2744 		rdma_wr = (struct spdk_nvmf_rdma_wr *)wc[i].wr_id;
2745 
2746 		/* Handle error conditions */
2747 		if (wc[i].status) {
2748 			SPDK_DEBUGLOG(SPDK_LOG_RDMA, "CQ error on CQ %p, Request 0x%lu (%d): %s\n",
2749 				      rpoller->cq, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
2750 
2751 			error = true;
2752 
2753 			switch (rdma_wr->type) {
2754 			case RDMA_WR_TYPE_SEND:
2755 				rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, rsp.rdma_wr);
2756 				rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
2757 
2758 				SPDK_ERRLOG("data=%p length=%u\n", rdma_req->req.data, rdma_req->req.length);
2759 				/* We're going to attempt an error recovery, so force the request into
2760 				 * the completed state. */
2761 				rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
2762 				rqpair->current_send_depth--;
2763 
2764 				assert(rdma_req->num_outstanding_data_wr == 0);
2765 				spdk_nvmf_rdma_request_process(rtransport, rdma_req);
2766 				break;
2767 			case RDMA_WR_TYPE_RECV:
2768 				rdma_recv = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_recv, rdma_wr);
2769 				rqpair = rdma_recv->qpair;
2770 
2771 				/* Dump this into the incoming queue. This gets cleaned up when
2772 				 * the queue pair disconnects or recovers. */
2773 				STAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
2774 				rqpair->current_recv_depth++;
2775 
2776 				/* Don't worry about responding to recv overflow, we are disconnecting anyways */
2777 				break;
2778 			case RDMA_WR_TYPE_DATA:
2779 				/* If the data transfer fails still force the queue into the error state,
2780 				 * if we were performing an RDMA_READ, we need to force the request into a
2781 				 * completed state since it wasn't linked to a send. However, in the RDMA_WRITE
2782 				 * case, we should wait for the SEND to complete. */
2783 				rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, data.rdma_wr);
2784 				rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
2785 
2786 				SPDK_ERRLOG("data=%p length=%u\n", rdma_req->req.data, rdma_req->req.length);
2787 				rdma_req->num_outstanding_data_wr--;
2788 				if (rdma_req->data.wr.opcode == IBV_WR_RDMA_READ) {
2789 					assert(rdma_req->num_outstanding_data_wr > 0);
2790 					rqpair->current_read_depth--;
2791 					if (rdma_req->num_outstanding_data_wr == 0) {
2792 						rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
2793 					}
2794 				}
2795 				rqpair->current_send_depth--;
2796 				break;
2797 			case RDMA_WR_TYPE_DRAIN_RECV:
2798 				rqpair = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_qpair, drain_recv_wr);
2799 				assert(rqpair->disconnect_flags & RDMA_QP_DISCONNECTING);
2800 				SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Drained QP RECV %u (%p)\n", rqpair->qpair.qid, rqpair);
2801 				rqpair->disconnect_flags |= RDMA_QP_RECV_DRAINED;
2802 				assert(rqpair->current_recv_depth == rqpair->max_queue_depth);
2803 				/* Don't worry about responding to recv overflow, we are disconnecting anyways */
2804 				if (rqpair->disconnect_flags & RDMA_QP_SEND_DRAINED) {
2805 					spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, true);
2806 					spdk_nvmf_rdma_qpair_destroy(rqpair);
2807 				}
2808 				/* Continue so that this does not trigger the disconnect path below. */
2809 				continue;
2810 			case RDMA_WR_TYPE_DRAIN_SEND:
2811 				rqpair = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_qpair, drain_send_wr);
2812 				assert(rqpair->disconnect_flags & RDMA_QP_DISCONNECTING);
2813 				SPDK_DEBUGLOG(SPDK_LOG_RDMA, "Drained QP SEND %u (%p)\n", rqpair->qpair.qid, rqpair);
2814 				rqpair->disconnect_flags |= RDMA_QP_SEND_DRAINED;
2815 				rqpair->current_send_depth--;
2816 				if (rqpair->disconnect_flags & RDMA_QP_RECV_DRAINED) {
2817 					spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, true);
2818 					spdk_nvmf_rdma_qpair_destroy(rqpair);
2819 				}
2820 				/* Continue so that this does not trigger the disconnect path below. */
2821 				continue;
2822 			default:
2823 				SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
2824 				continue;
2825 			}
2826 
2827 			if (rqpair->qpair.state == SPDK_NVMF_QPAIR_ACTIVE) {
2828 				/* Disconnect the connection. */
2829 				spdk_nvmf_rdma_start_disconnect(rqpair);
2830 			}
2831 			continue;
2832 		}
2833 
2834 		switch (wc[i].opcode) {
2835 		case IBV_WC_SEND:
2836 			assert(rdma_wr->type == RDMA_WR_TYPE_SEND);
2837 			rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, rsp.rdma_wr);
2838 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
2839 
2840 			assert(spdk_nvmf_rdma_req_is_completing(rdma_req));
2841 
2842 			rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
2843 			rqpair->current_send_depth--;
2844 			spdk_nvmf_rdma_request_process(rtransport, rdma_req);
2845 
2846 			count++;
2847 
2848 			assert(rdma_req->num_outstanding_data_wr == 0);
2849 			/* Try to process other queued requests */
2850 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);
2851 			break;
2852 
2853 		case IBV_WC_RDMA_WRITE:
2854 			assert(rdma_wr->type == RDMA_WR_TYPE_DATA);
2855 			rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, data.rdma_wr);
2856 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
2857 			rqpair->current_send_depth--;
2858 			rdma_req->num_outstanding_data_wr--;
2859 
2860 			/* Try to process other queued requests */
2861 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);
2862 			break;
2863 
2864 		case IBV_WC_RDMA_READ:
2865 			assert(rdma_wr->type == RDMA_WR_TYPE_DATA);
2866 			rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, data.rdma_wr);
2867 			rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair);
2868 			rqpair->current_send_depth--;
2869 
2870 			assert(rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER);
2871 			/* wait for all outstanding reads associated with the same rdma_req to complete before proceeding. */
2872 			assert(rdma_req->num_outstanding_data_wr > 0);
2873 			rqpair->current_read_depth--;
2874 			rdma_req->num_outstanding_data_wr--;
2875 			if (rdma_req->num_outstanding_data_wr == 0) {
2876 				rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE;
2877 				spdk_nvmf_rdma_request_process(rtransport, rdma_req);
2878 			}
2879 
2880 			/* Try to process other queued requests */
2881 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);
2882 			break;
2883 
2884 		case IBV_WC_RECV:
2885 			assert(rdma_wr->type == RDMA_WR_TYPE_RECV);
2886 			rdma_recv = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_recv, rdma_wr);
2887 			rqpair = rdma_recv->qpair;
2888 			/* The qpair should not send more requests than are allowed per qpair. */
2889 			if (rqpair->current_recv_depth >= rqpair->max_queue_depth) {
2890 				spdk_nvmf_rdma_start_disconnect(rqpair);
2891 			} else {
2892 				rqpair->current_recv_depth++;
2893 			}
2894 			STAILQ_INSERT_TAIL(&rqpair->incoming_queue, rdma_recv, link);
2895 			/* Try to process other queued requests */
2896 			spdk_nvmf_rdma_qpair_process_pending(rtransport, rqpair, false);
2897 			break;
2898 
2899 		default:
2900 			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
2901 			continue;
2902 		}
2903 	}
2904 
2905 	if (error == true) {
2906 		return -1;
2907 	}
2908 
2909 	return count;
2910 }
2911 
2912 static int
2913 spdk_nvmf_rdma_poll_group_poll(struct spdk_nvmf_transport_poll_group *group)
2914 {
2915 	struct spdk_nvmf_rdma_transport *rtransport;
2916 	struct spdk_nvmf_rdma_poll_group *rgroup;
2917 	struct spdk_nvmf_rdma_poller	*rpoller;
2918 	int				count, rc;
2919 
2920 	rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport);
2921 	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);
2922 
2923 	count = 0;
2924 	TAILQ_FOREACH(rpoller, &rgroup->pollers, link) {
2925 		rc = spdk_nvmf_rdma_poller_poll(rtransport, rpoller);
2926 		if (rc < 0) {
2927 			return rc;
2928 		}
2929 		count += rc;
2930 	}
2931 
2932 	return count;
2933 }
2934 
2935 static int
2936 spdk_nvmf_rdma_trid_from_cm_id(struct rdma_cm_id *id,
2937 			       struct spdk_nvme_transport_id *trid,
2938 			       bool peer)
2939 {
2940 	struct sockaddr *saddr;
2941 	uint16_t port;
2942 
2943 	trid->trtype = SPDK_NVME_TRANSPORT_RDMA;
2944 
2945 	if (peer) {
2946 		saddr = rdma_get_peer_addr(id);
2947 	} else {
2948 		saddr = rdma_get_local_addr(id);
2949 	}
2950 	switch (saddr->sa_family) {
2951 	case AF_INET: {
2952 		struct sockaddr_in *saddr_in = (struct sockaddr_in *)saddr;
2953 
2954 		trid->adrfam = SPDK_NVMF_ADRFAM_IPV4;
2955 		inet_ntop(AF_INET, &saddr_in->sin_addr,
2956 			  trid->traddr, sizeof(trid->traddr));
2957 		if (peer) {
2958 			port = ntohs(rdma_get_dst_port(id));
2959 		} else {
2960 			port = ntohs(rdma_get_src_port(id));
2961 		}
2962 		snprintf(trid->trsvcid, sizeof(trid->trsvcid), "%u", port);
2963 		break;
2964 	}
2965 	case AF_INET6: {
2966 		struct sockaddr_in6 *saddr_in = (struct sockaddr_in6 *)saddr;
2967 		trid->adrfam = SPDK_NVMF_ADRFAM_IPV6;
2968 		inet_ntop(AF_INET6, &saddr_in->sin6_addr,
2969 			  trid->traddr, sizeof(trid->traddr));
2970 		if (peer) {
2971 			port = ntohs(rdma_get_dst_port(id));
2972 		} else {
2973 			port = ntohs(rdma_get_src_port(id));
2974 		}
2975 		snprintf(trid->trsvcid, sizeof(trid->trsvcid), "%u", port);
2976 		break;
2977 	}
2978 	default:
2979 		return -1;
2980 
2981 	}
2982 
2983 	return 0;
2984 }
2985 
2986 static int
2987 spdk_nvmf_rdma_qpair_get_peer_trid(struct spdk_nvmf_qpair *qpair,
2988 				   struct spdk_nvme_transport_id *trid)
2989 {
2990 	struct spdk_nvmf_rdma_qpair	*rqpair;
2991 
2992 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
2993 
2994 	return spdk_nvmf_rdma_trid_from_cm_id(rqpair->cm_id, trid, true);
2995 }
2996 
2997 static int
2998 spdk_nvmf_rdma_qpair_get_local_trid(struct spdk_nvmf_qpair *qpair,
2999 				    struct spdk_nvme_transport_id *trid)
3000 {
3001 	struct spdk_nvmf_rdma_qpair	*rqpair;
3002 
3003 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
3004 
3005 	return spdk_nvmf_rdma_trid_from_cm_id(rqpair->cm_id, trid, false);
3006 }
3007 
3008 static int
3009 spdk_nvmf_rdma_qpair_get_listen_trid(struct spdk_nvmf_qpair *qpair,
3010 				     struct spdk_nvme_transport_id *trid)
3011 {
3012 	struct spdk_nvmf_rdma_qpair	*rqpair;
3013 
3014 	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
3015 
3016 	return spdk_nvmf_rdma_trid_from_cm_id(rqpair->listen_id, trid, false);
3017 }
3018 
3019 void
3020 spdk_nvmf_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
3021 {
3022 	g_nvmf_hooks = *hooks;
3023 }
3024 
3025 const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
3026 	.type = SPDK_NVME_TRANSPORT_RDMA,
3027 	.opts_init = spdk_nvmf_rdma_opts_init,
3028 	.create = spdk_nvmf_rdma_create,
3029 	.destroy = spdk_nvmf_rdma_destroy,
3030 
3031 	.listen = spdk_nvmf_rdma_listen,
3032 	.stop_listen = spdk_nvmf_rdma_stop_listen,
3033 	.accept = spdk_nvmf_rdma_accept,
3034 
3035 	.listener_discover = spdk_nvmf_rdma_discover,
3036 
3037 	.poll_group_create = spdk_nvmf_rdma_poll_group_create,
3038 	.poll_group_destroy = spdk_nvmf_rdma_poll_group_destroy,
3039 	.poll_group_add = spdk_nvmf_rdma_poll_group_add,
3040 	.poll_group_poll = spdk_nvmf_rdma_poll_group_poll,
3041 
3042 	.req_free = spdk_nvmf_rdma_request_free,
3043 	.req_complete = spdk_nvmf_rdma_request_complete,
3044 
3045 	.qpair_fini = spdk_nvmf_rdma_close_qpair,
3046 	.qpair_is_idle = spdk_nvmf_rdma_qpair_is_idle,
3047 	.qpair_get_peer_trid = spdk_nvmf_rdma_qpair_get_peer_trid,
3048 	.qpair_get_local_trid = spdk_nvmf_rdma_qpair_get_local_trid,
3049 	.qpair_get_listen_trid = spdk_nvmf_rdma_qpair_get_listen_trid,
3050 
3051 };
3052 
3053 SPDK_LOG_REGISTER_COMPONENT("rdma", SPDK_LOG_RDMA)
3054