xref: /netbsd-src/external/mpl/bind/dist/bin/tests/system/checkds/tests_checkds.py (revision 9689912e6b171cbda866ec33f15ae94a04e2c02d)
1#!/usr/bin/python3
2
3# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
4#
5# SPDX-License-Identifier: MPL-2.0
6#
7# This Source Code Form is subject to the terms of the Mozilla Public
8# License, v. 2.0.  If a copy of the MPL was not distributed with this
9# file, you can obtain one at https://mozilla.org/MPL/2.0/.
10#
11# See the COPYRIGHT file distributed with this work for additional
12# information regarding copyright ownership.
13
14
15from typing import NamedTuple, Tuple
16
17import os
18import sys
19import time
20
21import isctest
22import pytest
23
24pytest.importorskip("dns", minversion="2.0.0")
25import dns.exception
26import dns.message
27import dns.name
28import dns.rcode
29import dns.rdataclass
30import dns.rdatatype
31
32
# Module-level pytest marks; pytest applies everything listed in
# ``pytestmark`` to each test collected from this file.
pytestmark = [
    # Skip the whole module on interpreters older than 3.7 [GL #3001].
    pytest.mark.skipif(
        sys.version_info[:2] < (3, 7),
        reason="Python >= 3.7 required [GL #3001]",
    ),
    # Glob patterns handed to the ``extra_artifacts`` marker.
    # NOTE(review): presumably these name the generated files the harness
    # should expect after a run -- confirm against the marker's definition.
    pytest.mark.extra_artifacts(
        ["*.out"]
        + [
            f"ns*/{pattern}"
            for pattern in (
                "*.db",
                "*.db.infile",
                "*.db.signed",
                "*.jnl",
                "*.jbk",
                "*.keyname",
                "dsset-*",
                "K*",
                "keygen.out*",
                "settime.out*",
                "signer.out*",
                "trusted.conf",
                "zones",
            )
        ]
    ),
]
56
57
def has_signed_apex_nsec(zone, response):
    """Tell whether *response* contains the signed apex NSEC of *zone*.

    Each entry of ``response.answer`` is rendered with ``to_text()`` and
    searched for two fixed substrings: the apex NSEC record (TTL 300, next
    name ``a.<zone>``, type list ``NS SOA RRSIG NSEC DNSKEY``) and the start
    of an algorithm-13 RRSIG covering NSEC.

    :param zone: zone name written as an FQDN (with the trailing dot).
    :param response: object exposing an ``answer`` iterable whose items
        provide ``to_text()``.
    :returns: ``True`` only when both substrings were found; an error line
        is printed for each one that is missing.
    """
    apex_ttl = 300
    first_child = "a."
    # The name is fully qualified, so the number of dots equals the number
    # of labels that goes into the RRSIG "labels" field.
    label_total = zone.count(".")
    type_bitmap = "NS SOA RRSIG NSEC DNSKEY"

    wanted_nsec = f"{zone} {apex_ttl} IN NSEC {first_child}{zone} {type_bitmap}"
    wanted_rrsig = f"{zone} {apex_ttl} IN RRSIG NSEC 13 {label_total} 300"

    rendered = [rrset.to_text() for rrset in response.answer]
    nsec_found = any(wanted_nsec in text for text in rendered)
    rrsig_found = any(wanted_rrsig in text for text in rendered)

    if not nsec_found:
        print("error: missing apex NSEC record in response")
    if not rrsig_found:
        print("error: missing NSEC signature in response")

    return nsec_found and rrsig_found
81
82
def do_query(server, qname, qtype, tcp=False):
    """Send one EDNS query with the DO bit set and return the response.

    :param server: object whose ``ip`` attribute is the address to query.
    :param qname: name to ask for.
    :param qtype: record type to ask for.
    :param tcp: use ``isctest.query.tcp`` when true, ``isctest.query.udp``
        otherwise.
    :returns: whatever the selected ``isctest.query`` helper returns; the
        helper is asked to expect rcode NOERROR.
    """
    request = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
    if tcp:
        send = isctest.query.tcp
    else:
        send = isctest.query.udp
    return send(request, server.ip, expected_rcode=dns.rcode.NOERROR)
88
89
def verify_zone(zone, transfer):
    """Dump a transferred zone to disk and check it with ``$VERIFY``.

    Every entry of ``transfer.answer`` is written, one ``to_text()`` rendering
    per line, to a file named ``<zone>out`` in the current directory.  The
    program named by the ``VERIFY`` environment variable is then run as
    ``$VERIFY -z -o <zone> <file>``.

    :param zone: zone origin passed to ``-o``; it is also the prefix of the
        output file name.
    :param transfer: response object exposing an ``answer`` iterable.
    :returns: ``True`` when the command's return code is 0.  Otherwise an
        error line is printed, the command's stderr is copied to this
        process's stderr, and ``False`` is returned.
    :raises AssertionError: if ``VERIFY`` is not set in the environment.
    """
    verify_bin = os.environ.get("VERIFY")
    assert verify_bin is not None

    dump_path = f"{zone}out"
    with open(dump_path, "w", encoding="utf-8") as dump:
        dump.writelines(f"{rrset.to_text()}\n" for rrset in transfer.answer)

    # dnssec-verify command with default arguments.
    outcome = isctest.run.cmd([verify_bin, "-z", "-o", zone, dump_path])

    verified = outcome.returncode == 0
    if not verified:
        print(f"error: dnssec-verify {zone} failed")
        sys.stderr.buffer.write(outcome.stderr)

    return verified
110
111
def read_statefile(server, zone):
    """Load the ``ns9`` key state file of the key whose DS *server* serves.

    A DS query for *zone* is sent to *server* over TCP.  The key tag of the
    first rdata in the first matching DS RRset selects the file
    ``ns9/K<zone>+013+<tag>.state``.

    :param server: server object to query (its ``ip`` is used in messages).
    :param zone: zone name as an FQDN; it is embedded verbatim in the file
        name.
    :returns: dict mapping each ``key: value`` line of the state file to its
        value (both stripped); lines starting with ``;`` are skipped.  An
        empty dict is returned when the file does not exist.
    :raises AssertionError: unless exactly one DS RRset is in the answer.
    """
    response = do_query(server, zone, "DS", tcp=True)

    owner = dns.name.from_text(zone)
    ds_rrsets = [
        rrset
        for rrset in response.answer
        if rrset.match(
            owner,
            dns.rdataclass.IN,
            dns.rdatatype.DS,
            dns.rdatatype.NONE,
        )
    ]

    # fetch key id from response (first rdata of the first DS RRset).
    keyid = 0
    if ds_rrsets:
        first_rdata = list(dict(ds_rrsets[0].items))[0]
        keyid = first_rdata.key_tag

    count = len(ds_rrsets)
    assert (
        count == 1
    ), f"expected a single DS in response for {zone} from {server.ip}, got {count}"

    filename = f"ns9/K{zone}+013+{keyid:05d}.state"
    print(f"read state file {filename}")

    state = {}
    try:
        with open(filename, "r", encoding="utf-8") as handle:
            for raw in handle:
                if not raw.startswith(";"):
                    name, value = raw.strip().split(":", 1)
                    state[name.strip()] = value.strip()
    except FileNotFoundError:
        # file may not be written just yet.
        return {}

    return state
149
150
def zone_check(server, zone):
    """Assert that *zone*, as served by *server*, is signed and verifies.

    :param server: server object to query.
    :param zone: zone name without the trailing dot.
    :raises AssertionError: if the apex NSEC/RRSIG check or the
        ``verify_zone`` check on the transferred zone fails.
    """
    apex = f"{zone}."

    # The zone counts as fully signed once its apex NSEC and the matching
    # RRSIG are returned.
    nsec_response = do_query(server, apex, "NSEC")
    assert has_signed_apex_nsec(apex, nsec_response)

    # Check that the zone is DNSSEC valid by verifying a full transfer.
    axfr_response = do_query(server, apex, "AXFR", tcp=True)
    assert verify_zone(apex, axfr_response)
161
162
def keystate_check(server, zone, key):
    """Poll the key state file until *key* is present (or absent).

    The state file is re-read via ``read_statefile`` up to 10 times, one
    second apart, stopping as soon as the expectation holds.

    :param server: server object handed to ``read_statefile``.
    :param zone: zone name without the trailing dot.
    :param key: name of a state-file entry, e.g. ``"DSPublish"``.  A leading
        ``"!"`` inverts the check: the entry must NOT be present.
    :raises AssertionError: if the expectation still does not hold after the
        last poll.
    """
    fqdn = f"{zone}."
    deny = key.startswith("!")
    search = key[1:] if deny else key

    val = 0
    for _ in range(10):
        state = read_statefile(server, fqdn)
        # Re-evaluate on every poll.  Carrying a value over from an earlier
        # read would make a "!key" expectation impossible to satisfy once
        # the key has been seen, even if a later read no longer has it.
        val = state.get(search, 0)

        if not deny and val != 0:
            break
        if deny and val == 0:
            break

        time.sleep(1)

    if deny:
        assert val == 0, f"{search} unexpectedly set to {val!r} for {fqdn}"
    else:
        assert val != 0, f"{search} not found in key state file for {fqdn}"
191
192
def rekey(zone):
    """Run ``rndc loadkeys <zone>`` against 10.53.0.9.

    The ``rndc`` binary and the control port come from the ``RNDC`` and
    ``CONTROLPORT`` environment variables; the configuration file is
    ``../_common/rndc.conf``.

    :param zone: zone name passed to ``loadkeys``.
    :raises AssertionError: if either environment variable is unset, or if
        the command's return code is non-zero (in which case an error line
        is printed and its stderr is copied to this process's stderr first).
    """
    rndc_bin = os.environ.get("RNDC")
    assert rndc_bin is not None

    control_port = os.environ.get("CONTROLPORT")
    assert control_port is not None

    # rndc loadkeys.
    connection_args = ["-c", "../_common/rndc.conf", "-p", control_port, "-s", "10.53.0.9"]
    command = [rndc_bin] + connection_args + ["loadkeys", zone]
    outcome = isctest.run.cmd(command)

    failed = outcome.returncode != 0
    if failed:
        print(f"error: rndc loadkeys {zone} failed")
        sys.stderr.buffer.write(outcome.stderr)

    assert not failed
219
220
class CheckDSTest(NamedTuple):
    """Parameters of a single checkds scenario."""

    # Zone name, without the trailing dot.
    zone: str
    # Log fragments to wait for before checking key state.  The tuple may
    # hold any number of strings (including none), hence ``Tuple[str, ...]``
    # rather than the fixed one-element ``Tuple[str]``.
    logs_to_wait_for: Tuple[str, ...]
    # State-file entry expected to be present; a leading "!" means it is
    # expected to be absent.
    expected_parent_state: str
225
226
# Scenarios exercising the different ways of naming a parental agent.
parental_agents_tests = [
    CheckDSTest(
        zone=zone_name,
        logs_to_wait_for=(log_fragment,),
        expected_parent_state=parent_state,
    )
    for zone_name, log_fragment, parent_state in (
        # Using a reference to parental-agents.
        (
            "reference.explicit.dspublish.ns2",
            "DS response from 10.53.0.8",
            "DSPublish",
        ),
        # Using a resolver as parental-agent (ns3).
        (
            "resolver.explicit.dspublish.ns2",
            "DS response from 10.53.0.3",
            "DSPublish",
        ),
        # Using a resolver as parental-agent (ns3).
        (
            "resolver.explicit.dsremoved.ns5",
            "empty DS response from 10.53.0.3",
            "DSRemoved",
        ),
    )
]
247
# Scenarios for the "no-ent" zones: one expecting DSPublish (ns2), one
# expecting DSRemoved (ns5).
no_ent_tests = [
    CheckDSTest("no-ent.ns2", ("DS response from 10.53.0.2",), "DSPublish"),
    CheckDSTest("no-ent.ns5", ("DS response from 10.53.0.5",), "DSRemoved"),
]
260
261
def dspublished_tests(checkds, addr):
    """Build the DS-publication scenarios for one checkds mode.

    :param checkds: mode label embedded in every zone name (this file calls
        it with ``"explicit"`` and ``"yes"``).
    :param addr: address expected in the "DS response from ..." log line of
        the ns2 parental agent.
    :returns: list of six ``CheckDSTest`` entries, cases 1.1.1-1.1.3 and
        1.2.1-1.2.3.
    """
    stem = f"{checkds}.dspublish"

    ns2_published = f"DS response from {addr}"
    ns4_published = "DS response from 10.53.0.4"
    ns5_empty = "empty DS response from 10.53.0.5"
    ns6_bad = "bad DS response from 10.53.0.6"

    # With a single badly configured agent the expected log line depends on
    # the checkds mode.
    if checkds == "explicit":
        lone_bad_agent = ns6_bad
    else:
        lone_bad_agent = "error during parental-agents processing"

    return [
        # 1.1.1: DS is correctly published in parent (the simple case).
        # parental-agents: ns2
        CheckDSTest(
            zone=f"good.{stem}.ns2",
            logs_to_wait_for=(ns2_published,),
            expected_parent_state="DSPublish",
        ),
        # 1.1.2: DS is not published in parent.
        # parental-agents: ns5
        CheckDSTest(
            zone=f"not-yet.{stem}.ns5",
            logs_to_wait_for=(ns5_empty,),
            expected_parent_state="!DSPublish",
        ),
        # 1.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        CheckDSTest(
            zone=f"bad.{stem}.ns6",
            logs_to_wait_for=(lone_bad_agent,),
            expected_parent_state="!DSPublish",
        ),
        # 1.1.4: DS is published, but has bogus signature.
        # TBD
        #
        # 1.2.1: DS is correctly published in all parents.
        # parental-agents: ns2, ns4
        CheckDSTest(
            zone=f"good.{stem}.ns2-4",
            logs_to_wait_for=(ns2_published, ns4_published),
            expected_parent_state="DSPublish",
        ),
        # 1.2.2: DS is not published in some parents.
        # parental-agents: ns2, ns4, ns5
        CheckDSTest(
            zone=f"incomplete.{stem}.ns2-4-5",
            logs_to_wait_for=(ns2_published, ns4_published, ns5_empty),
            expected_parent_state="!DSPublish",
        ),
        # 1.2.3: One parental agent is badly configured.
        # parental-agents: ns2, ns4, ns6
        CheckDSTest(
            zone=f"bad.{stem}.ns2-4-6",
            logs_to_wait_for=(ns2_published, ns4_published, ns6_bad),
            expected_parent_state="!DSPublish",
        ),
        # 1.2.4: DS is completely published, bogus signature.
        # TBD
        # TBD: Check with TSIG
        # TBD: Check with TLS
    ]
344
345
def dswithdrawn_tests(checkds, addr):
    """Build the DS-withdrawal scenarios for one checkds mode.

    :param checkds: mode label embedded in every zone name (this file calls
        it with ``"explicit"`` and ``"yes"``).
    :param addr: address expected in the "empty DS response from ..." log
        line of the ns5 parental agent.
    :returns: list of six ``CheckDSTest`` entries, cases 2.1.1-2.1.3 and
        2.2.1-2.2.3.
    """
    stem = f"{checkds}.dsremoved"

    ns5_withdrawn = f"empty DS response from {addr}"
    ns7_withdrawn = "empty DS response from 10.53.0.7"
    ns2_still_published = "DS response from 10.53.0.2"
    ns6_bad = "bad DS response from 10.53.0.6"

    # With a single badly configured agent the expected log line depends on
    # the checkds mode.
    if checkds == "explicit":
        lone_bad_agent = ns6_bad
    else:
        lone_bad_agent = "error during parental-agents processing"

    return [
        # 2.1.1: DS correctly withdrawn from the parent (the simple case).
        # parental-agents: ns5
        CheckDSTest(
            zone=f"good.{stem}.ns5",
            logs_to_wait_for=(ns5_withdrawn,),
            expected_parent_state="DSRemoved",
        ),
        # 2.1.2: DS is published in the parent.
        # parental-agents: ns2
        CheckDSTest(
            zone=f"still-there.{stem}.ns2",
            logs_to_wait_for=(ns2_still_published,),
            expected_parent_state="!DSRemoved",
        ),
        # 2.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        CheckDSTest(
            zone=f"bad.{stem}.ns6",
            logs_to_wait_for=(lone_bad_agent,),
            expected_parent_state="!DSRemoved",
        ),
        # 2.1.4: DS is withdrawn, but has bogus signature.
        # TBD
        #
        # 2.2.1: DS is correctly withdrawn from all parents.
        # parental-agents: ns5, ns7
        CheckDSTest(
            zone=f"good.{stem}.ns5-7",
            logs_to_wait_for=(ns5_withdrawn, ns7_withdrawn),
            expected_parent_state="DSRemoved",
        ),
        # 2.2.2: DS is not withdrawn from some parents.
        # parental-agents: ns2, ns5, ns7
        CheckDSTest(
            zone=f"incomplete.{stem}.ns2-5-7",
            logs_to_wait_for=(ns2_still_published, ns5_withdrawn, ns7_withdrawn),
            expected_parent_state="!DSRemoved",
        ),
        # 2.2.3: One parental agent is badly configured.
        # parental-agents: ns5, ns6, ns7
        CheckDSTest(
            zone=f"bad.{stem}.ns5-6-7",
            logs_to_wait_for=(ns5_withdrawn, ns7_withdrawn, ns6_bad),
            expected_parent_state="!DSRemoved",
        ),
        # 2.2.4: DS is removed completely, bogus signature.
        # TBD
    ]
429
430
# Scenarios for the "good.no.*" zones: no log line is waited for and the
# parent state entry is expected to stay absent.
checkds_no_tests = [
    CheckDSTest(
        zone=zone_name,
        logs_to_wait_for=(),
        expected_parent_state=absent_state,
    )
    for zone_name, absent_state in (
        ("good.no.dspublish.ns2", "!DSPublish"),
        ("good.no.dspublish.ns2-4", "!DSPublish"),
        ("good.no.dsremoved.ns5", "!DSRemoved"),
        ("good.no.dsremoved.ns5-7", "!DSRemoved"),
    )
]
453
454
# Every scenario fed to test_checkds, in execution order.
checkds_tests = [
    *parental_agents_tests,
    *no_ent_tests,
    *dspublished_tests("explicit", "10.53.0.8"),
    *dspublished_tests("yes", "10.53.0.2"),
    *dswithdrawn_tests("explicit", "10.53.0.10"),
    *dswithdrawn_tests("yes", "10.53.0.5"),
    *checkds_no_tests,
]
464
465
@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
def test_checkds(servers, params):
    """Run one checkds scenario.

    The zone must verify on ns9, each expected checkds log line must then
    appear in the ns9 log, and finally the expected key state entry is
    checked through ``keystate_check`` using ns2.

    :param servers: mapping of server names to server objects.
    :param params: the ``CheckDSTest`` describing the scenario.
    """
    signer = servers["ns9"]

    # Verify the zone's DNSSEC data as served by ns9.
    zone_check(signer, params.zone)

    # Poll the ns9 log for every expected line, running "rndc loadkeys" and
    # sleeping one second after each miss.  The budget of 10 is shared by
    # all expected lines of the scenario.
    retries_left = 10
    for expected in params.logs_to_wait_for:
        wanted = f"zone {params.zone}/IN (signed): checkds: {expected}"
        while True:
            if wanted in signer.log:
                break
            rekey(params.zone)
            retries_left -= 1
            assert retries_left, f'Timed out waiting for "{expected}" to be logged'
            time.sleep(1)

    # Check whether the key state matches the scenario's expectation.
    keystate_check(servers["ns2"], params.zone, params.expected_parent_state)
485