xref: /spdk/python/spdk/sma/volume/volume.py (revision 3408399172d246a023bf7cb18d84f88ad6233dc4)
1#  SPDX-License-Identifier: BSD-3-Clause
2#  Copyright (C) 2022 Intel Corporation.
3#  All rights reserved.
4
5import grpc
6import ipaddress
7import logging
8import threading
9import uuid
10from dataclasses import dataclass
11from spdk.rpc.client import JSONRPCException
12from . import crypto
13from ..common import format_volume_id
14from ..proto import sma_pb2
15
16
# Module-level logger used by the volume manager for its diagnostics
log = logging.getLogger(__name__)
18
19
class VolumeException(Exception):
    """Error raised by the volume management code in this module.

    Attributes:
        code: status code describing the failure (this file passes
            `grpc.StatusCode` values or codes taken from crypto errors).
        message: human-readable description of the failure.
    """

    def __init__(self, code, message):
        # Initialise the base class explicitly; `args` stays (code, message) so the exception
        # can still be re-created from its arguments (e.g. when pickled or copied).
        super().__init__(code, message)
        self.code = code
        self.message = message

    def __str__(self):
        # Without this, str(ex) would be the repr of the (code, message) tuple
        return str(self.message)
24
25
class Volume:
    """Book-keeping record for a single connected volume."""

    def __init__(self, volume_id, device_handle, discovery_services):
        # Identifier of the volume and the device it is attached to (the handle may be None
        # when the device is not known yet)
        self.volume_id, self.device_handle = volume_id, device_handle
        # Names of the discovery services this volume holds references to
        self.discovery_services = discovery_services
30        self.device_handle = device_handle
31
32
33class VolumeManager:
34    def __init__(self, client, discovery_timeout, cleanup_period):
35        self._client = client
36        # Discovery service map (name -> refcnt)
37        self._discovery = {}
38        # Volume map (volume_id -> Volume)
39        self._volumes = {}
40        self._discovery_timeout = int(discovery_timeout * 1000)
41        self._cleanup_period = cleanup_period
42        self._lock = threading.Lock()
43        self._cv = threading.Condition(self._lock)
44        self._running = False
45        self._thread = None
46
47    def _locked(f):
48        def wrapper(self, *args, **kwargs):
49            self._lock.acquire()
50            try:
51                return f(self, *args, **kwargs)
52            finally:
53                self._lock.release()
54        return wrapper
55
56    def start(self):
57        if self._thread is not None:
58            raise ValueError('Volume manager was already started')
59        self._running = True
60        self._thread = threading.Thread(target=self._cleanup_thread, args=(self,))
61        self._thread.start()
62
63    def stop(self):
64        if self._thread is None:
65            return
66        with self._lock:
67            self._running = False
68            self._cv.notify_all()
69        self._thread.join()
70        self._thread = None
71
72    @staticmethod
73    def _cleanup_thread(*args):
74        self, = args
75        with self._lock:
76            while self._running:
77                self._cleanup_volumes()
78                self._cv.wait(self._cleanup_period)
79
80    def _cleanup_volumes(self):
81        try:
82            disconnected = []
83            with self._client() as client:
84                bdevs = client.call('bdev_get_bdevs')
85                for volume_id in self._volumes:
86                    if volume_id not in [b['uuid'] for b in bdevs]:
87                        log.warning(f'Found disconnected volume: {volume_id}')
88                        disconnected.append(volume_id)
89            for volume_id in disconnected:
90                self._disconnect_volume(volume_id)
91        except VolumeException as ex:
92            log.error(f'Failure when trying to disconnect volumes: {ex.message}')
93        except JSONRPCException as ex:
94            log.error(f'Failed to retrieve bdevs: {ex.message}')
95
96    def _get_discovery_info(self):
97        try:
98            with self._client() as client:
99                return client.call('bdev_nvme_get_discovery_info')
100        except JSONRPCException:
101            raise VolumeException(grpc.StatusCode.INTERNAL,
102                                  'Failed to retrieve discovery service status')
103
104    def _compare_trid(self, trid1, trid2):
105        return (trid1['trtype'].lower() == trid2['trtype'].lower() and
106                trid1['traddr'].lower() == trid2['traddr'].lower() and
107                trid1['trsvcid'].lower() == trid2['trsvcid'].lower() and
108                trid1['adrfam'].lower() == trid2['adrfam'].lower())
109
110    def _get_adrfam(self, traddr):
111        try:
112            return 'ipv{}'.format(ipaddress.ip_address(traddr).version)
113        except ValueError:
114            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
115                                  'Invalid traddr')
116
117    def _get_volume_bdev(self, volume_id, timeout):
118        try:
119            with self._client() as client:
120                return client.call('bdev_get_bdevs',
121                                   {'name': volume_id,
122                                    'timeout': timeout})[0]
123        except JSONRPCException:
124            return None
125
126    def _start_discovery(self, trid, hostnqn):
127        try:
128            # Use random UUID as name
129            name = str(uuid.uuid4())
130            log.debug(f'Starting discovery service {name}')
131            with self._client() as client:
132                client.call('bdev_nvme_start_discovery',
133                            {'name': name,
134                             'wait_for_attach': True,
135                             'attach_timeout_ms': self._discovery_timeout,
136                             'hostnqn': hostnqn,
137                             **trid})
138            self._discovery[name] = 1
139            return name
140        except JSONRPCException:
141            raise VolumeException(grpc.StatusCode.INTERNAL,
142                                  'Failed to start discovery')
143
144    def _stop_discovery(self, name):
145        refcnt = self._discovery.get(name)
146        log.debug(f'Stopping discovery service {name}, refcnt={refcnt}')
147        if refcnt is None:
148            # Should never happen
149            log.warning('Tried to stop discovery using non-existing name')
150            return
151        # Check the refcount to leave the service running if there are more volumes using it
152        if refcnt > 1:
153            self._discovery[name] = refcnt - 1
154            return
155        del self._discovery[name]
156        try:
157            with self._client() as client:
158                client.call('bdev_nvme_stop_discovery',
159                            {'name': name})
160            log.debug(f'Stopped discovery service {name}')
161        except JSONRPCException:
162            raise VolumeException(grpc.StatusCode.INTERNAL,
163                                  'Failed to stop discovery')
164
165    def _get_crypto_params(self, params):
166        key, cipher, key2, tweak_mode = None, None, None, None
167        try:
168            if params.HasField('crypto'):
169                key, cipher = params.crypto.key.decode('ascii'), params.crypto.cipher
170                if len(params.crypto.key2) > 0:
171                    key2 = params.crypto.key2.decode('ascii')
172                if params.crypto.tweak_mode is not None:
173                    tweak_mode = params.crypto.tweak_mode
174        except UnicodeDecodeError:
175            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
176                                  'Corrupted crypto key')
177        return key, cipher, key2, tweak_mode
178
179    def _setup_crypto(self, volume_id, params):
180        try:
181            if not params.HasField('crypto'):
182                return
183            key, cipher, key2, tweak_mode = self._get_crypto_params(params)
184            crypto.get_crypto_engine().setup(volume_id, key, cipher, key2, tweak_mode)
185        except crypto.CryptoException as ex:
186            raise VolumeException(ex.code, ex.message)
187
188    def _cleanup_crypto(self, volume_id):
189        try:
190            crypto.get_crypto_engine().cleanup(volume_id)
191        except crypto.CryptoException as ex:
192            logging.warning(f'Failed to cleanup crypto: {ex.message}')
193
194    def _verify_crypto(self, volume_id, params):
195        try:
196            key, cipher, key2, tweak_mode = self._get_crypto_params(params)
197            crypto.get_crypto_engine().verify(volume_id, key, cipher, key2, tweak_mode)
198        except crypto.CryptoException as ex:
199            raise VolumeException(ex.code, ex.message)
200
201    @_locked
202    def connect_volume(self, params, device_handle=None):
203        """ Connects a volume through a discovery service.  Returns a tuple (volume_id, existing):
204        the first item is a volume_id as str, while the second denotes whether the selected volume
205        existed prior to calling this method.
206        """
207        volume_id = format_volume_id(params.volume_id)
208        if volume_id is None:
209            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
210                                  'Invalid volume ID')
211        if volume_id in self._volumes:
212            volume = self._volumes[volume_id]
213            if device_handle is not None and volume.device_handle != device_handle:
214                raise VolumeException(grpc.StatusCode.ALREADY_EXISTS,
215                                      'Volume is already attached to a different device')
216            # Make sure the crypto params are the same
217            self._verify_crypto(volume_id, params)
218            return volume_id, True
219        discovery_services = set()
220        try:
221            # First start discovery connecting to specified endpoints
222            for req_ep in params.nvmf.discovery.discovery_endpoints:
223                info = self._get_discovery_info()
224                trid = {'trtype': req_ep.trtype,
225                        'traddr': req_ep.traddr,
226                        'trsvcid': req_ep.trsvcid,
227                        'adrfam': self._get_adrfam(req_ep.traddr)}
228                name = None
229                for discovery in info:
230                    if self._compare_trid(discovery['trid'], trid):
231                        name = discovery['name']
232                        break
233                    if next(filter(lambda r: self._compare_trid(r['trid'], trid),
234                                   discovery['referrals']), None):
235                        name = discovery['name']
236                        break
237                if name is not None:
238                    # If we've already attached a discovery service, it probably means that the user
239                    # specified a referred address
240                    if name not in discovery_services:
241                        refcnt = self._discovery.get(name)
242                        if refcnt is None:
243                            log.warning('Found a discovery service missing from internal map')
244                            refcnt = 0
245                        self._discovery[name] = refcnt + 1
246                else:
247                    name = self._start_discovery(trid, params.nvmf.hostnqn)
248                discovery_services.add(name)
249
250            # Now check if a bdev with specified volume_id exists, give it 1s to appear
251            bdev = self._get_volume_bdev(volume_id, timeout=1000)
252            if bdev is None:
253                raise VolumeException(grpc.StatusCode.NOT_FOUND,
254                                      'Volume could not be found')
255            # Check subsystem's NQN if it's specified
256            if params.nvmf.subnqn:
257                nvme = bdev.get('driver_specific', {}).get('nvme', [])
258                # The NVMe bdev can report multiple subnqns, but they all should be the same, so
259                # don't bother checking more than the first one
260                subnqn = next(iter(nvme), {}).get('trid', {}).get('subnqn')
261                if subnqn != params.nvmf.subnqn:
262                    raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
263                                          'Unexpected subsystem NQN')
264            self._setup_crypto(volume_id, params)
265            # Finally remember that volume
266            self._volumes[volume_id] = Volume(volume_id, device_handle, discovery_services)
267        except Exception as ex:
268            for name in discovery_services:
269                try:
270                    self._stop_discovery(name)
271                except Exception:
272                    log.warning(f'Failed to cleanup discovery service: {name}')
273            raise ex
274        return volume_id, False
275
276    def _disconnect_volume(self, volume_id):
277        id = format_volume_id(volume_id)
278        if id is None:
279            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
280                                  'Invalid volume ID')
281        # Return immediately if the volume is not on our map
282        volume = self._volumes.get(id)
283        if volume is None:
284            return
285        self._cleanup_crypto(id)
286        # Delete the volume from the map and stop the services it uses
287        for name in volume.discovery_services:
288            try:
289                self._stop_discovery(name)
290            except Exception:
291                # There's no good way to handle this, so just print an error message and
292                # continue
293                log.error(f'Failed to stop discovery service: {name}')
294        del self._volumes[id]
295
296    @_locked
297    def disconnect_volume(self, volume_id):
298        """Disconnects a volume connected through discovery service"""
299        return self._disconnect_volume(volume_id)
300
301    @_locked
302    def set_device(self, volume_id, device_handle):
303        """Marks a previously connected volume as being attached to specified device.  This is only
304        necessary if the device handle is not known at a time a volume is connected.
305        """
306        id = format_volume_id(volume_id)
307        if id is None:
308            raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
309                                  'Invalid volume ID')
310        volume = self._volumes.get(id)
311        if volume is None:
312            raise VolumeException(grpc.StatusCode.NOT_FOUND,
313                                  'Volume could not be found')
314        if volume.device_handle is not None and volume.device_handle != device_handle:
315            raise VolumeException(grpc.StatusCode.ALREADY_EXISTS,
316                                  'Volume is already attached to a different device')
317        volume.device_handle = device_handle
318
319    @_locked
320    def disconnect_device_volumes(self, device_handle):
321        """Disconnects all volumes attached to a specific device"""
322        volumes = [i for i, v in self._volumes.items() if v.device_handle == device_handle]
323        for volume_id in volumes:
324            self._disconnect_volume(volume_id)
325
326    @_locked
327    def has_volumes(self, device_handle):
328        """Checks whether a given device has volumes attached to it"""
329        return next(filter(lambda v: v.device_handle == device_handle,
330                           self._volumes.values()), None) is not None
331