#!/usr/bin/env python3

from argparse import ArgumentParser
from contextlib import ExitStack
from dataclasses import dataclass, field
from itertools import islice
from typing import Dict, List, TypeVar
import ctypes as ct
import ijson
import magic
import os
import re
import subprocess
import sys
import tempfile

TSC_MAX = (1 << 64) - 1
UCHAR_MAX = (1 << 8) - 1
TRACE_MAX_LCORE = 128
TRACE_MAX_GROUP_ID = 16
TRACE_MAX_TPOINT_ID = TRACE_MAX_GROUP_ID * 64
TRACE_MAX_ARGS_COUNT = 5
TRACE_INVALID_OBJECT = (1 << 64) - 1
OBJECT_NONE = 0
OWNER_NONE = 0


@dataclass
class DTraceArgument:
    """Describes a DTrace probe (usdt) argument"""
    name: str
    # Index of the argument as passed to the probe (arg0 is the tsc)
    pos: int
    # Either int or str; other types are rejected by DTrace.__init__
    type: type


@dataclass
class DTraceProbe:
    """Describes a DTrace probe (usdt) point"""
    name: str
    args: Dict[str, DTraceArgument]

    def __init__(self, name, args):
        """Build a probe from a name and a list of DTraceArgument, indexing them by name."""
        self.name = name
        self.args = {a.name: a for a in args}


@dataclass
class DTraceEntry:
    """Describes a single DTrace probe invocation"""
    name: str
    args: Dict[str, TypeVar('ArgumentType', str, int)]

    def __init__(self, probe, args):
        """Convert the raw string values in `args` according to the probe's argument types.

        Integers are expected in hex (as printed by the generated bpftrace script), strings
        are stripped of whitespace and surrounding single quotes.

        Raises:
            ValueError: if `args` contains a name not defined by `probe`.
        """
        valmap = {int: lambda x: int(x, 16),
                  str: lambda x: x.strip().strip("'")}
        self.name = probe.name
        self.args = {}
        for name, value in args.items():
            arg = probe.args.get(name)
            if arg is None:
                raise ValueError(f'Unexpected argument: {name}')
            self.args[name] = valmap[arg.type](value)


class DTrace:
    """Generates bpftrace script based on the supplied probe points, parses its
    output and stores it as a list of DTraceEntry sorted by their tsc.
    """
    def __init__(self, probes, file=None):
        """
        Args:
            probes: list of DTraceProbe definitions.
            file: optional file object with previously recorded bpftrace output.

        Raises:
            ValueError: if a probe doesn't exist in the sources, or one of its arguments has
                an out-of-range position or an unsupported type.
        """
        self._avail_probes = self._list_probes()
        self._probes = {p.name: p for p in probes}
        # Sanitize the probe definitions before using them to parse any output
        for probe in probes:
            if probe.name not in self._avail_probes:
                raise ValueError(f'Couldn\'t find probe: "{probe.name}"')
            for arg in probe.args.values():
                if arg.pos >= self._avail_probes[probe.name]:
                    raise ValueError('Invalid probe argument position')
                if arg.type not in (int, str):
                    raise ValueError('Invalid argument type')
        self.entries = self._parse(file) if file is not None else []

    def _parse(self, file):
        """Parse lines of the form `<probe>: name=value, ...` into DTraceEntry objects,
        sorted by their `tsc` argument. Lines for unknown probes are skipped."""
        regex = re.compile(r'(\w+): (.*)')
        entries = []

        for line in file:
            match = regex.match(line)
            if match is None:
                continue
            name, args = match.groups()
            probe = self._probes.get(name)
            # Skip the line if we don't recognize the probe name
            if probe is None:
                continue
            # Split on the first '=' only, so a value containing '=' doesn't break the pair
            entries.append(DTraceEntry(probe, args=dict(a.strip().split('=', 1)
                                                        for a in args.split(','))))
        entries.sort(key=lambda e: e.args['tsc'])
        return entries

    def _list_probes(self):
        """Scan the git-tracked C sources for SPDK_DTRACE_PROBE* invocations.

        Returns:
            dict mapping probe name to its total number of arguments (including the tsc).
        """
        files = subprocess.check_output(['git', 'ls-files', '*.[ch]',
                                         ':!:include/spdk_internal/usdt.h'])
        files = filter(lambda f: len(f) > 0, str(files, 'ascii').split('\n'))
        regex = re.compile(r'SPDK_DTRACE_PROBE([0-9]*)\((\w+)')
        probes = {}

        for fname in files:
            with open(fname, 'r') as file:
                for match in regex.finditer(file.read()):
                    nargs, name = match.group(1), match.group(2)
                    nargs = int(nargs) if len(nargs) > 0 else 0
                    # Add one to accommodate for the tsc being the first arg
                    probes[name] = nargs + 1
        return probes

    def _gen_usdt(self, probe):
        """Return a single bpftrace `usdt:` action printing the probe's arguments."""
        usdt = (f'usdt:__EXE__:{probe.name} {{' +
                f'printf("{probe.name}: ')
        args = probe.args
        if len(args) > 0:
            argtype = {int: '0x%lx', str: '\'%s\''}
            argcast = {int: lambda x: x, str: lambda x: f'str({x})'}
            argstr = [f'{a.name}={argtype[a.type]}' for a in args.values()]
            argval = [f'{argcast[a.type](f"arg{a.pos}")}' for a in args.values()]
            usdt += ', '.join(argstr) + '\\n", ' + ', '.join(argval)
        else:
            usdt += '\\n"'
        usdt += ');}'
        return usdt

    def generate(self):
        """Return the complete bpftrace script for all registered probes."""
        return '\n'.join([self._gen_usdt(p) for p in self._probes.values()])

    def record(self, pid):
        """Write the generated script to a temporary file and run bpftrace.sh on `pid`.

        A KeyboardInterrupt (the usual way to stop recording) is swallowed.
        """
        # abspath: dirname() of a bare relative __file__ is '', which would yield '/../'
        script_dir = os.path.dirname(os.path.abspath(__file__))
        with tempfile.NamedTemporaryFile(mode='w+') as script:
            script.write(self.generate())
            script.flush()
            try:
                subprocess.run([f'{script_dir}/../bpftrace.sh',
                                f'{pid}', f'{script.name}'])
            except KeyboardInterrupt:
                pass


@dataclass
class TracepointArgument:
    """Describes an SPDK tracepoint argument"""
    TYPE_INT = 0
    TYPE_PTR = 1
    TYPE_STR = 2
    name: str
    argtype: int


@dataclass
class Tracepoint:
    """Describes an SPDK tracepoint, equivalent to struct spdk_trace_tpoint"""
    name: str
    id: int
    new_object: bool
    object_type: int
    owner_type: int
    args: List[TracepointArgument]


@dataclass
class TraceEntry:
    """Describes an SPDK tracepoint entry, equivalent to struct spdk_trace_entry"""
    lcore: int
    tpoint: Tracepoint
    tsc: int
    poller: str
    size: int
    object_id: str
    object_ptr: int
    time: int
    args: Dict[str, TypeVar('ArgumentType', str, int)]


class TraceProvider:
    """Defines interface for objects providing traces and tracepoint definitions"""

    def tpoints(self):
        """Returns tracepoint definitions as a dict of (tracepoint_name, tracepoint)"""
        raise NotImplementedError()

    def entries(self):
        """Generator returning subsequent trace entries"""
        raise NotImplementedError()

    def tsc_rate(self):
        """Returns the TSC rate that was in place when traces were collected"""
        raise NotImplementedError()


class JsonProvider(TraceProvider):
    """Trace provider based on JSON-formatted output produced by spdk_trace app"""
    def __init__(self, file):
        # The JSON is streamed with ijson: definitions are consumed eagerly here, while the
        # (potentially large) entries array is parsed lazily by entries().
        self._parser = ijson.parse(file)
        self._tpoints = {}
        self._parse_defs()

    def _parse_tpoints(self, tpoints):
        """Store tracepoint definitions (a list of dicts) keyed by their id."""
        for tpoint in tpoints:
            tpoint_id = tpoint['id']
            self._tpoints[tpoint_id] = Tracepoint(
                name=tpoint['name'], id=tpoint_id,
                new_object=tpoint['new_object'], object_type=OBJECT_NONE,
                owner_type=OWNER_NONE,
                args=[TracepointArgument(name=a['name'],
                                         argtype=a['type'])
                      for a in tpoint.get('args', [])])

    def _parse_defs(self):
        """Consume parser events up to the `entries` key, collecting tsc_rate and tpoints."""
        builder = None
        for prefix, event, value in self._parser:
            # If we reach entries array, there are no more tracepoint definitions
            if prefix == 'entries':
                break
            elif prefix == 'tsc_rate':
                self._tsc_rate = value
                continue

            if (prefix, event) == ('tpoints', 'start_array'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('tpoints', 'end_array'):
                self._parse_tpoints(builder.value)
                builder = None

    def _parse_entry(self, entry):
        """Convert a single JSON entry dict into a TraceEntry."""
        tpoint = self._tpoints[entry['tpoint']]
        obj = entry.get('object', {})
        return TraceEntry(tpoint=tpoint, lcore=entry['lcore'], tsc=entry['tsc'],
                          size=entry.get('size'), object_id=obj.get('id'),
                          object_ptr=obj.get('value'), time=obj.get('time'),
                          poller=entry.get('poller'),
                          args={n.name: v for n, v in zip(tpoint.args, entry.get('args', []))})

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        builder = None
        for prefix, event, value in self._parser:
            if (prefix, event) == ('entries.item', 'start_map'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('entries.item', 'end_map'):
                yield self._parse_entry(builder.value)
                builder = None


# ctypes mirrors of the structures exchanged with libspdk_trace_parser. Their layout must
# match the C definitions exactly. NOTE(review): keep in sync with the SPDK trace headers.
class CParserOpts(ct.Structure):
    _fields_ = [('filename', ct.c_char_p),
                ('mode', ct.c_int),
                ('lcore', ct.c_uint16)]


class CTraceOwner(ct.Structure):
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]


class CTraceObject(ct.Structure):
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]


class CTpointArgument(ct.Structure):
    _fields_ = [('name', ct.c_char * 14),
                ('type', ct.c_uint8),
                ('size', ct.c_uint8)]


class CTracepoint(ct.Structure):
    _fields_ = [('name', ct.c_char * 24),
                ('tpoint_id', ct.c_uint16),
                ('owner_type', ct.c_uint8),
                ('object_type', ct.c_uint8),
                ('new_object', ct.c_uint8),
                ('num_args', ct.c_uint8),
                ('args', CTpointArgument * TRACE_MAX_ARGS_COUNT)]


class CTraceFlags(ct.Structure):
    _fields_ = [('tsc_rate', ct.c_uint64),
                ('tpoint_mask', ct.c_uint64 * TRACE_MAX_GROUP_ID),
                ('owner', CTraceOwner * (UCHAR_MAX + 1)),
                ('object', CTraceObject * (UCHAR_MAX + 1)),
                ('tpoint', CTracepoint * TRACE_MAX_TPOINT_ID)]


class CTraceEntry(ct.Structure):
    _fields_ = [('tsc', ct.c_uint64),
                ('tpoint_id', ct.c_uint16),
                ('poller_id', ct.c_uint16),
                ('size', ct.c_uint32),
                ('object_id', ct.c_uint64)]


class CTraceParserArgument(ct.Union):
    _fields_ = [('integer', ct.c_uint64),
                ('pointer', ct.c_void_p),
                ('string', ct.c_char * (UCHAR_MAX + 1))]


class CTraceParserEntry(ct.Structure):
    _fields_ = [('entry', ct.POINTER(CTraceEntry)),
                ('object_index', ct.c_uint64),
                ('object_start', ct.c_uint64),
                ('lcore', ct.c_uint16),
                ('args', CTraceParserArgument * TRACE_MAX_ARGS_COUNT)]


class NativeProvider(TraceProvider):
    """Trace provider based on SPDK's trace library"""
    def __init__(self, file):
        # Only the file's name is used; the native library opens the file itself
        self._setup_binding(file.name)
        self._parse_defs()

    def __del__(self):
        # _parser may be missing (CDLL load failed) or NULL (parser init failed)
        if getattr(self, '_parser', None):
            self._lib.spdk_trace_parser_cleanup(self._parser)

    def _setup_binding(self, filename):
        """Load libspdk_trace_parser and construct a parser for `filename`.

        Raises:
            ValueError: if the native parser could not be constructed.
        """
        # NOTE: a path containing '/' is resolved by dlopen relative to the current working
        # directory (not via LD_LIBRARY_PATH), so this expects to run from the SPDK root.
        self._lib = ct.CDLL('build/lib/libspdk_trace_parser.so')
        self._lib.spdk_trace_parser_init.restype = ct.c_void_p
        # Keep the handle as a c_void_p so it isn't truncated when passed back to C
        self._lib.spdk_trace_parser_init.errcheck = lambda r, *_: ct.c_void_p(r)
        self._lib.spdk_trace_parser_get_flags.restype = ct.POINTER(CTraceFlags)
        opts = CParserOpts(filename=bytes(filename, 'ascii'), mode=0,
                           lcore=TRACE_MAX_LCORE)
        self._parser = self._lib.spdk_trace_parser_init(ct.byref(opts))
        if not self._parser:
            raise ValueError('Failed to construct SPDK trace parser')

    def _parse_tpoints(self, tpoints):
        """Store the defined (non-empty name) tracepoints keyed by their id."""
        self._tpoints = {}
        for tpoint in tpoints:
            if len(tpoint.name) == 0:
                continue
            self._tpoints[tpoint.tpoint_id] = Tracepoint(
                name=str(tpoint.name, 'ascii'), object_type=tpoint.object_type,
                owner_type=tpoint.owner_type, id=tpoint.tpoint_id,
                new_object=bool(tpoint.new_object),
                args=[TracepointArgument(name=str(a.name, 'ascii'), argtype=a.type)
                      for a in tpoint.args[:tpoint.num_args]])

    def _parse_defs(self):
        """Read tsc rate, tracepoint, owner and object definitions from the trace flags."""
        flags = self._lib.spdk_trace_parser_get_flags(self._parser)
        self._tsc_rate = flags.contents.tsc_rate
        self._parse_tpoints(flags.contents.tpoint)

        def conv_objs(arr):
            # Map type -> id prefix, skipping slots with an empty (NUL) prefix
            return {int(o.type): str(o.id_prefix, 'ascii') for o in arr if o.id_prefix != b'\x00'}
        self._owners = conv_objs(flags.contents.owner)
        self._objects = conv_objs(flags.contents.object)

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        pe = CTraceParserEntry()
        argconv = {TracepointArgument.TYPE_INT: lambda a: a.integer,
                   TracepointArgument.TYPE_PTR: lambda a: int(a.pointer or 0),
                   TracepointArgument.TYPE_STR: lambda a: str(a.string, 'ascii')}

        while self._lib.spdk_trace_parser_next_entry(self._parser, ct.byref(pe)):
            entry = pe.entry.contents
            lcore = pe.lcore
            tpoint = self._tpoints[entry.tpoint_id]
            args = {a.name: argconv[a.argtype](pe.args[i]) for i, a in enumerate(tpoint.args)}

            if tpoint.object_type != OBJECT_NONE:
                if pe.object_index != TRACE_INVALID_OBJECT:
                    object_id = '{}{}'.format(self._objects[tpoint.object_type], pe.object_index)
                    ts = entry.tsc - pe.object_start
                else:
                    object_id, ts = 'n/a', None
            elif entry.object_id != 0:
                object_id, ts = '{:x}'.format(entry.object_id), None
            else:
                object_id, ts = None, None

            if tpoint.owner_type != OWNER_NONE:
                poller_id = '{}{:02}'.format(self._owners[tpoint.owner_type], entry.poller_id)
            else:
                poller_id = None

            yield TraceEntry(tpoint=tpoint, lcore=lcore, tsc=entry.tsc,
                             size=entry.size, object_id=object_id,
                             object_ptr=entry.object_id, poller=poller_id, time=ts,
                             args=args)


class Trace:
    """Stores, parses, and prints out SPDK traces"""
    def __init__(self, file):
        # stdin and JSON files go through the JSON provider, anything else is assumed to
        # be a raw binary trace file handled by the native library
        if file == sys.stdin or magic.from_file(file.name, mime=True) == 'application/json':
            self._provider = JsonProvider(file)
        else:
            self._provider = NativeProvider(file)
        self._objects = []
        self._argfmt = {TracepointArgument.TYPE_PTR: lambda a: f'0x{a:x}'}
        self.tpoints = self._provider.tpoints()

    def _annotate_args(self, entry):
        """Merge the annotations returned by every registered object for `entry`."""
        annotations = {}
        for obj in self._objects:
            current = obj.annotate(entry)
            if current is None:
                continue
            annotations.update(current)
        return annotations

    def _format_args(self, entry):
        """Return the entry's arguments as strings, replacing annotated ones with
        `name(key=value, ...)`."""
        annotations = self._annotate_args(entry)
        args = []
        for arg, (name, value) in zip(entry.tpoint.args, entry.args.items()):
            annot = annotations.get(name)
            if annot is not None:
                args.append('{}({})'.format(name, ', '.join(f'{n}={v}' for n, v in annot.items())))
            else:
                args.append('{}: {}'.format(name, self._argfmt.get(arg.argtype,
                                                                  lambda a: a)(value)))
        return args

    def register_object(self, obj):
        """Register an SPDKObject used to annotate entries' arguments."""
        self._objects.append(obj)

    def print(self):
        """Print every trace entry, with timestamps in microseconds relative to the first."""
        def get_us(tsc, off):
            return ((tsc - off) * 10 ** 6) / self._provider.tsc_rate()

        offset = None
        for e in self._provider.entries():
            offset = e.tsc if offset is None else offset
            timestamp = get_us(e.tsc, offset)
            diff = get_us(e.time, 0) if e.time is not None else None
            args = ', '.join(self._format_args(e))

            print(('{:3} {:16.3f} {:3} {:24} {:12}'.format(
                e.lcore, timestamp, e.poller if e.poller is not None else '',
                e.tpoint.name, f'size: {e.size}' if e.size else '') +
                (f'id: {e.object_id:8} ' if e.object_id is not None else '') +
                (f'time: {diff:<8.3f} ' if diff is not None else '') +
                args).rstrip())


class SPDKObject:
    """Describes a specific type of SPDK objects (e.g. qpair, thread, etc.)"""
    @dataclass
    class Lifetime:
        """Describes a lifetime and properties of a particular SPDK object."""
        begin: int
        end: int
        ptr: int
        properties: dict = field(default_factory=dict)

    def __init__(self, trace: Trace, tpoints: List[str]):
        self.tpoints = {}
        for name in tpoints:
            tpoint = next((t for t in trace.tpoints.values() if t.name == name), None)
            if tpoint is None:
                # Some tpoints might be undefined if configured without specific subsystems
                continue
            self.tpoints[tpoint.id] = tpoint

    def _annotate(self, entry: TraceEntry):
        """Abstract annotation method to be implemented by subclasses."""
        raise NotImplementedError()

    def annotate(self, entry: TraceEntry):
        """Annotates a tpoint entry and returns a dict indexed by argname with values representing
        various object properties. For instance, {"qpair": {"qid": 1, "subnqn": "nqn"}} could be
        returned to annotate an argument called "qpair" with two items: "qid" and "subnqn".
        """
        if entry.tpoint.id not in self.tpoints:
            return None
        return self._annotate(entry)


class QPair(SPDKObject):
    """Annotates the `qpair` argument of NVMe-oF RDMA/TCP request tracepoints."""
    def __init__(self, trace: Trace, dtrace: DTrace):
        super().__init__(trace, tpoints=[
            'RDMA_REQ_NEW',
            'RDMA_REQ_NEED_BUFFER',
            'RDMA_REQ_TX_PENDING_C2H',
            'RDMA_REQ_TX_PENDING_H2C',
            'RDMA_REQ_TX_H2C',
            'RDMA_REQ_RDY_TO_EXECUTE',
            'RDMA_REQ_EXECUTING',
            'RDMA_REQ_EXECUTED',
            'RDMA_REQ_RDY_TO_COMPL',
            'RDMA_REQ_COMPLETING_C2H',
            'RDMA_REQ_COMPLETING',
            'RDMA_REQ_COMPLETED',
            'TCP_REQ_NEW',
            'TCP_REQ_NEED_BUFFER',
            'TCP_REQ_TX_H_TO_C',
            'TCP_REQ_RDY_TO_EXECUTE',
            'TCP_REQ_EXECUTING',
            'TCP_REQ_EXECUTED',
            'TCP_REQ_RDY_TO_COMPLETE',
            'TCP_REQ_TRANSFER_C2H',
            'TCP_REQ_COMPLETED',
            'TCP_WRITE_START',
            'TCP_WRITE_DONE',
            'TCP_READ_DONE',
            'TCP_REQ_AWAIT_R2T_ACK'])
        self._objects = []
        self._find_objects(dtrace.entries)

    def _find_objects(self, dprobes):
        """Build a Lifetime for every qpair added to a poll group in the (tsc-sorted) probes."""
        def probe_match(probe, other):
            return probe.args['qpair'] == other.args['qpair']

        for i, dprobe in enumerate(dprobes):
            if dprobe.name != 'nvmf_poll_group_add_qpair':
                continue
            # We've found a new qpair, now find the probe indicating its destruction.
            # islice starts at i, so d is dprobes[i + j]; last_idx is one past it, making
            # dprobes[i:last_idx] include the remove probe itself.
            last_idx, last = next((((i + j + 1), d) for j, d in enumerate(islice(dprobes, i, None))
                                   if d.name == 'nvmf_poll_group_remove_qpair' and
                                   probe_match(d, dprobe)), (None, None))
            obj = SPDKObject.Lifetime(begin=dprobe.args['tsc'],
                                      end=last.args['tsc'] if last is not None else TSC_MAX,
                                      ptr=dprobe.args['qpair'],
                                      properties={'ptr': hex(dprobe.args['qpair']),
                                                  'thread': dprobe.args['thread']})
            for other in filter(lambda p: probe_match(p, dprobe), dprobes[i:last_idx]):
                if other.name == 'nvmf_ctrlr_add_qpair':
                    for prop in ['qid', 'subnqn', 'hostnqn']:
                        obj.properties[prop] = other.args[prop]
            self._objects.append(obj)

    def _annotate(self, entry):
        qpair = entry.args.get('qpair')
        if qpair is None:
            return None
        for obj in self._objects:
            if obj.ptr == qpair and obj.begin <= entry.tsc <= obj.end:
                return {'qpair': obj.properties}
        return None


def build_dtrace(file=None):
    """Return a DTrace configured with the qpair-related probes, optionally parsing `file`."""
    return DTrace([
        DTraceProbe(
            name='nvmf_poll_group_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_poll_group_remove_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_ctrlr_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='qid', pos=2, type=int),
                  DTraceArgument(name='subnqn', pos=3, type=str),
                  DTraceArgument(name='hostnqn', pos=4, type=str)])], file)


def print_trace(trace_file, dtrace_file):
    """Print `trace_file` annotated with qpair information recorded in `dtrace_file`."""
    dtrace = build_dtrace(dtrace_file)
    trace = Trace(trace_file)
    trace.register_object(QPair(trace, dtrace))
    trace.print()


def main(argv):
    parser = ArgumentParser(description='SPDK trace annotation script')
    parser.add_argument('-i', '--input',
                        help='Trace file to annotate (either JSON generated by spdk_trace or ' +
                             'raw binary produced by the SPDK application itself)')
    parser.add_argument('-g', '--generate', help='Generate bpftrace script', action='store_true')
    parser.add_argument('-r', '--record', help='Record BPF traces on PID', metavar='PID', type=int)
    parser.add_argument('-b', '--bpftrace', help='BPF trace script to use for annotations')
    args = parser.parse_args(argv)

    if args.generate:
        print(build_dtrace().generate())
    elif args.record is not None:
        build_dtrace().record(args.record)
    else:
        # ExitStack closes whichever files were actually opened, even on errors
        with ExitStack() as stack:
            trace_file = (stack.enter_context(open(args.input, 'r'))
                          if args.input is not None else sys.stdin)
            dtrace_file = (stack.enter_context(open(args.bpftrace))
                           if args.bpftrace is not None else None)
            print_trace(trace_file, dtrace_file)


if __name__ == '__main__':
    # In order for the changes to LD_LIBRARY_PATH to be visible to the loader,
    # they need to be applied before starting a process, so we need to
    # re-execute the script after updating it.
    if os.environ.get('SPDK_BPF_TRACE_PY') is None:
        rootdir = f'{os.path.dirname(os.path.abspath(__file__))}/../..'
        # Don't introduce an empty component: ld.so treats it as the current directory
        libpath = os.environ.get('LD_LIBRARY_PATH')
        os.environ['LD_LIBRARY_PATH'] = ':'.join(([libpath] if libpath else []) +
                                                 [f'{rootdir}/build/lib'])
        os.environ['SPDK_BPF_TRACE_PY'] = '1'
        # Re-exec through the running interpreter, so the script needn't be executable and
        # the same interpreter (e.g. a virtualenv with ijson/magic) is used again
        os.execv(sys.executable, [sys.executable] + sys.argv)
    else:
        try:
            main(sys.argv[1:])
        except (KeyboardInterrupt, BrokenPipeError):
            pass