# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 University of New Hampshire
# Copyright(c) 2023 PANTHEON.tech s.r.o.
# Copyright(c) 2024 Arm Limited

"""Testpmd interactive shell.

Typical usage example in a TestSuite::

    testpmd_shell = TestPmdShell(self.sut_node)
    devices = testpmd_shell.get_devices()
    for device in devices:
        print(device)
    testpmd_shell.close()
"""

import re
import time
from dataclasses import dataclass, field
from enum import Flag, auto
from pathlib import PurePath
from typing import ClassVar

from typing_extensions import Self, Unpack

from framework.exception import InteractiveCommandExecutionError
from framework.params.testpmd import SimpleForwardingModes, TestPmdParams
from framework.params.types import TestPmdParamsDict
from framework.parser import ParserFn, TextParser
from framework.remote_session.dpdk_shell import DPDKShell
from framework.settings import SETTINGS
from framework.testbed_model.cpu import LogicalCoreCount, LogicalCoreList
from framework.testbed_model.sut_node import SutNode
from framework.utils import StrEnum


class TestPmdDevice:
    """The data of a device that testpmd can recognize.

    Attributes:
        pci_address: The PCI address of the device.
    """

    pci_address: str

    def __init__(self, pci_address_line: str) -> None:
        """Initialize the device from the testpmd output line string.

        Args:
            pci_address_line: A line of testpmd output that contains a device.
        """
        # The line has the form "<label>: <address>"; keep only what follows the first ": ".
        self.pci_address = pci_address_line.strip().split(": ")[1].strip()

    def __str__(self) -> str:
        """The PCI address captures what the device is."""
        return self.pci_address


class VLANOffloadFlag(Flag):
    """Flag representing the VLAN offload settings of a NIC port."""

    #:
    STRIP = auto()
    #:
    FILTER = auto()
    #:
    EXTEND = auto()
    #:
    QINQ_STRIP = auto()

    @classmethod
    def from_str_dict(cls, d: dict[str, str]) -> Self:
        """Makes an instance from a dict containing the flag member names with an "on" value.

        Members whose key is missing from `d`, or whose value is anything other than ``"on"``,
        are left unset.

        Args:
            d: A dictionary containing the flag members as keys and any string value.

        Returns:
            A new instance of the flag.
        """
        flag = cls(0)
        for name in cls.__members__:
            if d.get(name) == "on":
                flag |= cls[name]
        return flag

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Makes a parser function.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        # The named groups deliberately match the member names so that the resulting dict can be
        # passed straight to from_str_dict.
        return TextParser.wrap(
            TextParser.find(
                r"VLAN offload:\s+"
                r"strip (?P<STRIP>on|off), "
                r"filter (?P<FILTER>on|off), "
                r"extend (?P<EXTEND>on|off), "
                r"qinq strip (?P<QINQ_STRIP>on|off)$",
                re.MULTILINE,
                named=True,
            ),
            cls.from_str_dict,
        )


class RSSOffloadTypesFlag(Flag):
    """Flag representing the RSS offload flow types supported by the NIC port."""

    #:
    ipv4 = auto()
    #:
    ipv4_frag = auto()
    #:
    ipv4_tcp = auto()
    #:
    ipv4_udp = auto()
    #:
    ipv4_sctp = auto()
    #:
    ipv4_other = auto()
    #:
    ipv6 = auto()
    #:
    ipv6_frag = auto()
    #:
    ipv6_tcp = auto()
    #:
    ipv6_udp = auto()
    #:
    ipv6_sctp = auto()
    #:
    ipv6_other = auto()
    #:
    l2_payload = auto()
    #:
    ipv6_ex = auto()
    #:
    ipv6_tcp_ex = auto()
    #:
    ipv6_udp_ex = auto()
    #:
    port = auto()
    #:
    vxlan = auto()
    #:
    geneve = auto()
    #:
    nvgre = auto()
    #:
    user_defined_22 = auto()
    #:
    gtpu = auto()
    #:
    eth = auto()
    #:
    s_vlan = auto()
    #:
    c_vlan = auto()
    #:
    esp = auto()
    #:
    ah = auto()
    #:
    l2tpv3 = auto()
    #:
    pfcp = auto()
    #:
    pppoe = auto()
    #:
    ecpri = auto()
    #:
    mpls = auto()
    #:
    ipv4_chksum = auto()
    #:
    l4_chksum = auto()
    #:
    l2tpv2 = auto()
    #:
    ipv6_flow_label = auto()
    #:
    user_defined_38 = auto()
    #:
    user_defined_39 = auto()
    #:
    user_defined_40 = auto()
    #:
    user_defined_41 = auto()
    #:
    user_defined_42 = auto()
    #:
    user_defined_43 = auto()
    #:
    user_defined_44 = auto()
    #:
    user_defined_45 = auto()
    #:
    user_defined_46 = auto()
    #:
    user_defined_47 = auto()
    #:
    user_defined_48 = auto()
    #:
    user_defined_49 = auto()
    #:
    user_defined_50 = auto()
    #:
    user_defined_51 = auto()
    #:
    l3_pre96 = auto()
    #:
    l3_pre64 = auto()
    #:
    l3_pre56 = auto()
    #:
    l3_pre48 = auto()
    #:
    l3_pre40 = auto()
    #:
    l3_pre32 = auto()
    #:
    l2_dst_only = auto()
    #:
    l2_src_only = auto()
    #:
    l4_dst_only = auto()
    #:
    l4_src_only = auto()
    #:
    l3_dst_only = auto()
    #:
    l3_src_only = auto()

    # Composite members: each one is the union of the single-bit members listed on its line.
    #:
    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
    #:
    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
    #:
    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
    #:
    sctp = ipv4_sctp | ipv6_sctp
    #:
    tunnel = vxlan | geneve | nvgre
    #:
    vlan = s_vlan | c_vlan
    #:
    all = (
        eth
        | vlan
        | ip
        | tcp
        | udp
        | sctp
        | l2_payload
        | l2tpv3
        | esp
        | ah
        | pfcp
        | gtpu
        | ecpri
        | mpls
        | l2tpv2
    )

    @classmethod
    def from_list_string(cls, names: str) -> Self:
        """Makes a flag from a whitespace-separated list of names.

        Args:
            names: a whitespace-separated list containing the members of this flag.

        Returns:
            An instance of this flag.

        Raises:
            KeyError: If any of the names is not a member of this flag.
        """
        flag = cls(0)
        for name in names.split():
            flag |= cls.from_str(name)
        return flag

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Makes a flag matching the supplied name.

        Args:
            name: a valid member of this flag in text

        Returns:
            An instance of this flag.

        Raises:
            KeyError: If `name` is not a member of this flag.
        """
        # Member names use underscores; accept the dash-separated spelling as well.
        member_name = name.strip().replace("-", "_")
        return cls[member_name]

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Makes a parser function.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        # Accept one or more spaces in front of every type name so that the match does not depend
        # on the exact indentation width used in the output.
        return TextParser.wrap(
            TextParser.find(r"Supported RSS offload flow types:((?:\r?\n? +\S+)+)", re.MULTILINE),
            cls.from_list_string,
        )


class DeviceCapabilitiesFlag(Flag):
    """Flag representing the device capabilities."""

    #: Device supports Rx queue setup after device started.
    RUNTIME_RX_QUEUE_SETUP = auto()
    #: Device supports Tx queue setup after device started.
    RUNTIME_TX_QUEUE_SETUP = auto()
    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
    RXQ_SHARE = auto()
    #: Device supports keeping flow rules across restart.
    FLOW_RULE_KEEP = auto()
    #: Device supports keeping shared flow objects across restart.
    FLOW_SHARED_OBJECT_KEEP = auto()

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Makes a parser function.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        return TextParser.wrap(
            TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),
            cls,
        )


class DeviceErrorHandlingMode(StrEnum):
    """Enum representing the device error handling mode."""

    #:
    none = auto()
    #:
    passive = auto()
    #:
    proactive = auto()
    #:
    unknown = auto()

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Makes a parser function.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this enum from text.
        """
        return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)


def make_device_private_info_parser() -> ParserFn:
    """Device private information parser.

    Ensures that we are not parsing invalid device private info output.

    Returns:
        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
            function that parses the device private info from the TestPmd port info output.
    """

    def _validate(info: str) -> str | None:
        """Return the stripped info text, or :data:`None` if it holds no usable information."""
        info = info.strip()
        if info == "none" or info.startswith(("Invalid file", "Failed to dump")):
            return None
        return info

    return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)


@dataclass
class TestPmdPort(TextParser):
    """Dataclass representing the result of testpmd's ``show port info`` command."""

    #:
    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
    #:
    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
    #:
    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
    #:
    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
    #:
    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
    #:
    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
    #:
    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
    #:
    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
    #:
    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
    #:
    is_allmulticast_mode_enabled: bool = field(
        metadata=TextParser.find("Allmulticast mode: enabled")
    )
    #: Maximum number of MAC addresses
    max_mac_addresses_num: int = field(
        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
    )
    #: Maximum number of MAC addresses of hash filtering
    max_hash_mac_addresses_num: int = field(
        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
    )
    #: Minimum size of RX buffer
    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
    #: Maximum configurable length of RX packet
    max_rx_packet_length: int = field(
        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
    )
    #: Maximum configurable size of LRO aggregated packet
    max_lro_packet_size: int = field(
        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
    )

    #: Current number of RX queues
    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
    #: Max possible RX queues
    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
    #: Max possible number of RXDs per queue
    max_queue_rxd_num: int = field(
        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
    )
    #: Min possible number of RXDs per queue
    min_queue_rxd_num: int = field(
        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
    )
    #: RXDs number alignment
    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))

    #: Current number of TX queues
    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
    #: Max possible TX queues
    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
    #: Max possible number of TXDs per queue
    max_queue_txd_num: int = field(
        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
    )
    #: Min possible number of TXDs per queue
    min_queue_txd_num: int = field(
        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
    )
    #: TXDs number alignment
    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
    #: Max segment number per packet
    max_packet_segment_num: int = field(
        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
    )
    #: Max segment number per MTU/TSO
    max_mtu_segment_num: int = field(
        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
    )

    #:
    device_capabilities: DeviceCapabilitiesFlag = field(
        metadata=DeviceCapabilitiesFlag.make_parser(),
    )
    #:
    device_error_handling_mode: DeviceErrorHandlingMode = field(
        metadata=DeviceErrorHandlingMode.make_parser()
    )
    #:
    device_private_info: str | None = field(
        default=None,
        metadata=make_device_private_info_parser(),
    )

    #:
    hash_key_size: int | None = field(
        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
    )
    #:
    redirection_table_size: int | None = field(
        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
    )
    #:
    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
    )

    #:
    mac_address: str | None = field(
        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
    )
    #:
    fw_version: str | None = field(
        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
    )
    #:
    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
    #: Socket id of the memory allocation
    mem_alloc_socket_id: int | None = field(
        default=None,
        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
    )
    #:
    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))

    #:
    vlan_offload: VLANOffloadFlag | None = field(
        default=None,
        metadata=VLANOffloadFlag.make_parser(),
    )

    #: Maximum size of RX buffer
    max_rx_bufsize: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
    )
    #: Maximum number of VFs
    max_vfs_num: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
    )
    #: Maximum number of VMDq pools
    max_vmdq_pools_num: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
    )

    # Capture everything up to the end of the line, like the other free-text fields above.
    #:
    switch_name: str | None = field(
        default=None, metadata=TextParser.find(r"Switch name: ([^\r\n]+)")
    )
    #:
    switch_domain_id: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
    )
    #:
    switch_port_id: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
    )
    #:
    switch_rx_domain: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
    )


@dataclass
class TestPmdPortStats(TextParser):
    """Port statistics."""

    #:
    port_id: int = field(metadata=TextParser.find_int(r"NIC statistics for port (\d+)"))

    #:
    rx_packets: int = field(metadata=TextParser.find_int(r"RX-packets:\s+(\d+)"))
    #:
    rx_missed: int = field(metadata=TextParser.find_int(r"RX-missed:\s+(\d+)"))
    #:
    rx_bytes: int = field(metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)"))
    #:
    rx_errors: int = field(metadata=TextParser.find_int(r"RX-errors:\s+(\d+)"))
    #:
    rx_nombuf: int = field(metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)"))

    #:
    tx_packets: int = field(metadata=TextParser.find_int(r"TX-packets:\s+(\d+)"))
    #:
    tx_errors: int = field(metadata=TextParser.find_int(r"TX-errors:\s+(\d+)"))
    #:
    tx_bytes: int = field(metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)"))

    #:
    rx_pps: int = field(metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)"))
    #:
    rx_bps: int = field(metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)"))

    #:
    tx_pps: int = field(metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)"))
    #:
    tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"))


class TestPmdShell(DPDKShell):
    """Testpmd interactive shell.

    The testpmd shell users should never use
    the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather
    call specialized methods. If there isn't one that satisfies a need, it should be added.
    """

    _app_params: TestPmdParams

    #: The path to the testpmd executable.
    path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd")

    #: The testpmd's prompt.
    _default_prompt: ClassVar[str] = "testpmd>"

    #: This forces the prompt to appear after sending a command.
    _command_extra_chars: ClassVar[str] = "\n"

    def __init__(
        self,
        node: SutNode,
        privileged: bool = True,
        timeout: float = SETTINGS.timeout,
        lcore_filter_specifier: LogicalCoreCount | LogicalCoreList = LogicalCoreCount(),
        ascending_cores: bool = True,
        append_prefix_timestamp: bool = True,
        name: str | None = None,
        **app_params: Unpack[TestPmdParamsDict],
    ) -> None:
        """Overrides :meth:`~.dpdk_shell.DPDKShell.__init__`. Changes app_params to kwargs."""
        super().__init__(
            node,
            privileged,
            timeout,
            lcore_filter_specifier,
            ascending_cores,
            append_prefix_timestamp,
            TestPmdParams(**app_params),
            name,
        )

    def start(self, verify: bool = True) -> None:
        """Start packet forwarding with the current configuration.

        Args:
            verify: If :data:`True` , a second start command will be sent in an attempt to verify
                packet forwarding started as expected.

        Raises:
            InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to
                start or ports fail to come up.
        """
        self.send_command("start")
        if verify:
            # If forwarding was already started, sending "start" again should tell us
            start_cmd_output = self.send_command("start")
            if "Packet forwarding already started" not in start_cmd_output:
                self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")
                raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")

            number_of_ports = len(self._app_params.ports or [])
            for port_id in range(number_of_ports):
                if not self.wait_link_status_up(port_id):
                    raise InteractiveCommandExecutionError(
                        "Not all ports came up after starting packet forwarding in testpmd."
                    )

    def stop(self, verify: bool = True) -> None:
        """Stop packet forwarding.

        Args:
            verify: If :data:`True` , the output of the stop command is scanned to verify that
                forwarding was stopped successfully or not started. If neither is found, it is
                considered an error.

        Raises:
            InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop
                forwarding results in an error.
        """
        stop_cmd_output = self.send_command("stop")
        if verify:
            if (
                "Done." not in stop_cmd_output
                and "Packet forwarding not started" not in stop_cmd_output
            ):
                self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")
                raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")

    def get_devices(self) -> list[TestPmdDevice]:
        """Get a list of device names that are known to testpmd.

        Uses the device info listed in testpmd and then parses the output.

        Returns:
            A list of devices.
        """
        dev_info: str = self.send_command("show device info all")
        return [
            TestPmdDevice(line)
            for line in dev_info.split("\n")
            if "device name:" in line.lower()
        ]

    def wait_link_status_up(self, port_id: int, timeout: float = SETTINGS.timeout) -> bool:
        """Wait until the link status on the given port is "up".

        Args:
            port_id: Port to check the link status on.
            timeout: Time to wait for the link to come up. The default value for this
                argument may be modified using the :option:`--timeout` command-line argument
                or the :envvar:`DTS_TIMEOUT` environment variable.

        Returns:
            Whether the link came up in time or not.
        """
        time_to_stop = time.time() + timeout
        port_info: str = ""
        while time.time() < time_to_stop:
            port_info = self.send_command(f"show port info {port_id}")
            if "Link status: up" in port_info:
                break
            time.sleep(0.5)
        else:
            # Only reached when the loop ran out of time without hitting the break above.
            self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")
        return "Link status: up" in port_info

    def set_forward_mode(self, mode: SimpleForwardingModes, verify: bool = True) -> None:
        """Set packet forwarding mode.

        Args:
            mode: The forwarding mode to use.
            verify: If :data:`True` the output of the command will be scanned in an attempt to
                verify that the forwarding mode was set to `mode` properly.

        Raises:
            InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode
                fails to update.
        """
        set_fwd_output = self.send_command(f"set fwd {mode.value}")
        # Only inspect the output when asked to, as documented for `verify`.
        if verify and f"Set {mode.value} packet forwarding mode" not in set_fwd_output:
            self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")
            raise InteractiveCommandExecutionError(
                f"Test pmd failed to set fwd mode to {mode.value}"
            )

    def show_port_info_all(self) -> list[TestPmdPort]:
        """Returns the information of all the ports.

        Returns:
            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
        """
        output = self.send_command("show port info all")

        # Sample output of the "all" command looks like:
        #
        # <start>
        #
        # ********************* Infos for port 0 *********************
        # Key: value
        #
        # ********************* Infos for port 1 *********************
        # Key: value
        # <end>
        #
        # Takes advantage of the double new line in between ports as end delimiter. But we need to
        # artificially add a new line at the end to pick up the last port. Because commands are
        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
        # Therefore we also need to take the carriage return into account.
        port_blocks = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
        return [TestPmdPort.parse(block.group(0)) for block in port_blocks]

    def show_port_info(self, port_id: int) -> TestPmdPort:
        """Returns the given port information.

        Args:
            port_id: The port ID to gather information for.

        Raises:
            InteractiveCommandExecutionError: If `port_id` is invalid.

        Returns:
            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
        """
        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
        if output.startswith("Invalid port"):
            raise InteractiveCommandExecutionError("invalid port given")

        return TestPmdPort.parse(output)

    def show_port_stats_all(self) -> list[TestPmdPortStats]:
        """Returns the statistics of all the ports.

        Returns:
            list[TestPmdPortStats]: A list containing all the ports stats as `TestPmdPortStats`.
        """
        output = self.send_command("show port stats all")

        # Sample output of the "all" command looks like:
        #
        # ########### NIC statistics for port 0 ###########
        # values...
        # #################################################
        #
        # ########### NIC statistics for port 1 ###########
        # values...
        # #################################################
        #
        # Each block runs from its header line to the closing line made only of "#" characters.
        # One or more leading spaces are accepted on both lines so that the match does not depend
        # on the exact indentation width of the output.
        stats_blocks = re.finditer(r"(^ +#*.+#*$[^#]+)^ +#*\r$", output, re.MULTILINE)
        return [TestPmdPortStats.parse(block.group(1)) for block in stats_blocks]

    def show_port_stats(self, port_id: int) -> TestPmdPortStats:
        """Returns the given port statistics.

        Args:
            port_id: The port ID to gather information for.

        Raises:
            InteractiveCommandExecutionError: If `port_id` is invalid.

        Returns:
            TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats.
        """
        output = self.send_command(f"show port stats {port_id}", skip_first_line=True)
        if output.startswith("Invalid port"):
            raise InteractiveCommandExecutionError("invalid port given")

        return TestPmdPortStats.parse(output)

    def _close(self) -> None:
        """Overrides :meth:`~.interactive_shell.close`."""
        self.stop()
        self.send_command("quit", "Bye...")
        return super()._close()