/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2018 Intel Corporation. All rights reserved.
 * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
 */

/** \file
 * TCP network implementation abstraction layer
 */

#ifndef SPDK_INTERNAL_SOCK_H
#define SPDK_INTERNAL_SOCK_H

#include "spdk/stdinc.h"
#include "spdk/sock.h"
#include "spdk/queue.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/trace.h"
#include "spdk_internal/trace_defs.h"

#ifdef __cplusplus
extern "C" {
#endif

#define MAX_EVENTS_PER_POLL 32
#define DEFAULT_SOCK_PRIORITY 0
#define MIN_SOCK_PIPE_SIZE 1024
#define DEFAULT_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define DEFAULT_SO_SNDBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_RCVBUF_SIZE (4 * 1024)
#define MIN_SO_SNDBUF_SIZE (4 * 1024)
#define IOV_BATCH_SIZE 64

struct spdk_sock {
	struct spdk_net_impl *net_impl;
	struct spdk_sock_opts opts;
	struct spdk_sock_group_impl *group_impl;
	TAILQ_ENTRY(spdk_sock) link;

	TAILQ_HEAD(, spdk_sock_request) queued_reqs;
	TAILQ_HEAD(, spdk_sock_request) pending_reqs;
	struct spdk_sock_request *read_req;
	int queued_iovcnt;
	int cb_cnt;
	spdk_sock_cb cb_fn;
	void *cb_arg;
	struct {
		uint8_t closed : 1;
		uint8_t reserved : 7;
	} flags;
	struct spdk_sock_impl_opts impl_opts;
};

struct spdk_sock_group_provided_buf {
	size_t len;
	void *ctx;
	STAILQ_ENTRY(spdk_sock_group_provided_buf) link;
};

struct spdk_sock_group {
	STAILQ_HEAD(, spdk_sock_group_impl) group_impls;
	STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
	void *ctx;
};

struct spdk_sock_group_impl {
	struct spdk_net_impl *net_impl;
	struct spdk_sock_group *group;
	TAILQ_HEAD(, spdk_sock) socks;
	STAILQ_ENTRY(spdk_sock_group_impl) link;
};

struct spdk_sock_map {
	STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
	pthread_mutex_t mtx;
};

struct spdk_net_impl {
	const char *name;

	int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
		       int clen, uint16_t *cport);
	struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
	struct spdk_sock *(*accept)(struct spdk_sock *sock);
	int (*close)(struct spdk_sock *sock);
	ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
	ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
	ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);

	int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
	void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
	int (*flush)(struct spdk_sock *sock);

	int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
	int (*set_recvbuf)(struct spdk_sock *sock, int sz);
	int (*set_sendbuf)(struct spdk_sock *sock, int sz);

	bool (*is_ipv6)(struct spdk_sock *sock);
	bool (*is_ipv4)(struct spdk_sock *sock);
	bool (*is_connected)(struct spdk_sock *sock);

	struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
			struct spdk_sock_group_impl *hint);
	struct spdk_sock_group_impl *(*group_impl_create)(void);
	int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
	int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
			       struct spdk_sock **socks);
	int (*group_impl_close)(struct spdk_sock_group_impl *group);

	int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
	int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);

	STAILQ_ENTRY(spdk_net_impl) link;
};

void spdk_net_impl_register(struct spdk_net_impl *impl);

#define SPDK_NET_IMPL_REGISTER(name, impl) \
static void __attribute__((constructor)) net_impl_register_##name(void) \
{ \
	spdk_net_impl_register(impl); \
}

#define SPDK_NET_IMPL_REGISTER_DEFAULT(name, impl) \
static void __attribute__((constructor)) net_impl_register_default_##name(void) \
{ \
	spdk_net_impl_register(impl); \
	spdk_sock_set_default_impl(SPDK_STRINGIFY(name)); \
}
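
/*
 * Illustrative sketch, not part of this header: how a socket module typically
 * defines and registers its struct spdk_net_impl. The module name "mysock"
 * and the callback functions named in the initializer are hypothetical
 * placeholders; a real module (e.g. the in-tree posix module) fills in every
 * callback it supports.
 *
 *	static struct spdk_net_impl g_mysock_net_impl = {
 *		.name = "mysock",
 *		.getaddr = mysock_getaddr,
 *		.connect = mysock_connect,
 *		.listen = mysock_listen,
 *		.accept = mysock_accept,
 *		.close = mysock_close,
 *		.writev = mysock_writev,
 *		.writev_async = mysock_writev_async,
 *		.group_impl_create = mysock_group_impl_create,
 *		.group_impl_add_sock = mysock_group_impl_add_sock,
 *		.group_impl_remove_sock = mysock_group_impl_remove_sock,
 *		.group_impl_poll = mysock_group_impl_poll,
 *		.group_impl_close = mysock_group_impl_close,
 *	};
 *
 *	SPDK_NET_IMPL_REGISTER(mysock, &g_mysock_net_impl);
 *
 * SPDK_NET_IMPL_REGISTER_DEFAULT() does the same and additionally makes the
 * named module the default implementation via spdk_sock_set_default_impl().
 */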

size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);

static inline void
spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == NULL);
	if (spdk_trace_tpoint_enabled(TRACE_SOCK_REQ_QUEUE)) {
		uint64_t len = 0;
		int i;

		for (i = 0; i < req->iovcnt; i++) {
			len += SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
		}
		spdk_trace_record(TRACE_SOCK_REQ_QUEUE, 0, len, (uintptr_t)req, (uintptr_t)req->cb_arg);
	}
	TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = &sock->queued_reqs;
#endif
	sock->queued_iovcnt += req->iovcnt;
}

static inline void
spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
{
	assert(req->internal.curr_list == &sock->queued_reqs);
	spdk_trace_record(TRACE_SOCK_REQ_PEND, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
	TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
	assert(sock->queued_iovcnt >= req->iovcnt);
	sock->queued_iovcnt -= req->iovcnt;
	TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = &sock->pending_reqs;
#endif
}

static inline int
spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	bool closed;
	int rc = 0;

	spdk_trace_record(TRACE_SOCK_REQ_COMPLETE, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
	req->internal.offset = 0;
	req->internal.is_zcopy = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;
	req->cb_fn(req->cb_arg, err);
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}

static inline int
spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
{
	assert(req->internal.curr_list == &sock->pending_reqs);
	TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
	req->internal.curr_list = NULL;
#endif
	return spdk_sock_request_complete(sock, req, err);
}

static inline int
spdk_sock_abort_requests(struct spdk_sock *sock)
{
	struct spdk_sock_request *req;
	bool closed;
	int rc = 0;

	closed = sock->flags.closed;
	sock->cb_cnt++;

	req = TAILQ_FIRST(&sock->pending_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->pending_reqs);
		TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->pending_reqs);
	}

	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		assert(req->internal.curr_list == &sock->queued_reqs);
		TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
#ifdef DEBUG
		req->internal.curr_list = NULL;
#endif

		assert(sock->queued_iovcnt >= req->iovcnt);
		sock->queued_iovcnt -= req->iovcnt;

		req->cb_fn(req->cb_arg, -ECANCELED);

		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	req = sock->read_req;
	if (req != NULL) {
		sock->read_req = NULL;
		req->cb_fn(req->cb_arg, -ECANCELED);
	}
	assert(sock->cb_cnt > 0);
	sock->cb_cnt--;

	assert(TAILQ_EMPTY(&sock->queued_reqs));
	assert(TAILQ_EMPTY(&sock->pending_reqs));

	if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
		/* The user closed the socket in response to a callback above. */
		rc = -1;
		spdk_sock_close(&sock);
	}

	return rc;
}
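
/*
 * Illustrative sketch, not part of this header: the intended life cycle of an
 * asynchronous write request using the helpers above. A module typically
 * queues the request from its writev_async() callback, marks it pending once
 * the data has been handed to the kernel, and puts it when the request is
 * finished. The helper below and its name are hypothetical.
 *
 *	static int
 *	example_writev_done(struct spdk_sock *sock, struct spdk_sock_request *req)
 *	{
 *		spdk_sock_request_pend(sock, req);
 *		return spdk_sock_request_put(sock, req, 0);
 *	}
 *
 * A non-zero return from spdk_sock_request_put(), spdk_sock_request_complete()
 * or spdk_sock_abort_requests() means the user's completion callback closed
 * the socket, so the caller should stop using it.
 */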

static inline int
spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
		   uint64_t *num_bytes)
{
	unsigned int offset;
	int iovcnt, i;

	assert(index < IOV_BATCH_SIZE);
	offset = req->internal.offset;
	iovcnt = index;

	for (i = 0; i < req->iovcnt; i++) {
		/* Consume any offset first */
		if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
			offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
			continue;
		}

		iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
		iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
		if (num_bytes != NULL) {
			*num_bytes += iovs[iovcnt].iov_len;
		}

		iovcnt++;
		offset = 0;

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}
	}

	return iovcnt;
}

static inline int
spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
		    struct spdk_sock_request **last_req, int *flags)
{
	int iovcnt;
	struct spdk_sock_request *req;
	uint64_t total = 0;

	/* Gather an iov */
	iovcnt = index;
	if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
		goto end;
	}

	if (last_req != NULL && *last_req != NULL) {
		req = TAILQ_NEXT(*last_req, internal.link);
	} else {
		req = TAILQ_FIRST(&_sock->queued_reqs);
	}

	while (req) {
		iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		if (last_req != NULL) {
			*last_req = req;
		}
		req = TAILQ_NEXT(req, internal.link);
	}

end:

#if defined(MSG_ZEROCOPY)
	/* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
	if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
		*flags = *flags & (~MSG_ZEROCOPY);
	}
#endif

	return iovcnt;
}
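
/*
 * Illustrative sketch, not part of this header: a minimal flush path showing
 * how spdk_sock_prep_reqs() is meant to be used. It batches the queued
 * requests into at most IOV_BATCH_SIZE iovecs and writes them out in one call;
 * advancing req->internal.offset and completing finished requests afterwards
 * is omitted. The function name and the raw "fd" parameter are assumptions
 * about a particular module's internals.
 *
 *	static ssize_t
 *	example_flush(struct spdk_sock *sock, int fd)
 *	{
 *		struct iovec iovs[IOV_BATCH_SIZE];
 *		int iovcnt;
 *		ssize_t rc;
 *
 *		iovcnt = spdk_sock_prep_reqs(sock, iovs, 0, NULL, NULL);
 *		if (iovcnt == 0) {
 *			return 0;
 *		}
 *
 *		rc = writev(fd, iovs, iovcnt);
 *		if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
 *			return 0;
 *		}
 *
 *		return rc;
 *	}
 */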

static inline void
spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
{
	*placement_id = -1;

	switch (mode) {
	case PLACEMENT_NONE:
		break;
	case PLACEMENT_MARK:
	case PLACEMENT_NAPI: {
#if defined(SO_INCOMING_NAPI_ID)
		socklen_t len = sizeof(int);

		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
		if (rc == -1) {
			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
			assert(false);
		}
#endif
		break;
	}
	case PLACEMENT_CPU: {
#if defined(SO_INCOMING_CPU)
		socklen_t len = sizeof(int);

		int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
		if (rc == -1) {
			SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
			assert(false);
		}
#endif
		break;
	}
	default:
		break;
	}
}

/**
 * Insert a group into the placement map.
 * If the group is already in the map, take a reference.
 */
int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl *group_impl);

/**
 * Release a reference for the given placement_id. If the reference count goes to 0, the
 * entry will no longer be associated with a group.
 */
void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);

/**
 * Look up the group for the given placement_id.
 */
int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
			 struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint);

/**
 * Find a placement id with no associated group
 */
int spdk_sock_map_find_free(struct spdk_sock_map *map);

/**
 * Clean up all memory associated with the given map
 */
void spdk_sock_map_cleanup(struct spdk_sock_map *map);
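
/*
 * Illustrative sketch, not part of this header: how a module's
 * group_impl_get_optimal() callback might combine spdk_sock_get_placement_id()
 * with the placement map. The global g_example_map and the fd/mode parameters
 * are hypothetical; a real module derives them from its own socket state and
 * impl_opts.
 *
 *	static struct spdk_sock_map g_example_map = {
 *		.entries = STAILQ_HEAD_INITIALIZER(g_example_map.entries),
 *		.mtx = PTHREAD_MUTEX_INITIALIZER,
 *	};
 *
 *	static struct spdk_sock_group_impl *
 *	example_get_optimal_group(int fd, enum spdk_placement_mode mode,
 *				  struct spdk_sock_group_impl *hint)
 *	{
 *		struct spdk_sock_group_impl *group_impl = NULL;
 *		int placement_id;
 *
 *		spdk_sock_get_placement_id(fd, mode, &placement_id);
 *		if (placement_id != -1) {
 *			spdk_sock_map_lookup(&g_example_map, placement_id, &group_impl, hint);
 *		}
 *
 *		return group_impl;
 *	}
 */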

#ifdef __cplusplus
}
#endif

#endif /* SPDK_INTERNAL_SOCK_H */