1 /* 2 * testcode/delayer.c - debug program that delays queries to a server. 3 * 4 * Copyright (c) 2008, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 /** 37 * \file 38 * 39 * This program delays queries made. It performs as a proxy to another 40 * server and delays queries to it. 41 */ 42 43 #include "config.h" 44 #ifdef HAVE_GETOPT_H 45 #include <getopt.h> 46 #endif 47 #ifdef HAVE_TIME_H 48 #include <time.h> 49 #endif 50 #include <sys/time.h> 51 #include "util/net_help.h" 52 #include "util/config_file.h" 53 #include "sldns/sbuffer.h" 54 #include <signal.h> 55 56 /** number of reads per select for delayer */ 57 #define TRIES_PER_SELECT 100 58 59 /** 60 * The ring buffer 61 */ 62 struct ringbuf { 63 /** base of buffer */ 64 uint8_t* buf; 65 /** size of buffer */ 66 size_t size; 67 /** low mark, items start here */ 68 size_t low; 69 /** high mark, items end here */ 70 size_t high; 71 }; 72 73 /** 74 * List of proxy fds that return replies from the server to our clients. 75 */ 76 struct proxy { 77 /** the fd to listen for replies from server */ 78 int s; 79 /** last time this was used */ 80 struct timeval lastuse; 81 /** remote address */ 82 struct sockaddr_storage addr; 83 /** length of addr */ 84 socklen_t addr_len; 85 /** number of queries waiting (in total) */ 86 size_t numwait; 87 /** number of queries sent to server (in total) */ 88 size_t numsent; 89 /** numberof answers returned to client (in total) */ 90 size_t numreturn; 91 /** how many times repurposed */ 92 size_t numreuse; 93 /** next in proxylist */ 94 struct proxy* next; 95 }; 96 97 /** 98 * An item that has to be TCP relayed 99 */ 100 struct tcp_send_list { 101 /** the data item */ 102 uint8_t* item; 103 /** size of item */ 104 size_t len; 105 /** time when the item can be transmitted on */ 106 struct timeval wait; 107 /** how much of the item has already been transmitted */ 108 size_t done; 109 /** next in list */ 110 struct tcp_send_list* next; 111 }; 112 113 /** 114 * List of TCP proxy fd pairs to TCP connect client to server 115 */ 116 struct tcp_proxy { 117 /** the fd to listen for client query */ 118 int client_s; 119 /** the fd to listen for server answer */ 120 int server_s; 121 122 /** remote client address */ 123 struct sockaddr_storage addr; 124 /** length of address */ 125 socklen_t addr_len; 126 /** timeout on this entry */ 127 struct timeval timeout; 128 129 /** list of query items to send to server */ 130 struct tcp_send_list* querylist; 131 /** last in query list */ 132 struct tcp_send_list* querylast; 133 /** list of answer items to send to client */ 134 struct tcp_send_list* answerlist; 135 /** last in answerlist */ 136 struct tcp_send_list* answerlast; 137 138 /** next in list */ 139 struct tcp_proxy* next; 140 }; 141 142 /** usage information for delayer */ 143 static void usage(char* argv[]) 144 { 145 printf("usage: %s [options]\n", argv[0]); 146 printf(" -f addr : use addr, forward to that server, @port.\n"); 147 printf(" -b addr : bind to this address to listen.\n"); 148 printf(" -p port : bind to this port (use 0 for random).\n"); 149 printf(" -m mem : use this much memory for waiting queries.\n"); 150 printf(" -d delay: UDP queries are delayed n milliseconds.\n"); 151 printf(" TCP is delayed twice (on send, on recv).\n"); 152 printf(" -h : this help message\n"); 153 exit(1); 154 } 155 156 /** timeval compare, t1 < t2 */ 157 static int 158 dl_tv_smaller(struct timeval* t1, const struct timeval* t2) 159 { 160 #ifndef S_SPLINT_S 161 if(t1->tv_sec < t2->tv_sec) 162 return 1; 163 if(t1->tv_sec == t2->tv_sec && 164 t1->tv_usec < t2->tv_usec) 165 return 1; 166 #endif 167 return 0; 168 } 169 170 /** timeval add, t1 += t2 */ 171 static void 172 dl_tv_add(struct timeval* t1, const struct timeval* t2) 173 { 174 #ifndef S_SPLINT_S 175 t1->tv_sec += t2->tv_sec; 176 t1->tv_usec += t2->tv_usec; 177 while(t1->tv_usec >= 1000000) { 178 t1->tv_usec -= 1000000; 179 t1->tv_sec++; 180 } 181 #endif 182 } 183 184 /** timeval subtract, t1 -= t2 */ 185 static void 186 dl_tv_subtract(struct timeval* t1, const struct timeval* t2) 187 { 188 #ifndef S_SPLINT_S 189 t1->tv_sec -= t2->tv_sec; 190 if(t1->tv_usec >= t2->tv_usec) { 191 t1->tv_usec -= t2->tv_usec; 192 } else { 193 t1->tv_sec--; 194 t1->tv_usec = 1000000-(t2->tv_usec-t1->tv_usec); 195 } 196 #endif 197 } 198 199 200 /** create new ring buffer */ 201 static struct ringbuf* 202 ring_create(size_t sz) 203 { 204 struct ringbuf* r = (struct ringbuf*)calloc(1, sizeof(*r)); 205 if(!r) fatal_exit("out of memory"); 206 r->buf = (uint8_t*)malloc(sz); 207 if(!r->buf) fatal_exit("out of memory"); 208 r->size = sz; 209 r->low = 0; 210 r->high = 0; 211 return r; 212 } 213 214 /** delete ring buffer */ 215 static void 216 ring_delete(struct ringbuf* r) 217 { 218 if(!r) return; 219 free(r->buf); 220 free(r); 221 } 222 223 /** add entry to ringbuffer */ 224 static void 225 ring_add(struct ringbuf* r, sldns_buffer* pkt, struct timeval* now, 226 struct timeval* delay, struct proxy* p) 227 { 228 /* time -- proxy* -- 16bitlen -- message */ 229 uint16_t len = (uint16_t)sldns_buffer_limit(pkt); 230 struct timeval when; 231 size_t needed; 232 uint8_t* where = NULL; 233 log_assert(sldns_buffer_limit(pkt) <= 65535); 234 needed = sizeof(when) + sizeof(p) + sizeof(len) + len; 235 /* put item into ringbuffer */ 236 if(r->low < r->high) { 237 /* used part is in the middle */ 238 if(r->size - r->high >= needed) { 239 where = r->buf + r->high; 240 r->high += needed; 241 } else if(r->low > needed) { 242 /* wrap around ringbuffer */ 243 /* make sure r->low == r->high means empty */ 244 /* so r->low == r->high cannot be used to signify 245 * a completely full ringbuf */ 246 if(r->size - r->high > sizeof(when)+sizeof(p)) { 247 /* zero entry at end of buffer */ 248 memset(r->buf+r->high, 0, 249 sizeof(when)+sizeof(p)); 250 } 251 where = r->buf; 252 r->high = needed; 253 } else { 254 /* drop message */ 255 log_warn("warning: mem full, dropped message"); 256 return; 257 } 258 } else { 259 /* empty */ 260 if(r->high == r->low) { 261 where = r->buf; 262 r->low = 0; 263 r->high = needed; 264 /* unused part is in the middle */ 265 /* so ringbuffer has wrapped around */ 266 } else if(r->low - r->high > needed) { 267 where = r->buf + r->high; 268 r->high += needed; 269 } else { 270 log_warn("warning: mem full, dropped message"); 271 return; 272 } 273 } 274 when = *now; 275 dl_tv_add(&when, delay); 276 /* copy it at where part */ 277 log_assert(where != NULL); 278 memmove(where, &when, sizeof(when)); 279 memmove(where+sizeof(when), &p, sizeof(p)); 280 memmove(where+sizeof(when)+sizeof(p), &len, sizeof(len)); 281 memmove(where+sizeof(when)+sizeof(p)+sizeof(len), 282 sldns_buffer_begin(pkt), len); 283 } 284 285 /** see if the ringbuffer is empty */ 286 static int 287 ring_empty(struct ringbuf* r) 288 { 289 return (r->low == r->high); 290 } 291 292 /** peek at timevalue for next item in ring */ 293 static struct timeval* 294 ring_peek_time(struct ringbuf* r) 295 { 296 if(ring_empty(r)) 297 return NULL; 298 return (struct timeval*)&r->buf[r->low]; 299 } 300 301 /** get entry from ringbuffer */ 302 static int 303 ring_pop(struct ringbuf* r, sldns_buffer* pkt, struct timeval* tv, 304 struct proxy** p) 305 { 306 /* time -- proxy* -- 16bitlen -- message */ 307 uint16_t len; 308 uint8_t* where = NULL; 309 size_t done; 310 if(r->low == r->high) 311 return 0; 312 where = r->buf + r->low; 313 memmove(tv, where, sizeof(*tv)); 314 memmove(p, where+sizeof(*tv), sizeof(*p)); 315 memmove(&len, where+sizeof(*tv)+sizeof(*p), sizeof(len)); 316 memmove(sldns_buffer_begin(pkt), 317 where+sizeof(*tv)+sizeof(*p)+sizeof(len), len); 318 sldns_buffer_set_limit(pkt, (size_t)len); 319 done = sizeof(*tv)+sizeof(*p)+sizeof(len)+len; 320 /* move lowmark */ 321 if(r->low < r->high) { 322 /* used part in middle */ 323 log_assert(r->high - r->low >= done); 324 r->low += done; 325 } else { 326 /* unused part in middle */ 327 log_assert(r->size - r->low >= done); 328 r->low += done; 329 if(r->size - r->low > sizeof(*tv)+sizeof(*p)) { 330 /* see if it is zeroed; means end of buffer */ 331 struct proxy* pz; 332 memmove(&pz, r->buf+r->low+sizeof(*tv), sizeof(pz)); 333 if(pz == NULL) 334 r->low = 0; 335 } else r->low = 0; 336 } 337 if(r->low == r->high) { 338 r->low = 0; /* reset if empty */ 339 r->high = 0; 340 } 341 return 1; 342 } 343 344 /** signal handler global info */ 345 static volatile int do_quit = 0; 346 347 /** signal handler for user quit */ 348 static RETSIGTYPE delayer_sigh(int sig) 349 { 350 char str[] = "exit on signal \n"; 351 str[15] = '0' + (sig/10)%10; 352 str[16] = '0' + sig%10; 353 /* simple cast to void will not silence Wunused-result */ 354 (void)!write(STDOUT_FILENO, str, strlen(str)); 355 do_quit = 1; 356 } 357 358 /** send out waiting packets */ 359 static void 360 service_send(struct ringbuf* ring, struct timeval* now, sldns_buffer* pkt, 361 struct sockaddr_storage* srv_addr, socklen_t srv_len) 362 { 363 struct proxy* p; 364 struct timeval tv; 365 ssize_t sent; 366 while(!ring_empty(ring) && 367 dl_tv_smaller(ring_peek_time(ring), now)) { 368 /* this items needs to be sent out */ 369 if(!ring_pop(ring, pkt, &tv, &p)) 370 fatal_exit("ringbuf error: pop failed"); 371 verbose(1, "send out query %d.%6.6d", 372 (unsigned)tv.tv_sec, (unsigned)tv.tv_usec); 373 log_addr(1, "from client", &p->addr, p->addr_len); 374 /* send it */ 375 sent = sendto(p->s, (void*)sldns_buffer_begin(pkt), 376 sldns_buffer_limit(pkt), 0, 377 (struct sockaddr*)srv_addr, srv_len); 378 if(sent == -1) { 379 log_err("sendto: %s", sock_strerror(errno)); 380 } else if(sent != (ssize_t)sldns_buffer_limit(pkt)) { 381 log_err("sendto: partial send"); 382 } 383 p->lastuse = *now; 384 p->numsent++; 385 } 386 } 387 388 /** do proxy for one readable client */ 389 static void 390 do_proxy(struct proxy* p, int retsock, sldns_buffer* pkt) 391 { 392 int i; 393 ssize_t r; 394 for(i=0; i<TRIES_PER_SELECT; i++) { 395 r = recv(p->s, (void*)sldns_buffer_begin(pkt), 396 sldns_buffer_capacity(pkt), 0); 397 if(r == -1) { 398 #ifndef USE_WINSOCK 399 if(errno == EAGAIN || errno == EINTR) 400 return; 401 #else 402 if(WSAGetLastError() == WSAEINPROGRESS || 403 WSAGetLastError() == WSAEWOULDBLOCK) 404 return; 405 #endif 406 log_err("recv: %s", sock_strerror(errno)); 407 return; 408 } 409 sldns_buffer_set_limit(pkt, (size_t)r); 410 log_addr(1, "return reply to client", &p->addr, p->addr_len); 411 /* send reply back to the real client */ 412 p->numreturn++; 413 r = sendto(retsock, (void*)sldns_buffer_begin(pkt), (size_t)r, 414 0, (struct sockaddr*)&p->addr, p->addr_len); 415 if(r == -1) { 416 log_err("sendto: %s", sock_strerror(errno)); 417 } 418 } 419 } 420 421 /** proxy return replies to clients */ 422 static void 423 service_proxy(fd_set* rset, int retsock, struct proxy* proxies, 424 sldns_buffer* pkt, struct timeval* now) 425 { 426 struct proxy* p; 427 for(p = proxies; p; p = p->next) { 428 if(FD_ISSET(p->s, rset)) { 429 p->lastuse = *now; 430 do_proxy(p, retsock, pkt); 431 } 432 } 433 } 434 435 /** find or else create proxy for this remote client */ 436 static struct proxy* 437 find_create_proxy(struct sockaddr_storage* from, socklen_t from_len, 438 fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6, 439 struct timeval* now, struct timeval* reuse_timeout) 440 { 441 struct proxy* p; 442 struct timeval t; 443 for(p = *proxies; p; p = p->next) { 444 if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0) 445 return p; 446 } 447 /* possibly: reuse lapsed entries */ 448 for(p = *proxies; p; p = p->next) { 449 if(p->numwait > p->numsent || p->numsent > p->numreturn) 450 continue; 451 t = *now; 452 dl_tv_subtract(&t, &p->lastuse); 453 if(dl_tv_smaller(&t, reuse_timeout)) 454 continue; 455 /* yes! */ 456 verbose(1, "reuse existing entry"); 457 memmove(&p->addr, from, from_len); 458 p->addr_len = from_len; 459 p->numreuse++; 460 return p; 461 } 462 /* create new */ 463 p = (struct proxy*)calloc(1, sizeof(*p)); 464 if(!p) fatal_exit("out of memory"); 465 p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0); 466 if(p->s == -1) { 467 fatal_exit("socket: %s", sock_strerror(errno)); 468 } 469 fd_set_nonblock(p->s); 470 memmove(&p->addr, from, from_len); 471 p->addr_len = from_len; 472 p->next = *proxies; 473 *proxies = p; 474 FD_SET(FD_SET_T p->s, rorig); 475 if(p->s+1 > *max) 476 *max = p->s+1; 477 return p; 478 } 479 480 /** recv new waiting packets */ 481 static void 482 service_recv(int s, struct ringbuf* ring, sldns_buffer* pkt, 483 fd_set* rorig, int* max, struct proxy** proxies, 484 struct sockaddr_storage* srv_addr, socklen_t srv_len, 485 struct timeval* now, struct timeval* delay, struct timeval* reuse) 486 { 487 int i; 488 struct sockaddr_storage from; 489 socklen_t from_len; 490 ssize_t len; 491 struct proxy* p; 492 for(i=0; i<TRIES_PER_SELECT; i++) { 493 from_len = (socklen_t)sizeof(from); 494 len = recvfrom(s, (void*)sldns_buffer_begin(pkt), 495 sldns_buffer_capacity(pkt), 0, 496 (struct sockaddr*)&from, &from_len); 497 if(len < 0) { 498 #ifndef USE_WINSOCK 499 if(errno == EAGAIN || errno == EINTR) 500 return; 501 #else 502 if(WSAGetLastError() == WSAEWOULDBLOCK || 503 WSAGetLastError() == WSAEINPROGRESS) 504 return; 505 #endif 506 fatal_exit("recvfrom: %s", sock_strerror(errno)); 507 } 508 sldns_buffer_set_limit(pkt, (size_t)len); 509 /* find its proxy element */ 510 p = find_create_proxy(&from, from_len, rorig, max, proxies, 511 addr_is_ip6(srv_addr, srv_len), now, reuse); 512 if(!p) fatal_exit("error: cannot find or create proxy"); 513 p->lastuse = *now; 514 ring_add(ring, pkt, now, delay, p); 515 p->numwait++; 516 log_addr(1, "recv from client", &p->addr, p->addr_len); 517 } 518 } 519 520 /** delete tcp proxy */ 521 static void 522 tcp_proxy_delete(struct tcp_proxy* p) 523 { 524 struct tcp_send_list* s, *sn; 525 if(!p) 526 return; 527 log_addr(1, "delete tcp proxy", &p->addr, p->addr_len); 528 s = p->querylist; 529 while(s) { 530 sn = s->next; 531 free(s->item); 532 free(s); 533 s = sn; 534 } 535 s = p->answerlist; 536 while(s) { 537 sn = s->next; 538 free(s->item); 539 free(s); 540 s = sn; 541 } 542 sock_close(p->client_s); 543 if(p->server_s != -1) 544 sock_close(p->server_s); 545 free(p); 546 } 547 548 /** accept new TCP connections, and set them up */ 549 static void 550 service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies, 551 struct sockaddr_storage* srv_addr, socklen_t srv_len, 552 struct timeval* now, struct timeval* tcp_timeout) 553 { 554 int newfd; 555 struct sockaddr_storage addr; 556 struct tcp_proxy* p; 557 socklen_t addr_len; 558 newfd = accept(s, (struct sockaddr*)&addr, &addr_len); 559 if(newfd == -1) { 560 #ifndef USE_WINSOCK 561 if(errno == EAGAIN || errno == EINTR) 562 return; 563 #else 564 if(WSAGetLastError() == WSAEWOULDBLOCK || 565 WSAGetLastError() == WSAEINPROGRESS || 566 WSAGetLastError() == WSAECONNRESET) 567 return; 568 #endif 569 fatal_exit("accept: %s", sock_strerror(errno)); 570 } 571 p = (struct tcp_proxy*)calloc(1, sizeof(*p)); 572 if(!p) fatal_exit("out of memory"); 573 memmove(&p->addr, &addr, addr_len); 574 p->addr_len = addr_len; 575 log_addr(1, "new tcp proxy", &p->addr, p->addr_len); 576 p->client_s = newfd; 577 p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET, 578 SOCK_STREAM, 0); 579 if(p->server_s == -1) { 580 fatal_exit("tcp socket: %s", sock_strerror(errno)); 581 } 582 fd_set_nonblock(p->client_s); 583 fd_set_nonblock(p->server_s); 584 if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) { 585 #ifndef USE_WINSOCK 586 if(errno != EINPROGRESS) { 587 log_err("tcp connect: %s", strerror(errno)); 588 #else 589 if(WSAGetLastError() != WSAEWOULDBLOCK && 590 WSAGetLastError() != WSAEINPROGRESS) { 591 log_err("tcp connect: %s", 592 wsa_strerror(WSAGetLastError())); 593 #endif 594 sock_close(p->server_s); 595 sock_close(p->client_s); 596 free(p); 597 return; 598 } 599 } 600 p->timeout = *now; 601 dl_tv_add(&p->timeout, tcp_timeout); 602 603 /* listen to client and server */ 604 FD_SET(FD_SET_T p->client_s, rorig); 605 FD_SET(FD_SET_T p->server_s, rorig); 606 if(p->client_s+1 > *max) 607 *max = p->client_s+1; 608 if(p->server_s+1 > *max) 609 *max = p->server_s+1; 610 611 /* add into proxy list */ 612 p->next = *proxies; 613 *proxies = p; 614 } 615 616 /** relay TCP, read a part */ 617 static int 618 tcp_relay_read(int s, struct tcp_send_list** first, 619 struct tcp_send_list** last, struct timeval* now, 620 struct timeval* delay, sldns_buffer* pkt) 621 { 622 struct tcp_send_list* item; 623 ssize_t r = recv(s, (void*)sldns_buffer_begin(pkt), 624 sldns_buffer_capacity(pkt), 0); 625 if(r == -1) { 626 #ifndef USE_WINSOCK 627 if(errno == EINTR || errno == EAGAIN) 628 return 1; 629 #else 630 if(WSAGetLastError() == WSAEINPROGRESS || 631 WSAGetLastError() == WSAEWOULDBLOCK) 632 return 1; 633 #endif 634 log_err("tcp read: %s", sock_strerror(errno)); 635 return 0; 636 } else if(r == 0) { 637 /* connection closed */ 638 return 0; 639 } 640 item = (struct tcp_send_list*)malloc(sizeof(*item)); 641 if(!item) { 642 log_err("out of memory"); 643 return 0; 644 } 645 verbose(1, "read item len %d", (int)r); 646 item->len = (size_t)r; 647 item->item = memdup(sldns_buffer_begin(pkt), item->len); 648 if(!item->item) { 649 free(item); 650 log_err("out of memory"); 651 return 0; 652 } 653 item->done = 0; 654 item->wait = *now; 655 dl_tv_add(&item->wait, delay); 656 item->next = NULL; 657 658 /* link in */ 659 if(*first) { 660 (*last)->next = item; 661 } else { 662 *first = item; 663 } 664 *last = item; 665 return 1; 666 } 667 668 /** relay TCP, write a part */ 669 static int 670 tcp_relay_write(int s, struct tcp_send_list** first, 671 struct tcp_send_list** last, struct timeval* now) 672 { 673 ssize_t r; 674 struct tcp_send_list* p; 675 while(*first) { 676 p = *first; 677 /* is the item ready? */ 678 if(!dl_tv_smaller(&p->wait, now)) 679 return 1; 680 /* write it */ 681 r = send(s, (void*)(p->item + p->done), p->len - p->done, 0); 682 if(r == -1) { 683 #ifndef USE_WINSOCK 684 if(errno == EAGAIN || errno == EINTR) 685 return 1; 686 #else 687 if(WSAGetLastError() == WSAEWOULDBLOCK || 688 WSAGetLastError() == WSAEINPROGRESS) 689 return 1; 690 #endif 691 log_err("tcp write: %s", sock_strerror(errno)); 692 return 0; 693 } else if(r == 0) { 694 /* closed */ 695 return 0; 696 } 697 /* account it */ 698 p->done += (size_t)r; 699 verbose(1, "write item %d of %d", (int)p->done, (int)p->len); 700 if(p->done >= p->len) { 701 free(p->item); 702 *first = p->next; 703 if(!*first) 704 *last = NULL; 705 free(p); 706 } else { 707 /* partial write */ 708 return 1; 709 } 710 } 711 return 1; 712 } 713 714 /** perform TCP relaying */ 715 static void 716 service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now, 717 struct timeval* delay, struct timeval* tcp_timeout, sldns_buffer* pkt, 718 fd_set* rset, fd_set* rorig, fd_set* worig) 719 { 720 struct tcp_proxy* p, **prev; 721 struct timeval tout; 722 int delete_it; 723 p = *tcp_proxies; 724 prev = tcp_proxies; 725 tout = *now; 726 dl_tv_add(&tout, tcp_timeout); 727 728 while(p) { 729 delete_it = 0; 730 /* can we receive further queries? */ 731 if(!delete_it && FD_ISSET(p->client_s, rset)) { 732 p->timeout = tout; 733 log_addr(1, "read tcp query", &p->addr, p->addr_len); 734 if(!tcp_relay_read(p->client_s, &p->querylist, 735 &p->querylast, now, delay, pkt)) 736 delete_it = 1; 737 } 738 /* can we receive further answers? */ 739 if(!delete_it && p->server_s != -1 && 740 FD_ISSET(p->server_s, rset)) { 741 p->timeout = tout; 742 log_addr(1, "read tcp answer", &p->addr, p->addr_len); 743 if(!tcp_relay_read(p->server_s, &p->answerlist, 744 &p->answerlast, now, delay, pkt)) { 745 sock_close(p->server_s); 746 FD_CLR(FD_SET_T p->server_s, worig); 747 FD_CLR(FD_SET_T p->server_s, rorig); 748 p->server_s = -1; 749 } 750 } 751 /* can we send on further queries */ 752 if(!delete_it && p->querylist && p->server_s != -1) { 753 p->timeout = tout; 754 if(dl_tv_smaller(&p->querylist->wait, now)) 755 log_addr(1, "write tcp query", 756 &p->addr, p->addr_len); 757 if(!tcp_relay_write(p->server_s, &p->querylist, 758 &p->querylast, now)) 759 delete_it = 1; 760 if(p->querylist && 761 dl_tv_smaller(&p->querylist->wait, now)) 762 FD_SET(FD_SET_T p->server_s, worig); 763 else FD_CLR(FD_SET_T p->server_s, worig); 764 } 765 766 /* can we send on further answers */ 767 if(!delete_it && p->answerlist) { 768 p->timeout = tout; 769 if(dl_tv_smaller(&p->answerlist->wait, now)) 770 log_addr(1, "write tcp answer", 771 &p->addr, p->addr_len); 772 if(!tcp_relay_write(p->client_s, &p->answerlist, 773 &p->answerlast, now)) 774 delete_it = 1; 775 if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, 776 now)) 777 FD_SET(FD_SET_T p->client_s, worig); 778 else FD_CLR(FD_SET_T p->client_s, worig); 779 if(!p->answerlist && p->server_s == -1) 780 delete_it = 1; 781 } 782 783 /* does this entry timeout? (unused too long) */ 784 if(dl_tv_smaller(&p->timeout, now)) { 785 delete_it = 1; 786 } 787 if(delete_it) { 788 struct tcp_proxy* np = p->next; 789 *prev = np; 790 FD_CLR(FD_SET_T p->client_s, rorig); 791 FD_CLR(FD_SET_T p->client_s, worig); 792 if(p->server_s != -1) { 793 FD_CLR(FD_SET_T p->server_s, rorig); 794 FD_CLR(FD_SET_T p->server_s, worig); 795 } 796 tcp_proxy_delete(p); 797 p = np; 798 continue; 799 } 800 801 prev = &p->next; 802 p = p->next; 803 } 804 } 805 806 /** find waiting time */ 807 static int 808 service_findwait(struct timeval* now, struct timeval* wait, 809 struct ringbuf* ring, struct tcp_proxy* tcplist) 810 { 811 /* first item is the time to wait */ 812 struct timeval* peek = ring_peek_time(ring); 813 struct timeval tcv; 814 int have_tcpval = 0; 815 struct tcp_proxy* p; 816 817 /* also for TCP list the first in sendlists is the time to wait */ 818 for(p=tcplist; p; p=p->next) { 819 if(!have_tcpval) 820 tcv = p->timeout; 821 have_tcpval = 1; 822 if(dl_tv_smaller(&p->timeout, &tcv)) 823 tcv = p->timeout; 824 if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv)) 825 tcv = p->querylist->wait; 826 if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv)) 827 tcv = p->answerlist->wait; 828 } 829 if(peek) { 830 /* peek can be unaligned */ 831 /* use wait as a temp variable */ 832 memmove(wait, peek, sizeof(*wait)); 833 if(!have_tcpval) 834 tcv = *wait; 835 else if(dl_tv_smaller(wait, &tcv)) 836 tcv = *wait; 837 have_tcpval = 1; 838 } 839 if(have_tcpval) { 840 *wait = tcv; 841 dl_tv_subtract(wait, now); 842 return 1; 843 } 844 /* nothing, block */ 845 return 0; 846 } 847 848 /** clear proxy list */ 849 static void 850 proxy_list_clear(struct proxy* p) 851 { 852 char from[109]; 853 struct proxy* np; 854 int i=0, port; 855 while(p) { 856 np = p->next; 857 port = (int)ntohs(((struct sockaddr_in*)&p->addr)->sin_port); 858 if(addr_is_ip6(&p->addr, p->addr_len)) { 859 if(inet_ntop(AF_INET6, 860 &((struct sockaddr_in6*)&p->addr)->sin6_addr, 861 from, (socklen_t)sizeof(from)) == 0) 862 (void)strlcpy(from, "err", sizeof(from)); 863 } else { 864 if(inet_ntop(AF_INET, 865 &((struct sockaddr_in*)&p->addr)->sin_addr, 866 from, (socklen_t)sizeof(from)) == 0) 867 (void)strlcpy(from, "err", sizeof(from)); 868 } 869 printf("client[%d]: last %s@%d of %d : %u in, %u out, " 870 "%u returned\n", i++, from, port, (int)p->numreuse+1, 871 (unsigned)p->numwait, (unsigned)p->numsent, 872 (unsigned)p->numreturn); 873 sock_close(p->s); 874 free(p); 875 p = np; 876 } 877 } 878 879 /** clear TCP proxy list */ 880 static void 881 tcp_proxy_list_clear(struct tcp_proxy* p) 882 { 883 struct tcp_proxy* np; 884 while(p) { 885 np = p->next; 886 tcp_proxy_delete(p); 887 p = np; 888 } 889 } 890 891 /** delayer service loop */ 892 static void 893 service_loop(int udp_s, int listen_s, struct ringbuf* ring, 894 struct timeval* delay, struct timeval* reuse, 895 struct sockaddr_storage* srv_addr, socklen_t srv_len, 896 sldns_buffer* pkt) 897 { 898 fd_set rset, rorig; 899 fd_set wset, worig; 900 struct timeval now, wait; 901 int max, have_wait = 0; 902 struct proxy* proxies = NULL; 903 struct tcp_proxy* tcp_proxies = NULL; 904 struct timeval tcp_timeout; 905 tcp_timeout.tv_sec = 120; 906 tcp_timeout.tv_usec = 0; 907 #ifndef S_SPLINT_S 908 FD_ZERO(&rorig); 909 FD_ZERO(&worig); 910 FD_SET(FD_SET_T udp_s, &rorig); 911 FD_SET(FD_SET_T listen_s, &rorig); 912 #endif 913 max = udp_s + 1; 914 if(listen_s + 1 > max) max = listen_s + 1; 915 while(!do_quit) { 916 /* wait for events */ 917 rset = rorig; 918 wset = worig; 919 if(have_wait) 920 verbose(1, "wait for %d.%6.6d", 921 (unsigned)wait.tv_sec, (unsigned)wait.tv_usec); 922 else verbose(1, "wait"); 923 if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) { 924 if(errno == EAGAIN || errno == EINTR) 925 continue; 926 fatal_exit("select: %s", strerror(errno)); 927 } 928 /* get current time */ 929 if(gettimeofday(&now, NULL) < 0) { 930 if(errno == EAGAIN || errno == EINTR) 931 continue; 932 fatal_exit("gettimeofday: %s", strerror(errno)); 933 } 934 verbose(1, "process at %u.%6.6u\n", 935 (unsigned)now.tv_sec, (unsigned)now.tv_usec); 936 /* sendout delayed queries to master server (frees up buffer)*/ 937 service_send(ring, &now, pkt, srv_addr, srv_len); 938 /* proxy return replies */ 939 service_proxy(&rset, udp_s, proxies, pkt, &now); 940 /* see what can be received to start waiting */ 941 service_recv(udp_s, ring, pkt, &rorig, &max, &proxies, 942 srv_addr, srv_len, &now, delay, reuse); 943 /* see if there are new tcp connections */ 944 service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies, 945 srv_addr, srv_len, &now, &tcp_timeout); 946 /* service tcp connections */ 947 service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout, 948 pkt, &rset, &rorig, &worig); 949 /* see what next timeout is (if any) */ 950 have_wait = service_findwait(&now, &wait, ring, tcp_proxies); 951 } 952 proxy_list_clear(proxies); 953 tcp_proxy_list_clear(tcp_proxies); 954 } 955 956 /** delayer main service routine */ 957 static void 958 service(const char* bind_str, int bindport, const char* serv_str, 959 size_t memsize, int delay_msec) 960 { 961 struct sockaddr_storage bind_addr, srv_addr; 962 socklen_t bind_len, srv_len; 963 struct ringbuf* ring = ring_create(memsize); 964 struct timeval delay, reuse; 965 sldns_buffer* pkt; 966 int i, s, listen_s; 967 #ifndef S_SPLINT_S 968 delay.tv_sec = delay_msec / 1000; 969 delay.tv_usec = (delay_msec % 1000)*1000; 970 #endif 971 reuse = delay; /* reuse is max(4*delay, 1 second) */ 972 dl_tv_add(&reuse, &delay); 973 dl_tv_add(&reuse, &delay); 974 dl_tv_add(&reuse, &delay); 975 if(reuse.tv_sec == 0) 976 reuse.tv_sec = 1; 977 if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) { 978 printf("cannot parse forward address: %s\n", serv_str); 979 exit(1); 980 } 981 pkt = sldns_buffer_new(65535); 982 if(!pkt) 983 fatal_exit("out of memory"); 984 if( signal(SIGINT, delayer_sigh) == SIG_ERR || 985 #ifdef SIGHUP 986 signal(SIGHUP, delayer_sigh) == SIG_ERR || 987 #endif 988 #ifdef SIGQUIT 989 signal(SIGQUIT, delayer_sigh) == SIG_ERR || 990 #endif 991 #ifdef SIGBREAK 992 signal(SIGBREAK, delayer_sigh) == SIG_ERR || 993 #endif 994 #ifdef SIGALRM 995 signal(SIGALRM, delayer_sigh) == SIG_ERR || 996 #endif 997 signal(SIGTERM, delayer_sigh) == SIG_ERR) 998 fatal_exit("could not bind to signal"); 999 /* bind UDP port */ 1000 if((s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET, 1001 SOCK_DGRAM, 0)) == -1) { 1002 fatal_exit("socket: %s", sock_strerror(errno)); 1003 } 1004 i=0; 1005 if(bindport == 0) { 1006 bindport = 1024 + ((int)arc4random())%64000; 1007 i = 100; 1008 } 1009 while(1) { 1010 if(!ipstrtoaddr(bind_str, bindport, &bind_addr, &bind_len)) { 1011 printf("cannot parse listen address: %s\n", bind_str); 1012 exit(1); 1013 } 1014 if(bind(s, (struct sockaddr*)&bind_addr, bind_len) == -1) { 1015 log_err("bind: %s", sock_strerror(errno)); 1016 if(i--==0) 1017 fatal_exit("cannot bind any port"); 1018 bindport = 1024 + ((int)arc4random())%64000; 1019 } else break; 1020 } 1021 fd_set_nonblock(s); 1022 /* and TCP port */ 1023 if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET, 1024 SOCK_STREAM, 0)) == -1) { 1025 fatal_exit("tcp socket: %s", sock_strerror(errno)); 1026 } 1027 #ifdef SO_REUSEADDR 1028 if(1) { 1029 int on = 1; 1030 if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, (void*)&on, 1031 (socklen_t)sizeof(on)) < 0) 1032 fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s", 1033 sock_strerror(errno)); 1034 } 1035 #endif 1036 if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1) { 1037 fatal_exit("tcp bind: %s", sock_strerror(errno)); 1038 } 1039 if(listen(listen_s, 5) == -1) { 1040 fatal_exit("tcp listen: %s", sock_strerror(errno)); 1041 } 1042 fd_set_nonblock(listen_s); 1043 printf("listening on port: %d\n", bindport); 1044 1045 /* process loop */ 1046 do_quit = 0; 1047 service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len, 1048 pkt); 1049 1050 /* cleanup */ 1051 verbose(1, "cleanup"); 1052 sock_close(s); 1053 sock_close(listen_s); 1054 sldns_buffer_free(pkt); 1055 ring_delete(ring); 1056 } 1057 1058 /** getopt global, in case header files fail to declare it. */ 1059 extern int optind; 1060 /** getopt global, in case header files fail to declare it. */ 1061 extern char* optarg; 1062 1063 /** main program for delayer */ 1064 int main(int argc, char** argv) 1065 { 1066 int c; /* defaults */ 1067 const char* server = "127.0.0.1@53"; 1068 const char* bindto = "0.0.0.0"; 1069 int bindport = 0; 1070 size_t memsize = 10*1024*1024; 1071 int delay = 100; 1072 1073 verbosity = 0; 1074 log_init(0, 0, 0); 1075 log_ident_set("delayer"); 1076 if(argc == 1) usage(argv); 1077 while( (c=getopt(argc, argv, "b:d:f:hm:p:")) != -1) { 1078 switch(c) { 1079 case 'b': 1080 bindto = optarg; 1081 break; 1082 case 'd': 1083 if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) { 1084 printf("bad delay: %s\n", optarg); 1085 return 1; 1086 } 1087 delay = atoi(optarg); 1088 break; 1089 case 'f': 1090 server = optarg; 1091 break; 1092 case 'm': 1093 if(!cfg_parse_memsize(optarg, &memsize)) { 1094 printf("bad memsize: %s\n", optarg); 1095 return 1; 1096 } 1097 break; 1098 case 'p': 1099 if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) { 1100 printf("bad port nr: %s\n", optarg); 1101 return 1; 1102 } 1103 bindport = atoi(optarg); 1104 break; 1105 case 'h': 1106 case '?': 1107 default: 1108 usage(argv); 1109 } 1110 } 1111 argc -= optind; 1112 argv += optind; 1113 if(argc != 0) 1114 usage(argv); 1115 1116 printf("bind to %s @ %d and forward to %s after %d msec\n", 1117 bindto, bindport, server, delay); 1118 service(bindto, bindport, server, memsize, delay); 1119 return 0; 1120 } 1121