#!/usr/bin/env python3
"""Storage Management Agent command line interface.

Builds the agent configuration from built-in defaults, an optional YAML
config file and command-line options, waits for the SPDK process to answer
RPCs, registers the configured device managers and runs the agent until
SIGTERM or SIGINT is received.
"""

from argparse import ArgumentParser
import importlib
import logging
import os
import signal
import sys
import threading
import time
import yaml

# Anchor the package directory on the script's absolute location.  When
# __file__ is a bare relative name (e.g. "sma.py"), os.path.dirname() returns
# '' and plain string concatenation would yield the unrelated absolute path
# '/../python'.
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'python'))

import spdk.sma as sma                      # noqa
import spdk.rpc.client as rpcclient         # noqa


def parse_config(path):
    """Load the YAML config file at `path` and return it as a new dict.

    Returns an empty dict when `path` is None or the file holds no document.
    Exits the process with status 1 when the top-level YAML node is not a
    mapping, since the rest of the script accesses the config by key.
    """
    if path is None:
        return {}
    with open(path, 'r') as cfgfile:
        # NOTE(review): FullLoader accepts more than plain YAML types; if the
        # config file can come from an untrusted source, consider
        # yaml.safe_load() instead.
        config = yaml.load(cfgfile, Loader=yaml.FullLoader)
    if config is None:
        return {}
    if not isinstance(config, dict):
        logging.error(f'Invalid config file "{path}": expected a mapping at the top level')
        sys.exit(1)
    return {**config}


def parse_argv():
    """Parse the command line and return the merged configuration dict.

    For every option that has a built-in default, the precedence is:
    command-line value, then config file value, then the default.  All other
    keys found in the config file are passed through unchanged.
    """
    parser = ArgumentParser(description='Storage Management Agent command line interface')
    parser.add_argument('--address', '-a', help='IP address to listen on')
    parser.add_argument('--socket', '-s', help='SPDK RPC socket')
    parser.add_argument('--port', '-p', type=int, help='IP port to listen on')
    parser.add_argument('--config', '-c', help='Path to config file')
    defaults = {'address': 'localhost',
                'socket': '/var/tmp/spdk.sock',
                'port': 8080,
                'discovery_timeout': 10.0,
                'volume_cleanup_period': 60.0}
    # Merge the default values, config file, and the command-line
    args = vars(parser.parse_args())
    config = parse_config(args.get('config'))
    for argname, argvalue in defaults.items():
        # Options not given on the command line are None in `args`
        if args.get(argname) is not None:
            if config.get(argname) is not None:
                logging.info(f'Overriding "{argname}" value from command-line')
            config[argname] = args[argname]
        if config.get(argname) is None:
            config[argname] = argvalue
    return config


def get_build_client(sock):
    """Return a zero-argument factory creating JSON-RPC clients bound to `sock`."""
    def build_client():
        return rpcclient.JSONRPCClient(sock)

    return build_client


def register_devices(agent, devices, config):
    """Initialize and register every device listed under config['devices'].

    Each entry is matched by its 'name' against the `name` attribute of the
    available device managers in `devices`; the entry's 'params' are passed to
    the manager's init().  Exits the process with status 1 if a configured
    device has no matching manager.
    """
    for device_config in config.get('devices') or []:
        name = device_config.get('name')
        device_manager = next((dev for dev in devices if dev.name == name), None)
        if device_manager is None:
            logging.error(f'Couldn\'t find device: {name}')
            sys.exit(1)
        logging.info(f'Registering device: {name}')
        device_manager.init(device_config.get('params'))
        agent.register_device(device_manager)


def load_plugins(plugins, client):
    """Import each module named in `plugins` and instantiate its devices.

    A plugin module exposes its device classes through a module-level
    `devices` iterable (treated as empty when absent); every class is
    instantiated with `client`.  Returns the list of created instances.
    """
    devices = []
    for plugin in plugins:
        module = importlib.import_module(plugin)
        for device in getattr(module, 'devices', []):
            logging.debug(f'Loading external device: {plugin}.{device.__name__}')
            devices.append(device(client))
    return devices


def wait_for_listen(client, timeout):
    """Block until the SPDK process answers an RPC or `timeout` seconds pass.

    `client` is a factory returning a context-managed RPC client.  The probe
    is retried once per second; on timeout the process exits with status 1.
    """
    start = time.monotonic()
    while True:
        try:
            with client() as _client:
                _client.call('rpc_get_methods')
            # If we got here, the process is responding to RPCs
            break
        except rpcclient.JSONRPCException:
            logging.debug('The SPDK process is not responding for {}s'.format(
                          int(time.monotonic() - start)))

        if time.monotonic() > start + timeout:
            logging.error('Timed out while waiting for SPDK process to respond')
            sys.exit(1)
        time.sleep(1)


def run(agent):
    """Start `agent`, wait for SIGTERM or SIGINT, then stop it."""
    event = threading.Event()

    def signal_handler(signum, frame):
        event.set()

    for signum in [signal.SIGTERM, signal.SIGINT]:
        signal.signal(signum, signal_handler)

    agent.start()
    event.wait()
    agent.stop()


if __name__ == '__main__':
    # Log level is taken from the SMA_LOGLEVEL environment variable
    logging.basicConfig(level=os.environ.get('SMA_LOGLEVEL', 'WARNING').upper())

    config = parse_argv()
    client = get_build_client(config['socket'])

    # Wait until the SPDK process starts responding to RPCs
    wait_for_listen(client, timeout=60.0)

    agent = sma.StorageManagementAgent(config, client)

    devices = [sma.NvmfTcpDeviceManager(client), sma.VhostBlkDeviceManager(client)]
    # External device managers come from the config file's 'plugins' list and
    # from the colon-separated SMA_PLUGINS environment variable
    devices += load_plugins(config.get('plugins') or [], client)
    devices += load_plugins(filter(None, os.environ.get('SMA_PLUGINS', '').split(':')),
                            client)
    register_devices(agent, devices, config)
    run(agent)