1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright (C) 2022 Intel Corporation. 3# All rights reserved. 4 5import grpc 6import logging 7 8 9log = logging.getLogger(__name__) 10 11 12class CryptoException(Exception): 13 def __init__(self, code, message): 14 self.code = code 15 self.message = message 16 17 18class CryptoEngine: 19 def __init__(self, name): 20 self.name = name 21 22 def init(self, client, params): 23 """Initialize crypto engine""" 24 self._client = client 25 26 def setup(self, volume_id, key, cipher, key2=None, tweak_mode=None): 27 """Set up crypto on a given volume""" 28 raise NotImplementedError() 29 30 def cleanup(self, volume_id): 31 """ 32 Disable crypto on a given volume. If crypto was not configured on that volume, this method 33 is a no-op and shouldn't raise any exceptions. 34 """ 35 raise NotImplementedError() 36 37 def verify(self, volume_id, key, cipher, key2=None, tweak_mode=None): 38 """ 39 Verify that specified crypto parameters match those that are currently deployed on a given 40 volume. If key is None, this method ensures that the volume doesn't use crypto. If 41 something is wrong (e.g. keys don't match, different cipher is used, etc.), this method 42 raises CryptoException. 43 """ 44 raise NotImplementedError() 45 46 def get_crypto_bdev(self, volume_id): 47 """ 48 Return the name of a crypto bdev on a given volume. This method might return volume_id if 49 crypto engine doesn't create a separate crypto bdev to set up crypto. If crypto is 50 disabled on a given volue, this method returns None. 51 """ 52 raise NotImplementedError() 53 54 55class CryptoEngineNop(CryptoEngine): 56 def __init__(self): 57 super().__init__('nop') 58 59 def setup(self, volume_id, key, cipher, key2=None, tweak_mode=None): 60 raise CryptoException(grpc.StatusCode.INVALID_ARGUMENT, 'Crypto is disabled') 61 62 def cleanup(self, volume_id): 63 pass 64 65 def verify(self, volume_id, key, cipher, key2=None, tweak_mode=None): 66 pass 67 68 def get_crypto_bdev(self, volume_id): 69 return None 70 71 72_crypto_engine = None 73_crypto_engines = {} 74 75 76def get_crypto_engine(): 77 return _crypto_engine 78 79 80def set_crypto_engine(name): 81 global _crypto_engine 82 engine = _crypto_engines.get(name) 83 if engine is None: 84 raise ValueError(f'Unknown crypto engine: {name}') 85 log.info(f'Setting crypto engine: {name}') 86 _crypto_engine = engine 87 88 89def register_crypto_engine(engine): 90 global _crypto_engines 91 _crypto_engines[engine.name] = engine 92