xref: /spdk/scripts/bpf/trace.py (revision 9efad7468f30e1c5f7442823f5a8b17acd1e6a9b)
1#!/usr/bin/env python3
2
3from argparse import ArgumentParser
4from dataclasses import dataclass, field
5from itertools import islice
6from typing import Dict, List, TypeVar
7import ijson
8import os
9import re
10import subprocess
11import sys
12import tempfile
13
# Largest value of an unsigned 64-bit timestamp counter; used as the end of a
# lifetime for objects whose destruction was never observed.
TSC_MAX = 2 ** 64 - 1
15
16
@dataclass
class DTraceArgument:
    """A single argument of a DTrace (usdt) probe.

    Attributes:
        name -- label under which the argument is printed and later parsed
        pos  -- index of the argument within the probe (emitted as arg<pos>)
        type -- Python type of the value; int and str are the supported ones
    """
    name: str
    pos: int
    type: type
23
24
@dataclass
class DTraceProbe:
    """A DTrace (usdt) probe point together with its arguments.

    The arguments are passed in as an iterable, but are stored in a dict keyed
    by the argument's name.
    """
    name: str
    args: Dict[str, DTraceArgument]

    def __init__(self, name, args):
        by_name = {}
        for argument in args:
            by_name[argument.name] = argument
        self.name = name
        self.args = by_name
34
35
@dataclass
class DTraceEntry:
    """One recorded invocation of a DTrace probe.

    Built from a probe definition and a dict of raw (string) argument values.
    Integer arguments are parsed as hexadecimal numbers, while string arguments
    are stripped of the surrounding whitespace and single quotes.

    Raises ValueError if a value is supplied for an argument the probe doesn't
    define.
    """
    name: str
    args: Dict[str, TypeVar('ArgumentType', str, int)]

    def __init__(self, probe, args):
        def from_hex(text):
            return int(text, 16)

        def unquote(text):
            return text.strip().strip("'")

        converters = {int: from_hex, str: unquote}
        self.name = probe.name
        self.args = {}
        for argname, raw in args.items():
            definition = probe.args.get(argname)
            if definition is None:
                raise ValueError(f'Unexpected argument: {argname}')
            convert = converters[definition.type]
            self.args[argname] = convert(raw)
52
53
class DTrace:
    """Generates bpftrace script based on the supplied probe points, parses its
    output and stores it as a list of DTraceEntry sorted by their tsc.
    """
    def __init__(self, probes, file=None):
        """Validates the probe definitions and optionally parses recorded output.

        probes -- probe definitions (objects with a name and a dict of args)
        file   -- optional open text file holding output of the generated script

        Raises ValueError if a probe cannot be found in the source files or if
        one of its arguments has an invalid position or type.
        """
        self._avail_probes = self._list_probes()
        self._probes = {p.name: p for p in probes}
        self.entries = self._parse(file) if file is not None else []
        # Sanitize the probe definitions
        for probe in probes:
            if probe.name not in self._avail_probes:
                raise ValueError(f'Couldn\'t find probe: "{probe.name}"')
            for arg in probe.args.values():
                if arg.pos >= self._avail_probes[probe.name]:
                    raise ValueError('Invalid probe argument position')
                if arg.type not in (int, str):
                    raise ValueError('Invalid argument type')

    @staticmethod
    def _split_args(args):
        """Splits a 'name=value, name=value' string into a dict of raw strings.

        Only the first '=' of each item separates the name from the value, so
        values are allowed to contain '=' themselves.
        """
        return dict(a.strip().split('=', 1) for a in args.split(','))

    def _parse(self, file):
        """Parses '<probe>: <args>' lines from file into a list of DTraceEntry
        sorted by their tsc.  Lines that don't match this format or that name
        an unknown probe are skipped.
        """
        regex = re.compile(r'(\w+): (.*)')
        entries = []

        # Iterate over the file directly to avoid loading it all at once
        for line in file:
            match = regex.match(line)
            if match is None:
                continue
            name, args = match.groups()
            probe = self._probes.get(name)
            # Skip the line if we don't recognize the probe name
            if probe is None:
                continue
            entries.append(DTraceEntry(probe, args=self._split_args(args)))
        entries.sort(key=lambda e: e.args['tsc'])
        return entries

    def _list_probes(self):
        """Returns a dict mapping the names of the probes found in the source
        files to the number of their arguments (including the tsc).

        The *.c / *.h files are listed through 'git ls-files' executed in the
        current working directory.
        """
        files = subprocess.check_output(['git', 'ls-files', '*.[ch]',
                                        ':!:include/spdk_internal/usdt.h'])
        files = [f for f in str(files, 'ascii').split('\n') if f]
        regex = re.compile(r'SPDK_DTRACE_PROBE([0-9]*)\((\w+)')
        probes = {}

        for fname in files:
            # The pattern is pure ASCII, so undecodable bytes can be replaced
            # safely instead of aborting the whole scan on a single bad file.
            with open(fname, 'r', encoding='utf-8', errors='replace') as file:
                for match in regex.finditer(file.read()):
                    nargs, name = match.group(1), match.group(2)
                    nargs = int(nargs) if nargs else 0
                    # Add one to accommodate for the tsc being the first arg
                    probes[name] = nargs + 1
        return probes

    def _gen_usdt(self, probe):
        """Returns the bpftrace usdt clause printing the probe's name followed
        by its 'name=value' arguments (integers as hex, strings quoted).
        """
        argtype = {int: '0x%lx', str: '\'%s\''}
        argcast = {int: lambda x: x, str: lambda x: f'str({x})'}
        args = probe.args.values()
        argstr = ', '.join(f'{a.name}={argtype[a.type]}' for a in args)
        # Each value is prefixed with ', ', so nothing is emitted if there are
        # no arguments and printf() only receives the format string.
        argval = ''.join(f', {argcast[a.type](f"arg{a.pos}")}' for a in args)
        return (f'usdt:__EXE__:{probe.name} {{'
                f'printf("{probe.name}: {argstr}\\n"{argval});}}')

    def generate(self):
        """Returns the bpftrace script, one usdt clause per line."""
        return '\n'.join([self._gen_usdt(p) for p in self._probes.values()])

    def record(self, pid):
        """Writes the generated script to a temporary file and executes
        bpftrace.sh with the pid and the script path.  A keyboard interrupt
        is treated as the regular way of stopping the recording.
        """
        with tempfile.NamedTemporaryFile(mode='w+') as script:
            script.write(self.generate())
            script.flush()
            try:
                subprocess.run([f'{os.path.dirname(__file__)}/../bpftrace.sh',
                                f'{pid}', f'{script.name}'])
            except KeyboardInterrupt:
                pass
133
134
@dataclass
class TracepointArgument:
    """An argument of an SPDK tracepoint.

    The TYPE_* constants enumerate the possible values of argtype.

    Attributes:
        name    -- name of the argument
        argtype -- one of the TYPE_* constants
    """
    TYPE_INT = 0
    TYPE_PTR = 1
    TYPE_STR = 2
    name: str
    argtype: int
143
144
@dataclass
class Tracepoint:
    """Definition of an SPDK tracepoint (counterpart of struct spdk_trace_tpoint).

    Attributes:
        name        -- name of the tracepoint
        id          -- identifier under which trace entries refer to it
        new_object  -- new_object flag taken from the tracepoint definition
        object_type -- type of the object associated with the tracepoint
        owner_type  -- type of the owner associated with the tracepoint
        args        -- definitions of the tracepoint's arguments, in order
    """
    name: str
    id: int
    new_object: bool
    object_type: int
    owner_type: int
    args: List[TracepointArgument]
154
155
@dataclass
class TraceEntry:
    """A recorded SPDK trace entry (counterpart of struct spdk_trace_entry).

    Attributes:
        lcore      -- lcore recorded in the entry
        tpoint     -- definition of the tracepoint this entry belongs to
        tsc        -- timestamp counter value of the entry
        poller     -- poller recorded in the entry
        size       -- size recorded in the entry
        object_id  -- identifier of the object the entry refers to
        object_ptr -- value (pointer) of the object the entry refers to
        time       -- object's time, expressed in tsc units
        args       -- argument values indexed by the argument's name
    """
    lcore: int
    tpoint: Tracepoint
    tsc: int
    poller: str
    size: int
    object_id: str
    object_ptr: int
    time: int
    args: Dict[str, TypeVar('ArgumentType', str, int)]
168
169
class TraceProvider:
    """Interface of the objects supplying trace entries and tracepoint
    definitions.  Every method needs to be overridden by the subclasses.
    """

    def tpoints(self):
        """Returns the tracepoint definitions as a dict of (tracepoint_name, tracepoint)"""
        raise NotImplementedError()

    def entries(self):
        """Generator yielding the consecutive trace entries"""
        raise NotImplementedError()

    def tsc_rate(self):
        """Returns the TSC rate in effect at the time the traces were collected"""
        raise NotImplementedError()
184
185
class JsonProvider(TraceProvider):
    """Trace provider based on JSON-formatted output produced by spdk_trace app"""
    # Values used for the object/owner type of every tracepoint, as these
    # aren't read from the JSON tracepoint definitions.
    # NOTE(review): presumably these match OBJECT_NONE / OWNER_NONE from SPDK's
    # trace header (both 0) -- confirm.
    _OBJECT_NONE = 0
    _OWNER_NONE = 0

    def __init__(self, file):
        """Starts parsing file and consumes it up to the beginning of the
        entries array, collecting the tracepoint definitions and the TSC rate
        found on the way.
        """
        self._parser = ijson.parse(file)
        self._tpoints = {}
        # Remains None if the file doesn't specify tsc_rate before the entries
        self._tsc_rate = None
        self._parse_defs()

    def _parse_tpoints(self, tpoints):
        """Stores the tracepoint definitions from the tpoints list, indexed by
        their ids.  The 'args' key of a definition is optional.
        """
        for tpoint in tpoints:
            tpoint_id = tpoint['id']
            self._tpoints[tpoint_id] = Tracepoint(
                name=tpoint['name'], id=tpoint_id,
                new_object=tpoint['new_object'],
                object_type=self._OBJECT_NONE,
                owner_type=self._OWNER_NONE,
                args=[TracepointArgument(name=a['name'],
                                         argtype=a['type'])
                      for a in tpoint.get('args', [])])

    def _parse_defs(self):
        """Reads parser events until the entries array is reached, recording
        tsc_rate and rebuilding the tpoints array in order to parse it.
        """
        builder = None
        for prefix, event, value in self._parser:
            # If we reach entries array, there are no more tracepoint definitions
            if prefix == 'entries':
                break
            elif prefix == 'tsc_rate':
                self._tsc_rate = value
                continue

            # Feed the builder with all of the events of the tpoints array
            if (prefix, event) == ('tpoints', 'start_array'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('tpoints', 'end_array'):
                self._parse_tpoints(builder.value)
                builder = None

    def _parse_entry(self, entry):
        """Converts a dict describing a single entry to a TraceEntry.  Only
        'tpoint', 'lcore' and 'tsc' are mandatory, the remaining fields are
        set to None (or left empty in case of args) if they're missing.
        """
        tpoint = self._tpoints[entry['tpoint']]
        obj = entry.get('object', {})
        return TraceEntry(tpoint=tpoint, lcore=entry['lcore'], tsc=entry['tsc'],
                          size=entry.get('size'), object_id=obj.get('id'),
                          object_ptr=obj.get('value'), time=obj.get('time'),
                          poller=entry.get('poller'),
                          args={n.name: v for n, v in zip(tpoint.args, entry.get('args', []))})

    def tsc_rate(self):
        """Returns the TSC rate read from the file or None if it wasn't found"""
        return self._tsc_rate

    def tpoints(self):
        """Returns the tracepoint definitions as a dict indexed by their ids"""
        return self._tpoints

    def entries(self):
        """Generator yielding a TraceEntry for each item of the entries array,
        continuing from the point at which parsing the definitions stopped.
        """
        builder = None
        for prefix, event, value in self._parser:
            if (prefix, event) == ('entries.item', 'start_map'):
                builder = ijson.ObjectBuilder()
            if builder is not None:
                builder.event(event, value)
            if (prefix, event) == ('entries.item', 'end_map'):
                yield self._parse_entry(builder.value)
                builder = None
246                builder = None
247
248
class Trace:
    """Parses SPDK traces and prints them out, annotated with the properties
    of the registered objects.
    """
    def __init__(self, file):
        self._provider = JsonProvider(file)
        self._objects = []
        # Formatters for the argument types that aren't printed verbatim
        self._argfmt = {TracepointArgument.TYPE_PTR: lambda a: f'0x{a:x}'}
        self.tpoints = self._provider.tpoints()

    def _annotate_args(self, entry):
        """Merges the annotations returned by all of the registered objects
        into a single dict indexed by the argument's name.
        """
        merged = {}
        for obj in self._objects:
            result = obj.annotate(entry)
            if result is not None:
                merged.update(result)
        return merged

    def _format_args(self, entry):
        """Returns a list of strings, one per argument: 'name(prop=value, ...)'
        for the annotated arguments and 'name: value' for the other ones.
        """
        annotations = self._annotate_args(entry)
        formatted = []
        for definition, (name, value) in zip(entry.tpoint.args, entry.args.items()):
            properties = annotations.get(name)
            if properties is None:
                # No annotation, print the value using its type's formatter
                fmt = self._argfmt.get(definition.argtype, lambda a: a)
                formatted.append('{}: {}'.format(name, fmt(value)))
                continue
            listed = ', '.join(f'{k}={v}' for k, v in properties.items())
            formatted.append('{}({})'.format(name, listed))
        return formatted

    def register_object(self, obj):
        """Adds an object that will be asked to annotate the printed entries"""
        self._objects.append(obj)

    def print(self):
        """Prints each entry on a separate line, with the timestamps given in
        microseconds relative to the first entry.
        """
        def to_us(tsc, base):
            return ((tsc - base) * 10 ** 6) / self._provider.tsc_rate()

        first_tsc = None
        for e in self._provider.entries():
            if first_tsc is None:
                first_tsc = e.tsc
            timestamp = to_us(e.tsc, first_tsc)
            diff = None if e.time is None else to_us(e.time, 0)
            args = ', '.join(self._format_args(e))

            columns = [f'{e.lcore:3}', f'{timestamp:16.3f}']
            # Missing poller and size are padded to keep the columns aligned
            columns.append(' ' * 3 if e.poller is None else f'{e.poller:3}')
            columns.append(f'{e.tpoint.name:24}')
            columns.append(' ' * (len('size: ') + 6) if e.size is None
                           else f'size: {e.size:6}')
            # Object id and time are left out altogether when missing
            if e.object_id is not None:
                columns.append(f'id: {e.object_id:8}')
            if diff is not None:
                columns.append(f'time: {diff:<8.3f}')
            columns.append(args)

            print(' '.join(columns).rstrip())
303
304
class SPDKObject:
    """Base class for a specific type of SPDK objects (e.g. qpair, thread, etc.)"""
    @dataclass
    class Lifetime:
        """Lifetime (begin/end), pointer, and properties of a particular SPDK object."""
        begin: int
        end: int
        ptr: int
        properties: dict = field(default_factory=dict)

    def __init__(self, trace: Trace, tpoints: List[str]):
        """Collects the definitions of the named tracepoints from trace, storing
        them in self.tpoints indexed by the tracepoint's id.
        """
        self.tpoints = {}
        for name in tpoints:
            # Only the first definition with a matching name is used.  Some
            # tpoints might be undefined if configured without specific
            # subsystems, in which case they're simply skipped.
            for candidate in trace.tpoints.values():
                if candidate.name == name:
                    self.tpoints[candidate.id] = candidate
                    break

    def _annotate(self, entry: TraceEntry):
        """Performs the actual annotation, needs to be implemented by subclasses."""
        raise NotImplementedError()

    def annotate(self, entry: TraceEntry):
        """Annotates a tpoint entry and returns a dict indexed by argname with values representing
        various object properties.  For instance, {"qpair": {"qid": 1, "subnqn": "nqn"}} could be
        returned to annotate an argument called "qpair" with two items: "qid" and "subnqn".
        None is returned for the entries of tracepoints this object isn't interested in.
        """
        if entry.tpoint.id in self.tpoints:
            return self._annotate(entry)
        return None
336
337
class QPair(SPDKObject):
    """Annotates the 'qpair' argument of the RDMA request tracepoints with the
    qpair properties gathered from the DTrace entries.
    """
    def __init__(self, trace: Trace, dtrace: DTrace):
        rdma_tpoints = [
            'RDMA_REQ_NEW',
            'RDMA_REQ_NEED_BUFFER',
            'RDMA_REQ_TX_PENDING_C2H',
            'RDMA_REQ_TX_PENDING_H2C',
            'RDMA_REQ_TX_H2C',
            'RDMA_REQ_RDY_TO_EXECUTE',
            'RDMA_REQ_EXECUTING',
            'RDMA_REQ_EXECUTED',
            'RDMA_REQ_RDY_TO_COMPL',
            'RDMA_REQ_COMPLETING_C2H',
            'RDMA_REQ_COMPLETING',
            'RDMA_REQ_COMPLETED']
        super().__init__(trace, tpoints=rdma_tpoints)
        self._objects = []
        self._find_objects(dtrace.entries)

    def _find_objects(self, dprobes):
        """Builds a Lifetime for every qpair added to a poll group, spanning
        from the probe that added it to the matching removal probe (or TSC_MAX
        if there's none).
        """
        def same_qpair(probe, other):
            return probe.args['qpair'] == other.args['qpair']

        for begin_idx, added in enumerate(dprobes):
            if added.name != 'nvmf_poll_group_add_qpair':
                continue
            # We've found a new qpair, now look for the first subsequent probe
            # indicating its destruction.  end_idx points one past that probe.
            end_idx, removed = None, None
            for idx, candidate in enumerate(dprobes[begin_idx:], start=begin_idx + 1):
                if (candidate.name == 'nvmf_poll_group_remove_qpair' and
                        same_qpair(candidate, added)):
                    end_idx, removed = idx, candidate
                    break
            lifetime = SPDKObject.Lifetime(
                begin=added.args['tsc'],
                end=TSC_MAX if removed is None else removed.args['tsc'],
                ptr=added.args['qpair'],
                properties={'ptr': hex(added.args['qpair']),
                            'thread': added.args['thread']})
            # Pick up the properties reported while the qpair was alive
            for other in dprobes[begin_idx:end_idx]:
                if not same_qpair(other, added):
                    continue
                if other.name == 'nvmf_ctrlr_add_qpair':
                    for prop in ('qid', 'subnqn', 'hostnqn'):
                        lifetime.properties[prop] = other.args[prop]
            self._objects.append(lifetime)

    def _annotate(self, entry):
        """Returns the properties of the qpair that entry's 'qpair' argument
        pointed to at the time of the entry, or None if there's no such qpair.
        """
        qpair = entry.args.get('qpair')
        if qpair is None:
            return None
        for lifetime in self._objects:
            alive = lifetime.begin <= entry.tsc <= lifetime.end
            if lifetime.ptr == qpair and alive:
                return {'qpair': lifetime.properties}
        return None
386
387
def build_dtrace(file=None):
    """Creates a DTrace object set up with the probes used for tracking qpairs.

    file -- optional file with recorded probe output, handed over to DTrace
    """
    def int_arg(name, pos):
        return DTraceArgument(name=name, pos=pos, type=int)

    def str_arg(name, pos):
        return DTraceArgument(name=name, pos=pos, type=str)

    add_qpair = DTraceProbe(
        name='nvmf_poll_group_add_qpair',
        args=[int_arg('tsc', 0), int_arg('qpair', 1), int_arg('thread', 2)])
    remove_qpair = DTraceProbe(
        name='nvmf_poll_group_remove_qpair',
        args=[int_arg('tsc', 0), int_arg('qpair', 1), int_arg('thread', 2)])
    ctrlr_add_qpair = DTraceProbe(
        name='nvmf_ctrlr_add_qpair',
        args=[int_arg('tsc', 0), int_arg('qpair', 1), int_arg('qid', 2),
              str_arg('subnqn', 3), str_arg('hostnqn', 4)])
    return DTrace([add_qpair, remove_qpair, ctrlr_add_qpair], file)
407
408
def print_trace(trace_file, dtrace_file):
    """Prints the traces from trace_file with the qpairs annotated using the
    probe output from dtrace_file.
    """
    # The probes are parsed before the trace, same as the order of arguments
    dtrace = build_dtrace(dtrace_file)
    trace = Trace(trace_file)
    qpairs = QPair(trace, dtrace)
    trace.register_object(qpairs)
    trace.print()
414
415
def main(argv):
    """Parses the command line and either prints the bpftrace script
    (-g), records BPF traces on a PID (-r), or prints the annotated trace.

    argv -- list of command-line arguments, without the program name
    """
    # Imported locally, as it's only needed for managing the input files
    from contextlib import ExitStack

    parser = ArgumentParser(description='SPDK trace annotation script')
    parser.add_argument('-i', '--input',
                        help='JSON-formatted trace file produced by spdk_trace app')
    parser.add_argument('-g', '--generate', help='Generate bpftrace script', action='store_true')
    parser.add_argument('-r', '--record', help='Record BPF traces on PID', metavar='PID', type=int)
    parser.add_argument('-b', '--bpftrace', help='BPF trace script to use for annotations')
    args = parser.parse_args(argv)

    if args.generate:
        print(build_dtrace().generate())
    # Compare against None, so that PID 0 isn't mistaken for a missing option
    elif args.record is not None:
        build_dtrace().record(args.record)
    else:
        # Make sure the files opened here get closed; stdin is left untouched
        with ExitStack() as stack:
            trace_file = (stack.enter_context(open(args.input, 'r'))
                          if args.input is not None else sys.stdin)
            dtrace_file = (stack.enter_context(open(args.bpftrace))
                           if args.bpftrace is not None else None)
            print_trace(trace_file, dtrace_file)
432
433
# Run the command-line interface only when executed as a script, passing the
# arguments without the program name.
if __name__ == '__main__':
    main(sys.argv[1:])
436