xref: /spdk/python/spdk/sma/sma.py (revision 606c4a83856e7e4339dfd5d76fbe6272f435f493)
1#  SPDX-License-Identifier: BSD-3-Clause
2#  Copyright (C) 2022 Intel Corporation.
3#  All rights reserved.
4
5from concurrent import futures
6from contextlib import contextmanager
7from multiprocessing import Lock
8import grpc
9import logging
10from .device import DeviceException
11from .volume import VolumeException, VolumeManager
12from .volume import crypto
13from .volume import crypto_bdev
14from .proto import sma_pb2 as pb2
15from .proto import sma_pb2_grpc as pb2_grpc
16
17
18class StorageManagementAgent(pb2_grpc.StorageManagementAgentServicer):
19    def __init__(self, config, client):
20        addr, port = config['address'], config['port']
21        self._devices = {}
22        self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
23        self._server.add_insecure_port(f'{addr}:{port}')
24        self._volume_mgr = VolumeManager(client, config['discovery_timeout'],
25                                         config['volume_cleanup_period'])
26        pb2_grpc.add_StorageManagementAgentServicer_to_server(self, self._server)
27
28    def _grpc_method(f):
29        def wrapper(self, request, context):
30            logging.debug(f'{f.__name__}\n{request}')
31            return f(self, request, context)
32        return wrapper
33
34    def register_device(self, device_manager):
35        self._devices[device_manager.protocol] = device_manager
36
37    def start(self):
38        self._volume_mgr.start()
39        self._server.start()
40
41    def stop(self):
42        self._server.stop(None)
43        self._volume_mgr.stop()
44
45    def _find_device_by_name(self, name):
46        return self._devices.get(name)
47
48    def _find_device_by_handle(self, handle):
49        for device in self._devices.values():
50            try:
51                if device.owns_device(handle):
52                    return device
53            except NotImplementedError:
54                pass
55        return None
56
57    def _cleanup_volume(self, volume_id, existing):
58        if volume_id is None or existing:
59            return
60        try:
61            self._volume_mgr.disconnect_volume(volume_id)
62        except VolumeException:
63            logging.warning('Failed to cleanup volume {volume_id}')
64
65    @_grpc_method
66    def CreateDevice(self, request, context):
67        response = pb2.CreateDeviceResponse()
68        volume_id, existing = None, False
69        try:
70            if request.HasField('volume'):
71                volume_id, existing = self._volume_mgr.connect_volume(request.volume)
72
73            manager = self._find_device_by_name(request.WhichOneof('params'))
74            if manager is None:
75                raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
76                                      'Unsupported device type')
77            response = manager.create_device(request)
78            # Now that we know the device handle, mark the volume as attached to
79            # that device
80            if volume_id is not None:
81                self._volume_mgr.set_device(volume_id, response.handle)
82        except (DeviceException, VolumeException) as ex:
83            self._cleanup_volume(volume_id, existing)
84            context.set_details(ex.message)
85            context.set_code(ex.code)
86        except NotImplementedError:
87            self._cleanup_volume(volume_id, existing)
88            context.set_details('Method is not implemented by selected device type')
89            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
90        return response
91
92    @_grpc_method
93    def DeleteDevice(self, request, context):
94        response = pb2.DeleteDeviceResponse()
95        try:
96            device = self._find_device_by_handle(request.handle)
97            if device is None:
98                raise DeviceException(grpc.StatusCode.NOT_FOUND,
99                                      'Invalid device handle')
100            if not device.allow_delete_volumes and self._volume_mgr.has_volumes(request.handle):
101                raise DeviceException(grpc.StatusCode.FAILED_PRECONDITION,
102                                      'Device has attached volumes')
103            device.delete_device(request)
104            # Either there are no volumes attached to this device or we're allowed to delete it
105            # with volumes still attached
106            self._volume_mgr.disconnect_device_volumes(request.handle)
107        except DeviceException as ex:
108            context.set_details(ex.message)
109            context.set_code(ex.code)
110        except NotImplementedError:
111            context.set_details('Method is not implemented by selected device type')
112            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
113        return response
114
115    @_grpc_method
116    def AttachVolume(self, request, context):
117        response = pb2.AttachVolumeResponse()
118        volume_id, existing = None, False
119        try:
120            if not request.HasField('volume'):
121                raise VolumeException(grpc.StatusCode.INVALID_ARGUMENT,
122                                      'Missing required field: volume')
123            volume_id, existing = self._volume_mgr.connect_volume(request.volume,
124                                                                  request.device_handle)
125            device = self._find_device_by_handle(request.device_handle)
126            if device is None:
127                raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle')
128            device.attach_volume(request)
129        except (DeviceException, VolumeException) as ex:
130            self._cleanup_volume(volume_id, existing)
131            context.set_details(ex.message)
132            context.set_code(ex.code)
133        except NotImplementedError:
134            self._cleanup_volume(volume_id, existing)
135            context.set_details('Method is not implemented by selected device type')
136            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
137        return response
138
139    @_grpc_method
140    def DetachVolume(self, request, context):
141        response = pb2.DetachVolumeResponse()
142        try:
143            device = self._find_device_by_handle(request.device_handle)
144            if device is None:
145                raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle')
146            device.detach_volume(request)
147            self._volume_mgr.disconnect_volume(request.volume_id)
148        except DeviceException as ex:
149            context.set_details(ex.message)
150            context.set_code(ex.code)
151        return response
152
153    @_grpc_method
154    def SetQos(self, request, context):
155        response = pb2.SetQosResponse()
156        try:
157            device = self._find_device_by_handle(request.device_handle)
158            if device is None:
159                raise DeviceException(grpc.StatusCode.NOT_FOUND, 'Invalid device handle')
160            device.set_qos(request)
161        except DeviceException as ex:
162            context.set_details(ex.message)
163            context.set_code(ex.code)
164        except NotImplementedError:
165            context.set_details('Method is not implemented by selected device type')
166            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
167        return response
168
169    @_grpc_method
170    def GetQosCapabilities(self, request, context):
171        device_type_map = {
172            pb2.DeviceType.DEVICE_TYPE_NVME: 'nvme',
173            pb2.DeviceType.DEVICE_TYPE_VIRTIO_BLK: 'virtio_blk',
174            pb2.DeviceType.DEVICE_TYPE_NVMF_TCP: 'nvmf_tcp',
175        }
176        response = pb2.GetQosCapabilitiesResponse()
177        try:
178            name = device_type_map.get(request.device_type)
179            if name is None:
180                raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
181                                      'Invalid device type')
182            manager = self._find_device_by_name(name)
183            if manager is None:
184                raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
185                                      'Unsupported device type')
186            response = manager.get_qos_capabilities(request)
187        except DeviceException as ex:
188            context.set_details(ex.message)
189            context.set_code(ex.code)
190        except NotImplementedError:
191            # If a device manager doesn't implement this method, return empty capabilities to
192            # indicate that no QoS capabilities are supported
193            pass
194        return response
195
196
197crypto.register_crypto_engine(crypto.CryptoEngineNop())
198crypto.register_crypto_engine(crypto_bdev.CryptoEngineBdev())
199crypto.set_crypto_engine('nop')
200