1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/log.h" 37 #include "spdk/sock.h" 38 #include "spdk_internal/sock.h" 39 #include "spdk/queue.h" 40 41 static STAILQ_HEAD(, spdk_net_impl) g_net_impls = STAILQ_HEAD_INITIALIZER(g_net_impls); 42 43 struct spdk_sock_placement_id_entry { 44 int placement_id; 45 uint32_t ref; 46 struct spdk_sock_group *group; 47 STAILQ_ENTRY(spdk_sock_placement_id_entry) link; 48 }; 49 50 static STAILQ_HEAD(, spdk_sock_placement_id_entry) g_placement_id_map = STAILQ_HEAD_INITIALIZER( 51 g_placement_id_map); 52 static pthread_mutex_t g_map_table_mutex = PTHREAD_MUTEX_INITIALIZER; 53 54 /* Insert a group into the placement map. 55 * If the group is already in the map, take a reference. 56 */ 57 static int 58 spdk_sock_map_insert(int placement_id, struct spdk_sock_group *group) 59 { 60 struct spdk_sock_placement_id_entry *entry; 61 62 pthread_mutex_lock(&g_map_table_mutex); 63 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 64 if (placement_id == entry->placement_id) { 65 /* The mapping already exists, it means that different sockets have 66 * the same placement_ids. 67 */ 68 entry->ref++; 69 pthread_mutex_unlock(&g_map_table_mutex); 70 return 0; 71 } 72 } 73 74 entry = calloc(1, sizeof(*entry)); 75 if (!entry) { 76 SPDK_ERRLOG("Cannot allocate an entry for placement_id=%u\n", placement_id); 77 pthread_mutex_unlock(&g_map_table_mutex); 78 return -ENOMEM; 79 } 80 81 entry->placement_id = placement_id; 82 entry->group = group; 83 entry->ref++; 84 85 STAILQ_INSERT_TAIL(&g_placement_id_map, entry, link); 86 pthread_mutex_unlock(&g_map_table_mutex); 87 88 return 0; 89 } 90 91 /* Release a reference to the group for a given placement_id. 92 * If the reference count is 0, remove the group. 93 */ 94 static void 95 spdk_sock_map_release(int placement_id) 96 { 97 struct spdk_sock_placement_id_entry *entry; 98 99 pthread_mutex_lock(&g_map_table_mutex); 100 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 101 if (placement_id == entry->placement_id) { 102 assert(entry->ref > 0); 103 entry->ref--; 104 break; 105 } 106 } 107 108 pthread_mutex_unlock(&g_map_table_mutex); 109 } 110 111 /* Look up the group for a placement_id. */ 112 static void 113 spdk_sock_map_lookup(int placement_id, struct spdk_sock_group **group) 114 { 115 struct spdk_sock_placement_id_entry *entry; 116 117 *group = NULL; 118 pthread_mutex_lock(&g_map_table_mutex); 119 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 120 if (placement_id == entry->placement_id) { 121 assert(entry->group != NULL); 122 *group = entry->group; 123 break; 124 } 125 } 126 pthread_mutex_unlock(&g_map_table_mutex); 127 } 128 129 /* Remove the socket group from the map table */ 130 static void 131 spdk_sock_remove_sock_group_from_map_table(struct spdk_sock_group *group) 132 { 133 struct spdk_sock_placement_id_entry *entry, *tmp; 134 135 pthread_mutex_lock(&g_map_table_mutex); 136 STAILQ_FOREACH_SAFE(entry, &g_placement_id_map, link, tmp) { 137 if (entry->group == group) { 138 STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link); 139 free(entry); 140 } 141 } 142 pthread_mutex_unlock(&g_map_table_mutex); 143 144 } 145 146 int 147 spdk_sock_get_optimal_sock_group(struct spdk_sock *sock, struct spdk_sock_group **group) 148 { 149 int placement_id = 0, rc; 150 151 rc = sock->net_impl->get_placement_id(sock, &placement_id); 152 if (!rc && (placement_id != 0)) { 153 spdk_sock_map_lookup(placement_id, group); 154 return 0; 155 } else { 156 return -1; 157 } 158 } 159 160 int 161 spdk_sock_getaddr(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, 162 char *caddr, int clen, uint16_t *cport) 163 { 164 return sock->net_impl->getaddr(sock, saddr, slen, sport, caddr, clen, cport); 165 } 166 167 struct spdk_sock * 168 spdk_sock_connect(const char *ip, int port, char *impl_name) 169 { 170 struct spdk_net_impl *impl = NULL; 171 struct spdk_sock *sock; 172 173 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 174 if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) { 175 continue; 176 } 177 178 sock = impl->connect(ip, port); 179 if (sock != NULL) { 180 sock->net_impl = impl; 181 TAILQ_INIT(&sock->queued_reqs); 182 TAILQ_INIT(&sock->pending_reqs); 183 return sock; 184 } 185 } 186 187 return NULL; 188 } 189 190 struct spdk_sock * 191 spdk_sock_listen(const char *ip, int port, char *impl_name) 192 { 193 struct spdk_net_impl *impl = NULL; 194 struct spdk_sock *sock; 195 196 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 197 if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) { 198 continue; 199 } 200 201 sock = impl->listen(ip, port); 202 if (sock != NULL) { 203 sock->net_impl = impl; 204 /* Don't need to initialize the request queues for listen 205 * sockets. */ 206 return sock; 207 } 208 } 209 210 return NULL; 211 } 212 213 struct spdk_sock * 214 spdk_sock_accept(struct spdk_sock *sock) 215 { 216 struct spdk_sock *new_sock; 217 218 new_sock = sock->net_impl->accept(sock); 219 if (new_sock != NULL) { 220 new_sock->net_impl = sock->net_impl; 221 TAILQ_INIT(&new_sock->queued_reqs); 222 TAILQ_INIT(&new_sock->pending_reqs); 223 } 224 225 return new_sock; 226 } 227 228 int 229 spdk_sock_close(struct spdk_sock **_sock) 230 { 231 struct spdk_sock *sock = *_sock; 232 int rc; 233 234 if (sock == NULL) { 235 errno = EBADF; 236 return -1; 237 } 238 239 if (sock->cb_fn != NULL) { 240 /* This sock is still part of a sock_group. */ 241 errno = EBUSY; 242 return -1; 243 } 244 245 sock->flags.closed = true; 246 247 if (sock->cb_cnt > 0) { 248 /* Let the callback unwind before destroying the socket */ 249 return 0; 250 } 251 252 spdk_sock_abort_requests(sock); 253 254 rc = sock->net_impl->close(sock); 255 if (rc == 0) { 256 *_sock = NULL; 257 } 258 259 return rc; 260 } 261 262 ssize_t 263 spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 264 { 265 if (sock == NULL) { 266 errno = EBADF; 267 return -1; 268 } 269 270 if (sock->flags.closed) { 271 errno = EBADF; 272 return -1; 273 } 274 275 return sock->net_impl->recv(sock, buf, len); 276 } 277 278 ssize_t 279 spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt) 280 { 281 if (sock == NULL) { 282 errno = EBADF; 283 return -1; 284 } 285 286 if (sock->flags.closed) { 287 errno = EBADF; 288 return -1; 289 } 290 291 return sock->net_impl->readv(sock, iov, iovcnt); 292 } 293 294 ssize_t 295 spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt) 296 { 297 if (sock == NULL) { 298 errno = EBADF; 299 return -1; 300 } 301 302 if (sock->flags.closed) { 303 errno = EBADF; 304 return -1; 305 } 306 307 return sock->net_impl->writev(sock, iov, iovcnt); 308 } 309 310 void 311 spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 312 { 313 assert(req->cb_fn != NULL); 314 315 if (sock == NULL) { 316 req->cb_fn(req->cb_arg, -EBADF); 317 return; 318 } 319 320 if (sock->flags.closed) { 321 req->cb_fn(req->cb_arg, -EBADF); 322 return; 323 } 324 325 sock->net_impl->writev_async(sock, req); 326 } 327 328 int 329 spdk_sock_flush(struct spdk_sock *sock) 330 { 331 if (sock == NULL) { 332 return -EBADF; 333 } 334 335 if (sock->flags.closed) { 336 return -EBADF; 337 } 338 339 return sock->net_impl->flush(sock); 340 } 341 342 int 343 spdk_sock_set_recvlowat(struct spdk_sock *sock, int nbytes) 344 { 345 return sock->net_impl->set_recvlowat(sock, nbytes); 346 } 347 348 int 349 spdk_sock_set_recvbuf(struct spdk_sock *sock, int sz) 350 { 351 return sock->net_impl->set_recvbuf(sock, sz); 352 } 353 354 int 355 spdk_sock_set_sendbuf(struct spdk_sock *sock, int sz) 356 { 357 return sock->net_impl->set_sendbuf(sock, sz); 358 } 359 360 bool 361 spdk_sock_is_ipv6(struct spdk_sock *sock) 362 { 363 return sock->net_impl->is_ipv6(sock); 364 } 365 366 bool 367 spdk_sock_is_ipv4(struct spdk_sock *sock) 368 { 369 return sock->net_impl->is_ipv4(sock); 370 } 371 372 bool 373 spdk_sock_is_connected(struct spdk_sock *sock) 374 { 375 return sock->net_impl->is_connected(sock); 376 } 377 378 struct spdk_sock_group * 379 spdk_sock_group_create(void *ctx) 380 { 381 struct spdk_net_impl *impl = NULL; 382 struct spdk_sock_group *group; 383 struct spdk_sock_group_impl *group_impl; 384 385 group = calloc(1, sizeof(*group)); 386 if (group == NULL) { 387 return NULL; 388 } 389 390 STAILQ_INIT(&group->group_impls); 391 392 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 393 group_impl = impl->group_impl_create(); 394 if (group_impl != NULL) { 395 STAILQ_INSERT_TAIL(&group->group_impls, group_impl, link); 396 TAILQ_INIT(&group_impl->socks); 397 group_impl->num_removed_socks = 0; 398 group_impl->net_impl = impl; 399 } 400 } 401 402 group->ctx = ctx; 403 return group; 404 } 405 406 void * 407 spdk_sock_group_get_ctx(struct spdk_sock_group *group) 408 { 409 if (group == NULL) { 410 return NULL; 411 } 412 413 return group->ctx; 414 } 415 416 int 417 spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock, 418 spdk_sock_cb cb_fn, void *cb_arg) 419 { 420 struct spdk_sock_group_impl *group_impl = NULL; 421 int rc, placement_id = 0; 422 423 if (cb_fn == NULL) { 424 errno = EINVAL; 425 return -1; 426 } 427 428 if (sock->group_impl != NULL) { 429 /* 430 * This sock is already part of a sock_group. Currently we don't 431 * support this. 432 */ 433 errno = EBUSY; 434 return -1; 435 } 436 437 rc = sock->net_impl->get_placement_id(sock, &placement_id); 438 if (!rc && (placement_id != 0)) { 439 rc = spdk_sock_map_insert(placement_id, group); 440 if (rc < 0) { 441 return -1; 442 } 443 } 444 445 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 446 if (sock->net_impl == group_impl->net_impl) { 447 break; 448 } 449 } 450 451 if (group_impl == NULL) { 452 errno = EINVAL; 453 return -1; 454 } 455 456 rc = group_impl->net_impl->group_impl_add_sock(group_impl, sock); 457 if (rc == 0) { 458 TAILQ_INSERT_TAIL(&group_impl->socks, sock, link); 459 sock->group_impl = group_impl; 460 sock->cb_fn = cb_fn; 461 sock->cb_arg = cb_arg; 462 } 463 464 return rc; 465 } 466 467 int 468 spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock) 469 { 470 struct spdk_sock_group_impl *group_impl = NULL; 471 int rc, placement_id = 0; 472 473 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 474 if (sock->net_impl == group_impl->net_impl) { 475 break; 476 } 477 } 478 479 if (group_impl == NULL) { 480 errno = EINVAL; 481 return -1; 482 } 483 484 assert(group_impl == sock->group_impl); 485 486 rc = sock->net_impl->get_placement_id(sock, &placement_id); 487 if (!rc && (placement_id != 0)) { 488 spdk_sock_map_release(placement_id); 489 } 490 491 rc = group_impl->net_impl->group_impl_remove_sock(group_impl, sock); 492 if (rc == 0) { 493 TAILQ_REMOVE(&group_impl->socks, sock, link); 494 assert(group_impl->num_removed_socks < MAX_EVENTS_PER_POLL); 495 group_impl->removed_socks[group_impl->num_removed_socks] = (uintptr_t)sock; 496 group_impl->num_removed_socks++; 497 sock->group_impl = NULL; 498 sock->cb_fn = NULL; 499 sock->cb_arg = NULL; 500 } 501 502 return rc; 503 } 504 505 int 506 spdk_sock_group_poll(struct spdk_sock_group *group) 507 { 508 return spdk_sock_group_poll_count(group, MAX_EVENTS_PER_POLL); 509 } 510 511 static int 512 spdk_sock_group_impl_poll_count(struct spdk_sock_group_impl *group_impl, 513 struct spdk_sock_group *group, 514 int max_events) 515 { 516 struct spdk_sock *socks[MAX_EVENTS_PER_POLL]; 517 int num_events, i; 518 519 if (TAILQ_EMPTY(&group_impl->socks)) { 520 return 0; 521 } 522 523 /* The number of removed sockets should be reset for each call to poll. */ 524 group_impl->num_removed_socks = 0; 525 526 num_events = group_impl->net_impl->group_impl_poll(group_impl, max_events, socks); 527 if (num_events == -1) { 528 return -1; 529 } 530 531 for (i = 0; i < num_events; i++) { 532 struct spdk_sock *sock = socks[i]; 533 int j; 534 bool valid = true; 535 for (j = 0; j < group_impl->num_removed_socks; j++) { 536 if ((uintptr_t)sock == group_impl->removed_socks[j]) { 537 valid = false; 538 break; 539 } 540 } 541 542 if (valid) { 543 assert(sock->cb_fn != NULL); 544 sock->cb_fn(sock->cb_arg, group, sock); 545 } 546 } 547 548 return num_events; 549 } 550 551 int 552 spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events) 553 { 554 struct spdk_sock_group_impl *group_impl = NULL; 555 int rc, num_events = 0; 556 557 if (max_events < 1) { 558 errno = -EINVAL; 559 return -1; 560 } 561 562 /* 563 * Only poll for up to 32 events at a time - if more events are pending, 564 * the next call to this function will reap them. 565 */ 566 if (max_events > MAX_EVENTS_PER_POLL) { 567 max_events = MAX_EVENTS_PER_POLL; 568 } 569 570 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 571 rc = spdk_sock_group_impl_poll_count(group_impl, group, max_events); 572 if (rc < 0) { 573 num_events = -1; 574 SPDK_ERRLOG("group_impl_poll_count for net(%s) failed\n", 575 group_impl->net_impl->name); 576 } else if (num_events >= 0) { 577 num_events += rc; 578 } 579 } 580 581 return num_events; 582 } 583 584 int 585 spdk_sock_group_close(struct spdk_sock_group **group) 586 { 587 struct spdk_sock_group_impl *group_impl = NULL, *tmp; 588 int rc; 589 590 if (*group == NULL) { 591 errno = EBADF; 592 return -1; 593 } 594 595 STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) { 596 if (!TAILQ_EMPTY(&group_impl->socks)) { 597 errno = EBUSY; 598 return -1; 599 } 600 } 601 602 STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) { 603 rc = group_impl->net_impl->group_impl_close(group_impl); 604 if (rc != 0) { 605 SPDK_ERRLOG("group_impl_close for net(%s) failed\n", 606 group_impl->net_impl->name); 607 } 608 } 609 610 spdk_sock_remove_sock_group_from_map_table(*group); 611 free(*group); 612 *group = NULL; 613 614 return 0; 615 } 616 617 void 618 spdk_net_impl_register(struct spdk_net_impl *impl, int priority) 619 { 620 struct spdk_net_impl *cur, *prev; 621 622 impl->priority = priority; 623 prev = NULL; 624 STAILQ_FOREACH(cur, &g_net_impls, link) { 625 if (impl->priority > cur->priority) { 626 break; 627 } 628 prev = cur; 629 } 630 631 if (prev) { 632 STAILQ_INSERT_AFTER(&g_net_impls, prev, impl, link); 633 } else { 634 STAILQ_INSERT_HEAD(&g_net_impls, impl, link); 635 } 636 } 637