1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright (C) 2017 Intel Corporation. 3# All rights reserved. 4 5import json 6import socket 7import time 8import os 9import logging 10import copy 11import ctypes 12 13 14def print_dict(d): 15 print(json.dumps(d, indent=2)) 16 17 18def print_json(s): 19 print(json.dumps(s, indent=2).strip('"')) 20 21 22def get_addr_type(addr): 23 try: 24 socket.inet_pton(socket.AF_INET, addr) 25 return socket.AF_INET 26 except Exception as e: 27 pass 28 try: 29 socket.inet_pton(socket.AF_INET6, addr) 30 return socket.AF_INET6 31 except Exception as e: 32 pass 33 if os.path.exists(addr): 34 return socket.AF_UNIX 35 return None 36 37 38class JSONRPCException(Exception): 39 def __init__(self, message): 40 self.message = message 41 42 43class JSONRPCClient(object): 44 def __init__(self, addr, port=None, timeout=None, **kwargs): 45 self.sock = None 46 ch = logging.StreamHandler() 47 ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) 48 ch.setLevel(logging.DEBUG) 49 self._logger = logging.getLogger("JSONRPCClient(%s)" % addr) 50 self._logger.addHandler(ch) 51 self.log_set_level(kwargs.get('log_level', logging.ERROR)) 52 connect_retries = kwargs.get('conn_retries', 0) 53 54 self.timeout = timeout if timeout is not None else 60.0 55 self._request_id = 0 56 self._recv_buf = "" 57 self._reqs = [] 58 59 for i in range(connect_retries): 60 try: 61 self._connect(addr, port) 62 return 63 except Exception as e: 64 # ignore and retry in 200ms 65 time.sleep(0.2) 66 67 # try one last time without try/except 68 self._connect(addr, port) 69 70 def __enter__(self): 71 return self 72 73 def __exit__(self, exception_type, exception_value, traceback): 74 self.close() 75 76 def _connect(self, addr, port): 77 try: 78 addr_type = get_addr_type(addr) 79 80 if addr_type == socket.AF_UNIX: 81 self._logger.debug("Trying to connect to UNIX socket: %s", addr) 82 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 83 self.sock.connect(addr) 84 elif addr_type == socket.AF_INET6: 85 self._logger.debug("Trying to connect to IPv6 address addr:%s, port:%i", addr, port) 86 for res in socket.getaddrinfo(addr, port, socket.AF_INET6, socket.SOCK_STREAM, socket.SOL_TCP): 87 af, socktype, proto, canonname, sa = res 88 self.sock = socket.socket(af, socktype, proto) 89 self.sock.connect(sa) 90 elif addr_type == socket.AF_INET: 91 self._logger.debug("Trying to connect to IPv4 address addr:%s, port:%i'", addr, port) 92 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 93 self.sock.connect((addr, port)) 94 else: 95 raise socket.error("Invalid or non-existing address: '%s'" % addr) 96 except socket.error as ex: 97 raise JSONRPCException("Error while connecting to %s\n" 98 "Is SPDK application running?\n" 99 "Error details: %s" % (addr, ex)) 100 101 def get_logger(self): 102 return self._logger 103 104 """Set logging level 105 106 Args: 107 lvl: Log level to set as accepted by logger.setLevel 108 """ 109 def log_set_level(self, lvl): 110 self._logger.info("Setting log level to %s", lvl) 111 self._logger.setLevel(lvl) 112 self._logger.info("Log level set to %s", lvl) 113 114 def close(self): 115 if getattr(self, "sock", None): 116 self.sock.shutdown(socket.SHUT_RDWR) 117 self.sock.close() 118 self.sock = None 119 120 def add_request(self, method, params): 121 self._request_id += 1 122 req = { 123 'jsonrpc': '2.0', 124 'method': method, 125 'id': self._request_id 126 } 127 128 if params: 129 req['params'] = copy.deepcopy(params) 130 131 self._logger.debug("append request:\n%s\n", json.dumps(req)) 132 self._reqs.append(req) 133 return self._request_id 134 135 def flush(self): 136 self._logger.debug("Flushing buffer") 137 # TODO: We can drop indent parameter 138 reqstr = "\n".join(json.dumps(req, indent=2) for req in self._reqs) 139 self._reqs = [] 140 self._logger.info("Requests:\n%s\n", reqstr) 141 self.sock.sendall(reqstr.encode("utf-8")) 142 143 def send(self, method, params=None): 144 id = self.add_request(method, params) 145 self.flush() 146 return id 147 148 def decode_one_response(self): 149 try: 150 self._logger.debug("Trying to decode response '%s'", self._recv_buf) 151 buf = self._recv_buf.lstrip() 152 obj, idx = json.JSONDecoder().raw_decode(buf) 153 self._recv_buf = buf[idx:] 154 return obj 155 except ValueError: 156 self._logger.debug("Partial response") 157 return None 158 159 def recv(self): 160 start_time = time.process_time() 161 response = self.decode_one_response() 162 while not response: 163 try: 164 timeout = self.timeout - (time.process_time() - start_time) 165 self.sock.settimeout(timeout) 166 newdata = self.sock.recv(4096) 167 if not newdata: 168 self.sock.close() 169 self.sock = None 170 raise JSONRPCException("Connection closed with partial response:\n%s\n" % self._recv_buf) 171 self._recv_buf += newdata.decode("utf-8") 172 response = self.decode_one_response() 173 except socket.timeout: 174 break # throw exception after loop to avoid Python freaking out about nested exceptions 175 except ValueError: 176 continue # incomplete response; keep buffering 177 178 if not response: 179 raise JSONRPCException("Timeout while waiting for response:\n%s\n" % self._recv_buf) 180 181 self._logger.info("response:\n%s\n", json.dumps(response, indent=2)) 182 return response 183 184 def call(self, method, params=None): 185 self._logger.debug("call('%s')" % method) 186 params = {} if params is None else params 187 if self.timeout <= 0: 188 raise JSONRPCException("Timeout value is invalid: %s\n" % self.timeout) 189 req_id = self.send(method, params) 190 try: 191 response = self.recv() 192 except JSONRPCException as e: 193 """ Don't expect response to kill """ 194 if not self.sock and method == "spdk_kill_instance": 195 self._logger.info("Connection terminated but ignoring since method is '%s'" % method) 196 return {} 197 else: 198 raise e 199 200 if 'error' in response: 201 params["method"] = method 202 params["req_id"] = req_id 203 msg = "\n".join(["request:", "%s" % json.dumps(params, indent=2), 204 "Got JSON-RPC error response", 205 "response:", 206 json.dumps(response['error'], indent=2)]) 207 raise JSONRPCException(msg) 208 209 return response['result'] 210 211 212class JSONRPCGoClient(object): 213 INVALID_PARAMETER_ERROR = 1 214 CONNECTION_ERROR = 2 215 JSON_RPC_CALL_ERROR = 3 216 INVALID_RESPONSE_ERROR = 4 217 218 def __init__(self, addr, **kwargs): 219 self.addr = addr 220 ch = logging.StreamHandler() 221 ch.setFormatter(logging.Formatter('Go client - %(levelname)s: %(message)s')) 222 ch.setLevel(logging.DEBUG) 223 self._logger = logging.getLogger("JSONRPCGoClient(%s)" % addr) 224 self._logger.addHandler(ch) 225 self._logger.setLevel(kwargs.get('log_level', logging.ERROR)) 226 227 def call(self, method, params=None): 228 self._logger.debug("call('%s')" % method) 229 params = {} if params is None else params 230 231 class GoClientResponse(ctypes.Structure): 232 _fields_ = [("response", ctypes.POINTER(ctypes.c_char)), ("error", ctypes.c_int)] 233 234 client_path = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(__file__), 235 os.path.pardir)) 236 + '/../../build/go/rpc/libspdk_gorpc.so') 237 try: 238 lib = ctypes.cdll.LoadLibrary(client_path) 239 except OSError: 240 raise JSONRPCException(f'Failed to load the Go RPC client at {client_path}') 241 lib.spdk_gorpc_free_response.argtypes = [ctypes.POINTER(ctypes.c_char)] 242 lib.spdk_gorpc_call.argtypes = [ctypes.c_char_p, ctypes.c_char_p] 243 lib.spdk_gorpc_call.restype = GoClientResponse 244 245 command_info = { 246 "method": method, 247 "params": params 248 } 249 resp = lib.spdk_gorpc_call(json.dumps(command_info).encode('utf-8'), 250 self.addr.encode('utf-8')) 251 if resp.error > 0: 252 rpc_error = "\n".join(["request:", "%s" % json.dumps(command_info, indent=2), 253 "Got JSON-RPC error response"]) 254 if resp.error == self.INVALID_PARAMETER_ERROR: 255 rpc_error = "\n".join([rpc_error, "GoRPCClient: error when decoding " 256 "function arguments"]) 257 elif resp.error == self.CONNECTION_ERROR: 258 rpc_error = "\n".join([rpc_error, "GoRPCClient: Error while connecting to " 259 f"{self.addr}. Is SPDK application running?"]) 260 elif resp.error == self.JSON_RPC_CALL_ERROR: 261 rpc_error = "\n".join([rpc_error, "GoRPCClient: error on JSON-RPC call"]) 262 elif resp.error == self.INVALID_RESPONSE_ERROR: 263 rpc_error = "\n".join([rpc_error, "GoRPCClient: error on creating json " 264 "representation of response"]) 265 raise JSONRPCException(rpc_error) 266 267 try: 268 json_resp = json.loads(ctypes.c_char_p.from_buffer(resp.response).value) 269 finally: 270 lib.spdk_gorpc_free_response(resp.response) 271 272 if 'error' in json_resp: 273 params["method"] = method 274 params["req_id"] = json_resp['id'] 275 msg = "\n".join(["request:", "%s" % json.dumps(params, indent=2), 276 "Got JSON-RPC error response", 277 "response:", 278 json.dumps(json_resp['error'], indent=2)]) 279 raise JSONRPCException(msg) 280 281 return json_resp['result'] 282