# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2022 University of New Hampshire
# Copyright(c) 2023 PANTHEON.tech s.r.o.

"""Traffic generator that can capture packets.

In functional testing, we need to interrogate received packets to check their validity.
The module defines the interface common to all traffic generators capable of capturing
traffic.
"""

import uuid
from abc import abstractmethod
from dataclasses import dataclass

import scapy.utils  # type: ignore[import-untyped]
from scapy.packet import Packet  # type: ignore[import-untyped]

from framework.settings import SETTINGS
from framework.testbed_model.port import Port
from framework.utils import get_packet_summaries

from .traffic_generator import TrafficGenerator


def _get_default_capture_name() -> str:
    """Return a fresh, random (UUID4-based) capture name.

    Must be called once per capture; using it as a parameter default value would
    evaluate it only once and make every capture share the same name.
    """
    return str(uuid.uuid4())


@dataclass(slots=True, frozen=True)
class PacketFilteringConfig:
    """The supported filtering options for :class:`CapturingTrafficGenerator`.

    Attributes:
        no_lldp: If :data:`True`, LLDP packets will be filtered out when capturing.
        no_arp: If :data:`True`, ARP packets will be filtered out when capturing.
    """

    no_lldp: bool = True
    no_arp: bool = True


class CapturingTrafficGenerator(TrafficGenerator):
    """Capture packets after sending traffic.

    The intermediary interface which enables a packet generator to declare that it can capture
    packets and return them to the user.

    Similarly to :class:`~.traffic_generator.TrafficGenerator`, this class exposes
    the public methods specific to capturing traffic generators and defines a private method
    that must implement the traffic generation and capturing logic in subclasses.

    The methods of capturing traffic generators obey the following workflow:

        1. send packets
        2. capture packets
        3. write the capture to a .pcap file
        4. return the received packets
    """

    @property
    def is_capturing(self) -> bool:
        """This traffic generator can capture traffic."""
        return True

    def send_packets_and_capture(
        self,
        packets: list[Packet],
        send_port: Port,
        receive_port: Port,
        filter_config: PacketFilteringConfig,
        duration: float,
        capture_name: str | None = None,
    ) -> list[Packet]:
        """Send `packets` and capture received traffic.

        Send `packets` on `send_port` and then return all traffic captured
        on `receive_port` for the given `duration`.

        The captured traffic is recorded in the `capture_name`.pcap file. The target directory
        can be configured with the :option:`--output-dir` command line argument or
        the :envvar:`DTS_OUTPUT_DIR` environment variable.

        Args:
            packets: The packets to send.
            send_port: The egress port on the TG node.
            receive_port: The ingress port on the TG node.
            filter_config: Filters to apply when capturing packets.
            duration: Capture traffic for this amount of time after sending the packets.
            capture_name: The name of the .pcap file where to store the capture.
                If :data:`None`, a new unique name is generated for this call.

        Returns:
            The received packets. May be empty if no packets are captured.
        """
        # The default name is generated per call. A call expression used as the parameter
        # default would be evaluated only once, at definition time, so all unnamed captures
        # would overwrite the same file.
        if capture_name is None:
            capture_name = _get_default_capture_name()

        self._logger.debug(get_packet_summaries(packets))
        self._logger.debug(
            f"Sending packet on {send_port.logical_name}, receiving on {receive_port.logical_name}."
        )
        received_packets = self._send_packets_and_capture(
            packets,
            send_port,
            receive_port,
            filter_config,
            duration,
        )

        self._logger.debug(f"Received packets: {get_packet_summaries(received_packets)}")
        self._write_capture_from_packets(capture_name, received_packets)
        return received_packets

    @abstractmethod
    def _send_packets_and_capture(
        self,
        packets: list[Packet],
        send_port: Port,
        receive_port: Port,
        filter_config: PacketFilteringConfig,
        duration: float,
    ) -> list[Packet]:
        """The implementation of :meth:`send_packets_and_capture`.

        The subclasses must implement this method which sends `packets` on `send_port`
        and receives packets on `receive_port` for the specified `duration`.

        It must be able to handle receiving no packets.

        Args:
            packets: The packets to send.
            send_port: The egress port on the TG node.
            receive_port: The ingress port on the TG node.
            filter_config: Filters to apply when capturing packets.
            duration: Capture traffic for this amount of time after sending the packets.

        Returns:
            The received packets. May be empty if no packets are captured.
        """

    def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]) -> None:
        """Write `packets` to the `capture_name`.pcap file in the configured output directory.

        Args:
            capture_name: The name of the capture file, without the .pcap extension.
            packets: The packets to store in the file.
        """
        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
        self._logger.debug(f"Writing packets to {file_name}.")
        scapy.utils.wrpcap(file_name, packets)