1 /* 2 * testcode/delayer.c - debug program that delays queries to a server. 3 * 4 * Copyright (c) 2008, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 /** 37 * \file 38 * 39 * This program delays queries made. It performs as a proxy to another 40 * server and delays queries to it. 41 */ 42 43 #include "config.h" 44 #ifdef HAVE_GETOPT_H 45 #include <getopt.h> 46 #endif 47 #ifdef HAVE_TIME_H 48 #include <time.h> 49 #endif 50 #include <sys/time.h> 51 #include "util/net_help.h" 52 #include "util/config_file.h" 53 #include "sldns/sbuffer.h" 54 #include <signal.h> 55 56 /** number of reads per select for delayer */ 57 #define TRIES_PER_SELECT 100 58 59 /** 60 * The ring buffer 61 */ 62 struct ringbuf { 63 /** base of buffer */ 64 uint8_t* buf; 65 /** size of buffer */ 66 size_t size; 67 /** low mark, items start here */ 68 size_t low; 69 /** high mark, items end here */ 70 size_t high; 71 }; 72 73 /** 74 * List of proxy fds that return replies from the server to our clients. 75 */ 76 struct proxy { 77 /** the fd to listen for replies from server */ 78 int s; 79 /** last time this was used */ 80 struct timeval lastuse; 81 /** remote address */ 82 struct sockaddr_storage addr; 83 /** length of addr */ 84 socklen_t addr_len; 85 /** number of queries waiting (in total) */ 86 size_t numwait; 87 /** number of queries sent to server (in total) */ 88 size_t numsent; 89 /** numberof answers returned to client (in total) */ 90 size_t numreturn; 91 /** how many times repurposed */ 92 size_t numreuse; 93 /** next in proxylist */ 94 struct proxy* next; 95 }; 96 97 /** 98 * An item that has to be TCP relayed 99 */ 100 struct tcp_send_list { 101 /** the data item */ 102 uint8_t* item; 103 /** size of item */ 104 size_t len; 105 /** time when the item can be transmitted on */ 106 struct timeval wait; 107 /** how much of the item has already been transmitted */ 108 size_t done; 109 /** next in list */ 110 struct tcp_send_list* next; 111 }; 112 113 /** 114 * List of TCP proxy fd pairs to TCP connect client to server 115 */ 116 struct tcp_proxy { 117 /** the fd to listen for client query */ 118 int client_s; 119 /** the fd to listen for server answer */ 120 int server_s; 121 122 /** remote client address */ 123 struct sockaddr_storage addr; 124 /** length of address */ 125 socklen_t addr_len; 126 /** timeout on this entry */ 127 struct timeval timeout; 128 129 /** list of query items to send to server */ 130 struct tcp_send_list* querylist; 131 /** last in query list */ 132 struct tcp_send_list* querylast; 133 /** list of answer items to send to client */ 134 struct tcp_send_list* answerlist; 135 /** last in answerlist */ 136 struct tcp_send_list* answerlast; 137 138 /** next in list */ 139 struct tcp_proxy* next; 140 }; 141 142 /** usage information for delayer */ 143 static void usage(char* argv[]) 144 { 145 printf("usage: %s [options]\n", argv[0]); 146 printf(" -f addr : use addr, forward to that server, @port.\n"); 147 printf(" -b addr : bind to this address to listen.\n"); 148 printf(" -p port : bind to this port (use 0 for random).\n"); 149 printf(" -m mem : use this much memory for waiting queries.\n"); 150 printf(" -d delay: UDP queries are delayed n milliseconds.\n"); 151 printf(" TCP is delayed twice (on send, on recv).\n"); 152 printf(" -h : this help message\n"); 153 exit(1); 154 } 155 156 /** timeval compare, t1 < t2 */ 157 static int 158 dl_tv_smaller(struct timeval* t1, const struct timeval* t2) 159 { 160 #ifndef S_SPLINT_S 161 if(t1->tv_sec < t2->tv_sec) 162 return 1; 163 if(t1->tv_sec == t2->tv_sec && 164 t1->tv_usec < t2->tv_usec) 165 return 1; 166 #endif 167 return 0; 168 } 169 170 /** timeval add, t1 += t2 */ 171 static void 172 dl_tv_add(struct timeval* t1, const struct timeval* t2) 173 { 174 #ifndef S_SPLINT_S 175 t1->tv_sec += t2->tv_sec; 176 t1->tv_usec += t2->tv_usec; 177 while(t1->tv_usec > 1000000) { 178 t1->tv_usec -= 1000000; 179 t1->tv_sec++; 180 } 181 #endif 182 } 183 184 /** timeval subtract, t1 -= t2 */ 185 static void 186 dl_tv_subtract(struct timeval* t1, const struct timeval* t2) 187 { 188 #ifndef S_SPLINT_S 189 t1->tv_sec -= t2->tv_sec; 190 if(t1->tv_usec >= t2->tv_usec) { 191 t1->tv_usec -= t2->tv_usec; 192 } else { 193 t1->tv_sec--; 194 t1->tv_usec = 1000000-(t2->tv_usec-t1->tv_usec); 195 } 196 #endif 197 } 198 199 200 /** create new ring buffer */ 201 static struct ringbuf* 202 ring_create(size_t sz) 203 { 204 struct ringbuf* r = (struct ringbuf*)calloc(1, sizeof(*r)); 205 if(!r) fatal_exit("out of memory"); 206 r->buf = (uint8_t*)malloc(sz); 207 if(!r->buf) fatal_exit("out of memory"); 208 r->size = sz; 209 r->low = 0; 210 r->high = 0; 211 return r; 212 } 213 214 /** delete ring buffer */ 215 static void 216 ring_delete(struct ringbuf* r) 217 { 218 if(!r) return; 219 free(r->buf); 220 free(r); 221 } 222 223 /** add entry to ringbuffer */ 224 static void 225 ring_add(struct ringbuf* r, sldns_buffer* pkt, struct timeval* now, 226 struct timeval* delay, struct proxy* p) 227 { 228 /* time -- proxy* -- 16bitlen -- message */ 229 uint16_t len = (uint16_t)sldns_buffer_limit(pkt); 230 struct timeval when; 231 size_t needed; 232 uint8_t* where = NULL; 233 log_assert(sldns_buffer_limit(pkt) <= 65535); 234 needed = sizeof(when) + sizeof(p) + sizeof(len) + len; 235 /* put item into ringbuffer */ 236 if(r->low < r->high) { 237 /* used part is in the middle */ 238 if(r->size - r->high >= needed) { 239 where = r->buf + r->high; 240 r->high += needed; 241 } else if(r->low > needed) { 242 /* wrap around ringbuffer */ 243 /* make sure r->low == r->high means empty */ 244 /* so r->low == r->high cannot be used to signify 245 * a completely full ringbuf */ 246 if(r->size - r->high > sizeof(when)+sizeof(p)) { 247 /* zero entry at end of buffer */ 248 memset(r->buf+r->high, 0, 249 sizeof(when)+sizeof(p)); 250 } 251 where = r->buf; 252 r->high = needed; 253 } else { 254 /* drop message */ 255 log_warn("warning: mem full, dropped message"); 256 return; 257 } 258 } else { 259 /* empty */ 260 if(r->high == r->low) { 261 where = r->buf; 262 r->low = 0; 263 r->high = needed; 264 /* unused part is in the middle */ 265 /* so ringbuffer has wrapped around */ 266 } else if(r->low - r->high > needed) { 267 where = r->buf + r->high; 268 r->high += needed; 269 } else { 270 log_warn("warning: mem full, dropped message"); 271 return; 272 } 273 } 274 when = *now; 275 dl_tv_add(&when, delay); 276 /* copy it at where part */ 277 log_assert(where != NULL); 278 memmove(where, &when, sizeof(when)); 279 memmove(where+sizeof(when), &p, sizeof(p)); 280 memmove(where+sizeof(when)+sizeof(p), &len, sizeof(len)); 281 memmove(where+sizeof(when)+sizeof(p)+sizeof(len), 282 sldns_buffer_begin(pkt), len); 283 } 284 285 /** see if the ringbuffer is empty */ 286 static int 287 ring_empty(struct ringbuf* r) 288 { 289 return (r->low == r->high); 290 } 291 292 /** peek at timevalue for next item in ring */ 293 static struct timeval* 294 ring_peek_time(struct ringbuf* r) 295 { 296 if(ring_empty(r)) 297 return NULL; 298 return (struct timeval*)&r->buf[r->low]; 299 } 300 301 /** get entry from ringbuffer */ 302 static int 303 ring_pop(struct ringbuf* r, sldns_buffer* pkt, struct timeval* tv, 304 struct proxy** p) 305 { 306 /* time -- proxy* -- 16bitlen -- message */ 307 uint16_t len; 308 uint8_t* where = NULL; 309 size_t done; 310 if(r->low == r->high) 311 return 0; 312 where = r->buf + r->low; 313 memmove(tv, where, sizeof(*tv)); 314 memmove(p, where+sizeof(*tv), sizeof(*p)); 315 memmove(&len, where+sizeof(*tv)+sizeof(*p), sizeof(len)); 316 memmove(sldns_buffer_begin(pkt), 317 where+sizeof(*tv)+sizeof(*p)+sizeof(len), len); 318 sldns_buffer_set_limit(pkt, (size_t)len); 319 done = sizeof(*tv)+sizeof(*p)+sizeof(len)+len; 320 /* move lowmark */ 321 if(r->low < r->high) { 322 /* used part in middle */ 323 log_assert(r->high - r->low >= done); 324 r->low += done; 325 } else { 326 /* unused part in middle */ 327 log_assert(r->size - r->low >= done); 328 r->low += done; 329 if(r->size - r->low > sizeof(*tv)+sizeof(*p)) { 330 /* see if it is zeroed; means end of buffer */ 331 struct proxy* pz; 332 memmove(&pz, r->buf+r->low+sizeof(*tv), sizeof(pz)); 333 if(pz == NULL) 334 r->low = 0; 335 } else r->low = 0; 336 } 337 if(r->low == r->high) { 338 r->low = 0; /* reset if empty */ 339 r->high = 0; 340 } 341 return 1; 342 } 343 344 /** signal handler global info */ 345 static volatile int do_quit = 0; 346 347 /** signal handler for user quit */ 348 static RETSIGTYPE delayer_sigh(int sig) 349 { 350 printf("exit on signal %d\n", sig); 351 do_quit = 1; 352 } 353 354 /** send out waiting packets */ 355 static void 356 service_send(struct ringbuf* ring, struct timeval* now, sldns_buffer* pkt, 357 struct sockaddr_storage* srv_addr, socklen_t srv_len) 358 { 359 struct proxy* p; 360 struct timeval tv; 361 ssize_t sent; 362 while(!ring_empty(ring) && 363 dl_tv_smaller(ring_peek_time(ring), now)) { 364 /* this items needs to be sent out */ 365 if(!ring_pop(ring, pkt, &tv, &p)) 366 fatal_exit("ringbuf error: pop failed"); 367 verbose(1, "send out query %d.%6.6d", 368 (unsigned)tv.tv_sec, (unsigned)tv.tv_usec); 369 log_addr(1, "from client", &p->addr, p->addr_len); 370 /* send it */ 371 sent = sendto(p->s, (void*)sldns_buffer_begin(pkt), 372 sldns_buffer_limit(pkt), 0, 373 (struct sockaddr*)srv_addr, srv_len); 374 if(sent == -1) { 375 #ifndef USE_WINSOCK 376 log_err("sendto: %s", strerror(errno)); 377 #else 378 log_err("sendto: %s", wsa_strerror(WSAGetLastError())); 379 #endif 380 } else if(sent != (ssize_t)sldns_buffer_limit(pkt)) { 381 log_err("sendto: partial send"); 382 } 383 p->lastuse = *now; 384 p->numsent++; 385 } 386 } 387 388 /** do proxy for one readable client */ 389 static void 390 do_proxy(struct proxy* p, int retsock, sldns_buffer* pkt) 391 { 392 int i; 393 ssize_t r; 394 for(i=0; i<TRIES_PER_SELECT; i++) { 395 r = recv(p->s, (void*)sldns_buffer_begin(pkt), 396 sldns_buffer_capacity(pkt), 0); 397 if(r == -1) { 398 #ifndef USE_WINSOCK 399 if(errno == EAGAIN || errno == EINTR) 400 return; 401 log_err("recv: %s", strerror(errno)); 402 #else 403 if(WSAGetLastError() == WSAEINPROGRESS || 404 WSAGetLastError() == WSAEWOULDBLOCK) 405 return; 406 log_err("recv: %s", wsa_strerror(WSAGetLastError())); 407 #endif 408 return; 409 } 410 sldns_buffer_set_limit(pkt, (size_t)r); 411 log_addr(1, "return reply to client", &p->addr, p->addr_len); 412 /* send reply back to the real client */ 413 p->numreturn++; 414 r = sendto(retsock, (void*)sldns_buffer_begin(pkt), (size_t)r, 415 0, (struct sockaddr*)&p->addr, p->addr_len); 416 if(r == -1) { 417 #ifndef USE_WINSOCK 418 log_err("sendto: %s", strerror(errno)); 419 #else 420 log_err("sendto: %s", wsa_strerror(WSAGetLastError())); 421 #endif 422 } 423 } 424 } 425 426 /** proxy return replies to clients */ 427 static void 428 service_proxy(fd_set* rset, int retsock, struct proxy* proxies, 429 sldns_buffer* pkt, struct timeval* now) 430 { 431 struct proxy* p; 432 for(p = proxies; p; p = p->next) { 433 if(FD_ISSET(p->s, rset)) { 434 p->lastuse = *now; 435 do_proxy(p, retsock, pkt); 436 } 437 } 438 } 439 440 /** find or else create proxy for this remote client */ 441 static struct proxy* 442 find_create_proxy(struct sockaddr_storage* from, socklen_t from_len, 443 fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6, 444 struct timeval* now, struct timeval* reuse_timeout) 445 { 446 struct proxy* p; 447 struct timeval t; 448 for(p = *proxies; p; p = p->next) { 449 if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0) 450 return p; 451 } 452 /* possibly: reuse lapsed entries */ 453 for(p = *proxies; p; p = p->next) { 454 if(p->numwait > p->numsent || p->numsent > p->numreturn) 455 continue; 456 t = *now; 457 dl_tv_subtract(&t, &p->lastuse); 458 if(dl_tv_smaller(&t, reuse_timeout)) 459 continue; 460 /* yes! */ 461 verbose(1, "reuse existing entry"); 462 memmove(&p->addr, from, from_len); 463 p->addr_len = from_len; 464 p->numreuse++; 465 return p; 466 } 467 /* create new */ 468 p = (struct proxy*)calloc(1, sizeof(*p)); 469 if(!p) fatal_exit("out of memory"); 470 p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0); 471 if(p->s == -1) { 472 #ifndef USE_WINSOCK 473 fatal_exit("socket: %s", strerror(errno)); 474 #else 475 fatal_exit("socket: %s", wsa_strerror(WSAGetLastError())); 476 #endif 477 } 478 fd_set_nonblock(p->s); 479 memmove(&p->addr, from, from_len); 480 p->addr_len = from_len; 481 p->next = *proxies; 482 *proxies = p; 483 FD_SET(FD_SET_T p->s, rorig); 484 if(p->s+1 > *max) 485 *max = p->s+1; 486 return p; 487 } 488 489 /** recv new waiting packets */ 490 static void 491 service_recv(int s, struct ringbuf* ring, sldns_buffer* pkt, 492 fd_set* rorig, int* max, struct proxy** proxies, 493 struct sockaddr_storage* srv_addr, socklen_t srv_len, 494 struct timeval* now, struct timeval* delay, struct timeval* reuse) 495 { 496 int i; 497 struct sockaddr_storage from; 498 socklen_t from_len; 499 ssize_t len; 500 struct proxy* p; 501 for(i=0; i<TRIES_PER_SELECT; i++) { 502 from_len = (socklen_t)sizeof(from); 503 len = recvfrom(s, (void*)sldns_buffer_begin(pkt), 504 sldns_buffer_capacity(pkt), 0, 505 (struct sockaddr*)&from, &from_len); 506 if(len < 0) { 507 #ifndef USE_WINSOCK 508 if(errno == EAGAIN || errno == EINTR) 509 return; 510 fatal_exit("recvfrom: %s", strerror(errno)); 511 #else 512 if(WSAGetLastError() == WSAEWOULDBLOCK || 513 WSAGetLastError() == WSAEINPROGRESS) 514 return; 515 fatal_exit("recvfrom: %s", 516 wsa_strerror(WSAGetLastError())); 517 #endif 518 } 519 sldns_buffer_set_limit(pkt, (size_t)len); 520 /* find its proxy element */ 521 p = find_create_proxy(&from, from_len, rorig, max, proxies, 522 addr_is_ip6(srv_addr, srv_len), now, reuse); 523 if(!p) fatal_exit("error: cannot find or create proxy"); 524 p->lastuse = *now; 525 ring_add(ring, pkt, now, delay, p); 526 p->numwait++; 527 log_addr(1, "recv from client", &p->addr, p->addr_len); 528 } 529 } 530 531 /** delete tcp proxy */ 532 static void 533 tcp_proxy_delete(struct tcp_proxy* p) 534 { 535 struct tcp_send_list* s, *sn; 536 if(!p) 537 return; 538 log_addr(1, "delete tcp proxy", &p->addr, p->addr_len); 539 s = p->querylist; 540 while(s) { 541 sn = s->next; 542 free(s->item); 543 free(s); 544 s = sn; 545 } 546 s = p->answerlist; 547 while(s) { 548 sn = s->next; 549 free(s->item); 550 free(s); 551 s = sn; 552 } 553 #ifndef USE_WINSOCK 554 close(p->client_s); 555 if(p->server_s != -1) 556 close(p->server_s); 557 #else 558 closesocket(p->client_s); 559 if(p->server_s != -1) 560 closesocket(p->server_s); 561 #endif 562 free(p); 563 } 564 565 /** accept new TCP connections, and set them up */ 566 static void 567 service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies, 568 struct sockaddr_storage* srv_addr, socklen_t srv_len, 569 struct timeval* now, struct timeval* tcp_timeout) 570 { 571 int newfd; 572 struct sockaddr_storage addr; 573 struct tcp_proxy* p; 574 socklen_t addr_len; 575 newfd = accept(s, (struct sockaddr*)&addr, &addr_len); 576 if(newfd == -1) { 577 #ifndef USE_WINSOCK 578 if(errno == EAGAIN || errno == EINTR) 579 return; 580 fatal_exit("accept: %s", strerror(errno)); 581 #else 582 if(WSAGetLastError() == WSAEWOULDBLOCK || 583 WSAGetLastError() == WSAEINPROGRESS || 584 WSAGetLastError() == WSAECONNRESET) 585 return; 586 fatal_exit("accept: %s", wsa_strerror(WSAGetLastError())); 587 #endif 588 } 589 p = (struct tcp_proxy*)calloc(1, sizeof(*p)); 590 if(!p) fatal_exit("out of memory"); 591 memmove(&p->addr, &addr, addr_len); 592 p->addr_len = addr_len; 593 log_addr(1, "new tcp proxy", &p->addr, p->addr_len); 594 p->client_s = newfd; 595 p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET, 596 SOCK_STREAM, 0); 597 if(p->server_s == -1) { 598 #ifndef USE_WINSOCK 599 fatal_exit("tcp socket: %s", strerror(errno)); 600 #else 601 fatal_exit("tcp socket: %s", wsa_strerror(WSAGetLastError())); 602 #endif 603 } 604 fd_set_nonblock(p->client_s); 605 fd_set_nonblock(p->server_s); 606 if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) { 607 #ifndef USE_WINSOCK 608 if(errno != EINPROGRESS) { 609 log_err("tcp connect: %s", strerror(errno)); 610 close(p->server_s); 611 close(p->client_s); 612 #else 613 if(WSAGetLastError() != WSAEWOULDBLOCK && 614 WSAGetLastError() != WSAEINPROGRESS) { 615 log_err("tcp connect: %s", 616 wsa_strerror(WSAGetLastError())); 617 closesocket(p->server_s); 618 closesocket(p->client_s); 619 #endif 620 free(p); 621 return; 622 } 623 } 624 p->timeout = *now; 625 dl_tv_add(&p->timeout, tcp_timeout); 626 627 /* listen to client and server */ 628 FD_SET(FD_SET_T p->client_s, rorig); 629 FD_SET(FD_SET_T p->server_s, rorig); 630 if(p->client_s+1 > *max) 631 *max = p->client_s+1; 632 if(p->server_s+1 > *max) 633 *max = p->server_s+1; 634 635 /* add into proxy list */ 636 p->next = *proxies; 637 *proxies = p; 638 } 639 640 /** relay TCP, read a part */ 641 static int 642 tcp_relay_read(int s, struct tcp_send_list** first, 643 struct tcp_send_list** last, struct timeval* now, 644 struct timeval* delay, sldns_buffer* pkt) 645 { 646 struct tcp_send_list* item; 647 ssize_t r = recv(s, (void*)sldns_buffer_begin(pkt), 648 sldns_buffer_capacity(pkt), 0); 649 if(r == -1) { 650 #ifndef USE_WINSOCK 651 if(errno == EINTR || errno == EAGAIN) 652 return 1; 653 log_err("tcp read: %s", strerror(errno)); 654 #else 655 if(WSAGetLastError() == WSAEINPROGRESS || 656 WSAGetLastError() == WSAEWOULDBLOCK) 657 return 1; 658 log_err("tcp read: %s", wsa_strerror(WSAGetLastError())); 659 #endif 660 return 0; 661 } else if(r == 0) { 662 /* connection closed */ 663 return 0; 664 } 665 item = (struct tcp_send_list*)malloc(sizeof(*item)); 666 if(!item) { 667 log_err("out of memory"); 668 return 0; 669 } 670 verbose(1, "read item len %d", (int)r); 671 item->len = (size_t)r; 672 item->item = memdup(sldns_buffer_begin(pkt), item->len); 673 if(!item->item) { 674 free(item); 675 log_err("out of memory"); 676 return 0; 677 } 678 item->done = 0; 679 item->wait = *now; 680 dl_tv_add(&item->wait, delay); 681 item->next = NULL; 682 683 /* link in */ 684 if(*first) { 685 (*last)->next = item; 686 } else { 687 *first = item; 688 } 689 *last = item; 690 return 1; 691 } 692 693 /** relay TCP, write a part */ 694 static int 695 tcp_relay_write(int s, struct tcp_send_list** first, 696 struct tcp_send_list** last, struct timeval* now) 697 { 698 ssize_t r; 699 struct tcp_send_list* p; 700 while(*first) { 701 p = *first; 702 /* is the item ready? */ 703 if(!dl_tv_smaller(&p->wait, now)) 704 return 1; 705 /* write it */ 706 r = send(s, (void*)(p->item + p->done), p->len - p->done, 0); 707 if(r == -1) { 708 #ifndef USE_WINSOCK 709 if(errno == EAGAIN || errno == EINTR) 710 return 1; 711 log_err("tcp write: %s", strerror(errno)); 712 #else 713 if(WSAGetLastError() == WSAEWOULDBLOCK || 714 WSAGetLastError() == WSAEINPROGRESS) 715 return 1; 716 log_err("tcp write: %s", 717 wsa_strerror(WSAGetLastError())); 718 #endif 719 return 0; 720 } else if(r == 0) { 721 /* closed */ 722 return 0; 723 } 724 /* account it */ 725 p->done += (size_t)r; 726 verbose(1, "write item %d of %d", (int)p->done, (int)p->len); 727 if(p->done >= p->len) { 728 free(p->item); 729 *first = p->next; 730 if(!*first) 731 *last = NULL; 732 free(p); 733 } else { 734 /* partial write */ 735 return 1; 736 } 737 } 738 return 1; 739 } 740 741 /** perform TCP relaying */ 742 static void 743 service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now, 744 struct timeval* delay, struct timeval* tcp_timeout, sldns_buffer* pkt, 745 fd_set* rset, fd_set* rorig, fd_set* worig) 746 { 747 struct tcp_proxy* p, **prev; 748 struct timeval tout; 749 int delete_it; 750 p = *tcp_proxies; 751 prev = tcp_proxies; 752 tout = *now; 753 dl_tv_add(&tout, tcp_timeout); 754 755 while(p) { 756 delete_it = 0; 757 /* can we receive further queries? */ 758 if(!delete_it && FD_ISSET(p->client_s, rset)) { 759 p->timeout = tout; 760 log_addr(1, "read tcp query", &p->addr, p->addr_len); 761 if(!tcp_relay_read(p->client_s, &p->querylist, 762 &p->querylast, now, delay, pkt)) 763 delete_it = 1; 764 } 765 /* can we receive further answers? */ 766 if(!delete_it && p->server_s != -1 && 767 FD_ISSET(p->server_s, rset)) { 768 p->timeout = tout; 769 log_addr(1, "read tcp answer", &p->addr, p->addr_len); 770 if(!tcp_relay_read(p->server_s, &p->answerlist, 771 &p->answerlast, now, delay, pkt)) { 772 #ifndef USE_WINSOCK 773 close(p->server_s); 774 #else 775 closesocket(p->server_s); 776 #endif 777 FD_CLR(FD_SET_T p->server_s, worig); 778 FD_CLR(FD_SET_T p->server_s, rorig); 779 p->server_s = -1; 780 } 781 } 782 /* can we send on further queries */ 783 if(!delete_it && p->querylist && p->server_s != -1) { 784 p->timeout = tout; 785 if(dl_tv_smaller(&p->querylist->wait, now)) 786 log_addr(1, "write tcp query", 787 &p->addr, p->addr_len); 788 if(!tcp_relay_write(p->server_s, &p->querylist, 789 &p->querylast, now)) 790 delete_it = 1; 791 if(p->querylist && p->server_s != -1 && 792 dl_tv_smaller(&p->querylist->wait, now)) 793 FD_SET(FD_SET_T p->server_s, worig); 794 else FD_CLR(FD_SET_T p->server_s, worig); 795 } 796 797 /* can we send on further answers */ 798 if(!delete_it && p->answerlist) { 799 p->timeout = tout; 800 if(dl_tv_smaller(&p->answerlist->wait, now)) 801 log_addr(1, "write tcp answer", 802 &p->addr, p->addr_len); 803 if(!tcp_relay_write(p->client_s, &p->answerlist, 804 &p->answerlast, now)) 805 delete_it = 1; 806 if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, 807 now)) 808 FD_SET(FD_SET_T p->client_s, worig); 809 else FD_CLR(FD_SET_T p->client_s, worig); 810 if(!p->answerlist && p->server_s == -1) 811 delete_it = 1; 812 } 813 814 /* does this entry timeout? (unused too long) */ 815 if(dl_tv_smaller(&p->timeout, now)) { 816 delete_it = 1; 817 } 818 if(delete_it) { 819 struct tcp_proxy* np = p->next; 820 *prev = np; 821 FD_CLR(FD_SET_T p->client_s, rorig); 822 FD_CLR(FD_SET_T p->client_s, worig); 823 if(p->server_s != -1) { 824 FD_CLR(FD_SET_T p->server_s, rorig); 825 FD_CLR(FD_SET_T p->server_s, worig); 826 } 827 tcp_proxy_delete(p); 828 p = np; 829 continue; 830 } 831 832 prev = &p->next; 833 p = p->next; 834 } 835 } 836 837 /** find waiting time */ 838 static int 839 service_findwait(struct timeval* now, struct timeval* wait, 840 struct ringbuf* ring, struct tcp_proxy* tcplist) 841 { 842 /* first item is the time to wait */ 843 struct timeval* peek = ring_peek_time(ring); 844 struct timeval tcv; 845 int have_tcpval = 0; 846 struct tcp_proxy* p; 847 848 /* also for TCP list the first in sendlists is the time to wait */ 849 for(p=tcplist; p; p=p->next) { 850 if(!have_tcpval) 851 tcv = p->timeout; 852 have_tcpval = 1; 853 if(dl_tv_smaller(&p->timeout, &tcv)) 854 tcv = p->timeout; 855 if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv)) 856 tcv = p->querylist->wait; 857 if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv)) 858 tcv = p->answerlist->wait; 859 } 860 if(peek) { 861 /* peek can be unaligned */ 862 /* use wait as a temp variable */ 863 memmove(wait, peek, sizeof(*wait)); 864 if(!have_tcpval) 865 tcv = *wait; 866 else if(dl_tv_smaller(wait, &tcv)) 867 tcv = *wait; 868 have_tcpval = 1; 869 } 870 if(have_tcpval) { 871 *wait = tcv; 872 dl_tv_subtract(wait, now); 873 return 1; 874 } 875 /* nothing, block */ 876 return 0; 877 } 878 879 /** clear proxy list */ 880 static void 881 proxy_list_clear(struct proxy* p) 882 { 883 char from[109]; 884 struct proxy* np; 885 int i=0, port; 886 while(p) { 887 np = p->next; 888 port = (int)ntohs(((struct sockaddr_in*)&p->addr)->sin_port); 889 if(addr_is_ip6(&p->addr, p->addr_len)) { 890 if(inet_ntop(AF_INET6, 891 &((struct sockaddr_in6*)&p->addr)->sin6_addr, 892 from, (socklen_t)sizeof(from)) == 0) 893 (void)strlcpy(from, "err", sizeof(from)); 894 } else { 895 if(inet_ntop(AF_INET, 896 &((struct sockaddr_in*)&p->addr)->sin_addr, 897 from, (socklen_t)sizeof(from)) == 0) 898 (void)strlcpy(from, "err", sizeof(from)); 899 } 900 printf("client[%d]: last %s@%d of %d : %u in, %u out, " 901 "%u returned\n", i++, from, port, (int)p->numreuse+1, 902 (unsigned)p->numwait, (unsigned)p->numsent, 903 (unsigned)p->numreturn); 904 #ifndef USE_WINSOCK 905 close(p->s); 906 #else 907 closesocket(p->s); 908 #endif 909 free(p); 910 p = np; 911 } 912 } 913 914 /** clear TCP proxy list */ 915 static void 916 tcp_proxy_list_clear(struct tcp_proxy* p) 917 { 918 struct tcp_proxy* np; 919 while(p) { 920 np = p->next; 921 tcp_proxy_delete(p); 922 p = np; 923 } 924 } 925 926 /** delayer service loop */ 927 static void 928 service_loop(int udp_s, int listen_s, struct ringbuf* ring, 929 struct timeval* delay, struct timeval* reuse, 930 struct sockaddr_storage* srv_addr, socklen_t srv_len, 931 sldns_buffer* pkt) 932 { 933 fd_set rset, rorig; 934 fd_set wset, worig; 935 struct timeval now, wait; 936 int max, have_wait = 0; 937 struct proxy* proxies = NULL; 938 struct tcp_proxy* tcp_proxies = NULL; 939 struct timeval tcp_timeout; 940 tcp_timeout.tv_sec = 120; 941 tcp_timeout.tv_usec = 0; 942 #ifndef S_SPLINT_S 943 FD_ZERO(&rorig); 944 FD_ZERO(&worig); 945 FD_SET(FD_SET_T udp_s, &rorig); 946 FD_SET(FD_SET_T listen_s, &rorig); 947 #endif 948 max = udp_s + 1; 949 if(listen_s + 1 > max) max = listen_s + 1; 950 while(!do_quit) { 951 /* wait for events */ 952 rset = rorig; 953 wset = worig; 954 if(have_wait) 955 verbose(1, "wait for %d.%6.6d", 956 (unsigned)wait.tv_sec, (unsigned)wait.tv_usec); 957 else verbose(1, "wait"); 958 if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) { 959 if(errno == EAGAIN || errno == EINTR) 960 continue; 961 fatal_exit("select: %s", strerror(errno)); 962 } 963 /* get current time */ 964 if(gettimeofday(&now, NULL) < 0) { 965 if(errno == EAGAIN || errno == EINTR) 966 continue; 967 fatal_exit("gettimeofday: %s", strerror(errno)); 968 } 969 verbose(1, "process at %u.%6.6u\n", 970 (unsigned)now.tv_sec, (unsigned)now.tv_usec); 971 /* sendout delayed queries to master server (frees up buffer)*/ 972 service_send(ring, &now, pkt, srv_addr, srv_len); 973 /* proxy return replies */ 974 service_proxy(&rset, udp_s, proxies, pkt, &now); 975 /* see what can be received to start waiting */ 976 service_recv(udp_s, ring, pkt, &rorig, &max, &proxies, 977 srv_addr, srv_len, &now, delay, reuse); 978 /* see if there are new tcp connections */ 979 service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies, 980 srv_addr, srv_len, &now, &tcp_timeout); 981 /* service tcp connections */ 982 service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout, 983 pkt, &rset, &rorig, &worig); 984 /* see what next timeout is (if any) */ 985 have_wait = service_findwait(&now, &wait, ring, tcp_proxies); 986 } 987 proxy_list_clear(proxies); 988 tcp_proxy_list_clear(tcp_proxies); 989 } 990 991 /** delayer main service routine */ 992 static void 993 service(const char* bind_str, int bindport, const char* serv_str, 994 size_t memsize, int delay_msec) 995 { 996 struct sockaddr_storage bind_addr, srv_addr; 997 socklen_t bind_len, srv_len; 998 struct ringbuf* ring = ring_create(memsize); 999 struct timeval delay, reuse; 1000 sldns_buffer* pkt; 1001 int i, s, listen_s; 1002 #ifndef S_SPLINT_S 1003 delay.tv_sec = delay_msec / 1000; 1004 delay.tv_usec = (delay_msec % 1000)*1000; 1005 #endif 1006 reuse = delay; /* reuse is max(4*delay, 1 second) */ 1007 dl_tv_add(&reuse, &delay); 1008 dl_tv_add(&reuse, &delay); 1009 dl_tv_add(&reuse, &delay); 1010 if(reuse.tv_sec == 0) 1011 reuse.tv_sec = 1; 1012 if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) { 1013 printf("cannot parse forward address: %s\n", serv_str); 1014 exit(1); 1015 } 1016 pkt = sldns_buffer_new(65535); 1017 if(!pkt) 1018 fatal_exit("out of memory"); 1019 if( signal(SIGINT, delayer_sigh) == SIG_ERR || 1020 #ifdef SIGHUP 1021 signal(SIGHUP, delayer_sigh) == SIG_ERR || 1022 #endif 1023 #ifdef SIGQUIT 1024 signal(SIGQUIT, delayer_sigh) == SIG_ERR || 1025 #endif 1026 #ifdef SIGBREAK 1027 signal(SIGBREAK, delayer_sigh) == SIG_ERR || 1028 #endif 1029 #ifdef SIGALRM 1030 signal(SIGALRM, delayer_sigh) == SIG_ERR || 1031 #endif 1032 signal(SIGTERM, delayer_sigh) == SIG_ERR) 1033 fatal_exit("could not bind to signal"); 1034 /* bind UDP port */ 1035 if((s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET, 1036 SOCK_DGRAM, 0)) == -1) { 1037 #ifndef USE_WINSOCK 1038 fatal_exit("socket: %s", strerror(errno)); 1039 #else 1040 fatal_exit("socket: %s", wsa_strerror(WSAGetLastError())); 1041 #endif 1042 } 1043 i=0; 1044 if(bindport == 0) { 1045 bindport = 1024 + arc4random()%64000; 1046 i = 100; 1047 } 1048 while(1) { 1049 if(!ipstrtoaddr(bind_str, bindport, &bind_addr, &bind_len)) { 1050 printf("cannot parse listen address: %s\n", bind_str); 1051 exit(1); 1052 } 1053 if(bind(s, (struct sockaddr*)&bind_addr, bind_len) == -1) { 1054 #ifndef USE_WINSOCK 1055 log_err("bind: %s", strerror(errno)); 1056 #else 1057 log_err("bind: %s", wsa_strerror(WSAGetLastError())); 1058 #endif 1059 if(i--==0) 1060 fatal_exit("cannot bind any port"); 1061 bindport = 1024 + arc4random()%64000; 1062 } else break; 1063 } 1064 fd_set_nonblock(s); 1065 /* and TCP port */ 1066 if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET, 1067 SOCK_STREAM, 0)) == -1) { 1068 #ifndef USE_WINSOCK 1069 fatal_exit("tcp socket: %s", strerror(errno)); 1070 #else 1071 fatal_exit("tcp socket: %s", wsa_strerror(WSAGetLastError())); 1072 #endif 1073 } 1074 #ifdef SO_REUSEADDR 1075 if(1) { 1076 int on = 1; 1077 if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, (void*)&on, 1078 (socklen_t)sizeof(on)) < 0) 1079 #ifndef USE_WINSOCK 1080 fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s", 1081 strerror(errno)); 1082 #else 1083 fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s", 1084 wsa_strerror(WSAGetLastError())); 1085 #endif 1086 } 1087 #endif 1088 if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1) { 1089 #ifndef USE_WINSOCK 1090 fatal_exit("tcp bind: %s", strerror(errno)); 1091 #else 1092 fatal_exit("tcp bind: %s", wsa_strerror(WSAGetLastError())); 1093 #endif 1094 } 1095 if(listen(listen_s, 5) == -1) { 1096 #ifndef USE_WINSOCK 1097 fatal_exit("tcp listen: %s", strerror(errno)); 1098 #else 1099 fatal_exit("tcp listen: %s", wsa_strerror(WSAGetLastError())); 1100 #endif 1101 } 1102 fd_set_nonblock(listen_s); 1103 printf("listening on port: %d\n", bindport); 1104 1105 /* process loop */ 1106 do_quit = 0; 1107 service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len, 1108 pkt); 1109 1110 /* cleanup */ 1111 verbose(1, "cleanup"); 1112 #ifndef USE_WINSOCK 1113 close(s); 1114 close(listen_s); 1115 #else 1116 closesocket(s); 1117 closesocket(listen_s); 1118 #endif 1119 sldns_buffer_free(pkt); 1120 ring_delete(ring); 1121 } 1122 1123 /** getopt global, in case header files fail to declare it. */ 1124 extern int optind; 1125 /** getopt global, in case header files fail to declare it. */ 1126 extern char* optarg; 1127 1128 /** main program for delayer */ 1129 int main(int argc, char** argv) 1130 { 1131 int c; /* defaults */ 1132 const char* server = "127.0.0.1@53"; 1133 const char* bindto = "0.0.0.0"; 1134 int bindport = 0; 1135 size_t memsize = 10*1024*1024; 1136 int delay = 100; 1137 1138 verbosity = 0; 1139 log_init(0, 0, 0); 1140 log_ident_set("delayer"); 1141 if(argc == 1) usage(argv); 1142 while( (c=getopt(argc, argv, "b:d:f:hm:p:")) != -1) { 1143 switch(c) { 1144 case 'b': 1145 bindto = optarg; 1146 break; 1147 case 'd': 1148 if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) { 1149 printf("bad delay: %s\n", optarg); 1150 return 1; 1151 } 1152 delay = atoi(optarg); 1153 break; 1154 case 'f': 1155 server = optarg; 1156 break; 1157 case 'm': 1158 if(!cfg_parse_memsize(optarg, &memsize)) { 1159 printf("bad memsize: %s\n", optarg); 1160 return 1; 1161 } 1162 break; 1163 case 'p': 1164 if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) { 1165 printf("bad port nr: %s\n", optarg); 1166 return 1; 1167 } 1168 bindport = atoi(optarg); 1169 break; 1170 case 'h': 1171 case '?': 1172 default: 1173 usage(argv); 1174 } 1175 } 1176 argc -= optind; 1177 argv += optind; 1178 if(argc != 0) 1179 usage(argv); 1180 1181 printf("bind to %s @ %d and forward to %s after %d msec\n", 1182 bindto, bindport, server, delay); 1183 service(bindto, bindport, server, memsize, delay); 1184 return 0; 1185 } 1186