1#!/usr/bin/env python3 2 3from argparse import ArgumentParser 4import importlib 5import logging 6import os 7import signal 8import sys 9import threading 10import time 11import yaml 12 13sys.path.append(os.path.dirname(__file__) + '/../python') 14 15import spdk.sma as sma # noqa 16import spdk.rpc.client as rpcclient # noqa 17 18 19def parse_config(path): 20 if path is None: 21 return {} 22 with open(path, 'r') as cfgfile: 23 config = yaml.load(cfgfile, Loader=yaml.FullLoader) 24 return {**config} if config is not None else {} 25 26 27def parse_argv(): 28 parser = ArgumentParser(description='Storage Management Agent command line interface') 29 parser.add_argument('--address', '-a', help='IP address to listen on') 30 parser.add_argument('--socket', '-s', help='SPDK RPC socket') 31 parser.add_argument('--port', '-p', type=int, help='IP port to listen on') 32 parser.add_argument('--config', '-c', help='Path to config file') 33 defaults = {'address': 'localhost', 34 'socket': '/var/tmp/spdk.sock', 35 'port': 8080, 36 'discovery_timeout': 10.0, 37 'volume_cleanup_period': 60.0} 38 # Merge the default values, config file, and the command-line 39 args = vars(parser.parse_args()) 40 config = parse_config(args.get('config')) 41 for argname, argvalue in defaults.items(): 42 if args.get(argname) is not None: 43 if config.get(argname) is not None: 44 logging.info(f'Overriding "{argname}" value from command-line') 45 config[argname] = args[argname] 46 if config.get(argname) is None: 47 config[argname] = argvalue 48 return config 49 50 51def get_build_client(sock): 52 def build_client(): 53 return rpcclient.JSONRPCClient(sock) 54 55 return build_client 56 57 58def register_devices(agent, devices, config): 59 for device_config in config.get('devices') or []: 60 name = device_config.get('name') 61 device_manager = next(filter(lambda s: s.name == name, devices), None) 62 if device_manager is None: 63 logging.error(f'Couldn\'t find device: {name}') 64 sys.exit(1) 65 logging.info(f'Registering device: {name}') 66 device_manager.init(device_config.get('params')) 67 agent.register_device(device_manager) 68 69 70def init_crypto(config, client): 71 crypto_config = config.get('crypto') 72 if crypto_config is None: 73 return 74 name = crypto_config.get('name') 75 if name is None: 76 logging.error('Crypto engine name is missing') 77 sys.exit(1) 78 try: 79 sma.set_crypto_engine(name) 80 sma.get_crypto_engine().init(client, crypto_config.get('params', {})) 81 except ValueError: 82 logging.error(f'Invalid crypto engine: {name}') 83 sys.exit(1) 84 85 86def load_plugins(plugins, client): 87 devices = [] 88 for plugin in plugins: 89 module = importlib.import_module(plugin) 90 for device in getattr(module, 'devices', []): 91 logging.debug(f'Loading external device: {plugin}.{device.__name__}') 92 devices.append(device(client)) 93 for engine_class in getattr(module, 'crypto_engines', []): 94 engine = engine_class() 95 logging.debug(f'Loading external crypto engine: {plugin}.{engine.name}') 96 sma.register_crypto_engine(engine) 97 return devices 98 99 100def wait_for_listen(client, timeout): 101 start = time.monotonic() 102 while True: 103 try: 104 with client() as _client: 105 _client.call('rpc_get_methods') 106 # If we got here, the process is responding to RPCs 107 break 108 except rpcclient.JSONRPCException: 109 logging.debug('The SPDK process is not responding for {}s'.format( 110 int(time.monotonic() - start))) 111 112 if time.monotonic() > start + timeout: 113 logging.error('Timed out while waiting for SPDK process to respond') 114 sys.exit(1) 115 time.sleep(1) 116 117 118def run(agent): 119 event = threading.Event() 120 121 def signal_handler(signum, frame): 122 event.set() 123 124 for signum in [signal.SIGTERM, signal.SIGINT]: 125 signal.signal(signum, signal_handler) 126 127 agent.start() 128 event.wait() 129 agent.stop() 130 131 132if __name__ == '__main__': 133 logging.basicConfig(level=os.environ.get('SMA_LOGLEVEL', 'WARNING').upper()) 134 135 config = parse_argv() 136 client = get_build_client(config['socket']) 137 138 # Wait until the SPDK process starts responding to RPCs 139 wait_for_listen(client, timeout=60.0) 140 141 agent = sma.StorageManagementAgent(config, client) 142 143 devices = [sma.NvmfTcpDeviceManager(client), sma.VhostBlkDeviceManager(client), 144 sma.NvmfVfioDeviceManager(client)] 145 devices += load_plugins(config.get('plugins') or [], client) 146 devices += load_plugins(filter(None, os.environ.get('SMA_PLUGINS', '').split(':')), 147 client) 148 init_crypto(config, client) 149 register_devices(agent, devices, config) 150 run(agent) 151