# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2022 Intel Corporation.
# All rights reserved.

import functools
import grpc
import ipaddress
import logging
import threading
import uuid
from dataclasses import dataclass
from spdk.rpc.client import JSONRPCException
from . import crypto
from ..common import format_volume_id
from ..proto import sma_pb2


log = logging.getLogger(__name__)


class VolumeException(Exception):
    """Error raised by the volume manager, carrying a gRPC status code and a message."""

    def __init__(self, code, message):
        # Forward the message to Exception so that str(ex) / ex.args are meaningful
        # (e.g. in tracebacks and log.exception() output)
        super().__init__(message)
        self.code = code
        self.message = message


class Volume:
    """A volume connected through one or more discovery services."""

    def __init__(self, volume_id, device_handle, discovery_services):
        # Volume ID, as normalized by format_volume_id()
        self.volume_id = volume_id
        # Set of names of the discovery services this volume holds a reference on
        self.discovery_services = discovery_services
        # Handle of the device the volume is attached to, or None if not (yet) known
        self.device_handle = device_handle


class VolumeManager:
    """Tracks volumes connected through NVMe-oF discovery services.

    Discovery services are reference-counted so that a service shared by several volumes is
    only stopped once the last of those volumes is disconnected.  A background thread (see
    start()) periodically disconnects volumes whose bdevs are no longer reported by SPDK.
    Public methods are serialized with an internal lock.
    """

    def __init__(self, client, discovery_timeout, cleanup_period):
        """
        :param client: callable returning a context manager that yields an RPC client
                       (used as ``with client() as c: c.call(...)``)
        :param discovery_timeout: discovery attach timeout, in seconds
        :param cleanup_period: interval, in seconds, between checks for disconnected volumes
        """
        self._client = client
        # Discovery service map (name -> refcnt)
        self._discovery = {}
        # Volume map (volume_id -> Volume)
        self._volumes = {}
        # Converted to milliseconds, as passed to bdev_nvme_start_discovery's attach_timeout_ms
        self._discovery_timeout = int(discovery_timeout * 1000)
        self._cleanup_period = cleanup_period
        self._lock = threading.Lock()
        # Shares self._lock, so waiting on it releases the lock for other callers
        self._cv = threading.Condition(self._lock)
        self._running = False
        self._thread = None

    def _locked(f):
        """Decorator running the wrapped method with self._lock held."""
        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            with self._lock:
                return f(self, *args, **kwargs)
        return wrapper

    def start(self):
        """Starts the background thread that periodically cleans up disconnected volumes.

        :raises ValueError: if the manager was already started
        """
        if self._thread is not None:
            raise ValueError('Volume manager was already started')
        self._running = True
        # _cleanup_thread is a staticmethod, so the manager is passed explicitly
        self._thread = threading.Thread(target=self._cleanup_thread, args=(self,))
        self._thread.start()

    def stop(self):
        """Stops the cleanup thread and waits for it to exit.  No-op if it isn't running."""
        if self._thread is None:
            return
        with self._lock:
            self._running = False
            self._cv.notify_all()
        # Join outside of the lock: the cleanup thread needs it to observe _running == False
        self._thread.join()
        self._thread = None

    @staticmethod
    def _cleanup_thread(*args):
        """Body of the cleanup thread; args is a one-element tuple holding the manager."""
        self, = args
        with self._lock:
            while self._running:
                try:
                    self._cleanup_volumes()
                except Exception:
                    # Keep the thread alive: an unexpected error (e.g. failing to reach the
                    # RPC server) would otherwise silently stop all future cleanups
                    log.exception('Unexpected error during volume cleanup')
                # Releases the lock while waiting; woken early by stop()
                self._cv.wait(self._cleanup_period)

    def _cleanup_volumes(self):
        """Disconnects tracked volumes whose bdev is no longer reported by bdev_get_bdevs.

        Must be called with self._lock held.  Errors are logged, not raised.
        """
        try:
            disconnected = []
            with self._client() as client:
                bdevs = client.call('bdev_get_bdevs')
            # Build the UUID set once instead of rebuilding a list for every volume
            uuids = {b.get('uuid') for b in bdevs}
            for volume_id in self._volumes:
                if volume_id not in uuids:
                    log.warning(f'Found disconnected volume: {volume_id}')
                    disconnected.append(volume_id)
            # Disconnect after iterating, as _disconnect_volume() mutates self._volumes
            for volume_id in disconnected:
                self._disconnect_volume(volume_id)
        except VolumeException as ex:
            log.error(f'Failure when trying to disconnect volumes: {ex.message}')
        except JSONRPCException as ex:
            log.error(f'Failed to retrieve bdevs: {ex.message}')

    def _get_discovery_info(self):
        """Returns the result of the bdev_nvme_get_discovery_info RPC.

        :raises VolumeException: (INTERNAL) if the RPC fails
        """
        try:
            with self._client() as client:
                return client.call('bdev_nvme_get_discovery_info')
        except JSONRPCException:
            raise VolumeException(grpc.StatusCode.INTERNAL,
                                  'Failed to retrieve discovery service status')

    def _compare_trid(self, trid1, trid2):
        """Checks whether two transport IDs match (case-insensitively) on trtype, traddr,
        trsvcid and adrfam."""
        return (trid1['trtype'].lower() == trid2['trtype'].lower() and
                trid1['traddr'].lower() == trid2['traddr'].lower() and
                trid1['trsvcid'].lower() == trid2['trsvcid'].lower() and
                trid1['adrfam'].lower() == trid2['adrfam'].lower())

    def _get_adrfam(self, traddr):
        """Returns 'ipv4' or 'ipv6' depending on the IP address in traddr.

        :raises VolumeException: (INVALID_ARGUMENT) if traddr isn't a valid IP address
        """
        try:
            return 'ipv{}'.format(ipaddress.ip_address(traddr).version)
        except ValueError:
            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid traddr')

    def _get_volume_bdev(self, volume_id, timeout):
        """Returns the bdev named volume_id, waiting up to `timeout` ms for it to appear, or
        None if it cannot be found."""
        try:
            with self._client() as client:
                bdevs = client.call('bdev_get_bdevs',
                                    {'name': volume_id,
                                     'timeout': timeout})
        except JSONRPCException:
            return None
        # Guard against an empty result rather than raising IndexError
        return bdevs[0] if bdevs else None

    def _start_discovery(self, trid, hostnqn):
        """Starts a new discovery service (waiting for it to attach) and registers it in the
        discovery map with a refcount of 1.

        :returns: the name of the new discovery service
        :raises VolumeException: (INTERNAL) if the RPC fails
        """
        try:
            # Use random UUID as name
            name = str(uuid.uuid4())
            log.debug(f'Starting discovery service {name}')
            with self._client() as client:
                client.call('bdev_nvme_start_discovery',
                            {'name': name,
                             'wait_for_attach': True,
                             'attach_timeout_ms': self._discovery_timeout,
                             'hostnqn': hostnqn,
                             **trid})
            self._discovery[name] = 1
            return name
        except JSONRPCException:
            raise VolumeException(grpc.StatusCode.INTERNAL,
                                  'Failed to start discovery')

    def _stop_discovery(self, name):
        """Drops a reference on a discovery service and stops it once the last one is gone.

        :raises VolumeException: (INTERNAL) if the stop RPC fails
        """
        refcnt = self._discovery.get(name)
        log.debug(f'Stopping discovery service {name}, refcnt={refcnt}')
        if refcnt is None:
            # Should never happen
            log.warning('Tried to stop discovery using non-existing name')
            return
        # Check the refcount to leave the service running if there are more volumes using it
        if refcnt > 1:
            self._discovery[name] = refcnt - 1
            return
        del self._discovery[name]
        try:
            with self._client() as client:
                client.call('bdev_nvme_stop_discovery',
                            {'name': name})
            log.debug(f'Stopped discovery service {name}')
        except JSONRPCException:
            raise VolumeException(grpc.StatusCode.INTERNAL,
                                  'Failed to stop discovery')

    def _get_crypto_params(self, params):
        """Extracts (key, cipher, key2, tweak_mode) from params.crypto.

        Every element is None if params has no crypto field.

        :raises VolumeException: (INVALID_ARGUMENT) if a key isn't valid ASCII
        """
        key, cipher, key2, tweak_mode = None, None, None, None
        try:
            if params.HasField('crypto'):
                key, cipher = params.crypto.key.decode('ascii'), params.crypto.cipher
                if len(params.crypto.key2) > 0:
                    key2 = params.crypto.key2.decode('ascii')
                # NOTE(review): if tweak_mode is a proto3 scalar this is always true - confirm
                if params.crypto.tweak_mode is not None:
                    tweak_mode = params.crypto.tweak_mode
        except UnicodeDecodeError:
            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Corrupted crypto key')
        return key, cipher, key2, tweak_mode

    def _setup_crypto(self, volume_id, params):
        """Configures the crypto engine for volume_id if params carries crypto settings.

        :raises VolumeException: if the crypto engine fails
        """
        try:
            if not params.HasField('crypto'):
                return
            key, cipher, key2, tweak_mode = self._get_crypto_params(params)
            crypto.get_crypto_engine().setup(volume_id, key, cipher, key2, tweak_mode)
        except crypto.CryptoException as ex:
            raise VolumeException(ex.code, ex.message)

    def _cleanup_crypto(self, volume_id):
        """Removes crypto configuration for volume_id; failures are only logged."""
        try:
            crypto.get_crypto_engine().cleanup(volume_id)
        except crypto.CryptoException as ex:
            log.warning(f'Failed to cleanup crypto: {ex.message}')

    def _verify_crypto(self, volume_id, params):
        """Asks the crypto engine to verify params' crypto settings against volume_id.

        :raises VolumeException: if the crypto engine reports an error
        """
        try:
            key, cipher, key2, tweak_mode = self._get_crypto_params(params)
            crypto.get_crypto_engine().verify(volume_id, key, cipher, key2, tweak_mode)
        except crypto.CryptoException as ex:
            raise VolumeException(ex.code, ex.message)

    @_locked
    def connect_volume(self, params, device_handle=None):
        """ Connects a volume through a discovery service. Returns a tuple (volume_id, existing):
        the first item is a volume_id as str, while the second denotes whether the selected volume
        existed prior to calling this method.

        :raises VolumeException: INVALID_ARGUMENT on a bad volume ID, traddr, crypto key or
            subsystem NQN; ALREADY_EXISTS if the volume is attached to a different device;
            NOT_FOUND if the volume's bdev doesn't appear; INTERNAL on RPC failures
        """
        volume_id = format_volume_id(params.volume_id)
        if volume_id is None:
            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid volume ID')
        if volume_id in self._volumes:
            volume = self._volumes[volume_id]
            if device_handle is not None and volume.device_handle != device_handle:
                raise VolumeException(grpc.StatusCode.ALREADY_EXISTS,
                                      'Volume is already attached to a different device')
            # Make sure the crypto params are the same
            self._verify_crypto(volume_id, params)
            return volume_id, True
        discovery_services = set()
        try:
            # First start discovery connecting to specified endpoints
            for req_ep in params.nvmf.discovery.discovery_endpoints:
                # Re-read on every iteration, as a previous endpoint may have started a service
                info = self._get_discovery_info()
                trid = {'trtype': req_ep.trtype,
                        'traddr': req_ep.traddr,
                        'trsvcid': req_ep.trsvcid,
                        'adrfam': self._get_adrfam(req_ep.traddr)}
                # Look for an existing service either connected to, or referring to, this trid
                name = None
                for discovery in info:
                    if (self._compare_trid(discovery['trid'], trid) or
                            any(self._compare_trid(r['trid'], trid)
                                for r in discovery['referrals'])):
                        name = discovery['name']
                        break
                if name is not None:
                    # If we've already attached a discovery service, it probably means that the user
                    # specified a referred address
                    if name not in discovery_services:
                        refcnt = self._discovery.get(name)
                        if refcnt is None:
                            log.warning('Found a discovery service missing from internal map')
                            refcnt = 0
                        self._discovery[name] = refcnt + 1
                else:
                    name = self._start_discovery(trid, params.nvmf.hostnqn)
                discovery_services.add(name)

            # Now check if a bdev with specified volume_id exists, give it 1s to appear
            bdev = self._get_volume_bdev(volume_id, timeout=1000)
            if bdev is None:
                raise VolumeException(grpc.StatusCode.NOT_FOUND,
                                      'Volume could not be found')
            # Check subsystem's NQN if it's specified
            if params.nvmf.subnqn:
                nvme = bdev.get('driver_specific', {}).get('nvme', [])
                # The NVMe bdev can report multiple subnqns, but they all should be the same, so
                # don't bother checking more than the first one
                subnqn = next(iter(nvme), {}).get('trid', {}).get('subnqn')
                if subnqn != params.nvmf.subnqn:
                    raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
                                          'Unexpected subsystem NQN')
            self._setup_crypto(volume_id, params)
            # Finally remember that volume
            self._volumes[volume_id] = Volume(volume_id, device_handle, discovery_services)
        except Exception:
            # Drop the references taken above, then re-raise with the original traceback
            for name in discovery_services:
                try:
                    self._stop_discovery(name)
                except Exception:
                    log.warning(f'Failed to cleanup discovery service: {name}')
            raise
        return volume_id, False

    def _disconnect_volume(self, volume_id):
        """Forgets a volume, cleaning up its crypto setup and discovery service references.

        Must be called with self._lock held.  Unknown volumes are silently ignored.

        :raises VolumeException: (INVALID_ARGUMENT) if volume_id is malformed
        """
        vid = format_volume_id(volume_id)
        if vid is None:
            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid volume ID')
        # Return immediately if the volume is not on our map
        volume = self._volumes.get(vid)
        if volume is None:
            return
        self._cleanup_crypto(vid)
        # Delete the volume from the map and stop the services it uses
        for name in volume.discovery_services:
            try:
                self._stop_discovery(name)
            except Exception:
                # There's no good way to handle this, so just print an error message and
                # continue
                log.error(f'Failed to stop discovery service: {name}')
        del self._volumes[vid]

    @_locked
    def disconnect_volume(self, volume_id):
        """Disconnects a volume connected through discovery service"""
        return self._disconnect_volume(volume_id)

    @_locked
    def set_device(self, volume_id, device_handle):
        """Marks a previously connected volume as being attached to specified device. This is only
        necessary if the device handle is not known at a time a volume is connected.

        :raises VolumeException: INVALID_ARGUMENT on a malformed volume ID, NOT_FOUND if the
            volume isn't connected, ALREADY_EXISTS if it's attached to a different device
        """
        vid = format_volume_id(volume_id)
        if vid is None:
            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid volume ID')
        volume = self._volumes.get(vid)
        if volume is None:
            raise VolumeException(grpc.StatusCode.NOT_FOUND,
                                  'Volume could not be found')
        if volume.device_handle is not None and volume.device_handle != device_handle:
            raise VolumeException(grpc.StatusCode.ALREADY_EXISTS,
                                  'Volume is already attached to a different device')
        volume.device_handle = device_handle

    @_locked
    def disconnect_device_volumes(self, device_handle):
        """Disconnects all volumes attached to a specific device"""
        # Snapshot the IDs first, as _disconnect_volume() mutates self._volumes
        volumes = [i for i, v in self._volumes.items() if v.device_handle == device_handle]
        for volume_id in volumes:
            self._disconnect_volume(volume_id)

    @_locked
    def has_volumes(self, device_handle):
        """Checks whether a given device has volumes attached to it"""
        return any(v.device_handle == device_handle for v in self._volumes.values())