#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 NVIDIA Corporation & Affiliates

"""
Analyzing the mlx5 PMD datapath tracings

The script iterates over the event messages of a trace (read with the
babeltrace2 Python bindings), handles the events whose names start with
PFX_TX, groups them into bursts per port/queue and prints the result.
"""
import sys
import argparse
import bt2

PFX_TX = "pmd.net.mlx5.tx."
PFX_TX_LEN = len(PFX_TX)

# Printable names of the WQE opcodes (low byte of the traced opcode field);
# any other value is printed in hexadecimal.
_WQE_OP_NAMES = {
    0xF: "WAIT",
    0x29: "EMPW",
    0xE: "TSO ",
    0xA: "SEND",
}


class MlxQueue:
    """Queue container object"""

    def __init__(self):
        self.done_burst = []  # completed bursts
        self.wait_burst = []  # waiting for completion
        self.pq_id = 0  # (port_id << 16) | queue_id

    def log(self, all):
        """Log the completed bursts and, if `all` is true, the pending ones

        The parameter keeps its historical name (it shadows the builtin
        within this method only) so existing callers are not affected.
        """
        for txb in self.done_burst:
            txb.log()
        if all:
            for txb in self.wait_burst:
                txb.log()


class MlxMbuf:
    """Packet mbufs container object"""

    def __init__(self):
        self.wqe = 0  # wqe id
        self.ptr = None  # first packet mbuf pointer
        self.len = 0  # packet data length
        self.nseg = 0  # number of segments

    def log(self):
        """Log mbuf"""
        out_txt = "    %X: %u" % (self.ptr, self.len)
        if self.nseg != 1:
            out_txt += " (%d segs)" % self.nseg
        print(out_txt)


class MlxWqe:
    """WQE container object"""

    def __init__(self):
        self.mbuf = []  # list of mbufs in WQE
        self.wait_ts = 0  # preceding wait/push timestamp
        self.comp_ts = 0  # send/recv completion timestamp
        self.opcode = 0  # bits 8..23 - WQE id, bits 0..7 - operation code

    def log(self):
        """Log WQE and all the mbufs attached to it"""
        wqe_id = (self.opcode >> 8) & 0xFFFF
        wqe_op = self.opcode & 0xFF
        op_name = _WQE_OP_NAMES.get(wqe_op)
        if op_name is None:
            op_name = "0x%02X" % wqe_op
        out_txt = "  %04X: " % wqe_id + op_name
        if self.comp_ts != 0:
            # completed - show the start timestamp and the completion delay
            out_txt += " (%d, %d)" % (self.wait_ts, self.comp_ts - self.wait_ts)
        else:
            out_txt += " (%d)" % self.wait_ts
        print(out_txt)
        for mbuf in self.mbuf:
            mbuf.log()

    def comp(self, wqe_id, wqe_ts):
        """Mark WQE as completed if the reported WQE id covers it

        Return 1 if the WQE is (or already was) completed, 0 otherwise.
        The ids are compared as 16-bit values with wrap-around, 0x8000
        being the half-range threshold.
        """
        if self.comp_ts != 0:
            return 1
        cur_id = (self.opcode >> 8) & 0xFFFF
        if cur_id > wqe_id:
            # this WQE id is numerically above the completed one: it is
            # considered completed only if the distance exceeds half range
            cur_id -= wqe_id
            if cur_id <= 0x8000:
                return 0
        else:
            cur_id = wqe_id - cur_id
            if cur_id >= 0x8000:
                return 0
        self.comp_ts = wqe_ts
        return 1


class MlxBurst:
    """Packet burst container object"""

    def __init__(self):
        self.wqes = []  # issued burst WQEs
        self.done = 0  # number of sent/recv packets
        self.req = 0  # requested number of packets
        self.call_ts = 0  # burst routine invocation
        self.done_ts = 0  # burst routine done
        self.queue = None  # owning MlxQueue

    def log(self):
        """Log burst and all its WQEs"""
        port = self.queue.pq_id >> 16
        queue = self.queue.pq_id & 0xFFFF
        if self.req == 0:
            # no requested packet count was recorded for this burst
            print(
                "%u: tx(p=%u, q=%u, %u/%u pkts (incomplete)"
                % (self.call_ts, port, queue, self.done, self.req)
            )
        else:
            print(
                "%u: tx(p=%u, q=%u, %u/%u pkts in %u"
                % (
                    self.call_ts,
                    port,
                    queue,
                    self.done,
                    self.req,
                    self.done_ts - self.call_ts,
                )
            )
        for wqe in self.wqes:
            wqe.log()

    def comp(self, wqe_id, wqe_ts):
        """Return 0 if not all of WQEs in burst completed

        WQEs are checked in order and the check stops at the first one
        that is not completed. A burst without WQEs is reported as not
        completed.
        """
        # NOTE(review): a finished burst with no WQEs never completes and
        # keeps all the following bursts of the queue in the waiting list
        # - confirm this is intended.
        if not self.wqes:
            return 0
        for wqe in self.wqes:
            if wqe.comp(wqe_id, wqe_ts) == 0:
                return 0
        return 1


class MlxTrace:
    """Trace representing object"""

    def __init__(self):
        self.tx_blst = {}  # current Tx bursts per CPU
        self.tx_qlst = {}  # active Tx queues per port/queue
        self.tx_wlst = {}  # wait timestamp list per CPU

    def run(self, msg_it, verbose):
        """Run over gathered tracing data and build database"""
        for msg in msg_it:
            if not isinstance(msg, bt2._EventMessageConst):
                continue
            event = msg.event
            if event.name.startswith(PFX_TX):
                do_tx(msg, self, verbose)
            # Handling of other log event categories can be added here
        if verbose:
            print("*** End of raw data dump ***")

    def log(self, all):
        """Log gathered trace database

        `all` is forwarded to MlxQueue.log() of every known queue.
        """
        for queue in self.tx_qlst.values():
            queue.log(all)


def do_tx_entry(msg, trace, verbose):
    """Handle entry Tx burst"""
    event = msg.event
    cpu_id = event["cpu_id"]
    burst = trace.tx_blst.get(cpu_id)
    if burst is not None:
        # continue existing burst after WAIT
        return
    if verbose > 0:
        print("%u:%X tx_entry(real_time=%u, port_id=%u, queue_id=%u)" %
              (msg.default_clock_snapshot.ns_from_origin, cpu_id,
               event["real_time"], event["port_id"], event["queue_id"]))
    # allocate the new burst and append to the queue
    burst = MlxBurst()
    burst.call_ts = event["real_time"]
    if burst.call_ts == 0:
        # no real time in the record - fall back to the trace clock
        burst.call_ts = msg.default_clock_snapshot.ns_from_origin
    trace.tx_blst[cpu_id] = burst
    pq_id = event["port_id"] << 16 | event["queue_id"]
    queue = trace.tx_qlst.get(pq_id)
    if queue is None:
        # queue does not exist - allocate the new one
        queue = MlxQueue()
        queue.pq_id = pq_id
        trace.tx_qlst[pq_id] = queue
    burst.queue = queue
    queue.wait_burst.append(burst)


def do_tx_exit(msg, trace, verbose):
    """Handle exit Tx burst"""
    event = msg.event
    cpu_id = event["cpu_id"]
    if verbose > 0:
        print("%u:%X tx_exit(real_time=%u, nb_sent=%u, nb_req=%u)" %
              (msg.default_clock_snapshot.ns_from_origin, cpu_id,
               event["real_time"], event["nb_sent"], event["nb_req"]))
    burst = trace.tx_blst.get(cpu_id)
    if burst is None:
        # exit without the matching entry - nothing to close
        return
    burst.done_ts = event["real_time"]
    if burst.done_ts == 0:
        # no real time in the record - fall back to the trace clock
        burst.done_ts = msg.default_clock_snapshot.ns_from_origin
    burst.req = event["nb_req"]
    burst.done = event["nb_sent"]
    # the burst stays in its queue, only the per-CPU "current" slot is freed
    trace.tx_blst.pop(cpu_id)


def do_tx_wqe(msg, trace, verbose):
    """Handle WQE record"""
    event = msg.event
    cpu_id = event["cpu_id"]
    if verbose > 1:
        print("%u:%X tx_wqe(real_time=%u, opcode=%08X)" %
              (msg.default_clock_snapshot.ns_from_origin, cpu_id,
               event["real_time"], event["opcode"]))
    burst = trace.tx_blst.get(cpu_id)
    if burst is None:
        return
    wqe = MlxWqe()
    # NOTE(review): the wait timestamp stored for the CPU is never removed,
    # so it is reused for every later WQE traced on the same CPU - confirm
    # this is intended.
    wqe.wait_ts = trace.tx_wlst.get(cpu_id)
    if wqe.wait_ts is None:
        # no wait recorded for this CPU - use the record time instead
        wqe.wait_ts = event["real_time"]
        if wqe.wait_ts == 0:
            wqe.wait_ts = msg.default_clock_snapshot.ns_from_origin
    wqe.opcode = event["opcode"]
    burst.wqes.append(wqe)


def do_tx_wait(msg, trace, verbose):
    """Handle WAIT record"""
    event = msg.event
    cpu_id = event["cpu_id"]
    if verbose > 1:
        print("%u:%X tx_wait(ts=%u)" %
              (msg.default_clock_snapshot.ns_from_origin, cpu_id, event["ts"]))
    trace.tx_wlst[cpu_id] = event["ts"]


def do_tx_push(msg, trace, verbose):
    """Handle WQE push event"""
    event = msg.event
    cpu_id = event["cpu_id"]
    if verbose > 2:
        print("%u:%X tx_push(mbuf=%X, pkt_len=%u, nb_segs=%u, wqe_id=%04X)" %
              (msg.default_clock_snapshot.ns_from_origin, cpu_id, event["mbuf"],
               event["mbuf_pkt_len"], event["mbuf_nb_segs"], event["wqe_id"]))
    burst = trace.tx_blst.get(cpu_id)
    if burst is None or not burst.wqes:
        # no current burst or no WQE to attach the mbuf to
        return
    mbuf = MlxMbuf()
    mbuf.wqe = event["wqe_id"]
    mbuf.ptr = event["mbuf"]
    mbuf.len = event["mbuf_pkt_len"]
    mbuf.nseg = event["mbuf_nb_segs"]
    # the mbuf belongs to the most recently recorded WQE of the burst
    burst.wqes[-1].mbuf.append(mbuf)


def do_tx_complete(msg, trace, verbose):
    """Handle send completion event"""
    event = msg.event
    pq_id = event["port_id"] << 16 | event["queue_id"]
    if verbose > 1:
        cpu_id = event["cpu_id"]
        print("%u:%X tx_complete(port_id=%u, queue_id=%u, ts=%u, wqe_id=%04X)" %
              (msg.default_clock_snapshot.ns_from_origin, cpu_id,
               event["port_id"], event["queue_id"], event["ts"], event["wqe_id"]))
    queue = trace.tx_qlst.get(pq_id)
    if queue is None or not queue.wait_burst:
        return
    wqe_id = event["wqe_id"]
    wqe_ts = event["ts"]
    # count the leading bursts which are entirely completed; the scan stops
    # at the first burst that still has a pending WQE
    done = 0
    for burst in queue.wait_burst:
        if burst.comp(wqe_id, wqe_ts) == 0:
            break
        done += 1
    # move completed burst(s) to done list
    if done:
        queue.done_burst.extend(queue.wait_burst[:done])
        queue.wait_burst = queue.wait_burst[done:]


# Tx event handlers, keyed by the event name with PFX_TX stripped
_TX_HANDLERS = {
    "entry": do_tx_entry,
    "exit": do_tx_exit,
    "wqe": do_tx_wqe,
    "wait": do_tx_wait,
    "push": do_tx_push,
    "complete": do_tx_complete,
}


def do_tx(msg, trace, verbose):
    """Handle Tx related records

    Raises ValueError (after reporting to stderr) if the event name is
    not one of the known Tx events.
    """
    handler = _TX_HANDLERS.get(msg.event.name[PFX_TX_LEN:])
    if handler is None:
        err_txt = "Error: unrecognized Tx event name: %s" % msg.event.name
        print(err_txt, file=sys.stderr)
        raise ValueError(err_txt)
    handler(msg, trace, verbose)


def main() -> int:
    """Script entry point

    Return 0 on success, -1 if the trace processing raised ValueError.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("path", nargs=1, type=str, help="input trace folder")
    # plain flag: an optional-value option would swallow the positional
    # trace path in "-a <path>"
    parser.add_argument("-a", "--all", action="store_true",
                        help="show all the bursts, including incomplete ones")
    parser.add_argument("-v", "--verbose", type=int, nargs="?", default=0, const=2,
                        help="show all the records below specified level")
    args = parser.parse_args()

    try:
        mlx_tr = MlxTrace()
        msg_it = bt2.TraceCollectionMessageIterator(args.path)
        mlx_tr.run(msg_it, args.verbose)
        mlx_tr.log(args.all)
        return 0
    except ValueError:
        return -1


if __name__ == "__main__":
    sys.exit(main())