/*	$NetBSD: tcp.c,v 1.1 2024/02/18 20:57:55 christos Exp $	*/

/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <libgen.h>
#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/stdtime.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "uv-compat.h"

static atomic_uint_fast32_t last_tcpquota_log = 0;

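/*
 * Rate-limit TCP quota log messages: the atomic exchange lets at most
 * one message per isc_stdtime_t second through.
 */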
static bool
can_log_tcp_quota(void) {
	isc_stdtime_t now, last;

	isc_stdtime_get(&now);
	last = atomic_exchange_relaxed(&last_tcpquota_log, now);
	if (now != last) {
		return (true);
	}

	return (false);
}

static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);

static void
tcp_close_direct(isc_nmsocket_t *sock);

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void
tcp_connect_cb(uv_connect_t *uvreq, int status);

static void
tcp_connection_cb(uv_stream_t *server, int status);

static void
tcp_close_cb(uv_handle_t *uvhandle);

static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);

static void
quota_accept_cb(isc_quota_t *quota, void *sock0);

static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);

static void
stop_tcp_parent(isc_nmsocket_t *sock);
static void
stop_tcp_child(isc_nmsocket_t *sock);

static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
	REQUIRE(sock->accepting);
	REQUIRE(sock->server);

	/*
	 * Detach the quota early to make room for other connections;
	 * otherwise it'd be detached later asynchronously, and clog
	 * the quota unnecessarily.
	 */
	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}

	isc__nmsocket_detach(&sock->server);

	sock->accepting = false;

	switch (eresult) {
	case ISC_R_NOTCONNECTED:
		/* IGNORE: The client disconnected before we could accept */
		break;
	default:
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "Accepting TCP connection failed: %s",
			      isc_result_totext(eresult));
	}
}

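/*
 * Connect the socket to req->peer (binding to req->local if it is set)
 * on the worker thread that owns it.  The result is stored in
 * sock->result and signalled on sock->cond; if the socket hasn't been
 * marked active yet, we wait on sock->scond so that isc_nm_tcpconnect()
 * can finish first.
 */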
static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	isc__networker_t *worker = NULL;
	isc_result_t result = ISC_R_UNSET;
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->tid == isc_nm_tid());

	worker = &sock->mgr->workers[sock->tid];

	atomic_store(&sock->connecting, true);

	/* 2 minute timeout */
	result = isc__nm_socket_connectiontimeout(sock->fd, 120 * 1000);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	UV_RUNTIME_CHECK(uv_tcp_init, r);
	uv_handle_set_data(&sock->uv_handle.handle, sock);

	r = uv_timer_init(&worker->loop, &sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	r = uv_tcp_open(&sock->uv_handle.tcp, sock->fd);
	if (r != 0) {
		isc__nm_closesocket(sock->fd);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (req->local.length != 0) {
		r = uv_tcp_bind(&sock->uv_handle.tcp, &req->local.type.sa, 0);
		if (r != 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			goto done;
		}
	}

	uv_handle_set_data(&req->uv_req.handle, req);
	r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
			   &req->peer.type.sa, tcp_connect_cb);
	if (r != 0) {
		isc__nm_incstats(sock->mgr,
				 sock->statsindex[STATID_CONNECTFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);

	uv_handle_set_data((uv_handle_t *)&sock->read_timer,
			   &req->uv_req.connect);
	isc__nmsocket_timer_start(sock);

	atomic_store(&sock->connected, true);

done:
	result = isc__nm_uverr2result(r);
	LOCK(&sock->lock);
	sock->result = result;
	SIGNAL(&sock->cond);
	if (!atomic_load(&sock->active)) {
		WAIT(&sock->scond, &sock->lock);
	}
	INSIST(atomic_load(&sock->active));
	UNLOCK(&sock->lock);

	return (result);
}

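/*
 * Handle 'tcpconnect' async event - connect a socket to its peer
 */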
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpconnect_t *ievent =
		(isc__netievent_tcpconnect_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *req = ievent->req;
	isc_result_t result = ISC_R_SUCCESS;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->parent == NULL);
	REQUIRE(sock->tid == isc_nm_tid());

	result = tcp_connect_direct(sock, req);
	if (result != ISC_R_SUCCESS) {
		atomic_store(&sock->active, false);
		if (sock->fd != (uv_os_sock_t)(-1)) {
			isc__nm_tcp_close(sock);
		}
		isc__nm_connectcb(sock, req, result, true);
	}

	/*
	 * The sock is now attached to the handle.
	 */
	isc__nmsocket_detach(&sock);
}

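/*
 * libuv connect callback: translate 'status' into an isc_result_t,
 * record the peer address on success, and run (or fail) the connect
 * callback stored in the request.
 */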
static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
	isc_result_t result;
	isc__nm_uvreq_t *req = NULL;
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
	struct sockaddr_storage ss;
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	isc__nmsocket_timer_stop(sock);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	req = uv_handle_get_data((uv_handle_t *)uvreq);

	REQUIRE(VALID_UVREQ(req));
	REQUIRE(VALID_NMHANDLE(req->handle));

	if (atomic_load(&sock->timedout)) {
		result = ISC_R_TIMEDOUT;
		goto error;
	}

	if (!atomic_load(&sock->connecting)) {
		/*
		 * The connect was cancelled by the timeout; just clean
		 * up the req.
		 */
		isc__nm_uvreq_put(&req, sock);
		return;
	} else if (isc__nmsocket_closing(sock)) {
		/* Socket was closed midflight by isc__nm_tcp_shutdown() */
		result = ISC_R_CANCELED;
		goto error;
	} else if (status == UV_ETIMEDOUT) {
		/* Timeout status code here indicates hard error */
		result = ISC_R_TIMEDOUT;
		goto error;
	} else if (status != 0) {
		result = isc__nm_uverr2result(status);
		goto error;
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
	r = uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto error;
	}

	atomic_store(&sock->connecting, false);

	result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	isc__nm_connectcb(sock, req, ISC_R_SUCCESS, false);

	return;

error:
	isc__nm_failed_connect_cb(sock, req, result, false);
}

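/*
 * Create a client socket and dispatch a 'tcpconnect' event for it to a
 * network manager worker; blocks on sock->cond until that worker has
 * set sock->result.
 */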
void
isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
		  isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
		  size_t extrahandlesize) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	isc__netievent_tcpconnect_t *ievent = NULL;
	isc__nm_uvreq_t *req = NULL;
	sa_family_t sa_family;

	REQUIRE(VALID_NM(mgr));
	REQUIRE(local != NULL);
	REQUIRE(peer != NULL);

	sa_family = peer->type.sa.sa_family;

	sock = isc_mem_get(mgr->mctx, sizeof(*sock));
	isc__nmsocket_init(sock, mgr, isc_nm_tcpsocket, local);

	sock->extrahandlesize = extrahandlesize;
	sock->connect_timeout = timeout;
	sock->result = ISC_R_UNSET;
	sock->fd = (uv_os_sock_t)-1;
	atomic_init(&sock->client, true);

	req = isc__nm_uvreq_get(mgr, sock);
	req->cb.connect = cb;
	req->cbarg = cbarg;
	req->peer = *peer;
	req->local = *local;
	req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface);

	result = isc__nm_socket(sa_family, SOCK_STREAM, 0, &sock->fd);
	if (result != ISC_R_SUCCESS) {
		if (isc__nm_in_netthread()) {
			sock->tid = isc_nm_tid();
			isc__nmsocket_clearcb(sock);
			isc__nm_connectcb(sock, req, result, false);
		} else {
			isc__nmsocket_clearcb(sock);
			sock->tid = isc_random_uniform(mgr->nlisteners);
			isc__nm_connectcb(sock, req, result, true);
		}
		atomic_store(&sock->closed, true);
		isc__nmsocket_detach(&sock);
		return;
	}

	ievent = isc__nm_get_netievent_tcpconnect(mgr, sock, req);

	if (isc__nm_in_netthread()) {
		atomic_store(&sock->active, true);
		sock->tid = isc_nm_tid();
		isc__nm_async_tcpconnect(&mgr->workers[sock->tid],
					 (isc__netievent_t *)ievent);
		isc__nm_put_netievent_tcpconnect(mgr, ievent);
	} else {
		atomic_init(&sock->active, false);
		sock->tid = isc_random_uniform(mgr->nlisteners);
		isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
	LOCK(&sock->lock);
	while (sock->result == ISC_R_UNSET) {
		WAIT(&sock->cond, &sock->lock);
	}
	atomic_store(&sock->active, true);
	BROADCAST(&sock->scond);
	UNLOCK(&sock->lock);
}

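/*
 * Create a TCP socket for listening, with address reuse enabled and,
 * if load_balance_sockets is set, the load-balancing reuse option as
 * well.
 */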
static uv_os_sock_t
isc__nm_tcp_lb_socket(isc_nm_t *mgr, sa_family_t sa_family) {
	isc_result_t result;
	uv_os_sock_t sock;

	result = isc__nm_socket(sa_family, SOCK_STREAM, 0, &sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	(void)isc__nm_socket_incoming_cpu(sock);

	/* FIXME: set mss */

	result = isc__nm_socket_reuse(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

#ifndef _WIN32
	if (mgr->load_balance_sockets) {
		result = isc__nm_socket_reuse_lb(sock);
		RUNTIME_CHECK(result == ISC_R_SUCCESS);
	}
#endif

	return (sock);
}

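/*
 * Set up the listener child socket for thread 'tid' - either with its
 * own load-balanced socket or with a dup() of the parent's fd - and
 * enqueue a 'tcplisten' event on that thread.
 */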
static void
start_tcp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock,
		uv_os_sock_t fd, int tid) {
	isc__netievent_tcplisten_t *ievent = NULL;
	isc_nmsocket_t *csock = &sock->children[tid];

	isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface);
	csock->parent = sock;
	csock->accept_cb = sock->accept_cb;
	csock->accept_cbarg = sock->accept_cbarg;
	csock->extrahandlesize = sock->extrahandlesize;
	csock->backlog = sock->backlog;
	csock->tid = tid;
	/*
	 * We don't attach to quota, just assign - to avoid
	 * increasing quota unnecessarily.
	 */
	csock->pquota = sock->pquota;
	isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);

#ifdef _WIN32
	UNUSED(fd);
	csock->fd = isc__nm_tcp_lb_socket(mgr, iface->type.sa.sa_family);
#else
	if (mgr->load_balance_sockets) {
		UNUSED(fd);
		csock->fd = isc__nm_tcp_lb_socket(mgr,
						  iface->type.sa.sa_family);
	} else {
		csock->fd = dup(fd);
	}
#endif
	REQUIRE(csock->fd >= 0);

	ievent = isc__nm_get_netievent_tcplisten(mgr, csock);
	isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
				     (isc__netievent_t *)ievent);
}

static void
enqueue_stoplistening(isc_nmsocket_t *sock) {
	isc__netievent_tcpstop_t *ievent =
		isc__nm_get_netievent_tcpstop(sock->mgr, sock);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

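/*
 * Start listening: one child socket is created per listener thread,
 * and we block until every child has reported back; the first child
 * to report sets the overall result.
 */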
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_sockaddr_t *iface,
		 isc_nm_accept_cb_t accept_cb, void *accept_cbarg,
		 size_t extrahandlesize, int backlog, isc_quota_t *quota,
		 isc_nmsocket_t **sockp) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	size_t children_size = 0;
	uv_os_sock_t fd = -1;

	REQUIRE(VALID_NM(mgr));

	sock = isc_mem_get(mgr->mctx, sizeof(*sock));
	isc__nmsocket_init(sock, mgr, isc_nm_tcplistener, iface);

	atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
	sock->nchildren = 1;
#else
	sock->nchildren = mgr->nlisteners;
#endif
	children_size = sock->nchildren * sizeof(sock->children[0]);
	sock->children = isc_mem_get(mgr->mctx, children_size);
	memset(sock->children, 0, children_size);

	sock->result = ISC_R_UNSET;

	sock->accept_cb = accept_cb;
	sock->accept_cbarg = accept_cbarg;
	sock->extrahandlesize = extrahandlesize;
	sock->backlog = backlog;
	sock->pquota = quota;

	sock->tid = 0;
	sock->fd = -1;

#ifndef _WIN32
	if (!mgr->load_balance_sockets) {
		fd = isc__nm_tcp_lb_socket(mgr, iface->type.sa.sa_family);
	}
#endif

	isc_barrier_init(&sock->startlistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		if ((int)i == isc_nm_tid()) {
			continue;
		}
		start_tcp_child(mgr, iface, sock, fd, i);
	}

	if (isc__nm_in_netthread()) {
		start_tcp_child(mgr, iface, sock, fd, isc_nm_tid());
	}

#ifndef _WIN32
	if (!mgr->load_balance_sockets) {
		isc__nm_closesocket(fd);
	}
#endif

	LOCK(&sock->lock);
	while (atomic_load(&sock->rchildren) != sock->nchildren) {
		WAIT(&sock->cond, &sock->lock);
	}
	result = sock->result;
	atomic_store(&sock->active, true);
	UNLOCK(&sock->lock);

	INSIST(result != ISC_R_UNSET);

	if (result == ISC_R_SUCCESS) {
		REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
		*sockp = sock;
	} else {
		atomic_store(&sock->active, false);
		enqueue_stoplistening(sock);
		isc_nmsocket_close(&sock);
	}

	return (result);
}

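/*
 * Handle 'tcplisten' async event - open, bind and listen on a child
 * socket, then report the result back to the parent listener
 */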
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *)ev0;
	sa_family_t sa_family;
	int r;
	int flags = 0;
	isc_nmsocket_t *sock = NULL;
	isc_result_t result;
	isc_nm_t *mgr;

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(ievent->sock->tid == isc_nm_tid());
	REQUIRE(VALID_NMSOCK(ievent->sock->parent));

	sock = ievent->sock;
	sa_family = sock->iface.type.sa.sa_family;
	mgr = sock->mgr;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->parent != NULL);
	REQUIRE(sock->tid == isc_nm_tid());

	/* TODO: set min mss */

	r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
	UV_RUNTIME_CHECK(uv_tcp_init, r);

	uv_handle_set_data(&sock->uv_handle.handle, sock);
	/* This keeps the socket alive after everything else is gone */
	isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });

	r = uv_timer_init(&worker->loop, &sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	LOCK(&sock->parent->lock);

	r = uv_tcp_open(&sock->uv_handle.tcp, sock->fd);
	if (r < 0) {
		isc__nm_closesocket(sock->fd);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sa_family == AF_INET6) {
		flags = UV_TCP_IPV6ONLY;
	}

#ifdef _WIN32
	r = isc_uv_tcp_freebind(&sock->uv_handle.tcp, &sock->iface.type.sa,
				flags);
	if (r < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}
#else
	if (mgr->load_balance_sockets) {
		r = isc_uv_tcp_freebind(&sock->uv_handle.tcp,
					&sock->iface.type.sa, flags);
		if (r < 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			goto done;
		}
	} else {
		if (sock->parent->fd == -1) {
			r = isc_uv_tcp_freebind(&sock->uv_handle.tcp,
						&sock->iface.type.sa, flags);
			if (r < 0) {
				isc__nm_incstats(
					sock->mgr,
					sock->statsindex[STATID_BINDFAIL]);
				goto done;
			}
			sock->parent->uv_handle.tcp.flags =
				sock->uv_handle.tcp.flags;
			sock->parent->fd = sock->fd;
		} else {
			/* The socket is already bound, just copy the flags */
			sock->uv_handle.tcp.flags =
				sock->parent->uv_handle.tcp.flags;
		}
	}
#endif

	/*
	 * The callback will run in the same thread uv_listen() was called
	 * from, so a race with tcp_connection_cb() isn't possible.
	 */
	r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
		      tcp_connection_cb);
	if (r != 0) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "uv_listen failed: %s",
			      isc_result_totext(isc__nm_uverr2result(r)));
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

	atomic_store(&sock->listening, true);

done:
	result = isc__nm_uverr2result(r);
	if (result != ISC_R_SUCCESS) {
		sock->pquota = NULL;
	}

	atomic_fetch_add(&sock->parent->rchildren, 1);
	if (sock->parent->result == ISC_R_UNSET) {
		sock->parent->result = result;
	}
	SIGNAL(&sock->parent->cond);
	UNLOCK(&sock->parent->lock);

	isc_barrier_wait(&sock->parent->startlistening);
}

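/*
 * libuv connection callback for the listening socket: attach to the
 * connection quota if one is configured, then accept the pending
 * connection.
 */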
static void
tcp_connection_cb(uv_stream_t *server, int status) {
	isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *)server);
	isc_result_t result;
	isc_quota_t *quota = NULL;

	if (status != 0) {
		result = isc__nm_uverr2result(status);
		goto done;
	}

	REQUIRE(VALID_NMSOCK(ssock));
	REQUIRE(ssock->tid == isc_nm_tid());

	if (isc__nmsocket_closing(ssock)) {
		result = ISC_R_CANCELED;
		goto done;
	}

	if (ssock->pquota != NULL) {
		result = isc_quota_attach_cb(ssock->pquota, &quota,
					     &ssock->quotacb);
		if (result == ISC_R_QUOTA) {
			isc__nm_incstats(ssock->mgr,
					 ssock->statsindex[STATID_ACCEPTFAIL]);
			goto done;
		}
	}

	result = accept_connection(ssock, quota);
done:
	isc__nm_accept_connection_log(result, can_log_tcp_quota());
}

void
isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcplistener);

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true))
	{
		UNREACHABLE();
	}

	if (!isc__nm_in_netthread()) {
		enqueue_stoplistening(sock);
	} else {
		stop_tcp_parent(sock);
	}
}

void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (sock->parent != NULL) {
		stop_tcp_child(sock);
		return;
	}

	stop_tcp_parent(sock);
}

void
isc__nm_tcp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(result != ISC_R_SUCCESS);

	isc__nmsocket_timer_stop(sock);
	isc__nm_stop_reading(sock);

	if (!sock->recv_read) {
		goto destroy;
	}
	sock->recv_read = false;

	if (sock->recv_cb != NULL) {
		isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
		isc__nmsocket_clearcb(sock);
		isc__nm_readcb(sock, req, result);
	}

destroy:
	isc__nmsocket_prep_destroy(sock);

	/*
	 * We need to detach from the quota after the read callback
	 * function has had a chance to be executed.
	 */
	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}
}

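/*
 * Start reading on the connected socket; if no read timeout is set
 * yet, fall back to the manager's keepalive or idle timeout.
 */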
void
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc_nmsocket_t *sock = handle->sock;
	isc__netievent_tcpstartread_t *ievent = NULL;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->statichandle == handle);
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(!sock->recv_read);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->recv_read = true;
	if (sock->read_timeout == 0) {
		sock->read_timeout =
			(atomic_load(&sock->keepalive)
				 ? atomic_load(&sock->mgr->keepalive)
				 : atomic_load(&sock->mgr->idle));
	}

	ievent = isc__nm_get_netievent_tcpstartread(sock->mgr, sock);

	/*
	 * This MUST be done asynchronously, no matter which thread we're
	 * in. The callback function for isc_nm_read() often calls
	 * isc_nm_read() again; if we tried to do that synchronously
	 * we'd clash in processbuffer() and grow the stack indefinitely.
	 */
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);

	return;
}

void
isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpstartread_t *ievent =
		(isc__netievent_tcpstartread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc_result_t result;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	if (isc__nmsocket_closing(sock)) {
		result = ISC_R_CANCELED;
	} else {
		result = isc__nm_start_reading(sock);
	}

	if (result != ISC_R_SUCCESS) {
		sock->reading = true;
		isc__nm_tcp_failed_read_cb(sock, result);
		return;
	}

	isc__nmsocket_timer_start(sock);
}

void
isc__nm_tcp_pauseread(isc_nmhandle_t *handle) {
	isc__netievent_tcppauseread_t *ievent = NULL;
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;

	REQUIRE(VALID_NMSOCK(sock));

	if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ false },
					    true))
	{
		return;
	}

	ievent = isc__nm_get_netievent_tcppauseread(sock->mgr, sock);

	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);

	return;
}

void
isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcppauseread_t *ievent =
		(isc__netievent_tcppauseread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	isc__nmsocket_timer_stop(sock);
	isc__nm_stop_reading(sock);
}

void
isc__nm_tcp_resumeread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc__netievent_tcpstartread_t *ievent = NULL;
	isc_nmsocket_t *sock = handle->sock;

	REQUIRE(sock->tid == isc_nm_tid());

	if (sock->recv_cb == NULL) {
		/* We are no longer reading */
		return;
	}

	if (!isc__nmsocket_active(sock)) {
		sock->reading = true;
		isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
		return;
	}

	if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ true },
					    false))
	{
		return;
	}

	ievent = isc__nm_get_netievent_tcpstartread(sock->mgr, sock);

	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);
}

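/*
 * libuv read callback: route errors and EOF through the failed-read
 * path; otherwise hand the data to the read callback and restart the
 * read timer.
 */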
void
isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream);
	isc__nm_uvreq_t *req = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->reading);
	REQUIRE(buf != NULL);

	if (isc__nmsocket_closing(sock)) {
		isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
		goto free;
	}

	if (nread < 0) {
		if (nread != UV_EOF) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_RECVFAIL]);
		}

		isc__nm_tcp_failed_read_cb(sock, isc__nm_uverr2result(nread));

		goto free;
	}

	req = isc__nm_get_read_req(sock, NULL);

	/*
	 * The callback will be called synchronously because the
	 * result is ISC_R_SUCCESS, so we don't need to retain
	 * the buffer
	 */
	req->uvbuf.base = buf->base;
	req->uvbuf.len = nread;

	if (!atomic_load(&sock->client)) {
		sock->read_timeout =
			(atomic_load(&sock->keepalive)
				 ? atomic_load(&sock->mgr->keepalive)
				 : atomic_load(&sock->mgr->idle));
	}

	isc__nm_readcb(sock, req, ISC_R_SUCCESS);

	/* The readcb could have paused the reading */
	if (sock->reading) {
		/* The timer will be updated */
		isc__nmsocket_timer_restart(sock);
	}

free:
	if (nread < 0) {
		/*
		 * The buffer may be a null buffer on error.
		 */
		if (buf->base == NULL && buf->len == 0) {
			return;
		}
	}

	isc__nm_free_uvbuf(sock, buf);
}

static void
quota_accept_cb(isc_quota_t *quota, void *sock0) {
	isc_nmsocket_t *sock = (isc_nmsocket_t *)sock0;
	isc__netievent_tcpaccept_t *ievent = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	/*
	 * Create a tcpaccept event and pass it using the async channel.
	 */
	ievent = isc__nm_get_netievent_tcpaccept(sock->mgr, sock, quota);
	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);
}

/*
 * This is called after we get a quota_accept_cb() callback.
 */
void
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc_result_t result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	result = accept_connection(sock, ievent->quota);
	isc__nm_accept_connection_log(result, can_log_tcp_quota());
}

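/*
 * Accept a pending connection on 'ssock': create a child socket,
 * uv_accept() into it, record the peer and local addresses, and pass a
 * new handle to the accept callback.  The quota (if any) is handed to
 * the child and released again on failure.
 */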
static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
	isc_nmsocket_t *csock = NULL;
	isc__networker_t *worker = NULL;
	int r;
	isc_result_t result;
	struct sockaddr_storage ss;
	isc_sockaddr_t local;
	isc_nmhandle_t *handle = NULL;

	REQUIRE(VALID_NMSOCK(ssock));
	REQUIRE(ssock->tid == isc_nm_tid());

	if (isc__nmsocket_closing(ssock)) {
		if (quota != NULL) {
			isc_quota_detach(&quota);
		}
		return (ISC_R_CANCELED);
	}

	csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, &ssock->iface);
	csock->tid = ssock->tid;
	csock->extrahandlesize = ssock->extrahandlesize;
	isc__nmsocket_attach(ssock, &csock->server);
	csock->recv_cb = ssock->recv_cb;
	csock->recv_cbarg = ssock->recv_cbarg;
	csock->quota = quota;
	csock->accepting = true;

	worker = &csock->mgr->workers[isc_nm_tid()];

	r = uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);
	UV_RUNTIME_CHECK(uv_tcp_init, r);
	uv_handle_set_data(&csock->uv_handle.handle, csock);

	r = uv_timer_init(&worker->loop, &csock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&csock->read_timer, csock);

	r = uv_accept(&ssock->uv_handle.stream, &csock->uv_handle.stream);
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto failure;
	}

	r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto failure;
	}

	result = isc_sockaddr_fromsockaddr(&csock->peer,
					   (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto failure;
	}

	r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
			       &(int){ sizeof(ss) });
	if (r != 0) {
		result = isc__nm_uverr2result(r);
		goto failure;
	}

	result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
	if (result != ISC_R_SUCCESS) {
		goto failure;
	}

	handle = isc__nmhandle_get(csock, NULL, &local);

	result = ssock->accept_cb(handle, ISC_R_SUCCESS, ssock->accept_cbarg);
	if (result != ISC_R_SUCCESS) {
		isc_nmhandle_detach(&handle);
		goto failure;
	}

	csock->accepting = false;

	isc__nm_incstats(csock->mgr, csock->statsindex[STATID_ACCEPT]);

	csock->read_timeout = atomic_load(&csock->mgr->init);

	atomic_fetch_add(&ssock->parent->active_child_connections, 1);

	/*
	 * The acceptcb needs to attach to the handle if it wants to keep the
	 * connection alive
	 */
	isc_nmhandle_detach(&handle);

	/*
	 * sock is now attached to the handle.
	 */
	isc__nmsocket_detach(&csock);

	return (ISC_R_SUCCESS);

failure:
	atomic_store(&csock->active, false);

	failed_accept_cb(csock, result);

	isc__nmsocket_prep_destroy(csock);

	isc__nmsocket_detach(&csock);

	return (result);
}

void
isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
		 isc_nm_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc_nmsocket_t *sock = handle->sock;
	isc__netievent_tcpsend_t *ievent = NULL;
	isc__nm_uvreq_t *uvreq = NULL;

	REQUIRE(sock->type == isc_nm_tcpsocket);

	uvreq = isc__nm_uvreq_get(sock->mgr, sock);
	uvreq->uvbuf.base = (char *)region->base;
	uvreq->uvbuf.len = region->length;

	isc_nmhandle_attach(handle, &uvreq->handle);

	uvreq->cb.send = cb;
	uvreq->cbarg = cbarg;

	ievent = isc__nm_get_netievent_tcpsend(sock->mgr, sock, uvreq);
	isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid],
				     (isc__netievent_t *)ievent);

	return;
}

static void
tcp_send_cb(uv_write_t *req, int status) {
	isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMSOCK(uvreq->sock));

	sock = uvreq->sock;

	isc_nm_timer_stop(uvreq->timer);
	isc_nm_timer_detach(&uvreq->timer);

	if (status < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		isc__nm_failed_send_cb(sock, uvreq,
				       isc__nm_uverr2result(status));
		return;
	}

	isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false);
}

/*
 * Handle 'tcpsend' async event - send a packet on the socket
 */
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc_result_t result;
	isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;

	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	if (sock->write_timeout == 0) {
		sock->write_timeout =
			(atomic_load(&sock->keepalive)
				 ? atomic_load(&sock->mgr->keepalive)
				 : atomic_load(&sock->mgr->idle));
	}

	result = tcp_send_direct(sock, uvreq);
	if (result != ISC_R_SUCCESS) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		isc__nm_failed_send_cb(sock, uvreq, result);
	}
}

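/*
 * Issue the uv_write() for 'req' and arm the write timer; must be
 * called on the socket's own thread.
 */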
static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcpsocket);

	int r;

	if (isc__nmsocket_closing(sock)) {
		return (ISC_R_CANCELED);
	}

	r = uv_write(&req->uv_req.write, &sock->uv_handle.stream, &req->uvbuf,
		     1, tcp_send_cb);
	if (r < 0) {
		return (isc__nm_uverr2result(r));
	}

	isc_nm_timer_create(req->handle, isc__nmsocket_writetimeout_cb, req,
			    &req->timer);
	if (sock->write_timeout > 0) {
		isc_nm_timer_start(req->timer, sock->write_timeout);
	}

	return (ISC_R_SUCCESS);
}

static void
tcp_stop_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true))
	{
		UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	atomic_store(&sock->listening, false);

	isc__nmsocket_detach(&sock);
}

static void
tcp_close_sock(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true))
	{
		UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	if (sock->server != NULL) {
		isc__nmsocket_detach(&sock->server);
	}

	atomic_store(&sock->connected, false);

	isc__nmsocket_prep_destroy(sock);
}

static void
tcp_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	tcp_close_sock(sock);
}

static void
read_timer_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	if (sock->parent) {
		uv_close(&sock->uv_handle.handle, tcp_stop_cb);
	} else if (uv_is_closing(&sock->uv_handle.handle)) {
		tcp_close_sock(sock);
	} else {
		uv_close(&sock->uv_handle.handle, tcp_close_cb);
	}
}

static void
stop_tcp_child(isc_nmsocket_t *sock) {
	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(sock->tid == isc_nm_tid());

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true))
	{
		return;
	}

	tcp_close_direct(sock);

	atomic_fetch_sub(&sock->parent->rchildren, 1);

	isc_barrier_wait(&sock->parent->stoplistening);
}

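/*
 * Stop the listener: schedule 'tcpstop' events for the children owned
 * by other threads, stop our own child directly, and wait for all of
 * them on the stoplistening barrier.
 */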
static void
stop_tcp_parent(isc_nmsocket_t *sock) {
	isc_nmsocket_t *csock = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcplistener);

	isc_barrier_init(&sock->stoplistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		csock = &sock->children[i];
		REQUIRE(VALID_NMSOCK(csock));

		if ((int)i == isc_nm_tid()) {
			/*
			 * We need to schedule closing the other sockets first
			 */
			continue;
		}

		atomic_store(&csock->active, false);
		enqueue_stoplistening(csock);
	}

	csock = &sock->children[isc_nm_tid()];
	atomic_store(&csock->active, false);
	stop_tcp_child(csock);

	atomic_store(&sock->closed, true);
	isc__nmsocket_prep_destroy(sock);
}

static void
tcp_close_direct(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (sock->server != NULL) {
		REQUIRE(VALID_NMSOCK(sock->server));
		REQUIRE(VALID_NMSOCK(sock->server->parent));
		if (sock->server->parent != NULL) {
			atomic_fetch_sub(
				&sock->server->parent->active_child_connections,
				1);
		}
	}

	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}

	isc__nmsocket_timer_stop(sock);
	isc__nm_stop_reading(sock);

	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);
	uv_close((uv_handle_t *)&sock->read_timer, read_timer_close_cb);
}

void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcpsocket);
	REQUIRE(!isc__nmsocket_active(sock));

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true))
	{
		return;
	}

	if (sock->tid == isc_nm_tid()) {
		tcp_close_direct(sock);
	} else {
		/*
		 * We need to create an event and pass it using async channel
		 */
		isc__netievent_tcpclose_t *ievent =
			isc__nm_get_netievent_tcpclose(sock->mgr, sock);

		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	UNUSED(worker);

	tcp_close_direct(sock);
}

static void
tcp_close_connect_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);

	REQUIRE(VALID_NMSOCK(sock));

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->tid == isc_nm_tid());

	isc__nmsocket_prep_destroy(sock);
	isc__nmsocket_detach(&sock);
}

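/*
 * Shut down the socket as part of netmgr shutdown: deactivate it and
 * abort any connect or read that is still in flight.
 */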
void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_tcpsocket);

	/*
	 * If the socket is active, mark it inactive and
	 * continue. If it isn't active, stop now.
	 */
	if (!isc__nmsocket_deactivate(sock)) {
		return;
	}

	if (sock->accepting) {
		return;
	}

	if (atomic_load(&sock->connecting)) {
		isc_nmsocket_t *tsock = NULL;
		isc__nmsocket_attach(sock, &tsock);
		uv_close(&sock->uv_handle.handle, tcp_close_connect_cb);
		return;
	}

	if (sock->statichandle != NULL) {
		isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
		return;
	}

	/*
	 * Otherwise, we just send the socket to the abyss...
	 */
	if (sock->parent == NULL) {
		isc__nmsocket_prep_destroy(sock);
	}
}

void
isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
	isc_nmsocket_t *sock = NULL;
	isc__netievent_tcpcancel_t *ievent = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_tcpsocket);

	ievent = isc__nm_get_netievent_tcpcancel(sock->mgr, sock, handle);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

void
isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_tcpcancel_t *ievent = (isc__netievent_tcpcancel_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	uv_timer_stop(&sock->read_timer);

	isc__nm_tcp_failed_read_cb(sock, ISC_R_EOF);
}

int_fast32_t
isc__nm_tcp_listener_nactive(isc_nmsocket_t *listener) {
	int_fast32_t nactive;

	REQUIRE(VALID_NMSOCK(listener));

	nactive = atomic_load(&listener->active_child_connections);
	INSIST(nactive >= 0);
	return (nactive);
}
1459