xref: /dpdk/dts/framework/remote_session/testpmd_shell.py (revision bee7cf823cd80f83b026a02508ba05fdac1b4113)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2023 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4# Copyright(c) 2024 Arm Limited
5
6"""Testpmd interactive shell.
7
8Typical usage example in a TestSuite::
9
10    testpmd_shell = TestPmdShell(self.sut_node)
11    devices = testpmd_shell.get_devices()
12    for device in devices:
13        print(device)
14    testpmd_shell.close()
15"""
16
17import functools
18import re
19import time
20from collections.abc import Callable, MutableSet
21from dataclasses import dataclass, field
22from enum import Flag, auto
23from os import environ
24from pathlib import PurePath
25from typing import TYPE_CHECKING, Any, ClassVar, Concatenate, ParamSpec, TypeAlias
26
27if TYPE_CHECKING or environ.get("DTS_DOC_BUILD"):
28    from enum import Enum as NoAliasEnum
29else:
30    from aenum import NoAliasEnum
31
32from typing_extensions import Self, Unpack
33
34from framework.exception import InteractiveCommandExecutionError, InternalError
35from framework.params.testpmd import SimpleForwardingModes, TestPmdParams
36from framework.params.types import TestPmdParamsDict
37from framework.parser import ParserFn, TextParser
38from framework.remote_session.dpdk_shell import DPDKShell
39from framework.settings import SETTINGS
40from framework.testbed_model.cpu import LogicalCoreCount, LogicalCoreList
41from framework.testbed_model.sut_node import SutNode
42from framework.utils import REGEX_FOR_MAC_ADDRESS, StrEnum
43
#: Parameter specification standing for the arguments a `TestPmdShell` method takes after the
#: shell instance itself.
P = ParamSpec("P")
#: Any callable whose first positional argument is a `TestPmdShell`, i.e. a shell method.
TestPmdShellMethod = Callable[Concatenate["TestPmdShell", P], Any]

#: A callable taking the shell and two mutable sets of `NicCapability` and returning nothing.
#: NOTE(review): the two sets presumably collect the supported and unsupported capabilities --
#: confirm against the methods that use this alias.
TestPmdShellCapabilityMethod: TypeAlias = Callable[
    ["TestPmdShell", MutableSet["NicCapability"], MutableSet["NicCapability"]], None
]

#: A decorator that takes a shell method and returns a shell method.
TestPmdShellDecorator: TypeAlias = Callable[[TestPmdShellMethod], TestPmdShellMethod]

#: Either a bare capability method, or a capability method paired with a shell method decorator.
TestPmdShellNicCapability = (
    TestPmdShellCapabilityMethod | tuple[TestPmdShellCapabilityMethod, TestPmdShellDecorator]
)
56
57
class TestPmdDevice:
    """A device as recognized by testpmd.

    Attributes:
        pci_address: The PCI address of the device.
    """

    pci_address: str

    def __init__(self, pci_address_line: str):
        """Extract the PCI address from a single line of testpmd output.

        The address is the text that follows the first ``": "`` separator of the line (up to the
        next such separator, if there is one), with surrounding whitespace removed.

        Args:
            pci_address_line: A line of testpmd output that contains a device.
        """
        line_fields = pci_address_line.strip().split(": ")
        address_field = line_fields[1]
        self.pci_address = address_field.strip()

    def __str__(self) -> str:
        """Return the PCI address, which is what identifies the device."""
        return self.pci_address
78
79
class VLANOffloadFlag(Flag):
    """Flag representing the VLAN offload settings of a NIC port."""

    #:
    STRIP = auto()
    #:
    FILTER = auto()
    #:
    EXTEND = auto()
    #:
    QINQ_STRIP = auto()

    @classmethod
    def from_str_dict(cls, d: dict[str, str]) -> Self:
        """Makes an instance from a dict containing the flag member names with an "on" value.

        A member is set only when its name is a key of `d` and the associated value is exactly
        ``"on"``. Missing keys, any other value and keys that are not member names are ignored.

        Args:
            d: A dictionary containing the flag members as keys and any string value.

        Returns:
            A new instance of the flag.
        """
        flag = cls(0)
        # Iterate over (name, member) pairs so the member does not need a second lookup by name.
        for name, member in cls.__members__.items():
            if d.get(name) == "on":
                flag |= member
        return flag

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Makes a parser function.

        The named groups of the pattern are spelled exactly like the flag members so that the
        matched groups can be handed to :meth:`from_str_dict` unchanged.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        return TextParser.wrap(
            TextParser.find(
                r"VLAN offload:\s+"
                r"strip (?P<STRIP>on|off), "
                r"filter (?P<FILTER>on|off), "
                r"extend (?P<EXTEND>on|off), "
                r"qinq strip (?P<QINQ_STRIP>on|off)",
                re.MULTILINE,
                named=True,
            ),
            cls.from_str_dict,
        )
128
129
class ChecksumOffloadOptions(Flag):
    """Flag representing checksum hardware offload layer options.

    Each member stands for one layer; being a :class:`~enum.Flag`, members can be combined with
    ``|`` to select several layers at once.
    """

    #: IP layer.
    ip = auto()
    #: UDP layer.
    udp = auto()
    #: TCP layer.
    tcp = auto()
    #: SCTP layer.
    sctp = auto()
    #: Outer IP layer. NOTE(review): presumably the outer header of tunnelled packets -- confirm.
    outer_ip = auto()
    #: Outer UDP layer. NOTE(review): presumably the outer header of tunnelled packets -- confirm.
    outer_udp = auto()
145
146
class RSSOffloadTypesFlag(Flag):
    """Flag representing the RSS offload flow types supported by the NIC port.

    The single-bit members come first, followed by composite aliases built by OR-ing them.
    """

    #:
    ipv4 = auto()
    #:
    ipv4_frag = auto()
    #:
    ipv4_tcp = auto()
    #:
    ipv4_udp = auto()
    #:
    ipv4_sctp = auto()
    #:
    ipv4_other = auto()
    #:
    ipv6 = auto()
    #:
    ipv6_frag = auto()
    #:
    ipv6_tcp = auto()
    #:
    ipv6_udp = auto()
    #:
    ipv6_sctp = auto()
    #:
    ipv6_other = auto()
    #:
    l2_payload = auto()
    #:
    ipv6_ex = auto()
    #:
    ipv6_tcp_ex = auto()
    #:
    ipv6_udp_ex = auto()
    #:
    port = auto()
    #:
    vxlan = auto()
    #:
    geneve = auto()
    #:
    nvgre = auto()
    #:
    user_defined_22 = auto()
    #:
    gtpu = auto()
    #:
    eth = auto()
    #:
    s_vlan = auto()
    #:
    c_vlan = auto()
    #:
    esp = auto()
    #:
    ah = auto()
    #:
    l2tpv3 = auto()
    #:
    pfcp = auto()
    #:
    pppoe = auto()
    #:
    ecpri = auto()
    #:
    mpls = auto()
    #:
    ipv4_chksum = auto()
    #:
    l4_chksum = auto()
    #:
    l2tpv2 = auto()
    #:
    ipv6_flow_label = auto()
    #:
    user_defined_38 = auto()
    #:
    user_defined_39 = auto()
    #:
    user_defined_40 = auto()
    #:
    user_defined_41 = auto()
    #:
    user_defined_42 = auto()
    #:
    user_defined_43 = auto()
    #:
    user_defined_44 = auto()
    #:
    user_defined_45 = auto()
    #:
    user_defined_46 = auto()
    #:
    user_defined_47 = auto()
    #:
    user_defined_48 = auto()
    #:
    user_defined_49 = auto()
    #:
    user_defined_50 = auto()
    #:
    user_defined_51 = auto()
    #:
    l3_pre96 = auto()
    #:
    l3_pre64 = auto()
    #:
    l3_pre56 = auto()
    #:
    l3_pre48 = auto()
    #:
    l3_pre40 = auto()
    #:
    l3_pre32 = auto()
    #:
    l2_dst_only = auto()
    #:
    l2_src_only = auto()
    #:
    l4_dst_only = auto()
    #:
    l4_src_only = auto()
    #:
    l3_dst_only = auto()
    #:
    l3_src_only = auto()

    # Composite aliases: each one is the union of the single-bit members listed on its right.

    #:
    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
    #:
    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
    #:
    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
    #:
    sctp = ipv4_sctp | ipv6_sctp
    #:
    tunnel = vxlan | geneve | nvgre
    #:
    vlan = s_vlan | c_vlan
    #:
    all = (
        eth
        | vlan
        | ip
        | tcp
        | udp
        | sctp
        | l2_payload
        | l2tpv3
        | esp
        | ah
        | pfcp
        | gtpu
        | ecpri
        | mpls
        | l2tpv2
    )

    @classmethod
    def from_list_string(cls, names: str) -> Self:
        """Combine every name of a whitespace-separated list into a single flag.

        Args:
            names: a whitespace-separated list containing the members of this flag.

        Returns:
            An instance of this flag; the empty flag when `names` holds no name at all.
        """
        # Fold the individual members together, starting from the empty flag.
        return functools.reduce(
            lambda combined, token: combined | cls.from_str(token),
            names.split(),
            cls(0),
        )

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Look up the flag member matching the supplied name.

        Surrounding whitespace is dropped and dashes are turned into underscores before the
        lookup, so ``"ipv4-tcp"`` resolves to the ``ipv4_tcp`` member.

        Args:
            name: a valid member of this flag in text
        Returns:
            An instance of this flag.
        """
        return cls[name.strip().replace("-", "_")]

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Build the parser used to populate a dataclass field of this type.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        flow_types_finder = TextParser.find(
            r"Supported RSS offload flow types:((?:\r?\n?  \S+)+)", re.MULTILINE
        )
        return TextParser.wrap(flow_types_finder, RSSOffloadTypesFlag.from_list_string)
345
346
class DeviceCapabilitiesFlag(Flag):
    """Flag representing the device capabilities."""

    #: Device supports Rx queue setup after device started.
    RUNTIME_RX_QUEUE_SETUP = auto()
    #: Device supports Tx queue setup after device started.
    RUNTIME_TX_QUEUE_SETUP = auto()
    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
    RXQ_SHARE = auto()
    #: Device supports keeping flow rules across restart.
    FLOW_RULE_KEEP = auto()
    #: Device supports keeping shared flow objects across restart.
    FLOW_SHARED_OBJECT_KEEP = auto()

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Build the parser used to populate a dataclass field of this type.

        The hexadecimal number following "Device capabilities:" is looked up in the text and the
        result is passed to this class to build the flag.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        capabilities_finder = TextParser.find_int(r"Device capabilities: (0x[A-Fa-f\d]+)")
        return TextParser.wrap(capabilities_finder, cls)
373
374
class DeviceErrorHandlingMode(StrEnum):
    """Enum representing the device error handling mode."""

    #:
    none = auto()
    #:
    passive = auto()
    #:
    proactive = auto()
    #:
    unknown = auto()

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Build the parser used to populate a dataclass field of this type.

        The word following "Device error handling mode:" is looked up in the text and the result
        is passed to this class to build the enum member.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this enum from text.
        """
        mode_finder = TextParser.find(r"Device error handling mode: (\w+)")
        return TextParser.wrap(mode_finder, cls)
396
397
def make_device_private_info_parser() -> ParserFn:
    """Device private information parser.

    The captured text is validated before being returned so that placeholder or error output is
    not mistaken for real device private info.

    Returns:
        ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a parser
            function that parses the device private info from the TestPmd port info output.
    """
    # Captured text starting with any of these is treated as "no private info".
    invalid_prefixes = ("Invalid file", "Failed to dump")

    def _validate(info: str):
        stripped = info.strip()
        if stripped == "none" or stripped.startswith(invalid_prefixes):
            return None
        return stripped

    private_info_finder = TextParser.find(r"Device private info:\s+([\s\S]+)")
    return TextParser.wrap(private_info_finder, _validate)
415
416
class RxQueueState(StrEnum):
    """RX queue states.

    References:
        DPDK lib: ``lib/ethdev/rte_ethdev.h``
        testpmd display function: ``app/test-pmd/config.c:get_queue_state_name()``
    """

    #:
    stopped = auto()
    #:
    started = auto()
    #:
    hairpin = auto()
    #:
    unknown = auto()

    @classmethod
    def make_parser(cls) -> ParserFn:
        """Build the parser used to populate a dataclass field of this type.

        The rest of the line following "Rx queue state:" is looked up in the text and the result
        is passed to this class to build the enum member.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this enum from text.
        """
        state_finder = TextParser.find(r"Rx queue state: ([^\r\n]+)")
        return TextParser.wrap(state_finder, cls)
443
444
@dataclass
class TestPmdQueueInfo(TextParser):
    """Dataclass representation of the common parts of the testpmd `show rxq/txq info` commands.

    Every field carries, in its metadata, the text pattern describing the piece of command output
    it is populated from.
    """

    #: Number following "prefetch threshold:" in the output.
    prefetch_threshold: int = field(metadata=TextParser.find_int(r"prefetch threshold: (\d+)"))
    #: Number following "host threshold:" in the output.
    host_threshold: int = field(metadata=TextParser.find_int(r"host threshold: (\d+)"))
    #: Number following "writeback threshold:" in the output.
    writeback_threshold: int = field(metadata=TextParser.find_int(r"writeback threshold: (\d+)"))
    #: Number following "free threshold:" in the output.
    free_threshold: int = field(metadata=TextParser.find_int(r"free threshold: (\d+)"))
    #: NOTE(review): the pattern has no capture group; presumably :data:`True` when the output
    #: contains "deferred start: on" -- confirm against `TextParser.find`.
    deferred_start: bool = field(metadata=TextParser.find("deferred start: on"))
    #: The number of RXD/TXDs is just the ring size of the queue.
    ring_size: int = field(metadata=TextParser.find_int(r"Number of (?:RXDs|TXDs): (\d+)"))
    #: NOTE(review): the pattern has no capture group; presumably :data:`True` when the output
    #: contains "queue state: started" -- confirm against `TextParser.find`.
    is_queue_started: bool = field(metadata=TextParser.find("queue state: started"))
    #: Rest of the line following "Burst mode:"; defaults to :data:`None`.
    burst_mode: str | None = field(
        default=None, metadata=TextParser.find(r"Burst mode: ([^\r\n]+)")
    )
467
468
@dataclass
class TestPmdTxqInfo(TestPmdQueueInfo):
    """Representation of testpmd's ``show txq info <port_id> <queue_id>`` command.

    Extends the fields shared with the Rx variant with the Tx-only ones.

    References:
        testpmd command function: ``app/test-pmd/cmdline.c:cmd_showqueue()``
        testpmd display function: ``app/test-pmd/config.c:tx_queue_infos_display()``
    """

    #: Number following "TX RS threshold:" in the output; defaults to :data:`None`.
    #: NOTE(review): "RS" likely refers to the Tx descriptor Report Status bit rather than to the
    #: ring size -- confirm against ``lib/ethdev/rte_ethdev.h``.
    rs_threshold: int | None = field(
        default=None, metadata=TextParser.find_int(r"TX RS threshold: (\d+)\b")
    )
482
483
@dataclass
class TestPmdRxqInfo(TestPmdQueueInfo):
    """Representation of testpmd's ``show rxq info <port_id> <queue_id>`` command.

    Extends the fields shared with the Tx variant with the Rx-only ones. All of the fields added
    here default to :data:`None`.

    References:
        testpmd command function: ``app/test-pmd/cmdline.c:cmd_showqueue()``
        testpmd display function: ``app/test-pmd/config.c:rx_queue_infos_display()``
    """

    #: Mempool used by that queue
    mempool: str | None = field(default=None, metadata=TextParser.find(r"Mempool: ([^\r\n]+)"))
    #: Drop packets if no descriptors are available
    drop_packets: bool | None = field(
        default=None, metadata=TextParser.find(r"RX drop packets: on")
    )
    #: Scattered packets Rx enabled
    scattered_packets: bool | None = field(
        default=None, metadata=TextParser.find(r"RX scattered packets: on")
    )
    #: The state of the queue. The parser builds a :class:`RxQueueState` out of the matched text.
    queue_state: RxQueueState | None = field(default=None, metadata=RxQueueState.make_parser())
505
506
@dataclass
class TestPmdPort(TextParser):
    """Dataclass representing the result of testpmd's ``show port info`` command.

    Every field carries, in its metadata, the text pattern describing the piece of command output
    it is populated from. Fields declared with a default of :data:`None` (or an empty flag)
    keep that default when they are not filled in from the output.
    """

    #:
    id: int = field(metadata=TextParser.find_int(r"Infos for port (\d+)\b"))
    #:
    device_name: str = field(metadata=TextParser.find(r"Device name: ([^\r\n]+)"))
    #:
    driver_name: str = field(metadata=TextParser.find(r"Driver name: ([^\r\n]+)"))
    #:
    socket_id: int = field(metadata=TextParser.find_int(r"Connect to socket: (\d+)"))
    #:
    is_link_up: bool = field(metadata=TextParser.find("Link status: up"))
    #:
    link_speed: str = field(metadata=TextParser.find(r"Link speed: ([^\r\n]+)"))
    #:
    is_link_full_duplex: bool = field(metadata=TextParser.find("Link duplex: full-duplex"))
    #:
    is_link_autonegotiated: bool = field(metadata=TextParser.find("Autoneg status: On"))
    #:
    is_promiscuous_mode_enabled: bool = field(metadata=TextParser.find("Promiscuous mode: enabled"))
    #:
    is_allmulticast_mode_enabled: bool = field(
        metadata=TextParser.find("Allmulticast mode: enabled")
    )
    #: Maximum number of MAC addresses
    max_mac_addresses_num: int = field(
        metadata=TextParser.find_int(r"Maximum number of MAC addresses: (\d+)")
    )
    #: Maximum number of MAC addresses of hash filtering
    max_hash_mac_addresses_num: int = field(
        metadata=TextParser.find_int(r"Maximum number of MAC addresses of hash filtering: (\d+)")
    )
    #: Minimum size of RX buffer
    min_rx_bufsize: int = field(metadata=TextParser.find_int(r"Minimum size of RX buffer: (\d+)"))
    #: Maximum configurable length of RX packet
    max_rx_packet_length: int = field(
        metadata=TextParser.find_int(r"Maximum configurable length of RX packet: (\d+)")
    )
    #: Maximum configurable size of LRO aggregated packet
    max_lro_packet_size: int = field(
        metadata=TextParser.find_int(r"Maximum configurable size of LRO aggregated packet: (\d+)")
    )

    #: Current number of RX queues
    rx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of RX queues: (\d+)"))
    #: Max possible RX queues
    max_rx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible RX queues: (\d+)"))
    #: Max possible number of RXDs per queue
    max_queue_rxd_num: int = field(
        metadata=TextParser.find_int(r"Max possible number of RXDs per queue: (\d+)")
    )
    #: Min possible number of RXDs per queue
    min_queue_rxd_num: int = field(
        metadata=TextParser.find_int(r"Min possible number of RXDs per queue: (\d+)")
    )
    #: RXDs number alignment
    rxd_alignment_num: int = field(metadata=TextParser.find_int(r"RXDs number alignment: (\d+)"))

    #: Current number of TX queues
    tx_queues_num: int = field(metadata=TextParser.find_int(r"Current number of TX queues: (\d+)"))
    #: Max possible TX queues
    max_tx_queues_num: int = field(metadata=TextParser.find_int(r"Max possible TX queues: (\d+)"))
    #: Max possible number of TXDs per queue
    max_queue_txd_num: int = field(
        metadata=TextParser.find_int(r"Max possible number of TXDs per queue: (\d+)")
    )
    #: Min possible number of TXDs per queue
    min_queue_txd_num: int = field(
        metadata=TextParser.find_int(r"Min possible number of TXDs per queue: (\d+)")
    )
    #: TXDs number alignment
    txd_alignment_num: int = field(metadata=TextParser.find_int(r"TXDs number alignment: (\d+)"))
    #: Max segment number per packet
    max_packet_segment_num: int = field(
        metadata=TextParser.find_int(r"Max segment number per packet: (\d+)")
    )
    #: Max segment number per MTU/TSO
    max_mtu_segment_num: int = field(
        metadata=TextParser.find_int(r"Max segment number per MTU\/TSO: (\d+)")
    )

    #:
    device_capabilities: DeviceCapabilitiesFlag = field(
        metadata=DeviceCapabilitiesFlag.make_parser(),
    )
    #:
    device_error_handling_mode: DeviceErrorHandlingMode | None = field(
        default=None, metadata=DeviceErrorHandlingMode.make_parser()
    )
    #:
    device_private_info: str | None = field(
        default=None,
        metadata=make_device_private_info_parser(),
    )

    #:
    hash_key_size: int | None = field(
        default=None, metadata=TextParser.find_int(r"Hash key size in bytes: (\d+)")
    )
    #:
    redirection_table_size: int | None = field(
        default=None, metadata=TextParser.find_int(r"Redirection table size: (\d+)")
    )
    #:
    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
        default=RSSOffloadTypesFlag(0), metadata=RSSOffloadTypesFlag.make_parser()
    )

    #:
    mac_address: str | None = field(
        default=None, metadata=TextParser.find(r"MAC address: ([A-Fa-f0-9:]+)")
    )
    #:
    fw_version: str | None = field(
        default=None, metadata=TextParser.find(r"Firmware-version: ([^\r\n]+)")
    )
    #:
    dev_args: str | None = field(default=None, metadata=TextParser.find(r"Devargs: ([^\r\n]+)"))
    #: Socket id of the memory allocation
    mem_alloc_socket_id: int | None = field(
        default=None,
        metadata=TextParser.find_int(r"memory allocation on the socket: (\d+)"),
    )
    #:
    mtu: int | None = field(default=None, metadata=TextParser.find_int(r"MTU: (\d+)"))

    #:
    vlan_offload: VLANOffloadFlag | None = field(
        default=None,
        metadata=VLANOffloadFlag.make_parser(),
    )

    #: Maximum size of RX buffer
    max_rx_bufsize: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum size of RX buffer: (\d+)")
    )
    #: Maximum number of VFs
    max_vfs_num: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum number of VFs: (\d+)")
    )
    #: Maximum number of VMDq pools
    max_vmdq_pools_num: int | None = field(
        default=None, metadata=TextParser.find_int(r"Maximum number of VMDq pools: (\d+)")
    )

    #: Rest of the line following "Switch name:".
    # The character class must be negated ([^\r\n]) to capture everything up to the end of the
    # line; the non-negated form could only ever match line break characters, never a name.
    switch_name: str | None = field(
        default=None, metadata=TextParser.find(r"Switch name: ([^\r\n]+)")
    )
    #:
    switch_domain_id: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch domain Id: (\d+)")
    )
    #:
    switch_port_id: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch Port Id: (\d+)")
    )
    #:
    switch_rx_domain: int | None = field(
        default=None, metadata=TextParser.find_int(r"Switch Rx domain: (\d+)")
    )
670
671
@dataclass
class TestPmdPortStats(TextParser):
    """Port statistics.

    Every field is the number found after the label shown in its pattern, e.g. ``RX-packets:``;
    any amount of whitespace may separate the label from the number.
    """

    #: Port number found after "NIC statistics for port".
    port_id: int = field(metadata=TextParser.find_int(r"NIC statistics for port (\d+)"))

    #: Number following "RX-packets:".
    rx_packets: int = field(metadata=TextParser.find_int(r"RX-packets:\s+(\d+)"))
    #: Number following "RX-missed:".
    rx_missed: int = field(metadata=TextParser.find_int(r"RX-missed:\s+(\d+)"))
    #: Number following "RX-bytes:".
    rx_bytes: int = field(metadata=TextParser.find_int(r"RX-bytes:\s+(\d+)"))
    #: Number following "RX-errors:".
    rx_errors: int = field(metadata=TextParser.find_int(r"RX-errors:\s+(\d+)"))
    #: Number following "RX-nombuf:".
    rx_nombuf: int = field(metadata=TextParser.find_int(r"RX-nombuf:\s+(\d+)"))

    #: Number following "TX-packets:".
    tx_packets: int = field(metadata=TextParser.find_int(r"TX-packets:\s+(\d+)"))
    #: Number following "TX-errors:".
    tx_errors: int = field(metadata=TextParser.find_int(r"TX-errors:\s+(\d+)"))
    #: Number following "TX-bytes:".
    tx_bytes: int = field(metadata=TextParser.find_int(r"TX-bytes:\s+(\d+)"))

    #: Number following "Rx-pps:".
    rx_pps: int = field(metadata=TextParser.find_int(r"Rx-pps:\s+(\d+)"))
    #: Number following "Rx-bps:".
    rx_bps: int = field(metadata=TextParser.find_int(r"Rx-bps:\s+(\d+)"))

    #: Number following "Tx-pps:".
    tx_pps: int = field(metadata=TextParser.find_int(r"Tx-pps:\s+(\d+)"))
    #: Number following "Tx-bps:".
    tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"))
706
707
708class PacketOffloadFlag(Flag):
709    """Flag representing the Packet Offload Features Flags in DPDK.
710
711    Values in this class are taken from the definitions in the RTE MBUF core library in DPDK
712    located in ``lib/mbuf/rte_mbuf_core.h``. It is expected that flag values in this class will
713    match the values they are set to in said DPDK library with one exception; all values must be
714    unique. For example, the definitions for unknown checksum flags in ``rte_mbuf_core.h`` are all
715    set to :data:`0`, but it is valuable to distinguish between them in this framework. For this
716    reason flags that are not unique in the DPDK library are set either to values within the
717    RTE_MBUF_F_FIRST_FREE-RTE_MBUF_F_LAST_FREE range for Rx or shifted 61+ bits for Tx.
718
719    References:
720        DPDK lib: ``lib/mbuf/rte_mbuf_core.h``
721    """
722
723    # RX flags
724
725    #: The RX packet is a 802.1q VLAN packet, and the tci has been saved in mbuf->vlan_tci. If the
726    #: flag RTE_MBUF_F_RX_VLAN_STRIPPED is also present, the VLAN header has been stripped from
727    #: mbuf data, else it is still present.
728    RTE_MBUF_F_RX_VLAN = auto()
729
730    #: RX packet with RSS hash result.
731    RTE_MBUF_F_RX_RSS_HASH = auto()
732
733    #: RX packet with FDIR match indicate.
734    RTE_MBUF_F_RX_FDIR = auto()
735
736    #: This flag is set when the outermost IP header checksum is detected as wrong by the hardware.
737    RTE_MBUF_F_RX_OUTER_IP_CKSUM_BAD = 1 << 5
738
739    #: A vlan has been stripped by the hardware and its tci is saved in mbuf->vlan_tci. This can
740    #: only happen if vlan stripping is enabled in the RX configuration of the PMD. When
741    #: RTE_MBUF_F_RX_VLAN_STRIPPED is set, RTE_MBUF_F_RX_VLAN must also be set.
742    RTE_MBUF_F_RX_VLAN_STRIPPED = auto()
743
744    #: No information about the RX IP checksum. Value is 0 in the DPDK library.
745    RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN = 1 << 23
746    #: The IP checksum in the packet is wrong.
747    RTE_MBUF_F_RX_IP_CKSUM_BAD = 1 << 4
748    #: The IP checksum in the packet is valid.
749    RTE_MBUF_F_RX_IP_CKSUM_GOOD = 1 << 7
750    #: The IP checksum is not correct in the packet data, but the integrity of the IP header is
751    #: verified. Value is RTE_MBUF_F_RX_IP_CKSUM_BAD | RTE_MBUF_F_RX_IP_CKSUM_GOOD in the DPDK
752    #: library.
753    RTE_MBUF_F_RX_IP_CKSUM_NONE = 1 << 24
754
755    #: No information about the RX L4 checksum. Value is 0 in the DPDK library.
756    RTE_MBUF_F_RX_L4_CKSUM_UNKNOWN = 1 << 25
757    #: The L4 checksum in the packet is wrong.
758    RTE_MBUF_F_RX_L4_CKSUM_BAD = 1 << 3
759    #: The L4 checksum in the packet is valid.
760    RTE_MBUF_F_RX_L4_CKSUM_GOOD = 1 << 8
761    #: The L4 checksum is not correct in the packet data, but the integrity of the L4 data is
762    #: verified. Value is RTE_MBUF_F_RX_L4_CKSUM_BAD | RTE_MBUF_F_RX_L4_CKSUM_GOOD in the DPDK
763    #: library.
764    RTE_MBUF_F_RX_L4_CKSUM_NONE = 1 << 26
765
766    #: RX IEEE1588 L2 Ethernet PT Packet.
767    RTE_MBUF_F_RX_IEEE1588_PTP = 1 << 9
768    #: RX IEEE1588 L2/L4 timestamped packet.
769    RTE_MBUF_F_RX_IEEE1588_TMST = 1 << 10
770
771    #: FD id reported if FDIR match.
772    RTE_MBUF_F_RX_FDIR_ID = 1 << 13
773    #: Flexible bytes reported if FDIR match.
774    RTE_MBUF_F_RX_FDIR_FLX = 1 << 14
775
776    #: If both RTE_MBUF_F_RX_QINQ_STRIPPED and RTE_MBUF_F_RX_VLAN_STRIPPED are set, the 2 VLANs
777    #: have been stripped by the hardware. If RTE_MBUF_F_RX_QINQ_STRIPPED is set and
778    #: RTE_MBUF_F_RX_VLAN_STRIPPED is unset, only the outer VLAN is removed from packet data.
779    RTE_MBUF_F_RX_QINQ_STRIPPED = auto()
780
781    #: When packets are coalesced by a hardware or virtual driver, this flag can be set in the RX
782    #: mbuf, meaning that the m->tso_segsz field is valid and is set to the segment size of
783    #: original packets.
784    RTE_MBUF_F_RX_LRO = auto()
785
786    #: Indicate that security offload processing was applied on the RX packet.
787    RTE_MBUF_F_RX_SEC_OFFLOAD = 1 << 18
788    #: Indicate that security offload processing failed on the RX packet.
789    RTE_MBUF_F_RX_SEC_OFFLOAD_FAILED = auto()
790
791    #: The RX packet is a double VLAN. If this flag is set, RTE_MBUF_F_RX_VLAN must also be set. If
792    #: the flag RTE_MBUF_F_RX_QINQ_STRIPPED is also present, both VLANs headers have been stripped
793    #: from mbuf data, else they are still present.
794    RTE_MBUF_F_RX_QINQ = auto()
795
796    #: No info about the outer RX L4 checksum. Value is 0 in the DPDK library.
797    RTE_MBUF_F_RX_OUTER_L4_CKSUM_UNKNOWN = 1 << 27
798    #: The outer L4 checksum in the packet is wrong
799    RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD = 1 << 21
800    #: The outer L4 checksum in the packet is valid
801    RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD = 1 << 22
802    #: Invalid outer L4 checksum state. Value is
803    #: RTE_MBUF_F_RX_OUTER_L4_CKSUM_BAD | RTE_MBUF_F_RX_OUTER_L4_CKSUM_GOOD in the DPDK library.
804    RTE_MBUF_F_RX_OUTER_L4_CKSUM_INVALID = 1 << 28
805
806    # TX flags
807
808    #: Outer UDP checksum offload flag. This flag is used for enabling outer UDP checksum in PMD.
809    #: To use outer UDP checksum, the user either needs to enable the following in mbuf:
810    #:
811    #:  a) Fill outer_l2_len and outer_l3_len in mbuf.
812    #:  b) Set the RTE_MBUF_F_TX_OUTER_UDP_CKSUM flag.
813    #:  c) Set the RTE_MBUF_F_TX_OUTER_IPV4 or RTE_MBUF_F_TX_OUTER_IPV6 flag.
814    #:
815    #: Or configure RTE_ETH_TX_OFFLOAD_OUTER_UDP_CKSUM offload flag.
816    RTE_MBUF_F_TX_OUTER_UDP_CKSUM = 1 << 41
817
818    #: UDP Fragmentation Offload flag. This flag is used for enabling UDP fragmentation in SW or in
819    #: HW.
820    RTE_MBUF_F_TX_UDP_SEG = auto()
821
822    #: Request security offload processing on the TX packet. To use Tx security offload, the user
823    #: needs to fill l2_len in mbuf indicating L2 header size and where L3 header starts.
824    #: Similarly, l3_len should also be filled along with ol_flags reflecting current L3 type.
825    RTE_MBUF_F_TX_SEC_OFFLOAD = auto()
826
827    #: Offload the MACsec. This flag must be set by the application to enable this offload feature
828    #: for a packet to be transmitted.
829    RTE_MBUF_F_TX_MACSEC = auto()
830
831    # Bits 45:48 are used for the tunnel type in ``lib/mbuf/rte_mbuf_core.h``, but some are modified
832    # in this Flag to maintain uniqueness. The tunnel type must be specified for TSO or checksum on
833    # the inner part of tunnel packets. These flags can be used with RTE_MBUF_F_TX_TCP_SEG for TSO,
834    # or RTE_MBUF_F_TX_xxx_CKSUM. The mbuf fields for inner and outer header lengths are required:
835    # outer_l2_len, outer_l3_len, l2_len, l3_len, l4_len and tso_segsz for TSO.
836
837    #:
838    RTE_MBUF_F_TX_TUNNEL_VXLAN = 1 << 45
839    #:
840    RTE_MBUF_F_TX_TUNNEL_GRE = 1 << 46
841    #: Value is 3 << 45 in the DPDK library.
842    RTE_MBUF_F_TX_TUNNEL_IPIP = 1 << 61
843    #:
844    RTE_MBUF_F_TX_TUNNEL_GENEVE = 1 << 47
845    #: TX packet with MPLS-in-UDP RFC 7510 header. Value is 5 << 45 in the DPDK library.
846    RTE_MBUF_F_TX_TUNNEL_MPLSINUDP = 1 << 62
847    #: Value is 6 << 45 in the DPDK library.
848    RTE_MBUF_F_TX_TUNNEL_VXLAN_GPE = 1 << 63
849    #: Value is 7 << 45 in the DPDK library.
850    RTE_MBUF_F_TX_TUNNEL_GTP = 1 << 64
851    #:
852    RTE_MBUF_F_TX_TUNNEL_ESP = 1 << 48
853    #: Generic IP encapsulated tunnel type, used for TSO and checksum offload. This can be used for
854    #: tunnels which are not standards or listed above. It is preferred to use specific tunnel
855    #: flags like RTE_MBUF_F_TX_TUNNEL_GRE or RTE_MBUF_F_TX_TUNNEL_IPIP if possible. The ethdev
856    #: must be configured with RTE_ETH_TX_OFFLOAD_IP_TNL_TSO.  Outer and inner checksums are done
857    #: according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel headers that
858    #: contain payload length, sequence id or checksum are not expected to be updated. Value is
859    #: 0xD << 45 in the DPDK library.
860    RTE_MBUF_F_TX_TUNNEL_IP = 1 << 65
861    #: Generic UDP encapsulated tunnel type, used for TSO and checksum offload. UDP tunnel type
862    #: implies outer IP layer. It can be used for tunnels which are not standards or listed above.
863    #: It is preferred to use specific tunnel flags like RTE_MBUF_F_TX_TUNNEL_VXLAN if possible.
864    #: The ethdev must be configured with RTE_ETH_TX_OFFLOAD_UDP_TNL_TSO. Outer and inner checksums
865    #: are done according to the existing flags like RTE_MBUF_F_TX_xxx_CKSUM. Specific tunnel
866    #: headers that contain payload length, sequence id or checksum are not expected to be updated.
867    #: value is 0xE << 45 in the DPDK library.
868    RTE_MBUF_F_TX_TUNNEL_UDP = 1 << 66
869
870    #: Double VLAN insertion (QinQ) request to driver, driver may offload the insertion based on
871    #: device capability. Mbuf 'vlan_tci' & 'vlan_tci_outer' must be valid when this flag is set.
872    RTE_MBUF_F_TX_QINQ = 1 << 49
873
874    #: TCP segmentation offload. To enable this offload feature for a packet to be transmitted on
875    #: hardware supporting TSO:
876    #:
877    #:  - set the RTE_MBUF_F_TX_TCP_SEG flag in mbuf->ol_flags (this flag implies
878    #:      RTE_MBUF_F_TX_TCP_CKSUM)
879    #:  - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6
880    #:      * if it's IPv4, set the RTE_MBUF_F_TX_IP_CKSUM flag
881    #:  - fill the mbuf offload information: l2_len, l3_len, l4_len, tso_segsz
882    RTE_MBUF_F_TX_TCP_SEG = auto()
883
884    #: TX IEEE1588 packet to timestamp.
885    RTE_MBUF_F_TX_IEEE1588_TMST = auto()
886
887    # Bits 52+53 used for L4 packet type with checksum enabled in ``lib/mbuf/rte_mbuf_core.h`` but
888    # some values must be modified in this framework to maintain uniqueness. To use hardware
889    # L4 checksum offload, the user needs to:
890    #
891    # - fill l2_len and l3_len in mbuf
892    # - set the flags RTE_MBUF_F_TX_TCP_CKSUM, RTE_MBUF_F_TX_SCTP_CKSUM or
893    #   RTE_MBUF_F_TX_UDP_CKSUM
894    # - set the flag RTE_MBUF_F_TX_IPV4 or RTE_MBUF_F_TX_IPV6
895
896    #: Disable L4 cksum of TX pkt. Value is 0 in the DPDK library.
897    RTE_MBUF_F_TX_L4_NO_CKSUM = 1 << 67
898    #: TCP cksum of TX pkt. Computed by NIC.
899    RTE_MBUF_F_TX_TCP_CKSUM = 1 << 52
900    #: SCTP cksum of TX pkt. Computed by NIC.
901    RTE_MBUF_F_TX_SCTP_CKSUM = 1 << 53
902    #: UDP cksum of TX pkt. Computed by NIC. Value is 3 << 52 in the DPDK library.
903    RTE_MBUF_F_TX_UDP_CKSUM = 1 << 68
904
905    #: Offload the IP checksum in the hardware. The flag RTE_MBUF_F_TX_IPV4 should also be set by
906    #: the application, although a PMD will only check RTE_MBUF_F_TX_IP_CKSUM.
907    RTE_MBUF_F_TX_IP_CKSUM = 1 << 54
908
909    #: Packet is IPv4. This flag must be set when using any offload feature (TSO, L3 or L4
910    #: checksum) to tell the NIC that the packet is an IPv4 packet. If the packet is a tunneled
911    #: packet, this flag is related to the inner headers.
912    RTE_MBUF_F_TX_IPV4 = auto()
913    #: Packet is IPv6. This flag must be set when using an offload feature (TSO or L4 checksum) to
914    #: tell the NIC that the packet is an IPv6 packet. If the packet is a tunneled packet, this
915    #: flag is related to the inner headers.
916    RTE_MBUF_F_TX_IPV6 = auto()
917    #: VLAN tag insertion request to driver, driver may offload the insertion based on the device
918    #: capability. mbuf 'vlan_tci' field must be valid when this flag is set.
919    RTE_MBUF_F_TX_VLAN = auto()
920
921    #: Offload the IP checksum of an external header in the hardware. The flag
922    #: RTE_MBUF_F_TX_OUTER_IPV4 should also be set by the application, although a PMD will only
923    #: check RTE_MBUF_F_TX_OUTER_IP_CKSUM.
924    RTE_MBUF_F_TX_OUTER_IP_CKSUM = auto()
925    #: Packet outer header is IPv4. This flag must be set when using any outer offload feature (L3
926    #: or L4 checksum) to tell the NIC that the outer header of the tunneled packet is an IPv4
927    #: packet.
928    RTE_MBUF_F_TX_OUTER_IPV4 = auto()
929    #: Packet outer header is IPv6. This flag must be set when using any outer offload feature (L4
930    #: checksum) to tell the NIC that the outer header of the tunneled packet is an IPv6 packet.
931    RTE_MBUF_F_TX_OUTER_IPV6 = auto()
932
933    @classmethod
934    def from_list_string(cls, names: str) -> Self:
935        """Makes a flag from a whitespace-separated list of names.
936
937        Args:
938            names: a whitespace-separated list containing the members of this flag.
939
940        Returns:
941            An instance of this flag.
942        """
943        flag = cls(0)
944        for name in names.split():
945            flag |= cls.from_str(name)
946        return flag
947
948    @classmethod
949    def from_str(cls, name: str) -> Self:
950        """Makes a flag matching the supplied name.
951
952        Args:
953            name: a valid member of this flag in text
954        Returns:
955            An instance of this flag.
956        """
957        member_name = name.strip().replace("-", "_")
958        return cls[member_name]
959
960    @classmethod
961    def make_parser(cls) -> ParserFn:
962        """Makes a parser function.
963
964        Returns:
965            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
966                parser function that makes an instance of this flag from text.
967        """
968        return TextParser.wrap(
969            TextParser.find(r"ol_flags: ([^\n]+)"),
970            cls.from_list_string,
971        )
972
973
class RtePTypes(Flag):
    """Flag representing possible packet types in DPDK verbose output.

    The members mirror the definitions of the RTE MBUF ptype library in DPDK, found in
    ``lib/mbuf/rte_mbuf_ptype.h``. Member names should match what the ``rte_get_ptype_*_name``
    functions of ``rte_mbuf_ptype.c`` can return, since instances are built from those names.

    References:
        DPDK lib: ``lib/mbuf/rte_mbuf_ptype.h``
        DPDK ptype name formatting functions: ``lib/mbuf/rte_mbuf_ptype.c:rte_get_ptype_*_name()``
    """

    # L2
    #: Ethernet packet type. This is used for outer packet for tunneling cases.
    L2_ETHER = auto()
    #: Ethernet packet type for time sync.
    L2_ETHER_TIMESYNC = auto()
    #: ARP (Address Resolution Protocol) packet type.
    L2_ETHER_ARP = auto()
    #: LLDP (Link Layer Discovery Protocol) packet type.
    L2_ETHER_LLDP = auto()
    #: NSH (Network Service Header) packet type.
    L2_ETHER_NSH = auto()
    #: VLAN packet type.
    L2_ETHER_VLAN = auto()
    #: QinQ packet type.
    L2_ETHER_QINQ = auto()
    #: PPPOE packet type.
    L2_ETHER_PPPOE = auto()
    #: FCoE packet type.
    L2_ETHER_FCOE = auto()
    #: MPLS packet type.
    L2_ETHER_MPLS = auto()
    #: No L2 packet information.
    L2_UNKNOWN = auto()

    # L3
    #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling
    #: cases, and does not contain any header option.
    L3_IPV4 = auto()
    #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling
    #: cases, and contains header options.
    L3_IPV4_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling
    #: cases, and does not contain any extension header.
    L3_IPV6 = auto()
    #: IP (Internet Protocol) version 4 packet type. This is used for outer packet for tunneling
    #: cases, and may or may not contain header options.
    L3_IPV4_EXT_UNKNOWN = auto()
    #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling
    #: cases, and contains extension headers.
    L3_IPV6_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. This is used for outer packet for tunneling
    #: cases, and may or may not contain extension headers.
    L3_IPV6_EXT_UNKNOWN = auto()
    #: No L3 packet information.
    L3_UNKNOWN = auto()

    # L4
    #: TCP (Transmission Control Protocol) packet type. This is used for outer packet for tunneling
    #: cases.
    L4_TCP = auto()
    #: UDP (User Datagram Protocol) packet type. This is used for outer packet for tunneling cases.
    L4_UDP = auto()
    #: Fragmented IP (Internet Protocol) packet type. This is used for outer packet for tunneling
    #: cases and refers to those packets of any IP types which can be recognized as fragmented. A
    #: fragmented packet cannot be recognized as any other L4 types (RTE_PTYPE_L4_TCP,
    #: RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_SCTP, RTE_PTYPE_L4_ICMP, RTE_PTYPE_L4_NONFRAG).
    L4_FRAG = auto()
    #: SCTP (Stream Control Transmission Protocol) packet type. This is used for outer packet for
    #: tunneling cases.
    L4_SCTP = auto()
    #: ICMP (Internet Control Message Protocol) packet type. This is used for outer packet for
    #: tunneling cases.
    L4_ICMP = auto()
    #: Non-fragmented IP (Internet Protocol) packet type. This is used for outer packet for
    #: tunneling cases and refers to those packets of any IP types, that cannot be recognized as
    #: any of the above L4 types (RTE_PTYPE_L4_TCP, RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_FRAG,
    #: RTE_PTYPE_L4_SCTP, RTE_PTYPE_L4_ICMP).
    L4_NONFRAG = auto()
    #: IGMP (Internet Group Management Protocol) packet type.
    L4_IGMP = auto()
    #: No L4 packet information.
    L4_UNKNOWN = auto()

    # Tunnel
    #: IP (Internet Protocol) in IP (Internet Protocol) tunneling packet type.
    TUNNEL_IP = auto()
    #: GRE (Generic Routing Encapsulation) tunneling packet type.
    TUNNEL_GRE = auto()
    #: VXLAN (Virtual eXtensible Local Area Network) tunneling packet type.
    TUNNEL_VXLAN = auto()
    #: NVGRE (Network Virtualization using Generic Routing Encapsulation) tunneling packet type.
    TUNNEL_NVGRE = auto()
    #: GENEVE (Generic Network Virtualization Encapsulation) tunneling packet type.
    TUNNEL_GENEVE = auto()
    #: Tunneling packet type of Teredo, VXLAN (Virtual eXtensible Local Area Network) or GRE
    #: (Generic Routing Encapsulation) could be recognized as this packet type, if they can not be
    #: recognized independently as of hardware capability.
    TUNNEL_GRENAT = auto()
    #: GTP-C (GPRS Tunnelling Protocol) control tunneling packet type.
    TUNNEL_GTPC = auto()
    #: GTP-U (GPRS Tunnelling Protocol) user data tunneling packet type.
    TUNNEL_GTPU = auto()
    #: ESP (IP Encapsulating Security Payload) tunneling packet type.
    TUNNEL_ESP = auto()
    #: L2TP (Layer 2 Tunneling Protocol) tunneling packet type.
    TUNNEL_L2TP = auto()
    #: VXLAN-GPE (VXLAN Generic Protocol Extension) tunneling packet type.
    TUNNEL_VXLAN_GPE = auto()
    #: MPLS-in-UDP tunneling packet type (RFC 7510).
    TUNNEL_MPLS_IN_UDP = auto()
    #: MPLS-in-GRE tunneling packet type (RFC 4023).
    TUNNEL_MPLS_IN_GRE = auto()
    #: No tunnel information found on the packet.
    TUNNEL_UNKNOWN = auto()

    # Inner L2
    #: Ethernet packet type. This is used for inner packet type only.
    INNER_L2_ETHER = auto()
    #: Ethernet packet type with VLAN (Virtual Local Area Network) tag.
    INNER_L2_ETHER_VLAN = auto()
    #: QinQ packet type.
    INNER_L2_ETHER_QINQ = auto()
    #: No inner L2 information found on the packet.
    INNER_L2_UNKNOWN = auto()

    # Inner L3
    #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and does
    #: not contain any header option.
    INNER_L3_IPV4 = auto()
    #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and
    #: contains header options.
    INNER_L3_IPV4_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and does
    #: not contain any extension header.
    INNER_L3_IPV6 = auto()
    #: IP (Internet Protocol) version 4 packet type. This is used for inner packet only, and may or
    #: may not contain header options.
    INNER_L3_IPV4_EXT_UNKNOWN = auto()
    #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and
    #: contains extension headers.
    INNER_L3_IPV6_EXT = auto()
    #: IP (Internet Protocol) version 6 packet type. This is used for inner packet only, and may or
    #: may not contain extension headers.
    INNER_L3_IPV6_EXT_UNKNOWN = auto()
    #: No inner L3 information found on the packet.
    INNER_L3_UNKNOWN = auto()

    # Inner L4
    #: TCP (Transmission Control Protocol) packet type. This is used for inner packet only.
    INNER_L4_TCP = auto()
    #: UDP (User Datagram Protocol) packet type. This is used for inner packet only.
    INNER_L4_UDP = auto()
    #: Fragmented IP (Internet Protocol) packet type. This is used for inner packet only, and may
    #: or may not have a layer 4 packet.
    INNER_L4_FRAG = auto()
    #: SCTP (Stream Control Transmission Protocol) packet type. This is used for inner packet only.
    INNER_L4_SCTP = auto()
    #: ICMP (Internet Control Message Protocol) packet type. This is used for inner packet only.
    INNER_L4_ICMP = auto()
    #: Non-fragmented IP (Internet Protocol) packet type. It is used for inner packet only, and may
    #: or may not have other unknown layer 4 packet types.
    INNER_L4_NONFRAG = auto()
    #: No inner L4 information found on the packet.
    INNER_L4_UNKNOWN = auto()

    @classmethod
    def from_list_string(cls, names: str) -> Self:
        """Build a combined flag out of several member names.

        Args:
            names: Member names of this flag separated by whitespace.

        Returns:
            The bitwise OR of every named member, or the empty flag if `names` contains no names.
        """
        # Fold the looked-up members together, starting from the empty flag.
        return functools.reduce(
            lambda combined, member_name: combined | cls.from_str(member_name),
            names.split(),
            cls(0),
        )

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Look up the member of this flag called `name`.

        Surrounding whitespace is ignored and hyphens are treated as underscores before the
        lookup is made.

        Args:
            name: The textual name of a member of this flag.

        Returns:
            The member whose name matches the normalised `name`.

        Raises:
            KeyError: If no member carries the normalised name.
        """
        return cls[name.strip().replace("-", "_")]

    @classmethod
    def make_parser(cls, hw: bool) -> ParserFn:
        """Create the parser for either the hardware or the software ptype list.

        Args:
            hw: Whether to make a parser for hardware ptypes or software ptypes. If :data:`True`,
                hardware ptypes will be collected, otherwise software ptypes will.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        ptype_source = "hw" if hw else "sw"
        # The names are captured up to (but excluding) the next hyphen in the text.
        ptype_finder = TextParser.find(f"{ptype_source} ptype: ([^-]+)")
        return TextParser.wrap(ptype_finder, cls.from_list_string)
1184
1185
@dataclass
class TestPmdVerbosePacket(TextParser):
    """Packet information provided by verbose output in Testpmd.

    This dataclass expects that packet information be prepended with the starting line of packet
    bursts. Specifically, the line that reads "port X/queue Y: sent/received Z packets".

    Every field is filled in by the regular expression (or parser) given in its metadata. The
    fields declared with a default of :data:`None` are the ones that may be left unset.
    """

    #: ID of the port that handled the packet.
    port_id: int = field(metadata=TextParser.find_int(r"port (\d+)/queue \d+"))
    #: ID of the queue that handled the packet.
    queue_id: int = field(metadata=TextParser.find_int(r"port \d+/queue (\d+)"))
    #: Whether the packet was received or sent by the queue/port.
    was_received: bool = field(metadata=TextParser.find(r"received \d+ packets"))
    #: Source MAC address, taken from the ``src=`` entry.
    src_mac: str = field(metadata=TextParser.find(f"src=({REGEX_FOR_MAC_ADDRESS})"))
    #: Destination MAC address, taken from the ``dst=`` entry.
    dst_mac: str = field(metadata=TextParser.find(f"dst=({REGEX_FOR_MAC_ADDRESS})"))
    #: Memory pool the packet was handled on.
    pool: str = field(metadata=TextParser.find(r"pool=(\S+)"))
    #: Packet type in hex.
    p_type: int = field(metadata=TextParser.find_int(r"type=(0x[a-fA-F\d]+)"))
    #: Value of the ``length=`` entry.
    length: int = field(metadata=TextParser.find_int(r"length=(\d+)"))
    #: Number of segments in the packet.
    nb_segs: int = field(metadata=TextParser.find_int(r"nb_segs=(\d+)"))
    #: Hardware packet type.
    hw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=True))
    #: Software packet type.
    sw_ptype: RtePTypes = field(metadata=RtePTypes.make_parser(hw=False))
    #: Value of the ``l2_len=`` entry.
    l2_len: int = field(metadata=TextParser.find_int(r"l2_len=(\d+)"))
    #: Offload flags listed on the ``ol_flags:`` line.
    ol_flags: PacketOffloadFlag = field(metadata=PacketOffloadFlag.make_parser())
    #: RSS hash of the packet in hex.
    rss_hash: int | None = field(
        default=None, metadata=TextParser.find_int(r"RSS hash=(0x[a-fA-F\d]+)")
    )
    #: RSS queue that handled the packet in hex.
    rss_queue: int | None = field(
        default=None, metadata=TextParser.find_int(r"RSS queue=(0x[a-fA-F\d]+)")
    )
    #: Value of the ``l3_len=`` entry; defaults to :data:`None`.
    l3_len: int | None = field(default=None, metadata=TextParser.find_int(r"l3_len=(\d+)"))
    #: Value of the ``l4_len=`` entry; defaults to :data:`None`.
    l4_len: int | None = field(default=None, metadata=TextParser.find_int(r"l4_len=(\d+)"))
1232
1233
class RxOffloadCapability(Flag):
    """Rx offload capabilities of a device.

    The flags are taken from ``lib/ethdev/rte_ethdev.h``, where they carry the
    ``RTE_ETH_RX_OFFLOAD`` prefix; testpmd changes that prefix to ``RX_OFFLOAD``.
    The values are not contiguous, so the correspondence is preserved
    by specifying concrete values interspersed between auto() values.

    The ``RX_OFFLOAD`` prefix is kept here so that :class:`NicCapability` can reuse the very same
    flag names; without it nothing would sufficiently tell these capabilities apart from others.

    References:
        DPDK lib: ``lib/ethdev/rte_ethdev.h``
        testpmd display function: ``app/test-pmd/cmdline.c:print_rx_offloads()``
    """

    #: Device supports VLAN stripping.
    RX_OFFLOAD_VLAN_STRIP = auto()
    #: Device supports IPv4 (L3) checksum offload.
    RX_OFFLOAD_IPV4_CKSUM = auto()
    #: Device supports UDP (L4) checksum offload.
    RX_OFFLOAD_UDP_CKSUM = auto()
    #: Device supports TCP (L4) checksum offload.
    RX_OFFLOAD_TCP_CKSUM = auto()
    #: Device supports Large Receive Offload.
    RX_OFFLOAD_TCP_LRO = auto()
    #: Device supports QinQ (double VLAN) stripping.
    RX_OFFLOAD_QINQ_STRIP = auto()
    #: Device supports outer IPv4 (L3) checksum offload.
    RX_OFFLOAD_OUTER_IPV4_CKSUM = auto()
    #: Device supports MACsec.
    RX_OFFLOAD_MACSEC_STRIP = auto()
    #: Device supports filtering of a VLAN Tag identifier.
    RX_OFFLOAD_VLAN_FILTER = 1 << 9
    #: Device supports VLAN extend offload.
    RX_OFFLOAD_VLAN_EXTEND = auto()
    #: Device supports receiving segmented mbufs.
    RX_OFFLOAD_SCATTER = 1 << 13
    #: Device supports Timestamp.
    RX_OFFLOAD_TIMESTAMP = auto()
    #: Device supports crypto processing while packet is received in NIC.
    RX_OFFLOAD_SECURITY = auto()
    #: Device supports keeping the CRC of received packets.
    RX_OFFLOAD_KEEP_CRC = auto()
    #: Device supports SCTP (L4) checksum offload.
    RX_OFFLOAD_SCTP_CKSUM = auto()
    #: Device supports outer UDP (L4) checksum offload.
    RX_OFFLOAD_OUTER_UDP_CKSUM = auto()
    #: Device supports RSS hashing.
    RX_OFFLOAD_RSS_HASH = auto()
    #: Device supports buffer split.
    RX_OFFLOAD_BUFFER_SPLIT = auto()
    #: Shorthand for the IPv4, UDP and TCP checksum offloads combined.
    RX_OFFLOAD_CHECKSUM = RX_OFFLOAD_IPV4_CKSUM | RX_OFFLOAD_UDP_CKSUM | RX_OFFLOAD_TCP_CKSUM
    #: Shorthand for the VLAN strip, VLAN filter, VLAN extend and QinQ strip offloads combined.
    RX_OFFLOAD_VLAN = (
        RX_OFFLOAD_VLAN_STRIP
        | RX_OFFLOAD_VLAN_FILTER
        | RX_OFFLOAD_VLAN_EXTEND
        | RX_OFFLOAD_QINQ_STRIP
    )

    @classmethod
    def from_string(cls, line: str) -> Self:
        """Build an instance out of whitespace-separated flag names lacking the common prefix.

        Each name found in `line` is prefixed with ``RX_OFFLOAD_`` and looked up among the members.

        Args:
            line: The line to parse.

        Returns:
            A new instance containing all found flags.
        """
        found_members = (cls[f"RX_OFFLOAD_{short_name}"] for short_name in line.split())
        # Start from the empty flag so that a blank line yields no capabilities at all.
        return functools.reduce(lambda combined, member: combined | member, found_members, cls(0))

    @classmethod
    def make_parser(cls, per_port: bool) -> ParserFn:
        """Create the parser for either the per-port or the per-queue capability line.

        Args:
            per_port: If :data:`True`, will return capabilities per port. If :data:`False`,
                will return capabilities per queue.

        Returns:
            ParserFn: A dictionary for the `dataclasses.field` metadata argument containing a
                parser function that makes an instance of this flag from text.
        """
        scope = "Port" if per_port else "Queue"
        # Everything after the colon on the matching line is handed over to `from_string`.
        line_finder = TextParser.find(rf"Per {scope}\s+:(.*)$", re.MULTILINE)
        return TextParser.wrap(line_finder, cls.from_string)
1330
1331
@dataclass
class RxOffloadCapabilities(TextParser):
    """The result of testpmd's ``show port <port_id> rx_offload capabilities`` command.

    Each field is filled in by the parser given in its metadata.

    References:
        testpmd command function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa()``
        testpmd display function: ``app/test-pmd/cmdline.c:cmd_rx_offload_get_capa_parsed()``
    """

    #: ID of the port, taken from the "Rx Offloading Capabilities of port" header line.
    port_id: int = field(
        metadata=TextParser.find_int(r"Rx Offloading Capabilities of port (\d+) :")
    )
    #: Per-queue Rx offload capabilities.
    per_queue: RxOffloadCapability = field(metadata=RxOffloadCapability.make_parser(False))
    #: Capabilities other than per-queue Rx offload capabilities.
    per_port: RxOffloadCapability = field(metadata=RxOffloadCapability.make_parser(True))
1349
1350
def requires_stopped_ports(func: TestPmdShellMethod) -> TestPmdShellMethod:
    """Make a :class:`TestPmdShell` method run only once the ports are stopped.

    Should the shell report its ports as started when the decorated method is invoked, all the
    ports are stopped first and only then is the method executed.

    Args:
        func: The :class:`TestPmdShell` method to decorate.

    Returns:
        The wrapped method.
    """

    @functools.wraps(func)
    def _stop_ports_first(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):
        ports_are_running = self.ports_started
        if ports_are_running:
            self._logger.debug("Ports need to be stopped to continue.")
            self.stop_all_ports()

        result = func(self, *args, **kwargs)
        return result

    return _stop_ports_first
1370
1371
def requires_started_ports(func: TestPmdShellMethod) -> TestPmdShellMethod:
    """Make a :class:`TestPmdShell` method run only once the ports are started.

    Should the shell report its ports as stopped when the decorated method is invoked, all the
    ports are started first and only then is the method executed.

    Args:
        func: The :class:`TestPmdShell` method to decorate.

    Returns:
        The wrapped method.
    """

    @functools.wraps(func)
    def _start_ports_first(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):
        ports_are_running = self.ports_started
        if not ports_are_running:
            self._logger.debug("Ports need to be started to continue.")
            self.start_all_ports()

        result = func(self, *args, **kwargs)
        return result

    return _start_ports_first
1391
1392
def add_remove_mtu(mtu: int = 1500) -> Callable[[TestPmdShellMethod], TestPmdShellMethod]:
    """Configure MTU to `mtu` on all ports, run the decorated function, then revert.

    The MTU to revert to is read from the first port before anything is changed; if that port
    reports no MTU, 1500 is used instead. The MTU is reverted even when the decorated function
    raises, so a failing call does not leave the ports with the temporary MTU.

    Args:
        mtu: The MTU to configure all ports on.

    Returns:
        The method decorated with setting and reverting MTU.
    """

    def decorator(func: TestPmdShellMethod) -> TestPmdShellMethod:
        @functools.wraps(func)
        def wrapper(self: "TestPmdShell", *args: P.args, **kwargs: P.kwargs):
            original_mtu = self.ports[0].mtu
            self.set_port_mtu_all(mtu=mtu, verify=False)
            try:
                return func(self, *args, **kwargs)
            finally:
                # Runs on both the normal and the exceptional path.
                self.set_port_mtu_all(original_mtu if original_mtu else 1500, verify=False)

        return wrapper

    return decorator
1415
1416
1417class TestPmdShell(DPDKShell):
1418    """Testpmd interactive shell.
1419
1420    The testpmd shell users should never use
1421    the :meth:`~.interactive_shell.InteractiveShell.send_command` method directly, but rather
1422    call specialized methods. If there isn't one that satisfies a need, it should be added.
1423
1424    Attributes:
1425        ports_started: Indicates whether the ports are started.
1426    """
1427
1428    _app_params: TestPmdParams
1429    _ports: list[TestPmdPort] | None
1430
1431    #: The path to the testpmd executable.
1432    path: ClassVar[PurePath] = PurePath("app", "dpdk-testpmd")
1433
1434    #: The testpmd's prompt.
1435    _default_prompt: ClassVar[str] = "testpmd>"
1436
1437    #: This forces the prompt to appear after sending a command.
1438    _command_extra_chars: ClassVar[str] = "\n"
1439
1440    ports_started: bool
1441
1442    def __init__(
1443        self,
1444        node: SutNode,
1445        privileged: bool = True,
1446        timeout: float = SETTINGS.timeout,
1447        lcore_filter_specifier: LogicalCoreCount | LogicalCoreList = LogicalCoreCount(),
1448        ascending_cores: bool = True,
1449        append_prefix_timestamp: bool = True,
1450        name: str | None = None,
1451        **app_params: Unpack[TestPmdParamsDict],
1452    ) -> None:
1453        """Overrides :meth:`~.dpdk_shell.DPDKShell.__init__`. Changes app_params to kwargs."""
1454        super().__init__(
1455            node,
1456            privileged,
1457            timeout,
1458            lcore_filter_specifier,
1459            ascending_cores,
1460            append_prefix_timestamp,
1461            TestPmdParams(**app_params),
1462            name,
1463        )
1464        self.ports_started = not self._app_params.disable_device_start
1465        self._ports = None
1466
1467    @property
1468    def ports(self) -> list[TestPmdPort]:
1469        """The ports of the instance.
1470
1471        This caches the ports returned by :meth:`show_port_info_all`.
1472        To force an update of port information, execute :meth:`show_port_info_all` or
1473        :meth:`show_port_info`.
1474
1475        Returns: The list of known testpmd ports.
1476        """
1477        if self._ports is None:
1478            return self.show_port_info_all()
1479        return self._ports
1480
1481    @requires_started_ports
1482    def start(self, verify: bool = True) -> None:
1483        """Start packet forwarding with the current configuration.
1484
1485        Args:
1486            verify: If :data:`True` , a second start command will be sent in an attempt to verify
1487                packet forwarding started as expected.
1488
1489        Raises:
1490            InteractiveCommandExecutionError: If `verify` is :data:`True` and forwarding fails to
1491                start or ports fail to come up.
1492        """
1493        self.send_command("start")
1494        if verify:
1495            # If forwarding was already started, sending "start" again should tell us
1496            start_cmd_output = self.send_command("start")
1497            if "Packet forwarding already started" not in start_cmd_output:
1498                self._logger.debug(f"Failed to start packet forwarding: \n{start_cmd_output}")
1499                raise InteractiveCommandExecutionError("Testpmd failed to start packet forwarding.")
1500
1501            number_of_ports = len(self._app_params.allowed_ports or [])
1502            for port_id in range(number_of_ports):
1503                if not self.wait_link_status_up(port_id):
1504                    raise InteractiveCommandExecutionError(
1505                        "Not all ports came up after starting packet forwarding in testpmd."
1506                    )
1507
1508    def stop(self, verify: bool = True) -> str:
1509        """Stop packet forwarding.
1510
1511        Args:
1512            verify: If :data:`True` , the output of the stop command is scanned to verify that
1513                forwarding was stopped successfully or not started. If neither is found, it is
1514                considered an error.
1515
1516        Raises:
1517            InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop
1518                forwarding results in an error.
1519
1520        Returns:
1521            Output gathered from the stop command and all other preceding logs in the buffer. This
1522            output is most often used to view forwarding statistics that are displayed when this
1523            command is sent as well as any verbose packet information that hasn't been consumed
1524            prior to calling this method.
1525        """
1526        stop_cmd_output = self.send_command("stop")
1527        if verify:
1528            if (
1529                "Done." not in stop_cmd_output
1530                and "Packet forwarding not started" not in stop_cmd_output
1531            ):
1532                self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")
1533                raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")
1534        return stop_cmd_output
1535
1536    def get_devices(self) -> list[TestPmdDevice]:
1537        """Get a list of device names that are known to testpmd.
1538
1539        Uses the device info listed in testpmd and then parses the output.
1540
1541        Returns:
1542            A list of devices.
1543        """
1544        dev_info: str = self.send_command("show device info all")
1545        dev_list: list[TestPmdDevice] = []
1546        for line in dev_info.split("\n"):
1547            if "device name:" in line.lower():
1548                dev_list.append(TestPmdDevice(line))
1549        return dev_list
1550
1551    def wait_link_status_up(self, port_id: int, timeout=SETTINGS.timeout) -> bool:
1552        """Wait until the link status on the given port is "up".
1553
1554        Arguments:
1555            port_id: Port to check the link status on.
1556            timeout: Time to wait for the link to come up. The default value for this
1557                argument may be modified using the :option:`--timeout` command-line argument
1558                or the :envvar:`DTS_TIMEOUT` environment variable.
1559
1560        Returns:
1561            Whether the link came up in time or not.
1562        """
1563        time_to_stop = time.time() + timeout
1564        port_info: str = ""
1565        while time.time() < time_to_stop:
1566            port_info = self.send_command(f"show port info {port_id}")
1567            if "Link status: up" in port_info:
1568                break
1569            time.sleep(0.5)
1570        else:
1571            self._logger.error(f"The link for port {port_id} did not come up in the given timeout.")
1572        return "Link status: up" in port_info
1573
1574    def set_forward_mode(self, mode: SimpleForwardingModes, verify: bool = True):
1575        """Set packet forwarding mode.
1576
1577        Args:
1578            mode: The forwarding mode to use.
1579            verify: If :data:`True` the output of the command will be scanned in an attempt to
1580                verify that the forwarding mode was set to `mode` properly.
1581
1582        Raises:
1583            InteractiveCommandExecutionError: If `verify` is :data:`True` and the forwarding mode
1584                fails to update.
1585        """
1586        set_fwd_output = self.send_command(f"set fwd {mode.value}")
1587        if verify:
1588            if f"Set {mode.value} packet forwarding mode" not in set_fwd_output:
1589                self._logger.debug(f"Failed to set fwd mode to {mode.value}:\n{set_fwd_output}")
1590                raise InteractiveCommandExecutionError(
1591                    f"Test pmd failed to set fwd mode to {mode.value}"
1592                )
1593
1594    def stop_all_ports(self, verify: bool = True) -> None:
1595        """Stops all the ports.
1596
1597        Args:
1598            verify: If :data:`True`, the output of the command will be checked for a successful
1599                execution.
1600
1601        Raises:
1602            InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not
1603                stopped successfully.
1604        """
1605        self._logger.debug("Stopping all the ports...")
1606        output = self.send_command("port stop all")
1607        if verify and not output.strip().endswith("Done"):
1608            raise InteractiveCommandExecutionError("Ports were not stopped successfully.")
1609
1610        self.ports_started = False
1611
1612    def start_all_ports(self, verify: bool = True) -> None:
1613        """Starts all the ports.
1614
1615        Args:
1616            verify: If :data:`True`, the output of the command will be checked for a successful
1617                execution.
1618
1619        Raises:
1620            InteractiveCommandExecutionError: If `verify` is :data:`True` and the ports were not
1621                started successfully.
1622        """
1623        self._logger.debug("Starting all the ports...")
1624        output = self.send_command("port start all")
1625        if verify and not output.strip().endswith("Done"):
1626            raise InteractiveCommandExecutionError("Ports were not started successfully.")
1627
1628        self.ports_started = True
1629
1630    @requires_stopped_ports
1631    def set_ports_queues(self, number_of: int) -> None:
1632        """Sets the number of queues per port.
1633
1634        Args:
1635            number_of: The number of RX/TX queues to create per port.
1636
1637        Raises:
1638            InternalError: If `number_of` is invalid.
1639        """
1640        if number_of < 1:
1641            raise InternalError("The number of queues must be positive and non-zero.")
1642
1643        self.send_command(f"port config all rxq {number_of}")
1644        self.send_command(f"port config all txq {number_of}")
1645
1646    def show_port_info_all(self) -> list[TestPmdPort]:
1647        """Returns the information of all the ports.
1648
1649        Returns:
1650            list[TestPmdPort]: A list containing all the ports information as `TestPmdPort`.
1651        """
1652        output = self.send_command("show port info all")
1653
1654        # Sample output of the "all" command looks like:
1655        #
1656        # <start>
1657        #
1658        #   ********************* Infos for port 0 *********************
1659        #   Key: value
1660        #
1661        #   ********************* Infos for port 1 *********************
1662        #   Key: value
1663        # <end>
1664        #
1665        # Takes advantage of the double new line in between ports as end delimiter. But we need to
1666        # artificially add a new line at the end to pick up the last port. Because commands are
1667        # executed on a pseudo-terminal created by paramiko on the remote node, lines end with CRLF.
1668        # Therefore we also need to take the carriage return into account.
1669        iter = re.finditer(r"\*{21}.*?[\r\n]{4}", output + "\r\n", re.S)
1670        self._ports = [TestPmdPort.parse(block.group(0)) for block in iter]
1671        return self._ports
1672
1673    def show_port_info(self, port_id: int) -> TestPmdPort:
1674        """Returns the given port information.
1675
1676        Args:
1677            port_id: The port ID to gather information for.
1678
1679        Raises:
1680            InteractiveCommandExecutionError: If `port_id` is invalid.
1681
1682        Returns:
1683            TestPmdPort: An instance of `TestPmdPort` containing the given port's information.
1684        """
1685        output = self.send_command(f"show port info {port_id}", skip_first_line=True)
1686        if output.startswith("Invalid port"):
1687            raise InteractiveCommandExecutionError("invalid port given")
1688
1689        port = TestPmdPort.parse(output)
1690        self._update_port(port)
1691        return port
1692
1693    def _update_port(self, port: TestPmdPort) -> None:
1694        if self._ports:
1695            self._ports = [
1696                existing_port if port.id != existing_port.id else port
1697                for existing_port in self._ports
1698            ]
1699
1700    def set_mac_addr(self, port_id: int, mac_address: str, add: bool, verify: bool = True) -> None:
1701        """Add or remove a mac address on a given port's Allowlist.
1702
1703        Args:
1704            port_id: The port ID the mac address is set on.
1705            mac_address: The mac address to be added to or removed from the specified port.
1706            add: If :data:`True`, add the specified mac address. If :data:`False`, remove specified
1707                mac address.
1708            verify: If :data:'True', assert that the 'mac_addr' operation was successful. If
1709                :data:'False', run the command and skip this assertion.
1710
1711        Raises:
1712            InteractiveCommandExecutionError: If the set mac address operation fails.
1713        """
1714        mac_cmd = "add" if add else "remove"
1715        output = self.send_command(f"mac_addr {mac_cmd} {port_id} {mac_address}")
1716        if "Bad arguments" in output:
1717            self._logger.debug("Invalid argument provided to mac_addr")
1718            raise InteractiveCommandExecutionError("Invalid argument provided")
1719
1720        if verify:
1721            if "mac_addr_cmd error:" in output:
1722                self._logger.debug(f"Failed to {mac_cmd} {mac_address} on port {port_id}")
1723                raise InteractiveCommandExecutionError(
1724                    f"Failed to {mac_cmd} {mac_address} on port {port_id} \n{output}"
1725                )
1726
1727    def set_multicast_mac_addr(
1728        self, port_id: int, multi_addr: str, add: bool, verify: bool = True
1729    ) -> None:
1730        """Add or remove multicast mac address to a specified port's allow list.
1731
1732        Args:
1733            port_id: The port ID the multicast address is set on.
1734            multi_addr: The multicast address to be added or removed from the filter.
1735            add: If :data:'True', add the specified multicast address to the port filter.
1736                If :data:'False', remove the specified multicast address from the port filter.
1737            verify: If :data:'True', assert that the 'mcast_addr' operations was successful.
1738                If :data:'False', execute the 'mcast_addr' operation and skip the assertion.
1739
1740        Raises:
1741            InteractiveCommandExecutionError: If either the 'add' or 'remove' operations fails.
1742        """
1743        mcast_cmd = "add" if add else "remove"
1744        output = self.send_command(f"mcast_addr {mcast_cmd} {port_id} {multi_addr}")
1745        if "Bad arguments" in output:
1746            self._logger.debug("Invalid arguments provided to mcast_addr")
1747            raise InteractiveCommandExecutionError("Invalid argument provided")
1748
1749        if verify:
1750            if (
1751                "Invalid multicast_addr" in output
1752                or f'multicast address {"already" if add else "not"} filtered by port' in output
1753            ):
1754                self._logger.debug(f"Failed to {mcast_cmd} {multi_addr} on port {port_id}")
1755                raise InteractiveCommandExecutionError(
1756                    f"Failed to {mcast_cmd} {multi_addr} on port {port_id} \n{output}"
1757                )
1758
1759    def show_port_stats_all(self) -> list[TestPmdPortStats]:
1760        """Returns the statistics of all the ports.
1761
1762        Returns:
1763            list[TestPmdPortStats]: A list containing all the ports stats as `TestPmdPortStats`.
1764        """
1765        output = self.send_command("show port stats all")
1766
1767        # Sample output of the "all" command looks like:
1768        #
1769        #   ########### NIC statistics for port 0 ###########
1770        #   values...
1771        #   #################################################
1772        #
1773        #   ########### NIC statistics for port 1 ###########
1774        #   values...
1775        #   #################################################
1776        #
1777        iter = re.finditer(r"(^  #*.+#*$[^#]+)^  #*\r$", output, re.MULTILINE)
1778        return [TestPmdPortStats.parse(block.group(1)) for block in iter]
1779
1780    def show_port_stats(self, port_id: int) -> TestPmdPortStats:
1781        """Returns the given port statistics.
1782
1783        Args:
1784            port_id: The port ID to gather information for.
1785
1786        Raises:
1787            InteractiveCommandExecutionError: If `port_id` is invalid.
1788
1789        Returns:
1790            TestPmdPortStats: An instance of `TestPmdPortStats` containing the given port's stats.
1791        """
1792        output = self.send_command(f"show port stats {port_id}", skip_first_line=True)
1793        if output.startswith("Invalid port"):
1794            raise InteractiveCommandExecutionError("invalid port given")
1795
1796        return TestPmdPortStats.parse(output)
1797
1798    @requires_stopped_ports
1799    def csum_set_hw(
1800        self, layers: ChecksumOffloadOptions, port_id: int, verify: bool = True
1801    ) -> None:
1802        """Enables hardware checksum offloading on the specified layer.
1803
1804        Args:
1805            layers: The layer/layers that checksum offloading should be enabled on.
1806            port_id: The port number to enable checksum offloading on, should be within 0-32.
1807            verify: If :data:`True` the output of the command will be scanned in an attempt to
1808                verify that checksum offloading was enabled on the port.
1809
1810        Raises:
1811            InteractiveCommandExecutionError: If checksum offload is not enabled successfully.
1812        """
1813        for name, offload in ChecksumOffloadOptions.__members__.items():
1814            if offload in layers:
1815                name = name.replace("_", "-")
1816                csum_output = self.send_command(f"csum set {name} hw {port_id}")
1817                if verify:
1818                    if (
1819                        "Bad arguments" in csum_output
1820                        or f"Please stop port {port_id} first" in csum_output
1821                        or f"checksum offload is not supported by port {port_id}" in csum_output
1822                    ):
1823                        self._logger.debug(f"Csum set hw error:\n{csum_output}")
1824                        raise InteractiveCommandExecutionError(
1825                            f"Failed to set csum hw {name} mode on port {port_id}"
1826                        )
1827                success = False
1828                if f"{name} checksum offload is hw" in csum_output.lower():
1829                    success = True
1830                if not success and verify:
1831                    self._logger.debug(
1832                        f"Failed to set csum hw mode on port {port_id}:\n{csum_output}"
1833                    )
1834                    raise InteractiveCommandExecutionError(
1835                        f"""Failed to set csum hw mode on port
1836                                                           {port_id}:\n{csum_output}"""
1837                    )
1838
1839    @requires_stopped_ports
1840    def set_port_mtu(self, port_id: int, mtu: int, verify: bool = True) -> None:
1841        """Change the MTU of a port using testpmd.
1842
1843        Some PMDs require that the port be stopped before changing the MTU, and it does no harm to
1844        stop the port before configuring in cases where it isn't required, so ports are stopped
1845        prior to changing their MTU.
1846
1847        Args:
1848            port_id: ID of the port to adjust the MTU on.
1849            mtu: Desired value for the MTU to be set to.
1850            verify: If `verify` is :data:`True` then the output will be scanned in an attempt to
1851                verify that the mtu was properly set on the port. Defaults to :data:`True`.
1852
1853        Raises:
1854            InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not
1855                properly updated on the port matching `port_id`.
1856        """
1857        set_mtu_output = self.send_command(f"port config mtu {port_id} {mtu}")
1858        if verify and (f"MTU: {mtu}" not in self.send_command(f"show port info {port_id}")):
1859            self._logger.debug(
1860                f"Failed to set mtu to {mtu} on port {port_id}." f" Output was:\n{set_mtu_output}"
1861            )
1862            raise InteractiveCommandExecutionError(
1863                f"Test pmd failed to update mtu of port {port_id} to {mtu}"
1864            )
1865
1866    def set_port_mtu_all(self, mtu: int, verify: bool = True) -> None:
1867        """Change the MTU of all ports using testpmd.
1868
1869        Runs :meth:`set_port_mtu` for every port that testpmd is aware of.
1870
1871        Args:
1872            mtu: Desired value for the MTU to be set to.
1873            verify: Whether to verify that setting the MTU on each port was successful or not.
1874                Defaults to :data:`True`.
1875
1876        Raises:
1877            InteractiveCommandExecutionError: If `verify` is :data:`True` and the MTU was not
1878                properly updated on at least one port.
1879        """
1880        for port in self.ports:
1881            self.set_port_mtu(port.id, mtu, verify)
1882
1883    @staticmethod
1884    def extract_verbose_output(output: str) -> list[TestPmdVerbosePacket]:
1885        """Extract the verbose information present in given testpmd output.
1886
1887        This method extracts sections of verbose output that begin with the line
1888        "port X/queue Y: sent/received Z packets" and end with the ol_flags of a packet.
1889
1890        Args:
1891            output: Testpmd output that contains verbose information
1892
1893        Returns:
1894            List of parsed packet information gathered from verbose information in `output`.
1895        """
1896        out: list[TestPmdVerbosePacket] = []
1897        prev_header: str = ""
1898        iter = re.finditer(
1899            r"(?P<HEADER>(?:port \d+/queue \d+: (?:received|sent) \d+ packets)?)\s*"
1900            r"(?P<PACKET>src=[\w\s=:-]+?ol_flags: [\w ]+)",
1901            output,
1902        )
1903        for match in iter:
1904            if match.group("HEADER"):
1905                prev_header = match.group("HEADER")
1906            out.append(TestPmdVerbosePacket.parse(f"{prev_header}\n{match.group('PACKET')}"))
1907        return out
1908
1909    def set_vlan_filter(self, port: int, enable: bool, verify: bool = True) -> None:
1910        """Set vlan filter on.
1911
1912        Args:
1913            port: The port number to enable VLAN filter on.
1914            enable: Enable the filter on `port` if :data:`True`, otherwise disable it.
1915            verify: If :data:`True`, the output of the command and show port info
1916                is scanned to verify that vlan filtering was set successfully.
1917
1918        Raises:
1919            InteractiveCommandExecutionError: If `verify` is :data:`True` and the filter
1920                fails to update.
1921        """
1922        filter_cmd_output = self.send_command(f"vlan set filter {'on' if enable else 'off'} {port}")
1923        if verify:
1924            vlan_settings = self.show_port_info(port_id=port).vlan_offload
1925            if enable ^ (vlan_settings is not None and VLANOffloadFlag.FILTER in vlan_settings):
1926                self._logger.debug(
1927                    f"""Failed to {'enable' if enable else 'disable'}
1928                                   filter on port {port}: \n{filter_cmd_output}"""
1929                )
1930                raise InteractiveCommandExecutionError(
1931                    f"""Failed to {'enable' if enable else 'disable'}
1932                    filter on port {port}"""
1933                )
1934
1935    def rx_vlan(self, vlan: int, port: int, add: bool, verify: bool = True) -> None:
1936        """Add specified vlan tag to the filter list on a port. Requires vlan filter to be on.
1937
1938        Args:
1939            vlan: The vlan tag to add, should be within 1-1005.
1940            port: The port number to add the tag on.
1941            add: Adds the tag if :data:`True`, otherwise removes the tag.
1942            verify: If :data:`True`, the output of the command is scanned to verify that
1943                the vlan tag was added to the filter list on the specified port.
1944
1945        Raises:
1946            InteractiveCommandExecutionError: If `verify` is :data:`True` and the tag
1947                is not added.
1948        """
1949        rx_cmd_output = self.send_command(f"rx_vlan {'add' if add else 'rm'} {vlan} {port}")
1950        if verify:
1951            if (
1952                "VLAN-filtering disabled" in rx_cmd_output
1953                or "Invalid vlan_id" in rx_cmd_output
1954                or "Bad arguments" in rx_cmd_output
1955            ):
1956                self._logger.debug(
1957                    f"""Failed to {'add' if add else 'remove'} tag {vlan}
1958                    port {port}: \n{rx_cmd_output}"""
1959                )
1960                raise InteractiveCommandExecutionError(
1961                    f"Testpmd failed to {'add' if add else 'remove'} tag {vlan} on port {port}."
1962                )
1963
1964    def set_vlan_strip(self, port: int, enable: bool, verify: bool = True) -> None:
1965        """Enable or disable vlan stripping on the specified port.
1966
1967        Args:
1968            port: The port number to use.
1969            enable: If :data:`True`, will turn vlan stripping on, otherwise will turn off.
1970            verify: If :data:`True`, the output of the command and show port info
1971                is scanned to verify that vlan stripping was enabled on the specified port.
1972
1973        Raises:
1974            InteractiveCommandExecutionError: If `verify` is :data:`True` and stripping
1975                fails to update.
1976        """
1977        strip_cmd_output = self.send_command(f"vlan set strip {'on' if enable else 'off'} {port}")
1978        if verify:
1979            vlan_settings = self.show_port_info(port_id=port).vlan_offload
1980            if enable ^ (vlan_settings is not None and VLANOffloadFlag.STRIP in vlan_settings):
1981                self._logger.debug(
1982                    f"""Failed to set strip {'on' if enable else 'off'}
1983                    port {port}: \n{strip_cmd_output}"""
1984                )
1985                raise InteractiveCommandExecutionError(
1986                    f"Testpmd failed to set strip {'on' if enable else 'off'} port {port}."
1987                )
1988
1989    @requires_stopped_ports
1990    def tx_vlan_set(
1991        self, port: int, enable: bool, vlan: int | None = None, verify: bool = True
1992    ) -> None:
1993        """Set hardware insertion of vlan tags in packets sent on a port.
1994
1995        Args:
1996            port: The port number to use.
1997            enable: Sets vlan tag insertion if :data:`True`, and resets if :data:`False`.
1998            vlan: The vlan tag to insert if enable is :data:`True`.
1999            verify: If :data:`True`, the output of the command is scanned to verify that
2000                vlan insertion was enabled on the specified port.
2001
2002        Raises:
2003            InteractiveCommandExecutionError: If `verify` is :data:`True` and the insertion
2004                tag is not set.
2005        """
2006        if enable:
2007            tx_vlan_cmd_output = self.send_command(f"tx_vlan set {port} {vlan}")
2008            if verify:
2009                if (
2010                    "Please stop port" in tx_vlan_cmd_output
2011                    or "Invalid vlan_id" in tx_vlan_cmd_output
2012                    or "Invalid port" in tx_vlan_cmd_output
2013                ):
2014                    self._logger.debug(
2015                        f"Failed to set vlan tag {vlan} on port {port}:\n{tx_vlan_cmd_output}"
2016                    )
2017                    raise InteractiveCommandExecutionError(
2018                        f"Testpmd failed to set vlan insertion tag {vlan} on port {port}."
2019                    )
2020        else:
2021            tx_vlan_cmd_output = self.send_command(f"tx_vlan reset {port}")
2022            if verify:
2023                if "Please stop port" in tx_vlan_cmd_output or "Invalid port" in tx_vlan_cmd_output:
2024                    self._logger.debug(
2025                        f"Failed to reset vlan insertion on port {port}: \n{tx_vlan_cmd_output}"
2026                    )
2027                    raise InteractiveCommandExecutionError(
2028                        f"Testpmd failed to reset vlan insertion on port {port}."
2029                    )
2030
2031    def set_promisc(self, port: int, enable: bool, verify: bool = True) -> None:
2032        """Enable or disable promiscuous mode for the specified port.
2033
2034        Args:
2035            port: Port number to use.
2036            enable: If :data:`True`, turn promiscuous mode on, otherwise turn off.
2037            verify: If :data:`True` an additional command will be sent to verify that
2038                promiscuous mode is properly set. Defaults to :data:`True`.
2039
2040        Raises:
2041            InteractiveCommandExecutionError: If `verify` is :data:`True` and promiscuous mode
2042                is not correctly set.
2043        """
2044        promisc_cmd_output = self.send_command(f"set promisc {port} {'on' if enable else 'off'}")
2045        if verify:
2046            stats = self.show_port_info(port_id=port)
2047            if enable ^ stats.is_promiscuous_mode_enabled:
2048                self._logger.debug(
2049                    f"Failed to set promiscuous mode on port {port}: \n{promisc_cmd_output}"
2050                )
2051                raise InteractiveCommandExecutionError(
2052                    f"Testpmd failed to set promiscuous mode on port {port}."
2053                )
2054
2055    def set_verbose(self, level: int, verify: bool = True) -> None:
2056        """Set debug verbosity level.
2057
2058        Args:
2059            level: 0 - silent except for error
2060                1 - fully verbose except for Tx packets
2061                2 - fully verbose except for Rx packets
2062                >2 - fully verbose
2063            verify: If :data:`True` the command output will be scanned to verify that verbose level
2064                is properly set. Defaults to :data:`True`.
2065
2066        Raises:
2067            InteractiveCommandExecutionError: If `verify` is :data:`True` and verbose level
2068            is not correctly set.
2069        """
2070        verbose_cmd_output = self.send_command(f"set verbose {level}")
2071        if verify:
2072            if "Change verbose level" not in verbose_cmd_output:
2073                self._logger.debug(
2074                    f"Failed to set verbose level to {level}: \n{verbose_cmd_output}"
2075                )
2076                raise InteractiveCommandExecutionError(
2077                    f"Testpmd failed to set verbose level to {level}."
2078                )
2079
    def _close(self) -> None:
        """Overrides :meth:`~.interactive_shell.close`.

        Calls :meth:`stop` and sends the ``quit`` command before delegating to the parent
        class' ``_close``.
        """
        self.stop()
        # NOTE(review): "Bye..." is presumably the text to wait for in place of the usual prompt
        # once testpmd exits; confirm against the signature of send_command().
        self.send_command("quit", "Bye...")
        # The parent implementation runs last, after testpmd was asked to quit.
        return super()._close()
2085
2086    """
2087    ====== Capability retrieval methods ======
2088    """
2089
2090    def get_capabilities_rx_offload(
2091        self,
2092        supported_capabilities: MutableSet["NicCapability"],
2093        unsupported_capabilities: MutableSet["NicCapability"],
2094    ) -> None:
2095        """Get all rx offload capabilities and divide them into supported and unsupported.
2096
2097        Args:
2098            supported_capabilities: Supported capabilities will be added to this set.
2099            unsupported_capabilities: Unsupported capabilities will be added to this set.
2100        """
2101        self._logger.debug("Getting rx offload capabilities.")
2102        command = f"show port {self.ports[0].id} rx_offload capabilities"
2103        rx_offload_capabilities_out = self.send_command(command)
2104        rx_offload_capabilities = RxOffloadCapabilities.parse(rx_offload_capabilities_out)
2105        self._update_capabilities_from_flag(
2106            supported_capabilities,
2107            unsupported_capabilities,
2108            RxOffloadCapability,
2109            rx_offload_capabilities.per_port | rx_offload_capabilities.per_queue,
2110        )
2111
2112    def get_port_queue_info(
2113        self, port_id: int, queue_id: int, is_rx_queue: bool
2114    ) -> TestPmdQueueInfo:
2115        """Returns the current state of the specified queue."""
2116        command = f"show {'rxq' if is_rx_queue else 'txq'} info {port_id} {queue_id}"
2117        queue_info = TestPmdQueueInfo.parse(self.send_command(command))
2118        return queue_info
2119
2120    def setup_port_queue(self, port_id: int, queue_id: int, is_rx_queue: bool) -> None:
2121        """Setup a given queue on a port.
2122
2123        This functionality cannot be verified because the setup action only takes effect when the
2124        queue is started.
2125
2126        Args:
2127            port_id: ID of the port where the queue resides.
2128            queue_id: ID of the queue to setup.
2129            is_rx_queue: Type of queue to setup. If :data:`True` an RX queue will be setup,
2130                otherwise a TX queue will be setup.
2131        """
2132        self.send_command(f"port {port_id} {'rxq' if is_rx_queue else 'txq'} {queue_id} setup")
2133
2134    def stop_port_queue(
2135        self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True
2136    ) -> None:
2137        """Stops a given queue on a port.
2138
2139        Args:
2140            port_id: ID of the port that the queue belongs to.
2141            queue_id: ID of the queue to stop.
2142            is_rx_queue: Type of queue to stop. If :data:`True` an RX queue will be stopped,
2143                otherwise a TX queue will be stopped.
2144            verify: If :data:`True` an additional command will be sent to verify the queue stopped.
2145                Defaults to :data:`True`.
2146
2147        Raises:
2148            InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to
2149                stop.
2150        """
2151        port_type = "rxq" if is_rx_queue else "txq"
2152        stop_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} stop")
2153        if verify:
2154            queue_started = self.get_port_queue_info(
2155                port_id, queue_id, is_rx_queue
2156            ).is_queue_started
2157            if queue_started:
2158                self._logger.debug(
2159                    f"Failed to stop {port_type} {queue_id} on port {port_id}:\n{stop_cmd_output}"
2160                )
2161                raise InteractiveCommandExecutionError(
2162                    f"Test pmd failed to stop {port_type} {queue_id} on port {port_id}"
2163                )
2164
2165    def start_port_queue(
2166        self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True
2167    ) -> None:
2168        """Starts a given queue on a port.
2169
2170        First sets up the port queue, then starts it.
2171
2172        Args:
2173            port_id: ID of the port that the queue belongs to.
2174            queue_id: ID of the queue to start.
2175            is_rx_queue: Type of queue to start. If :data:`True` an RX queue will be started,
2176                otherwise a TX queue will be started.
2177            verify: if :data:`True` an additional command will be sent to verify that the queue was
2178                started. Defaults to :data:`True`.
2179
2180        Raises:
2181            InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to
2182                start.
2183        """
2184        port_type = "rxq" if is_rx_queue else "txq"
2185        self.setup_port_queue(port_id, queue_id, is_rx_queue)
2186        start_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} start")
2187        if verify:
2188            queue_started = self.get_port_queue_info(
2189                port_id, queue_id, is_rx_queue
2190            ).is_queue_started
2191            if not queue_started:
2192                self._logger.debug(
2193                    f"Failed to start {port_type} {queue_id} on port {port_id}:\n{start_cmd_output}"
2194                )
2195                raise InteractiveCommandExecutionError(
2196                    f"Test pmd failed to start {port_type} {queue_id} on port {port_id}"
2197                )
2198
2199    def get_queue_ring_size(self, port_id: int, queue_id: int, is_rx_queue: bool) -> int:
2200        """Returns the current size of the ring on the specified queue."""
2201        command = f"show {'rxq' if is_rx_queue else 'txq'} info {port_id} {queue_id}"
2202        queue_info = TestPmdQueueInfo.parse(self.send_command(command))
2203        return queue_info.ring_size
2204
2205    def set_queue_ring_size(
2206        self,
2207        port_id: int,
2208        queue_id: int,
2209        size: int,
2210        is_rx_queue: bool,
2211        verify: bool = True,
2212    ) -> None:
2213        """Update the ring size of an Rx/Tx queue on a given port.
2214
2215        Queue is setup after setting the ring size so that the queue info reflects this change and
2216        it can be verified.
2217
2218        Args:
2219            port_id: The port that the queue resides on.
2220            queue_id: The ID of the queue on the port.
2221            size: The size to update the ring size to.
2222            is_rx_queue: Whether to modify an RX or TX queue. If :data:`True` an RX queue will be
2223                updated, otherwise a TX queue will be updated.
2224            verify: If :data:`True` an additional command will be sent to check the ring size of
2225                the queue in an attempt to validate that the size was changes properly.
2226
2227        Raises:
2228            InteractiveCommandExecutionError: If `verify` is :data:`True` and there is a failure
2229                when updating ring size.
2230        """
2231        queue_type = "rxq" if is_rx_queue else "txq"
2232        self.send_command(f"port config {port_id} {queue_type} {queue_id} ring_size {size}")
2233        self.setup_port_queue(port_id, queue_id, is_rx_queue)
2234        if verify:
2235            curr_ring_size = self.get_queue_ring_size(port_id, queue_id, is_rx_queue)
2236            if curr_ring_size != size:
2237                self._logger.debug(
2238                    f"Failed up update ring size of queue {queue_id} on port {port_id}. Current"
2239                    f" ring size is {curr_ring_size}."
2240                )
2241                raise InteractiveCommandExecutionError(
2242                    f"Failed to update ring size of queue {queue_id} on port {port_id}"
2243                )
2244
2245    def _update_capabilities_from_flag(
2246        self,
2247        supported_capabilities: MutableSet["NicCapability"],
2248        unsupported_capabilities: MutableSet["NicCapability"],
2249        flag_class: type[Flag],
2250        supported_flags: Flag,
2251    ) -> None:
2252        """Divide all flags from `flag_class` into supported and unsupported."""
2253        for flag in flag_class:
2254            if flag in supported_flags:
2255                supported_capabilities.add(NicCapability[str(flag.name)])
2256            else:
2257                unsupported_capabilities.add(NicCapability[str(flag.name)])
2258
2259    @requires_started_ports
2260    def get_capabilities_rxq_info(
2261        self,
2262        supported_capabilities: MutableSet["NicCapability"],
2263        unsupported_capabilities: MutableSet["NicCapability"],
2264    ) -> None:
2265        """Get all rxq capabilities and divide them into supported and unsupported.
2266
2267        Args:
2268            supported_capabilities: Supported capabilities will be added to this set.
2269            unsupported_capabilities: Unsupported capabilities will be added to this set.
2270        """
2271        self._logger.debug("Getting rxq capabilities.")
2272        command = f"show rxq info {self.ports[0].id} 0"
2273        rxq_info = TestPmdRxqInfo.parse(self.send_command(command))
2274        if rxq_info.scattered_packets:
2275            supported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)
2276        else:
2277            unsupported_capabilities.add(NicCapability.SCATTERED_RX_ENABLED)
2278
2279    def get_capabilities_show_port_info(
2280        self,
2281        supported_capabilities: MutableSet["NicCapability"],
2282        unsupported_capabilities: MutableSet["NicCapability"],
2283    ) -> None:
2284        """Get all capabilities from show port info and divide them into supported and unsupported.
2285
2286        Args:
2287            supported_capabilities: Supported capabilities will be added to this set.
2288            unsupported_capabilities: Unsupported capabilities will be added to this set.
2289        """
2290        self._update_capabilities_from_flag(
2291            supported_capabilities,
2292            unsupported_capabilities,
2293            DeviceCapabilitiesFlag,
2294            self.ports[0].device_capabilities,
2295        )
2296
2297    def get_capabilities_mcast_filtering(
2298        self,
2299        supported_capabilities: MutableSet["NicCapability"],
2300        unsupported_capabilities: MutableSet["NicCapability"],
2301    ) -> None:
2302        """Get multicast filtering capability from mcast_addr add and check for testpmd error code.
2303
2304        Args:
2305            supported_capabilities: Supported capabilities will be added to this set.
2306            unsupported_capabilities: Unsupported capabilities will be added to this set.
2307        """
2308        self._logger.debug("Getting mcast filter capabilities.")
2309        command = f"mcast_addr add {self.ports[0].id} 01:00:5E:00:00:00"
2310        output = self.send_command(command)
2311        if "diag=-95" in output:
2312            unsupported_capabilities.add(NicCapability.MCAST_FILTERING)
2313        else:
2314            supported_capabilities.add(NicCapability.MCAST_FILTERING)
2315            command = str.replace(command, "add", "remove", 1)
2316            self.send_command(command)
2317
2318
class NicCapability(NoAliasEnum):
    """A mapping between capability names and the associated :class:`TestPmdShell` methods.

    The :class:`TestPmdShell` capability checking method executes the command that checks
    whether the capability is supported.
    A decorator may optionally be added to the method that will add and remove configuration
    that's necessary to retrieve the capability support status.
    The Enum members may be assigned the method itself or a tuple of
    (capability_checking_method, decorator_function).

    The signature of each :class:`TestPmdShell` capability checking method must be::

        fn(self, supported_capabilities: MutableSet, unsupported_capabilities: MutableSet) -> None

    The capability checking method must get whether a capability is supported or not
    from a testpmd command. If multiple capabilities can be obtained from a testpmd command,
    each should be obtained in the method. These capabilities should then
    be added to `supported_capabilities` or `unsupported_capabilities` based on their support.

    The two sets are shared across all capability discovery function calls in a given
    test run so that we don't call the same function multiple times. For example, when we find
    :attr:`SCATTERED_RX_ENABLED` in :meth:`TestPmdShell.get_capabilities_rxq_info`,
    we don't go looking for it again if a different test case also needs it.
    """

    # Plain functions assigned in an Enum body are descriptors and would be treated as methods
    # instead of members, which is why bare methods are wrapped in functools.partial.

    #: Scattered packets Rx enabled
    SCATTERED_RX_ENABLED: TestPmdShellNicCapability = (
        TestPmdShell.get_capabilities_rxq_info,
        add_remove_mtu(9000),
    )
    #: Device supports VLAN stripping.
    RX_OFFLOAD_VLAN_STRIP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L3 checksum offload.
    RX_OFFLOAD_IPV4_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_UDP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_TCP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports Large Receive Offload.
    RX_OFFLOAD_TCP_LRO: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports QinQ (802.1Q-in-802.1Q, i.e. double VLAN tag) stripping.
    RX_OFFLOAD_QINQ_STRIP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports outer packet L3 checksum.
    RX_OFFLOAD_OUTER_IPV4_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports MACsec.
    RX_OFFLOAD_MACSEC_STRIP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports filtering of a VLAN Tag identifier.
    RX_OFFLOAD_VLAN_FILTER: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports VLAN offload.
    RX_OFFLOAD_VLAN_EXTEND: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports receiving segmented mbufs.
    RX_OFFLOAD_SCATTER: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports Timestamp.
    RX_OFFLOAD_TIMESTAMP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports crypto processing while packet is received in NIC.
    RX_OFFLOAD_SECURITY: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports keeping the CRC in received packets (i.e. not stripping it).
    RX_OFFLOAD_KEEP_CRC: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports L4 checksum offload.
    RX_OFFLOAD_SCTP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports outer packet L4 (UDP) checksum.
    RX_OFFLOAD_OUTER_UDP_CKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports RSS hashing.
    RX_OFFLOAD_RSS_HASH: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports scatter Rx packets to segmented mbufs.
    RX_OFFLOAD_BUFFER_SPLIT: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports all checksum capabilities.
    RX_OFFLOAD_CHECKSUM: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports all VLAN capabilities.
    RX_OFFLOAD_VLAN: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_rx_offload
    )
    #: Device supports Rx queue setup after device started.
    RUNTIME_RX_QUEUE_SETUP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports Tx queue setup after device started.
    RUNTIME_TX_QUEUE_SETUP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports shared Rx queue among ports within Rx domain and switch domain.
    RXQ_SHARE: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports keeping flow rules across restart.
    FLOW_RULE_KEEP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports keeping shared flow objects across restart.
    FLOW_SHARED_OBJECT_KEEP: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_show_port_info
    )
    #: Device supports multicast address filtering.
    MCAST_FILTERING: TestPmdShellCapabilityMethod = functools.partial(
        TestPmdShell.get_capabilities_mcast_filtering
    )

    def __call__(
        self,
        testpmd_shell: TestPmdShell,
        supported_capabilities: MutableSet[Self],
        unsupported_capabilities: MutableSet[Self],
    ) -> None:
        """Execute the associated capability retrieval function.

        If the member value is a (capability_checking_method, decorator_function) tuple,
        the decorator is applied to the method before the method is called.

        Args:
            testpmd_shell: :class:`TestPmdShell` object to which the function will be bound to.
            supported_capabilities: The set storing the supported capabilities
                of a given test run.
            unsupported_capabilities: The set storing the unsupported capabilities
                of a given test run.
        """
        capability_fn = self.value
        # A tuple value is not callable; unpack it and wrap the method with its decorator.
        if isinstance(capability_fn, tuple):
            capability_fn, decorator_fn = capability_fn
            capability_fn = decorator_fn(capability_fn)
        capability_fn(testpmd_shell, supported_capabilities, unsupported_capabilities)
2470