/*
 * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
 *
 * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
 *
 * See LICENSE for the license.
 *
 */

#include "config.h"
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/uio.h>
#include "nsd.h"
#include "xfrd-tcp.h"
#include "buffer.h"
#include "packet.h"
#include "dname.h"
#include "options.h"
#include "namedb.h"
#include "xfrd.h"
#include "xfrd-disk.h"
#include "util.h"

/* sort tcppipe, first on IP address; for the same IP address, sort on num_unused */
static int
xfrd_pipe_cmp(const void* a, const void* b)
{
	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
	int r;
	if(x == y)
		return 0;
	if(y->ip_len != x->ip_len)
		/* subtraction works because nonnegative and small numbers */
		return (int)y->ip_len - (int)x->ip_len;
	r = memcmp(&x->ip, &y->ip, x->ip_len);
	if(r != 0)
		return r;
	/* sort so that num_unused is ascending */
	if(x->num_unused != y->num_unused) {
		return (x->num_unused < y->num_unused) ? -1 : 1;
	}
	/* different pipelines are different still, even with same num_unused */
	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
}

struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region)
{
	int i;
	struct xfrd_tcp_set* tcp_set = region_alloc(region,
		sizeof(struct xfrd_tcp_set));
	memset(tcp_set, 0, sizeof(struct xfrd_tcp_set));
	tcp_set->tcp_count = 0;
	tcp_set->tcp_waiting_first = 0;
	tcp_set->tcp_waiting_last = 0;
	for(i=0; i<XFRD_MAX_TCP; i++)
		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
	return tcp_set;
}

struct xfrd_tcp_pipeline*
xfrd_tcp_pipeline_create(region_type* region)
{
	int i;
	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
		region_alloc_zero(region, sizeof(*tp));
	tp->num_unused = ID_PIPE_NUM;
	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
	for(i=0; i<ID_PIPE_NUM; i++)
		tp->unused[i] = (uint16_t)i;
	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
	tp->tcp_w = xfrd_tcp_create(region, 512);
	return tp;
}

void
xfrd_setup_packet(buffer_type* packet,
	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
{
	/* Set up the header */
	buffer_clear(packet);
	ID_SET(packet, qid);
	FLAGS_SET(packet, 0);
	OPCODE_SET(packet, OPCODE_QUERY);
	QDCOUNT_SET(packet, 1);
	ANCOUNT_SET(packet, 0);
	NSCOUNT_SET(packet, 0);
	ARCOUNT_SET(packet, 0);
	buffer_skip(packet, QHEADERSZ);

	/* The question record. */
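	/* The owner name is written in uncompressed wire format, then the
	 * query type and class in network byte order; FLAGS_SET(packet, 0)
	 * above already cleared RD and the other header flags. */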
	buffer_write(packet, dname_name(dname), dname->name_size);
	buffer_write_u16(packet, type);
	buffer_write_u16(packet, klass);
}

static socklen_t
#ifdef INET6
xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
	struct sockaddr_storage *sck)
#else
xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
	struct sockaddr_in *sck, const char* fromto)
#endif /* INET6 */
{
	/* setup address structure */
#ifdef INET6
	memset(sck, 0, sizeof(struct sockaddr_storage));
#else
	memset(sck, 0, sizeof(struct sockaddr_in));
#endif
	if(acl->is_ipv6) {
#ifdef INET6
		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
		sa->sin6_family = AF_INET6;
		sa->sin6_port = htons(port);
		sa->sin6_addr = acl->addr.addr6;
		return sizeof(struct sockaddr_in6);
#else
		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no "
			"INET6.", fromto, acl->ip_address_spec);
		return 0;
#endif
	} else {
		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
		sa->sin_family = AF_INET;
		sa->sin_port = htons(port);
		sa->sin_addr = acl->addr.addr;
		return sizeof(struct sockaddr_in);
	}
}

socklen_t
#ifdef INET6
xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to)
#else
xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to)
#endif /* INET6 */
{
	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
#ifdef INET6
	return xfrd_acl_sockaddr(acl, port, to);
#else
	return xfrd_acl_sockaddr(acl, port, to, "to");
#endif /* INET6 */
}

socklen_t
#ifdef INET6
xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm)
#else
xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm)
#endif /* INET6 */
{
	unsigned int port = acl->port?acl->port:0;
#ifdef INET6
	return xfrd_acl_sockaddr(acl, port, frm);
#else
	return xfrd_acl_sockaddr(acl, port, frm, "from");
#endif /* INET6 */
}

void
xfrd_write_soa_buffer(struct buffer* packet,
	const dname_type* apex, struct xfrd_soa* soa)
{
	size_t rdlength_pos;
	uint16_t rdlength;
	buffer_write(packet, dname_name(apex), apex->name_size);

	/* already in network order */
	buffer_write(packet, &soa->type, sizeof(soa->type));
	buffer_write(packet, &soa->klass, sizeof(soa->klass));
	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
	rdlength_pos = buffer_position(packet);
	buffer_skip(packet, sizeof(rdlength));

	/* uncompressed dnames */
	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
	buffer_write(packet, soa->email+1, soa->email[0]);

	buffer_write(packet, &soa->serial, sizeof(uint32_t));
	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
	buffer_write(packet, &soa->retry, sizeof(uint32_t));
	buffer_write(packet, &soa->expire, sizeof(uint32_t));
	buffer_write(packet, &soa->minimum, sizeof(uint32_t));

	/* write length of RR */
	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
	buffer_write_u16_at(packet, rdlength_pos, rdlength);
}

struct xfrd_tcp*
xfrd_tcp_create(region_type* region, size_t bufsize)
{
	struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc(
		region, sizeof(struct xfrd_tcp));
	memset(tcp_state, 0, sizeof(struct xfrd_tcp));
	tcp_state->packet = buffer_create(region, bufsize);
	tcp_state->fd = -1;

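	/* fd stays -1 until xfrd_tcp_open() attaches a socket; that value is
	 * how xfrd_tcp_obtain() recognizes an unused tcp channel. */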
	return tcp_state;
}

static struct xfrd_tcp_pipeline*
pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
{
	rbnode_type* sme = NULL;
	struct xfrd_tcp_pipeline* r;
	/* smaller buf than a full pipeline with 64kb ID array, only need
	 * the front part with the key info, this front part contains the
	 * members that the compare function uses. */
	enum { keysize = sizeof(struct xfrd_tcp_pipeline) -
		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t)) };
	/* void* type for alignment of the struct,
	 * divide the keysize by ptr-size and then add one to round up */
	void* buf[ (keysize / sizeof(void*)) + 1 ];
	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
	key->node.key = key;
	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
	key->num_unused = ID_PIPE_NUM;
	/* lookup existing tcp transfer to the master with highest unused */
	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
		/* exact match, strange, fully unused tcp cannot be open */
		assert(0);
	}
	if(!sme)
		return NULL;
	r = (struct xfrd_tcp_pipeline*)sme->key;
	/* <= key pointed at, is the master correct ? */
	if(r->ip_len != key->ip_len)
		return NULL;
	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
		return NULL;
	/* correct master, is there a slot free for this transfer? */
	if(r->num_unused == 0)
		return NULL;
	return r;
}

/* remove zone from tcp waiting list */
static void
tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
{
	assert(zone->tcp_waiting);
	set->tcp_waiting_first = zone->tcp_waiting_next;
	if(zone->tcp_waiting_next)
		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
	else set->tcp_waiting_last = 0;
	zone->tcp_waiting_next = 0;
	zone->tcp_waiting = 0;
}

/* remove zone from tcp pipe write-wait list */
static void
tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
{
	if(zone->in_tcp_send) {
		if(zone->tcp_send_prev)
			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
		else tp->tcp_send_first=zone->tcp_send_next;
		if(zone->tcp_send_next)
			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
		else tp->tcp_send_last=zone->tcp_send_prev;
		zone->in_tcp_send = 0;
	}
}

/* remove first from write-wait list */
static void
tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
{
	tp->tcp_send_first = zone->tcp_send_next;
	if(tp->tcp_send_first)
		tp->tcp_send_first->tcp_send_prev = NULL;
	else tp->tcp_send_last = NULL;
	zone->in_tcp_send = 0;
}

/* remove zone from tcp pipe ID map */
static void
tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
{
	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
	assert(tp->id[zone->query_id] == zone);
	tp->id[zone->query_id] = NULL;
	tp->unused[tp->num_unused] = zone->query_id;
	/* must remove and re-add for sort order in tree */
	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
	tp->num_unused++;
	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
}

/* stop the tcp pipe (and all its zones need to retry) */
static void
xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
{
	int i, conn = -1;
	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
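	/* IDs marked TCP_NULL_SKIP belong to zones that were already released;
	 * they only linger to swallow trailing packets and are not retried. */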
	/* need to retry for all the zones connected to it */
	/* these could use different lists and go to a different nextmaster*/
	for(i=0; i<ID_PIPE_NUM; i++) {
		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
			xfrd_zone_type* zone = tp->id[i];
			conn = zone->tcp_conn;
			zone->tcp_conn = -1;
			zone->tcp_waiting = 0;
			tcp_pipe_sendlist_remove(tp, zone);
			tcp_pipe_id_remove(tp, zone);
			xfrd_set_refresh_now(zone);
		}
	}
	assert(conn != -1);
	/* now release the entire tcp pipe */
	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
}

static void
tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
{
	int fd = tp->handler.ev_fd;
	struct timeval tv;
	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
	tv.tv_usec = 0;
	if(tp->handler_added)
		event_del(&tp->handler);
	memset(&tp->handler, 0, sizeof(tp->handler));
	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
	if(event_add(&tp->handler, &tv) != 0)
		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
	tp->handler_added = 1;
}

/* handle event from fd of tcp pipe */
void
xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
{
	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
	if((event & EV_WRITE)) {
		tcp_pipe_reset_timeout(tp);
		if(tp->tcp_send_first) {
			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
				tp->tcp_send_first->apex_str));
			xfrd_tcp_write(tp, tp->tcp_send_first);
		}
	}
	if((event & EV_READ) && tp->handler_added) {
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
		tcp_pipe_reset_timeout(tp);
		xfrd_tcp_read(tp);
	}
	if((event & EV_TIMEOUT) && tp->handler_added) {
		/* tcp connection timed out */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
		xfrd_tcp_pipe_stop(tp);
	}
}

/* add a zone to the pipeline, it starts to want to write its query */
static void
pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
	xfrd_zone_type* zone)
{
	/* assign the ID */
	int idx;
	assert(tp->num_unused > 0);
	/* we pick a random ID, even though it is TCP anyway */
	idx = random_generate(tp->num_unused);
	zone->query_id = tp->unused[idx];
	tp->unused[idx] = tp->unused[tp->num_unused-1];
	tp->id[zone->query_id] = zone;
	/* decrement unused counter, and fixup tree */
	(void)rbtree_delete(set->pipetree, &tp->node);
	tp->num_unused--;
	(void)rbtree_insert(set->pipetree, &tp->node);

	/* add to sendlist, at end */
	zone->tcp_send_next = NULL;
	zone->tcp_send_prev = tp->tcp_send_last;
	zone->in_tcp_send = 1;
	if(tp->tcp_send_last)
		tp->tcp_send_last->tcp_send_next = zone;
	else tp->tcp_send_first = zone;
	tp->tcp_send_last = zone;

	/* is it first in line? */
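	/* Only the head of the sendlist has its query staged in the single
	 * tcp_w buffer; later zones are set up one by one by xfrd_tcp_write. */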
	if(tp->tcp_send_first == zone) {
		xfrd_tcp_setup_write_packet(tp, zone);
		/* add write to event handler */
		tcp_pipe_reset_timeout(tp);
	}
}

void
xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
{
	struct xfrd_tcp_pipeline* tp;
	assert(zone->tcp_conn == -1);
	assert(zone->tcp_waiting == 0);

	if(set->tcp_count < XFRD_MAX_TCP) {
		int i;
		assert(!set->tcp_waiting_first);
		set->tcp_count ++;
		/* find a free tcp_buffer */
		for(i=0; i<XFRD_MAX_TCP; i++) {
			if(set->tcp_state[i]->tcp_r->fd == -1) {
				zone->tcp_conn = i;
				break;
			}
		}
		/** What if there is no free tcp_buffer? return; */
		if (zone->tcp_conn < 0) {
			return;
		}

		tp = set->tcp_state[zone->tcp_conn];
		zone->tcp_waiting = 0;

		/* stop udp use (if any) */
		if(zone->zone_handler.ev_fd != -1)
			xfrd_udp_release(zone);

		if(!xfrd_tcp_open(set, tp, zone)) {
			zone->tcp_conn = -1;
			set->tcp_count --;
			xfrd_set_refresh_now(zone);
			return;
		}
		/* ip and ip_len set by tcp_open */
		tp->node.key = tp;
		tp->num_unused = ID_PIPE_NUM;
		tp->num_skip = 0;
		tp->tcp_send_first = NULL;
		tp->tcp_send_last = NULL;
		memset(tp->id, 0, sizeof(tp->id));
		for(i=0; i<ID_PIPE_NUM; i++) {
			tp->unused[i] = i;
		}

		/* insert into tree */
		(void)rbtree_insert(set->pipetree, &tp->node);
		xfrd_deactivate_zone(zone);
		xfrd_unset_timer(zone);
		pipeline_setup_new_zone(set, tp, zone);
		return;
	}
	/* check for a pipeline to the same master with unused ID */
	if((tp = pipeline_find(set, zone)) != NULL) {
		int i;
		if(zone->zone_handler.ev_fd != -1)
			xfrd_udp_release(zone);
		for(i=0; i<XFRD_MAX_TCP; i++) {
			if(set->tcp_state[i] == tp)
				zone->tcp_conn = i;
		}
		xfrd_deactivate_zone(zone);
		xfrd_unset_timer(zone);
		pipeline_setup_new_zone(set, tp, zone);
		return;
	}

	/* wait, at end of line */
	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
		"connections (%d) reached.", XFRD_MAX_TCP));
	zone->tcp_waiting_next = 0;
	zone->tcp_waiting_prev = set->tcp_waiting_last;
	zone->tcp_waiting = 1;
	if(!set->tcp_waiting_last) {
		set->tcp_waiting_first = zone;
		set->tcp_waiting_last = zone;
	} else {
		set->tcp_waiting_last->tcp_waiting_next = zone;
		set->tcp_waiting_last = zone;
	}
	xfrd_deactivate_zone(zone);
	xfrd_unset_timer(zone);
}

int
xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
	xfrd_zone_type* zone)
{
	int fd, family, conn;
	struct timeval tv;
	assert(zone->tcp_conn != -1);

	/* if there is no next master, fallback to use the first one */
	/* but there really should be a master set */
	if(!zone->master) {
		zone->master = zone->zone_options->pattern->request_xfr;
		zone->master_num = 0;
	}

	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
		zone->apex_str, zone->master->ip_address_spec));
	tp->tcp_r->is_reading = 1;
	tp->tcp_r->total_bytes = 0;
	tp->tcp_r->msglen = 0;
	buffer_clear(tp->tcp_r->packet);
	tp->tcp_w->is_reading = 0;
	tp->tcp_w->total_bytes = 0;
	tp->tcp_w->msglen = 0;
	tp->connection_established = 0;

	if(zone->master->is_ipv6) {
#ifdef INET6
		family = PF_INET6;
#else
		xfrd_set_refresh_now(zone);
		return 0;
#endif
	} else {
		family = PF_INET;
	}
	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
	if(fd == -1) {
		/* squelch 'Address family not supported by protocol' at low
		 * verbosity levels */
		if(errno != EAFNOSUPPORT || verbosity > 2)
			log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
				zone->master->ip_address_spec, strerror(errno));
		xfrd_set_refresh_now(zone);
		return 0;
	}
	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
		close(fd);
		xfrd_set_refresh_now(zone);
		return 0;
	}

	if(xfrd->nsd->outgoing_tcp_mss > 0) {
#if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
			(void*)&xfrd->nsd->outgoing_tcp_mss,
			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
				" failed: %s", strerror(errno));
		}
#else
		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
#endif
	}

	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);

	/* bind it */
	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
		outgoing_interface, zone->master, 1)) {
		close(fd);
		xfrd_set_refresh_now(zone);
		return 0;
	}

	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
	if (conn == -1 && errno != EINPROGRESS) {
		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
			zone->master->ip_address_spec, strerror(errno));
		close(fd);
		xfrd_set_refresh_now(zone);
		return 0;
	}
	tp->tcp_r->fd = fd;
	tp->tcp_w->fd = fd;

	/* set the tcp pipe event */
	if(tp->handler_added)
		event_del(&tp->handler);
	memset(&tp->handler, 0, sizeof(tp->handler));
	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
		xfrd_handle_tcp_pipe, tp);
	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
	tv.tv_sec = set->tcp_timeout;
	tv.tv_usec = 0;
	if(event_add(&tp->handler, &tv) != 0)
		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
	tp->handler_added = 1;
	return 1;
}

void
xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
{
	struct xfrd_tcp* tcp = tp->tcp_w;
	assert(zone->tcp_conn != -1);
	assert(zone->tcp_waiting == 0);
	/* start AXFR or IXFR for the zone */
	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
		zone->master->ixfr_disabled ||
		/* if zone expired, after the first round, do not ask for
		 * IXFR any more, but full AXFR (of any serial number) */
		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
			"(AXFR) for %s to %s",
			zone->apex_str, zone->master->ip_address_spec));

		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
			zone->query_id);
		zone->query_type = TYPE_AXFR;
	} else {
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
			"transfer (IXFR) for %s to %s",
			zone->apex_str, zone->master->ip_address_spec));

		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
			zone->query_id);
		zone->query_type = TYPE_IXFR;
		NSCOUNT_SET(tcp->packet, 1);
		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
	}
	/* old transfer needs to be removed still? */
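	/* a nonzero msg_seq_nr means a previous attempt left a partial
	 * transfer in the xfr file; unlink it before sending a new query */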
	if(zone->msg_seq_nr)
		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
	zone->msg_seq_nr = 0;
	zone->msg_rr_count = 0;
	if(zone->master->key_options && zone->master->key_options->tsig_key) {
		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
	}
	buffer_flip(tcp->packet);
	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
	tcp->msglen = buffer_limit(tcp->packet);
	tcp->total_bytes = 0;
}

static void
tcp_conn_ready_for_reading(struct xfrd_tcp* tcp)
{
	tcp->total_bytes = 0;
	tcp->msglen = 0;
	buffer_clear(tcp->packet);
}

int conn_write(struct xfrd_tcp* tcp)
{
	ssize_t sent;

	if(tcp->total_bytes < sizeof(tcp->msglen)) {
		uint16_t sendlen = htons(tcp->msglen);
#ifdef HAVE_WRITEV
		struct iovec iov[2];
		iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes;
		iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes;
		iov[1].iov_base = buffer_begin(tcp->packet);
		iov[1].iov_len = buffer_limit(tcp->packet);
		sent = writev(tcp->fd, iov, 2);
#else /* HAVE_WRITEV */
		sent = write(tcp->fd,
			(const char*)&sendlen + tcp->total_bytes,
			sizeof(tcp->msglen) - tcp->total_bytes);
#endif /* HAVE_WRITEV */

		if(sent == -1) {
			if(errno == EAGAIN || errno == EINTR) {
				/* write would block, try later */
				return 0;
			} else {
				return -1;
			}
		}

		tcp->total_bytes += sent;
		if(sent > (ssize_t)sizeof(tcp->msglen))
			buffer_skip(tcp->packet, sent-sizeof(tcp->msglen));
		if(tcp->total_bytes < sizeof(tcp->msglen)) {
			/* incomplete write, resume later */
			return 0;
		}
#ifdef HAVE_WRITEV
		if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) {
			/* packet done */
			return 1;
		}
#endif
		assert(tcp->total_bytes >= sizeof(tcp->msglen));
	}

	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));

	sent = write(tcp->fd,
		buffer_current(tcp->packet),
		buffer_remaining(tcp->packet));
	if(sent == -1) {
		if(errno == EAGAIN || errno == EINTR) {
			/* write would block, try later */
			return 0;
		} else {
			return -1;
		}
	}

	buffer_skip(tcp->packet, sent);
	tcp->total_bytes += sent;

	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
		/* more to write when socket becomes writable again */
		return 0;
	}

	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
	return 1;
}

void
xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
{
	int ret;
	struct xfrd_tcp* tcp = tp->tcp_w;
	assert(zone->tcp_conn != -1);
	assert(zone == tp->tcp_send_first);
	/* see if for non-established connection, there is a connect error */
	if(!tp->connection_established) {
		/* check for pending error from nonblocking connect */
		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
		int error = 0;
		socklen_t len = sizeof(error);
		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
			error = errno; /* on solaris errno is error */
		}
		if(error == EINPROGRESS || error == EWOULDBLOCK)
			return; /* try again later */
		if(error != 0) {
			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
				zone->apex_str, zone->master->ip_address_spec,
				strerror(error));
			xfrd_tcp_pipe_stop(tp);
			return;
		}
	}
	ret = conn_write(tcp);
	if(ret == -1) {
		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
		xfrd_tcp_pipe_stop(tp);
		return;
	}
	if(tcp->total_bytes != 0 && !tp->connection_established)
		tp->connection_established = 1;
	if(ret == 0) {
		return; /* write again later */
	}
	/* done writing this message */

	/* remove first zone from sendlist */
	tcp_pipe_sendlist_popfirst(tp, zone);

	/* see if other zone wants to write; init; let it write (now) */
	/* and use a loop, because 64k stack calls is too much */
	while(tp->tcp_send_first) {
		/* setup to write for this zone */
		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
		/* attempt to write for this zone (if success, continue loop)*/
		ret = conn_write(tcp);
		if(ret == -1) {
			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
			xfrd_tcp_pipe_stop(tp);
			return;
		}
		if(ret == 0)
			return; /* write again later */
		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
	}

	/* if sendlist empty, remove WRITE from event */

	/* listen to READ, and not WRITE events */
	assert(tp->tcp_send_first == NULL);
	tcp_pipe_reset_timeout(tp);
}

int
conn_read(struct xfrd_tcp* tcp)
{
	ssize_t received;
	/* receive leading packet length bytes */
	if(tcp->total_bytes < sizeof(tcp->msglen)) {
		received = read(tcp->fd,
			(char*) &tcp->msglen + tcp->total_bytes,
			sizeof(tcp->msglen) - tcp->total_bytes);
		if(received == -1) {
			if(errno == EAGAIN || errno == EINTR) {
				/* read would block, try later */
				return 0;
			} else {
#ifdef ECONNRESET
				if (verbosity >= 2 || errno != ECONNRESET)
#endif /* ECONNRESET */
				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
				return -1;
			}
		} else if(received == 0) {
			/* EOF */
			return -1;
		}
		tcp->total_bytes += received;
		if(tcp->total_bytes < sizeof(tcp->msglen)) {
			/* not complete yet, try later */
			return 0;
		}

		assert(tcp->total_bytes == sizeof(tcp->msglen));
		tcp->msglen = ntohs(tcp->msglen);

		if(tcp->msglen == 0) {
			buffer_set_limit(tcp->packet, tcp->msglen);
			return 1;
		}
		if(tcp->msglen > buffer_capacity(tcp->packet)) {
			log_msg(LOG_ERR, "buffer too small, dropping connection");
			return 0;
		}
		buffer_set_limit(tcp->packet, tcp->msglen);
	}

	assert(buffer_remaining(tcp->packet) > 0);

	received = read(tcp->fd, buffer_current(tcp->packet),
		buffer_remaining(tcp->packet));
	if(received == -1) {
		if(errno == EAGAIN || errno == EINTR) {
			/* read would block, try later */
			return 0;
		} else {
#ifdef ECONNRESET
			if (verbosity >= 2 || errno != ECONNRESET)
#endif /* ECONNRESET */
			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
			return -1;
		}
	} else if(received == 0) {
		/* EOF */
		return -1;
	}

	tcp->total_bytes += received;
	buffer_skip(tcp->packet, received);

	if(buffer_remaining(tcp->packet) > 0) {
		/* not complete yet, wait for more */
		return 0;
	}

	/* completed */
	assert(buffer_position(tcp->packet) == tcp->msglen);
	return 1;
}

void
xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
{
	xfrd_zone_type* zone;
	struct xfrd_tcp* tcp = tp->tcp_r;
	int ret;
	enum xfrd_packet_result pkt_result;

	ret = conn_read(tcp);
	if(ret == -1) {
		xfrd_tcp_pipe_stop(tp);
		return;
	}
	if(ret == 0)
		return;
	/* completed msg */
	buffer_flip(tcp->packet);
	/* see which ID number it is, if skip, handle skip, NULL: warn */
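	/* conn_read already consumed the 2-byte length prefix; tcp->packet
	 * now holds one DNS message and its ID indexes tp->id[] below. */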
	if(tcp->msglen < QHEADERSZ) {
		/* too short for DNS header, skip it */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
			"xfrd: tcp skip response that is too short"));
		tcp_conn_ready_for_reading(tcp);
		return;
	}
	zone = tp->id[ID(tcp->packet)];
	if(!zone || zone == TCP_NULL_SKIP) {
		/* no zone for this id? skip it */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
			"xfrd: tcp skip response with %s ID",
			zone?"set-to-skip":"unknown"));
		tcp_conn_ready_for_reading(tcp);
		return;
	}
	assert(zone->tcp_conn != -1);

	/* handle message for zone */
	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
	/* setup for reading the next packet on this connection */
	tcp_conn_ready_for_reading(tcp);
	switch(pkt_result) {
		case xfrd_packet_more:
			/* wait for next packet */
			break;
		case xfrd_packet_newlease:
			/* set to skip if more packets with this ID */
			tp->id[zone->query_id] = TCP_NULL_SKIP;
			tp->num_skip++;
			/* fall through to remove zone from tp */
			/* fallthrough */
		case xfrd_packet_transfer:
			if(zone->zone_options->pattern->multi_master_check) {
				xfrd_tcp_release(xfrd->tcp_set, zone);
				xfrd_make_request(zone);
				break;
			}
			xfrd_tcp_release(xfrd->tcp_set, zone);
			assert(zone->round_num == -1);
			break;
		case xfrd_packet_notimpl:
			xfrd_disable_ixfr(zone);
			xfrd_tcp_release(xfrd->tcp_set, zone);
			/* query next server */
			xfrd_make_request(zone);
			break;
		case xfrd_packet_bad:
		case xfrd_packet_tcp:
		default:
			/* set to skip if more packets with this ID */
			tp->id[zone->query_id] = TCP_NULL_SKIP;
			tp->num_skip++;
			xfrd_tcp_release(xfrd->tcp_set, zone);
			/* query next server */
			xfrd_make_request(zone);
			break;
	}
}

void
xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
{
	int conn = zone->tcp_conn;
	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
		zone->apex_str, zone->master->ip_address_spec));
	assert(zone->tcp_conn != -1);
	assert(zone->tcp_waiting == 0);
	zone->tcp_conn = -1;
	zone->tcp_waiting = 0;

	/* remove from tcp_send list */
	tcp_pipe_sendlist_remove(tp, zone);
	/* remove it from the ID list */
	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
		tcp_pipe_id_remove(tp, zone);
	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
		tp->num_unused));
	/* if pipe was full, but no more, then see if waiting element is
	 * for the same master, and can fill the unused ID */
	if(tp->num_unused == 1 && set->tcp_waiting_first) {
#ifdef INET6
		struct sockaddr_storage to;
#else
		struct sockaddr_in to;
#endif
		socklen_t to_len = xfrd_acl_sockaddr_to(
			set->tcp_waiting_first->master, &to);
		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
			/* use this connection for the waiting zone */
			zone = set->tcp_waiting_first;
			assert(zone->tcp_conn == -1);
			zone->tcp_conn = conn;
			tcp_zone_waiting_list_popfirst(set, zone);
			if(zone->zone_handler.ev_fd != -1)
				xfrd_udp_release(zone);
			xfrd_unset_timer(zone);
			pipeline_setup_new_zone(set, tp, zone);
			return;
		}
		/* waiting zone did not go to same server */
	}

	/* if all unused, or only skipped leftover, close the pipeline */
	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
		xfrd_tcp_pipe_release(set, tp, conn);
}

void
xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
	int conn)
{
	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
	/* one handler per tcp pipe */
	if(tp->handler_added)
		event_del(&tp->handler);
	tp->handler_added = 0;

	/* fd in tcp_r and tcp_w is the same, close once */
	if(tp->tcp_r->fd != -1)
		close(tp->tcp_r->fd);
	tp->tcp_r->fd = -1;
	tp->tcp_w->fd = -1;

	/* remove from pipetree */
	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);

	/* a waiting zone can use the free tcp slot (to another server) */
	/* if that zone fails to set-up or connect, we try to start the next
	 * waiting zone in the list */
	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
		int i;

		/* pop first waiting process */
		xfrd_zone_type* zone = set->tcp_waiting_first;
		/* start it */
		assert(zone->tcp_conn == -1);
		zone->tcp_conn = conn;
		tcp_zone_waiting_list_popfirst(set, zone);

		/* stop udp (if any) */
		if(zone->zone_handler.ev_fd != -1)
			xfrd_udp_release(zone);
		if(!xfrd_tcp_open(set, tp, zone)) {
			zone->tcp_conn = -1;
			xfrd_set_refresh_now(zone);
			/* try to start the next zone (if any) */
			continue;
		}
		/* re-init this tcppipe */
		/* ip and ip_len set by tcp_open */
		tp->node.key = tp;
		tp->num_unused = ID_PIPE_NUM;
		tp->num_skip = 0;
		tp->tcp_send_first = NULL;
		tp->tcp_send_last = NULL;
		memset(tp->id, 0, sizeof(tp->id));
		for(i=0; i<ID_PIPE_NUM; i++) {
			tp->unused[i] = i;
		}

		/* insert into tree */
		(void)rbtree_insert(set->pipetree, &tp->node);
		/* setup write */
		xfrd_unset_timer(zone);
		pipeline_setup_new_zone(set, tp, zone);
		/* started a task, no need for cleanups, so return */
		return;
	}
	/* no task to start, cleanup */
	assert(!set->tcp_waiting_first);
	set->tcp_count --;
	assert(set->tcp_count >= 0);
}