#!/usr/bin/env python3
"""Helpers for packet-level tests that exchange frames over two TAP interfaces.

Test scripts import this module, build scapy packets, push them through
``PacketXfer`` and call ``pkttest()`` as their entry point.  Run directly
with the ``check_reqs`` argument, the module only verifies that the Python
requirements listed in ``PKTTEST_REQ`` are installed.
"""

import fcntl
import pkg_resources
import socket
import struct
import sys
import unittest


# NOTE: keep everything in this file parseable by Python 2 so that this guard
# can actually print its message instead of dying with a SyntaxError.
if sys.version_info < (3, 0):
    print("Python3 is required to run this script")
    sys.exit(1)


try:
    from scapy.all import Ether
except ImportError:
    print("Scapy module is required")
    sys.exit(1)


# Requirement specifiers checked by "check_reqs".
PKTTEST_REQ = [
    "scapy>=2.4.3",
]


def assert_requirements(req):
    """
    Assert that the given requirement(s) are met.

    req can hold a string or a list of requirement strings
    (e.g. "scapy>=2.4.3").  When a distribution is missing or its version
    conflicts, the reason is printed and the process exits with status 1.
    """
    try:
        pkg_resources.require(req)
    except (pkg_resources.DistributionNotFound,
            pkg_resources.VersionConflict) as e:
        print("Requirement assertion: " + str(e))
        sys.exit(1)


# Default interface names used by PacketXfer.
TAP_UNPROTECTED = "dtap1"
TAP_PROTECTED = "dtap0"


class Interface(object):
    """Raw AF_PACKET socket bound to one network interface.

    Sends and receives whole Ethernet frames and remembers the MAC address
    of the interface it is bound to.
    """

    # Ethernet protocol id meaning "every protocol" (ETH_P_ALL).
    ETH_P_ALL = 3
    # Maximum number of bytes read by a single recv_bytes() call.
    MAX_PACKET_SIZE = 1280
    # ioctl request used to read the hardware address
    # (0x8927 is SIOCGIFHWADDR on Linux).
    IOCTL_GET_INFO = 0x8927
    # Seconds a blocking socket operation may take before it times out.
    SOCKET_TIMEOUT = 0.5

    def __init__(self, ifname):
        """Open a raw socket on *ifname* and read the interface MAC address.

        Raises OSError (or a subclass) when the socket cannot be created or
        bound, or when the ioctl fails, e.g. because of missing privileges
        or an unknown interface name.
        """
        self.name = ifname

        # create and bind socket to specified interface
        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                               socket.htons(Interface.ETH_P_ALL))
        self.s.settimeout(Interface.SOCKET_TIMEOUT)
        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))

        # get interface MAC address
        # The request buffer starts with the interface name (at most 15
        # characters are passed); the reply carries the hardware address
        # in bytes 18..23.
        request = struct.pack('256s', bytes(ifname[:15], encoding='ascii'))
        info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, request)
        self.mac = ':'.join('%02x' % octet for octet in info[18:24])

    def __del__(self):
        # __init__ may have failed before the socket was created; in that
        # case there is no "s" attribute and nothing to close.
        sock = getattr(self, 's', None)
        if sock is not None:
            sock.close()

    def send_l3packet(self, pkt, mac):
        """Wrap *pkt* in an Ethernet header addressed to *mac* and send it.

        The source address is the MAC of this interface.
        """
        e = Ether(src=self.mac, dst=mac)
        self.send_packet(e/pkt)

    def send_packet(self, pkt):
        """Serialize *pkt* with bytes() and send it as one frame."""
        self.send_bytes(bytes(pkt))

    def send_bytes(self, bytedata):
        """Send raw frame bytes through the bound socket."""
        self.s.send(bytedata)

    def recv_packet(self):
        """Receive one frame and return it parsed as a scapy Ether packet."""
        return Ether(self.recv_bytes())

    def recv_bytes(self):
        """Receive up to MAX_PACKET_SIZE bytes from the bound socket.

        The socket has a timeout of SOCKET_TIMEOUT seconds, so this raises
        socket.timeout when nothing arrives in time.
        """
        return self.s.recv(Interface.MAX_PACKET_SIZE)

    def get_mac(self):
        """Return the interface MAC address as "xx:xx:xx:xx:xx:xx"."""
        return self.mac


class PacketXfer(object):
    """Pair of interfaces used to push a packet in one side and read the
    other side."""

    def __init__(self, protected_iface=TAP_PROTECTED,
                 unprotected_iface=TAP_UNPROTECTED):
        """Open both interfaces; see Interface.__init__ for possible errors."""
        self.protected_port = Interface(protected_iface)
        self.unprotected_port = Interface(unprotected_iface)

    def send_to_protected_port(self, pkt, remote_mac=None):
        """Send *pkt* out of the protected interface.

        The destination MAC defaults to the address of the unprotected
        interface when *remote_mac* is None.
        """
        if remote_mac is None:
            remote_mac = self.unprotected_port.get_mac()
        self.protected_port.send_l3packet(pkt, remote_mac)

    def send_to_unprotected_port(self, pkt, remote_mac=None):
        """Send *pkt* out of the unprotected interface.

        The destination MAC defaults to the address of the protected
        interface when *remote_mac* is None.
        """
        if remote_mac is None:
            remote_mac = self.protected_port.get_mac()
        self.unprotected_port.send_l3packet(pkt, remote_mac)

    def xfer_unprotected(self, pkt):
        """Send *pkt* via the unprotected interface and return the next
        frame received on the protected interface."""
        self.send_to_unprotected_port(pkt)
        return self.protected_port.recv_packet()

    def xfer_protected(self, pkt):
        """Send *pkt* via the protected interface and return the next
        frame received on the unprotected interface."""
        self.send_to_protected_port(pkt)
        return self.unprotected_port.recv_packet()


def pkttest():
    """Entry point for test scripts, driven by the command-line arguments.

    * no argument: run the unittest test cases of the calling script and
      exit with the unittest result;
    * "config": print the value returned by the config() function of the
      calling script (the ``__main__`` module), or exit with status 1 when
      that function does not exist;
    * any other single argument: exit with status 1.

    With more than one argument nothing is done and the function returns.
    """
    if len(sys.argv) == 1:
        sys.exit(unittest.main(verbosity=2))
    elif len(sys.argv) == 2:
        if sys.argv[1] == "config":
            module = __import__('__main__')
            # Only the lookup is guarded: an AttributeError raised inside
            # config() itself must not be reported as "config() missing".
            try:
                config = module.config
            except AttributeError:
                sys.stderr.write("Cannot find \"config()\" in a test")
                sys.exit(1)
            print(config())
        else:
            sys.exit(1)


if __name__ == "__main__":
    if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
        assert_requirements(PKTTEST_REQ)
    else:
        print("Usage: " + sys.argv[0] + " check_reqs")