xref: /spdk/lib/nvmf/rdma.c (revision 04c48172b9879a8824de83c842087d871c433d12)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <arpa/inet.h>
35 #include <fcntl.h>
36 #include <errno.h>
37 #include <infiniband/verbs.h>
38 #include <rdma/rdma_cma.h>
39 #include <rdma/rdma_verbs.h>
40 #include <unistd.h>
41 #include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <pthread.h>
43 
44 #include "nvmf_internal.h"
45 #include "request.h"
46 #include "session.h"
47 #include "subsystem.h"
48 #include "transport.h"
49 #include "spdk/assert.h"
50 #include "spdk/log.h"
51 #include "spdk/nvmf.h"
52 #include "spdk/nvmf_spec.h"
53 #include "spdk/string.h"
54 #include "spdk/trace.h"
55 
/*
 * RDMA Connection Resource Defaults
 */
59 #define NVMF_DEFAULT_TX_SGE		1
60 #define NVMF_DEFAULT_RX_SGE		2
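
/*
 * NVMF_DEFAULT_RX_SGE is 2 because each posted receive uses two SGEs: one for
 * the 64-byte command capsule and one for the request's in-capsule data buffer
 * (see nvmf_post_rdma_recv()). Sends and RDMA READ/WRITE operations use a
 * single SGE.
 */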
61 
62 struct spdk_nvmf_rdma_buf {
63 	SLIST_ENTRY(spdk_nvmf_rdma_buf) link;
64 };
65 
66 struct spdk_nvmf_rdma_request {
67 	struct spdk_nvmf_request		req;
68 
69 	/* In Capsule data buffer */
70 	uint8_t					*buf;
71 
72 	union {
73 		struct ibv_recv_wr 		recv;
74 		struct ibv_send_wr		send;
75 	} wr;
76 	struct ibv_sge 				sg_list[2];
77 
78 	TAILQ_ENTRY(spdk_nvmf_rdma_request)	link;
79 };
80 
81 struct spdk_nvmf_rdma_conn {
82 	struct spdk_nvmf_conn			conn;
83 
84 	struct rdma_cm_id			*cm_id;
85 	struct ibv_cq				*cq;
86 
87 	/* The maximum number of I/O outstanding on this connection at one time */
88 	uint16_t				max_queue_depth;
89 
90 	/* The maximum number of active RDMA READ and WRITE operations at one time */
91 	uint16_t				max_rw_depth;
92 
93 	/* The current number of I/O outstanding on this connection. This number
94 	 * includes all I/O from the time the capsule is first received until it is
95 	 * completed.
96 	 */
97 	uint16_t				cur_queue_depth;
98 
99 	/* The number of RDMA READ and WRITE requests that are outstanding */
100 	uint16_t				cur_rdma_rw_depth;
101 
102 	/* Requests that are waiting to obtain a data buffer */
103 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_data_buf_queue;
104 
105 	/* Requests that are waiting to perform an RDMA READ or WRITE */
106 	TAILQ_HEAD(, spdk_nvmf_rdma_request)	pending_rdma_rw_queue;
107 
108 	/* Array of size "max_queue_depth" containing RDMA requests. */
109 	struct spdk_nvmf_rdma_request		*reqs;
110 
111 	/* Array of size "max_queue_depth" containing 64 byte capsules
112 	 * used for receive.
113 	 */
114 	union nvmf_h2c_msg			*cmds;
115 	struct ibv_mr				*cmds_mr;
116 
	/* Array of size "max_queue_depth" containing 16 byte completions
	 * to be sent back to the host.
	 */
120 	union nvmf_c2h_msg			*cpls;
121 	struct ibv_mr				*cpls_mr;
122 
123 	/* Array of size "max_queue_depth * InCapsuleDataSize" containing
124 	 * buffers to be used for in capsule data.
125 	 */
126 	void					*bufs;
127 	struct ibv_mr				*bufs_mr;
128 
129 	TAILQ_ENTRY(spdk_nvmf_rdma_conn)	link;
130 };
131 
132 /* List of RDMA connections that have not yet received a CONNECT capsule */
133 static TAILQ_HEAD(, spdk_nvmf_rdma_conn) g_pending_conns = TAILQ_HEAD_INITIALIZER(g_pending_conns);
134 
135 struct spdk_nvmf_rdma_session {
136 	struct spdk_nvmf_session		session;
137 
138 	SLIST_HEAD(, spdk_nvmf_rdma_buf)	data_buf_pool;
139 
140 	struct ibv_context			*verbs;
141 
142 	uint8_t					*buf;
143 	struct ibv_mr				*buf_mr;
144 };
145 
146 struct spdk_nvmf_rdma_listen_addr {
147 	char					*traddr;
148 	char					*trsvcid;
149 	struct rdma_cm_id			*id;
150 	struct ibv_device_attr 			attr;
151 	struct ibv_comp_channel			*comp_channel;
152 	TAILQ_ENTRY(spdk_nvmf_rdma_listen_addr)	link;
153 };
154 
155 struct spdk_nvmf_rdma {
156 	struct rdma_event_channel	*event_channel;
157 
158 	pthread_mutex_t 		lock;
159 
160 	uint16_t 			max_queue_depth;
161 	uint32_t 			max_io_size;
162 	uint32_t 			in_capsule_data_size;
163 
164 	TAILQ_HEAD(, spdk_nvmf_rdma_listen_addr)	listen_addrs;
165 };
166 
167 static struct spdk_nvmf_rdma g_rdma = {
168 	.lock = PTHREAD_MUTEX_INITIALIZER,
169 	.listen_addrs = TAILQ_HEAD_INITIALIZER(g_rdma.listen_addrs),
170 };
171 
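/*
 * Convert from the generic structures embedded in the transport-specific
 * wrappers back to those wrappers (the usual offsetof()-based container_of
 * pattern). For example, given a struct spdk_nvmf_rdma_conn *rdma_conn:
 *
 *     get_rdma_conn(&rdma_conn->conn) == rdma_conn
 */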
172 static inline struct spdk_nvmf_rdma_conn *
173 get_rdma_conn(struct spdk_nvmf_conn *conn)
174 {
175 	return (struct spdk_nvmf_rdma_conn *)((uintptr_t)conn - offsetof(struct spdk_nvmf_rdma_conn, conn));
176 }
177 
178 static inline struct spdk_nvmf_rdma_request *
179 get_rdma_req(struct spdk_nvmf_request *req)
180 {
181 	return (struct spdk_nvmf_rdma_request *)((uintptr_t)req - offsetof(struct spdk_nvmf_rdma_request,
182 			req));
183 }
184 
185 static inline struct spdk_nvmf_rdma_session *
186 get_rdma_sess(struct spdk_nvmf_session *sess)
187 {
188 	return (struct spdk_nvmf_rdma_session *)((uintptr_t)sess - offsetof(struct spdk_nvmf_rdma_session,
189 			session));
190 }
191 
192 static int nvmf_post_rdma_recv(struct spdk_nvmf_request *req);
193 
194 static void
195 spdk_nvmf_rdma_conn_destroy(struct spdk_nvmf_rdma_conn *rdma_conn)
196 {
197 	if (rdma_conn->cmds_mr) {
198 		ibv_dereg_mr(rdma_conn->cmds_mr);
199 	}
200 
201 	if (rdma_conn->cpls_mr) {
202 		ibv_dereg_mr(rdma_conn->cpls_mr);
203 	}
204 
205 	if (rdma_conn->bufs_mr) {
206 		ibv_dereg_mr(rdma_conn->bufs_mr);
207 	}
208 
209 	if (rdma_conn->cm_id) {
210 		rdma_destroy_qp(rdma_conn->cm_id);
211 		rdma_destroy_id(rdma_conn->cm_id);
212 	}
213 
214 	if (rdma_conn->cq) {
215 		ibv_destroy_cq(rdma_conn->cq);
216 	}
217 
218 	/* Free all memory */
219 	spdk_free(rdma_conn->cmds);
220 	spdk_free(rdma_conn->cpls);
221 	spdk_free(rdma_conn->bufs);
222 	free(rdma_conn->reqs);
223 	free(rdma_conn);
224 }
225 
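/*
 * Allocate and initialize all per-connection resources: a completion queue
 * sized for 2 * max_queue_depth entries, an RC queue pair, the command,
 * completion, and in-capsule data arrays (each registered as a memory region),
 * and one pre-posted receive per queue slot. On any failure the partially
 * constructed connection is torn down and NULL is returned.
 */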
226 static struct spdk_nvmf_rdma_conn *
227 spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *channel,
228 			   uint16_t max_queue_depth, uint16_t max_rw_depth)
229 {
230 	struct spdk_nvmf_rdma_conn	*rdma_conn;
231 	struct spdk_nvmf_conn		*conn;
232 	int				rc, i;
233 	struct ibv_qp_init_attr		attr;
234 	struct spdk_nvmf_rdma_request	*rdma_req;
235 
236 	rdma_conn = calloc(1, sizeof(struct spdk_nvmf_rdma_conn));
237 	if (rdma_conn == NULL) {
238 		SPDK_ERRLOG("Could not allocate new connection.\n");
239 		return NULL;
240 	}
241 
242 	rdma_conn->max_queue_depth = max_queue_depth;
243 	rdma_conn->max_rw_depth = max_rw_depth;
244 	TAILQ_INIT(&rdma_conn->pending_data_buf_queue);
245 	TAILQ_INIT(&rdma_conn->pending_rdma_rw_queue);
246 
247 	rdma_conn->cq = ibv_create_cq(id->verbs, max_queue_depth * 2, rdma_conn, channel, 0);
248 	if (!rdma_conn->cq) {
249 		SPDK_ERRLOG("Unable to create completion queue\n");
250 		SPDK_ERRLOG("Completion Channel: %p Id: %p Verbs: %p\n", channel, id, id->verbs);
251 		SPDK_ERRLOG("Errno %d: %s\n", errno, strerror(errno));
252 		rdma_destroy_id(id);
253 		spdk_nvmf_rdma_conn_destroy(rdma_conn);
254 		return NULL;
255 	}
256 
257 	memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
258 	attr.qp_type		= IBV_QPT_RC;
259 	attr.send_cq		= rdma_conn->cq;
260 	attr.recv_cq		= rdma_conn->cq;
261 	attr.cap.max_send_wr	= max_queue_depth; /* SEND, READ, and WRITE operations */
262 	attr.cap.max_recv_wr	= max_queue_depth; /* RECV operations */
263 	attr.cap.max_send_sge	= NVMF_DEFAULT_TX_SGE;
264 	attr.cap.max_recv_sge	= NVMF_DEFAULT_RX_SGE;
265 
266 	rc = rdma_create_qp(id, NULL, &attr);
267 	if (rc) {
268 		SPDK_ERRLOG("rdma_create_qp failed\n");
269 		SPDK_ERRLOG("Errno %d: %s\n", errno, strerror(errno));
270 		rdma_destroy_id(id);
271 		spdk_nvmf_rdma_conn_destroy(rdma_conn);
272 		return NULL;
273 	}
274 
275 	conn = &rdma_conn->conn;
276 	conn->transport = &spdk_nvmf_transport_rdma;
277 	id->context = conn;
278 	rdma_conn->cm_id = id;
279 
280 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "New RDMA Connection: %p\n", conn);
281 
282 	rdma_conn->reqs = calloc(max_queue_depth, sizeof(*rdma_conn->reqs));
283 	rdma_conn->cmds = spdk_zmalloc(max_queue_depth * sizeof(*rdma_conn->cmds),
284 				       0x1000, NULL);
285 	rdma_conn->cpls = spdk_zmalloc(max_queue_depth * sizeof(*rdma_conn->cpls),
286 				       0x1000, NULL);
287 	rdma_conn->bufs = spdk_zmalloc(max_queue_depth * g_rdma.in_capsule_data_size,
288 				       0x1000, NULL);
289 	if (!rdma_conn->reqs || !rdma_conn->cmds || !rdma_conn->cpls || !rdma_conn->bufs) {
290 		SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
291 		spdk_nvmf_rdma_conn_destroy(rdma_conn);
292 		return NULL;
293 	}
294 
295 	rdma_conn->cmds_mr = ibv_reg_mr(id->pd, rdma_conn->cmds,
296 					max_queue_depth * sizeof(*rdma_conn->cmds),
297 					IBV_ACCESS_LOCAL_WRITE);
298 	rdma_conn->cpls_mr = ibv_reg_mr(id->pd, rdma_conn->cpls,
299 					max_queue_depth * sizeof(*rdma_conn->cpls),
300 					0);
301 	rdma_conn->bufs_mr = ibv_reg_mr(id->pd, rdma_conn->bufs,
302 					max_queue_depth * g_rdma.in_capsule_data_size,
303 					IBV_ACCESS_LOCAL_WRITE |
304 					IBV_ACCESS_REMOTE_WRITE);
305 	if (!rdma_conn->cmds_mr || !rdma_conn->cpls_mr || !rdma_conn->bufs_mr) {
306 		SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
307 		spdk_nvmf_rdma_conn_destroy(rdma_conn);
308 		return NULL;
309 	}
310 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
311 		      rdma_conn->cmds, max_queue_depth * sizeof(*rdma_conn->cmds), rdma_conn->cmds_mr->lkey);
312 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
313 		      rdma_conn->cpls, max_queue_depth * sizeof(*rdma_conn->cpls), rdma_conn->cpls_mr->lkey);
314 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
315 		      rdma_conn->bufs, max_queue_depth * g_rdma.in_capsule_data_size, rdma_conn->bufs_mr->lkey);
316 
317 	for (i = 0; i < max_queue_depth; i++) {
318 		rdma_req = &rdma_conn->reqs[i];
319 		rdma_req->buf = (void *)((uintptr_t)rdma_conn->bufs + (i * g_rdma.in_capsule_data_size));
320 		rdma_req->req.cmd = &rdma_conn->cmds[i];
321 		rdma_req->req.rsp = &rdma_conn->cpls[i];
322 		rdma_req->req.conn = &rdma_conn->conn;
323 
324 		if (nvmf_post_rdma_recv(&rdma_req->req)) {
325 			SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
326 			spdk_nvmf_rdma_conn_destroy(rdma_conn);
327 			return NULL;
328 		}
329 	}
330 
331 	return rdma_conn;
332 }
333 
334 static inline void
335 nvmf_trace_ibv_sge(struct ibv_sge *sg_list)
336 {
337 	if (sg_list) {
338 		SPDK_TRACELOG(SPDK_TRACE_RDMA, "local addr %p length 0x%x lkey 0x%x\n",
339 			      (void *)sg_list->addr, sg_list->length, sg_list->lkey);
340 	}
341 }
342 
343 static inline void
344 nvmf_ibv_send_wr_init(struct ibv_send_wr *wr,
345 		      struct spdk_nvmf_request *req,
346 		      struct ibv_sge *sg_list,
347 		      enum ibv_wr_opcode opcode,
348 		      int send_flags)
349 {
350 	struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
351 	assert(wr != NULL);
352 	assert(sg_list != NULL);
353 
354 	memset(wr, 0, sizeof(*wr));
355 
356 	wr->wr_id = (uint64_t)rdma_req;
357 	wr->opcode = opcode;
358 	wr->send_flags = send_flags;
359 	wr->sg_list = sg_list;
360 	wr->num_sge = 1;
361 }
362 
363 static inline void
364 nvmf_ibv_send_wr_set_rkey(struct ibv_send_wr *wr, struct spdk_nvmf_request *req)
365 {
366 	struct spdk_nvme_sgl_descriptor *sgl = &req->cmd->nvme_cmd.dptr.sgl1;
367 
368 	assert(sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK);
369 
370 	wr->wr.rdma.rkey = sgl->keyed.key;
371 	wr->wr.rdma.remote_addr = sgl->address;
372 
373 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "rkey %x remote_addr %p\n",
374 		      wr->wr.rdma.rkey, (void *)wr->wr.rdma.remote_addr);
375 }
376 
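/*
 * Post an RDMA READ that pulls the data for a host-to-controller transfer into
 * the local buffer at req->data. The remote address and rkey come from the
 * command's keyed SGL; the local lkey is the session-wide large buffer pool
 * for transfers larger than in_capsule_data_size, otherwise the connection's
 * in-capsule buffer region. nvmf_post_rdma_write() mirrors this for
 * controller-to-host transfers.
 */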
377 static int
378 nvmf_post_rdma_read(struct spdk_nvmf_request *req)
379 {
380 	struct ibv_send_wr	*bad_wr = NULL;
381 	struct spdk_nvmf_conn 	*conn = req->conn;
382 	struct spdk_nvmf_rdma_request 	*rdma_req = get_rdma_req(req);
383 	struct spdk_nvmf_rdma_conn 	*rdma_conn = get_rdma_conn(conn);
384 	struct spdk_nvmf_rdma_session 	*rdma_sess;
385 	int 			rc;
386 
387 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, conn);
388 
389 	rdma_req->sg_list[0].addr = (uintptr_t)req->data;
390 	if (req->length > g_rdma.in_capsule_data_size) {
391 		rdma_sess = get_rdma_sess(conn->sess);
392 		rdma_req->sg_list[0].lkey = rdma_sess->buf_mr->lkey;
393 	} else {
394 		rdma_req->sg_list[0].lkey = rdma_conn->bufs_mr->lkey;
395 	}
396 	rdma_req->sg_list[0].length = req->length;
397 	nvmf_trace_ibv_sge(&rdma_req->sg_list[0]);
398 
399 	nvmf_ibv_send_wr_init(&rdma_req->wr.send, req, rdma_req->sg_list, IBV_WR_RDMA_READ,
400 			      IBV_SEND_SIGNALED);
401 	nvmf_ibv_send_wr_set_rkey(&rdma_req->wr.send, req);
402 
403 	spdk_trace_record(TRACE_RDMA_READ_START, 0, 0, (uintptr_t)req, 0);
404 	rc = ibv_post_send(rdma_conn->cm_id->qp, &rdma_req->wr.send, &bad_wr);
405 	if (rc) {
406 		SPDK_ERRLOG("Failure posting rdma read send, rc = 0x%x\n", rc);
407 	}
408 
409 	return rc;
410 }
411 
412 static int
413 nvmf_post_rdma_write(struct spdk_nvmf_request *req)
414 {
415 	struct ibv_send_wr	*bad_wr = NULL;
416 	struct spdk_nvmf_conn 	*conn = req->conn;
417 	struct spdk_nvmf_rdma_request 	*rdma_req = get_rdma_req(req);
418 	struct spdk_nvmf_rdma_conn 	*rdma_conn = get_rdma_conn(conn);
419 	struct spdk_nvmf_rdma_session 	*rdma_sess;
420 	int 			rc;
421 
422 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, conn);
423 
424 	rdma_req->sg_list[0].addr = (uintptr_t)req->data;
425 	if (req->length > g_rdma.in_capsule_data_size) {
426 		rdma_sess = get_rdma_sess(conn->sess);
427 		rdma_req->sg_list[0].lkey = rdma_sess->buf_mr->lkey;
428 	} else {
429 		rdma_req->sg_list[0].lkey = rdma_conn->bufs_mr->lkey;
430 	}
431 	rdma_req->sg_list[0].length = req->length;
432 	nvmf_trace_ibv_sge(&rdma_req->sg_list[0]);
433 
434 	nvmf_ibv_send_wr_init(&rdma_req->wr.send, req, rdma_req->sg_list, IBV_WR_RDMA_WRITE,
435 			      IBV_SEND_SIGNALED);
436 	nvmf_ibv_send_wr_set_rkey(&rdma_req->wr.send, req);
437 
438 	spdk_trace_record(TRACE_RDMA_WRITE_START, 0, 0, (uintptr_t)req, 0);
439 	rc = ibv_post_send(rdma_conn->cm_id->qp, &rdma_req->wr.send, &bad_wr);
440 	if (rc) {
441 		SPDK_ERRLOG("Failure posting rdma write send, rc = 0x%x\n", rc);
442 	}
443 
444 	return rc;
445 }
446 
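/*
 * Post a receive work request for this request slot. Two SGEs are used: the
 * request's entry in the command array and its in-capsule data buffer, so a
 * command capsule plus up to in_capsule_data_size bytes of in-capsule data can
 * arrive in a single receive.
 */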
447 static int
448 nvmf_post_rdma_recv(struct spdk_nvmf_request *req)
449 {
450 	struct ibv_recv_wr *bad_wr = NULL;
451 	struct spdk_nvmf_conn *conn = req->conn;
452 	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
453 	struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
454 	int rc;
455 
456 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA RECV POSTED. Request: %p Connection: %p\n", req, conn);
457 
458 	rdma_req->sg_list[0].addr = (uintptr_t)req->cmd;
459 	rdma_req->sg_list[0].length = sizeof(*req->cmd);
460 	rdma_req->sg_list[0].lkey = rdma_conn->cmds_mr->lkey;
461 	nvmf_trace_ibv_sge(&rdma_req->sg_list[0]);
462 
463 	rdma_req->sg_list[1].addr = (uintptr_t)rdma_req->buf;
464 	rdma_req->sg_list[1].length = g_rdma.in_capsule_data_size;
465 	rdma_req->sg_list[1].lkey = rdma_conn->bufs_mr->lkey;
466 	nvmf_trace_ibv_sge(&rdma_req->sg_list[1]);
467 
468 	memset(&rdma_req->wr.recv, 0, sizeof(struct ibv_recv_wr));
469 	rdma_req->wr.recv.wr_id = (uintptr_t)rdma_req;
470 	rdma_req->wr.recv.next = NULL;
471 	rdma_req->wr.recv.sg_list = rdma_req->sg_list;
472 	rdma_req->wr.recv.num_sge = 2;
473 
474 	rc = ibv_post_recv(rdma_conn->cm_id->qp, &rdma_req->wr.recv, &bad_wr);
475 	if (rc) {
476 		SPDK_ERRLOG("Failure posting rdma recv, rc = 0x%x\n", rc);
477 	}
478 
479 	return rc;
480 }
481 
482 static int
483 nvmf_post_rdma_send(struct spdk_nvmf_request *req)
484 {
485 	struct ibv_send_wr	*bad_wr = NULL;
486 	struct spdk_nvmf_conn 	*conn = req->conn;
487 	struct spdk_nvmf_rdma_request 	*rdma_req = get_rdma_req(req);
488 	struct spdk_nvmf_rdma_conn 	*rdma_conn = get_rdma_conn(conn);
489 	int 			rc;
490 
491 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, conn);
492 
493 	rdma_req->sg_list[0].addr = (uintptr_t)req->rsp;
494 	rdma_req->sg_list[0].length = sizeof(*req->rsp);
495 	rdma_req->sg_list[0].lkey = rdma_conn->cpls_mr->lkey;
496 	nvmf_trace_ibv_sge(&rdma_req->sg_list[0]);
497 
498 	nvmf_ibv_send_wr_init(&rdma_req->wr.send, req, rdma_req->sg_list, IBV_WR_SEND, IBV_SEND_SIGNALED);
499 
500 	spdk_trace_record(TRACE_NVMF_IO_COMPLETE, 0, 0, (uintptr_t)req, 0);
501 	rc = ibv_post_send(rdma_conn->cm_id->qp, &rdma_req->wr.send, &bad_wr);
502 	if (rc) {
503 		SPDK_ERRLOG("Failure posting rdma send for NVMf completion, rc = 0x%x\n", rc);
504 	}
505 
506 	return rc;
507 }
508 
509 /**
510  * REQUEST COMPLETION HANDLING
511  *
512  * Request completion consists of three steps:
513  *
 * 1) Transfer any data to the host using an RDMA WRITE. If the command has no data to
 *    return, or the data was already transferred host-to-controller (e.g. an NVMe write),
 *    this step is skipped. (spdk_nvmf_rdma_request_transfer_data)
516  * 2) Upon transfer completion, update sq_head, re-post the recv capsule,
517  *    and send the completion. (spdk_nvmf_rdma_request_send_completion)
518  * 3) Upon getting acknowledgement of the completion, decrement the internal
519  *    count of number of outstanding requests. (spdk_nvmf_rdma_request_ack_completion)
520  *
521  * There are two public interfaces to initiate the process of completing a request,
522  * exposed as callbacks in the transport layer.
523  *
524  * 1) spdk_nvmf_rdma_request_complete, which attempts to do all three steps.
525  * 2) spdk_nvmf_rdma_request_release, which skips straight to step 3.
526 **/
527 
528 static int
529 spdk_nvmf_rdma_request_transfer_data(struct spdk_nvmf_request *req)
530 {
531 	int rc;
532 	struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
533 	struct spdk_nvmf_conn *conn = req->conn;
534 	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
535 
536 	assert(req->xfer != SPDK_NVME_DATA_NONE);
537 
538 	if (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
539 		if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
540 			rc = nvmf_post_rdma_write(req);
541 			if (rc) {
542 				SPDK_ERRLOG("Unable to transfer data from target to host\n");
543 				return -1;
544 			}
545 		} else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
546 			rc = nvmf_post_rdma_read(req);
547 			if (rc) {
548 				SPDK_ERRLOG("Unable to transfer data from host to target\n");
549 				return -1;
550 			}
551 		}
552 		rdma_conn->cur_rdma_rw_depth++;
553 	} else {
554 		TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
555 	}
556 
557 	return 0;
558 }
559 
560 static int
561 spdk_nvmf_rdma_request_send_completion(struct spdk_nvmf_request *req)
562 {
563 	int rc;
564 	struct spdk_nvmf_conn		*conn = req->conn;
565 	struct spdk_nvme_cpl		*rsp = &req->rsp->nvme_cpl;
566 	struct spdk_nvmf_rdma_session	*rdma_sess;
567 	struct spdk_nvmf_rdma_buf	*buf;
568 
569 	if (req->length > g_rdma.in_capsule_data_size) {
570 		/* Put the buffer back in the pool */
571 		rdma_sess = get_rdma_sess(conn->sess);
572 		buf = req->data;
573 
574 		SLIST_INSERT_HEAD(&rdma_sess->data_buf_pool, buf, link);
575 		req->data = NULL;
576 		req->length = 0;
577 	}
578 
579 	/* Advance our sq_head pointer */
580 	if (conn->sq_head == conn->sq_head_max) {
581 		conn->sq_head = 0;
582 	} else {
583 		conn->sq_head++;
584 	}
585 	rsp->sqhd = conn->sq_head;
586 
587 	/* Post the capsule to the recv buffer */
588 	rc = nvmf_post_rdma_recv(req);
589 	if (rc) {
590 		SPDK_ERRLOG("Unable to re-post rx descriptor\n");
591 		return rc;
592 	}
593 
594 	/* Send the completion */
595 	rc = nvmf_post_rdma_send(req);
596 	if (rc) {
597 		SPDK_ERRLOG("Unable to send response capsule\n");
598 	}
599 
600 	return rc;
601 }
602 
603 static int
604 spdk_nvmf_rdma_request_ack_completion(struct spdk_nvmf_request *req)
605 {
606 	struct spdk_nvmf_conn *conn = req->conn;
607 	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
608 
609 	/* Advance our sq_head pointer */
610 	if (conn->sq_head == conn->sq_head_max) {
611 		conn->sq_head = 0;
612 	} else {
613 		conn->sq_head++;
614 	}
615 
616 	rdma_conn->cur_queue_depth--;
617 
618 	return 0;
619 }
620 
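/*
 * Handle an RDMA_CM_EVENT_CONNECT_REQUEST. The queue depth and RDMA read/write
 * depth are negotiated down from the target configuration, the local device
 * attributes, the peer's advertised initiator depth, and the HSQSIZE/HRQSIZE
 * values in the host's private data. The connection is then created, the
 * request accepted, and the new connection parked on g_pending_conns until its
 * CONNECT capsule arrives.
 */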
621 static int
622 nvmf_rdma_connect(struct rdma_cm_event *event)
623 {
624 	struct spdk_nvmf_rdma_conn	*rdma_conn = NULL;
625 	struct spdk_nvmf_rdma_listen_addr *addr;
626 	struct rdma_conn_param		*rdma_param = NULL;
627 	struct rdma_conn_param		ctrlr_event_data;
628 	const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
629 	struct spdk_nvmf_rdma_accept_private_data accept_data;
630 	uint16_t			sts = 0;
631 	uint16_t			max_queue_depth;
632 	uint16_t			max_rw_depth;
633 	int 				rc;
634 
635 	if (event->id == NULL) {
636 		SPDK_ERRLOG("connect request: missing cm_id\n");
637 		goto err0;
638 	}
639 
640 	if (event->id->verbs == NULL) {
641 		SPDK_ERRLOG("connect request: missing cm_id ibv_context\n");
642 		goto err0;
643 	}
644 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
645 		      event->id->verbs->device->name, event->id->verbs->device->dev_name);
646 
647 	addr = event->listen_id->context;
648 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
649 		      event->listen_id, event->listen_id->verbs, addr);
650 
651 	/* Figure out the supported queue depth. This is a multi-step process
652 	 * that takes into account hardware maximums, host provided values,
653 	 * and our target's internal memory limits */
654 
655 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Calculating Queue Depth\n");
656 
657 	/* Start with the maximum queue depth allowed by the target */
658 	max_queue_depth = g_rdma.max_queue_depth;
659 	max_rw_depth = g_rdma.max_queue_depth;
660 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Target Max Queue Depth: %d\n", g_rdma.max_queue_depth);
661 
662 	/* Next check the local NIC's hardware limitations */
663 	SPDK_TRACELOG(SPDK_TRACE_RDMA,
664 		      "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
665 		      addr->attr.max_qp_wr, addr->attr.max_qp_rd_atom);
666 	max_queue_depth = nvmf_min(max_queue_depth, addr->attr.max_qp_wr);
667 	max_rw_depth = nvmf_min(max_rw_depth, addr->attr.max_qp_rd_atom);
668 
669 	/* Next check the remote NIC's hardware limitations */
670 	rdma_param = &event->param.conn;
671 	SPDK_TRACELOG(SPDK_TRACE_RDMA,
672 		      "Host NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
673 		      rdma_param->initiator_depth, rdma_param->responder_resources);
674 	if (rdma_param->initiator_depth > 0) {
675 		max_rw_depth = nvmf_min(max_rw_depth, rdma_param->initiator_depth);
676 	}
677 
678 	/* Finally check for the host software requested values, which are
679 	 * optional. */
680 	if (rdma_param->private_data != NULL &&
681 	    rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
682 		private_data = rdma_param->private_data;
683 		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
684 		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
685 		max_queue_depth = nvmf_min(max_queue_depth, private_data->hrqsize);
686 		max_queue_depth = nvmf_min(max_queue_depth, private_data->hsqsize);
687 	}
688 
689 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
690 		      max_queue_depth, max_rw_depth);
691 
692 	/* Init the NVMf rdma transport connection */
693 	rdma_conn = spdk_nvmf_rdma_conn_create(event->id, addr->comp_channel, max_queue_depth,
694 					       max_rw_depth);
695 	if (rdma_conn == NULL) {
696 		SPDK_ERRLOG("Error on nvmf connection creation\n");
697 		goto err1;
698 	}
699 
700 	accept_data.recfmt = 0;
701 	accept_data.crqsize = max_queue_depth;
702 	ctrlr_event_data = *rdma_param;
703 	ctrlr_event_data.private_data = &accept_data;
704 	ctrlr_event_data.private_data_len = sizeof(accept_data);
705 	if (event->id->ps == RDMA_PS_TCP) {
706 		ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
707 		ctrlr_event_data.initiator_depth = max_rw_depth;
708 	}
709 
710 	rc = rdma_accept(event->id, &ctrlr_event_data);
711 	if (rc) {
712 		SPDK_ERRLOG("Error on rdma_accept\n");
713 		goto err2;
714 	}
715 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Sent back the accept\n");
716 
717 	/* Add this RDMA connection to the global list until a CONNECT capsule
718 	 * is received. */
719 	TAILQ_INSERT_TAIL(&g_pending_conns, rdma_conn, link);
720 
721 	return 0;
722 
723 err2:
724 	spdk_nvmf_rdma_conn_destroy(rdma_conn);
725 
726 err1: {
		struct spdk_nvmf_rdma_reject_private_data rej_data;

		memset(&rej_data, 0, sizeof(rej_data));
		rej_data.status.sc = sts;
		rdma_reject(event->id, &rej_data, sizeof(rej_data));
731 	}
732 err0:
733 	return -1;
734 }
735 
736 static int
737 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
738 {
739 	struct spdk_nvmf_conn		*conn;
740 	struct spdk_nvmf_session	*session;
741 	struct spdk_nvmf_subsystem	*subsystem;
742 	struct spdk_nvmf_rdma_conn 	*rdma_conn;
743 
744 	if (evt->id == NULL) {
745 		SPDK_ERRLOG("disconnect request: missing cm_id\n");
746 		return -1;
747 	}
748 
749 	conn = evt->id->context;
750 	if (conn == NULL) {
751 		SPDK_ERRLOG("disconnect request: no active connection\n");
752 		return -1;
753 	}
754 	/* ack the disconnect event before rdma_destroy_id */
755 	rdma_ack_cm_event(evt);
756 
757 	rdma_conn = get_rdma_conn(conn);
758 
759 	session = conn->sess;
760 	if (session == NULL) {
761 		/* No session has been established yet. That means the conn
762 		 * must be in the pending connections list. Remove it. */
763 		TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
764 		spdk_nvmf_rdma_conn_destroy(rdma_conn);
765 		return 0;
766 	}
767 
768 	subsystem = session->subsys;
769 
770 	subsystem->disconnect_cb(subsystem->cb_ctx, conn);
771 
772 	return 0;
773 }
774 
775 #ifdef DEBUG
776 static const char *CM_EVENT_STR[] = {
777 	"RDMA_CM_EVENT_ADDR_RESOLVED",
778 	"RDMA_CM_EVENT_ADDR_ERROR",
779 	"RDMA_CM_EVENT_ROUTE_RESOLVED",
780 	"RDMA_CM_EVENT_ROUTE_ERROR",
781 	"RDMA_CM_EVENT_CONNECT_REQUEST",
782 	"RDMA_CM_EVENT_CONNECT_RESPONSE",
783 	"RDMA_CM_EVENT_CONNECT_ERROR",
784 	"RDMA_CM_EVENT_UNREACHABLE",
785 	"RDMA_CM_EVENT_REJECTED",
786 	"RDMA_CM_EVENT_ESTABLISHED",
787 	"RDMA_CM_EVENT_DISCONNECTED",
788 	"RDMA_CM_EVENT_DEVICE_REMOVAL",
789 	"RDMA_CM_EVENT_MULTICAST_JOIN",
790 	"RDMA_CM_EVENT_MULTICAST_ERROR",
791 	"RDMA_CM_EVENT_ADDR_CHANGE",
792 	"RDMA_CM_EVENT_TIMEWAIT_EXIT"
793 };
794 #endif /* DEBUG */
795 
796 typedef enum _spdk_nvmf_request_prep_type {
797 	SPDK_NVMF_REQUEST_PREP_ERROR = -1,
798 	SPDK_NVMF_REQUEST_PREP_READY = 0,
799 	SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER = 1,
800 	SPDK_NVMF_REQUEST_PREP_PENDING_DATA = 2,
801 } spdk_nvmf_request_prep_type;
802 
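/*
 * Parse the command's SGL and set up req->data and req->length. Keyed SGLs
 * describe host memory to be moved by RDMA: lengths above in_capsule_data_size
 * take a buffer from the session's shared pool (or queue the request if none
 * is free), while smaller lengths reuse the request's in-capsule buffer.
 * Offset (data block) SGLs point directly into the in-capsule data received
 * with the command.
 */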
803 static spdk_nvmf_request_prep_type
804 spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
805 {
806 	struct spdk_nvme_cmd		*cmd = &req->cmd->nvme_cmd;
807 	struct spdk_nvme_cpl		*rsp = &req->rsp->nvme_cpl;
808 	struct spdk_nvmf_rdma_request	*rdma_req = get_rdma_req(req);
809 	struct spdk_nvmf_rdma_session	*rdma_sess;
810 	struct spdk_nvme_sgl_descriptor *sgl;
811 
812 	req->length = 0;
813 	req->data = NULL;
814 
815 	if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
816 		req->xfer = spdk_nvme_opc_get_data_transfer(req->cmd->nvmf_cmd.fctype);
817 	} else {
818 		req->xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
819 	}
820 
821 	if (req->xfer == SPDK_NVME_DATA_NONE) {
822 		return SPDK_NVMF_REQUEST_PREP_READY;
823 	}
824 
825 	sgl = &cmd->dptr.sgl1;
826 
827 	if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
828 	    (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
829 	     sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
830 		if (sgl->keyed.length > g_rdma.max_io_size) {
831 			SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
832 				    sgl->keyed.length, g_rdma.max_io_size);
833 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
834 			return SPDK_NVMF_REQUEST_PREP_ERROR;
835 		}
836 
837 		if (sgl->keyed.length == 0) {
838 			req->xfer = SPDK_NVME_DATA_NONE;
839 			return SPDK_NVMF_REQUEST_PREP_READY;
840 		}
841 
842 		req->length = sgl->keyed.length;
843 
844 		/* TODO: In Capsule Data Size should be tracked per queue (admin, for instance, should always have 4k and no more). */
845 		if (sgl->keyed.length > g_rdma.in_capsule_data_size) {
846 			rdma_sess = get_rdma_sess(req->conn->sess);
847 			req->data = SLIST_FIRST(&rdma_sess->data_buf_pool);
848 			if (!req->data) {
849 				/* No available buffers. Queue this request up. */
850 				SPDK_TRACELOG(SPDK_TRACE_RDMA, "No available large data buffers. Queueing request %p\n", req);
851 				return SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER;
852 			}
853 
854 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p took buffer from central pool\n", req);
855 			SLIST_REMOVE_HEAD(&rdma_sess->data_buf_pool, link);
856 		} else {
857 			/* Use the in capsule data buffer, even though this isn't in capsule data */
858 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request using in capsule buffer for non-capsule data\n");
859 			req->data = rdma_req->buf;
860 		}
861 		if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
862 			return SPDK_NVMF_REQUEST_PREP_PENDING_DATA;
863 		} else {
864 			return SPDK_NVMF_REQUEST_PREP_READY;
865 		}
866 	} else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
867 		   sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
868 		uint64_t offset = sgl->address;
869 		uint32_t max_len = g_rdma.in_capsule_data_size;
870 
871 		SPDK_TRACELOG(SPDK_TRACE_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
872 			      offset, sgl->unkeyed.length);
873 
874 		if (offset > max_len) {
875 			SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
876 				    offset, max_len);
877 			rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
878 			return SPDK_NVMF_REQUEST_PREP_ERROR;
879 		}
880 		max_len -= (uint32_t)offset;
881 
882 		if (sgl->unkeyed.length > max_len) {
883 			SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
884 				    sgl->unkeyed.length, max_len);
885 			rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
886 			return SPDK_NVMF_REQUEST_PREP_ERROR;
887 		}
888 
889 		if (sgl->unkeyed.length == 0) {
890 			req->xfer = SPDK_NVME_DATA_NONE;
891 			return SPDK_NVMF_REQUEST_PREP_READY;
892 		}
893 
894 		req->data = rdma_req->buf + offset;
895 		req->length = sgl->unkeyed.length;
896 		return SPDK_NVMF_REQUEST_PREP_READY;
897 	}
898 
899 	SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
900 		    sgl->generic.type, sgl->generic.subtype);
901 	rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
902 	return SPDK_NVMF_REQUEST_PREP_ERROR;
903 }
904 
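/*
 * Drain the per-connection pending queues: first hand freed pool buffers to
 * requests waiting for one (executing controller-to-host requests immediately
 * and queueing host-to-controller requests for an RDMA READ), then start
 * queued RDMA READ/WRITE operations while cur_rdma_rw_depth allows. Returns
 * the number of requests passed to spdk_nvmf_request_exec(), or -1 on error.
 */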
905 static int
906 spdk_nvmf_rdma_handle_pending_rdma_rw(struct spdk_nvmf_conn *conn)
907 {
908 	struct spdk_nvmf_rdma_conn	*rdma_conn = get_rdma_conn(conn);
909 	struct spdk_nvmf_rdma_session	*rdma_sess;
910 	struct spdk_nvmf_rdma_request	*rdma_req, *tmp;
911 	int rc;
912 	int count = 0;
913 
914 	/* First, try to assign free data buffers to requests that need one */
915 	if (conn->sess) {
916 		rdma_sess = get_rdma_sess(conn->sess);
917 		TAILQ_FOREACH_SAFE(rdma_req, &rdma_conn->pending_data_buf_queue, link, tmp) {
918 			assert(rdma_req->req.data == NULL);
919 			rdma_req->req.data = SLIST_FIRST(&rdma_sess->data_buf_pool);
920 			if (!rdma_req->req.data) {
921 				break;
922 			}
923 			SLIST_REMOVE_HEAD(&rdma_sess->data_buf_pool, link);
924 			TAILQ_REMOVE(&rdma_conn->pending_data_buf_queue, rdma_req, link);
925 			if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
926 				TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
927 			} else {
928 				rc = spdk_nvmf_request_exec(&rdma_req->req);
929 				if (rc < 0) {
930 					return -1;
931 				}
932 				count++;
933 			}
934 		}
935 	}
936 
937 	/* Try to initiate RDMA Reads or Writes on requests that have data buffers */
938 	while (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
939 		if (TAILQ_EMPTY(&rdma_conn->pending_rdma_rw_queue)) {
940 			break;
941 		}
942 
943 		rdma_req = TAILQ_FIRST(&rdma_conn->pending_rdma_rw_queue);
944 		TAILQ_REMOVE(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
945 
946 		SPDK_TRACELOG(SPDK_TRACE_RDMA, "Submitting previously queued for RDMA R/W request %p\n", rdma_req);
947 
948 		rc = spdk_nvmf_rdma_request_transfer_data(&rdma_req->req);
949 		if (rc) {
950 			return -1;
951 		}
952 	}
953 
954 	return count;
955 }
956 
957 /* Public API callbacks begin here */
958 
959 static int
960 spdk_nvmf_rdma_init(uint16_t max_queue_depth, uint32_t max_io_size,
961 		    uint32_t in_capsule_data_size)
962 {
963 	int rc;
964 
965 	SPDK_NOTICELOG("*** RDMA Transport Init ***\n");
966 
967 	pthread_mutex_lock(&g_rdma.lock);
968 	g_rdma.max_queue_depth = max_queue_depth;
969 	g_rdma.max_io_size = max_io_size;
970 	g_rdma.in_capsule_data_size = in_capsule_data_size;
971 
972 	g_rdma.event_channel = rdma_create_event_channel();
973 	if (g_rdma.event_channel == NULL) {
974 		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", strerror(errno));
975 		pthread_mutex_unlock(&g_rdma.lock);
976 		return -1;
977 	}
978 
979 	rc = fcntl(g_rdma.event_channel->fd, F_SETFL, O_NONBLOCK);
980 	if (rc < 0) {
981 		SPDK_ERRLOG("fcntl to set fd to non-blocking failed\n");
982 		pthread_mutex_unlock(&g_rdma.lock);
983 		return -1;
984 	}
985 
986 	pthread_mutex_unlock(&g_rdma.lock);
987 	return 0;
988 }
989 
990 static int
991 spdk_nvmf_rdma_fini(void)
992 {
993 	struct spdk_nvmf_rdma_listen_addr *addr, *tmp;
994 
995 	pthread_mutex_lock(&g_rdma.lock);
996 	TAILQ_FOREACH_SAFE(addr, &g_rdma.listen_addrs, link, tmp) {
997 		TAILQ_REMOVE(&g_rdma.listen_addrs, addr, link);
998 		ibv_destroy_comp_channel(addr->comp_channel);
999 		rdma_destroy_id(addr->id);
1000 	}
1001 
1002 	if (g_rdma.event_channel != NULL) {
1003 		rdma_destroy_event_channel(g_rdma.event_channel);
1004 	}
1005 	pthread_mutex_unlock(&g_rdma.lock);
1006 
1007 	return 0;
1008 }
1009 
1010 static int
1011 spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn);
1012 
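/*
 * Poller for the acceptor: first poll connections that have not yet received
 * their CONNECT capsule so the connect can be processed, then drain the
 * non-blocking RDMA CM event channel and dispatch connect/disconnect events.
 */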
1013 static void
1014 spdk_nvmf_rdma_acceptor_poll(void)
1015 {
1016 	struct rdma_cm_event		*event;
1017 	int				rc;
1018 	struct spdk_nvmf_rdma_conn	*rdma_conn, *tmp;
1019 
1020 	if (g_rdma.event_channel == NULL) {
1021 		return;
1022 	}
1023 
1024 	/* Process pending connections for incoming capsules. The only capsule
1025 	 * this should ever find is a CONNECT request. */
1026 	TAILQ_FOREACH_SAFE(rdma_conn, &g_pending_conns, link, tmp) {
1027 		rc = spdk_nvmf_rdma_poll(&rdma_conn->conn);
1028 		if (rc < 0) {
1029 			TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
1030 			spdk_nvmf_rdma_conn_destroy(rdma_conn);
1031 		} else if (rc > 0) {
1032 			/* At least one request was processed which is assumed to be
1033 			 * a CONNECT. Remove this connection from our list. */
1034 			TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
1035 		}
1036 	}
1037 
1038 	while (1) {
1039 		rc = rdma_get_cm_event(g_rdma.event_channel, &event);
1040 		if (rc == 0) {
1041 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
1042 
1043 			switch (event->event) {
1044 			case RDMA_CM_EVENT_CONNECT_REQUEST:
1045 				rc = nvmf_rdma_connect(event);
1046 				if (rc < 0) {
1047 					SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
1048 					break;
1049 				}
1050 				break;
1051 			case RDMA_CM_EVENT_ESTABLISHED:
1052 				break;
1053 			case RDMA_CM_EVENT_ADDR_CHANGE:
1054 			case RDMA_CM_EVENT_DISCONNECTED:
1055 			case RDMA_CM_EVENT_DEVICE_REMOVAL:
1056 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1057 				rc = nvmf_rdma_disconnect(event);
1058 				if (rc < 0) {
1059 					SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
1060 					break;
1061 				}
1062 				continue;
1063 			default:
1064 				SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
1065 				break;
1066 			}
1067 
1068 			rdma_ack_cm_event(event);
1069 		} else {
1070 			if (errno != EAGAIN && errno != EWOULDBLOCK) {
1071 				SPDK_ERRLOG("Acceptor Event Error: %s\n", strerror(errno));
1072 			}
1073 			break;
1074 		}
1075 	}
1076 }
1077 
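/*
 * Begin listening on an IPv4 traddr/trsvcid pair (a no-op if a listener for
 * the same address already exists): create an rdma_cm id on the shared event
 * channel, bind and listen, query the device attributes used later for queue
 * depth negotiation, and create the non-blocking completion channel shared by
 * connections accepted on this address.
 */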
1078 static int
1079 spdk_nvmf_rdma_listen(struct spdk_nvmf_listen_addr *listen_addr)
1080 {
1081 	struct spdk_nvmf_rdma_listen_addr *addr;
1082 	struct sockaddr_in saddr;
1083 	int rc;
1084 
1085 	pthread_mutex_lock(&g_rdma.lock);
1086 	assert(g_rdma.event_channel != NULL);
1087 	TAILQ_FOREACH(addr, &g_rdma.listen_addrs, link) {
1088 		if ((!strcasecmp(addr->traddr, listen_addr->traddr)) &&
1089 		    (!strcasecmp(addr->trsvcid, listen_addr->trsvcid))) {
1090 			/* Already listening at this address */
1091 			pthread_mutex_unlock(&g_rdma.lock);
1092 			return 0;
1093 		}
1094 	}
1095 
1096 	addr = calloc(1, sizeof(*addr));
1097 	if (!addr) {
1098 		pthread_mutex_unlock(&g_rdma.lock);
1099 		return -1;
1100 	}
1101 
1102 	addr->traddr = listen_addr->traddr;
1103 	addr->trsvcid = listen_addr->trsvcid;
1104 
1105 	rc = rdma_create_id(g_rdma.event_channel, &addr->id, addr, RDMA_PS_TCP);
1106 	if (rc < 0) {
1107 		SPDK_ERRLOG("rdma_create_id() failed\n");
1108 		free(addr);
1109 		pthread_mutex_unlock(&g_rdma.lock);
1110 		return -1;
1111 	}
1112 
1113 	memset(&saddr, 0, sizeof(saddr));
1114 	saddr.sin_family = AF_INET;
1115 	saddr.sin_addr.s_addr = inet_addr(addr->traddr);
1116 	saddr.sin_port = htons((uint16_t)strtoul(addr->trsvcid, NULL, 10));
1117 	rc = rdma_bind_addr(addr->id, (struct sockaddr *)&saddr);
1118 	if (rc < 0) {
1119 		SPDK_ERRLOG("rdma_bind_addr() failed\n");
1120 		rdma_destroy_id(addr->id);
1121 		free(addr);
1122 		pthread_mutex_unlock(&g_rdma.lock);
1123 		return -1;
1124 	}
1125 
1126 	rc = rdma_listen(addr->id, 10); /* 10 = backlog */
1127 	if (rc < 0) {
1128 		SPDK_ERRLOG("rdma_listen() failed\n");
1129 		rdma_destroy_id(addr->id);
1130 		free(addr);
1131 		pthread_mutex_unlock(&g_rdma.lock);
1132 		return -1;
1133 	}
1134 
1135 	rc = ibv_query_device(addr->id->verbs, &addr->attr);
1136 	if (rc < 0) {
1137 		SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1138 		rdma_destroy_id(addr->id);
1139 		free(addr);
1140 		pthread_mutex_unlock(&g_rdma.lock);
1141 		return -1;
1142 	}
1143 
1144 	addr->comp_channel = ibv_create_comp_channel(addr->id->verbs);
1145 	if (!addr->comp_channel) {
1146 		SPDK_ERRLOG("Failed to create completion channel\n");
1147 		rdma_destroy_id(addr->id);
1148 		free(addr);
1149 		pthread_mutex_unlock(&g_rdma.lock);
1150 		return -1;
1151 	}
1152 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "For listen id %p with context %p, created completion channel %p\n",
1153 		      addr->id, addr->id->verbs, addr->comp_channel);
1154 
1155 	rc = fcntl(addr->comp_channel->fd, F_SETFL, O_NONBLOCK);
1156 	if (rc < 0) {
1157 		SPDK_ERRLOG("fcntl to set comp channel to non-blocking failed\n");
1158 		rdma_destroy_id(addr->id);
1159 		ibv_destroy_comp_channel(addr->comp_channel);
1160 		free(addr);
1161 		pthread_mutex_unlock(&g_rdma.lock);
1162 		return -1;
1163 	}
1164 
1165 	TAILQ_INSERT_TAIL(&g_rdma.listen_addrs, addr, link);
1166 	pthread_mutex_unlock(&g_rdma.lock);
1167 
1168 	SPDK_NOTICELOG("*** NVMf Target Listening on %s port %d ***\n",
1169 		       addr->traddr, ntohs(rdma_get_src_port(addr->id)));
1170 
1171 	return 0;
1172 }
1173 
1174 static void
1175 spdk_nvmf_rdma_discover(struct spdk_nvmf_listen_addr *listen_addr,
1176 			struct spdk_nvmf_discovery_log_page_entry *entry)
1177 {
1178 	entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
1179 	entry->adrfam = SPDK_NVMF_ADRFAM_IPV4;
1180 	entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
1181 
1182 	spdk_strcpy_pad(entry->trsvcid, listen_addr->trsvcid, sizeof(entry->trsvcid), ' ');
1183 	spdk_strcpy_pad(entry->traddr, listen_addr->traddr, sizeof(entry->traddr), ' ');
1184 
1185 	entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
1186 	entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
1187 	entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
1188 }
1189 
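/*
 * Allocate a session and its shared large-buffer pool: max_queue_depth buffers
 * of max_io_size bytes carved from a single allocation. Registration of the
 * pool is deferred to spdk_nvmf_rdma_session_add_conn(), which has access to
 * the protection domain of the first connection.
 */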
1190 static struct spdk_nvmf_session *
1191 spdk_nvmf_rdma_session_init(void)
1192 {
1193 	struct spdk_nvmf_rdma_session	*rdma_sess;
1194 	int				i;
1195 	struct spdk_nvmf_rdma_buf	*buf;
1196 
1197 	rdma_sess = calloc(1, sizeof(*rdma_sess));
1198 	if (!rdma_sess) {
1199 		return NULL;
1200 	}
1201 
1202 	/* TODO: Make the number of elements in this pool configurable. For now, one full queue
1203 	 *       worth seems reasonable.
1204 	 */
1205 	rdma_sess->buf = spdk_zmalloc(g_rdma.max_queue_depth * g_rdma.max_io_size,
1206 				      0x20000, NULL);
1207 	if (!rdma_sess->buf) {
1208 		SPDK_ERRLOG("Large buffer pool allocation failed (%d x %d)\n",
1209 			    g_rdma.max_queue_depth, g_rdma.max_io_size);
1210 		free(rdma_sess);
1211 		return NULL;
1212 	}
1213 
1214 	SLIST_INIT(&rdma_sess->data_buf_pool);
1215 	for (i = 0; i < g_rdma.max_queue_depth; i++) {
1216 		buf = (struct spdk_nvmf_rdma_buf *)(rdma_sess->buf + (i * g_rdma.max_io_size));
1217 		SLIST_INSERT_HEAD(&rdma_sess->data_buf_pool, buf, link);
1218 	}
1219 
1220 	rdma_sess->session.transport = &spdk_nvmf_transport_rdma;
1221 
1222 	return &rdma_sess->session;
1223 }
1224 
1225 static void
1226 spdk_nvmf_rdma_session_fini(struct spdk_nvmf_session *session)
1227 {
1228 	struct spdk_nvmf_rdma_session *rdma_sess = get_rdma_sess(session);
1229 
1230 	if (!rdma_sess) {
1231 		return;
1232 	}
1233 
	/* buf_mr is only registered once the first connection is added */
	if (rdma_sess->buf_mr) {
		ibv_dereg_mr(rdma_sess->buf_mr);
	}
1235 	spdk_free(rdma_sess->buf);
1236 	free(rdma_sess);
1237 }
1238 
1239 static int
1240 spdk_nvmf_rdma_session_add_conn(struct spdk_nvmf_session *session,
1241 				struct spdk_nvmf_conn *conn)
1242 {
1243 	struct spdk_nvmf_rdma_session	*rdma_sess = get_rdma_sess(session);
1244 	struct spdk_nvmf_rdma_conn	*rdma_conn = get_rdma_conn(conn);
1245 
1246 	if (rdma_sess->verbs != NULL) {
1247 		if (rdma_sess->verbs != rdma_conn->cm_id->verbs) {
1248 			SPDK_ERRLOG("Two connections belonging to the same session cannot connect using different RDMA devices.\n");
1249 			return -1;
1250 		}
1251 
1252 		/* Nothing else to do. */
1253 		return 0;
1254 	}
1255 
1256 	rdma_sess->verbs = rdma_conn->cm_id->verbs;
1257 	rdma_sess->buf_mr = ibv_reg_mr(rdma_conn->cm_id->pd, rdma_sess->buf,
1258 				       g_rdma.max_queue_depth * g_rdma.max_io_size,
1259 				       IBV_ACCESS_LOCAL_WRITE |
1260 				       IBV_ACCESS_REMOTE_WRITE);
1261 	if (!rdma_sess->buf_mr) {
1262 		SPDK_ERRLOG("Large buffer pool registration failed (%d x %d)\n",
1263 			    g_rdma.max_queue_depth, g_rdma.max_io_size);
1264 		spdk_free(rdma_sess->buf);
1265 		free(rdma_sess);
1266 		return -1;
1267 	}
1268 
1269 	SPDK_TRACELOG(SPDK_TRACE_RDMA, "Session Shared Data Pool: %p Length: %x LKey: %x\n",
1270 		      rdma_sess->buf,  g_rdma.max_queue_depth * g_rdma.max_io_size, rdma_sess->buf_mr->lkey);
1271 
1272 	return 0;
1273 }
1274 
1275 static int
1276 spdk_nvmf_rdma_session_remove_conn(struct spdk_nvmf_session *session,
1277 				   struct spdk_nvmf_conn *conn)
1278 {
1279 	return 0;
1280 }
1281 
1282 static int
1283 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
1284 {
1285 	struct spdk_nvme_cpl *rsp = &req->rsp->nvme_cpl;
1286 	int rc;
1287 
1288 	if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
1289 	    req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1290 		rc = spdk_nvmf_rdma_request_transfer_data(req);
1291 	} else {
1292 		rc = spdk_nvmf_rdma_request_send_completion(req);
1293 	}
1294 
1295 	return rc;
1296 }
1297 
1298 static int
1299 spdk_nvmf_rdma_request_release(struct spdk_nvmf_request *req)
1300 {
1301 	return spdk_nvmf_rdma_request_ack_completion(req);
1302 }
1303 
1304 static void
1305 spdk_nvmf_rdma_close_conn(struct spdk_nvmf_conn *conn)
1306 {
1307 	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
1308 
	spdk_nvmf_rdma_conn_destroy(rdma_conn);
1310 }
1311 
1312 /* Returns the number of times that spdk_nvmf_request_exec was called,
1313  * or -1 on error.
1314  */
1315 static int
1316 spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
1317 {
1318 	struct ibv_wc wc[32];
1319 	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
1320 	struct spdk_nvmf_rdma_request *rdma_req;
1321 	struct spdk_nvmf_request *req;
1322 	int reaped, i, rc;
1323 	int count = 0;
1324 
1325 	/* Poll for completing operations. */
1326 	rc = ibv_poll_cq(rdma_conn->cq, 32, wc);
1327 	if (rc < 0) {
1328 		SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1329 			    errno, strerror(errno));
1330 		return -1;
1331 	}
1332 
1333 	reaped = rc;
1334 	for (i = 0; i < reaped; i++) {
1335 		if (wc[i].status) {
			SPDK_ERRLOG("CQ error on Connection %p, Request 0x%lx (%d): %s\n",
1337 				    conn, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1338 			return -1;
1339 		}
1340 
1341 		rdma_req = (struct spdk_nvmf_rdma_request *)wc[i].wr_id;
1342 		if (rdma_req == NULL) {
1343 			SPDK_ERRLOG("NULL wr_id in RDMA work completion\n");
1344 			return -1;
1345 		}
1346 
1347 		req = &rdma_req->req;
1348 
1349 		switch (wc[i].opcode) {
1350 		case IBV_WC_SEND:
1351 			assert(rdma_conn->cur_queue_depth > 0);
1352 			SPDK_TRACELOG(SPDK_TRACE_RDMA,
1353 				      "RDMA SEND Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
1354 				      req, conn, rdma_conn->cur_queue_depth - 1);
1355 			rc = spdk_nvmf_rdma_request_ack_completion(req);
1356 			if (rc) {
1357 				return -1;
1358 			}
1359 			break;
1360 
1361 		case IBV_WC_RDMA_WRITE:
1362 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA WRITE Complete. Request: %p Connection: %p\n",
1363 				      req, conn);
1364 			spdk_trace_record(TRACE_RDMA_WRITE_COMPLETE, 0, 0, (uint64_t)req, 0);
1365 			rc = spdk_nvmf_rdma_request_send_completion(req);
1366 			if (rc) {
1367 				return -1;
1368 			}
1369 
1370 			/* Since an RDMA R/W operation completed, try to submit from the pending list. */
1371 			rdma_conn->cur_rdma_rw_depth--;
1372 			rc = spdk_nvmf_rdma_handle_pending_rdma_rw(conn);
1373 			if (rc < 0) {
1374 				return -1;
1375 			}
1376 			count += rc;
1377 			break;
1378 
1379 		case IBV_WC_RDMA_READ:
1380 			SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA READ Complete. Request: %p Connection: %p\n",
1381 				      req, conn);
1382 			spdk_trace_record(TRACE_RDMA_READ_COMPLETE, 0, 0, (uint64_t)req, 0);
1383 			rc = spdk_nvmf_request_exec(req);
1384 			if (rc) {
1385 				return -1;
1386 			}
1387 			count++;
1388 
1389 			/* Since an RDMA R/W operation completed, try to submit from the pending list. */
1390 			rdma_conn->cur_rdma_rw_depth--;
1391 			rc = spdk_nvmf_rdma_handle_pending_rdma_rw(conn);
1392 			if (rc < 0) {
1393 				return -1;
1394 			}
1395 			count += rc;
1396 			break;
1397 
1398 		case IBV_WC_RECV:
1399 			if (wc[i].byte_len < sizeof(struct spdk_nvmf_capsule_cmd)) {
1400 				SPDK_ERRLOG("recv length %u less than capsule header\n", wc[i].byte_len);
1401 				return -1;
1402 			}
1403 
1404 			rdma_conn->cur_queue_depth++;
1405 			SPDK_TRACELOG(SPDK_TRACE_RDMA,
1406 				      "RDMA RECV Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
1407 				      req, conn, rdma_conn->cur_queue_depth);
1408 			spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, (uint64_t)req, 0);
1409 
1410 			memset(req->rsp, 0, sizeof(*req->rsp));
1411 			rc = spdk_nvmf_request_prep_data(req);
1412 			switch (rc) {
1413 			case SPDK_NVMF_REQUEST_PREP_READY:
1414 				SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p is ready for execution\n", req);
1415 				/* Data is immediately available */
1416 				rc = spdk_nvmf_request_exec(req);
1417 				if (rc < 0) {
1418 					return -1;
1419 				}
1420 				count++;
1421 				break;
1422 			case SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER:
1423 				SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data buffer\n", req);
1424 				TAILQ_INSERT_TAIL(&rdma_conn->pending_data_buf_queue, rdma_req, link);
1425 				break;
1426 			case SPDK_NVMF_REQUEST_PREP_PENDING_DATA:
1427 				SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data transfer\n", req);
1428 				rc = spdk_nvmf_rdma_request_transfer_data(req);
1429 				if (rc < 0) {
1430 					return -1;
1431 				}
1432 				break;
1433 			case SPDK_NVMF_REQUEST_PREP_ERROR:
1434 				spdk_nvmf_rdma_request_complete(req);
1435 				break;
1436 			}
1437 			break;
1438 
1439 		default:
1440 			SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
1441 			return -1;
1442 		}
1443 	}
1444 
1445 	return count;
1446 }
1447 
1448 const struct spdk_nvmf_transport spdk_nvmf_transport_rdma = {
1449 	.name = "rdma",
1450 	.transport_init = spdk_nvmf_rdma_init,
1451 	.transport_fini = spdk_nvmf_rdma_fini,
1452 
1453 	.acceptor_poll = spdk_nvmf_rdma_acceptor_poll,
1454 
1455 	.listen_addr_add = spdk_nvmf_rdma_listen,
1456 	.listen_addr_discover = spdk_nvmf_rdma_discover,
1457 
1458 	.session_init = spdk_nvmf_rdma_session_init,
1459 	.session_fini = spdk_nvmf_rdma_session_fini,
1460 	.session_add_conn = spdk_nvmf_rdma_session_add_conn,
1461 	.session_remove_conn = spdk_nvmf_rdma_session_remove_conn,
1462 
1463 	.req_complete = spdk_nvmf_rdma_request_complete,
1464 	.req_release = spdk_nvmf_rdma_request_release,
1465 
1466 	.conn_fini = spdk_nvmf_rdma_close_conn,
1467 	.conn_poll = spdk_nvmf_rdma_poll,
1468 
1469 
1470 };
1471 
1472 SPDK_LOG_REGISTER_TRACE_FLAG("rdma", SPDK_TRACE_RDMA)
1473