1 /* 2 * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn. 3 * 4 * Copyright (c) 2001-2006, NLnet Labs. All rights reserved. 5 * 6 * See LICENSE for the license. 7 * 8 */ 9 10 #include "config.h" 11 #include <assert.h> 12 #include <errno.h> 13 #include <fcntl.h> 14 #include <unistd.h> 15 #include <stdlib.h> 16 #include <sys/uio.h> 17 #include "nsd.h" 18 #include "xfrd-tcp.h" 19 #include "buffer.h" 20 #include "packet.h" 21 #include "dname.h" 22 #include "options.h" 23 #include "namedb.h" 24 #include "xfrd.h" 25 #include "xfrd-disk.h" 26 #include "util.h" 27 28 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */ 29 static int 30 xfrd_pipe_cmp(const void* a, const void* b) 31 { 32 const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a; 33 const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b; 34 int r; 35 if(x == y) 36 return 0; 37 if(y->ip_len != x->ip_len) 38 /* subtraction works because nonnegative and small numbers */ 39 return (int)y->ip_len - (int)x->ip_len; 40 r = memcmp(&x->ip, &y->ip, x->ip_len); 41 if(r != 0) 42 return r; 43 /* sort that num_unused is sorted ascending, */ 44 if(x->num_unused != y->num_unused) { 45 return (x->num_unused < y->num_unused) ? -1 : 1; 46 } 47 /* different pipelines are different still, even with same numunused*/ 48 return (uintptr_t)x < (uintptr_t)y ? -1 : 1; 49 } 50 51 struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region) 52 { 53 int i; 54 struct xfrd_tcp_set* tcp_set = region_alloc(region, 55 sizeof(struct xfrd_tcp_set)); 56 memset(tcp_set, 0, sizeof(struct xfrd_tcp_set)); 57 tcp_set->tcp_count = 0; 58 tcp_set->tcp_waiting_first = 0; 59 tcp_set->tcp_waiting_last = 0; 60 for(i=0; i<XFRD_MAX_TCP; i++) 61 tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region); 62 tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp); 63 return tcp_set; 64 } 65 66 struct xfrd_tcp_pipeline* 67 xfrd_tcp_pipeline_create(region_type* region) 68 { 69 int i; 70 struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*) 71 region_alloc_zero(region, sizeof(*tp)); 72 tp->num_unused = ID_PIPE_NUM; 73 assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM); 74 for(i=0; i<ID_PIPE_NUM; i++) 75 tp->unused[i] = (uint16_t)i; 76 tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ); 77 tp->tcp_w = xfrd_tcp_create(region, 512); 78 return tp; 79 } 80 81 void 82 xfrd_setup_packet(buffer_type* packet, 83 uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid) 84 { 85 /* Set up the header */ 86 buffer_clear(packet); 87 ID_SET(packet, qid); 88 FLAGS_SET(packet, 0); 89 OPCODE_SET(packet, OPCODE_QUERY); 90 QDCOUNT_SET(packet, 1); 91 ANCOUNT_SET(packet, 0); 92 NSCOUNT_SET(packet, 0); 93 ARCOUNT_SET(packet, 0); 94 buffer_skip(packet, QHEADERSZ); 95 96 /* The question record. */ 97 buffer_write(packet, dname_name(dname), dname->name_size); 98 buffer_write_u16(packet, type); 99 buffer_write_u16(packet, klass); 100 } 101 102 static socklen_t 103 #ifdef INET6 104 xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port, 105 struct sockaddr_storage *sck) 106 #else 107 xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port, 108 struct sockaddr_in *sck, const char* fromto) 109 #endif /* INET6 */ 110 { 111 /* setup address structure */ 112 #ifdef INET6 113 memset(sck, 0, sizeof(struct sockaddr_storage)); 114 #else 115 memset(sck, 0, sizeof(struct sockaddr_in)); 116 #endif 117 if(acl->is_ipv6) { 118 #ifdef INET6 119 struct sockaddr_in6* sa = (struct sockaddr_in6*)sck; 120 sa->sin6_family = AF_INET6; 121 sa->sin6_port = htons(port); 122 sa->sin6_addr = acl->addr.addr6; 123 return sizeof(struct sockaddr_in6); 124 #else 125 log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \ 126 INET6.", fromto, acl->ip_address_spec); 127 return 0; 128 #endif 129 } else { 130 struct sockaddr_in* sa = (struct sockaddr_in*)sck; 131 sa->sin_family = AF_INET; 132 sa->sin_port = htons(port); 133 sa->sin_addr = acl->addr.addr; 134 return sizeof(struct sockaddr_in); 135 } 136 } 137 138 socklen_t 139 #ifdef INET6 140 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to) 141 #else 142 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to) 143 #endif /* INET6 */ 144 { 145 unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT); 146 #ifdef INET6 147 return xfrd_acl_sockaddr(acl, port, to); 148 #else 149 return xfrd_acl_sockaddr(acl, port, to, "to"); 150 #endif /* INET6 */ 151 } 152 153 socklen_t 154 #ifdef INET6 155 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm) 156 #else 157 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm) 158 #endif /* INET6 */ 159 { 160 unsigned int port = acl->port?acl->port:0; 161 #ifdef INET6 162 return xfrd_acl_sockaddr(acl, port, frm); 163 #else 164 return xfrd_acl_sockaddr(acl, port, frm, "from"); 165 #endif /* INET6 */ 166 } 167 168 void 169 xfrd_write_soa_buffer(struct buffer* packet, 170 const dname_type* apex, struct xfrd_soa* soa) 171 { 172 size_t rdlength_pos; 173 uint16_t rdlength; 174 buffer_write(packet, dname_name(apex), apex->name_size); 175 176 /* already in network order */ 177 buffer_write(packet, &soa->type, sizeof(soa->type)); 178 buffer_write(packet, &soa->klass, sizeof(soa->klass)); 179 buffer_write(packet, &soa->ttl, sizeof(soa->ttl)); 180 rdlength_pos = buffer_position(packet); 181 buffer_skip(packet, sizeof(rdlength)); 182 183 /* uncompressed dnames */ 184 buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]); 185 buffer_write(packet, soa->email+1, soa->email[0]); 186 187 buffer_write(packet, &soa->serial, sizeof(uint32_t)); 188 buffer_write(packet, &soa->refresh, sizeof(uint32_t)); 189 buffer_write(packet, &soa->retry, sizeof(uint32_t)); 190 buffer_write(packet, &soa->expire, sizeof(uint32_t)); 191 buffer_write(packet, &soa->minimum, sizeof(uint32_t)); 192 193 /* write length of RR */ 194 rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength); 195 buffer_write_u16_at(packet, rdlength_pos, rdlength); 196 } 197 198 struct xfrd_tcp* 199 xfrd_tcp_create(region_type* region, size_t bufsize) 200 { 201 struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc( 202 region, sizeof(struct xfrd_tcp)); 203 memset(tcp_state, 0, sizeof(struct xfrd_tcp)); 204 tcp_state->packet = buffer_create(region, bufsize); 205 tcp_state->fd = -1; 206 207 return tcp_state; 208 } 209 210 static struct xfrd_tcp_pipeline* 211 pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone) 212 { 213 rbnode_type* sme = NULL; 214 struct xfrd_tcp_pipeline* r; 215 /* smaller buf than a full pipeline with 64kb ID array, only need 216 * the front part with the key info, this front part contains the 217 * members that the compare function uses. */ 218 const size_t keysize = sizeof(struct xfrd_tcp_pipeline) - 219 ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t)); 220 /* void* type for alignment of the struct, 221 * divide the keysize by ptr-size and then add one to round up */ 222 void* buf[ (keysize / sizeof(void*)) + 1 ]; 223 struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf; 224 key->node.key = key; 225 key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip); 226 key->num_unused = ID_PIPE_NUM; 227 /* lookup existing tcp transfer to the master with highest unused */ 228 if(rbtree_find_less_equal(set->pipetree, key, &sme)) { 229 /* exact match, strange, fully unused tcp cannot be open */ 230 assert(0); 231 } 232 if(!sme) 233 return NULL; 234 r = (struct xfrd_tcp_pipeline*)sme->key; 235 /* <= key pointed at, is the master correct ? */ 236 if(r->ip_len != key->ip_len) 237 return NULL; 238 if(memcmp(&r->ip, &key->ip, key->ip_len) != 0) 239 return NULL; 240 /* correct master, is there a slot free for this transfer? */ 241 if(r->num_unused == 0) 242 return NULL; 243 return r; 244 } 245 246 /* remove zone from tcp waiting list */ 247 static void 248 tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone) 249 { 250 assert(zone->tcp_waiting); 251 set->tcp_waiting_first = zone->tcp_waiting_next; 252 if(zone->tcp_waiting_next) 253 zone->tcp_waiting_next->tcp_waiting_prev = NULL; 254 else set->tcp_waiting_last = 0; 255 zone->tcp_waiting_next = 0; 256 zone->tcp_waiting = 0; 257 } 258 259 /* remove zone from tcp pipe write-wait list */ 260 static void 261 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone) 262 { 263 if(zone->in_tcp_send) { 264 if(zone->tcp_send_prev) 265 zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next; 266 else tp->tcp_send_first=zone->tcp_send_next; 267 if(zone->tcp_send_next) 268 zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev; 269 else tp->tcp_send_last=zone->tcp_send_prev; 270 zone->in_tcp_send = 0; 271 } 272 } 273 274 /* remove first from write-wait list */ 275 static void 276 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone) 277 { 278 tp->tcp_send_first = zone->tcp_send_next; 279 if(tp->tcp_send_first) 280 tp->tcp_send_first->tcp_send_prev = NULL; 281 else tp->tcp_send_last = NULL; 282 zone->in_tcp_send = 0; 283 } 284 285 /* remove zone from tcp pipe ID map */ 286 static void 287 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone) 288 { 289 assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0); 290 assert(tp->id[zone->query_id] == zone); 291 tp->id[zone->query_id] = NULL; 292 tp->unused[tp->num_unused] = zone->query_id; 293 /* must remove and re-add for sort order in tree */ 294 (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node); 295 tp->num_unused++; 296 (void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node); 297 } 298 299 /* stop the tcp pipe (and all its zones need to retry) */ 300 static void 301 xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp) 302 { 303 int i, conn = -1; 304 assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */ 305 assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */ 306 /* need to retry for all the zones connected to it */ 307 /* these could use different lists and go to a different nextmaster*/ 308 for(i=0; i<ID_PIPE_NUM; i++) { 309 if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) { 310 xfrd_zone_type* zone = tp->id[i]; 311 conn = zone->tcp_conn; 312 zone->tcp_conn = -1; 313 zone->tcp_waiting = 0; 314 tcp_pipe_sendlist_remove(tp, zone); 315 tcp_pipe_id_remove(tp, zone); 316 xfrd_set_refresh_now(zone); 317 } 318 } 319 assert(conn != -1); 320 /* now release the entire tcp pipe */ 321 xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn); 322 } 323 324 static void 325 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp) 326 { 327 int fd = tp->handler.ev_fd; 328 struct timeval tv; 329 tv.tv_sec = xfrd->tcp_set->tcp_timeout; 330 tv.tv_usec = 0; 331 if(tp->handler_added) 332 event_del(&tp->handler); 333 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ| 334 (tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp); 335 if(event_base_set(xfrd->event_base, &tp->handler) != 0) 336 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed"); 337 if(event_add(&tp->handler, &tv) != 0) 338 log_msg(LOG_ERR, "xfrd tcp: event_add failed"); 339 tp->handler_added = 1; 340 } 341 342 /* handle event from fd of tcp pipe */ 343 void 344 xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg) 345 { 346 struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg; 347 if((event & EV_WRITE)) { 348 tcp_pipe_reset_timeout(tp); 349 if(tp->tcp_send_first) { 350 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s", 351 tp->tcp_send_first->apex_str)); 352 xfrd_tcp_write(tp, tp->tcp_send_first); 353 } 354 } 355 if((event & EV_READ) && tp->handler_added) { 356 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read")); 357 tcp_pipe_reset_timeout(tp); 358 xfrd_tcp_read(tp); 359 } 360 if((event & EV_TIMEOUT) && tp->handler_added) { 361 /* tcp connection timed out */ 362 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout")); 363 xfrd_tcp_pipe_stop(tp); 364 } 365 } 366 367 /* add a zone to the pipeline, it starts to want to write its query */ 368 static void 369 pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp, 370 xfrd_zone_type* zone) 371 { 372 /* assign the ID */ 373 int idx; 374 assert(tp->num_unused > 0); 375 /* we pick a random ID, even though it is TCP anyway */ 376 idx = random_generate(tp->num_unused); 377 zone->query_id = tp->unused[idx]; 378 tp->unused[idx] = tp->unused[tp->num_unused-1]; 379 tp->id[zone->query_id] = zone; 380 /* decrement unused counter, and fixup tree */ 381 (void)rbtree_delete(set->pipetree, &tp->node); 382 tp->num_unused--; 383 (void)rbtree_insert(set->pipetree, &tp->node); 384 385 /* add to sendlist, at end */ 386 zone->tcp_send_next = NULL; 387 zone->tcp_send_prev = tp->tcp_send_last; 388 zone->in_tcp_send = 1; 389 if(tp->tcp_send_last) 390 tp->tcp_send_last->tcp_send_next = zone; 391 else tp->tcp_send_first = zone; 392 tp->tcp_send_last = zone; 393 394 /* is it first in line? */ 395 if(tp->tcp_send_first == zone) { 396 xfrd_tcp_setup_write_packet(tp, zone); 397 /* add write to event handler */ 398 tcp_pipe_reset_timeout(tp); 399 } 400 } 401 402 void 403 xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone) 404 { 405 struct xfrd_tcp_pipeline* tp; 406 assert(zone->tcp_conn == -1); 407 assert(zone->tcp_waiting == 0); 408 409 if(set->tcp_count < XFRD_MAX_TCP) { 410 int i; 411 assert(!set->tcp_waiting_first); 412 set->tcp_count ++; 413 /* find a free tcp_buffer */ 414 for(i=0; i<XFRD_MAX_TCP; i++) { 415 if(set->tcp_state[i]->tcp_r->fd == -1) { 416 zone->tcp_conn = i; 417 break; 418 } 419 } 420 /** What if there is no free tcp_buffer? return; */ 421 if (zone->tcp_conn < 0) { 422 return; 423 } 424 425 tp = set->tcp_state[zone->tcp_conn]; 426 zone->tcp_waiting = 0; 427 428 /* stop udp use (if any) */ 429 if(zone->zone_handler.ev_fd != -1) 430 xfrd_udp_release(zone); 431 432 if(!xfrd_tcp_open(set, tp, zone)) { 433 zone->tcp_conn = -1; 434 set->tcp_count --; 435 xfrd_set_refresh_now(zone); 436 return; 437 } 438 /* ip and ip_len set by tcp_open */ 439 tp->node.key = tp; 440 tp->num_unused = ID_PIPE_NUM; 441 tp->num_skip = 0; 442 tp->tcp_send_first = NULL; 443 tp->tcp_send_last = NULL; 444 memset(tp->id, 0, sizeof(tp->id)); 445 for(i=0; i<ID_PIPE_NUM; i++) { 446 tp->unused[i] = i; 447 } 448 449 /* insert into tree */ 450 (void)rbtree_insert(set->pipetree, &tp->node); 451 xfrd_deactivate_zone(zone); 452 xfrd_unset_timer(zone); 453 pipeline_setup_new_zone(set, tp, zone); 454 return; 455 } 456 /* check for a pipeline to the same master with unused ID */ 457 if((tp = pipeline_find(set, zone))!= NULL) { 458 int i; 459 if(zone->zone_handler.ev_fd != -1) 460 xfrd_udp_release(zone); 461 for(i=0; i<XFRD_MAX_TCP; i++) { 462 if(set->tcp_state[i] == tp) 463 zone->tcp_conn = i; 464 } 465 xfrd_deactivate_zone(zone); 466 xfrd_unset_timer(zone); 467 pipeline_setup_new_zone(set, tp, zone); 468 return; 469 } 470 471 /* wait, at end of line */ 472 DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp " 473 "connections (%d) reached.", XFRD_MAX_TCP)); 474 zone->tcp_waiting_next = 0; 475 zone->tcp_waiting_prev = set->tcp_waiting_last; 476 zone->tcp_waiting = 1; 477 if(!set->tcp_waiting_last) { 478 set->tcp_waiting_first = zone; 479 set->tcp_waiting_last = zone; 480 } else { 481 set->tcp_waiting_last->tcp_waiting_next = zone; 482 set->tcp_waiting_last = zone; 483 } 484 xfrd_deactivate_zone(zone); 485 xfrd_unset_timer(zone); 486 } 487 488 int 489 xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp, 490 xfrd_zone_type* zone) 491 { 492 int fd, family, conn; 493 struct timeval tv; 494 assert(zone->tcp_conn != -1); 495 496 /* if there is no next master, fallback to use the first one */ 497 /* but there really should be a master set */ 498 if(!zone->master) { 499 zone->master = zone->zone_options->pattern->request_xfr; 500 zone->master_num = 0; 501 } 502 503 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s", 504 zone->apex_str, zone->master->ip_address_spec)); 505 tp->tcp_r->is_reading = 1; 506 tp->tcp_r->total_bytes = 0; 507 tp->tcp_r->msglen = 0; 508 buffer_clear(tp->tcp_r->packet); 509 tp->tcp_w->is_reading = 0; 510 tp->tcp_w->total_bytes = 0; 511 tp->tcp_w->msglen = 0; 512 tp->connection_established = 0; 513 514 if(zone->master->is_ipv6) { 515 #ifdef INET6 516 family = PF_INET6; 517 #else 518 xfrd_set_refresh_now(zone); 519 return 0; 520 #endif 521 } else { 522 family = PF_INET; 523 } 524 fd = socket(family, SOCK_STREAM, IPPROTO_TCP); 525 if(fd == -1) { 526 /* squelch 'Address family not supported by protocol' at low 527 * verbosity levels */ 528 if(errno != EAFNOSUPPORT || verbosity > 2) 529 log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s", 530 zone->master->ip_address_spec, strerror(errno)); 531 xfrd_set_refresh_now(zone); 532 return 0; 533 } 534 if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) { 535 log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno)); 536 close(fd); 537 xfrd_set_refresh_now(zone); 538 return 0; 539 } 540 541 if(xfrd->nsd->outgoing_tcp_mss > 0) { 542 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG) 543 if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, 544 (void*)&xfrd->nsd->outgoing_tcp_mss, 545 sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) { 546 log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)" 547 "failed: %s", strerror(errno)); 548 } 549 #else 550 log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported"); 551 #endif 552 } 553 554 tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip); 555 556 /* bind it */ 557 if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern-> 558 outgoing_interface, zone->master, 1)) { 559 close(fd); 560 xfrd_set_refresh_now(zone); 561 return 0; 562 } 563 564 conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len); 565 if (conn == -1 && errno != EINPROGRESS) { 566 log_msg(LOG_ERR, "xfrd: connect %s failed: %s", 567 zone->master->ip_address_spec, strerror(errno)); 568 close(fd); 569 xfrd_set_refresh_now(zone); 570 return 0; 571 } 572 tp->tcp_r->fd = fd; 573 tp->tcp_w->fd = fd; 574 575 /* set the tcp pipe event */ 576 if(tp->handler_added) 577 event_del(&tp->handler); 578 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE, 579 xfrd_handle_tcp_pipe, tp); 580 if(event_base_set(xfrd->event_base, &tp->handler) != 0) 581 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed"); 582 tv.tv_sec = set->tcp_timeout; 583 tv.tv_usec = 0; 584 if(event_add(&tp->handler, &tv) != 0) 585 log_msg(LOG_ERR, "xfrd tcp: event_add failed"); 586 tp->handler_added = 1; 587 return 1; 588 } 589 590 void 591 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone) 592 { 593 struct xfrd_tcp* tcp = tp->tcp_w; 594 assert(zone->tcp_conn != -1); 595 assert(zone->tcp_waiting == 0); 596 /* start AXFR or IXFR for the zone */ 597 if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only || 598 zone->master->ixfr_disabled || 599 /* if zone expired, after the first round, do not ask for 600 * IXFR any more, but full AXFR (of any serial number) */ 601 (zone->state == xfrd_zone_expired && zone->round_num != 0)) { 602 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer " 603 "(AXFR) for %s to %s", 604 zone->apex_str, zone->master->ip_address_spec)); 605 606 xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex, 607 zone->query_id); 608 } else { 609 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone " 610 "transfer (IXFR) for %s to %s", 611 zone->apex_str, zone->master->ip_address_spec)); 612 613 xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex, 614 zone->query_id); 615 NSCOUNT_SET(tcp->packet, 1); 616 xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk); 617 } 618 /* old transfer needs to be removed still? */ 619 if(zone->msg_seq_nr) 620 xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber); 621 zone->msg_seq_nr = 0; 622 zone->msg_rr_count = 0; 623 if(zone->master->key_options && zone->master->key_options->tsig_key) { 624 xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master); 625 } 626 buffer_flip(tcp->packet); 627 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id)); 628 tcp->msglen = buffer_limit(tcp->packet); 629 tcp->total_bytes = 0; 630 } 631 632 static void 633 tcp_conn_ready_for_reading(struct xfrd_tcp* tcp) 634 { 635 tcp->total_bytes = 0; 636 tcp->msglen = 0; 637 buffer_clear(tcp->packet); 638 } 639 640 int conn_write(struct xfrd_tcp* tcp) 641 { 642 ssize_t sent; 643 644 if(tcp->total_bytes < sizeof(tcp->msglen)) { 645 uint16_t sendlen = htons(tcp->msglen); 646 #ifdef HAVE_WRITEV 647 struct iovec iov[2]; 648 iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes; 649 iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes; 650 iov[1].iov_base = buffer_begin(tcp->packet); 651 iov[1].iov_len = buffer_limit(tcp->packet); 652 sent = writev(tcp->fd, iov, 2); 653 #else /* HAVE_WRITEV */ 654 sent = write(tcp->fd, 655 (const char*)&sendlen + tcp->total_bytes, 656 sizeof(tcp->msglen) - tcp->total_bytes); 657 #endif /* HAVE_WRITEV */ 658 659 if(sent == -1) { 660 if(errno == EAGAIN || errno == EINTR) { 661 /* write would block, try later */ 662 return 0; 663 } else { 664 return -1; 665 } 666 } 667 668 tcp->total_bytes += sent; 669 if(sent > (ssize_t)sizeof(tcp->msglen)) 670 buffer_skip(tcp->packet, sent-sizeof(tcp->msglen)); 671 if(tcp->total_bytes < sizeof(tcp->msglen)) { 672 /* incomplete write, resume later */ 673 return 0; 674 } 675 #ifdef HAVE_WRITEV 676 if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) { 677 /* packet done */ 678 return 1; 679 } 680 #endif 681 assert(tcp->total_bytes >= sizeof(tcp->msglen)); 682 } 683 684 assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)); 685 686 sent = write(tcp->fd, 687 buffer_current(tcp->packet), 688 buffer_remaining(tcp->packet)); 689 if(sent == -1) { 690 if(errno == EAGAIN || errno == EINTR) { 691 /* write would block, try later */ 692 return 0; 693 } else { 694 return -1; 695 } 696 } 697 698 buffer_skip(tcp->packet, sent); 699 tcp->total_bytes += sent; 700 701 if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) { 702 /* more to write when socket becomes writable again */ 703 return 0; 704 } 705 706 assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)); 707 return 1; 708 } 709 710 void 711 xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone) 712 { 713 int ret; 714 struct xfrd_tcp* tcp = tp->tcp_w; 715 assert(zone->tcp_conn != -1); 716 assert(zone == tp->tcp_send_first); 717 /* see if for non-established connection, there is a connect error */ 718 if(!tp->connection_established) { 719 /* check for pending error from nonblocking connect */ 720 /* from Stevens, unix network programming, vol1, 3rd ed, p450 */ 721 int error = 0; 722 socklen_t len = sizeof(error); 723 if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){ 724 error = errno; /* on solaris errno is error */ 725 } 726 if(error == EINPROGRESS || error == EWOULDBLOCK) 727 return; /* try again later */ 728 if(error != 0) { 729 log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s", 730 zone->apex_str, zone->master->ip_address_spec, 731 strerror(error)); 732 xfrd_tcp_pipe_stop(tp); 733 return; 734 } 735 } 736 ret = conn_write(tcp); 737 if(ret == -1) { 738 log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno)); 739 xfrd_tcp_pipe_stop(tp); 740 return; 741 } 742 if(tcp->total_bytes != 0 && !tp->connection_established) 743 tp->connection_established = 1; 744 if(ret == 0) { 745 return; /* write again later */ 746 } 747 /* done writing this message */ 748 749 /* remove first zone from sendlist */ 750 tcp_pipe_sendlist_popfirst(tp, zone); 751 752 /* see if other zone wants to write; init; let it write (now) */ 753 /* and use a loop, because 64k stack calls is a too much */ 754 while(tp->tcp_send_first) { 755 /* setup to write for this zone */ 756 xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first); 757 /* attempt to write for this zone (if success, continue loop)*/ 758 ret = conn_write(tcp); 759 if(ret == -1) { 760 log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno)); 761 xfrd_tcp_pipe_stop(tp); 762 return; 763 } 764 if(ret == 0) 765 return; /* write again later */ 766 tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first); 767 } 768 769 /* if sendlist empty, remove WRITE from event */ 770 771 /* listen to READ, and not WRITE events */ 772 assert(tp->tcp_send_first == NULL); 773 tcp_pipe_reset_timeout(tp); 774 } 775 776 int 777 conn_read(struct xfrd_tcp* tcp) 778 { 779 ssize_t received; 780 /* receive leading packet length bytes */ 781 if(tcp->total_bytes < sizeof(tcp->msglen)) { 782 received = read(tcp->fd, 783 (char*) &tcp->msglen + tcp->total_bytes, 784 sizeof(tcp->msglen) - tcp->total_bytes); 785 if(received == -1) { 786 if(errno == EAGAIN || errno == EINTR) { 787 /* read would block, try later */ 788 return 0; 789 } else { 790 #ifdef ECONNRESET 791 if (verbosity >= 2 || errno != ECONNRESET) 792 #endif /* ECONNRESET */ 793 log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno)); 794 return -1; 795 } 796 } else if(received == 0) { 797 /* EOF */ 798 return -1; 799 } 800 tcp->total_bytes += received; 801 if(tcp->total_bytes < sizeof(tcp->msglen)) { 802 /* not complete yet, try later */ 803 return 0; 804 } 805 806 assert(tcp->total_bytes == sizeof(tcp->msglen)); 807 tcp->msglen = ntohs(tcp->msglen); 808 809 if(tcp->msglen == 0) { 810 buffer_set_limit(tcp->packet, tcp->msglen); 811 return 1; 812 } 813 if(tcp->msglen > buffer_capacity(tcp->packet)) { 814 log_msg(LOG_ERR, "buffer too small, dropping connection"); 815 return 0; 816 } 817 buffer_set_limit(tcp->packet, tcp->msglen); 818 } 819 820 assert(buffer_remaining(tcp->packet) > 0); 821 822 received = read(tcp->fd, buffer_current(tcp->packet), 823 buffer_remaining(tcp->packet)); 824 if(received == -1) { 825 if(errno == EAGAIN || errno == EINTR) { 826 /* read would block, try later */ 827 return 0; 828 } else { 829 #ifdef ECONNRESET 830 if (verbosity >= 2 || errno != ECONNRESET) 831 #endif /* ECONNRESET */ 832 log_msg(LOG_ERR, "tcp read %s", strerror(errno)); 833 return -1; 834 } 835 } else if(received == 0) { 836 /* EOF */ 837 return -1; 838 } 839 840 tcp->total_bytes += received; 841 buffer_skip(tcp->packet, received); 842 843 if(buffer_remaining(tcp->packet) > 0) { 844 /* not complete yet, wait for more */ 845 return 0; 846 } 847 848 /* completed */ 849 assert(buffer_position(tcp->packet) == tcp->msglen); 850 return 1; 851 } 852 853 void 854 xfrd_tcp_read(struct xfrd_tcp_pipeline* tp) 855 { 856 xfrd_zone_type* zone; 857 struct xfrd_tcp* tcp = tp->tcp_r; 858 int ret; 859 enum xfrd_packet_result pkt_result; 860 861 ret = conn_read(tcp); 862 if(ret == -1) { 863 xfrd_tcp_pipe_stop(tp); 864 return; 865 } 866 if(ret == 0) 867 return; 868 /* completed msg */ 869 buffer_flip(tcp->packet); 870 /* see which ID number it is, if skip, handle skip, NULL: warn */ 871 if(tcp->msglen < QHEADERSZ) { 872 /* too short for DNS header, skip it */ 873 DEBUG(DEBUG_XFRD,1, (LOG_INFO, 874 "xfrd: tcp skip response that is too short")); 875 tcp_conn_ready_for_reading(tcp); 876 return; 877 } 878 zone = tp->id[ID(tcp->packet)]; 879 if(!zone || zone == TCP_NULL_SKIP) { 880 /* no zone for this id? skip it */ 881 DEBUG(DEBUG_XFRD,1, (LOG_INFO, 882 "xfrd: tcp skip response with %s ID", 883 zone?"set-to-skip":"unknown")); 884 tcp_conn_ready_for_reading(tcp); 885 return; 886 } 887 assert(zone->tcp_conn != -1); 888 889 /* handle message for zone */ 890 pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet); 891 /* setup for reading the next packet on this connection */ 892 tcp_conn_ready_for_reading(tcp); 893 switch(pkt_result) { 894 case xfrd_packet_more: 895 /* wait for next packet */ 896 break; 897 case xfrd_packet_newlease: 898 /* set to skip if more packets with this ID */ 899 tp->id[zone->query_id] = TCP_NULL_SKIP; 900 tp->num_skip++; 901 /* fall through to remove zone from tp */ 902 /* fallthrough */ 903 case xfrd_packet_transfer: 904 if(zone->zone_options->pattern->multi_master_check) { 905 xfrd_tcp_release(xfrd->tcp_set, zone); 906 xfrd_make_request(zone); 907 break; 908 } 909 xfrd_tcp_release(xfrd->tcp_set, zone); 910 assert(zone->round_num == -1); 911 break; 912 case xfrd_packet_notimpl: 913 xfrd_disable_ixfr(zone); 914 xfrd_tcp_release(xfrd->tcp_set, zone); 915 /* query next server */ 916 xfrd_make_request(zone); 917 break; 918 case xfrd_packet_bad: 919 case xfrd_packet_tcp: 920 default: 921 /* set to skip if more packets with this ID */ 922 tp->id[zone->query_id] = TCP_NULL_SKIP; 923 tp->num_skip++; 924 xfrd_tcp_release(xfrd->tcp_set, zone); 925 /* query next server */ 926 xfrd_make_request(zone); 927 break; 928 } 929 } 930 931 void 932 xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone) 933 { 934 int conn = zone->tcp_conn; 935 struct xfrd_tcp_pipeline* tp = set->tcp_state[conn]; 936 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s", 937 zone->apex_str, zone->master->ip_address_spec)); 938 assert(zone->tcp_conn != -1); 939 assert(zone->tcp_waiting == 0); 940 zone->tcp_conn = -1; 941 zone->tcp_waiting = 0; 942 943 /* remove from tcp_send list */ 944 tcp_pipe_sendlist_remove(tp, zone); 945 /* remove it from the ID list */ 946 if(tp->id[zone->query_id] != TCP_NULL_SKIP) 947 tcp_pipe_id_remove(tp, zone); 948 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused", 949 tp->num_unused)); 950 /* if pipe was full, but no more, then see if waiting element is 951 * for the same master, and can fill the unused ID */ 952 if(tp->num_unused == 1 && set->tcp_waiting_first) { 953 #ifdef INET6 954 struct sockaddr_storage to; 955 #else 956 struct sockaddr_in to; 957 #endif 958 socklen_t to_len = xfrd_acl_sockaddr_to( 959 set->tcp_waiting_first->master, &to); 960 if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) { 961 /* use this connection for the waiting zone */ 962 zone = set->tcp_waiting_first; 963 assert(zone->tcp_conn == -1); 964 zone->tcp_conn = conn; 965 tcp_zone_waiting_list_popfirst(set, zone); 966 if(zone->zone_handler.ev_fd != -1) 967 xfrd_udp_release(zone); 968 xfrd_unset_timer(zone); 969 pipeline_setup_new_zone(set, tp, zone); 970 return; 971 } 972 /* waiting zone did not go to same server */ 973 } 974 975 /* if all unused, or only skipped leftover, close the pipeline */ 976 if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused) 977 xfrd_tcp_pipe_release(set, tp, conn); 978 } 979 980 void 981 xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp, 982 int conn) 983 { 984 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released")); 985 /* one handler per tcp pipe */ 986 if(tp->handler_added) 987 event_del(&tp->handler); 988 tp->handler_added = 0; 989 990 /* fd in tcp_r and tcp_w is the same, close once */ 991 if(tp->tcp_r->fd != -1) 992 close(tp->tcp_r->fd); 993 tp->tcp_r->fd = -1; 994 tp->tcp_w->fd = -1; 995 996 /* remove from pipetree */ 997 (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node); 998 999 /* a waiting zone can use the free tcp slot (to another server) */ 1000 /* if that zone fails to set-up or connect, we try to start the next 1001 * waiting zone in the list */ 1002 while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) { 1003 int i; 1004 1005 /* pop first waiting process */ 1006 xfrd_zone_type* zone = set->tcp_waiting_first; 1007 /* start it */ 1008 assert(zone->tcp_conn == -1); 1009 zone->tcp_conn = conn; 1010 tcp_zone_waiting_list_popfirst(set, zone); 1011 1012 /* stop udp (if any) */ 1013 if(zone->zone_handler.ev_fd != -1) 1014 xfrd_udp_release(zone); 1015 if(!xfrd_tcp_open(set, tp, zone)) { 1016 zone->tcp_conn = -1; 1017 xfrd_set_refresh_now(zone); 1018 /* try to start the next zone (if any) */ 1019 continue; 1020 } 1021 /* re-init this tcppipe */ 1022 /* ip and ip_len set by tcp_open */ 1023 tp->node.key = tp; 1024 tp->num_unused = ID_PIPE_NUM; 1025 tp->num_skip = 0; 1026 tp->tcp_send_first = NULL; 1027 tp->tcp_send_last = NULL; 1028 memset(tp->id, 0, sizeof(tp->id)); 1029 for(i=0; i<ID_PIPE_NUM; i++) { 1030 tp->unused[i] = i; 1031 } 1032 1033 /* insert into tree */ 1034 (void)rbtree_insert(set->pipetree, &tp->node); 1035 /* setup write */ 1036 xfrd_unset_timer(zone); 1037 pipeline_setup_new_zone(set, tp, zone); 1038 /* started a task, no need for cleanups, so return */ 1039 return; 1040 } 1041 /* no task to start, cleanup */ 1042 assert(!set->tcp_waiting_first); 1043 set->tcp_count --; 1044 assert(set->tcp_count >= 0); 1045 } 1046 1047