xref: /dpdk/dts/framework/remote_session/testpmd_shell.py (revision 21a66096bb44a4468353782c36fc85913520dc6c)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2023 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4# Copyright(c) 2024 Arm Limited
5
6"""Testpmd interactive shell.
7
8Typical usage example in a TestSuite::
9
10    testpmd_shell = TestPmdShell(self.sut_node)
11    devices = testpmd_shell.get_devices()
12    for device in devices:
13        print(device)
14    testpmd_shell.close()
15"""
16
17import functools
18import re
19import time
20from collections.abc import Callable, MutableSet
21from dataclasses import dataclass, field
22from enum import Flag, auto
23from pathlib import PurePath
24from typing import TYPE_CHECKING, Any, ClassVar, Concatenate, ParamSpec, TypeAlias
25
26if TYPE_CHECKING:
27    from enum import Enum as NoAliasEnum
28else:
29    from aenum import NoAliasEnum
30
31from typing_extensions import Self, Unpack
32
33from framework.exception import InteractiveCommandExecutionError, InternalError
34from framework.params.testpmd import SimpleForwardingModes, TestPmdParams
35from framework.params.types import TestPmdParamsDict
36from framework.parser import ParserFn, TextParser
37from framework.remote_session.dpdk_shell import DPDKShell
38from framework.settings import SETTINGS
39from framework.testbed_model.cpu import LogicalCoreCount, LogicalCoreList
40from framework.testbed_model.sut_node import SutNode
41from framework.utils import REGEX_FOR_MAC_ADDRESS, StrEnum
42
#: Parameter specification for the arguments that follow the leading shell instance in
#: :data:`TestPmdShellMethod`.
P = ParamSpec("P")
#: Any callable whose first positional argument is a ``TestPmdShell``, followed by the
#: parameters captured by :data:`P`, and returning anything.
TestPmdShellMethod = Callable[Concatenate["TestPmdShell", P], Any]

#: A callable that receives a ``TestPmdShell`` and two mutable sets of ``NicCapability`` and
#: returns nothing.
#: NOTE(review): the roles of the two sets are not visible in this part of the file --
#: presumably supported and unsupported capabilities; confirm against the implementations.
TestPmdShellCapabilityMethod: TypeAlias = Callable[
    ["TestPmdShell", MutableSet["NicCapability"], MutableSet["NicCapability"]], None
]

#: A decorator: takes a :data:`TestPmdShellMethod` and returns a :data:`TestPmdShellMethod`.
TestPmdShellDecorator: TypeAlias = Callable[[TestPmdShellMethod], TestPmdShellMethod]

#: Either a capability method on its own, or a 2-tuple of a capability method and a decorator.
TestPmdShellNicCapability = (
    TestPmdShellCapabilityMethod | tuple[TestPmdShellCapabilityMethod, TestPmdShellDecorator]
)
55
56
class TestPmdDevice:
    """A single device as it appears in testpmd output.

    Attributes:
        pci_address: The PCI address of the device.
    """

    pci_address: str

    def __init__(self, pci_address_line: str):
        """Extract the PCI address from one line of testpmd output.

        The stripped line is split on ``": "`` and the second piece, with its surrounding
        whitespace removed, becomes the address.

        Args:
            pci_address_line: A line of testpmd output that contains a device.
        """
        pieces = pci_address_line.strip().split(": ")
        raw_address = pieces[1]
        self.pci_address = raw_address.strip()

    def __str__(self) -> str:
        """Return the PCI address, which is what identifies the device."""
        return self.pci_address
77
78
79class VLANOffloadFlag(Flag):
80    """Flag representing the VLAN offload settings of a NIC port."""
81
82    #:
83    STRIP = auto()
84    #:
85    FILTER = auto()
86    #:
87    EXTEND = auto()
88    #:
89    QINQ_STRIP = auto()
90
91    @classmethod
92    def from_str_dict(cls, d):
93        """Makes an instance from a dict containing the flag member names with an "on" value.
94
95        Args:
96            d: A dictionary containing the flag members as keys and any string value.
97
98        Returns:
99            A new instance of the flag.
100        """
101        flag = cls(0)
102        for name in cls.__members__:
103            if d.get(name) == "on":
104                flag |= cls[name]
105        return flag
106
107    @classmethod
108    def make_parser(cls) -> ParserFn:
109        """Makes a parser function.
110
111        Returns:
112            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
113                parser function that makes an instance of this flag from text.
114        """
115        return TextParser.wrap(
116            TextParser.find(
117                r"VLAN offload:\s+"
118                r"strip (?P<STRIP>on|off), "
119                r"filter (?P<FILTER>on|off), "
120                r"extend (?P<EXTEND>on|off), "
121                r"qinq strip (?P<QINQ_STRIP>on|off)",
122                re.MULTILINE,
123                named=True,
124            ),
125            cls.from_str_dict,
126        )
127
128
129class RSSOffloadTypesFlag(Flag):
130    """Flag representing the RSS offload flow types supported by the NIC port."""
131
132    #:
133    ipv4 = auto()
134    #:
135    ipv4_frag = auto()
136    #:
137    ipv4_tcp = auto()
138    #:
139    ipv4_udp = auto()
140    #:
141    ipv4_sctp = auto()
142    #:
143    ipv4_other = auto()
144    #:
145    ipv6 = auto()
146    #:
147    ipv6_frag = auto()
148    #:
149    ipv6_tcp = auto()
150    #:
151    ipv6_udp = auto()
152    #:
153    ipv6_sctp = auto()
154    #:
155    ipv6_other = auto()
156    #:
157    l2_payload = auto()
158    #:
159    ipv6_ex = auto()
160    #:
161    ipv6_tcp_ex = auto()
162    #:
163    ipv6_udp_ex = auto()
164    #:
165    port = auto()
166    #:
167    vxlan = auto()
168    #:
169    geneve = auto()
170    #:
171    nvgre = auto()
172    #:
173    user_defined_22 = auto()
174    #:
175    gtpu = auto()
176    #:
177    eth = auto()
178    #:
179    s_vlan = auto()
180    #:
181    c_vlan = auto()
182    #:
183    esp = auto()
184    #:
185    ah = auto()
186    #:
187    l2tpv3 = auto()
188    #:
189    pfcp = auto()
190    #:
191    pppoe = auto()
192    #:
193    ecpri = auto()
194    #:
195    mpls = auto()
196    #:
197    ipv4_chksum = auto()
198    #:
199    l4_chksum = auto()
200    #:
201    l2tpv2 = auto()
202    #:
203    ipv6_flow_label = auto()
204    #:
205    user_defined_38 = auto()
206    #:
207    user_defined_39 = auto()
208    #:
209    user_defined_40 = auto()
210    #:
211    user_defined_41 = auto()
212    #:
213    user_defined_42 = auto()
214    #:
215    user_defined_43 = auto()
216    #:
217    user_defined_44 = auto()
218    #:
219    user_defined_45 = auto()
220    #:
221    user_defined_46 = auto()
222    #:
223    user_defined_47 = auto()
224    #:
225    user_defined_48 = auto()
226    #:
227    user_defined_49 = auto()
228    #:
229    user_defined_50 = auto()
230    #:
231    user_defined_51 = auto()
232    #:
233    l3_pre96 = auto()
234    #:
235    l3_pre64 = auto()
236    #:
237    l3_pre56 = auto()
238    #:
239    l3_pre48 = auto()
240    #:
241    l3_pre40 = auto()
242    #:
243    l3_pre32 = auto()
244    #:
245    l2_dst_only = auto()
246    #:
247    l2_src_only = auto()
248    #:
249    l4_dst_only = auto()
250    #:
251    l4_src_only = auto()
252    #:
253    l3_dst_only = auto()
254    #:
255    l3_src_only = auto()
256
257    #:
258    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
259    #:
260    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
261    #:
262    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
263    #:
264    sctp = ipv4_sctp | ipv6_sctp
265    #:
266    tunnel = vxlan | geneve | nvgre
267    #:
268    vlan = s_vlan | c_vlan
269    #:
270    all = (
271        eth
272        | vlan
273        | ip
274        | tcp
275        | udp
276        | sctp
277        | l2_payload
278        | l2tpv3
279        | esp
280        | ah
281        | pfcp
282        | gtpu
283        | ecpri
284        | mpls
285        | l2tpv2
286    )
287
288    @classmethod
289    def from_list_string(cls, names: str) -> Self:
290        """Makes a flag from a whitespace-separated list of names.
291
292        Args:
293            names: a whitespace-separated list containing the members of this flag.
294
295        Returns:
296            An instance of this flag.
297        """
298        flag = cls(0)
299        for name in names.split():
300            flag |= cls.from_str(name)
301        return flag
302
303    @classmethod
304    def from_str(cls, name: str) -> Self:
305        """Makes a flag matching the supplied name.
306
307        Args:
308            name: a valid member of this flag in text
309        Returns:
310            An instance of this flag.
311        """
312        member_name = name.strip().replace("-", "_")
313        return cls[member_name]
314
315    @classmethod
316    def make_parser(cls) -> ParserFn:
317        """Makes a parser function.
318
319        Returns:
320            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
321                parser function that makes an instance of this flag from text.
322        """
323        return TextParser.wrap(
324            TextParser.find(r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE),
325            RSSOffloadTypesFlag.from_list_string,
326        )
327
328
class DeviceCapabilitiesFlag(Flag):
    """Flag representing the device capabilities."""

    #: Device supports Rx queue setup after device started.
    RUNTIME_RX_QUEUE_SETUP = auto()
    #: Device supports Tx queue setup after device started.
    RUNTIME_TX_QUEUE_SETUP = auto()
    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
    RXQ_SHARE = auto()
    #: Device supports keeping flow rules across restart.
    FLOW_RULE_KEEP = auto()
    #: Device supports keeping shared flow objects across restart.
    FLOW_SHARED_OBJECT_KEEP = auto()

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Create the parser for the device capabilities bitmask.

        The hexadecimal number following ``Device capabilities:`` is located with
        `TextParser.find_int` and the result is passed to this flag's constructor.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        bitmask_finder = TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)")
        return TextParser.wrap(bitmask_finder, cls)
355
356
357class DeviceErrorHandlingMode(StrEnum):
358    """Enum representing the device error handling mode."""
359
360    #:
361    none = auto()
362    #:
363    passive = auto()
364    #:
365    proactive = auto()
366    #:
367    unknown = auto()
368
369    @classmethod
370    def make_parser(cls) -> ParserFn:
371        """Makes a parser function.
372
373        Returns:
374            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
375                parser function that makes an instance of this enum from text.
376        """
377        return TextParser.wrap(TextParser.find(r"Device error handling mode: (\w+)"), cls)
378
379
def make_device_private_info_parser() -> ParserFn:
    """Device private information parser.

    Everything after ``Device private info:`` is captured and then validated: the text
    ``none`` and any text beginning with ``Invalid file`` or ``Failed to dump`` is treated as
    absent information, anything else is returned with surrounding whitespace removed.

    Returns:
        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
            function that parses the device private info from the TestPmd port info output.
    """
    rejected_prefixes = ("Invalid file", "Failed to dump")

    def _validate(info: str) -> str | None:
        cleaned = info.strip()
        is_invalid = cleaned == "none" or cleaned.startswith(rejected_prefixes)
        return None if is_invalid else cleaned

    private_info_finder = TextParser.find(r"Device private info:\s+([\s\S]+)")
    return TextParser.wrap(private_info_finder, _validate)
397
398
399class RxQueueState(StrEnum):
400    """RX queue states.
401
402    References:
403        DPDK lib: ``lib/ethdev/rte_ethdev.h``
404        testpmd display function: ``app/test-pmd/config.c:get_queue_state_name()``
405    """
406
407    #:
408    stopped = auto()
409    #:
410    started = auto()
411    #:
412    hairpin = auto()
413    #:
414    unknown = auto()
415
416    @classmethod
417    def make_parser(cls) -> ParserFn:
418        """Makes a parser function.
419
420        Returns:
421            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
422                parser function that makes an instance of this enum from text.
423        """
424        return TextParser.wrap(TextParser.find(r"Rx queue state: ([^\r\n]+)"), cls)
425
426
@dataclass
class TestPmdRxqInfo(TextParser):
    """Parsed form of the output of testpmd's ``show rxq info <port_id> <queue_id>`` command.

    Every field declares, through its dataclass metadata, the pattern used to locate its value
    in the command output.

    References:
        testpmd command function: ``app/test-pmd/cmdline.c:cmd_showqueue()``
        testpmd display function: ``app/test-pmd/config.c:rx_queue_infos_display()``
    """

    #: Port number from the ``Infos for port ..., RX queue ...`` header
    port_id: int = field(
        metadata=TextParser.find_int(r"Infos for port (\d+)\b ?, RX queue \d+\b"),
    )
    #: Queue number from the ``Infos for port ..., RX queue ...`` header
    queue_id: int = field(
        metadata=TextParser.find_int(r"Infos for port \d+\b ?, RX queue (\d+)\b"),
    )
    #: Mempool used by that queue
    mempool: str = field(
        metadata=TextParser.find(r"Mempool: ([^\r\n]+)"),
    )
    #: Ring prefetch threshold
    rx_prefetch_threshold: int = field(
        metadata=TextParser.find_int(r"RX prefetch threshold: (\d+)\b"),
    )
    #: Ring host threshold
    rx_host_threshold: int = field(
        metadata=TextParser.find_int(r"RX host threshold: (\d+)\b"),
    )
    #: Ring writeback threshold
    rx_writeback_threshold: int = field(
        metadata=TextParser.find_int(r"RX writeback threshold: (\d+)\b"),
    )
    #: Drives the freeing of Rx descriptors
    rx_free_threshold: int = field(
        metadata=TextParser.find_int(r"RX free threshold: (\d+)\b"),
    )
    #: Drop packets if no descriptors are available
    rx_drop_packets: bool = field(
        metadata=TextParser.find(r"RX drop packets: on"),
    )
    #: Do not start queue with rte_eth_dev_start()
    rx_deferred_start: bool = field(
        metadata=TextParser.find(r"RX deferred start: on"),
    )
    #: Scattered packets Rx enabled
    rx_scattered_packets: bool = field(
        metadata=TextParser.find(r"RX scattered packets: on"),
    )
    #: The state of the queue
    # NOTE(review): annotated as ``str`` while the parser wraps ``RxQueueState`` -- confirm
    # which type is intended.
    rx_queue_state: str = field(
        metadata=RxQueueState.make_parser(),
    )
    #: Configured number of RXDs
    number_of_rxds: int = field(
        metadata=TextParser.find_int(r"Number of RXDs: (\d+)\b"),
    )
    #: Hardware receive buffer size
    rx_buffer_size: int | None = field(
        default=None,
        metadata=TextParser.find_int(r"RX buffer size: (\d+)\b"),
    )
    #: Burst mode information
    burst_mode: str | None = field(
        default=None,
        metadata=TextParser.find(r"Burst mode: ([^\r\n]+)"),
    )
472
473
@dataclass
class TestPmdPort(TextParser):
    """Dataclass representing the result of testpmd's ``show port info`` command.

    Every field declares, through its dataclass metadata, the pattern used to locate its value
    in the command output. Fields with a default are the ones declared optional here.
    """

    #: Port number from the ``Infos for port`` header
    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
    #: Text following ``Device name:``
    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
    #: Text following ``Driver name:``
    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
    #: Number following ``Connect to socket:``
    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
    #: Looks for ``Link status: up``
    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
    #: Text following ``Link speed:``
    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
    #: Looks for ``Link duplex: full-duplex``
    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
    #: Looks for ``Autoneg status: On``
    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
    #: Looks for ``Promiscuous mode: enabled``
    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
    #: Looks for ``Allmulticast mode: enabled``
    is_allmulticast_mode_enabled: bool = field(
        metadata=TextParser.find("Allmulticast mode: enabled")
    )
    #: Maximum number of MAC addresses
    max_mac_addresses_num: int = field(
        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
    )
    #: Maximum number of MAC addresses of hash filtering
    max_hash_mac_addresses_num: int = field(
        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
    )
    #: Minimum size of RX buffer
    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
    #: Maximum configurable length of RX packet
    max_rx_packet_length: int = field(
        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
    )
    #: Maximum configurable size of LRO aggregated packet
    max_lro_packet_size: int = field(
        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
    )

    #: Current number of RX queues
    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
    #: Max possible RX queues
    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
    #: Max possible number of RXDs per queue
    max_queue_rxd_num: int = field(
        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
    )
    #: Min possible number of RXDs per queue
    min_queue_rxd_num: int = field(
        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
    )
    #: RXDs number alignment
    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))

    #: Current number of TX queues
    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
    #: Max possible TX queues
    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
    #: Max possible number of TXDs per queue
    max_queue_txd_num: int = field(
        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
    )
    #: Min possible number of TXDs per queue
    min_queue_txd_num: int = field(
        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
    )
    #: TXDs number alignment
    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
    #: Max segment number per packet
    max_packet_segment_num: int = field(
        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
    )
    #: Max segment number per MTU/TSO
    max_mtu_segment_num: int = field(
        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
    )

    #: Capabilities bitmask following ``Device capabilities:``
    device_capabilities: DeviceCapabilitiesFlag = field(
        metadata=DeviceCapabilitiesFlag.make_parser(),
    )
    #: Mode following ``Device error handling mode:``
    device_error_handling_mode: DeviceErrorHandlingMode | None = field(
        default=None, metadata=DeviceErrorHandlingMode.make_parser()
    )
    #: Validated text following ``Device private info:``
    device_private_info: str | None = field(
        default=None,
        metadata=make_device_private_info_parser(),
    )

    #: Hash key size in bytes
    hash_key_size: int | None = field(
        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
    )
    #: Redirection table size
    redirection_table_size: int | None = field(
        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
    )
    #: Supported RSS offload flow types; the empty flag by default
    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
    )

    #: Address following ``MAC address:``
    mac_address: str | None = field(
        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
    )
    #: Text following ``Firmware-version:``
    fw_version: str | None = field(
        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
    )
    #: Text following ``Devargs:``
    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
    #: Socket id of the memory allocation
    mem_alloc_socket_id: int | None = field(
        default=None,
        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
    )
    #: Number following ``MTU:``
    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))

    #: VLAN offload settings from the ``VLAN offload:`` section
    vlan_offload: VLANOffloadFlag | None = field(
        default=None,
        metadata=VLANOffloadFlag.make_parser(),
    )

    #: Maximum size of RX buffer
    max_rx_bufsize: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
    )
    #: Maximum number of VFs
    max_vfs_num: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
    )
    #: Maximum number of VMDq pools
    max_vmdq_pools_num: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
    )

    #: Text following ``Switch name:``
    # The capture group takes the rest of the line, i.e. everything up to the line break,
    # like the other free-text fields of this class.
    switch_name: str | None = field(
        default=None, metadata=TextParser.find(r"Switch name: ([^\r\n]+)")
    )
    #: Number following ``Switch domain Id:``
    switch_domain_id: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
    )
    #: Number following ``Switch Port Id:``
    switch_port_id: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
    )
    #: Number following ``Switch Rx domain:``
    switch_rx_domain: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
    )
637
638
@dataclass
class TestPmdPortStats(TextParser):
    """Port statistics.

    Every field declares, through its dataclass metadata, the pattern used to locate its
    integer value in the statistics output.
    """

    #: Port number from the ``NIC statistics for port`` header
    port_id: int = field(
        metadata=TextParser.find_int(r"NIC statistics for port (\d+)"),
    )

    #: Value of the ``RX-packets`` counter
    rx_packets: int = field(
        metadata=TextParser.find_int(r"RX-packets:\s+(\d+)"),
    )
    #: Value of the ``RX-missed`` counter
    rx_missed: int = field(
        metadata=TextParser.find_int(r"RX-missed:\s+(\d+)"),
    )
    #: Value of the ``RX-bytes`` counter
    rx_bytes: int = field(
        metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)"),
    )
    #: Value of the ``RX-errors`` counter
    rx_errors: int = field(
        metadata=TextParser.find_int(r"RX-errors:\s+(\d+)"),
    )
    #: Value of the ``RX-nombuf`` counter
    rx_nombuf: int = field(
        metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)"),
    )

    #: Value of the ``TX-packets`` counter
    tx_packets: int = field(
        metadata=TextParser.find_int(r"TX-packets:\s+(\d+)"),
    )
    #: Value of the ``TX-errors`` counter
    tx_errors: int = field(
        metadata=TextParser.find_int(r"TX-errors:\s+(\d+)"),
    )
    #: Value of the ``TX-bytes`` counter
    tx_bytes: int = field(
        metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)"),
    )

    #: Value reported as ``Rx-pps``
    rx_pps: int = field(
        metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)"),
    )
    #: Value reported as ``Rx-bps``
    rx_bps: int = field(
        metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)"),
    )

    #: Value reported as ``Tx-pps``
    tx_pps: int = field(
        metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)"),
    )
    #: Value reported as ``Tx-bps``
    tx_bps: int = field(
        metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"),
    )
673
674
675class PacketOffloadFlag(Flag):
676    """Flag representing the Packet Offload Features Flags in DPDK.
677
678    Values in this class are taken from the definitions in the RTE MBUF core library in DPDK
679    located in ``lib/mbuf/rte_mbuf_core.h``. It is expected that flag values in this class will
680    match the values they are set to in said DPDK library with one exception; all values must be
681    unique. For example, the definitions for unknown checksum flags in ``rte_mbuf_core.h`` are all
682    set to :data:`0`, but it is valuable to distinguish between them in this framework. For this
683    reason flags that are not unique in the DPDK library are set either to values within the
684    RTE_MBUF_F_FIRST_FREE-RTE_MBUF_F_LAST_FREE range for Rx or shifted 61+ bits for Tx.
685
686    References:
687        DPDK lib: ``lib/mbuf/rte_mbuf_core.h``
688    """
689
690    # RX flags
691
692    #: The RX packet is a 802.1q VLAN packet, and the tci has been saved in mbuf->vlan_tci. If the
693    #: flag RTE_MBUF_F_RX_VLAN_STRIPPED is also present, the VLAN header has been stripped from
694    #: mbuf data, else it is still present.
695    RTE_MBUF_F_RX_VLAN = auto()
696
697    #: RX packet with RSS hash result.
698    RTE_MBUF_F_RX_RSS_HASH = auto()
699
700    #: RX packet with FDIR match indicate.
701    RTE_MBUF_F_RX_FDIR = auto()
702
703    #: This flag is set when the outermost IP header checksum is detected as wrong by the hardware.
704    RTE_MBUF_F_RX_OUTER_IP_CKSUM_BAD = 1 << 5
705
706    #: A vlan has been stripped by the hardware and its tci is saved in mbuf->vlan_tci. This can
707    #: only happen if vlan stripping is enabled in the RX configuration of the PMD. When
708    #: RTE_MBUF_F_RX_VLAN_STRIPPED is set, RTE_MBUF_F_RX_VLAN must also be set.
709    RTE_MBUF_F_RX_VLAN_STRIPPED = auto()
710
711    #: No information about the RX IP checksum. Value is 0 in the DPDK library.
712    RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN = 1 << 23
713    #: The IP checksum in the packet is wrong.
714    RTE_MBUF_F_RX_IP_CKSUM_BAD = 1 << 4
715    #: The IP checksum in the packet is valid.
716    RTE_MBUF_F_RX_IP_CKSUM_GOOD = 1 << 7
717    #: The IP checksum is not correct in the packet data, but the integrity of the IP header is
718    #: verified. Value is RTE_MBUF_F_RX_IP_CKSUM_BAD | RTE_MBUF_F_RX_IP_CKSUM_GOOD in the DPDK
719    #: library.
720    RTE_MBUF_F_RX_IP_CKSUM_NONE = 1 << 24
721
722    #: No information about the RX L4 checksum. Value is 0 in the DPDK library.
723    RTE_MBUF_F_RX_L4_CKSUM_UNKNOWN = 1 << 25
724    #: The L4 checksum in the packet is wrong.
725    RTE_MBUF_F_RX_L4_CKSUM_BAD = 1 << 3
726    #: The L4 checksum in the packet is valid.
727    RTE_MBUF_F_RX_L4_CKSUM_GOOD = 1 << 8
728    #: The L4 checksum is not correct in the packet data, but the integrity of the L4 data is
729    #: verified. Value is RTE_MBUF_F_RX_L4_CKSUM_BAD | RTE_MBUF_F_RX_L4_CKSUM_GOOD in the DPDK
730    #: library.
731    RTE_MBUF_F_RX_L4_CKSUM_NONE = 1 << 26
732
733    #: RX IEEE1588 L2 Ethernet PT Packet.
734    RTE_MBUF_F_RX_IEEE1588_PTP = 1 << 9
735    #: RX IEEE1588 L2/L4 timestamped packet.
736    RTE_MBUF_F_RX_IEEE1588_TMST = 1 << 10
737
738    #: FD id reported if FDIR match.
739    RTE_MBUF_F_RX_FDIR_ID = 1 << 13
740    #: Flexible bytes reported if FDIR match.
741    RTE_MBUF_F_RX_FDIR_FLX = 1 << 14
742
743    #: If both RTE_MBUF_F_RX_QINQ_STRIPPED and RTE_MBUF_F_RX_VLAN_STRIPPED are set, the 2 VLANs
744    #: have been stripped by the hardware. If RTE_MBUF_F_RX_QINQ_STRIPPED is set and
745    #: RTE_MBUF_F_RX_VLAN_STRIPPED is unset, only the outer VLAN is removed from packet data.
746    RTE_MBUF_F_RX_QINQ_STRIPPED = auto()
747
748    #: When packets are coalesced by a hardware or virtual driver, this flag can be set in the RX
749    #: mbuf, meaning that the m->tso_segsz field is valid and is set to the segment size of
750    #: original packets.
751    RTE_MBUF_F_RX_LRO = auto()
752
753    #: Indicate that security offload processing was applied on the RX packet.
754    RTE_MBUF_F_RX_SEC_OFFLOAD = 1 << 18
755    #: Indicate that security offload processing failed on the RX packet.
756    RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED = auto()
757
758    #: The RX packet is a double VLAN. If this flag is set, RTE_MBUF_F_RX_VLAN must also be set. If
759    #: the flag RTE_MBUF_F_RX_QINQ_STRIPPED is also present, both VLANs headers have been stripped
760    #: from mbuf data, else they are still present.
761    RTE_MBUF_F_RX_QINQ = auto()
762
763    #: No info about the outer RX L4 checksum. Value is 0 in the DPDK library.
764    RTE_MBUF_F_RX_OUTER_L4_CKSUM_UNKNOWN = 1 << 27
765    #: The outer L4 checksum in the packet is wrong
766    RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD = 1 << 21
767    #: The outer L4 checksum in the packet is valid
768    RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD = 1 << 22
769    #: Invalid outer L4 checksum state. Value is
770    #: RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD | RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD in the DPDK library.
771    RTE_MBUF_F_RX_OUTER_L4_CKSUM_INVALID = 1 << 28
772
773    # TX flags
774
775    #: Outer UDP checksum offload flag. This flag is used for enabling outer UDP checksum in PMD.
776    #: To use outer UDP checksum, the user either needs to enable the following in mbuf:
777    #:
778    #:  a) Fill outer_l2_len and outer_l3_len in mbuf.
779    #:  b) Set the RTE_MBUF_F_TX_OUTER_UDP_CKSUM flag.
780    #:  c) Set the RTE_MBUF_F_TX_OUTER_IPV4 or RTE_MBUF_F_TX_OUTER_IPV6 flag.
781    #:
782    #: Or configure RTE_ETH_TX_OFFLOAD_OUTER_UDP_CKSUM offload flag.
783    RTE_MBUF_F_TX_OUTER_UDP_CKSUM = 1 << 41
784
785    #: UDP Fragmentation Offload flag. This flag is used for enabling UDP fragmentation in SW or in
786    #: HW.
787    RTE_MBUF_F_TX_UDP_SEG = auto()
788
789    #: Request security offload processing on the TX packet. To use Tx security offload, the user
790    #: needs to fill l2_len in mbuf indicating L2 header size and where L3 header starts.
791    #: Similarly, l3_len should also be filled along with ol_flags reflecting current L3 type.
792    RTE_MBUF_F_TX_SEC_OFFLOAD = auto()
793
794    #: Offload the MACsec. This flag must be set by the application to enable this offload feature
795    #: for a packet to be transmitted.
796    RTE_MBUF_F_TX_MACSEC = auto()
797
798    # Bits 45:48 are used for the tunnel type in ``lib/mbuf/rte_mbuf_core.h``, but some are modified
799    # in this Flag to maintain uniqueness. The tunnel type must be specified for TSO or checksum on
800    # the inner part of tunnel packets. These flags can be used with RTE_MBUF_F_TX_TCP_SEG for TSO,
801    # or RTE_MBUF_F_TX_xxx_CKSUM. The mbuf fields for inner and outer header lengths are required:
802    # outer_l2_len, outer_l3_len, l2_len, l3_len, l4_len and tso_segsz for TSO.
803
804    #:
805    RTE_MBUF_F_TX_TUNNEL_VXLAN = 1 << 45
806    #:
807    RTE_MBUF_F_TX_TUNNEL_GRE = 1 << 46
808    #: Value is 3 << 45 in the DPDK library.
809    RTE_MBUF_F_TX_TUNNEL_IPIP = 1 << 61
810    #:
811    RTE_MBUF_F_TX_TUNNEL_GENEVE = 1 << 47
812    #: TX packet with MPLS-in-UDP RFC 7510 header. Value is 5 << 45 in the DPDK library.
813    RTE_MBUF_F_TX_TUNNEL_MPLSINUDP = 1 << 62
814    #: Value is 6 << 45 in the DPDK library.
815    RTE_MBUF_F_TX_TUNNEL_VXLAN_GPE = 1 << 63
816    #: Value is 7 << 45 in the DPDK library.
817    RTE_MBUF_F_TX_TUNNEL_GTP = 1 << 64
818    #:
819    RTE_MBUF_F_TX_TUNNEL_ESP = 1 << 48
820    #: Generic IP encapsulated tunnel type, used for TSO and checksum offload. This can be used for
821    #: tunnels which are not standards or listed above. It is preferred to use specific tunnel
822    #: flags like RTE_MBUF_F_TX_TUNNEL_GRE or RTE_MBUF_F_TX_TUNNEL_IPIP if possible. The ethdev
823    #: must be configured with RTE_ETH_TX_OFFLOAD_IP_TNL_TSO.  Outer and inner checksums are done
824    #: according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel headers that
825    #: contain payload length, sequence id or checksum are not expected to be updated. Value is
826    #: 0xD << 45 in the DPDK library.
827    RTE_MBUF_F_TX_TUNNEL_IP = 1 << 65
828    #: Generic UDP encapsulated tunnel type, used for TSO and checksum offload. UDP tunnel type
829    #: implies outer IP layer. It can be used for tunnels which are not standards or listed above.
830    #: It is preferred to use specific tunnel flags like RTE_MBUF_F_TX_TUNNEL_VXLAN if possible.
831    #: The ethdev must be configured with RTE_ETH_TX_OFFLOAD_UDP_TNL_TSO. Outer and inner checksums
832    #: are done according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel
833    #: headers that contain payload length, sequence id or checksum are not expected to be updated.
834    #: value is 0xE << 45 in the DPDK library.
835    RTE_MBUF_F_TX_TUNNEL_UDP = 1 << 66
836
837    #: Double VLAN insertion (QinQ) request to driver, driver may offload the insertion based on
838    #: device capability. Mbuf 'vlan_tci' & 'vlan_tci_outer' must be valid when this flag is set.
839    RTE_MBUF_F_TX_QINQ = 1 << 49
840
841    #: TCP segmentation offload. To enable this offload feature for a packet to be transmitted on
842    #: hardware supporting TSO:
843    #:
844    #:  - set the RTE_MBUF_F_TX_TCP_SEG flag in mbuf->ol_flags (this flag implies
845    #:      RTE_MBUF_F_TX_TCP_CKSUM)
846    #:  - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6
847    #:      * if it's IPv4, set the RTE_MBUF_F_TX_IP_CKSUM flag
848    #:  - fill the mbuf offload information: l2_len, l3_len, l4_len, tso_segsz
849    RTE_MBUF_F_TX_TCP_SEG = auto()
850
851    #: TX IEEE1588 packet to timestamp.
852    RTE_MBUF_F_TX_IEEE1588_TMST = auto()
853
854    # Bits 52+53 used for L4 packet type with checksum enabled in ``lib/mbuf/rte_mbuf_core.h`` but
855    # some values must be modified in this framework to maintain uniqueness. To use hardware
856    # L4 checksum offload, the user needs to:
857    #
858    # - fill l2_len and l3_len in mbuf
859    # - set the flags RTE_MBUF_F_TX_TCP_CKSUM, RTE_MBUF_F_TX_SCTP_CKSUM or
860    #   RTE_MBUF_F_TX_UDP_CKSUM
861    # - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6
862
863    #: Disable L4 cksum of TX pkt. Value is 0 in the DPDK library.
864    RTE_MBUF_F_TX_L4_NO_CKSUM = 1 << 67
865    #: TCP cksum of TX pkt. Computed by NIC.
866    RTE_MBUF_F_TX_TCP_CKSUM = 1 << 52
867    #: SCTP cksum of TX pkt. Computed by NIC.
868    RTE_MBUF_F_TX_SCTP_CKSUM = 1 << 53
869    #: UDP cksum of TX pkt. Computed by NIC. Value is 3 << 52 in the DPDK library.
870    RTE_MBUF_F_TX_UDP_CKSUM = 1 << 68
871
872    #: Offload the IP checksum in the hardware. The flag RTE_MBUF_F_TX_IPV4 should also be set by
873    #: the application, although a PMD will only check RTE_MBUF_F_TX_IP_CKSUM.
874    RTE_MBUF_F_TX_IP_CKSUM = 1 << 54
875
876    #: Packet is IPv4. This flag must be set when using any offload feature (TSO, L3 or L4
877    #: checksum) to tell the NIC that the packet is an IPv4 packet. If the packet is a tunneled
878    #: packet, this flag is related to the inner headers.
879    RTE_MBUF_F_TX_IPV4 = auto()
880    #: Packet is IPv6. This flag must be set when using an offload feature (TSO or L4 checksum) to
881    #: tell the NIC that the packet is an IPv6 packet. If the packet is a tunneled packet, this
882    #: flag is related to the inner headers.
883    RTE_MBUF_F_TX_IPV6 = auto()
884    #: VLAN tag insertion request to driver, driver may offload the insertion based on the device
885    #: capability. mbuf 'vlan_tci' field must be valid when this flag is set.
886    RTE_MBUF_F_TX_VLAN = auto()
887
888    #: Offload the IP checksum of an external header in the hardware. The flag
889    #: RTE_MBUF_F_TX_OUTER_IPV4 should also be set by the application, although a PMD will only
890    #: check RTE_MBUF_F_TX_OUTER_IP_CKSUM.
891    RTE_MBUF_F_TX_OUTER_IP_CKSUM = auto()
892    #: Packet outer header is IPv4. This flag must be set when using any outer offload feature (L3
893    #: or L4 checksum) to tell the NIC that the outer header of the tunneled packet is an IPv4
894    #: packet.
895    RTE_MBUF_F_TX_OUTER_IPV4 = auto()
896    #: Packet outer header is IPv6. This flag must be set when using any outer offload feature (L4
897    #: checksum) to tell the NIC that the outer header of the tunneled packet is an IPv6 packet.
898    RTE_MBUF_F_TX_OUTER_IPV6 = auto()
899
900    @classmethod
901    def from_list_string(cls, names: str) -> Self:
902        """Makes a flag from a whitespace-separated list of names.
903
904        Args:
905            names: a whitespace-separated list containing the members of this flag.
906
907        Returns:
908            An instance of this flag.
909        """
910        flag = cls(0)
911        for name in names.split():
912            flag |= cls.from_str(name)
913        return flag
914
915    @classmethod
916    def from_str(cls, name: str) -> Self:
917        """Makes a flag matching the supplied name.
918
919        Args:
920            name: a valid member of this flag in text
921        Returns:
922            An instance of this flag.
923        """
924        member_name = name.strip().replace("-", "_")
925        return cls[member_name]
926
927    @classmethod
928    def make_parser(cls) -> ParserFn:
929        """Makes a parser function.
930
931        Returns:
932            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
933                parser function that makes an instance of this flag from text.
934        """
935        return TextParser.wrap(
936            TextParser.find(r"ol_flags: ([^\n]+)"),
937            cls.from_list_string,
938        )
939
940
class RtePTypes(Flag):
    """Packet types that can be reported in the verbose output of DPDK applications.

    The members mirror the definitions of the RTE MBUF ptype library in DPDK, found in
    ``lib/mbuf/rte_mbuf_ptype.h``. The member names are expected to match the strings returned by
    the ``rte_get_ptype_*_name`` functions in ``rte_mbuf_ptype.c``.

    References:
        DPDK lib: ``lib/mbuf/rte_mbuf_ptype.h``
        DPDK ptype name formatting functions: ``lib/mbuf/rte_mbuf_ptype.c:rte_get_ptype_*_name()``
    """

    # L2
    #: Ethernet packet type. Used for the outer packet in tunneling cases.
    L2_ETHER = auto()
    #: Ethernet packet type for time sync.
    L2_ETHER_TIMESYNC = auto()
    #: ARP (Address Resolution Protocol) packet type.
    L2_ETHER_ARP = auto()
    #: LLDP (Link Layer Discovery Protocol) packet type.
    L2_ETHER_LLDP = auto()
    #: NSH (Network Service Header) packet type.
    L2_ETHER_NSH = auto()
    #: VLAN packet type.
    L2_ETHER_VLAN = auto()
    #: QinQ packet type.
    L2_ETHER_QINQ = auto()
    #: PPPOE packet type.
    L2_ETHER_PPPOE = auto()
    #: FCoE packet type.
    L2_ETHER_FCOE = auto()
    #: MPLS packet type.
    L2_ETHER_MPLS = auto()
    #: No L2 packet information.
    L2_UNKNOWN = auto()

    # L3
    #: IP (Internet Protocol) version 4 packet type. Used for the outer packet in tunneling
    #: cases; the packet does not contain any header option.
    L3_IPV4 = auto()
    #: IP (Internet Protocol) version 4 packet type. Used for the outer packet in tunneling
    #: cases; the packet contains header options.
    L3_IPV4_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. Used for the outer packet in tunneling
    #: cases; the packet does not contain any extension header.
    L3_IPV6 = auto()
    #: IP (Internet Protocol) version 4 packet type. Used for the outer packet in tunneling
    #: cases; the packet may or may not contain header options.
    L3_IPV4_EXT_UNKNOWN = auto()
    #: IP (Internet Protocol) version 6 packet type. Used for the outer packet in tunneling
    #: cases; the packet contains extension headers.
    L3_IPV6_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. Used for the outer packet in tunneling
    #: cases; the packet may or may not contain extension headers.
    L3_IPV6_EXT_UNKNOWN = auto()
    #: No L3 packet information.
    L3_UNKNOWN = auto()

    # L4
    #: TCP (Transmission Control Protocol) packet type. Used for the outer packet in tunneling
    #: cases.
    L4_TCP = auto()
    #: UDP (User Datagram Protocol) packet type. Used for the outer packet in tunneling cases.
    L4_UDP = auto()
    #: Fragmented IP (Internet Protocol) packet type. Used for the outer packet in tunneling
    #: cases. It refers to packets of any IP type which can be recognized as fragmented. A
    #: fragmented packet cannot be recognized as any other L4 type (RTE_PTYPE_L4_TCP,
    #: RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_SCTP, RTE_PTYPE_L4_ICMP, RTE_PTYPE_L4_NONFRAG).
    L4_FRAG = auto()
    #: SCTP (Stream Control Transmission Protocol) packet type. Used for the outer packet in
    #: tunneling cases.
    L4_SCTP = auto()
    #: ICMP (Internet Control Message Protocol) packet type. Used for the outer packet in
    #: tunneling cases.
    L4_ICMP = auto()
    #: Non-fragmented IP (Internet Protocol) packet type. Used for the outer packet in tunneling
    #: cases. It refers to packets of any IP type that cannot be recognized as any of the other
    #: L4 types (RTE_PTYPE_L4_TCP, RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_FRAG, RTE_PTYPE_L4_SCTP,
    #: RTE_PTYPE_L4_ICMP).
    L4_NONFRAG = auto()
    #: IGMP (Internet Group Management Protocol) packet type.
    L4_IGMP = auto()
    #: No L4 packet information.
    L4_UNKNOWN = auto()

    # Tunnel
    #: IP (Internet Protocol) in IP (Internet Protocol) tunneling packet type.
    TUNNEL_IP = auto()
    #: GRE (Generic Routing Encapsulation) tunneling packet type.
    TUNNEL_GRE = auto()
    #: VXLAN (Virtual eXtensible Local Area Network) tunneling packet type.
    TUNNEL_VXLAN = auto()
    #: NVGRE (Network Virtualization using Generic Routing Encapsulation) tunneling packet type.
    TUNNEL_NVGRE = auto()
    #: GENEVE (Generic Network Virtualization Encapsulation) tunneling packet type.
    TUNNEL_GENEVE = auto()
    #: Tunneling packet type of Teredo, VXLAN (Virtual eXtensible Local Area Network) or GRE
    #: (Generic Routing Encapsulation). Such packets may be recognized as this type when the
    #: hardware is not capable of recognizing them independently.
    TUNNEL_GRENAT = auto()
    #: GTP-C (GPRS Tunnelling Protocol) control tunneling packet type.
    TUNNEL_GTPC = auto()
    #: GTP-U (GPRS Tunnelling Protocol) user data tunneling packet type.
    TUNNEL_GTPU = auto()
    #: ESP (IP Encapsulating Security Payload) tunneling packet type.
    TUNNEL_ESP = auto()
    #: L2TP (Layer 2 Tunneling Protocol) tunneling packet type.
    TUNNEL_L2TP = auto()
    #: VXLAN-GPE (VXLAN Generic Protocol Extension) tunneling packet type.
    TUNNEL_VXLAN_GPE = auto()
    #: MPLS-in-UDP tunneling packet type (RFC 7510).
    TUNNEL_MPLS_IN_UDP = auto()
    #: MPLS-in-GRE tunneling packet type (RFC 4023).
    TUNNEL_MPLS_IN_GRE = auto()
    #: No tunnel information found on the packet.
    TUNNEL_UNKNOWN = auto()

    # Inner L2
    #: Ethernet packet type. Used for the inner packet only.
    INNER_L2_ETHER = auto()
    #: Ethernet packet type with VLAN (Virtual Local Area Network) tag.
    INNER_L2_ETHER_VLAN = auto()
    #: QinQ packet type.
    INNER_L2_ETHER_QINQ = auto()
    #: No inner L2 information found on the packet.
    INNER_L2_UNKNOWN = auto()

    # Inner L3
    #: IP (Internet Protocol) version 4 packet type. Used for the inner packet only; the packet
    #: does not contain any header option.
    INNER_L3_IPV4 = auto()
    #: IP (Internet Protocol) version 4 packet type. Used for the inner packet only; the packet
    #: contains header options.
    INNER_L3_IPV4_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. Used for the inner packet only; the packet
    #: does not contain any extension header.
    INNER_L3_IPV6 = auto()
    #: IP (Internet Protocol) version 4 packet type. Used for the inner packet only; the packet
    #: may or may not contain header options.
    INNER_L3_IPV4_EXT_UNKNOWN = auto()
    #: IP (Internet Protocol) version 6 packet type. Used for the inner packet only; the packet
    #: contains extension headers.
    INNER_L3_IPV6_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. Used for the inner packet only; the packet
    #: may or may not contain extension headers.
    INNER_L3_IPV6_EXT_UNKNOWN = auto()
    #: No inner L3 information found on the packet.
    INNER_L3_UNKNOWN = auto()

    # Inner L4
    #: TCP (Transmission Control Protocol) packet type. Used for the inner packet only.
    INNER_L4_TCP = auto()
    #: UDP (User Datagram Protocol) packet type. Used for the inner packet only.
    INNER_L4_UDP = auto()
    #: Fragmented IP (Internet Protocol) packet type. Used for the inner packet only; the packet
    #: may or may not have a layer 4 packet.
    INNER_L4_FRAG = auto()
    #: SCTP (Stream Control Transmission Protocol) packet type. Used for the inner packet only.
    INNER_L4_SCTP = auto()
    #: ICMP (Internet Control Message Protocol) packet type. Used for the inner packet only.
    INNER_L4_ICMP = auto()
    #: Non-fragmented IP (Internet Protocol) packet type. Used for the inner packet only; the
    #: packet may or may not have other unknown layer 4 packet types.
    INNER_L4_NONFRAG = auto()
    #: No inner L4 information found on the packet.
    INNER_L4_UNKNOWN = auto()

    @classmethod
    def from_list_string(cls, names: str) -> Self:
        """Build a combined flag from a whitespace-separated list of member names.

        Args:
            names: Member names of this flag separated by whitespace. Every name is resolved
                through :meth:`from_str`.

        Returns:
            The bitwise OR of all the named members, or the empty flag if `names` has no names.
        """
        return functools.reduce(
            lambda combined, member: combined | member,
            map(cls.from_str, names.split()),
            cls(0),
        )

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Look up the member of this flag that matches a textual name.

        Surrounding whitespace is ignored and dashes are treated as underscores before the lookup.

        Args:
            name: The textual name of a member of this flag.

        Returns:
            The member of this flag named by `name`.

        Raises:
            KeyError: If the normalized name is not a member of this flag.
        """
        normalized = "_".join(name.strip().split("-"))
        return cls[normalized]

    @classmethod
    def make_parser(cls, hw: bool) -> ParserFn:
        """Create the parser for either the hardware or the software packet types.

        The text following ``hw ptype: `` or ``sw ptype: `` is captured up to the next dash and
        handed over to :meth:`from_list_string`.

        Args:
            hw: Whether to make a parser for hardware ptypes or software ptypes. If :data:`True`,
                hardware ptypes will be collected, otherwise software ptypes will.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        find_ptype_names = TextParser.find(f"{'hw' if hw else 'sw'} ptype: ([^-]+)")
        return TextParser.wrap(find_ptype_names, cls.from_list_string)
1151
1152
@dataclass
class TestPmdVerbosePacket(TextParser):
    """Packet information provided by verbose output in Testpmd.

    This dataclass expects that packet information be prepended with the starting line of packet
    bursts. Specifically, the line that reads "port X/queue Y: sent/received Z packets".

    Every field is filled in by the parser stored in its metadata, which searches the text for the
    pattern given there. Fields declared with a default of :data:`None` are the ones that are
    allowed to be missing from the text.
    """

    #: ID of the port that handled the packet.
    port_id: int = field(metadata=TextParser.find_int(r"port (\d+)/queue \d+"))
    #: ID of the queue that handled the packet.
    queue_id: int = field(metadata=TextParser.find_int(r"port \d+/queue (\d+)"))
    #: Whether the packet was received or sent by the queue/port. Determined by searching the
    #: text for ``received <N> packets``.
    was_received: bool = field(metadata=TextParser.find(r"received \d+ packets"))
    #: Source MAC address, taken from the ``src=`` entry.
    src_mac: str = field(metadata=TextParser.find(f"src=({REGEX_FOR_MAC_ADDRESS})"))
    #: Destination MAC address, taken from the ``dst=`` entry.
    dst_mac: str = field(metadata=TextParser.find(f"dst=({REGEX_FOR_MAC_ADDRESS})"))
    #: Memory pool the packet was handled on.
    pool: str = field(metadata=TextParser.find(r"pool=(\S+)"))
    #: Packet type in hex.
    p_type: int = field(metadata=TextParser.find_int(r"type=(0x[a-fA-F\d]+)"))
    #: Length reported in the ``length=`` entry.
    length: int = field(metadata=TextParser.find_int(r"length=(\d+)"))
    #: Number of segments in the packet.
    nb_segs: int = field(metadata=TextParser.find_int(r"nb_segs=(\d+)"))
    #: Hardware packet type.
    hw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=True))
    #: Software packet type.
    sw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=False))
    #: Value of the ``l2_len=`` entry.
    l2_len: int = field(metadata=TextParser.find_int(r"l2_len=(\d+)"))
    #: Offload flags listed in the ``ol_flags:`` entry.
    ol_flags: PacketOffloadFlag = field(metadata=PacketOffloadFlag.make_parser())
    #: RSS hash of the packet in hex.
    rss_hash: int | None = field(
        default=None, metadata=TextParser.find_int(r"RSS hash=(0x[a-fA-F\d]+)")
    )
    #: RSS queue that handled the packet in hex.
    rss_queue: int | None = field(
        default=None, metadata=TextParser.find_int(r"RSS queue=(0x[a-fA-F\d]+)")
    )
    #: Value of the ``l3_len=`` entry, :data:`None` by default.
    l3_len: int | None = field(default=None, metadata=TextParser.find_int(r"l3_len=(\d+)"))
    #: Value of the ``l4_len=`` entry, :data:`None` by default.
    l4_len: int | None = field(default=None, metadata=TextParser.find_int(r"l4_len=(\d+)"))
1199
1200
class RxOffloadCapability(Flag):
    """Rx offload capabilities of a device.

    The flags are taken from ``lib/ethdev/rte_ethdev.h``, where they carry
    the ``RTE_ETH_RX_OFFLOAD`` prefix instead of the ``RX_OFFLOAD`` prefix used here,
    which is what testpmd changes the prefix to.
    The values in the library are not contiguous, so explicit values are mixed in
    between the auto() values to keep the members aligned with the library.

    The ``RX_OFFLOAD`` prefix is kept so that the same flag names can be used
    in :class:`NicCapability`, where no other qualifier would sufficiently distinguish
    these flags from other capabilities.

    References:
        DPDK lib: ``lib/ethdev/rte_ethdev.h``
        testpmd display function: ``app/test-pmd/cmdline.c:print_rx_offloads()``
    """

    #: Device supports VLAN stripping.
    RX_OFFLOAD_VLAN_STRIP = auto()
    #: Device supports L3 checksum offload.
    RX_OFFLOAD_IPV4_CKSUM = auto()
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_UDP_CKSUM = auto()
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_TCP_CKSUM = auto()
    #: Device supports Large Receive Offload.
    RX_OFFLOAD_TCP_LRO = auto()
    #: Device supports QinQ (double VLAN) stripping.
    RX_OFFLOAD_QINQ_STRIP = auto()
    #: Device supports outer packet L3 checksum.
    RX_OFFLOAD_OUTER_IPV4_CKSUM = auto()
    #: Device supports MACsec.
    RX_OFFLOAD_MACSEC_STRIP = auto()
    #: Device supports filtering of a VLAN Tag identifier.
    RX_OFFLOAD_VLAN_FILTER = 1 << 9
    #: Device supports VLAN offload.
    RX_OFFLOAD_VLAN_EXTEND = auto()
    #: Device supports receiving segmented mbufs.
    RX_OFFLOAD_SCATTER = 1 << 13
    #: Device supports Timestamp.
    RX_OFFLOAD_TIMESTAMP = auto()
    #: Device supports crypto processing while packet is received in NIC.
    RX_OFFLOAD_SECURITY = auto()
    #: Device supports keeping the CRC on received packets.
    RX_OFFLOAD_KEEP_CRC = auto()
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_SCTP_CKSUM = auto()
    #: Device supports outer packet L4 checksum.
    RX_OFFLOAD_OUTER_UDP_CKSUM = auto()
    #: Device supports RSS hashing.
    RX_OFFLOAD_RSS_HASH = auto()
    #: Device supports buffer split.
    RX_OFFLOAD_BUFFER_SPLIT = auto()
    #: Combination of the IPv4, UDP and TCP checksum capabilities.
    RX_OFFLOAD_CHECKSUM = RX_OFFLOAD_IPV4_CKSUM | RX_OFFLOAD_UDP_CKSUM | RX_OFFLOAD_TCP_CKSUM
    #: Combination of the VLAN strip, VLAN filter, VLAN extend and QinQ strip capabilities.
    RX_OFFLOAD_VLAN = (
        RX_OFFLOAD_VLAN_STRIP
        | RX_OFFLOAD_VLAN_FILTER
        | RX_OFFLOAD_VLAN_EXTEND
        | RX_OFFLOAD_QINQ_STRIP
    )

    @classmethod
    def from_string(cls, line: str) -> Self:
        """Make an instance from a line of whitespace-separated flag names.

        The names in `line` are expected without the ``RX_OFFLOAD_`` prefix, which is prepended
        before each member lookup.

        Args:
            line: The line to parse.

        Returns:
            A new instance containing all found flags.

        Raises:
            KeyError: If a prefixed name is not a member of this flag.
        """
        return functools.reduce(
            lambda combined, member: combined | member,
            (cls[f"RX_OFFLOAD_{flag_name}"] for flag_name in line.split()),
            cls(0),
        )

    @classmethod
    def make_parser(cls, per_port: bool) -> ParserFn:
        """Make a parser for either the ``Per Port`` or the ``Per Queue`` line of a text.

        Args:
            per_port: If :data:`True`, will return capabilities per port. If :data:`False`,
                will return capabilities per queue.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        if per_port:
            granularity = "Port"
        else:
            granularity = "Queue"
        find_capabilities = TextParser.find(rf"Per {granularity}\s+:(.*)$", re.MULTILINE)
        return TextParser.wrap(find_capabilities, cls.from_string)
1297
1298
@dataclass
class RxOffloadCapabilities(TextParser):
    """Parsed output of testpmd's ``show port <port_id> rx_offload capabilities`` command.

    References:
        testpmd command function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa()``
        testpmd display function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa_parsed()``
    """

    #: ID of the port named in the ``Rx Offloading Capabilities of port`` line.
    port_id: int = field(
        metadata=TextParser.find_int(r"Rx Offloading Capabilities of port (\d+) :")
    )
    #: Per-queue Rx offload capabilities.
    per_queue: RxOffloadCapability = field(
        metadata=RxOffloadCapability.make_parser(per_port=False)
    )
    #: Capabilities other than per-queue Rx offload capabilities.
    per_port: RxOffloadCapability = field(
        metadata=RxOffloadCapability.make_parser(per_port=True)
    )
1316
1317
def requires_stopped_ports(func: TestPmdShellMethod) -> TestPmdShellMethod:
    """Make a :class:`TestPmdShell` method stop the ports before it runs.

    The wrapper checks the shell's ``ports_started`` attribute on every call. If the ports are
    started, all of them are stopped first and the decorated method is executed afterwards.

    Args:
        func: The :class:`TestPmdShell` method to decorate.

    Returns:
        The wrapped method.
    """

    @functools.wraps(func)
    def _stop_ports_then_call(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):
        # Nothing to prepare when the ports are already stopped.
        if not self.ports_started:
            return func(self, *args, **kwargs)

        self._logger.debug("Ports need to be stopped to continue.")
        self.stop_all_ports()
        return func(self, *args, **kwargs)

    return _stop_ports_then_call
1337
1338
def requires_started_ports(func: TestPmdShellMethod) -> TestPmdShellMethod:
    """Make a :class:`TestPmdShell` method start the ports before it runs.

    The wrapper checks the shell's ``ports_started`` attribute on every call. If the ports are
    stopped, all of them are started first and the decorated method is executed afterwards.

    Args:
        func: The :class:`TestPmdShell` method to decorate.

    Returns:
        The wrapped method.
    """

    @functools.wraps(func)
    def _start_ports_then_call(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):
        # Nothing to prepare when the ports are already started.
        if self.ports_started:
            return func(self, *args, **kwargs)

        self._logger.debug("Ports need to be started to continue.")
        self.start_all_ports()
        return func(self, *args, **kwargs)

    return _start_ports_then_call
1358
1359
def add_remove_mtu(mtu: int = 1500) -> Callable[[TestPmdShellMethod], TestPmdShellMethod]:
    """Configure MTU to `mtu` on all ports, run the decorated function, then revert.

    The MTU to revert to is read from the first port of the shell before `mtu` is applied; if
    that value is unset or zero, 1500 is used instead. The revert is performed even when the
    decorated method raises, so a failing method does not leave the modified MTU behind.

    Args:
        mtu: The MTU to configure all ports on.

    Returns:
        The method decorated with setting and reverting MTU.
    """

    def decorator(func: TestPmdShellMethod) -> TestPmdShellMethod:
        @functools.wraps(func)
        def wrapper(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):
            original_mtu = self.ports[0].mtu
            self.set_port_mtu_all(mtu=mtu, verify=False)
            try:
                return func(self, *args, **kwargs)
            finally:
                # Runs on both the normal and the exceptional path.
                self.set_port_mtu_all(original_mtu if original_mtu else 1500, verify=False)

        return wrapper

    return decorator
1382
1383
1384class TestPmdShell(DPDKShell):
1385    """Testpmd interactive shell.
1386
1387    The testpmd shell users should never use
1388    the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather
1389    call specialized methods. If there isn't one that satisfies a need, it should be added.
1390
1391    Attributes:
1392        ports_started: Indicates whether the ports are started.
1393    """
1394
1395    _app_params: TestPmdParams
1396    _ports: list[TestPmdPort] | None
1397
1398    #: The path to the testpmd executable.
1399    path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd")
1400
1401    #: The testpmd's prompt.
1402    _default_prompt: ClassVar[str] = "testpmd>"
1403
1404    #: This forces the prompt to appear after sending a command.
1405    _command_extra_chars: ClassVar[str] = "\n"
1406
1407    ports_started: bool
1408
    def __init__(
        self,
        node: SutNode,
        privileged: bool = True,
        timeout: float = SETTINGS.timeout,
        lcore_filter_specifier: LogicalCoreCount | LogicalCoreList = LogicalCoreCount(),
        ascending_cores: bool = True,
        append_prefix_timestamp: bool = True,
        name: str | None = None,
        **app_params: Unpack[TestPmdParamsDict],
    ) -> None:
        """Overrides :meth:`~.dpdk_shell.DPDKShell.__init__`. Changes app_params to kwargs.

        All the named arguments are forwarded unchanged to the parent initializer. The extra
        keyword arguments are collected into a :class:`TestPmdParams` instance, which is passed
        to the parent as the application parameters.

        Args:
            node: Forwarded to the parent initializer.
            privileged: Forwarded to the parent initializer.
            timeout: Forwarded to the parent initializer.
            lcore_filter_specifier: Forwarded to the parent initializer.
            ascending_cores: Forwarded to the parent initializer.
            append_prefix_timestamp: Forwarded to the parent initializer.
            name: Forwarded to the parent initializer.
            **app_params: The testpmd parameters used to build :class:`TestPmdParams`.
        """
        # The parent initializer takes its arguments positionally; note that the application
        # parameters come before the name in its argument order.
        super().__init__(
            node,
            privileged,
            timeout,
            lcore_filter_specifier,
            ascending_cores,
            append_prefix_timestamp,
            TestPmdParams(**app_params),
            name,
        )
        # The ports are considered started unless the parameters disable the device start.
        self.ports_started = not self._app_params.disable_device_start
        # No port information has been collected yet.
        self._ports = None
1433
1434    @property
1435    def ports(self) -> list[TestPmdPort]:
1436        """The ports of the instance.
1437
1438        This caches the ports returned by :meth:`show_port_info_all`.
1439        To force an update of port information, execute :meth:`show_port_info_all` or
1440        :meth:`show_port_info`.
1441
1442        Returns: The list of known testpmd ports.
1443        """
1444        if self._ports is None:
1445            return self.show_port_info_all()
1446        return self._ports
1447
1448    @requires_started_ports
1449    def start(self, verify: bool = True) -> None:
1450        """Start packet forwarding with the current configuration.
1451
1452        Args:
1453            verify: If :data:`True` , a second start command will be sent in an attempt to verify
1454                packet forwarding started as expected.
1455
1456        Raises:
1457            InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to
1458                start or ports fail to come up.
1459        """
1460        self.send_command("start")
1461        if verify:
1462            # If forwarding was already started, sending "start" again should tell us
1463            start_cmd_output = self.send_command("start")
1464            if "Packet forwarding already started" not in start_cmd_output:
1465                self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")
1466                raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")
1467
1468            number_of_ports = len(self._app_params.ports or [])
1469            for port_id in range(number_of_ports):
1470                if not self.wait_link_status_up(port_id):
1471                    raise InteractiveCommandExecutionError(
1472                        "Not all ports came up after starting packet forwarding in testpmd."
1473                    )
1474
1475    def stop(self, verify: bool = True) -> str:
1476        """Stop packet forwarding.
1477
1478        Args:
1479            verify: If :data:`True` , the output of the stop command is scanned to verify that
1480                forwarding was stopped successfully or not started. If neither is found, it is
1481                considered an error.
1482
1483        Raises:
1484            InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop
1485                forwarding results in an error.
1486
1487        Returns:
1488            Output gathered from the stop command and all other preceding logs in the buffer. This
1489            output is most often used to view forwarding statistics that are displayed when this
1490            command is sent as well as any verbose packet information that hasn't been consumed
1491            prior to calling this method.
1492        """
1493        stop_cmd_output = self.send_command("stop")
1494        if verify:
1495            if (
1496                "Done." not in stop_cmd_output
1497                and "Packet forwarding not started" not in stop_cmd_output
1498            ):
1499                self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")
1500                raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")
1501        return stop_cmd_output
1502
1503    def get_devices(self) -> list[TestPmdDevice]:
1504        """Get a list of device names that are known to testpmd.
1505
1506        Uses the device info listed in testpmd and then parses the output.
1507
1508        Returns:
1509            A list of devices.
1510        """
1511        dev_info: str = self.send_command("show device info all")
1512        dev_list: list[TestPmdDevice] = []
1513        for line in dev_info.split("\n"):
1514            if "device name:" in line.lower():
1515                dev_list.append(TestPmdDevice(line))
1516        return dev_list
1517
1518    def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool:
1519        """Wait until the link status on the given port is "up".
1520
1521        Arguments:
1522            port_id: Port to check the link status on.
1523            timeout: Time to wait for the link to come up. The default value for this
1524                argument may be modified using the :option:`--timeout` command-line argument
1525                or the :envvar:`DTS_TIMEOUT` environment variable.
1526
1527        Returns:
1528            Whether the link came up in time or not.
1529        """
1530        time_to_stop = time.time() + timeout
1531        port_info: str = ""
1532        while time.time() < time_to_stop:
1533            port_info = self.send_command(f"show port info {port_id}")
1534            if "Link status: up" in port_info:
1535                break
1536            time.sleep(0.5)
1537        else:
1538            self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")
1539        return "Link status: up" in port_info
1540
1541    def set_forward_mode(self, mode: SimpleForwardingModes, verify: bool = True):
1542        """Set packet forwarding mode.
1543
1544        Args:
1545            mode: The forwarding mode to use.
1546            verify: If :data:`True` the output of the command will be scanned in an attempt to
1547                verify that the forwarding mode was set to `mode` properly.
1548
1549        Raises:
1550            InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode
1551                fails to update.
1552        """
1553        set_fwd_output = self.send_command(f"set fwd {mode.value}")
1554        if f"Set {mode.value} packet forwarding mode" not in set_fwd_output:
1555            self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")
1556            raise InteractiveCommandExecutionError(
1557                f"Test pmd failed to set fwd mode to {mode.value}"
1558            )
1559
1560    def stop_all_ports(self, verify: bool = True) -> None:
1561        """Stops all the ports.
1562
1563        Args:
1564            verify: If :data:`True`, the output of the command will be checked for a successful
1565                execution.
1566
1567        Raises:
1568            InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not
1569                stopped successfully.
1570        """
1571        self._logger.debug("Stopping all the ports...")
1572        output = self.send_command("port stop all")
1573        if verify and not output.strip().endswith("Done"):
1574            raise InteractiveCommandExecutionError("Ports were not stopped successfully.")
1575
1576        self.ports_started = False
1577
1578    def start_all_ports(self, verify: bool = True) -> None:
1579        """Starts all the ports.
1580
1581        Args:
1582            verify: If :data:`True`, the output of the command will be checked for a successful
1583                execution.
1584
1585        Raises:
1586            InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not
1587                started successfully.
1588        """
1589        self._logger.debug("Starting all the ports...")
1590        output = self.send_command("port start all")
1591        if verify and not output.strip().endswith("Done"):
1592            raise InteractiveCommandExecutionError("Ports were not started successfully.")
1593
1594        self.ports_started = True
1595
1596    @requires_stopped_ports
1597    def set_ports_queues(self, number_of: int) -> None:
1598        """Sets the number of queues per port.
1599
1600        Args:
1601            number_of: The number of RX/TX queues to create per port.
1602
1603        Raises:
1604            InternalError: If `number_of` is invalid.
1605        """
1606        if number_of < 1:
1607            raise InternalError("The number of queues must be positive and non-zero.")
1608
1609        self.send_command(f"port config all rxq {number_of}")
1610        self.send_command(f"port config all txq {number_of}")
1611
1612    def show_port_info_all(self) -> list[TestPmdPort]:
1613        """Returns the information of all the ports.
1614
1615        Returns:
1616            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
1617        """
1618        output = self.send_command("show port info all")
1619
1620        # Sample output of the "all" command looks like:
1621        #
1622        # <start>
1623        #
1624        #   ********************* Infos for port 0 *********************
1625        #   Key: value
1626        #
1627        #   ********************* Infos for port 1 *********************
1628        #   Key: value
1629        # <end>
1630        #
1631        # Takes advantage of the double new line in between ports as end delimiter. But we need to
1632        # artificially add a new line at the end to pick up the last port. Because commands are
1633        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
1634        # Therefore we also need to take the carriage return into account.
1635        iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
1636        self._ports = [TestPmdPort.parse(block.group(0)) for block in iter]
1637        return self._ports
1638
1639    def show_port_info(self, port_id: int) -> TestPmdPort:
1640        """Returns the given port information.
1641
1642        Args:
1643            port_id: The port ID to gather information for.
1644
1645        Raises:
1646            InteractiveCommandExecutionError: If `port_id` is invalid.
1647
1648        Returns:
1649            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
1650        """
1651        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
1652        if output.startswith("Invalid port"):
1653            raise InteractiveCommandExecutionError("invalid port given")
1654
1655        port = TestPmdPort.parse(output)
1656        self._update_port(port)
1657        return port
1658
1659    def _update_port(self, port: TestPmdPort) -> None:
1660        if self._ports:
1661            self._ports = [
1662                existing_port if port.id != existing_port.id else port
1663                for existing_port in self._ports
1664            ]
1665
1666    def show_port_stats_all(self) -> list[TestPmdPortStats]:
1667        """Returns the statistics of all the ports.
1668
1669        Returns:
1670            list[TestPmdPortStats]: A list containing all the ports stats as `TestPmdPortStats`.
1671        """
1672        output = self.send_command("show port stats all")
1673
1674        # Sample output of the "all" command looks like:
1675        #
1676        #   ########### NIC statistics for port 0 ###########
1677        #   values...
1678        #   #################################################
1679        #
1680        #   ########### NIC statistics for port 1 ###########
1681        #   values...
1682        #   #################################################
1683        #
1684        iter = re.finditer(r"(^  #*.+#*$[^#]+)^  #*\r$", output, re.MULTILINE)
1685        return [TestPmdPortStats.parse(block.group(1)) for block in iter]
1686
1687    def show_port_stats(self, port_id: int) -> TestPmdPortStats:
1688        """Returns the given port statistics.
1689
1690        Args:
1691            port_id: The port ID to gather information for.
1692
1693        Raises:
1694            InteractiveCommandExecutionError: If `port_id` is invalid.
1695
1696        Returns:
1697            TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats.
1698        """
1699        output = self.send_command(f"show port stats {port_id}", skip_first_line=True)
1700        if output.startswith("Invalid port"):
1701            raise InteractiveCommandExecutionError("invalid port given")
1702
1703        return TestPmdPortStats.parse(output)
1704
1705    @requires_stopped_ports
1706    def set_port_mtu(self, port_id: int, mtu: int, verify: bool = True) -> None:
1707        """Change the MTU of a port using testpmd.
1708
1709        Some PMDs require that the port be stopped before changing the MTU, and it does no harm to
1710        stop the port before configuring in cases where it isn't required, so ports are stopped
1711        prior to changing their MTU.
1712
1713        Args:
1714            port_id: ID of the port to adjust the MTU on.
1715            mtu: Desired value for the MTU to be set to.
1716            verify: If `verify` is :data:`True` then the output will be scanned in an attempt to
1717                verify that the mtu was properly set on the port. Defaults to :data:`True`.
1718
1719        Raises:
1720            InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not
1721                properly updated on the port matching `port_id`.
1722        """
1723        set_mtu_output = self.send_command(f"port config mtu {port_id} {mtu}")
1724        if verify and (f"MTU: {mtu}" not in self.send_command(f"show port info {port_id}")):
1725            self._logger.debug(
1726                f"Failed to set mtu to {mtu} on port {port_id}." f" Output was:\n{set_mtu_output}"
1727            )
1728            raise InteractiveCommandExecutionError(
1729                f"Test pmd failed to update mtu of port {port_id} to {mtu}"
1730            )
1731
1732    def set_port_mtu_all(self, mtu: int, verify: bool = True) -> None:
1733        """Change the MTU of all ports using testpmd.
1734
1735        Runs :meth:`set_port_mtu` for every port that testpmd is aware of.
1736
1737        Args:
1738            mtu: Desired value for the MTU to be set to.
1739            verify: Whether to verify that setting the MTU on each port was successful or not.
1740                Defaults to :data:`True`.
1741
1742        Raises:
1743            InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not
1744                properly updated on at least one port.
1745        """
1746        for port in self.ports:
1747            self.set_port_mtu(port.id, mtu, verify)
1748
1749    @staticmethod
1750    def extract_verbose_output(output: str) -> list[TestPmdVerbosePacket]:
1751        """Extract the verbose information present in given testpmd output.
1752
1753        This method extracts sections of verbose output that begin with the line
1754        "port X/queue Y: sent/received Z packets" and end with the ol_flags of a packet.
1755
1756        Args:
1757            output: Testpmd output that contains verbose information
1758
1759        Returns:
1760            List of parsed packet information gathered from verbose information in `output`.
1761        """
1762        out: list[TestPmdVerbosePacket] = []
1763        prev_header: str = ""
1764        iter = re.finditer(
1765            r"(?P<HEADER>(?:port \d+/queue \d+: (?:received|sent) \d+ packets)?)\s*"
1766            r"(?P<PACKET>src=[\w\s=:-]+?ol_flags: [\w ]+)",
1767            output,
1768        )
1769        for match in iter:
1770            if match.group("HEADER"):
1771                prev_header = match.group("HEADER")
1772            out.append(TestPmdVerbosePacket.parse(f"{prev_header}\n{match.group('PACKET')}"))
1773        return out
1774
1775    def set_vlan_filter(self, port: int, enable: bool, verify: bool = True) -> None:
1776        """Set vlan filter on.
1777
1778        Args:
1779            port: The port number to enable VLAN filter on.
1780            enable: Enable the filter on `port` if :data:`True`, otherwise disable it.
1781            verify: If :data:`True`, the output of the command and show port info
1782                is scanned to verify that vlan filtering was set successfully.
1783
1784        Raises:
1785            InteractiveCommandExecutionError: If `verify` is :data:`True` and the filter
1786                fails to update.
1787        """
1788        filter_cmd_output = self.send_command(f"vlan set filter {'on' if enable else 'off'} {port}")
1789        if verify:
1790            vlan_settings = self.show_port_info(port_id=port).vlan_offload
1791            if enable ^ (vlan_settings is not None and VLANOffloadFlag.FILTER in vlan_settings):
1792                self._logger.debug(
1793                    f"""Failed to {'enable' if enable else 'disable'}
1794                                   filter on port {port}: \n{filter_cmd_output}"""
1795                )
1796                raise InteractiveCommandExecutionError(
1797                    f"""Failed to {'enable' if enable else 'disable'}
1798                    filter on port {port}"""
1799                )
1800
1801    def rx_vlan(self, vlan: int, port: int, add: bool, verify: bool = True) -> None:
1802        """Add specified vlan tag to the filter list on a port. Requires vlan filter to be on.
1803
1804        Args:
1805            vlan: The vlan tag to add, should be within 1-1005.
1806            port: The port number to add the tag on.
1807            add: Adds the tag if :data:`True`, otherwise removes the tag.
1808            verify: If :data:`True`, the output of the command is scanned to verify that
1809                the vlan tag was added to the filter list on the specified port.
1810
1811        Raises:
1812            InteractiveCommandExecutionError: If `verify` is :data:`True` and the tag
1813                is not added.
1814        """
1815        rx_cmd_output = self.send_command(f"rx_vlan {'add' if add else 'rm'} {vlan} {port}")
1816        if verify:
1817            if (
1818                "VLAN-filtering disabled" in rx_cmd_output
1819                or "Invalid vlan_id" in rx_cmd_output
1820                or "Bad arguments" in rx_cmd_output
1821            ):
1822                self._logger.debug(
1823                    f"""Failed to {'add' if add else 'remove'} tag {vlan}
1824                    port {port}: \n{rx_cmd_output}"""
1825                )
1826                raise InteractiveCommandExecutionError(
1827                    f"Testpmd failed to {'add' if add else 'remove'} tag {vlan} on port {port}."
1828                )
1829
1830    def set_vlan_strip(self, port: int, enable: bool, verify: bool = True) -> None:
1831        """Enable or disable vlan stripping on the specified port.
1832
1833        Args:
1834            port: The port number to use.
1835            enable: If :data:`True`, will turn vlan stripping on, otherwise will turn off.
1836            verify: If :data:`True`, the output of the command and show port info
1837                is scanned to verify that vlan stripping was enabled on the specified port.
1838
1839        Raises:
1840            InteractiveCommandExecutionError: If `verify` is :data:`True` and stripping
1841                fails to update.
1842        """
1843        strip_cmd_output = self.send_command(f"vlan set strip {'on' if enable else 'off'} {port}")
1844        if verify:
1845            vlan_settings = self.show_port_info(port_id=port).vlan_offload
1846            if enable ^ (vlan_settings is not None and VLANOffloadFlag.STRIP in vlan_settings):
1847                self._logger.debug(
1848                    f"""Failed to set strip {'on' if enable else 'off'}
1849                    port {port}: \n{strip_cmd_output}"""
1850                )
1851                raise InteractiveCommandExecutionError(
1852                    f"Testpmd failed to set strip {'on' if enable else 'off'} port {port}."
1853                )
1854
1855    @requires_stopped_ports
1856    def tx_vlan_set(
1857        self, port: int, enable: bool, vlan: int | None = None, verify: bool = True
1858    ) -> None:
1859        """Set hardware insertion of vlan tags in packets sent on a port.
1860
1861        Args:
1862            port: The port number to use.
1863            enable: Sets vlan tag insertion if :data:`True`, and resets if :data:`False`.
1864            vlan: The vlan tag to insert if enable is :data:`True`.
1865            verify: If :data:`True`, the output of the command is scanned to verify that
1866                vlan insertion was enabled on the specified port.
1867
1868        Raises:
1869            InteractiveCommandExecutionError: If `verify` is :data:`True` and the insertion
1870                tag is not set.
1871        """
1872        if enable:
1873            tx_vlan_cmd_output = self.send_command(f"tx_vlan set {port} {vlan}")
1874            if verify:
1875                if (
1876                    "Please stop port" in tx_vlan_cmd_output
1877                    or "Invalid vlan_id" in tx_vlan_cmd_output
1878                    or "Invalid port" in tx_vlan_cmd_output
1879                ):
1880                    self._logger.debug(
1881                        f"Failed to set vlan tag {vlan} on port {port}:\n{tx_vlan_cmd_output}"
1882                    )
1883                    raise InteractiveCommandExecutionError(
1884                        f"Testpmd failed to set vlan insertion tag {vlan} on port {port}."
1885                    )
1886        else:
1887            tx_vlan_cmd_output = self.send_command(f"tx_vlan reset {port}")
1888            if verify:
1889                if "Please stop port" in tx_vlan_cmd_output or "Invalid port" in tx_vlan_cmd_output:
1890                    self._logger.debug(
1891                        f"Failed to reset vlan insertion on port {port}: \n{tx_vlan_cmd_output}"
1892                    )
1893                    raise InteractiveCommandExecutionError(
1894                        f"Testpmd failed to reset vlan insertion on port {port}."
1895                    )
1896
1897    def set_promisc(self, port: int, enable: bool, verify: bool = True) -> None:
1898        """Enable or disable promiscuous mode for the specified port.
1899
1900        Args:
1901            port: Port number to use.
1902            enable: If :data:`True`, turn promiscuous mode on, otherwise turn off.
1903            verify: If :data:`True` an additional command will be sent to verify that
1904                promiscuous mode is properly set. Defaults to :data:`True`.
1905
1906        Raises:
1907            InteractiveCommandExecutionError: If `verify` is :data:`True` and promiscuous mode
1908                is not correctly set.
1909        """
1910        promisc_cmd_output = self.send_command(f"set promisc {port} {'on' if enable else 'off'}")
1911        if verify:
1912            stats = self.show_port_info(port_id=port)
1913            if enable ^ stats.is_promiscuous_mode_enabled:
1914                self._logger.debug(
1915                    f"Failed to set promiscuous mode on port {port}: \n{promisc_cmd_output}"
1916                )
1917                raise InteractiveCommandExecutionError(
1918                    f"Testpmd failed to set promiscuous mode on port {port}."
1919                )
1920
1921    def set_verbose(self, level: int, verify: bool = True) -> None:
1922        """Set debug verbosity level.
1923
1924        Args:
1925            level: 0 - silent except for error
1926                1 - fully verbose except for Tx packets
1927                2 - fully verbose except for Rx packets
1928                >2 - fully verbose
1929            verify: If :data:`True` the command output will be scanned to verify that verbose level
1930                is properly set. Defaults to :data:`True`.
1931
1932        Raises:
1933            InteractiveCommandExecutionError: If `verify` is :data:`True` and verbose level
1934            is not correctly set.
1935        """
1936        verbose_cmd_output = self.send_command(f"set verbose {level}")
1937        if verify:
1938            if "Change verbose level" not in verbose_cmd_output:
1939                self._logger.debug(
1940                    f"Failed to set verbose level to {level}: \n{verbose_cmd_output}"
1941                )
1942                raise InteractiveCommandExecutionError(
1943                    f"Testpmd failed to set verbose level to {level}."
1944                )
1945
1946    def _close(self) -> None:
1947        """Overrides :meth:`~.interactive_shell.close`."""
1948        self.stop()
1949        self.send_command("quit", "Bye...")
1950        return super()._close()
1951
1952    """
1953    ====== Capability retrieval methods ======
1954    """
1955
1956    def get_capabilities_rx_offload(
1957        self,
1958        supported_capabilities: MutableSet["NicCapability"],
1959        unsupported_capabilities: MutableSet["NicCapability"],
1960    ) -> None:
1961        """Get all rx offload capabilities and divide them into supported and unsupported.
1962
1963        Args:
1964            supported_capabilities: Supported capabilities will be added to this set.
1965            unsupported_capabilities: Unsupported capabilities will be added to this set.
1966        """
1967        self._logger.debug("Getting rx offload capabilities.")
1968        command = f"show port {self.ports[0].id} rx_offload capabilities"
1969        rx_offload_capabilities_out = self.send_command(command)
1970        rx_offload_capabilities = RxOffloadCapabilities.parse(rx_offload_capabilities_out)
1971        self._update_capabilities_from_flag(
1972            supported_capabilities,
1973            unsupported_capabilities,
1974            RxOffloadCapability,
1975            rx_offload_capabilities.per_port | rx_offload_capabilities.per_queue,
1976        )
1977
1978    def _update_capabilities_from_flag(
1979        self,
1980        supported_capabilities: MutableSet["NicCapability"],
1981        unsupported_capabilities: MutableSet["NicCapability"],
1982        flag_class: type[Flag],
1983        supported_flags: Flag,
1984    ) -> None:
1985        """Divide all flags from `flag_class` into supported and unsupported."""
1986        for flag in flag_class:
1987            if flag in supported_flags:
1988                supported_capabilities.add(NicCapability[str(flag.name)])
1989            else:
1990                unsupported_capabilities.add(NicCapability[str(flag.name)])
1991
1992    @requires_started_ports
1993    def get_capabilities_rxq_info(
1994        self,
1995        supported_capabilities: MutableSet["NicCapability"],
1996        unsupported_capabilities: MutableSet["NicCapability"],
1997    ) -> None:
1998        """Get all rxq capabilities and divide them into supported and unsupported.
1999
2000        Args:
2001            supported_capabilities: Supported capabilities will be added to this set.
2002            unsupported_capabilities: Unsupported capabilities will be added to this set.
2003        """
2004        self._logger.debug("Getting rxq capabilities.")
2005        command = f"show rxq info {self.ports[0].id} 0"
2006        rxq_info = TestPmdRxqInfo.parse(self.send_command(command))
2007        if rxq_info.rx_scattered_packets:
2008            supported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)
2009        else:
2010            unsupported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)
2011
2012    def get_capabilities_show_port_info(
2013        self,
2014        supported_capabilities: MutableSet["NicCapability"],
2015        unsupported_capabilities: MutableSet["NicCapability"],
2016    ) -> None:
2017        """Get all capabilities from show port info and divide them into supported and unsupported.
2018
2019        Args:
2020            supported_capabilities: Supported capabilities will be added to this set.
2021            unsupported_capabilities: Unsupported capabilities will be added to this set.
2022        """
2023        self._update_capabilities_from_flag(
2024            supported_capabilities,
2025            unsupported_capabilities,
2026            DeviceCapabilitiesFlag,
2027            self.ports[0].device_capabilities,
2028        )
2029
2030
class NicCapability(NoAliasEnum):
    """A mapping between capability names and the associated :class:`TestPmdShell` methods.

    The :class:`TestPmdShell` capability checking method executes the command that checks
    whether the capability is supported.
    A decorator may optionally be added to the method that will add and remove configuration
    that's necessary to retrieve the capability support status.
    The Enum members may be assigned the method itself or a tuple of
    (capability_checking_method, decorator_function).

    The signature of each :class:`TestPmdShell` capability checking method must be::

        fn(self, supported_capabilities: MutableSet, unsupported_capabilities: MutableSet) -> None

    The capability checking method must get whether a capability is supported or not
    from a testpmd command. If multiple capabilities can be obtained from a testpmd command,
    each should be obtained in the method. These capabilities should then
    be added to `supported_capabilities` or `unsupported_capabilities` based on their support.

    The two sets are shared across all capability discovery function calls in a given
    test run so that we don't call the same function multiple times. For example, when we find
    :attr:`SCATTERED_RX_ENABLED` in :meth:`TestPmdShell.get_capabilities_rxq_info`,
    we don't go looking for it again if a different test case also needs it.
    """

    # The capability methods below are wrapped in functools.partial because a plain function
    # assigned in an Enum body is a descriptor and would become a method of the enum instead of
    # a member. Many members share the same underlying method; presumably NoAliasEnum is used so
    # that members with equal values are never merged into aliases -- TODO confirm.

    #: Scattered packets Rx enabled
    SCATTERED_RX_ENABLED: TestPmdShellNicCapability = (
        TestPmdShell.get_capabilities_rxq_info,
        add_remove_mtu(9000),
    )
    #:
    RX_OFFLOAD_VLAN_STRIP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L3 checksum offload.
    RX_OFFLOAD_IPV4_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_UDP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_TCP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports Large Receive Offload.
    RX_OFFLOAD_TCP_LRO: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports QinQ (802.1Q-in-802.1Q, i.e. double VLAN tag) offload.
    RX_OFFLOAD_QINQ_STRIP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports inner packet L3 checksum.
    RX_OFFLOAD_OUTER_IPV4_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports MACsec.
    RX_OFFLOAD_MACSEC_STRIP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports filtering of a VLAN Tag identifier.
    RX_OFFLOAD_VLAN_FILTER: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports VLAN offload.
    RX_OFFLOAD_VLAN_EXTEND: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports receiving segmented mbufs.
    RX_OFFLOAD_SCATTER: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports Timestamp.
    RX_OFFLOAD_TIMESTAMP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports crypto processing while packet is received in NIC.
    RX_OFFLOAD_SECURITY: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports CRC stripping.
    RX_OFFLOAD_KEEP_CRC: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_SCTP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports inner packet L4 checksum.
    RX_OFFLOAD_OUTER_UDP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports RSS hashing.
    RX_OFFLOAD_RSS_HASH: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports scatter Rx packets to segmented mbufs.
    RX_OFFLOAD_BUFFER_SPLIT: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports all checksum capabilities.
    RX_OFFLOAD_CHECKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports all VLAN capabilities.
    RX_OFFLOAD_VLAN: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports Rx queue setup after device started.
    RUNTIME_RX_QUEUE_SETUP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports Tx queue setup after device started.
    RUNTIME_TX_QUEUE_SETUP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
    RXQ_SHARE: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports keeping flow rules across restart.
    FLOW_RULE_KEEP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports keeping shared flow objects across restart.
    FLOW_SHARED_OBJECT_KEEP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )

    def __call__(
        self,
        testpmd_shell: TestPmdShell,
        supported_capabilities: MutableSet[Self],
        unsupported_capabilities: MutableSet[Self],
    ) -> None:
        """Execute the associated capability retrieval function.

        Args:
            testpmd_shell: :class:`TestPmdShell` object to which the function will be bound to.
            supported_capabilities: The set storing the supported capabilities
                of a given test run.
            unsupported_capabilities: The set storing the unsupported capabilities
                of a given test run.
        """
        # NOTE(review): for a member declared as a (method, decorator) tuple, `self.value` is
        # that tuple and is not callable -- presumably such members are unpacked by the caller
        # before this is used; confirm.
        self.value(testpmd_shell, supported_capabilities, unsupported_capabilities)
2178