# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2022 Intel Corporation.
# All rights reserved.

import functools
import grpc
import logging
import uuid
from spdk.rpc.client import JSONRPCException
from spdk.sma import qos
from .device import DeviceManager, DeviceException
from ..common import format_volume_id, volume_id_to_nguid
from ..volume import get_crypto_engine, CryptoException
from ..proto import sma_pb2
from ..proto import nvmf_tcp_pb2


class NvmfTcpDeviceManager(DeviceManager):
    """Device manager that exposes volumes through NVMe/TCP subsystems.

    Device handles produced by this manager have the form
    ``nvmf-tcp:<subsystem NQN>``.
    """

    def __init__(self, client):
        super().__init__('nvmf_tcp', 'nvmf_tcp', client)

    def init(self, config):
        """Make sure the TCP transport exists and remember whether it does."""
        self._has_transport = self._create_transport()

    def _create_transport(self):
        """Return a truthy value if the NVMe/TCP transport is available.

        An existing TCP transport is reused; otherwise one is created. Any RPC
        failure is logged and reported as ``False``.
        """
        try:
            with self._client() as client:
                transports = client.call('nvmf_get_transports')
                if any(t['trtype'].lower() == 'tcp' for t in transports):
                    return True
                # TODO: take the transport params from config
                return client.call('nvmf_create_transport', {'trtype': 'tcp'})
        except JSONRPCException:
            logging.error('Failed to query for NVMe/TCP transport')
            return False

    def _check_transport(f):
        """Decorator rejecting calls while the NVMe/TCP transport is unavailable."""
        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            # getattr() so that a call made before init() is reported as an
            # unavailable transport rather than as an AttributeError.
            if not getattr(self, '_has_transport', False):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'NVMe/TCP transport is unavailable')
            return f(self, *args, **kwargs)
        return wrapper

    def _get_params(self, request, params):
        """Copy attributes of ``request`` into a dict of RPC parameters.

        Each entry of ``params`` is a tuple ``(grpc_name,)`` or
        ``(grpc_name, rpc_name)``; when ``rpc_name`` is omitted the gRPC
        attribute name is used as the key.
        """
        result = {}
        for grpc_param, *rpc_param in params:
            rpc_param = rpc_param[0] if rpc_param else grpc_param
            result[rpc_param] = getattr(request, grpc_param)
        return result

    def _check_addr(self, addr, addrlist):
        """Return True if ``addrlist`` has a TCP listener equal to ``addr``.

        The comparison of adrfam, traddr and trsvcid is case-insensitive.
        """
        adrfam = addr['adrfam'].lower()
        traddr = addr['traddr'].lower()
        trsvcid = addr['trsvcid'].lower()
        return any(a['trtype'].lower() == 'tcp' and
                   a['adrfam'].lower() == adrfam and
                   a['traddr'].lower() == traddr and
                   a['trsvcid'].lower() == trsvcid
                   for a in addrlist)

    def _get_nqn_from_handle(self, handle):
        """Strip the ``nvmf-tcp:`` prefix from a device handle."""
        return handle[len('nvmf-tcp:'):]

    def _find_subsystem(self, client, nqn):
        """Return the subsystem whose NQN equals ``nqn``, or None if absent.

        Raises JSONRPCException if the ``nvmf_get_subsystems`` call fails.
        """
        subsystems = client.call('nvmf_get_subsystems')
        return next((s for s in subsystems if s['nqn'] == nqn), None)

    @_check_transport
    def create_device(self, request):
        """Create (or update) the subsystem described by ``request.nvmf_tcp``.

        The subsystem is created if it does not exist yet. Its allowed hosts
        are then synchronized with ``params.hosts``, the requested listener is
        added if missing and, when the request carries a valid volume ID, that
        volume is attached as a namespace.

        Returns:
            sma_pb2.CreateDeviceResponse with the ``nvmf-tcp:<nqn>`` handle.

        Raises:
            DeviceException: with status INTERNAL if any step fails.
        """
        params = request.nvmf_tcp
        with self._client() as client:
            try:
                subsystem = self._find_subsystem(client, params.subnqn)
                if subsystem is None:
                    client.call('nvmf_create_subsystem',
                                self._get_params(params, [
                                    ('subnqn', 'nqn'),
                                    ('allow_any_host',)]))
            except JSONRPCException:
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to create NVMe/TCP device')
            try:
                for host in params.hosts:
                    client.call('nvmf_subsystem_add_host',
                                {'nqn': params.subnqn,
                                 'host': host})
                if subsystem is not None:
                    # Drop hosts that were allowed before but are not part of
                    # this request.
                    requested_hosts = set(params.hosts)
                    for host in [h['nqn'] for h in subsystem['hosts']]:
                        if host not in requested_hosts:
                            client.call('nvmf_subsystem_remove_host',
                                        {'nqn': params.subnqn,
                                         'host': host})

                addr = self._get_params(params, [
                    ('adrfam',),
                    ('traddr',),
                    ('trsvcid',)])
                if subsystem is None or not self._check_addr(addr,
                                                             subsystem['listen_addresses']):
                    client.call('nvmf_subsystem_add_listener',
                                {'nqn': params.subnqn,
                                 'listen_address': {'trtype': 'tcp', **addr}})
                volume_id = format_volume_id(request.volume.volume_id)
                if volume_id is not None:
                    # Prefer the crypto bdev if the crypto engine reports one
                    # for this volume.
                    bdev_name = get_crypto_engine().get_crypto_bdev(volume_id) or volume_id
                    client.call('nvmf_subsystem_add_ns',
                                {'nqn': params.subnqn,
                                 'namespace': {
                                     'bdev_name': bdev_name,
                                     'uuid': volume_id,
                                     'nguid': volume_id_to_nguid(volume_id)}})
            except (JSONRPCException, CryptoException):
                # NOTE(review): this cleanup also deletes a subsystem that
                # already existed before the call - confirm that is intended.
                try:
                    client.call('nvmf_delete_subsystem', {'nqn': params.subnqn})
                except JSONRPCException:
                    logging.warning(f'Failed to delete subsystem: {params.subnqn}')
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to create NVMe/TCP device')

            return sma_pb2.CreateDeviceResponse(handle=f'nvmf-tcp:{params.subnqn}')

    @_check_transport
    def delete_device(self, request):
        """Delete the subsystem referenced by ``request.handle``.

        Deleting a device that does not exist is only logged.

        Raises:
            DeviceException: with status INTERNAL if the deletion fails.
        """
        nqn = self._get_nqn_from_handle(request.handle)
        try:
            with self._client() as client:
                if self._find_subsystem(client, nqn) is None:
                    logging.info(f'Tried to delete a non-existing device: {nqn}')
                    return
                result = client.call('nvmf_delete_subsystem',
                                     {'nqn': nqn})
                if not result:
                    raise DeviceException(grpc.StatusCode.INTERNAL,
                                          'Failed to delete device')
        except JSONRPCException:
            # Report RPC failures the same way the other methods do.
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'Failed to delete device')

    def _find_bdev(self, client, guid):
        """Return the bdev backing volume ``guid``, or None if it is not found.

        The crypto bdev name is used when the crypto engine reports one for
        the volume, otherwise the volume ID itself is used as the bdev name.
        """
        try:
            bdev_name = get_crypto_engine().get_crypto_bdev(guid) or guid
            return client.call('bdev_get_bdevs', {'name': bdev_name})[0]
        except (JSONRPCException, CryptoException, IndexError):
            # IndexError: the RPC succeeded but returned no bdevs.
            return None

    @_check_transport
    def attach_volume(self, request):
        """Attach ``request.volume`` as a namespace of the referenced device.

        Nothing is done if the volume's bdev is already a namespace of the
        subsystem.

        Raises:
            DeviceException: INVALID_ARGUMENT for a malformed volume ID,
                NOT_FOUND if the volume or the device does not exist,
                INTERNAL if attaching fails.
        """
        nqn = self._get_nqn_from_handle(request.device_handle)
        volume_id = format_volume_id(request.volume.volume_id)
        if volume_id is None:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid volume ID')
        try:
            with self._client() as client:
                bdev = self._find_bdev(client, volume_id)
                if bdev is None:
                    raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                          'Invalid volume GUID')
                subsys = self._find_subsystem(client, nqn)
                if subsys is None:
                    raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                          'Invalid device handle')
                if bdev['name'] not in [ns['name'] for ns in subsys['namespaces']]:
                    result = client.call('nvmf_subsystem_add_ns',
                                         {'nqn': nqn,
                                          'namespace': {
                                              'bdev_name': bdev['name'],
                                              'uuid': volume_id,
                                              'nguid': volume_id_to_nguid(volume_id)}})
                    if not result:
                        raise DeviceException(grpc.StatusCode.INTERNAL,
                                              'Failed to attach volume')
        except JSONRPCException:
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'Failed to attach volume')

    @_check_transport
    def detach_volume(self, request):
        """Remove the namespace backed by ``request.volume_id`` from the device.

        A missing volume or a missing device is only logged.

        Raises:
            DeviceException: INVALID_ARGUMENT for a malformed volume ID,
                INTERNAL if removing the namespace fails.
        """
        nqn = self._get_nqn_from_handle(request.device_handle)
        volume = format_volume_id(request.volume_id)
        if volume is None:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid volume ID')
        try:
            with self._client() as client:
                bdev = self._find_bdev(client, volume)
                if bdev is None:
                    logging.info(f'Tried to detach non-existing volume: {volume}')
                    return

                subsys = self._find_subsystem(client, nqn)
                if subsys is None:
                    logging.info(f'Tried to detach volume: {volume} from non-existing ' +
                                 f'device: {nqn}')
                    return

                for ns in subsys['namespaces']:
                    if ns['name'] != bdev['name']:
                        continue
                    result = client.call('nvmf_subsystem_remove_ns',
                                         {'nqn': nqn,
                                          'nsid': ns['nsid']})
                    if not result:
                        raise DeviceException(grpc.StatusCode.INTERNAL,
                                              'Failed to detach volume')
                    # Only the first matching namespace is removed.
                    break
        except JSONRPCException:
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'Failed to detach volume')

    def owns_device(self, handle):
        """Return True if ``handle`` starts with this manager's handle prefix."""
        return handle.startswith('nvmf-tcp')

    def set_qos(self, request):
        """Apply QoS settings to a volume attached to the referenced device.

        Raises:
            DeviceException: INVALID_ARGUMENT for a malformed volume ID or a
                volume that is not attached to the device, NOT_FOUND if the
                volume or the device does not exist, INTERNAL on RPC failure.
                A qos.QosException is re-raised with its own code and message.
        """
        nqn = self._get_nqn_from_handle(request.device_handle)
        volume = format_volume_id(request.volume_id)
        if volume is None:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Invalid volume ID')
        try:
            with self._client() as client:
                # Make sure that a volume exists and is attached to the device
                bdev = self._find_bdev(client, volume)
                if bdev is None:
                    raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                          'No volume associated with volume_id could be found')
                try:
                    subsys = client.call('nvmf_get_subsystems', {'nqn': nqn})[0]
                except (JSONRPCException, IndexError):
                    # IndexError: the RPC succeeded but returned no subsystem.
                    raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                          'No device associated with device_handle could be found')
                if not any(ns['name'] == bdev['name'] for ns in subsys['namespaces']):
                    raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                          'Specified volume is not attached to the device')
                qos.set_volume_bdev_qos(client, request)
        except qos.QosException as ex:
            raise DeviceException(ex.code, ex.message)
        except JSONRPCException:
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'Failed to set QoS')

    def get_qos_capabilities(self, request):
        """Return the QoS capabilities reported by ``qos.get_bdev_qos_capabilities``."""
        return qos.get_bdev_qos_capabilities()