1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2023 University of New Hampshire 3# Copyright(c) 2023 PANTHEON.tech s.r.o. 4# Copyright(c) 2024 Arm Limited 5 6"""Testpmd interactive shell. 7 8Typical usage example in a TestSuite:: 9 10 testpmd_shell = self.sut_node.create_interactive_shell( 11 TestPmdShell, privileged=True 12 ) 13 devices = testpmd_shell.get_devices() 14 for device in devices: 15 print(device) 16 testpmd_shell.close() 17""" 18 19import re 20import time 21from dataclasses import dataclass, field 22from enum import Flag, auto 23from pathlib import PurePath 24from typing import Callable, ClassVar 25 26from typing_extensions import Self 27 28from framework.exception import InteractiveCommandExecutionError 29from framework.parser import ParserFn, TextParser 30from framework.settings import SETTINGS 31from framework.utils import StrEnum 32 33from .interactive_shell import InteractiveShell 34 35 36class TestPmdDevice: 37 """The data of a device that testpmd can recognize. 38 39 Attributes: 40 pci_address: The PCI address of the device. 41 """ 42 43 pci_address: str 44 45 def __init__(self, pci_address_line: str): 46 """Initialize the device from the testpmd output line string. 47 48 Args: 49 pci_address_line: A line of testpmd output that contains a device. 50 """ 51 self.pci_address = pci_address_line.strip().split(": ")[1].strip() 52 53 def __str__(self) -> str: 54 """The PCI address captures what the device is.""" 55 return self.pci_address 56 57 58class TestPmdForwardingModes(StrEnum): 59 r"""The supported packet forwarding modes for :class:`~TestPmdShell`\s.""" 60 61 #: 62 io = auto() 63 #: 64 mac = auto() 65 #: 66 macswap = auto() 67 #: 68 flowgen = auto() 69 #: 70 rxonly = auto() 71 #: 72 txonly = auto() 73 #: 74 csum = auto() 75 #: 76 icmpecho = auto() 77 #: 78 ieee1588 = auto() 79 #: 80 noisy = auto() 81 #: 82 fivetswap = "5tswap" 83 #: 84 shared_rxq = "shared-rxq" 85 #: 86 recycle_mbufs = auto() 87 88 89class VLANOffloadFlag(Flag): 90 """Flag representing the VLAN offload settings of a NIC port.""" 91 92 #: 93 STRIP = auto() 94 #: 95 FILTER = auto() 96 #: 97 EXTEND = auto() 98 #: 99 QINQ_STRIP = auto() 100 101 @classmethod 102 def from_str_dict(cls, d): 103 """Makes an instance from a dict containing the flag member names with an "on" value. 104 105 Args: 106 d: A dictionary containing the flag members as keys and any string value. 107 108 Returns: 109 A new instance of the flag. 110 """ 111 flag = cls(0) 112 for name in cls.__members__: 113 if d.get(name) == "on": 114 flag |= cls[name] 115 return flag 116 117 @classmethod 118 def make_parser(cls) -> ParserFn: 119 """Makes a parser function. 120 121 Returns: 122 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 123 parser function that makes an instance of this flag from text. 124 """ 125 return TextParser.wrap( 126 TextParser.find( 127 r"VLAN offload:\s+" 128 r"strip (?P<STRIP>on|off), " 129 r"filter (?P<FILTER>on|off), " 130 r"extend (?P<EXTEND>on|off), " 131 r"qinq strip (?P<QINQ_STRIP>on|off)$", 132 re.MULTILINE, 133 named=True, 134 ), 135 cls.from_str_dict, 136 ) 137 138 139class RSSOffloadTypesFlag(Flag): 140 """Flag representing the RSS offload flow types supported by the NIC port.""" 141 142 #: 143 ipv4 = auto() 144 #: 145 ipv4_frag = auto() 146 #: 147 ipv4_tcp = auto() 148 #: 149 ipv4_udp = auto() 150 #: 151 ipv4_sctp = auto() 152 #: 153 ipv4_other = auto() 154 #: 155 ipv6 = auto() 156 #: 157 ipv6_frag = auto() 158 #: 159 ipv6_tcp = auto() 160 #: 161 ipv6_udp = auto() 162 #: 163 ipv6_sctp = auto() 164 #: 165 ipv6_other = auto() 166 #: 167 l2_payload = auto() 168 #: 169 ipv6_ex = auto() 170 #: 171 ipv6_tcp_ex = auto() 172 #: 173 ipv6_udp_ex = auto() 174 #: 175 port = auto() 176 #: 177 vxlan = auto() 178 #: 179 geneve = auto() 180 #: 181 nvgre = auto() 182 #: 183 user_defined_22 = auto() 184 #: 185 gtpu = auto() 186 #: 187 eth = auto() 188 #: 189 s_vlan = auto() 190 #: 191 c_vlan = auto() 192 #: 193 esp = auto() 194 #: 195 ah = auto() 196 #: 197 l2tpv3 = auto() 198 #: 199 pfcp = auto() 200 #: 201 pppoe = auto() 202 #: 203 ecpri = auto() 204 #: 205 mpls = auto() 206 #: 207 ipv4_chksum = auto() 208 #: 209 l4_chksum = auto() 210 #: 211 l2tpv2 = auto() 212 #: 213 ipv6_flow_label = auto() 214 #: 215 user_defined_38 = auto() 216 #: 217 user_defined_39 = auto() 218 #: 219 user_defined_40 = auto() 220 #: 221 user_defined_41 = auto() 222 #: 223 user_defined_42 = auto() 224 #: 225 user_defined_43 = auto() 226 #: 227 user_defined_44 = auto() 228 #: 229 user_defined_45 = auto() 230 #: 231 user_defined_46 = auto() 232 #: 233 user_defined_47 = auto() 234 #: 235 user_defined_48 = auto() 236 #: 237 user_defined_49 = auto() 238 #: 239 user_defined_50 = auto() 240 #: 241 user_defined_51 = auto() 242 #: 243 l3_pre96 = auto() 244 #: 245 l3_pre64 = auto() 246 #: 247 l3_pre56 = auto() 248 #: 249 l3_pre48 = auto() 250 #: 251 l3_pre40 = auto() 252 #: 253 l3_pre32 = auto() 254 #: 255 l2_dst_only = auto() 256 #: 257 l2_src_only = auto() 258 #: 259 l4_dst_only = auto() 260 #: 261 l4_src_only = auto() 262 #: 263 l3_dst_only = auto() 264 #: 265 l3_src_only = auto() 266 267 #: 268 ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex 269 #: 270 udp = ipv4_udp | ipv6_udp | ipv6_udp_ex 271 #: 272 tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex 273 #: 274 sctp = ipv4_sctp | ipv6_sctp 275 #: 276 tunnel = vxlan | geneve | nvgre 277 #: 278 vlan = s_vlan | c_vlan 279 #: 280 all = ( 281 eth 282 | vlan 283 | ip 284 | tcp 285 | udp 286 | sctp 287 | l2_payload 288 | l2tpv3 289 | esp 290 | ah 291 | pfcp 292 | gtpu 293 | ecpri 294 | mpls 295 | l2tpv2 296 ) 297 298 @classmethod 299 def from_list_string(cls, names: str) -> Self: 300 """Makes a flag from a whitespace-separated list of names. 301 302 Args: 303 names: a whitespace-separated list containing the members of this flag. 304 305 Returns: 306 An instance of this flag. 307 """ 308 flag = cls(0) 309 for name in names.split(): 310 flag |= cls.from_str(name) 311 return flag 312 313 @classmethod 314 def from_str(cls, name: str) -> Self: 315 """Makes a flag matching the supplied name. 316 317 Args: 318 name: a valid member of this flag in text 319 Returns: 320 An instance of this flag. 321 """ 322 member_name = name.strip().replace("-", "_") 323 return cls[member_name] 324 325 @classmethod 326 def make_parser(cls) -> ParserFn: 327 """Makes a parser function. 328 329 Returns: 330 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 331 parser function that makes an instance of this flag from text. 332 """ 333 return TextParser.wrap( 334 TextParser.find(r"Supported RSS offload flow types:((?:\r?\n? \S+)+)", re.MULTILINE), 335 RSSOffloadTypesFlag.from_list_string, 336 ) 337 338 339class DeviceCapabilitiesFlag(Flag): 340 """Flag representing the device capabilities.""" 341 342 #: Device supports Rx queue setup after device started. 343 RUNTIME_RX_QUEUE_SETUP = auto() 344 #: Device supports Tx queue setup after device started. 345 RUNTIME_TX_QUEUE_SETUP = auto() 346 #: Device supports shared Rx queue among ports within Rx domain and switch domain. 347 RXQ_SHARE = auto() 348 #: Device supports keeping flow rules across restart. 349 FLOW_RULE_KEEP = auto() 350 #: Device supports keeping shared flow objects across restart. 351 FLOW_SHARED_OBJECT_KEEP = auto() 352 353 @classmethod 354 def make_parser(cls) -> ParserFn: 355 """Makes a parser function. 356 357 Returns: 358 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 359 parser function that makes an instance of this flag from text. 360 """ 361 return TextParser.wrap( 362 TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"), 363 cls, 364 ) 365 366 367class DeviceErrorHandlingMode(StrEnum): 368 """Enum representing the device error handling mode.""" 369 370 #: 371 none = auto() 372 #: 373 passive = auto() 374 #: 375 proactive = auto() 376 #: 377 unknown = auto() 378 379 @classmethod 380 def make_parser(cls) -> ParserFn: 381 """Makes a parser function. 382 383 Returns: 384 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 385 parser function that makes an instance of this enum from text. 386 """ 387 return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls) 388 389 390def make_device_private_info_parser() -> ParserFn: 391 """Device private information parser. 392 393 Ensures that we are not parsing invalid device private info output. 394 395 Returns: 396 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser 397 function that parses the device private info from the TestPmd port info output. 398 """ 399 400 def _validate(info: str): 401 info = info.strip() 402 if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"): 403 return None 404 return info 405 406 return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate) 407 408 409@dataclass 410class TestPmdPort(TextParser): 411 """Dataclass representing the result of testpmd's ``show port info`` command.""" 412 413 #: 414 id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b")) 415 #: 416 device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)")) 417 #: 418 driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)")) 419 #: 420 socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)")) 421 #: 422 is_link_up: bool = field(metadata=TextParser.find("Link status: up")) 423 #: 424 link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)")) 425 #: 426 is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex")) 427 #: 428 is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On")) 429 #: 430 is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled")) 431 #: 432 is_allmulticast_mode_enabled: bool = field( 433 metadata=TextParser.find("Allmulticast mode: enabled") 434 ) 435 #: Maximum number of MAC addresses 436 max_mac_addresses_num: int = field( 437 metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)") 438 ) 439 #: Maximum configurable length of RX packet 440 max_hash_mac_addresses_num: int = field( 441 metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)") 442 ) 443 #: Minimum size of RX buffer 444 min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)")) 445 #: Maximum configurable length of RX packet 446 max_rx_packet_length: int = field( 447 metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)") 448 ) 449 #: Maximum configurable size of LRO aggregated packet 450 max_lro_packet_size: int = field( 451 metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)") 452 ) 453 454 #: Current number of RX queues 455 rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)")) 456 #: Max possible RX queues 457 max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)")) 458 #: Max possible number of RXDs per queue 459 max_queue_rxd_num: int = field( 460 metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)") 461 ) 462 #: Min possible number of RXDs per queue 463 min_queue_rxd_num: int = field( 464 metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)") 465 ) 466 #: RXDs number alignment 467 rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)")) 468 469 #: Current number of TX queues 470 tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)")) 471 #: Max possible TX queues 472 max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)")) 473 #: Max possible number of TXDs per queue 474 max_queue_txd_num: int = field( 475 metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)") 476 ) 477 #: Min possible number of TXDs per queue 478 min_queue_txd_num: int = field( 479 metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)") 480 ) 481 #: TXDs number alignment 482 txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)")) 483 #: Max segment number per packet 484 max_packet_segment_num: int = field( 485 metadata=TextParser.find_int(r"Max segment number per packet: (\d+)") 486 ) 487 #: Max segment number per MTU/TSO 488 max_mtu_segment_num: int = field( 489 metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)") 490 ) 491 492 #: 493 device_capabilities: DeviceCapabilitiesFlag = field( 494 metadata=DeviceCapabilitiesFlag.make_parser(), 495 ) 496 #: 497 device_error_handling_mode: DeviceErrorHandlingMode = field( 498 metadata=DeviceErrorHandlingMode.make_parser() 499 ) 500 #: 501 device_private_info: str | None = field( 502 default=None, 503 metadata=make_device_private_info_parser(), 504 ) 505 506 #: 507 hash_key_size: int | None = field( 508 default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)") 509 ) 510 #: 511 redirection_table_size: int | None = field( 512 default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)") 513 ) 514 #: 515 supported_rss_offload_flow_types: RSSOffloadTypesFlag = field( 516 default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser() 517 ) 518 519 #: 520 mac_address: str | None = field( 521 default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)") 522 ) 523 #: 524 fw_version: str | None = field( 525 default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)") 526 ) 527 #: 528 dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)")) 529 #: Socket id of the memory allocation 530 mem_alloc_socket_id: int | None = field( 531 default=None, 532 metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"), 533 ) 534 #: 535 mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)")) 536 537 #: 538 vlan_offload: VLANOffloadFlag | None = field( 539 default=None, 540 metadata=VLANOffloadFlag.make_parser(), 541 ) 542 543 #: Maximum size of RX buffer 544 max_rx_bufsize: int | None = field( 545 default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)") 546 ) 547 #: Maximum number of VFs 548 max_vfs_num: int | None = field( 549 default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)") 550 ) 551 #: Maximum number of VMDq pools 552 max_vmdq_pools_num: int | None = field( 553 default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)") 554 ) 555 556 #: 557 switch_name: str | None = field( 558 default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)") 559 ) 560 #: 561 switch_domain_id: int | None = field( 562 default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)") 563 ) 564 #: 565 switch_port_id: int | None = field( 566 default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)") 567 ) 568 #: 569 switch_rx_domain: int | None = field( 570 default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)") 571 ) 572 573 574@dataclass 575class TestPmdPortStats(TextParser): 576 """Port statistics.""" 577 578 #: 579 port_id: int = field(metadata=TextParser.find_int(r"NIC statistics for port (\d+)")) 580 581 #: 582 rx_packets: int = field(metadata=TextParser.find_int(r"RX-packets:\s+(\d+)")) 583 #: 584 rx_missed: int = field(metadata=TextParser.find_int(r"RX-missed:\s+(\d+)")) 585 #: 586 rx_bytes: int = field(metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)")) 587 #: 588 rx_errors: int = field(metadata=TextParser.find_int(r"RX-errors:\s+(\d+)")) 589 #: 590 rx_nombuf: int = field(metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)")) 591 592 #: 593 tx_packets: int = field(metadata=TextParser.find_int(r"TX-packets:\s+(\d+)")) 594 #: 595 tx_errors: int = field(metadata=TextParser.find_int(r"TX-errors:\s+(\d+)")) 596 #: 597 tx_bytes: int = field(metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)")) 598 599 #: 600 rx_pps: int = field(metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)")) 601 #: 602 rx_bps: int = field(metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)")) 603 604 #: 605 tx_pps: int = field(metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)")) 606 #: 607 tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)")) 608 609 610class TestPmdShell(InteractiveShell): 611 """Testpmd interactive shell. 612 613 The testpmd shell users should never use 614 the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather 615 call specialized methods. If there isn't one that satisfies a need, it should be added. 616 617 Attributes: 618 number_of_ports: The number of ports which were allowed on the command-line when testpmd 619 was started. 620 """ 621 622 number_of_ports: int 623 624 #: The path to the testpmd executable. 625 path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd") 626 627 #: Flag this as a DPDK app so that it's clear this is not a system app and 628 #: needs to be looked in a specific path. 629 dpdk_app: ClassVar[bool] = True 630 631 #: The testpmd's prompt. 632 _default_prompt: ClassVar[str] = "testpmd>" 633 634 #: This forces the prompt to appear after sending a command. 635 _command_extra_chars: ClassVar[str] = "\n" 636 637 def _start_application(self, get_privileged_command: Callable[[str], str] | None) -> None: 638 """Overrides :meth:`~.interactive_shell._start_application`. 639 640 Add flags for starting testpmd in interactive mode and disabling messages for link state 641 change events before starting the application. Link state is verified before starting 642 packet forwarding and the messages create unexpected newlines in the terminal which 643 complicates output collection. 644 645 Also find the number of pci addresses which were allowed on the command line when the app 646 was started. 647 """ 648 self._app_args += " -i --mask-event intr_lsc" 649 self.number_of_ports = self._app_args.count("-a ") 650 super()._start_application(get_privileged_command) 651 652 def start(self, verify: bool = True) -> None: 653 """Start packet forwarding with the current configuration. 654 655 Args: 656 verify: If :data:`True` , a second start command will be sent in an attempt to verify 657 packet forwarding started as expected. 658 659 Raises: 660 InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to 661 start or ports fail to come up. 662 """ 663 self.send_command("start") 664 if verify: 665 # If forwarding was already started, sending "start" again should tell us 666 start_cmd_output = self.send_command("start") 667 if "Packet forwarding already started" not in start_cmd_output: 668 self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}") 669 raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.") 670 671 for port_id in range(self.number_of_ports): 672 if not self.wait_link_status_up(port_id): 673 raise InteractiveCommandExecutionError( 674 "Not all ports came up after starting packet forwarding in testpmd." 675 ) 676 677 def stop(self, verify: bool = True) -> None: 678 """Stop packet forwarding. 679 680 Args: 681 verify: If :data:`True` , the output of the stop command is scanned to verify that 682 forwarding was stopped successfully or not started. If neither is found, it is 683 considered an error. 684 685 Raises: 686 InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop 687 forwarding results in an error. 688 """ 689 stop_cmd_output = self.send_command("stop") 690 if verify: 691 if ( 692 "Done." not in stop_cmd_output 693 and "Packet forwarding not started" not in stop_cmd_output 694 ): 695 self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}") 696 raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.") 697 698 def get_devices(self) -> list[TestPmdDevice]: 699 """Get a list of device names that are known to testpmd. 700 701 Uses the device info listed in testpmd and then parses the output. 702 703 Returns: 704 A list of devices. 705 """ 706 dev_info: str = self.send_command("show device info all") 707 dev_list: list[TestPmdDevice] = [] 708 for line in dev_info.split("\n"): 709 if "device name:" in line.lower(): 710 dev_list.append(TestPmdDevice(line)) 711 return dev_list 712 713 def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool: 714 """Wait until the link status on the given port is "up". 715 716 Arguments: 717 port_id: Port to check the link status on. 718 timeout: Time to wait for the link to come up. The default value for this 719 argument may be modified using the :option:`--timeout` command-line argument 720 or the :envvar:`DTS_TIMEOUT` environment variable. 721 722 Returns: 723 Whether the link came up in time or not. 724 """ 725 time_to_stop = time.time() + timeout 726 port_info: str = "" 727 while time.time() < time_to_stop: 728 port_info = self.send_command(f"show port info {port_id}") 729 if "Link status: up" in port_info: 730 break 731 time.sleep(0.5) 732 else: 733 self._logger.error(f"The link for port {port_id} did not come up in the given timeout.") 734 return "Link status: up" in port_info 735 736 def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True): 737 """Set packet forwarding mode. 738 739 Args: 740 mode: The forwarding mode to use. 741 verify: If :data:`True` the output of the command will be scanned in an attempt to 742 verify that the forwarding mode was set to `mode` properly. 743 744 Raises: 745 InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode 746 fails to update. 747 """ 748 set_fwd_output = self.send_command(f"set fwd {mode.value}") 749 if f"Set {mode.value} packet forwarding mode" not in set_fwd_output: 750 self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}") 751 raise InteractiveCommandExecutionError( 752 f"Test pmd failed to set fwd mode to {mode.value}" 753 ) 754 755 def show_port_info_all(self) -> list[TestPmdPort]: 756 """Returns the information of all the ports. 757 758 Returns: 759 list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`. 760 """ 761 output = self.send_command("show port info all") 762 763 # Sample output of the "all" command looks like: 764 # 765 # <start> 766 # 767 # ********************* Infos for port 0 ********************* 768 # Key: value 769 # 770 # ********************* Infos for port 1 ********************* 771 # Key: value 772 # <end> 773 # 774 # Takes advantage of the double new line in between ports as end delimiter. But we need to 775 # artificially add a new line at the end to pick up the last port. Because commands are 776 # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF. 777 # Therefore we also need to take the carriage return into account. 778 iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S) 779 return [TestPmdPort.parse(block.group(0)) for block in iter] 780 781 def show_port_info(self, port_id: int) -> TestPmdPort: 782 """Returns the given port information. 783 784 Args: 785 port_id: The port ID to gather information for. 786 787 Raises: 788 InteractiveCommandExecutionError: If `port_id` is invalid. 789 790 Returns: 791 TestPmdPort: An instance of `TestPmdPort` containing the given port's information. 792 """ 793 output = self.send_command(f"show port info {port_id}", skip_first_line=True) 794 if output.startswith("Invalid port"): 795 raise InteractiveCommandExecutionError("invalid port given") 796 797 return TestPmdPort.parse(output) 798 799 def show_port_stats_all(self) -> list[TestPmdPortStats]: 800 """Returns the statistics of all the ports. 801 802 Returns: 803 list[TestPmdPortStats]: A list containing all the ports stats as `TestPmdPortStats`. 804 """ 805 output = self.send_command("show port stats all") 806 807 # Sample output of the "all" command looks like: 808 # 809 # ########### NIC statistics for port 0 ########### 810 # values... 811 # ################################################# 812 # 813 # ########### NIC statistics for port 1 ########### 814 # values... 815 # ################################################# 816 # 817 iter = re.finditer(r"(^ #*.+#*$[^#]+)^ #*\r$", output, re.MULTILINE) 818 return [TestPmdPortStats.parse(block.group(1)) for block in iter] 819 820 def show_port_stats(self, port_id: int) -> TestPmdPortStats: 821 """Returns the given port statistics. 822 823 Args: 824 port_id: The port ID to gather information for. 825 826 Raises: 827 InteractiveCommandExecutionError: If `port_id` is invalid. 828 829 Returns: 830 TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats. 831 """ 832 output = self.send_command(f"show port stats {port_id}", skip_first_line=True) 833 if output.startswith("Invalid port"): 834 raise InteractiveCommandExecutionError("invalid port given") 835 836 return TestPmdPortStats.parse(output) 837 838 def close(self) -> None: 839 """Overrides :meth:`~.interactive_shell.close`.""" 840 self.send_command("quit", "") 841 return super().close() 842