xref: /dpdk/dts/framework/testbed_model/traffic_generator/capturing_traffic_generator.py (revision 0bf6796a8ca351fb79a62841c0afcd35576fcb9a)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2022 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4
5"""Traffic generator that can capture packets.
6
7In functional testing, we need to interrogate received packets to check their validity.
8The module defines the interface common to all traffic generators capable of capturing
9traffic.
10"""
11
12import uuid
13from abc import abstractmethod
14from dataclasses import dataclass
15
16import scapy.utils  # type: ignore[import-untyped]
17from scapy.packet import Packet  # type: ignore[import-untyped]
18
19from framework.settings import SETTINGS
20from framework.testbed_model.port import Port
21from framework.utils import get_packet_summaries
22
23from .traffic_generator import TrafficGenerator
24
25
def _get_default_capture_name() -> str:
    """Produce a capture name from a freshly generated random (version 4) UUID.

    Returns:
        The canonical string form of a new UUID.
    """
    fresh_id = uuid.uuid4()
    return f"{fresh_id}"
28
29
30@dataclass(slots=True, frozen=True)
31class PacketFilteringConfig:
32    """The supported filtering options for :class:`CapturingTrafficGenerator`.
33
34    Attributes:
35        no_lldp: If :data:`True`, LLDP packets will be filtered out when capturing.
36        no_arp: If :data:`True`, ARP packets will be filtered out when capturing.
37    """
38
39    no_lldp: bool = True
40    no_arp: bool = True
41
42
class CapturingTrafficGenerator(TrafficGenerator):
    """Capture packets after sending traffic.

    The intermediary interface which enables a packet generator to declare that it can capture
    packets and return them to the user.

    Similarly to :class:`~.traffic_generator.TrafficGenerator`, this class exposes
    the public methods specific to capturing traffic generators and defines a private method
    that must implement the traffic generation and capturing logic in subclasses.

    The methods of capturing traffic generators obey the following workflow:

        1. send packets
        2. capture packets
        3. write the capture to a .pcap file
        4. return the received packets
    """

    @property
    def is_capturing(self) -> bool:
        """This traffic generator can capture traffic."""
        return True

    def send_packets_and_capture(
        self,
        packets: list[Packet],
        send_port: Port,
        receive_port: Port,
        filter_config: PacketFilteringConfig,
        duration: float,
        capture_name: str | None = None,
    ) -> list[Packet]:
        """Send `packets` and capture received traffic.

        Send `packets` on `send_port` and then return all traffic captured
        on `receive_port` for the given `duration`.

        The captured traffic is recorded in the `capture_name`.pcap file. The target directory
        can be configured with the :option:`--output-dir` command line argument or
        the :envvar:`DTS_OUTPUT_DIR` environment variable.

        Args:
            packets: The packets to send.
            send_port: The egress port on the TG node.
            receive_port: The ingress port in the TG node.
            filter_config: Filters to apply when capturing packets.
            duration: Capture traffic for this amount of time after sending the packets.
            capture_name: The name of the .pcap file where to store the capture.
                If :data:`None`, a new unique name is generated for this call.

        Returns:
            The received packets. May be empty if no packets are captured.
        """
        # The default name must be generated per call. A default written directly in
        # the signature is evaluated only once, when the function is defined, so every
        # capture relying on it would share one name and overwrite the same .pcap file.
        if capture_name is None:
            capture_name = _get_default_capture_name()

        self._logger.debug(get_packet_summaries(packets))
        self._logger.debug(
            f"Sending packet on {send_port.logical_name}, receiving on {receive_port.logical_name}."
        )
        received_packets = self._send_packets_and_capture(
            packets,
            send_port,
            receive_port,
            filter_config,
            duration,
        )

        self._logger.debug(f"Received packets: {get_packet_summaries(received_packets)}")
        self._write_capture_from_packets(capture_name, received_packets)
        return received_packets

    @abstractmethod
    def _send_packets_and_capture(
        self,
        packets: list[Packet],
        send_port: Port,
        receive_port: Port,
        filter_config: PacketFilteringConfig,
        duration: float,
    ) -> list[Packet]:
        """The implementation of :meth:`send_packets_and_capture`.

        The subclasses must implement this method which sends `packets` on `send_port`
        and receives packets on `receive_port` for the specified `duration`.

        It must be able to handle receiving no packets.

        Args:
            packets: The packets to send.
            send_port: The egress port on the TG node.
            receive_port: The ingress port in the TG node.
            filter_config: Filters to apply when capturing packets.
            duration: Capture traffic for this amount of time after sending the packets.

        Returns:
            The received packets. May be empty if no packets are captured.
        """

    def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]) -> None:
        """Write `packets` to the `capture_name`.pcap file in the configured output directory.

        Args:
            capture_name: The name of the .pcap file, without the extension.
            packets: The packets to store in the file.
        """
        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
        self._logger.debug(f"Writing packets to {file_name}.")
        scapy.utils.wrpcap(file_name, packets)
132