1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3 4import fcntl 5import pkg_resources 6import socket 7import struct 8import sys 9import unittest 10 11 12if sys.version_info < (3, 0): 13 print("Python3 is required to run this script") 14 sys.exit(1) 15 16 17try: 18 from scapy.all import Ether 19except ImportError: 20 print("Scapy module is required") 21 sys.exit(1) 22 23 24PKTTEST_REQ = [ 25 "scapy>=2.4.3", 26] 27 28 29def assert_requirements(req): 30 """ 31 assert requirement is met 32 req can hold a string or a list of strings 33 """ 34 try: 35 pkg_resources.require(req) 36 except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e: 37 print("Requirement assertion: " + str(e)) 38 sys.exit(1) 39 40 41TAP_UNPROTECTED = "dtap1" 42TAP_PROTECTED = "dtap0" 43 44 45class Interface(object): 46 ETH_P_ALL = 3 47 MAX_PACKET_SIZE = 1280 48 IOCTL_GET_INFO = 0x8927 49 SOCKET_TIMEOUT = 0.5 50 def __init__(self, ifname): 51 self.name = ifname 52 53 # create and bind socket to specified interface 54 self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL)) 55 self.s.settimeout(Interface.SOCKET_TIMEOUT) 56 self.s.bind((self.name, 0, socket.PACKET_OTHERHOST)) 57 58 # get interface MAC address 59 info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, struct.pack('256s', bytes(ifname[:15], encoding='ascii'))) 60 self.mac = ':'.join(['%02x' % i for i in info[18:24]]) 61 62 def __del__(self): 63 self.s.close() 64 65 def send_l3packet(self, pkt, mac): 66 e = Ether(src=self.mac, dst=mac) 67 self.send_packet(e/pkt) 68 69 def send_packet(self, pkt): 70 self.send_bytes(bytes(pkt)) 71 72 def send_bytes(self, bytedata): 73 self.s.send(bytedata) 74 75 def recv_packet(self): 76 return Ether(self.recv_bytes()) 77 78 def recv_bytes(self): 79 return self.s.recv(Interface.MAX_PACKET_SIZE) 80 81 def get_mac(self): 82 return self.mac 83 84 85class PacketXfer(object): 86 def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED): 87 self.protected_port = Interface(protected_iface) 88 self.unprotected_port = Interface(unprotected_iface) 89 90 def send_to_protected_port(self, pkt, remote_mac=None): 91 if remote_mac is None: 92 remote_mac = self.unprotected_port.get_mac() 93 self.protected_port.send_l3packet(pkt, remote_mac) 94 95 def send_to_unprotected_port(self, pkt, remote_mac=None): 96 if remote_mac is None: 97 remote_mac = self.protected_port.get_mac() 98 self.unprotected_port.send_l3packet(pkt, remote_mac) 99 100 def xfer_unprotected(self, pkt): 101 self.send_to_unprotected_port(pkt) 102 return self.protected_port.recv_packet() 103 104 def xfer_protected(self, pkt): 105 self.send_to_protected_port(pkt) 106 return self.unprotected_port.recv_packet() 107 108 109def pkttest(): 110 if len(sys.argv) == 1: 111 sys.exit(unittest.main(verbosity=2)) 112 elif len(sys.argv) == 2: 113 if sys.argv[1] == "config": 114 module = __import__('__main__') 115 try: 116 print(module.config()) 117 except AttributeError: 118 sys.stderr.write("Cannot find \"config()\" in a test") 119 sys.exit(1) 120 else: 121 sys.exit(1) 122 123 124if __name__ == "__main__": 125 if len(sys.argv) == 2 and sys.argv[1] == "check_reqs": 126 assert_requirements(PKTTEST_REQ) 127 else: 128 print("Usage: " + sys.argv[0] + " check_reqs") 129