1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2016 Intel Corporation. All rights reserved. 3 * Copyright (c) 2019-2021 Mellanox Technologies LTD. All rights reserved. 4 * Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/config.h" 10 #include "spdk/thread.h" 11 #include "spdk/likely.h" 12 #include "spdk/nvmf_transport.h" 13 #include "spdk/string.h" 14 #include "spdk/trace.h" 15 #include "spdk/tree.h" 16 #include "spdk/util.h" 17 18 #include "spdk_internal/assert.h" 19 #include "spdk/log.h" 20 #include "spdk_internal/rdma_provider.h" 21 #include "spdk_internal/rdma_utils.h" 22 23 #include "nvmf_internal.h" 24 #include "transport.h" 25 26 #include "spdk_internal/trace_defs.h" 27 28 struct spdk_nvme_rdma_hooks g_nvmf_hooks = {}; 29 const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma; 30 31 /* 32 RDMA Connection Resource Defaults 33 */ 34 #define NVMF_DEFAULT_MSDBD 16 35 #define NVMF_DEFAULT_TX_SGE SPDK_NVMF_MAX_SGL_ENTRIES 36 #define NVMF_DEFAULT_RSP_SGE 1 37 #define NVMF_DEFAULT_RX_SGE 2 38 39 #define NVMF_RDMA_MAX_EVENTS_PER_POLL 32 40 41 SPDK_STATIC_ASSERT(NVMF_DEFAULT_MSDBD <= SPDK_NVMF_MAX_SGL_ENTRIES, 42 "MSDBD must not exceed SPDK_NVMF_MAX_SGL_ENTRIES"); 43 44 /* The RDMA completion queue size */ 45 #define DEFAULT_NVMF_RDMA_CQ_SIZE 4096 46 #define MAX_WR_PER_QP(queue_depth) (queue_depth * 3 + 2) 47 48 enum spdk_nvmf_rdma_request_state { 49 /* The request is not currently in use */ 50 RDMA_REQUEST_STATE_FREE = 0, 51 52 /* Initial state when request first received */ 53 RDMA_REQUEST_STATE_NEW, 54 55 /* The request is queued until a data buffer is available. */ 56 RDMA_REQUEST_STATE_NEED_BUFFER, 57 58 /* The request is waiting on RDMA queue depth availability 59 * to transfer data from the host to the controller. 60 */ 61 RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING, 62 63 /* The request is currently transferring data from the host to the controller. */ 64 RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER, 65 66 /* The request is ready to execute at the block device */ 67 RDMA_REQUEST_STATE_READY_TO_EXECUTE, 68 69 /* The request is currently executing at the block device */ 70 RDMA_REQUEST_STATE_EXECUTING, 71 72 /* The request finished executing at the block device */ 73 RDMA_REQUEST_STATE_EXECUTED, 74 75 /* The request is waiting on RDMA queue depth availability 76 * to transfer data from the controller to the host. 77 */ 78 RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING, 79 80 /* The request is waiting on RDMA queue depth availability 81 * to send response to the host. 82 */ 83 RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING, 84 85 /* The request is ready to send a completion */ 86 RDMA_REQUEST_STATE_READY_TO_COMPLETE, 87 88 /* The request is currently transferring data from the controller to the host. */ 89 RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST, 90 91 /* The request currently has an outstanding completion without an 92 * associated data transfer. 93 */ 94 RDMA_REQUEST_STATE_COMPLETING, 95 96 /* The request completed and can be marked free. 
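* Handling this state is expected to return the request to resources->free_queue
* and reset it to RDMA_REQUEST_STATE_FREE.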
*/ 97 RDMA_REQUEST_STATE_COMPLETED, 98 99 /* Terminator */ 100 RDMA_REQUEST_NUM_STATES, 101 }; 102 103 static void 104 nvmf_trace(void) 105 { 106 spdk_trace_register_object(OBJECT_NVMF_RDMA_IO, 'r'); 107 108 struct spdk_trace_tpoint_opts opts[] = { 109 { 110 "RDMA_REQ_NEW", TRACE_RDMA_REQUEST_STATE_NEW, 111 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 1, 112 { 113 { "qpair", SPDK_TRACE_ARG_TYPE_PTR, 8 }, 114 { "qd", SPDK_TRACE_ARG_TYPE_INT, 4 } 115 } 116 }, 117 { 118 "RDMA_REQ_COMPLETED", TRACE_RDMA_REQUEST_STATE_COMPLETED, 119 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 120 { 121 { "qpair", SPDK_TRACE_ARG_TYPE_PTR, 8 }, 122 { "qd", SPDK_TRACE_ARG_TYPE_INT, 4 } 123 } 124 }, 125 }; 126 127 spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts)); 128 spdk_trace_register_description("RDMA_REQ_NEED_BUFFER", TRACE_RDMA_REQUEST_STATE_NEED_BUFFER, 129 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 130 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 131 spdk_trace_register_description("RDMA_REQ_TX_PENDING_C2H", 132 TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING, 133 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 134 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 135 spdk_trace_register_description("RDMA_REQ_TX_PENDING_H2C", 136 TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING, 137 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 138 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 139 spdk_trace_register_description("RDMA_REQ_TX_H2C", 140 TRACE_RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER, 141 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 142 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 143 spdk_trace_register_description("RDMA_REQ_RDY_TO_EXECUTE", 144 TRACE_RDMA_REQUEST_STATE_READY_TO_EXECUTE, 145 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 146 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 147 spdk_trace_register_description("RDMA_REQ_EXECUTING", 148 TRACE_RDMA_REQUEST_STATE_EXECUTING, 149 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 150 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 151 spdk_trace_register_description("RDMA_REQ_EXECUTED", 152 TRACE_RDMA_REQUEST_STATE_EXECUTED, 153 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 154 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 155 spdk_trace_register_description("RDMA_REQ_RDY2COMPL_PEND", 156 TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING, 157 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 158 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 159 spdk_trace_register_description("RDMA_REQ_RDY_TO_COMPL", 160 TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE, 161 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 162 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 163 spdk_trace_register_description("RDMA_REQ_COMPLETING_C2H", 164 TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST, 165 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 166 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 167 spdk_trace_register_description("RDMA_REQ_COMPLETING", 168 TRACE_RDMA_REQUEST_STATE_COMPLETING, 169 OWNER_TYPE_NONE, OBJECT_NVMF_RDMA_IO, 0, 170 SPDK_TRACE_ARG_TYPE_PTR, "qpair"); 171 172 spdk_trace_register_description("RDMA_QP_CREATE", TRACE_RDMA_QP_CREATE, 173 OWNER_TYPE_NONE, OBJECT_NONE, 0, 174 SPDK_TRACE_ARG_TYPE_INT, ""); 175 spdk_trace_register_description("RDMA_IBV_ASYNC_EVENT", TRACE_RDMA_IBV_ASYNC_EVENT, 176 OWNER_TYPE_NONE, OBJECT_NONE, 0, 177 SPDK_TRACE_ARG_TYPE_INT, "type"); 178 spdk_trace_register_description("RDMA_CM_ASYNC_EVENT", TRACE_RDMA_CM_ASYNC_EVENT, 179 OWNER_TYPE_NONE, OBJECT_NONE, 0, 180 SPDK_TRACE_ARG_TYPE_INT, "type"); 181 spdk_trace_register_description("RDMA_QP_DISCONNECT", TRACE_RDMA_QP_DISCONNECT, 182 OWNER_TYPE_NONE, OBJECT_NONE, 0, 183 SPDK_TRACE_ARG_TYPE_INT, ""); 184 
spdk_trace_register_description("RDMA_QP_DESTROY", TRACE_RDMA_QP_DESTROY, 185 OWNER_TYPE_NONE, OBJECT_NONE, 0, 186 SPDK_TRACE_ARG_TYPE_INT, ""); 187 188 spdk_trace_tpoint_register_relation(TRACE_BDEV_IO_START, OBJECT_NVMF_RDMA_IO, 1); 189 spdk_trace_tpoint_register_relation(TRACE_BDEV_IO_DONE, OBJECT_NVMF_RDMA_IO, 0); 190 } 191 SPDK_TRACE_REGISTER_FN(nvmf_trace, "nvmf_rdma", TRACE_GROUP_NVMF_RDMA) 192 193 enum spdk_nvmf_rdma_wr_type { 194 RDMA_WR_TYPE_RECV, 195 RDMA_WR_TYPE_SEND, 196 RDMA_WR_TYPE_DATA, 197 }; 198 199 struct spdk_nvmf_rdma_wr { 200 /* Uses enum spdk_nvmf_rdma_wr_type */ 201 uint8_t type; 202 }; 203 204 /* This structure holds commands as they are received off the wire. 205 * It must be dynamically paired with a full request object 206 * (spdk_nvmf_rdma_request) to service a request. It is separate 207 * from the request because RDMA does not appear to order 208 * completions, so occasionally we'll get a new incoming 209 * command when there aren't any free request objects. 210 */ 211 struct spdk_nvmf_rdma_recv { 212 struct ibv_recv_wr wr; 213 struct ibv_sge sgl[NVMF_DEFAULT_RX_SGE]; 214 215 struct spdk_nvmf_rdma_qpair *qpair; 216 217 /* In-capsule data buffer */ 218 uint8_t *buf; 219 220 struct spdk_nvmf_rdma_wr rdma_wr; 221 uint64_t receive_tsc; 222 223 STAILQ_ENTRY(spdk_nvmf_rdma_recv) link; 224 }; 225 226 struct spdk_nvmf_rdma_request_data { 227 struct ibv_send_wr wr; 228 struct ibv_sge sgl[SPDK_NVMF_MAX_SGL_ENTRIES]; 229 }; 230 231 struct spdk_nvmf_rdma_request { 232 struct spdk_nvmf_request req; 233 234 bool fused_failed; 235 236 struct spdk_nvmf_rdma_wr data_wr; 237 struct spdk_nvmf_rdma_wr rsp_wr; 238 239 /* Uses enum spdk_nvmf_rdma_request_state */ 240 uint8_t state; 241 242 /* Data offset in req.iov */ 243 uint32_t offset; 244 245 struct spdk_nvmf_rdma_recv *recv; 246 247 struct { 248 struct ibv_send_wr wr; 249 struct ibv_sge sgl[NVMF_DEFAULT_RSP_SGE]; 250 } rsp; 251 252 uint16_t iovpos; 253 uint16_t num_outstanding_data_wr; 254 /* Used to split Write IO with multi SGL payload */ 255 uint16_t num_remaining_data_wr; 256 uint64_t receive_tsc; 257 struct spdk_nvmf_rdma_request *fused_pair; 258 STAILQ_ENTRY(spdk_nvmf_rdma_request) state_link; 259 struct ibv_send_wr *remaining_tranfer_in_wrs; 260 struct ibv_send_wr *transfer_wr; 261 struct spdk_nvmf_rdma_request_data data; 262 }; 263 264 struct spdk_nvmf_rdma_resource_opts { 265 struct spdk_nvmf_rdma_qpair *qpair; 266 /* qp points either to an ibv_qp object or an ibv_srq object depending on the value of shared. */ 267 void *qp; 268 struct spdk_rdma_utils_mem_map *map; 269 uint32_t max_queue_depth; 270 uint32_t in_capsule_data_size; 271 bool shared; 272 }; 273 274 struct spdk_nvmf_rdma_resources { 275 /* Array of size "max_queue_depth" containing RDMA requests. */ 276 struct spdk_nvmf_rdma_request *reqs; 277 278 /* Array of size "max_queue_depth" containing RDMA recvs. */ 279 struct spdk_nvmf_rdma_recv *recvs; 280 281 /* Array of size "max_queue_depth" containing 64 byte capsules 282 * used for receive. 283 */ 284 union nvmf_h2c_msg *cmds; 285 286 /* Array of size "max_queue_depth" containing 16 byte completions 287 * to be sent back to the user. 288 */ 289 union nvmf_c2h_msg *cpls; 290 291 /* Array of size "max_queue_depth * InCapsuleDataSize" containing 292 * buffers to be used for in capsule data. 
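* Each receive WR's second SGE points into this region at offset
* i * InCapsuleDataSize (set up in nvmf_rdma_resources_create()).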
293 */ 294 void *bufs; 295 296 /* Receives that are waiting for a request object */ 297 STAILQ_HEAD(, spdk_nvmf_rdma_recv) incoming_queue; 298 299 /* Queue to track free requests */ 300 STAILQ_HEAD(, spdk_nvmf_rdma_request) free_queue; 301 }; 302 303 typedef void (*spdk_nvmf_rdma_qpair_ibv_event)(struct spdk_nvmf_rdma_qpair *rqpair); 304 305 typedef void (*spdk_poller_destroy_cb)(void *ctx); 306 307 struct spdk_nvmf_rdma_ibv_event_ctx { 308 struct spdk_nvmf_rdma_qpair *rqpair; 309 spdk_nvmf_rdma_qpair_ibv_event cb_fn; 310 /* Link to other ibv events associated with this qpair */ 311 STAILQ_ENTRY(spdk_nvmf_rdma_ibv_event_ctx) link; 312 }; 313 314 struct spdk_nvmf_rdma_qpair { 315 struct spdk_nvmf_qpair qpair; 316 317 struct spdk_nvmf_rdma_device *device; 318 struct spdk_nvmf_rdma_poller *poller; 319 320 struct spdk_rdma_provider_qp *rdma_qp; 321 struct rdma_cm_id *cm_id; 322 struct spdk_rdma_provider_srq *srq; 323 struct rdma_cm_id *listen_id; 324 325 /* Cache the QP number to improve QP search by RB tree. */ 326 uint32_t qp_num; 327 328 /* The maximum number of I/O outstanding on this connection at one time */ 329 uint16_t max_queue_depth; 330 331 /* The maximum number of active RDMA READ and ATOMIC operations at one time */ 332 uint16_t max_read_depth; 333 334 /* The maximum number of RDMA SEND operations at one time */ 335 uint32_t max_send_depth; 336 337 /* The current number of outstanding WRs from this qpair's 338 * recv queue. Should not exceed device->attr.max_queue_depth. 339 */ 340 uint16_t current_recv_depth; 341 342 /* The current number of active RDMA READ operations */ 343 uint16_t current_read_depth; 344 345 /* The current number of posted WRs from this qpair's 346 * send queue. Should not exceed max_send_depth. 347 */ 348 uint32_t current_send_depth; 349 350 /* The maximum number of SGEs per WR on the send queue */ 351 uint32_t max_send_sge; 352 353 /* The maximum number of SGEs per WR on the recv queue */ 354 uint32_t max_recv_sge; 355 356 struct spdk_nvmf_rdma_resources *resources; 357 358 STAILQ_HEAD(, spdk_nvmf_rdma_request) pending_rdma_read_queue; 359 360 STAILQ_HEAD(, spdk_nvmf_rdma_request) pending_rdma_write_queue; 361 362 STAILQ_HEAD(, spdk_nvmf_rdma_request) pending_rdma_send_queue; 363 364 /* Number of requests not in the free state */ 365 uint32_t qd; 366 367 bool ibv_in_error_state; 368 369 RB_ENTRY(spdk_nvmf_rdma_qpair) node; 370 371 STAILQ_ENTRY(spdk_nvmf_rdma_qpair) recv_link; 372 373 STAILQ_ENTRY(spdk_nvmf_rdma_qpair) send_link; 374 375 /* Points to the a request that has fuse bits set to 376 * SPDK_NVME_CMD_FUSE_FIRST, when the qpair is waiting 377 * for the request that has SPDK_NVME_CMD_FUSE_SECOND. 378 */ 379 struct spdk_nvmf_rdma_request *fused_first; 380 381 /* 382 * io_channel which is used to destroy qpair when it is removed from poll group 383 */ 384 struct spdk_io_channel *destruct_channel; 385 386 /* List of ibv async events */ 387 STAILQ_HEAD(, spdk_nvmf_rdma_ibv_event_ctx) ibv_events; 388 389 /* Lets us know that we have received the last_wqe event. 
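* This corresponds to the IBV_EVENT_QP_LAST_WQE_REACHED async event, which is
* generated for qpairs attached to an SRQ once a QP in the error state will
* consume no more WQEs from that SRQ.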
*/ 390 bool last_wqe_reached; 391 392 /* Indicate that nvmf_rdma_close_qpair is called */ 393 bool to_close; 394 }; 395 396 struct spdk_nvmf_rdma_poller_stat { 397 uint64_t completions; 398 uint64_t polls; 399 uint64_t idle_polls; 400 uint64_t requests; 401 uint64_t request_latency; 402 uint64_t pending_free_request; 403 uint64_t pending_rdma_read; 404 uint64_t pending_rdma_write; 405 uint64_t pending_rdma_send; 406 struct spdk_rdma_provider_qp_stats qp_stats; 407 }; 408 409 struct spdk_nvmf_rdma_poller { 410 struct spdk_nvmf_rdma_device *device; 411 struct spdk_nvmf_rdma_poll_group *group; 412 413 int num_cqe; 414 int required_num_wr; 415 struct ibv_cq *cq; 416 417 /* The maximum number of I/O outstanding on the shared receive queue at one time */ 418 uint16_t max_srq_depth; 419 bool need_destroy; 420 421 /* Shared receive queue */ 422 struct spdk_rdma_provider_srq *srq; 423 424 struct spdk_nvmf_rdma_resources *resources; 425 struct spdk_nvmf_rdma_poller_stat stat; 426 427 spdk_poller_destroy_cb destroy_cb; 428 void *destroy_cb_ctx; 429 430 RB_HEAD(qpairs_tree, spdk_nvmf_rdma_qpair) qpairs; 431 432 STAILQ_HEAD(, spdk_nvmf_rdma_qpair) qpairs_pending_recv; 433 434 STAILQ_HEAD(, spdk_nvmf_rdma_qpair) qpairs_pending_send; 435 436 TAILQ_ENTRY(spdk_nvmf_rdma_poller) link; 437 }; 438 439 struct spdk_nvmf_rdma_poll_group_stat { 440 uint64_t pending_data_buffer; 441 }; 442 443 struct spdk_nvmf_rdma_poll_group { 444 struct spdk_nvmf_transport_poll_group group; 445 struct spdk_nvmf_rdma_poll_group_stat stat; 446 TAILQ_HEAD(, spdk_nvmf_rdma_poller) pollers; 447 TAILQ_ENTRY(spdk_nvmf_rdma_poll_group) link; 448 }; 449 450 struct spdk_nvmf_rdma_conn_sched { 451 struct spdk_nvmf_rdma_poll_group *next_admin_pg; 452 struct spdk_nvmf_rdma_poll_group *next_io_pg; 453 }; 454 455 /* Assuming rdma_cm uses just one protection domain per ibv_context. 
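* In practice this means every qpair created on a given ibv_context shares the
* single pd and memory map stored below.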
*/ 456 struct spdk_nvmf_rdma_device { 457 struct ibv_device_attr attr; 458 struct ibv_context *context; 459 460 struct spdk_rdma_utils_mem_map *map; 461 struct ibv_pd *pd; 462 463 int num_srq; 464 bool need_destroy; 465 bool ready_to_destroy; 466 bool is_ready; 467 468 TAILQ_ENTRY(spdk_nvmf_rdma_device) link; 469 }; 470 471 struct spdk_nvmf_rdma_port { 472 const struct spdk_nvme_transport_id *trid; 473 struct rdma_cm_id *id; 474 struct spdk_nvmf_rdma_device *device; 475 TAILQ_ENTRY(spdk_nvmf_rdma_port) link; 476 }; 477 478 struct rdma_transport_opts { 479 int num_cqe; 480 uint32_t max_srq_depth; 481 bool no_srq; 482 bool no_wr_batching; 483 int acceptor_backlog; 484 }; 485 486 struct spdk_nvmf_rdma_transport { 487 struct spdk_nvmf_transport transport; 488 struct rdma_transport_opts rdma_opts; 489 490 struct spdk_nvmf_rdma_conn_sched conn_sched; 491 492 struct rdma_event_channel *event_channel; 493 494 struct spdk_mempool *data_wr_pool; 495 496 struct spdk_poller *accept_poller; 497 498 /* fields used to poll RDMA/IB events */ 499 nfds_t npoll_fds; 500 struct pollfd *poll_fds; 501 502 TAILQ_HEAD(, spdk_nvmf_rdma_device) devices; 503 TAILQ_HEAD(, spdk_nvmf_rdma_port) ports; 504 TAILQ_HEAD(, spdk_nvmf_rdma_poll_group) poll_groups; 505 506 /* ports that are removed unexpectedly and need retry listen */ 507 TAILQ_HEAD(, spdk_nvmf_rdma_port) retry_ports; 508 }; 509 510 struct poller_manage_ctx { 511 struct spdk_nvmf_rdma_transport *rtransport; 512 struct spdk_nvmf_rdma_poll_group *rgroup; 513 struct spdk_nvmf_rdma_poller *rpoller; 514 struct spdk_nvmf_rdma_device *device; 515 516 struct spdk_thread *thread; 517 volatile int *inflight_op_counter; 518 }; 519 520 static const struct spdk_json_object_decoder rdma_transport_opts_decoder[] = { 521 { 522 "num_cqe", offsetof(struct rdma_transport_opts, num_cqe), 523 spdk_json_decode_int32, true 524 }, 525 { 526 "max_srq_depth", offsetof(struct rdma_transport_opts, max_srq_depth), 527 spdk_json_decode_uint32, true 528 }, 529 { 530 "no_srq", offsetof(struct rdma_transport_opts, no_srq), 531 spdk_json_decode_bool, true 532 }, 533 { 534 "no_wr_batching", offsetof(struct rdma_transport_opts, no_wr_batching), 535 spdk_json_decode_bool, true 536 }, 537 { 538 "acceptor_backlog", offsetof(struct rdma_transport_opts, acceptor_backlog), 539 spdk_json_decode_int32, true 540 }, 541 }; 542 543 static int 544 nvmf_rdma_qpair_compare(struct spdk_nvmf_rdma_qpair *rqpair1, struct spdk_nvmf_rdma_qpair *rqpair2) 545 { 546 return rqpair1->qp_num < rqpair2->qp_num ? 
-1 : rqpair1->qp_num > rqpair2->qp_num; 547 } 548 549 RB_GENERATE_STATIC(qpairs_tree, spdk_nvmf_rdma_qpair, node, nvmf_rdma_qpair_compare); 550 551 static bool nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport, 552 struct spdk_nvmf_rdma_request *rdma_req); 553 554 static void _poller_submit_sends(struct spdk_nvmf_rdma_transport *rtransport, 555 struct spdk_nvmf_rdma_poller *rpoller); 556 557 static void _poller_submit_recvs(struct spdk_nvmf_rdma_transport *rtransport, 558 struct spdk_nvmf_rdma_poller *rpoller); 559 560 static void _nvmf_rdma_remove_destroyed_device(void *c); 561 562 static inline enum spdk_nvme_media_error_status_code 563 nvmf_rdma_dif_error_to_compl_status(uint8_t err_type) { 564 enum spdk_nvme_media_error_status_code result; 565 switch (err_type) 566 { 567 case SPDK_DIF_REFTAG_ERROR: 568 result = SPDK_NVME_SC_REFERENCE_TAG_CHECK_ERROR; 569 break; 570 case SPDK_DIF_APPTAG_ERROR: 571 result = SPDK_NVME_SC_APPLICATION_TAG_CHECK_ERROR; 572 break; 573 case SPDK_DIF_GUARD_ERROR: 574 result = SPDK_NVME_SC_GUARD_CHECK_ERROR; 575 break; 576 default: 577 SPDK_UNREACHABLE(); 578 } 579 580 return result; 581 } 582 583 /* 584 * Return data_wrs to pool starting from \b data_wr 585 * Request's own response and data WR are excluded 586 */ 587 static void 588 _nvmf_rdma_request_free_data(struct spdk_nvmf_rdma_request *rdma_req, 589 struct ibv_send_wr *data_wr, 590 struct spdk_mempool *pool) 591 { 592 struct spdk_nvmf_rdma_request_data *work_requests[SPDK_NVMF_MAX_SGL_ENTRIES]; 593 struct spdk_nvmf_rdma_request_data *nvmf_data; 594 struct ibv_send_wr *next_send_wr; 595 uint64_t req_wrid = (uint64_t)&rdma_req->data_wr; 596 uint32_t num_wrs = 0; 597 598 while (data_wr && data_wr->wr_id == req_wrid) { 599 nvmf_data = SPDK_CONTAINEROF(data_wr, struct spdk_nvmf_rdma_request_data, wr); 600 memset(nvmf_data->sgl, 0, sizeof(data_wr->sg_list[0]) * data_wr->num_sge); 601 data_wr->num_sge = 0; 602 next_send_wr = data_wr->next; 603 if (data_wr != &rdma_req->data.wr) { 604 data_wr->next = NULL; 605 assert(num_wrs < SPDK_NVMF_MAX_SGL_ENTRIES); 606 work_requests[num_wrs] = nvmf_data; 607 num_wrs++; 608 } 609 data_wr = (!next_send_wr || next_send_wr == &rdma_req->rsp.wr) ? 
NULL : next_send_wr; 610 } 611 612 if (num_wrs) { 613 spdk_mempool_put_bulk(pool, (void **) work_requests, num_wrs); 614 } 615 } 616 617 static void 618 nvmf_rdma_request_free_data(struct spdk_nvmf_rdma_request *rdma_req, 619 struct spdk_nvmf_rdma_transport *rtransport) 620 { 621 rdma_req->num_outstanding_data_wr = 0; 622 623 _nvmf_rdma_request_free_data(rdma_req, rdma_req->transfer_wr, rtransport->data_wr_pool); 624 625 if (rdma_req->remaining_tranfer_in_wrs) { 626 _nvmf_rdma_request_free_data(rdma_req, rdma_req->remaining_tranfer_in_wrs, 627 rtransport->data_wr_pool); 628 rdma_req->remaining_tranfer_in_wrs = NULL; 629 } 630 631 rdma_req->data.wr.next = NULL; 632 rdma_req->rsp.wr.next = NULL; 633 } 634 635 static void 636 nvmf_rdma_dump_request(struct spdk_nvmf_rdma_request *req) 637 { 638 SPDK_ERRLOG("\t\tRequest Data From Pool: %d\n", req->req.data_from_pool); 639 if (req->req.cmd) { 640 SPDK_ERRLOG("\t\tRequest opcode: %d\n", req->req.cmd->nvmf_cmd.opcode); 641 } 642 if (req->recv) { 643 SPDK_ERRLOG("\t\tRequest recv wr_id%lu\n", req->recv->wr.wr_id); 644 } 645 } 646 647 static void 648 nvmf_rdma_dump_qpair_contents(struct spdk_nvmf_rdma_qpair *rqpair) 649 { 650 int i; 651 652 SPDK_ERRLOG("Dumping contents of queue pair (QID %d)\n", rqpair->qpair.qid); 653 for (i = 0; i < rqpair->max_queue_depth; i++) { 654 if (rqpair->resources->reqs[i].state != RDMA_REQUEST_STATE_FREE) { 655 nvmf_rdma_dump_request(&rqpair->resources->reqs[i]); 656 } 657 } 658 } 659 660 static void 661 nvmf_rdma_resources_destroy(struct spdk_nvmf_rdma_resources *resources) 662 { 663 spdk_free(resources->cmds); 664 spdk_free(resources->cpls); 665 spdk_free(resources->bufs); 666 spdk_free(resources->reqs); 667 spdk_free(resources->recvs); 668 free(resources); 669 } 670 671 672 static struct spdk_nvmf_rdma_resources * 673 nvmf_rdma_resources_create(struct spdk_nvmf_rdma_resource_opts *opts) 674 { 675 struct spdk_nvmf_rdma_resources *resources; 676 struct spdk_nvmf_rdma_request *rdma_req; 677 struct spdk_nvmf_rdma_recv *rdma_recv; 678 struct spdk_rdma_provider_qp *qp = NULL; 679 struct spdk_rdma_provider_srq *srq = NULL; 680 struct ibv_recv_wr *bad_wr = NULL; 681 struct spdk_rdma_utils_memory_translation translation; 682 uint32_t i; 683 int rc = 0; 684 685 resources = calloc(1, sizeof(struct spdk_nvmf_rdma_resources)); 686 if (!resources) { 687 SPDK_ERRLOG("Unable to allocate resources for receive queue.\n"); 688 return NULL; 689 } 690 691 resources->reqs = spdk_zmalloc(opts->max_queue_depth * sizeof(*resources->reqs), 692 0x1000, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 693 resources->recvs = spdk_zmalloc(opts->max_queue_depth * sizeof(*resources->recvs), 694 0x1000, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 695 resources->cmds = spdk_zmalloc(opts->max_queue_depth * sizeof(*resources->cmds), 696 0x1000, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 697 resources->cpls = spdk_zmalloc(opts->max_queue_depth * sizeof(*resources->cpls), 698 0x1000, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 699 700 if (opts->in_capsule_data_size > 0) { 701 resources->bufs = spdk_zmalloc(opts->max_queue_depth * opts->in_capsule_data_size, 702 0x1000, NULL, SPDK_ENV_LCORE_ID_ANY, 703 SPDK_MALLOC_DMA); 704 } 705 706 if (!resources->reqs || !resources->recvs || !resources->cmds || 707 !resources->cpls || (opts->in_capsule_data_size && !resources->bufs)) { 708 SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n"); 709 goto cleanup; 710 } 711 712 SPDK_DEBUGLOG(rdma, "Command Array: %p Length: %lx\n", 713 
resources->cmds, opts->max_queue_depth * sizeof(*resources->cmds)); 714 SPDK_DEBUGLOG(rdma, "Completion Array: %p Length: %lx\n", 715 resources->cpls, opts->max_queue_depth * sizeof(*resources->cpls)); 716 if (resources->bufs) { 717 SPDK_DEBUGLOG(rdma, "In Capsule Data Array: %p Length: %x\n", 718 resources->bufs, opts->max_queue_depth * 719 opts->in_capsule_data_size); 720 } 721 722 /* Initialize queues */ 723 STAILQ_INIT(&resources->incoming_queue); 724 STAILQ_INIT(&resources->free_queue); 725 726 if (opts->shared) { 727 srq = (struct spdk_rdma_provider_srq *)opts->qp; 728 } else { 729 qp = (struct spdk_rdma_provider_qp *)opts->qp; 730 } 731 732 for (i = 0; i < opts->max_queue_depth; i++) { 733 rdma_recv = &resources->recvs[i]; 734 rdma_recv->qpair = opts->qpair; 735 736 /* Set up memory to receive commands */ 737 if (resources->bufs) { 738 rdma_recv->buf = (void *)((uintptr_t)resources->bufs + (i * 739 opts->in_capsule_data_size)); 740 } 741 742 rdma_recv->rdma_wr.type = RDMA_WR_TYPE_RECV; 743 744 rdma_recv->sgl[0].addr = (uintptr_t)&resources->cmds[i]; 745 rdma_recv->sgl[0].length = sizeof(resources->cmds[i]); 746 rc = spdk_rdma_utils_get_translation(opts->map, &resources->cmds[i], sizeof(resources->cmds[i]), 747 &translation); 748 if (rc) { 749 goto cleanup; 750 } 751 rdma_recv->sgl[0].lkey = spdk_rdma_utils_memory_translation_get_lkey(&translation); 752 rdma_recv->wr.num_sge = 1; 753 754 if (rdma_recv->buf) { 755 rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf; 756 rdma_recv->sgl[1].length = opts->in_capsule_data_size; 757 rc = spdk_rdma_utils_get_translation(opts->map, rdma_recv->buf, opts->in_capsule_data_size, 758 &translation); 759 if (rc) { 760 goto cleanup; 761 } 762 rdma_recv->sgl[1].lkey = spdk_rdma_utils_memory_translation_get_lkey(&translation); 763 rdma_recv->wr.num_sge++; 764 } 765 766 rdma_recv->wr.wr_id = (uintptr_t)&rdma_recv->rdma_wr; 767 rdma_recv->wr.sg_list = rdma_recv->sgl; 768 if (srq) { 769 spdk_rdma_provider_srq_queue_recv_wrs(srq, &rdma_recv->wr); 770 } else { 771 spdk_rdma_provider_qp_queue_recv_wrs(qp, &rdma_recv->wr); 772 } 773 } 774 775 for (i = 0; i < opts->max_queue_depth; i++) { 776 rdma_req = &resources->reqs[i]; 777 778 if (opts->qpair != NULL) { 779 rdma_req->req.qpair = &opts->qpair->qpair; 780 } else { 781 rdma_req->req.qpair = NULL; 782 } 783 rdma_req->req.cmd = NULL; 784 rdma_req->req.iovcnt = 0; 785 rdma_req->req.stripped_data = NULL; 786 787 /* Set up memory to send responses */ 788 rdma_req->req.rsp = &resources->cpls[i]; 789 790 rdma_req->rsp.sgl[0].addr = (uintptr_t)&resources->cpls[i]; 791 rdma_req->rsp.sgl[0].length = sizeof(resources->cpls[i]); 792 rc = spdk_rdma_utils_get_translation(opts->map, &resources->cpls[i], sizeof(resources->cpls[i]), 793 &translation); 794 if (rc) { 795 goto cleanup; 796 } 797 rdma_req->rsp.sgl[0].lkey = spdk_rdma_utils_memory_translation_get_lkey(&translation); 798 799 rdma_req->rsp_wr.type = RDMA_WR_TYPE_SEND; 800 rdma_req->rsp.wr.wr_id = (uintptr_t)&rdma_req->rsp_wr; 801 rdma_req->rsp.wr.next = NULL; 802 rdma_req->rsp.wr.opcode = IBV_WR_SEND; 803 rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED; 804 rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl; 805 rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl); 806 807 /* Set up memory for data buffers */ 808 rdma_req->data_wr.type = RDMA_WR_TYPE_DATA; 809 rdma_req->data.wr.wr_id = (uintptr_t)&rdma_req->data_wr; 810 rdma_req->data.wr.next = NULL; 811 rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED; 812 rdma_req->data.wr.sg_list = rdma_req->data.sgl; 813 
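/* The opcode, remote address and rkey of this data WR are filled in per I/O by
 * nvmf_rdma_setup_request() once the command's SGL is parsed. */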
rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl); 814 815 /* Initialize request state to FREE */ 816 rdma_req->state = RDMA_REQUEST_STATE_FREE; 817 STAILQ_INSERT_TAIL(&resources->free_queue, rdma_req, state_link); 818 } 819 820 if (srq) { 821 rc = spdk_rdma_provider_srq_flush_recv_wrs(srq, &bad_wr); 822 } else { 823 rc = spdk_rdma_provider_qp_flush_recv_wrs(qp, &bad_wr); 824 } 825 826 if (rc) { 827 goto cleanup; 828 } 829 830 return resources; 831 832 cleanup: 833 nvmf_rdma_resources_destroy(resources); 834 return NULL; 835 } 836 837 static void 838 nvmf_rdma_qpair_clean_ibv_events(struct spdk_nvmf_rdma_qpair *rqpair) 839 { 840 struct spdk_nvmf_rdma_ibv_event_ctx *ctx, *tctx; 841 STAILQ_FOREACH_SAFE(ctx, &rqpair->ibv_events, link, tctx) { 842 ctx->rqpair = NULL; 843 /* Memory allocated for ctx is freed in nvmf_rdma_qpair_process_ibv_event */ 844 STAILQ_REMOVE(&rqpair->ibv_events, ctx, spdk_nvmf_rdma_ibv_event_ctx, link); 845 } 846 } 847 848 static void nvmf_rdma_poller_destroy(struct spdk_nvmf_rdma_poller *poller); 849 850 static void 851 nvmf_rdma_qpair_destroy(struct spdk_nvmf_rdma_qpair *rqpair) 852 { 853 struct spdk_nvmf_rdma_recv *rdma_recv, *recv_tmp; 854 struct ibv_recv_wr *bad_recv_wr = NULL; 855 int rc; 856 857 spdk_trace_record(TRACE_RDMA_QP_DESTROY, 0, 0, (uintptr_t)rqpair); 858 859 if (rqpair->qd != 0) { 860 struct spdk_nvmf_qpair *qpair = &rqpair->qpair; 861 struct spdk_nvmf_rdma_transport *rtransport = SPDK_CONTAINEROF(qpair->transport, 862 struct spdk_nvmf_rdma_transport, transport); 863 struct spdk_nvmf_rdma_request *req; 864 uint32_t i, max_req_count = 0; 865 866 SPDK_WARNLOG("Destroying qpair when queue depth is %d\n", rqpair->qd); 867 868 if (rqpair->srq == NULL) { 869 nvmf_rdma_dump_qpair_contents(rqpair); 870 max_req_count = rqpair->max_queue_depth; 871 } else if (rqpair->poller && rqpair->resources) { 872 max_req_count = rqpair->poller->max_srq_depth; 873 } 874 875 SPDK_DEBUGLOG(rdma, "Release incomplete requests\n"); 876 for (i = 0; i < max_req_count; i++) { 877 req = &rqpair->resources->reqs[i]; 878 if (req->req.qpair == qpair && req->state != RDMA_REQUEST_STATE_FREE) { 879 /* nvmf_rdma_request_process checks qpair ibv and internal state 880 * and completes a request */ 881 nvmf_rdma_request_process(rtransport, req); 882 } 883 } 884 assert(rqpair->qd == 0); 885 } 886 887 if (rqpair->poller) { 888 RB_REMOVE(qpairs_tree, &rqpair->poller->qpairs, rqpair); 889 890 if (rqpair->srq != NULL && rqpair->resources != NULL) { 891 /* Drop all received but unprocessed commands for this queue and return them to SRQ */ 892 STAILQ_FOREACH_SAFE(rdma_recv, &rqpair->resources->incoming_queue, link, recv_tmp) { 893 if (rqpair == rdma_recv->qpair) { 894 STAILQ_REMOVE(&rqpair->resources->incoming_queue, rdma_recv, spdk_nvmf_rdma_recv, link); 895 spdk_rdma_provider_srq_queue_recv_wrs(rqpair->srq, &rdma_recv->wr); 896 rc = spdk_rdma_provider_srq_flush_recv_wrs(rqpair->srq, &bad_recv_wr); 897 if (rc) { 898 SPDK_ERRLOG("Unable to re-post rx descriptor\n"); 899 } 900 } 901 } 902 } 903 } 904 905 if (rqpair->cm_id) { 906 if (rqpair->rdma_qp != NULL) { 907 spdk_rdma_provider_qp_destroy(rqpair->rdma_qp); 908 rqpair->rdma_qp = NULL; 909 } 910 911 if (rqpair->poller != NULL && rqpair->srq == NULL) { 912 rqpair->poller->required_num_wr -= MAX_WR_PER_QP(rqpair->max_queue_depth); 913 } 914 } 915 916 if (rqpair->srq == NULL && rqpair->resources != NULL) { 917 nvmf_rdma_resources_destroy(rqpair->resources); 918 } 919 920 nvmf_rdma_qpair_clean_ibv_events(rqpair); 921 922 if 
(rqpair->destruct_channel) { 923 spdk_put_io_channel(rqpair->destruct_channel); 924 rqpair->destruct_channel = NULL; 925 } 926 927 if (rqpair->poller && rqpair->poller->need_destroy && RB_EMPTY(&rqpair->poller->qpairs)) { 928 nvmf_rdma_poller_destroy(rqpair->poller); 929 } 930 931 /* destroy cm_id last so cma device will not be freed before we destroy the cq. */ 932 if (rqpair->cm_id) { 933 rdma_destroy_id(rqpair->cm_id); 934 } 935 936 free(rqpair); 937 } 938 939 static int 940 nvmf_rdma_resize_cq(struct spdk_nvmf_rdma_qpair *rqpair, struct spdk_nvmf_rdma_device *device) 941 { 942 struct spdk_nvmf_rdma_poller *rpoller; 943 int rc, num_cqe, required_num_wr; 944 945 /* Enlarge CQ size dynamically */ 946 rpoller = rqpair->poller; 947 required_num_wr = rpoller->required_num_wr + MAX_WR_PER_QP(rqpair->max_queue_depth); 948 num_cqe = rpoller->num_cqe; 949 if (num_cqe < required_num_wr) { 950 num_cqe = spdk_max(num_cqe * 2, required_num_wr); 951 num_cqe = spdk_min(num_cqe, device->attr.max_cqe); 952 } 953 954 if (rpoller->num_cqe != num_cqe) { 955 if (device->context->device->transport_type == IBV_TRANSPORT_IWARP) { 956 SPDK_ERRLOG("iWARP doesn't support CQ resize. Current capacity %u, required %u\n" 957 "Using CQ of insufficient size may lead to CQ overrun\n", rpoller->num_cqe, num_cqe); 958 return -1; 959 } 960 if (required_num_wr > device->attr.max_cqe) { 961 SPDK_ERRLOG("RDMA CQE requirement (%d) exceeds device max_cqe limitation (%d)\n", 962 required_num_wr, device->attr.max_cqe); 963 return -1; 964 } 965 966 SPDK_DEBUGLOG(rdma, "Resize RDMA CQ from %d to %d\n", rpoller->num_cqe, num_cqe); 967 rc = ibv_resize_cq(rpoller->cq, num_cqe); 968 if (rc) { 969 SPDK_ERRLOG("RDMA CQ resize failed: errno %d: %s\n", errno, spdk_strerror(errno)); 970 return -1; 971 } 972 973 rpoller->num_cqe = num_cqe; 974 } 975 976 rpoller->required_num_wr = required_num_wr; 977 return 0; 978 } 979 980 static int 981 nvmf_rdma_qpair_initialize(struct spdk_nvmf_qpair *qpair) 982 { 983 struct spdk_nvmf_rdma_qpair *rqpair; 984 struct spdk_nvmf_rdma_transport *rtransport; 985 struct spdk_nvmf_transport *transport; 986 struct spdk_nvmf_rdma_resource_opts opts; 987 struct spdk_nvmf_rdma_device *device; 988 struct spdk_rdma_provider_qp_init_attr qp_init_attr = {}; 989 990 rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); 991 device = rqpair->device; 992 993 qp_init_attr.qp_context = rqpair; 994 qp_init_attr.pd = device->pd; 995 qp_init_attr.send_cq = rqpair->poller->cq; 996 qp_init_attr.recv_cq = rqpair->poller->cq; 997 998 if (rqpair->srq) { 999 qp_init_attr.srq = rqpair->srq->srq; 1000 } else { 1001 qp_init_attr.cap.max_recv_wr = rqpair->max_queue_depth; 1002 } 1003 1004 /* SEND, READ, and WRITE operations */ 1005 qp_init_attr.cap.max_send_wr = (uint32_t)rqpair->max_queue_depth * 2; 1006 qp_init_attr.cap.max_send_sge = spdk_min((uint32_t)device->attr.max_sge, NVMF_DEFAULT_TX_SGE); 1007 qp_init_attr.cap.max_recv_sge = spdk_min((uint32_t)device->attr.max_sge, NVMF_DEFAULT_RX_SGE); 1008 qp_init_attr.stats = &rqpair->poller->stat.qp_stats; 1009 1010 if (rqpair->srq == NULL && nvmf_rdma_resize_cq(rqpair, device) < 0) { 1011 SPDK_ERRLOG("Failed to resize the completion queue. 
Cannot initialize qpair.\n"); 1012 goto error; 1013 } 1014 1015 rqpair->rdma_qp = spdk_rdma_provider_qp_create(rqpair->cm_id, &qp_init_attr); 1016 if (!rqpair->rdma_qp) { 1017 goto error; 1018 } 1019 1020 rqpair->qp_num = rqpair->rdma_qp->qp->qp_num; 1021 1022 rqpair->max_send_depth = spdk_min((uint32_t)(rqpair->max_queue_depth * 2), 1023 qp_init_attr.cap.max_send_wr); 1024 rqpair->max_send_sge = spdk_min(NVMF_DEFAULT_TX_SGE, qp_init_attr.cap.max_send_sge); 1025 rqpair->max_recv_sge = spdk_min(NVMF_DEFAULT_RX_SGE, qp_init_attr.cap.max_recv_sge); 1026 spdk_trace_record(TRACE_RDMA_QP_CREATE, 0, 0, (uintptr_t)rqpair); 1027 SPDK_DEBUGLOG(rdma, "New RDMA Connection: %p\n", qpair); 1028 1029 if (rqpair->poller->srq == NULL) { 1030 rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport); 1031 transport = &rtransport->transport; 1032 1033 opts.qp = rqpair->rdma_qp; 1034 opts.map = device->map; 1035 opts.qpair = rqpair; 1036 opts.shared = false; 1037 opts.max_queue_depth = rqpair->max_queue_depth; 1038 opts.in_capsule_data_size = transport->opts.in_capsule_data_size; 1039 1040 rqpair->resources = nvmf_rdma_resources_create(&opts); 1041 1042 if (!rqpair->resources) { 1043 SPDK_ERRLOG("Unable to allocate resources for receive queue.\n"); 1044 rdma_destroy_qp(rqpair->cm_id); 1045 goto error; 1046 } 1047 } else { 1048 rqpair->resources = rqpair->poller->resources; 1049 } 1050 1051 rqpair->current_recv_depth = 0; 1052 STAILQ_INIT(&rqpair->pending_rdma_read_queue); 1053 STAILQ_INIT(&rqpair->pending_rdma_write_queue); 1054 STAILQ_INIT(&rqpair->pending_rdma_send_queue); 1055 rqpair->qpair.queue_depth = 0; 1056 1057 return 0; 1058 1059 error: 1060 rdma_destroy_id(rqpair->cm_id); 1061 rqpair->cm_id = NULL; 1062 return -1; 1063 } 1064 1065 /* Append the given recv wr structure to the resource structs outstanding recvs list. */ 1066 /* This function accepts either a single wr or the first wr in a linked list. 
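* Note that the WR is only queued at this point; it is posted to the device when
* the poller flushes receives in _poller_submit_recvs(), or immediately below
* when no_wr_batching is set.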
*/ 1067 static void 1068 nvmf_rdma_qpair_queue_recv_wrs(struct spdk_nvmf_rdma_qpair *rqpair, struct ibv_recv_wr *first) 1069 { 1070 struct spdk_nvmf_rdma_transport *rtransport = SPDK_CONTAINEROF(rqpair->qpair.transport, 1071 struct spdk_nvmf_rdma_transport, transport); 1072 1073 if (rqpair->srq != NULL) { 1074 spdk_rdma_provider_srq_queue_recv_wrs(rqpair->srq, first); 1075 } else { 1076 if (spdk_rdma_provider_qp_queue_recv_wrs(rqpair->rdma_qp, first)) { 1077 STAILQ_INSERT_TAIL(&rqpair->poller->qpairs_pending_recv, rqpair, recv_link); 1078 } 1079 } 1080 1081 if (rtransport->rdma_opts.no_wr_batching) { 1082 _poller_submit_recvs(rtransport, rqpair->poller); 1083 } 1084 } 1085 1086 static inline void 1087 request_transfer_in(struct spdk_nvmf_request *req) 1088 { 1089 struct spdk_nvmf_rdma_request *rdma_req; 1090 struct spdk_nvmf_qpair *qpair; 1091 struct spdk_nvmf_rdma_qpair *rqpair; 1092 struct spdk_nvmf_rdma_transport *rtransport; 1093 1094 qpair = req->qpair; 1095 rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req); 1096 rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); 1097 rtransport = SPDK_CONTAINEROF(rqpair->qpair.transport, 1098 struct spdk_nvmf_rdma_transport, transport); 1099 1100 assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER); 1101 assert(rdma_req != NULL); 1102 1103 if (spdk_rdma_provider_qp_queue_send_wrs(rqpair->rdma_qp, rdma_req->transfer_wr)) { 1104 STAILQ_INSERT_TAIL(&rqpair->poller->qpairs_pending_send, rqpair, send_link); 1105 } 1106 if (rtransport->rdma_opts.no_wr_batching) { 1107 _poller_submit_sends(rtransport, rqpair->poller); 1108 } 1109 1110 assert(rqpair->current_read_depth + rdma_req->num_outstanding_data_wr <= rqpair->max_read_depth); 1111 rqpair->current_read_depth += rdma_req->num_outstanding_data_wr; 1112 assert(rqpair->current_send_depth + rdma_req->num_outstanding_data_wr <= rqpair->max_send_depth); 1113 rqpair->current_send_depth += rdma_req->num_outstanding_data_wr; 1114 } 1115 1116 static inline void 1117 nvmf_rdma_request_reset_transfer_in(struct spdk_nvmf_rdma_request *rdma_req, 1118 struct spdk_nvmf_rdma_transport *rtransport) 1119 { 1120 /* Put completed WRs back to pool and move transfer_wr pointer */ 1121 _nvmf_rdma_request_free_data(rdma_req, rdma_req->transfer_wr, rtransport->data_wr_pool); 1122 rdma_req->transfer_wr = rdma_req->remaining_tranfer_in_wrs; 1123 rdma_req->remaining_tranfer_in_wrs = NULL; 1124 rdma_req->num_outstanding_data_wr = rdma_req->num_remaining_data_wr; 1125 rdma_req->num_remaining_data_wr = 0; 1126 } 1127 1128 static inline int 1129 request_prepare_transfer_in_part(struct spdk_nvmf_request *req, uint32_t num_reads_available) 1130 { 1131 struct spdk_nvmf_rdma_request *rdma_req; 1132 struct ibv_send_wr *wr; 1133 uint32_t i; 1134 1135 rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req); 1136 1137 assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER); 1138 assert(rdma_req != NULL); 1139 assert(num_reads_available > 0); 1140 assert(rdma_req->num_outstanding_data_wr > num_reads_available); 1141 wr = rdma_req->transfer_wr; 1142 1143 for (i = 0; i < num_reads_available - 1; i++) { 1144 wr = wr->next; 1145 } 1146 1147 rdma_req->remaining_tranfer_in_wrs = wr->next; 1148 rdma_req->num_remaining_data_wr = rdma_req->num_outstanding_data_wr - num_reads_available; 1149 rdma_req->num_outstanding_data_wr = num_reads_available; 1150 /* Break chain of WRs to send only part. 
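* Only the first num_reads_available READ WRs stay linked to transfer_wr; the
* remainder is parked in remaining_tranfer_in_wrs and re-attached later by
* nvmf_rdma_request_reset_transfer_in().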
Once this portion completes, we continue sending RDMA_READs */ 1151 wr->next = NULL; 1152 1153 return 0; 1154 } 1155 1156 static int 1157 request_transfer_out(struct spdk_nvmf_request *req, int *data_posted) 1158 { 1159 int num_outstanding_data_wr = 0; 1160 struct spdk_nvmf_rdma_request *rdma_req; 1161 struct spdk_nvmf_qpair *qpair; 1162 struct spdk_nvmf_rdma_qpair *rqpair; 1163 struct spdk_nvme_cpl *rsp; 1164 struct ibv_send_wr *first = NULL; 1165 struct spdk_nvmf_rdma_transport *rtransport; 1166 1167 *data_posted = 0; 1168 qpair = req->qpair; 1169 rsp = &req->rsp->nvme_cpl; 1170 rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req); 1171 rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); 1172 rtransport = SPDK_CONTAINEROF(rqpair->qpair.transport, 1173 struct spdk_nvmf_rdma_transport, transport); 1174 1175 /* Advance our sq_head pointer */ 1176 if (qpair->sq_head == qpair->sq_head_max) { 1177 qpair->sq_head = 0; 1178 } else { 1179 qpair->sq_head++; 1180 } 1181 rsp->sqhd = qpair->sq_head; 1182 1183 /* queue the capsule for the recv buffer */ 1184 assert(rdma_req->recv != NULL); 1185 1186 nvmf_rdma_qpair_queue_recv_wrs(rqpair, &rdma_req->recv->wr); 1187 1188 rdma_req->recv = NULL; 1189 assert(rqpair->current_recv_depth > 0); 1190 rqpair->current_recv_depth--; 1191 1192 /* Build the response which consists of optional 1193 * RDMA WRITEs to transfer data, plus an RDMA SEND 1194 * containing the response. 1195 */ 1196 first = &rdma_req->rsp.wr; 1197 1198 if (spdk_unlikely(rsp->status.sc != SPDK_NVME_SC_SUCCESS)) { 1199 /* On failure, data was not read from the controller. So clear the 1200 * number of outstanding data WRs to zero. 1201 */ 1202 rdma_req->num_outstanding_data_wr = 0; 1203 } else if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) { 1204 first = rdma_req->transfer_wr; 1205 *data_posted = 1; 1206 num_outstanding_data_wr = rdma_req->num_outstanding_data_wr; 1207 } 1208 if (spdk_rdma_provider_qp_queue_send_wrs(rqpair->rdma_qp, first)) { 1209 STAILQ_INSERT_TAIL(&rqpair->poller->qpairs_pending_send, rqpair, send_link); 1210 } 1211 if (rtransport->rdma_opts.no_wr_batching) { 1212 _poller_submit_sends(rtransport, rqpair->poller); 1213 } 1214 1215 /* +1 for the rsp wr */ 1216 assert(rqpair->current_send_depth + num_outstanding_data_wr + 1 <= rqpair->max_send_depth); 1217 rqpair->current_send_depth += num_outstanding_data_wr + 1; 1218 1219 return 0; 1220 } 1221 1222 static int 1223 nvmf_rdma_event_accept(struct rdma_cm_id *id, struct spdk_nvmf_rdma_qpair *rqpair) 1224 { 1225 struct spdk_nvmf_rdma_accept_private_data accept_data; 1226 struct rdma_conn_param ctrlr_event_data = {}; 1227 int rc; 1228 1229 accept_data.recfmt = 0; 1230 accept_data.crqsize = rqpair->max_queue_depth; 1231 1232 ctrlr_event_data.private_data = &accept_data; 1233 ctrlr_event_data.private_data_len = sizeof(accept_data); 1234 if (id->ps == RDMA_PS_TCP) { 1235 ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */ 1236 ctrlr_event_data.initiator_depth = rqpair->max_read_depth; 1237 } 1238 1239 /* Configure infinite retries for the initiator side qpair. 1240 * We need to pass this value to the initiator to prevent the 1241 * initiator side NIC from completing SEND requests back to the 1242 * initiator with status rnr_retry_count_exceeded. 
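* A value of 0x7 is defined by the InfiniBand specification to mean
* "retry indefinitely".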
*/ 1243 ctrlr_event_data.rnr_retry_count = 0x7; 1244 1245 /* When qpair is created without use of rdma cm API, an additional 1246 * information must be provided to initiator in the connection response: 1247 * whether qpair is using SRQ and its qp_num 1248 * Fields below are ignored by rdma cm if qpair has been 1249 * created using rdma cm API. */ 1250 ctrlr_event_data.srq = rqpair->srq ? 1 : 0; 1251 ctrlr_event_data.qp_num = rqpair->qp_num; 1252 1253 rc = spdk_rdma_provider_qp_accept(rqpair->rdma_qp, &ctrlr_event_data); 1254 if (rc) { 1255 SPDK_ERRLOG("Error %d on spdk_rdma_provider_qp_accept\n", errno); 1256 } else { 1257 SPDK_DEBUGLOG(rdma, "Sent back the accept\n"); 1258 } 1259 1260 return rc; 1261 } 1262 1263 static void 1264 nvmf_rdma_event_reject(struct rdma_cm_id *id, enum spdk_nvmf_rdma_transport_error error) 1265 { 1266 struct spdk_nvmf_rdma_reject_private_data rej_data; 1267 1268 rej_data.recfmt = 0; 1269 rej_data.sts = error; 1270 1271 rdma_reject(id, &rej_data, sizeof(rej_data)); 1272 } 1273 1274 static int 1275 nvmf_rdma_connect(struct spdk_nvmf_transport *transport, struct rdma_cm_event *event) 1276 { 1277 struct spdk_nvmf_rdma_transport *rtransport; 1278 struct spdk_nvmf_rdma_qpair *rqpair = NULL; 1279 struct spdk_nvmf_rdma_port *port; 1280 struct rdma_conn_param *rdma_param = NULL; 1281 const struct spdk_nvmf_rdma_request_private_data *private_data = NULL; 1282 uint16_t max_queue_depth; 1283 uint16_t max_read_depth; 1284 1285 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 1286 1287 assert(event->id != NULL); /* Impossible. Can't even reject the connection. */ 1288 assert(event->id->verbs != NULL); /* Impossible. No way to handle this. */ 1289 1290 rdma_param = &event->param.conn; 1291 if (rdma_param->private_data == NULL || 1292 rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) { 1293 SPDK_ERRLOG("connect request: no private data provided\n"); 1294 nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_PRIVATE_DATA_LENGTH); 1295 return -1; 1296 } 1297 1298 private_data = rdma_param->private_data; 1299 if (private_data->recfmt != 0) { 1300 SPDK_ERRLOG("Received RDMA private data with RECFMT != 0\n"); 1301 nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_RECFMT); 1302 return -1; 1303 } 1304 1305 SPDK_DEBUGLOG(rdma, "Connect Recv on fabric intf name %s, dev_name %s\n", 1306 event->id->verbs->device->name, event->id->verbs->device->dev_name); 1307 1308 port = event->listen_id->context; 1309 SPDK_DEBUGLOG(rdma, "Listen Id was %p with verbs %p. ListenAddr: %p\n", 1310 event->listen_id, event->listen_id->verbs, port); 1311 1312 /* Figure out the supported queue depth. 
This is a multi-step process 1313 * that takes into account hardware maximums, host provided values, 1314 * and our target's internal memory limits */ 1315 1316 SPDK_DEBUGLOG(rdma, "Calculating Queue Depth\n"); 1317 1318 /* Start with the maximum queue depth allowed by the target */ 1319 max_queue_depth = rtransport->transport.opts.max_queue_depth; 1320 max_read_depth = rtransport->transport.opts.max_queue_depth; 1321 SPDK_DEBUGLOG(rdma, "Target Max Queue Depth: %d\n", 1322 rtransport->transport.opts.max_queue_depth); 1323 1324 /* Next check the local NIC's hardware limitations */ 1325 SPDK_DEBUGLOG(rdma, 1326 "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n", 1327 port->device->attr.max_qp_wr, port->device->attr.max_qp_rd_atom); 1328 max_queue_depth = spdk_min(max_queue_depth, port->device->attr.max_qp_wr); 1329 max_read_depth = spdk_min(max_read_depth, port->device->attr.max_qp_init_rd_atom); 1330 1331 /* Next check the remote NIC's hardware limitations */ 1332 SPDK_DEBUGLOG(rdma, 1333 "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n", 1334 rdma_param->initiator_depth, rdma_param->responder_resources); 1335 /* from man3 rdma_get_cm_event 1336 * responder_resources - Specifies the number of responder resources that is requested by the recipient. 1337 * The responder_resources field must match the initiator depth specified by the remote node when running 1338 * the rdma_connect and rdma_accept functions. */ 1339 if (rdma_param->responder_resources != 0) { 1340 if (private_data->qid) { 1341 SPDK_DEBUGLOG(rdma, "Host (Initiator) is not allowed to use RDMA operations," 1342 " responder_resources must be 0 but set to %u\n", 1343 rdma_param->responder_resources); 1344 } else { 1345 SPDK_WARNLOG("Host (Initiator) is not allowed to use RDMA operations," 1346 " responder_resources must be 0 but set to %u\n", 1347 rdma_param->responder_resources); 1348 } 1349 } 1350 /* from man3 rdma_get_cm_event 1351 * initiator_depth - Specifies the maximum number of outstanding RDMA read operations that the recipient holds. 1352 * The initiator_depth field must match the responder resources specified by the remote node when running 1353 * the rdma_connect and rdma_accept functions. 
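* In other words, initiator_depth bounds how many RDMA READs we may keep
* outstanding toward this host, which is why it caps max_read_depth below.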
*/ 1354 if (rdma_param->initiator_depth == 0) { 1355 SPDK_ERRLOG("Host (Initiator) doesn't support RDMA_READ or atomic operations\n"); 1356 nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_INVALID_IRD); 1357 return -1; 1358 } 1359 max_read_depth = spdk_min(max_read_depth, rdma_param->initiator_depth); 1360 1361 SPDK_DEBUGLOG(rdma, "Host Receive Queue Size: %d\n", private_data->hrqsize); 1362 SPDK_DEBUGLOG(rdma, "Host Send Queue Size: %d\n", private_data->hsqsize); 1363 max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize); 1364 max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1); 1365 1366 SPDK_DEBUGLOG(rdma, "Final Negotiated Queue Depth: %d R/W Depth: %d\n", 1367 max_queue_depth, max_read_depth); 1368 1369 rqpair = calloc(1, sizeof(struct spdk_nvmf_rdma_qpair)); 1370 if (rqpair == NULL) { 1371 SPDK_ERRLOG("Could not allocate new connection.\n"); 1372 nvmf_rdma_event_reject(event->id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES); 1373 return -1; 1374 } 1375 1376 rqpair->device = port->device; 1377 rqpair->max_queue_depth = max_queue_depth; 1378 rqpair->max_read_depth = max_read_depth; 1379 rqpair->cm_id = event->id; 1380 rqpair->listen_id = event->listen_id; 1381 rqpair->qpair.transport = transport; 1382 STAILQ_INIT(&rqpair->ibv_events); 1383 /* use qid from the private data to determine the qpair type 1384 qid will be set to the appropriate value when the controller is created */ 1385 rqpair->qpair.qid = private_data->qid; 1386 rqpair->qpair.numa.id_valid = 1; 1387 rqpair->qpair.numa.id = spdk_rdma_cm_id_get_numa_id(rqpair->cm_id); 1388 1389 event->id->context = &rqpair->qpair; 1390 1391 spdk_nvmf_tgt_new_qpair(transport->tgt, &rqpair->qpair); 1392 1393 return 0; 1394 } 1395 1396 static inline void 1397 nvmf_rdma_setup_wr(struct ibv_send_wr *wr, struct ibv_send_wr *next, 1398 enum spdk_nvme_data_transfer xfer) 1399 { 1400 if (xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) { 1401 wr->opcode = IBV_WR_RDMA_WRITE; 1402 wr->send_flags = 0; 1403 wr->next = next; 1404 } else if (xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) { 1405 wr->opcode = IBV_WR_RDMA_READ; 1406 wr->send_flags = IBV_SEND_SIGNALED; 1407 wr->next = NULL; 1408 } else { 1409 assert(0); 1410 } 1411 } 1412 1413 static int 1414 nvmf_request_alloc_wrs(struct spdk_nvmf_rdma_transport *rtransport, 1415 struct spdk_nvmf_rdma_request *rdma_req, 1416 uint32_t num_sgl_descriptors) 1417 { 1418 struct spdk_nvmf_rdma_request_data *work_requests[SPDK_NVMF_MAX_SGL_ENTRIES]; 1419 struct spdk_nvmf_rdma_request_data *current_data_wr; 1420 uint32_t i; 1421 1422 if (spdk_unlikely(num_sgl_descriptors > SPDK_NVMF_MAX_SGL_ENTRIES)) { 1423 SPDK_ERRLOG("Requested too much entries (%u), the limit is %u\n", 1424 num_sgl_descriptors, SPDK_NVMF_MAX_SGL_ENTRIES); 1425 return -EINVAL; 1426 } 1427 1428 if (spdk_unlikely(spdk_mempool_get_bulk(rtransport->data_wr_pool, (void **)work_requests, 1429 num_sgl_descriptors))) { 1430 return -ENOMEM; 1431 } 1432 1433 current_data_wr = &rdma_req->data; 1434 1435 for (i = 0; i < num_sgl_descriptors; i++) { 1436 nvmf_rdma_setup_wr(¤t_data_wr->wr, &work_requests[i]->wr, rdma_req->req.xfer); 1437 current_data_wr->wr.next = &work_requests[i]->wr; 1438 current_data_wr = work_requests[i]; 1439 current_data_wr->wr.sg_list = current_data_wr->sgl; 1440 current_data_wr->wr.wr_id = rdma_req->data.wr.wr_id; 1441 } 1442 1443 nvmf_rdma_setup_wr(¤t_data_wr->wr, &rdma_req->rsp.wr, rdma_req->req.xfer); 1444 1445 return 0; 1446 } 1447 1448 static inline void 1449 nvmf_rdma_setup_request(struct spdk_nvmf_rdma_request 
*rdma_req) 1450 { 1451 struct ibv_send_wr *wr = &rdma_req->data.wr; 1452 struct spdk_nvme_sgl_descriptor *sgl = &rdma_req->req.cmd->nvme_cmd.dptr.sgl1; 1453 1454 wr->wr.rdma.rkey = sgl->keyed.key; 1455 wr->wr.rdma.remote_addr = sgl->address; 1456 nvmf_rdma_setup_wr(wr, &rdma_req->rsp.wr, rdma_req->req.xfer); 1457 } 1458 1459 static inline void 1460 nvmf_rdma_update_remote_addr(struct spdk_nvmf_rdma_request *rdma_req, uint32_t num_wrs) 1461 { 1462 struct ibv_send_wr *wr = &rdma_req->data.wr; 1463 struct spdk_nvme_sgl_descriptor *sgl = &rdma_req->req.cmd->nvme_cmd.dptr.sgl1; 1464 uint32_t i; 1465 int j; 1466 uint64_t remote_addr_offset = 0; 1467 1468 for (i = 0; i < num_wrs; ++i) { 1469 wr->wr.rdma.rkey = sgl->keyed.key; 1470 wr->wr.rdma.remote_addr = sgl->address + remote_addr_offset; 1471 for (j = 0; j < wr->num_sge; ++j) { 1472 remote_addr_offset += wr->sg_list[j].length; 1473 } 1474 wr = wr->next; 1475 } 1476 } 1477 1478 static int 1479 nvmf_rdma_fill_wr_sgl(struct spdk_nvmf_rdma_device *device, 1480 struct spdk_nvmf_rdma_request *rdma_req, 1481 struct ibv_send_wr *wr, 1482 uint32_t total_length) 1483 { 1484 struct spdk_rdma_utils_memory_translation mem_translation; 1485 struct ibv_sge *sg_ele; 1486 struct iovec *iov; 1487 uint32_t lkey, remaining; 1488 int rc; 1489 1490 wr->num_sge = 0; 1491 1492 while (total_length && wr->num_sge < SPDK_NVMF_MAX_SGL_ENTRIES) { 1493 iov = &rdma_req->req.iov[rdma_req->iovpos]; 1494 rc = spdk_rdma_utils_get_translation(device->map, iov->iov_base, iov->iov_len, &mem_translation); 1495 if (spdk_unlikely(rc)) { 1496 return rc; 1497 } 1498 1499 lkey = spdk_rdma_utils_memory_translation_get_lkey(&mem_translation); 1500 sg_ele = &wr->sg_list[wr->num_sge]; 1501 remaining = spdk_min((uint32_t)iov->iov_len - rdma_req->offset, total_length); 1502 1503 sg_ele->lkey = lkey; 1504 sg_ele->addr = (uintptr_t)iov->iov_base + rdma_req->offset; 1505 sg_ele->length = remaining; 1506 SPDK_DEBUGLOG(rdma, "sge[%d] %p addr 0x%"PRIx64", len %u\n", wr->num_sge, sg_ele, sg_ele->addr, 1507 sg_ele->length); 1508 rdma_req->offset += sg_ele->length; 1509 total_length -= sg_ele->length; 1510 wr->num_sge++; 1511 1512 if (rdma_req->offset == iov->iov_len) { 1513 rdma_req->offset = 0; 1514 rdma_req->iovpos++; 1515 } 1516 } 1517 1518 if (spdk_unlikely(total_length)) { 1519 SPDK_ERRLOG("Not enough SG entries to hold data buffer\n"); 1520 return -EINVAL; 1521 } 1522 1523 return 0; 1524 } 1525 1526 static int 1527 nvmf_rdma_fill_wr_sgl_with_dif(struct spdk_nvmf_rdma_device *device, 1528 struct spdk_nvmf_rdma_request *rdma_req, 1529 struct ibv_send_wr *wr, 1530 uint32_t total_length, 1531 uint32_t num_extra_wrs) 1532 { 1533 struct spdk_rdma_utils_memory_translation mem_translation; 1534 struct spdk_dif_ctx *dif_ctx = &rdma_req->req.dif.dif_ctx; 1535 struct ibv_sge *sg_ele; 1536 struct iovec *iov; 1537 struct iovec *rdma_iov; 1538 uint32_t lkey, remaining; 1539 uint32_t remaining_data_block, data_block_size, md_size; 1540 uint32_t sge_len; 1541 int rc; 1542 1543 data_block_size = dif_ctx->block_size - dif_ctx->md_size; 1544 1545 if (spdk_likely(!rdma_req->req.stripped_data)) { 1546 rdma_iov = rdma_req->req.iov; 1547 remaining_data_block = data_block_size; 1548 md_size = dif_ctx->md_size; 1549 } else { 1550 rdma_iov = rdma_req->req.stripped_data->iov; 1551 total_length = total_length / dif_ctx->block_size * data_block_size; 1552 remaining_data_block = total_length; 1553 md_size = 0; 1554 } 1555 1556 wr->num_sge = 0; 1557 1558 while (total_length && (num_extra_wrs || wr->num_sge < 
SPDK_NVMF_MAX_SGL_ENTRIES)) { 1559 iov = rdma_iov + rdma_req->iovpos; 1560 rc = spdk_rdma_utils_get_translation(device->map, iov->iov_base, iov->iov_len, &mem_translation); 1561 if (spdk_unlikely(rc)) { 1562 return rc; 1563 } 1564 1565 lkey = spdk_rdma_utils_memory_translation_get_lkey(&mem_translation); 1566 sg_ele = &wr->sg_list[wr->num_sge]; 1567 remaining = spdk_min((uint32_t)iov->iov_len - rdma_req->offset, total_length); 1568 1569 while (remaining) { 1570 if (wr->num_sge >= SPDK_NVMF_MAX_SGL_ENTRIES) { 1571 if (num_extra_wrs > 0 && wr->next) { 1572 wr = wr->next; 1573 wr->num_sge = 0; 1574 sg_ele = &wr->sg_list[wr->num_sge]; 1575 num_extra_wrs--; 1576 } else { 1577 break; 1578 } 1579 } 1580 sg_ele->lkey = lkey; 1581 sg_ele->addr = (uintptr_t)((char *)iov->iov_base + rdma_req->offset); 1582 sge_len = spdk_min(remaining, remaining_data_block); 1583 sg_ele->length = sge_len; 1584 SPDK_DEBUGLOG(rdma, "sge[%d] %p addr 0x%"PRIx64", len %u\n", wr->num_sge, sg_ele, 1585 sg_ele->addr, sg_ele->length); 1586 remaining -= sge_len; 1587 remaining_data_block -= sge_len; 1588 rdma_req->offset += sge_len; 1589 total_length -= sge_len; 1590 1591 sg_ele++; 1592 wr->num_sge++; 1593 1594 if (remaining_data_block == 0) { 1595 /* skip metadata */ 1596 rdma_req->offset += md_size; 1597 total_length -= md_size; 1598 /* Metadata that do not fit this IO buffer will be included in the next IO buffer */ 1599 remaining -= spdk_min(remaining, md_size); 1600 remaining_data_block = data_block_size; 1601 } 1602 1603 if (remaining == 0) { 1604 /* By subtracting the size of the last IOV from the offset, we ensure that we skip 1605 the remaining metadata bits at the beginning of the next buffer */ 1606 rdma_req->offset -= spdk_min(iov->iov_len, rdma_req->offset); 1607 rdma_req->iovpos++; 1608 } 1609 } 1610 } 1611 1612 if (spdk_unlikely(total_length)) { 1613 SPDK_ERRLOG("Not enough SG entries to hold data buffer\n"); 1614 return -EINVAL; 1615 } 1616 1617 return 0; 1618 } 1619 1620 static inline uint32_t 1621 nvmf_rdma_calc_num_wrs(uint32_t length, uint32_t io_unit_size, uint32_t block_size) 1622 { 1623 /* estimate the number of SG entries and WRs needed to process the request */ 1624 uint32_t num_sge = 0; 1625 uint32_t i; 1626 uint32_t num_buffers = SPDK_CEIL_DIV(length, io_unit_size); 1627 1628 for (i = 0; i < num_buffers && length > 0; i++) { 1629 uint32_t buffer_len = spdk_min(length, io_unit_size); 1630 uint32_t num_sge_in_block = SPDK_CEIL_DIV(buffer_len, block_size); 1631 1632 if (num_sge_in_block * block_size > buffer_len) { 1633 ++num_sge_in_block; 1634 } 1635 num_sge += num_sge_in_block; 1636 length -= buffer_len; 1637 } 1638 return SPDK_CEIL_DIV(num_sge, SPDK_NVMF_MAX_SGL_ENTRIES); 1639 } 1640 1641 static int 1642 nvmf_rdma_request_fill_iovs(struct spdk_nvmf_rdma_transport *rtransport, 1643 struct spdk_nvmf_rdma_device *device, 1644 struct spdk_nvmf_rdma_request *rdma_req) 1645 { 1646 struct spdk_nvmf_rdma_qpair *rqpair; 1647 struct spdk_nvmf_rdma_poll_group *rgroup; 1648 struct spdk_nvmf_request *req = &rdma_req->req; 1649 struct ibv_send_wr *wr = &rdma_req->data.wr; 1650 int rc; 1651 uint32_t num_wrs = 1; 1652 uint32_t length; 1653 1654 rqpair = SPDK_CONTAINEROF(req->qpair, struct spdk_nvmf_rdma_qpair, qpair); 1655 rgroup = rqpair->poller->group; 1656 1657 /* rdma wr specifics */ 1658 nvmf_rdma_setup_request(rdma_req); 1659 1660 length = req->length; 1661 if (spdk_unlikely(req->dif_enabled)) { 1662 req->dif.orig_length = length; 1663 length = spdk_dif_get_length_with_md(length, &req->dif.dif_ctx); 1664 
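/* elba_length is the extended-LBA length of the local buffer (payload plus
 * interleaved DIF metadata); the SGL built later skips the metadata regions. */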
req->dif.elba_length = length; 1665 } 1666 1667 rc = spdk_nvmf_request_get_buffers(req, &rgroup->group, &rtransport->transport, 1668 length); 1669 if (spdk_unlikely(rc != 0)) { 1670 return rc; 1671 } 1672 1673 assert(req->iovcnt <= rqpair->max_send_sge); 1674 1675 /* When dif_insert_or_strip is true and the I/O data length is greater than one block, 1676 * the stripped_buffers are got for DIF stripping. */ 1677 if (spdk_unlikely(req->dif_enabled && (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) 1678 && (req->dif.elba_length > req->dif.dif_ctx.block_size))) { 1679 rc = nvmf_request_get_stripped_buffers(req, &rgroup->group, 1680 &rtransport->transport, req->dif.orig_length); 1681 if (rc != 0) { 1682 SPDK_INFOLOG(rdma, "Get stripped buffers fail %d, fallback to req.iov.\n", rc); 1683 } 1684 } 1685 1686 rdma_req->iovpos = 0; 1687 1688 if (spdk_unlikely(req->dif_enabled)) { 1689 num_wrs = nvmf_rdma_calc_num_wrs(length, rtransport->transport.opts.io_unit_size, 1690 req->dif.dif_ctx.block_size); 1691 if (num_wrs > 1) { 1692 rc = nvmf_request_alloc_wrs(rtransport, rdma_req, num_wrs - 1); 1693 if (spdk_unlikely(rc != 0)) { 1694 goto err_exit; 1695 } 1696 } 1697 1698 rc = nvmf_rdma_fill_wr_sgl_with_dif(device, rdma_req, wr, length, num_wrs - 1); 1699 if (spdk_unlikely(rc != 0)) { 1700 goto err_exit; 1701 } 1702 1703 if (num_wrs > 1) { 1704 nvmf_rdma_update_remote_addr(rdma_req, num_wrs); 1705 } 1706 } else { 1707 rc = nvmf_rdma_fill_wr_sgl(device, rdma_req, wr, length); 1708 if (spdk_unlikely(rc != 0)) { 1709 goto err_exit; 1710 } 1711 } 1712 1713 /* set the number of outstanding data WRs for this request. */ 1714 rdma_req->num_outstanding_data_wr = num_wrs; 1715 1716 return rc; 1717 1718 err_exit: 1719 spdk_nvmf_request_free_buffers(req, &rgroup->group, &rtransport->transport); 1720 nvmf_rdma_request_free_data(rdma_req, rtransport); 1721 req->iovcnt = 0; 1722 return rc; 1723 } 1724 1725 static int 1726 nvmf_rdma_request_fill_iovs_multi_sgl(struct spdk_nvmf_rdma_transport *rtransport, 1727 struct spdk_nvmf_rdma_device *device, 1728 struct spdk_nvmf_rdma_request *rdma_req) 1729 { 1730 struct spdk_nvmf_rdma_qpair *rqpair; 1731 struct spdk_nvmf_rdma_poll_group *rgroup; 1732 struct ibv_send_wr *current_wr; 1733 struct spdk_nvmf_request *req = &rdma_req->req; 1734 struct spdk_nvme_sgl_descriptor *inline_segment, *desc; 1735 uint32_t num_sgl_descriptors; 1736 uint32_t lengths[SPDK_NVMF_MAX_SGL_ENTRIES], total_length = 0; 1737 uint32_t i; 1738 int rc; 1739 1740 rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair); 1741 rgroup = rqpair->poller->group; 1742 1743 inline_segment = &req->cmd->nvme_cmd.dptr.sgl1; 1744 assert(inline_segment->generic.type == SPDK_NVME_SGL_TYPE_LAST_SEGMENT); 1745 assert(inline_segment->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET); 1746 1747 num_sgl_descriptors = inline_segment->unkeyed.length / sizeof(struct spdk_nvme_sgl_descriptor); 1748 assert(num_sgl_descriptors <= SPDK_NVMF_MAX_SGL_ENTRIES); 1749 1750 desc = (struct spdk_nvme_sgl_descriptor *)rdma_req->recv->buf + inline_segment->address; 1751 for (i = 0; i < num_sgl_descriptors; i++) { 1752 if (spdk_likely(!req->dif_enabled)) { 1753 lengths[i] = desc->keyed.length; 1754 } else { 1755 req->dif.orig_length += desc->keyed.length; 1756 lengths[i] = spdk_dif_get_length_with_md(desc->keyed.length, &req->dif.dif_ctx); 1757 req->dif.elba_length += lengths[i]; 1758 } 1759 total_length += lengths[i]; 1760 desc++; 1761 } 1762 1763 if (spdk_unlikely(total_length > 
rtransport->transport.opts.max_io_size)) { 1764 SPDK_ERRLOG("Multi SGL length 0x%x exceeds max io size 0x%x\n", 1765 total_length, rtransport->transport.opts.max_io_size); 1766 req->rsp->nvme_cpl.status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID; 1767 return -EINVAL; 1768 } 1769 1770 rc = nvmf_request_alloc_wrs(rtransport, rdma_req, num_sgl_descriptors - 1); 1771 if (spdk_unlikely(rc != 0)) { 1772 return -ENOMEM; 1773 } 1774 1775 rc = spdk_nvmf_request_get_buffers(req, &rgroup->group, &rtransport->transport, total_length); 1776 if (spdk_unlikely(rc != 0)) { 1777 nvmf_rdma_request_free_data(rdma_req, rtransport); 1778 return rc; 1779 } 1780 1781 /* When dif_insert_or_strip is true and the I/O data length is greater than one block, 1782 * the stripped_buffers are got for DIF stripping. */ 1783 if (spdk_unlikely(req->dif_enabled && (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) 1784 && (req->dif.elba_length > req->dif.dif_ctx.block_size))) { 1785 rc = nvmf_request_get_stripped_buffers(req, &rgroup->group, 1786 &rtransport->transport, req->dif.orig_length); 1787 if (spdk_unlikely(rc != 0)) { 1788 SPDK_INFOLOG(rdma, "Get stripped buffers fail %d, fallback to req.iov.\n", rc); 1789 } 1790 } 1791 1792 /* The first WR must always be the embedded data WR. This is how we unwind them later. */ 1793 current_wr = &rdma_req->data.wr; 1794 assert(current_wr != NULL); 1795 1796 req->length = 0; 1797 rdma_req->iovpos = 0; 1798 desc = (struct spdk_nvme_sgl_descriptor *)rdma_req->recv->buf + inline_segment->address; 1799 for (i = 0; i < num_sgl_descriptors; i++) { 1800 /* The descriptors must be keyed data block descriptors with an address, not an offset. */ 1801 if (spdk_unlikely(desc->generic.type != SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK || 1802 desc->keyed.subtype != SPDK_NVME_SGL_SUBTYPE_ADDRESS)) { 1803 rc = -EINVAL; 1804 goto err_exit; 1805 } 1806 1807 if (spdk_likely(!req->dif_enabled)) { 1808 rc = nvmf_rdma_fill_wr_sgl(device, rdma_req, current_wr, lengths[i]); 1809 } else { 1810 rc = nvmf_rdma_fill_wr_sgl_with_dif(device, rdma_req, current_wr, 1811 lengths[i], 0); 1812 } 1813 if (spdk_unlikely(rc != 0)) { 1814 rc = -ENOMEM; 1815 goto err_exit; 1816 } 1817 1818 req->length += desc->keyed.length; 1819 current_wr->wr.rdma.rkey = desc->keyed.key; 1820 current_wr->wr.rdma.remote_addr = desc->address; 1821 current_wr = current_wr->next; 1822 desc++; 1823 } 1824 1825 #ifdef SPDK_CONFIG_RDMA_SEND_WITH_INVAL 1826 /* Go back to the last descriptor in the list. 
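 * The last descriptor is the one that may carry the invalidate-key subtype: if
 * the host asked for remote invalidation and the device advertises memory
 * management extensions, the response below is sent as SEND_WITH_INV carrying
 * that rkey.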
*/ 1827 desc--; 1828 if ((device->attr.device_cap_flags & IBV_DEVICE_MEM_MGT_EXTENSIONS) != 0) { 1829 if (desc->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY) { 1830 rdma_req->rsp.wr.opcode = IBV_WR_SEND_WITH_INV; 1831 rdma_req->rsp.wr.imm_data = desc->keyed.key; 1832 } 1833 } 1834 #endif 1835 1836 rdma_req->num_outstanding_data_wr = num_sgl_descriptors; 1837 1838 return 0; 1839 1840 err_exit: 1841 spdk_nvmf_request_free_buffers(req, &rgroup->group, &rtransport->transport); 1842 nvmf_rdma_request_free_data(rdma_req, rtransport); 1843 return rc; 1844 } 1845 1846 static int 1847 nvmf_rdma_request_parse_sgl(struct spdk_nvmf_rdma_transport *rtransport, 1848 struct spdk_nvmf_rdma_device *device, 1849 struct spdk_nvmf_rdma_request *rdma_req) 1850 { 1851 struct spdk_nvmf_request *req = &rdma_req->req; 1852 struct spdk_nvme_cpl *rsp; 1853 struct spdk_nvme_sgl_descriptor *sgl; 1854 int rc; 1855 uint32_t length; 1856 1857 rsp = &req->rsp->nvme_cpl; 1858 sgl = &req->cmd->nvme_cmd.dptr.sgl1; 1859 1860 if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK && 1861 (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS || 1862 sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) { 1863 1864 length = sgl->keyed.length; 1865 if (spdk_unlikely(length > rtransport->transport.opts.max_io_size)) { 1866 SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n", 1867 length, rtransport->transport.opts.max_io_size); 1868 rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID; 1869 return -1; 1870 } 1871 #ifdef SPDK_CONFIG_RDMA_SEND_WITH_INVAL 1872 if ((device->attr.device_cap_flags & IBV_DEVICE_MEM_MGT_EXTENSIONS) != 0) { 1873 if (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY) { 1874 rdma_req->rsp.wr.opcode = IBV_WR_SEND_WITH_INV; 1875 rdma_req->rsp.wr.imm_data = sgl->keyed.key; 1876 } 1877 } 1878 #endif 1879 1880 /* fill request length and populate iovs */ 1881 req->length = length; 1882 1883 rc = nvmf_rdma_request_fill_iovs(rtransport, device, rdma_req); 1884 if (spdk_unlikely(rc < 0)) { 1885 if (rc == -EINVAL) { 1886 SPDK_ERRLOG("SGL length exceeds the max I/O size\n"); 1887 rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID; 1888 return -1; 1889 } 1890 /* No available buffers. Queue this request up. */ 1891 SPDK_DEBUGLOG(rdma, "No available large data buffers. 
Queueing request %p\n", rdma_req); 1892 return 0; 1893 } 1894 1895 SPDK_DEBUGLOG(rdma, "Request %p took %d buffer/s from central pool\n", rdma_req, 1896 req->iovcnt); 1897 1898 return 0; 1899 } else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK && 1900 sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) { 1901 uint64_t offset = sgl->address; 1902 uint32_t max_len = rtransport->transport.opts.in_capsule_data_size; 1903 1904 SPDK_DEBUGLOG(nvmf, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n", 1905 offset, sgl->unkeyed.length); 1906 1907 if (spdk_unlikely(offset > max_len)) { 1908 SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n", 1909 offset, max_len); 1910 rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET; 1911 return -1; 1912 } 1913 max_len -= (uint32_t)offset; 1914 1915 if (spdk_unlikely(sgl->unkeyed.length > max_len)) { 1916 SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n", 1917 sgl->unkeyed.length, max_len); 1918 rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID; 1919 return -1; 1920 } 1921 1922 rdma_req->num_outstanding_data_wr = 0; 1923 req->data_from_pool = false; 1924 req->length = sgl->unkeyed.length; 1925 1926 req->iov[0].iov_base = rdma_req->recv->buf + offset; 1927 req->iov[0].iov_len = req->length; 1928 req->iovcnt = 1; 1929 1930 return 0; 1931 } else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_LAST_SEGMENT && 1932 sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) { 1933 1934 rc = nvmf_rdma_request_fill_iovs_multi_sgl(rtransport, device, rdma_req); 1935 if (spdk_unlikely(rc == -ENOMEM)) { 1936 SPDK_DEBUGLOG(rdma, "No available large data buffers. Queueing request %p\n", rdma_req); 1937 return 0; 1938 } else if (spdk_unlikely(rc == -EINVAL)) { 1939 SPDK_ERRLOG("Multi SGL element request length exceeds the max I/O size\n"); 1940 rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID; 1941 return -1; 1942 } 1943 1944 SPDK_DEBUGLOG(rdma, "Request %p took %d buffer/s from central pool\n", rdma_req, 1945 req->iovcnt); 1946 1947 return 0; 1948 } 1949 1950 SPDK_ERRLOG("Invalid NVMf I/O Command SGL: Type 0x%x, Subtype 0x%x\n", 1951 sgl->generic.type, sgl->generic.subtype); 1952 rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID; 1953 return -1; 1954 } 1955 1956 static void 1957 _nvmf_rdma_request_free(struct spdk_nvmf_rdma_request *rdma_req, 1958 struct spdk_nvmf_rdma_transport *rtransport) 1959 { 1960 struct spdk_nvmf_rdma_qpair *rqpair; 1961 struct spdk_nvmf_rdma_poll_group *rgroup; 1962 1963 rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair); 1964 if (rdma_req->req.data_from_pool) { 1965 rgroup = rqpair->poller->group; 1966 1967 spdk_nvmf_request_free_buffers(&rdma_req->req, &rgroup->group, &rtransport->transport); 1968 } 1969 if (rdma_req->req.stripped_data) { 1970 nvmf_request_free_stripped_buffers(&rdma_req->req, 1971 &rqpair->poller->group->group, 1972 &rtransport->transport); 1973 } 1974 nvmf_rdma_request_free_data(rdma_req, rtransport); 1975 rdma_req->req.length = 0; 1976 rdma_req->req.iovcnt = 0; 1977 rdma_req->offset = 0; 1978 rdma_req->req.dif_enabled = false; 1979 rdma_req->fused_failed = false; 1980 rdma_req->transfer_wr = NULL; 1981 if (rdma_req->fused_pair) { 1982 /* This req was part of a valid fused pair, but failed before it got to 1983 * READ_TO_EXECUTE state. This means we need to fail the other request 1984 * in the pair, because it is no longer part of a valid pair. If the pair 1985 * already reached READY_TO_EXECUTE state, we need to kick it. 
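 * (Calling nvmf_rdma_request_process() on the partner below is what performs
 * that kick.)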
1986 */ 1987 rdma_req->fused_pair->fused_failed = true; 1988 if (rdma_req->fused_pair->state == RDMA_REQUEST_STATE_READY_TO_EXECUTE) { 1989 nvmf_rdma_request_process(rtransport, rdma_req->fused_pair); 1990 } 1991 rdma_req->fused_pair = NULL; 1992 } 1993 memset(&rdma_req->req.dif, 0, sizeof(rdma_req->req.dif)); 1994 rqpair->qd--; 1995 1996 STAILQ_INSERT_HEAD(&rqpair->resources->free_queue, rdma_req, state_link); 1997 rqpair->qpair.queue_depth--; 1998 rdma_req->state = RDMA_REQUEST_STATE_FREE; 1999 } 2000 2001 static void 2002 nvmf_rdma_check_fused_ordering(struct spdk_nvmf_rdma_transport *rtransport, 2003 struct spdk_nvmf_rdma_qpair *rqpair, 2004 struct spdk_nvmf_rdma_request *rdma_req) 2005 { 2006 enum spdk_nvme_cmd_fuse last, next; 2007 2008 last = rqpair->fused_first ? rqpair->fused_first->req.cmd->nvme_cmd.fuse : SPDK_NVME_CMD_FUSE_NONE; 2009 next = rdma_req->req.cmd->nvme_cmd.fuse; 2010 2011 assert(last != SPDK_NVME_CMD_FUSE_SECOND); 2012 2013 if (spdk_likely(last == SPDK_NVME_CMD_FUSE_NONE && next == SPDK_NVME_CMD_FUSE_NONE)) { 2014 return; 2015 } 2016 2017 if (last == SPDK_NVME_CMD_FUSE_FIRST) { 2018 if (next == SPDK_NVME_CMD_FUSE_SECOND) { 2019 /* This is a valid pair of fused commands. Point them at each other 2020 * so they can be submitted consecutively once ready to be executed. 2021 */ 2022 rqpair->fused_first->fused_pair = rdma_req; 2023 rdma_req->fused_pair = rqpair->fused_first; 2024 rqpair->fused_first = NULL; 2025 return; 2026 } else { 2027 /* Mark the last req as failed since it wasn't followed by a SECOND. */ 2028 rqpair->fused_first->fused_failed = true; 2029 2030 /* If the last req is in READY_TO_EXECUTE state, then call 2031 * nvmf_rdma_request_process(), otherwise nothing else will kick it. 2032 */ 2033 if (rqpair->fused_first->state == RDMA_REQUEST_STATE_READY_TO_EXECUTE) { 2034 nvmf_rdma_request_process(rtransport, rqpair->fused_first); 2035 } 2036 2037 rqpair->fused_first = NULL; 2038 } 2039 } 2040 2041 if (next == SPDK_NVME_CMD_FUSE_FIRST) { 2042 /* Set rqpair->fused_first here so that we know to check that the next request 2043 * is a SECOND (and to fail this one if it isn't). 2044 */ 2045 rqpair->fused_first = rdma_req; 2046 } else if (next == SPDK_NVME_CMD_FUSE_SECOND) { 2047 /* Mark this req failed since it ia SECOND and the last one was not a FIRST. */ 2048 rdma_req->fused_failed = true; 2049 } 2050 } 2051 2052 bool 2053 nvmf_rdma_request_process(struct spdk_nvmf_rdma_transport *rtransport, 2054 struct spdk_nvmf_rdma_request *rdma_req) 2055 { 2056 struct spdk_nvmf_rdma_qpair *rqpair; 2057 struct spdk_nvmf_rdma_device *device; 2058 struct spdk_nvmf_rdma_poll_group *rgroup; 2059 struct spdk_nvme_cpl *rsp = &rdma_req->req.rsp->nvme_cpl; 2060 int rc; 2061 struct spdk_nvmf_rdma_recv *rdma_recv; 2062 enum spdk_nvmf_rdma_request_state prev_state; 2063 bool progress = false; 2064 int data_posted; 2065 uint32_t num_blocks, num_rdma_reads_available, qdepth; 2066 2067 rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair); 2068 device = rqpair->device; 2069 rgroup = rqpair->poller->group; 2070 2071 assert(rdma_req->state != RDMA_REQUEST_STATE_FREE); 2072 2073 /* If the queue pair is in an error state, force the request to the completed state 2074 * to release resources. 
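 * A request parked on one of the pending queues has to be unlinked from that
 * queue first; the switch below removes it before the state is forced to
 * COMPLETED.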
*/ 2075 if (spdk_unlikely(rqpair->ibv_in_error_state || !spdk_nvmf_qpair_is_active(&rqpair->qpair))) { 2076 switch (rdma_req->state) { 2077 case RDMA_REQUEST_STATE_NEED_BUFFER: 2078 STAILQ_REMOVE(&rgroup->group.pending_buf_queue, &rdma_req->req, spdk_nvmf_request, buf_link); 2079 break; 2080 case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING: 2081 STAILQ_REMOVE(&rqpair->pending_rdma_read_queue, rdma_req, spdk_nvmf_rdma_request, state_link); 2082 break; 2083 case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER: 2084 if (rdma_req->num_remaining_data_wr) { 2085 /* Partially sent request is still in the pending_rdma_read_queue, 2086 * remove it before completing */ 2087 rdma_req->num_remaining_data_wr = 0; 2088 STAILQ_REMOVE(&rqpair->pending_rdma_read_queue, rdma_req, spdk_nvmf_rdma_request, state_link); 2089 } 2090 break; 2091 case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING: 2092 STAILQ_REMOVE(&rqpair->pending_rdma_write_queue, rdma_req, spdk_nvmf_rdma_request, state_link); 2093 break; 2094 case RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING: 2095 STAILQ_REMOVE(&rqpair->pending_rdma_send_queue, rdma_req, spdk_nvmf_rdma_request, state_link); 2096 break; 2097 default: 2098 break; 2099 } 2100 rdma_req->state = RDMA_REQUEST_STATE_COMPLETED; 2101 } 2102 2103 /* The loop here is to allow for several back-to-back state changes. */ 2104 do { 2105 prev_state = rdma_req->state; 2106 2107 SPDK_DEBUGLOG(rdma, "Request %p entering state %d\n", rdma_req, prev_state); 2108 2109 switch (rdma_req->state) { 2110 case RDMA_REQUEST_STATE_FREE: 2111 /* Some external code must kick a request into RDMA_REQUEST_STATE_NEW 2112 * to escape this state. */ 2113 break; 2114 case RDMA_REQUEST_STATE_NEW: 2115 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_NEW, 0, 0, 2116 (uintptr_t)rdma_req, (uintptr_t)rqpair, rqpair->qpair.queue_depth); 2117 rdma_recv = rdma_req->recv; 2118 2119 /* The first element of the SGL is the NVMe command */ 2120 rdma_req->req.cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr; 2121 memset(rdma_req->req.rsp, 0, sizeof(*rdma_req->req.rsp)); 2122 rdma_req->transfer_wr = &rdma_req->data.wr; 2123 2124 if (spdk_unlikely(rqpair->ibv_in_error_state || !spdk_nvmf_qpair_is_active(&rqpair->qpair))) { 2125 rdma_req->state = RDMA_REQUEST_STATE_COMPLETED; 2126 break; 2127 } 2128 2129 if (spdk_unlikely(spdk_nvmf_request_get_dif_ctx(&rdma_req->req, &rdma_req->req.dif.dif_ctx))) { 2130 rdma_req->req.dif_enabled = true; 2131 } 2132 2133 nvmf_rdma_check_fused_ordering(rtransport, rqpair, rdma_req); 2134 2135 #ifdef SPDK_CONFIG_RDMA_SEND_WITH_INVAL 2136 rdma_req->rsp.wr.opcode = IBV_WR_SEND; 2137 rdma_req->rsp.wr.imm_data = 0; 2138 #endif 2139 2140 /* The next state transition depends on the data transfer needs of this request. */ 2141 rdma_req->req.xfer = spdk_nvmf_req_get_xfer(&rdma_req->req); 2142 2143 if (spdk_unlikely(rdma_req->req.xfer == SPDK_NVME_DATA_BIDIRECTIONAL)) { 2144 rsp->status.sct = SPDK_NVME_SCT_GENERIC; 2145 rsp->status.sc = SPDK_NVME_SC_INVALID_OPCODE; 2146 STAILQ_INSERT_TAIL(&rqpair->pending_rdma_send_queue, rdma_req, state_link); 2147 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING; 2148 SPDK_DEBUGLOG(rdma, "Request %p: invalid xfer type (BIDIRECTIONAL)\n", rdma_req); 2149 break; 2150 } 2151 2152 /* If no data to transfer, ready to execute. 
*/ 2153 if (rdma_req->req.xfer == SPDK_NVME_DATA_NONE) { 2154 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE; 2155 break; 2156 } 2157 2158 rdma_req->state = RDMA_REQUEST_STATE_NEED_BUFFER; 2159 STAILQ_INSERT_TAIL(&rgroup->group.pending_buf_queue, &rdma_req->req, buf_link); 2160 break; 2161 case RDMA_REQUEST_STATE_NEED_BUFFER: 2162 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_NEED_BUFFER, 0, 0, 2163 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2164 2165 assert(rdma_req->req.xfer != SPDK_NVME_DATA_NONE); 2166 2167 if (&rdma_req->req != STAILQ_FIRST(&rgroup->group.pending_buf_queue)) { 2168 /* This request needs to wait in line to obtain a buffer */ 2169 break; 2170 } 2171 2172 /* Try to get a data buffer */ 2173 rc = nvmf_rdma_request_parse_sgl(rtransport, device, rdma_req); 2174 if (spdk_unlikely(rc < 0)) { 2175 STAILQ_REMOVE_HEAD(&rgroup->group.pending_buf_queue, buf_link); 2176 STAILQ_INSERT_TAIL(&rqpair->pending_rdma_send_queue, rdma_req, state_link); 2177 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING; 2178 break; 2179 } 2180 2181 if (rdma_req->req.iovcnt == 0) { 2182 /* No buffers available. */ 2183 rgroup->stat.pending_data_buffer++; 2184 break; 2185 } 2186 2187 STAILQ_REMOVE_HEAD(&rgroup->group.pending_buf_queue, buf_link); 2188 2189 /* If data is transferring from host to controller and the data didn't 2190 * arrive using in capsule data, we need to do a transfer from the host. 2191 */ 2192 if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER && 2193 rdma_req->req.data_from_pool) { 2194 STAILQ_INSERT_TAIL(&rqpair->pending_rdma_read_queue, rdma_req, state_link); 2195 rdma_req->state = RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING; 2196 break; 2197 } 2198 2199 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE; 2200 break; 2201 case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING: 2202 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING, 0, 0, 2203 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2204 2205 if (rdma_req != STAILQ_FIRST(&rqpair->pending_rdma_read_queue)) { 2206 /* This request needs to wait in line to perform RDMA */ 2207 break; 2208 } 2209 assert(rqpair->max_send_depth >= rqpair->current_send_depth); 2210 qdepth = rqpair->max_send_depth - rqpair->current_send_depth; 2211 assert(rqpair->max_read_depth >= rqpair->current_read_depth); 2212 num_rdma_reads_available = rqpair->max_read_depth - rqpair->current_read_depth; 2213 if (rdma_req->num_outstanding_data_wr > qdepth || 2214 rdma_req->num_outstanding_data_wr > num_rdma_reads_available) { 2215 if (num_rdma_reads_available && qdepth) { 2216 /* Send as much as we can */ 2217 request_prepare_transfer_in_part(&rdma_req->req, spdk_min(num_rdma_reads_available, qdepth)); 2218 } else { 2219 /* We can only have so many WRs outstanding. we have to wait until some finish. */ 2220 rqpair->poller->stat.pending_rdma_read++; 2221 break; 2222 } 2223 } 2224 2225 /* We have already verified that this request is the head of the queue. 
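 * It is only removed from pending_rdma_read_queue once it has no data WRs left
 * to post; a partially posted request (num_remaining_data_wr != 0) stays
 * queued until the remaining RDMA READs can be issued.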
*/ 2226 if (rdma_req->num_remaining_data_wr == 0) { 2227 STAILQ_REMOVE_HEAD(&rqpair->pending_rdma_read_queue, state_link); 2228 } 2229 2230 request_transfer_in(&rdma_req->req); 2231 rdma_req->state = RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER; 2232 2233 break; 2234 case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER: 2235 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER, 0, 0, 2236 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2237 /* Some external code must kick a request into RDMA_REQUEST_STATE_READY_TO_EXECUTE 2238 * to escape this state. */ 2239 break; 2240 case RDMA_REQUEST_STATE_READY_TO_EXECUTE: 2241 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_READY_TO_EXECUTE, 0, 0, 2242 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2243 2244 if (spdk_unlikely(rdma_req->req.dif_enabled)) { 2245 if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) { 2246 /* generate DIF for write operation */ 2247 num_blocks = SPDK_CEIL_DIV(rdma_req->req.dif.elba_length, rdma_req->req.dif.dif_ctx.block_size); 2248 assert(num_blocks > 0); 2249 2250 rc = spdk_dif_generate(rdma_req->req.iov, rdma_req->req.iovcnt, 2251 num_blocks, &rdma_req->req.dif.dif_ctx); 2252 if (rc != 0) { 2253 SPDK_ERRLOG("DIF generation failed\n"); 2254 rdma_req->state = RDMA_REQUEST_STATE_COMPLETED; 2255 spdk_nvmf_qpair_disconnect(&rqpair->qpair); 2256 break; 2257 } 2258 } 2259 2260 assert(rdma_req->req.dif.elba_length >= rdma_req->req.length); 2261 /* set extended length before IO operation */ 2262 rdma_req->req.length = rdma_req->req.dif.elba_length; 2263 } 2264 2265 if (rdma_req->req.cmd->nvme_cmd.fuse != SPDK_NVME_CMD_FUSE_NONE) { 2266 if (rdma_req->fused_failed) { 2267 /* This request failed FUSED semantics. Fail it immediately, without 2268 * even sending it to the target layer. 2269 */ 2270 rsp->status.sct = SPDK_NVME_SCT_GENERIC; 2271 rsp->status.sc = SPDK_NVME_SC_ABORTED_MISSING_FUSED; 2272 STAILQ_INSERT_TAIL(&rqpair->pending_rdma_send_queue, rdma_req, state_link); 2273 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING; 2274 break; 2275 } 2276 2277 if (rdma_req->fused_pair == NULL || 2278 rdma_req->fused_pair->state != RDMA_REQUEST_STATE_READY_TO_EXECUTE) { 2279 /* This request is ready to execute, but either we don't know yet if it's 2280 * valid - i.e. this is a FIRST but we haven't received the next 2281 * request yet or the other request of this fused pair isn't ready to 2282 * execute. So break here and this request will get processed later either 2283 * when the other request is ready or we find that this request isn't valid. 2284 */ 2285 break; 2286 } 2287 } 2288 2289 /* If we get to this point, and this request is a fused command, we know that 2290 * it is part of valid sequence (FIRST followed by a SECOND) and that both 2291 * requests are READY_TO_EXECUTE. So call spdk_nvmf_request_exec() both on this 2292 * request, and the other request of the fused pair, in the correct order. 2293 * Also clear the ->fused_pair pointers on both requests, since after this point 2294 * we no longer need to maintain the relationship between these two requests. 
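 * The ordering below preserves FIRST-then-SECOND submission: for a SECOND we
 * first execute the stashed FIRST and then this request; for a FIRST we
 * execute this request and then its SECOND.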
2295 */ 2296 if (rdma_req->req.cmd->nvme_cmd.fuse == SPDK_NVME_CMD_FUSE_SECOND) { 2297 assert(rdma_req->fused_pair != NULL); 2298 assert(rdma_req->fused_pair->fused_pair != NULL); 2299 rdma_req->fused_pair->state = RDMA_REQUEST_STATE_EXECUTING; 2300 spdk_nvmf_request_exec(&rdma_req->fused_pair->req); 2301 rdma_req->fused_pair->fused_pair = NULL; 2302 rdma_req->fused_pair = NULL; 2303 } 2304 rdma_req->state = RDMA_REQUEST_STATE_EXECUTING; 2305 spdk_nvmf_request_exec(&rdma_req->req); 2306 if (rdma_req->req.cmd->nvme_cmd.fuse == SPDK_NVME_CMD_FUSE_FIRST) { 2307 assert(rdma_req->fused_pair != NULL); 2308 assert(rdma_req->fused_pair->fused_pair != NULL); 2309 rdma_req->fused_pair->state = RDMA_REQUEST_STATE_EXECUTING; 2310 spdk_nvmf_request_exec(&rdma_req->fused_pair->req); 2311 rdma_req->fused_pair->fused_pair = NULL; 2312 rdma_req->fused_pair = NULL; 2313 } 2314 break; 2315 case RDMA_REQUEST_STATE_EXECUTING: 2316 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_EXECUTING, 0, 0, 2317 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2318 /* Some external code must kick a request into RDMA_REQUEST_STATE_EXECUTED 2319 * to escape this state. */ 2320 break; 2321 case RDMA_REQUEST_STATE_EXECUTED: 2322 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_EXECUTED, 0, 0, 2323 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2324 if (rsp->status.sc == SPDK_NVME_SC_SUCCESS && 2325 rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) { 2326 STAILQ_INSERT_TAIL(&rqpair->pending_rdma_write_queue, rdma_req, state_link); 2327 rdma_req->state = RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING; 2328 } else { 2329 STAILQ_INSERT_TAIL(&rqpair->pending_rdma_send_queue, rdma_req, state_link); 2330 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING; 2331 } 2332 if (spdk_unlikely(rdma_req->req.dif_enabled)) { 2333 /* restore the original length */ 2334 rdma_req->req.length = rdma_req->req.dif.orig_length; 2335 2336 if (rdma_req->req.xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) { 2337 struct spdk_dif_error error_blk; 2338 2339 num_blocks = SPDK_CEIL_DIV(rdma_req->req.dif.elba_length, rdma_req->req.dif.dif_ctx.block_size); 2340 if (!rdma_req->req.stripped_data) { 2341 rc = spdk_dif_verify(rdma_req->req.iov, rdma_req->req.iovcnt, num_blocks, 2342 &rdma_req->req.dif.dif_ctx, &error_blk); 2343 } else { 2344 rc = spdk_dif_verify_copy(rdma_req->req.stripped_data->iov, 2345 rdma_req->req.stripped_data->iovcnt, 2346 rdma_req->req.iov, rdma_req->req.iovcnt, num_blocks, 2347 &rdma_req->req.dif.dif_ctx, &error_blk); 2348 } 2349 if (rc) { 2350 struct spdk_nvme_cpl *rsp = &rdma_req->req.rsp->nvme_cpl; 2351 2352 SPDK_ERRLOG("DIF error detected. 
type=%d, offset=%" PRIu32 "\n", error_blk.err_type, 2353 error_blk.err_offset); 2354 rsp->status.sct = SPDK_NVME_SCT_MEDIA_ERROR; 2355 rsp->status.sc = nvmf_rdma_dif_error_to_compl_status(error_blk.err_type); 2356 STAILQ_REMOVE(&rqpair->pending_rdma_write_queue, rdma_req, spdk_nvmf_rdma_request, state_link); 2357 STAILQ_INSERT_TAIL(&rqpair->pending_rdma_send_queue, rdma_req, state_link); 2358 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING; 2359 } 2360 } 2361 } 2362 break; 2363 case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING: 2364 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING, 0, 0, 2365 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2366 2367 if (rdma_req != STAILQ_FIRST(&rqpair->pending_rdma_write_queue)) { 2368 /* This request needs to wait in line to perform RDMA */ 2369 break; 2370 } 2371 if ((rqpair->current_send_depth + rdma_req->num_outstanding_data_wr + 1) > 2372 rqpair->max_send_depth) { 2373 /* We can only have so many WRs outstanding. we have to wait until some finish. 2374 * +1 since each request has an additional wr in the resp. */ 2375 rqpair->poller->stat.pending_rdma_write++; 2376 break; 2377 } 2378 2379 /* We have already verified that this request is the head of the queue. */ 2380 STAILQ_REMOVE_HEAD(&rqpair->pending_rdma_write_queue, state_link); 2381 2382 /* The data transfer will be kicked off from 2383 * RDMA_REQUEST_STATE_READY_TO_COMPLETE state. 2384 * We verified that data + response fit into send queue, so we can go to the next state directly 2385 */ 2386 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE; 2387 break; 2388 case RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING: 2389 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING, 0, 0, 2390 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2391 2392 if (rdma_req != STAILQ_FIRST(&rqpair->pending_rdma_send_queue)) { 2393 /* This request needs to wait in line to send the completion */ 2394 break; 2395 } 2396 2397 assert(rqpair->current_send_depth <= rqpair->max_send_depth); 2398 if (rqpair->current_send_depth == rqpair->max_send_depth) { 2399 /* We can only have so many WRs outstanding. we have to wait until some finish */ 2400 rqpair->poller->stat.pending_rdma_send++; 2401 break; 2402 } 2403 2404 /* We have already verified that this request is the head of the queue. */ 2405 STAILQ_REMOVE_HEAD(&rqpair->pending_rdma_send_queue, state_link); 2406 2407 /* The response sending will be kicked off from 2408 * RDMA_REQUEST_STATE_READY_TO_COMPLETE state. 2409 */ 2410 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE; 2411 break; 2412 case RDMA_REQUEST_STATE_READY_TO_COMPLETE: 2413 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_READY_TO_COMPLETE, 0, 0, 2414 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2415 rc = request_transfer_out(&rdma_req->req, &data_posted); 2416 assert(rc == 0); /* No good way to handle this currently */ 2417 if (spdk_unlikely(rc)) { 2418 rdma_req->state = RDMA_REQUEST_STATE_COMPLETED; 2419 } else { 2420 rdma_req->state = data_posted ? RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST : 2421 RDMA_REQUEST_STATE_COMPLETING; 2422 } 2423 break; 2424 case RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST: 2425 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST, 0, 0, 2426 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2427 /* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED 2428 * to escape this state. 
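 * (In practice that kick comes from the poller's completion handling once the
 * work requests posted for this transfer have completed.)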
*/ 2429 break; 2430 case RDMA_REQUEST_STATE_COMPLETING: 2431 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_COMPLETING, 0, 0, 2432 (uintptr_t)rdma_req, (uintptr_t)rqpair); 2433 /* Some external code must kick a request into RDMA_REQUEST_STATE_COMPLETED 2434 * to escape this state. */ 2435 break; 2436 case RDMA_REQUEST_STATE_COMPLETED: 2437 spdk_trace_record(TRACE_RDMA_REQUEST_STATE_COMPLETED, 0, 0, 2438 (uintptr_t)rdma_req, (uintptr_t)rqpair, rqpair->qpair.queue_depth); 2439 2440 rqpair->poller->stat.request_latency += spdk_get_ticks() - rdma_req->receive_tsc; 2441 _nvmf_rdma_request_free(rdma_req, rtransport); 2442 break; 2443 case RDMA_REQUEST_NUM_STATES: 2444 default: 2445 assert(0); 2446 break; 2447 } 2448 2449 if (rdma_req->state != prev_state) { 2450 progress = true; 2451 } 2452 } while (rdma_req->state != prev_state); 2453 2454 return progress; 2455 } 2456 2457 /* Public API callbacks begin here */ 2458 2459 #define SPDK_NVMF_RDMA_DEFAULT_MAX_QUEUE_DEPTH 128 2460 #define SPDK_NVMF_RDMA_DEFAULT_AQ_DEPTH 128 2461 #define SPDK_NVMF_RDMA_DEFAULT_SRQ_DEPTH 4096 2462 #define SPDK_NVMF_RDMA_DEFAULT_MAX_QPAIRS_PER_CTRLR 128 2463 #define SPDK_NVMF_RDMA_DEFAULT_IN_CAPSULE_DATA_SIZE 4096 2464 #define SPDK_NVMF_RDMA_DEFAULT_MAX_IO_SIZE 131072 2465 #define SPDK_NVMF_RDMA_MIN_IO_BUFFER_SIZE (SPDK_NVMF_RDMA_DEFAULT_MAX_IO_SIZE / SPDK_NVMF_MAX_SGL_ENTRIES) 2466 #define SPDK_NVMF_RDMA_DEFAULT_NUM_SHARED_BUFFERS 4095 2467 #define SPDK_NVMF_RDMA_DEFAULT_BUFFER_CACHE_SIZE UINT32_MAX 2468 #define SPDK_NVMF_RDMA_DEFAULT_NO_SRQ false 2469 #define SPDK_NVMF_RDMA_DIF_INSERT_OR_STRIP false 2470 #define SPDK_NVMF_RDMA_ACCEPTOR_BACKLOG 100 2471 #define SPDK_NVMF_RDMA_DEFAULT_ABORT_TIMEOUT_SEC 1 2472 #define SPDK_NVMF_RDMA_DEFAULT_NO_WR_BATCHING false 2473 #define SPDK_NVMF_RDMA_DEFAULT_DATA_WR_POOL_SIZE 4095 2474 2475 static void 2476 nvmf_rdma_opts_init(struct spdk_nvmf_transport_opts *opts) 2477 { 2478 opts->max_queue_depth = SPDK_NVMF_RDMA_DEFAULT_MAX_QUEUE_DEPTH; 2479 opts->max_qpairs_per_ctrlr = SPDK_NVMF_RDMA_DEFAULT_MAX_QPAIRS_PER_CTRLR; 2480 opts->in_capsule_data_size = SPDK_NVMF_RDMA_DEFAULT_IN_CAPSULE_DATA_SIZE; 2481 opts->max_io_size = SPDK_NVMF_RDMA_DEFAULT_MAX_IO_SIZE; 2482 opts->io_unit_size = SPDK_NVMF_RDMA_MIN_IO_BUFFER_SIZE; 2483 opts->max_aq_depth = SPDK_NVMF_RDMA_DEFAULT_AQ_DEPTH; 2484 opts->num_shared_buffers = SPDK_NVMF_RDMA_DEFAULT_NUM_SHARED_BUFFERS; 2485 opts->buf_cache_size = SPDK_NVMF_RDMA_DEFAULT_BUFFER_CACHE_SIZE; 2486 opts->dif_insert_or_strip = SPDK_NVMF_RDMA_DIF_INSERT_OR_STRIP; 2487 opts->abort_timeout_sec = SPDK_NVMF_RDMA_DEFAULT_ABORT_TIMEOUT_SEC; 2488 opts->transport_specific = NULL; 2489 opts->data_wr_pool_size = SPDK_NVMF_RDMA_DEFAULT_DATA_WR_POOL_SIZE; 2490 } 2491 2492 static int nvmf_rdma_destroy(struct spdk_nvmf_transport *transport, 2493 spdk_nvmf_transport_destroy_done_cb cb_fn, void *cb_arg); 2494 2495 static inline bool 2496 nvmf_rdma_is_rxe_device(struct spdk_nvmf_rdma_device *device) 2497 { 2498 return device->attr.vendor_id == SPDK_RDMA_RXE_VENDOR_ID_OLD || 2499 device->attr.vendor_id == SPDK_RDMA_RXE_VENDOR_ID_NEW; 2500 } 2501 2502 static int nvmf_rdma_accept(void *ctx); 2503 static bool nvmf_rdma_retry_listen_port(struct spdk_nvmf_rdma_transport *rtransport); 2504 static void destroy_ib_device(struct spdk_nvmf_rdma_transport *rtransport, 2505 struct spdk_nvmf_rdma_device *device); 2506 2507 static int 2508 create_ib_device(struct spdk_nvmf_rdma_transport *rtransport, struct ibv_context *context, 2509 struct spdk_nvmf_rdma_device **new_device) 2510 { 2511 struct 
spdk_nvmf_rdma_device *device; 2512 int flag = 0; 2513 int rc = 0; 2514 2515 device = calloc(1, sizeof(*device)); 2516 if (!device) { 2517 SPDK_ERRLOG("Unable to allocate memory for RDMA devices.\n"); 2518 return -ENOMEM; 2519 } 2520 device->context = context; 2521 rc = ibv_query_device(device->context, &device->attr); 2522 if (rc < 0) { 2523 SPDK_ERRLOG("Failed to query RDMA device attributes.\n"); 2524 free(device); 2525 return rc; 2526 } 2527 2528 #ifdef SPDK_CONFIG_RDMA_SEND_WITH_INVAL 2529 if ((device->attr.device_cap_flags & IBV_DEVICE_MEM_MGT_EXTENSIONS) == 0) { 2530 SPDK_WARNLOG("The libibverbs on this system supports SEND_WITH_INVALIDATE,"); 2531 SPDK_WARNLOG("but the device with vendor ID %u does not.\n", device->attr.vendor_id); 2532 } 2533 2534 /** 2535 * The vendor ID is assigned by the IEEE and an ID of 0 implies Soft-RoCE. 2536 * The Soft-RoCE RXE driver does not currently support send with invalidate, 2537 * but incorrectly reports that it does. There are changes making their way 2538 * through the kernel now that will enable this feature. When they are merged, 2539 * we can conditionally enable this feature. 2540 * 2541 * TODO: enable this for versions of the kernel rxe driver that support it. 2542 */ 2543 if (nvmf_rdma_is_rxe_device(device)) { 2544 device->attr.device_cap_flags &= ~(IBV_DEVICE_MEM_MGT_EXTENSIONS); 2545 } 2546 #endif 2547 2548 /* set up device context async ev fd as NON_BLOCKING */ 2549 flag = fcntl(device->context->async_fd, F_GETFL); 2550 rc = fcntl(device->context->async_fd, F_SETFL, flag | O_NONBLOCK); 2551 if (rc < 0) { 2552 SPDK_ERRLOG("Failed to set context async fd to NONBLOCK.\n"); 2553 free(device); 2554 return rc; 2555 } 2556 2557 TAILQ_INSERT_TAIL(&rtransport->devices, device, link); 2558 SPDK_DEBUGLOG(rdma, "New device %p is added to RDMA transport\n", device); 2559 2560 if (g_nvmf_hooks.get_ibv_pd) { 2561 device->pd = g_nvmf_hooks.get_ibv_pd(NULL, device->context); 2562 } else { 2563 device->pd = ibv_alloc_pd(device->context); 2564 } 2565 2566 if (!device->pd) { 2567 SPDK_ERRLOG("Unable to allocate protection domain.\n"); 2568 destroy_ib_device(rtransport, device); 2569 return -ENOMEM; 2570 } 2571 2572 assert(device->map == NULL); 2573 2574 device->map = spdk_rdma_utils_create_mem_map(device->pd, &g_nvmf_hooks, IBV_ACCESS_LOCAL_WRITE); 2575 if (!device->map) { 2576 SPDK_ERRLOG("Unable to allocate memory map for listen address\n"); 2577 destroy_ib_device(rtransport, device); 2578 return -ENOMEM; 2579 } 2580 2581 assert(device->map != NULL); 2582 assert(device->pd != NULL); 2583 2584 if (new_device) { 2585 *new_device = device; 2586 } 2587 SPDK_NOTICELOG("Create IB device %s(%p/%p) succeed.\n", ibv_get_device_name(context->device), 2588 device, context); 2589 2590 return 0; 2591 } 2592 2593 static void 2594 free_poll_fds(struct spdk_nvmf_rdma_transport *rtransport) 2595 { 2596 if (rtransport->poll_fds) { 2597 free(rtransport->poll_fds); 2598 rtransport->poll_fds = NULL; 2599 } 2600 rtransport->npoll_fds = 0; 2601 } 2602 2603 static int 2604 generate_poll_fds(struct spdk_nvmf_rdma_transport *rtransport) 2605 { 2606 /* Set up poll descriptor array to monitor events from RDMA and IB 2607 * in a single poll syscall 2608 */ 2609 int device_count = 0; 2610 int i = 0; 2611 struct spdk_nvmf_rdma_device *device, *tmp; 2612 2613 TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) { 2614 device_count++; 2615 } 2616 2617 rtransport->npoll_fds = device_count + 1; 2618 2619 rtransport->poll_fds = calloc(rtransport->npoll_fds, sizeof(struct pollfd)); 
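	/* The array filled in below has a fixed layout: slot 0 is the RDMA CM event
	 * channel fd and each remaining slot is one device's async event fd. As a
	 * hypothetical example with two HCAs:
	 *   poll_fds[0].fd = event_channel->fd
	 *   poll_fds[1].fd = first device's context->async_fd
	 *   poll_fds[2].fd = second device's context->async_fd
	 * all polled with POLLIN. */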
2620 if (rtransport->poll_fds == NULL) { 2621 SPDK_ERRLOG("poll_fds allocation failed\n"); 2622 return -ENOMEM; 2623 } 2624 2625 rtransport->poll_fds[i].fd = rtransport->event_channel->fd; 2626 rtransport->poll_fds[i++].events = POLLIN; 2627 2628 TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) { 2629 rtransport->poll_fds[i].fd = device->context->async_fd; 2630 rtransport->poll_fds[i++].events = POLLIN; 2631 } 2632 2633 return 0; 2634 } 2635 2636 static struct spdk_nvmf_transport * 2637 nvmf_rdma_create(struct spdk_nvmf_transport_opts *opts) 2638 { 2639 int rc; 2640 struct spdk_nvmf_rdma_transport *rtransport; 2641 struct spdk_nvmf_rdma_device *device; 2642 struct ibv_context **contexts; 2643 size_t data_wr_pool_size; 2644 uint32_t i; 2645 int flag; 2646 uint32_t sge_count; 2647 uint32_t min_shared_buffers; 2648 uint32_t min_in_capsule_data_size; 2649 int max_device_sge = SPDK_NVMF_MAX_SGL_ENTRIES; 2650 2651 rtransport = calloc(1, sizeof(*rtransport)); 2652 if (!rtransport) { 2653 return NULL; 2654 } 2655 2656 TAILQ_INIT(&rtransport->devices); 2657 TAILQ_INIT(&rtransport->ports); 2658 TAILQ_INIT(&rtransport->poll_groups); 2659 TAILQ_INIT(&rtransport->retry_ports); 2660 2661 rtransport->transport.ops = &spdk_nvmf_transport_rdma; 2662 rtransport->rdma_opts.num_cqe = DEFAULT_NVMF_RDMA_CQ_SIZE; 2663 rtransport->rdma_opts.max_srq_depth = SPDK_NVMF_RDMA_DEFAULT_SRQ_DEPTH; 2664 rtransport->rdma_opts.no_srq = SPDK_NVMF_RDMA_DEFAULT_NO_SRQ; 2665 rtransport->rdma_opts.acceptor_backlog = SPDK_NVMF_RDMA_ACCEPTOR_BACKLOG; 2666 rtransport->rdma_opts.no_wr_batching = SPDK_NVMF_RDMA_DEFAULT_NO_WR_BATCHING; 2667 if (opts->transport_specific != NULL && 2668 spdk_json_decode_object_relaxed(opts->transport_specific, rdma_transport_opts_decoder, 2669 SPDK_COUNTOF(rdma_transport_opts_decoder), 2670 &rtransport->rdma_opts)) { 2671 SPDK_ERRLOG("spdk_json_decode_object_relaxed failed\n"); 2672 nvmf_rdma_destroy(&rtransport->transport, NULL, NULL); 2673 return NULL; 2674 } 2675 2676 SPDK_INFOLOG(rdma, "*** RDMA Transport Init ***\n" 2677 " Transport opts: max_ioq_depth=%d, max_io_size=%d,\n" 2678 " max_io_qpairs_per_ctrlr=%d, io_unit_size=%d,\n" 2679 " in_capsule_data_size=%d, max_aq_depth=%d,\n" 2680 " num_shared_buffers=%d, num_cqe=%d, max_srq_depth=%d, no_srq=%d," 2681 " acceptor_backlog=%d, no_wr_batching=%d abort_timeout_sec=%d\n", 2682 opts->max_queue_depth, 2683 opts->max_io_size, 2684 opts->max_qpairs_per_ctrlr - 1, 2685 opts->io_unit_size, 2686 opts->in_capsule_data_size, 2687 opts->max_aq_depth, 2688 opts->num_shared_buffers, 2689 rtransport->rdma_opts.num_cqe, 2690 rtransport->rdma_opts.max_srq_depth, 2691 rtransport->rdma_opts.no_srq, 2692 rtransport->rdma_opts.acceptor_backlog, 2693 rtransport->rdma_opts.no_wr_batching, 2694 opts->abort_timeout_sec); 2695 2696 /* I/O unit size cannot be larger than max I/O size */ 2697 if (opts->io_unit_size > opts->max_io_size) { 2698 opts->io_unit_size = opts->max_io_size; 2699 } 2700 2701 if (rtransport->rdma_opts.acceptor_backlog <= 0) { 2702 SPDK_ERRLOG("The acceptor backlog cannot be less than 1, setting to the default value of (%d).\n", 2703 SPDK_NVMF_RDMA_ACCEPTOR_BACKLOG); 2704 rtransport->rdma_opts.acceptor_backlog = SPDK_NVMF_RDMA_ACCEPTOR_BACKLOG; 2705 } 2706 2707 if (opts->num_shared_buffers < (SPDK_NVMF_MAX_SGL_ENTRIES * 2)) { 2708 SPDK_ERRLOG("The number of shared data buffers (%d) is less than" 2709 "the minimum number required to guarantee that forward progress can be made (%d)\n", 2710 opts->num_shared_buffers, (SPDK_NVMF_MAX_SGL_ENTRIES * 
2));
		nvmf_rdma_destroy(&rtransport->transport, NULL, NULL);
		return NULL;
	}

	/* If buf_cache_size == UINT32_MAX, we will dynamically pick a cache size later that we know will fit. */
	if (opts->buf_cache_size < UINT32_MAX) {
		min_shared_buffers = spdk_env_get_core_count() * opts->buf_cache_size;
		if (min_shared_buffers > opts->num_shared_buffers) {
			SPDK_ERRLOG("There are not enough buffers to satisfy "
				    "per-poll group caches for each thread: %" PRIu32 " supplied, "
				    "%" PRIu32 " required\n", opts->num_shared_buffers, min_shared_buffers);
			SPDK_ERRLOG("Please specify a larger number of shared buffers\n");
			nvmf_rdma_destroy(&rtransport->transport, NULL, NULL);
			return NULL;
		}
	}

	sge_count = opts->max_io_size / opts->io_unit_size;
	if (sge_count > NVMF_DEFAULT_TX_SGE) {
		SPDK_ERRLOG("Unsupported IO Unit size specified, %d bytes\n", opts->io_unit_size);
		nvmf_rdma_destroy(&rtransport->transport, NULL, NULL);
		return NULL;
	}

	min_in_capsule_data_size = sizeof(struct spdk_nvme_sgl_descriptor) * SPDK_NVMF_MAX_SGL_ENTRIES;
	if (opts->in_capsule_data_size < min_in_capsule_data_size) {
		SPDK_WARNLOG("In capsule data size %u is less than the minimum %u required to support msdbd=16, increasing it\n",
			     opts->in_capsule_data_size, min_in_capsule_data_size);
		opts->in_capsule_data_size = min_in_capsule_data_size;
	}

	rtransport->event_channel = rdma_create_event_channel();
	if (rtransport->event_channel == NULL) {
		SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", spdk_strerror(errno));
		nvmf_rdma_destroy(&rtransport->transport, NULL, NULL);
		return NULL;
	}

	flag = fcntl(rtransport->event_channel->fd, F_GETFL);
	if (fcntl(rtransport->event_channel->fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    rtransport->event_channel->fd, spdk_strerror(errno));
		nvmf_rdma_destroy(&rtransport->transport, NULL, NULL);
		return NULL;
	}

	data_wr_pool_size = opts->data_wr_pool_size;
	if (data_wr_pool_size < SPDK_NVMF_MAX_SGL_ENTRIES * 2 * spdk_env_get_core_count()) {
		data_wr_pool_size = SPDK_NVMF_MAX_SGL_ENTRIES * 2 * spdk_env_get_core_count();
		SPDK_NOTICELOG("data_wr_pool_size is increased to %zu to guarantee enough cache entries "
			       "to handle at least one I/O per core\n", data_wr_pool_size);
	}
	rtransport->data_wr_pool = spdk_mempool_create("spdk_nvmf_rdma_wr_data", data_wr_pool_size,
			sizeof(struct spdk_nvmf_rdma_request_data), SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
			SPDK_ENV_NUMA_ID_ANY);
	if (!rtransport->data_wr_pool) {
		if (spdk_mempool_lookup("spdk_nvmf_rdma_wr_data") != NULL) {
			SPDK_ERRLOG("Unable to allocate work request pool for poll group: already exists\n");
			SPDK_ERRLOG("Probably running in a multiprocess environment, which is "
				    "unsupported by the nvmf library\n");
		} else {
			SPDK_ERRLOG("Unable to allocate work request pool for poll group\n");
		}
		nvmf_rdma_destroy(&rtransport->transport, NULL, NULL);
		return NULL;
	}

	contexts = rdma_get_devices(NULL);
	if (contexts == NULL) {
		SPDK_ERRLOG("rdma_get_devices() failed: %s (%d)\n", spdk_strerror(errno), errno);
		nvmf_rdma_destroy(&rtransport->transport, NULL, NULL);
		return NULL;
	}

	i = 0;
	rc = 0;
	while (contexts[i] != NULL) {
		rc = create_ib_device(rtransport, contexts[i], &device);
		if (rc < 0) {
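			/* Stop enumerating at the first device that fails to initialize;
			 * rc is re-checked after rdma_free_devices() below and the whole
			 * transport is torn down. */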
2790 break; 2791 } 2792 i++; 2793 max_device_sge = spdk_min(max_device_sge, device->attr.max_sge); 2794 device->is_ready = true; 2795 } 2796 rdma_free_devices(contexts); 2797 2798 if (opts->io_unit_size * max_device_sge < opts->max_io_size) { 2799 /* divide and round up. */ 2800 opts->io_unit_size = (opts->max_io_size + max_device_sge - 1) / max_device_sge; 2801 2802 /* round up to the nearest 4k. */ 2803 opts->io_unit_size = (opts->io_unit_size + NVMF_DATA_BUFFER_ALIGNMENT - 1) & ~NVMF_DATA_BUFFER_MASK; 2804 2805 opts->io_unit_size = spdk_max(opts->io_unit_size, SPDK_NVMF_RDMA_MIN_IO_BUFFER_SIZE); 2806 SPDK_NOTICELOG("Adjusting the io unit size to fit the device's maximum I/O size. New I/O unit size %u\n", 2807 opts->io_unit_size); 2808 } 2809 2810 if (rc < 0) { 2811 nvmf_rdma_destroy(&rtransport->transport, NULL, NULL); 2812 return NULL; 2813 } 2814 2815 rc = generate_poll_fds(rtransport); 2816 if (rc < 0) { 2817 nvmf_rdma_destroy(&rtransport->transport, NULL, NULL); 2818 return NULL; 2819 } 2820 2821 rtransport->accept_poller = SPDK_POLLER_REGISTER(nvmf_rdma_accept, &rtransport->transport, 2822 opts->acceptor_poll_rate); 2823 if (!rtransport->accept_poller) { 2824 nvmf_rdma_destroy(&rtransport->transport, NULL, NULL); 2825 return NULL; 2826 } 2827 2828 return &rtransport->transport; 2829 } 2830 2831 static void 2832 destroy_ib_device(struct spdk_nvmf_rdma_transport *rtransport, 2833 struct spdk_nvmf_rdma_device *device) 2834 { 2835 TAILQ_REMOVE(&rtransport->devices, device, link); 2836 spdk_rdma_utils_free_mem_map(&device->map); 2837 if (device->pd) { 2838 if (!g_nvmf_hooks.get_ibv_pd) { 2839 ibv_dealloc_pd(device->pd); 2840 } 2841 } 2842 SPDK_DEBUGLOG(rdma, "IB device [%p] is destroyed.\n", device); 2843 free(device); 2844 } 2845 2846 static void 2847 nvmf_rdma_dump_opts(struct spdk_nvmf_transport *transport, struct spdk_json_write_ctx *w) 2848 { 2849 struct spdk_nvmf_rdma_transport *rtransport; 2850 assert(w != NULL); 2851 2852 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 2853 spdk_json_write_named_uint32(w, "max_srq_depth", rtransport->rdma_opts.max_srq_depth); 2854 spdk_json_write_named_bool(w, "no_srq", rtransport->rdma_opts.no_srq); 2855 if (rtransport->rdma_opts.no_srq == true) { 2856 spdk_json_write_named_int32(w, "num_cqe", rtransport->rdma_opts.num_cqe); 2857 } 2858 spdk_json_write_named_int32(w, "acceptor_backlog", rtransport->rdma_opts.acceptor_backlog); 2859 spdk_json_write_named_bool(w, "no_wr_batching", rtransport->rdma_opts.no_wr_batching); 2860 } 2861 2862 static int 2863 nvmf_rdma_destroy(struct spdk_nvmf_transport *transport, 2864 spdk_nvmf_transport_destroy_done_cb cb_fn, void *cb_arg) 2865 { 2866 struct spdk_nvmf_rdma_transport *rtransport; 2867 struct spdk_nvmf_rdma_port *port, *port_tmp; 2868 struct spdk_nvmf_rdma_device *device, *device_tmp; 2869 2870 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 2871 2872 TAILQ_FOREACH_SAFE(port, &rtransport->retry_ports, link, port_tmp) { 2873 TAILQ_REMOVE(&rtransport->retry_ports, port, link); 2874 free(port); 2875 } 2876 2877 TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, port_tmp) { 2878 TAILQ_REMOVE(&rtransport->ports, port, link); 2879 rdma_destroy_id(port->id); 2880 free(port); 2881 } 2882 2883 free_poll_fds(rtransport); 2884 2885 if (rtransport->event_channel != NULL) { 2886 rdma_destroy_event_channel(rtransport->event_channel); 2887 } 2888 2889 TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) { 2890 
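		/* destroy_ib_device() unlinks the device from rtransport->devices,
		 * which is why the SAFE variant of the iterator is used here. */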
		destroy_ib_device(rtransport, device);
	}

	if (rtransport->data_wr_pool != NULL) {
		if (spdk_mempool_count(rtransport->data_wr_pool) != transport->opts.data_wr_pool_size) {
			SPDK_ERRLOG("transport wr pool count is %zu but should be %u\n",
				    spdk_mempool_count(rtransport->data_wr_pool),
				    transport->opts.data_wr_pool_size);
		}
	}

	spdk_mempool_free(rtransport->data_wr_pool);

	spdk_poller_unregister(&rtransport->accept_poller);
	free(rtransport);

	if (cb_fn) {
		cb_fn(cb_arg);
	}
	return 0;
}

static int nvmf_rdma_trid_from_cm_id(struct rdma_cm_id *id,
				     struct spdk_nvme_transport_id *trid,
				     bool peer);

static bool nvmf_rdma_rescan_devices(struct spdk_nvmf_rdma_transport *rtransport);

static int
nvmf_rdma_listen(struct spdk_nvmf_transport *transport, const struct spdk_nvme_transport_id *trid,
		 struct spdk_nvmf_listen_opts *listen_opts)
{
	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_rdma_device *device;
	struct spdk_nvmf_rdma_port *port, *tmp_port;
	struct addrinfo *res;
	struct addrinfo hints;
	int family;
	int rc;
	long int port_val;
	bool is_retry = false;

	if (!strlen(trid->trsvcid)) {
		SPDK_ERRLOG("Service ID is required\n");
		return -EINVAL;
	}

	rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport);
	assert(rtransport->event_channel != NULL);

	port = calloc(1, sizeof(*port));
	if (!port) {
		SPDK_ERRLOG("Port allocation failed\n");
		return -ENOMEM;
	}

	port->trid = trid;

	switch (trid->adrfam) {
	case SPDK_NVMF_ADRFAM_IPV4:
		family = AF_INET;
		break;
	case SPDK_NVMF_ADRFAM_IPV6:
		family = AF_INET6;
		break;
	default:
		SPDK_ERRLOG("Unhandled ADRFAM %d\n", trid->adrfam);
		free(port);
		return -EINVAL;
	}

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = family;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = 0;

	/* Range check the trsvcid.
Fail in 3 cases: 2968 * < 0: means that spdk_strtol hit an error 2969 * 0: this results in ephemeral port which we don't want 2970 * > 65535: port too high 2971 */ 2972 port_val = spdk_strtol(trid->trsvcid, 10); 2973 if (port_val <= 0 || port_val > 65535) { 2974 SPDK_ERRLOG("invalid trsvcid %s\n", trid->trsvcid); 2975 free(port); 2976 return -EINVAL; 2977 } 2978 2979 rc = getaddrinfo(trid->traddr, trid->trsvcid, &hints, &res); 2980 if (rc) { 2981 SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(rc), rc); 2982 free(port); 2983 return -(abs(rc)); 2984 } 2985 2986 rc = rdma_create_id(rtransport->event_channel, &port->id, port, RDMA_PS_TCP); 2987 if (rc < 0) { 2988 SPDK_ERRLOG("rdma_create_id() failed\n"); 2989 freeaddrinfo(res); 2990 free(port); 2991 return rc; 2992 } 2993 2994 rc = rdma_bind_addr(port->id, res->ai_addr); 2995 freeaddrinfo(res); 2996 2997 if (rc < 0) { 2998 TAILQ_FOREACH(tmp_port, &rtransport->retry_ports, link) { 2999 if (spdk_nvme_transport_id_compare(tmp_port->trid, trid) == 0) { 3000 is_retry = true; 3001 break; 3002 } 3003 } 3004 if (!is_retry) { 3005 SPDK_ERRLOG("rdma_bind_addr() failed\n"); 3006 } 3007 rdma_destroy_id(port->id); 3008 free(port); 3009 return rc; 3010 } 3011 3012 if (!port->id->verbs) { 3013 SPDK_ERRLOG("ibv_context is null\n"); 3014 rdma_destroy_id(port->id); 3015 free(port); 3016 return -1; 3017 } 3018 3019 rc = rdma_listen(port->id, rtransport->rdma_opts.acceptor_backlog); 3020 if (rc < 0) { 3021 SPDK_ERRLOG("rdma_listen() failed\n"); 3022 rdma_destroy_id(port->id); 3023 free(port); 3024 return rc; 3025 } 3026 3027 TAILQ_FOREACH(device, &rtransport->devices, link) { 3028 if (device->context == port->id->verbs && device->is_ready) { 3029 port->device = device; 3030 break; 3031 } 3032 } 3033 if (!port->device) { 3034 SPDK_ERRLOG("Accepted a connection with verbs %p, but unable to find a corresponding device.\n", 3035 port->id->verbs); 3036 rdma_destroy_id(port->id); 3037 free(port); 3038 nvmf_rdma_rescan_devices(rtransport); 3039 return -EINVAL; 3040 } 3041 3042 SPDK_NOTICELOG("*** NVMe/RDMA Target Listening on %s port %s ***\n", 3043 trid->traddr, trid->trsvcid); 3044 3045 TAILQ_INSERT_TAIL(&rtransport->ports, port, link); 3046 return 0; 3047 } 3048 3049 static void 3050 nvmf_rdma_stop_listen_ex(struct spdk_nvmf_transport *transport, 3051 const struct spdk_nvme_transport_id *trid, bool need_retry) 3052 { 3053 struct spdk_nvmf_rdma_transport *rtransport; 3054 struct spdk_nvmf_rdma_port *port, *tmp; 3055 3056 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 3057 3058 if (!need_retry) { 3059 TAILQ_FOREACH_SAFE(port, &rtransport->retry_ports, link, tmp) { 3060 if (spdk_nvme_transport_id_compare(port->trid, trid) == 0) { 3061 TAILQ_REMOVE(&rtransport->retry_ports, port, link); 3062 free(port); 3063 } 3064 } 3065 } 3066 3067 TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, tmp) { 3068 if (spdk_nvme_transport_id_compare(port->trid, trid) == 0) { 3069 SPDK_DEBUGLOG(rdma, "Port %s:%s removed. 
need retry: %d\n", 3070 port->trid->traddr, port->trid->trsvcid, need_retry); 3071 TAILQ_REMOVE(&rtransport->ports, port, link); 3072 rdma_destroy_id(port->id); 3073 port->id = NULL; 3074 port->device = NULL; 3075 if (need_retry) { 3076 TAILQ_INSERT_TAIL(&rtransport->retry_ports, port, link); 3077 } else { 3078 free(port); 3079 } 3080 break; 3081 } 3082 } 3083 } 3084 3085 static void 3086 nvmf_rdma_stop_listen(struct spdk_nvmf_transport *transport, 3087 const struct spdk_nvme_transport_id *trid) 3088 { 3089 nvmf_rdma_stop_listen_ex(transport, trid, false); 3090 } 3091 3092 static void _nvmf_rdma_register_poller_in_group(void *c); 3093 static void _nvmf_rdma_remove_poller_in_group(void *c); 3094 3095 static bool 3096 nvmf_rdma_all_pollers_management_done(void *c) 3097 { 3098 struct poller_manage_ctx *ctx = c; 3099 int counter; 3100 3101 counter = __atomic_sub_fetch(ctx->inflight_op_counter, 1, __ATOMIC_SEQ_CST); 3102 SPDK_DEBUGLOG(rdma, "nvmf_rdma_all_pollers_management_done called. counter: %d, poller: %p\n", 3103 counter, ctx->rpoller); 3104 3105 if (counter == 0) { 3106 free((void *)ctx->inflight_op_counter); 3107 } 3108 free(ctx); 3109 3110 return counter == 0; 3111 } 3112 3113 static int 3114 nvmf_rdma_manage_poller(struct spdk_nvmf_rdma_transport *rtransport, 3115 struct spdk_nvmf_rdma_device *device, bool *has_inflight, bool is_add) 3116 { 3117 struct spdk_nvmf_rdma_poll_group *rgroup; 3118 struct spdk_nvmf_rdma_poller *rpoller; 3119 struct spdk_nvmf_poll_group *poll_group; 3120 struct poller_manage_ctx *ctx; 3121 bool found; 3122 int *inflight_counter; 3123 spdk_msg_fn do_fn; 3124 3125 *has_inflight = false; 3126 do_fn = is_add ? _nvmf_rdma_register_poller_in_group : _nvmf_rdma_remove_poller_in_group; 3127 inflight_counter = calloc(1, sizeof(int)); 3128 if (!inflight_counter) { 3129 SPDK_ERRLOG("Failed to allocate inflight counter when removing pollers\n"); 3130 return -ENOMEM; 3131 } 3132 3133 TAILQ_FOREACH(rgroup, &rtransport->poll_groups, link) { 3134 (*inflight_counter)++; 3135 } 3136 3137 TAILQ_FOREACH(rgroup, &rtransport->poll_groups, link) { 3138 found = false; 3139 TAILQ_FOREACH(rpoller, &rgroup->pollers, link) { 3140 if (rpoller->device == device) { 3141 found = true; 3142 break; 3143 } 3144 } 3145 if (found == is_add) { 3146 __atomic_fetch_sub(inflight_counter, 1, __ATOMIC_SEQ_CST); 3147 continue; 3148 } 3149 3150 ctx = calloc(1, sizeof(struct poller_manage_ctx)); 3151 if (!ctx) { 3152 SPDK_ERRLOG("Failed to allocate poller_manage_ctx when removing pollers\n"); 3153 if (!*has_inflight) { 3154 free(inflight_counter); 3155 } 3156 return -ENOMEM; 3157 } 3158 3159 ctx->rtransport = rtransport; 3160 ctx->rgroup = rgroup; 3161 ctx->rpoller = rpoller; 3162 ctx->device = device; 3163 ctx->thread = spdk_get_thread(); 3164 ctx->inflight_op_counter = inflight_counter; 3165 *has_inflight = true; 3166 3167 poll_group = rgroup->group.group; 3168 if (poll_group->thread != spdk_get_thread()) { 3169 spdk_thread_send_msg(poll_group->thread, do_fn, ctx); 3170 } else { 3171 do_fn(ctx); 3172 } 3173 } 3174 3175 if (!*has_inflight) { 3176 free(inflight_counter); 3177 } 3178 3179 return 0; 3180 } 3181 3182 static void nvmf_rdma_handle_device_removal(struct spdk_nvmf_rdma_transport *rtransport, 3183 struct spdk_nvmf_rdma_device *device); 3184 3185 static struct spdk_nvmf_rdma_device * 3186 nvmf_rdma_find_ib_device(struct spdk_nvmf_rdma_transport *rtransport, 3187 struct ibv_context *context) 3188 { 3189 struct spdk_nvmf_rdma_device *device, *tmp_device; 3190 3191 TAILQ_FOREACH_SAFE(device, 
			   &rtransport->devices, link, tmp_device) {
        if (device->need_destroy) {
            continue;
        }

        if (strcmp(device->context->device->dev_name, context->device->dev_name) == 0) {
            return device;
        }
    }

    return NULL;
}

static bool
nvmf_rdma_check_devices_context(struct spdk_nvmf_rdma_transport *rtransport,
                                struct ibv_context *context)
{
    struct spdk_nvmf_rdma_device *old_device, *new_device;
    int rc = 0;
    bool has_inflight;

    old_device = nvmf_rdma_find_ib_device(rtransport, context);

    if (old_device) {
        if (old_device->context != context && !old_device->need_destroy && old_device->is_ready) {
            /* The old context may not have been cleaned up yet by the time we rescan. Exactly one
             * context is valid for a device, so the stale one must be invalid; just remove it. */
            SPDK_WARNLOG("Device %p has an invalid context %p\n", old_device, old_device->context);
            old_device->need_destroy = true;
            nvmf_rdma_handle_device_removal(rtransport, old_device);
        }
        return false;
    }

    rc = create_ib_device(rtransport, context, &new_device);
    /* TODO: update transport opts. */
    if (rc < 0) {
        SPDK_ERRLOG("Failed to create ib device for context: %s(%p)\n",
                    ibv_get_device_name(context->device), context);
        return false;
    }

    rc = nvmf_rdma_manage_poller(rtransport, new_device, &has_inflight, true);
    if (rc < 0) {
        SPDK_ERRLOG("Failed to add poller for device context: %s(%p)\n",
                    ibv_get_device_name(context->device), context);
        return false;
    }

    if (!has_inflight) {
        /* No poller registrations are outstanding, so the device is usable immediately.
         * Otherwise is_ready is set once the last poller registration completes. */
        new_device->is_ready = true;
    }

    return true;
}

static bool
nvmf_rdma_rescan_devices(struct spdk_nvmf_rdma_transport *rtransport)
{
    struct spdk_nvmf_rdma_device *device;
    struct ibv_device **ibv_device_list = NULL;
    struct ibv_context **contexts = NULL;
    int i = 0;
    int num_dev = 0;
    bool new_create = false, has_new_device = false;
    struct ibv_context *tmp_verbs = NULL;

    /* Do not rescan while any device is being destroyed, or its context may be freed while
     * regenerating the poll fds.
     */
    TAILQ_FOREACH(device, &rtransport->devices, link) {
        if (device->need_destroy) {
            return false;
        }
    }

    ibv_device_list = ibv_get_device_list(&num_dev);

    /* There is a bug in librdmacm. If verbs initialization failed in rdma_get_devices, the
     * device is marked as dead verbs and is never initialized again. So make sure the
     * verbs are available before calling rdma_get_devices. */
    if (num_dev >= 0) {
        for (i = 0; i < num_dev; i++) {
            tmp_verbs = ibv_open_device(ibv_device_list[i]);
            if (!tmp_verbs) {
                SPDK_WARNLOG("Failed to init ibv device %p, err %d.
Skip rescan.\n", ibv_device_list[i], errno); 3277 break; 3278 } 3279 if (nvmf_rdma_find_ib_device(rtransport, tmp_verbs) == NULL) { 3280 SPDK_DEBUGLOG(rdma, "Find new verbs init ibv device %p(%s).\n", ibv_device_list[i], 3281 tmp_verbs->device->dev_name); 3282 has_new_device = true; 3283 } 3284 ibv_close_device(tmp_verbs); 3285 } 3286 ibv_free_device_list(ibv_device_list); 3287 if (!tmp_verbs || !has_new_device) { 3288 return false; 3289 } 3290 } 3291 3292 contexts = rdma_get_devices(NULL); 3293 3294 for (i = 0; contexts && contexts[i] != NULL; i++) { 3295 new_create |= nvmf_rdma_check_devices_context(rtransport, contexts[i]); 3296 } 3297 3298 if (new_create) { 3299 free_poll_fds(rtransport); 3300 generate_poll_fds(rtransport); 3301 } 3302 3303 if (contexts) { 3304 rdma_free_devices(contexts); 3305 } 3306 3307 return new_create; 3308 } 3309 3310 static bool 3311 nvmf_rdma_retry_listen_port(struct spdk_nvmf_rdma_transport *rtransport) 3312 { 3313 struct spdk_nvmf_rdma_port *port, *tmp_port; 3314 int rc = 0; 3315 bool new_create = false; 3316 3317 if (TAILQ_EMPTY(&rtransport->retry_ports)) { 3318 return false; 3319 } 3320 3321 new_create = nvmf_rdma_rescan_devices(rtransport); 3322 3323 TAILQ_FOREACH_SAFE(port, &rtransport->retry_ports, link, tmp_port) { 3324 rc = nvmf_rdma_listen(&rtransport->transport, port->trid, NULL); 3325 3326 TAILQ_REMOVE(&rtransport->retry_ports, port, link); 3327 if (rc) { 3328 if (new_create) { 3329 SPDK_ERRLOG("Found new IB device but port %s:%s is still failed(%d) to listen.\n", 3330 port->trid->traddr, port->trid->trsvcid, rc); 3331 } 3332 TAILQ_INSERT_TAIL(&rtransport->retry_ports, port, link); 3333 break; 3334 } else { 3335 SPDK_NOTICELOG("Port %s:%s come back\n", port->trid->traddr, port->trid->trsvcid); 3336 free(port); 3337 } 3338 } 3339 3340 return true; 3341 } 3342 3343 static void 3344 nvmf_rdma_qpair_process_pending(struct spdk_nvmf_rdma_transport *rtransport, 3345 struct spdk_nvmf_rdma_qpair *rqpair, bool drain) 3346 { 3347 struct spdk_nvmf_request *req, *tmp; 3348 struct spdk_nvmf_rdma_request *rdma_req, *req_tmp; 3349 struct spdk_nvmf_rdma_resources *resources; 3350 3351 /* First process requests which are waiting for response to be sent */ 3352 STAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_send_queue, state_link, req_tmp) { 3353 if (nvmf_rdma_request_process(rtransport, rdma_req) == false && drain == false) { 3354 break; 3355 } 3356 } 3357 3358 /* We process I/O in the data transfer pending queue at the highest priority. */ 3359 STAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_read_queue, state_link, req_tmp) { 3360 if (rdma_req->state != RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING) { 3361 /* Requests in this queue might be in state RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER, 3362 * they are transmitting data over network but we keep them in the list to guarantee 3363 * fair processing. */ 3364 continue; 3365 } 3366 if (nvmf_rdma_request_process(rtransport, rdma_req) == false && drain == false) { 3367 break; 3368 } 3369 } 3370 3371 /* Then RDMA writes since reads have stronger restrictions than writes */ 3372 STAILQ_FOREACH_SAFE(rdma_req, &rqpair->pending_rdma_write_queue, state_link, req_tmp) { 3373 if (nvmf_rdma_request_process(rtransport, rdma_req) == false && drain == false) { 3374 break; 3375 } 3376 } 3377 3378 /* Then we handle request waiting on memory buffers. 
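     * These are requests in NEED_BUFFER state parked on the poll group's shared
     * pending_buf_queue; they are retried in arrival order and, unless we are draining,
     * processing stops at the first request that still cannot make progress.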
*/ 3379 STAILQ_FOREACH_SAFE(req, &rqpair->poller->group->group.pending_buf_queue, buf_link, tmp) { 3380 rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req); 3381 if (nvmf_rdma_request_process(rtransport, rdma_req) == false && drain == false) { 3382 break; 3383 } 3384 } 3385 3386 resources = rqpair->resources; 3387 while (!STAILQ_EMPTY(&resources->free_queue) && !STAILQ_EMPTY(&resources->incoming_queue)) { 3388 rdma_req = STAILQ_FIRST(&resources->free_queue); 3389 STAILQ_REMOVE_HEAD(&resources->free_queue, state_link); 3390 rdma_req->recv = STAILQ_FIRST(&resources->incoming_queue); 3391 STAILQ_REMOVE_HEAD(&resources->incoming_queue, link); 3392 3393 if (rqpair->srq != NULL) { 3394 rdma_req->req.qpair = &rdma_req->recv->qpair->qpair; 3395 rdma_req->recv->qpair->qd++; 3396 } else { 3397 rqpair->qd++; 3398 } 3399 3400 rdma_req->receive_tsc = rdma_req->recv->receive_tsc; 3401 rdma_req->state = RDMA_REQUEST_STATE_NEW; 3402 if (nvmf_rdma_request_process(rtransport, rdma_req) == false) { 3403 break; 3404 } 3405 } 3406 if (!STAILQ_EMPTY(&resources->incoming_queue) && STAILQ_EMPTY(&resources->free_queue)) { 3407 rqpair->poller->stat.pending_free_request++; 3408 } 3409 } 3410 3411 static void 3412 nvmf_rdma_poller_process_pending_buf_queue(struct spdk_nvmf_rdma_transport *rtransport, 3413 struct spdk_nvmf_rdma_poller *rpoller) 3414 { 3415 struct spdk_nvmf_request *req, *tmp; 3416 struct spdk_nvmf_rdma_request *rdma_req; 3417 3418 STAILQ_FOREACH_SAFE(req, &rpoller->group->group.pending_buf_queue, buf_link, tmp) { 3419 rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req); 3420 if (nvmf_rdma_request_process(rtransport, rdma_req) == false) { 3421 break; 3422 } 3423 } 3424 } 3425 3426 static inline bool 3427 nvmf_rdma_can_ignore_last_wqe_reached(struct spdk_nvmf_rdma_device *device) 3428 { 3429 /* iWARP transport and SoftRoCE driver don't support LAST_WQE_REACHED ibv async event */ 3430 return nvmf_rdma_is_rxe_device(device) || 3431 device->context->device->transport_type == IBV_TRANSPORT_IWARP; 3432 } 3433 3434 static void 3435 nvmf_rdma_destroy_drained_qpair(struct spdk_nvmf_rdma_qpair *rqpair) 3436 { 3437 struct spdk_nvmf_rdma_transport *rtransport = SPDK_CONTAINEROF(rqpair->qpair.transport, 3438 struct spdk_nvmf_rdma_transport, transport); 3439 3440 nvmf_rdma_qpair_process_pending(rtransport, rqpair, true); 3441 3442 /* nvmf_rdma_close_qpair is not called */ 3443 if (!rqpair->to_close) { 3444 return; 3445 } 3446 3447 /* device is already destroyed and we should force destroy this qpair. */ 3448 if (rqpair->poller && rqpair->poller->need_destroy) { 3449 nvmf_rdma_qpair_destroy(rqpair); 3450 return; 3451 } 3452 3453 /* In non SRQ path, we will reach rqpair->max_queue_depth. In SRQ path, we will get the last_wqe event. 
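     * Concretely, destruction is deferred until all posted sends have completed
     * (current_send_depth == 0), until every receive has been returned when there is no SRQ
     * (current_recv_depth == max_queue_depth), and, when an SRQ is used, until the
     * LAST_WQE_REACHED event has been seen unless the device (iWARP or SoftRoCE) cannot deliver it.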
*/ 3454 if (rqpair->current_send_depth != 0) { 3455 return; 3456 } 3457 3458 if (rqpair->srq == NULL && rqpair->current_recv_depth != rqpair->max_queue_depth) { 3459 return; 3460 } 3461 3462 if (rqpair->srq != NULL && rqpair->last_wqe_reached == false && 3463 !nvmf_rdma_can_ignore_last_wqe_reached(rqpair->device)) { 3464 return; 3465 } 3466 3467 assert(rqpair->qpair.state == SPDK_NVMF_QPAIR_ERROR); 3468 3469 nvmf_rdma_qpair_destroy(rqpair); 3470 } 3471 3472 static int 3473 nvmf_rdma_disconnect(struct rdma_cm_event *evt, bool *event_acked) 3474 { 3475 struct spdk_nvmf_qpair *qpair; 3476 struct spdk_nvmf_rdma_qpair *rqpair; 3477 3478 if (evt->id == NULL) { 3479 SPDK_ERRLOG("disconnect request: missing cm_id\n"); 3480 return -1; 3481 } 3482 3483 qpair = evt->id->context; 3484 if (qpair == NULL) { 3485 SPDK_ERRLOG("disconnect request: no active connection\n"); 3486 return -1; 3487 } 3488 3489 rdma_ack_cm_event(evt); 3490 *event_acked = true; 3491 3492 rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); 3493 3494 spdk_trace_record(TRACE_RDMA_QP_DISCONNECT, 0, 0, (uintptr_t)rqpair); 3495 3496 spdk_nvmf_qpair_disconnect(&rqpair->qpair); 3497 3498 return 0; 3499 } 3500 3501 #ifdef DEBUG 3502 static const char *CM_EVENT_STR[] = { 3503 "RDMA_CM_EVENT_ADDR_RESOLVED", 3504 "RDMA_CM_EVENT_ADDR_ERROR", 3505 "RDMA_CM_EVENT_ROUTE_RESOLVED", 3506 "RDMA_CM_EVENT_ROUTE_ERROR", 3507 "RDMA_CM_EVENT_CONNECT_REQUEST", 3508 "RDMA_CM_EVENT_CONNECT_RESPONSE", 3509 "RDMA_CM_EVENT_CONNECT_ERROR", 3510 "RDMA_CM_EVENT_UNREACHABLE", 3511 "RDMA_CM_EVENT_REJECTED", 3512 "RDMA_CM_EVENT_ESTABLISHED", 3513 "RDMA_CM_EVENT_DISCONNECTED", 3514 "RDMA_CM_EVENT_DEVICE_REMOVAL", 3515 "RDMA_CM_EVENT_MULTICAST_JOIN", 3516 "RDMA_CM_EVENT_MULTICAST_ERROR", 3517 "RDMA_CM_EVENT_ADDR_CHANGE", 3518 "RDMA_CM_EVENT_TIMEWAIT_EXIT" 3519 }; 3520 #endif /* DEBUG */ 3521 3522 static void 3523 nvmf_rdma_disconnect_qpairs_on_port(struct spdk_nvmf_rdma_transport *rtransport, 3524 struct spdk_nvmf_rdma_port *port) 3525 { 3526 struct spdk_nvmf_rdma_poll_group *rgroup; 3527 struct spdk_nvmf_rdma_poller *rpoller; 3528 struct spdk_nvmf_rdma_qpair *rqpair; 3529 3530 TAILQ_FOREACH(rgroup, &rtransport->poll_groups, link) { 3531 TAILQ_FOREACH(rpoller, &rgroup->pollers, link) { 3532 RB_FOREACH(rqpair, qpairs_tree, &rpoller->qpairs) { 3533 if (rqpair->listen_id == port->id) { 3534 spdk_nvmf_qpair_disconnect(&rqpair->qpair); 3535 } 3536 } 3537 } 3538 } 3539 } 3540 3541 static bool 3542 nvmf_rdma_handle_cm_event_addr_change(struct spdk_nvmf_transport *transport, 3543 struct rdma_cm_event *event) 3544 { 3545 const struct spdk_nvme_transport_id *trid; 3546 struct spdk_nvmf_rdma_port *port; 3547 struct spdk_nvmf_rdma_transport *rtransport; 3548 bool event_acked = false; 3549 3550 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 3551 TAILQ_FOREACH(port, &rtransport->ports, link) { 3552 if (port->id == event->id) { 3553 SPDK_ERRLOG("ADDR_CHANGE: IP %s:%s migrated\n", port->trid->traddr, port->trid->trsvcid); 3554 rdma_ack_cm_event(event); 3555 event_acked = true; 3556 trid = port->trid; 3557 break; 3558 } 3559 } 3560 3561 if (event_acked) { 3562 nvmf_rdma_disconnect_qpairs_on_port(rtransport, port); 3563 3564 nvmf_rdma_stop_listen(transport, trid); 3565 nvmf_rdma_listen(transport, trid, NULL); 3566 } 3567 3568 return event_acked; 3569 } 3570 3571 static void 3572 nvmf_rdma_handle_device_removal(struct spdk_nvmf_rdma_transport *rtransport, 3573 struct spdk_nvmf_rdma_device *device) 3574 { 3575 struct 
spdk_nvmf_rdma_port *port, *port_tmp; 3576 int rc; 3577 bool has_inflight; 3578 3579 rc = nvmf_rdma_manage_poller(rtransport, device, &has_inflight, false); 3580 if (rc) { 3581 SPDK_ERRLOG("Failed to handle device removal, rc %d\n", rc); 3582 return; 3583 } 3584 3585 if (!has_inflight) { 3586 /* no pollers, destroy the device */ 3587 device->ready_to_destroy = true; 3588 spdk_thread_send_msg(spdk_get_thread(), _nvmf_rdma_remove_destroyed_device, rtransport); 3589 } 3590 3591 TAILQ_FOREACH_SAFE(port, &rtransport->ports, link, port_tmp) { 3592 if (port->device == device) { 3593 SPDK_NOTICELOG("Port %s:%s on device %s is being removed.\n", 3594 port->trid->traddr, 3595 port->trid->trsvcid, 3596 ibv_get_device_name(port->device->context->device)); 3597 3598 /* keep NVMF listener and only destroy structures of the 3599 * RDMA transport. when the device comes back we can retry listening 3600 * and the application's workflow will not be interrupted. 3601 */ 3602 nvmf_rdma_stop_listen_ex(&rtransport->transport, port->trid, true); 3603 } 3604 } 3605 } 3606 3607 static void 3608 nvmf_rdma_handle_cm_event_port_removal(struct spdk_nvmf_transport *transport, 3609 struct rdma_cm_event *event) 3610 { 3611 struct spdk_nvmf_rdma_port *port, *tmp_port; 3612 struct spdk_nvmf_rdma_transport *rtransport; 3613 3614 port = event->id->context; 3615 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 3616 3617 rdma_ack_cm_event(event); 3618 3619 /* if device removal happens during ctrl qpair disconnecting, it's possible that we receive 3620 * an DEVICE_REMOVAL event on qpair but the id->qp is just NULL. So we should make sure that 3621 * we are handling a port event here. 3622 */ 3623 TAILQ_FOREACH(tmp_port, &rtransport->ports, link) { 3624 if (port == tmp_port && port->device && !port->device->need_destroy) { 3625 port->device->need_destroy = true; 3626 nvmf_rdma_handle_device_removal(rtransport, port->device); 3627 } 3628 } 3629 } 3630 3631 static void 3632 nvmf_process_cm_events(struct spdk_nvmf_transport *transport, uint32_t max_events) 3633 { 3634 struct spdk_nvmf_rdma_transport *rtransport; 3635 struct rdma_cm_event *event; 3636 uint32_t i; 3637 int rc; 3638 bool event_acked; 3639 3640 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 3641 3642 if (rtransport->event_channel == NULL) { 3643 return; 3644 } 3645 3646 for (i = 0; i < max_events; i++) { 3647 event_acked = false; 3648 rc = rdma_get_cm_event(rtransport->event_channel, &event); 3649 if (rc) { 3650 if (errno != EAGAIN && errno != EWOULDBLOCK) { 3651 SPDK_ERRLOG("Acceptor Event Error: %s\n", spdk_strerror(errno)); 3652 } 3653 break; 3654 } 3655 3656 SPDK_DEBUGLOG(rdma, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]); 3657 3658 spdk_trace_record(TRACE_RDMA_CM_ASYNC_EVENT, 0, 0, 0, event->event); 3659 3660 switch (event->event) { 3661 case RDMA_CM_EVENT_ADDR_RESOLVED: 3662 case RDMA_CM_EVENT_ADDR_ERROR: 3663 case RDMA_CM_EVENT_ROUTE_RESOLVED: 3664 case RDMA_CM_EVENT_ROUTE_ERROR: 3665 /* No action required. The target never attempts to resolve routes. */ 3666 break; 3667 case RDMA_CM_EVENT_CONNECT_REQUEST: 3668 rc = nvmf_rdma_connect(transport, event); 3669 if (rc < 0) { 3670 SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc); 3671 break; 3672 } 3673 break; 3674 case RDMA_CM_EVENT_CONNECT_RESPONSE: 3675 /* The target never initiates a new connection. So this will not occur. */ 3676 break; 3677 case RDMA_CM_EVENT_CONNECT_ERROR: 3678 /* Can this happen? 
   The documentation says it can, but it is not clear what causes it. */
            break;
        case RDMA_CM_EVENT_UNREACHABLE:
        case RDMA_CM_EVENT_REJECTED:
            /* These only occur on the client side. */
            break;
        case RDMA_CM_EVENT_ESTABLISHED:
            /* TODO: Should we be waiting for this event anywhere? */
            break;
        case RDMA_CM_EVENT_DISCONNECTED:
            rc = nvmf_rdma_disconnect(event, &event_acked);
            if (rc < 0) {
                SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
                break;
            }
            break;
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
            /* In case of device removal, the kernel IB stack triggers IBV_EVENT_DEVICE_FATAL,
             * which triggers RDMA_CM_EVENT_DEVICE_REMOVAL on all cma_ids.
             * Once these events are delivered to SPDK, we must release all IB resources and
             * must not attempt to call any ibv_query/modify/create functions. We can only call
             * ibv_destroy* functions to release the user space memory allocated by IB. All kernel
             * resources are already cleaned up. */
            if (event->id->qp) {
                /* If the rdma_cm event has a valid `qp` pointer then the event refers to the
                 * corresponding qpair. Otherwise the event refers to a listening device. */
                rc = nvmf_rdma_disconnect(event, &event_acked);
                if (rc < 0) {
                    SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
                    break;
                }
            } else {
                nvmf_rdma_handle_cm_event_port_removal(transport, event);
                event_acked = true;
            }
            break;
        case RDMA_CM_EVENT_MULTICAST_JOIN:
        case RDMA_CM_EVENT_MULTICAST_ERROR:
            /* Multicast is not used */
            break;
        case RDMA_CM_EVENT_ADDR_CHANGE:
            event_acked = nvmf_rdma_handle_cm_event_addr_change(transport, event);
            break;
        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
            /* For now, do nothing. The target never re-uses queue pairs.
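             * TIMEWAIT_EXIT only reports that the kernel has finished the time-wait period for a
             * queue pair that was already disconnected, so there is nothing left to clean up here.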
*/ 3723 break; 3724 default: 3725 SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event); 3726 break; 3727 } 3728 if (!event_acked) { 3729 rdma_ack_cm_event(event); 3730 } 3731 } 3732 } 3733 3734 static void 3735 nvmf_rdma_handle_last_wqe_reached(struct spdk_nvmf_rdma_qpair *rqpair) 3736 { 3737 rqpair->last_wqe_reached = true; 3738 nvmf_rdma_destroy_drained_qpair(rqpair); 3739 } 3740 3741 static void 3742 nvmf_rdma_qpair_process_ibv_event(void *ctx) 3743 { 3744 struct spdk_nvmf_rdma_ibv_event_ctx *event_ctx = ctx; 3745 3746 if (event_ctx->rqpair) { 3747 STAILQ_REMOVE(&event_ctx->rqpair->ibv_events, event_ctx, spdk_nvmf_rdma_ibv_event_ctx, link); 3748 if (event_ctx->cb_fn) { 3749 event_ctx->cb_fn(event_ctx->rqpair); 3750 } 3751 } 3752 free(event_ctx); 3753 } 3754 3755 static int 3756 nvmf_rdma_send_qpair_async_event(struct spdk_nvmf_rdma_qpair *rqpair, 3757 spdk_nvmf_rdma_qpair_ibv_event fn) 3758 { 3759 struct spdk_nvmf_rdma_ibv_event_ctx *ctx; 3760 struct spdk_thread *thr = NULL; 3761 int rc; 3762 3763 if (rqpair->qpair.group) { 3764 thr = rqpair->qpair.group->thread; 3765 } else if (rqpair->destruct_channel) { 3766 thr = spdk_io_channel_get_thread(rqpair->destruct_channel); 3767 } 3768 3769 if (!thr) { 3770 SPDK_DEBUGLOG(rdma, "rqpair %p has no thread\n", rqpair); 3771 return -EINVAL; 3772 } 3773 3774 ctx = calloc(1, sizeof(*ctx)); 3775 if (!ctx) { 3776 return -ENOMEM; 3777 } 3778 3779 ctx->rqpair = rqpair; 3780 ctx->cb_fn = fn; 3781 STAILQ_INSERT_TAIL(&rqpair->ibv_events, ctx, link); 3782 3783 rc = spdk_thread_send_msg(thr, nvmf_rdma_qpair_process_ibv_event, ctx); 3784 if (rc) { 3785 STAILQ_REMOVE(&rqpair->ibv_events, ctx, spdk_nvmf_rdma_ibv_event_ctx, link); 3786 free(ctx); 3787 } 3788 3789 return rc; 3790 } 3791 3792 static int 3793 nvmf_process_ib_event(struct spdk_nvmf_rdma_device *device) 3794 { 3795 int rc; 3796 struct spdk_nvmf_rdma_qpair *rqpair = NULL; 3797 struct ibv_async_event event; 3798 3799 rc = ibv_get_async_event(device->context, &event); 3800 3801 if (rc) { 3802 /* In non-blocking mode -1 means there are no events available */ 3803 return rc; 3804 } 3805 3806 switch (event.event_type) { 3807 case IBV_EVENT_QP_FATAL: 3808 case IBV_EVENT_QP_LAST_WQE_REACHED: 3809 case IBV_EVENT_QP_REQ_ERR: 3810 case IBV_EVENT_QP_ACCESS_ERR: 3811 case IBV_EVENT_COMM_EST: 3812 case IBV_EVENT_PATH_MIG: 3813 case IBV_EVENT_PATH_MIG_ERR: 3814 rqpair = event.element.qp->qp_context; 3815 if (!rqpair) { 3816 /* Any QP event for NVMe-RDMA initiator may be returned. */ 3817 SPDK_NOTICELOG("Async QP event for unknown QP: %s\n", 3818 ibv_event_type_str(event.event_type)); 3819 break; 3820 } 3821 3822 switch (event.event_type) { 3823 case IBV_EVENT_QP_FATAL: 3824 SPDK_ERRLOG("Fatal event received for rqpair %p\n", rqpair); 3825 spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0, 3826 (uintptr_t)rqpair, event.event_type); 3827 rqpair->ibv_in_error_state = true; 3828 spdk_nvmf_qpair_disconnect(&rqpair->qpair); 3829 break; 3830 case IBV_EVENT_QP_LAST_WQE_REACHED: 3831 /* This event only occurs for shared receive queues. */ 3832 SPDK_DEBUGLOG(rdma, "Last WQE reached event received for rqpair %p\n", rqpair); 3833 rc = nvmf_rdma_send_qpair_async_event(rqpair, nvmf_rdma_handle_last_wqe_reached); 3834 if (rc) { 3835 SPDK_WARNLOG("Failed to send LAST_WQE_REACHED event. 
rqpair %p, err %d\n", rqpair, rc); 3836 rqpair->last_wqe_reached = true; 3837 } 3838 break; 3839 case IBV_EVENT_QP_REQ_ERR: 3840 case IBV_EVENT_QP_ACCESS_ERR: 3841 case IBV_EVENT_COMM_EST: 3842 case IBV_EVENT_PATH_MIG: 3843 case IBV_EVENT_PATH_MIG_ERR: 3844 SPDK_NOTICELOG("Async QP event: %s\n", 3845 ibv_event_type_str(event.event_type)); 3846 spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0, 3847 (uintptr_t)rqpair, event.event_type); 3848 rqpair->ibv_in_error_state = true; 3849 break; 3850 default: 3851 break; 3852 } 3853 break; 3854 case IBV_EVENT_DEVICE_FATAL: 3855 SPDK_ERRLOG("Device Fatal event[%s] received on %s. device: %p\n", 3856 ibv_event_type_str(event.event_type), ibv_get_device_name(device->context->device), device); 3857 device->need_destroy = true; 3858 break; 3859 case IBV_EVENT_CQ_ERR: 3860 case IBV_EVENT_PORT_ACTIVE: 3861 case IBV_EVENT_PORT_ERR: 3862 case IBV_EVENT_LID_CHANGE: 3863 case IBV_EVENT_PKEY_CHANGE: 3864 case IBV_EVENT_SM_CHANGE: 3865 case IBV_EVENT_SRQ_ERR: 3866 case IBV_EVENT_SRQ_LIMIT_REACHED: 3867 case IBV_EVENT_CLIENT_REREGISTER: 3868 case IBV_EVENT_GID_CHANGE: 3869 case IBV_EVENT_SQ_DRAINED: 3870 default: 3871 SPDK_NOTICELOG("Async event: %s\n", 3872 ibv_event_type_str(event.event_type)); 3873 spdk_trace_record(TRACE_RDMA_IBV_ASYNC_EVENT, 0, 0, 0, event.event_type); 3874 break; 3875 } 3876 ibv_ack_async_event(&event); 3877 3878 return 0; 3879 } 3880 3881 static void 3882 nvmf_process_ib_events(struct spdk_nvmf_rdma_device *device, uint32_t max_events) 3883 { 3884 int rc = 0; 3885 uint32_t i = 0; 3886 3887 for (i = 0; i < max_events; i++) { 3888 rc = nvmf_process_ib_event(device); 3889 if (rc) { 3890 break; 3891 } 3892 } 3893 3894 SPDK_DEBUGLOG(rdma, "Device %s: %u events processed\n", device->context->device->name, i); 3895 } 3896 3897 static int 3898 nvmf_rdma_accept(void *ctx) 3899 { 3900 int nfds, i = 0; 3901 struct spdk_nvmf_transport *transport = ctx; 3902 struct spdk_nvmf_rdma_transport *rtransport; 3903 struct spdk_nvmf_rdma_device *device, *tmp; 3904 uint32_t count; 3905 short revents; 3906 bool do_retry; 3907 3908 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 3909 do_retry = nvmf_rdma_retry_listen_port(rtransport); 3910 3911 count = nfds = poll(rtransport->poll_fds, rtransport->npoll_fds, 0); 3912 3913 if (nfds <= 0) { 3914 return do_retry ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE; 3915 } 3916 3917 /* The first poll descriptor is RDMA CM event */ 3918 if (rtransport->poll_fds[i++].revents & POLLIN) { 3919 nvmf_process_cm_events(transport, NVMF_RDMA_MAX_EVENTS_PER_POLL); 3920 nfds--; 3921 } 3922 3923 if (nfds == 0) { 3924 return SPDK_POLLER_BUSY; 3925 } 3926 3927 /* Second and subsequent poll descriptors are IB async events */ 3928 TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, tmp) { 3929 revents = rtransport->poll_fds[i++].revents; 3930 if (revents & POLLIN) { 3931 if (spdk_likely(!device->need_destroy)) { 3932 nvmf_process_ib_events(device, NVMF_RDMA_MAX_EVENTS_PER_POLL); 3933 if (spdk_unlikely(device->need_destroy)) { 3934 nvmf_rdma_handle_device_removal(rtransport, device); 3935 } 3936 } 3937 nfds--; 3938 } else if (revents & POLLNVAL || revents & POLLHUP) { 3939 SPDK_ERRLOG("Receive unknown revent %x on device %p\n", (int)revents, device); 3940 nfds--; 3941 } 3942 } 3943 /* check all flagged fd's have been served */ 3944 assert(nfds == 0); 3945 3946 return count > 0 ? 
SPDK_POLLER_BUSY : SPDK_POLLER_IDLE; 3947 } 3948 3949 static void 3950 nvmf_rdma_cdata_init(struct spdk_nvmf_transport *transport, struct spdk_nvmf_subsystem *subsystem, 3951 struct spdk_nvmf_ctrlr_data *cdata) 3952 { 3953 cdata->nvmf_specific.msdbd = NVMF_DEFAULT_MSDBD; 3954 3955 /* Disable in-capsule data transfer for RDMA controller when dif_insert_or_strip is enabled 3956 since in-capsule data only works with NVME drives that support SGL memory layout */ 3957 if (transport->opts.dif_insert_or_strip) { 3958 cdata->nvmf_specific.ioccsz = sizeof(struct spdk_nvme_cmd) / 16; 3959 } 3960 3961 if (cdata->nvmf_specific.ioccsz > ((sizeof(struct spdk_nvme_cmd) + 0x1000) / 16)) { 3962 SPDK_WARNLOG("RDMA is configured to support up to 16 SGL entries while in capsule" 3963 " data is greater than 4KiB.\n"); 3964 SPDK_WARNLOG("When used in conjunction with the NVMe-oF initiator from the Linux " 3965 "kernel between versions 5.4 and 5.12 data corruption may occur for " 3966 "writes that are not a multiple of 4KiB in size.\n"); 3967 } 3968 } 3969 3970 static void 3971 nvmf_rdma_discover(struct spdk_nvmf_transport *transport, 3972 struct spdk_nvme_transport_id *trid, 3973 struct spdk_nvmf_discovery_log_page_entry *entry) 3974 { 3975 entry->trtype = SPDK_NVMF_TRTYPE_RDMA; 3976 entry->adrfam = trid->adrfam; 3977 entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_REQUIRED; 3978 3979 spdk_strcpy_pad(entry->trsvcid, trid->trsvcid, sizeof(entry->trsvcid), ' '); 3980 spdk_strcpy_pad(entry->traddr, trid->traddr, sizeof(entry->traddr), ' '); 3981 3982 entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED; 3983 entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE; 3984 entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM; 3985 } 3986 3987 static int 3988 nvmf_rdma_poller_create(struct spdk_nvmf_rdma_transport *rtransport, 3989 struct spdk_nvmf_rdma_poll_group *rgroup, struct spdk_nvmf_rdma_device *device, 3990 struct spdk_nvmf_rdma_poller **out_poller) 3991 { 3992 struct spdk_nvmf_rdma_poller *poller; 3993 struct spdk_rdma_provider_srq_init_attr srq_init_attr; 3994 struct spdk_nvmf_rdma_resource_opts opts; 3995 int num_cqe; 3996 3997 poller = calloc(1, sizeof(*poller)); 3998 if (!poller) { 3999 SPDK_ERRLOG("Unable to allocate memory for new RDMA poller\n"); 4000 return -1; 4001 } 4002 4003 poller->device = device; 4004 poller->group = rgroup; 4005 *out_poller = poller; 4006 4007 RB_INIT(&poller->qpairs); 4008 STAILQ_INIT(&poller->qpairs_pending_send); 4009 STAILQ_INIT(&poller->qpairs_pending_recv); 4010 4011 TAILQ_INSERT_TAIL(&rgroup->pollers, poller, link); 4012 SPDK_DEBUGLOG(rdma, "Create poller %p on device %p in poll group %p.\n", poller, device, rgroup); 4013 if (rtransport->rdma_opts.no_srq == false && device->num_srq < device->attr.max_srq) { 4014 if ((int)rtransport->rdma_opts.max_srq_depth > device->attr.max_srq_wr) { 4015 SPDK_WARNLOG("Requested SRQ depth %u, max supported by dev %s is %d\n", 4016 rtransport->rdma_opts.max_srq_depth, device->context->device->name, device->attr.max_srq_wr); 4017 } 4018 poller->max_srq_depth = spdk_min((int)rtransport->rdma_opts.max_srq_depth, device->attr.max_srq_wr); 4019 4020 device->num_srq++; 4021 memset(&srq_init_attr, 0, sizeof(srq_init_attr)); 4022 srq_init_attr.pd = device->pd; 4023 srq_init_attr.stats = &poller->stat.qp_stats.recv; 4024 srq_init_attr.srq_init_attr.attr.max_wr = poller->max_srq_depth; 4025 srq_init_attr.srq_init_attr.attr.max_sge = spdk_min(device->attr.max_sge, NVMF_DEFAULT_RX_SGE); 4026 poller->srq = 
spdk_rdma_provider_srq_create(&srq_init_attr); 4027 if (!poller->srq) { 4028 SPDK_ERRLOG("Unable to create shared receive queue, errno %d\n", errno); 4029 return -1; 4030 } 4031 4032 opts.qp = poller->srq; 4033 opts.map = device->map; 4034 opts.qpair = NULL; 4035 opts.shared = true; 4036 opts.max_queue_depth = poller->max_srq_depth; 4037 opts.in_capsule_data_size = rtransport->transport.opts.in_capsule_data_size; 4038 4039 poller->resources = nvmf_rdma_resources_create(&opts); 4040 if (!poller->resources) { 4041 SPDK_ERRLOG("Unable to allocate resources for shared receive queue.\n"); 4042 return -1; 4043 } 4044 } 4045 4046 /* 4047 * When using an srq, we can limit the completion queue at startup. 4048 * The following formula represents the calculation: 4049 * num_cqe = num_recv + num_data_wr + num_send_wr. 4050 * where num_recv=num_data_wr=and num_send_wr=poller->max_srq_depth 4051 */ 4052 if (poller->srq) { 4053 num_cqe = poller->max_srq_depth * 3; 4054 } else { 4055 num_cqe = rtransport->rdma_opts.num_cqe; 4056 } 4057 4058 poller->cq = ibv_create_cq(device->context, num_cqe, poller, NULL, 0); 4059 if (!poller->cq) { 4060 SPDK_ERRLOG("Unable to create completion queue\n"); 4061 return -1; 4062 } 4063 poller->num_cqe = num_cqe; 4064 return 0; 4065 } 4066 4067 static void 4068 _nvmf_rdma_register_poller_in_group(void *c) 4069 { 4070 struct spdk_nvmf_rdma_poller *poller; 4071 struct poller_manage_ctx *ctx = c; 4072 struct spdk_nvmf_rdma_device *device; 4073 int rc; 4074 4075 rc = nvmf_rdma_poller_create(ctx->rtransport, ctx->rgroup, ctx->device, &poller); 4076 if (rc < 0 && poller) { 4077 nvmf_rdma_poller_destroy(poller); 4078 } 4079 4080 device = ctx->device; 4081 if (nvmf_rdma_all_pollers_management_done(ctx)) { 4082 device->is_ready = true; 4083 } 4084 } 4085 4086 static void nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group); 4087 4088 static struct spdk_nvmf_transport_poll_group * 4089 nvmf_rdma_poll_group_create(struct spdk_nvmf_transport *transport, 4090 struct spdk_nvmf_poll_group *group) 4091 { 4092 struct spdk_nvmf_rdma_transport *rtransport; 4093 struct spdk_nvmf_rdma_poll_group *rgroup; 4094 struct spdk_nvmf_rdma_poller *poller; 4095 struct spdk_nvmf_rdma_device *device; 4096 int rc; 4097 4098 if (spdk_interrupt_mode_is_enabled()) { 4099 SPDK_ERRLOG("RDMA transport does not support interrupt mode\n"); 4100 return NULL; 4101 } 4102 4103 rtransport = SPDK_CONTAINEROF(transport, struct spdk_nvmf_rdma_transport, transport); 4104 4105 rgroup = calloc(1, sizeof(*rgroup)); 4106 if (!rgroup) { 4107 return NULL; 4108 } 4109 4110 TAILQ_INIT(&rgroup->pollers); 4111 4112 TAILQ_FOREACH(device, &rtransport->devices, link) { 4113 rc = nvmf_rdma_poller_create(rtransport, rgroup, device, &poller); 4114 if (rc < 0) { 4115 nvmf_rdma_poll_group_destroy(&rgroup->group); 4116 return NULL; 4117 } 4118 } 4119 4120 TAILQ_INSERT_TAIL(&rtransport->poll_groups, rgroup, link); 4121 if (rtransport->conn_sched.next_admin_pg == NULL) { 4122 rtransport->conn_sched.next_admin_pg = rgroup; 4123 rtransport->conn_sched.next_io_pg = rgroup; 4124 } 4125 4126 return &rgroup->group; 4127 } 4128 4129 static uint32_t 4130 nvmf_poll_group_get_io_qpair_count(struct spdk_nvmf_poll_group *pg) 4131 { 4132 uint32_t count; 4133 4134 /* Just assume that unassociated qpairs will eventually be io 4135 * qpairs. This is close enough for the use cases for this 4136 * function. 
4137 */ 4138 pthread_mutex_lock(&pg->mutex); 4139 count = pg->stat.current_io_qpairs + pg->current_unassociated_qpairs; 4140 pthread_mutex_unlock(&pg->mutex); 4141 4142 return count; 4143 } 4144 4145 static struct spdk_nvmf_transport_poll_group * 4146 nvmf_rdma_get_optimal_poll_group(struct spdk_nvmf_qpair *qpair) 4147 { 4148 struct spdk_nvmf_rdma_transport *rtransport; 4149 struct spdk_nvmf_rdma_poll_group **pg; 4150 struct spdk_nvmf_transport_poll_group *result; 4151 uint32_t count; 4152 4153 rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport); 4154 4155 if (TAILQ_EMPTY(&rtransport->poll_groups)) { 4156 return NULL; 4157 } 4158 4159 if (qpair->qid == 0) { 4160 pg = &rtransport->conn_sched.next_admin_pg; 4161 } else { 4162 struct spdk_nvmf_rdma_poll_group *pg_min, *pg_start, *pg_current; 4163 uint32_t min_value; 4164 4165 pg = &rtransport->conn_sched.next_io_pg; 4166 pg_min = *pg; 4167 pg_start = *pg; 4168 pg_current = *pg; 4169 min_value = nvmf_poll_group_get_io_qpair_count(pg_current->group.group); 4170 4171 while (1) { 4172 count = nvmf_poll_group_get_io_qpair_count(pg_current->group.group); 4173 4174 if (count < min_value) { 4175 min_value = count; 4176 pg_min = pg_current; 4177 } 4178 4179 pg_current = TAILQ_NEXT(pg_current, link); 4180 if (pg_current == NULL) { 4181 pg_current = TAILQ_FIRST(&rtransport->poll_groups); 4182 } 4183 4184 if (pg_current == pg_start || min_value == 0) { 4185 break; 4186 } 4187 } 4188 *pg = pg_min; 4189 } 4190 4191 assert(*pg != NULL); 4192 4193 result = &(*pg)->group; 4194 4195 *pg = TAILQ_NEXT(*pg, link); 4196 if (*pg == NULL) { 4197 *pg = TAILQ_FIRST(&rtransport->poll_groups); 4198 } 4199 4200 return result; 4201 } 4202 4203 static void 4204 nvmf_rdma_poller_destroy(struct spdk_nvmf_rdma_poller *poller) 4205 { 4206 struct spdk_nvmf_rdma_qpair *qpair, *tmp_qpair; 4207 int rc; 4208 4209 TAILQ_REMOVE(&poller->group->pollers, poller, link); 4210 RB_FOREACH_SAFE(qpair, qpairs_tree, &poller->qpairs, tmp_qpair) { 4211 nvmf_rdma_qpair_destroy(qpair); 4212 } 4213 4214 if (poller->srq) { 4215 if (poller->resources) { 4216 nvmf_rdma_resources_destroy(poller->resources); 4217 } 4218 spdk_rdma_provider_srq_destroy(poller->srq); 4219 SPDK_DEBUGLOG(rdma, "Destroyed RDMA shared queue %p\n", poller->srq); 4220 } 4221 4222 if (poller->cq) { 4223 rc = ibv_destroy_cq(poller->cq); 4224 if (rc != 0) { 4225 SPDK_ERRLOG("Destroy cq return %d, error: %s\n", rc, strerror(errno)); 4226 } 4227 } 4228 4229 if (poller->destroy_cb) { 4230 poller->destroy_cb(poller->destroy_cb_ctx); 4231 poller->destroy_cb = NULL; 4232 } 4233 4234 free(poller); 4235 } 4236 4237 static void 4238 nvmf_rdma_poll_group_destroy(struct spdk_nvmf_transport_poll_group *group) 4239 { 4240 struct spdk_nvmf_rdma_poll_group *rgroup, *next_rgroup; 4241 struct spdk_nvmf_rdma_poller *poller, *tmp; 4242 struct spdk_nvmf_rdma_transport *rtransport; 4243 4244 rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group); 4245 if (!rgroup) { 4246 return; 4247 } 4248 4249 TAILQ_FOREACH_SAFE(poller, &rgroup->pollers, link, tmp) { 4250 nvmf_rdma_poller_destroy(poller); 4251 } 4252 4253 if (rgroup->group.transport == NULL) { 4254 /* Transport can be NULL when nvmf_rdma_poll_group_create() 4255 * calls this function directly in a failure path. 
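     * In that case the group was never linked into the transport's poll_groups list, so the
     * pollers destroyed above are all that needs unwinding before freeing the group itself.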
*/ 4256 free(rgroup); 4257 return; 4258 } 4259 4260 rtransport = SPDK_CONTAINEROF(rgroup->group.transport, struct spdk_nvmf_rdma_transport, transport); 4261 4262 next_rgroup = TAILQ_NEXT(rgroup, link); 4263 TAILQ_REMOVE(&rtransport->poll_groups, rgroup, link); 4264 if (next_rgroup == NULL) { 4265 next_rgroup = TAILQ_FIRST(&rtransport->poll_groups); 4266 } 4267 if (rtransport->conn_sched.next_admin_pg == rgroup) { 4268 rtransport->conn_sched.next_admin_pg = next_rgroup; 4269 } 4270 if (rtransport->conn_sched.next_io_pg == rgroup) { 4271 rtransport->conn_sched.next_io_pg = next_rgroup; 4272 } 4273 4274 free(rgroup); 4275 } 4276 4277 static void 4278 nvmf_rdma_qpair_reject_connection(struct spdk_nvmf_rdma_qpair *rqpair) 4279 { 4280 if (rqpair->cm_id != NULL) { 4281 nvmf_rdma_event_reject(rqpair->cm_id, SPDK_NVMF_RDMA_ERROR_NO_RESOURCES); 4282 } 4283 } 4284 4285 static int 4286 nvmf_rdma_poll_group_add(struct spdk_nvmf_transport_poll_group *group, 4287 struct spdk_nvmf_qpair *qpair) 4288 { 4289 struct spdk_nvmf_rdma_poll_group *rgroup; 4290 struct spdk_nvmf_rdma_qpair *rqpair; 4291 struct spdk_nvmf_rdma_device *device; 4292 struct spdk_nvmf_rdma_poller *poller; 4293 int rc; 4294 4295 rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group); 4296 rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); 4297 4298 device = rqpair->device; 4299 4300 TAILQ_FOREACH(poller, &rgroup->pollers, link) { 4301 if (poller->device == device) { 4302 break; 4303 } 4304 } 4305 4306 if (!poller) { 4307 SPDK_ERRLOG("No poller found for device.\n"); 4308 return -1; 4309 } 4310 4311 if (poller->need_destroy) { 4312 SPDK_ERRLOG("Poller is destroying.\n"); 4313 return -1; 4314 } 4315 4316 rqpair->poller = poller; 4317 rqpair->srq = rqpair->poller->srq; 4318 4319 rc = nvmf_rdma_qpair_initialize(qpair); 4320 if (rc < 0) { 4321 SPDK_ERRLOG("Failed to initialize nvmf_rdma_qpair with qpair=%p\n", qpair); 4322 rqpair->poller = NULL; 4323 rqpair->srq = NULL; 4324 return -1; 4325 } 4326 4327 RB_INSERT(qpairs_tree, &poller->qpairs, rqpair); 4328 4329 rc = nvmf_rdma_event_accept(rqpair->cm_id, rqpair); 4330 if (rc) { 4331 /* Try to reject, but we probably can't */ 4332 nvmf_rdma_qpair_reject_connection(rqpair); 4333 return -1; 4334 } 4335 4336 return 0; 4337 } 4338 4339 static int 4340 nvmf_rdma_poll_group_remove(struct spdk_nvmf_transport_poll_group *group, 4341 struct spdk_nvmf_qpair *qpair) 4342 { 4343 struct spdk_nvmf_rdma_qpair *rqpair; 4344 4345 rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); 4346 assert(group->transport->tgt != NULL); 4347 4348 rqpair->destruct_channel = spdk_get_io_channel(group->transport->tgt); 4349 4350 if (!rqpair->destruct_channel) { 4351 SPDK_WARNLOG("failed to get io_channel, qpair %p\n", qpair); 4352 return 0; 4353 } 4354 4355 /* Sanity check that we get io_channel on the correct thread */ 4356 if (qpair->group) { 4357 assert(qpair->group->thread == spdk_io_channel_get_thread(rqpair->destruct_channel)); 4358 } 4359 4360 return 0; 4361 } 4362 4363 static int 4364 nvmf_rdma_request_free(struct spdk_nvmf_request *req) 4365 { 4366 struct spdk_nvmf_rdma_request *rdma_req = SPDK_CONTAINEROF(req, struct spdk_nvmf_rdma_request, req); 4367 struct spdk_nvmf_rdma_transport *rtransport = SPDK_CONTAINEROF(req->qpair->transport, 4368 struct spdk_nvmf_rdma_transport, transport); 4369 struct spdk_nvmf_rdma_qpair *rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, 4370 struct spdk_nvmf_rdma_qpair, qpair); 4371 4372 /* 4373 * AER requests are freed when a qpair is 
destroyed. The recv corresponding to that request 4374 * needs to be returned to the shared receive queue or the poll group will eventually be 4375 * starved of RECV structures. 4376 */ 4377 if (rqpair->srq && rdma_req->recv) { 4378 int rc; 4379 struct ibv_recv_wr *bad_recv_wr; 4380 4381 spdk_rdma_provider_srq_queue_recv_wrs(rqpair->srq, &rdma_req->recv->wr); 4382 rc = spdk_rdma_provider_srq_flush_recv_wrs(rqpair->srq, &bad_recv_wr); 4383 if (rc) { 4384 SPDK_ERRLOG("Unable to re-post rx descriptor\n"); 4385 } 4386 } 4387 4388 _nvmf_rdma_request_free(rdma_req, rtransport); 4389 return 0; 4390 } 4391 4392 static int 4393 nvmf_rdma_request_complete(struct spdk_nvmf_request *req) 4394 { 4395 struct spdk_nvmf_rdma_transport *rtransport = SPDK_CONTAINEROF(req->qpair->transport, 4396 struct spdk_nvmf_rdma_transport, transport); 4397 struct spdk_nvmf_rdma_request *rdma_req = SPDK_CONTAINEROF(req, 4398 struct spdk_nvmf_rdma_request, req); 4399 struct spdk_nvmf_rdma_qpair *rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, 4400 struct spdk_nvmf_rdma_qpair, qpair); 4401 4402 if (spdk_unlikely(rqpair->ibv_in_error_state)) { 4403 /* The connection is dead. Move the request directly to the completed state. */ 4404 rdma_req->state = RDMA_REQUEST_STATE_COMPLETED; 4405 } else { 4406 /* The connection is alive, so process the request as normal */ 4407 rdma_req->state = RDMA_REQUEST_STATE_EXECUTED; 4408 } 4409 4410 nvmf_rdma_request_process(rtransport, rdma_req); 4411 4412 return 0; 4413 } 4414 4415 static void 4416 nvmf_rdma_close_qpair(struct spdk_nvmf_qpair *qpair, 4417 spdk_nvmf_transport_qpair_fini_cb cb_fn, void *cb_arg) 4418 { 4419 struct spdk_nvmf_rdma_qpair *rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair); 4420 4421 rqpair->to_close = true; 4422 4423 /* This happens only when the qpair is disconnected before 4424 * it is added to the poll group. Since there is no poll group, 4425 * the RDMA qp has not been initialized yet and the RDMA CM 4426 * event has not yet been acknowledged, so we need to reject it. 
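     * Rejecting sends an explicit reply to the host's connect request; the qpair can then be
     * destroyed immediately since no RDMA queue pair resources have been created for it yet.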
4427 */ 4428 if (rqpair->qpair.state == SPDK_NVMF_QPAIR_UNINITIALIZED) { 4429 nvmf_rdma_qpair_reject_connection(rqpair); 4430 nvmf_rdma_qpair_destroy(rqpair); 4431 return; 4432 } 4433 4434 if (rqpair->rdma_qp) { 4435 spdk_rdma_provider_qp_disconnect(rqpair->rdma_qp); 4436 } 4437 4438 nvmf_rdma_destroy_drained_qpair(rqpair); 4439 4440 if (cb_fn) { 4441 cb_fn(cb_arg); 4442 } 4443 } 4444 4445 static struct spdk_nvmf_rdma_qpair * 4446 get_rdma_qpair_from_wc(struct spdk_nvmf_rdma_poller *rpoller, struct ibv_wc *wc) 4447 { 4448 struct spdk_nvmf_rdma_qpair find; 4449 4450 find.qp_num = wc->qp_num; 4451 4452 return RB_FIND(qpairs_tree, &rpoller->qpairs, &find); 4453 } 4454 4455 #ifdef DEBUG 4456 static int 4457 nvmf_rdma_req_is_completing(struct spdk_nvmf_rdma_request *rdma_req) 4458 { 4459 return rdma_req->state == RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST || 4460 rdma_req->state == RDMA_REQUEST_STATE_COMPLETING; 4461 } 4462 #endif 4463 4464 static void 4465 _poller_reset_failed_recvs(struct spdk_nvmf_rdma_poller *rpoller, struct ibv_recv_wr *bad_recv_wr, 4466 int rc) 4467 { 4468 struct spdk_nvmf_rdma_recv *rdma_recv; 4469 struct spdk_nvmf_rdma_wr *bad_rdma_wr; 4470 4471 SPDK_ERRLOG("Failed to post a recv for the poller %p with errno %d\n", rpoller, -rc); 4472 while (bad_recv_wr != NULL) { 4473 bad_rdma_wr = (struct spdk_nvmf_rdma_wr *)bad_recv_wr->wr_id; 4474 rdma_recv = SPDK_CONTAINEROF(bad_rdma_wr, struct spdk_nvmf_rdma_recv, rdma_wr); 4475 4476 rdma_recv->qpair->current_recv_depth++; 4477 bad_recv_wr = bad_recv_wr->next; 4478 SPDK_ERRLOG("Failed to post a recv for the qpair %p with errno %d\n", rdma_recv->qpair, -rc); 4479 spdk_nvmf_qpair_disconnect(&rdma_recv->qpair->qpair); 4480 } 4481 } 4482 4483 static void 4484 _qp_reset_failed_recvs(struct spdk_nvmf_rdma_qpair *rqpair, struct ibv_recv_wr *bad_recv_wr, int rc) 4485 { 4486 SPDK_ERRLOG("Failed to post a recv for the qpair %p with errno %d\n", rqpair, -rc); 4487 while (bad_recv_wr != NULL) { 4488 bad_recv_wr = bad_recv_wr->next; 4489 rqpair->current_recv_depth++; 4490 } 4491 spdk_nvmf_qpair_disconnect(&rqpair->qpair); 4492 } 4493 4494 static void 4495 _poller_submit_recvs(struct spdk_nvmf_rdma_transport *rtransport, 4496 struct spdk_nvmf_rdma_poller *rpoller) 4497 { 4498 struct spdk_nvmf_rdma_qpair *rqpair; 4499 struct ibv_recv_wr *bad_recv_wr; 4500 int rc; 4501 4502 if (rpoller->srq) { 4503 rc = spdk_rdma_provider_srq_flush_recv_wrs(rpoller->srq, &bad_recv_wr); 4504 if (spdk_unlikely(rc)) { 4505 _poller_reset_failed_recvs(rpoller, bad_recv_wr, rc); 4506 } 4507 } else { 4508 while (!STAILQ_EMPTY(&rpoller->qpairs_pending_recv)) { 4509 rqpair = STAILQ_FIRST(&rpoller->qpairs_pending_recv); 4510 rc = spdk_rdma_provider_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr); 4511 if (spdk_unlikely(rc)) { 4512 _qp_reset_failed_recvs(rqpair, bad_recv_wr, rc); 4513 } 4514 STAILQ_REMOVE_HEAD(&rpoller->qpairs_pending_recv, recv_link); 4515 } 4516 } 4517 } 4518 4519 static void 4520 _qp_reset_failed_sends(struct spdk_nvmf_rdma_transport *rtransport, 4521 struct spdk_nvmf_rdma_qpair *rqpair, struct ibv_send_wr *bad_wr, int rc) 4522 { 4523 struct spdk_nvmf_rdma_wr *bad_rdma_wr; 4524 struct spdk_nvmf_rdma_request *prev_rdma_req = NULL, *cur_rdma_req = NULL; 4525 4526 SPDK_ERRLOG("Failed to post a send for the qpair %p with errno %d\n", rqpair, -rc); 4527 for (; bad_wr != NULL; bad_wr = bad_wr->next) { 4528 bad_rdma_wr = (struct spdk_nvmf_rdma_wr *)bad_wr->wr_id; 4529 assert(rqpair->current_send_depth > 0); 4530 rqpair->current_send_depth--; 4531 
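        /* The wr_id of every posted WR points at the spdk_nvmf_rdma_wr embedded in the owning
         * request, so SPDK_CONTAINEROF() below maps a failed WR back to its spdk_nvmf_rdma_request
         * based on the recorded WR type. */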
        switch (bad_rdma_wr->type) {
        case RDMA_WR_TYPE_DATA:
            cur_rdma_req = SPDK_CONTAINEROF(bad_rdma_wr, struct spdk_nvmf_rdma_request, data_wr);
            if (bad_wr->opcode == IBV_WR_RDMA_READ) {
                assert(rqpair->current_read_depth > 0);
                rqpair->current_read_depth--;
            }
            break;
        case RDMA_WR_TYPE_SEND:
            cur_rdma_req = SPDK_CONTAINEROF(bad_rdma_wr, struct spdk_nvmf_rdma_request, rsp_wr);
            break;
        default:
            SPDK_ERRLOG("Found a RECV in the list of pending SEND requests for qpair %p\n", rqpair);
            prev_rdma_req = cur_rdma_req;
            continue;
        }

        if (prev_rdma_req == cur_rdma_req) {
            /* This request was already handled by an earlier WR, i.e. we were performing an NVMe read.
             * We only need to check against the previous request since each request's WRs are
             * contiguous in this list. */
            continue;
        }

        switch (cur_rdma_req->state) {
        case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
            cur_rdma_req->req.rsp->nvme_cpl.status.sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
            STAILQ_INSERT_TAIL(&rqpair->pending_rdma_send_queue, cur_rdma_req, state_link);
            cur_rdma_req->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING;
            break;
        case RDMA_REQUEST_STATE_TRANSFERRING_CONTROLLER_TO_HOST:
        case RDMA_REQUEST_STATE_COMPLETING:
            cur_rdma_req->state = RDMA_REQUEST_STATE_COMPLETED;
            break;
        default:
            SPDK_ERRLOG("Found a request in a bad state %d when draining pending SEND requests for qpair %p\n",
                        cur_rdma_req->state, rqpair);
            continue;
        }

        nvmf_rdma_request_process(rtransport, cur_rdma_req);
        prev_rdma_req = cur_rdma_req;
    }

    if (spdk_nvmf_qpair_is_active(&rqpair->qpair)) {
        /* Disconnect the connection. */
        spdk_nvmf_qpair_disconnect(&rqpair->qpair);
    }
}

static void
_poller_submit_sends(struct spdk_nvmf_rdma_transport *rtransport,
                     struct spdk_nvmf_rdma_poller *rpoller)
{
    struct spdk_nvmf_rdma_qpair *rqpair;
    struct ibv_send_wr *bad_wr = NULL;
    int rc;

    while (!STAILQ_EMPTY(&rpoller->qpairs_pending_send)) {
        rqpair = STAILQ_FIRST(&rpoller->qpairs_pending_send);
        rc = spdk_rdma_provider_qp_flush_send_wrs(rqpair->rdma_qp, &bad_wr);

        /* bad_wr always points to the first WR that failed. */
        if (spdk_unlikely(rc)) {
            _qp_reset_failed_sends(rtransport, rqpair, bad_wr, rc);
        }
        STAILQ_REMOVE_HEAD(&rpoller->qpairs_pending_send, send_link);
    }
}

static const char *
nvmf_rdma_wr_type_str(enum spdk_nvmf_rdma_wr_type wr_type)
{
    switch (wr_type) {
    case RDMA_WR_TYPE_RECV:
        return "RECV";
    case RDMA_WR_TYPE_SEND:
        return "SEND";
    case RDMA_WR_TYPE_DATA:
        return "DATA";
    default:
        SPDK_ERRLOG("Unknown WR type %d\n", wr_type);
        SPDK_UNREACHABLE();
    }
}

static inline void
nvmf_rdma_log_wc_status(struct spdk_nvmf_rdma_qpair *rqpair, struct ibv_wc *wc)
{
    enum spdk_nvmf_rdma_wr_type wr_type = ((struct spdk_nvmf_rdma_wr *)wc->wr_id)->type;

    if (wc->status == IBV_WC_WR_FLUSH_ERR) {
        /* If the qpair is in the ERR state, we will receive completions for all posted but not yet
         * completed Work Requests with IBV_WC_WR_FLUSH_ERR status.
Don't log an error in that case */ 4625 SPDK_DEBUGLOG(rdma, 4626 "Error on CQ %p, (qp state %d, in_error %d) request 0x%lu, type %s, status: (%d): %s\n", 4627 rqpair->poller->cq, rqpair->qpair.state, rqpair->ibv_in_error_state, wc->wr_id, 4628 nvmf_rdma_wr_type_str(wr_type), wc->status, ibv_wc_status_str(wc->status)); 4629 } else { 4630 SPDK_ERRLOG("Error on CQ %p, (qp state %d, in_error %d) request 0x%lu, type %s, status: (%d): %s\n", 4631 rqpair->poller->cq, rqpair->qpair.state, rqpair->ibv_in_error_state, wc->wr_id, 4632 nvmf_rdma_wr_type_str(wr_type), wc->status, ibv_wc_status_str(wc->status)); 4633 } 4634 } 4635 4636 static int 4637 nvmf_rdma_poller_poll(struct spdk_nvmf_rdma_transport *rtransport, 4638 struct spdk_nvmf_rdma_poller *rpoller) 4639 { 4640 struct ibv_wc wc[32]; 4641 struct spdk_nvmf_rdma_wr *rdma_wr; 4642 struct spdk_nvmf_rdma_request *rdma_req; 4643 struct spdk_nvmf_rdma_recv *rdma_recv; 4644 struct spdk_nvmf_rdma_qpair *rqpair, *tmp_rqpair; 4645 int reaped, i; 4646 int count = 0; 4647 int rc; 4648 bool error = false; 4649 uint64_t poll_tsc = spdk_get_ticks(); 4650 4651 if (spdk_unlikely(rpoller->need_destroy)) { 4652 /* If qpair is closed before poller destroy, nvmf_rdma_destroy_drained_qpair may not 4653 * be called because we cannot poll anything from cq. So we call that here to force 4654 * destroy the qpair after to_close turning true. 4655 */ 4656 RB_FOREACH_SAFE(rqpair, qpairs_tree, &rpoller->qpairs, tmp_rqpair) { 4657 nvmf_rdma_destroy_drained_qpair(rqpair); 4658 } 4659 return 0; 4660 } 4661 4662 /* Poll for completing operations. */ 4663 reaped = ibv_poll_cq(rpoller->cq, 32, wc); 4664 if (spdk_unlikely(reaped < 0)) { 4665 SPDK_ERRLOG("Error polling CQ! (%d): %s\n", 4666 errno, spdk_strerror(errno)); 4667 return -1; 4668 } else if (reaped == 0) { 4669 rpoller->stat.idle_polls++; 4670 } 4671 4672 rpoller->stat.polls++; 4673 rpoller->stat.completions += reaped; 4674 4675 for (i = 0; i < reaped; i++) { 4676 4677 rdma_wr = (struct spdk_nvmf_rdma_wr *)wc[i].wr_id; 4678 4679 switch (rdma_wr->type) { 4680 case RDMA_WR_TYPE_SEND: 4681 rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, rsp_wr); 4682 rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair); 4683 4684 if (spdk_likely(!wc[i].status)) { 4685 count++; 4686 assert(wc[i].opcode == IBV_WC_SEND); 4687 assert(nvmf_rdma_req_is_completing(rdma_req)); 4688 } 4689 4690 rdma_req->state = RDMA_REQUEST_STATE_COMPLETED; 4691 /* RDMA_WRITE operation completed. +1 since it was chained with rsp WR */ 4692 assert(rqpair->current_send_depth >= (uint32_t)rdma_req->num_outstanding_data_wr + 1); 4693 rqpair->current_send_depth -= rdma_req->num_outstanding_data_wr + 1; 4694 rdma_req->num_outstanding_data_wr = 0; 4695 4696 nvmf_rdma_request_process(rtransport, rdma_req); 4697 break; 4698 case RDMA_WR_TYPE_RECV: 4699 /* rdma_recv->qpair will be invalid if using an SRQ. In that case we have to get the qpair from the wc. */ 4700 rdma_recv = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_recv, rdma_wr); 4701 if (rpoller->srq != NULL) { 4702 rdma_recv->qpair = get_rdma_qpair_from_wc(rpoller, &wc[i]); 4703 /* It is possible that there are still some completions for destroyed QP 4704 * associated with SRQ. We just ignore these late completions and re-post 4705 * receive WRs back to SRQ. 
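     * The receive buffer still belongs to the SRQ, so returning it keeps the pool of posted
     * receive WRs from shrinking each time a qpair that used the SRQ is destroyed.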
4706 */ 4707 if (spdk_unlikely(NULL == rdma_recv->qpair)) { 4708 struct ibv_recv_wr *bad_wr; 4709 4710 rdma_recv->wr.next = NULL; 4711 spdk_rdma_provider_srq_queue_recv_wrs(rpoller->srq, &rdma_recv->wr); 4712 rc = spdk_rdma_provider_srq_flush_recv_wrs(rpoller->srq, &bad_wr); 4713 if (rc) { 4714 SPDK_ERRLOG("Failed to re-post recv WR to SRQ, err %d\n", rc); 4715 } 4716 continue; 4717 } 4718 } 4719 rqpair = rdma_recv->qpair; 4720 4721 assert(rqpair != NULL); 4722 if (spdk_likely(!wc[i].status)) { 4723 assert(wc[i].opcode == IBV_WC_RECV); 4724 if (rqpair->current_recv_depth >= rqpair->max_queue_depth) { 4725 spdk_nvmf_qpair_disconnect(&rqpair->qpair); 4726 break; 4727 } 4728 } 4729 4730 rdma_recv->wr.next = NULL; 4731 rqpair->current_recv_depth++; 4732 rdma_recv->receive_tsc = poll_tsc; 4733 rpoller->stat.requests++; 4734 STAILQ_INSERT_HEAD(&rqpair->resources->incoming_queue, rdma_recv, link); 4735 rqpair->qpair.queue_depth++; 4736 break; 4737 case RDMA_WR_TYPE_DATA: 4738 rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvmf_rdma_request, data_wr); 4739 rqpair = SPDK_CONTAINEROF(rdma_req->req.qpair, struct spdk_nvmf_rdma_qpair, qpair); 4740 4741 assert(rdma_req->num_outstanding_data_wr > 0); 4742 4743 rqpair->current_send_depth--; 4744 rdma_req->num_outstanding_data_wr--; 4745 if (spdk_likely(!wc[i].status)) { 4746 assert(wc[i].opcode == IBV_WC_RDMA_READ); 4747 rqpair->current_read_depth--; 4748 /* wait for all outstanding reads associated with the same rdma_req to complete before proceeding. */ 4749 if (rdma_req->num_outstanding_data_wr == 0) { 4750 if (rdma_req->num_remaining_data_wr) { 4751 /* Only part of RDMA_READ operations was submitted, process the rest */ 4752 nvmf_rdma_request_reset_transfer_in(rdma_req, rtransport); 4753 rdma_req->state = RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING; 4754 nvmf_rdma_request_process(rtransport, rdma_req); 4755 break; 4756 } 4757 rdma_req->state = RDMA_REQUEST_STATE_READY_TO_EXECUTE; 4758 nvmf_rdma_request_process(rtransport, rdma_req); 4759 } 4760 } else { 4761 /* If the data transfer fails still force the queue into the error state, 4762 * if we were performing an RDMA_READ, we need to force the request into a 4763 * completed state since it wasn't linked to a send. However, in the RDMA_WRITE 4764 * case, we should wait for the SEND to complete. */ 4765 if (rdma_req->data.wr.opcode == IBV_WR_RDMA_READ) { 4766 rqpair->current_read_depth--; 4767 if (rdma_req->num_outstanding_data_wr == 0) { 4768 if (rdma_req->num_remaining_data_wr) { 4769 /* Partially sent request is still in the pending_rdma_read_queue, 4770 * remove it now before completing */ 4771 rdma_req->num_remaining_data_wr = 0; 4772 STAILQ_REMOVE(&rqpair->pending_rdma_read_queue, rdma_req, spdk_nvmf_rdma_request, state_link); 4773 } 4774 rdma_req->state = RDMA_REQUEST_STATE_COMPLETED; 4775 nvmf_rdma_request_process(rtransport, rdma_req); 4776 } 4777 } 4778 } 4779 break; 4780 default: 4781 SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode); 4782 continue; 4783 } 4784 4785 /* Handle error conditions */ 4786 if (spdk_unlikely(wc[i].status)) { 4787 rqpair->ibv_in_error_state = true; 4788 nvmf_rdma_log_wc_status(rqpair, &wc[i]); 4789 4790 error = true; 4791 4792 if (spdk_nvmf_qpair_is_active(&rqpair->qpair)) { 4793 /* Disconnect the connection. 
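     * If the qpair is already inactive, the else branch below instead tries to finish
     * draining it and destroys it.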
*/ 4794 spdk_nvmf_qpair_disconnect(&rqpair->qpair); 4795 } else { 4796 nvmf_rdma_destroy_drained_qpair(rqpair); 4797 } 4798 continue; 4799 } 4800 4801 nvmf_rdma_qpair_process_pending(rtransport, rqpair, false); 4802 4803 if (spdk_unlikely(!spdk_nvmf_qpair_is_active(&rqpair->qpair))) { 4804 nvmf_rdma_destroy_drained_qpair(rqpair); 4805 } 4806 } 4807 4808 if (spdk_unlikely(error == true)) { 4809 return -1; 4810 } 4811 4812 if (reaped == 0) { 4813 /* In some cases we may not receive any CQE but we still may have pending IO requests waiting for 4814 * a resource (e.g. a WR from the data_wr_pool). 4815 * We need to start processing of such requests if no CQE reaped */ 4816 nvmf_rdma_poller_process_pending_buf_queue(rtransport, rpoller); 4817 } 4818 4819 /* submit outstanding work requests. */ 4820 _poller_submit_recvs(rtransport, rpoller); 4821 _poller_submit_sends(rtransport, rpoller); 4822 4823 return count; 4824 } 4825 4826 static void 4827 _nvmf_rdma_remove_destroyed_device(void *c) 4828 { 4829 struct spdk_nvmf_rdma_transport *rtransport = c; 4830 struct spdk_nvmf_rdma_device *device, *device_tmp; 4831 int rc; 4832 4833 TAILQ_FOREACH_SAFE(device, &rtransport->devices, link, device_tmp) { 4834 if (device->ready_to_destroy) { 4835 destroy_ib_device(rtransport, device); 4836 } 4837 } 4838 4839 free_poll_fds(rtransport); 4840 rc = generate_poll_fds(rtransport); 4841 /* cannot handle fd allocation error here */ 4842 if (rc != 0) { 4843 SPDK_ERRLOG("Failed to generate poll fds after remove ib device.\n"); 4844 } 4845 } 4846 4847 static void 4848 _nvmf_rdma_remove_poller_in_group_cb(void *c) 4849 { 4850 struct poller_manage_ctx *ctx = c; 4851 struct spdk_nvmf_rdma_transport *rtransport = ctx->rtransport; 4852 struct spdk_nvmf_rdma_device *device = ctx->device; 4853 struct spdk_thread *thread = ctx->thread; 4854 4855 if (nvmf_rdma_all_pollers_management_done(c)) { 4856 /* destroy device when last poller is destroyed */ 4857 device->ready_to_destroy = true; 4858 spdk_thread_send_msg(thread, _nvmf_rdma_remove_destroyed_device, rtransport); 4859 } 4860 } 4861 4862 static void 4863 _nvmf_rdma_remove_poller_in_group(void *c) 4864 { 4865 struct poller_manage_ctx *ctx = c; 4866 4867 ctx->rpoller->need_destroy = true; 4868 ctx->rpoller->destroy_cb_ctx = ctx; 4869 ctx->rpoller->destroy_cb = _nvmf_rdma_remove_poller_in_group_cb; 4870 4871 /* qp will be disconnected after receiving a RDMA_CM_EVENT_DEVICE_REMOVAL event. */ 4872 if (RB_EMPTY(&ctx->rpoller->qpairs)) { 4873 nvmf_rdma_poller_destroy(ctx->rpoller); 4874 } 4875 } 4876 4877 static int 4878 nvmf_rdma_poll_group_poll(struct spdk_nvmf_transport_poll_group *group) 4879 { 4880 struct spdk_nvmf_rdma_transport *rtransport; 4881 struct spdk_nvmf_rdma_poll_group *rgroup; 4882 struct spdk_nvmf_rdma_poller *rpoller, *tmp; 4883 int count = 0, rc, rc2 = 0; 4884 4885 rtransport = SPDK_CONTAINEROF(group->transport, struct spdk_nvmf_rdma_transport, transport); 4886 rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group); 4887 4888 TAILQ_FOREACH_SAFE(rpoller, &rgroup->pollers, link, tmp) { 4889 rc = nvmf_rdma_poller_poll(rtransport, rpoller); 4890 if (spdk_unlikely(rc < 0)) { 4891 if (rc2 == 0) { 4892 rc2 = rc; 4893 } 4894 continue; 4895 } 4896 count += rc; 4897 } 4898 4899 return rc2 ? 
static int
nvmf_rdma_trid_from_cm_id(struct rdma_cm_id *id,
			  struct spdk_nvme_transport_id *trid,
			  bool peer)
{
	struct sockaddr *saddr;
	uint16_t port;

	spdk_nvme_trid_populate_transport(trid, SPDK_NVME_TRANSPORT_RDMA);

	if (peer) {
		saddr = rdma_get_peer_addr(id);
	} else {
		saddr = rdma_get_local_addr(id);
	}
	switch (saddr->sa_family) {
	case AF_INET: {
		struct sockaddr_in *saddr_in = (struct sockaddr_in *)saddr;

		trid->adrfam = SPDK_NVMF_ADRFAM_IPV4;
		inet_ntop(AF_INET, &saddr_in->sin_addr,
			  trid->traddr, sizeof(trid->traddr));
		if (peer) {
			port = ntohs(rdma_get_dst_port(id));
		} else {
			port = ntohs(rdma_get_src_port(id));
		}
		snprintf(trid->trsvcid, sizeof(trid->trsvcid), "%u", port);
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *saddr_in = (struct sockaddr_in6 *)saddr;

		trid->adrfam = SPDK_NVMF_ADRFAM_IPV6;
		inet_ntop(AF_INET6, &saddr_in->sin6_addr,
			  trid->traddr, sizeof(trid->traddr));
		if (peer) {
			port = ntohs(rdma_get_dst_port(id));
		} else {
			port = ntohs(rdma_get_src_port(id));
		}
		snprintf(trid->trsvcid, sizeof(trid->trsvcid), "%u", port);
		break;
	}
	default:
		return -1;
	}

	return 0;
}

static int
nvmf_rdma_qpair_get_peer_trid(struct spdk_nvmf_qpair *qpair,
			      struct spdk_nvme_transport_id *trid)
{
	struct spdk_nvmf_rdma_qpair *rqpair;

	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	return nvmf_rdma_trid_from_cm_id(rqpair->cm_id, trid, true);
}

static int
nvmf_rdma_qpair_get_local_trid(struct spdk_nvmf_qpair *qpair,
			       struct spdk_nvme_transport_id *trid)
{
	struct spdk_nvmf_rdma_qpair *rqpair;

	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	return nvmf_rdma_trid_from_cm_id(rqpair->cm_id, trid, false);
}

static int
nvmf_rdma_qpair_get_listen_trid(struct spdk_nvmf_qpair *qpair,
				struct spdk_nvme_transport_id *trid)
{
	struct spdk_nvmf_rdma_qpair *rqpair;

	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);

	return nvmf_rdma_trid_from_cm_id(rqpair->listen_id, trid, false);
}

void
spdk_nvmf_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
{
	g_nvmf_hooks = *hooks;
}
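/*
 * Usage sketch (illustration only): an application that manages RDMA protection
 * domains or memory keys itself can install hooks before the RDMA transport is
 * created. The callback name below is a hypothetical placeholder; the available
 * fields of struct spdk_nvme_rdma_hooks are defined in the SPDK headers and may
 * vary between releases.
 *
 *	static struct spdk_nvme_rdma_hooks g_app_hooks = {
 *		.get_ibv_pd = app_get_ibv_pd,	// application-provided callback
 *	};
 *
 *	spdk_nvmf_rdma_init_hooks(&g_app_hooks);
 *
 * The hooks are copied into g_nvmf_hooks, so the structure only needs to remain
 * valid for the duration of the call.
 */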
static void
nvmf_rdma_request_set_abort_status(struct spdk_nvmf_request *req,
				   struct spdk_nvmf_rdma_request *rdma_req_to_abort,
				   struct spdk_nvmf_rdma_qpair *rqpair)
{
	rdma_req_to_abort->req.rsp->nvme_cpl.status.sct = SPDK_NVME_SCT_GENERIC;
	rdma_req_to_abort->req.rsp->nvme_cpl.status.sc = SPDK_NVME_SC_ABORTED_BY_REQUEST;

	STAILQ_INSERT_TAIL(&rqpair->pending_rdma_send_queue, rdma_req_to_abort, state_link);
	rdma_req_to_abort->state = RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING;

	req->rsp->nvme_cpl.cdw0 &= ~1U;	/* Command was successfully aborted. */
}

static int
_nvmf_rdma_qpair_abort_request(void *ctx)
{
	struct spdk_nvmf_request *req = ctx;
	struct spdk_nvmf_rdma_request *rdma_req_to_abort = SPDK_CONTAINEROF(
				req->req_to_abort, struct spdk_nvmf_rdma_request, req);
	struct spdk_nvmf_rdma_qpair *rqpair = SPDK_CONTAINEROF(req->req_to_abort->qpair,
					      struct spdk_nvmf_rdma_qpair, qpair);
	int rc;

	spdk_poller_unregister(&req->poller);

	switch (rdma_req_to_abort->state) {
	case RDMA_REQUEST_STATE_EXECUTING:
		rc = nvmf_ctrlr_abort_request(req);
		if (rc == SPDK_NVMF_REQUEST_EXEC_STATUS_ASYNCHRONOUS) {
			return SPDK_POLLER_BUSY;
		}
		break;

	case RDMA_REQUEST_STATE_NEED_BUFFER:
		STAILQ_REMOVE(&rqpair->poller->group->group.pending_buf_queue,
			      &rdma_req_to_abort->req, spdk_nvmf_request, buf_link);

		nvmf_rdma_request_set_abort_status(req, rdma_req_to_abort, rqpair);
		break;

	case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_CONTROLLER_PENDING:
		STAILQ_REMOVE(&rqpair->pending_rdma_read_queue, rdma_req_to_abort,
			      spdk_nvmf_rdma_request, state_link);

		nvmf_rdma_request_set_abort_status(req, rdma_req_to_abort, rqpair);
		break;

	case RDMA_REQUEST_STATE_DATA_TRANSFER_TO_HOST_PENDING:
		STAILQ_REMOVE(&rqpair->pending_rdma_write_queue, rdma_req_to_abort,
			      spdk_nvmf_rdma_request, state_link);

		nvmf_rdma_request_set_abort_status(req, rdma_req_to_abort, rqpair);
		break;

	case RDMA_REQUEST_STATE_READY_TO_COMPLETE_PENDING:
		/* Remove the req from the list here so the common function can be re-used. */
		STAILQ_REMOVE(&rqpair->pending_rdma_send_queue, rdma_req_to_abort,
			      spdk_nvmf_rdma_request, state_link);

		nvmf_rdma_request_set_abort_status(req, rdma_req_to_abort, rqpair);
		break;

	case RDMA_REQUEST_STATE_TRANSFERRING_HOST_TO_CONTROLLER:
		if (spdk_get_ticks() < req->timeout_tsc) {
			req->poller = SPDK_POLLER_REGISTER(_nvmf_rdma_qpair_abort_request, req, 0);
			return SPDK_POLLER_BUSY;
		}
		break;

	default:
		break;
	}

	spdk_nvmf_request_complete(req);
	return SPDK_POLLER_BUSY;
}
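/*
 * Note (added commentary): the switch above only aborts requests that are still
 * queued inside the transport (NEED_BUFFER and the pending RDMA read/write/send
 * queues) or currently EXECUTING at the bdev layer. A request that is mid
 * host-to-controller transfer (an outstanding RDMA_READ) is re-checked by
 * re-registering this poller until either the transfer finishes or
 * req->timeout_tsc expires; all other states simply complete the Abort command
 * without aborting anything.
 */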
static void
nvmf_rdma_qpair_abort_request(struct spdk_nvmf_qpair *qpair,
			      struct spdk_nvmf_request *req)
{
	struct spdk_nvmf_rdma_qpair *rqpair;
	struct spdk_nvmf_rdma_transport *rtransport;
	struct spdk_nvmf_transport *transport;
	uint16_t cid;
	uint32_t i, max_req_count;
	struct spdk_nvmf_rdma_request *rdma_req_to_abort = NULL, *rdma_req;

	rqpair = SPDK_CONTAINEROF(qpair, struct spdk_nvmf_rdma_qpair, qpair);
	rtransport = SPDK_CONTAINEROF(qpair->transport, struct spdk_nvmf_rdma_transport, transport);
	transport = &rtransport->transport;

	cid = req->cmd->nvme_cmd.cdw10_bits.abort.cid;
	max_req_count = rqpair->srq == NULL ? rqpair->max_queue_depth : rqpair->poller->max_srq_depth;

	for (i = 0; i < max_req_count; i++) {
		rdma_req = &rqpair->resources->reqs[i];
		/* When SRQ == NULL, the rqpair has its own requests and req.qpair always points
		 * to that qpair. When SRQ != NULL, all rqpairs share a common set of requests and
		 * the qpair pointer is assigned when we start to process a request. So in both
		 * cases, all requests which are not in the FREE state have a valid qpair pointer. */
		if (rdma_req->state != RDMA_REQUEST_STATE_FREE && rdma_req->req.cmd->nvme_cmd.cid == cid &&
		    rdma_req->req.qpair == qpair) {
			rdma_req_to_abort = rdma_req;
			break;
		}
	}

	if (rdma_req_to_abort == NULL) {
		spdk_nvmf_request_complete(req);
		return;
	}

	req->req_to_abort = &rdma_req_to_abort->req;
	req->timeout_tsc = spdk_get_ticks() +
			   transport->opts.abort_timeout_sec * spdk_get_ticks_hz();
	req->poller = NULL;

	_nvmf_rdma_qpair_abort_request(req);
}

static void
nvmf_rdma_poll_group_dump_stat(struct spdk_nvmf_transport_poll_group *group,
			       struct spdk_json_write_ctx *w)
{
	struct spdk_nvmf_rdma_poll_group *rgroup;
	struct spdk_nvmf_rdma_poller *rpoller;

	assert(w != NULL);

	rgroup = SPDK_CONTAINEROF(group, struct spdk_nvmf_rdma_poll_group, group);

	spdk_json_write_named_uint64(w, "pending_data_buffer", rgroup->stat.pending_data_buffer);

	spdk_json_write_named_array_begin(w, "devices");

	TAILQ_FOREACH(rpoller, &rgroup->pollers, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "name",
					     ibv_get_device_name(rpoller->device->context->device));
		spdk_json_write_named_uint64(w, "polls",
					     rpoller->stat.polls);
		spdk_json_write_named_uint64(w, "idle_polls",
					     rpoller->stat.idle_polls);
		spdk_json_write_named_uint64(w, "completions",
					     rpoller->stat.completions);
		spdk_json_write_named_uint64(w, "requests",
					     rpoller->stat.requests);
		spdk_json_write_named_uint64(w, "request_latency",
					     rpoller->stat.request_latency);
		spdk_json_write_named_uint64(w, "pending_free_request",
					     rpoller->stat.pending_free_request);
		spdk_json_write_named_uint64(w, "pending_rdma_read",
					     rpoller->stat.pending_rdma_read);
		spdk_json_write_named_uint64(w, "pending_rdma_write",
					     rpoller->stat.pending_rdma_write);
		spdk_json_write_named_uint64(w, "pending_rdma_send",
					     rpoller->stat.pending_rdma_send);
		spdk_json_write_named_uint64(w, "total_send_wrs",
					     rpoller->stat.qp_stats.send.num_submitted_wrs);
		spdk_json_write_named_uint64(w, "send_doorbell_updates",
					     rpoller->stat.qp_stats.send.doorbell_updates);
		spdk_json_write_named_uint64(w, "total_recv_wrs",
					     rpoller->stat.qp_stats.recv.num_submitted_wrs);
		spdk_json_write_named_uint64(w, "recv_doorbell_updates",
					     rpoller->stat.qp_stats.recv.doorbell_updates);
		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}
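/*
 * Example (illustrative values only) of the JSON fragment emitted above for a poll
 * group with a single device; the enclosing object is written by the caller:
 *
 *	"pending_data_buffer": 0,
 *	"devices": [
 *		{
 *			"name": "mlx5_0",
 *			"polls": 132045,
 *			"idle_polls": 131900,
 *			"completions": 412,
 *			"requests": 206,
 *			"request_latency": 987654,
 *			"pending_free_request": 0,
 *			"pending_rdma_read": 0,
 *			"pending_rdma_write": 0,
 *			"pending_rdma_send": 0,
 *			"total_send_wrs": 618,
 *			"send_doorbell_updates": 410,
 *			"total_recv_wrs": 618,
 *			"recv_doorbell_updates": 410
 *		}
 *	]
 */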
const struct spdk_nvmf_transport_ops spdk_nvmf_transport_rdma = {
	.name = "RDMA",
	.type = SPDK_NVME_TRANSPORT_RDMA,
	.opts_init = nvmf_rdma_opts_init,
	.create = nvmf_rdma_create,
	.dump_opts = nvmf_rdma_dump_opts,
	.destroy = nvmf_rdma_destroy,

	.listen = nvmf_rdma_listen,
	.stop_listen = nvmf_rdma_stop_listen,
	.cdata_init = nvmf_rdma_cdata_init,

	.listener_discover = nvmf_rdma_discover,

	.poll_group_create = nvmf_rdma_poll_group_create,
	.get_optimal_poll_group = nvmf_rdma_get_optimal_poll_group,
	.poll_group_destroy = nvmf_rdma_poll_group_destroy,
	.poll_group_add = nvmf_rdma_poll_group_add,
	.poll_group_remove = nvmf_rdma_poll_group_remove,
	.poll_group_poll = nvmf_rdma_poll_group_poll,

	.req_free = nvmf_rdma_request_free,
	.req_complete = nvmf_rdma_request_complete,

	.qpair_fini = nvmf_rdma_close_qpair,
	.qpair_get_peer_trid = nvmf_rdma_qpair_get_peer_trid,
	.qpair_get_local_trid = nvmf_rdma_qpair_get_local_trid,
	.qpair_get_listen_trid = nvmf_rdma_qpair_get_listen_trid,
	.qpair_abort_request = nvmf_rdma_qpair_abort_request,

	.poll_group_dump_stat = nvmf_rdma_poll_group_dump_stat,
};

SPDK_NVMF_TRANSPORT_REGISTER(rdma, &spdk_nvmf_transport_rdma);
SPDK_LOG_REGISTER_COMPONENT(rdma)
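/*
 * Note (added commentary): registering the ops table above makes this transport
 * selectable by name at runtime, typically via the nvmf_create_transport RPC
 * (for example `scripts/rpc.py nvmf_create_transport -t RDMA`) or an equivalent
 * JSON configuration entry; the exact tooling depends on the SPDK release in use.
 */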