1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/log.h" 37 #include "spdk/sock.h" 38 #include "spdk_internal/sock.h" 39 #include "spdk/queue.h" 40 41 static STAILQ_HEAD(, spdk_net_impl) g_net_impls = STAILQ_HEAD_INITIALIZER(g_net_impls); 42 43 struct spdk_sock_placement_id_entry { 44 int placement_id; 45 uint32_t ref; 46 struct spdk_sock_group *group; 47 STAILQ_ENTRY(spdk_sock_placement_id_entry) link; 48 }; 49 50 static STAILQ_HEAD(, spdk_sock_placement_id_entry) g_placement_id_map = STAILQ_HEAD_INITIALIZER( 51 g_placement_id_map); 52 static pthread_mutex_t g_map_table_mutex = PTHREAD_MUTEX_INITIALIZER; 53 54 /* Insert a group into the placement map. 55 * If the group is already in the map, take a reference. 56 */ 57 static int 58 spdk_sock_map_insert(int placement_id, struct spdk_sock_group *group) 59 { 60 struct spdk_sock_placement_id_entry *entry; 61 62 pthread_mutex_lock(&g_map_table_mutex); 63 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 64 if (placement_id == entry->placement_id) { 65 /* The mapping already exists, it means that different sockets have 66 * the same placement_ids. 67 */ 68 entry->ref++; 69 pthread_mutex_unlock(&g_map_table_mutex); 70 return 0; 71 } 72 } 73 74 entry = calloc(1, sizeof(*entry)); 75 if (!entry) { 76 SPDK_ERRLOG("Cannot allocate an entry for placement_id=%u\n", placement_id); 77 pthread_mutex_unlock(&g_map_table_mutex); 78 return -ENOMEM; 79 } 80 81 entry->placement_id = placement_id; 82 entry->group = group; 83 entry->ref++; 84 85 STAILQ_INSERT_TAIL(&g_placement_id_map, entry, link); 86 pthread_mutex_unlock(&g_map_table_mutex); 87 88 return 0; 89 } 90 91 /* Release a reference to the group for a given placement_id. 92 * If the reference count is 0, remove the group. 93 */ 94 static void 95 spdk_sock_map_release(int placement_id) 96 { 97 struct spdk_sock_placement_id_entry *entry; 98 99 pthread_mutex_lock(&g_map_table_mutex); 100 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 101 if (placement_id == entry->placement_id) { 102 assert(entry->ref > 0); 103 entry->ref--; 104 if (!entry->ref) { 105 STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link); 106 free(entry); 107 } 108 break; 109 } 110 } 111 112 pthread_mutex_unlock(&g_map_table_mutex); 113 } 114 115 /* Look up the group for a placement_id. */ 116 static void 117 spdk_sock_map_lookup(int placement_id, struct spdk_sock_group **group) 118 { 119 struct spdk_sock_placement_id_entry *entry; 120 121 *group = NULL; 122 pthread_mutex_lock(&g_map_table_mutex); 123 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 124 if (placement_id == entry->placement_id) { 125 assert(entry->group != NULL); 126 *group = entry->group; 127 break; 128 } 129 } 130 pthread_mutex_unlock(&g_map_table_mutex); 131 } 132 133 /* Remove the socket group from the map table */ 134 static void 135 spdk_sock_remove_sock_group_from_map_table(struct spdk_sock_group *group) 136 { 137 struct spdk_sock_placement_id_entry *entry, *tmp; 138 139 pthread_mutex_lock(&g_map_table_mutex); 140 STAILQ_FOREACH_SAFE(entry, &g_placement_id_map, link, tmp) { 141 if (entry->group == group) { 142 STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link); 143 free(entry); 144 } 145 } 146 pthread_mutex_unlock(&g_map_table_mutex); 147 148 } 149 150 int 151 spdk_sock_get_optimal_sock_group(struct spdk_sock *sock, struct spdk_sock_group **group) 152 { 153 int placement_id = 0, rc; 154 155 rc = sock->net_impl->get_placement_id(sock, &placement_id); 156 if (!rc && (placement_id != 0)) { 157 spdk_sock_map_lookup(placement_id, group); 158 return 0; 159 } else { 160 return -1; 161 } 162 } 163 164 int 165 spdk_sock_getaddr(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, 166 char *caddr, int clen, uint16_t *cport) 167 { 168 return sock->net_impl->getaddr(sock, saddr, slen, sport, caddr, clen, cport); 169 } 170 171 struct spdk_sock * 172 spdk_sock_connect(const char *ip, int port, char *impl_name) 173 { 174 struct spdk_net_impl *impl = NULL; 175 struct spdk_sock *sock; 176 177 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 178 if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) { 179 continue; 180 } 181 182 sock = impl->connect(ip, port); 183 if (sock != NULL) { 184 sock->net_impl = impl; 185 TAILQ_INIT(&sock->queued_reqs); 186 TAILQ_INIT(&sock->pending_reqs); 187 return sock; 188 } 189 } 190 191 return NULL; 192 } 193 194 struct spdk_sock * 195 spdk_sock_listen(const char *ip, int port, char *impl_name) 196 { 197 struct spdk_net_impl *impl = NULL; 198 struct spdk_sock *sock; 199 200 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 201 if (impl_name && strncmp(impl_name, impl->name, strlen(impl->name) + 1)) { 202 continue; 203 } 204 205 sock = impl->listen(ip, port); 206 if (sock != NULL) { 207 sock->net_impl = impl; 208 /* Don't need to initialize the request queues for listen 209 * sockets. */ 210 return sock; 211 } 212 } 213 214 return NULL; 215 } 216 217 struct spdk_sock * 218 spdk_sock_accept(struct spdk_sock *sock) 219 { 220 struct spdk_sock *new_sock; 221 222 new_sock = sock->net_impl->accept(sock); 223 if (new_sock != NULL) { 224 new_sock->net_impl = sock->net_impl; 225 TAILQ_INIT(&new_sock->queued_reqs); 226 TAILQ_INIT(&new_sock->pending_reqs); 227 } 228 229 return new_sock; 230 } 231 232 int 233 spdk_sock_close(struct spdk_sock **_sock) 234 { 235 struct spdk_sock *sock = *_sock; 236 int rc; 237 238 if (sock == NULL) { 239 errno = EBADF; 240 return -1; 241 } 242 243 if (sock->cb_fn != NULL) { 244 /* This sock is still part of a sock_group. */ 245 errno = EBUSY; 246 return -1; 247 } 248 249 sock->flags.closed = true; 250 251 if (sock->cb_cnt > 0) { 252 /* Let the callback unwind before destroying the socket */ 253 return 0; 254 } 255 256 spdk_sock_abort_requests(sock); 257 258 rc = sock->net_impl->close(sock); 259 if (rc == 0) { 260 *_sock = NULL; 261 } 262 263 return rc; 264 } 265 266 ssize_t 267 spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 268 { 269 if (sock == NULL) { 270 errno = EBADF; 271 return -1; 272 } 273 274 if (sock->flags.closed) { 275 errno = EBADF; 276 return -1; 277 } 278 279 return sock->net_impl->recv(sock, buf, len); 280 } 281 282 ssize_t 283 spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt) 284 { 285 if (sock == NULL) { 286 errno = EBADF; 287 return -1; 288 } 289 290 if (sock->flags.closed) { 291 errno = EBADF; 292 return -1; 293 } 294 295 return sock->net_impl->readv(sock, iov, iovcnt); 296 } 297 298 ssize_t 299 spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt) 300 { 301 if (sock == NULL) { 302 errno = EBADF; 303 return -1; 304 } 305 306 if (sock->flags.closed) { 307 errno = EBADF; 308 return -1; 309 } 310 311 return sock->net_impl->writev(sock, iov, iovcnt); 312 } 313 314 void 315 spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 316 { 317 assert(req->cb_fn != NULL); 318 319 if (sock == NULL) { 320 req->cb_fn(req->cb_arg, -EBADF); 321 return; 322 } 323 324 if (sock->flags.closed) { 325 req->cb_fn(req->cb_arg, -EBADF); 326 return; 327 } 328 329 sock->net_impl->writev_async(sock, req); 330 } 331 332 int 333 spdk_sock_flush(struct spdk_sock *sock) 334 { 335 if (sock == NULL) { 336 return -EBADF; 337 } 338 339 if (sock->flags.closed) { 340 return -EBADF; 341 } 342 343 return sock->net_impl->flush(sock); 344 } 345 346 int 347 spdk_sock_set_recvlowat(struct spdk_sock *sock, int nbytes) 348 { 349 return sock->net_impl->set_recvlowat(sock, nbytes); 350 } 351 352 int 353 spdk_sock_set_recvbuf(struct spdk_sock *sock, int sz) 354 { 355 return sock->net_impl->set_recvbuf(sock, sz); 356 } 357 358 int 359 spdk_sock_set_sendbuf(struct spdk_sock *sock, int sz) 360 { 361 return sock->net_impl->set_sendbuf(sock, sz); 362 } 363 364 int 365 spdk_sock_set_priority(struct spdk_sock *sock, int priority) 366 { 367 return sock->net_impl->set_priority(sock, priority); 368 } 369 370 bool 371 spdk_sock_is_ipv6(struct spdk_sock *sock) 372 { 373 return sock->net_impl->is_ipv6(sock); 374 } 375 376 bool 377 spdk_sock_is_ipv4(struct spdk_sock *sock) 378 { 379 return sock->net_impl->is_ipv4(sock); 380 } 381 382 bool 383 spdk_sock_is_connected(struct spdk_sock *sock) 384 { 385 return sock->net_impl->is_connected(sock); 386 } 387 388 struct spdk_sock_group * 389 spdk_sock_group_create(void *ctx) 390 { 391 struct spdk_net_impl *impl = NULL; 392 struct spdk_sock_group *group; 393 struct spdk_sock_group_impl *group_impl; 394 395 group = calloc(1, sizeof(*group)); 396 if (group == NULL) { 397 return NULL; 398 } 399 400 STAILQ_INIT(&group->group_impls); 401 402 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 403 group_impl = impl->group_impl_create(); 404 if (group_impl != NULL) { 405 STAILQ_INSERT_TAIL(&group->group_impls, group_impl, link); 406 TAILQ_INIT(&group_impl->socks); 407 group_impl->net_impl = impl; 408 } 409 } 410 411 group->ctx = ctx; 412 return group; 413 } 414 415 void * 416 spdk_sock_group_get_ctx(struct spdk_sock_group *group) 417 { 418 if (group == NULL) { 419 return NULL; 420 } 421 422 return group->ctx; 423 } 424 425 int 426 spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock, 427 spdk_sock_cb cb_fn, void *cb_arg) 428 { 429 struct spdk_sock_group_impl *group_impl = NULL; 430 int rc, placement_id = 0; 431 432 if (cb_fn == NULL) { 433 errno = EINVAL; 434 return -1; 435 } 436 437 if (sock->group_impl != NULL) { 438 /* 439 * This sock is already part of a sock_group. Currently we don't 440 * support this. 441 */ 442 errno = EBUSY; 443 return -1; 444 } 445 446 rc = sock->net_impl->get_placement_id(sock, &placement_id); 447 if (!rc && (placement_id != 0)) { 448 rc = spdk_sock_map_insert(placement_id, group); 449 if (rc < 0) { 450 return -1; 451 } 452 } 453 454 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 455 if (sock->net_impl == group_impl->net_impl) { 456 break; 457 } 458 } 459 460 if (group_impl == NULL) { 461 errno = EINVAL; 462 return -1; 463 } 464 465 rc = group_impl->net_impl->group_impl_add_sock(group_impl, sock); 466 if (rc == 0) { 467 TAILQ_INSERT_TAIL(&group_impl->socks, sock, link); 468 sock->group_impl = group_impl; 469 sock->cb_fn = cb_fn; 470 sock->cb_arg = cb_arg; 471 } 472 473 return rc; 474 } 475 476 int 477 spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock) 478 { 479 struct spdk_sock_group_impl *group_impl = NULL; 480 int rc, placement_id = 0; 481 482 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 483 if (sock->net_impl == group_impl->net_impl) { 484 break; 485 } 486 } 487 488 if (group_impl == NULL) { 489 errno = EINVAL; 490 return -1; 491 } 492 493 assert(group_impl == sock->group_impl); 494 495 rc = sock->net_impl->get_placement_id(sock, &placement_id); 496 if (!rc && (placement_id != 0)) { 497 spdk_sock_map_release(placement_id); 498 } 499 500 rc = group_impl->net_impl->group_impl_remove_sock(group_impl, sock); 501 if (rc == 0) { 502 TAILQ_REMOVE(&group_impl->socks, sock, link); 503 sock->group_impl = NULL; 504 sock->cb_fn = NULL; 505 sock->cb_arg = NULL; 506 } 507 508 return rc; 509 } 510 511 int 512 spdk_sock_group_poll(struct spdk_sock_group *group) 513 { 514 return spdk_sock_group_poll_count(group, MAX_EVENTS_PER_POLL); 515 } 516 517 static int 518 spdk_sock_group_impl_poll_count(struct spdk_sock_group_impl *group_impl, 519 struct spdk_sock_group *group, 520 int max_events) 521 { 522 struct spdk_sock *socks[MAX_EVENTS_PER_POLL]; 523 int num_events, i; 524 525 if (TAILQ_EMPTY(&group_impl->socks)) { 526 return 0; 527 } 528 529 num_events = group_impl->net_impl->group_impl_poll(group_impl, max_events, socks); 530 if (num_events == -1) { 531 return -1; 532 } 533 534 for (i = 0; i < num_events; i++) { 535 struct spdk_sock *sock = socks[i]; 536 537 assert(sock->cb_fn != NULL); 538 sock->cb_fn(sock->cb_arg, group, sock); 539 } 540 return num_events; 541 } 542 543 int 544 spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events) 545 { 546 struct spdk_sock_group_impl *group_impl = NULL; 547 int rc, num_events = 0; 548 549 if (max_events < 1) { 550 errno = -EINVAL; 551 return -1; 552 } 553 554 /* 555 * Only poll for up to 32 events at a time - if more events are pending, 556 * the next call to this function will reap them. 557 */ 558 if (max_events > MAX_EVENTS_PER_POLL) { 559 max_events = MAX_EVENTS_PER_POLL; 560 } 561 562 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 563 rc = spdk_sock_group_impl_poll_count(group_impl, group, max_events); 564 if (rc < 0) { 565 num_events = -1; 566 SPDK_ERRLOG("group_impl_poll_count for net(%s) failed\n", 567 group_impl->net_impl->name); 568 } else if (num_events >= 0) { 569 num_events += rc; 570 } 571 } 572 573 return num_events; 574 } 575 576 int 577 spdk_sock_group_close(struct spdk_sock_group **group) 578 { 579 struct spdk_sock_group_impl *group_impl = NULL, *tmp; 580 int rc; 581 582 if (*group == NULL) { 583 errno = EBADF; 584 return -1; 585 } 586 587 STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) { 588 if (!TAILQ_EMPTY(&group_impl->socks)) { 589 errno = EBUSY; 590 return -1; 591 } 592 } 593 594 STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) { 595 rc = group_impl->net_impl->group_impl_close(group_impl); 596 if (rc != 0) { 597 SPDK_ERRLOG("group_impl_close for net(%s) failed\n", 598 group_impl->net_impl->name); 599 } 600 } 601 602 spdk_sock_remove_sock_group_from_map_table(*group); 603 free(*group); 604 *group = NULL; 605 606 return 0; 607 } 608 609 void 610 spdk_net_impl_register(struct spdk_net_impl *impl, int priority) 611 { 612 struct spdk_net_impl *cur, *prev; 613 614 impl->priority = priority; 615 prev = NULL; 616 STAILQ_FOREACH(cur, &g_net_impls, link) { 617 if (impl->priority > cur->priority) { 618 break; 619 } 620 prev = cur; 621 } 622 623 if (prev) { 624 STAILQ_INSERT_AFTER(&g_net_impls, prev, impl, link); 625 } else { 626 STAILQ_INSERT_HEAD(&g_net_impls, impl, link); 627 } 628 } 629