xref: /netbsd-src/external/mpl/bind/dist/bin/tests/system/tcp/tests_tcp.py (revision 9689912e6b171cbda866ec33f15ae94a04e2c02d)
1#!/usr/bin/python3
2
3# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
4#
5# SPDX-License-Identifier: MPL-2.0
6#
7# This Source Code Form is subject to the terms of the Mozilla Public
8# License, v. 2.0.  If a copy of the MPL was not distributed with this
9# file, you can obtain one at https://mozilla.org/MPL/2.0/.
10#
11# See the COPYRIGHT file distributed with this work for additional
12# information regarding copyright ownership.
13
14# pylint: disable=unused-variable
15
16import socket
17import struct
18import time
19
20import pytest
21
22pytest.importorskip("dns", minversion="2.0.0")
23import dns.message
24import dns.query
25
# Module-level marker: pytest applies it to every test in this file.
# NOTE(review): extra_artifacts is a project-defined marker; presumably it
# declares files the tests leave behind (ans*/ans.run) - confirm in conftest.
pytestmark = pytest.mark.extra_artifacts(["ans*/ans.run"])

# Seconds added to the current time by timeout() to form an I/O deadline.
TIMEOUT = 10
33
34
def create_msg(qname, qtype, edns=-1):
    """Build a DNS query message for *qname* / *qtype*.

    *edns* is forwarded to dns.message.make_query() as ``use_edns``; per the
    dnspython documentation the default of -1 means "do not use EDNS".
    """
    return dns.message.make_query(qname, qtype, use_edns=edns)
38
39
def timeout():
    """Return an absolute deadline TIMEOUT seconds from now.

    The result is a time.time()-based timestamp; the tests in this file pass
    it as the deadline argument of dns.query.send_tcp() / receive_tcp().
    """
    now = time.time()
    return now + TIMEOUT
42
43
def create_socket(host, port):
    """Open a TCP connection to (*host*, *port*) with TCP_NODELAY enabled.

    The connection attempt, and later blocking operations on the returned
    socket, are bounded by the module-wide TIMEOUT (in seconds).

    Returns the connected socket; the caller is responsible for closing it
    (the tests use it as a context manager).

    Raises OSError if the connection cannot be established or the socket
    option cannot be set.
    """
    sock = socket.create_connection((host, port), timeout=TIMEOUT)
    try:
        # Disable Nagle's algorithm so that separate sends are not held back
        # to be coalesced with later data.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
    except OSError:
        # Do not leak the already-connected socket on failure.
        sock.close()
        raise
    return sock
48
49
def test_tcp_garbage(named_port):
    """Check that a too-short TCP DNS message gets the connection terminated.

    After one regular query/response exchange, an 11-byte payload (shorter
    than the 12-byte DNS header) is sent with a correct length prefix.  Any
    further exchange on the same connection is then expected to fail with
    EOFError or a ConnectionError.
    """
    with create_socket("10.53.0.7", named_port) as sock:
        # A regular exchange first, to show the connection works.
        msg = create_msg("a.example.", "A")
        dns.query.send_tcp(sock, msg, timeout())
        dns.query.receive_tcp(sock, timeout())

        wire = msg.to_wire()
        assert len(wire) > 0

        # Send DNS message shorter than DNS message header (12),
        # this should cause the connection to be terminated.
        #
        # The payload is sent as raw bytes: struct.pack("!s", ...) would
        # pack only its first byte, because "s" without a count means "1s".
        # sendall() is used so that a partial send cannot truncate the data.
        garbage = b"0123456789a"
        sock.sendall(struct.pack("!H", len(garbage)))
        sock.sendall(garbage)

        with pytest.raises(EOFError):
            try:
                dns.query.send_tcp(sock, msg, timeout())
                dns.query.receive_tcp(sock, timeout())
            except ConnectionError as e:
                # A reset or broken pipe also means the server terminated
                # the connection; report it as the expected EOFError.
                raise EOFError from e
70
71
def test_tcp_garbage_response(named_port):
    """Check that sending a DNS response to the server ends the connection.

    After one regular query/response exchange, a response message built from
    the query is sent to the server.  The following read is expected to fail
    with EOFError or a ConnectionError.
    """
    with create_socket("10.53.0.7", named_port) as conn:
        # A regular exchange first, to show the connection works.
        query = create_msg("a.example.", "A")
        dns.query.send_tcp(conn, query, timeout())
        dns.query.receive_tcp(conn, timeout())

        encoded = query.to_wire()
        assert len(encoded) > 0

        # The server expects queries; feed it a response instead, which
        # should make it terminate the connection.
        bogus = dns.message.make_response(query)
        dns.query.send_tcp(conn, bogus, timeout())

        with pytest.raises(EOFError):
            try:
                dns.query.receive_tcp(conn, timeout())
            except ConnectionError as exc:
                # A connection error counts as termination just like EOF.
                raise EOFError from exc
92
93
94# Regression test for CVE-2022-0396
def test_close_wait(named_port):
    """Check that the server frees a TCP client slot after dropping a query.

    A first connection performs a regular exchange, then sends a query with
    EDNS0 and shuts the socket down without reading a reply.  A second
    connection must then still be able to complete a query/response exchange.
    """
    with create_socket("10.53.0.7", named_port) as first:
        plain_query = create_msg("a.example.", "A")
        dns.query.send_tcp(first, plain_query, timeout())
        dns.query.receive_tcp(first, timeout())

        edns_query = dns.message.make_query(
            "a.example.", "A", use_edns=0, payload=1232
        )
        dns.query.send_tcp(first, edns_query, timeout())

        # Shut the socket down; the other side may already have closed it
        # because the message carried EDNS0, so failures here are ignored.
        # ConnectionError is a subclass of OSError, so one clause covers both.
        try:
            first.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass

    # BIND allows one TCP client, the part above sends DNS message with EDNS0
    # after the first query. BIND should react adequately because of
    # ns7/named.dropedns and close the socket, making room for the next
    # request. If it gets stuck in CLOSE_WAIT state, there is no connection
    # available for the query below and it will time out.
    with create_socket("10.53.0.7", named_port) as second:
        followup = create_msg("a.example.", "A")
        dns.query.send_tcp(second, followup, timeout())
        dns.query.receive_tcp(second, timeout())
122
123
124# GL #4273
def test_tcp_big(named_port):
    """Send a very large DNS message over TCP and check the server survives.

    The message has a URI question for the root name and one URI record in
    the additional section whose target is 65503 "b" characters.  After that
    exchange an ordinary query on the same connection must still be answered.
    """
    with create_socket("10.53.0.7", named_port) as conn:
        # Rdata in text form: priority 0, weight 0, then the long target.
        huge_rdata = "0 0 " + "b" * 65503

        big = dns.message.Message(id=0)
        big.flags = dns.flags.RD
        question = dns.rrset.from_text(dns.name.root, 0, 1, "URI")
        big.question.append(question)
        extra = dns.rrset.from_text(dns.name.root, 0, 1, "URI", huge_rdata)
        big.additional.append(extra)

        dns.query.send_tcp(conn, big, timeout())
        dns.query.receive_tcp(conn, timeout())

        # Now check that the server is alive and well
        probe = create_msg("a.example.", "A")
        dns.query.send_tcp(conn, probe, timeout())
        dns.query.receive_tcp(conn, timeout())
140