1 /* 2 * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn. 3 * 4 * Copyright (c) 2001-2006, NLnet Labs. All rights reserved. 5 * 6 * See LICENSE for the license. 7 * 8 */ 9 10 #include "config.h" 11 #include <assert.h> 12 #include <errno.h> 13 #include <fcntl.h> 14 #include <unistd.h> 15 #include <stdlib.h> 16 #include "nsd.h" 17 #include "xfrd-tcp.h" 18 #include "buffer.h" 19 #include "packet.h" 20 #include "dname.h" 21 #include "options.h" 22 #include "namedb.h" 23 #include "xfrd.h" 24 #include "xfrd-disk.h" 25 #include "util.h" 26 27 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */ 28 static int 29 xfrd_pipe_cmp(const void* a, const void* b) 30 { 31 const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a; 32 const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b; 33 int r; 34 if(x == y) 35 return 0; 36 if(y->ip_len != x->ip_len) 37 /* subtraction works because nonnegative and small numbers */ 38 return (int)y->ip_len - (int)x->ip_len; 39 r = memcmp(&x->ip, &y->ip, x->ip_len); 40 if(r != 0) 41 return r; 42 /* sort that num_unused is sorted ascending, */ 43 if(x->num_unused != y->num_unused) { 44 return (x->num_unused < y->num_unused) ? -1 : 1; 45 } 46 /* different pipelines are different still, even with same numunused*/ 47 return (uintptr_t)x < (uintptr_t)y ? -1 : 1; 48 } 49 50 xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region) 51 { 52 int i; 53 xfrd_tcp_set_t* tcp_set = region_alloc(region, sizeof(xfrd_tcp_set_t)); 54 memset(tcp_set, 0, sizeof(xfrd_tcp_set_t)); 55 tcp_set->tcp_count = 0; 56 tcp_set->tcp_waiting_first = 0; 57 tcp_set->tcp_waiting_last = 0; 58 for(i=0; i<XFRD_MAX_TCP; i++) 59 tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region); 60 tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp); 61 return tcp_set; 62 } 63 64 struct xfrd_tcp_pipeline* 65 xfrd_tcp_pipeline_create(region_type* region) 66 { 67 int i; 68 struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*) 69 region_alloc_zero(region, sizeof(*tp)); 70 tp->num_unused = ID_PIPE_NUM; 71 assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM); 72 for(i=0; i<ID_PIPE_NUM; i++) 73 tp->unused[i] = (uint16_t)i; 74 tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ); 75 tp->tcp_w = xfrd_tcp_create(region, 512); 76 return tp; 77 } 78 79 void 80 xfrd_setup_packet(buffer_type* packet, 81 uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid) 82 { 83 /* Set up the header */ 84 buffer_clear(packet); 85 ID_SET(packet, qid); 86 FLAGS_SET(packet, 0); 87 OPCODE_SET(packet, OPCODE_QUERY); 88 QDCOUNT_SET(packet, 1); 89 ANCOUNT_SET(packet, 0); 90 NSCOUNT_SET(packet, 0); 91 ARCOUNT_SET(packet, 0); 92 buffer_skip(packet, QHEADERSZ); 93 94 /* The question record. */ 95 buffer_write(packet, dname_name(dname), dname->name_size); 96 buffer_write_u16(packet, type); 97 buffer_write_u16(packet, klass); 98 } 99 100 static socklen_t 101 #ifdef INET6 102 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port, 103 struct sockaddr_storage *sck) 104 #else 105 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port, 106 struct sockaddr_in *sck, const char* fromto) 107 #endif /* INET6 */ 108 { 109 /* setup address structure */ 110 #ifdef INET6 111 memset(sck, 0, sizeof(struct sockaddr_storage)); 112 #else 113 memset(sck, 0, sizeof(struct sockaddr_in)); 114 #endif 115 if(acl->is_ipv6) { 116 #ifdef INET6 117 struct sockaddr_in6* sa = (struct sockaddr_in6*)sck; 118 sa->sin6_family = AF_INET6; 119 sa->sin6_port = htons(port); 120 sa->sin6_addr = acl->addr.addr6; 121 return sizeof(struct sockaddr_in6); 122 #else 123 log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \ 124 INET6.", fromto, acl->ip_address_spec); 125 return 0; 126 #endif 127 } else { 128 struct sockaddr_in* sa = (struct sockaddr_in*)sck; 129 sa->sin_family = AF_INET; 130 sa->sin_port = htons(port); 131 sa->sin_addr = acl->addr.addr; 132 return sizeof(struct sockaddr_in); 133 } 134 } 135 136 socklen_t 137 #ifdef INET6 138 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_storage *to) 139 #else 140 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_in *to) 141 #endif /* INET6 */ 142 { 143 unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT); 144 #ifdef INET6 145 return xfrd_acl_sockaddr(acl, port, to); 146 #else 147 return xfrd_acl_sockaddr(acl, port, to, "to"); 148 #endif /* INET6 */ 149 } 150 151 socklen_t 152 #ifdef INET6 153 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_storage *frm) 154 #else 155 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_in *frm) 156 #endif /* INET6 */ 157 { 158 unsigned int port = acl->port?acl->port:0; 159 #ifdef INET6 160 return xfrd_acl_sockaddr(acl, port, frm); 161 #else 162 return xfrd_acl_sockaddr(acl, port, frm, "from"); 163 #endif /* INET6 */ 164 } 165 166 void 167 xfrd_write_soa_buffer(struct buffer* packet, 168 const dname_type* apex, struct xfrd_soa* soa) 169 { 170 size_t rdlength_pos; 171 uint16_t rdlength; 172 buffer_write(packet, dname_name(apex), apex->name_size); 173 174 /* already in network order */ 175 buffer_write(packet, &soa->type, sizeof(soa->type)); 176 buffer_write(packet, &soa->klass, sizeof(soa->klass)); 177 buffer_write(packet, &soa->ttl, sizeof(soa->ttl)); 178 rdlength_pos = buffer_position(packet); 179 buffer_skip(packet, sizeof(rdlength)); 180 181 /* uncompressed dnames */ 182 buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]); 183 buffer_write(packet, soa->email+1, soa->email[0]); 184 185 buffer_write(packet, &soa->serial, sizeof(uint32_t)); 186 buffer_write(packet, &soa->refresh, sizeof(uint32_t)); 187 buffer_write(packet, &soa->retry, sizeof(uint32_t)); 188 buffer_write(packet, &soa->expire, sizeof(uint32_t)); 189 buffer_write(packet, &soa->minimum, sizeof(uint32_t)); 190 191 /* write length of RR */ 192 rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength); 193 buffer_write_u16_at(packet, rdlength_pos, rdlength); 194 } 195 196 xfrd_tcp_t* 197 xfrd_tcp_create(region_type* region, size_t bufsize) 198 { 199 xfrd_tcp_t* tcp_state = (xfrd_tcp_t*)region_alloc( 200 region, sizeof(xfrd_tcp_t)); 201 memset(tcp_state, 0, sizeof(xfrd_tcp_t)); 202 tcp_state->packet = buffer_create(region, bufsize); 203 tcp_state->fd = -1; 204 205 return tcp_state; 206 } 207 208 static struct xfrd_tcp_pipeline* 209 pipeline_find(xfrd_tcp_set_t* set, xfrd_zone_t* zone) 210 { 211 rbnode_t* sme = NULL; 212 struct xfrd_tcp_pipeline* r; 213 /* smaller buf than a full pipeline with 64kb ID array, only need 214 * the front part with the key info, this front part contains the 215 * members that the compare function uses. */ 216 const size_t keysize = sizeof(struct xfrd_tcp_pipeline) - 217 ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t)); 218 /* void* type for alignment of the struct, 219 * divide the keysize by ptr-size and then add one to round up */ 220 void* buf[ (keysize / sizeof(void*)) + 1 ]; 221 struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf; 222 key->node.key = key; 223 key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip); 224 key->num_unused = ID_PIPE_NUM; 225 /* lookup existing tcp transfer to the master with highest unused */ 226 if(rbtree_find_less_equal(set->pipetree, key, &sme)) { 227 /* exact match, strange, fully unused tcp cannot be open */ 228 assert(0); 229 } 230 if(!sme) 231 return NULL; 232 r = (struct xfrd_tcp_pipeline*)sme->key; 233 /* <= key pointed at, is the master correct ? */ 234 if(r->ip_len != key->ip_len) 235 return NULL; 236 if(memcmp(&r->ip, &key->ip, key->ip_len) != 0) 237 return NULL; 238 /* correct master, is there a slot free for this transfer? */ 239 if(r->num_unused == 0) 240 return NULL; 241 return r; 242 } 243 244 /* remove zone from tcp waiting list */ 245 static void 246 tcp_zone_waiting_list_popfirst(xfrd_tcp_set_t* set, xfrd_zone_t* zone) 247 { 248 assert(zone->tcp_waiting); 249 set->tcp_waiting_first = zone->tcp_waiting_next; 250 if(zone->tcp_waiting_next) 251 zone->tcp_waiting_next->tcp_waiting_prev = NULL; 252 else set->tcp_waiting_last = 0; 253 zone->tcp_waiting_next = 0; 254 zone->tcp_waiting = 0; 255 } 256 257 /* remove zone from tcp pipe write-wait list */ 258 static void 259 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) 260 { 261 if(zone->in_tcp_send) { 262 if(zone->tcp_send_prev) 263 zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next; 264 else tp->tcp_send_first=zone->tcp_send_next; 265 if(zone->tcp_send_next) 266 zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev; 267 else tp->tcp_send_last=zone->tcp_send_prev; 268 zone->in_tcp_send = 0; 269 } 270 } 271 272 /* remove first from write-wait list */ 273 static void 274 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) 275 { 276 tp->tcp_send_first = zone->tcp_send_next; 277 if(tp->tcp_send_first) 278 tp->tcp_send_first->tcp_send_prev = NULL; 279 else tp->tcp_send_last = NULL; 280 zone->in_tcp_send = 0; 281 } 282 283 /* remove zone from tcp pipe ID map */ 284 static void 285 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) 286 { 287 assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0); 288 assert(tp->id[zone->query_id] == zone); 289 tp->id[zone->query_id] = NULL; 290 tp->unused[tp->num_unused] = zone->query_id; 291 /* must remove and re-add for sort order in tree */ 292 (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node); 293 tp->num_unused++; 294 (void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node); 295 } 296 297 /* stop the tcp pipe (and all its zones need to retry) */ 298 static void 299 xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp) 300 { 301 int i, conn = -1; 302 assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */ 303 assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */ 304 /* need to retry for all the zones connected to it */ 305 /* these could use different lists and go to a different nextmaster*/ 306 for(i=0; i<ID_PIPE_NUM; i++) { 307 if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) { 308 xfrd_zone_t* zone = tp->id[i]; 309 conn = zone->tcp_conn; 310 zone->tcp_conn = -1; 311 zone->tcp_waiting = 0; 312 tcp_pipe_sendlist_remove(tp, zone); 313 tcp_pipe_id_remove(tp, zone); 314 xfrd_set_refresh_now(zone); 315 } 316 } 317 assert(conn != -1); 318 /* now release the entire tcp pipe */ 319 xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn); 320 } 321 322 static void 323 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp) 324 { 325 int fd = tp->handler.ev_fd; 326 struct timeval tv; 327 tv.tv_sec = xfrd->tcp_set->tcp_timeout; 328 tv.tv_usec = 0; 329 if(tp->handler_added) 330 event_del(&tp->handler); 331 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ| 332 (tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp); 333 if(event_base_set(xfrd->event_base, &tp->handler) != 0) 334 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed"); 335 if(event_add(&tp->handler, &tv) != 0) 336 log_msg(LOG_ERR, "xfrd tcp: event_add failed"); 337 tp->handler_added = 1; 338 } 339 340 /* handle event from fd of tcp pipe */ 341 void 342 xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg) 343 { 344 struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg; 345 if((event & EV_WRITE)) { 346 tcp_pipe_reset_timeout(tp); 347 if(tp->tcp_send_first) { 348 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s", 349 tp->tcp_send_first->apex_str)); 350 xfrd_tcp_write(tp, tp->tcp_send_first); 351 } 352 } 353 if((event & EV_READ) && tp->handler_added) { 354 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read")); 355 tcp_pipe_reset_timeout(tp); 356 xfrd_tcp_read(tp); 357 } 358 if((event & EV_TIMEOUT) && tp->handler_added) { 359 /* tcp connection timed out */ 360 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout")); 361 xfrd_tcp_pipe_stop(tp); 362 } 363 } 364 365 /* add a zone to the pipeline, it starts to want to write its query */ 366 static void 367 pipeline_setup_new_zone(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp, 368 xfrd_zone_t* zone) 369 { 370 /* assign the ID */ 371 int idx; 372 assert(tp->num_unused > 0); 373 /* we pick a random ID, even though it is TCP anyway */ 374 idx = random_generate(tp->num_unused); 375 zone->query_id = tp->unused[idx]; 376 tp->unused[idx] = tp->unused[tp->num_unused-1]; 377 tp->id[zone->query_id] = zone; 378 /* decrement unused counter, and fixup tree */ 379 (void)rbtree_delete(set->pipetree, &tp->node); 380 tp->num_unused--; 381 (void)rbtree_insert(set->pipetree, &tp->node); 382 383 /* add to sendlist, at end */ 384 zone->tcp_send_next = NULL; 385 zone->tcp_send_prev = tp->tcp_send_last; 386 zone->in_tcp_send = 1; 387 if(tp->tcp_send_last) 388 tp->tcp_send_last->tcp_send_next = zone; 389 else tp->tcp_send_first = zone; 390 tp->tcp_send_last = zone; 391 392 /* is it first in line? */ 393 if(tp->tcp_send_first == zone) { 394 xfrd_tcp_setup_write_packet(tp, zone); 395 /* add write to event handler */ 396 tcp_pipe_reset_timeout(tp); 397 } 398 } 399 400 void 401 xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone) 402 { 403 struct xfrd_tcp_pipeline* tp; 404 assert(zone->tcp_conn == -1); 405 assert(zone->tcp_waiting == 0); 406 407 if(set->tcp_count < XFRD_MAX_TCP) { 408 int i; 409 assert(!set->tcp_waiting_first); 410 set->tcp_count ++; 411 /* find a free tcp_buffer */ 412 for(i=0; i<XFRD_MAX_TCP; i++) { 413 if(set->tcp_state[i]->tcp_r->fd == -1) { 414 zone->tcp_conn = i; 415 break; 416 } 417 } 418 /** What if there is no free tcp_buffer? return; */ 419 if (zone->tcp_conn < 0) { 420 return; 421 } 422 423 tp = set->tcp_state[zone->tcp_conn]; 424 zone->tcp_waiting = 0; 425 426 /* stop udp use (if any) */ 427 if(zone->zone_handler.ev_fd != -1) 428 xfrd_udp_release(zone); 429 430 if(!xfrd_tcp_open(set, tp, zone)) { 431 zone->tcp_conn = -1; 432 set->tcp_count --; 433 xfrd_set_refresh_now(zone); 434 return; 435 } 436 /* ip and ip_len set by tcp_open */ 437 tp->node.key = tp; 438 tp->num_unused = ID_PIPE_NUM; 439 tp->num_skip = 0; 440 tp->tcp_send_first = NULL; 441 tp->tcp_send_last = NULL; 442 memset(tp->id, 0, sizeof(tp->id)); 443 for(i=0; i<ID_PIPE_NUM; i++) { 444 tp->unused[i] = i; 445 } 446 447 /* insert into tree */ 448 (void)rbtree_insert(set->pipetree, &tp->node); 449 xfrd_deactivate_zone(zone); 450 xfrd_unset_timer(zone); 451 pipeline_setup_new_zone(set, tp, zone); 452 return; 453 } 454 /* check for a pipeline to the same master with unused ID */ 455 if((tp = pipeline_find(set, zone))!= NULL) { 456 int i; 457 if(zone->zone_handler.ev_fd != -1) 458 xfrd_udp_release(zone); 459 for(i=0; i<XFRD_MAX_TCP; i++) { 460 if(set->tcp_state[i] == tp) 461 zone->tcp_conn = i; 462 } 463 xfrd_deactivate_zone(zone); 464 xfrd_unset_timer(zone); 465 pipeline_setup_new_zone(set, tp, zone); 466 return; 467 } 468 469 /* wait, at end of line */ 470 DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp " 471 "connections (%d) reached.", XFRD_MAX_TCP)); 472 zone->tcp_waiting_next = 0; 473 zone->tcp_waiting_prev = set->tcp_waiting_last; 474 zone->tcp_waiting = 1; 475 if(!set->tcp_waiting_last) { 476 set->tcp_waiting_first = zone; 477 set->tcp_waiting_last = zone; 478 } else { 479 set->tcp_waiting_last->tcp_waiting_next = zone; 480 set->tcp_waiting_last = zone; 481 } 482 xfrd_deactivate_zone(zone); 483 xfrd_unset_timer(zone); 484 } 485 486 int 487 xfrd_tcp_open(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp, 488 xfrd_zone_t* zone) 489 { 490 int fd, family, conn; 491 struct timeval tv; 492 assert(zone->tcp_conn != -1); 493 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s", 494 zone->apex_str, zone->master->ip_address_spec)); 495 tp->tcp_r->is_reading = 1; 496 tp->tcp_r->total_bytes = 0; 497 tp->tcp_r->msglen = 0; 498 buffer_clear(tp->tcp_r->packet); 499 tp->tcp_w->is_reading = 0; 500 tp->tcp_w->total_bytes = 0; 501 tp->tcp_w->msglen = 0; 502 tp->connection_established = 0; 503 504 if(zone->master->is_ipv6) { 505 #ifdef INET6 506 family = PF_INET6; 507 #else 508 xfrd_set_refresh_now(zone); 509 return 0; 510 #endif 511 } else { 512 family = PF_INET; 513 } 514 fd = socket(family, SOCK_STREAM, IPPROTO_TCP); 515 if(fd == -1) { 516 log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s", 517 zone->master->ip_address_spec, strerror(errno)); 518 xfrd_set_refresh_now(zone); 519 return 0; 520 } 521 if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) { 522 log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno)); 523 close(fd); 524 xfrd_set_refresh_now(zone); 525 return 0; 526 } 527 528 if(xfrd->nsd->outgoing_tcp_mss > 0) { 529 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG) 530 if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, 531 (void*)&xfrd->nsd->outgoing_tcp_mss, 532 sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) { 533 log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)" 534 "failed: %s", strerror(errno)); 535 } 536 #else 537 log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported"); 538 #endif 539 } 540 541 tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip); 542 543 /* bind it */ 544 if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern-> 545 outgoing_interface, zone->master, 1)) { 546 close(fd); 547 xfrd_set_refresh_now(zone); 548 return 0; 549 } 550 551 conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len); 552 if (conn == -1 && errno != EINPROGRESS) { 553 log_msg(LOG_ERR, "xfrd: connect %s failed: %s", 554 zone->master->ip_address_spec, strerror(errno)); 555 close(fd); 556 xfrd_set_refresh_now(zone); 557 return 0; 558 } 559 tp->tcp_r->fd = fd; 560 tp->tcp_w->fd = fd; 561 562 /* set the tcp pipe event */ 563 if(tp->handler_added) 564 event_del(&tp->handler); 565 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE, 566 xfrd_handle_tcp_pipe, tp); 567 if(event_base_set(xfrd->event_base, &tp->handler) != 0) 568 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed"); 569 tv.tv_sec = set->tcp_timeout; 570 tv.tv_usec = 0; 571 if(event_add(&tp->handler, &tv) != 0) 572 log_msg(LOG_ERR, "xfrd tcp: event_add failed"); 573 tp->handler_added = 1; 574 return 1; 575 } 576 577 void 578 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) 579 { 580 xfrd_tcp_t* tcp = tp->tcp_w; 581 assert(zone->tcp_conn != -1); 582 assert(zone->tcp_waiting == 0); 583 /* start AXFR or IXFR for the zone */ 584 if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only || 585 zone->master->ixfr_disabled || 586 /* if zone expired, after the first round, do not ask for 587 * IXFR any more, but full AXFR (of any serial number) */ 588 (zone->state == xfrd_zone_expired && zone->round_num != 0)) { 589 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer " 590 "(AXFR) for %s to %s", 591 zone->apex_str, zone->master->ip_address_spec)); 592 593 xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex, 594 zone->query_id); 595 } else { 596 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone " 597 "transfer (IXFR) for %s to %s", 598 zone->apex_str, zone->master->ip_address_spec)); 599 600 xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex, 601 zone->query_id); 602 NSCOUNT_SET(tcp->packet, 1); 603 xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk); 604 } 605 /* old transfer needs to be removed still? */ 606 if(zone->msg_seq_nr) 607 xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber); 608 zone->msg_seq_nr = 0; 609 zone->msg_rr_count = 0; 610 if(zone->master->key_options && zone->master->key_options->tsig_key) { 611 xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master); 612 } 613 buffer_flip(tcp->packet); 614 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id)); 615 tcp->msglen = buffer_limit(tcp->packet); 616 tcp->total_bytes = 0; 617 } 618 619 static void 620 tcp_conn_ready_for_reading(xfrd_tcp_t* tcp) 621 { 622 tcp->total_bytes = 0; 623 tcp->msglen = 0; 624 buffer_clear(tcp->packet); 625 } 626 627 int conn_write(xfrd_tcp_t* tcp) 628 { 629 ssize_t sent; 630 631 if(tcp->total_bytes < sizeof(tcp->msglen)) { 632 uint16_t sendlen = htons(tcp->msglen); 633 sent = write(tcp->fd, 634 (const char*)&sendlen + tcp->total_bytes, 635 sizeof(tcp->msglen) - tcp->total_bytes); 636 637 if(sent == -1) { 638 if(errno == EAGAIN || errno == EINTR) { 639 /* write would block, try later */ 640 return 0; 641 } else { 642 return -1; 643 } 644 } 645 646 tcp->total_bytes += sent; 647 if(tcp->total_bytes < sizeof(tcp->msglen)) { 648 /* incomplete write, resume later */ 649 return 0; 650 } 651 assert(tcp->total_bytes == sizeof(tcp->msglen)); 652 } 653 654 assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)); 655 656 sent = write(tcp->fd, 657 buffer_current(tcp->packet), 658 buffer_remaining(tcp->packet)); 659 if(sent == -1) { 660 if(errno == EAGAIN || errno == EINTR) { 661 /* write would block, try later */ 662 return 0; 663 } else { 664 return -1; 665 } 666 } 667 668 buffer_skip(tcp->packet, sent); 669 tcp->total_bytes += sent; 670 671 if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) { 672 /* more to write when socket becomes writable again */ 673 return 0; 674 } 675 676 assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)); 677 return 1; 678 } 679 680 void 681 xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) 682 { 683 int ret; 684 xfrd_tcp_t* tcp = tp->tcp_w; 685 assert(zone->tcp_conn != -1); 686 assert(zone == tp->tcp_send_first); 687 /* see if for non-established connection, there is a connect error */ 688 if(!tp->connection_established) { 689 /* check for pending error from nonblocking connect */ 690 /* from Stevens, unix network programming, vol1, 3rd ed, p450 */ 691 int error = 0; 692 socklen_t len = sizeof(error); 693 if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){ 694 error = errno; /* on solaris errno is error */ 695 } 696 if(error == EINPROGRESS || error == EWOULDBLOCK) 697 return; /* try again later */ 698 if(error != 0) { 699 log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s", 700 zone->apex_str, zone->master->ip_address_spec, 701 strerror(error)); 702 xfrd_tcp_pipe_stop(tp); 703 return; 704 } 705 } 706 ret = conn_write(tcp); 707 if(ret == -1) { 708 log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno)); 709 xfrd_tcp_pipe_stop(tp); 710 return; 711 } 712 if(tcp->total_bytes != 0 && !tp->connection_established) 713 tp->connection_established = 1; 714 if(ret == 0) { 715 return; /* write again later */ 716 } 717 /* done writing this message */ 718 719 /* remove first zone from sendlist */ 720 tcp_pipe_sendlist_popfirst(tp, zone); 721 722 /* see if other zone wants to write; init; let it write (now) */ 723 /* and use a loop, because 64k stack calls is a too much */ 724 while(tp->tcp_send_first) { 725 /* setup to write for this zone */ 726 xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first); 727 /* attempt to write for this zone (if success, continue loop)*/ 728 ret = conn_write(tcp); 729 if(ret == -1) { 730 log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno)); 731 xfrd_tcp_pipe_stop(tp); 732 return; 733 } 734 if(ret == 0) 735 return; /* write again later */ 736 tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first); 737 } 738 739 /* if sendlist empty, remove WRITE from event */ 740 741 /* listen to READ, and not WRITE events */ 742 assert(tp->tcp_send_first == NULL); 743 tcp_pipe_reset_timeout(tp); 744 } 745 746 int 747 conn_read(xfrd_tcp_t* tcp) 748 { 749 ssize_t received; 750 /* receive leading packet length bytes */ 751 if(tcp->total_bytes < sizeof(tcp->msglen)) { 752 received = read(tcp->fd, 753 (char*) &tcp->msglen + tcp->total_bytes, 754 sizeof(tcp->msglen) - tcp->total_bytes); 755 if(received == -1) { 756 if(errno == EAGAIN || errno == EINTR) { 757 /* read would block, try later */ 758 return 0; 759 } else { 760 #ifdef ECONNRESET 761 if (verbosity >= 2 || errno != ECONNRESET) 762 #endif /* ECONNRESET */ 763 log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno)); 764 return -1; 765 } 766 } else if(received == 0) { 767 /* EOF */ 768 return -1; 769 } 770 tcp->total_bytes += received; 771 if(tcp->total_bytes < sizeof(tcp->msglen)) { 772 /* not complete yet, try later */ 773 return 0; 774 } 775 776 assert(tcp->total_bytes == sizeof(tcp->msglen)); 777 tcp->msglen = ntohs(tcp->msglen); 778 779 if(tcp->msglen == 0) { 780 buffer_set_limit(tcp->packet, tcp->msglen); 781 return 1; 782 } 783 if(tcp->msglen > buffer_capacity(tcp->packet)) { 784 log_msg(LOG_ERR, "buffer too small, dropping connection"); 785 return 0; 786 } 787 buffer_set_limit(tcp->packet, tcp->msglen); 788 } 789 790 assert(buffer_remaining(tcp->packet) > 0); 791 792 received = read(tcp->fd, buffer_current(tcp->packet), 793 buffer_remaining(tcp->packet)); 794 if(received == -1) { 795 if(errno == EAGAIN || errno == EINTR) { 796 /* read would block, try later */ 797 return 0; 798 } else { 799 #ifdef ECONNRESET 800 if (verbosity >= 2 || errno != ECONNRESET) 801 #endif /* ECONNRESET */ 802 log_msg(LOG_ERR, "tcp read %s", strerror(errno)); 803 return -1; 804 } 805 } else if(received == 0) { 806 /* EOF */ 807 return -1; 808 } 809 810 tcp->total_bytes += received; 811 buffer_skip(tcp->packet, received); 812 813 if(buffer_remaining(tcp->packet) > 0) { 814 /* not complete yet, wait for more */ 815 return 0; 816 } 817 818 /* completed */ 819 assert(buffer_position(tcp->packet) == tcp->msglen); 820 return 1; 821 } 822 823 void 824 xfrd_tcp_read(struct xfrd_tcp_pipeline* tp) 825 { 826 xfrd_zone_t* zone; 827 xfrd_tcp_t* tcp = tp->tcp_r; 828 int ret; 829 enum xfrd_packet_result pkt_result; 830 831 ret = conn_read(tcp); 832 if(ret == -1) { 833 xfrd_tcp_pipe_stop(tp); 834 return; 835 } 836 if(ret == 0) 837 return; 838 /* completed msg */ 839 buffer_flip(tcp->packet); 840 /* see which ID number it is, if skip, handle skip, NULL: warn */ 841 if(tcp->msglen < QHEADERSZ) { 842 /* too short for DNS header, skip it */ 843 DEBUG(DEBUG_XFRD,1, (LOG_INFO, 844 "xfrd: tcp skip response that is too short")); 845 tcp_conn_ready_for_reading(tcp); 846 return; 847 } 848 zone = tp->id[ID(tcp->packet)]; 849 if(!zone || zone == TCP_NULL_SKIP) { 850 /* no zone for this id? skip it */ 851 DEBUG(DEBUG_XFRD,1, (LOG_INFO, 852 "xfrd: tcp skip response with %s ID", 853 zone?"set-to-skip":"unknown")); 854 tcp_conn_ready_for_reading(tcp); 855 return; 856 } 857 assert(zone->tcp_conn != -1); 858 859 /* handle message for zone */ 860 pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet); 861 /* setup for reading the next packet on this connection */ 862 tcp_conn_ready_for_reading(tcp); 863 switch(pkt_result) { 864 case xfrd_packet_more: 865 /* wait for next packet */ 866 break; 867 case xfrd_packet_newlease: 868 /* set to skip if more packets with this ID */ 869 tp->id[zone->query_id] = TCP_NULL_SKIP; 870 tp->num_skip++; 871 /* fall through to remove zone from tp */ 872 case xfrd_packet_transfer: 873 xfrd_tcp_release(xfrd->tcp_set, zone); 874 assert(zone->round_num == -1); 875 break; 876 case xfrd_packet_notimpl: 877 zone->master->ixfr_disabled = time(NULL); 878 xfrd_tcp_release(xfrd->tcp_set, zone); 879 /* query next server */ 880 xfrd_make_request(zone); 881 break; 882 case xfrd_packet_bad: 883 case xfrd_packet_tcp: 884 default: 885 /* set to skip if more packets with this ID */ 886 tp->id[zone->query_id] = TCP_NULL_SKIP; 887 tp->num_skip++; 888 xfrd_tcp_release(xfrd->tcp_set, zone); 889 /* query next server */ 890 xfrd_make_request(zone); 891 break; 892 } 893 } 894 895 void 896 xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone) 897 { 898 int conn = zone->tcp_conn; 899 struct xfrd_tcp_pipeline* tp = set->tcp_state[conn]; 900 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s", 901 zone->apex_str, zone->master->ip_address_spec)); 902 assert(zone->tcp_conn != -1); 903 assert(zone->tcp_waiting == 0); 904 zone->tcp_conn = -1; 905 zone->tcp_waiting = 0; 906 907 /* remove from tcp_send list */ 908 tcp_pipe_sendlist_remove(tp, zone); 909 /* remove it from the ID list */ 910 if(tp->id[zone->query_id] != TCP_NULL_SKIP) 911 tcp_pipe_id_remove(tp, zone); 912 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused", 913 tp->num_unused)); 914 /* if pipe was full, but no more, then see if waiting element is 915 * for the same master, and can fill the unused ID */ 916 if(tp->num_unused == 1 && set->tcp_waiting_first) { 917 #ifdef INET6 918 struct sockaddr_storage to; 919 #else 920 struct sockaddr_in to; 921 #endif 922 socklen_t to_len = xfrd_acl_sockaddr_to( 923 set->tcp_waiting_first->master, &to); 924 if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) { 925 /* use this connection for the waiting zone */ 926 zone = set->tcp_waiting_first; 927 assert(zone->tcp_conn == -1); 928 zone->tcp_conn = conn; 929 tcp_zone_waiting_list_popfirst(set, zone); 930 if(zone->zone_handler.ev_fd != -1) 931 xfrd_udp_release(zone); 932 xfrd_unset_timer(zone); 933 pipeline_setup_new_zone(set, tp, zone); 934 return; 935 } 936 /* waiting zone did not go to same server */ 937 } 938 939 /* if all unused, or only skipped leftover, close the pipeline */ 940 if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused) 941 xfrd_tcp_pipe_release(set, tp, conn); 942 } 943 944 void 945 xfrd_tcp_pipe_release(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp, 946 int conn) 947 { 948 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released")); 949 /* one handler per tcp pipe */ 950 if(tp->handler_added) 951 event_del(&tp->handler); 952 tp->handler_added = 0; 953 954 /* fd in tcp_r and tcp_w is the same, close once */ 955 if(tp->tcp_r->fd != -1) 956 close(tp->tcp_r->fd); 957 tp->tcp_r->fd = -1; 958 tp->tcp_w->fd = -1; 959 960 /* remove from pipetree */ 961 (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node); 962 963 /* a waiting zone can use the free tcp slot (to another server) */ 964 /* if that zone fails to set-up or connect, we try to start the next 965 * waiting zone in the list */ 966 while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) { 967 int i; 968 969 /* pop first waiting process */ 970 xfrd_zone_t* zone = set->tcp_waiting_first; 971 /* start it */ 972 assert(zone->tcp_conn == -1); 973 zone->tcp_conn = conn; 974 tcp_zone_waiting_list_popfirst(set, zone); 975 976 /* stop udp (if any) */ 977 if(zone->zone_handler.ev_fd != -1) 978 xfrd_udp_release(zone); 979 if(!xfrd_tcp_open(set, tp, zone)) { 980 zone->tcp_conn = -1; 981 xfrd_set_refresh_now(zone); 982 /* try to start the next zone (if any) */ 983 continue; 984 } 985 /* re-init this tcppipe */ 986 /* ip and ip_len set by tcp_open */ 987 tp->node.key = tp; 988 tp->num_unused = ID_PIPE_NUM; 989 tp->num_skip = 0; 990 tp->tcp_send_first = NULL; 991 tp->tcp_send_last = NULL; 992 memset(tp->id, 0, sizeof(tp->id)); 993 for(i=0; i<ID_PIPE_NUM; i++) { 994 tp->unused[i] = i; 995 } 996 997 /* insert into tree */ 998 (void)rbtree_insert(set->pipetree, &tp->node); 999 /* setup write */ 1000 xfrd_unset_timer(zone); 1001 pipeline_setup_new_zone(set, tp, zone); 1002 /* started a task, no need for cleanups, so return */ 1003 return; 1004 } 1005 /* no task to start, cleanup */ 1006 assert(!set->tcp_waiting_first); 1007 set->tcp_count --; 1008 assert(set->tcp_count >= 0); 1009 } 1010 1011