xref: /dpdk/usertools/dpdk-telemetry-exporter.py (revision d94ebd627a86dfbc0ccf8d7d3802196c55d826cf)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: BSD-3-Clause
3# Copyright (c) 2023 Robin Jarry
4
5r'''
6DPDK telemetry exporter.
7
8It uses dynamically loaded endpoint exporters which are basic python files that
9must implement two functions:
10
11    def info() -> dict[MetricName, MetricInfo]:
12        """
13        Mapping of metric names to their description and type.
14        """
15
16    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
17        """
18        Request data from sock and return it as metric values. A metric value
19        is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
20        present in info().
21        """
22
23The sock argument passed to metrics() has a single method:
24
25    def cmd(self, uri, arg=None) -> dict | list:
26        """
27        Request JSON data to the telemetry socket and parse it to python
28        values.
29        """
30
31See existing endpoints for examples.
32
33The exporter supports multiple output formats:
34
35prometheus://ADDRESS:PORT
36openmetrics://ADDRESS:PORT
37  Expose the enabled endpoints via a local HTTP server listening on the
38  specified address and port. GET requests on that server are served with
39  text/plain responses in the prometheus/openmetrics format.
40
41  More details:
42  https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
43
44carbon://ADDRESS:PORT
45graphite://ADDRESS:PORT
46  Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle
47  carbon format.
48
49  More details:
50  https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
51'''
52
53import argparse
54import importlib.util
55import json
56import logging
57import os
58import pickle
59import re
60import socket
61import struct
62import sys
63import time
64import typing
65from http import HTTPStatus, server
66from urllib.parse import urlparse
67
LOG = logging.getLogger(__name__)

# Endpoints shipped next to this script only exist in a source checkout. When
# present, that directory is searched first (load_endpoints keeps the first
# endpoint found for a given name).
LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
DEFAULT_LOAD_PATHS = [LOCAL] if os.path.isdir(LOCAL) else []
DEFAULT_LOAD_PATHS.extend(
    (
        "/usr/local/share/dpdk/telemetry-endpoints",
        "/usr/share/dpdk/telemetry-endpoints",
    )
)

# Default --output value: HTTP server on port 9876, openmetrics text format.
DEFAULT_OUTPUT = "openmetrics://:9876"
79
80
def main():
    """
    Command line entry point.

    Parse the arguments, load the selected telemetry endpoints and pass them
    to the function implementing the requested output format. Unsupported
    output formats and endpoint loading errors are reported through
    parser.error(), which prints the usage and exits.
    """
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-o",
        "--output",
        metavar="FORMAT://PARAMETERS",
        default=urlparse(DEFAULT_OUTPUT),
        type=urlparse,
        help=f"""
        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
        URL elements have different meanings. By default, the exporter starts a
        local HTTP server on port 9876 that serves requests in the
        prometheus/openmetrics plain text format.
        """,
    )
    # argparse applies %-formatting to help strings: a literal "%" in one of
    # the default paths must be doubled or printing the help would fail.
    default_paths = os.pathsep.join(DEFAULT_LOAD_PATHS).replace("%", "%%")
    parser.add_argument(
        "-p",
        "--load-path",
        dest="load_paths",
        # same syntax as PATH: directories separated by os.pathsep
        type=lambda v: v.split(os.pathsep),
        default=DEFAULT_LOAD_PATHS,
        help=f"""
        The list of paths from which to discover endpoints.
        (default: "{default_paths}").
        """,
    )
    parser.add_argument(
        "-e",
        "--endpoint",
        dest="endpoints",
        metavar="ENDPOINT",
        action="append",
        help="""
        Telemetry endpoint to export (by default, all discovered endpoints are
        enabled). This option can be specified more than once.
        """,
    )
    parser.add_argument(
        "-l",
        "--list",
        action="store_true",
        help="""
        Only list detected endpoints and exit.
        """,
    )
    parser.add_argument(
        "-s",
        "--socket-path",
        default="/run/dpdk/rte/dpdk_telemetry.v2",
        help="""
        The DPDK telemetry socket path (default: "%(default)s").
        """,
    )
    args = parser.parse_args()
    output = OUTPUT_FORMATS.get(args.output.scheme)
    if output is None:
        parser.error(f"unsupported output format: {args.output.scheme}://")

    try:
        # load_endpoints() logs every endpoint it uses; for --list, that log
        # is the listing itself.
        endpoints = load_endpoints(args.load_paths, args.endpoints)
    except Exception as e:
        parser.error(str(e))
    if args.list:
        return

    output(args, endpoints)
156
157
class TelemetrySocket:
    """
    Abstraction of the DPDK telemetry socket.

    Instances are context managers: the connection is closed on exit.
    """

    def __init__(self, path: str):
        """
        Connect to the telemetry socket at path and read the initial JSON
        message sent by the server. Its max_output_len field is the buffer
        size used to receive command replies.
        """
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            self.sock.connect(path)
            data = json.loads(self.sock.recv(1024).decode())
            self.max_output_len = data["max_output_len"]
        except BaseException:
            # __exit__() is never called when the constructor raises: close
            # the socket here so that the file descriptor does not leak.
            self.sock.close()
            raise

    def cmd(
        self, uri: str, arg: typing.Any = None
    ) -> typing.Optional[typing.Union[dict, list]]:
        """
        Send a request to the telemetry socket and parse the JSON reply into
        python values.

        :param uri: the telemetry command, e.g. "/ethdev/list".
        :param arg: optional argument, appended to the command after a comma.
        :return: the reply value stored under the command name.
        """
        if arg is not None:
            u = f"{uri},{arg}"
        else:
            u = uri
        self.sock.send(u.encode("utf-8"))
        data = self.sock.recv(self.max_output_len)
        # the reply is keyed by the command alone, without the argument
        return json.loads(data.decode("utf-8"))[uri]

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.sock.close()
189
190
# Type aliases describing the data exchanged with endpoint modules.
MetricName = str  # key of info(), first element of a metric value
MetricDescription = str  # free-form text, emitted in the "# HELP" line
MetricType = str  # emitted verbatim in the "# TYPE" line
MetricLabels = typing.Dict[str, typing.Any]
MetricInfo = typing.Tuple[MetricDescription, MetricType]
MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]
197
198
class TelemetryEndpoint:
    """
    Interface of a telemetry endpoint, used only in type annotations.

    Endpoints are plain python modules loaded from disk by load_endpoints();
    they are never instances of this class. The static methods below only
    describe the functions such a module must provide.
    """

    @staticmethod
    def info() -> typing.Dict[MetricName, MetricInfo]:
        """
        Return a mapping of each metric name to a (description, type) tuple.
        """
        raise NotImplementedError

    @staticmethod
    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
        """
        Query sock and return a list of (name, value, labels) tuples, where
        every name is a key of info().
        """
        raise NotImplementedError
218
219
def load_endpoints(
    paths: typing.List[str], names: typing.Optional[typing.List[str]]
) -> typing.List[TelemetryEndpoint]:
    """
    Load selected telemetry endpoints from the specified paths.

    Each file found directly inside one of the paths is imported as a python
    module named after the file, without its extension. When the same name
    exists in several paths, the first path wins. Files that fail to import,
    and modules whose info() fails or that have no callable metrics(), are
    logged and skipped.

    :param paths: directories to scan, in order of precedence. Entries that
        are not directories are ignored.
    :param names: names of the endpoints to load, or None to load all.
    :return: the usable endpoint modules, sorted by name.
    :raises Exception: if no usable endpoint was found.
    """
    endpoints = {}
    dwb = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
    try:
        for p in paths:
            if not os.path.isdir(p):
                continue
            # sorted: loading order must not depend on the filesystem
            for fname in sorted(os.listdir(p)):
                f = os.path.join(p, fname)
                if os.path.isdir(f):
                    continue
                name, _ = os.path.splitext(fname)
                if names is not None and name not in names:
                    continue  # not selected by user
                if name in endpoints:
                    continue  # endpoint with same name already loaded
                try:
                    spec = importlib.util.spec_from_file_location(name, f)
                    if spec is None or spec.loader is None:
                        # no import machinery for this file type (README...)
                        LOG.debug("ignoring file: %s", f)
                        continue
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)
                    endpoints[name] = module
                except Exception:
                    LOG.exception("parsing endpoint: %s", f)
    finally:
        # restore the global flag even when scanning a path raised
        sys.dont_write_bytecode = dwb

    if names is not None:
        for name in sorted(set(names) - set(endpoints)):
            LOG.warning("endpoint not found or failed to load: %s", name)

    modules = []
    for name, module in sorted(endpoints.items()):
        try:
            # unpacking checks that info() maps names to 2-tuples
            for _description, _type in module.info().values():
                pass
        except Exception:
            LOG.exception("getting endpoint info: %s", name)
            continue
        if not callable(getattr(module, "metrics", None)):
            LOG.error("endpoint %s does not define a metrics() function", name)
            continue
        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
        modules.append(module)

    if not modules:
        raise Exception("no telemetry endpoints detected/selected")
    return modules
269
270
def serve_openmetrics(
    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
):
    """
    Serve the metrics of all endpoints over HTTP.

    An HTTP server is bound to the host and port of the --output URL (all
    addresses and port 80 when omitted). It handles requests with
    OpenmetricsHandler until interrupted by KeyboardInterrupt.
    """
    url = args.output
    address = (url.hostname or "", int(url.port or 80))
    with server.HTTPServer(address, OpenmetricsHandler) as httpd:
        # OpenmetricsHandler reads its configuration from the server object.
        httpd.telemetry_endpoints = endpoints
        httpd.dpdk_socket_path = args.socket_path
        LOG.info("listening on port %s", httpd.server_port)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            LOG.info("shutting down")
287
288
class OpenmetricsHandler(server.BaseHTTPRequestHandler):
    """
    Basic HTTP handler that returns prometheus/openmetrics formatted responses.

    The handler reads two attributes of the server object: dpdk_socket_path
    and telemetry_endpoints. Every GET request, whatever its path, opens a
    new telemetry connection and returns the current value of all metrics.
    """

    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"

    def escape(self, value: typing.Any) -> str:
        """
        Escape a metric label value.

        Backslashes are escaped first. Doing it after double quotes would
        also double the backslashes inserted in front of the quotes.
        """
        value = str(value)
        value = value.replace("\\", "\\\\")
        value = value.replace('"', '\\"')
        return value.replace("\n", "\\n")

    @staticmethod
    def _escape_help(text: typing.Any) -> str:
        """
        Escape the text of a HELP line (only backslashes and line feeds).
        """
        return str(text).replace("\\", "\\\\").replace("\n", "\\n")

    def _collect(self, sock: "TelemetrySocket") -> typing.List[str]:
        """
        Query every endpoint over sock and render the exposition lines.

        Samples are grouped by metric name, and each group starts with its
        HELP and TYPE lines, because the text format requires all lines of a
        metric to be contiguous. Endpoints whose metrics() raises are logged
        and skipped.
        """
        families = {}  # full metric name -> lines, in first-seen order
        for e in self.server.telemetry_endpoints:
            info = e.info()
            try:
                metrics = e.metrics(sock)
            except Exception:
                LOG.exception("%s: metrics collection failed", e.__name__)
                continue
            for name, value, labels in metrics:
                fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
                family = families.get(fullname)
                if family is None:
                    desc, metric_type = info[name]
                    family = families[fullname] = [
                        f"# HELP {fullname} {self._escape_help(desc)}",
                        f"# TYPE {fullname} {metric_type}",
                    ]
                label_str = ", ".join(
                    f'{k}="{self.escape(v)}"' for k, v in labels.items()
                )
                if label_str:
                    label_str = f"{{{label_str}}}"
                family.append(f"{fullname}{label_str} {value}")
        return [line for family in families.values() for line in family]

    def do_GET(self):
        """
        Called upon GET requests.

        Replies 200 with all metrics, 503 when the telemetry socket cannot be
        reached, and 500 when no metric was collected or on any other error.
        """
        try:
            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
                lines = self._collect(sock)
            if not lines:
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
                LOG.error(
                    "%s %s: no metrics collected",
                    self.address_string(),
                    self.requestline,
                )
                # the error response is complete, do not send a second one
                return
            body = "\n".join(lines).encode("utf-8") + b"\n"
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", self.CONTENT_TYPE)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            LOG.info("%s %s", self.address_string(), self.requestline)

        except (FileNotFoundError, ConnectionRefusedError):
            self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
            LOG.exception(
                "%s %s: telemetry socket not available",
                self.address_string(),
                self.requestline,
            )
        except Exception:
            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
            LOG.exception("%s %s", self.address_string(), self.requestline)

    def log_message(self, fmt, *args):
        pass  # disable built-in logger
365
366
def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
    """
    Collect all metrics and export them to a carbon server in the pickle format.

    Metric paths are "dpdk.<endpoint>.<name>" with non-word characters
    replaced by dots. Labels are appended as graphite tags (";key=value").
    All data points of one export share the same timestamp. The carbon
    connection is opened only after the metrics have been collected.

    :raises Exception: if no metric could be collected.
    """
    # 2004 is the default port of the carbon pickle receiver.
    addr = (args.output.hostname or "", int(args.output.port or 2004))
    now = time.time()
    all_metrics = []
    with TelemetrySocket(args.socket_path) as dpdk:
        for e in endpoints:
            try:
                metrics = e.metrics(dpdk)
            except Exception:
                LOG.exception("%s: metrics collection failed", e.__name__)
                continue
            for name, value, labels in metrics:
                fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
                for key, val in labels.items():
                    # ";" separates tags, drop it from tag values
                    val = str(val).replace(";", "")
                    fullname += f";{key}={val}"
                all_metrics.append((fullname, (now, value)))
    if not all_metrics:
        raise Exception("no metrics collected")
    payload = pickle.dumps(all_metrics, protocol=2)
    # each pickle message is prefixed with its length, 4 bytes big-endian
    header = struct.pack("!L", len(payload))
    with socket.socket() as carbon:
        carbon.connect(addr)
        carbon.sendall(header + payload)
394
395
# Maps the scheme of the --output URL to the function implementing it.
# "prometheus" and "graphite" are aliases of "openmetrics" and "carbon".
OUTPUT_FORMATS = dict(
    openmetrics=serve_openmetrics,
    prometheus=serve_openmetrics,
    carbon=export_carbon,
    graphite=export_carbon,
)


if __name__ == "__main__":
    sys.exit(main())
406