#!/usr/bin/env python3

from argparse import ArgumentParser
from contextlib import ExitStack
from dataclasses import dataclass, field
from itertools import islice
from typing import Dict, List, TypeVar
import os
import re
import subprocess
import sys
import tempfile

try:
    import ijson
except ImportError:
    # ijson is only used to parse the JSON output of spdk_trace; generating (-g)
    # or recording (-r) a bpftrace script does not need it.
    ijson = None

TSC_MAX = (1 << 64) - 1

# Object/owner type assigned to every tracepoint parsed by JsonProvider, which
# does not read those types from the JSON. Presumably these mirror OBJECT_NONE /
# OWNER_NONE from SPDK's include/spdk/trace.h - TODO confirm.
OBJECT_NONE = 0
OWNER_NONE = 0

# Value type of a parsed probe / tracepoint argument
ArgumentType = TypeVar('ArgumentType', str, int)


@dataclass
class DTraceArgument:
    """Describes a DTrace probe (usdt) argument"""
    name: str  # name used in the generated printf and in the parsed output
    pos: int  # index of the probe's argN this argument is read from (arg0 is the tsc)
    type: type  # int or str; selects the printf format and the value parser


@dataclass
class DTraceProbe:
    """Describes a DTrace probe (usdt) point"""
    name: str
    args: Dict[str, DTraceArgument]

    def __init__(self, name, args):
        self.name = name
        # Index the arguments by name for lookups while parsing
        self.args = {a.name: a for a in args}


@dataclass
class DTraceEntry:
    """Describes a single DTrace probe invocation"""
    name: str
    args: Dict[str, ArgumentType]

    def __init__(self, probe, args):
        """Converts raw argument strings according to the probe definition.

        :param probe: DTraceProbe describing the invoked probe
        :param args: dict mapping argument name to its raw string value
        :raises ValueError: if args contains a name the probe doesn't define
        """
        # ints are printed as 0x%lx (hex), strings are wrapped in single quotes
        valmap = {int: lambda x: int(x, 16),
                  str: lambda x: x.strip().strip("'")}
        self.name = probe.name
        self.args = {}
        for name, value in args.items():
            arg = probe.args.get(name)
            if arg is None:
                raise ValueError(f'Unexpected argument: {name}')
            self.args[name] = valmap[arg.type](value)


class DTrace:
    """Generates bpftrace script based on the supplied probe points, parses its
    output and stores it as a list of DTraceEntry sorted by their tsc.
    """
    def __init__(self, probes, file=None):
        """
        :param probes: iterable of DTraceProbe definitions
        :param file: optional text file containing recorded bpftrace output
        :raises ValueError: if a probe, or one of its arguments, doesn't match the
                            SPDK_DTRACE_PROBE call sites found in the sources
        """
        self._avail_probes = self._list_probes()
        self._probes = {p.name: p for p in probes}
        # Sanitize the probe definitions before using them to parse any output
        for probe in self._probes.values():
            if probe.name not in self._avail_probes:
                raise ValueError(f'Couldn\'t find probe: "{probe.name}"')
            for arg in probe.args.values():
                if arg.pos >= self._avail_probes[probe.name]:
                    raise ValueError('Invalid probe argument position')
                if arg.type not in (int, str):
                    raise ValueError('Invalid argument type')
        self.entries = self._parse(file) if file is not None else []

    def _parse(self, file):
        """Parses lines of the form `probe_name: arg=value, arg=value, ...`.

        Lines that don't match that form, or that name an unknown probe, are
        skipped. Returns a list of DTraceEntry sorted by their 'tsc' argument.
        """
        regex = re.compile(r'(\w+): (.*)')
        entries = []

        for line in file:
            match = regex.match(line)
            if match is None:
                continue
            name, args = match.groups()
            probe = self._probes.get(name)
            # Skip the line if we don't recognize the probe name
            if probe is None:
                continue
            # Split on the first '=' only, so an '=' inside a string value survives
            entries.append(DTraceEntry(probe, args=dict(a.strip().split('=', 1)
                                                        for a in args.split(','))))
        entries.sort(key=lambda e: e.args['tsc'])
        return entries

    def _list_probes(self):
        """Scans git-tracked C sources for SPDK_DTRACE_PROBE<N>(name, ...) calls.

        Returns a dict mapping probe name to its number of arguments, counting
        the tsc that precedes the explicit ones.
        """
        files = subprocess.check_output(['git', 'ls-files', '*.[ch]',
                                         ':!:include/spdk_internal/usdt.h'])
        files = filter(lambda f: len(f) > 0, str(files, 'ascii').split('\n'))
        regex = re.compile(r'SPDK_DTRACE_PROBE([0-9]*)\((\w+)')
        probes = {}

        for fname in files:
            with open(fname, 'r') as file:
                for match in regex.finditer(file.read()):
                    nargs, name = match.group(1), match.group(2)
                    nargs = int(nargs) if len(nargs) > 0 else 0
                    # Add one to accommodate for the tsc being the first arg
                    probes[name] = nargs + 1
        return probes

    def _gen_usdt(self, probe):
        """Returns a bpftrace statement printing `name: arg=value, ...` for probe."""
        usdt = (f'usdt:__EXE__:{probe.name} {{' +
                f'printf("{probe.name}: ')
        args = probe.args
        if len(args) > 0:
            argtype = {int: '0x%lx', str: '\'%s\''}
            argcast = {int: lambda x: x, str: lambda x: f'str({x})'}
            argstr = [f'{a.name}={argtype[a.type]}' for a in args.values()]
            argval = [f'{argcast[a.type](f"arg{a.pos}")}' for a in args.values()]
            usdt += ', '.join(argstr) + '\\n", ' + ', '.join(argval)
        else:
            usdt += '\\n"'
        usdt += ');}'
        return usdt

    def generate(self):
        """Returns the complete bpftrace script, one statement per probe."""
        return '\n'.join([self._gen_usdt(p) for p in self._probes.values()])

    @staticmethod
    def _bpftrace_path():
        """Returns the path of bpftrace.sh, located one directory above this script."""
        # abspath() prevents dirname(__file__) == '' (script run from its own
        # directory) from turning the path into '/../bpftrace.sh'.
        return os.path.join(os.path.dirname(os.path.abspath(__file__)),
                            os.pardir, 'bpftrace.sh')

    def record(self, pid):
        """Runs bpftrace.sh on pid with the generated script and waits for it.

        Ctrl-C is swallowed so the temporary script file is still cleaned up.
        """
        with tempfile.NamedTemporaryFile(mode='w+') as script:
            script.write(self.generate())
            script.flush()
            try:
                subprocess.run([self._bpftrace_path(), f'{pid}', f'{script.name}'])
            except KeyboardInterrupt:
                pass


@dataclass
class TracepointArgument:
    """Describes an SPDK tracepoint argument"""
    TYPE_INT = 0
    TYPE_PTR = 1
    TYPE_STR = 2
    name: str
    argtype: int  # one of the TYPE_* constants above


@dataclass
class Tracepoint:
    """Describes an SPDK tracepoint, equivalent to struct spdk_trace_tpoint"""
    name: str
    id: int
    new_object: bool
    object_type: int
    owner_type: int
    args: List[TracepointArgument]


@dataclass
class TraceEntry:
    """Describes an SPDK tracepoint entry, equivalent to struct spdk_trace_entry"""
    lcore: int
    tpoint: Tracepoint
    tsc: int
    poller: str
    size: int
    object_id: str
    object_ptr: int
    time: int
    args: Dict[str, ArgumentType]


class TraceProvider:
    """Defines interface for objects providing traces and tracepoint definitions"""

    def tpoints(self):
        """Returns tracepoint definitions as a dict of (tracepoint_name, tracepoint)"""
        raise NotImplementedError()

    def entries(self):
        """Generator returning subsequent trace entries"""
        raise NotImplementedError()

    def tsc_rate(self):
        """Returns the TSC rate that was in place when traces were collected"""
        raise NotImplementedError()


class JsonProvider(TraceProvider):
    """Trace provider based on JSON-formatted output produced by spdk_trace app"""
    def __init__(self, file):
        """
        :param file: file object containing the spdk_trace JSON output
        :raises ImportError: if the ijson module is not installed
        """
        if ijson is None:
            raise ImportError('The ijson module is required to parse JSON traces')
        self._parser = ijson.parse(file)
        self._tpoints = {}
        self._parse_defs()

    def _parse_tpoints(self, tpoints):
        """Stores the tracepoint definitions from the 'tpoints' array, keyed by id."""
        for tpoint in tpoints:
            tpoint_id = tpoint['id']
            self._tpoints[tpoint_id] = Tracepoint(
                name=tpoint['name'], id=tpoint_id,
                new_object=tpoint['new_object'], object_type=OBJECT_NONE,
                owner_type=OWNER_NONE,
                args=[TracepointArgument(name=a['name'],
                                         argtype=a['type'])
                      for a in tpoint.get('args', [])])

    def _parse_defs(self):
        """Consumes parser events up to the 'entries' array.

        Along the way it reads 'tsc_rate' and the 'tpoints' array. The parser is
        shared with entries(), which resumes from where this method stops.
        """
        builder = None
        for prefix, event, value in self._parser:
            # If we reach entries array, there are no more tracepoint definitions
            if prefix == 'entries':
                break
            elif prefix == 'tsc_rate':
                self._tsc_rate = value
                continue

            if (prefix, event) == ('tpoints', 'start_array'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('tpoints', 'end_array'):
                self._parse_tpoints(builder.value)
                builder = None

    def _parse_entry(self, entry):
        """Converts a decoded JSON entry into a TraceEntry."""
        tpoint = self._tpoints[entry['tpoint']]
        obj = entry.get('object', {})
        return TraceEntry(tpoint=tpoint, lcore=entry['lcore'], tsc=entry['tsc'],
                          size=entry.get('size'), object_id=obj.get('id'),
                          object_ptr=obj.get('value'), time=obj.get('time'),
                          poller=entry.get('poller'),
                          args={n.name: v for n, v in zip(tpoint.args, entry.get('args', []))})

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        builder = None
        for prefix, event, value in self._parser:
            if (prefix, event) == ('entries.item', 'start_map'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('entries.item', 'end_map'):
                yield self._parse_entry(builder.value)
                builder = None


class Trace:
    """Stores, parses, and prints out SPDK traces"""
    def __init__(self, file):
        self._provider = JsonProvider(file)
        self._objects = []
        # Pointer arguments are printed in hex; other types are printed as-is
        self._argfmt = {TracepointArgument.TYPE_PTR: lambda a: f'0x{a:x}'}
        self.tpoints = self._provider.tpoints()

    def _annotate_args(self, entry):
        """Merges the annotations returned by every registered object for entry."""
        annotations = {}
        for obj in self._objects:
            current = obj.annotate(entry)
            if current is None:
                continue
            annotations.update(current)
        return annotations

    def _format_args(self, entry):
        """Formats each argument either as `name(k=v, ...)` if annotated, or `name: value`."""
        annotations = self._annotate_args(entry)
        args = []
        for arg, (name, value) in zip(entry.tpoint.args, entry.args.items()):
            annot = annotations.get(name)
            if annot is not None:
                args.append('{}({})'.format(name, ', '.join(f'{n}={v}' for n, v in annot.items())))
            else:
                args.append('{}: {}'.format(name, self._argfmt.get(arg.argtype,
                                                                    lambda a: a)(value)))
        return args

    def register_object(self, obj):
        """Registers an SPDKObject used to annotate tracepoint arguments."""
        self._objects.append(obj)

    def print(self):
        """Prints one line per trace entry.

        Timestamps are in microseconds, relative to the first entry.
        """
        def get_us(tsc, off):
            return ((tsc - off) * 10 ** 6) / self._provider.tsc_rate()

        offset = None
        for e in self._provider.entries():
            offset = e.tsc if offset is None else offset
            timestamp = get_us(e.tsc, offset)
            diff = get_us(e.time, 0) if e.time is not None else None
            args = ', '.join(self._format_args(e))
            fields = [
                f'{e.lcore:3}',
                f'{timestamp:16.3f}',
                f'{e.poller:3}' if e.poller is not None else ' ' * 3,
                f'{e.tpoint.name:24}',
                f'size: {e.size:6}' if e.size is not None else ' ' * (len('size: ') + 6),
                f'id: {e.object_id:8}' if e.object_id is not None else None,
                f'time: {diff:<8.3f}' if diff is not None else None,
                args
            ]

            print(' '.join([*filter(lambda f: f is not None, fields)]).rstrip())


class SPDKObject:
    """Describes a specific type of SPDK object (e.g. qpair, thread, etc.)"""
    @dataclass
    class Lifetime:
        """Describes a lifetime and properties of a particular SPDK object."""
        begin: int  # tsc of the object's creation
        end: int  # tsc of the object's destruction (TSC_MAX if never seen)
        ptr: int  # object's pointer value, matched against tracepoint arguments
        properties: dict = field(default_factory=dict)

    def __init__(self, trace: Trace, tpoints: List[str]):
        """
        :param trace: Trace whose tracepoint definitions are looked up
        :param tpoints: names of the tracepoints this object annotates
        """
        self.tpoints = {}
        for name in tpoints:
            tpoint = next((t for t in trace.tpoints.values() if t.name == name), None)
            if tpoint is None:
                # Some tpoints might be undefined if configured without specific subsystems
                continue
            self.tpoints[tpoint.id] = tpoint

    def _annotate(self, entry: TraceEntry):
        """Abstract annotation method to be implemented by subclasses."""
        raise NotImplementedError()

    def annotate(self, entry: TraceEntry):
        """Annotates a tpoint entry and returns a dict indexed by argname with values representing
        various object properties. For instance, {"qpair": {"qid": 1, "subnqn": "nqn"}} could be
        returned to annotate an argument called "qpair" with two items: "qid" and "subnqn".
        Returns None for entries of tracepoints this object doesn't handle.
        """
        if entry.tpoint.id not in self.tpoints:
            return None
        return self._annotate(entry)


class QPair(SPDKObject):
    """Annotates the qpair argument of RDMA request tracepoints with properties
    gathered from the nvmf qpair DTrace probes."""
    def __init__(self, trace: Trace, dtrace: DTrace):
        super().__init__(trace, tpoints=[
            'RDMA_REQ_NEW',
            'RDMA_REQ_NEED_BUFFER',
            'RDMA_REQ_TX_PENDING_C2H',
            'RDMA_REQ_TX_PENDING_H2C',
            'RDMA_REQ_TX_H2C',
            'RDMA_REQ_RDY_TO_EXECUTE',
            'RDMA_REQ_EXECUTING',
            'RDMA_REQ_EXECUTED',
            'RDMA_REQ_RDY_TO_COMPL',
            'RDMA_REQ_COMPLETING_C2H',
            'RDMA_REQ_COMPLETING',
            'RDMA_REQ_COMPLETED'])
        self._objects = []
        self._find_objects(dtrace.entries)

    def _find_objects(self, dprobes):
        """Builds a Lifetime for each nvmf_poll_group_add_qpair probe.

        Each lifetime ends at the next matching nvmf_poll_group_remove_qpair probe,
        or at TSC_MAX if there is none. Properties from nvmf_ctrlr_add_qpair
        probes within that window are added to it.
        """
        def probe_match(probe, other):
            return probe.args['qpair'] == other.args['qpair']

        for i, dprobe in enumerate(dprobes):
            if dprobe.name != 'nvmf_poll_group_add_qpair':
                continue
            # We've found a new qpair, now find the probe indicating its destruction.
            # last_idx is one past the removal probe, so the slice below includes it.
            last_idx, last = next((((i + j + 1), d) for j, d in enumerate(islice(dprobes, i, None))
                                   if d.name == 'nvmf_poll_group_remove_qpair' and
                                   probe_match(d, dprobe)), (None, None))
            obj = SPDKObject.Lifetime(begin=dprobe.args['tsc'],
                                      end=last.args['tsc'] if last is not None else TSC_MAX,
                                      ptr=dprobe.args['qpair'],
                                      properties={'ptr': hex(dprobe.args['qpair']),
                                                  'thread': dprobe.args['thread']})
            for other in filter(lambda p: probe_match(p, dprobe), dprobes[i:last_idx]):
                if other.name == 'nvmf_ctrlr_add_qpair':
                    for prop in ['qid', 'subnqn', 'hostnqn']:
                        obj.properties[prop] = other.args[prop]
            self._objects.append(obj)

    def _annotate(self, entry):
        """Returns {'qpair': properties} for the qpair alive at entry.tsc, else None."""
        qpair = entry.args.get('qpair')
        if qpair is None:
            return None
        for obj in self._objects:
            if obj.ptr == qpair and obj.begin <= entry.tsc <= obj.end:
                return {'qpair': obj.properties}
        return None


def build_dtrace(file=None):
    """Returns a DTrace for the nvmf qpair probes, optionally parsing recorded output."""
    return DTrace([
        DTraceProbe(
            name='nvmf_poll_group_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_poll_group_remove_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_ctrlr_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='qid', pos=2, type=int),
                  DTraceArgument(name='subnqn', pos=3, type=str),
                  DTraceArgument(name='hostnqn', pos=4, type=str)])], file)


def print_trace(trace_file, dtrace_file):
    """Prints the SPDK trace from trace_file annotated with qpair information."""
    dtrace = build_dtrace(dtrace_file)
    trace = Trace(trace_file)
    trace.register_object(QPair(trace, dtrace))
    trace.print()


def main(argv):
    parser = ArgumentParser(description='SPDK trace annotation script')
    parser.add_argument('-i', '--input',
                        help='JSON-formatted trace file produced by spdk_trace app')
    parser.add_argument('-g', '--generate', help='Generate bpftrace script', action='store_true')
    parser.add_argument('-r', '--record', help='Record BPF traces on PID', metavar='PID', type=int)
    parser.add_argument('-b', '--bpftrace', help='BPF trace script to use for annotations')
    args = parser.parse_args(argv)

    if args.generate:
        print(build_dtrace().generate())
    elif args.record is not None:
        build_dtrace().record(args.record)
    else:
        # ExitStack closes whichever input files were actually opened
        with ExitStack() as stack:
            trace_file = (stack.enter_context(open(args.input, 'r'))
                          if args.input is not None else sys.stdin)
            dtrace_file = (stack.enter_context(open(args.bpftrace))
                           if args.bpftrace is not None else None)
            print_trace(trace_file, dtrace_file)


if __name__ == '__main__':
    main(sys.argv[1:])