#!/usr/bin/env python3

from argparse import ArgumentParser
from contextlib import ExitStack
from dataclasses import dataclass, field
from itertools import islice
from typing import Dict, List, TypeVar
import os
import re
import subprocess
import sys
import tempfile

try:
    import ijson
except ImportError:
    # ijson is only needed to parse the JSON trace produced by spdk_trace; generating (-g)
    # or recording (-r) bpftrace scripts works without it.  Trace() reports its absence.
    ijson = None

TSC_MAX = (1 << 64) - 1


@dataclass
class DTraceArgument:
    """Describes a DTrace probe (usdt) argument"""
    name: str
    # Index of the argument in the probe's argument list (arg0 is the tsc)
    pos: int
    # Either int or str; DTrace() rejects anything else
    type: type


@dataclass
class DTraceProbe:
    """Describes a DTrace probe (usdt) point"""
    name: str
    args: Dict[str, DTraceArgument]

    def __init__(self, name, args):
        self.name = name
        # Index the arguments by name for lookups while parsing bpftrace output
        self.args = {a.name: a for a in args}


@dataclass
class DTraceEntry:
    """Describes a single DTrace probe invocation"""
    name: str
    args: Dict[str, TypeVar('ArgumentType', str, int)]

    def __init__(self, probe, args):
        """Converts the raw string values in ``args`` using the types declared by ``probe``.

        Integers are parsed as hex (the generated script prints them with ``0x%lx``),
        strings have surrounding whitespace and single quotes removed.

        Raises ValueError if ``args`` contains a name that ``probe`` doesn't declare.
        """
        valmap = {int: lambda x: int(x, 16),
                  str: lambda x: x.strip().strip("'")}
        self.name = probe.name
        self.args = {}
        for name, value in args.items():
            arg = probe.args.get(name)
            if arg is None:
                raise ValueError(f'Unexpected argument: {name}')
            self.args[name] = valmap[arg.type](value)


class DTrace:
    """Generates bpftrace script based on the supplied probe points, parses its
    output and stores it as a list of DTraceEntry sorted by their tsc.
    """
    # A line printed by the generated script: "<probe name>: <arguments>"
    _LINE_REGEX = re.compile(r'(\w+): (.*)')
    # A single "name=value" argument.  String values are printed quoted by _gen_usdt(), so a
    # quoted value is consumed as a whole and may contain ',' or '=' without being split.
    _ARG_REGEX = re.compile(r"(\w+)=('[^']*'|[^,]*)")

    def __init__(self, probes, file=None):
        """
        probes: DTraceProbe definitions used both to generate the script and to parse output.
        file: optional text file with the bpftrace output to load into ``self.entries``.

        Raises ValueError if a probe can't be found in the sources, or if one of its
        arguments has an out-of-range position or an unsupported type.
        """
        self._avail_probes = self._list_probes()
        self._probes = {p.name: p for p in probes}
        # Sanitize the probe definitions before they are used to parse anything
        for probe in probes:
            if probe.name not in self._avail_probes:
                raise ValueError(f'Couldn\'t find probe: "{probe.name}"')
            for arg in probe.args.values():
                if arg.pos >= self._avail_probes[probe.name]:
                    raise ValueError('Invalid probe argument position')
                if arg.type not in (int, str):
                    raise ValueError('Invalid argument type')
        self.entries = self._parse(file) if file is not None else []

    def _parse(self, file):
        """Parses bpftrace output into a list of DTraceEntry sorted by their tsc.

        Lines that don't look like probe output, or that come from probes not passed to
        the constructor, are skipped.
        """
        entries = []
        for line in file:
            match = self._LINE_REGEX.match(line)
            if match is None:
                continue
            name, args = match.groups()
            probe = self._probes.get(name)
            # Skip the line if we don't recognize the probe name
            if probe is None:
                continue
            entries.append(DTraceEntry(probe, args=dict(self._ARG_REGEX.findall(args))))
        entries.sort(key=lambda e: e.args['tsc'])
        return entries

    def _list_probes(self):
        """Scans the C sources tracked by git for SPDK_DTRACE_PROBE* invocations.

        Returns a dict mapping each probe name to its number of arguments, including the
        tsc that is always passed as the first one.
        """
        # NOTE(review): `git ls-files` pathspecs and output paths are relative to the current
        # working directory, so this presumably has to run from the repository root.
        files = subprocess.check_output(['git', 'ls-files', '*.[ch]',
                                         ':!:include/spdk_internal/usdt.h'])
        files = filter(lambda f: len(f) > 0, str(files, 'ascii').split('\n'))
        regex = re.compile(r'SPDK_DTRACE_PROBE([0-9]*)\((\w+)')
        probes = {}

        for fname in files:
            with open(fname, 'r') as file:
                for match in regex.finditer(file.read()):
                    nargs, name = match.group(1), match.group(2)
                    nargs = int(nargs) if len(nargs) > 0 else 0
                    # Add one to accommodate for the tsc being the first arg
                    probes[name] = nargs + 1
        return probes

    def _gen_usdt(self, probe):
        """Returns a bpftrace action printing "<probe>: <name>=<value>, ..." for ``probe``."""
        usdt = (f'usdt:__EXE__:{probe.name} {{' +
                f'printf("{probe.name}: ')
        args = probe.args
        if len(args) > 0:
            argtype = {int: '0x%lx', str: '\'%s\''}
            argcast = {int: lambda x: x, str: lambda x: f'str({x})'}
            argstr = [f'{a.name}={argtype[a.type]}' for a in args.values()]
            argval = [f'{argcast[a.type](f"arg{a.pos}")}' for a in args.values()]
            usdt += ', '.join(argstr) + '\\n", ' + ', '.join(argval)
        else:
            usdt += '\\n"'
        usdt += ');}'
        return usdt

    def generate(self):
        """Returns the full bpftrace script, one action per probe."""
        return '\n'.join(self._gen_usdt(p) for p in self._probes.values())

    def record(self, pid):
        """Writes the generated script to a temporary file and runs bpftrace.sh on ``pid``.

        Recording runs until it is interrupted; the KeyboardInterrupt is swallowed on purpose
        since Ctrl-C is how the user stops it.
        """
        with tempfile.NamedTemporaryFile(mode='w+') as script:
            script.write(self.generate())
            script.flush()
            try:
                subprocess.run([f'{os.path.dirname(__file__)}/../bpftrace.sh',
                                f'{pid}', f'{script.name}'])
            except KeyboardInterrupt:
                pass


@dataclass
class TracepointArgument:
    """Describes an SPDK tracepoint argument"""
    TYPE_INT = 0
    TYPE_PTR = 1
    TYPE_STR = 2
    name: str
    argtype: int


@dataclass
class Tracepoint:
    """Describes an SPDK tracepoint, equivalent to struct spdk_trace_tpoint"""
    name: str
    id: int
    new_object: bool
    args: List[TracepointArgument]


@dataclass
class TraceEntry:
    """Describes an SPDK tracepoint entry, equivalent to struct spdk_trace_entry"""
    lcore: int
    tpoint: Tracepoint
    tsc: int
    poller: str
    size: int
    object_id: str
    object_ptr: int
    time: int
    args: Dict[str, TypeVar('ArgumentType', str, int)]


class Trace:
    """Stores, parses, and prints out SPDK traces"""
    def __init__(self, file):
        """
        file: JSON-formatted trace (as produced by spdk_trace) opened in text mode.  The
        tracepoint definitions are read eagerly, entries are streamed lazily by print().

        Raises ImportError if the ijson module isn't available.
        """
        if ijson is None:
            raise ImportError('The ijson module is required to parse SPDK trace files')
        self._parser = ijson.parse(file)
        self._objects = []
        self._argfmt = {TracepointArgument.TYPE_PTR: lambda a: f'0x{a:x}'}
        self.tpoints = {}
        self._parse_defs()

    def _parse_tpoints(self, tpoints):
        """Stores the parsed "tpoints" array in ``self.tpoints`` indexed by tracepoint id."""
        for tpoint in tpoints:
            tpoint_id = tpoint['id']
            self.tpoints[tpoint_id] = Tracepoint(
                name=tpoint['name'], id=tpoint_id,
                new_object=tpoint['new_object'],
                args=[TracepointArgument(name=a['name'],
                                         argtype=a['type'])
                      for a in tpoint.get('args', [])])

    def _parse_defs(self):
        """Consumes parser events up to the "entries" key, reading tsc_rate and tpoints."""
        builder = None
        for prefix, event, value in self._parser:
            # If we reach entries array, there are no more tracepoint definitions
            if prefix == 'entries':
                break
            elif prefix == 'tsc_rate':
                self.tsc_rate = value
                continue

            if (prefix, event) == ('tpoints', 'start_array'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('tpoints', 'end_array'):
                self._parse_tpoints(builder.value)
                builder = None

    def _parse_entry(self, entry):
        """Converts a single JSON trace entry into a TraceEntry."""
        tpoint = self.tpoints[entry['tpoint']]
        obj = entry.get('object', {})
        return TraceEntry(tpoint=tpoint, lcore=entry['lcore'], tsc=entry['tsc'],
                          size=entry.get('size'), object_id=obj.get('id'),
                          object_ptr=obj.get('value'), time=obj.get('time'),
                          poller=entry.get('poller'),
                          args={n.name: v for n, v in zip(tpoint.args, entry.get('args', []))})

    def _entries(self):
        """Yields a TraceEntry for each item of the "entries" array as it is streamed."""
        builder = None
        for prefix, event, value in self._parser:
            if (prefix, event) == ('entries.item', 'start_map'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('entries.item', 'end_map'):
                yield self._parse_entry(builder.value)
                builder = None

    def _annotate_args(self, entry):
        """Merges the annotations returned by every registered object for ``entry``."""
        annotations = {}
        for obj in self._objects:
            current = obj.annotate(entry)
            if current is None:
                continue
            annotations.update(current)
        return annotations

    def _format_args(self, entry):
        """Formats the entry's arguments, replacing annotated ones with their properties."""
        annotations = self._annotate_args(entry)
        args = []
        for arg, (name, value) in zip(entry.tpoint.args, entry.args.items()):
            annot = annotations.get(name)
            if annot is not None:
                args.append('{}({})'.format(name, ', '.join(f'{n}={v}' for n, v in annot.items())))
            else:
                args.append('{}: {}'.format(name, self._argfmt.get(arg.argtype,
                                                                   lambda a: a)(value)))
        return args

    def register_object(self, obj):
        """Registers an SPDKObject used to annotate the arguments of printed entries."""
        self._objects.append(obj)

    def print(self):
        """Prints every trace entry on its own line.

        Timestamps are relative to the first entry and converted with tsc_rate (presumably
        ticks per second, which makes them microseconds).
        """
        def get_us(tsc, off):
            return ((tsc - off) * 10 ** 6) / self.tsc_rate

        offset = None
        for e in self._entries():
            offset = e.tsc if offset is None else offset
            timestamp = get_us(e.tsc, offset)
            diff = get_us(e.time, 0) if e.time is not None else None
            args = ', '.join(self._format_args(e))
            fields = [
                f'{e.lcore:3}',
                f'{timestamp:16.3f}',
                f'{e.poller:3}' if e.poller is not None else ' ' * 3,
                f'{e.tpoint.name:24}',
                f'size: {e.size:6}' if e.size is not None else ' ' * (len('size: ') + 6),
                f'id: {e.object_id:8}' if e.object_id is not None else None,
                f'time: {diff:<8.3f}' if diff is not None else None,
                args
            ]

            print(' '.join([*filter(lambda f: f is not None, fields)]).rstrip())


class SPDKObject:
    """Describes a specific type of an SPDK object (e.g. qpair, thread, etc.)"""
    @dataclass
    class Lifetime:
        """Describes a lifetime and properties of a particular SPDK object."""
        begin: int
        end: int
        ptr: int
        properties: dict = field(default_factory=dict)

    def __init__(self, trace: Trace, tpoints: List[str]):
        """Stores the tracepoints (looked up by name in ``trace``) this object annotates."""
        self.tpoints = {}
        for name in tpoints:
            tpoint = next((t for t in trace.tpoints.values() if t.name == name), None)
            if tpoint is None:
                # Some tpoints might be undefined if configured without specific subsystems
                continue
            self.tpoints[tpoint.id] = tpoint

    def _annotate(self, entry: TraceEntry):
        """Abstract annotation method to be implemented by subclasses."""
        raise NotImplementedError()

    def annotate(self, entry: TraceEntry):
        """Annotates a tpoint entry and returns a dict indexed by argname with values representing
        various object properties. For instance, {"qpair": {"qid": 1, "subnqn": "nqn"}} could be
        returned to annotate an argument called "qpair" with two items: "qid" and "subnqn".
        Returns None for entries of tracepoints this object wasn't registered for.
        """
        if entry.tpoint.id not in self.tpoints:
            return None
        return self._annotate(entry)


class QPair(SPDKObject):
    """Annotates RDMA request tracepoints with the properties of their NVMe-oF qpair."""
    def __init__(self, trace: Trace, dtrace: DTrace):
        super().__init__(trace, tpoints=[
            'RDMA_REQ_NEW',
            'RDMA_REQ_NEED_BUFFER',
            'RDMA_REQ_TX_PENDING_C2H',
            'RDMA_REQ_TX_PENDING_H2C',
            'RDMA_REQ_TX_H2C',
            'RDMA_REQ_RDY_TO_EXECUTE',
            'RDMA_REQ_EXECUTING',
            'RDMA_REQ_EXECUTED',
            'RDMA_REQ_RDY_TO_COMPL',
            'RDMA_REQ_COMPLETING_C2H',
            'RDMA_REQ_COMPLETING',
            'RDMA_REQ_COMPLETED'])
        self._objects = []
        self._find_objects(dtrace.entries)

    def _find_objects(self, dprobes):
        """Builds a Lifetime for every nvmf_poll_group_add_qpair probe in ``dprobes``.

        The lifetime ends at the next nvmf_poll_group_remove_qpair probe for the same qpair
        (TSC_MAX if none was recorded).  Within that window, nvmf_ctrlr_add_qpair probes for
        the qpair contribute the qid, subnqn and hostnqn properties.
        """
        def probe_match(probe, other):
            return probe.args['qpair'] == other.args['qpair']

        for i, dprobe in enumerate(dprobes):
            if dprobe.name != 'nvmf_poll_group_add_qpair':
                continue
            # We've found a new qpair, now find the probe indicating its destruction
            last_idx, last = next((((i + j + 1), d) for j, d in enumerate(islice(dprobes, i, None))
                                   if d.name == 'nvmf_poll_group_remove_qpair' and
                                   probe_match(d, dprobe)), (None, None))
            obj = SPDKObject.Lifetime(begin=dprobe.args['tsc'],
                                      end=last.args['tsc'] if last is not None else TSC_MAX,
                                      ptr=dprobe.args['qpair'],
                                      properties={'ptr': hex(dprobe.args['qpair']),
                                                  'thread': dprobe.args['thread']})
            for other in filter(lambda p: probe_match(p, dprobe), dprobes[i:last_idx]):
                if other.name == 'nvmf_ctrlr_add_qpair':
                    for prop in ['qid', 'subnqn', 'hostnqn']:
                        obj.properties[prop] = other.args[prop]
            self._objects.append(obj)

    def _annotate(self, entry):
        """Returns the properties of the qpair alive at the entry's tsc, if any."""
        qpair = entry.args.get('qpair')
        if qpair is None:
            return None
        for obj in self._objects:
            if obj.ptr == qpair and obj.begin <= entry.tsc <= obj.end:
                return {'qpair': obj.properties}
        return None


def build_dtrace(file=None):
    """Returns a DTrace for the nvmf qpair probes, optionally parsing bpftrace output."""
    return DTrace([
        DTraceProbe(
            name='nvmf_poll_group_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_poll_group_remove_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='thread', pos=2, type=int)]),
        DTraceProbe(
            name='nvmf_ctrlr_add_qpair',
            args=[DTraceArgument(name='tsc', pos=0, type=int),
                  DTraceArgument(name='qpair', pos=1, type=int),
                  DTraceArgument(name='qid', pos=2, type=int),
                  DTraceArgument(name='subnqn', pos=3, type=str),
                  DTraceArgument(name='hostnqn', pos=4, type=str)])], file)


def print_trace(trace_file, dtrace_file):
    """Prints the SPDK trace from ``trace_file`` annotated with the bpftrace output."""
    dtrace = build_dtrace(dtrace_file)
    trace = Trace(trace_file)
    trace.register_object(QPair(trace, dtrace))
    trace.print()


def main(argv):
    parser = ArgumentParser(description='SPDK trace annotation script')
    parser.add_argument('-i', '--input',
                        help='JSON-formatted trace file produced by spdk_trace app')
    parser.add_argument('-g', '--generate', help='Generate bpftrace script', action='store_true')
    parser.add_argument('-r', '--record', help='Record BPF traces on PID', metavar='PID', type=int)
    parser.add_argument('-b', '--bpftrace', help='BPF trace script to use for annotations')
    args = parser.parse_args(argv)

    if args.generate:
        print(build_dtrace().generate())
    elif args.record is not None:
        build_dtrace().record(args.record)
    else:
        # Make sure the files we open ourselves are closed once the trace is printed
        with ExitStack() as stack:
            trace_file = (stack.enter_context(open(args.input, 'r'))
                          if args.input is not None else sys.stdin)
            dtrace_file = (stack.enter_context(open(args.bpftrace, 'r'))
                           if args.bpftrace is not None else None)
            print_trace(trace_file, dtrace_file)


if __name__ == '__main__':
    main(sys.argv[1:])