#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2021 Intel Corporation
# All rights reserved.
#


from argparse import ArgumentParser
from contextlib import ExitStack
from dataclasses import dataclass, field
from itertools import islice
from typing import Dict, List, TypeVar
import ctypes as ct
import ijson
import magic
import os
import re
import subprocess
import sys
import tempfile

TSC_MAX = (1 << 64) - 1
UCHAR_MAX = (1 << 8) - 1
TRACE_MAX_LCORE = 128
TRACE_MAX_GROUP_ID = 16
TRACE_MAX_TPOINT_ID = TRACE_MAX_GROUP_ID * 64
TRACE_MAX_ARGS_COUNT = 8
TRACE_MAX_RELATIONS = 16
TRACE_INVALID_OBJECT = (1 << 64) - 1
OBJECT_NONE = 0
OWNER_NONE = 0


@dataclass
class DTraceArgument:
    """Describes a DTrace probe (usdt) argument"""
    # Argument name, used as the key in the generated "name=value" output
    name: str
    # Position of the argument in the probe (emitted as "arg<pos>" in the script)
    pos: int
    # Python type of the value; DTrace only accepts int or str
    type: type


@dataclass
class DTraceProbe:
    """Describes a DTrace probe (usdt) point"""
    name: str
    args: Dict[str, DTraceArgument]

    def __init__(self, name, args):
        """Stores the probe name and indexes the supplied iterable of DTraceArgument
        objects by argument name.
        """
        self.name = name
        self.args = {a.name: a for a in args}


@dataclass
class DTraceEntry:
    """Describes a single DTrace probe invocation"""
    name: str
    args: Dict[str, TypeVar('ArgumentType', str, int)]

    def __init__(self, probe, args):
        """Converts the textual argument values of a single probe invocation.

        probe -- DTraceProbe definition the invocation belongs to
        args  -- dict of (argument name, raw string value) parsed from the output

        Raises ValueError if `args` contains a name that `probe` doesn't define.
        """
        # Integers are printed in hex ('0x%lx'), strings are wrapped in single quotes
        valmap = {int: lambda x: int(x, 16),
                  str: lambda x: x.strip().strip("'")}
        self.name = probe.name
        self.args = {}
        for name, value in args.items():
            arg = probe.args.get(name)
            if arg is None:
                raise ValueError(f'Unexpected argument: {name}')
            self.args[name] = valmap[arg.type](value)


class DTrace:
    """Generates bpftrace script based on the supplied probe points, parses its
    output and stores it as a list of DTraceEntry sorted by their tsc.
    """
    def __init__(self, probes, file=None):
        """Validates the probe definitions and, if `file` is given, parses it.

        probes -- iterable of DTraceProbe objects
        file   -- optional open text file holding the recorded bpftrace output

        Raises ValueError if a probe cannot be found in the source tree, an argument
        position is out of range, or an argument type is neither int nor str.
        """
        self._avail_probes = self._list_probes()
        self._probes = {p.name: p for p in probes}
        # Sanitize the probe definitions before parsing anything, so that a bad
        # definition is reported as such instead of failing during value conversion
        for probe in probes:
            if probe.name not in self._avail_probes:
                raise ValueError(f'Couldn\'t find probe: "{probe.name}"')
            for arg in probe.args.values():
                if arg.pos >= self._avail_probes[probe.name]:
                    raise ValueError('Invalid probe argument position')
                if arg.type not in (int, str):
                    raise ValueError('Invalid argument type')
        self.entries = self._parse(file) if file is not None else []

    def _parse(self, file):
        """Parses lines of the form "<probe>: name=value, name=value" read from `file`
        and returns a list of DTraceEntry sorted by their 'tsc' argument.  Lines that
        don't match the format or name an unknown probe are skipped.
        """
        regex = re.compile(r'(\w+): (.*)')
        entries = []

        for line in file:
            match = regex.match(line)
            if match is None:
                continue
            name, args = match.groups()
            probe = self._probes.get(name)
            # Skip the line if we don't recognize the probe name
            if probe is None:
                continue
            # Split on the first '=' only, so that values containing '=' stay intact
            entries.append(DTraceEntry(probe, args=dict(a.strip().split('=', 1)
                                                        for a in args.split(','))))
        entries.sort(key=lambda e: e.args['tsc'])
        return entries

    def _list_probes(self):
        """Scans the git-tracked *.c / *.h files (excluding include/spdk_internal/usdt.h)
        for SPDK_DTRACE_PROBE*() invocations and returns a dict mapping the probe name
        to its number of arguments (including the implicit tsc).
        """
        files = subprocess.check_output(['git', 'ls-files', '*.[ch]',
                                         ':!:include/spdk_internal/usdt.h'])
        files = filter(lambda f: len(f) > 0, str(files, 'ascii').split('\n'))
        regex = re.compile(r'SPDK_DTRACE_PROBE([0-9]*)\((\w+)')
        probes = {}

        for fname in files:
            with open(fname, 'r') as file:
                for match in regex.finditer(file.read()):
                    nargs, name = match.group(1), match.group(2)
                    nargs = int(nargs) if len(nargs) > 0 else 0
                    # Add one to accommodate for the tsc being the first arg
                    probes[name] = nargs + 1
        return probes

    def _gen_usdt(self, probe):
        """Returns the bpftrace usdt clause printing `probe` and its arguments in the
        "<probe>: name=value, ..." format understood by _parse().
        """
        usdt = (f'usdt:__EXE__:{probe.name} {{' +
                f'printf("{probe.name}: ')
        args = probe.args
        if len(args) > 0:
            # printf format specifier and value expression for each supported type
            argtype = {int: '0x%lx', str: '\'%s\''}
            argcast = {int: lambda x: x, str: lambda x: f'str({x})'}
            argstr = [f'{a.name}={argtype[a.type]}' for a in args.values()]
            argval = [f'{argcast[a.type](f"arg{a.pos}")}' for a in args.values()]
            usdt += ', '.join(argstr) + '\\n", ' + ', '.join(argval)
        else:
            usdt += '\\n"'
        usdt += ');}'
        return usdt

    def generate(self):
        """Returns the bpftrace script covering all of the configured probes"""
        return '\n'.join([self._gen_usdt(p) for p in self._probes.values()])

    def record(self, pid):
        """Writes the generated script to a temporary file and runs bpftrace.sh (located
        one directory above this script) with `pid` and the script path.  A
        KeyboardInterrupt raised while it's running is swallowed.
        """
        with tempfile.NamedTemporaryFile(mode='w+') as script:
            script.write(self.generate())
            # Make sure the script is on disk before the subprocess reads it
            script.flush()
            try:
                subprocess.run([f'{os.path.dirname(__file__)}/../bpftrace.sh',
                                f'{pid}', f'{script.name}'])
            except KeyboardInterrupt:
                pass


@dataclass
class TracepointArgument:
    """Describes an SPDK tracepoint argument"""
    # Possible values of argtype (plain class attributes, not dataclass fields)
    TYPE_INT = 0
    TYPE_PTR = 1
    TYPE_STR = 2
    name: str
    argtype: int


@dataclass
class Tracepoint:
    """Describes an SPDK tracepoint, equivalent to struct spdk_trace_tpoint"""
    name: str
    id: int
    new_object: bool
    object_type: int
    owner_type: int
    args: List[TracepointArgument]


@dataclass
class TraceEntry:
    """Describes an SPDK tracepoint entry, equivalent to struct spdk_trace_entry"""
    lcore: int
    tpoint: Tracepoint
    tsc: int
    # The providers set poller, object_id, time and related to None when unavailable
    poller: str
    size: int
    object_id: str
    object_ptr: int
    time: int
    args: Dict[str, TypeVar('ArgumentType', str, int)]
    related: str


class TraceProvider:
    """Defines interface for objects providing traces and tracepoint definitions"""

    def tpoints(self):
        """Returns tracepoint definitions as a dict of (tracepoint_name, tracepoint)"""
        raise NotImplementedError()

    def entries(self):
        """Generator returning subsequent trace entries"""
        raise NotImplementedError()

    def tsc_rate(self):
        """Returns the TSC rate that was in place when traces were collected"""
        raise NotImplementedError()


class JsonProvider(TraceProvider):
    """Trace provider based on JSON-formatted output produced by spdk_trace app"""
    def __init__(self, file):
        """Starts incremental parsing of `file` and consumes the tracepoint definitions
        (everything preceding the "entries" array).
        """
        self._parser = ijson.parse(file)
        self._tpoints = {}
        self._parse_defs()

    def _parse_tpoints(self, tpoints):
        """Converts the decoded "tpoints" array into Tracepoint objects keyed by id"""
        for tpoint in tpoints:
            tpoint_id = tpoint['id']
            # Object/owner types aren't read from the JSON, so they're always *_NONE here
            self._tpoints[tpoint_id] = Tracepoint(
                name=tpoint['name'], id=tpoint_id,
                new_object=tpoint['new_object'], object_type=OBJECT_NONE,
                owner_type=OWNER_NONE,
                args=[TracepointArgument(name=a['name'],
                                         argtype=a['type'])
                      for a in tpoint.get('args', [])])

    def _parse_defs(self):
        """Consumes parser events up to the "entries" array, recording the tsc_rate and
        the tracepoint definitions found on the way.
        """
        builder = None
        for prefix, event, value in self._parser:
            # If we reach entries array, there are no more tracepoint definitions
            if prefix == 'entries':
                break
            elif prefix == 'tsc_rate':
                self._tsc_rate = value
                continue

            # Feed every event between the start and the end of the tpoints array to
            # the builder in order to reconstruct the whole array
            if (prefix, event) == ('tpoints', 'start_array'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('tpoints', 'end_array'):
                self._parse_tpoints(builder.value)
                builder = None

    def _parse_entry(self, entry):
        """Builds a TraceEntry from a decoded item of the "entries" array.  Optional keys
        that are missing result in None values.
        """
        tpoint = self._tpoints[entry['tpoint']]
        obj = entry.get('object', {})
        return TraceEntry(tpoint=tpoint, lcore=entry['lcore'], tsc=entry['tsc'],
                          size=entry.get('size'), object_id=obj.get('id'),
                          object_ptr=obj.get('value'), related=entry.get('related'),
                          time=obj.get('time'), poller=entry.get('poller'),
                          args={n.name: v for n, v in zip(tpoint.args, entry.get('args', []))})

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        """Generator yielding a TraceEntry for each item of the "entries" array.  It
        continues consuming the parser where _parse_defs() stopped.
        """
        builder = None
        for prefix, event, value in self._parser:
            if (prefix, event) == ('entries.item', 'start_map'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('entries.item', 'end_map'):
                yield self._parse_entry(builder.value)
                builder = None


# NOTE(review): the ctypes structures below presumably mirror the C structures used by
# libspdk_trace_parser / SPDK's trace library; field order and sizes must stay in sync
# with the C definitions - confirm against the SPDK headers before changing them.
class CParserOpts(ct.Structure):
    _fields_ = [('filename', ct.c_char_p),
                ('mode', ct.c_int),
                ('lcore', ct.c_uint16)]


class CTraceOwner(ct.Structure):
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]


class CTraceObject(ct.Structure):
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]


class CTpointArgument(ct.Structure):
    _fields_ = [('name', ct.c_char * 14),
                ('type', ct.c_uint8),
                ('size', ct.c_uint8)]


class CTpointRelatedObject(ct.Structure):
    _fields_ = [('object_type', ct.c_uint8),
                ('arg_index', ct.c_uint8)]


class CTracepoint(ct.Structure):
    _fields_ = [('name', ct.c_char * 24),
                ('tpoint_id', ct.c_uint16),
                ('owner_type', ct.c_uint8),
                ('object_type', ct.c_uint8),
                ('new_object', ct.c_uint8),
                ('num_args', ct.c_uint8),
                ('args', CTpointArgument * TRACE_MAX_ARGS_COUNT),
                ('related_objects', CTpointRelatedObject * TRACE_MAX_RELATIONS)]


class CTraceFlags(ct.Structure):
    _fields_ = [('tsc_rate', ct.c_uint64),
                ('tpoint_mask', ct.c_uint64 * TRACE_MAX_GROUP_ID),
                ('owner', CTraceOwner * (UCHAR_MAX + 1)),
                ('object', CTraceObject * (UCHAR_MAX + 1)),
                ('tpoint', CTracepoint * TRACE_MAX_TPOINT_ID)]


class CTraceEntry(ct.Structure):
    _fields_ = [('tsc', ct.c_uint64),
                ('tpoint_id', ct.c_uint16),
                ('poller_id', ct.c_uint16),
                ('size', ct.c_uint32),
                ('object_id', ct.c_uint64)]


class CTraceParserArgument(ct.Union):
    _fields_ = [('integer', ct.c_uint64),
                ('pointer', ct.c_void_p),
                ('string', ct.c_char * (UCHAR_MAX + 1))]


class CTraceParserEntry(ct.Structure):
    _fields_ = [('entry', ct.POINTER(CTraceEntry)),
                ('object_index', ct.c_uint64),
                ('object_start', ct.c_uint64),
                ('lcore', ct.c_uint16),
                ('related_index', ct.c_uint64),
                ('related_type', ct.c_uint8),
                ('args', CTraceParserArgument * TRACE_MAX_ARGS_COUNT)]


class NativeProvider(TraceProvider):
    """Trace provider based on SPDK's trace library"""
    def __init__(self, file):
        """Creates a native parser for the trace file identified by `file.name` and
        loads the tracepoint definitions.

        Raises ValueError if the parser cannot be constructed.
        """
        self._setup_binding(file.name)
        self._parse_defs()

    def __del__(self):
        # _parser is only set once the library has been loaded and init was called
        if hasattr(self, '_parser'):
            self._lib.spdk_trace_parser_cleanup(self._parser)

    def _setup_binding(self, filename):
        """Loads libspdk_trace_parser.so and initializes a parser for `filename`"""
        self._lib = ct.CDLL('build/lib/libspdk_trace_parser.so')
        self._lib.spdk_trace_parser_init.restype = ct.c_void_p
        # Re-wrap the returned value in c_void_p, so that it's passed back to the
        # library as a pointer rather than as a plain Python int
        self._lib.spdk_trace_parser_init.errcheck = lambda r, *_: ct.c_void_p(r)
        self._lib.spdk_trace_parser_get_flags.restype = ct.POINTER(CTraceFlags)
        opts = CParserOpts(filename=bytes(filename, 'ascii'), mode=0,
                           lcore=TRACE_MAX_LCORE)
        self._parser = self._lib.spdk_trace_parser_init(ct.byref(opts))
        if not self._parser:
            raise ValueError('Failed to construct SPDK trace parser')

    def _parse_tpoints(self, tpoints):
        """Converts the array of CTracepoint into Tracepoint objects keyed by id,
        skipping the slots with an empty name.
        """
        self._tpoints = {}
        for tpoint in tpoints:
            if len(tpoint.name) == 0:
                continue
            self._tpoints[tpoint.tpoint_id] = Tracepoint(
                name=str(tpoint.name, 'ascii'), object_type=tpoint.object_type,
                owner_type=tpoint.owner_type, id=tpoint.tpoint_id,
                new_object=bool(tpoint.new_object),
                args=[TracepointArgument(name=str(a.name, 'ascii'), argtype=a.type)
                      for a in tpoint.args[:tpoint.num_args]])

    def _parse_defs(self):
        """Reads the tsc rate, tracepoints, and owner/object id prefixes from the flags
        structure returned by the parser.
        """
        flags = self._lib.spdk_trace_parser_get_flags(self._parser)
        self._tsc_rate = flags.contents.tsc_rate
        self._parse_tpoints(flags.contents.tpoint)

        def conv_objs(arr):
            # Map type -> id prefix, ignoring the slots without a prefix
            return {int(o.type): str(o.id_prefix, 'ascii') for o in arr if o.id_prefix != b'\x00'}
        self._owners = conv_objs(flags.contents.owner)
        self._objects = conv_objs(flags.contents.object)

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        """Generator yielding a TraceEntry for each entry returned by the native parser"""
        pe = CTraceParserEntry()
        # Selects the union member matching the type of an argument
        argconv = {TracepointArgument.TYPE_INT: lambda a: a.integer,
                   TracepointArgument.TYPE_PTR: lambda a: int(a.pointer or 0),
                   TracepointArgument.TYPE_STR: lambda a: str(a.string, 'ascii')}

        while self._lib.spdk_trace_parser_next_entry(self._parser, ct.byref(pe)):
            entry = pe.entry.contents
            lcore = pe.lcore
            tpoint = self._tpoints[entry.tpoint_id]
            args = {a.name: argconv[a.argtype](pe.args[i]) for i, a in enumerate(tpoint.args)}

            if tpoint.object_type != OBJECT_NONE:
                if pe.object_index != TRACE_INVALID_OBJECT:
                    object_id = '{}{}'.format(self._objects[tpoint.object_type], pe.object_index)
                    # Time elapsed (in tsc) since the object's start
                    ts = entry.tsc - pe.object_start
                else:
                    object_id, ts = 'n/a', None
            elif entry.object_id != 0:
                object_id, ts = '{:x}'.format(entry.object_id), None
            else:
                object_id, ts = None, None

            if tpoint.owner_type != OWNER_NONE:
                poller_id = '{}{:02}'.format(self._owners[tpoint.owner_type], entry.poller_id)
            else:
                poller_id = None

            if pe.related_type != OBJECT_NONE:
                related = '{}{}'.format(self._objects[pe.related_type], pe.related_index)
            else:
                related = None

            yield TraceEntry(tpoint=tpoint, lcore=lcore, tsc=entry.tsc,
                             size=entry.size, object_id=object_id,
                             object_ptr=entry.object_id, poller=poller_id, time=ts,
                             args=args, related=related)


class Trace:
    """Stores, parses, and prints out SPDK traces"""
    def __init__(self, file):
        """Selects the provider for `file`: JsonProvider for stdin or files detected as
        application/json, NativeProvider otherwise.
        """
        if file == sys.stdin or magic.from_file(file.name, mime=True) == 'application/json':
            self._provider = JsonProvider(file)
        else:
            self._provider = NativeProvider(file)
        self._objects = []
        # Formatters for argument types that aren't printed as-is
        self._argfmt = {TracepointArgument.TYPE_PTR: lambda a: f'0x{a:x}'}
        self.tpoints = self._provider.tpoints()

    def _annotate_args(self, entry):
        """Merges the annotations returned for `entry` by all registered objects into a
        single dict indexed by argument name.
        """
        annotations = {}
        for obj in self._objects:
            current = obj.annotate(entry)
            if current is None:
                continue
            annotations.update(current)
        return annotations

    def _format_args(self, entry):
        """Returns the list of printable arguments of `entry`.  Annotated arguments are
        formatted as "name(key=value, ...)", the remaining ones as "name: value".
        """
        annotations = self._annotate_args(entry)
        args = []
        for arg, (name, value) in zip(entry.tpoint.args, entry.args.items()):
            annot = annotations.get(name)
            if annot is not None:
                args.append('{}({})'.format(name, ', '.join(f'{n}={v}' for n, v in annot.items())))
            else:
                args.append('{}: {}'.format(name, self._argfmt.get(arg.argtype,
                                                                   lambda a: a)(value)))
        return args

    def register_object(self, obj):
        """Registers an object (e.g. an SPDKObject) used for annotating the entries"""
        self._objects.append(obj)

    def print(self):
        """Prints all of the entries, one per line, with timestamps in microseconds
        relative to the first entry.
        """
        def get_us(tsc, off):
            return ((tsc - off) * 10 ** 6) / self._provider.tsc_rate()

        offset = None
        for e in self._provider.entries():
            # The tsc of the first entry serves as the reference point
            offset = e.tsc if offset is None else offset
            timestamp = get_us(e.tsc, offset)
            diff = get_us(e.time, 0) if e.time is not None else None
            args = ', '.join(self._format_args(e))
            related = ' (' + e.related + ')' if e.related is not None else ''

            print(('{:3} {:16.3f} {:3} {:24} {:12}'.format(
                e.lcore, timestamp, e.poller if e.poller is not None else '',
                e.tpoint.name, f'size: {e.size}' if e.size else '') +
                (f'id: {e.object_id + related:12} ' if e.object_id is not None else '') +
                (f'time: {diff:<8.3f} ' if diff is not None else '') +
                args).rstrip())


class SPDKObject:
    """Describes a specific type of an SPDK objects (e.g. qpair, thread, etc.)"""
    @dataclass
    class Lifetime:
        """Describes a lifetime and properties of a particular SPDK object."""
        begin: int
        end: int
        ptr: int
        properties: dict = field(default_factory=dict)

    def __init__(self, trace: Trace, tpoints: List[str]):
        """Resolves the tracepoint names in `tpoints` against the definitions in `trace`
        and stores the ones that exist in self.tpoints, keyed by tracepoint id.
        """
        self.tpoints = {}
        for name in tpoints:
            tpoint = next((t for t in trace.tpoints.values() if t.name == name), None)
            if tpoint is None:
                # Some tpoints might be undefined if configured without specific subsystems
                continue
            self.tpoints[tpoint.id] = tpoint

    def _annotate(self, entry: TraceEntry):
        """Abstract annotation method to be implemented by subclasses."""
        raise NotImplementedError()

    def annotate(self, entry: TraceEntry):
        """Annotates a tpoint entry and returns a dict indexed by argname with values representing
        various object properties.  For instance, {"qpair": {"qid": 1, "subnqn": "nqn"}} could be
        returned to annotate an argument called "qpair" with two items: "qid" and "subnqn".
        """
        if entry.tpoint.id not in self.tpoints:
            return None
        return self._annotate(entry)


class QPair(SPDKObject):
    """Annotates the "qpair" argument of the RDMA/TCP tracepoints with the properties
    gathered from the recorded DTrace probes.
    """
    def __init__(self, trace: Trace, dtrace: DTrace):
        super().__init__(trace, tpoints=[
            'RDMA_REQ_NEW',
            'RDMA_REQ_NEED_BUFFER',
            'RDMA_REQ_TX_PENDING_C2H',
            'RDMA_REQ_TX_PENDING_H2C',
            'RDMA_REQ_TX_H2C',
            'RDMA_REQ_RDY_TO_EXECUTE',
            'RDMA_REQ_EXECUTING',
            'RDMA_REQ_EXECUTED',
            'RDMA_REQ_RDY_TO_COMPL',
            'RDMA_REQ_COMPLETING_C2H',
            'RDMA_REQ_COMPLETING',
            'RDMA_REQ_COMPLETED',
            'TCP_REQ_NEW',
            'TCP_REQ_NEED_BUFFER',
            'TCP_REQ_TX_H_TO_C',
            'TCP_REQ_RDY_TO_EXECUTE',
            'TCP_REQ_EXECUTING',
            'TCP_REQ_EXECUTED',
            'TCP_REQ_RDY_TO_COMPLETE',
            'TCP_REQ_TRANSFER_C2H',
            'TCP_REQ_COMPLETED',
            'TCP_WRITE_START',
            'TCP_WRITE_DONE',
            'TCP_READ_DONE',
            'TCP_REQ_AWAIT_R2T_ACK'])
        self._objects = []
        self._find_objects(dtrace.entries)

    def _find_objects(self, dprobes):
        """Builds the list of qpair lifetimes from `dprobes` (DTraceEntry list sorted by
        tsc).  A lifetime starts at nvmf_poll_group_add_qpair and ends at the matching
        nvmf_poll_group_remove_qpair, or at TSC_MAX if there is none.
        """
        def probe_match(probe, other):
            return probe.args['qpair'] == other.args['qpair']

        for i, dprobe in enumerate(dprobes):
            if dprobe.name != 'nvmf_poll_group_add_qpair':
                continue
            # We've found a new qpair, now find the probe indicating its destruction
            last_idx, last = next((((i + j + 1), d) for j, d in enumerate(islice(dprobes, i, None))
                                   if d.name == 'nvmf_poll_group_remove_qpair' and
                                   probe_match(d, dprobe)), (None, None))
            obj = SPDKObject.Lifetime(begin=dprobe.args['tsc'],
                                      end=last.args['tsc'] if last is not None else TSC_MAX,
                                      ptr=dprobe.args['qpair'],
                                      properties={'ptr': hex(dprobe.args['qpair']),
                                                  'thread': dprobe.args['thread']})
            # last_idx is one past the removal probe (or None), so the slice covers the
            # whole lifetime of the qpair
            for other in filter(lambda p: probe_match(p, dprobe), dprobes[i:last_idx]):
                if other.name == 'nvmf_ctrlr_add_qpair':
                    for prop in ['qid', 'subnqn', 'hostnqn']:
                        obj.properties[prop] = other.args[prop]
            self._objects.append(obj)

    def _annotate(self, entry):
        """Returns {'qpair': properties} of the qpair that has the same pointer as the
        entry's "qpair" argument and was alive at the entry's tsc, or None.
        """
        qpair = entry.args.get('qpair')
        if qpair is None:
            return None
        for obj in self._objects:
            if obj.ptr == qpair and obj.begin <= entry.tsc <= obj.end:
                return {'qpair': obj.properties}
        return None


def build_dtrace(file=None):
    """Returns a DTrace object configured with the qpair-related probes, optionally
    parsing the recorded output from `file`.
    """
    return DTrace([
        DTraceProbe(
            name='nvmf_poll_group_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_poll_group_remove_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_ctrlr_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='qid', pos=2, type=int),
                  DTraceArgument(name='subnqn', pos=3, type=str),
                  DTraceArgument(name='hostnqn', pos=4, type=str)])], file)


def print_trace(trace_file, dtrace_file):
    """Prints the traces from `trace_file` annotated with the qpair information parsed
    from `dtrace_file` (which may be None).
    """
    dtrace = build_dtrace(dtrace_file)
    trace = Trace(trace_file)
    trace.register_object(QPair(trace, dtrace))
    trace.print()


def main(argv):
    """Parses the command line in `argv` and either generates the bpftrace script,
    records BPF traces on a PID, or prints an annotated trace.
    """
    parser = ArgumentParser(description='SPDK trace annotation script')
    parser.add_argument('-i', '--input',
                        help='Trace file to annotate (either JSON generated by spdk_trace or ' +
                             'raw binary produced by the SPDK application itself)')
    parser.add_argument('-g', '--generate', help='Generate bpftrace script', action='store_true')
    parser.add_argument('-r', '--record', help='Record BPF traces on PID', metavar='PID', type=int)
    parser.add_argument('-b', '--bpftrace', help='BPF trace script to use for annotations')
    args = parser.parse_args(argv)

    if args.generate:
        print(build_dtrace().generate())
    elif args.record:
        build_dtrace().record(args.record)
    else:
        # Close the files opened here once we're done printing; sys.stdin isn't
        # registered with the stack, so it's left open
        with ExitStack() as stack:
            trace_file = (stack.enter_context(open(args.input, 'r'))
                          if args.input is not None else sys.stdin)
            dtrace_file = (stack.enter_context(open(args.bpftrace))
                           if args.bpftrace is not None else None)
            print_trace(trace_file, dtrace_file)


if __name__ == '__main__':
    # In order for the changes to LD_LIBRARY_PATH to be visible to the loader,
    # they need to be applied before starting a process, so we need to
    # re-execute the script after updating it.
    if os.environ.get('SPDK_BPF_TRACE_PY') is None:
        rootdir = f'{os.path.dirname(__file__)}/../..'
        # Drop an unset/empty LD_LIBRARY_PATH instead of joining it, as the resulting
        # leading ':' is an empty entry, which the loader treats as the current directory
        libdirs = [path for path in (os.environ.get('LD_LIBRARY_PATH', ''),
                                     f'{rootdir}/build/lib') if path]
        os.environ['LD_LIBRARY_PATH'] = ':'.join(libdirs)
        os.environ['SPDK_BPF_TRACE_PY'] = '1'
        os.execv(sys.argv[0], sys.argv)
    else:
        try:
            main(sys.argv[1:])
        except (KeyboardInterrupt, BrokenPipeError):
            pass