1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. All rights reserved. 3 * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved. 4 */ 5 6 /** \file 7 * TCP network implementation abstraction layer 8 */ 9 10 #ifndef SPDK_INTERNAL_SOCK_H 11 #define SPDK_INTERNAL_SOCK_H 12 13 #include "spdk/stdinc.h" 14 #include "spdk/sock.h" 15 #include "spdk/queue.h" 16 #include "spdk/likely.h" 17 #include "spdk/log.h" 18 19 #ifdef __cplusplus 20 extern "C" { 21 #endif 22 23 #define MAX_EVENTS_PER_POLL 32 24 #define DEFAULT_SOCK_PRIORITY 0 25 #define MIN_SOCK_PIPE_SIZE 1024 26 #define DEFAULT_SO_RCVBUF_SIZE (2 * 1024 * 1024) 27 #define DEFAULT_SO_SNDBUF_SIZE (2 * 1024 * 1024) 28 #define MIN_SO_RCVBUF_SIZE (4 * 1024) 29 #define MIN_SO_SNDBUF_SIZE (4 * 1024) 30 #define IOV_BATCH_SIZE 64 31 32 struct spdk_sock { 33 struct spdk_net_impl *net_impl; 34 struct spdk_sock_opts opts; 35 struct spdk_sock_group_impl *group_impl; 36 TAILQ_ENTRY(spdk_sock) link; 37 38 TAILQ_HEAD(, spdk_sock_request) queued_reqs; 39 TAILQ_HEAD(, spdk_sock_request) pending_reqs; 40 struct spdk_sock_request *read_req; 41 int queued_iovcnt; 42 int cb_cnt; 43 spdk_sock_cb cb_fn; 44 void *cb_arg; 45 struct { 46 uint8_t closed : 1; 47 uint8_t reserved : 7; 48 } flags; 49 struct spdk_sock_impl_opts impl_opts; 50 }; 51 52 struct spdk_sock_group_provided_buf { 53 size_t len; 54 void *ctx; 55 STAILQ_ENTRY(spdk_sock_group_provided_buf) link; 56 }; 57 58 struct spdk_sock_group { 59 STAILQ_HEAD(, spdk_sock_group_impl) group_impls; 60 STAILQ_HEAD(, spdk_sock_group_provided_buf) pool; 61 void *ctx; 62 }; 63 64 struct spdk_sock_group_impl { 65 struct spdk_net_impl *net_impl; 66 struct spdk_sock_group *group; 67 TAILQ_HEAD(, spdk_sock) socks; 68 STAILQ_ENTRY(spdk_sock_group_impl) link; 69 }; 70 71 struct spdk_sock_map { 72 STAILQ_HEAD(, spdk_sock_placement_id_entry) entries; 73 pthread_mutex_t mtx; 74 }; 75 76 struct spdk_net_impl { 77 const char *name; 78 int priority; 79 80 int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr, 81 int clen, uint16_t *cport); 82 struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts); 83 struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts); 84 struct spdk_sock *(*accept)(struct spdk_sock *sock); 85 int (*close)(struct spdk_sock *sock); 86 ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len); 87 ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt); 88 ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt); 89 90 int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx); 91 void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req); 92 void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req); 93 int (*flush)(struct spdk_sock *sock); 94 95 int (*set_recvlowat)(struct spdk_sock *sock, int nbytes); 96 int (*set_recvbuf)(struct spdk_sock *sock, int sz); 97 int (*set_sendbuf)(struct spdk_sock *sock, int sz); 98 99 bool (*is_ipv6)(struct spdk_sock *sock); 100 bool (*is_ipv4)(struct spdk_sock *sock); 101 bool (*is_connected)(struct spdk_sock *sock); 102 103 struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock, 104 struct spdk_sock_group_impl *hint); 105 struct spdk_sock_group_impl *(*group_impl_create)(void); 106 int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock); 107 int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock); 108 int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events, 109 struct spdk_sock **socks); 110 int (*group_impl_close)(struct spdk_sock_group_impl *group); 111 112 int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len); 113 int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len); 114 115 STAILQ_ENTRY(spdk_net_impl) link; 116 }; 117 118 void spdk_net_impl_register(struct spdk_net_impl *impl, int priority); 119 120 #define SPDK_NET_IMPL_REGISTER(name, impl, priority) \ 121 static void __attribute__((constructor)) net_impl_register_##name(void) \ 122 { \ 123 spdk_net_impl_register(impl, priority); \ 124 } 125 126 size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx); 127 128 static inline void 129 spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req) 130 { 131 assert(req->internal.curr_list == NULL); 132 TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link); 133 #ifdef DEBUG 134 req->internal.curr_list = &sock->queued_reqs; 135 #endif 136 sock->queued_iovcnt += req->iovcnt; 137 } 138 139 static inline void 140 spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req) 141 { 142 assert(req->internal.curr_list == &sock->queued_reqs); 143 TAILQ_REMOVE(&sock->queued_reqs, req, internal.link); 144 assert(sock->queued_iovcnt >= req->iovcnt); 145 sock->queued_iovcnt -= req->iovcnt; 146 TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link); 147 #ifdef DEBUG 148 req->internal.curr_list = &sock->pending_reqs; 149 #endif 150 } 151 152 static inline int 153 spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err) 154 { 155 bool closed; 156 int rc = 0; 157 158 req->internal.offset = 0; 159 req->internal.is_zcopy = 0; 160 161 closed = sock->flags.closed; 162 sock->cb_cnt++; 163 req->cb_fn(req->cb_arg, err); 164 assert(sock->cb_cnt > 0); 165 sock->cb_cnt--; 166 167 if (sock->cb_cnt == 0 && !closed && sock->flags.closed) { 168 /* The user closed the socket in response to a callback above. */ 169 rc = -1; 170 spdk_sock_close(&sock); 171 } 172 173 return rc; 174 } 175 176 static inline int 177 spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err) 178 { 179 assert(req->internal.curr_list == &sock->pending_reqs); 180 TAILQ_REMOVE(&sock->pending_reqs, req, internal.link); 181 #ifdef DEBUG 182 req->internal.curr_list = NULL; 183 #endif 184 return spdk_sock_request_complete(sock, req, err); 185 } 186 187 static inline int 188 spdk_sock_abort_requests(struct spdk_sock *sock) 189 { 190 struct spdk_sock_request *req; 191 bool closed; 192 int rc = 0; 193 194 closed = sock->flags.closed; 195 sock->cb_cnt++; 196 197 req = TAILQ_FIRST(&sock->pending_reqs); 198 while (req) { 199 assert(req->internal.curr_list == &sock->pending_reqs); 200 TAILQ_REMOVE(&sock->pending_reqs, req, internal.link); 201 #ifdef DEBUG 202 req->internal.curr_list = NULL; 203 #endif 204 205 req->cb_fn(req->cb_arg, -ECANCELED); 206 207 req = TAILQ_FIRST(&sock->pending_reqs); 208 } 209 210 req = TAILQ_FIRST(&sock->queued_reqs); 211 while (req) { 212 assert(req->internal.curr_list == &sock->queued_reqs); 213 TAILQ_REMOVE(&sock->queued_reqs, req, internal.link); 214 #ifdef DEBUG 215 req->internal.curr_list = NULL; 216 #endif 217 218 assert(sock->queued_iovcnt >= req->iovcnt); 219 sock->queued_iovcnt -= req->iovcnt; 220 221 req->cb_fn(req->cb_arg, -ECANCELED); 222 223 req = TAILQ_FIRST(&sock->queued_reqs); 224 } 225 226 req = sock->read_req; 227 if (req != NULL) { 228 sock->read_req = NULL; 229 req->cb_fn(req->cb_arg, -ECANCELED); 230 } 231 assert(sock->cb_cnt > 0); 232 sock->cb_cnt--; 233 234 assert(TAILQ_EMPTY(&sock->queued_reqs)); 235 assert(TAILQ_EMPTY(&sock->pending_reqs)); 236 237 if (sock->cb_cnt == 0 && !closed && sock->flags.closed) { 238 /* The user closed the socket in response to a callback above. */ 239 rc = -1; 240 spdk_sock_close(&sock); 241 } 242 243 return rc; 244 } 245 246 static inline int 247 spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index, 248 uint64_t *num_bytes) 249 { 250 unsigned int offset; 251 int iovcnt, i; 252 253 assert(index < IOV_BATCH_SIZE); 254 offset = req->internal.offset; 255 iovcnt = index; 256 257 for (i = 0; i < req->iovcnt; i++) { 258 /* Consume any offset first */ 259 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { 260 offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; 261 continue; 262 } 263 264 iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; 265 iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; 266 if (num_bytes != NULL) { 267 *num_bytes += iovs[iovcnt].iov_len; 268 } 269 270 iovcnt++; 271 offset = 0; 272 273 if (iovcnt >= IOV_BATCH_SIZE) { 274 break; 275 } 276 } 277 278 return iovcnt; 279 } 280 281 static inline int 282 spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index, 283 struct spdk_sock_request **last_req, int *flags) 284 { 285 int iovcnt; 286 struct spdk_sock_request *req; 287 uint64_t total = 0; 288 289 /* Gather an iov */ 290 iovcnt = index; 291 if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) { 292 goto end; 293 } 294 295 if (last_req != NULL && *last_req != NULL) { 296 req = TAILQ_NEXT(*last_req, internal.link); 297 } else { 298 req = TAILQ_FIRST(&_sock->queued_reqs); 299 } 300 301 while (req) { 302 iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total); 303 if (iovcnt >= IOV_BATCH_SIZE) { 304 break; 305 } 306 307 if (last_req != NULL) { 308 *last_req = req; 309 } 310 req = TAILQ_NEXT(req, internal.link); 311 } 312 313 end: 314 315 #if defined(MSG_ZEROCOPY) 316 /* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */ 317 if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) { 318 *flags = *flags & (~MSG_ZEROCOPY); 319 } 320 #endif 321 322 return iovcnt; 323 } 324 325 static inline void 326 spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id) 327 { 328 *placement_id = -1; 329 330 switch (mode) { 331 case PLACEMENT_NONE: 332 break; 333 case PLACEMENT_MARK: 334 case PLACEMENT_NAPI: { 335 #if defined(SO_INCOMING_NAPI_ID) 336 socklen_t len = sizeof(int); 337 338 int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len); 339 if (rc == -1) { 340 SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno)); 341 assert(false); 342 } 343 #endif 344 break; 345 } 346 case PLACEMENT_CPU: { 347 #if defined(SO_INCOMING_CPU) 348 socklen_t len = sizeof(int); 349 350 int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len); 351 if (rc == -1) { 352 SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno)); 353 assert(false); 354 } 355 #endif 356 break; 357 } 358 default: 359 break; 360 } 361 } 362 363 /** 364 * Insert a group into the placement map. 365 * If the group is already in the map, take a reference. 366 */ 367 int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id, 368 struct spdk_sock_group_impl *group_impl); 369 370 /** 371 * Release a reference for the given placement_id. If the reference count goes to 0, the 372 * entry will no longer be associated with a group. 373 */ 374 void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id); 375 376 /** 377 * Look up the group for the given placement_id. 378 */ 379 int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id, 380 struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint); 381 382 /** 383 * Find a placement id with no associated group 384 */ 385 int spdk_sock_map_find_free(struct spdk_sock_map *map); 386 387 /** 388 * Clean up all memory associated with the given map 389 */ 390 void spdk_sock_map_cleanup(struct spdk_sock_map *map); 391 392 #ifdef __cplusplus 393 } 394 #endif 395 396 #endif /* SPDK_INTERNAL_SOCK_H */ 397