############################################################################
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
############################################################################

"""
Sphinx domains for ISC configuration files.

Use setup() to install new Sphinx domains for ISC configuration files.

This extension is based on combination of two Sphinx extension tutorials:
https://www.sphinx-doc.org/en/master/development/tutorials/todo.html
https://www.sphinx-doc.org/en/master/development/tutorials/recipe.html
"""

from collections import namedtuple

from docutils.parsers.rst import Directive, directives
from docutils import nodes

from sphinx import addnodes
from sphinx.directives import ObjectDescription
from sphinx.domains import Domain
from sphinx.roles import XRefRole
from sphinx.util import logging
from sphinx.util.nodes import make_refnode

import checkgrammar


logger = logging.getLogger(__name__)


def split_csv(argument, required):
    """Split a comma-separated directive option into a list of
    non-empty, whitespace-stripped strings.

    @param argument Raw option text; None is treated as an empty string
    @param required When True an empty result is rejected
    @raises ValueError on an empty-but-required list or on duplicates
    """
    argument = argument or ""
    outlist = list(filter(len, (s.strip() for s in argument.split(","))))
    if required and not outlist:
        raise ValueError(
            "a non-empty list required; provide at least one value or remove"
            " this option"
        )
    # a duplicate is almost certainly a copy&paste mistake in the .rst source
    if len(outlist) != len(set(outlist)):
        raise ValueError("duplicate value detected")
    return outlist


def domain_factory(domainname, domainlabel, todolist, grammar):
    """
    Return parametrized Sphinx domain object.
    @param domainname Name used when referencing domain in .rst: e.g. namedconf
    @param domainlabel Human-readable name for texts, e.g. named.conf
    @param todolist A placeholder object which must be picklable.
           See StatementListDirective.
    @param grammar Grammar dict understood by the checkgrammar module
    """

    class StatementListDirective(Directive):
        """A custom directive to generate list of statements.
        It only installs placeholder which is later replaced by
        process_statementlist_nodes() callback.
        """

        option_spec = {
            "filter_blocks": lambda arg: split_csv(arg, required=True),
            "filter_tags": lambda arg: split_csv(arg, required=True),
        }

        def run(self):
            # The placeholder must be picklable because it is stored in the
            # doctree; the real table is generated at doctree-read time.
            placeholder = todolist("")
            placeholder["isc_filter_tags"] = self.options.get("filter_tags", [])
            placeholder["isc_filter_blocks"] = self.options.get("filter_blocks", [])
            return [placeholder]

    class ISCConfDomain(Domain):
        """
        Custom Sphinx domain for ISC config.
        Provides .. statement:: directive to define config statement and
        .. statementlist:: to generate summary tables.
        :ref:`statementname` works as usual.

        See https://www.sphinx-doc.org/en/master/extdev/domainapi.html
        """

        class StatementDirective(ObjectDescription):
            """
            A custom directive that describes a statement,
            e.g. max-cache-size.
            """

            has_content = True
            required_arguments = 1
            option_spec = {
                "tags": lambda arg: split_csv(arg, required=False),
                # one-sentence description for use in summary tables
                "short": directives.unchanged_required,
                "suppress_grammar": directives.flag,
            }

            @property
            def isc_name(self):
                """The single signature (statement name) of this directive."""
                names = self.get_signatures()
                if len(names) != 1:
                    raise NotImplementedError(
                        "statements with more than one name are not supported",
                        names,
                    )
                return names[0]

            def handle_signature(self, sig, signode):
                signode += addnodes.desc_name(text=sig)
                return sig

            def add_target_and_index(self, _name_cls, sig, signode):
                # anchor id, e.g. "namedconf-statement-max-cache-size"
                signode["ids"].append(domainname + "-statement-" + sig)

                iscconf = self.env.get_domain(domainname)
                iscconf.add_statement(
                    sig, self.isc_tags, self.isc_short, self.isc_short_node, self.lineno
                )

            @property
            def isc_tags(self):
                return self.options.get("tags", [])

            @property
            def isc_short(self):
                return self.options.get("short", "")

            @property
            def isc_short_node(self):
                """Short description parsed from rst to docutils node"""
                return self.parse_nested_str(self.isc_short)

            def format_path(self, path):
                """Render a grammar path as dotted text; the artificial
                "_top" root is either dropped or shown as "topmost"."""
                assert path[0] == "_top"
                if len(path) == 1:
                    return "topmost"
                return ".".join(path[1:])

            def format_paths(self, paths):
                """Condense all "zone (type ...)" variants of a statement
                into a single "zone (type1, type2, ...)" pseudo-path."""
                zone_types = set()
                nozone_paths = []
                for path in paths:
                    try:
                        zone_idx = path.index("zone")
                        zone_type_txt = path[zone_idx + 1]
                        if zone_type_txt.startswith("type "):
                            zone_types.add(zone_type_txt[len("type ") :])
                        else:
                            assert zone_type_txt == "in-view"
                            zone_types.add(zone_type_txt)
                    except (ValueError, IndexError):
                        # not a zone path, keep it as-is
                        nozone_paths.append(path)
                condensed_paths = nozone_paths[:]
                if zone_types:
                    condensed_paths.append(
                        ("_top", "zone (" + ", ".join(sorted(zone_types)) + ")")
                    )
                condensed_paths = sorted(condensed_paths, key=len)
                return list(self.format_path(path) for path in condensed_paths)

            def format_blocks(self, grammar_blocks):
                """Generate node with list of all allowed blocks"""
                blocks = nodes.paragraph()
                blocks += nodes.strong(text="Blocks: ")
                blocks += nodes.Text(", ".join(self.format_paths(grammar_blocks)))
                return blocks

            def format_grammar(self, list_blocks, grammar_grp):
                """
                Generate grammar description node, optionally with list of
                blocks accepting this particular grammar.
                Example: Grammar (block1, block2): grammar;
                """
                grammarnode = nodes.paragraph()
                if list_blocks:
                    separator = " "
                    paths = ", ".join(
                        self.format_paths(variant.path for variant in grammar_grp)
                    )
                else:
                    separator = ""
                    paths = ""
                subgrammar = grammar_grp[0].subgrammar
                subgrammar_txt = checkgrammar.pformat_grammar(subgrammar).strip()
                grammar_txt = subgrammar.get("_pprint_name", self.isc_name)
                # avoid "name ;" - put a space only before a non-trivial body
                if subgrammar_txt != ";":
                    grammar_txt += " "
                grammar_txt += subgrammar_txt
                # multi-line grammars need a literal block; short ones inline
                if "\n" in grammar_txt.strip():
                    nodetype = nodes.literal_block
                else:
                    nodetype = nodes.literal
                grammarnode += nodes.strong(text=f"Grammar{separator}{paths}: ")
                grammarnode += nodetype(text=grammar_txt)
                return grammarnode

            def format_warnings(self, flags):
                """Return node with a warning box about deprecated and
                experimental options"""
                warn = nodes.warning()
                if "deprecated" in flags:
                    warn += nodes.paragraph(
                        text=(
                            "This option is deprecated and will be removed in a future"
                            " version of BIND."
                        )
                    )
                if "experimental" in flags:
                    warn += nodes.paragraph(
                        text="This option is experimental and subject to change."
                    )
                return warn

            def parse_nested_str(self, instr):
                """Parse string as nested rst syntax and produce a node"""
                raw = nodes.paragraph(text=instr)
                parsed = nodes.paragraph()
                self.state.nested_parse(raw, self.content_offset, parsed)
                return parsed

            def transform_content(self, content_node: addnodes.desc_content) -> None:
                """autogenerate content from structured data"""
                self.workaround_transform_content = True
                # Insertions are done in reverse order of final appearance
                # because everything is inserted at index 0.
                if self.isc_short:
                    content_node.insert(0, self.isc_short_node)
                if self.isc_tags:
                    tags = nodes.paragraph()
                    tags += nodes.strong(text="Tags: ")
                    tags += nodes.Text(", ".join(self.isc_tags))
                    content_node.insert(0, tags)

                iscconf = self.env.get_domain(domainname)

                name = self.isc_name
                if name not in iscconf.statement_blocks:
                    return  # not defined in grammar, nothing to render

                blocks = self.format_blocks(iscconf.statement_blocks[name])
                content_node.insert(0, blocks)

                grammars = iscconf.statement_grammar_groups[name]
                multi_grammar = len(grammars) > 1
                union_flags = set()
                for grammar_grp in grammars:
                    # collect flags even when grammar rendering is suppressed
                    for one_grammar_dict in grammar_grp:
                        union_flags = union_flags.union(
                            set(one_grammar_dict.subgrammar.get("_flags", []))
                        )
                    if "suppress_grammar" in self.options:
                        continue
                    grammarnode = self.format_grammar(multi_grammar, grammar_grp)
                    content_node.insert(0, grammarnode)

                warn = self.format_warnings(union_flags)
                if len(warn):
                    content_node.insert(0, warn)

            def __init__(self, *args, **kwargs):
                """Compatibility with Sphinx < 3.0.0"""
                self.workaround_transform_content = False
                super().__init__(*args, **kwargs)

            def run(self):
                """Compatibility with Sphinx < 3.0.0"""
                nodelist = super().run()
                if not self.workaround_transform_content:
                    # get access to "contentnode" created inside super.run()
                    self.transform_content(nodelist[1][-1])
                return nodelist

        name = domainname
        label = domainlabel

        directives = {
            "statement": StatementDirective,
            "statementlist": StatementListDirective,
        }

        roles = {"ref": XRefRole(warn_dangling=True)}
        initial_data = {
            # name -> {"tags": [list of tags], ...}; see add_statement()
            "statements": {},
        }

        indices = {}  # no custom indices

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.grammar = grammar
            # statement name -> set of grammar paths (blocks) accepting it
            self.statement_blocks = checkgrammar.statement2block(grammar, ["_top"])
            # statement name -> groups of equivalent grammar variants
            self.statement_grammar_groups = checkgrammar.diff_statements(
                self.grammar, self.statement_blocks
            )

        def get_objects(self):
            """
            Sphinx API:
            Iterable of Sphinx object descriptions (tuples defined in the API).
            """
            for obj in self.data["statements"].values():
                yield tuple(
                    obj[key]
                    for key in [
                        "fullname",
                        "signature",
                        "label",
                        "docname",
                        "anchor",
                        "priority",
                    ]
                )

        def resolve_xref(self, env, fromdocname, builder, typ, target, node, contnode):
            """
            Sphinx API:
            Resolve the pending_xref *node* with the given typ and target.
            """
            try:
                obj = self.data["statements"][self.get_statement_name(target)]
            except KeyError:
                return None

            refnode = make_refnode(
                builder,
                fromdocname,
                obj["docname"],
                obj["anchor"],
                contnode,
                obj["anchor"],
            )
            return refnode

        def resolve_any_xref(self, env, fromdocname, builder, target, node, contnode):
            """
            Sphinx API:
            Raising NotImplementedError uses fall-back based on resolve_xref.
            """
            raise NotImplementedError

        @staticmethod
        def log_statement_overlap(new, old):
            assert new["fullname"] == old["fullname"]
            logger.warning(
                "duplicate detected! %s previously defined at %s:%d",
                new["fullname"],
                old["filename"],
                old["lineno"],
                location=(new["docname"], new["lineno"]),
            )

        def get_statement_name(self, signature):
            return "{}.{}.{}".format(domainname, "statement", signature)

        def add_statement(self, signature, tags, short, short_node, lineno):
            """
            Add a new statement to the domain data structures.
            No visible effect.
            """
            name = self.get_statement_name(signature)
            anchor = "{}-statement-{}".format(domainname, signature)

            new = {
                "tags": tags,
                "short": short,
                "short_node": short_node,
                "filename": self.env.doc2path(self.env.docname),
                "lineno": lineno,
                # Sphinx API
                "fullname": name,  # internal name
                "signature": signature,  # display name
                "label": domainlabel + " statement",  # description for index
                "docname": self.env.docname,
                "anchor": anchor,
                "priority": 1,  # search priority
            }

            if name in self.data["statements"]:
                self.log_statement_overlap(new, self.data["statements"][name])
            self.data["statements"][name] = new

        def clear_doc(self, docname):
            """
            Sphinx API: like env-purge-doc event, but in a domain.

            Remove traces of a document in the domain-specific inventories.
            """
            self.data["statements"] = {
                key: obj
                for key, obj in self.data["statements"].items()
                if obj["docname"] != docname
            }

        def merge_domaindata(self, docnames, otherdata):
            """Sphinx API: Merge in data regarding *docnames* from a different
            domaindata inventory (coming from a subprocess in parallel builds).

            @param otherdata is self.data equivalent from another process
            """
            old = self.data["statements"]
            new = otherdata["statements"]
            for name in set(old).intersection(set(new)):
                self.log_statement_overlap(new[name], old[name])
            old.update(new)

        def check_consistency(self):
            """Sphinx API"""
            defined_statements = set(
                obj["signature"] for obj in self.data["statements"].values()
            )
            statements_in_grammar = set(self.statement_blocks)
            missing_statement_sigs = statements_in_grammar.difference(
                defined_statements
            )
            for missing in missing_statement_sigs:
                # obsolete/test-only statements do not need documentation
                grammars = self.statement_grammar_groups[missing]
                if len(grammars) == 1:
                    flags = grammars[0][0].subgrammar.get("_flags", [])
                    if ("obsolete" in flags) or ("test only" in flags):
                        continue

                logger.warning(
                    "statement %s is defined in %s grammar but is not described"
                    " using .. statement:: directive",
                    missing,
                    domainlabel,
                )

            extra_statement_sigs = defined_statements.difference(statements_in_grammar)
            for extra in extra_statement_sigs:
                fullname = self.get_statement_name(extra)
                desc = self.data["statements"][fullname]
                logger.warning(
                    ".. statement:: %s found but matching definition in %s grammar is"
                    " missing",
                    extra,
                    domainlabel,
                    location=(desc["docname"], desc["lineno"]),
                )

        @classmethod
        def process_statementlist_nodes(cls, app, doctree):
            """
            Replace todolist objects (placed into document using
            .. statementlist::) with automatically generated table
            of statements.
            """

            def gen_replacement_table(acceptable_blocks, acceptable_tags):
                table_header = [
                    TableColumn("ref", "Statement"),
                    TableColumn("short_node", "Description"),
                ]
                tag_header = []
                if len(acceptable_tags) != 1:
                    # tags column only if tag filter is not applied
                    tag_header = [
                        TableColumn("tags_txt", "Tags"),
                    ]

                table_b = DictToDocutilsTableBuilder(table_header + tag_header)
                table_b.append_iterable(
                    sorted(
                        filter(
                            lambda item: (
                                (
                                    not acceptable_tags
                                    or set(item["tags"]).intersection(acceptable_tags)
                                )
                                and (
                                    not acceptable_blocks
                                    or set(item["block_names"]).intersection(
                                        acceptable_blocks
                                    )
                                )
                            ),
                            iscconf.list_all(),
                        ),
                        key=lambda x: x["fullname"],
                    )
                )
                return table_b.get_docutils()

            env = app.builder.env
            iscconf = env.get_domain(cls.name)

            for node in doctree.traverse(todolist):
                acceptable_tags = node["isc_filter_tags"]
                acceptable_blocks = node["isc_filter_blocks"]
                node.replace_self(
                    gen_replacement_table(acceptable_blocks, acceptable_tags)
                )

        def list_all(self):
            """Yield a copy of each statement record, augmented with
            pre-rendered cross-reference, block names and tag text for
            use in summary tables."""
            for statement in self.data["statements"].values():
                sig = statement["signature"]
                block_names = set(
                    path[-1] for path in self.statement_blocks.get(sig, [])
                )
                tags_txt = ", ".join(statement["tags"])

                refpara = nodes.inline()
                refnode = addnodes.pending_xref(
                    sig,
                    reftype="statement",
                    refdomain=domainname,
                    reftarget=sig,
                    refwarn=True,
                )
                refnode += nodes.Text(sig)
                refpara += refnode

                copy = statement.copy()
                copy["block_names"] = block_names
                copy["ref"] = refpara
                copy["tags_txt"] = tags_txt
                yield copy

    return ISCConfDomain


# source dict key: human description
TableColumn = namedtuple("TableColumn", ["dictkey", "description"])


class DictToDocutilsTableBuilder:
    """generate docutils table"""

    def __init__(self, header):
        """@param header: [ordered list of TableColumn]s"""
        self.header = header
        self.table = nodes.table()
        self.table["classes"] += ["colwidths-auto"]
        self.returned = False
        # inner nodes of the table
        self.tgroup = nodes.tgroup(cols=len(self.header))
        for _ in range(len(self.header)):
            # ignored because of colwidths-auto, but must be present
            colspec = nodes.colspec(colwidth=1)
            self.tgroup.append(colspec)
        self.table += self.tgroup
        self._gen_header()

        self.tbody = nodes.tbody()
        self.tgroup += self.tbody

    def _gen_header(self):
        thead = nodes.thead()

        row = nodes.row()
        for column in self.header:
            entry = nodes.entry()
            entry += nodes.paragraph(text=column.description)
            row += entry

        thead.append(row)
        self.tgroup += thead

    def append_iterable(self, objects):
        """Append rows for each object (dict), in order.
        Extract column values from keys listed in self.header."""
        for obj in objects:
            row = nodes.row()
            for column in self.header:
                entry = nodes.entry()
                value = obj[column.dictkey]
                if isinstance(value, str):
                    value = nodes.paragraph(text=value)
                else:
                    # docutils nodes cannot be reused, deep copy is required
                    value = value.deepcopy()
                entry += value
                row += entry
            self.tbody.append(row)

    def get_docutils(self):
        # guard against table reuse - that's most likely an error
        assert not self.returned
        self.returned = True
        return self.table


def setup(app, domainname, confname, docutilsplaceholder, grammar):
    """
    Install new parametrized Sphinx domain.
    """

    Conf = domain_factory(domainname, confname, docutilsplaceholder, grammar)
    app.add_domain(Conf)
    app.connect("doctree-read", Conf.process_statementlist_nodes)

    return {
        "version": "0.1",
        "parallel_read_safe": True,
        "parallel_write_safe": True,
    }