#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Robin Jarry

r'''
DPDK telemetry exporter.

It uses dynamically loaded endpoint exporters which are basic python files that
must implement two functions:

    def info() -> dict[MetricName, MetricInfo]:
        """
        Mapping of metric names to their description and type.
        """

    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
        """
        Request data from sock and return it as metric values. A metric value
        is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
        present in info().
        """

The sock argument passed to metrics() has a single method:

    def cmd(self, uri, arg=None) -> dict | list:
        """
        Request JSON data from the telemetry socket and parse it to python
        values.
        """

See existing endpoints for examples.

The exporter supports multiple output formats:

prometheus://ADDRESS:PORT
openmetrics://ADDRESS:PORT
    Expose the enabled endpoints via a local HTTP server listening on the
    specified address and port. GET requests on that server are served with
    text/plain responses in the prometheus/openmetrics format.

    More details:
    https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format

carbon://ADDRESS:PORT
graphite://ADDRESS:PORT
    Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle
    carbon format.

    More details:
    https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
'''

import argparse
import importlib.util
import json
import logging
import os
import pickle
import re
import socket
import struct
import sys
import time
import typing
from http import HTTPStatus, server
from urllib.parse import urlparse

LOG = logging.getLogger(__name__)
# Use local endpoints path only when running from source
LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
DEFAULT_LOAD_PATHS = []
if os.path.isdir(LOCAL):
    DEFAULT_LOAD_PATHS.append(LOCAL)
DEFAULT_LOAD_PATHS += [
    "/usr/local/share/dpdk/telemetry-endpoints",
    "/usr/share/dpdk/telemetry-endpoints",
]
DEFAULT_OUTPUT = "openmetrics://:9876"


def main():
    """
    Command line entry point.

    Parse arguments, load the selected telemetry endpoints and hand them to
    the output function registered in OUTPUT_FORMATS for the URL scheme given
    with --output. Exits through parser.error() on an unknown scheme or when
    endpoint loading fails.
    """
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-o",
        "--output",
        metavar="FORMAT://PARAMETERS",
        default=urlparse(DEFAULT_OUTPUT),
        type=urlparse,
        help=f"""
        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
        URL elements have different meanings. By default, the exporter starts a
        local HTTP server on port 9876 that serves requests in the
        prometheus/openmetrics plain text format.
        """,
    )
    parser.add_argument(
        "-p",
        "--load-path",
        dest="load_paths",
        type=lambda v: v.split(os.pathsep),
        default=DEFAULT_LOAD_PATHS,
        help=f"""
        The list of paths from which to discover endpoints.
        (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}").
        """,
    )
    parser.add_argument(
        "-e",
        "--endpoint",
        dest="endpoints",
        metavar="ENDPOINT",
        action="append",
        help="""
        Telemetry endpoint to export (by default, all discovered endpoints are
        enabled). This option can be specified more than once.
        """,
    )
    parser.add_argument(
        "-l",
        "--list",
        action="store_true",
        help="""
        Only list detected endpoints and exit.
        """,
    )
    parser.add_argument(
        "-s",
        "--socket-path",
        default="/run/dpdk/rte/dpdk_telemetry.v2",
        help="""
        The DPDK telemetry socket path (default: "%(default)s").
        """,
    )
    args = parser.parse_args()
    output = OUTPUT_FORMATS.get(args.output.scheme)
    if output is None:
        parser.error(f"unsupported output format: {args.output.scheme}://")

    try:
        endpoints = load_endpoints(args.load_paths, args.endpoints)
        if args.list:
            # load_endpoints() already logged every enabled endpoint
            return
    except Exception as e:
        parser.error(str(e))

    output(args, endpoints)


class TelemetrySocket:
    """
    Abstraction of the DPDK telemetry socket.

    Usable as a context manager; the underlying socket is closed on exit.
    """

    def __init__(self, path: str):
        """
        Connect to the telemetry socket at *path* and read the initial
        greeting message, which carries the "max_output_len" used as the
        receive buffer size for subsequent replies.

        :raises OSError: if the socket cannot be connected/read.
        :raises ValueError, KeyError: if the greeting is not the expected JSON.
        """
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            self.sock.connect(path)
            data = json.loads(self.sock.recv(1024).decode())
            self.max_output_len = data["max_output_len"]
        except BaseException:
            # __exit__ is never called when __init__ raises: close the socket
            # here so that failed connections do not leak file descriptors.
            self.sock.close()
            raise

    def cmd(
        self, uri: str, arg: typing.Any = None
    ) -> typing.Optional[typing.Union[dict, list]]:
        """
        Request JSON data from the telemetry socket and parse it to python
        values.

        :param uri: telemetry command, e.g. "/ethdev/list".
        :param arg: optional command parameter, appended as "uri,arg".
        :returns: the value stored under *uri* in the JSON reply.
        """
        if arg is not None:
            u = f"{uri},{arg}"
        else:
            u = uri
        self.sock.send(u.encode("utf-8"))
        data = self.sock.recv(self.max_output_len)
        return json.loads(data.decode("utf-8"))[uri]

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.sock.close()


MetricDescription = str
MetricType = str
MetricName = str
MetricLabels = typing.Dict[str, typing.Any]
MetricInfo = typing.Tuple[MetricDescription, MetricType]
MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]


class TelemetryEndpoint:
    """
    Placeholder class only used for typing annotations.
    """

    @staticmethod
    def info() -> typing.Dict[MetricName, MetricInfo]:
        """
        Mapping of metric names to their description and type.
        """
        raise NotImplementedError()

    @staticmethod
    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
        """
        Request data from sock and return it as metric values. Each metric
        name must be present in info().
        """
        raise NotImplementedError()


def load_endpoints(
    paths: typing.List[str], names: typing.Optional[typing.List[str]]
) -> typing.List[TelemetryEndpoint]:
    """
    Load selected telemetry endpoints from the specified paths.

    Every regular file found directly in each path is imported as a module
    named after the file (without extension). When the same name exists in
    several paths, the first path wins.

    :param paths: directories to search, in priority order. Missing
        directories are ignored.
    :param names: endpoint names to enable, or None to enable all of them.
    :returns: the loaded modules sorted by name, restricted to those whose
        info() returns a well formed mapping.
    :raises Exception: if no endpoint could be loaded at all.
    """
    endpoints = {}
    dwb = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
    try:
        for p in paths:
            if not os.path.isdir(p):
                continue
            for fname in os.listdir(p):
                f = os.path.join(p, fname)
                if os.path.isdir(f):
                    continue
                try:
                    name, _ = os.path.splitext(fname)
                    if names is not None and name not in names:
                        # not selected by user
                        continue
                    if name in endpoints:
                        # endpoint with same name already loaded
                        continue
                    spec = importlib.util.spec_from_file_location(name, f)
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)
                    endpoints[name] = module
                except Exception:
                    LOG.exception("parsing endpoint: %s", f)
    finally:
        # restore the interpreter-wide flag even when loading fails
        sys.dont_write_bytecode = dwb

    if not endpoints:
        raise Exception("no telemetry endpoints detected/selected")

    modules = []
    for name, module in sorted(endpoints.items()):
        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
        try:
            # Only enable endpoints whose info() returns a mapping of
            # metric name -> (description, type) pairs.
            for _metric, (_description, _type) in module.info().items():
                pass
            modules.append(module)
        except Exception:
            LOG.exception("getting endpoint info: %s", name)
    return modules


def serve_openmetrics(
    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
):
    """
    Start an HTTP server and serve requests in the openmetrics/prometheus
    format.

    The server listens on the host/port of args.output (port 80 when none is
    given) and runs until interrupted with Ctrl-C.
    """
    listen = (args.output.hostname or "", int(args.output.port or 80))
    with server.HTTPServer(listen, OpenmetricsHandler) as httpd:
        # attributes read by OpenmetricsHandler.do_GET via self.server
        httpd.dpdk_socket_path = args.socket_path
        httpd.telemetry_endpoints = endpoints
        LOG.info("listening on port %s", httpd.server_port)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            LOG.info("shutting down")


class OpenmetricsHandler(server.BaseHTTPRequestHandler):
    """
    Basic HTTP handler that returns prometheus/openmetrics formatted responses.
    """

    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"

    def escape(self, value: typing.Any) -> str:
        """
        Escape a metric label value.

        Backslashes must be escaped first; otherwise the backslashes
        introduced for double quotes and newlines would be doubled again.
        """
        value = str(value)
        value = value.replace("\\", "\\\\")
        value = value.replace('"', '\\"')
        return value.replace("\n", "\\n")

    def do_GET(self):
        """
        Called upon GET requests.

        Open a new telemetry socket connection, collect metrics from every
        enabled endpoint and reply with the text exposition format. Replies
        503 when the telemetry socket is unavailable and 500 when nothing
        could be collected or an unexpected error occurs.
        """
        try:
            lines = []
            metrics_names = set()
            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
                for e in self.server.telemetry_endpoints:
                    info = e.info()
                    metrics_lines = []
                    try:
                        metrics = e.metrics(sock)
                    except Exception:
                        LOG.exception("%s: metrics collection failed", e.__name__)
                        continue
                    for name, value, labels in metrics:
                        fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
                        label_str = ", ".join(
                            f'{k}="{self.escape(v)}"' for k, v in labels.items()
                        )
                        if label_str:
                            label_str = f"{{{label_str}}}"
                        metrics_lines.append(f"{fullname}{label_str} {value}")
                        if fullname not in metrics_names:
                            # HELP/TYPE are emitted once per metric name
                            metrics_names.add(fullname)
                            desc, metric_type = info[name]
                            lines += [
                                f"# HELP {fullname} {desc}",
                                f"# TYPE {fullname} {metric_type}",
                            ]
                    lines += metrics_lines
            if not lines:
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
                LOG.error(
                    "%s %s: no metrics collected",
                    self.address_string(),
                    self.requestline,
                )
                # an error response was already sent; do not send a 200 too
                return
            body = "\n".join(lines).encode("utf-8") + b"\n"
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", self.CONTENT_TYPE)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            LOG.info("%s %s", self.address_string(), self.requestline)

        except (FileNotFoundError, ConnectionRefusedError):
            self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
            LOG.exception(
                "%s %s: telemetry socket not available",
                self.address_string(),
                self.requestline,
            )
        except Exception:
            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
            LOG.exception("%s %s", self.address_string(), self.requestline)

    def log_message(self, fmt, *args):
        pass  # disable built-in logger


def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
    """
    Collect all metrics and export them to a carbon server in the pickle format.

    Labels are appended to the metric path as ";key=value" tags (";" is
    stripped from values). The payload is a pickled list of
    (path, (timestamp, value)) tuples prefixed by its 4-byte big-endian
    length.

    :raises Exception: if no metrics could be collected.
    """
    addr = (args.output.hostname or "", int(args.output.port or 80))
    with TelemetrySocket(args.socket_path) as dpdk:
        with socket.socket() as carbon:
            carbon.connect(addr)
            all_metrics = []
            for e in endpoints:
                try:
                    metrics = e.metrics(dpdk)
                except Exception:
                    LOG.exception("%s: metrics collection failed", e.__name__)
                    continue
                for name, value, labels in metrics:
                    fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
                    for key, val in labels.items():
                        val = str(val).replace(";", "")
                        fullname += f";{key}={val}"
                    all_metrics.append((fullname, (time.time(), value)))
            if not all_metrics:
                raise Exception("no metrics collected")
            payload = pickle.dumps(all_metrics, protocol=2)
            header = struct.pack("!L", len(payload))
            buf = header + payload
            carbon.sendall(buf)


OUTPUT_FORMATS = {
    "openmetrics": serve_openmetrics,
    "prometheus": serve_openmetrics,
    "carbon": export_carbon,
    "graphite": export_carbon,
}


if __name__ == "__main__":
    main()