xref: /spdk/scripts/bpf/trace.py (revision 9544fe07aad355262fcaa65dc27f9965a8ea4617)
1#!/usr/bin/env python3
2
3from argparse import ArgumentParser
4from dataclasses import dataclass, field
5from itertools import islice
6from typing import Dict, List, TypeVar
7import ctypes as ct
8import ijson
9import magic
10import os
11import re
12import subprocess
13import sys
14import tempfile
15
# All-ones 64-bit TSC; used as an open-ended "end" for object lifetimes whose
# end was never observed.
TSC_MAX = 2 ** 64 - 1
# Largest unsigned char value; sizes the owner/object tables and string buffers
UCHAR_MAX = 2 ** 8 - 1
TRACE_MAX_LCORE = 128
TRACE_MAX_GROUP_ID = 16
# One 64-bit tracepoint mask per group, hence 64 tracepoints per group
TRACE_MAX_TPOINT_ID = 64 * TRACE_MAX_GROUP_ID
TRACE_MAX_ARGS_COUNT = 5
# All-ones sentinel compared against the parser's object index
TRACE_INVALID_OBJECT = 0xFFFF_FFFF_FFFF_FFFF
# Type value meaning "no object" / "no owner" in a tracepoint definition
OBJECT_NONE = 0
OWNER_NONE = 0
25
26
@dataclass
class DTraceArgument:
    """Describes a DTrace probe (usdt) argument"""
    # Key under which the value is printed by bpftrace and stored in DTraceEntry.args
    name: str
    # Probe argument index (emitted as bpftrace ``arg<pos>``); DTrace rejects
    # positions beyond the argument count found in the sources
    pos: int
    # Python type of the value: int (printed as hex) or str (printed quoted)
    type: type
33
34
@dataclass
class DTraceProbe:
    """A DTrace (usdt) probe point together with the arguments to capture from it."""
    name: str
    args: Dict[str, DTraceArgument]

    def __init__(self, name, args):
        self.name = name
        # Index the argument descriptions by name (a later duplicate replaces
        # an earlier one)
        by_name = {}
        for argument in args:
            by_name[argument.name] = argument
        self.args = by_name
44
45
@dataclass
class DTraceEntry:
    """A single recorded invocation of a DTrace (usdt) probe, with each raw
    textual argument converted according to the probe's definition."""
    name: str
    args: Dict[str, TypeVar('ArgumentType', str, int)]

    def __init__(self, probe, args):
        def from_hex(text):
            return int(text, 16)

        def unquote(text):
            return text.strip().strip("'")

        converters = {int: from_hex, str: unquote}
        self.name = probe.name
        self.args = {}
        for key, raw in args.items():
            definition = probe.args.get(key)
            if definition is None:
                raise ValueError(f'Unexpected argument: {key}')
            self.args[key] = converters[definition.type](raw)
62
63
class DTrace:
    """Generates a bpftrace script for the supplied probe points, parses its
    output and stores it as a list of DTraceEntry sorted by their tsc.

    Available probes are discovered by scanning the git-tracked C sources
    (``git ls-files``) for SPDK_DTRACE_PROBE* invocations, so this class is
    meant to be used from within an SPDK git checkout.
    """
    def __init__(self, probes, file=None):
        self._avail_probes = self._list_probes()
        self._probes = {p.name: p for p in probes}
        # Validate the probe definitions *before* parsing any recorded output,
        # so a bad definition is reported as such rather than surfacing as an
        # unrelated error (e.g. a KeyError for an unsupported type) mid-parse.
        for probe in probes:
            if probe.name not in self._avail_probes:
                raise ValueError(f'Couldn\'t find probe: "{probe.name}"')
            for arg in probe.args.values():
                if arg.pos >= self._avail_probes[probe.name]:
                    raise ValueError('Invalid probe argument position')
                if arg.type not in (int, str):
                    raise ValueError('Invalid argument type')
        self.entries = self._parse(file) if file is not None else []

    def _parse(self, file):
        """Parse bpftrace output lines of the form ``name: key=value, ...``.

        Lines that don't match, or that name an unknown probe, are skipped.
        Returns the parsed entries sorted by their ``tsc`` argument.
        """
        regex = re.compile(r'(\w+): (.*)')
        entries = []

        for line in file:
            match = regex.match(line)
            if match is None:
                continue
            name, args = match.groups()
            probe = self._probes.get(name)
            # Skip the line if we don't recognize the probe name
            if probe is None:
                continue
            # Split each "key=value" piece on the first '=' only, so a value
            # containing '=' stays intact; drop empty pieces.
            pairs = (a.strip().split('=', 1) for a in args.split(','))
            entries.append(DTraceEntry(probe, args=dict(p for p in pairs if p != [''])))
        entries.sort(key=lambda e: e.args['tsc'])
        return entries

    def _list_probes(self):
        """Map each probe name found in git-tracked .c/.h files (excluding
        usdt.h itself) to its number of arguments, including the leading tsc."""
        files = subprocess.check_output(['git', 'ls-files', '*.[ch]',
                                        ':!:include/spdk_internal/usdt.h'])
        files = filter(lambda f: len(f) > 0, str(files, 'ascii').split('\n'))
        regex = re.compile(r'SPDK_DTRACE_PROBE([0-9]*)\((\w+)')
        probes = {}

        for fname in files:
            with open(fname, 'r') as file:
                for match in regex.finditer(file.read()):
                    nargs, name = match.group(1), match.group(2)
                    nargs = int(nargs) if len(nargs) > 0 else 0
                    # Add one to accommodate for the tsc being the first arg
                    probes[name] = nargs + 1
        return probes

    def _gen_usdt(self, probe):
        """Return the bpftrace statement printing ``probe``'s arguments."""
        usdt = (f'usdt:__EXE__:{probe.name} {{' +
                f'printf("{probe.name}: ')
        args = probe.args
        if len(args) > 0:
            argtype = {int: '0x%lx', str: '\'%s\''}
            argcast = {int: lambda x: x, str: lambda x: f'str({x})'}
            argstr = [f'{a.name}={argtype[a.type]}' for a in args.values()]
            argval = [f'{argcast[a.type](f"arg{a.pos}")}' for a in args.values()]
            usdt += ', '.join(argstr) + '\\n", ' + ', '.join(argval)
        else:
            usdt += '\\n"'
        usdt += ');}'
        return usdt

    def generate(self):
        """Return the complete bpftrace script, one statement per probe."""
        return '\n'.join([self._gen_usdt(p) for p in self._probes.values()])

    def record(self, pid):
        """Run ../bpftrace.sh on ``pid`` with the generated script; Ctrl-C
        stops recording without raising."""
        # Resolve against the absolute script path: dirname() of a bare
        # 'trace.py' is '', which would otherwise yield '/../bpftrace.sh'.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        bpftrace_sh = os.path.join(script_dir, '..', 'bpftrace.sh')
        with tempfile.NamedTemporaryFile(mode='w+') as script:
            script.write(self.generate())
            script.flush()
            try:
                subprocess.run([bpftrace_sh, f'{pid}', f'{script.name}'])
            except KeyboardInterrupt:
                pass
143
144
@dataclass
class TracepointArgument:
    """Describes an SPDK tracepoint argument"""
    # Values of ``argtype``: NativeProvider decodes the argument as an integer,
    # a pointer or a string accordingly, and Trace prints TYPE_PTR values in hex.
    # These are unannotated, so they are class constants, not dataclass fields.
    TYPE_INT = 0
    TYPE_PTR = 1
    TYPE_STR = 2
    name: str
    argtype: int
153
154
@dataclass
class Tracepoint:
    """Describes an SPDK tracepoint, equivalent to struct spdk_trace_tpoint"""
    name: str
    # Tracepoint id; providers key their tpoints() dict by this value
    id: int
    new_object: bool
    # OBJECT_NONE / OWNER_NONE when the tracepoint has no associated object or
    # owner type (the JSON provider always sets both to NONE)
    object_type: int
    owner_type: int
    # Argument definitions, in the same order as the values in each entry
    args: List[TracepointArgument]
164
165
@dataclass
class TraceEntry:
    """Describes an SPDK tracepoint entry, equivalent to struct spdk_trace_entry"""
    lcore: int
    tpoint: Tracepoint
    tsc: int
    # Owner/poller label, or None when the tracepoint has no owner
    poller: str
    size: int
    # Printable object identifier, or None when the entry has no object
    object_id: str
    # Raw object id value as recorded in the entry
    object_ptr: int
    # Ticks since the object's start (NativeProvider computes tsc - object_start),
    # or None when unavailable; Trace.print converts it using the TSC rate
    time: int
    # Argument values keyed by argument name
    args: Dict[str, TypeVar('ArgumentType', str, int)]
178
179
class TraceProvider:
    """Interface for objects providing traces and tracepoint definitions.

    Subclasses must override every method below.
    """

    def tpoints(self):
        """Return the tracepoint definitions as a dict of (tracepoint_id, tracepoint).

        Both providers key this dict by the numeric tracepoint id (not the name);
        look tracepoints up by name through ``.values()``.
        """
        raise NotImplementedError()

    def entries(self):
        """Generator returning subsequent trace entries (TraceEntry objects)"""
        raise NotImplementedError()

    def tsc_rate(self):
        """Return the TSC rate (ticks per second) in effect when traces were collected"""
        raise NotImplementedError()
194
195
class JsonProvider(TraceProvider):
    """Trace provider reading the JSON document produced by the spdk_trace app.

    The document is streamed with ijson.  The constructor consumes events up to
    the start of the ``entries`` array, collecting ``tsc_rate`` and the
    ``tpoints`` definitions.  entries() then resumes the same event stream, so
    the entries can only be walked once.
    """
    def __init__(self, file):
        self._parser = ijson.parse(file)
        self._tpoints = {}
        self._parse_defs()

    def _parse_tpoints(self, tpoints):
        """Register every JSON tracepoint definition under its id."""
        for definition in tpoints:
            ident = definition['id']
            self._tpoints[ident] = Tracepoint(
                name=definition['name'],
                id=ident,
                new_object=definition['new_object'],
                # Object/owner types are not read from JSON; reported as NONE
                object_type=OBJECT_NONE,
                owner_type=OWNER_NONE,
                args=[TracepointArgument(name=spec['name'], argtype=spec['type'])
                      for spec in definition.get('args', [])])

    def _parse_defs(self):
        """Consume header events: record tsc_rate and build the tpoints array."""
        collector = None
        for prefix, event, value in self._parser:
            if prefix == 'entries':
                # No more tracepoint definitions past this point; entries()
                # continues from here
                break
            if prefix == 'tsc_rate':
                self._tsc_rate = value
                continue

            key = (prefix, event)
            if key == ('tpoints', 'start_array'):
                collector = ijson.ObjectBuilder()
            if collector is not None:
                collector.event(event, value)
            if key == ('tpoints', 'end_array'):
                self._parse_tpoints(collector.value)
                collector = None

    def _parse_entry(self, entry):
        """Convert one JSON entry object into a TraceEntry."""
        tpoint = self._tpoints[entry['tpoint']]
        obj = entry.get('object', {})
        values = entry.get('args', [])
        return TraceEntry(tpoint=tpoint, lcore=entry['lcore'], tsc=entry['tsc'],
                          size=entry.get('size'), object_id=obj.get('id'),
                          object_ptr=obj.get('value'), time=obj.get('time'),
                          poller=entry.get('poller'),
                          args=dict(zip((spec.name for spec in tpoint.args), values)))

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        collector = None
        for prefix, event, value in self._parser:
            key = (prefix, event)
            if key == ('entries.item', 'start_map'):
                collector = ijson.ObjectBuilder()
            if collector is not None:
                collector.event(event, value)
            if key == ('entries.item', 'end_map'):
                yield self._parse_entry(collector.value)
                collector = None
257
258
class CParserOpts(ct.Structure):
    """ctypes mirror of the options passed by reference to
    spdk_trace_parser_init(); field order and types must match the C layout."""
    _fields_ = [('filename', ct.c_char_p),  # trace file path as ASCII bytes
                ('mode', ct.c_int),  # NativeProvider passes 0 (meaning defined by the C library; TODO confirm)
                ('lcore', ct.c_uint16)]  # NativeProvider passes TRACE_MAX_LCORE
263
264
class CTraceOwner(ct.Structure):
    """ctypes mirror of one owner-type slot in the trace flags; NativeProvider
    ignores slots whose id_prefix is NUL and uses id_prefix to label pollers."""
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]
268
269
class CTraceObject(ct.Structure):
    """ctypes mirror of one object-type slot in the trace flags; NativeProvider
    ignores slots whose id_prefix is NUL and uses id_prefix to label objects."""
    _fields_ = [('type', ct.c_uint8),
                ('id_prefix', ct.c_char)]
273
274
class CTpointArgument(ct.Structure):
    """ctypes mirror of a tracepoint argument definition; field order and sizes
    must match the C layout."""
    _fields_ = [('name', ct.c_char * 14),  # NUL-padded argument name
                ('type', ct.c_uint8),  # one of TracepointArgument.TYPE_*
                ('size', ct.c_uint8)]
279
280
class CTracepoint(ct.Structure):
    """ctypes mirror of one tracepoint definition in the trace flags (presumably
    struct spdk_trace_tpoint); field order and sizes must match the C layout."""
    _fields_ = [('name', ct.c_char * 24),  # NativeProvider skips slots with an empty name
                ('tpoint_id', ct.c_uint16),
                ('owner_type', ct.c_uint8),
                ('object_type', ct.c_uint8),
                ('new_object', ct.c_uint8),
                ('num_args', ct.c_uint8),  # number of valid entries in args
                ('args', CTpointArgument * TRACE_MAX_ARGS_COUNT)]
289
290
class CTraceFlags(ct.Structure):
    """ctypes mirror of the structure returned by spdk_trace_parser_get_flags();
    field order and sizes must match the C layout."""
    _fields_ = [('tsc_rate', ct.c_uint64),
                ('tpoint_mask', ct.c_uint64 * TRACE_MAX_GROUP_ID),  # one mask per group
                ('owner', CTraceOwner * (UCHAR_MAX + 1)),
                ('object', CTraceObject * (UCHAR_MAX + 1)),
                ('tpoint', CTracepoint * TRACE_MAX_TPOINT_ID)]
297
298
class CTraceEntry(ct.Structure):
    """ctypes mirror of a raw trace entry as returned by the parser (presumably
    the leading fields of struct spdk_trace_entry; only these are read here)."""
    _fields_ = [('tsc', ct.c_uint64),
                ('tpoint_id', ct.c_uint16),
                ('poller_id', ct.c_uint16),  # combined with the owner prefix for display
                ('size', ct.c_uint32),
                ('object_id', ct.c_uint64)]
305
306
class CTraceParserArgument(ct.Union):
    """ctypes mirror of a decoded argument value; which member is valid depends
    on the tracepoint argument's type (TracepointArgument.TYPE_*)."""
    _fields_ = [('integer', ct.c_uint64),
                ('pointer', ct.c_void_p),
                ('string', ct.c_char * (UCHAR_MAX + 1))]
311
312
class CTraceParserEntry(ct.Structure):
    """ctypes mirror of the entry filled in by spdk_trace_parser_next_entry();
    field order and types must match the C layout."""
    _fields_ = [('entry', ct.POINTER(CTraceEntry)),  # the raw entry
                ('object_index', ct.c_uint64),  # TRACE_INVALID_OBJECT when unknown
                ('object_start', ct.c_uint64),  # subtracted from entry tsc to get object time
                ('lcore', ct.c_uint16),
                ('args', CTraceParserArgument * TRACE_MAX_ARGS_COUNT)]
319
320
class NativeProvider(TraceProvider):
    """Trace provider based on SPDK's trace library (libspdk_trace_parser.so),
    reading the raw binary trace file through ctypes."""
    def __init__(self, file):
        self._setup_binding(file.name)
        self._parse_defs()

    def __del__(self):
        # The handle may be missing (the library failed to load) or NULL
        # (spdk_trace_parser_init() failed and _setup_binding raised); only a
        # valid handle is handed to the cleanup routine, and only once.
        parser = getattr(self, '_parser', None)
        if parser:
            self._parser = None
            self._lib.spdk_trace_parser_cleanup(parser)

    def _setup_binding(self, filename):
        """Load the parser library and open ``filename`` with it.

        Raises:
            OSError: the library could not be loaded.
            ValueError: the parser could not be constructed for ``filename``.
        """
        libpath = 'build/lib/libspdk_trace_parser.so'
        try:
            self._lib = ct.CDLL(libpath)
        except OSError:
            # dlopen() resolves a path containing '/' relative to the current
            # directory, so the path above only works from the SPDK root; fall
            # back to the tree this script lives in (scripts/bpf/../..).
            rootdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..')
            self._lib = ct.CDLL(os.path.join(rootdir, libpath))
        self._lib.spdk_trace_parser_init.restype = ct.c_void_p
        # Keep the handle as a c_void_p so it is passed back at full pointer
        # width (no argtypes are declared, so a plain int would go as a C int).
        self._lib.spdk_trace_parser_init.errcheck = lambda r, *_: ct.c_void_p(r)
        self._lib.spdk_trace_parser_get_flags.restype = ct.POINTER(CTraceFlags)
        opts = CParserOpts(filename=bytes(filename, 'ascii'), mode=0,
                           lcore=TRACE_MAX_LCORE)
        self._parser = self._lib.spdk_trace_parser_init(ct.byref(opts))
        if not self._parser:
            raise ValueError('Failed to construct SPDK trace parser')

    def _parse_tpoints(self, tpoints):
        """Convert the C tracepoint table into Tracepoints keyed by id,
        skipping unused (unnamed) slots."""
        self._tpoints = {}
        for tpoint in tpoints:
            if len(tpoint.name) == 0:
                continue
            self._tpoints[tpoint.tpoint_id] = Tracepoint(
                name=str(tpoint.name, 'ascii'), object_type=tpoint.object_type,
                owner_type=tpoint.owner_type, id=tpoint.tpoint_id,
                new_object=bool(tpoint.new_object),
                args=[TracepointArgument(name=str(a.name, 'ascii'), argtype=a.type)
                      for a in tpoint.args[:tpoint.num_args]])

    def _parse_defs(self):
        """Read the TSC rate, tracepoint, owner and object definitions."""
        flags = self._lib.spdk_trace_parser_get_flags(self._parser)
        self._tsc_rate = flags.contents.tsc_rate
        self._parse_tpoints(flags.contents.tpoint)

        def conv_objs(arr):
            # Map type -> id prefix, ignoring slots with an empty prefix
            return {int(o.type): str(o.id_prefix, 'ascii') for o in arr if o.id_prefix != b'\x00'}
        self._owners = conv_objs(flags.contents.owner)
        self._objects = conv_objs(flags.contents.object)

    def tsc_rate(self):
        return self._tsc_rate

    def tpoints(self):
        return self._tpoints

    def entries(self):
        """Yield a TraceEntry for every entry returned by the C parser."""
        pe = CTraceParserEntry()
        argconv = {TracepointArgument.TYPE_INT: lambda a: a.integer,
                   TracepointArgument.TYPE_PTR: lambda a: int(a.pointer or 0),
                   TracepointArgument.TYPE_STR: lambda a: str(a.string, 'ascii')}

        while self._lib.spdk_trace_parser_next_entry(self._parser, ct.byref(pe)):
            entry = pe.entry.contents
            lcore = pe.lcore
            tpoint = self._tpoints[entry.tpoint_id]
            args = {a.name: argconv[a.argtype](pe.args[i]) for i, a in enumerate(tpoint.args)}

            if tpoint.object_type != OBJECT_NONE:
                if pe.object_index != TRACE_INVALID_OBJECT:
                    object_id = '{}{}'.format(self._objects[tpoint.object_type], pe.object_index)
                    ts = entry.tsc - pe.object_start
                else:
                    object_id, ts = 'n/a', None
            elif entry.object_id != 0:
                object_id, ts = '{:x}'.format(entry.object_id), None
            else:
                object_id, ts = None, None

            if tpoint.owner_type != OWNER_NONE:
                poller_id = '{}{:02}'.format(self._owners[tpoint.owner_type], entry.poller_id)
            else:
                poller_id = None

            yield TraceEntry(tpoint=tpoint, lcore=lcore, tsc=entry.tsc,
                             size=entry.size, object_id=object_id,
                             object_ptr=entry.object_id, poller=poller_id, time=ts,
                             args=args)
402
403
class Trace:
    """Reads SPDK traces through a provider (JSON or native binary), annotates
    their arguments with registered SPDKObjects and prints them."""
    def __init__(self, file):
        use_json = (file == sys.stdin or
                    magic.from_file(file.name, mime=True) == 'application/json')
        self._provider = JsonProvider(file) if use_json else NativeProvider(file)
        self._objects = []
        self._argfmt = {TracepointArgument.TYPE_PTR: lambda a: f'0x{a:x}'}
        self.tpoints = self._provider.tpoints()

    def _annotate_args(self, entry):
        """Merge the annotations of every registered object for ``entry``."""
        merged = {}
        for registered in self._objects:
            result = registered.annotate(entry)
            if result is not None:
                merged.update(result)
        return merged

    def _format_args(self, entry):
        """Render each argument as ``name(k=v, ...)`` if annotated, else ``name: value``."""
        annotations = self._annotate_args(entry)
        formatted = []
        for definition, (name, value) in zip(entry.tpoint.args, entry.args.items()):
            props = annotations.get(name)
            if props is None:
                render = self._argfmt.get(definition.argtype, lambda a: a)
                formatted.append('{}: {}'.format(name, render(value)))
            else:
                inner = ', '.join(f'{k}={v}' for k, v in props.items())
                formatted.append('{}({})'.format(name, inner))
        return formatted

    def register_object(self, obj):
        self._objects.append(obj)

    def print(self):
        """Print every entry on one line; timestamps are microseconds relative
        to the first entry."""
        def to_usec(ticks, base):
            return ((ticks - base) * 10 ** 6) / self._provider.tsc_rate()

        first_tsc = None
        for entry in self._provider.entries():
            if first_tsc is None:
                first_tsc = entry.tsc
            timestamp = to_usec(entry.tsc, first_tsc)
            elapsed = None if entry.time is None else to_usec(entry.time, 0)
            rendered_args = ', '.join(self._format_args(entry))

            pieces = ['{:3} {:16.3f} {:3} {:24} {:12}'.format(
                entry.lcore, timestamp, '' if entry.poller is None else entry.poller,
                entry.tpoint.name, f'size: {entry.size}' if entry.size else '')]
            if entry.object_id is not None:
                pieces.append(f'id: {entry.object_id:8} ')
            if elapsed is not None:
                pieces.append(f'time: {elapsed:<8.3f} ')
            pieces.append(rendered_args)
            print(''.join(pieces).rstrip())
456
457
class SPDKObject:
    """Base class for one kind of SPDK object (qpair, thread, ...) whose
    properties can be attached to the arguments of matching trace entries."""
    @dataclass
    class Lifetime:
        """Lifetime bounds (begin/end), pointer and properties of one SPDK object."""
        begin: int
        end: int
        ptr: int
        properties: dict = field(default_factory=dict)

    def __init__(self, trace: Trace, tpoints: List[str]):
        # Index the trace's tracepoints by name once, keeping the first one
        # seen for any duplicated name.
        by_name = {}
        for candidate in trace.tpoints.values():
            by_name.setdefault(candidate.name, candidate)

        self.tpoints = {}
        for name in tpoints:
            match = by_name.get(name)
            # Some tpoints might be undefined if configured without specific subsystems
            if match is not None:
                self.tpoints[match.id] = match

    def _annotate(self, entry: TraceEntry):
        """Produce the annotations for ``entry``; subclasses must override this."""
        raise NotImplementedError()

    def annotate(self, entry: TraceEntry):
        """Return a dict keyed by argument name whose values hold object
        properties (e.g. {"qpair": {"qid": 1, "subnqn": "nqn"}}), or None when
        ``entry``'s tracepoint is not one this object tracks."""
        return self._annotate(entry) if entry.tpoint.id in self.tpoints else None
489
490
class QPair(SPDKObject):
    """Annotates the ``qpair`` argument of NVMe-oF request tracepoints with
    properties reconstructed from the recorded bpftrace (DTrace) probes."""
    def __init__(self, trace: Trace, dtrace: DTrace):
        tracked = [
            'RDMA_REQ_NEW',
            'RDMA_REQ_NEED_BUFFER',
            'RDMA_REQ_TX_PENDING_C2H',
            'RDMA_REQ_TX_PENDING_H2C',
            'RDMA_REQ_TX_H2C',
            'RDMA_REQ_RDY_TO_EXECUTE',
            'RDMA_REQ_EXECUTING',
            'RDMA_REQ_EXECUTED',
            'RDMA_REQ_RDY_TO_COMPL',
            'RDMA_REQ_COMPLETING_C2H',
            'RDMA_REQ_COMPLETING',
            'RDMA_REQ_COMPLETED',
            'TCP_REQ_NEW',
            'TCP_REQ_NEED_BUFFER',
            'TCP_REQ_TX_H_TO_C',
            'TCP_REQ_RDY_TO_EXECUTE',
            'TCP_REQ_EXECUTING',
            'TCP_REQ_EXECUTED',
            'TCP_REQ_RDY_TO_COMPLETE',
            'TCP_REQ_TRANSFER_C2H',
            'TCP_REQ_COMPLETED',
            'TCP_WRITE_START',
            'TCP_WRITE_DONE',
            'TCP_READ_DONE',
            'TCP_REQ_AWAIT_R2T_ACK']
        super().__init__(trace, tpoints=tracked)
        self._objects = []
        self._find_objects(dtrace.entries)

    def _find_objects(self, dprobes):
        """Build one Lifetime per qpair, from its poll-group add probe to the
        matching remove probe (or TSC_MAX if none was recorded)."""
        def same_qpair(a, b):
            return a.args['qpair'] == b.args['qpair']

        for start, added in enumerate(dprobes):
            if added.name != 'nvmf_poll_group_add_qpair':
                continue
            # Find the probe indicating this qpair's destruction; `stop` is an
            # exclusive slice bound that includes it (None = until the end).
            stop, removed = None, None
            for pos in range(start, len(dprobes)):
                candidate = dprobes[pos]
                if candidate.name == 'nvmf_poll_group_remove_qpair' and \
                        same_qpair(candidate, added):
                    stop, removed = pos + 1, candidate
                    break

            lifetime = SPDKObject.Lifetime(
                begin=added.args['tsc'],
                end=TSC_MAX if removed is None else removed.args['tsc'],
                ptr=added.args['qpair'],
                properties={'ptr': hex(added.args['qpair']),
                            'thread': added.args['thread']})
            for other in dprobes[start:stop]:
                if same_qpair(other, added) and other.name == 'nvmf_ctrlr_add_qpair':
                    lifetime.properties.update(
                        (key, other.args[key]) for key in ('qid', 'subnqn', 'hostnqn'))
            self._objects.append(lifetime)

    def _annotate(self, entry):
        qpair = entry.args.get('qpair')
        if qpair is None:
            return None
        hit = next((o for o in self._objects
                    if o.ptr == qpair and o.begin <= entry.tsc <= o.end), None)
        return None if hit is None else {'qpair': hit.properties}
552
553
def build_dtrace(file=None):
    """Return a DTrace for the nvmf qpair lifecycle probes, optionally parsing
    previously recorded bpftrace output from ``file``."""
    def int_arg(name, pos):
        return DTraceArgument(name=name, pos=pos, type=int)

    def poll_group_probe(name):
        # Poll-group add/remove probes share the (tsc, qpair, thread) layout
        return DTraceProbe(name=name,
                           args=[int_arg('tsc', 0), int_arg('qpair', 1), int_arg('thread', 2)])

    probes = [
        poll_group_probe('nvmf_poll_group_add_qpair'),
        poll_group_probe('nvmf_poll_group_remove_qpair'),
        DTraceProbe(name='nvmf_ctrlr_add_qpair',
                    args=[int_arg('tsc', 0), int_arg('qpair', 1), int_arg('qid', 2),
                          DTraceArgument(name='subnqn', pos=3, type=str),
                          DTraceArgument(name='hostnqn', pos=4, type=str)]),
    ]
    return DTrace(probes, file)
573
574
def print_trace(trace_file, dtrace_file):
    """Print the traces from ``trace_file``, annotating qpair arguments with the
    bpftrace output in ``dtrace_file`` (may be None)."""
    recorded = build_dtrace(dtrace_file)
    trace = Trace(trace_file)
    qpairs = QPair(trace, recorded)
    trace.register_object(qpairs)
    trace.print()
580
581
def main(argv):
    """Command-line entry point; ``argv`` excludes the program name."""
    import contextlib

    parser = ArgumentParser(description='SPDK trace annotation script')
    parser.add_argument('-i', '--input',
                        help='Trace file to annotate (either JSON generated by spdk_trace or ' +
                             'raw binary produced by the SPDK application itself)')
    parser.add_argument('-g', '--generate', help='Generate bpftrace script', action='store_true')
    parser.add_argument('-r', '--record', help='Record BPF traces on PID', metavar='PID', type=int)
    parser.add_argument('-b', '--bpftrace', help='BPF trace script to use for annotations')
    args = parser.parse_args(argv)

    if args.generate:
        print(build_dtrace().generate())
    elif args.record is not None:
        # Compare against None: a PID given as 0 must not fall through to
        # silently reading a trace from stdin.
        build_dtrace().record(args.record)
    else:
        # Close whatever files we opened once printing is done (stdin is left alone)
        with contextlib.ExitStack() as stack:
            trace_file = (stack.enter_context(open(args.input, 'r'))
                          if args.input is not None else sys.stdin)
            dtrace_file = (stack.enter_context(open(args.bpftrace))
                           if args.bpftrace is not None else None)
            print_trace(trace_file, dtrace_file)
598                    open(args.bpftrace) if args.bpftrace is not None else None)
599
600
if __name__ == '__main__':
    # In order for the changes to LD_LIBRARY_PATH to be visible to the loader,
    # they need to be applied before starting a process, so we need to
    # re-execute the script after updating it.
    if os.environ.get('SPDK_BPF_TRACE_PY') is None:
        # Resolve against the absolute script path: dirname() of a bare
        # 'trace.py' is '', which would otherwise turn rootdir into '/../..'.
        rootdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..')
        # Drop an unset/empty LD_LIBRARY_PATH instead of leaving an empty
        # element, which the dynamic loader treats as the current directory.
        libpaths = [p for p in (os.environ.get('LD_LIBRARY_PATH', ''),
                                os.path.join(rootdir, 'build', 'lib')) if p]
        os.environ['LD_LIBRARY_PATH'] = ':'.join(libpaths)
        os.environ['SPDK_BPF_TRACE_PY'] = '1'
        # Re-exec through the running interpreter so this also works when the
        # script is not executable or was started as `python3 trace.py`.
        os.execv(sys.executable, [sys.executable] + sys.argv)
    else:
        try:
            main(sys.argv[1:])
        except (KeyboardInterrupt, BrokenPipeError):
            pass
616