1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (c) 2023 NVIDIA Corporation & Affiliates 4 5""" 6Analyzing the mlx5 PMD datapath tracings 7""" 8import sys 9import argparse 10import bt2 11 12PFX_TX = "pmd.net.mlx5.tx." 13PFX_TX_LEN = len(PFX_TX) 14 15 16class MlxQueue: 17 """Queue container object""" 18 19 def __init__(self): 20 self.done_burst = [] # completed bursts 21 self.wait_burst = [] # waiting for completion 22 self.pq_id = 0 23 24 def log(self): 25 """Log all queue bursts""" 26 for txb in self.done_burst: 27 txb.log() 28 29 30class MlxMbuf: 31 """Packet mbufs container object""" 32 33 def __init__(self): 34 self.wqe = 0 # wqe id 35 self.ptr = None # first packet mbuf pointer 36 self.len = 0 # packet data length 37 self.nseg = 0 # number of segments 38 39 def log(self): 40 """Log mbuf""" 41 out_txt = " %X: %u" % (self.ptr, self.len) 42 if self.nseg != 1: 43 out_txt += " (%d segs)" % self.nseg 44 print(out_txt) 45 46 47class MlxWqe: 48 """WQE container object""" 49 50 def __init__(self): 51 self.mbuf = [] # list of mbufs in WQE 52 self.wait_ts = 0 # preceding wait/push timestamp 53 self.comp_ts = 0 # send/recv completion timestamp 54 self.opcode = 0 55 56 def log(self): 57 """Log WQE""" 58 wqe_id = (self.opcode >> 8) & 0xFFFF 59 wqe_op = self.opcode & 0xFF 60 out_txt = " %04X: " % wqe_id 61 if wqe_op == 0xF: 62 out_txt += "WAIT" 63 elif wqe_op == 0x29: 64 out_txt += "EMPW" 65 elif wqe_op == 0xE: 66 out_txt += "TSO " 67 elif wqe_op == 0xA: 68 out_txt += "SEND" 69 else: 70 out_txt += "0x%02X" % wqe_op 71 if self.comp_ts != 0: 72 out_txt += " (%d, %d)" % (self.wait_ts, self.comp_ts - self.wait_ts) 73 else: 74 out_txt += " (%d)" % self.wait_ts 75 print(out_txt) 76 for mbuf in self.mbuf: 77 mbuf.log() 78 79 def comp(self, wqe_id, wqe_ts): 80 """Return 0 if WQE in not completedLog WQE""" 81 if self.comp_ts != 0: 82 return 1 83 cur_id = (self.opcode >> 8) & 0xFFFF 84 if cur_id > wqe_id: 85 cur_id -= wqe_id 86 if cur_id <= 0x8000: 87 return 0 88 else: 89 cur_id = wqe_id - cur_id 90 if cur_id >= 0x8000: 91 return 0 92 self.comp_ts = wqe_ts 93 return 1 94 95 96class MlxBurst: 97 """Packet burst container object""" 98 99 def __init__(self): 100 self.wqes = [] # issued burst WQEs 101 self.done = 0 # number of sent/recv packets 102 self.req = 0 # requested number of packets 103 self.call_ts = 0 # burst routine invocation 104 self.done_ts = 0 # burst routine done 105 self.queue = None 106 107 def log(self): 108 """Log burst""" 109 port = self.queue.pq_id >> 16 110 queue = self.queue.pq_id & 0xFFFF 111 if self.req == 0: 112 print( 113 "%u: tx(p=%u, q=%u, %u/%u pkts (incomplete)" 114 % (self.call_ts, port, queue, self.done, self.req) 115 ) 116 else: 117 print( 118 "%u: tx(p=%u, q=%u, %u/%u pkts in %u" 119 % ( 120 self.call_ts, 121 port, 122 queue, 123 self.done, 124 self.req, 125 self.done_ts - self.call_ts, 126 ) 127 ) 128 for wqe in self.wqes: 129 wqe.log() 130 131 def comp(self, wqe_id, wqe_ts): 132 """Return 0 if not all of WQEs in burst completed""" 133 wlen = len(self.wqes) 134 if wlen == 0: 135 return 0 136 for wqe in self.wqes: 137 if wqe.comp(wqe_id, wqe_ts) == 0: 138 return 0 139 return 1 140 141 142class MlxTrace: 143 """Trace representing object""" 144 145 def __init__(self): 146 self.tx_blst = {} # current Tx bursts per CPU 147 self.tx_qlst = {} # active Tx queues per port/queue 148 self.tx_wlst = {} # wait timestamp list per CPU 149 150 def run(self, msg_it): 151 """Run over gathered tracing data and build database""" 152 for msg in msg_it: 153 if not isinstance(msg, bt2._EventMessageConst): 154 continue 155 event = msg.event 156 if event.name.startswith(PFX_TX): 157 do_tx(msg, self) 158 # Handling of other log event cathegories can be added here 159 160 def log(self): 161 """Log gathered trace database""" 162 for pq_id in self.tx_qlst: 163 queue = self.tx_qlst.get(pq_id) 164 queue.log() 165 166 167def do_tx_entry(msg, trace): 168 """Handle entry Tx busrt""" 169 event = msg.event 170 cpu_id = event["cpu_id"] 171 burst = trace.tx_blst.get(cpu_id) 172 if burst is not None: 173 # continue existing burst after WAIT 174 return 175 # allocate the new burst and append to the queue 176 burst = MlxBurst() 177 burst.call_ts = msg.default_clock_snapshot.ns_from_origin 178 trace.tx_blst[cpu_id] = burst 179 pq_id = event["port_id"] << 16 | event["queue_id"] 180 queue = trace.tx_qlst.get(pq_id) 181 if queue is None: 182 # queue does not exist - allocate the new one 183 queue = MlxQueue() 184 queue.pq_id = pq_id 185 trace.tx_qlst[pq_id] = queue 186 burst.queue = queue 187 queue.wait_burst.append(burst) 188 189 190def do_tx_exit(msg, trace): 191 """Handle exit Tx busrt""" 192 event = msg.event 193 cpu_id = event["cpu_id"] 194 burst = trace.tx_blst.get(cpu_id) 195 if burst is None: 196 return 197 burst.done_ts = msg.default_clock_snapshot.ns_from_origin 198 burst.req = event["nb_req"] 199 burst.done = event["nb_sent"] 200 trace.tx_blst.pop(cpu_id) 201 202 203def do_tx_wqe(msg, trace): 204 """Handle WQE record""" 205 event = msg.event 206 cpu_id = event["cpu_id"] 207 burst = trace.tx_blst.get(cpu_id) 208 if burst is None: 209 return 210 wqe = MlxWqe() 211 wqe.wait_ts = trace.tx_wlst.get(cpu_id) 212 if wqe.wait_ts is None: 213 wqe.wait_ts = msg.default_clock_snapshot.ns_from_origin 214 wqe.opcode = event["opcode"] 215 burst.wqes.append(wqe) 216 217 218def do_tx_wait(msg, trace): 219 """Handle WAIT record""" 220 event = msg.event 221 cpu_id = event["cpu_id"] 222 trace.tx_wlst[cpu_id] = event["ts"] 223 224 225def do_tx_push(msg, trace): 226 """Handle WQE push event""" 227 event = msg.event 228 cpu_id = event["cpu_id"] 229 burst = trace.tx_blst.get(cpu_id) 230 if burst is None: 231 return 232 if not burst.wqes: 233 return 234 wqe = burst.wqes[-1] 235 mbuf = MlxMbuf() 236 mbuf.wqe = event["wqe_id"] 237 mbuf.ptr = event["mbuf"] 238 mbuf.len = event["mbuf_pkt_len"] 239 mbuf.nseg = event["mbuf_nb_segs"] 240 wqe.mbuf.append(mbuf) 241 242 243def do_tx_complete(msg, trace): 244 """Handle send completion event""" 245 event = msg.event 246 pq_id = event["port_id"] << 16 | event["queue_id"] 247 queue = trace.tx_qlst.get(pq_id) 248 if queue is None: 249 return 250 qlen = len(queue.wait_burst) 251 if qlen == 0: 252 return 253 wqe_id = event["wqe_id"] 254 wqe_ts = event["ts"] 255 rmv = 0 256 while rmv < qlen: 257 burst = queue.wait_burst[rmv] 258 if burst.comp(wqe_id, wqe_ts) == 0: 259 break 260 rmv += 1 261 # mode completed burst to done list 262 if rmv != 0: 263 idx = 0 264 while idx < rmv: 265 queue.done_burst.append(burst) 266 idx += 1 267 del queue.wait_burst[0:rmv] 268 269 270def do_tx(msg, trace): 271 """Handle Tx related records""" 272 name = msg.event.name[PFX_TX_LEN:] 273 if name == "entry": 274 do_tx_entry(msg, trace) 275 elif name == "exit": 276 do_tx_exit(msg, trace) 277 elif name == "wqe": 278 do_tx_wqe(msg, trace) 279 elif name == "wait": 280 do_tx_wait(msg, trace) 281 elif name == "push": 282 do_tx_push(msg, trace) 283 elif name == "complete": 284 do_tx_complete(msg, trace) 285 else: 286 print("Error: unrecognized Tx event name: %s" % msg.event.name, file=sys.stderr) 287 raise ValueError() 288 289 290def main() -> int: 291 """Script entry point""" 292 try: 293 parser = argparse.ArgumentParser() 294 parser.add_argument("path", nargs=1, type=str, help="input trace folder") 295 args = parser.parse_args() 296 297 mlx_tr = MlxTrace() 298 msg_it = bt2.TraceCollectionMessageIterator(args.path) 299 mlx_tr.run(msg_it) 300 mlx_tr.log() 301 return 0 302 except ValueError: 303 return -1 304 305 306if __name__ == "__main__": 307 sys.exit(main()) 308