xref: /netbsd-src/external/mpl/bind/dist/bin/tests/system/cookie/ans9/ans.py (revision 9689912e6b171cbda866ec33f15ae94a04e2c02d)
1# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
2#
3# SPDX-License-Identifier: MPL-2.0
4#
5# This Source Code Form is subject to the terms of the Mozilla Public
6# License, v. 2.0.  If a copy of the MPL was not distributed with this
7# file, you can obtain one at https://mozilla.org/MPL/2.0/.
8#
9# See the COPYRIGHT file distributed with this work for additional
10# information regarding copyright ownership.
11
12from __future__ import print_function
13import os
14import sys
15import signal
16import socket
17import select
18from datetime import datetime, timedelta
19import time
20import functools
21
22import dns
23import dns.edns
24import dns.flags
25import dns.message
26import dns.query
27import dns.tsig
28import dns.tsigkeyring
29import dns.version
30
31from dns.edns import *
32from dns.name import *
33from dns.rcode import *
34from dns.rdataclass import *
35from dns.rdatatype import *
36from dns.tsig import *
37
38
39# Log query to file
40def logquery(type, qname):
41    with open("qlog", "a") as f:
42        f.write("%s %s\n", type, qname)
43
44
45# DNS 2.0 keyring specifies the algorithm
46try:
47    keyring = dns.tsigkeyring.from_text(
48        {
49            "foo": {os.getenv("DEFAULT_HMAC", "hmac-sha256"), "aaaaaaaaaaaa"},
50            "fake": {os.getenv("DEFAULT_HMAC", "hmac-sha256"), "aaaaaaaaaaaa"},
51        }
52    )
53except:
54    keyring = dns.tsigkeyring.from_text({"foo": "aaaaaaaaaaaa", "fake": "aaaaaaaaaaaa"})
55
56dopass2 = False
57
58
59############################################################################
60#
61# This server will serve valid and spoofed answers. A spoofed answer will
62# have the address 10.53.0.10 included.
63#
64# When receiving a query over UDP:
65#
66# A query to "nocookie"/A will result in a spoofed answer with no cookie set.
67# A query to "tcponly"/A will result in a spoofed answer with no cookie set.
68# A query to "withtsig"/A will result in two responses, the first is a spoofed
69# answer that is TSIG signed, the second is a valid answer with a cookie set.
70# A query to anything else will result in a valid answer with a cookie set.
71#
72# When receiving a query over TCP:
73#
74# A query to "nocookie"/A will result in a valid answer with no cookie set.
75# A query to anything else will result in a valid answer with a cookie set.
76#
77############################################################################
78def create_response(msg, tcp, first, ns10):
79    global dopass2
80    m = dns.message.from_wire(msg, keyring=keyring)
81    qname = m.question[0].name.to_text()
82    lqname = qname.lower()
83    labels = lqname.split(".")
84    rrtype = m.question[0].rdtype
85    typename = dns.rdatatype.to_text(rrtype)
86
87    with open("query.log", "a") as f:
88        f.write("%s %s\n" % (typename, qname))
89        print("%s %s" % (typename, qname), end=" ")
90
91    r = dns.message.make_response(m)
92    r.set_rcode(NOERROR)
93    if rrtype == A:
94        # exempt potential nameserver A records.
95        if labels[0] == "ns" and ns10:
96            r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
97        else:
98            r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.9"))
99        if not tcp and labels[0] == "nocookie":
100            r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
101        if not tcp and labels[0] == "tcponly":
102            r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
103        if first and not tcp and labels[0] == "withtsig":
104            r.answer.append(dns.rrset.from_text(qname, 1, IN, A, "10.53.0.10"))
105            dopass2 = True
106    elif rrtype == NS:
107        r.answer.append(dns.rrset.from_text(qname, 1, IN, NS, "."))
108    elif rrtype == SOA:
109        r.answer.append(dns.rrset.from_text(qname, 1, IN, SOA, ". . 0 0 0 0 0"))
110    else:
111        r.authority.append(dns.rrset.from_text(qname, 1, IN, SOA, ". . 0 0 0 0 0"))
112    # Add a server cookie to the response
113    if labels[0] != "nocookie":
114        for o in m.options:
115            if o.otype == 10:  # Use 10 instead of COOKIE
116                if first and labels[0] == "withtsig" and not tcp:
117                    r.use_tsig(
118                        keyring=keyring,
119                        keyname=dns.name.from_text("fake"),
120                        algorithm=HMAC_SHA256,
121                    )
122                elif labels[0] != "tcponly" or tcp:
123                    cookie = o
124                    try:
125                        if len(o.server) == 0:
126                            cookie.server = o.client
127                    except AttributeError:  # dnspython<2.7.0 compat
128                        if len(o.data) == 8:
129                            cookie.data = o.data + o.data
130                        else:
131                            cookie.data = o.data
132                    r.use_edns(options=[cookie])
133    r.flags |= dns.flags.AA
134    return r
135
136
137def sigterm(signum, frame):
138    print("Shutting down now...")
139    os.remove("ans.pid")
140    running = False
141    sys.exit(0)
142
143
144############################################################################
145# Main
146#
147# Set up responder and control channel, open the pid file, and start
148# the main loop, listening for queries on the query channel or commands
149# on the control channel and acting on them.
150############################################################################
151ip4_addr1 = "10.53.0.9"
152ip4_addr2 = "10.53.0.10"
153ip6_addr1 = "fd92:7065:b8e:ffff::9"
154ip6_addr2 = "fd92:7065:b8e:ffff::10"
155
156try:
157    port = int(os.environ["PORT"])
158except:
159    port = 5300
160
161query4_udp1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
162query4_udp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
163query4_udp1.bind((ip4_addr1, port))
164query4_tcp1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
165query4_tcp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
166query4_tcp1.bind((ip4_addr1, port))
167query4_tcp1.listen(1)
168query4_tcp1.settimeout(1)
169
170query4_udp2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
171query4_udp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
172query4_udp2.bind((ip4_addr2, port))
173query4_tcp2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
174query4_tcp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
175query4_tcp2.bind((ip4_addr2, port))
176query4_tcp2.listen(1)
177query4_tcp2.settimeout(1)
178
179havev6 = True
180query6_udp1 = None
181query6_udp2 = None
182query6_tcp1 = None
183query6_tcp2 = None
184try:
185    query6_udp1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
186    query6_udp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
187    query6_udp1.bind((ip6_addr1, port))
188    query6_tcp1 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
189    query6_tcp1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
190    query6_tcp1.bind((ip6_addr1, port))
191    query6_tcp1.listen(1)
192    query6_tcp1.settimeout(1)
193
194    query6_udp2 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
195    query6_udp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
196    query6_udp2.bind((ip6_addr2, port))
197    query6_tcp2 = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
198    query6_tcp2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
199    query6_tcp2.bind((ip6_addr2, port))
200    query6_tcp2.listen(1)
201    query6_tcp2.settimeout(1)
202except:
203    if query6_udp1 != None:
204        query6_udp1.close()
205    if query6_tcp1 != None:
206        query6_tcp1.close()
207    if query6_udp2 != None:
208        query6_udp2.close()
209    if query6_tcp2 != None:
210        query6_tcp2.close()
211    havev6 = False
212
213signal.signal(signal.SIGTERM, sigterm)
214
215f = open("ans.pid", "w")
216pid = os.getpid()
217print(pid, file=f)
218f.close()
219
220running = True
221
222print("Using DNS version %s" % dns.version.version)
223print("Listening on %s port %d" % (ip4_addr1, port))
224print("Listening on %s port %d" % (ip4_addr2, port))
225if havev6:
226    print("Listening on %s port %d" % (ip6_addr1, port))
227    print("Listening on %s port %d" % (ip6_addr2, port))
228print("Ctrl-c to quit")
229
230if havev6:
231    input = [
232        query4_udp1,
233        query6_udp1,
234        query4_tcp1,
235        query6_tcp1,
236        query4_udp2,
237        query6_udp2,
238        query4_tcp2,
239        query6_tcp2,
240    ]
241else:
242    input = [query4_udp1, query4_tcp1, query4_udp2, query4_tcp2]
243
244while running:
245    try:
246        inputready, outputready, exceptready = select.select(input, [], [])
247    except select.error as e:
248        break
249    except socket.error as e:
250        break
251    except KeyboardInterrupt:
252        break
253
254    for s in inputready:
255        ns10 = False
256        if s == query4_udp1 or s == query6_udp1 or s == query4_udp2 or s == query6_udp2:
257            if s == query4_udp1 or s == query6_udp1:
258                print(
259                    "UDP Query received on %s"
260                    % (ip4_addr1 if s == query4_udp1 else ip6_addr1),
261                    end=" ",
262                )
263            if s == query4_udp2 or s == query6_udp2:
264                print(
265                    "UDP Query received on %s"
266                    % (ip4_addr2 if s == query4_udp2 else ip6_addr2),
267                    end=" ",
268                )
269                ns10 = True
270            # Handle incoming queries
271            msg = s.recvfrom(65535)
272            dopass2 = False
273            rsp = create_response(msg[0], False, True, ns10)
274            print(dns.rcode.to_text(rsp.rcode()))
275            s.sendto(rsp.to_wire(), msg[1])
276            if dopass2:
277                print("Sending second UDP response without TSIG", end=" ")
278                rsp = create_response(msg[0], False, False, ns10)
279                s.sendto(rsp.to_wire(), msg[1])
280                print(dns.rcode.to_text(rsp.rcode()))
281
282        if s == query4_tcp1 or s == query6_tcp1 or s == query4_tcp2 or s == query6_tcp2:
283            try:
284                (cs, _) = s.accept()
285                if s == query4_tcp1 or s == query6_tcp1:
286                    print(
287                        "TCP Query received on %s"
288                        % (ip4_addr1 if s == query4_tcp1 else ip6_addr1),
289                        end=" ",
290                    )
291                if s == query4_tcp2 or s == query6_tcp2:
292                    print(
293                        "TCP Query received on %s"
294                        % (ip4_addr2 if s == query4_tcp2 else ip6_addr2),
295                        end=" ",
296                    )
297                    ns10 = True
298                # get TCP message length
299                buf = cs.recv(2)
300                length = struct.unpack(">H", buf[:2])[0]
301                # grep DNS message
302                msg = cs.recv(length)
303                rsp = create_response(msg, True, True, ns10)
304                print(dns.rcode.to_text(rsp.rcode()))
305                wire = rsp.to_wire()
306                cs.send(struct.pack(">H", len(wire)))
307                cs.send(wire)
308                cs.close()
309            except s.timeout:
310                pass
311    if not running:
312        break
313