xref: /netbsd-src/external/mpl/bind/dist/bin/tests/system/tcp/tests_tcp.py (revision a45db23f655e22f0c2354600d3b3c2cb98abf2dc)
1#!/usr/bin/python3
2
3# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
4#
5# SPDX-License-Identifier: MPL-2.0
6#
7# This Source Code Form is subject to the terms of the Mozilla Public
8# License, v. 2.0.  If a copy of the MPL was not distributed with this
9# file, you can obtain one at https://mozilla.org/MPL/2.0/.
10#
11# See the COPYRIGHT file distributed with this work for additional
12# information regarding copyright ownership.
13
14# pylint: disable=unused-variable
15
16import socket
17import time
18
19import pytest
20
21pytest.importorskip("dns", minversion="2.0.0")
22import dns.message
23import dns.query
24
25TIMEOUT = 10
26
27
28def create_msg(qname, qtype, edns=-1):
29    msg = dns.message.make_query(qname, qtype, use_edns=edns)
30    return msg
31
32
33def timeout():
34    return time.time() + TIMEOUT
35
36
37def create_socket(host, port):
38    sock = socket.create_connection((host, port), timeout=10)
39    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
40    return sock
41
42
43# Regression test for CVE-2022-0396
44def test_close_wait(named_port):
45    with create_socket("10.53.0.7", named_port) as sock:
46        msg = create_msg("a.example.", "A")
47        (sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
48        (response, rtime) = dns.query.receive_tcp(sock, timeout())
49
50        msg = dns.message.make_query("a.example.", "A", use_edns=0, payload=1232)
51        (sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
52
53        # Shutdown the socket, but ignore the other side closing the socket
54        # first because we sent DNS message with EDNS0
55        try:
56            sock.shutdown(socket.SHUT_RDWR)
57        except ConnectionError:
58            pass
59        except OSError:
60            pass
61
62    # BIND allows one TCP client, the part above sends DNS messaage with EDNS0
63    # after the first query. BIND should react adequately because of
64    # ns7/named.dropedns and close the socket, making room for the next
65    # request. If it gets stuck in CLOSE_WAIT state, there is no connection
66    # available for the query below and it will time out.
67    with create_socket("10.53.0.7", named_port) as sock:
68        msg = create_msg("a.example.", "A")
69        (sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
70        (response, rtime) = dns.query.receive_tcp(sock, timeout())
71