1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2023 University of New Hampshire 3# Copyright(c) 2023 PANTHEON.tech s.r.o. 4# Copyright(c) 2024 Arm Limited 5 6"""Testpmd interactive shell. 7 8Typical usage example in a TestSuite:: 9 10 testpmd_shell = self.sut_node.create_interactive_shell( 11 TestPmdShell, privileged=True 12 ) 13 devices = testpmd_shell.get_devices() 14 for device in devices: 15 print(device) 16 testpmd_shell.close() 17""" 18 19import re 20import time 21from dataclasses import dataclass, field 22from enum import Flag, auto 23from pathlib import PurePath 24from typing import Callable, ClassVar 25 26from typing_extensions import Self 27 28from framework.exception import InteractiveCommandExecutionError 29from framework.parser import ParserFn, TextParser 30from framework.settings import SETTINGS 31from framework.utils import StrEnum 32 33from .interactive_shell import InteractiveShell 34 35 36class TestPmdDevice: 37 """The data of a device that testpmd can recognize. 38 39 Attributes: 40 pci_address: The PCI address of the device. 41 """ 42 43 pci_address: str 44 45 def __init__(self, pci_address_line: str): 46 """Initialize the device from the testpmd output line string. 47 48 Args: 49 pci_address_line: A line of testpmd output that contains a device. 50 """ 51 self.pci_address = pci_address_line.strip().split(": ")[1].strip() 52 53 def __str__(self) -> str: 54 """The PCI address captures what the device is.""" 55 return self.pci_address 56 57 58class TestPmdForwardingModes(StrEnum): 59 r"""The supported packet forwarding modes for :class:`~TestPmdShell`\s.""" 60 61 #: 62 io = auto() 63 #: 64 mac = auto() 65 #: 66 macswap = auto() 67 #: 68 flowgen = auto() 69 #: 70 rxonly = auto() 71 #: 72 txonly = auto() 73 #: 74 csum = auto() 75 #: 76 icmpecho = auto() 77 #: 78 ieee1588 = auto() 79 #: 80 noisy = auto() 81 #: 82 fivetswap = "5tswap" 83 #: 84 shared_rxq = "shared-rxq" 85 #: 86 recycle_mbufs = auto() 87 88 89class VLANOffloadFlag(Flag): 90 """Flag representing the VLAN offload settings of a NIC port.""" 91 92 #: 93 STRIP = auto() 94 #: 95 FILTER = auto() 96 #: 97 EXTEND = auto() 98 #: 99 QINQ_STRIP = auto() 100 101 @classmethod 102 def from_str_dict(cls, d): 103 """Makes an instance from a dict containing the flag member names with an "on" value. 104 105 Args: 106 d: A dictionary containing the flag members as keys and any string value. 107 108 Returns: 109 A new instance of the flag. 110 """ 111 flag = cls(0) 112 for name in cls.__members__: 113 if d.get(name) == "on": 114 flag |= cls[name] 115 return flag 116 117 @classmethod 118 def make_parser(cls) -> ParserFn: 119 """Makes a parser function. 120 121 Returns: 122 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 123 parser function that makes an instance of this flag from text. 124 """ 125 return TextParser.wrap( 126 TextParser.find( 127 r"VLAN offload:\s+" 128 r"strip (?P<STRIP>on|off), " 129 r"filter (?P<FILTER>on|off), " 130 r"extend (?P<EXTEND>on|off), " 131 r"qinq strip (?P<QINQ_STRIP>on|off)$", 132 re.MULTILINE, 133 named=True, 134 ), 135 cls.from_str_dict, 136 ) 137 138 139class RSSOffloadTypesFlag(Flag): 140 """Flag representing the RSS offload flow types supported by the NIC port.""" 141 142 #: 143 ipv4 = auto() 144 #: 145 ipv4_frag = auto() 146 #: 147 ipv4_tcp = auto() 148 #: 149 ipv4_udp = auto() 150 #: 151 ipv4_sctp = auto() 152 #: 153 ipv4_other = auto() 154 #: 155 ipv6 = auto() 156 #: 157 ipv6_frag = auto() 158 #: 159 ipv6_tcp = auto() 160 #: 161 ipv6_udp = auto() 162 #: 163 ipv6_sctp = auto() 164 #: 165 ipv6_other = auto() 166 #: 167 l2_payload = auto() 168 #: 169 ipv6_ex = auto() 170 #: 171 ipv6_tcp_ex = auto() 172 #: 173 ipv6_udp_ex = auto() 174 #: 175 port = auto() 176 #: 177 vxlan = auto() 178 #: 179 geneve = auto() 180 #: 181 nvgre = auto() 182 #: 183 user_defined_22 = auto() 184 #: 185 gtpu = auto() 186 #: 187 eth = auto() 188 #: 189 s_vlan = auto() 190 #: 191 c_vlan = auto() 192 #: 193 esp = auto() 194 #: 195 ah = auto() 196 #: 197 l2tpv3 = auto() 198 #: 199 pfcp = auto() 200 #: 201 pppoe = auto() 202 #: 203 ecpri = auto() 204 #: 205 mpls = auto() 206 #: 207 ipv4_chksum = auto() 208 #: 209 l4_chksum = auto() 210 #: 211 l2tpv2 = auto() 212 #: 213 ipv6_flow_label = auto() 214 #: 215 user_defined_38 = auto() 216 #: 217 user_defined_39 = auto() 218 #: 219 user_defined_40 = auto() 220 #: 221 user_defined_41 = auto() 222 #: 223 user_defined_42 = auto() 224 #: 225 user_defined_43 = auto() 226 #: 227 user_defined_44 = auto() 228 #: 229 user_defined_45 = auto() 230 #: 231 user_defined_46 = auto() 232 #: 233 user_defined_47 = auto() 234 #: 235 user_defined_48 = auto() 236 #: 237 user_defined_49 = auto() 238 #: 239 user_defined_50 = auto() 240 #: 241 user_defined_51 = auto() 242 #: 243 l3_pre96 = auto() 244 #: 245 l3_pre64 = auto() 246 #: 247 l3_pre56 = auto() 248 #: 249 l3_pre48 = auto() 250 #: 251 l3_pre40 = auto() 252 #: 253 l3_pre32 = auto() 254 #: 255 l2_dst_only = auto() 256 #: 257 l2_src_only = auto() 258 #: 259 l4_dst_only = auto() 260 #: 261 l4_src_only = auto() 262 #: 263 l3_dst_only = auto() 264 #: 265 l3_src_only = auto() 266 267 #: 268 ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex 269 #: 270 udp = ipv4_udp | ipv6_udp | ipv6_udp_ex 271 #: 272 tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex 273 #: 274 sctp = ipv4_sctp | ipv6_sctp 275 #: 276 tunnel = vxlan | geneve | nvgre 277 #: 278 vlan = s_vlan | c_vlan 279 #: 280 all = ( 281 eth 282 | vlan 283 | ip 284 | tcp 285 | udp 286 | sctp 287 | l2_payload 288 | l2tpv3 289 | esp 290 | ah 291 | pfcp 292 | gtpu 293 | ecpri 294 | mpls 295 | l2tpv2 296 ) 297 298 @classmethod 299 def from_list_string(cls, names: str) -> Self: 300 """Makes a flag from a whitespace-separated list of names. 301 302 Args: 303 names: a whitespace-separated list containing the members of this flag. 304 305 Returns: 306 An instance of this flag. 307 """ 308 flag = cls(0) 309 for name in names.split(): 310 flag |= cls.from_str(name) 311 return flag 312 313 @classmethod 314 def from_str(cls, name: str) -> Self: 315 """Makes a flag matching the supplied name. 316 317 Args: 318 name: a valid member of this flag in text 319 Returns: 320 An instance of this flag. 321 """ 322 member_name = name.strip().replace("-", "_") 323 return cls[member_name] 324 325 @classmethod 326 def make_parser(cls) -> ParserFn: 327 """Makes a parser function. 328 329 Returns: 330 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 331 parser function that makes an instance of this flag from text. 332 """ 333 return TextParser.wrap( 334 TextParser.find(r"Supported RSS offload flow types:((?:\r?\n? \S+)+)", re.MULTILINE), 335 RSSOffloadTypesFlag.from_list_string, 336 ) 337 338 339class DeviceCapabilitiesFlag(Flag): 340 """Flag representing the device capabilities.""" 341 342 #: Device supports Rx queue setup after device started. 343 RUNTIME_RX_QUEUE_SETUP = auto() 344 #: Device supports Tx queue setup after device started. 345 RUNTIME_TX_QUEUE_SETUP = auto() 346 #: Device supports shared Rx queue among ports within Rx domain and switch domain. 347 RXQ_SHARE = auto() 348 #: Device supports keeping flow rules across restart. 349 FLOW_RULE_KEEP = auto() 350 #: Device supports keeping shared flow objects across restart. 351 FLOW_SHARED_OBJECT_KEEP = auto() 352 353 @classmethod 354 def make_parser(cls) -> ParserFn: 355 """Makes a parser function. 356 357 Returns: 358 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 359 parser function that makes an instance of this flag from text. 360 """ 361 return TextParser.wrap( 362 TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"), 363 cls, 364 ) 365 366 367class DeviceErrorHandlingMode(StrEnum): 368 """Enum representing the device error handling mode.""" 369 370 #: 371 none = auto() 372 #: 373 passive = auto() 374 #: 375 proactive = auto() 376 #: 377 unknown = auto() 378 379 @classmethod 380 def make_parser(cls) -> ParserFn: 381 """Makes a parser function. 382 383 Returns: 384 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a 385 parser function that makes an instance of this enum from text. 386 """ 387 return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls) 388 389 390def make_device_private_info_parser() -> ParserFn: 391 """Device private information parser. 392 393 Ensures that we are not parsing invalid device private info output. 394 395 Returns: 396 ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser 397 function that parses the device private info from the TestPmd port info output. 398 """ 399 400 def _validate(info: str): 401 info = info.strip() 402 if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"): 403 return None 404 return info 405 406 return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate) 407 408 409@dataclass 410class TestPmdPort(TextParser): 411 """Dataclass representing the result of testpmd's ``show port info`` command.""" 412 413 #: 414 id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b")) 415 #: 416 device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)")) 417 #: 418 driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)")) 419 #: 420 socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)")) 421 #: 422 is_link_up: bool = field(metadata=TextParser.find("Link status: up")) 423 #: 424 link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)")) 425 #: 426 is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex")) 427 #: 428 is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On")) 429 #: 430 is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled")) 431 #: 432 is_allmulticast_mode_enabled: bool = field( 433 metadata=TextParser.find("Allmulticast mode: enabled") 434 ) 435 #: Maximum number of MAC addresses 436 max_mac_addresses_num: int = field( 437 metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)") 438 ) 439 #: Maximum configurable length of RX packet 440 max_hash_mac_addresses_num: int = field( 441 metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)") 442 ) 443 #: Minimum size of RX buffer 444 min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)")) 445 #: Maximum configurable length of RX packet 446 max_rx_packet_length: int = field( 447 metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)") 448 ) 449 #: Maximum configurable size of LRO aggregated packet 450 max_lro_packet_size: int = field( 451 metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)") 452 ) 453 454 #: Current number of RX queues 455 rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)")) 456 #: Max possible RX queues 457 max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)")) 458 #: Max possible number of RXDs per queue 459 max_queue_rxd_num: int = field( 460 metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)") 461 ) 462 #: Min possible number of RXDs per queue 463 min_queue_rxd_num: int = field( 464 metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)") 465 ) 466 #: RXDs number alignment 467 rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)")) 468 469 #: Current number of TX queues 470 tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)")) 471 #: Max possible TX queues 472 max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)")) 473 #: Max possible number of TXDs per queue 474 max_queue_txd_num: int = field( 475 metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)") 476 ) 477 #: Min possible number of TXDs per queue 478 min_queue_txd_num: int = field( 479 metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)") 480 ) 481 #: TXDs number alignment 482 txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)")) 483 #: Max segment number per packet 484 max_packet_segment_num: int = field( 485 metadata=TextParser.find_int(r"Max segment number per packet: (\d+)") 486 ) 487 #: Max segment number per MTU/TSO 488 max_mtu_segment_num: int = field( 489 metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)") 490 ) 491 492 #: 493 device_capabilities: DeviceCapabilitiesFlag = field( 494 metadata=DeviceCapabilitiesFlag.make_parser(), 495 ) 496 #: 497 device_error_handling_mode: DeviceErrorHandlingMode = field( 498 metadata=DeviceErrorHandlingMode.make_parser() 499 ) 500 #: 501 device_private_info: str | None = field( 502 default=None, 503 metadata=make_device_private_info_parser(), 504 ) 505 506 #: 507 hash_key_size: int | None = field( 508 default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)") 509 ) 510 #: 511 redirection_table_size: int | None = field( 512 default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)") 513 ) 514 #: 515 supported_rss_offload_flow_types: RSSOffloadTypesFlag = field( 516 default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser() 517 ) 518 519 #: 520 mac_address: str | None = field( 521 default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)") 522 ) 523 #: 524 fw_version: str | None = field( 525 default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)") 526 ) 527 #: 528 dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)")) 529 #: Socket id of the memory allocation 530 mem_alloc_socket_id: int | None = field( 531 default=None, 532 metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"), 533 ) 534 #: 535 mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)")) 536 537 #: 538 vlan_offload: VLANOffloadFlag | None = field( 539 default=None, 540 metadata=VLANOffloadFlag.make_parser(), 541 ) 542 543 #: Maximum size of RX buffer 544 max_rx_bufsize: int | None = field( 545 default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)") 546 ) 547 #: Maximum number of VFs 548 max_vfs_num: int | None = field( 549 default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)") 550 ) 551 #: Maximum number of VMDq pools 552 max_vmdq_pools_num: int | None = field( 553 default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)") 554 ) 555 556 #: 557 switch_name: str | None = field( 558 default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)") 559 ) 560 #: 561 switch_domain_id: int | None = field( 562 default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)") 563 ) 564 #: 565 switch_port_id: int | None = field( 566 default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)") 567 ) 568 #: 569 switch_rx_domain: int | None = field( 570 default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)") 571 ) 572 573 574class TestPmdShell(InteractiveShell): 575 """Testpmd interactive shell. 576 577 The testpmd shell users should never use 578 the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather 579 call specialized methods. If there isn't one that satisfies a need, it should be added. 580 581 Attributes: 582 number_of_ports: The number of ports which were allowed on the command-line when testpmd 583 was started. 584 """ 585 586 number_of_ports: int 587 588 #: The path to the testpmd executable. 589 path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd") 590 591 #: Flag this as a DPDK app so that it's clear this is not a system app and 592 #: needs to be looked in a specific path. 593 dpdk_app: ClassVar[bool] = True 594 595 #: The testpmd's prompt. 596 _default_prompt: ClassVar[str] = "testpmd>" 597 598 #: This forces the prompt to appear after sending a command. 599 _command_extra_chars: ClassVar[str] = "\n" 600 601 def _start_application(self, get_privileged_command: Callable[[str], str] | None) -> None: 602 """Overrides :meth:`~.interactive_shell._start_application`. 603 604 Add flags for starting testpmd in interactive mode and disabling messages for link state 605 change events before starting the application. Link state is verified before starting 606 packet forwarding and the messages create unexpected newlines in the terminal which 607 complicates output collection. 608 609 Also find the number of pci addresses which were allowed on the command line when the app 610 was started. 611 """ 612 self._app_args += " -i --mask-event intr_lsc" 613 self.number_of_ports = self._app_args.count("-a ") 614 super()._start_application(get_privileged_command) 615 616 def start(self, verify: bool = True) -> None: 617 """Start packet forwarding with the current configuration. 618 619 Args: 620 verify: If :data:`True` , a second start command will be sent in an attempt to verify 621 packet forwarding started as expected. 622 623 Raises: 624 InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to 625 start or ports fail to come up. 626 """ 627 self.send_command("start") 628 if verify: 629 # If forwarding was already started, sending "start" again should tell us 630 start_cmd_output = self.send_command("start") 631 if "Packet forwarding already started" not in start_cmd_output: 632 self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}") 633 raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.") 634 635 for port_id in range(self.number_of_ports): 636 if not self.wait_link_status_up(port_id): 637 raise InteractiveCommandExecutionError( 638 "Not all ports came up after starting packet forwarding in testpmd." 639 ) 640 641 def stop(self, verify: bool = True) -> None: 642 """Stop packet forwarding. 643 644 Args: 645 verify: If :data:`True` , the output of the stop command is scanned to verify that 646 forwarding was stopped successfully or not started. If neither is found, it is 647 considered an error. 648 649 Raises: 650 InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop 651 forwarding results in an error. 652 """ 653 stop_cmd_output = self.send_command("stop") 654 if verify: 655 if ( 656 "Done." not in stop_cmd_output 657 and "Packet forwarding not started" not in stop_cmd_output 658 ): 659 self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}") 660 raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.") 661 662 def get_devices(self) -> list[TestPmdDevice]: 663 """Get a list of device names that are known to testpmd. 664 665 Uses the device info listed in testpmd and then parses the output. 666 667 Returns: 668 A list of devices. 669 """ 670 dev_info: str = self.send_command("show device info all") 671 dev_list: list[TestPmdDevice] = [] 672 for line in dev_info.split("\n"): 673 if "device name:" in line.lower(): 674 dev_list.append(TestPmdDevice(line)) 675 return dev_list 676 677 def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool: 678 """Wait until the link status on the given port is "up". 679 680 Arguments: 681 port_id: Port to check the link status on. 682 timeout: Time to wait for the link to come up. The default value for this 683 argument may be modified using the :option:`--timeout` command-line argument 684 or the :envvar:`DTS_TIMEOUT` environment variable. 685 686 Returns: 687 Whether the link came up in time or not. 688 """ 689 time_to_stop = time.time() + timeout 690 port_info: str = "" 691 while time.time() < time_to_stop: 692 port_info = self.send_command(f"show port info {port_id}") 693 if "Link status: up" in port_info: 694 break 695 time.sleep(0.5) 696 else: 697 self._logger.error(f"The link for port {port_id} did not come up in the given timeout.") 698 return "Link status: up" in port_info 699 700 def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True): 701 """Set packet forwarding mode. 702 703 Args: 704 mode: The forwarding mode to use. 705 verify: If :data:`True` the output of the command will be scanned in an attempt to 706 verify that the forwarding mode was set to `mode` properly. 707 708 Raises: 709 InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode 710 fails to update. 711 """ 712 set_fwd_output = self.send_command(f"set fwd {mode.value}") 713 if f"Set {mode.value} packet forwarding mode" not in set_fwd_output: 714 self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}") 715 raise InteractiveCommandExecutionError( 716 f"Test pmd failed to set fwd mode to {mode.value}" 717 ) 718 719 def show_port_info_all(self) -> list[TestPmdPort]: 720 """Returns the information of all the ports. 721 722 Returns: 723 list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`. 724 """ 725 output = self.send_command("show port info all") 726 727 # Sample output of the "all" command looks like: 728 # 729 # <start> 730 # 731 # ********************* Infos for port 0 ********************* 732 # Key: value 733 # 734 # ********************* Infos for port 1 ********************* 735 # Key: value 736 # <end> 737 # 738 # Takes advantage of the double new line in between ports as end delimiter. But we need to 739 # artificially add a new line at the end to pick up the last port. Because commands are 740 # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF. 741 # Therefore we also need to take the carriage return into account. 742 iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S) 743 return [TestPmdPort.parse(block.group(0)) for block in iter] 744 745 def show_port_info(self, port_id: int) -> TestPmdPort: 746 """Returns the given port information. 747 748 Args: 749 port_id: The port ID to gather information for. 750 751 Raises: 752 InteractiveCommandExecutionError: If `port_id` is invalid. 753 754 Returns: 755 TestPmdPort: An instance of `TestPmdPort` containing the given port's information. 756 """ 757 output = self.send_command(f"show port info {port_id}", skip_first_line=True) 758 if output.startswith("Invalid port"): 759 raise InteractiveCommandExecutionError("invalid port given") 760 761 return TestPmdPort.parse(output) 762 763 def close(self) -> None: 764 """Overrides :meth:`~.interactive_shell.close`.""" 765 self.send_command("quit", "") 766 return super().close() 767