1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright (C) 2022 Intel Corporation. 3# All rights reserved. 4 5from concurrent import futures 6from contextlib import contextmanager 7from multiprocessing import Lock 8import grpc 9import logging 10from .device import DeviceException 11from .volume import VolumeException, VolumeManager 12from .volume import crypto 13from .volume import crypto_bdev 14from .proto import sma_pb2 as pb2 15from .proto import sma_pb2_grpc as pb2_grpc 16 17 18class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer): 19 def __init__(self, config, client): 20 addr, port = config['address'], config['port'] 21 self._devices = {} 22 self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) 23 self._server.add_insecure_port(f'{addr}:{port}') 24 self._volume_mgr = VolumeManager(client, config['discovery_timeout'], 25 config['volume_cleanup_period']) 26 pb2_grpc.add_StorageManagementAgentServicer_to_server(self, self._server) 27 28 def _grpc_method(f): 29 def wrapper(self, request, context): 30 logging.debug(f'{f.__name__}\n{request}') 31 return f(self, request, context) 32 return wrapper 33 34 def register_device(self, device_manager): 35 self._devices[device_manager.protocol] = device_manager 36 37 def start(self): 38 self._volume_mgr.start() 39 self._server.start() 40 41 def stop(self): 42 self._server.stop(None) 43 self._volume_mgr.stop() 44 45 def _find_device_by_name(self, name): 46 return self._devices.get(name) 47 48 def _find_device_by_handle(self, handle): 49 for device in self._devices.values(): 50 try: 51 if device.owns_device(handle): 52 return device 53 except NotImplementedError: 54 pass 55 return None 56 57 def _cleanup_volume(self, volume_id, existing): 58 if volume_id is None or existing: 59 return 60 try: 61 self._volume_mgr.disconnect_volume(volume_id) 62 except VolumeException: 63 logging.warning('Failed to cleanup volume {volume_id}') 64 65 @_grpc_method 66 def CreateDevice(self, request, context): 67 response = pb2.CreateDeviceResponse() 68 volume_id, existing = None, False 69 try: 70 if request.HasField('volume'): 71 volume_id, existing = self._volume_mgr.connect_volume(request.volume) 72 73 manager = self._find_device_by_name(request.WhichOneof('params')) 74 if manager is None: 75 raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT, 76 'Unsupported device type') 77 response = manager.create_device(request) 78 # Now that we know the device handle, mark the volume as attached to 79 # that device 80 if volume_id is not None: 81 self._volume_mgr.set_device(volume_id, response.handle) 82 except (DeviceException, VolumeException) as ex: 83 self._cleanup_volume(volume_id, existing) 84 context.set_details(ex.message) 85 context.set_code(ex.code) 86 except NotImplementedError: 87 self._cleanup_volume(volume_id, existing) 88 context.set_details('Method is not implemented by selected device type') 89 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 90 return response 91 92 @_grpc_method 93 def DeleteDevice(self, request, context): 94 response = pb2.DeleteDeviceResponse() 95 try: 96 device = self._find_device_by_handle(request.handle) 97 if device is None: 98 raise DeviceException(grpc.StatusCode.NOT_FOUND, 99 'Invalid device handle') 100 if not device.allow_delete_volumes and self._volume_mgr.has_volumes(request.handle): 101 raise DeviceException(grpc.StatusCode.FAILED_PRECONDITION, 102 'Device has attached volumes') 103 device.delete_device(request) 104 # Either there are no volumes attached to this device or we're allowed to delete it 105 # with volumes still attached 106 self._volume_mgr.disconnect_device_volumes(request.handle) 107 except DeviceException as ex: 108 context.set_details(ex.message) 109 context.set_code(ex.code) 110 except NotImplementedError: 111 context.set_details('Method is not implemented by selected device type') 112 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 113 return response 114 115 @_grpc_method 116 def AttachVolume(self, request, context): 117 response = pb2.AttachVolumeResponse() 118 volume_id, existing = None, False 119 try: 120 if not request.HasField('volume'): 121 raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT, 122 'Missing required field: volume') 123 volume_id, existing = self._volume_mgr.connect_volume(request.volume, 124 request.device_handle) 125 device = self._find_device_by_handle(request.device_handle) 126 if device is None: 127 raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle') 128 device.attach_volume(request) 129 except (DeviceException, VolumeException) as ex: 130 self._cleanup_volume(volume_id, existing) 131 context.set_details(ex.message) 132 context.set_code(ex.code) 133 except NotImplementedError: 134 self._cleanup_volume(volume_id, existing) 135 context.set_details('Method is not implemented by selected device type') 136 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 137 return response 138 139 @_grpc_method 140 def DetachVolume(self, request, context): 141 response = pb2.DetachVolumeResponse() 142 try: 143 device = self._find_device_by_handle(request.device_handle) 144 if device is None: 145 raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle') 146 device.detach_volume(request) 147 self._volume_mgr.disconnect_volume(request.volume_id) 148 except DeviceException as ex: 149 context.set_details(ex.message) 150 context.set_code(ex.code) 151 return response 152 153 @_grpc_method 154 def SetQos(self, request, context): 155 response = pb2.SetQosResponse() 156 try: 157 device = self._find_device_by_handle(request.device_handle) 158 if device is None: 159 raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle') 160 device.set_qos(request) 161 except DeviceException as ex: 162 context.set_details(ex.message) 163 context.set_code(ex.code) 164 except NotImplementedError: 165 context.set_details('Method is not implemented by selected device type') 166 context.set_code(grpc.StatusCode.UNIMPLEMENTED) 167 return response 168 169 @_grpc_method 170 def GetQosCapabilities(self, request, context): 171 device_type_map = { 172 pb2.DeviceType.DEVICE_TYPE_NVME: 'nvme', 173 pb2.DeviceType.DEVICE_TYPE_VIRTIO_BLK: 'virtio_blk', 174 pb2.DeviceType.DEVICE_TYPE_NVMF_TCP: 'nvmf_tcp', 175 } 176 response = pb2.GetQosCapabilitiesResponse() 177 try: 178 name = device_type_map.get(request.device_type) 179 if name is None: 180 raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT, 181 'Invalid device type') 182 manager = self._find_device_by_name(name) 183 if manager is None: 184 raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT, 185 'Unsupported device type') 186 response = manager.get_qos_capabilities(request) 187 except DeviceException as ex: 188 context.set_details(ex.message) 189 context.set_code(ex.code) 190 except NotImplementedError: 191 # If a device manager doesn't implement this method, return empty capabilities to 192 # indicate that no QoS capabilities are supported 193 pass 194 return response 195 196 197crypto.register_crypto_engine(crypto.CryptoEngineNop()) 198crypto.register_crypto_engine(crypto_bdev.CryptoEngineBdev()) 199crypto.set_crypto_engine('nop') 200