xref: /dpdk/dts/framework/remote_session/testpmd_shell.py (revision 25a2a0dc3de31ca0a6fbc9371cf3dd85dfd74b07)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2023 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4# Copyright(c) 2024 Arm Limited
5
6"""Testpmd interactive shell.
7
8Typical usage example in a TestSuite::
9
10    testpmd_shell = TestPmdShell(self.sut_node)
11    devices = testpmd_shell.get_devices()
12    for device in devices:
13        print(device)
14    testpmd_shell.close()
15"""
16
17import re
18import time
19from dataclasses import dataclass, field
20from enum import Flag, auto
21from pathlib import PurePath
22from typing import ClassVar
23
24from typing_extensions import Self, Unpack
25
26from framework.exception import InteractiveCommandExecutionError
27from framework.params.testpmd import SimpleForwardingModes, TestPmdParams
28from framework.params.types import TestPmdParamsDict
29from framework.parser import ParserFn, TextParser
30from framework.remote_session.dpdk_shell import DPDKShell
31from framework.settings import SETTINGS
32from framework.testbed_model.cpu import LogicalCoreCount, LogicalCoreList
33from framework.testbed_model.sut_node import SutNode
34from framework.utils import StrEnum
35
36
37class TestPmdDevice:
38    """The data of a device that testpmd can recognize.
39
40    Attributes:
41        pci_address: The PCI address of the device.
42    """
43
44    pci_address: str
45
46    def __init__(self, pci_address_line: str):
47        """Initialize the device from the testpmd output line string.
48
49        Args:
50            pci_address_line: A line of testpmd output that contains a device.
51        """
52        self.pci_address = pci_address_line.strip().split(": ")[1].strip()
53
54    def __str__(self) -> str:
55        """The PCI address captures what the device is."""
56        return self.pci_address
57
58
59class VLANOffloadFlag(Flag):
60    """Flag representing the VLAN offload settings of a NIC port."""
61
62    #:
63    STRIP = auto()
64    #:
65    FILTER = auto()
66    #:
67    EXTEND = auto()
68    #:
69    QINQ_STRIP = auto()
70
71    @classmethod
72    def from_str_dict(cls, d):
73        """Makes an instance from a dict containing the flag member names with an "on" value.
74
75        Args:
76            d: A dictionary containing the flag members as keys and any string value.
77
78        Returns:
79            A new instance of the flag.
80        """
81        flag = cls(0)
82        for name in cls.__members__:
83            if d.get(name) == "on":
84                flag |= cls[name]
85        return flag
86
87    @classmethod
88    def make_parser(cls) -> ParserFn:
89        """Makes a parser function.
90
91        Returns:
92            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
93                parser function that makes an instance of this flag from text.
94        """
95        return TextParser.wrap(
96            TextParser.find(
97                r"VLAN offload:\s+"
98                r"strip (?P<STRIP>on|off), "
99                r"filter (?P<FILTER>on|off), "
100                r"extend (?P<EXTEND>on|off), "
101                r"qinq strip (?P<QINQ_STRIP>on|off)$",
102                re.MULTILINE,
103                named=True,
104            ),
105            cls.from_str_dict,
106        )
107
108
109class RSSOffloadTypesFlag(Flag):
110    """Flag representing the RSS offload flow types supported by the NIC port."""
111
112    #:
113    ipv4 = auto()
114    #:
115    ipv4_frag = auto()
116    #:
117    ipv4_tcp = auto()
118    #:
119    ipv4_udp = auto()
120    #:
121    ipv4_sctp = auto()
122    #:
123    ipv4_other = auto()
124    #:
125    ipv6 = auto()
126    #:
127    ipv6_frag = auto()
128    #:
129    ipv6_tcp = auto()
130    #:
131    ipv6_udp = auto()
132    #:
133    ipv6_sctp = auto()
134    #:
135    ipv6_other = auto()
136    #:
137    l2_payload = auto()
138    #:
139    ipv6_ex = auto()
140    #:
141    ipv6_tcp_ex = auto()
142    #:
143    ipv6_udp_ex = auto()
144    #:
145    port = auto()
146    #:
147    vxlan = auto()
148    #:
149    geneve = auto()
150    #:
151    nvgre = auto()
152    #:
153    user_defined_22 = auto()
154    #:
155    gtpu = auto()
156    #:
157    eth = auto()
158    #:
159    s_vlan = auto()
160    #:
161    c_vlan = auto()
162    #:
163    esp = auto()
164    #:
165    ah = auto()
166    #:
167    l2tpv3 = auto()
168    #:
169    pfcp = auto()
170    #:
171    pppoe = auto()
172    #:
173    ecpri = auto()
174    #:
175    mpls = auto()
176    #:
177    ipv4_chksum = auto()
178    #:
179    l4_chksum = auto()
180    #:
181    l2tpv2 = auto()
182    #:
183    ipv6_flow_label = auto()
184    #:
185    user_defined_38 = auto()
186    #:
187    user_defined_39 = auto()
188    #:
189    user_defined_40 = auto()
190    #:
191    user_defined_41 = auto()
192    #:
193    user_defined_42 = auto()
194    #:
195    user_defined_43 = auto()
196    #:
197    user_defined_44 = auto()
198    #:
199    user_defined_45 = auto()
200    #:
201    user_defined_46 = auto()
202    #:
203    user_defined_47 = auto()
204    #:
205    user_defined_48 = auto()
206    #:
207    user_defined_49 = auto()
208    #:
209    user_defined_50 = auto()
210    #:
211    user_defined_51 = auto()
212    #:
213    l3_pre96 = auto()
214    #:
215    l3_pre64 = auto()
216    #:
217    l3_pre56 = auto()
218    #:
219    l3_pre48 = auto()
220    #:
221    l3_pre40 = auto()
222    #:
223    l3_pre32 = auto()
224    #:
225    l2_dst_only = auto()
226    #:
227    l2_src_only = auto()
228    #:
229    l4_dst_only = auto()
230    #:
231    l4_src_only = auto()
232    #:
233    l3_dst_only = auto()
234    #:
235    l3_src_only = auto()
236
237    #:
238    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
239    #:
240    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
241    #:
242    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
243    #:
244    sctp = ipv4_sctp | ipv6_sctp
245    #:
246    tunnel = vxlan | geneve | nvgre
247    #:
248    vlan = s_vlan | c_vlan
249    #:
250    all = (
251        eth
252        | vlan
253        | ip
254        | tcp
255        | udp
256        | sctp
257        | l2_payload
258        | l2tpv3
259        | esp
260        | ah
261        | pfcp
262        | gtpu
263        | ecpri
264        | mpls
265        | l2tpv2
266    )
267
268    @classmethod
269    def from_list_string(cls, names: str) -> Self:
270        """Makes a flag from a whitespace-separated list of names.
271
272        Args:
273            names: a whitespace-separated list containing the members of this flag.
274
275        Returns:
276            An instance of this flag.
277        """
278        flag = cls(0)
279        for name in names.split():
280            flag |= cls.from_str(name)
281        return flag
282
283    @classmethod
284    def from_str(cls, name: str) -> Self:
285        """Makes a flag matching the supplied name.
286
287        Args:
288            name: a valid member of this flag in text
289        Returns:
290            An instance of this flag.
291        """
292        member_name = name.strip().replace("-", "_")
293        return cls[member_name]
294
295    @classmethod
296    def make_parser(cls) -> ParserFn:
297        """Makes a parser function.
298
299        Returns:
300            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
301                parser function that makes an instance of this flag from text.
302        """
303        return TextParser.wrap(
304            TextParser.find(r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE),
305            RSSOffloadTypesFlag.from_list_string,
306        )
307
308
309class DeviceCapabilitiesFlag(Flag):
310    """Flag representing the device capabilities."""
311
312    #: Device supports Rx queue setup after device started.
313    RUNTIME_RX_QUEUE_SETUP = auto()
314    #: Device supports Tx queue setup after device started.
315    RUNTIME_TX_QUEUE_SETUP = auto()
316    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
317    RXQ_SHARE = auto()
318    #: Device supports keeping flow rules across restart.
319    FLOW_RULE_KEEP = auto()
320    #: Device supports keeping shared flow objects across restart.
321    FLOW_SHARED_OBJECT_KEEP = auto()
322
323    @classmethod
324    def make_parser(cls) -> ParserFn:
325        """Makes a parser function.
326
327        Returns:
328            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
329                parser function that makes an instance of this flag from text.
330        """
331        return TextParser.wrap(
332            TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),
333            cls,
334        )
335
336
337class DeviceErrorHandlingMode(StrEnum):
338    """Enum representing the device error handling mode."""
339
340    #:
341    none = auto()
342    #:
343    passive = auto()
344    #:
345    proactive = auto()
346    #:
347    unknown = auto()
348
349    @classmethod
350    def make_parser(cls) -> ParserFn:
351        """Makes a parser function.
352
353        Returns:
354            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
355                parser function that makes an instance of this enum from text.
356        """
357        return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)
358
359
360def make_device_private_info_parser() -> ParserFn:
361    """Device private information parser.
362
363    Ensures that we are not parsing invalid device private info output.
364
365    Returns:
366        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
367            function that parses the device private info from the TestPmd port info output.
368    """
369
370    def _validate(info: str):
371        info = info.strip()
372        if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
373            return None
374        return info
375
376    return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)
377
378
379@dataclass
380class TestPmdPort(TextParser):
381    """Dataclass representing the result of testpmd's ``show port info`` command."""
382
383    #:
384    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
385    #:
386    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
387    #:
388    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
389    #:
390    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
391    #:
392    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
393    #:
394    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
395    #:
396    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
397    #:
398    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
399    #:
400    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
401    #:
402    is_allmulticast_mode_enabled: bool = field(
403        metadata=TextParser.find("Allmulticast mode: enabled")
404    )
405    #: Maximum number of MAC addresses
406    max_mac_addresses_num: int = field(
407        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
408    )
409    #: Maximum configurable length of RX packet
410    max_hash_mac_addresses_num: int = field(
411        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
412    )
413    #: Minimum size of RX buffer
414    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
415    #: Maximum configurable length of RX packet
416    max_rx_packet_length: int = field(
417        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
418    )
419    #: Maximum configurable size of LRO aggregated packet
420    max_lro_packet_size: int = field(
421        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
422    )
423
424    #: Current number of RX queues
425    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
426    #: Max possible RX queues
427    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
428    #: Max possible number of RXDs per queue
429    max_queue_rxd_num: int = field(
430        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
431    )
432    #: Min possible number of RXDs per queue
433    min_queue_rxd_num: int = field(
434        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
435    )
436    #: RXDs number alignment
437    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))
438
439    #: Current number of TX queues
440    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
441    #: Max possible TX queues
442    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
443    #: Max possible number of TXDs per queue
444    max_queue_txd_num: int = field(
445        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
446    )
447    #: Min possible number of TXDs per queue
448    min_queue_txd_num: int = field(
449        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
450    )
451    #: TXDs number alignment
452    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
453    #: Max segment number per packet
454    max_packet_segment_num: int = field(
455        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
456    )
457    #: Max segment number per MTU/TSO
458    max_mtu_segment_num: int = field(
459        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
460    )
461
462    #:
463    device_capabilities: DeviceCapabilitiesFlag = field(
464        metadata=DeviceCapabilitiesFlag.make_parser(),
465    )
466    #:
467    device_error_handling_mode: DeviceErrorHandlingMode = field(
468        metadata=DeviceErrorHandlingMode.make_parser()
469    )
470    #:
471    device_private_info: str | None = field(
472        default=None,
473        metadata=make_device_private_info_parser(),
474    )
475
476    #:
477    hash_key_size: int | None = field(
478        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
479    )
480    #:
481    redirection_table_size: int | None = field(
482        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
483    )
484    #:
485    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
486        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
487    )
488
489    #:
490    mac_address: str | None = field(
491        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
492    )
493    #:
494    fw_version: str | None = field(
495        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
496    )
497    #:
498    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
499    #: Socket id of the memory allocation
500    mem_alloc_socket_id: int | None = field(
501        default=None,
502        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
503    )
504    #:
505    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))
506
507    #:
508    vlan_offload: VLANOffloadFlag | None = field(
509        default=None,
510        metadata=VLANOffloadFlag.make_parser(),
511    )
512
513    #: Maximum size of RX buffer
514    max_rx_bufsize: int | None = field(
515        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
516    )
517    #: Maximum number of VFs
518    max_vfs_num: int | None = field(
519        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
520    )
521    #: Maximum number of VMDq pools
522    max_vmdq_pools_num: int | None = field(
523        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
524    )
525
526    #:
527    switch_name: str | None = field(
528        default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")
529    )
530    #:
531    switch_domain_id: int | None = field(
532        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
533    )
534    #:
535    switch_port_id: int | None = field(
536        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
537    )
538    #:
539    switch_rx_domain: int | None = field(
540        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
541    )
542
543
544@dataclass
545class TestPmdPortStats(TextParser):
546    """Port statistics."""
547
548    #:
549    port_id: int = field(metadata=TextParser.find_int(r"NIC statistics for port (\d+)"))
550
551    #:
552    rx_packets: int = field(metadata=TextParser.find_int(r"RX-packets:\s+(\d+)"))
553    #:
554    rx_missed: int = field(metadata=TextParser.find_int(r"RX-missed:\s+(\d+)"))
555    #:
556    rx_bytes: int = field(metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)"))
557    #:
558    rx_errors: int = field(metadata=TextParser.find_int(r"RX-errors:\s+(\d+)"))
559    #:
560    rx_nombuf: int = field(metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)"))
561
562    #:
563    tx_packets: int = field(metadata=TextParser.find_int(r"TX-packets:\s+(\d+)"))
564    #:
565    tx_errors: int = field(metadata=TextParser.find_int(r"TX-errors:\s+(\d+)"))
566    #:
567    tx_bytes: int = field(metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)"))
568
569    #:
570    rx_pps: int = field(metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)"))
571    #:
572    rx_bps: int = field(metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)"))
573
574    #:
575    tx_pps: int = field(metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)"))
576    #:
577    tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"))
578
579
580class TestPmdShell(DPDKShell):
581    """Testpmd interactive shell.
582
583    The testpmd shell users should never use
584    the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather
585    call specialized methods. If there isn't one that satisfies a need, it should be added.
586    """
587
588    _app_params: TestPmdParams
589
590    #: The path to the testpmd executable.
591    path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd")
592
593    #: The testpmd's prompt.
594    _default_prompt: ClassVar[str] = "testpmd>"
595
596    #: This forces the prompt to appear after sending a command.
597    _command_extra_chars: ClassVar[str] = "\n"
598
599    def __init__(
600        self,
601        node: SutNode,
602        privileged: bool = True,
603        timeout: float = SETTINGS.timeout,
604        lcore_filter_specifier: LogicalCoreCount | LogicalCoreList = LogicalCoreCount(),
605        ascending_cores: bool = True,
606        append_prefix_timestamp: bool = True,
607        name: str | None = None,
608        **app_params: Unpack[TestPmdParamsDict],
609    ) -> None:
610        """Overrides :meth:`~.dpdk_shell.DPDKShell.__init__`. Changes app_params to kwargs."""
611        super().__init__(
612            node,
613            privileged,
614            timeout,
615            lcore_filter_specifier,
616            ascending_cores,
617            append_prefix_timestamp,
618            TestPmdParams(**app_params),
619            name,
620        )
621
622    def start(self, verify: bool = True) -> None:
623        """Start packet forwarding with the current configuration.
624
625        Args:
626            verify: If :data:`True` , a second start command will be sent in an attempt to verify
627                packet forwarding started as expected.
628
629        Raises:
630            InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to
631                start or ports fail to come up.
632        """
633        self.send_command("start")
634        if verify:
635            # If forwarding was already started, sending "start" again should tell us
636            start_cmd_output = self.send_command("start")
637            if "Packet forwarding already started" not in start_cmd_output:
638                self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")
639                raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")
640
641            number_of_ports = len(self._app_params.ports or [])
642            for port_id in range(number_of_ports):
643                if not self.wait_link_status_up(port_id):
644                    raise InteractiveCommandExecutionError(
645                        "Not all ports came up after starting packet forwarding in testpmd."
646                    )
647
648    def stop(self, verify: bool = True) -> None:
649        """Stop packet forwarding.
650
651        Args:
652            verify: If :data:`True` , the output of the stop command is scanned to verify that
653                forwarding was stopped successfully or not started. If neither is found, it is
654                considered an error.
655
656        Raises:
657            InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop
658                forwarding results in an error.
659        """
660        stop_cmd_output = self.send_command("stop")
661        if verify:
662            if (
663                "Done." not in stop_cmd_output
664                and "Packet forwarding not started" not in stop_cmd_output
665            ):
666                self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")
667                raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")
668
669    def get_devices(self) -> list[TestPmdDevice]:
670        """Get a list of device names that are known to testpmd.
671
672        Uses the device info listed in testpmd and then parses the output.
673
674        Returns:
675            A list of devices.
676        """
677        dev_info: str = self.send_command("show device info all")
678        dev_list: list[TestPmdDevice] = []
679        for line in dev_info.split("\n"):
680            if "device name:" in line.lower():
681                dev_list.append(TestPmdDevice(line))
682        return dev_list
683
684    def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool:
685        """Wait until the link status on the given port is "up".
686
687        Arguments:
688            port_id: Port to check the link status on.
689            timeout: Time to wait for the link to come up. The default value for this
690                argument may be modified using the :option:`--timeout` command-line argument
691                or the :envvar:`DTS_TIMEOUT` environment variable.
692
693        Returns:
694            Whether the link came up in time or not.
695        """
696        time_to_stop = time.time() + timeout
697        port_info: str = ""
698        while time.time() < time_to_stop:
699            port_info = self.send_command(f"show port info {port_id}")
700            if "Link status: up" in port_info:
701                break
702            time.sleep(0.5)
703        else:
704            self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")
705        return "Link status: up" in port_info
706
707    def set_forward_mode(self, mode: SimpleForwardingModes, verify: bool = True):
708        """Set packet forwarding mode.
709
710        Args:
711            mode: The forwarding mode to use.
712            verify: If :data:`True` the output of the command will be scanned in an attempt to
713                verify that the forwarding mode was set to `mode` properly.
714
715        Raises:
716            InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode
717                fails to update.
718        """
719        set_fwd_output = self.send_command(f"set fwd {mode.value}")
720        if f"Set {mode.value} packet forwarding mode" not in set_fwd_output:
721            self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")
722            raise InteractiveCommandExecutionError(
723                f"Test pmd failed to set fwd mode to {mode.value}"
724            )
725
726    def show_port_info_all(self) -> list[TestPmdPort]:
727        """Returns the information of all the ports.
728
729        Returns:
730            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
731        """
732        output = self.send_command("show port info all")
733
734        # Sample output of the "all" command looks like:
735        #
736        # <start>
737        #
738        #   ********************* Infos for port 0 *********************
739        #   Key: value
740        #
741        #   ********************* Infos for port 1 *********************
742        #   Key: value
743        # <end>
744        #
745        # Takes advantage of the double new line in between ports as end delimiter. But we need to
746        # artificially add a new line at the end to pick up the last port. Because commands are
747        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
748        # Therefore we also need to take the carriage return into account.
749        iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
750        return [TestPmdPort.parse(block.group(0)) for block in iter]
751
752    def show_port_info(self, port_id: int) -> TestPmdPort:
753        """Returns the given port information.
754
755        Args:
756            port_id: The port ID to gather information for.
757
758        Raises:
759            InteractiveCommandExecutionError: If `port_id` is invalid.
760
761        Returns:
762            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
763        """
764        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
765        if output.startswith("Invalid port"):
766            raise InteractiveCommandExecutionError("invalid port given")
767
768        return TestPmdPort.parse(output)
769
770    def show_port_stats_all(self) -> list[TestPmdPortStats]:
771        """Returns the statistics of all the ports.
772
773        Returns:
774            list[TestPmdPortStats]: A list containing all the ports stats as `TestPmdPortStats`.
775        """
776        output = self.send_command("show port stats all")
777
778        # Sample output of the "all" command looks like:
779        #
780        #   ########### NIC statistics for port 0 ###########
781        #   values...
782        #   #################################################
783        #
784        #   ########### NIC statistics for port 1 ###########
785        #   values...
786        #   #################################################
787        #
788        iter = re.finditer(r"(^  #*.+#*$[^#]+)^  #*\r$", output, re.MULTILINE)
789        return [TestPmdPortStats.parse(block.group(1)) for block in iter]
790
791    def show_port_stats(self, port_id: int) -> TestPmdPortStats:
792        """Returns the given port statistics.
793
794        Args:
795            port_id: The port ID to gather information for.
796
797        Raises:
798            InteractiveCommandExecutionError: If `port_id` is invalid.
799
800        Returns:
801            TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats.
802        """
803        output = self.send_command(f"show port stats {port_id}", skip_first_line=True)
804        if output.startswith("Invalid port"):
805            raise InteractiveCommandExecutionError("invalid port given")
806
807        return TestPmdPortStats.parse(output)
808
809    def _close(self) -> None:
810        """Overrides :meth:`~.interactive_shell.close`."""
811        self.stop()
812        self.send_command("quit", "Bye...")
813        return super()._close()
814