xref: /dpdk/examples/ipsec-secgw/test/pkttest.py (revision 1dc48bce518de1d8cb73f961404a47311c4a248d)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: BSD-3-Clause
3
4import fcntl
5import pkg_resources
6import socket
7import struct
8import sys
9import unittest
10
11
12if sys.version_info < (3, 0):
13    print("Python3 is required to run this script")
14    sys.exit(1)
15
16
17try:
18    from scapy.all import Ether
19except ImportError:
20    print("Scapy module is required")
21    sys.exit(1)
22
23
24PKTTEST_REQ = [
25    "scapy>=2.4.3",
26]
27
28
29def assert_requirements(req):
30    """
31    assert requirement is met
32    req can hold a string or a list of strings
33    """
34    try:
35        pkg_resources.require(req)
36    except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e:
37        print("Requirement assertion: " + str(e))
38        sys.exit(1)
39
40
41TAP_UNPROTECTED = "dtap1"
42TAP_PROTECTED = "dtap0"
43
44
45class Interface(object):
46    ETH_P_ALL = 3
47    MAX_PACKET_SIZE = 1280
48    IOCTL_GET_INFO = 0x8927
49    SOCKET_TIMEOUT = 0.5
50    def __init__(self, ifname):
51        self.name = ifname
52
53        # create and bind socket to specified interface
54        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL))
55        self.s.settimeout(Interface.SOCKET_TIMEOUT)
56        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
57
58        # get interface MAC address
59        info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO,  struct.pack('256s', bytes(ifname[:15], encoding='ascii')))
60        self.mac = ':'.join(['%02x' % i for i in info[18:24]])
61
62    def __del__(self):
63        self.s.close()
64
65    def send_l3packet(self, pkt, mac):
66        e = Ether(src=self.mac, dst=mac)
67        self.send_packet(e/pkt)
68
69    def send_packet(self, pkt):
70        self.send_bytes(bytes(pkt))
71
72    def send_bytes(self, bytedata):
73        self.s.send(bytedata)
74
75    def recv_packet(self):
76        return Ether(self.recv_bytes())
77
78    def recv_bytes(self):
79        return self.s.recv(Interface.MAX_PACKET_SIZE)
80
81    def get_mac(self):
82        return self.mac
83
84
85class PacketXfer(object):
86    def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
87        self.protected_port = Interface(protected_iface)
88        self.unprotected_port = Interface(unprotected_iface)
89
90    def send_to_protected_port(self, pkt, remote_mac=None):
91        if remote_mac is None:
92            remote_mac = self.unprotected_port.get_mac()
93        self.protected_port.send_l3packet(pkt, remote_mac)
94
95    def send_to_unprotected_port(self, pkt, remote_mac=None):
96        if remote_mac is None:
97            remote_mac = self.protected_port.get_mac()
98        self.unprotected_port.send_l3packet(pkt, remote_mac)
99
100    def xfer_unprotected(self, pkt):
101        self.send_to_unprotected_port(pkt)
102        return self.protected_port.recv_packet()
103
104    def xfer_protected(self, pkt):
105        self.send_to_protected_port(pkt)
106        return self.unprotected_port.recv_packet()
107
108
109def pkttest():
110    if len(sys.argv) == 1:
111        sys.exit(unittest.main(verbosity=2))
112    elif len(sys.argv) == 2:
113        if sys.argv[1] == "config":
114            module = __import__('__main__')
115            try:
116                print(module.config())
117            except AttributeError:
118                sys.stderr.write("Cannot find \"config()\" in a test")
119                sys.exit(1)
120    else:
121        sys.exit(1)
122
123
124if __name__ == "__main__":
125    if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
126        assert_requirements(PKTTEST_REQ)
127    else:
128        print("Usage: " + sys.argv[0] + " check_reqs")
129