xref: /openbsd-src/usr.sbin/nsd/xfrd-tcp.c (revision f2da64fbbbf1b03f09f390ab01267c93dfd77c4c)
1 /*
2  * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
3  *
4  * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
5  *
6  * See LICENSE for the license.
7  *
8  */
9 
10 #include "config.h"
11 #include <assert.h>
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <unistd.h>
15 #include <stdlib.h>
16 #include "nsd.h"
17 #include "xfrd-tcp.h"
18 #include "buffer.h"
19 #include "packet.h"
20 #include "dname.h"
21 #include "options.h"
22 #include "namedb.h"
23 #include "xfrd.h"
24 #include "xfrd-disk.h"
25 #include "util.h"
26 
27 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
28 static int
29 xfrd_pipe_cmp(const void* a, const void* b)
30 {
31 	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
32 	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
33 	int r;
34 	if(x == y)
35 		return 0;
36 	if(y->ip_len != x->ip_len)
37 		/* subtraction works because nonnegative and small numbers */
38 		return (int)y->ip_len - (int)x->ip_len;
39 	r = memcmp(&x->ip, &y->ip, x->ip_len);
40 	if(r != 0)
41 		return r;
42 	/* sort that num_unused is sorted ascending, */
43 	if(x->num_unused != y->num_unused) {
44 		return (x->num_unused < y->num_unused) ? -1 : 1;
45 	}
46 	/* different pipelines are different still, even with same numunused*/
47 	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
48 }
49 
50 xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region)
51 {
52 	int i;
53 	xfrd_tcp_set_t* tcp_set = region_alloc(region, sizeof(xfrd_tcp_set_t));
54 	memset(tcp_set, 0, sizeof(xfrd_tcp_set_t));
55 	tcp_set->tcp_count = 0;
56 	tcp_set->tcp_waiting_first = 0;
57 	tcp_set->tcp_waiting_last = 0;
58 	for(i=0; i<XFRD_MAX_TCP; i++)
59 		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
60 	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
61 	return tcp_set;
62 }
63 
64 struct xfrd_tcp_pipeline*
65 xfrd_tcp_pipeline_create(region_type* region)
66 {
67 	int i;
68 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
69 		region_alloc_zero(region, sizeof(*tp));
70 	tp->num_unused = ID_PIPE_NUM;
71 	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
72 	for(i=0; i<ID_PIPE_NUM; i++)
73 		tp->unused[i] = (uint16_t)i;
74 	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
75 	tp->tcp_w = xfrd_tcp_create(region, 512);
76 	return tp;
77 }
78 
79 void
80 xfrd_setup_packet(buffer_type* packet,
81 	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
82 {
83 	/* Set up the header */
84 	buffer_clear(packet);
85 	ID_SET(packet, qid);
86 	FLAGS_SET(packet, 0);
87 	OPCODE_SET(packet, OPCODE_QUERY);
88 	QDCOUNT_SET(packet, 1);
89 	ANCOUNT_SET(packet, 0);
90 	NSCOUNT_SET(packet, 0);
91 	ARCOUNT_SET(packet, 0);
92 	buffer_skip(packet, QHEADERSZ);
93 
94 	/* The question record. */
95 	buffer_write(packet, dname_name(dname), dname->name_size);
96 	buffer_write_u16(packet, type);
97 	buffer_write_u16(packet, klass);
98 }
99 
100 static socklen_t
101 #ifdef INET6
102 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
103 	struct sockaddr_storage *sck)
104 #else
105 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
106 	struct sockaddr_in *sck, const char* fromto)
107 #endif /* INET6 */
108 {
109 	/* setup address structure */
110 #ifdef INET6
111 	memset(sck, 0, sizeof(struct sockaddr_storage));
112 #else
113 	memset(sck, 0, sizeof(struct sockaddr_in));
114 #endif
115 	if(acl->is_ipv6) {
116 #ifdef INET6
117 		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
118 		sa->sin6_family = AF_INET6;
119 		sa->sin6_port = htons(port);
120 		sa->sin6_addr = acl->addr.addr6;
121 		return sizeof(struct sockaddr_in6);
122 #else
123 		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
124 INET6.", fromto, acl->ip_address_spec);
125 		return 0;
126 #endif
127 	} else {
128 		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
129 		sa->sin_family = AF_INET;
130 		sa->sin_port = htons(port);
131 		sa->sin_addr = acl->addr.addr;
132 		return sizeof(struct sockaddr_in);
133 	}
134 }
135 
136 socklen_t
137 #ifdef INET6
138 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_storage *to)
139 #else
140 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_in *to)
141 #endif /* INET6 */
142 {
143 	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
144 #ifdef INET6
145 	return xfrd_acl_sockaddr(acl, port, to);
146 #else
147 	return xfrd_acl_sockaddr(acl, port, to, "to");
148 #endif /* INET6 */
149 }
150 
151 socklen_t
152 #ifdef INET6
153 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_storage *frm)
154 #else
155 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_in *frm)
156 #endif /* INET6 */
157 {
158 	unsigned int port = acl->port?acl->port:0;
159 #ifdef INET6
160 	return xfrd_acl_sockaddr(acl, port, frm);
161 #else
162 	return xfrd_acl_sockaddr(acl, port, frm, "from");
163 #endif /* INET6 */
164 }
165 
166 void
167 xfrd_write_soa_buffer(struct buffer* packet,
168 	const dname_type* apex, struct xfrd_soa* soa)
169 {
170 	size_t rdlength_pos;
171 	uint16_t rdlength;
172 	buffer_write(packet, dname_name(apex), apex->name_size);
173 
174 	/* already in network order */
175 	buffer_write(packet, &soa->type, sizeof(soa->type));
176 	buffer_write(packet, &soa->klass, sizeof(soa->klass));
177 	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
178 	rdlength_pos = buffer_position(packet);
179 	buffer_skip(packet, sizeof(rdlength));
180 
181 	/* uncompressed dnames */
182 	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
183 	buffer_write(packet, soa->email+1, soa->email[0]);
184 
185 	buffer_write(packet, &soa->serial, sizeof(uint32_t));
186 	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
187 	buffer_write(packet, &soa->retry, sizeof(uint32_t));
188 	buffer_write(packet, &soa->expire, sizeof(uint32_t));
189 	buffer_write(packet, &soa->minimum, sizeof(uint32_t));
190 
191 	/* write length of RR */
192 	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
193 	buffer_write_u16_at(packet, rdlength_pos, rdlength);
194 }
195 
196 xfrd_tcp_t*
197 xfrd_tcp_create(region_type* region, size_t bufsize)
198 {
199 	xfrd_tcp_t* tcp_state = (xfrd_tcp_t*)region_alloc(
200 		region, sizeof(xfrd_tcp_t));
201 	memset(tcp_state, 0, sizeof(xfrd_tcp_t));
202 	tcp_state->packet = buffer_create(region, bufsize);
203 	tcp_state->fd = -1;
204 
205 	return tcp_state;
206 }
207 
208 static struct xfrd_tcp_pipeline*
209 pipeline_find(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
210 {
211 	rbnode_t* sme = NULL;
212 	struct xfrd_tcp_pipeline* r;
213 	/* smaller buf than a full pipeline with 64kb ID array, only need
214 	 * the front part with the key info, this front part contains the
215 	 * members that the compare function uses. */
216 	const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
217 		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
218 	/* void* type for alignment of the struct,
219 	 * divide the keysize by ptr-size and then add one to round up */
220 	void* buf[ (keysize / sizeof(void*)) + 1 ];
221 	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
222 	key->node.key = key;
223 	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
224 	key->num_unused = ID_PIPE_NUM;
225 	/* lookup existing tcp transfer to the master with highest unused */
226 	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
227 		/* exact match, strange, fully unused tcp cannot be open */
228 		assert(0);
229 	}
230 	if(!sme)
231 		return NULL;
232 	r = (struct xfrd_tcp_pipeline*)sme->key;
233 	/* <= key pointed at, is the master correct ? */
234 	if(r->ip_len != key->ip_len)
235 		return NULL;
236 	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
237 		return NULL;
238 	/* correct master, is there a slot free for this transfer? */
239 	if(r->num_unused == 0)
240 		return NULL;
241 	return r;
242 }
243 
244 /* remove zone from tcp waiting list */
245 static void
246 tcp_zone_waiting_list_popfirst(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
247 {
248 	assert(zone->tcp_waiting);
249 	set->tcp_waiting_first = zone->tcp_waiting_next;
250 	if(zone->tcp_waiting_next)
251 		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
252 	else	set->tcp_waiting_last = 0;
253 	zone->tcp_waiting_next = 0;
254 	zone->tcp_waiting = 0;
255 }
256 
257 /* remove zone from tcp pipe write-wait list */
258 static void
259 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
260 {
261 	if(zone->in_tcp_send) {
262 		if(zone->tcp_send_prev)
263 			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
264 		else	tp->tcp_send_first=zone->tcp_send_next;
265 		if(zone->tcp_send_next)
266 			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
267 		else	tp->tcp_send_last=zone->tcp_send_prev;
268 		zone->in_tcp_send = 0;
269 	}
270 }
271 
272 /* remove first from write-wait list */
273 static void
274 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
275 {
276 	tp->tcp_send_first = zone->tcp_send_next;
277 	if(tp->tcp_send_first)
278 		tp->tcp_send_first->tcp_send_prev = NULL;
279 	else	tp->tcp_send_last = NULL;
280 	zone->in_tcp_send = 0;
281 }
282 
283 /* remove zone from tcp pipe ID map */
284 static void
285 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
286 {
287 	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
288 	assert(tp->id[zone->query_id] == zone);
289 	tp->id[zone->query_id] = NULL;
290 	tp->unused[tp->num_unused] = zone->query_id;
291 	/* must remove and re-add for sort order in tree */
292 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
293 	tp->num_unused++;
294 	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
295 }
296 
297 /* stop the tcp pipe (and all its zones need to retry) */
298 static void
299 xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
300 {
301 	int i, conn = -1;
302 	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
303 	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
304 	/* need to retry for all the zones connected to it */
305 	/* these could use different lists and go to a different nextmaster*/
306 	for(i=0; i<ID_PIPE_NUM; i++) {
307 		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
308 			xfrd_zone_t* zone = tp->id[i];
309 			conn = zone->tcp_conn;
310 			zone->tcp_conn = -1;
311 			zone->tcp_waiting = 0;
312 			tcp_pipe_sendlist_remove(tp, zone);
313 			tcp_pipe_id_remove(tp, zone);
314 			xfrd_set_refresh_now(zone);
315 		}
316 	}
317 	assert(conn != -1);
318 	/* now release the entire tcp pipe */
319 	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
320 }
321 
322 static void
323 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
324 {
325 	int fd = tp->handler.ev_fd;
326 	struct timeval tv;
327 	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
328 	tv.tv_usec = 0;
329 	if(tp->handler_added)
330 		event_del(&tp->handler);
331 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
332 		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
333 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
334 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
335 	if(event_add(&tp->handler, &tv) != 0)
336 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
337 	tp->handler_added = 1;
338 }
339 
340 /* handle event from fd of tcp pipe */
341 void
342 xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
343 {
344 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
345 	if((event & EV_WRITE)) {
346 		tcp_pipe_reset_timeout(tp);
347 		if(tp->tcp_send_first) {
348 			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
349 				tp->tcp_send_first->apex_str));
350 			xfrd_tcp_write(tp, tp->tcp_send_first);
351 		}
352 	}
353 	if((event & EV_READ) && tp->handler_added) {
354 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
355 		tcp_pipe_reset_timeout(tp);
356 		xfrd_tcp_read(tp);
357 	}
358 	if((event & EV_TIMEOUT) && tp->handler_added) {
359 		/* tcp connection timed out */
360 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
361 		xfrd_tcp_pipe_stop(tp);
362 	}
363 }
364 
365 /* add a zone to the pipeline, it starts to want to write its query */
366 static void
367 pipeline_setup_new_zone(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
368 	xfrd_zone_t* zone)
369 {
370 	/* assign the ID */
371 	int idx;
372 	assert(tp->num_unused > 0);
373 	/* we pick a random ID, even though it is TCP anyway */
374 	idx = random_generate(tp->num_unused);
375 	zone->query_id = tp->unused[idx];
376 	tp->unused[idx] = tp->unused[tp->num_unused-1];
377 	tp->id[zone->query_id] = zone;
378 	/* decrement unused counter, and fixup tree */
379 	(void)rbtree_delete(set->pipetree, &tp->node);
380 	tp->num_unused--;
381 	(void)rbtree_insert(set->pipetree, &tp->node);
382 
383 	/* add to sendlist, at end */
384 	zone->tcp_send_next = NULL;
385 	zone->tcp_send_prev = tp->tcp_send_last;
386 	zone->in_tcp_send = 1;
387 	if(tp->tcp_send_last)
388 		tp->tcp_send_last->tcp_send_next = zone;
389 	else	tp->tcp_send_first = zone;
390 	tp->tcp_send_last = zone;
391 
392 	/* is it first in line? */
393 	if(tp->tcp_send_first == zone) {
394 		xfrd_tcp_setup_write_packet(tp, zone);
395 		/* add write to event handler */
396 		tcp_pipe_reset_timeout(tp);
397 	}
398 }
399 
400 void
401 xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
402 {
403 	struct xfrd_tcp_pipeline* tp;
404 	assert(zone->tcp_conn == -1);
405 	assert(zone->tcp_waiting == 0);
406 
407 	if(set->tcp_count < XFRD_MAX_TCP) {
408 		int i;
409 		assert(!set->tcp_waiting_first);
410 		set->tcp_count ++;
411 		/* find a free tcp_buffer */
412 		for(i=0; i<XFRD_MAX_TCP; i++) {
413 			if(set->tcp_state[i]->tcp_r->fd == -1) {
414 				zone->tcp_conn = i;
415 				break;
416 			}
417 		}
418 		/** What if there is no free tcp_buffer? return; */
419 		if (zone->tcp_conn < 0) {
420 			return;
421 		}
422 
423 		tp = set->tcp_state[zone->tcp_conn];
424 		zone->tcp_waiting = 0;
425 
426 		/* stop udp use (if any) */
427 		if(zone->zone_handler.ev_fd != -1)
428 			xfrd_udp_release(zone);
429 
430 		if(!xfrd_tcp_open(set, tp, zone)) {
431 			zone->tcp_conn = -1;
432 			set->tcp_count --;
433 			xfrd_set_refresh_now(zone);
434 			return;
435 		}
436 		/* ip and ip_len set by tcp_open */
437 		tp->node.key = tp;
438 		tp->num_unused = ID_PIPE_NUM;
439 		tp->num_skip = 0;
440 		tp->tcp_send_first = NULL;
441 		tp->tcp_send_last = NULL;
442 		memset(tp->id, 0, sizeof(tp->id));
443 		for(i=0; i<ID_PIPE_NUM; i++) {
444 			tp->unused[i] = i;
445 		}
446 
447 		/* insert into tree */
448 		(void)rbtree_insert(set->pipetree, &tp->node);
449 		xfrd_deactivate_zone(zone);
450 		xfrd_unset_timer(zone);
451 		pipeline_setup_new_zone(set, tp, zone);
452 		return;
453 	}
454 	/* check for a pipeline to the same master with unused ID */
455 	if((tp = pipeline_find(set, zone))!= NULL) {
456 		int i;
457 		if(zone->zone_handler.ev_fd != -1)
458 			xfrd_udp_release(zone);
459 		for(i=0; i<XFRD_MAX_TCP; i++) {
460 			if(set->tcp_state[i] == tp)
461 				zone->tcp_conn = i;
462 		}
463 		xfrd_deactivate_zone(zone);
464 		xfrd_unset_timer(zone);
465 		pipeline_setup_new_zone(set, tp, zone);
466 		return;
467 	}
468 
469 	/* wait, at end of line */
470 	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
471 		"connections (%d) reached.", XFRD_MAX_TCP));
472 	zone->tcp_waiting_next = 0;
473 	zone->tcp_waiting_prev = set->tcp_waiting_last;
474 	zone->tcp_waiting = 1;
475 	if(!set->tcp_waiting_last) {
476 		set->tcp_waiting_first = zone;
477 		set->tcp_waiting_last = zone;
478 	} else {
479 		set->tcp_waiting_last->tcp_waiting_next = zone;
480 		set->tcp_waiting_last = zone;
481 	}
482 	xfrd_deactivate_zone(zone);
483 	xfrd_unset_timer(zone);
484 }
485 
486 int
487 xfrd_tcp_open(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
488 	xfrd_zone_t* zone)
489 {
490 	int fd, family, conn;
491 	struct timeval tv;
492 	assert(zone->tcp_conn != -1);
493 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
494 		zone->apex_str, zone->master->ip_address_spec));
495 	tp->tcp_r->is_reading = 1;
496 	tp->tcp_r->total_bytes = 0;
497 	tp->tcp_r->msglen = 0;
498 	buffer_clear(tp->tcp_r->packet);
499 	tp->tcp_w->is_reading = 0;
500 	tp->tcp_w->total_bytes = 0;
501 	tp->tcp_w->msglen = 0;
502 	tp->connection_established = 0;
503 
504 	if(zone->master->is_ipv6) {
505 #ifdef INET6
506 		family = PF_INET6;
507 #else
508 		xfrd_set_refresh_now(zone);
509 		return 0;
510 #endif
511 	} else {
512 		family = PF_INET;
513 	}
514 	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
515 	if(fd == -1) {
516 		log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
517 			zone->master->ip_address_spec, strerror(errno));
518 		xfrd_set_refresh_now(zone);
519 		return 0;
520 	}
521 	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
522 		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
523 		close(fd);
524 		xfrd_set_refresh_now(zone);
525 		return 0;
526 	}
527 
528 	if(xfrd->nsd->outgoing_tcp_mss > 0) {
529 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
530 		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
531 			(void*)&xfrd->nsd->outgoing_tcp_mss,
532 			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
533 			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
534 					"failed: %s", strerror(errno));
535 		}
536 #else
537 		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
538 #endif
539 	}
540 
541 	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
542 
543 	/* bind it */
544 	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
545 		outgoing_interface, zone->master, 1)) {
546 		close(fd);
547 		xfrd_set_refresh_now(zone);
548 		return 0;
549         }
550 
551 	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
552 	if (conn == -1 && errno != EINPROGRESS) {
553 		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
554 			zone->master->ip_address_spec, strerror(errno));
555 		close(fd);
556 		xfrd_set_refresh_now(zone);
557 		return 0;
558 	}
559 	tp->tcp_r->fd = fd;
560 	tp->tcp_w->fd = fd;
561 
562 	/* set the tcp pipe event */
563 	if(tp->handler_added)
564 		event_del(&tp->handler);
565 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
566 		xfrd_handle_tcp_pipe, tp);
567 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
568 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
569 	tv.tv_sec = set->tcp_timeout;
570 	tv.tv_usec = 0;
571 	if(event_add(&tp->handler, &tv) != 0)
572 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
573 	tp->handler_added = 1;
574 	return 1;
575 }
576 
577 void
578 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
579 {
580 	xfrd_tcp_t* tcp = tp->tcp_w;
581 	assert(zone->tcp_conn != -1);
582 	assert(zone->tcp_waiting == 0);
583 	/* start AXFR or IXFR for the zone */
584 	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
585 		zone->master->ixfr_disabled ||
586 		/* if zone expired, after the first round, do not ask for
587 		 * IXFR any more, but full AXFR (of any serial number) */
588 		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
589 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
590 						"(AXFR) for %s to %s",
591 			zone->apex_str, zone->master->ip_address_spec));
592 
593 		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
594 			zone->query_id);
595 	} else {
596 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
597 						"transfer (IXFR) for %s to %s",
598 			zone->apex_str, zone->master->ip_address_spec));
599 
600 		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
601 			zone->query_id);
602         	NSCOUNT_SET(tcp->packet, 1);
603 		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
604 	}
605 	/* old transfer needs to be removed still? */
606 	if(zone->msg_seq_nr)
607 		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
608 	zone->msg_seq_nr = 0;
609 	zone->msg_rr_count = 0;
610 	if(zone->master->key_options && zone->master->key_options->tsig_key) {
611 		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
612 	}
613 	buffer_flip(tcp->packet);
614 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
615 	tcp->msglen = buffer_limit(tcp->packet);
616 	tcp->total_bytes = 0;
617 }
618 
619 static void
620 tcp_conn_ready_for_reading(xfrd_tcp_t* tcp)
621 {
622 	tcp->total_bytes = 0;
623 	tcp->msglen = 0;
624 	buffer_clear(tcp->packet);
625 }
626 
627 int conn_write(xfrd_tcp_t* tcp)
628 {
629 	ssize_t sent;
630 
631 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
632 		uint16_t sendlen = htons(tcp->msglen);
633 		sent = write(tcp->fd,
634 			(const char*)&sendlen + tcp->total_bytes,
635 			sizeof(tcp->msglen) - tcp->total_bytes);
636 
637 		if(sent == -1) {
638 			if(errno == EAGAIN || errno == EINTR) {
639 				/* write would block, try later */
640 				return 0;
641 			} else {
642 				return -1;
643 			}
644 		}
645 
646 		tcp->total_bytes += sent;
647 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
648 			/* incomplete write, resume later */
649 			return 0;
650 		}
651 		assert(tcp->total_bytes == sizeof(tcp->msglen));
652 	}
653 
654 	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
655 
656 	sent = write(tcp->fd,
657 		buffer_current(tcp->packet),
658 		buffer_remaining(tcp->packet));
659 	if(sent == -1) {
660 		if(errno == EAGAIN || errno == EINTR) {
661 			/* write would block, try later */
662 			return 0;
663 		} else {
664 			return -1;
665 		}
666 	}
667 
668 	buffer_skip(tcp->packet, sent);
669 	tcp->total_bytes += sent;
670 
671 	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
672 		/* more to write when socket becomes writable again */
673 		return 0;
674 	}
675 
676 	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
677 	return 1;
678 }
679 
680 void
681 xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
682 {
683 	int ret;
684 	xfrd_tcp_t* tcp = tp->tcp_w;
685 	assert(zone->tcp_conn != -1);
686 	assert(zone == tp->tcp_send_first);
687 	/* see if for non-established connection, there is a connect error */
688 	if(!tp->connection_established) {
689 		/* check for pending error from nonblocking connect */
690 		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
691 		int error = 0;
692 		socklen_t len = sizeof(error);
693 		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
694 			error = errno; /* on solaris errno is error */
695 		}
696 		if(error == EINPROGRESS || error == EWOULDBLOCK)
697 			return; /* try again later */
698 		if(error != 0) {
699 			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
700 				zone->apex_str, zone->master->ip_address_spec,
701 				strerror(error));
702 			xfrd_tcp_pipe_stop(tp);
703 			return;
704 		}
705 	}
706 	ret = conn_write(tcp);
707 	if(ret == -1) {
708 		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
709 		xfrd_tcp_pipe_stop(tp);
710 		return;
711 	}
712 	if(tcp->total_bytes != 0 && !tp->connection_established)
713 		tp->connection_established = 1;
714 	if(ret == 0) {
715 		return; /* write again later */
716 	}
717 	/* done writing this message */
718 
719 	/* remove first zone from sendlist */
720 	tcp_pipe_sendlist_popfirst(tp, zone);
721 
722 	/* see if other zone wants to write; init; let it write (now) */
723 	/* and use a loop, because 64k stack calls is a too much */
724 	while(tp->tcp_send_first) {
725 		/* setup to write for this zone */
726 		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
727 		/* attempt to write for this zone (if success, continue loop)*/
728 		ret = conn_write(tcp);
729 		if(ret == -1) {
730 			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
731 			xfrd_tcp_pipe_stop(tp);
732 			return;
733 		}
734 		if(ret == 0)
735 			return; /* write again later */
736 		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
737 	}
738 
739 	/* if sendlist empty, remove WRITE from event */
740 
741 	/* listen to READ, and not WRITE events */
742 	assert(tp->tcp_send_first == NULL);
743 	tcp_pipe_reset_timeout(tp);
744 }
745 
746 int
747 conn_read(xfrd_tcp_t* tcp)
748 {
749 	ssize_t received;
750 	/* receive leading packet length bytes */
751 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
752 		received = read(tcp->fd,
753 			(char*) &tcp->msglen + tcp->total_bytes,
754 			sizeof(tcp->msglen) - tcp->total_bytes);
755 		if(received == -1) {
756 			if(errno == EAGAIN || errno == EINTR) {
757 				/* read would block, try later */
758 				return 0;
759 			} else {
760 #ifdef ECONNRESET
761 				if (verbosity >= 2 || errno != ECONNRESET)
762 #endif /* ECONNRESET */
763 				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
764 				return -1;
765 			}
766 		} else if(received == 0) {
767 			/* EOF */
768 			return -1;
769 		}
770 		tcp->total_bytes += received;
771 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
772 			/* not complete yet, try later */
773 			return 0;
774 		}
775 
776 		assert(tcp->total_bytes == sizeof(tcp->msglen));
777 		tcp->msglen = ntohs(tcp->msglen);
778 
779 		if(tcp->msglen == 0) {
780 			buffer_set_limit(tcp->packet, tcp->msglen);
781 			return 1;
782 		}
783 		if(tcp->msglen > buffer_capacity(tcp->packet)) {
784 			log_msg(LOG_ERR, "buffer too small, dropping connection");
785 			return 0;
786 		}
787 		buffer_set_limit(tcp->packet, tcp->msglen);
788 	}
789 
790 	assert(buffer_remaining(tcp->packet) > 0);
791 
792 	received = read(tcp->fd, buffer_current(tcp->packet),
793 		buffer_remaining(tcp->packet));
794 	if(received == -1) {
795 		if(errno == EAGAIN || errno == EINTR) {
796 			/* read would block, try later */
797 			return 0;
798 		} else {
799 #ifdef ECONNRESET
800 			if (verbosity >= 2 || errno != ECONNRESET)
801 #endif /* ECONNRESET */
802 			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
803 			return -1;
804 		}
805 	} else if(received == 0) {
806 		/* EOF */
807 		return -1;
808 	}
809 
810 	tcp->total_bytes += received;
811 	buffer_skip(tcp->packet, received);
812 
813 	if(buffer_remaining(tcp->packet) > 0) {
814 		/* not complete yet, wait for more */
815 		return 0;
816 	}
817 
818 	/* completed */
819 	assert(buffer_position(tcp->packet) == tcp->msglen);
820 	return 1;
821 }
822 
823 void
824 xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
825 {
826 	xfrd_zone_t* zone;
827 	xfrd_tcp_t* tcp = tp->tcp_r;
828 	int ret;
829 	enum xfrd_packet_result pkt_result;
830 
831 	ret = conn_read(tcp);
832 	if(ret == -1) {
833 		xfrd_tcp_pipe_stop(tp);
834 		return;
835 	}
836 	if(ret == 0)
837 		return;
838 	/* completed msg */
839 	buffer_flip(tcp->packet);
840 	/* see which ID number it is, if skip, handle skip, NULL: warn */
841 	if(tcp->msglen < QHEADERSZ) {
842 		/* too short for DNS header, skip it */
843 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
844 			"xfrd: tcp skip response that is too short"));
845 		tcp_conn_ready_for_reading(tcp);
846 		return;
847 	}
848 	zone = tp->id[ID(tcp->packet)];
849 	if(!zone || zone == TCP_NULL_SKIP) {
850 		/* no zone for this id? skip it */
851 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
852 			"xfrd: tcp skip response with %s ID",
853 			zone?"set-to-skip":"unknown"));
854 		tcp_conn_ready_for_reading(tcp);
855 		return;
856 	}
857 	assert(zone->tcp_conn != -1);
858 
859 	/* handle message for zone */
860 	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
861 	/* setup for reading the next packet on this connection */
862 	tcp_conn_ready_for_reading(tcp);
863 	switch(pkt_result) {
864 		case xfrd_packet_more:
865 			/* wait for next packet */
866 			break;
867 		case xfrd_packet_newlease:
868 			/* set to skip if more packets with this ID */
869 			tp->id[zone->query_id] = TCP_NULL_SKIP;
870 			tp->num_skip++;
871 			/* fall through to remove zone from tp */
872 		case xfrd_packet_transfer:
873 			xfrd_tcp_release(xfrd->tcp_set, zone);
874 			assert(zone->round_num == -1);
875 			break;
876 		case xfrd_packet_notimpl:
877 			zone->master->ixfr_disabled = time(NULL);
878 			xfrd_tcp_release(xfrd->tcp_set, zone);
879 			/* query next server */
880 			xfrd_make_request(zone);
881 			break;
882 		case xfrd_packet_bad:
883 		case xfrd_packet_tcp:
884 		default:
885 			/* set to skip if more packets with this ID */
886 			tp->id[zone->query_id] = TCP_NULL_SKIP;
887 			tp->num_skip++;
888 			xfrd_tcp_release(xfrd->tcp_set, zone);
889 			/* query next server */
890 			xfrd_make_request(zone);
891 			break;
892 	}
893 }
894 
895 void
896 xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
897 {
898 	int conn = zone->tcp_conn;
899 	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
900 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
901 		zone->apex_str, zone->master->ip_address_spec));
902 	assert(zone->tcp_conn != -1);
903 	assert(zone->tcp_waiting == 0);
904 	zone->tcp_conn = -1;
905 	zone->tcp_waiting = 0;
906 
907 	/* remove from tcp_send list */
908 	tcp_pipe_sendlist_remove(tp, zone);
909 	/* remove it from the ID list */
910 	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
911 		tcp_pipe_id_remove(tp, zone);
912 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
913 		tp->num_unused));
914 	/* if pipe was full, but no more, then see if waiting element is
915 	 * for the same master, and can fill the unused ID */
916 	if(tp->num_unused == 1 && set->tcp_waiting_first) {
917 #ifdef INET6
918 		struct sockaddr_storage to;
919 #else
920 		struct sockaddr_in to;
921 #endif
922 		socklen_t to_len = xfrd_acl_sockaddr_to(
923 			set->tcp_waiting_first->master, &to);
924 		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
925 			/* use this connection for the waiting zone */
926 			zone = set->tcp_waiting_first;
927 			assert(zone->tcp_conn == -1);
928 			zone->tcp_conn = conn;
929 			tcp_zone_waiting_list_popfirst(set, zone);
930 			if(zone->zone_handler.ev_fd != -1)
931 				xfrd_udp_release(zone);
932 			xfrd_unset_timer(zone);
933 			pipeline_setup_new_zone(set, tp, zone);
934 			return;
935 		}
936 		/* waiting zone did not go to same server */
937 	}
938 
939 	/* if all unused, or only skipped leftover, close the pipeline */
940 	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
941 		xfrd_tcp_pipe_release(set, tp, conn);
942 }
943 
944 void
945 xfrd_tcp_pipe_release(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
946 	int conn)
947 {
948 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
949 	/* one handler per tcp pipe */
950 	if(tp->handler_added)
951 		event_del(&tp->handler);
952 	tp->handler_added = 0;
953 
954 	/* fd in tcp_r and tcp_w is the same, close once */
955 	if(tp->tcp_r->fd != -1)
956 		close(tp->tcp_r->fd);
957 	tp->tcp_r->fd = -1;
958 	tp->tcp_w->fd = -1;
959 
960 	/* remove from pipetree */
961 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
962 
963 	/* a waiting zone can use the free tcp slot (to another server) */
964 	/* if that zone fails to set-up or connect, we try to start the next
965 	 * waiting zone in the list */
966 	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
967 		int i;
968 
969 		/* pop first waiting process */
970 		xfrd_zone_t* zone = set->tcp_waiting_first;
971 		/* start it */
972 		assert(zone->tcp_conn == -1);
973 		zone->tcp_conn = conn;
974 		tcp_zone_waiting_list_popfirst(set, zone);
975 
976 		/* stop udp (if any) */
977 		if(zone->zone_handler.ev_fd != -1)
978 			xfrd_udp_release(zone);
979 		if(!xfrd_tcp_open(set, tp, zone)) {
980 			zone->tcp_conn = -1;
981 			xfrd_set_refresh_now(zone);
982 			/* try to start the next zone (if any) */
983 			continue;
984 		}
985 		/* re-init this tcppipe */
986 		/* ip and ip_len set by tcp_open */
987 		tp->node.key = tp;
988 		tp->num_unused = ID_PIPE_NUM;
989 		tp->num_skip = 0;
990 		tp->tcp_send_first = NULL;
991 		tp->tcp_send_last = NULL;
992 		memset(tp->id, 0, sizeof(tp->id));
993 		for(i=0; i<ID_PIPE_NUM; i++) {
994 			tp->unused[i] = i;
995 		}
996 
997 		/* insert into tree */
998 		(void)rbtree_insert(set->pipetree, &tp->node);
999 		/* setup write */
1000 		xfrd_unset_timer(zone);
1001 		pipeline_setup_new_zone(set, tp, zone);
1002 		/* started a task, no need for cleanups, so return */
1003 		return;
1004 	}
1005 	/* no task to start, cleanup */
1006 	assert(!set->tcp_waiting_first);
1007 	set->tcp_count --;
1008 	assert(set->tcp_count >= 0);
1009 }
1010 
1011