xref: /spdk/python/spdk/rpc/client.py (revision bf8dcb56e50ff84fddf6de364d0a71d3203ee459)
1#  SPDX-License-Identifier: BSD-3-Clause
2#  Copyright (C) 2017 Intel Corporation.
3#  All rights reserved.
4
import codecs
import copy
import ctypes
import json
import logging
import os
import socket
import time
12
13
def print_dict(d):
    """Write *d* to stdout as JSON indented by two spaces."""
    rendered = json.dumps(d, indent=2)
    print(rendered)
16
17
def print_json(s):
    """Write *s* to stdout as two-space indented JSON.

    Any double quotes at the very start or end of the serialized text are
    stripped, so a plain string value is printed without its JSON quotes.
    """
    serialized = json.dumps(s, indent=2)
    unquoted = serialized.strip('"')
    print(unquoted)
20
21
def get_addr_type(addr):
    """Classify *addr* as an IPv4 address, an IPv6 address or an existing path.

    Returns:
        socket.AF_INET or socket.AF_INET6 when socket.inet_pton accepts the
        string for that family, socket.AF_UNIX when os.path.exists(addr) is
        true, and None when none of these apply.
    """
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, addr)
        except Exception:
            # Not parseable as this family; fall through to the next probe.
            continue
        return family

    return socket.AF_UNIX if os.path.exists(addr) else None
36
37
class JSONRPCException(Exception):
    """Error raised by the JSON-RPC clients; the text is kept in ``message``."""

    def __init__(self, message):
        super().__init__(message)
        self.message = message
41
42
class JSONRPCClient(object):
    """JSON-RPC 2.0 client speaking over a UNIX, IPv4 or IPv6 stream socket.

    Requests are queued with add_request() and written out by flush();
    recv() reads from the socket until one complete JSON value can be
    decoded.  call() combines the three into one synchronous request.
    Instances are context managers that close the socket on exit.
    """

    def __init__(self, addr, port=None, timeout=None, **kwargs):
        """Set up logging and connect to the server at *addr*.

        Args:
            addr: UNIX socket path, IPv4 address or IPv6 address.
            port: TCP port, used for IPv4/IPv6 addresses.
            timeout: Seconds recv() waits for a response; 60.0 when None.
            **kwargs: Optional 'log_level' (default logging.ERROR) and
                'conn_retries', the number of extra connection attempts
                made 200ms apart before the final one (default 0).

        Raises:
            JSONRPCException: If the final connection attempt fails.
        """
        self.sock = None
        self._logger = logging.getLogger("JSONRPCClient(%s)" % addr)
        # logging.getLogger() hands back the same logger for the same address,
        # so only attach a handler once; otherwise every additional client
        # would print each record one more time.
        if not self._logger.handlers:
            ch = logging.StreamHandler()
            ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
            ch.setLevel(logging.DEBUG)
            self._logger.addHandler(ch)
        self.log_set_level(kwargs.get('log_level', logging.ERROR))
        connect_retries = kwargs.get('conn_retries', 0)

        self.timeout = timeout if timeout is not None else 60.0
        self._request_id = 0
        self._recv_buf = ""
        # A multi-byte UTF-8 sequence can be split across two recv() calls;
        # the incremental decoder keeps the dangling bytes until the rest
        # arrives instead of failing on (and losing) the whole chunk.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        self._reqs = []

        for _ in range(connect_retries):
            try:
                self._connect(addr, port)
                return
            except Exception:
                # ignore and retry in 200ms
                time.sleep(0.2)

        # try one last time without try/except
        self._connect(addr, port)

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()

    def _connect(self, addr, port):
        """Open a stream socket to *addr* and store it in ``self.sock``.

        The socket is only published on success; if connecting fails it is
        closed so that retries do not leak file descriptors.

        Raises:
            JSONRPCException: On any socket error or unrecognized address.
        """
        sock = None
        connected = False
        try:
            addr_type = get_addr_type(addr)

            if addr_type == socket.AF_UNIX:
                self._logger.debug("Trying to connect to UNIX socket: %s", addr)
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                sock.connect(addr)
            elif addr_type == socket.AF_INET6:
                self._logger.debug("Trying to connect to IPv6 address addr:%s, port:%s", addr, port)
                infos = socket.getaddrinfo(addr, port, socket.AF_INET6,
                                           socket.SOCK_STREAM, socket.SOL_TCP)
                # Use the last entry returned by getaddrinfo().
                af, socktype, proto, _canonname, sa = infos[-1]
                sock = socket.socket(af, socktype, proto)
                sock.connect(sa)
            elif addr_type == socket.AF_INET:
                self._logger.debug("Trying to connect to IPv4 address addr:%s, port:%s", addr, port)
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((addr, port))
            else:
                raise socket.error("Invalid or non-existing address: '%s'" % addr)
            connected = True
        except socket.error as ex:
            raise JSONRPCException("Error while connecting to %s\n"
                                   "Is SPDK application running?\n"
                                   "Error details: %s" % (addr, ex)) from ex
        finally:
            if not connected and sock is not None:
                sock.close()

        self.sock = sock

    def get_logger(self):
        """Return the logger used by this client."""
        return self._logger

    def log_set_level(self, lvl):
        """Set logging level

        Args:
            lvl: Log level to set as accepted by logger.setLevel
        """
        self._logger.info("Setting log level to %s", lvl)
        self._logger.setLevel(lvl)
        self._logger.info("Log level set to %s", lvl)

    def close(self):
        """Shut down and close the socket, if any; safe to call repeatedly."""
        sock = getattr(self, "sock", None)
        if not sock:
            return
        self.sock = None
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            # shutdown() can fail on a socket that is no longer connected;
            # the descriptor must still be released below.
            pass
        finally:
            sock.close()

    def add_request(self, method, params):
        """Queue a request for *method* and return its request id.

        A deep copy of *params* is stored (and only when *params* is truthy),
        so later changes by the caller do not affect the queued request.
        """
        self._request_id += 1
        req = {
            'jsonrpc': '2.0',
            'method': method,
            'id': self._request_id
        }

        if params:
            req['params'] = copy.deepcopy(params)

        self._logger.debug("append request:\n%s\n", json.dumps(req))
        self._reqs.append(req)
        return self._request_id

    def flush(self):
        """Send every queued request, newline separated, and empty the queue."""
        self._logger.debug("Flushing buffer")
        # TODO: We can drop indent parameter
        reqstr = "\n".join(json.dumps(req, indent=2) for req in self._reqs)
        self._reqs = []
        self._logger.info("Requests:\n%s\n", reqstr)
        self.sock.sendall(reqstr.encode("utf-8"))

    def send(self, method, params=None):
        """Queue one request, flush it immediately and return its id."""
        req_id = self.add_request(method, params)
        self.flush()
        return req_id

    def decode_one_response(self):
        """Decode one JSON value from the front of the receive buffer.

        Returns:
            The decoded object, with its text removed from the buffer, or
            None when the buffer does not yet hold a complete JSON value.
        """
        try:
            self._logger.debug("Trying to decode response '%s'", self._recv_buf)
            buf = self._recv_buf.lstrip()
            obj, idx = json.JSONDecoder().raw_decode(buf)
            self._recv_buf = buf[idx:]
            return obj
        except ValueError:
            self._logger.debug("Partial response")
            return None

    def recv(self):
        """Read from the socket until one complete response is decoded.

        Returns:
            The decoded response object.

        Raises:
            JSONRPCException: If the peer closes the connection first (the
                socket is then closed and ``self.sock`` set to None), or if
                no complete response arrives within ``self.timeout`` seconds.
        """
        # Wall-clock deadline: CPU time does not advance while blocked in
        # recv(), so it cannot be used to bound the total wait.
        deadline = time.monotonic() + self.timeout
        response = self.decode_one_response()
        while not response:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                self.sock.settimeout(remaining)
                newdata = self.sock.recv(4096)
            except socket.timeout:
                break  # throw exception after loop to avoid Python freaking out about nested exceptions

            if not newdata:
                self.sock.close()
                self.sock = None
                raise JSONRPCException("Connection closed with partial response:\n%s\n" % self._recv_buf)
            self._recv_buf += self._decoder.decode(newdata)
            response = self.decode_one_response()

        if not response:
            raise JSONRPCException("Timeout while waiting for response:\n%s\n" % self._recv_buf)

        self._logger.info("response:\n%s\n", json.dumps(response, indent=2))
        return response

    def call(self, method, params=None):
        """Send one request and return the 'result' member of its response.

        Args:
            method: RPC method name.
            params: Optional parameters; the object is not modified.

        Raises:
            JSONRPCException: If the timeout is not positive, if receiving
                fails, or if the response carries an 'error' member.  A lost
                connection is ignored for 'spdk_kill_instance', for which an
                empty dict is returned.
        """
        self._logger.debug("call('%s')", method)
        params = {} if params is None else params
        if self.timeout <= 0:
            raise JSONRPCException("Timeout value is invalid: %s\n" % self.timeout)
        req_id = self.send(method, params)
        try:
            response = self.recv()
        except JSONRPCException:
            # Don't expect response to kill
            if not self.sock and method == "spdk_kill_instance":
                self._logger.info("Connection terminated but ignoring since method is '%s'", method)
                return {}
            raise

        if 'error' in response:
            # Annotate a copy so the caller's params dict is left untouched.
            request_info = dict(params)
            request_info["method"] = method
            request_info["req_id"] = req_id
            msg = "\n".join(["request:", "%s" % json.dumps(request_info, indent=2),
                             "Got JSON-RPC error response",
                             "response:",
                             json.dumps(response['error'], indent=2)])
            raise JSONRPCException(msg)

        return response['result']
210
211
class JSONRPCGoClient(object):
    """JSON-RPC client that forwards calls to the Go RPC shared library.

    Each call() loads libspdk_gorpc.so through ctypes, passes the request
    as a JSON string together with the server address, and decodes the JSON
    string the library returns.
    """

    # Values of the `error` field in the library's response structure.
    INVALID_PARAMETER_ERROR = 1
    CONNECTION_ERROR = 2
    JSON_RPC_CALL_ERROR = 3
    INVALID_RESPONSE_ERROR = 4

    class _GoClientResponse(ctypes.Structure):
        # Return type declared for spdk_gorpc_call.
        _fields_ = [("response", ctypes.POINTER(ctypes.c_char)), ("error", ctypes.c_int)]

    def __init__(self, addr, **kwargs):
        """Remember *addr* and configure logging.

        Args:
            addr: Server address handed to the Go library on every call.
            **kwargs: Optional 'log_level' (default logging.ERROR).
        """
        self.addr = addr
        self._logger = logging.getLogger("JSONRPCGoClient(%s)" % addr)
        # logging.getLogger() returns the same logger for the same address;
        # attach the handler only once so records are not printed repeatedly
        # when several clients are created.
        if not self._logger.handlers:
            ch = logging.StreamHandler()
            ch.setFormatter(logging.Formatter('Go client - %(levelname)s: %(message)s'))
            ch.setLevel(logging.DEBUG)
            self._logger.addHandler(ch)
        self._logger.setLevel(kwargs.get('log_level', logging.ERROR))

    def call(self, method, params=None):
        """Invoke *method* through the Go library and return its 'result'.

        Args:
            method: RPC method name.
            params: Optional parameters; the object is not modified.

        Raises:
            JSONRPCException: If the library cannot be loaded, if it reports
                a positive error code, or if the decoded response carries an
                'error' member.
        """
        self._logger.debug("call('%s')", method)
        params = {} if params is None else params

        client_path = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(__file__),
                                                                    os.path.pardir))
                                       + '/../../build/go/rpc/libspdk_gorpc.so')
        try:
            lib = ctypes.cdll.LoadLibrary(client_path)
        except OSError as err:
            raise JSONRPCException(f'Failed to load the Go RPC client at {client_path}') from err
        lib.spdk_gorpc_free_response.argtypes = [ctypes.POINTER(ctypes.c_char)]
        lib.spdk_gorpc_call.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
        lib.spdk_gorpc_call.restype = self._GoClientResponse

        command_info = {
            "method": method,
            "params": params
        }
        resp = lib.spdk_gorpc_call(json.dumps(command_info).encode('utf-8'),
                                   self.addr.encode('utf-8'))
        if resp.error > 0:
            lines = ["request:", "%s" % json.dumps(command_info, indent=2),
                     "Got JSON-RPC error response"]
            details = {
                self.INVALID_PARAMETER_ERROR: "GoRPCClient: error when decoding function arguments",
                self.CONNECTION_ERROR: "GoRPCClient: Error while connecting to "
                                       f"{self.addr}. Is SPDK application running?",
                self.JSON_RPC_CALL_ERROR: "GoRPCClient: error on JSON-RPC call",
                self.INVALID_RESPONSE_ERROR: "GoRPCClient: error on creating json "
                                             "representation of response",
            }
            detail = details.get(resp.error)
            if detail is not None:
                lines.append(detail)
            raise JSONRPCException("\n".join(lines))

        try:
            json_resp = json.loads(ctypes.c_char_p.from_buffer(resp.response).value)
        finally:
            # The response buffer is handed back to the library's free
            # function whether or not decoding succeeded.
            lib.spdk_gorpc_free_response(resp.response)

        if 'error' in json_resp:
            # Annotate a copy so the caller's params dict is left untouched;
            # a response without an id is reported with req_id null.
            request_info = dict(params)
            request_info["method"] = method
            request_info["req_id"] = json_resp.get('id')
            msg = "\n".join(["request:", "%s" % json.dumps(request_info, indent=2),
                             "Got JSON-RPC error response",
                             "response:",
                             json.dumps(json_resp['error'], indent=2)])
            raise JSONRPCException(msg)

        return json_resp['result']
282