#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2022 Intel Corporation.
#  All rights reserved.

import grpc
import logging
import uuid
from spdk.rpc.client import JSONRPCException
from . import crypto
from ..common import format_volume_id
from ..proto import sma_pb2


log = logging.getLogger(__name__)


class CryptoEngineBdev(crypto.CryptoEngine):
    """Crypto engine that layers a ``bdev_crypto`` bdev on top of a volume.

    All operations are carried out through JSON-RPC calls issued on the
    client obtained from ``self._client()``.
    """

    # Maps the protobuf cipher enum to the cipher name passed to the RPC
    _ciphers = {sma_pb2.VolumeCryptoParameters.AES_CBC: 'AES_CBC',
                sma_pb2.VolumeCryptoParameters.AES_XTS: 'AES_XTS'}

    def __init__(self):
        """Register the engine under the name ``bdev_crypto``."""
        super().__init__('bdev_crypto')

    def init(self, client, params):
        """Initialize the engine and remember the optional crypto driver.

        Args:
            client: forwarded unchanged to the base class ``init()``.
            params: mapping of engine parameters; the optional ``driver``
                entry is later sent as ``crypto_pmd`` when creating bdevs.
        """
        super().init(client, params)
        # _driver can be None
        self._driver = params.get('driver')

    def setup(self, volume_id, key, cipher, key2=None, tweak_mode=None):
        """Create a crypto bdev on top of ``volume_id``.

        Args:
            volume_id: passed as ``base_bdev_name`` to ``bdev_crypto_create``.
            key: encryption key.
            cipher: ``sma_pb2.VolumeCryptoParameters`` cipher enum value.
            key2: optional second key, only sent when not None.
            tweak_mode: optional tweak mode; only ``TWEAK_MODE_SIMPLE_LBA``
                (or None) is accepted.

        Raises:
            crypto.CryptoException: INVALID_ARGUMENT for an unknown cipher or
                unsupported tweak mode, INTERNAL if the RPC fails.
        """
        try:
            with self._client() as client:
                cipher = self._ciphers.get(cipher)
                if cipher is None:
                    raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                                 'Invalid volume crypto configuration: bad cipher')
                params = {'base_bdev_name': volume_id,
                          'name': str(uuid.uuid4()),
                          'key': key,
                          'cipher': cipher}
                if self._driver is not None:
                    params['crypto_pmd'] = self._driver
                if key2 is not None:
                    params['key2'] = key2
                if tweak_mode is not None and \
                        tweak_mode != sma_pb2.VolumeCryptoParameters.TWEAK_MODE_SIMPLE_LBA:
                    raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                                 'Invalid volume crypto configuration: bad tweak_mode')

                log.info('Creating crypto bdev: %s on volume: %s',
                         params['name'], volume_id)
                client.call('bdev_crypto_create', params)
        except JSONRPCException as ex:
            # Keep the RPC error as the cause so it shows up in tracebacks
            raise crypto.CryptoException(grpc.StatusCode.INTERNAL,
                                         f'Failed to setup crypto for volume: {volume_id}') from ex

    def cleanup(self, volume_id):
        """Delete the crypto bdev set up on top of ``volume_id``, if any.

        Raises:
            crypto.CryptoException: INTERNAL if looking up or deleting the
                crypto bdev fails.
        """
        crypto_bdev = self.get_crypto_bdev(volume_id)
        # If there's no crypto bdev set up on top of this volume, we're done
        if crypto_bdev is None:
            return
        try:
            with self._client() as client:
                log.info('Deleting crypto bdev: %s from volume: %s',
                         crypto_bdev, volume_id)
                client.call('bdev_crypto_delete', {'name': crypto_bdev})
        except JSONRPCException as ex:
            raise crypto.CryptoException(grpc.StatusCode.INTERNAL,
                                         'Failed to delete crypto bdev') from ex

    def verify(self, volume_id, key, cipher, key2=None, tweak_mode=None):
        """Check that the crypto setup of ``volume_id`` matches the arguments.

        ``key`` being None means no crypto bdev is expected on the volume;
        otherwise the key object referenced by the crypto bdev must match the
        requested cipher, key and (when given) key2.

        Raises:
            crypto.CryptoException: INVALID_ARGUMENT on any mismatch, INTERNAL
                if the bdev list cannot be retrieved.
        """
        crypto_bdev = self._get_crypto_bdev(volume_id)
        # Key being None/non-None defines whether we expect a bdev_crypto on top of a given
        # volume, so exactly one of (key, crypto_bdev) being None is a mismatch
        if (key is None) != (crypto_bdev is None):
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration')
        if key is None:
            return
        params = crypto_bdev['driver_specific']['crypto']
        crypto_key = self._get_crypto_key(params['key_name'])
        if crypto_key is None:
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'No key object found')
        cipher = self._ciphers.get(cipher)
        if cipher is None:
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad cipher')
        if crypto_key['cipher'].lower() != cipher.lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad cipher')
        if crypto_key['key'].lower() != key.lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad key')
        if key2 is not None and crypto_key.get('key2', '').lower() != key2.lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad key2')
        if crypto_key['name'].lower() != params['key_name'].lower():
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: key name does not match')
        if tweak_mode is not None and \
                tweak_mode != sma_pb2.VolumeCryptoParameters.TWEAK_MODE_SIMPLE_LBA:
            raise crypto.CryptoException(grpc.StatusCode.INVALID_ARGUMENT,
                                         'Invalid volume crypto configuration: bad tweak_mode')

    def _get_crypto_bdev(self, volume_id):
        """Return the crypto bdev whose base bdev matches ``volume_id``.

        Returns:
            The bdev dict as reported by ``bdev_get_bdevs``, or None when no
            crypto bdev sits on top of the volume.

        Raises:
            crypto.CryptoException: INTERNAL if the RPC fails or a crypto bdev
                references a base bdev missing from the listing.
        """
        try:
            with self._client() as client:
                bdevs = client.call('bdev_get_bdevs')
                for bdev in [b for b in bdevs if b['product_name'] == 'crypto']:
                    base_name = bdev['driver_specific']['crypto']['base_bdev_name']
                    base_bdev = next(filter(lambda b: b['name'] == base_name, bdevs), None)
                    # Should never really happen, but check it just in case
                    if base_bdev is None:
                        raise crypto.CryptoException(
                                    grpc.StatusCode.INTERNAL,
                                    'Unexpected crypto configuration: cannot find base bdev')
                    if format_volume_id(base_bdev['uuid']) == volume_id:
                        return bdev
                # There's no crypto bdev set up on top of this volume
                return None
        except JSONRPCException as ex:
            raise crypto.CryptoException(grpc.StatusCode.INTERNAL,
                                         f'Failed to get bdev_crypto for volume: {volume_id}') from ex

    def _get_crypto_key(self, key_name):
        """Return the key object named ``key_name``, or None if unavailable.

        A failed RPC and an empty/None result are both reported as None, so
        the caller decides how to treat a missing key.
        """
        try:
            with self._client() as client:
                keys = client.call('accel_crypto_keys_get', {'key_name': key_name})
        except JSONRPCException as ex:
            # Best effort: a failed lookup is treated the same as a missing key,
            # but leave a trace instead of swallowing the error silently
            log.warning('Failed to get crypto key %s: %s', key_name, ex)
            return None
        # Guard against both None and an empty list; indexing an empty result
        # would otherwise raise IndexError
        if not keys:
            return None
        return keys[0]

    def get_crypto_bdev(self, volume_id):
        """Return the name of the crypto bdev on top of ``volume_id``, or None."""
        bdev = self._get_crypto_bdev(volume_id)
        if bdev is not None:
            return bdev['name']
        return None