xref: /dpdk/dts/framework/remote_session/testpmd_shell.py (revision 53eacf3d71f011d7c7ae9f154c5c40f8b12c1b17)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2023 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4# Copyright(c) 2024 Arm Limited
5
6"""Testpmd interactive shell.
7
8Typical usage example in a TestSuite::
9
10    testpmd_shell = self.sut_node.create_interactive_shell(
11            TestPmdShell, privileged=True
12        )
13    devices = testpmd_shell.get_devices()
14    for device in devices:
15        print(device)
16    testpmd_shell.close()
17"""
18
19import re
20import time
21from dataclasses import dataclass, field
22from enum import Flag, auto
23from pathlib import PurePath
24from typing import Callable, ClassVar
25
26from typing_extensions import Self
27
28from framework.exception import InteractiveCommandExecutionError
29from framework.parser import ParserFn, TextParser
30from framework.settings import SETTINGS
31from framework.utils import StrEnum
32
33from .interactive_shell import InteractiveShell
34
35
36class TestPmdDevice:
37    """The data of a device that testpmd can recognize.
38
39    Attributes:
40        pci_address: The PCI address of the device.
41    """
42
43    pci_address: str
44
45    def __init__(self, pci_address_line: str):
46        """Initialize the device from the testpmd output line string.
47
48        Args:
49            pci_address_line: A line of testpmd output that contains a device.
50        """
51        self.pci_address = pci_address_line.strip().split(": ")[1].strip()
52
53    def __str__(self) -> str:
54        """The PCI address captures what the device is."""
55        return self.pci_address
56
57
58class TestPmdForwardingModes(StrEnum):
59    r"""The supported packet forwarding modes for :class:`~TestPmdShell`\s."""
60
61    #:
62    io = auto()
63    #:
64    mac = auto()
65    #:
66    macswap = auto()
67    #:
68    flowgen = auto()
69    #:
70    rxonly = auto()
71    #:
72    txonly = auto()
73    #:
74    csum = auto()
75    #:
76    icmpecho = auto()
77    #:
78    ieee1588 = auto()
79    #:
80    noisy = auto()
81    #:
82    fivetswap = "5tswap"
83    #:
84    shared_rxq = "shared-rxq"
85    #:
86    recycle_mbufs = auto()
87
88
89class VLANOffloadFlag(Flag):
90    """Flag representing the VLAN offload settings of a NIC port."""
91
92    #:
93    STRIP = auto()
94    #:
95    FILTER = auto()
96    #:
97    EXTEND = auto()
98    #:
99    QINQ_STRIP = auto()
100
101    @classmethod
102    def from_str_dict(cls, d):
103        """Makes an instance from a dict containing the flag member names with an "on" value.
104
105        Args:
106            d: A dictionary containing the flag members as keys and any string value.
107
108        Returns:
109            A new instance of the flag.
110        """
111        flag = cls(0)
112        for name in cls.__members__:
113            if d.get(name) == "on":
114                flag |= cls[name]
115        return flag
116
117    @classmethod
118    def make_parser(cls) -> ParserFn:
119        """Makes a parser function.
120
121        Returns:
122            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
123                parser function that makes an instance of this flag from text.
124        """
125        return TextParser.wrap(
126            TextParser.find(
127                r"VLAN offload:\s+"
128                r"strip (?P<STRIP>on|off), "
129                r"filter (?P<FILTER>on|off), "
130                r"extend (?P<EXTEND>on|off), "
131                r"qinq strip (?P<QINQ_STRIP>on|off)$",
132                re.MULTILINE,
133                named=True,
134            ),
135            cls.from_str_dict,
136        )
137
138
139class RSSOffloadTypesFlag(Flag):
140    """Flag representing the RSS offload flow types supported by the NIC port."""
141
142    #:
143    ipv4 = auto()
144    #:
145    ipv4_frag = auto()
146    #:
147    ipv4_tcp = auto()
148    #:
149    ipv4_udp = auto()
150    #:
151    ipv4_sctp = auto()
152    #:
153    ipv4_other = auto()
154    #:
155    ipv6 = auto()
156    #:
157    ipv6_frag = auto()
158    #:
159    ipv6_tcp = auto()
160    #:
161    ipv6_udp = auto()
162    #:
163    ipv6_sctp = auto()
164    #:
165    ipv6_other = auto()
166    #:
167    l2_payload = auto()
168    #:
169    ipv6_ex = auto()
170    #:
171    ipv6_tcp_ex = auto()
172    #:
173    ipv6_udp_ex = auto()
174    #:
175    port = auto()
176    #:
177    vxlan = auto()
178    #:
179    geneve = auto()
180    #:
181    nvgre = auto()
182    #:
183    user_defined_22 = auto()
184    #:
185    gtpu = auto()
186    #:
187    eth = auto()
188    #:
189    s_vlan = auto()
190    #:
191    c_vlan = auto()
192    #:
193    esp = auto()
194    #:
195    ah = auto()
196    #:
197    l2tpv3 = auto()
198    #:
199    pfcp = auto()
200    #:
201    pppoe = auto()
202    #:
203    ecpri = auto()
204    #:
205    mpls = auto()
206    #:
207    ipv4_chksum = auto()
208    #:
209    l4_chksum = auto()
210    #:
211    l2tpv2 = auto()
212    #:
213    ipv6_flow_label = auto()
214    #:
215    user_defined_38 = auto()
216    #:
217    user_defined_39 = auto()
218    #:
219    user_defined_40 = auto()
220    #:
221    user_defined_41 = auto()
222    #:
223    user_defined_42 = auto()
224    #:
225    user_defined_43 = auto()
226    #:
227    user_defined_44 = auto()
228    #:
229    user_defined_45 = auto()
230    #:
231    user_defined_46 = auto()
232    #:
233    user_defined_47 = auto()
234    #:
235    user_defined_48 = auto()
236    #:
237    user_defined_49 = auto()
238    #:
239    user_defined_50 = auto()
240    #:
241    user_defined_51 = auto()
242    #:
243    l3_pre96 = auto()
244    #:
245    l3_pre64 = auto()
246    #:
247    l3_pre56 = auto()
248    #:
249    l3_pre48 = auto()
250    #:
251    l3_pre40 = auto()
252    #:
253    l3_pre32 = auto()
254    #:
255    l2_dst_only = auto()
256    #:
257    l2_src_only = auto()
258    #:
259    l4_dst_only = auto()
260    #:
261    l4_src_only = auto()
262    #:
263    l3_dst_only = auto()
264    #:
265    l3_src_only = auto()
266
267    #:
268    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
269    #:
270    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
271    #:
272    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
273    #:
274    sctp = ipv4_sctp | ipv6_sctp
275    #:
276    tunnel = vxlan | geneve | nvgre
277    #:
278    vlan = s_vlan | c_vlan
279    #:
280    all = (
281        eth
282        | vlan
283        | ip
284        | tcp
285        | udp
286        | sctp
287        | l2_payload
288        | l2tpv3
289        | esp
290        | ah
291        | pfcp
292        | gtpu
293        | ecpri
294        | mpls
295        | l2tpv2
296    )
297
298    @classmethod
299    def from_list_string(cls, names: str) -> Self:
300        """Makes a flag from a whitespace-separated list of names.
301
302        Args:
303            names: a whitespace-separated list containing the members of this flag.
304
305        Returns:
306            An instance of this flag.
307        """
308        flag = cls(0)
309        for name in names.split():
310            flag |= cls.from_str(name)
311        return flag
312
313    @classmethod
314    def from_str(cls, name: str) -> Self:
315        """Makes a flag matching the supplied name.
316
317        Args:
318            name: a valid member of this flag in text
319        Returns:
320            An instance of this flag.
321        """
322        member_name = name.strip().replace("-", "_")
323        return cls[member_name]
324
325    @classmethod
326    def make_parser(cls) -> ParserFn:
327        """Makes a parser function.
328
329        Returns:
330            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
331                parser function that makes an instance of this flag from text.
332        """
333        return TextParser.wrap(
334            TextParser.find(r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE),
335            RSSOffloadTypesFlag.from_list_string,
336        )
337
338
339class DeviceCapabilitiesFlag(Flag):
340    """Flag representing the device capabilities."""
341
342    #: Device supports Rx queue setup after device started.
343    RUNTIME_RX_QUEUE_SETUP = auto()
344    #: Device supports Tx queue setup after device started.
345    RUNTIME_TX_QUEUE_SETUP = auto()
346    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
347    RXQ_SHARE = auto()
348    #: Device supports keeping flow rules across restart.
349    FLOW_RULE_KEEP = auto()
350    #: Device supports keeping shared flow objects across restart.
351    FLOW_SHARED_OBJECT_KEEP = auto()
352
353    @classmethod
354    def make_parser(cls) -> ParserFn:
355        """Makes a parser function.
356
357        Returns:
358            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
359                parser function that makes an instance of this flag from text.
360        """
361        return TextParser.wrap(
362            TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)"),
363            cls,
364        )
365
366
367class DeviceErrorHandlingMode(StrEnum):
368    """Enum representing the device error handling mode."""
369
370    #:
371    none = auto()
372    #:
373    passive = auto()
374    #:
375    proactive = auto()
376    #:
377    unknown = auto()
378
379    @classmethod
380    def make_parser(cls) -> ParserFn:
381        """Makes a parser function.
382
383        Returns:
384            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
385                parser function that makes an instance of this enum from text.
386        """
387        return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)
388
389
390def make_device_private_info_parser() -> ParserFn:
391    """Device private information parser.
392
393    Ensures that we are not parsing invalid device private info output.
394
395    Returns:
396        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
397            function that parses the device private info from the TestPmd port info output.
398    """
399
400    def _validate(info: str):
401        info = info.strip()
402        if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
403            return None
404        return info
405
406    return TextParser.wrap(TextParser.find(r"Device private info:\s+([\s\S]+)"), _validate)
407
408
409@dataclass
410class TestPmdPort(TextParser):
411    """Dataclass representing the result of testpmd's ``show port info`` command."""
412
413    #:
414    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
415    #:
416    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
417    #:
418    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
419    #:
420    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
421    #:
422    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
423    #:
424    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
425    #:
426    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
427    #:
428    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
429    #:
430    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
431    #:
432    is_allmulticast_mode_enabled: bool = field(
433        metadata=TextParser.find("Allmulticast mode: enabled")
434    )
435    #: Maximum number of MAC addresses
436    max_mac_addresses_num: int = field(
437        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
438    )
439    #: Maximum configurable length of RX packet
440    max_hash_mac_addresses_num: int = field(
441        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
442    )
443    #: Minimum size of RX buffer
444    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
445    #: Maximum configurable length of RX packet
446    max_rx_packet_length: int = field(
447        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
448    )
449    #: Maximum configurable size of LRO aggregated packet
450    max_lro_packet_size: int = field(
451        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
452    )
453
454    #: Current number of RX queues
455    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
456    #: Max possible RX queues
457    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
458    #: Max possible number of RXDs per queue
459    max_queue_rxd_num: int = field(
460        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
461    )
462    #: Min possible number of RXDs per queue
463    min_queue_rxd_num: int = field(
464        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
465    )
466    #: RXDs number alignment
467    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))
468
469    #: Current number of TX queues
470    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
471    #: Max possible TX queues
472    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
473    #: Max possible number of TXDs per queue
474    max_queue_txd_num: int = field(
475        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
476    )
477    #: Min possible number of TXDs per queue
478    min_queue_txd_num: int = field(
479        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
480    )
481    #: TXDs number alignment
482    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
483    #: Max segment number per packet
484    max_packet_segment_num: int = field(
485        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
486    )
487    #: Max segment number per MTU/TSO
488    max_mtu_segment_num: int = field(
489        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
490    )
491
492    #:
493    device_capabilities: DeviceCapabilitiesFlag = field(
494        metadata=DeviceCapabilitiesFlag.make_parser(),
495    )
496    #:
497    device_error_handling_mode: DeviceErrorHandlingMode = field(
498        metadata=DeviceErrorHandlingMode.make_parser()
499    )
500    #:
501    device_private_info: str | None = field(
502        default=None,
503        metadata=make_device_private_info_parser(),
504    )
505
506    #:
507    hash_key_size: int | None = field(
508        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
509    )
510    #:
511    redirection_table_size: int | None = field(
512        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
513    )
514    #:
515    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
516        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
517    )
518
519    #:
520    mac_address: str | None = field(
521        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
522    )
523    #:
524    fw_version: str | None = field(
525        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
526    )
527    #:
528    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
529    #: Socket id of the memory allocation
530    mem_alloc_socket_id: int | None = field(
531        default=None,
532        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
533    )
534    #:
535    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))
536
537    #:
538    vlan_offload: VLANOffloadFlag | None = field(
539        default=None,
540        metadata=VLANOffloadFlag.make_parser(),
541    )
542
543    #: Maximum size of RX buffer
544    max_rx_bufsize: int | None = field(
545        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
546    )
547    #: Maximum number of VFs
548    max_vfs_num: int | None = field(
549        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
550    )
551    #: Maximum number of VMDq pools
552    max_vmdq_pools_num: int | None = field(
553        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
554    )
555
556    #:
557    switch_name: str | None = field(
558        default=None, metadata=TextParser.find(r"Switch name: ([\r\n]+)")
559    )
560    #:
561    switch_domain_id: int | None = field(
562        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
563    )
564    #:
565    switch_port_id: int | None = field(
566        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
567    )
568    #:
569    switch_rx_domain: int | None = field(
570        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
571    )
572
573
574@dataclass
575class TestPmdPortStats(TextParser):
576    """Port statistics."""
577
578    #:
579    port_id: int = field(metadata=TextParser.find_int(r"NIC statistics for port (\d+)"))
580
581    #:
582    rx_packets: int = field(metadata=TextParser.find_int(r"RX-packets:\s+(\d+)"))
583    #:
584    rx_missed: int = field(metadata=TextParser.find_int(r"RX-missed:\s+(\d+)"))
585    #:
586    rx_bytes: int = field(metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)"))
587    #:
588    rx_errors: int = field(metadata=TextParser.find_int(r"RX-errors:\s+(\d+)"))
589    #:
590    rx_nombuf: int = field(metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)"))
591
592    #:
593    tx_packets: int = field(metadata=TextParser.find_int(r"TX-packets:\s+(\d+)"))
594    #:
595    tx_errors: int = field(metadata=TextParser.find_int(r"TX-errors:\s+(\d+)"))
596    #:
597    tx_bytes: int = field(metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)"))
598
599    #:
600    rx_pps: int = field(metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)"))
601    #:
602    rx_bps: int = field(metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)"))
603
604    #:
605    tx_pps: int = field(metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)"))
606    #:
607    tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"))
608
609
610class TestPmdShell(InteractiveShell):
611    """Testpmd interactive shell.
612
613    The testpmd shell users should never use
614    the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather
615    call specialized methods. If there isn't one that satisfies a need, it should be added.
616
617    Attributes:
618        number_of_ports: The number of ports which were allowed on the command-line when testpmd
619            was started.
620    """
621
622    number_of_ports: int
623
624    #: The path to the testpmd executable.
625    path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd")
626
627    #: Flag this as a DPDK app so that it's clear this is not a system app and
628    #: needs to be looked in a specific path.
629    dpdk_app: ClassVar[bool] = True
630
631    #: The testpmd's prompt.
632    _default_prompt: ClassVar[str] = "testpmd>"
633
634    #: This forces the prompt to appear after sending a command.
635    _command_extra_chars: ClassVar[str] = "\n"
636
637    def _start_application(self, get_privileged_command: Callable[[str], str] | None) -> None:
638        """Overrides :meth:`~.interactive_shell._start_application`.
639
640        Add flags for starting testpmd in interactive mode and disabling messages for link state
641        change events before starting the application. Link state is verified before starting
642        packet forwarding and the messages create unexpected newlines in the terminal which
643        complicates output collection.
644
645        Also find the number of pci addresses which were allowed on the command line when the app
646        was started.
647        """
648        self._app_args += " -i --mask-event intr_lsc"
649        self.number_of_ports = self._app_args.count("-a ")
650        super()._start_application(get_privileged_command)
651
652    def start(self, verify: bool = True) -> None:
653        """Start packet forwarding with the current configuration.
654
655        Args:
656            verify: If :data:`True` , a second start command will be sent in an attempt to verify
657                packet forwarding started as expected.
658
659        Raises:
660            InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to
661                start or ports fail to come up.
662        """
663        self.send_command("start")
664        if verify:
665            # If forwarding was already started, sending "start" again should tell us
666            start_cmd_output = self.send_command("start")
667            if "Packet forwarding already started" not in start_cmd_output:
668                self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")
669                raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")
670
671            for port_id in range(self.number_of_ports):
672                if not self.wait_link_status_up(port_id):
673                    raise InteractiveCommandExecutionError(
674                        "Not all ports came up after starting packet forwarding in testpmd."
675                    )
676
677    def stop(self, verify: bool = True) -> None:
678        """Stop packet forwarding.
679
680        Args:
681            verify: If :data:`True` , the output of the stop command is scanned to verify that
682                forwarding was stopped successfully or not started. If neither is found, it is
683                considered an error.
684
685        Raises:
686            InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop
687                forwarding results in an error.
688        """
689        stop_cmd_output = self.send_command("stop")
690        if verify:
691            if (
692                "Done." not in stop_cmd_output
693                and "Packet forwarding not started" not in stop_cmd_output
694            ):
695                self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")
696                raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")
697
698    def get_devices(self) -> list[TestPmdDevice]:
699        """Get a list of device names that are known to testpmd.
700
701        Uses the device info listed in testpmd and then parses the output.
702
703        Returns:
704            A list of devices.
705        """
706        dev_info: str = self.send_command("show device info all")
707        dev_list: list[TestPmdDevice] = []
708        for line in dev_info.split("\n"):
709            if "device name:" in line.lower():
710                dev_list.append(TestPmdDevice(line))
711        return dev_list
712
713    def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool:
714        """Wait until the link status on the given port is "up".
715
716        Arguments:
717            port_id: Port to check the link status on.
718            timeout: Time to wait for the link to come up. The default value for this
719                argument may be modified using the :option:`--timeout` command-line argument
720                or the :envvar:`DTS_TIMEOUT` environment variable.
721
722        Returns:
723            Whether the link came up in time or not.
724        """
725        time_to_stop = time.time() + timeout
726        port_info: str = ""
727        while time.time() < time_to_stop:
728            port_info = self.send_command(f"show port info {port_id}")
729            if "Link status: up" in port_info:
730                break
731            time.sleep(0.5)
732        else:
733            self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")
734        return "Link status: up" in port_info
735
736    def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
737        """Set packet forwarding mode.
738
739        Args:
740            mode: The forwarding mode to use.
741            verify: If :data:`True` the output of the command will be scanned in an attempt to
742                verify that the forwarding mode was set to `mode` properly.
743
744        Raises:
745            InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode
746                fails to update.
747        """
748        set_fwd_output = self.send_command(f"set fwd {mode.value}")
749        if f"Set {mode.value} packet forwarding mode" not in set_fwd_output:
750            self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")
751            raise InteractiveCommandExecutionError(
752                f"Test pmd failed to set fwd mode to {mode.value}"
753            )
754
755    def show_port_info_all(self) -> list[TestPmdPort]:
756        """Returns the information of all the ports.
757
758        Returns:
759            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
760        """
761        output = self.send_command("show port info all")
762
763        # Sample output of the "all" command looks like:
764        #
765        # <start>
766        #
767        #   ********************* Infos for port 0 *********************
768        #   Key: value
769        #
770        #   ********************* Infos for port 1 *********************
771        #   Key: value
772        # <end>
773        #
774        # Takes advantage of the double new line in between ports as end delimiter. But we need to
775        # artificially add a new line at the end to pick up the last port. Because commands are
776        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
777        # Therefore we also need to take the carriage return into account.
778        iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
779        return [TestPmdPort.parse(block.group(0)) for block in iter]
780
781    def show_port_info(self, port_id: int) -> TestPmdPort:
782        """Returns the given port information.
783
784        Args:
785            port_id: The port ID to gather information for.
786
787        Raises:
788            InteractiveCommandExecutionError: If `port_id` is invalid.
789
790        Returns:
791            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
792        """
793        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
794        if output.startswith("Invalid port"):
795            raise InteractiveCommandExecutionError("invalid port given")
796
797        return TestPmdPort.parse(output)
798
799    def show_port_stats_all(self) -> list[TestPmdPortStats]:
800        """Returns the statistics of all the ports.
801
802        Returns:
803            list[TestPmdPortStats]: A list containing all the ports stats as `TestPmdPortStats`.
804        """
805        output = self.send_command("show port stats all")
806
807        # Sample output of the "all" command looks like:
808        #
809        #   ########### NIC statistics for port 0 ###########
810        #   values...
811        #   #################################################
812        #
813        #   ########### NIC statistics for port 1 ###########
814        #   values...
815        #   #################################################
816        #
817        iter = re.finditer(r"(^  #*.+#*$[^#]+)^  #*\r$", output, re.MULTILINE)
818        return [TestPmdPortStats.parse(block.group(1)) for block in iter]
819
820    def show_port_stats(self, port_id: int) -> TestPmdPortStats:
821        """Returns the given port statistics.
822
823        Args:
824            port_id: The port ID to gather information for.
825
826        Raises:
827            InteractiveCommandExecutionError: If `port_id` is invalid.
828
829        Returns:
830            TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats.
831        """
832        output = self.send_command(f"show port stats {port_id}", skip_first_line=True)
833        if output.startswith("Invalid port"):
834            raise InteractiveCommandExecutionError("invalid port given")
835
836        return TestPmdPortStats.parse(output)
837
838    def close(self) -> None:
839        """Overrides :meth:`~.interactive_shell.close`."""
840        self.send_command("quit", "")
841        return super().close()
842