xref: /openbsd-src/usr.sbin/nsd/xfrd-tcp.c (revision f1dd7b858388b4a23f4f67a4957ec5ff656ebbe8)
1 /*
2  * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
3  *
4  * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
5  *
6  * See LICENSE for the license.
7  *
8  */
9 
10 #include "config.h"
11 #include <assert.h>
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <unistd.h>
15 #include <stdlib.h>
16 #include <sys/uio.h>
17 #include "nsd.h"
18 #include "xfrd-tcp.h"
19 #include "buffer.h"
20 #include "packet.h"
21 #include "dname.h"
22 #include "options.h"
23 #include "namedb.h"
24 #include "xfrd.h"
25 #include "xfrd-disk.h"
26 #include "util.h"
27 
28 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
29 static int
30 xfrd_pipe_cmp(const void* a, const void* b)
31 {
32 	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
33 	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
34 	int r;
35 	if(x == y)
36 		return 0;
37 	if(y->ip_len != x->ip_len)
38 		/* subtraction works because nonnegative and small numbers */
39 		return (int)y->ip_len - (int)x->ip_len;
40 	r = memcmp(&x->ip, &y->ip, x->ip_len);
41 	if(r != 0)
42 		return r;
43 	/* sort that num_unused is sorted ascending, */
44 	if(x->num_unused != y->num_unused) {
45 		return (x->num_unused < y->num_unused) ? -1 : 1;
46 	}
47 	/* different pipelines are different still, even with same numunused*/
48 	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
49 }
50 
51 struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region)
52 {
53 	int i;
54 	struct xfrd_tcp_set* tcp_set = region_alloc(region,
55 		sizeof(struct xfrd_tcp_set));
56 	memset(tcp_set, 0, sizeof(struct xfrd_tcp_set));
57 	tcp_set->tcp_count = 0;
58 	tcp_set->tcp_waiting_first = 0;
59 	tcp_set->tcp_waiting_last = 0;
60 	for(i=0; i<XFRD_MAX_TCP; i++)
61 		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
62 	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
63 	return tcp_set;
64 }
65 
66 struct xfrd_tcp_pipeline*
67 xfrd_tcp_pipeline_create(region_type* region)
68 {
69 	int i;
70 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
71 		region_alloc_zero(region, sizeof(*tp));
72 	tp->num_unused = ID_PIPE_NUM;
73 	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
74 	for(i=0; i<ID_PIPE_NUM; i++)
75 		tp->unused[i] = (uint16_t)i;
76 	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
77 	tp->tcp_w = xfrd_tcp_create(region, 512);
78 	return tp;
79 }
80 
81 void
82 xfrd_setup_packet(buffer_type* packet,
83 	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
84 {
85 	/* Set up the header */
86 	buffer_clear(packet);
87 	ID_SET(packet, qid);
88 	FLAGS_SET(packet, 0);
89 	OPCODE_SET(packet, OPCODE_QUERY);
90 	QDCOUNT_SET(packet, 1);
91 	ANCOUNT_SET(packet, 0);
92 	NSCOUNT_SET(packet, 0);
93 	ARCOUNT_SET(packet, 0);
94 	buffer_skip(packet, QHEADERSZ);
95 
96 	/* The question record. */
97 	buffer_write(packet, dname_name(dname), dname->name_size);
98 	buffer_write_u16(packet, type);
99 	buffer_write_u16(packet, klass);
100 }
101 
102 static socklen_t
103 #ifdef INET6
104 xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
105 	struct sockaddr_storage *sck)
106 #else
107 xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
108 	struct sockaddr_in *sck, const char* fromto)
109 #endif /* INET6 */
110 {
111 	/* setup address structure */
112 #ifdef INET6
113 	memset(sck, 0, sizeof(struct sockaddr_storage));
114 #else
115 	memset(sck, 0, sizeof(struct sockaddr_in));
116 #endif
117 	if(acl->is_ipv6) {
118 #ifdef INET6
119 		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
120 		sa->sin6_family = AF_INET6;
121 		sa->sin6_port = htons(port);
122 		sa->sin6_addr = acl->addr.addr6;
123 		return sizeof(struct sockaddr_in6);
124 #else
125 		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
126 INET6.", fromto, acl->ip_address_spec);
127 		return 0;
128 #endif
129 	} else {
130 		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
131 		sa->sin_family = AF_INET;
132 		sa->sin_port = htons(port);
133 		sa->sin_addr = acl->addr.addr;
134 		return sizeof(struct sockaddr_in);
135 	}
136 }
137 
138 socklen_t
139 #ifdef INET6
140 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to)
141 #else
142 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to)
143 #endif /* INET6 */
144 {
145 	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
146 #ifdef INET6
147 	return xfrd_acl_sockaddr(acl, port, to);
148 #else
149 	return xfrd_acl_sockaddr(acl, port, to, "to");
150 #endif /* INET6 */
151 }
152 
153 socklen_t
154 #ifdef INET6
155 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm)
156 #else
157 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm)
158 #endif /* INET6 */
159 {
160 	unsigned int port = acl->port?acl->port:0;
161 #ifdef INET6
162 	return xfrd_acl_sockaddr(acl, port, frm);
163 #else
164 	return xfrd_acl_sockaddr(acl, port, frm, "from");
165 #endif /* INET6 */
166 }
167 
168 void
169 xfrd_write_soa_buffer(struct buffer* packet,
170 	const dname_type* apex, struct xfrd_soa* soa)
171 {
172 	size_t rdlength_pos;
173 	uint16_t rdlength;
174 	buffer_write(packet, dname_name(apex), apex->name_size);
175 
176 	/* already in network order */
177 	buffer_write(packet, &soa->type, sizeof(soa->type));
178 	buffer_write(packet, &soa->klass, sizeof(soa->klass));
179 	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
180 	rdlength_pos = buffer_position(packet);
181 	buffer_skip(packet, sizeof(rdlength));
182 
183 	/* uncompressed dnames */
184 	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
185 	buffer_write(packet, soa->email+1, soa->email[0]);
186 
187 	buffer_write(packet, &soa->serial, sizeof(uint32_t));
188 	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
189 	buffer_write(packet, &soa->retry, sizeof(uint32_t));
190 	buffer_write(packet, &soa->expire, sizeof(uint32_t));
191 	buffer_write(packet, &soa->minimum, sizeof(uint32_t));
192 
193 	/* write length of RR */
194 	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
195 	buffer_write_u16_at(packet, rdlength_pos, rdlength);
196 }
197 
198 struct xfrd_tcp*
199 xfrd_tcp_create(region_type* region, size_t bufsize)
200 {
201 	struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc(
202 		region, sizeof(struct xfrd_tcp));
203 	memset(tcp_state, 0, sizeof(struct xfrd_tcp));
204 	tcp_state->packet = buffer_create(region, bufsize);
205 	tcp_state->fd = -1;
206 
207 	return tcp_state;
208 }
209 
210 static struct xfrd_tcp_pipeline*
211 pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
212 {
213 	rbnode_type* sme = NULL;
214 	struct xfrd_tcp_pipeline* r;
215 	/* smaller buf than a full pipeline with 64kb ID array, only need
216 	 * the front part with the key info, this front part contains the
217 	 * members that the compare function uses. */
218 	enum { keysize = sizeof(struct xfrd_tcp_pipeline) -
219 		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t)) };
220 	/* void* type for alignment of the struct,
221 	 * divide the keysize by ptr-size and then add one to round up */
222 	void* buf[ (keysize / sizeof(void*)) + 1 ];
223 	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
224 	key->node.key = key;
225 	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
226 	key->num_unused = ID_PIPE_NUM;
227 	/* lookup existing tcp transfer to the master with highest unused */
228 	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
229 		/* exact match, strange, fully unused tcp cannot be open */
230 		assert(0);
231 	}
232 	if(!sme)
233 		return NULL;
234 	r = (struct xfrd_tcp_pipeline*)sme->key;
235 	/* <= key pointed at, is the master correct ? */
236 	if(r->ip_len != key->ip_len)
237 		return NULL;
238 	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
239 		return NULL;
240 	/* correct master, is there a slot free for this transfer? */
241 	if(r->num_unused == 0)
242 		return NULL;
243 	return r;
244 }
245 
246 /* remove zone from tcp waiting list */
247 static void
248 tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
249 {
250 	assert(zone->tcp_waiting);
251 	set->tcp_waiting_first = zone->tcp_waiting_next;
252 	if(zone->tcp_waiting_next)
253 		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
254 	else	set->tcp_waiting_last = 0;
255 	zone->tcp_waiting_next = 0;
256 	zone->tcp_waiting = 0;
257 }
258 
259 /* remove zone from tcp pipe write-wait list */
260 static void
261 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
262 {
263 	if(zone->in_tcp_send) {
264 		if(zone->tcp_send_prev)
265 			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
266 		else	tp->tcp_send_first=zone->tcp_send_next;
267 		if(zone->tcp_send_next)
268 			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
269 		else	tp->tcp_send_last=zone->tcp_send_prev;
270 		zone->in_tcp_send = 0;
271 	}
272 }
273 
274 /* remove first from write-wait list */
275 static void
276 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
277 {
278 	tp->tcp_send_first = zone->tcp_send_next;
279 	if(tp->tcp_send_first)
280 		tp->tcp_send_first->tcp_send_prev = NULL;
281 	else	tp->tcp_send_last = NULL;
282 	zone->in_tcp_send = 0;
283 }
284 
285 /* remove zone from tcp pipe ID map */
286 static void
287 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
288 {
289 	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
290 	assert(tp->id[zone->query_id] == zone);
291 	tp->id[zone->query_id] = NULL;
292 	tp->unused[tp->num_unused] = zone->query_id;
293 	/* must remove and re-add for sort order in tree */
294 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
295 	tp->num_unused++;
296 	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
297 }
298 
299 /* stop the tcp pipe (and all its zones need to retry) */
300 static void
301 xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
302 {
303 	int i, conn = -1;
304 	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
305 	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
306 	/* need to retry for all the zones connected to it */
307 	/* these could use different lists and go to a different nextmaster*/
308 	for(i=0; i<ID_PIPE_NUM; i++) {
309 		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
310 			xfrd_zone_type* zone = tp->id[i];
311 			conn = zone->tcp_conn;
312 			zone->tcp_conn = -1;
313 			zone->tcp_waiting = 0;
314 			tcp_pipe_sendlist_remove(tp, zone);
315 			tcp_pipe_id_remove(tp, zone);
316 			xfrd_set_refresh_now(zone);
317 		}
318 	}
319 	assert(conn != -1);
320 	/* now release the entire tcp pipe */
321 	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
322 }
323 
324 static void
325 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
326 {
327 	int fd = tp->handler.ev_fd;
328 	struct timeval tv;
329 	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
330 	tv.tv_usec = 0;
331 	if(tp->handler_added)
332 		event_del(&tp->handler);
333 	memset(&tp->handler, 0, sizeof(tp->handler));
334 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
335 		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
336 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
337 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
338 	if(event_add(&tp->handler, &tv) != 0)
339 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
340 	tp->handler_added = 1;
341 }
342 
343 /* handle event from fd of tcp pipe */
344 void
345 xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
346 {
347 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
348 	if((event & EV_WRITE)) {
349 		tcp_pipe_reset_timeout(tp);
350 		if(tp->tcp_send_first) {
351 			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
352 				tp->tcp_send_first->apex_str));
353 			xfrd_tcp_write(tp, tp->tcp_send_first);
354 		}
355 	}
356 	if((event & EV_READ) && tp->handler_added) {
357 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
358 		tcp_pipe_reset_timeout(tp);
359 		xfrd_tcp_read(tp);
360 	}
361 	if((event & EV_TIMEOUT) && tp->handler_added) {
362 		/* tcp connection timed out */
363 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
364 		xfrd_tcp_pipe_stop(tp);
365 	}
366 }
367 
368 /* add a zone to the pipeline, it starts to want to write its query */
369 static void
370 pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
371 	xfrd_zone_type* zone)
372 {
373 	/* assign the ID */
374 	int idx;
375 	assert(tp->num_unused > 0);
376 	/* we pick a random ID, even though it is TCP anyway */
377 	idx = random_generate(tp->num_unused);
378 	zone->query_id = tp->unused[idx];
379 	tp->unused[idx] = tp->unused[tp->num_unused-1];
380 	tp->id[zone->query_id] = zone;
381 	/* decrement unused counter, and fixup tree */
382 	(void)rbtree_delete(set->pipetree, &tp->node);
383 	tp->num_unused--;
384 	(void)rbtree_insert(set->pipetree, &tp->node);
385 
386 	/* add to sendlist, at end */
387 	zone->tcp_send_next = NULL;
388 	zone->tcp_send_prev = tp->tcp_send_last;
389 	zone->in_tcp_send = 1;
390 	if(tp->tcp_send_last)
391 		tp->tcp_send_last->tcp_send_next = zone;
392 	else	tp->tcp_send_first = zone;
393 	tp->tcp_send_last = zone;
394 
395 	/* is it first in line? */
396 	if(tp->tcp_send_first == zone) {
397 		xfrd_tcp_setup_write_packet(tp, zone);
398 		/* add write to event handler */
399 		tcp_pipe_reset_timeout(tp);
400 	}
401 }
402 
403 void
404 xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
405 {
406 	struct xfrd_tcp_pipeline* tp;
407 	assert(zone->tcp_conn == -1);
408 	assert(zone->tcp_waiting == 0);
409 
410 	if(set->tcp_count < XFRD_MAX_TCP) {
411 		int i;
412 		assert(!set->tcp_waiting_first);
413 		set->tcp_count ++;
414 		/* find a free tcp_buffer */
415 		for(i=0; i<XFRD_MAX_TCP; i++) {
416 			if(set->tcp_state[i]->tcp_r->fd == -1) {
417 				zone->tcp_conn = i;
418 				break;
419 			}
420 		}
421 		/** What if there is no free tcp_buffer? return; */
422 		if (zone->tcp_conn < 0) {
423 			return;
424 		}
425 
426 		tp = set->tcp_state[zone->tcp_conn];
427 		zone->tcp_waiting = 0;
428 
429 		/* stop udp use (if any) */
430 		if(zone->zone_handler.ev_fd != -1)
431 			xfrd_udp_release(zone);
432 
433 		if(!xfrd_tcp_open(set, tp, zone)) {
434 			zone->tcp_conn = -1;
435 			set->tcp_count --;
436 			xfrd_set_refresh_now(zone);
437 			return;
438 		}
439 		/* ip and ip_len set by tcp_open */
440 		tp->node.key = tp;
441 		tp->num_unused = ID_PIPE_NUM;
442 		tp->num_skip = 0;
443 		tp->tcp_send_first = NULL;
444 		tp->tcp_send_last = NULL;
445 		memset(tp->id, 0, sizeof(tp->id));
446 		for(i=0; i<ID_PIPE_NUM; i++) {
447 			tp->unused[i] = i;
448 		}
449 
450 		/* insert into tree */
451 		(void)rbtree_insert(set->pipetree, &tp->node);
452 		xfrd_deactivate_zone(zone);
453 		xfrd_unset_timer(zone);
454 		pipeline_setup_new_zone(set, tp, zone);
455 		return;
456 	}
457 	/* check for a pipeline to the same master with unused ID */
458 	if((tp = pipeline_find(set, zone))!= NULL) {
459 		int i;
460 		if(zone->zone_handler.ev_fd != -1)
461 			xfrd_udp_release(zone);
462 		for(i=0; i<XFRD_MAX_TCP; i++) {
463 			if(set->tcp_state[i] == tp)
464 				zone->tcp_conn = i;
465 		}
466 		xfrd_deactivate_zone(zone);
467 		xfrd_unset_timer(zone);
468 		pipeline_setup_new_zone(set, tp, zone);
469 		return;
470 	}
471 
472 	/* wait, at end of line */
473 	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
474 		"connections (%d) reached.", XFRD_MAX_TCP));
475 	zone->tcp_waiting_next = 0;
476 	zone->tcp_waiting_prev = set->tcp_waiting_last;
477 	zone->tcp_waiting = 1;
478 	if(!set->tcp_waiting_last) {
479 		set->tcp_waiting_first = zone;
480 		set->tcp_waiting_last = zone;
481 	} else {
482 		set->tcp_waiting_last->tcp_waiting_next = zone;
483 		set->tcp_waiting_last = zone;
484 	}
485 	xfrd_deactivate_zone(zone);
486 	xfrd_unset_timer(zone);
487 }
488 
489 int
490 xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
491 	xfrd_zone_type* zone)
492 {
493 	int fd, family, conn;
494 	struct timeval tv;
495 	assert(zone->tcp_conn != -1);
496 
497 	/* if there is no next master, fallback to use the first one */
498 	/* but there really should be a master set */
499 	if(!zone->master) {
500 		zone->master = zone->zone_options->pattern->request_xfr;
501 		zone->master_num = 0;
502 	}
503 
504 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
505 		zone->apex_str, zone->master->ip_address_spec));
506 	tp->tcp_r->is_reading = 1;
507 	tp->tcp_r->total_bytes = 0;
508 	tp->tcp_r->msglen = 0;
509 	buffer_clear(tp->tcp_r->packet);
510 	tp->tcp_w->is_reading = 0;
511 	tp->tcp_w->total_bytes = 0;
512 	tp->tcp_w->msglen = 0;
513 	tp->connection_established = 0;
514 
515 	if(zone->master->is_ipv6) {
516 #ifdef INET6
517 		family = PF_INET6;
518 #else
519 		xfrd_set_refresh_now(zone);
520 		return 0;
521 #endif
522 	} else {
523 		family = PF_INET;
524 	}
525 	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
526 	if(fd == -1) {
527 		/* squelch 'Address family not supported by protocol' at low
528 		 * verbosity levels */
529 		if(errno != EAFNOSUPPORT || verbosity > 2)
530 		    log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
531 			zone->master->ip_address_spec, strerror(errno));
532 		xfrd_set_refresh_now(zone);
533 		return 0;
534 	}
535 	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
536 		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
537 		close(fd);
538 		xfrd_set_refresh_now(zone);
539 		return 0;
540 	}
541 
542 	if(xfrd->nsd->outgoing_tcp_mss > 0) {
543 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
544 		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
545 			(void*)&xfrd->nsd->outgoing_tcp_mss,
546 			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
547 			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
548 					"failed: %s", strerror(errno));
549 		}
550 #else
551 		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
552 #endif
553 	}
554 
555 	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
556 
557 	/* bind it */
558 	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
559 		outgoing_interface, zone->master, 1)) {
560 		close(fd);
561 		xfrd_set_refresh_now(zone);
562 		return 0;
563         }
564 
565 	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
566 	if (conn == -1 && errno != EINPROGRESS) {
567 		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
568 			zone->master->ip_address_spec, strerror(errno));
569 		close(fd);
570 		xfrd_set_refresh_now(zone);
571 		return 0;
572 	}
573 	tp->tcp_r->fd = fd;
574 	tp->tcp_w->fd = fd;
575 
576 	/* set the tcp pipe event */
577 	if(tp->handler_added)
578 		event_del(&tp->handler);
579 	memset(&tp->handler, 0, sizeof(tp->handler));
580 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
581 		xfrd_handle_tcp_pipe, tp);
582 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
583 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
584 	tv.tv_sec = set->tcp_timeout;
585 	tv.tv_usec = 0;
586 	if(event_add(&tp->handler, &tv) != 0)
587 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
588 	tp->handler_added = 1;
589 	return 1;
590 }
591 
592 void
593 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
594 {
595 	struct xfrd_tcp* tcp = tp->tcp_w;
596 	assert(zone->tcp_conn != -1);
597 	assert(zone->tcp_waiting == 0);
598 	/* start AXFR or IXFR for the zone */
599 	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
600 		zone->master->ixfr_disabled ||
601 		/* if zone expired, after the first round, do not ask for
602 		 * IXFR any more, but full AXFR (of any serial number) */
603 		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
604 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
605 						"(AXFR) for %s to %s",
606 			zone->apex_str, zone->master->ip_address_spec));
607 
608 		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
609 			zone->query_id);
610 		zone->query_type = TYPE_AXFR;
611 	} else {
612 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
613 						"transfer (IXFR) for %s to %s",
614 			zone->apex_str, zone->master->ip_address_spec));
615 
616 		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
617 			zone->query_id);
618 		zone->query_type = TYPE_IXFR;
619         	NSCOUNT_SET(tcp->packet, 1);
620 		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
621 	}
622 	/* old transfer needs to be removed still? */
623 	if(zone->msg_seq_nr)
624 		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
625 	zone->msg_seq_nr = 0;
626 	zone->msg_rr_count = 0;
627 	if(zone->master->key_options && zone->master->key_options->tsig_key) {
628 		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
629 	}
630 	buffer_flip(tcp->packet);
631 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
632 	tcp->msglen = buffer_limit(tcp->packet);
633 	tcp->total_bytes = 0;
634 }
635 
636 static void
637 tcp_conn_ready_for_reading(struct xfrd_tcp* tcp)
638 {
639 	tcp->total_bytes = 0;
640 	tcp->msglen = 0;
641 	buffer_clear(tcp->packet);
642 }
643 
644 int conn_write(struct xfrd_tcp* tcp)
645 {
646 	ssize_t sent;
647 
648 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
649 		uint16_t sendlen = htons(tcp->msglen);
650 #ifdef HAVE_WRITEV
651 		struct iovec iov[2];
652 		iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes;
653 		iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes;
654 		iov[1].iov_base = buffer_begin(tcp->packet);
655 		iov[1].iov_len = buffer_limit(tcp->packet);
656 		sent = writev(tcp->fd, iov, 2);
657 #else /* HAVE_WRITEV */
658 		sent = write(tcp->fd,
659 			(const char*)&sendlen + tcp->total_bytes,
660 			sizeof(tcp->msglen) - tcp->total_bytes);
661 #endif /* HAVE_WRITEV */
662 
663 		if(sent == -1) {
664 			if(errno == EAGAIN || errno == EINTR) {
665 				/* write would block, try later */
666 				return 0;
667 			} else {
668 				return -1;
669 			}
670 		}
671 
672 		tcp->total_bytes += sent;
673 		if(sent > (ssize_t)sizeof(tcp->msglen))
674 			buffer_skip(tcp->packet, sent-sizeof(tcp->msglen));
675 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
676 			/* incomplete write, resume later */
677 			return 0;
678 		}
679 #ifdef HAVE_WRITEV
680 		if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) {
681 			/* packet done */
682 			return 1;
683 		}
684 #endif
685 		assert(tcp->total_bytes >= sizeof(tcp->msglen));
686 	}
687 
688 	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
689 
690 	sent = write(tcp->fd,
691 		buffer_current(tcp->packet),
692 		buffer_remaining(tcp->packet));
693 	if(sent == -1) {
694 		if(errno == EAGAIN || errno == EINTR) {
695 			/* write would block, try later */
696 			return 0;
697 		} else {
698 			return -1;
699 		}
700 	}
701 
702 	buffer_skip(tcp->packet, sent);
703 	tcp->total_bytes += sent;
704 
705 	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
706 		/* more to write when socket becomes writable again */
707 		return 0;
708 	}
709 
710 	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
711 	return 1;
712 }
713 
714 void
715 xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
716 {
717 	int ret;
718 	struct xfrd_tcp* tcp = tp->tcp_w;
719 	assert(zone->tcp_conn != -1);
720 	assert(zone == tp->tcp_send_first);
721 	/* see if for non-established connection, there is a connect error */
722 	if(!tp->connection_established) {
723 		/* check for pending error from nonblocking connect */
724 		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
725 		int error = 0;
726 		socklen_t len = sizeof(error);
727 		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
728 			error = errno; /* on solaris errno is error */
729 		}
730 		if(error == EINPROGRESS || error == EWOULDBLOCK)
731 			return; /* try again later */
732 		if(error != 0) {
733 			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
734 				zone->apex_str, zone->master->ip_address_spec,
735 				strerror(error));
736 			xfrd_tcp_pipe_stop(tp);
737 			return;
738 		}
739 	}
740 	ret = conn_write(tcp);
741 	if(ret == -1) {
742 		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
743 		xfrd_tcp_pipe_stop(tp);
744 		return;
745 	}
746 	if(tcp->total_bytes != 0 && !tp->connection_established)
747 		tp->connection_established = 1;
748 	if(ret == 0) {
749 		return; /* write again later */
750 	}
751 	/* done writing this message */
752 
753 	/* remove first zone from sendlist */
754 	tcp_pipe_sendlist_popfirst(tp, zone);
755 
756 	/* see if other zone wants to write; init; let it write (now) */
757 	/* and use a loop, because 64k stack calls is a too much */
758 	while(tp->tcp_send_first) {
759 		/* setup to write for this zone */
760 		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
761 		/* attempt to write for this zone (if success, continue loop)*/
762 		ret = conn_write(tcp);
763 		if(ret == -1) {
764 			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
765 			xfrd_tcp_pipe_stop(tp);
766 			return;
767 		}
768 		if(ret == 0)
769 			return; /* write again later */
770 		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
771 	}
772 
773 	/* if sendlist empty, remove WRITE from event */
774 
775 	/* listen to READ, and not WRITE events */
776 	assert(tp->tcp_send_first == NULL);
777 	tcp_pipe_reset_timeout(tp);
778 }
779 
780 int
781 conn_read(struct xfrd_tcp* tcp)
782 {
783 	ssize_t received;
784 	/* receive leading packet length bytes */
785 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
786 		received = read(tcp->fd,
787 			(char*) &tcp->msglen + tcp->total_bytes,
788 			sizeof(tcp->msglen) - tcp->total_bytes);
789 		if(received == -1) {
790 			if(errno == EAGAIN || errno == EINTR) {
791 				/* read would block, try later */
792 				return 0;
793 			} else {
794 #ifdef ECONNRESET
795 				if (verbosity >= 2 || errno != ECONNRESET)
796 #endif /* ECONNRESET */
797 				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
798 				return -1;
799 			}
800 		} else if(received == 0) {
801 			/* EOF */
802 			return -1;
803 		}
804 		tcp->total_bytes += received;
805 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
806 			/* not complete yet, try later */
807 			return 0;
808 		}
809 
810 		assert(tcp->total_bytes == sizeof(tcp->msglen));
811 		tcp->msglen = ntohs(tcp->msglen);
812 
813 		if(tcp->msglen == 0) {
814 			buffer_set_limit(tcp->packet, tcp->msglen);
815 			return 1;
816 		}
817 		if(tcp->msglen > buffer_capacity(tcp->packet)) {
818 			log_msg(LOG_ERR, "buffer too small, dropping connection");
819 			return 0;
820 		}
821 		buffer_set_limit(tcp->packet, tcp->msglen);
822 	}
823 
824 	assert(buffer_remaining(tcp->packet) > 0);
825 
826 	received = read(tcp->fd, buffer_current(tcp->packet),
827 		buffer_remaining(tcp->packet));
828 	if(received == -1) {
829 		if(errno == EAGAIN || errno == EINTR) {
830 			/* read would block, try later */
831 			return 0;
832 		} else {
833 #ifdef ECONNRESET
834 			if (verbosity >= 2 || errno != ECONNRESET)
835 #endif /* ECONNRESET */
836 			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
837 			return -1;
838 		}
839 	} else if(received == 0) {
840 		/* EOF */
841 		return -1;
842 	}
843 
844 	tcp->total_bytes += received;
845 	buffer_skip(tcp->packet, received);
846 
847 	if(buffer_remaining(tcp->packet) > 0) {
848 		/* not complete yet, wait for more */
849 		return 0;
850 	}
851 
852 	/* completed */
853 	assert(buffer_position(tcp->packet) == tcp->msglen);
854 	return 1;
855 }
856 
857 void
858 xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
859 {
860 	xfrd_zone_type* zone;
861 	struct xfrd_tcp* tcp = tp->tcp_r;
862 	int ret;
863 	enum xfrd_packet_result pkt_result;
864 
865 	ret = conn_read(tcp);
866 	if(ret == -1) {
867 		xfrd_tcp_pipe_stop(tp);
868 		return;
869 	}
870 	if(ret == 0)
871 		return;
872 	/* completed msg */
873 	buffer_flip(tcp->packet);
874 	/* see which ID number it is, if skip, handle skip, NULL: warn */
875 	if(tcp->msglen < QHEADERSZ) {
876 		/* too short for DNS header, skip it */
877 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
878 			"xfrd: tcp skip response that is too short"));
879 		tcp_conn_ready_for_reading(tcp);
880 		return;
881 	}
882 	zone = tp->id[ID(tcp->packet)];
883 	if(!zone || zone == TCP_NULL_SKIP) {
884 		/* no zone for this id? skip it */
885 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
886 			"xfrd: tcp skip response with %s ID",
887 			zone?"set-to-skip":"unknown"));
888 		tcp_conn_ready_for_reading(tcp);
889 		return;
890 	}
891 	assert(zone->tcp_conn != -1);
892 
893 	/* handle message for zone */
894 	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
895 	/* setup for reading the next packet on this connection */
896 	tcp_conn_ready_for_reading(tcp);
897 	switch(pkt_result) {
898 		case xfrd_packet_more:
899 			/* wait for next packet */
900 			break;
901 		case xfrd_packet_newlease:
902 			/* set to skip if more packets with this ID */
903 			tp->id[zone->query_id] = TCP_NULL_SKIP;
904 			tp->num_skip++;
905 			/* fall through to remove zone from tp */
906 			/* fallthrough */
907 		case xfrd_packet_transfer:
908 			if(zone->zone_options->pattern->multi_master_check) {
909 				xfrd_tcp_release(xfrd->tcp_set, zone);
910 				xfrd_make_request(zone);
911 				break;
912 			}
913 			xfrd_tcp_release(xfrd->tcp_set, zone);
914 			assert(zone->round_num == -1);
915 			break;
916 		case xfrd_packet_notimpl:
917 			xfrd_disable_ixfr(zone);
918 			xfrd_tcp_release(xfrd->tcp_set, zone);
919 			/* query next server */
920 			xfrd_make_request(zone);
921 			break;
922 		case xfrd_packet_bad:
923 		case xfrd_packet_tcp:
924 		default:
925 			/* set to skip if more packets with this ID */
926 			tp->id[zone->query_id] = TCP_NULL_SKIP;
927 			tp->num_skip++;
928 			xfrd_tcp_release(xfrd->tcp_set, zone);
929 			/* query next server */
930 			xfrd_make_request(zone);
931 			break;
932 	}
933 }
934 
935 void
936 xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
937 {
938 	int conn = zone->tcp_conn;
939 	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
940 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
941 		zone->apex_str, zone->master->ip_address_spec));
942 	assert(zone->tcp_conn != -1);
943 	assert(zone->tcp_waiting == 0);
944 	zone->tcp_conn = -1;
945 	zone->tcp_waiting = 0;
946 
947 	/* remove from tcp_send list */
948 	tcp_pipe_sendlist_remove(tp, zone);
949 	/* remove it from the ID list */
950 	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
951 		tcp_pipe_id_remove(tp, zone);
952 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
953 		tp->num_unused));
954 	/* if pipe was full, but no more, then see if waiting element is
955 	 * for the same master, and can fill the unused ID */
956 	if(tp->num_unused == 1 && set->tcp_waiting_first) {
957 #ifdef INET6
958 		struct sockaddr_storage to;
959 #else
960 		struct sockaddr_in to;
961 #endif
962 		socklen_t to_len = xfrd_acl_sockaddr_to(
963 			set->tcp_waiting_first->master, &to);
964 		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
965 			/* use this connection for the waiting zone */
966 			zone = set->tcp_waiting_first;
967 			assert(zone->tcp_conn == -1);
968 			zone->tcp_conn = conn;
969 			tcp_zone_waiting_list_popfirst(set, zone);
970 			if(zone->zone_handler.ev_fd != -1)
971 				xfrd_udp_release(zone);
972 			xfrd_unset_timer(zone);
973 			pipeline_setup_new_zone(set, tp, zone);
974 			return;
975 		}
976 		/* waiting zone did not go to same server */
977 	}
978 
979 	/* if all unused, or only skipped leftover, close the pipeline */
980 	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
981 		xfrd_tcp_pipe_release(set, tp, conn);
982 }
983 
984 void
985 xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
986 	int conn)
987 {
988 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
989 	/* one handler per tcp pipe */
990 	if(tp->handler_added)
991 		event_del(&tp->handler);
992 	tp->handler_added = 0;
993 
994 	/* fd in tcp_r and tcp_w is the same, close once */
995 	if(tp->tcp_r->fd != -1)
996 		close(tp->tcp_r->fd);
997 	tp->tcp_r->fd = -1;
998 	tp->tcp_w->fd = -1;
999 
1000 	/* remove from pipetree */
1001 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
1002 
1003 	/* a waiting zone can use the free tcp slot (to another server) */
1004 	/* if that zone fails to set-up or connect, we try to start the next
1005 	 * waiting zone in the list */
1006 	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
1007 		int i;
1008 
1009 		/* pop first waiting process */
1010 		xfrd_zone_type* zone = set->tcp_waiting_first;
1011 		/* start it */
1012 		assert(zone->tcp_conn == -1);
1013 		zone->tcp_conn = conn;
1014 		tcp_zone_waiting_list_popfirst(set, zone);
1015 
1016 		/* stop udp (if any) */
1017 		if(zone->zone_handler.ev_fd != -1)
1018 			xfrd_udp_release(zone);
1019 		if(!xfrd_tcp_open(set, tp, zone)) {
1020 			zone->tcp_conn = -1;
1021 			xfrd_set_refresh_now(zone);
1022 			/* try to start the next zone (if any) */
1023 			continue;
1024 		}
1025 		/* re-init this tcppipe */
1026 		/* ip and ip_len set by tcp_open */
1027 		tp->node.key = tp;
1028 		tp->num_unused = ID_PIPE_NUM;
1029 		tp->num_skip = 0;
1030 		tp->tcp_send_first = NULL;
1031 		tp->tcp_send_last = NULL;
1032 		memset(tp->id, 0, sizeof(tp->id));
1033 		for(i=0; i<ID_PIPE_NUM; i++) {
1034 			tp->unused[i] = i;
1035 		}
1036 
1037 		/* insert into tree */
1038 		(void)rbtree_insert(set->pipetree, &tp->node);
1039 		/* setup write */
1040 		xfrd_unset_timer(zone);
1041 		pipeline_setup_new_zone(set, tp, zone);
1042 		/* started a task, no need for cleanups, so return */
1043 		return;
1044 	}
1045 	/* no task to start, cleanup */
1046 	assert(!set->tcp_waiting_first);
1047 	set->tcp_count --;
1048 	assert(set->tcp_count >= 0);
1049 }
1050 
1051