#!/usr/bin/env python3

from argparse import ArgumentParser
from dataclasses import dataclass, field
from itertools import islice
from typing import Dict, List, TypeVar
import ctypes as ct
import ijson
import magic
import os
import re
import subprocess
import sys
import tempfile

TSC_MAX = (1 << 64) - 1
UCHAR_MAX = (1 << 8) - 1
TRACE_MAX_LCORE = 128
TRACE_MAX_GROUP_ID = 16
TRACE_MAX_TPOINT_ID = TRACE_MAX_GROUP_ID * 64
TRACE_MAX_ARGS_COUNT = 8
TRACE_MAX_RELATIONS = 16
TRACE_INVALID_OBJECT = (1 << 64) - 1
OBJECT_NONE = 0
OWNER_NONE = 0


@dataclass
class DTraceArgument:
    """Describes a DTrace probe (usdt) argument"""
    # Argument name, used as the key in the "name=value" pairs printed by the probe
    name: str
    # Position of the argument within the probe (rendered as "arg<pos>" in the script)
    pos: int
    # Python type of the argument; DTrace only accepts int or str
    type: type


@dataclass
class DTraceProbe:
    """Describes a DTrace probe (usdt) point"""
    name: str
    args: Dict[str, DTraceArgument]

    def __init__(self, name, args):
        """Stores the probe name and indexes the iterable of DTraceArguments by their name"""
        self.name = name
        self.args = {a.name: a for a in args}


@dataclass
class DTraceEntry:
    """Describes a single DTrace probe invocation"""
    name: str
    args: Dict[str, TypeVar('ArgumentType', str, int)]

    def __init__(self, probe, args):
        """Converts the raw string values in `args` according to the types declared by `probe`.

        Integers are parsed as hexadecimal, strings are stripped of whitespace and of the
        surrounding single quotes.  Raises ValueError if `args` contains a name that the probe
        doesn't define.
        """
        valmap = {int: lambda x: int(x, 16),
                  str: lambda x: x.strip().strip("'")}
        self.name = probe.name
        self.args = {}
        for name, value in args.items():
            arg = probe.args.get(name)
            if arg is None:
                raise ValueError(f'Unexpected argument: {name}')
            self.args[name] = valmap[arg.type](value)


class DTrace:
    """Generates bpftrace script based on the supplied probe points, parses its
    output and stores it as a list of DTraceEntry sorted by their tsc.
    """
    def __init__(self, probes, file=None):
        """Parses `file` (if given) and validates `probes` against the probes found in the sources.

        Raises ValueError if a probe cannot be found, if an argument position is out of range
        for that probe, or if an argument type is neither int nor str.
        """
        self._avail_probes = self._list_probes()
        self._probes = {p.name: p for p in probes}
        self.entries = self._parse(file) if file is not None else []
        # Sanitize the probe definitions
        for probe in probes:
            if probe.name not in self._avail_probes:
                raise ValueError(f'Couldn\'t find probe: "{probe.name}"')
            for arg in probe.args.values():
                if arg.pos >= self._avail_probes[probe.name]:
                    raise ValueError('Invalid probe argument position')
                if arg.type not in (int, str):
                    raise ValueError('Invalid argument type')

    def _parse(self, file):
        """Parses "probe_name: arg=value, arg=value" lines into DTraceEntries sorted by tsc.

        Lines that don't match that format or that name an unknown probe are skipped.
        """
        regex = re.compile(r'(\w+): (.*)')
        entries = []

        for line in file:
            match = regex.match(line)
            if match is None:
                continue
            name, args = match.groups()
            probe = self._probes.get(name)
            # Skip the line if we don't recognize the probe name
            if probe is None:
                continue
            # Only split on the first '=', so that values containing '=' are kept intact
            # instead of producing a 3-element item that dict() would reject
            entries.append(DTraceEntry(probe, args=dict(a.strip().split('=', 1)
                                                        for a in args.split(','))))
        entries.sort(key=lambda e: e.args['tsc'])
        return entries

    def _list_probes(self):
        """Returns a dict of (probe_name, argument_count) for probes found in the source tree.

        The list is built by scanning the *.c / *.h files reported by `git ls-files` (except for
        include/spdk_internal/usdt.h) for SPDK_DTRACE_PROBE<N>() invocations.
        """
        files = subprocess.check_output(['git', 'ls-files', '*.[ch]',
                                         ':!:include/spdk_internal/usdt.h'])
        files = filter(lambda f: len(f) > 0, str(files, 'ascii').split('\n'))
        regex = re.compile(r'SPDK_DTRACE_PROBE([0-9]*)\((\w+)')
        probes = {}

        for fname in files:
            with open(fname, 'r') as file:
                for match in regex.finditer(file.read()):
                    nargs, name = match.group(1), match.group(2)
                    nargs = int(nargs) if len(nargs) > 0 else 0
                    # Add one to accommodate for the tsc being the first arg
                    probes[name] = nargs + 1
        return probes

    def _gen_usdt(self, probe):
        """Returns the bpftrace usdt clause printing `probe` in the format expected by _parse()"""
        usdt = (f'usdt:__EXE__:{probe.name} {{' +
                f'printf("{probe.name}: ')
        args = probe.args
        if len(args) > 0:
            # Integers are printed in hex, strings are wrapped in single quotes
            argtype = {int: '0x%lx', str: '\'%s\''}
            argcast = {int: lambda x: x, str: lambda x: f'str({x})'}
            argstr = [f'{a.name}={argtype[a.type]}' for a in args.values()]
            argval = [f'{argcast[a.type](f"arg{a.pos}")}' for a in args.values()]
            usdt += ', '.join(argstr) + '\\n", ' + ', '.join(argval)
        else:
            usdt += '\\n"'
        usdt += ');}'
        return usdt

    def generate(self):
        """Returns the bpftrace script covering all of the configured probes"""
        return '\n'.join([self._gen_usdt(p) for p in self._probes.values()])

    def record(self, pid):
        """Writes the generated script to a temporary file and runs bpftrace.sh with it on `pid`"""
        with tempfile.NamedTemporaryFile(mode='w+') as script:
            script.write(self.generate())
            # Make sure the whole script is on disk before the child process reads it
            script.flush()
            try:
                subprocess.run([f'{os.path.dirname(__file__)}/../bpftrace.sh',
                                f'{pid}', f'{script.name}'])
            except KeyboardInterrupt:
                # Interrupting the script is the expected way to stop recording
                pass


@dataclass
class TracepointArgument:
    """Describes an SPDK tracepoint argument"""
    # Possible values of argtype
    TYPE_INT = 0
    TYPE_PTR = 1
    TYPE_STR = 2
    name: str
    argtype: int


@dataclass
class Tracepoint:
    """Describes an SPDK tracepoint, equivalent to struct spdk_trace_tpoint"""
    name: str
    id: int
    new_object: bool
    object_type: int
    owner_type: int
    args: List[TracepointArgument]


@dataclass
class TraceEntry:
    """Describes an SPDK tracepoint entry, equivalent to struct spdk_trace_entry"""
    lcore: int
    tpoint: Tracepoint
    tsc: int
    poller: str
    size: int
    object_id: str
    object_ptr: int
    time: int
    args: Dict[str, TypeVar('ArgumentType', str, int)]
    related: str


class TraceProvider:
    """Defines interface for objects providing traces and tracepoint definitions"""

    def tpoints(self):
        """Returns tracepoint definitions as a dict of (tracepoint_name, tracepoint)"""
        raise NotImplementedError()

    def entries(self):
        """Generator returning subsequent trace entries"""
        raise NotImplementedError()

    def tsc_rate(self):
        """Returns the TSC rate that was in place when traces were collected"""
        raise NotImplementedError()


class JsonProvider(TraceProvider):
    """Trace provider based on JSON-formatted output produced by spdk_trace app"""
    def __init__(self, file):
        """Starts parsing `file` incrementally and reads the definitions preceding the entries"""
        self._parser = ijson.parse(file)
        self._tpoints = {}
        self._parse_defs()

    def _parse_tpoints(self, tpoints):
        """Converts the decoded "tpoints" array into Tracepoints indexed by their id"""
        for tpoint in tpoints:
            tpoint_id = tpoint['id']
            self._tpoints[tpoint_id] = Tracepoint(
                name=tpoint['name'], id=tpoint_id,
                new_object=tpoint['new_object'], object_type=OBJECT_NONE,
                owner_type=OWNER_NONE,
                args=[TracepointArgument(name=a['name'],
                                         argtype=a['type'])
                      for a in tpoint.get('args', [])])

    def _parse_defs(self):
        """Consumes parser events up to the "entries" key, storing tsc_rate and tpoints"""
        builder = None
        for prefix, event, value in self._parser:
            # If we reach entries array, there are no more tracepoint definitions
            if prefix == 'entries':
                break
            elif prefix == 'tsc_rate':
                self._tsc_rate = value
                continue

            # Collect all of the events between the start and the end of the tpoints array
            # and build a regular Python object out of them
            if (prefix, event) == ('tpoints', 'start_array'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('tpoints', 'end_array'):
                self._parse_tpoints(builder.value)
                builder = None

    def _parse_entry(self, entry):
        """Converts a single decoded item of the "entries" array into a TraceEntry"""
        tpoint = self._tpoints[entry['tpoint']]
        obj = entry.get('object', {})
        return TraceEntry(tpoint=tpoint, lcore=entry['lcore'], tsc=entry['tsc'],
                          size=entry.get('size'), object_id=obj.get('id'),
                          object_ptr=obj.get('value'), related=entry.get('related'),
                          time=obj.get('time'), poller=entry.get('poller'),
                          args={n.name: v for n, v in zip(tpoint.args, entry.get('args', []))})

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        """Generator yielding a TraceEntry for each item of the "entries" array"""
        builder = None
        for prefix, event, value in self._parser:
            if (prefix, event) == ('entries.item', 'start_map'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('entries.item', 'end_map'):
                yield self._parse_entry(builder.value)
                builder = None


# The ctypes definitions below are exchanged with libspdk_trace_parser.so, so their field order
# and types need to match the corresponding C definitions.
class CParserOpts(ct.Structure):
    """Options passed to spdk_trace_parser_init()"""
    _fields_ = [('filename', ct.c_char_p),
                ('mode', ct.c_int),
                ('lcore', ct.c_uint16)]


class CTraceOwner(ct.Structure):
    """Element of CTraceFlags.owner"""
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]


class CTraceObject(ct.Structure):
    """Element of CTraceFlags.object"""
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]


class CTpointArgument(ct.Structure):
    """Element of CTracepoint.args"""
    _fields_ = [('name', ct.c_char * 14),
                ('type', ct.c_uint8),
                ('size', ct.c_uint8)]


class CTpointRelatedObject(ct.Structure):
    """Element of CTracepoint.related_objects"""
    _fields_ = [('object_type', ct.c_uint8),
                ('arg_index', ct.c_uint8)]


class CTracepoint(ct.Structure):
    """Element of CTraceFlags.tpoint, converted to Tracepoint by NativeProvider"""
    _fields_ = [('name', ct.c_char * 24),
                ('tpoint_id', ct.c_uint16),
                ('owner_type', ct.c_uint8),
                ('object_type', ct.c_uint8),
                ('new_object', ct.c_uint8),
                ('num_args', ct.c_uint8),
                ('args', CTpointArgument * TRACE_MAX_ARGS_COUNT),
                ('related_objects', CTpointRelatedObject * TRACE_MAX_RELATIONS)]


class CTraceFlags(ct.Structure):
    """Structure pointed to by the return value of spdk_trace_parser_get_flags()"""
    _fields_ = [('tsc_rate', ct.c_uint64),
                ('tpoint_mask', ct.c_uint64 * TRACE_MAX_GROUP_ID),
                ('owner', CTraceOwner * (UCHAR_MAX + 1)),
                ('object', CTraceObject * (UCHAR_MAX + 1)),
                ('tpoint', CTracepoint * TRACE_MAX_TPOINT_ID)]


class CTraceEntry(ct.Structure):
    """Raw trace entry referenced by CTraceParserEntry.entry"""
    _fields_ = [('tsc', ct.c_uint64),
                ('tpoint_id', ct.c_uint16),
                ('poller_id', ct.c_uint16),
                ('size', ct.c_uint32),
                ('object_id', ct.c_uint64)]


class CTraceParserArgument(ct.Union):
    """Value of a single tracepoint argument, interpreted based on the argument's type"""
    _fields_ = [('integer', ct.c_uint64),
                ('pointer', ct.c_void_p),
                ('string', ct.c_char * (UCHAR_MAX + 1))]


class CTraceParserEntry(ct.Structure):
    """Entry filled in by spdk_trace_parser_next_entry()"""
    _fields_ = [('entry', ct.POINTER(CTraceEntry)),
                ('object_index', ct.c_uint64),
                ('object_start', ct.c_uint64),
                ('lcore', ct.c_uint16),
                ('related_index', ct.c_uint64),
                ('related_type', ct.c_uint8),
                ('args', CTraceParserArgument * TRACE_MAX_ARGS_COUNT)]


class NativeProvider(TraceProvider):
    """Trace provider based on SPDK's trace library"""
    def __init__(self, file):
        """Creates a parser for the file named `file.name` and reads the trace definitions"""
        self._setup_binding(file.name)
        self._parse_defs()

    def __del__(self):
        # _parser is missing if loading the library or creating the parser raised
        if hasattr(self, '_parser'):
            self._lib.spdk_trace_parser_cleanup(self._parser)

    def _setup_binding(self, filename):
        """Loads libspdk_trace_parser.so and creates a parser for `filename`.

        Raises ValueError if the library fails to create the parser.
        """
        self._lib = ct.CDLL('build/lib/libspdk_trace_parser.so')
        self._lib.spdk_trace_parser_init.restype = ct.c_void_p
        # Keep the result wrapped in c_void_p, so that it's passed back as a pointer
        self._lib.spdk_trace_parser_init.errcheck = lambda r, *_: ct.c_void_p(r)
        self._lib.spdk_trace_parser_get_flags.restype = ct.POINTER(CTraceFlags)
        # NOTE(review): the meaning of mode=0 is defined by the C library - confirm
        opts = CParserOpts(filename=bytes(filename, 'ascii'), mode=0,
                           lcore=TRACE_MAX_LCORE)
        self._parser = self._lib.spdk_trace_parser_init(ct.byref(opts))
        if not self._parser:
            raise ValueError('Failed to construct SPDK trace parser')

    def _parse_tpoints(self, tpoints):
        """Converts an array of CTracepoints into Tracepoints indexed by their id"""
        self._tpoints = {}
        for tpoint in tpoints:
            # Slots with an empty name don't describe any tracepoint
            if len(tpoint.name) == 0:
                continue
            self._tpoints[tpoint.tpoint_id] = Tracepoint(
                name=str(tpoint.name, 'ascii'), object_type=tpoint.object_type,
                owner_type=tpoint.owner_type, id=tpoint.tpoint_id,
                new_object=bool(tpoint.new_object),
                args=[TracepointArgument(name=str(a.name, 'ascii'), argtype=a.type)
                      for a in tpoint.args[:tpoint.num_args]])

    def _parse_defs(self):
        """Reads the TSC rate, tracepoints, and owner / object id prefixes from the trace flags"""
        flags = self._lib.spdk_trace_parser_get_flags(self._parser)
        self._tsc_rate = flags.contents.tsc_rate
        self._parse_tpoints(flags.contents.tpoint)

        def conv_objs(arr):
            # Map the type to its id prefix, skipping the slots without a prefix
            return {int(o.type): str(o.id_prefix, 'ascii') for o in arr if o.id_prefix != b'\x00'}
        self._owners = conv_objs(flags.contents.owner)
        self._objects = conv_objs(flags.contents.object)

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        """Generator yielding a TraceEntry for each entry returned by the trace parser"""
        pe = CTraceParserEntry()
        argconv = {TracepointArgument.TYPE_INT: lambda a: a.integer,
                   TracepointArgument.TYPE_PTR: lambda a: int(a.pointer or 0),
                   TracepointArgument.TYPE_STR: lambda a: str(a.string, 'ascii')}

        while self._lib.spdk_trace_parser_next_entry(self._parser, ct.byref(pe)):
            entry = pe.entry.contents
            lcore = pe.lcore
            tpoint = self._tpoints[entry.tpoint_id]
            args = {a.name: argconv[a.argtype](pe.args[i]) for i, a in enumerate(tpoint.args)}

            if tpoint.object_type != OBJECT_NONE:
                if pe.object_index != TRACE_INVALID_OBJECT:
                    # Identify the object by its type's prefix and index and report the time
                    # elapsed since the object's start
                    object_id = '{}{}'.format(self._objects[tpoint.object_type], pe.object_index)
                    ts = entry.tsc - pe.object_start
                else:
                    object_id, ts = 'n/a', None
            elif entry.object_id != 0:
                object_id, ts = '{:x}'.format(entry.object_id), None
            else:
                object_id, ts = None, None

            if tpoint.owner_type != OWNER_NONE:
                poller_id = '{}{:02}'.format(self._owners[tpoint.owner_type], entry.poller_id)
            else:
                poller_id = None

            if pe.related_type != OBJECT_NONE:
                related = '{}{}'.format(self._objects[pe.related_type], pe.related_index)
            else:
                related = None

            yield TraceEntry(tpoint=tpoint, lcore=lcore, tsc=entry.tsc,
                             size=entry.size, object_id=object_id,
                             object_ptr=entry.object_id, poller=poller_id, time=ts,
                             args=args, related=related)


class Trace:
    """Stores, parses, and prints out SPDK traces"""
    def __init__(self, file):
        """Selects the provider for `file`: JSON for stdin and JSON files, native otherwise"""
        if file == sys.stdin or magic.from_file(file.name, mime=True) == 'application/json':
            self._provider = JsonProvider(file)
        else:
            self._provider = NativeProvider(file)
        self._objects = []
        # Pointers are printed in hex, other argument types are printed as is
        self._argfmt = {TracepointArgument.TYPE_PTR: lambda a: f'0x{a:x}'}
        self.tpoints = self._provider.tpoints()

    def _annotate_args(self, entry):
        """Returns the annotations of `entry` merged from all of the registered objects"""
        annotations = {}
        for obj in self._objects:
            current = obj.annotate(entry)
            if current is None:
                continue
            annotations.update(current)
        return annotations

    def _format_args(self, entry):
        """Returns a list of strings describing the arguments of `entry`.

        Annotated arguments are formatted as "name(prop=value, ...)", the remaining ones as
        "name: value".
        """
        annotations = self._annotate_args(entry)
        args = []
        for arg, (name, value) in zip(entry.tpoint.args, entry.args.items()):
            annot = annotations.get(name)
            if annot is not None:
                args.append('{}({})'.format(name, ', '.join(f'{n}={v}' for n, v in annot.items())))
            else:
                args.append('{}: {}'.format(name, self._argfmt.get(arg.argtype,
                                                                   lambda a: a)(value)))
        return args

    def register_object(self, obj):
        """Registers an object (e.g. an SPDKObject) used to annotate the printed entries"""
        self._objects.append(obj)

    def print(self):
        """Prints all of the entries, one per line, with timestamps relative to the first one"""
        def get_us(tsc, off):
            # Convert a TSC difference to microseconds
            return ((tsc - off) * 10 ** 6) / self._provider.tsc_rate()

        offset = None
        for e in self._provider.entries():
            # The first entry's tsc becomes the reference point for all of the timestamps
            offset = e.tsc if offset is None else offset
            timestamp = get_us(e.tsc, offset)
            diff = get_us(e.time, 0) if e.time is not None else None
            args = ', '.join(self._format_args(e))
            related = ' (' + e.related + ')' if e.related is not None else ''

            print(('{:3} {:16.3f} {:3} {:24} {:12}'.format(
                e.lcore, timestamp, e.poller if e.poller is not None else '',
                e.tpoint.name, f'size: {e.size}' if e.size else '') +
                (f'id: {e.object_id + related:12} ' if e.object_id is not None else '') +
                (f'time: {diff:<8.3f} ' if diff is not None else '') +
                args).rstrip())


class SPDKObject:
    """Describes a specific type of SPDK object (e.g. qpair, thread, etc.)"""
    @dataclass
    class Lifetime:
        """Describes a lifetime and properties of a particular SPDK object."""
        begin: int
        end: int
        ptr: int
        properties: dict = field(default_factory=dict)

    def __init__(self, trace: Trace, tpoints: List[str]):
        """Stores the tracepoints of `trace` whose names are listed in `tpoints`, indexed by id"""
        self.tpoints = {}
        for name in tpoints:
            tpoint = next((t for t in trace.tpoints.values() if t.name == name), None)
            if tpoint is None:
                # Some tpoints might be undefined if configured without specific subsystems
                continue
            self.tpoints[tpoint.id] = tpoint

    def _annotate(self, entry: TraceEntry):
        """Abstract annotation method to be implemented by subclasses."""
        raise NotImplementedError()

    def annotate(self, entry: TraceEntry):
        """Annotates a tpoint entry and returns a dict indexed by argname with values representing
        various object properties.  For instance, {"qpair": {"qid": 1, "subnqn": "nqn"}} could be
        returned to annotate an argument called "qpair" with two items: "qid" and "subnqn".
        """
        if entry.tpoint.id not in self.tpoints:
            return None
        return self._annotate(entry)


class QPair(SPDKObject):
    """Annotates the "qpair" argument of RDMA / TCP tracepoints with the qpair's properties"""
    def __init__(self, trace: Trace, dtrace: DTrace):
        """Registers the supported tracepoints and builds qpair lifetimes from `dtrace` entries"""
        super().__init__(trace, tpoints=[
            'RDMA_REQ_NEW',
            'RDMA_REQ_NEED_BUFFER',
            'RDMA_REQ_TX_PENDING_C2H',
            'RDMA_REQ_TX_PENDING_H2C',
            'RDMA_REQ_TX_H2C',
            'RDMA_REQ_RDY_TO_EXECUTE',
            'RDMA_REQ_EXECUTING',
            'RDMA_REQ_EXECUTED',
            'RDMA_REQ_RDY_TO_COMPL',
            'RDMA_REQ_COMPLETING_C2H',
            'RDMA_REQ_COMPLETING',
            'RDMA_REQ_COMPLETED',
            'TCP_REQ_NEW',
            'TCP_REQ_NEED_BUFFER',
            'TCP_REQ_TX_H_TO_C',
            'TCP_REQ_RDY_TO_EXECUTE',
            'TCP_REQ_EXECUTING',
            'TCP_REQ_EXECUTED',
            'TCP_REQ_RDY_TO_COMPLETE',
            'TCP_REQ_TRANSFER_C2H',
            'TCP_REQ_COMPLETED',
            'TCP_WRITE_START',
            'TCP_WRITE_DONE',
            'TCP_READ_DONE',
            'TCP_REQ_AWAIT_R2T_ACK'])
        self._objects = []
        self._find_objects(dtrace.entries)

    def _find_objects(self, dprobes):
        """Builds a Lifetime for each qpair added to a poll group in the list of DTraceEntries.

        A lifetime starts at nvmf_poll_group_add_qpair and lasts until the next
        nvmf_poll_group_remove_qpair of the same qpair (or until TSC_MAX if there's none).
        """
        def probe_match(probe, other):
            return probe.args['qpair'] == other.args['qpair']

        for i, dprobe in enumerate(dprobes):
            if dprobe.name != 'nvmf_poll_group_add_qpair':
                continue
            # We've found a new qpair, now find the probe indicating its destruction
            last_idx, last = next((((i + j + 1), d) for j, d in enumerate(islice(dprobes, i, None))
                                   if d.name == 'nvmf_poll_group_remove_qpair' and
                                   probe_match(d, dprobe)), (None, None))
            obj = SPDKObject.Lifetime(begin=dprobe.args['tsc'],
                                      end=last.args['tsc'] if last is not None else TSC_MAX,
                                      ptr=dprobe.args['qpair'],
                                      properties={'ptr': hex(dprobe.args['qpair']),
                                                  'thread': dprobe.args['thread']})
            # Collect the remaining properties from the probes hit during the qpair's lifetime
            for other in filter(lambda p: probe_match(p, dprobe), dprobes[i:last_idx]):
                if other.name == 'nvmf_ctrlr_add_qpair':
                    for prop in ['qid', 'subnqn', 'hostnqn']:
                        obj.properties[prop] = other.args[prop]
            self._objects.append(obj)

    def _annotate(self, entry):
        """Returns the properties of the qpair that was alive at the time of `entry`, if any"""
        qpair = entry.args.get('qpair')
        if qpair is None:
            return None
        for obj in self._objects:
            # The same pointer might be reused by several qpairs, so check the lifetime too
            if obj.ptr == qpair and obj.begin <= entry.tsc <= obj.end:
                return {'qpair': obj.properties}
        return None


def build_dtrace(file=None):
    """Returns a DTrace object with the probes used for tracking qpairs.

    If `file` is given, it's parsed as the output of the script generated from these probes.
    """
    return DTrace([
        DTraceProbe(
            name='nvmf_poll_group_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_poll_group_remove_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_ctrlr_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='qid', pos=2, type=int),
                  DTraceArgument(name='subnqn', pos=3, type=str),
                  DTraceArgument(name='hostnqn', pos=4, type=str)])], file)


def print_trace(trace_file, dtrace_file):
    """Prints the trace from `trace_file` annotated with the qpair data from `dtrace_file`"""
    dtrace = build_dtrace(dtrace_file)
    trace = Trace(trace_file)
    trace.register_object(QPair(trace, dtrace))
    trace.print()


def main(argv):
    """Parses the command line in `argv` and generates a script, records, or prints a trace"""
    parser = ArgumentParser(description='SPDK trace annotation script')
    parser.add_argument('-i', '--input',
                        help='Trace file to annotate (either JSON generated by spdk_trace or ' +
                             'raw binary produced by the SPDK application itself)')
    parser.add_argument('-g', '--generate', help='Generate bpftrace script', action='store_true')
    parser.add_argument('-r', '--record', help='Record BPF traces on PID', metavar='PID', type=int)
    parser.add_argument('-b', '--bpftrace', help='BPF trace script to use for annotations')
    args = parser.parse_args(argv)

    if args.generate:
        print(build_dtrace().generate())
    elif args.record:
        build_dtrace().record(args.record)
    else:
        trace_file = open(args.input, 'r') if args.input is not None else sys.stdin
        dtrace_file = None
        try:
            if args.bpftrace is not None:
                dtrace_file = open(args.bpftrace)
            print_trace(trace_file, dtrace_file)
        finally:
            # Close the files we've opened ourselves, but leave stdin alone
            if dtrace_file is not None:
                dtrace_file.close()
            if trace_file is not sys.stdin:
                trace_file.close()


if __name__ == '__main__':
    # In order for the changes to LD_LIBRARY_PATH to be visible to the loader,
    # they need to be applied before starting a process, so we need to
    # re-execute the script after updating it.
    if os.environ.get('SPDK_BPF_TRACE_PY') is None:
        rootdir = f'{os.path.dirname(__file__)}/../..'
        # Skip an unset / empty LD_LIBRARY_PATH, as joining it would leave an empty entry in
        # the search path, which the loader interprets as the current directory
        libdirs = [d for d in (os.environ.get('LD_LIBRARY_PATH'), f'{rootdir}/build/lib') if d]
        os.environ['LD_LIBRARY_PATH'] = ':'.join(libdirs)
        os.environ['SPDK_BPF_TRACE_PY'] = '1'
        os.execv(sys.argv[0], sys.argv)
    else:
        try:
            main(sys.argv[1:])
        except (KeyboardInterrupt, BrokenPipeError):
            pass