xref: /dpdk/examples/ipsec-secgw/test/pkttest.py (revision 1cde1b9a9b4dbf31cb5e5ccdfc5da3cb079f43a2)
1#!/usr/bin/env python3
2
3import fcntl
4import pkg_resources
5import socket
6import struct
7import sys
8import unittest
9
10
# Refuse to run under Python 2: only the major version matters here.
if sys.version_info[0] < 3:
    print("Python3 is required to run this script")
    sys.exit(1)
14
15
# Ether is used below to build and to parse frames; without scapy
# nothing in this module can work, so stop with a readable message.
try:
    from scapy.all import Ether
except ImportError:
    print("Scapy module is required")
    raise SystemExit(1)
21
22
# Requirement strings verified by assert_requirements() when this file is
# run with the "check_reqs" argument.
PKTTEST_REQ = ["scapy>=2.4.3"]
26
27
def assert_requirements(req):
    """
    Exit the script unless the given requirement(s) are satisfied.

    req is a requirement string or a list of requirement strings; it is
    handed to pkg_resources.require() unchanged.  When a distribution is
    missing or its version conflicts, the reason is printed and the
    script exits with status 1.  Otherwise the function returns None.
    """
    try:
        pkg_resources.require(req)
    except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as err:
        reason = str(err)
    else:
        return

    print("Requirement assertion: " + reason)
    sys.exit(1)
38
39
# Default interface names used by PacketXfer for its protected and
# unprotected ports.
TAP_PROTECTED = "dtap0"
TAP_UNPROTECTED = "dtap1"
42
43
class Interface(object):
    """
    Raw packet socket bound to one network interface.

    Frames are sent and received as complete Ethernet frames.  The MAC
    address of the interface is read once at construction time and is
    available through get_mac().
    """
    # Protocol number given to socket(); with AF_PACKET it selects every
    # protocol (ETH_P_ALL in the Linux headers).
    ETH_P_ALL = 3
    # Buffer size handed to recv(); longer frames are truncated to it.
    MAX_PACKET_SIZE = 1280
    # ioctl request used to read the hardware address (SIOCGIFHWADDR on
    # Linux).
    IOCTL_GET_INFO = 0x8927
    # Seconds a blocking socket operation may take before socket.timeout
    # is raised.
    SOCKET_TIMEOUT = 0.5

    def __init__(self, ifname):
        """
        Open a raw socket on interface ifname and read its MAC address.

        Raises OSError (or a subclass) when the socket cannot be created
        or bound, or when the ioctl fails.
        """
        # Defined before anything can fail so that close()/__del__ stay
        # safe when socket creation itself raises.
        self.s = None
        self.name = ifname

        # create and bind socket to specified interface
        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                               socket.htons(Interface.ETH_P_ALL))
        self.s.settimeout(Interface.SOCKET_TIMEOUT)
        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))

        # get interface MAC address: the request buffer starts with the
        # interface name (at most 15 characters are used); in the reply
        # the six address bytes are at offsets 18..23.
        request = struct.pack('256s', bytes(ifname[:15], encoding='ascii'))
        info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, request)
        self.mac = ':'.join('%02x' % octet for octet in info[18:24])

    def close(self):
        """
        Close the underlying socket, if there is one.

        Safe to call more than once and on a partially constructed object.
        """
        sock = getattr(self, 's', None)
        if sock is not None:
            sock.close()

    def __del__(self):
        # May run on an instance whose __init__ failed before the socket
        # existed, so delegate to the tolerant close().
        self.close()

    def send_l3packet(self, pkt, mac):
        """
        Prepend an Ethernet header to pkt and send the frame.

        The source address is this interface's MAC, the destination is mac.
        """
        e = Ether(src=self.mac, dst=mac)
        self.send_packet(e/pkt)

    def send_packet(self, pkt):
        """Serialize pkt with bytes() and send it."""
        self.send_bytes(bytes(pkt))

    def send_bytes(self, bytedata):
        """Send bytedata on the socket as is."""
        self.s.send(bytedata)

    def recv_packet(self):
        """Receive one frame and return it parsed as an Ether packet."""
        return Ether(self.recv_bytes())

    def recv_bytes(self):
        """
        Receive up to MAX_PACKET_SIZE bytes from the socket.

        Raises socket.timeout when nothing arrives within the timeout
        configured on the socket.
        """
        return self.s.recv(Interface.MAX_PACKET_SIZE)

    def get_mac(self):
        """Return the MAC address read at construction, as 'xx:xx:xx:xx:xx:xx'."""
        return self.mac
82
83
class PacketXfer(object):
    """
    Pair of Interface objects, one protected and one unprotected, with
    helpers to send a packet into one of them and read a frame from the
    other.
    """

    def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
        """Open both interfaces, the protected one first."""
        self.protected_port = Interface(protected_iface)
        self.unprotected_port = Interface(unprotected_iface)

    def send_to_protected_port(self, pkt, remote_mac=None):
        """
        Send pkt through the protected port.

        The destination MAC is remote_mac, or the MAC of the unprotected
        port when remote_mac is None.
        """
        dst_mac = self.unprotected_port.get_mac() if remote_mac is None else remote_mac
        self.protected_port.send_l3packet(pkt, dst_mac)

    def send_to_unprotected_port(self, pkt, remote_mac=None):
        """
        Send pkt through the unprotected port.

        The destination MAC is remote_mac, or the MAC of the protected
        port when remote_mac is None.
        """
        dst_mac = self.protected_port.get_mac() if remote_mac is None else remote_mac
        self.unprotected_port.send_l3packet(pkt, dst_mac)

    def xfer_unprotected(self, pkt):
        """Send pkt into the unprotected port; return a frame read from the protected port."""
        self.send_to_unprotected_port(pkt)
        received = self.protected_port.recv_packet()
        return received

    def xfer_protected(self, pkt):
        """Send pkt into the protected port; return a frame read from the unprotected port."""
        self.send_to_protected_port(pkt)
        received = self.unprotected_port.recv_packet()
        return received
106
107
def pkttest():
    """
    Command line entry point for a test script built on this module.

    Without arguments the unittest runner is started (it terminates the
    process itself by default).  With the single argument "config" the
    value returned by config() of the __main__ module is printed; when
    __main__ has no config attribute a message is written to stderr and
    the script exits with status 1.  Any other command line exits with
    status 1.
    """
    argc = len(sys.argv)

    if argc == 1:
        sys.exit(unittest.main(verbosity=2))

    if argc == 2 and sys.argv[1] == "config":
        module = __import__('__main__')
        # Look the function up separately from calling it, so that an
        # AttributeError raised inside config() is not misreported as a
        # missing config().
        try:
            config = module.config
        except AttributeError:
            sys.stderr.write("Cannot find \"config()\" in a test")
            sys.exit(1)
        print(config())
        return

    # Unknown or surplus arguments: fail instead of silently succeeding
    # without having run anything.
    sys.exit(1)
121
122
if __name__ == "__main__":
    # Run directly, this file only knows the "check_reqs" command.
    if sys.argv[1:] == ["check_reqs"]:
        assert_requirements(PKTTEST_REQ)
    else:
        print("Usage: " + sys.argv[0] + " check_reqs")
128