Lines Matching defs:self
67 def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig, **kwargs):
85 self.start_application()
87 def start_application(self) -> None:
94 self.send_command("from scapy.all import *")
96 def _send_packets(self, packets: list[Packet], port: Port) -> None:
101 self._shell_set_packet_list(packets)
104 f"{self._send_packet_list_name},",
110 self.send_command(f"\n{self._python_indentation}".join(send_command))
113 self,
129 self._shell_create_sniffer(
130 packets, send_port, recv_port, self._create_packet_filter(filter_config)
132 return self._shell_start_and_stop_sniffing(duration)
134 def _shell_set_packet_list(self, packets: list[Packet]) -> None:
148 self._logger.info("Building a list of packets to send.")
149 self.send_command(
150 f"{self._send_packet_list_name} = [{', '.join(map(Packet.command, packets))}]"
153 def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str:
176 self, packets_to_send: list[Packet], send_port: Port, recv_port: Port, filter_config: str
190 self._shell_set_packet_list(packets_to_send)
192 self.send_command("import time")
194 f"{self._sniffer_name} = AsyncSniffer(",
202 f"{self._python_indentation}{self._send_packet_list_name},"
210 self.send_command(f"\n{self._python_indentation}".join(sniffer_commands))
212 def _shell_start_and_stop_sniffing(self, duration: float) -> list[Packet]:
227 self.send_command(f"{self._sniffer_name}.start()")
230 self.send_command(f"{sniffed_packets_name} = {self._sniffer_name}.stop(join=True)")
232 packet_strs = self.send_command(