1 /* $OpenBSD: ioev.c,v 1.3 2012/05/25 13:52:33 chl Exp $ */ 2 /* 3 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17 18 #include <sys/param.h> 19 #include <sys/queue.h> 20 #include <sys/socket.h> 21 22 #include <err.h> 23 #include <errno.h> 24 #include <fcntl.h> 25 #include <inttypes.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <stdio.h> 29 #include <unistd.h> 30 31 #include "ioev.h" 32 #include "iobuf.h" 33 34 #ifdef IO_SSL 35 # include <openssl/ssl.h> 36 #endif 37 38 enum { 39 IO_STATE_NONE, 40 IO_STATE_CONNECT, 41 IO_STATE_CONNECT_SSL, 42 IO_STATE_ACCEPT_SSL, 43 IO_STATE_UP, 44 45 IO_STATE_MAX, 46 }; 47 48 const char* io_strflags(int); 49 const char* io_evstr(short); 50 51 void _io_init(void); 52 void io_hold(struct io *); 53 void io_release(struct io *); 54 void io_callback(struct io*, int); 55 void io_dispatch(int, short, void *); 56 void io_dispatch_connect(int, short, void *); 57 size_t io_pending(struct io *); 58 size_t io_queued(struct io*); 59 void io_reset(struct io *, short, void (*)(int, short, void*)); 60 void io_frame_enter(const char *, struct io *, int); 61 void io_frame_leave(struct io *); 62 63 #ifdef IO_SSL 64 void ssl_error(const char *); /* XXX external */ 65 66 void io_dispatch_accept_ssl(int, short, void *); 67 void io_dispatch_connect_ssl(int, short, void *); 68 void io_dispatch_read_ssl(int, short, void *); 69 void io_dispatch_write_ssl(int, short, void *); 70 void io_reload_ssl(struct io *io); 71 #endif 72 73 static struct io *current = NULL; 74 static uint64_t frame = 0; 75 static int _io_debug = 0; 76 77 #define io_debug(args...) do { if (_io_debug) printf(args); } while(0) 78 79 80 const char* 81 io_strio(struct io *io) 82 { 83 static char buf[128]; 84 85 snprintf(buf, sizeof buf, "<io:%p fd=%i to=%i fl=%s ib=%zu ob=%zu>", 86 io, io->sock, io->timeout, io_strflags(io->flags), 87 io_pending(io), io_queued(io)); 88 return (buf); 89 } 90 91 #define CASE(x) case x : return #x 92 93 const char* 94 io_strevent(int evt) 95 { 96 static char buf[32]; 97 98 switch(evt) { 99 CASE(IO_CONNECTED); 100 CASE(IO_TLSREADY); 101 CASE(IO_DATAIN); 102 CASE(IO_LOWAT); 103 CASE(IO_DISCONNECTED); 104 CASE(IO_TIMEOUT); 105 CASE(IO_ERROR); 106 default: 107 snprintf(buf, sizeof(buf), "IO_? %i", evt); 108 return buf; 109 } 110 } 111 112 void 113 io_set_blocking(int fd, int blocking) 114 { 115 int flags; 116 117 if ((flags = fcntl(fd, F_GETFL, 0)) == -1) 118 err(1, "io_set_blocking:fcntl(F_GETFL)"); 119 120 if (blocking) 121 flags &= ~O_NONBLOCK; 122 else 123 flags |= O_NONBLOCK; 124 125 if ((flags = fcntl(fd, F_SETFL, flags)) == -1) 126 err(1, "io_set_blocking:fcntl(F_SETFL)"); 127 } 128 129 void 130 io_set_linger(int fd, int linger) 131 { 132 struct linger l; 133 134 bzero(&l, sizeof(l)); 135 l.l_onoff = linger ? 1 : 0; 136 l.l_linger = linger; 137 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) == -1) 138 err(1, "io_set_linger:setsockopt()"); 139 } 140 141 /* 142 * Event framing must not rely on an io pointer to refer to the "same" io 143 * throughout the frame, beacuse this is not always the case: 144 * 145 * 1) enter(addr0) -> free(addr0) -> leave(addr0) = SEGV 146 * 2) enter(addr0) -> free(addr0) -> malloc == addr0 -> leave(addr0) = BAD! 147 * 148 * In both case, the problem is that the io is freed in the callback, so 149 * the pointer becomes invalid. If that happens, the user is required to 150 * call io_clear, so we can adapt the frame state there. 151 */ 152 void 153 io_frame_enter(const char *where, struct io *io, int ev) 154 { 155 io_debug("\n=== %" PRIu64 " ===\n" 156 "io_frame_enter(%s, %s, %s)\n", 157 frame, where, io_evstr(ev), io_strio(io)); 158 159 if (current) 160 errx(1, "io_frame_enter: interleaved frames"); 161 162 current = io; 163 164 io_hold(io); 165 } 166 167 void 168 io_frame_leave(struct io *io) 169 { 170 io_debug("io_frame_leave(%" PRIu64 ")\n", frame); 171 172 if (current && current != io) 173 errx(1, "io_frame_leave: io mismatch"); 174 175 /* io has been cleared */ 176 if (current == NULL) 177 goto done; 178 179 /* TODO: There is a possible optimization there: 180 * In a typical half-duplex request/response scenario, 181 * the io is waiting to read a request, and when done, it queues 182 * the response in the output buffer and goes to write mode. 183 * There, the write event is set and will be triggered in the next 184 * event frame. In most case, the write call could be done 185 * immediatly as part of the last read frame, thus avoiding to go 186 * through the event loop machinery. So, as an optimisation, we 187 * could detect that case here and force an event dispatching. 188 */ 189 190 /* Reload the io if it has not been reset already. */ 191 io_release(io); 192 current = NULL; 193 done: 194 io_debug("=== /%" PRIu64 "\n", frame); 195 196 frame += 1; 197 } 198 199 void 200 _io_init() 201 { 202 static int init = 0; 203 204 if (init) 205 return; 206 207 init = 1; 208 _io_debug = getenv("IO_DEBUG") != NULL; 209 } 210 211 void 212 io_init(struct io *io, int sock, void *arg, 213 void(*cb)(struct io*, int), struct iobuf *iobuf) 214 { 215 _io_init(); 216 217 memset(io, 0, sizeof *io); 218 219 io->sock = sock; 220 io->timeout = -1; 221 io->arg = arg; 222 io->iobuf = iobuf; 223 io->cb = cb; 224 225 if (sock != -1) 226 io_reload(io); 227 } 228 229 void 230 io_clear(struct io *io) 231 { 232 io_debug("io_clear(%p)\n", io); 233 234 /* the current io is virtually dead */ 235 if (io == current) 236 current = NULL; 237 238 #ifdef IO_SSL 239 if (io->ssl) { 240 SSL_free(io->ssl); 241 io->ssl = NULL; 242 } 243 #endif 244 245 event_del(&io->ev); 246 if (io->sock != -1) { 247 close(io->sock); 248 io->sock = -1; 249 } 250 } 251 252 void 253 io_hold(struct io *io) 254 { 255 io_debug("io_enter(%p)\n", io); 256 257 if (io->flags & IO_HELD) 258 errx(1, "io_hold: io is already held"); 259 260 io->flags &= ~IO_RESET; 261 io->flags |= IO_HELD; 262 } 263 264 void 265 io_release(struct io *io) 266 { 267 if (!(io->flags & IO_HELD)) 268 errx(1, "io_release: io is not held"); 269 270 io->flags &= ~IO_HELD; 271 if (!(io->flags & IO_RESET)) 272 io_reload(io); 273 } 274 275 void 276 io_set_timeout(struct io *io, int msec) 277 { 278 io_debug("io_set_timeout(%p, %i)\n", io, msec); 279 280 io->timeout = msec; 281 } 282 283 void 284 io_set_lowat(struct io *io, size_t lowat) 285 { 286 io_debug("io_set_lowat(%p, %zu)\n", io, lowat); 287 288 io->lowat = lowat; 289 } 290 291 void 292 io_pause(struct io *io, int dir) 293 { 294 io_debug("io_pause(%p, %x)\n", io, dir); 295 296 io->flags |= dir & (IO_PAUSE_IN | IO_PAUSE_OUT); 297 io_reload(io); 298 } 299 300 void 301 io_resume(struct io *io, int dir) 302 { 303 io_debug("io_resume(%p, %x)\n", io, dir); 304 305 io->flags &= ~(dir & (IO_PAUSE_IN | IO_PAUSE_OUT)); 306 io_reload(io); 307 } 308 309 void 310 io_set_read(struct io *io) 311 { 312 int mode; 313 314 io_debug("io_set_read(%p)\n", io); 315 316 mode = io->flags & IO_RW; 317 if (!(mode == 0 || mode == IO_WRITE)) 318 errx(1, "io_set_read(): full-duplex or reading"); 319 320 io->flags &= ~IO_RW; 321 io->flags |= IO_READ; 322 io_reload(io); 323 } 324 325 void 326 io_set_write(struct io *io) 327 { 328 int mode; 329 330 io_debug("io_set_write(%p)\n", io); 331 332 mode = io->flags & IO_RW; 333 if (!(mode == 0 || mode == IO_READ)) 334 errx(1, "io_set_write(): full-duplex or writing"); 335 336 io->flags &= ~IO_RW; 337 io->flags |= IO_WRITE; 338 io_reload(io); 339 } 340 341 #define IO_READING(io) (((io)->flags & IO_RW) != IO_WRITE) 342 #define IO_WRITING(io) (((io)->flags & IO_RW) != IO_READ) 343 344 /* 345 * Setup the necessary events as required by the current io state, 346 * honouring duplex mode and i/o pauses. 347 */ 348 void 349 io_reload(struct io *io) 350 { 351 short events; 352 353 /* io will be reloaded at release time */ 354 if (io->flags & IO_HELD) 355 return; 356 357 #ifdef IO_SSL 358 if (io->ssl) { 359 io_reload_ssl(io); 360 return; 361 } 362 #endif 363 364 io_debug("io_reload(%p)\n", io); 365 366 events = 0; 367 if (IO_READING(io) && !(io->flags & IO_PAUSE_IN)) 368 events = EV_READ; 369 if (IO_WRITING(io) && !(io->flags & IO_PAUSE_OUT) && io_queued(io)) 370 events |= EV_WRITE; 371 372 io_reset(io, events, io_dispatch); 373 } 374 375 /* Set the requested event. */ 376 void 377 io_reset(struct io *io, short events, void (*dispatch)(int, short, void*)) 378 { 379 struct timeval tv, *ptv; 380 381 io_debug("io_reset(%p, %s, %p) -> %s\n", 382 io, io_evstr(events), dispatch, io_strio(io)); 383 384 /* 385 * Indicate that the event has already been reset so that reload 386 * is not called on frame_leave. 387 */ 388 io->flags |= IO_RESET; 389 390 event_del(&io->ev); 391 392 /* 393 * The io is paused by the user, so we don't want the timeout to be 394 * effective. 395 */ 396 if (events == 0) 397 return; 398 399 event_set(&io->ev, io->sock, events, dispatch, io); 400 if (io->timeout >= 0) { 401 tv.tv_sec = io->timeout / 1000; 402 tv.tv_usec = (io->timeout % 1000) * 1000; 403 ptv = &tv; 404 } else 405 ptv = NULL; 406 407 event_add(&io->ev, ptv); 408 } 409 410 size_t 411 io_pending(struct io *io) 412 { 413 return iobuf_len(io->iobuf); 414 } 415 416 size_t 417 io_queued(struct io *io) 418 { 419 return iobuf_queued(io->iobuf); 420 } 421 422 const char* 423 io_strflags(int flags) 424 { 425 static char buf[64]; 426 427 buf[0] = '\0'; 428 429 switch(flags & IO_RW) { 430 case 0: 431 strlcat(buf, "rw", sizeof buf); 432 break; 433 case IO_READ: 434 strlcat(buf, "R", sizeof buf); 435 break; 436 case IO_WRITE: 437 strlcat(buf, "W", sizeof buf); 438 break; 439 case IO_RW: 440 strlcat(buf, "RW", sizeof buf); 441 break; 442 } 443 444 if (flags & IO_PAUSE_IN) 445 strlcat(buf, ",F_PI", sizeof buf); 446 if (flags & IO_PAUSE_OUT) 447 strlcat(buf, ",F_PO", sizeof buf); 448 449 return buf; 450 } 451 452 const char* 453 io_evstr(short ev) 454 { 455 static char buf[64]; 456 char buf2[16]; 457 int n; 458 459 n = 0; 460 buf[0] = '\0'; 461 462 if (ev == 0) { 463 strlcat(buf, "<NONE>", sizeof(buf)); 464 return buf; 465 } 466 467 if (ev & EV_TIMEOUT) { 468 if (n) 469 strlcat(buf, "|", sizeof(buf)); 470 strlcat(buf, "EV_TIMEOUT", sizeof(buf)); 471 ev &= ~EV_TIMEOUT; 472 n++; 473 } 474 475 if (ev & EV_READ) { 476 if (n) 477 strlcat(buf, "|", sizeof(buf)); 478 strlcat(buf, "EV_READ", sizeof(buf)); 479 ev &= ~EV_READ; 480 n++; 481 } 482 483 if (ev & EV_WRITE) { 484 if (n) 485 strlcat(buf, "|", sizeof(buf)); 486 strlcat(buf, "EV_WRITE", sizeof(buf)); 487 ev &= ~EV_WRITE; 488 n++; 489 } 490 491 if (ev & EV_SIGNAL) { 492 if (n) 493 strlcat(buf, "|", sizeof(buf)); 494 strlcat(buf, "EV_SIGNAL", sizeof(buf)); 495 ev &= ~EV_SIGNAL; 496 n++; 497 } 498 499 if (ev) { 500 if (n) 501 strlcat(buf, "|", sizeof(buf)); 502 strlcat(buf, "EV_?=0x", sizeof(buf)); 503 snprintf(buf2, sizeof(buf2), "%hx", ev); 504 strlcat(buf, buf2, sizeof(buf)); 505 } 506 507 return buf; 508 } 509 510 void 511 io_dispatch(int fd, short ev, void *humppa) 512 { 513 struct io *io = humppa; 514 size_t w; 515 ssize_t n; 516 517 io_frame_enter("io_dispatch", io, ev); 518 519 if (ev == EV_TIMEOUT) { 520 io_callback(io, IO_TIMEOUT); 521 goto leave; 522 } 523 524 if (ev & EV_WRITE && (w = io_queued(io))) { 525 if ((n = iobuf_write(io->iobuf, io->sock)) < 0) { 526 io_callback(io, n == IOBUF_CLOSED ? 527 IO_DISCONNECTED : IO_ERROR); 528 goto leave; 529 } 530 if (w > io->lowat && w - n <= io->lowat) 531 io_callback(io, IO_LOWAT); 532 } 533 534 if (ev & EV_READ) { 535 if ((n = iobuf_read(io->iobuf, io->sock)) < 0) { 536 io_callback(io, n == IOBUF_CLOSED ? 537 IO_DISCONNECTED : IO_ERROR); 538 goto leave; 539 } 540 if (n) 541 io_callback(io, IO_DATAIN); 542 } 543 544 leave: 545 io_frame_leave(io); 546 } 547 548 void 549 io_callback(struct io *io, int evt) 550 { 551 io->cb(io, evt); 552 } 553 554 int 555 io_connect(struct io *io, const struct sockaddr *sa) 556 { 557 int sock, errno_save; 558 559 if ((sock = socket(sa->sa_family, SOCK_STREAM, 0)) == -1) 560 goto fail; 561 562 io_set_blocking(sock, 0); 563 io_set_linger(sock, 0); 564 565 if (connect(sock, sa, sa->sa_len) == -1) 566 if (errno != EINPROGRESS) 567 goto fail; 568 569 io->sock = sock; 570 io_reset(io, EV_WRITE, io_dispatch_connect); 571 572 return (sock); 573 574 fail: 575 if (sock != -1) { 576 errno_save = errno; 577 close(sock); 578 errno = errno_save; 579 } 580 return (-1); 581 } 582 583 void 584 io_dispatch_connect(int fd, short ev, void *humppa) 585 { 586 struct io *io = humppa; 587 588 io_frame_enter("io_dispatch_connect", io, ev); 589 590 if (ev == EV_TIMEOUT) { 591 close(fd); 592 io->sock = -1; 593 io_callback(io, IO_TIMEOUT); 594 } else { 595 io->state = IO_STATE_UP; 596 io_callback(io, IO_CONNECTED); 597 } 598 599 io_frame_leave(io); 600 } 601 602 #ifdef IO_SSL 603 604 int 605 io_start_tls(struct io *io, void *ssl) 606 { 607 int mode; 608 609 mode = io->flags & IO_RW; 610 if (mode == 0 || mode == IO_RW) 611 errx(1, "io_start_tls(): full-duplex or unset"); 612 613 if (io->ssl) 614 errx(1, "io_start_tls(): SSL already started"); 615 io->ssl = ssl; 616 617 if (SSL_set_fd(io->ssl, io->sock) == 0) { 618 ssl_error("io_start_ssl:SSL_set_fd"); 619 return (-1); 620 } 621 622 if (mode == IO_WRITE) { 623 io->state = IO_STATE_CONNECT_SSL; 624 SSL_set_connect_state(io->ssl); 625 io_reset(io, EV_READ | EV_WRITE, io_dispatch_connect_ssl); 626 } else { 627 io->state = IO_STATE_ACCEPT_SSL; 628 SSL_set_accept_state(io->ssl); 629 io_reset(io, EV_READ | EV_WRITE, io_dispatch_accept_ssl); 630 } 631 632 return (0); 633 } 634 635 void 636 io_dispatch_accept_ssl(int fd, short event, void *humppa) 637 { 638 struct io *io = humppa; 639 int e, ret; 640 641 io_frame_enter("io_dispatch_accept_ssl", io, event); 642 643 if (event == EV_TIMEOUT) { 644 io_callback(io, IO_TIMEOUT); 645 goto leave; 646 } 647 648 if ((ret = SSL_accept(io->ssl)) > 0) { 649 io->state = IO_STATE_UP; 650 io_callback(io, IO_TLSREADY); 651 goto leave; 652 } 653 654 switch ((e = SSL_get_error(io->ssl, ret))) { 655 case SSL_ERROR_WANT_READ: 656 io_reset(io, EV_READ, io_dispatch_accept_ssl); 657 break; 658 case SSL_ERROR_WANT_WRITE: 659 io_reset(io, EV_WRITE, io_dispatch_accept_ssl); 660 break; 661 default: 662 ssl_error("io_dispatch_accept_ssl:SSL_accept"); 663 io_callback(io, IO_ERROR); 664 break; 665 } 666 667 leave: 668 io_frame_leave(io); 669 } 670 671 void 672 io_dispatch_connect_ssl(int fd, short event, void *humppa) 673 { 674 struct io *io = humppa; 675 int e, ret; 676 677 io_frame_enter("io_dispatch_connect_ssl", io, event); 678 679 if (event == EV_TIMEOUT) { 680 io_callback(io, IO_TIMEOUT); 681 goto leave; 682 } 683 684 if ((ret = SSL_connect(io->ssl)) > 0) { 685 io->state = IO_STATE_UP; 686 io_callback(io, IO_TLSREADY); 687 goto leave; 688 } 689 690 switch ((e = SSL_get_error(io->ssl, ret))) { 691 case SSL_ERROR_WANT_READ: 692 io_reset(io, EV_READ, io_dispatch_connect_ssl); 693 break; 694 case SSL_ERROR_WANT_WRITE: 695 io_reset(io, EV_WRITE, io_dispatch_connect_ssl); 696 break; 697 default: 698 io_callback(io, IO_ERROR); 699 break; 700 } 701 702 leave: 703 io_frame_leave(io); 704 } 705 706 void 707 io_dispatch_read_ssl(int fd, short event, void *humppa) 708 { 709 struct io *io = humppa; 710 int n; 711 712 io_frame_enter("io_dispatch_read_ssl", io, event); 713 714 if (event == EV_TIMEOUT) { 715 io_callback(io, IO_TIMEOUT); 716 goto leave; 717 } 718 719 switch ((n = iobuf_read_ssl(io->iobuf, (SSL*)io->ssl))) { 720 case IOBUF_WANT_READ: 721 io_reset(io, EV_READ, io_dispatch_read_ssl); 722 break; 723 case IOBUF_WANT_WRITE: 724 io_reset(io, EV_WRITE, io_dispatch_read_ssl); 725 break; 726 case IOBUF_CLOSED: 727 io_callback(io, IO_DISCONNECTED); 728 break; 729 case IOBUF_ERROR: 730 io_callback(io, IO_ERROR); 731 break; 732 default: 733 io_debug("io_dispatch_read_ssl(...) -> r=%i\n", n); 734 io_callback(io, IO_DATAIN); 735 } 736 737 leave: 738 io_frame_leave(io); 739 } 740 741 void 742 io_dispatch_write_ssl(int fd, short event, void *humppa) 743 { 744 struct io *io = humppa; 745 int n; 746 size_t w2, w; 747 748 io_frame_enter("io_dispatch_write_ssl", io, event); 749 750 if (event == EV_TIMEOUT) { 751 io_callback(io, IO_TIMEOUT); 752 goto leave; 753 } 754 755 w = io_queued(io); 756 switch ((n = iobuf_write_ssl(io->iobuf, (SSL*)io->ssl))) { 757 case IOBUF_WANT_READ: 758 io_reset(io, EV_READ, io_dispatch_write_ssl); 759 break; 760 case IOBUF_WANT_WRITE: 761 io_reset(io, EV_WRITE, io_dispatch_write_ssl); 762 break; 763 case IOBUF_CLOSED: 764 io_callback(io, IO_DISCONNECTED); 765 break; 766 case IOBUF_ERROR: 767 io_callback(io, IO_ERROR); 768 break; 769 default: 770 io_debug("io_dispatch_write_ssl(...) -> w=%i\n", n); 771 w2 = io_queued(io); 772 if (w > io->lowat && w2 <= io->lowat) 773 io_callback(io, IO_LOWAT); 774 break; 775 } 776 777 leave: 778 io_frame_leave(io); 779 } 780 781 void 782 io_reload_ssl(struct io *io) 783 { 784 void (*dispatch)(int, short, void*) = NULL; 785 786 switch(io->state) { 787 case IO_STATE_CONNECT_SSL: 788 dispatch = io_dispatch_connect_ssl; 789 break; 790 case IO_STATE_ACCEPT_SSL: 791 dispatch = io_dispatch_accept_ssl; 792 break; 793 case IO_STATE_UP: 794 if ((io->flags & IO_RW) == IO_READ) 795 dispatch = io_dispatch_read_ssl; 796 else { 797 if (io_queued(io) == 0) 798 return; /* nothing to write */ 799 dispatch = io_dispatch_write_ssl; 800 } 801 break; 802 default: 803 errx(1, "io_reload_ssl(): bad state"); 804 } 805 806 io_reset(io, EV_READ | EV_WRITE, dispatch); 807 } 808 809 #endif /* IO_SSL */ 810