1#!/usr/bin/python3 2 3# Copyright (C) Internet Systems Consortium, Inc. ("ISC") 4# 5# SPDX-License-Identifier: MPL-2.0 6# 7# This Source Code Form is subject to the terms of the Mozilla Public 8# License, v. 2.0. If a copy of the MPL was not distributed with this 9# file, you can obtain one at https://mozilla.org/MPL/2.0/. 10# 11# See the COPYRIGHT file distributed with this work for additional 12# information regarding copyright ownership. 13 14 15from typing import NamedTuple, Tuple 16 17import os 18import sys 19import time 20 21import isctest 22import pytest 23 24pytest.importorskip("dns", minversion="2.0.0") 25import dns.exception 26import dns.message 27import dns.name 28import dns.rcode 29import dns.rdataclass 30import dns.rdatatype 31 32 33pytestmark = [ 34 pytest.mark.skipif( 35 sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]" 36 ), 37 pytest.mark.extra_artifacts( 38 [ 39 "*.out", 40 "ns*/*.db", 41 "ns*/*.db.infile", 42 "ns*/*.db.signed", 43 "ns*/*.jnl", 44 "ns*/*.jbk", 45 "ns*/*.keyname", 46 "ns*/dsset-*", 47 "ns*/K*", 48 "ns*/keygen.out*", 49 "ns*/settime.out*", 50 "ns*/signer.out*", 51 "ns*/trusted.conf", 52 "ns*/zones", 53 ] 54 ), 55] 56 57 58def has_signed_apex_nsec(zone, response): 59 has_nsec = False 60 has_rrsig = False 61 62 ttl = 300 63 nextname = "a." 64 labelcount = zone.count(".") # zone is specified as FQDN 65 types = "NS SOA RRSIG NSEC DNSKEY" 66 match = f"{zone} {ttl} IN NSEC {nextname}{zone} {types}" 67 sig = f"{zone} {ttl} IN RRSIG NSEC 13 {labelcount} 300" 68 69 for rr in response.answer: 70 if match in rr.to_text(): 71 has_nsec = True 72 if sig in rr.to_text(): 73 has_rrsig = True 74 75 if not has_nsec: 76 print("error: missing apex NSEC record in response") 77 if not has_rrsig: 78 print("error: missing NSEC signature in response") 79 80 return has_nsec and has_rrsig 81 82 83def do_query(server, qname, qtype, tcp=False): 84 msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) 85 query_func = isctest.query.tcp if tcp else isctest.query.udp 86 response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR) 87 return response 88 89 90def verify_zone(zone, transfer): 91 verify = os.getenv("VERIFY") 92 assert verify is not None 93 94 filename = f"{zone}out" 95 with open(filename, "w", encoding="utf-8") as file: 96 for rr in transfer.answer: 97 file.write(rr.to_text()) 98 file.write("\n") 99 100 # dnssec-verify command with default arguments. 101 verify_cmd = [verify, "-z", "-o", zone, filename] 102 103 verifier = isctest.run.cmd(verify_cmd) 104 105 if verifier.returncode != 0: 106 print(f"error: dnssec-verify {zone} failed") 107 sys.stderr.buffer.write(verifier.stderr) 108 109 return verifier.returncode == 0 110 111 112def read_statefile(server, zone): 113 count = 0 114 keyid = 0 115 state = {} 116 117 response = do_query(server, zone, "DS", tcp=True) 118 # fetch key id from response. 119 for rr in response.answer: 120 if rr.match( 121 dns.name.from_text(zone), 122 dns.rdataclass.IN, 123 dns.rdatatype.DS, 124 dns.rdatatype.NONE, 125 ): 126 if count == 0: 127 keyid = list(dict(rr.items).items())[0][0].key_tag 128 count += 1 129 130 assert ( 131 count == 1 132 ), f"expected a single DS in response for {zone} from {server.ip}, got {count}" 133 134 filename = f"ns9/K{zone}+013+{keyid:05d}.state" 135 print(f"read state file {filename}") 136 137 try: 138 with open(filename, "r", encoding="utf-8") as file: 139 for line in file: 140 if line.startswith(";"): 141 continue 142 key, val = line.strip().split(":", 1) 143 state[key.strip()] = val.strip() 144 except FileNotFoundError: 145 # file may not be written just yet. 146 return {} 147 148 return state 149 150 151def zone_check(server, zone): 152 fqdn = f"{zone}." 153 154 # check zone is fully signed. 155 response = do_query(server, fqdn, "NSEC") 156 assert has_signed_apex_nsec(fqdn, response) 157 158 # check if zone if DNSSEC valid. 159 transfer = do_query(server, fqdn, "AXFR", tcp=True) 160 assert verify_zone(fqdn, transfer) 161 162 163def keystate_check(server, zone, key): 164 fqdn = f"{zone}." 165 val = 0 166 deny = False 167 168 search = key 169 if key.startswith("!"): 170 deny = True 171 search = key[1:] 172 173 for _ in range(10): 174 state = read_statefile(server, fqdn) 175 try: 176 val = state[search] 177 except KeyError: 178 pass 179 180 if not deny and val != 0: 181 break 182 if deny and val == 0: 183 break 184 185 time.sleep(1) 186 187 if deny: 188 assert val == 0 189 else: 190 assert val != 0 191 192 193def rekey(zone): 194 rndc = os.getenv("RNDC") 195 assert rndc is not None 196 197 port = os.getenv("CONTROLPORT") 198 assert port is not None 199 200 # rndc loadkeys. 201 rndc_cmd = [ 202 rndc, 203 "-c", 204 "../_common/rndc.conf", 205 "-p", 206 port, 207 "-s", 208 "10.53.0.9", 209 "loadkeys", 210 zone, 211 ] 212 controller = isctest.run.cmd(rndc_cmd) 213 214 if controller.returncode != 0: 215 print(f"error: rndc loadkeys {zone} failed") 216 sys.stderr.buffer.write(controller.stderr) 217 218 assert controller.returncode == 0 219 220 221class CheckDSTest(NamedTuple): 222 zone: str 223 logs_to_wait_for: Tuple[str] 224 expected_parent_state: str 225 226 227parental_agents_tests = [ 228 # Using a reference to parental-agents. 229 CheckDSTest( 230 zone="reference.explicit.dspublish.ns2", 231 logs_to_wait_for=("DS response from 10.53.0.8",), 232 expected_parent_state="DSPublish", 233 ), 234 # Using a resolver as parental-agent (ns3). 235 CheckDSTest( 236 zone="resolver.explicit.dspublish.ns2", 237 logs_to_wait_for=("DS response from 10.53.0.3",), 238 expected_parent_state="DSPublish", 239 ), 240 # Using a resolver as parental-agent (ns3). 241 CheckDSTest( 242 zone="resolver.explicit.dsremoved.ns5", 243 logs_to_wait_for=("empty DS response from 10.53.0.3",), 244 expected_parent_state="DSRemoved", 245 ), 246] 247 248no_ent_tests = [ 249 CheckDSTest( 250 zone="no-ent.ns2", 251 logs_to_wait_for=("DS response from 10.53.0.2",), 252 expected_parent_state="DSPublish", 253 ), 254 CheckDSTest( 255 zone="no-ent.ns5", 256 logs_to_wait_for=("DS response from 10.53.0.5",), 257 expected_parent_state="DSRemoved", 258 ), 259] 260 261 262def dspublished_tests(checkds, addr): 263 return [ 264 # 265 # 1.1.1: DS is correctly published in parent. 266 # parental-agents: ns2 267 # 268 # The simple case. 269 CheckDSTest( 270 zone=f"good.{checkds}.dspublish.ns2", 271 logs_to_wait_for=(f"DS response from {addr}",), 272 expected_parent_state="DSPublish", 273 ), 274 # 275 # 1.1.2: DS is not published in parent. 276 # parental-agents: ns5 277 # 278 CheckDSTest( 279 zone=f"not-yet.{checkds}.dspublish.ns5", 280 logs_to_wait_for=("empty DS response from 10.53.0.5",), 281 expected_parent_state="!DSPublish", 282 ), 283 # 284 # 1.1.3: The parental agent is badly configured. 285 # parental-agents: ns6 286 # 287 CheckDSTest( 288 zone=f"bad.{checkds}.dspublish.ns6", 289 logs_to_wait_for=( 290 ( 291 "bad DS response from 10.53.0.6" 292 if checkds == "explicit" 293 else "error during parental-agents processing" 294 ), 295 ), 296 expected_parent_state="!DSPublish", 297 ), 298 # 299 # 1.1.4: DS is published, but has bogus signature. 300 # 301 # TBD 302 # 303 # 1.2.1: DS is correctly published in all parents. 304 # parental-agents: ns2, ns4 305 # 306 CheckDSTest( 307 zone=f"good.{checkds}.dspublish.ns2-4", 308 logs_to_wait_for=(f"DS response from {addr}", "DS response from 10.53.0.4"), 309 expected_parent_state="DSPublish", 310 ), 311 # 312 # 1.2.2: DS is not published in some parents. 313 # parental-agents: ns2, ns4, ns5 314 # 315 CheckDSTest( 316 zone=f"incomplete.{checkds}.dspublish.ns2-4-5", 317 logs_to_wait_for=( 318 f"DS response from {addr}", 319 "DS response from 10.53.0.4", 320 "empty DS response from 10.53.0.5", 321 ), 322 expected_parent_state="!DSPublish", 323 ), 324 # 325 # 1.2.3: One parental agent is badly configured. 326 # parental-agents: ns2, ns4, ns6 327 # 328 CheckDSTest( 329 zone=f"bad.{checkds}.dspublish.ns2-4-6", 330 logs_to_wait_for=( 331 f"DS response from {addr}", 332 "DS response from 10.53.0.4", 333 "bad DS response from 10.53.0.6", 334 ), 335 expected_parent_state="!DSPublish", 336 ), 337 # 338 # 1.2.4: DS is completely published, bogus signature. 339 # 340 # TBD 341 # TBD: Check with TSIG 342 # TBD: Check with TLS 343 ] 344 345 346def dswithdrawn_tests(checkds, addr): 347 return [ 348 # 349 # 2.1.1: DS correctly withdrawn from the parent. 350 # parental-agents: ns5 351 # 352 # The simple case. 353 CheckDSTest( 354 zone=f"good.{checkds}.dsremoved.ns5", 355 logs_to_wait_for=(f"empty DS response from {addr}",), 356 expected_parent_state="DSRemoved", 357 ), 358 # 359 # 2.1.2: DS is published in the parent. 360 # parental-agents: ns2 361 # 362 CheckDSTest( 363 zone=f"still-there.{checkds}.dsremoved.ns2", 364 logs_to_wait_for=("DS response from 10.53.0.2",), 365 expected_parent_state="!DSRemoved", 366 ), 367 # 368 # 2.1.3: The parental agent is badly configured. 369 # parental-agents: ns6 370 # 371 CheckDSTest( 372 zone=f"bad.{checkds}.dsremoved.ns6", 373 logs_to_wait_for=( 374 ( 375 "bad DS response from 10.53.0.6" 376 if checkds == "explicit" 377 else "error during parental-agents processing" 378 ), 379 ), 380 expected_parent_state="!DSRemoved", 381 ), 382 # 383 # 2.1.4: DS is withdrawn, but has bogus signature. 384 # 385 # TBD 386 # 387 # 2.2.1: DS is correctly withdrawn from all parents. 388 # parental-agents: ns5, ns7 389 # 390 CheckDSTest( 391 zone=f"good.{checkds}.dsremoved.ns5-7", 392 logs_to_wait_for=( 393 f"empty DS response from {addr}", 394 "empty DS response from 10.53.0.7", 395 ), 396 expected_parent_state="DSRemoved", 397 ), 398 # 399 # 2.2.2: DS is not withdrawn from some parents. 400 # parental-agents: ns2, ns5, ns7 401 # 402 CheckDSTest( 403 zone=f"incomplete.{checkds}.dsremoved.ns2-5-7", 404 logs_to_wait_for=( 405 "DS response from 10.53.0.2", 406 f"empty DS response from {addr}", 407 "empty DS response from 10.53.0.7", 408 ), 409 expected_parent_state="!DSRemoved", 410 ), 411 # 412 # 2.2.3: One parental agent is badly configured. 413 # parental-agents: ns5, ns6, ns7 414 # 415 CheckDSTest( 416 zone=f"bad.{checkds}.dsremoved.ns5-6-7", 417 logs_to_wait_for=( 418 f"empty DS response from {addr}", 419 "empty DS response from 10.53.0.7", 420 "bad DS response from 10.53.0.6", 421 ), 422 expected_parent_state="!DSRemoved", 423 ), 424 # 425 # 2.2.4:: DS is removed completely, bogus signature. 426 # 427 # TBD 428 ] 429 430 431checkds_no_tests = [ 432 CheckDSTest( 433 zone="good.no.dspublish.ns2", 434 logs_to_wait_for=(), 435 expected_parent_state="!DSPublish", 436 ), 437 CheckDSTest( 438 zone="good.no.dspublish.ns2-4", 439 logs_to_wait_for=(), 440 expected_parent_state="!DSPublish", 441 ), 442 CheckDSTest( 443 zone="good.no.dsremoved.ns5", 444 logs_to_wait_for=(), 445 expected_parent_state="!DSRemoved", 446 ), 447 CheckDSTest( 448 zone="good.no.dsremoved.ns5-7", 449 logs_to_wait_for=(), 450 expected_parent_state="!DSRemoved", 451 ), 452] 453 454 455checkds_tests = ( 456 parental_agents_tests 457 + no_ent_tests 458 + dspublished_tests("explicit", "10.53.0.8") 459 + dspublished_tests("yes", "10.53.0.2") 460 + dswithdrawn_tests("explicit", "10.53.0.10") 461 + dswithdrawn_tests("yes", "10.53.0.5") 462 + checkds_no_tests 463) 464 465 466@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone) 467def test_checkds(servers, params): 468 # Wait until the provided zone is signed and then verify its DNSSEC data. 469 zone_check(servers["ns9"], params.zone) 470 471 # Wait up to 10 seconds until all the expected log lines are found in the 472 # log file for the provided server. Rekey every second if necessary. 473 time_remaining = 10 474 for log_string in params.logs_to_wait_for: 475 line = f"zone {params.zone}/IN (signed): checkds: {log_string}" 476 while line not in servers["ns9"].log: 477 rekey(params.zone) 478 time_remaining -= 1 479 assert time_remaining, f'Timed out waiting for "{log_string}" to be logged' 480 time.sleep(1) 481 482 # Check whether key states on the parent server provided match 483 # expectations. 484 keystate_check(servers["ns2"], params.zone, params.expected_parent_state) 485