xref: /dpdk/dts/framework/remote_session/testpmd_shell.py (revision 61d5bc9bf974d5f379ec64b1ab9d0b609b659171)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2023 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4# Copyright(c) 2024 Arm Limited
5
6"""Testpmd interactive shell.
7
8Typical usage example in a TestSuite::
9
10    testpmd_shell = self.sut_node.create_interactive_shell(
11            TestPmdShell, privileged=True
12        )
13    devices = testpmd_shell.get_devices()
14    for device in devices:
15        print(device)
16    testpmd_shell.close()
17"""
18
19import re
20import time
21from dataclasses import dataclass, field
22from enum import Flag, auto
23from pathlib import PurePath
24from typing import Callable, ClassVar
25
26from typing_extensions import Self
27
28from framework.exception import InteractiveCommandExecutionError
29from framework.parser import ParserFn, TextParser
30from framework.settings import SETTINGS
31from framework.utils import StrEnum
32
33from .interactive_shell import InteractiveShell
34
35
36class TestPmdDevice:
37    """The data of a device that testpmd can recognize.
38
39    Attributes:
40        pci_address: The PCI address of the device.
41    """
42
43    pci_address: str
44
45    def __init__(self, pci_address_line: str):
46        """Initialize the device from the testpmd output line string.
47
48        Args:
49            pci_address_line: A line of testpmd output that contains a device.
50        """
51        self.pci_address = pci_address_line.strip().split(": ")[1].strip()
52
53    def __str__(self) -> str:
54        """The PCI address captures what the device is."""
55        return self.pci_address
56
57
58class TestPmdForwardingModes(StrEnum):
59    r"""The supported packet forwarding modes for :class:`~TestPmdShell`\s."""
60
61    #:
62    io = auto()
63    #:
64    mac = auto()
65    #:
66    macswap = auto()
67    #:
68    flowgen = auto()
69    #:
70    rxonly = auto()
71    #:
72    txonly = auto()
73    #:
74    csum = auto()
75    #:
76    icmpecho = auto()
77    #:
78    ieee1588 = auto()
79    #:
80    noisy = auto()
81    #:
82    fivetswap = "5tswap"
83    #:
84    shared_rxq = "shared-rxq"
85    #:
86    recycle_mbufs = auto()
87
88
89class VLANOffloadFlag(Flag):
90    """Flag representing the VLAN offload settings of a NIC port."""
91
92    #:
93    STRIP = auto()
94    #:
95    FILTER = auto()
96    #:
97    EXTEND = auto()
98    #:
99    QINQ_STRIP = auto()
100
101    @classmethod
102    def from_str_dict(cls, d):
103        """Makes an instance from a dict containing the flag member names with an "on" value.
104
105        Args:
106            d: A dictionary containing the flag members as keys and any string value.
107
108        Returns:
109            A new instance of the flag.
110        """
111        flag = cls(0)
112        for name in cls.__members__:
113            if d.get(name) == "on":
114                flag |= cls[name]
115        return flag
116
117    @classmethod
118    def make_parser(cls) -> ParserFn:
119        """Makes a parser function.
120
121        Returns:
122            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
123                parser function that makes an instance of this flag from text.
124        """
125        return TextParser.wrap(
126            TextParser.find(
127                r"VLAN offload:\s+"
128                r"strip (?P<STRIP>on|off), "
129                r"filter (?P<FILTER>on|off), "
130                r"extend (?P<EXTEND>on|off), "
131                r"qinq strip (?P<QINQ_STRIP>on|off)$",
132                re.MULTILINE,
133                named=True,
134            ),
135            cls.from_str_dict,
136        )
137
138
139class RSSOffloadTypesFlag(Flag):
140    """Flag representing the RSS offload flow types supported by the NIC port."""
141
142    #:
143    ipv4 = auto()
144    #:
145    ipv4_frag = auto()
146    #:
147    ipv4_tcp = auto()
148    #:
149    ipv4_udp = auto()
150    #:
151    ipv4_sctp = auto()
152    #:
153    ipv4_other = auto()
154    #:
155    ipv6 = auto()
156    #:
157    ipv6_frag = auto()
158    #:
159    ipv6_tcp = auto()
160    #:
161    ipv6_udp = auto()
162    #:
163    ipv6_sctp = auto()
164    #:
165    ipv6_other = auto()
166    #:
167    l2_payload = auto()
168    #:
169    ipv6_ex = auto()
170    #:
171    ipv6_tcp_ex = auto()
172    #:
173    ipv6_udp_ex = auto()
174    #:
175    port = auto()
176    #:
177    vxlan = auto()
178    #:
179    geneve = auto()
180    #:
181    nvgre = auto()
182    #:
183    user_defined_22 = auto()
184    #:
185    gtpu = auto()
186    #:
187    eth = auto()
188    #:
189    s_vlan = auto()
190    #:
191    c_vlan = auto()
192    #:
193    esp = auto()
194    #:
195    ah = auto()
196    #:
197    l2tpv3 = auto()
198    #:
199    pfcp = auto()
200    #:
201    pppoe = auto()
202    #:
203    ecpri = auto()
204    #:
205    mpls = auto()
206    #:
207    ipv4_chksum = auto()
208    #:
209    l4_chksum = auto()
210    #:
211    l2tpv2 = auto()
212    #:
213    ipv6_flow_label = auto()
214    #:
215    user_defined_38 = auto()
216    #:
217    user_defined_39 = auto()
218    #:
219    user_defined_40 = auto()
220    #:
221    user_defined_41 = auto()
222    #:
223    user_defined_42 = auto()
224    #:
225    user_defined_43 = auto()
226    #:
227    user_defined_44 = auto()
228    #:
229    user_defined_45 = auto()
230    #:
231    user_defined_46 = auto()
232    #:
233    user_defined_47 = auto()
234    #:
235    user_defined_48 = auto()
236    #:
237    user_defined_49 = auto()
238    #:
239    user_defined_50 = auto()
240    #:
241    user_defined_51 = auto()
242    #:
243    l3_pre96 = auto()
244    #:
245    l3_pre64 = auto()
246    #:
247    l3_pre56 = auto()
248    #:
249    l3_pre48 = auto()
250    #:
251    l3_pre40 = auto()
252    #:
253    l3_pre32 = auto()
254    #:
255    l2_dst_only = auto()
256    #:
257    l2_src_only = auto()
258    #:
259    l4_dst_only = auto()
260    #:
261    l4_src_only = auto()
262    #:
263    l3_dst_only = auto()
264    #:
265    l3_src_only = auto()
266
267    #:
268    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
269    #:
270    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
271    #:
272    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
273    #:
274    sctp = ipv4_sctp | ipv6_sctp
275    #:
276    tunnel = vxlan | geneve | nvgre
277    #:
278    vlan = s_vlan | c_vlan
279    #:
280    all = (
281        eth
282        | vlan
283        | ip
284        | tcp
285        | udp
286        | sctp
287        | l2_payload
288        | l2tpv3
289        | esp
290        | ah
291        | pfcp
292        | gtpu
293        | ecpri
294        | mpls
295        | l2tpv2
296    )
297
298    @classmethod
299    def from_list_string(cls, names: str) -> Self:
300        """Makes a flag from a whitespace-separated list of names.
301
302        Args:
303            names: a whitespace-separated list containing the members of this flag.
304
305        Returns:
306            An instance of this flag.
307        """
308        flag = cls(0)
309        for name in names.split():
310            flag |= cls.from_str(name)
311        return flag
312
313    @classmethod
314    def from_str(cls, name: str) -> Self:
315        """Makes a flag matching the supplied name.
316
317        Args:
318            name: a valid member of this flag in text
319        Returns:
320            An instance of this flag.
321        """
322        member_name = name.strip().replace("-", "_")
323        return cls[member_name]
324
325    @classmethod
326    def make_parser(cls) -> ParserFn:
327        """Makes a parser function.
328
329        Returns:
330            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
331                parser function that makes an instance of this flag from text.
332        """
333        return TextParser.wrap(
334            TextParser.find(r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE),
335            RSSOffloadTypesFlag.from_list_string,
336        )
337
338
339class DeviceCapabilitiesFlag(Flag):
340    """Flag representing the device capabilities."""
341
342    #: Device supports Rx queue setup after device started.
343    RUNTIME_RX_QUEUE_SETUP = auto()
344    #: Device supports Tx queue setup after device started.
345    RUNTIME_TX_QUEUE_SETUP = auto()
346    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
347    RXQ_SHARE = auto()
348    #: Device supports keeping flow rules across restart.
349    FLOW_RULE_KEEP = auto()
350    #: Device supports keeping shared flow objects across restart.
351    FLOW_SHARED_OBJECT_KEEP = auto()
352
353    @classmethod
354    def make_parser(cls) -> ParserFn:
355        """Makes a parser function.
356
357        Returns:
358            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
359                parser function that makes an instance of this flag from text.
360        """
361        return TextParser.wrap(
362            TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),
363            cls,
364        )
365
366
367class DeviceErrorHandlingMode(StrEnum):
368    """Enum representing the device error handling mode."""
369
370    #:
371    none = auto()
372    #:
373    passive = auto()
374    #:
375    proactive = auto()
376    #:
377    unknown = auto()
378
379    @classmethod
380    def make_parser(cls) -> ParserFn:
381        """Makes a parser function.
382
383        Returns:
384            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
385                parser function that makes an instance of this enum from text.
386        """
387        return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)
388
389
390def make_device_private_info_parser() -> ParserFn:
391    """Device private information parser.
392
393    Ensures that we are not parsing invalid device private info output.
394
395    Returns:
396        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
397            function that parses the device private info from the TestPmd port info output.
398    """
399
400    def _validate(info: str):
401        info = info.strip()
402        if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
403            return None
404        return info
405
406    return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)
407
408
409@dataclass
410class TestPmdPort(TextParser):
411    """Dataclass representing the result of testpmd's ``show port info`` command."""
412
413    #:
414    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
415    #:
416    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
417    #:
418    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
419    #:
420    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
421    #:
422    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
423    #:
424    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
425    #:
426    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
427    #:
428    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
429    #:
430    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
431    #:
432    is_allmulticast_mode_enabled: bool = field(
433        metadata=TextParser.find("Allmulticast mode: enabled")
434    )
435    #: Maximum number of MAC addresses
436    max_mac_addresses_num: int = field(
437        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
438    )
439    #: Maximum configurable length of RX packet
440    max_hash_mac_addresses_num: int = field(
441        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
442    )
443    #: Minimum size of RX buffer
444    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
445    #: Maximum configurable length of RX packet
446    max_rx_packet_length: int = field(
447        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
448    )
449    #: Maximum configurable size of LRO aggregated packet
450    max_lro_packet_size: int = field(
451        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
452    )
453
454    #: Current number of RX queues
455    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
456    #: Max possible RX queues
457    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
458    #: Max possible number of RXDs per queue
459    max_queue_rxd_num: int = field(
460        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
461    )
462    #: Min possible number of RXDs per queue
463    min_queue_rxd_num: int = field(
464        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
465    )
466    #: RXDs number alignment
467    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))
468
469    #: Current number of TX queues
470    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
471    #: Max possible TX queues
472    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
473    #: Max possible number of TXDs per queue
474    max_queue_txd_num: int = field(
475        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
476    )
477    #: Min possible number of TXDs per queue
478    min_queue_txd_num: int = field(
479        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
480    )
481    #: TXDs number alignment
482    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
483    #: Max segment number per packet
484    max_packet_segment_num: int = field(
485        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
486    )
487    #: Max segment number per MTU/TSO
488    max_mtu_segment_num: int = field(
489        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
490    )
491
492    #:
493    device_capabilities: DeviceCapabilitiesFlag = field(
494        metadata=DeviceCapabilitiesFlag.make_parser(),
495    )
496    #:
497    device_error_handling_mode: DeviceErrorHandlingMode = field(
498        metadata=DeviceErrorHandlingMode.make_parser()
499    )
500    #:
501    device_private_info: str | None = field(
502        default=None,
503        metadata=make_device_private_info_parser(),
504    )
505
506    #:
507    hash_key_size: int | None = field(
508        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
509    )
510    #:
511    redirection_table_size: int | None = field(
512        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
513    )
514    #:
515    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
516        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
517    )
518
519    #:
520    mac_address: str | None = field(
521        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
522    )
523    #:
524    fw_version: str | None = field(
525        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
526    )
527    #:
528    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
529    #: Socket id of the memory allocation
530    mem_alloc_socket_id: int | None = field(
531        default=None,
532        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
533    )
534    #:
535    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))
536
537    #:
538    vlan_offload: VLANOffloadFlag | None = field(
539        default=None,
540        metadata=VLANOffloadFlag.make_parser(),
541    )
542
543    #: Maximum size of RX buffer
544    max_rx_bufsize: int | None = field(
545        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
546    )
547    #: Maximum number of VFs
548    max_vfs_num: int | None = field(
549        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
550    )
551    #: Maximum number of VMDq pools
552    max_vmdq_pools_num: int | None = field(
553        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
554    )
555
556    #:
557    switch_name: str | None = field(
558        default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")
559    )
560    #:
561    switch_domain_id: int | None = field(
562        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
563    )
564    #:
565    switch_port_id: int | None = field(
566        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
567    )
568    #:
569    switch_rx_domain: int | None = field(
570        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
571    )
572
573
574class TestPmdShell(InteractiveShell):
575    """Testpmd interactive shell.
576
577    The testpmd shell users should never use
578    the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather
579    call specialized methods. If there isn't one that satisfies a need, it should be added.
580
581    Attributes:
582        number_of_ports: The number of ports which were allowed on the command-line when testpmd
583            was started.
584    """
585
586    number_of_ports: int
587
588    #: The path to the testpmd executable.
589    path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd")
590
591    #: Flag this as a DPDK app so that it's clear this is not a system app and
592    #: needs to be looked in a specific path.
593    dpdk_app: ClassVar[bool] = True
594
595    #: The testpmd's prompt.
596    _default_prompt: ClassVar[str] = "testpmd>"
597
598    #: This forces the prompt to appear after sending a command.
599    _command_extra_chars: ClassVar[str] = "\n"
600
601    def _start_application(self, get_privileged_command: Callable[[str], str] | None) -> None:
602        """Overrides :meth:`~.interactive_shell._start_application`.
603
604        Add flags for starting testpmd in interactive mode and disabling messages for link state
605        change events before starting the application. Link state is verified before starting
606        packet forwarding and the messages create unexpected newlines in the terminal which
607        complicates output collection.
608
609        Also find the number of pci addresses which were allowed on the command line when the app
610        was started.
611        """
612        self._app_args += " -i --mask-event intr_lsc"
613        self.number_of_ports = self._app_args.count("-a ")
614        super()._start_application(get_privileged_command)
615
616    def start(self, verify: bool = True) -> None:
617        """Start packet forwarding with the current configuration.
618
619        Args:
620            verify: If :data:`True` , a second start command will be sent in an attempt to verify
621                packet forwarding started as expected.
622
623        Raises:
624            InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to
625                start or ports fail to come up.
626        """
627        self.send_command("start")
628        if verify:
629            # If forwarding was already started, sending "start" again should tell us
630            start_cmd_output = self.send_command("start")
631            if "Packet forwarding already started" not in start_cmd_output:
632                self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")
633                raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")
634
635            for port_id in range(self.number_of_ports):
636                if not self.wait_link_status_up(port_id):
637                    raise InteractiveCommandExecutionError(
638                        "Not all ports came up after starting packet forwarding in testpmd."
639                    )
640
641    def stop(self, verify: bool = True) -> None:
642        """Stop packet forwarding.
643
644        Args:
645            verify: If :data:`True` , the output of the stop command is scanned to verify that
646                forwarding was stopped successfully or not started. If neither is found, it is
647                considered an error.
648
649        Raises:
650            InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop
651                forwarding results in an error.
652        """
653        stop_cmd_output = self.send_command("stop")
654        if verify:
655            if (
656                "Done." not in stop_cmd_output
657                and "Packet forwarding not started" not in stop_cmd_output
658            ):
659                self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")
660                raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")
661
662    def get_devices(self) -> list[TestPmdDevice]:
663        """Get a list of device names that are known to testpmd.
664
665        Uses the device info listed in testpmd and then parses the output.
666
667        Returns:
668            A list of devices.
669        """
670        dev_info: str = self.send_command("show device info all")
671        dev_list: list[TestPmdDevice] = []
672        for line in dev_info.split("\n"):
673            if "device name:" in line.lower():
674                dev_list.append(TestPmdDevice(line))
675        return dev_list
676
677    def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool:
678        """Wait until the link status on the given port is "up".
679
680        Arguments:
681            port_id: Port to check the link status on.
682            timeout: Time to wait for the link to come up. The default value for this
683                argument may be modified using the :option:`--timeout` command-line argument
684                or the :envvar:`DTS_TIMEOUT` environment variable.
685
686        Returns:
687            Whether the link came up in time or not.
688        """
689        time_to_stop = time.time() + timeout
690        port_info: str = ""
691        while time.time() < time_to_stop:
692            port_info = self.send_command(f"show port info {port_id}")
693            if "Link status: up" in port_info:
694                break
695            time.sleep(0.5)
696        else:
697            self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")
698        return "Link status: up" in port_info
699
700    def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
701        """Set packet forwarding mode.
702
703        Args:
704            mode: The forwarding mode to use.
705            verify: If :data:`True` the output of the command will be scanned in an attempt to
706                verify that the forwarding mode was set to `mode` properly.
707
708        Raises:
709            InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode
710                fails to update.
711        """
712        set_fwd_output = self.send_command(f"set fwd {mode.value}")
713        if f"Set {mode.value} packet forwarding mode" not in set_fwd_output:
714            self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")
715            raise InteractiveCommandExecutionError(
716                f"Test pmd failed to set fwd mode to {mode.value}"
717            )
718
719    def show_port_info_all(self) -> list[TestPmdPort]:
720        """Returns the information of all the ports.
721
722        Returns:
723            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
724        """
725        output = self.send_command("show port info all")
726
727        # Sample output of the "all" command looks like:
728        #
729        # <start>
730        #
731        #   ********************* Infos for port 0 *********************
732        #   Key: value
733        #
734        #   ********************* Infos for port 1 *********************
735        #   Key: value
736        # <end>
737        #
738        # Takes advantage of the double new line in between ports as end delimiter. But we need to
739        # artificially add a new line at the end to pick up the last port. Because commands are
740        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
741        # Therefore we also need to take the carriage return into account.
742        iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
743        return [TestPmdPort.parse(block.group(0)) for block in iter]
744
745    def show_port_info(self, port_id: int) -> TestPmdPort:
746        """Returns the given port information.
747
748        Args:
749            port_id: The port ID to gather information for.
750
751        Raises:
752            InteractiveCommandExecutionError: If `port_id` is invalid.
753
754        Returns:
755            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
756        """
757        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
758        if output.startswith("Invalid port"):
759            raise InteractiveCommandExecutionError("invalid port given")
760
761        return TestPmdPort.parse(output)
762
763    def close(self) -> None:
764        """Overrides :meth:`~.interactive_shell.close`."""
765        self.send_command("quit", "")
766        return super().close()
767