1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2022 Intel Corporation 4# All rights reserved. 5# 6 7import socket 8from socket import error as SocketError 9import time 10import json 11import logging 12import sys 13from typing import (Any, Dict, Tuple) 14from argparse import ArgumentParser 15 16log = logging.getLogger(__name__) 17 18 19QMPMessage = Dict[str, Any] 20''' 21Base class for all QMPBaseClass messages 22''' 23 24 25QMPEvent = QMPMessage 26''' 27Base class alias all asynchronous event messages 28''' 29 30 31class QMPError(Exception): 32 ''' 33 Base Exception Class for QMPClient implementation 34 ''' 35 def __init__(self, message, code='internal'): 36 self.code = repr(code) 37 self.message = repr(message) 38 self.description = f'QMP Error ({self.code}): {self.message}' 39 40 def __str__(self): 41 return repr(self.description) 42 43 44class QMPSocketError(QMPError): 45 ''' 46 Exception Class for socket exceptions in QMPClient implementation 47 ''' 48 def __init__(self, message, code='socket'): 49 super().__init__(message, code) 50 51 52class QMPRequestError(QMPError): 53 ''' 54 Exception Class for handling request response errors 55 ''' 56 def __init__(self, reply: QMPMessage): 57 self.error_class = reply.get('error', {}).get('class', 'Undefined') 58 self.error_msg = reply.get('error', {}).get('desc', 'Unknown') 59 super().__init__(self.error_msg, self.error_class) 60 61 62class QMPClient(): 63 ''' 64 QMPBaseClass implements a low level connection to QMP socket 65 66 :param family is one of [socket.AF_INET, socket.AF_UNIX] 67 :param address is tuple(address, port) for socket.AF_INET 68 or a path string for socket.AF_UNIX 69 :param timeout: timeout in seconds to use for the connection 70 :raise QMPError: for most error cases 71 ''' 72 def __init__(self, 73 address=('127.0.0.1', 10500), 74 family: socket.AddressFamily = socket.AF_INET, 75 timeout: float = 8.0): 76 self._exec_id = 0 77 self._capabilities = None 78 self._timeout = timeout 79 self._socketf = None 80 self._address = address 81 try: 82 self._socket = socket.socket(family, socket.SOCK_STREAM) 83 self._socket.settimeout(timeout) 84 except OSError as e: 85 raise QMPSocketError('Create: exception while creating') from e 86 87 def __enter__(self): 88 self._start() 89 return self 90 91 def __exit__(self, exception_type, exception_value, traceback): 92 self._disconnect_socket() 93 94 def _start(self): 95 ''' 96 Exit negotiation mode and enter command mode 97 98 Based on: https://wiki.qemu.org/Documentation/QMP 99 Part of communication done after connect. 100 As stated in Capabilities Negotiation paragraph, for new connection 101 QMP sends greetings msg and enters capabilities negotiation mode. 102 To enter command mode, the qmp_capabilities command must be issued. 103 Can be issued only once per session or the QMP will report an error. 104 ''' 105 self._connect() 106 self._capabilities = self._receive()[0] 107 if 'QMP' not in self._capabilities: 108 raise QMPError('NegotiateCap: protocol error, wrong message') 109 self.exec('qmp_capabilities') 110 111 def _get_next_exec_id(self): 112 self._exec_id += 1 113 return str(self._exec_id) 114 115 def _connect(self): 116 try: 117 if not self._is_connected(): 118 self._socket.connect(self._address) 119 self._socketf = self._socket.makefile(mode='rw', encoding='utf-8') 120 except SocketError as e: 121 raise QMPSocketError('Connect: could not connect') from e 122 123 def _disconnect_socket(self): 124 if self._socket is not None: 125 self._socket.close() 126 if self._socketf is not None: 127 self._socketf.close() 128 self._socket = None 129 self._socketf = None 130 131 def _is_connected(self) -> bool: 132 return self._socketf is not None 133 134 def _check_event(self, event, received): 135 ''' 136 Method for checking if "received" is the "event" we are waiting for. 137 :param event: dictionary description of event, mandatory fields are 138 'event' = QMP name of the event 139 'data' = event specific params in form of a dict. 140 :param received: received QMP event to check. 141 ''' 142 if event['event'].lower() != received['event'].lower(): 143 return False 144 for it in event.get('data', {}).items(): 145 if it not in received.get('data', {}).items(): 146 return False 147 return True 148 149 def _receive(self, event=None) -> Tuple[QMPMessage, QMPEvent]: 150 response = None 151 timeout_begin = time.time() 152 while self._timeout > (time.time() - timeout_begin): 153 try: 154 data = self._socketf.readline() 155 if data is None: 156 raise QMPSocketError('Receive: socket got disconnected') 157 log.debug(f'Received: {data}') 158 msg = json.loads(data) 159 except SocketError as e: 160 raise QMPSocketError('Receive: socket read failed') from e 161 except EOFError as e: 162 raise QMPSocketError('Receive: socket read got unexpected EOF') from e 163 except json.JSONDecodeError as e: 164 raise QMPError('Receive: QMP message decode failed, JSONDecodeError') from e 165 if response is None: 166 if 'error' in msg: 167 return msg, None 168 elif 'return' in msg: 169 if event is None: 170 return msg, None 171 response = msg 172 # Sent only once per connection. Valid for capabilities negotiation mode only 173 elif 'QMP' in msg: 174 if self._capabilities is not None: 175 raise QMPError('Receive: QMP unexpected message type') 176 return msg, None 177 elif self._check_event(event, msg): 178 return response, msg 179 raise QMPSocketError('Receive: Timed out while processing QMP receive loop') 180 181 def _send(self, msg: Dict): 182 log.debug(f'Sending: {msg}') 183 try: 184 self._socket.sendall(bytes(json.dumps(msg) + '\r\n', 'utf-8')) 185 except TimeoutError as e: 186 raise QMPSocketError('Send: got socket timeout error') from e 187 except SocketError as e: 188 raise QMPSocketError('Send: got system socket error') from e 189 190 def exec(self, cmd: str, args: Dict = None, event: Dict = None) -> QMPMessage: 191 ''' 192 Execute QMP cmd and read result. Returns resulting message, error or optionally 193 an event that the QMP client should wait for to be send by the server. 194 195 :param cmd: string name of the command to execute 196 :param args: optional arguments dictionary to pass 197 :param event: optional dictionary describing an event to wait for 198 :return command exec response or optionally execute result event 199 :raise QMPRequestError: on response from QMP server being of error type 200 :raise QMPSocketError: on timeout or socket errors 201 :raise QMPError: on id mismatch and JSONdecoder errors 202 ''' 203 cmd_id = self._get_next_exec_id() 204 msg = {'execute': cmd, 'id': cmd_id} 205 if args is not None and len(args): 206 msg['arguments'] = args 207 208 self._send(msg) 209 response, result = self._receive(event) 210 211 if response.get('id') != cmd_id: 212 raise QMPError('QMP Protocol Error, invalid result id') 213 elif 'error' in response: 214 raise QMPRequestError(response) 215 if result is not None: 216 return result 217 return response 218 219 def device_add(self, params: Dict, event: Dict = None): 220 return self.exec('device_add', params, event) 221 222 def device_del(self, params: Dict, event: Dict = None): 223 return self.exec('device_del', params, event) 224 225 def chardev_add(self, params: Dict, event: Dict = None): 226 return self.exec('chardev-add', params, event) 227 228 def chardev_remove(self, params: Dict, event: Dict = None): 229 return self.exec('chardev-remove', params, event) 230 231 def query_pci(self): 232 return self.exec('query-pci') 233 234 def query_chardev(self): 235 return self.exec('query-chardev') 236 237 def device_list_properties(self, typename: str): 238 return self.exec('device-list-properties', {'typename': typename}) 239 240 241def parse_argv(): 242 parser = ArgumentParser(description='QEMU Machine Protocol (QMP) client') 243 parser.add_argument('--address', '-a', default='127.0.0.1', 244 help='IP address of QMP server instance to connect to') 245 parser.add_argument('--port', '-p', default=10500, type=int, 246 help='Port number of QMP server instance to connect to') 247 return parser.parse_args() 248 249 250def main(args): 251 argv = parse_argv() 252 data = json.loads(sys.stdin.read()) 253 request = data.get('request') 254 event = data.get('event') 255 with QMPClient((argv.address, argv.port)) as cli: 256 result = cli.exec(request['execute'], request.get('arguments'), event) 257 print(json.dumps(result, indent=2)) 258 259 260# Example usage with command line calls: 261# 1) Without event parameter: 262# { 263# "request": { 264# "execute": "device-list-properties", 265# "arguments": { 266# "typename": "vfiouser-1-1" 267# } 268# } 269# } 270# 2) With event parameter specified. Specifying 'event' 271# parameter will set script to block wait for occurrence 272# of such one after a valid execution of specified request: 273# { 274# "event": { 275# "event": "DEVICE_DELETED", 276# "data": { 277# "device": "vfiouser-1-1" 278# } 279# }, 280# "request": { 281# "execute": "device_del", 282# "arguments": { 283# "id": "vfiouser-1-1" 284# } 285# } 286# } 287 288 289if __name__ == '__main__': 290 main(sys.argv[1:]) 291