xref: /spdk/python/spdk/sma/volume/crypto_bdev.py (revision 3408399172d246a023bf7cb18d84f88ad6233dc4)
1#  SPDX-License-Identifier: BSD-3-Clause
2#  Copyright (C) 2022 Intel Corporation.
3#  All rights reserved.
4
5import grpc
6import logging
7import uuid
8from spdk.rpc.client import JSONRPCException
9from . import crypto
10from ..common import format_volume_id
11from ..proto import sma_pb2
12
13
# Module-level logger named after this module; used by CryptoEngineBdev below
log = logging.getLogger(__name__)
15
16
class CryptoEngineBdev(crypto.CryptoEngine):
    """Crypto engine that layers a crypto bdev on top of a volume.

    All operations are carried out through JSON-RPC calls issued via the
    client factory stored by the base class (`self._client`).  RPC failures
    are reported to callers as `crypto.CryptoException`.
    """

    # Maps cipher values from the SMA protobuf definition to the cipher names
    # used in the RPC parameters / reported by the key objects.
    _ciphers = {sma_pb2.VolumeCryptoParameters.AES_CBC: 'AES_CBC',
                sma_pb2.VolumeCryptoParameters.AES_XTS: 'AES_XTS'}

    def __init__(self):
        """Register the engine under the name 'bdev_crypto'."""
        super().__init__('bdev_crypto')
        # Give _driver a defined value even if setup() is called before init()
        self._driver = None

    def init(self, client, params):
        """Initialize the engine.

        Args:
            client: forwarded unchanged to the base class `init()`.
            params: mapping of engine parameters; the optional 'driver' entry
                is sent as 'crypto_pmd' when creating crypto bdevs.
        """
        super().init(client, params)
        # _driver can be None, in which case 'crypto_pmd' is not sent at all
        self._driver = params.get('driver')

    def _lookup_cipher(self, cipher):
        """Translate a protobuf cipher value to its name or raise INVALID_ARGUMENT."""
        cipher_name = self._ciphers.get(cipher)
        if cipher_name is None:
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad cipher')
        return cipher_name

    @staticmethod
    def _check_tweak_mode(tweak_mode):
        """Raise INVALID_ARGUMENT unless tweak_mode is None or TWEAK_MODE_SIMPLE_LBA."""
        if tweak_mode is not None and tweak_mode != sma_pb2.VolumeCryptoParameters.TWEAK_MODE_SIMPLE_LBA:
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad tweak_mode')

    def setup(self, volume_id, key, cipher, key2=None, tweak_mode=None):
        """Create a crypto bdev on top of the given volume.

        Args:
            volume_id: passed as 'base_bdev_name' to 'bdev_crypto_create'.
            key: passed as 'key'.
            cipher: protobuf cipher value; must be one of `_ciphers`.
            key2: optional, passed as 'key2' when not None.
            tweak_mode: optional; only TWEAK_MODE_SIMPLE_LBA is accepted.

        Raises:
            crypto.CryptoException: INVALID_ARGUMENT for an unsupported cipher
                or tweak_mode, INTERNAL if the RPC call fails.
        """
        # Validate the arguments before opening an RPC connection; the check
        # order (cipher first, then tweak_mode) decides which error is reported.
        cipher_name = self._lookup_cipher(cipher)
        self._check_tweak_mode(tweak_mode)

        params = {'base_bdev_name': volume_id,
                  # The crypto bdev gets a freshly generated random name
                  'name': str(uuid.uuid4()),
                  'key': key,
                  'cipher': cipher_name}
        if self._driver is not None:
            params['crypto_pmd'] = self._driver
        if key2 is not None:
            params['key2'] = key2

        try:
            with self._client() as client:
                log.info('Creating crypto bdev: %s on volume: %s', params['name'], volume_id)
                client.call('bdev_crypto_create', params)
        except JSONRPCException as ex:
            raise crypto.CryptoException(grpc.StatusCode.INTERNAL,
                                         f'Failed to setup crypto for volume: {volume_id}') from ex

    def cleanup(self, volume_id):
        """Delete the crypto bdev set up on top of the volume, if there is one.

        Raises:
            crypto.CryptoException: INTERNAL if listing bdevs or deleting the
                crypto bdev fails.
        """
        crypto_bdev = self.get_crypto_bdev(volume_id)
        # If there's no crypto bdev set up on top of this volume, we're done
        if crypto_bdev is None:
            return
        try:
            with self._client() as client:
                log.info('Deleting crypto bdev: %s from volume: %s', crypto_bdev, volume_id)
                client.call('bdev_crypto_delete', {'name': crypto_bdev})
        except JSONRPCException as ex:
            raise crypto.CryptoException(grpc.StatusCode.INTERNAL,
                                         'Failed to delete crypto bdev') from ex

    def verify(self, volume_id, key, cipher, key2=None, tweak_mode=None):
        """Check that the volume's crypto setup matches the requested parameters.

        A None `key` means no crypto bdev is expected on the volume; a
        non-None `key` means one must exist and its key object must match
        `key`, `cipher` and (when given) `key2`.  String comparisons are
        case-insensitive.

        Raises:
            crypto.CryptoException: INVALID_ARGUMENT on any mismatch, INTERNAL
                if the bdev list cannot be retrieved.
        """
        crypto_bdev = self._get_crypto_bdev(volume_id)
        # Key being None/non-None defines whether we expect a bdev_crypto on top of a given volume
        if (key is None) != (crypto_bdev is None):
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration')
        if key is None:
            return

        params = crypto_bdev['driver_specific']['crypto']
        crypto_key = self._get_crypto_key(params['key_name'])
        if crypto_key is None:
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'No key object found')

        cipher_name = self._lookup_cipher(cipher)
        if crypto_key['cipher'].lower() != cipher_name.lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad cipher')
        if crypto_key['key'].lower() != key.lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad key')
        # key2 is only compared when the caller supplied one
        if key2 is not None and crypto_key.get('key2', '').lower() != key2.lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad key2')
        if crypto_key['name'].lower() != params['key_name'].lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: key name does not match')
        self._check_tweak_mode(tweak_mode)

    def _get_crypto_bdev(self, volume_id):
        """Return the crypto bdev description stacked on the volume, or None.

        Looks through the result of 'bdev_get_bdevs' for a bdev whose
        product name is 'crypto' and whose base bdev's formatted UUID equals
        `volume_id`.

        Raises:
            crypto.CryptoException: INTERNAL if the RPC call fails or a crypto
                bdev references a base bdev missing from the list.
        """
        try:
            with self._client() as client:
                bdevs = client.call('bdev_get_bdevs')
                for bdev in bdevs:
                    if bdev['product_name'] != 'crypto':
                        continue
                    base_name = bdev['driver_specific']['crypto']['base_bdev_name']
                    base_bdev = next((b for b in bdevs if b['name'] == base_name), None)
                    # Should never really happen, but check it just in case
                    if base_bdev is None:
                        raise crypto.CryptoException(
                                grpc.StatusCode.INTERNAL,
                                'Unexpected crypto configuration: cannot find base bdev')
                    if format_volume_id(base_bdev['uuid']) == volume_id:
                        return bdev
                # There's no crypto bdev set up on top of this volume
                return None
        except JSONRPCException as ex:
            raise crypto.CryptoException(grpc.StatusCode.INTERNAL,
                                         f'Failed to get bdev_crypto for volume: {volume_id}') from ex

    def _get_crypto_key(self, key_name):
        """Return the first key object reported for `key_name`, or None.

        None is returned both when the 'accel_crypto_keys_get' call fails and
        when it yields no keys; callers treat None as "key not found".
        """
        try:
            with self._client() as client:
                keys = client.call('accel_crypto_keys_get', {'key_name': key_name})
        except JSONRPCException as ex:
            # Best effort: the caller reports the missing key, but leave a
            # trace of the underlying RPC failure instead of dropping it.
            log.warning('Failed to get crypto key: %s (%s)', key_name, ex)
            return None
        # Guard against both None and an empty list before indexing
        return keys[0] if keys else None

    def get_crypto_bdev(self, volume_id):
        """Return the name of the crypto bdev on top of the volume, or None."""
        bdev = self._get_crypto_bdev(volume_id)
        return None if bdev is None else bdev['name']
137