1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/log.h" 37 #include "spdk/sock.h" 38 #include "spdk_internal/sock.h" 39 #include "spdk/queue.h" 40 41 static STAILQ_HEAD(, spdk_net_impl) g_net_impls = STAILQ_HEAD_INITIALIZER(g_net_impls); 42 43 struct spdk_sock_placement_id_entry { 44 int placement_id; 45 uint32_t ref; 46 struct spdk_sock_group *group; 47 STAILQ_ENTRY(spdk_sock_placement_id_entry) link; 48 }; 49 50 static STAILQ_HEAD(, spdk_sock_placement_id_entry) g_placement_id_map = STAILQ_HEAD_INITIALIZER( 51 g_placement_id_map); 52 static pthread_mutex_t g_map_table_mutex = PTHREAD_MUTEX_INITIALIZER; 53 54 /* Insert a group into the placement map. 55 * If the group is already in the map, take a reference. 56 */ 57 static int 58 spdk_sock_map_insert(int placement_id, struct spdk_sock_group *group) 59 { 60 struct spdk_sock_placement_id_entry *entry; 61 62 pthread_mutex_lock(&g_map_table_mutex); 63 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 64 if (placement_id == entry->placement_id) { 65 /* The mapping already exists, it means that different sockets have 66 * the same placement_ids. 67 */ 68 entry->ref++; 69 pthread_mutex_unlock(&g_map_table_mutex); 70 return 0; 71 } 72 } 73 74 entry = calloc(1, sizeof(*entry)); 75 if (!entry) { 76 SPDK_ERRLOG("Cannot allocate an entry for placement_id=%u\n", placement_id); 77 pthread_mutex_unlock(&g_map_table_mutex); 78 return -ENOMEM; 79 } 80 81 entry->placement_id = placement_id; 82 entry->group = group; 83 entry->ref++; 84 85 STAILQ_INSERT_TAIL(&g_placement_id_map, entry, link); 86 pthread_mutex_unlock(&g_map_table_mutex); 87 88 return 0; 89 } 90 91 /* Release a reference to the group for a given placement_id. 92 * If the reference count is 0, remove the group. 93 */ 94 static void 95 spdk_sock_map_release(int placement_id) 96 { 97 struct spdk_sock_placement_id_entry *entry; 98 99 pthread_mutex_lock(&g_map_table_mutex); 100 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 101 if (placement_id == entry->placement_id) { 102 assert(entry->ref > 0); 103 entry->ref--; 104 if (!entry->ref) { 105 STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link); 106 free(entry); 107 } 108 break; 109 } 110 } 111 112 pthread_mutex_unlock(&g_map_table_mutex); 113 } 114 115 /* Look up the group for a placement_id. */ 116 static void 117 spdk_sock_map_lookup(int placement_id, struct spdk_sock_group **group) 118 { 119 struct spdk_sock_placement_id_entry *entry; 120 121 *group = NULL; 122 pthread_mutex_lock(&g_map_table_mutex); 123 STAILQ_FOREACH(entry, &g_placement_id_map, link) { 124 if (placement_id == entry->placement_id) { 125 assert(entry->group != NULL); 126 *group = entry->group; 127 break; 128 } 129 } 130 pthread_mutex_unlock(&g_map_table_mutex); 131 } 132 133 /* Remove the socket group from the map table */ 134 static void 135 spdk_sock_remove_sock_group_from_map_table(struct spdk_sock_group *group) 136 { 137 struct spdk_sock_placement_id_entry *entry, *tmp; 138 139 pthread_mutex_lock(&g_map_table_mutex); 140 STAILQ_FOREACH_SAFE(entry, &g_placement_id_map, link, tmp) { 141 if (entry->group == group) { 142 STAILQ_REMOVE(&g_placement_id_map, entry, spdk_sock_placement_id_entry, link); 143 free(entry); 144 } 145 } 146 pthread_mutex_unlock(&g_map_table_mutex); 147 148 } 149 150 int 151 spdk_sock_get_optimal_sock_group(struct spdk_sock *sock, struct spdk_sock_group **group) 152 { 153 int placement_id = 0, rc; 154 155 rc = sock->net_impl->get_placement_id(sock, &placement_id); 156 if (!rc && (placement_id != 0)) { 157 spdk_sock_map_lookup(placement_id, group); 158 return 0; 159 } else { 160 return -1; 161 } 162 } 163 164 int 165 spdk_sock_getaddr(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, 166 char *caddr, int clen, uint16_t *cport) 167 { 168 return sock->net_impl->getaddr(sock, saddr, slen, sport, caddr, clen, cport); 169 } 170 171 struct spdk_sock * 172 spdk_sock_connect(const char *ip, int port) 173 { 174 struct spdk_net_impl *impl = NULL; 175 struct spdk_sock *sock; 176 177 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 178 sock = impl->connect(ip, port); 179 if (sock != NULL) { 180 sock->net_impl = impl; 181 TAILQ_INIT(&sock->queued_reqs); 182 TAILQ_INIT(&sock->pending_reqs); 183 return sock; 184 } 185 } 186 187 return NULL; 188 } 189 190 struct spdk_sock * 191 spdk_sock_listen(const char *ip, int port) 192 { 193 struct spdk_net_impl *impl = NULL; 194 struct spdk_sock *sock; 195 196 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 197 sock = impl->listen(ip, port); 198 if (sock != NULL) { 199 sock->net_impl = impl; 200 /* Don't need to initialize the request queues for listen 201 * sockets. */ 202 return sock; 203 } 204 } 205 206 return NULL; 207 } 208 209 struct spdk_sock * 210 spdk_sock_accept(struct spdk_sock *sock) 211 { 212 struct spdk_sock *new_sock; 213 214 new_sock = sock->net_impl->accept(sock); 215 if (new_sock != NULL) { 216 new_sock->net_impl = sock->net_impl; 217 TAILQ_INIT(&new_sock->queued_reqs); 218 TAILQ_INIT(&new_sock->pending_reqs); 219 } 220 221 return new_sock; 222 } 223 224 int 225 spdk_sock_close(struct spdk_sock **_sock) 226 { 227 struct spdk_sock *sock = *_sock; 228 int rc; 229 230 if (sock == NULL) { 231 errno = EBADF; 232 return -1; 233 } 234 235 if (sock->cb_fn != NULL) { 236 /* This sock is still part of a sock_group. */ 237 errno = EBUSY; 238 return -1; 239 } 240 241 sock->flags.closed = true; 242 243 if (sock->cb_cnt > 0) { 244 /* Let the callback unwind before destroying the socket */ 245 return 0; 246 } 247 248 spdk_sock_abort_requests(sock); 249 250 rc = sock->net_impl->close(sock); 251 if (rc == 0) { 252 *_sock = NULL; 253 } 254 255 return rc; 256 } 257 258 ssize_t 259 spdk_sock_recv(struct spdk_sock *sock, void *buf, size_t len) 260 { 261 if (sock == NULL) { 262 errno = EBADF; 263 return -1; 264 } 265 266 if (sock->flags.closed) { 267 errno = EBADF; 268 return -1; 269 } 270 271 return sock->net_impl->recv(sock, buf, len); 272 } 273 274 ssize_t 275 spdk_sock_readv(struct spdk_sock *sock, struct iovec *iov, int iovcnt) 276 { 277 if (sock == NULL) { 278 errno = EBADF; 279 return -1; 280 } 281 282 if (sock->flags.closed) { 283 errno = EBADF; 284 return -1; 285 } 286 287 return sock->net_impl->readv(sock, iov, iovcnt); 288 } 289 290 ssize_t 291 spdk_sock_writev(struct spdk_sock *sock, struct iovec *iov, int iovcnt) 292 { 293 if (sock == NULL) { 294 errno = EBADF; 295 return -1; 296 } 297 298 if (sock->flags.closed) { 299 errno = EBADF; 300 return -1; 301 } 302 303 return sock->net_impl->writev(sock, iov, iovcnt); 304 } 305 306 void 307 spdk_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) 308 { 309 assert(req->cb_fn != NULL); 310 311 if (sock == NULL) { 312 req->cb_fn(req->cb_arg, -EBADF); 313 return; 314 } 315 316 if (sock->flags.closed) { 317 req->cb_fn(req->cb_arg, -EBADF); 318 return; 319 } 320 321 sock->net_impl->writev_async(sock, req); 322 } 323 324 int 325 spdk_sock_set_recvlowat(struct spdk_sock *sock, int nbytes) 326 { 327 return sock->net_impl->set_recvlowat(sock, nbytes); 328 } 329 330 int 331 spdk_sock_set_recvbuf(struct spdk_sock *sock, int sz) 332 { 333 return sock->net_impl->set_recvbuf(sock, sz); 334 } 335 336 int 337 spdk_sock_set_sendbuf(struct spdk_sock *sock, int sz) 338 { 339 return sock->net_impl->set_sendbuf(sock, sz); 340 } 341 342 int 343 spdk_sock_set_priority(struct spdk_sock *sock, int priority) 344 { 345 return sock->net_impl->set_priority(sock, priority); 346 } 347 348 bool 349 spdk_sock_is_ipv6(struct spdk_sock *sock) 350 { 351 return sock->net_impl->is_ipv6(sock); 352 } 353 354 bool 355 spdk_sock_is_ipv4(struct spdk_sock *sock) 356 { 357 return sock->net_impl->is_ipv4(sock); 358 } 359 360 bool 361 spdk_sock_is_connected(struct spdk_sock *sock) 362 { 363 return sock->net_impl->is_connected(sock); 364 } 365 366 struct spdk_sock_group * 367 spdk_sock_group_create(void *ctx) 368 { 369 struct spdk_net_impl *impl = NULL; 370 struct spdk_sock_group *group; 371 struct spdk_sock_group_impl *group_impl; 372 373 group = calloc(1, sizeof(*group)); 374 if (group == NULL) { 375 return NULL; 376 } 377 378 STAILQ_INIT(&group->group_impls); 379 380 STAILQ_FOREACH_FROM(impl, &g_net_impls, link) { 381 group_impl = impl->group_impl_create(); 382 if (group_impl != NULL) { 383 STAILQ_INSERT_TAIL(&group->group_impls, group_impl, link); 384 TAILQ_INIT(&group_impl->socks); 385 group_impl->net_impl = impl; 386 } 387 } 388 389 group->ctx = ctx; 390 return group; 391 } 392 393 void * 394 spdk_sock_group_get_ctx(struct spdk_sock_group *group) 395 { 396 if (group == NULL) { 397 return NULL; 398 } 399 400 return group->ctx; 401 } 402 403 int 404 spdk_sock_group_add_sock(struct spdk_sock_group *group, struct spdk_sock *sock, 405 spdk_sock_cb cb_fn, void *cb_arg) 406 { 407 struct spdk_sock_group_impl *group_impl = NULL; 408 int rc, placement_id = 0; 409 410 if (cb_fn == NULL) { 411 errno = EINVAL; 412 return -1; 413 } 414 415 if (sock->group_impl != NULL) { 416 /* 417 * This sock is already part of a sock_group. Currently we don't 418 * support this. 419 */ 420 errno = EBUSY; 421 return -1; 422 } 423 424 rc = sock->net_impl->get_placement_id(sock, &placement_id); 425 if (!rc && (placement_id != 0)) { 426 rc = spdk_sock_map_insert(placement_id, group); 427 if (rc < 0) { 428 return -1; 429 } 430 } 431 432 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 433 if (sock->net_impl == group_impl->net_impl) { 434 break; 435 } 436 } 437 438 if (group_impl == NULL) { 439 errno = EINVAL; 440 return -1; 441 } 442 443 rc = group_impl->net_impl->group_impl_add_sock(group_impl, sock); 444 if (rc == 0) { 445 TAILQ_INSERT_TAIL(&group_impl->socks, sock, link); 446 sock->group_impl = group_impl; 447 sock->cb_fn = cb_fn; 448 sock->cb_arg = cb_arg; 449 } 450 451 return rc; 452 } 453 454 int 455 spdk_sock_group_remove_sock(struct spdk_sock_group *group, struct spdk_sock *sock) 456 { 457 struct spdk_sock_group_impl *group_impl = NULL; 458 int rc, placement_id = 0; 459 460 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 461 if (sock->net_impl == group_impl->net_impl) { 462 break; 463 } 464 } 465 466 if (group_impl == NULL) { 467 errno = EINVAL; 468 return -1; 469 } 470 471 assert(group_impl == sock->group_impl); 472 473 rc = sock->net_impl->get_placement_id(sock, &placement_id); 474 if (!rc && (placement_id != 0)) { 475 spdk_sock_map_release(placement_id); 476 } 477 478 rc = group_impl->net_impl->group_impl_remove_sock(group_impl, sock); 479 if (rc == 0) { 480 TAILQ_REMOVE(&group_impl->socks, sock, link); 481 sock->group_impl = NULL; 482 sock->cb_fn = NULL; 483 sock->cb_arg = NULL; 484 } 485 486 return rc; 487 } 488 489 int 490 spdk_sock_group_poll(struct spdk_sock_group *group) 491 { 492 return spdk_sock_group_poll_count(group, MAX_EVENTS_PER_POLL); 493 } 494 495 static int 496 spdk_sock_group_impl_poll_count(struct spdk_sock_group_impl *group_impl, 497 struct spdk_sock_group *group, 498 int max_events) 499 { 500 struct spdk_sock *socks[MAX_EVENTS_PER_POLL]; 501 int num_events, i; 502 503 if (TAILQ_EMPTY(&group_impl->socks)) { 504 return 0; 505 } 506 507 num_events = group_impl->net_impl->group_impl_poll(group_impl, max_events, socks); 508 if (num_events == -1) { 509 return -1; 510 } 511 512 for (i = 0; i < num_events; i++) { 513 struct spdk_sock *sock = socks[i]; 514 515 assert(sock->cb_fn != NULL); 516 sock->cb_fn(sock->cb_arg, group, sock); 517 } 518 return num_events; 519 } 520 521 int 522 spdk_sock_group_poll_count(struct spdk_sock_group *group, int max_events) 523 { 524 struct spdk_sock_group_impl *group_impl = NULL; 525 int rc, num_events = 0; 526 527 if (max_events < 1) { 528 errno = -EINVAL; 529 return -1; 530 } 531 532 /* 533 * Only poll for up to 32 events at a time - if more events are pending, 534 * the next call to this function will reap them. 535 */ 536 if (max_events > MAX_EVENTS_PER_POLL) { 537 max_events = MAX_EVENTS_PER_POLL; 538 } 539 540 STAILQ_FOREACH_FROM(group_impl, &group->group_impls, link) { 541 rc = spdk_sock_group_impl_poll_count(group_impl, group, max_events); 542 if (rc < 0) { 543 num_events = -1; 544 SPDK_ERRLOG("group_impl_poll_count for net(%s) failed\n", 545 group_impl->net_impl->name); 546 } else if (num_events >= 0) { 547 num_events += rc; 548 } 549 } 550 551 return num_events; 552 } 553 554 int 555 spdk_sock_group_close(struct spdk_sock_group **group) 556 { 557 struct spdk_sock_group_impl *group_impl = NULL, *tmp; 558 int rc; 559 560 if (*group == NULL) { 561 errno = EBADF; 562 return -1; 563 } 564 565 STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) { 566 if (!TAILQ_EMPTY(&group_impl->socks)) { 567 errno = EBUSY; 568 return -1; 569 } 570 } 571 572 STAILQ_FOREACH_SAFE(group_impl, &(*group)->group_impls, link, tmp) { 573 rc = group_impl->net_impl->group_impl_close(group_impl); 574 if (rc != 0) { 575 SPDK_ERRLOG("group_impl_close for net(%s) failed\n", 576 group_impl->net_impl->name); 577 } 578 } 579 580 spdk_sock_remove_sock_group_from_map_table(*group); 581 free(*group); 582 *group = NULL; 583 584 return 0; 585 } 586 587 void 588 spdk_net_impl_register(struct spdk_net_impl *impl) 589 { 590 if (!strcmp("posix", impl->name)) { 591 STAILQ_INSERT_TAIL(&g_net_impls, impl, link); 592 } else { 593 STAILQ_INSERT_HEAD(&g_net_impls, impl, link); 594 } 595 } 596