xref: /spdk/python/spdk/sma/qmp.py (revision 34edd9f1bf5fda4c987f4500ddc3c9f50be32e7d)
1#!/usr/bin/env python3
2#  SPDX-License-Identifier: BSD-3-Clause
3#  Copyright (C) 2022 Intel Corporation
4#  All rights reserved.
5#
6
7import socket
8from socket import error as SocketError
9import time
10import json
11import logging
12import sys
13from typing import (Any, Dict, Tuple)
14from argparse import ArgumentParser
15
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Type alias for a QMP message handled by this module: a dictionary
# with string keys and arbitrary values.
QMPMessage = Dict[str, Any]


# Type alias for asynchronous QMP event messages; events use the same
# dictionary representation as every other QMPMessage.
QMPEvent = QMPMessage
29
30
class QMPError(Exception):
    '''
    Root exception type of the QMPClient implementation.

    The constructor stores the repr() form of both the code and the message
    and combines them into a human readable description.

    :param message: object describing what went wrong
    :param code: short error category, 'internal' by default
    '''
    def __init__(self, message, code='internal'):
        quoted_code = repr(code)
        quoted_message = repr(message)
        self.code = quoted_code
        self.message = quoted_message
        self.description = f'QMP Error ({quoted_code}): {quoted_message}'

    def __str__(self):
        # The string form is the repr() of the description, quotes included.
        return f'{self.description!r}'
42
43
class QMPSocketError(QMPError):
    '''
    Exception type for socket related failures in the QMPClient
    implementation; the error code defaults to 'socket'.
    '''
    def __init__(self, message, code='socket'):
        # Delegate straight to the base class initializer.
        QMPError.__init__(self, message, code)
50
51
class QMPRequestError(QMPError):
    '''
    Exception type built from an error reply to a request.

    The 'class' and 'desc' members of the reply's 'error' object are kept as
    error_class and error_msg; 'Undefined' and 'Unknown' are used when they
    are absent.
    '''
    def __init__(self, reply: QMPMessage):
        details = reply.get('error', {})
        self.error_class = details.get('class', 'Undefined')
        self.error_msg = details.get('desc', 'Unknown')
        super().__init__(self.error_msg, self.error_class)
60
61
class QMPClient():
    '''
    QMPClient implements a low level connection to a QMP socket.

    Intended to be used as a context manager: entering connects and performs
    the capabilities negotiation, leaving closes the connection.

    :param address: tuple(address, port) for socket.AF_INET
                    or a path string for socket.AF_UNIX
    :param family: one of [socket.AF_INET, socket.AF_UNIX]
    :param timeout: timeout in seconds to use for the connection
    :raise QMPError: for most error cases
    '''
    def __init__(self,
                 address=('127.0.0.1', 10500),
                 family: socket.AddressFamily = socket.AF_INET,
                 timeout: float = 8.0):
        self._exec_id = 0
        self._capabilities = None
        self._timeout = timeout
        self._socketf = None
        self._address = address
        try:
            self._socket = socket.socket(family, socket.SOCK_STREAM)
            self._socket.settimeout(timeout)
        except OSError as e:
            raise QMPSocketError('Create: exception while creating') from e

    def __enter__(self):
        try:
            self._start()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the socket
            # has to be released here to avoid leaking it.
            self._disconnect_socket()
            raise
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._disconnect_socket()

    def _start(self):
        '''
        Exit negotiation mode and enter command mode

        Based on: https://wiki.qemu.org/Documentation/QMP
        Part of communication done after connect.
        As stated in Capabilities Negotiation paragraph, for new connection
        QMP sends greetings msg and enters capabilities negotiation mode.
        To enter command mode, the qmp_capabilities command must be issued.
        Can be issued only once per session or the QMP will report an error.

        :raise QMPError: when the first received message is not a greeting
        '''
        self._connect()
        self._capabilities = self._receive()[0]
        if 'QMP' not in self._capabilities:
            raise QMPError('NegotiateCap: protocol error, wrong message')
        self.exec('qmp_capabilities')

    def _get_next_exec_id(self):
        '''
        Return the next command id as a string (ids start at "1").
        '''
        self._exec_id += 1
        return str(self._exec_id)

    def _connect(self):
        '''
        Connect the socket and create the text file object used for reading.
        Does nothing when the file object already exists.

        :raise QMPSocketError: when connecting fails
        '''
        try:
            if not self._is_connected():
                self._socket.connect(self._address)
                self._socketf = self._socket.makefile(mode='rw', encoding='utf-8')
        except SocketError as e:
            raise QMPSocketError('Connect: could not connect') from e

    def _disconnect_socket(self):
        '''
        Close the file object and the socket and forget both of them.
        '''
        sockf, sock = self._socketf, self._socket
        self._socketf = None
        self._socket = None
        try:
            if sockf is not None:
                sockf.close()
        finally:
            # Close the socket even if closing the file object failed.
            if sock is not None:
                sock.close()

    def _is_connected(self) -> bool:
        '''
        Tell whether the file object wrapping the socket has been created.
        '''
        return self._socketf is not None

    def _check_event(self, event, received):
        '''
        Method for checking if "received" is the "event" we are waiting for.
        :param event: dictionary description of event, mandatory fields are
                      'event' = QMP name of the event
                      'data' = event specific params in form of a dict.
        :param received: received QMP message to check.
        :return True when the names match (case-insensitively) and every
                item of the expected 'data' is present in the received 'data'
        '''
        received_name = received.get('event')
        # Messages without an event name cannot be the awaited event.
        if not isinstance(received_name, str):
            return False
        if event['event'].lower() != received_name.lower():
            return False
        received_data = received.get('data', {})
        return all(key in received_data and received_data[key] == value
                   for key, value in event.get('data', {}).items())

    def _receive(self, event=None) -> Tuple[QMPMessage, QMPEvent]:
        '''
        Read messages until a response (and optionally an event) is received.

        Without "event" the first error, return or greeting message is
        returned right away. With "event" an error is still returned right
        away, while after a return message reading continues until a message
        matching "event" arrives. Other messages are skipped.

        :param event: optional dictionary describing an event to wait for
        :return tuple of (response, event message or None)
        :raise QMPSocketError: on disconnect, read failure or timeout
        :raise QMPError: on undecodable or unexpected messages
        '''
        response = None
        # Monotonic clock: the deadline must not move with wall-clock changes.
        timeout_begin = time.monotonic()
        while self._timeout > (time.monotonic() - timeout_begin):
            try:
                data = self._socketf.readline()
            except SocketError as e:
                raise QMPSocketError('Receive: socket read failed') from e
            except EOFError as e:
                raise QMPSocketError('Receive: socket read got unexpected EOF') from e
            # readline() signals end of stream with an empty string, not None.
            if not data:
                raise QMPSocketError('Receive: socket got disconnected')
            log.debug('Received: %s', data)
            try:
                msg = json.loads(data)
            except json.JSONDecodeError as e:
                raise QMPError('Receive: QMP message decode failed, JSONDecodeError') from e
            # Every message handled below is expected to be a JSON object.
            if not isinstance(msg, dict):
                raise QMPError('Receive: QMP unexpected message type')
            if response is None:
                if 'error' in msg:
                    return msg, None
                elif 'return' in msg:
                    if event is None:
                        return msg, None
                    response = msg
                # Sent only once per connection. Valid for capabilities negotiation mode only
                elif 'QMP' in msg:
                    if self._capabilities is not None:
                        raise QMPError('Receive: QMP unexpected message type')
                    return msg, None
            elif self._check_event(event, msg):
                return response, msg
        raise QMPSocketError('Receive: Timed out while processing QMP receive loop')

    def _send(self, msg: Dict):
        '''
        Serialize "msg" to JSON and send it terminated with CRLF.

        :raise QMPSocketError: on timeout or any other socket error
        '''
        log.debug('Sending: %s', msg)
        try:
            self._socket.sendall(bytes(json.dumps(msg) + '\r\n', 'utf-8'))
        except TimeoutError as e:
            raise QMPSocketError('Send: got socket timeout error') from e
        except SocketError as e:
            raise QMPSocketError('Send: got system socket error') from e

    def exec(self, cmd: str, args: Dict = None, event: Dict = None) -> QMPMessage:
        '''
        Execute QMP cmd and read result. Returns resulting message, error or optionally
        an event that the QMP client should wait for to be sent by the server.

        :param cmd: string name of the command to execute
        :param args: optional arguments dictionary to pass
        :param event: optional dictionary describing an event to wait for
        :return command exec response or optionally execute result event
        :raise QMPRequestError: on response from QMP server being of error type
        :raise QMPSocketError: on timeout or socket errors
        :raise QMPError: on id mismatch and JSONdecoder errors
        '''
        cmd_id = self._get_next_exec_id()
        msg = {'execute': cmd, 'id': cmd_id}
        # Empty or missing arguments are left out of the request.
        if args:
            msg['arguments'] = args

        self._send(msg)
        response, result = self._receive(event)

        if response.get('id') != cmd_id:
            raise QMPError('QMP Protocol Error, invalid result id')
        elif 'error' in response:
            raise QMPRequestError(response)
        if result is not None:
            return result
        return response

    def device_add(self, params: Dict, event: Dict = None):
        '''Execute 'device_add' with params, optionally waiting for event.'''
        return self.exec('device_add', params, event)

    def device_del(self, params: Dict, event: Dict = None):
        '''Execute 'device_del' with params, optionally waiting for event.'''
        return self.exec('device_del', params, event)

    def chardev_add(self, params: Dict, event: Dict = None):
        '''Execute 'chardev-add' with params, optionally waiting for event.'''
        return self.exec('chardev-add', params, event)

    def chardev_remove(self, params: Dict, event: Dict = None):
        '''Execute 'chardev-remove' with params, optionally waiting for event.'''
        return self.exec('chardev-remove', params, event)

    def query_pci(self):
        '''Execute 'query-pci' and return its response.'''
        return self.exec('query-pci')

    def query_chardev(self):
        '''Execute 'query-chardev' and return its response.'''
        return self.exec('query-chardev')

    def device_list_properties(self, typename: str):
        '''Execute 'device-list-properties' for the given typename.'''
        return self.exec('device-list-properties', {'typename': typename})
239
240
def parse_argv(argv=None):
    '''
    Parse the command line options of the QMP client script.

    :param argv: optional list of argument strings to parse; when None the
                 parser falls back to the process command line
    :return parsed options with 'address' and 'port' attributes
    '''
    parser = ArgumentParser(description='QEMU Machine Protocol (QMP) client')
    parser.add_argument('--address', '-a', default='127.0.0.1',
                        help='IP address of QMP server instance to connect to')
    parser.add_argument('--port', '-p', default=10500, type=int,
                        help='Port number of QMP server instance to connect to')
    return parser.parse_args(argv)


def main(args):
    '''
    Read a JSON document from stdin, execute its 'request' over QMP and
    print the result as indented JSON.

    :param args: list of command line argument strings (program name
                 excluded) used to select the server address and port
    '''
    # Honor the arguments handed in by the caller instead of silently
    # re-reading the process command line.
    argv = parse_argv(args)
    data = json.loads(sys.stdin.read())
    request = data.get('request')
    event = data.get('event')
    with QMPClient((argv.address, argv.port)) as cli:
        result = cli.exec(request['execute'], request.get('arguments'), event)
        print(json.dumps(result, indent=2))
258
259
260# Example usage with command line calls:
261# 1) Without event parameter:
262# {
263#     "request": {
264#         "execute": "device-list-properties",
265#         "arguments": {
266#             "typename": "vfiouser-1-1"
267#         }
268#     }
269# }
# 2) With the 'event' parameter specified. When 'event' is given,
# the script blocks after a valid execution of the specified request
# and waits for a matching event to occur:
273# {
274#     "event": {
275#         "event": "DEVICE_DELETED",
276#         "data": {
277#             "device": "vfiouser-1-1"
278#         }
279#     },
280#     "request": {
281#         "execute": "device_del",
282#         "arguments": {
283#             "id": "vfiouser-1-1"
284#         }
285#     }
286# }
287
288
if __name__ == '__main__':
    # Script entry point: pass the command line (program name stripped) on.
    cli_args = sys.argv[1:]
    main(cli_args)
291