/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph and nodes to create,
 * process, enqueue and move streams of objects to the next nodes.
 */

#include <rte_compat.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @internal
 *
 * Data structure to hold graph data.
 *
 * The graph reel is a circular buffer of node offsets: source nodes live
 * before cir_start (walked via negative head values) and pending streams
 * live from cir_start onward (see rte_graph_walk()).
 */
struct rte_graph {
	uint32_t tail;		     /**< Tail of circular buffer. */
	uint32_t head;		     /**< Head of circular buffer. */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	rte_graph_t id;		     /**< Graph identifier. */
	int socket;		     /**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE];	/**< Name of the graph. */
	bool pcap_enable;	     /**< Pcap trace enabled. */
	/** Number of packets captured per core. */
	uint64_t nb_pkt_captured;
	/** Number of packets to capture per core. */
	uint64_t nb_pkt_to_capture;
	char pcap_filename[RTE_GRAPH_PCAP_FILE_SZ];	/**< Pcap filename. */
	uint64_t fence;		     /**< Fence. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Data structure to hold node data.
 *
 * Split into a slow-path area (identity/bookkeeping) and a cache-aligned
 * fast-path area (stream state and process function) so that graph walk
 * touches only the hot cache lines.
 */
struct rte_node {
	/* Slow path area  */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent Node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times realloced. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/** Original process function when pcap is enabled. */
	rte_node_process_t original_process;

	/* Fast path area  */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
	uint16_t size;		/**< Total number of objects available. */
	uint16_t idx;		/**< Number of objects used. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node. */
	uint64_t total_calls;	/**< Calls done to this node. */
	uint64_t total_objs;	/**< Objects processed by this node. */
	RTE_STD_C11
	union {
		void **objs;	   /**< Array of object pointers. */
		uint64_t objs_u64;
	};
	RTE_STD_C11
	union {
		rte_node_process_t process; /**< Process function. */
		uint64_t process_u64;
	};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with requested number of objects.
 *
 * If stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/**
 * Perform graph walk on the circular buffer and invoke the process function
 * of the nodes and collect the stats.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup function.
 *
 * @see rte_graph_lookup()
 */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		/* The (int32_t) cast lets a negative head index before
		 * cir_start, which is where source nodes live per the
		 * diagram above. Node offsets are relative to graph. */
		node = (struct rte_node *)RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			/* Stats-enabled path: account cycles, calls and
			 * objects processed per node. */
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		/* Stream consumed; the process function is expected to have
		 * handed objects to next nodes already. */
		node->idx = 0;
		/* Wrap only once head has crossed into the pending-stream
		 * region (head > 0); while negative we are still iterating
		 * the fixed run of source nodes. */
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}

/* Fast path helper functions */

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer to the graph object.
197 * @param node 198 * Pointer to node object to be enqueued. 199 */ 200 static __rte_always_inline void 201 __rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node) 202 { 203 uint32_t tail; 204 205 tail = graph->tail; 206 graph->cir_start[tail++] = node->off; 207 graph->tail = tail & graph->cir_mask; 208 } 209 210 /** 211 * @internal 212 * 213 * Enqueue sequence prologue function. 214 * 215 * Updates the node to tail of graph reel and resizes the number of objects 216 * available in the stream as needed. 217 * 218 * @param graph 219 * Pointer to the graph object. 220 * @param node 221 * Pointer to the node object. 222 * @param idx 223 * Index at which the object enqueue starts from. 224 * @param space 225 * Space required for the object enqueue. 226 */ 227 static __rte_always_inline void 228 __rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node, 229 const uint16_t idx, const uint16_t space) 230 { 231 232 /* Add to the pending stream list if the node is new */ 233 if (idx == 0) 234 __rte_node_enqueue_tail_update(graph, node); 235 236 if (unlikely(node->size < (idx + space))) 237 __rte_node_stream_alloc_size(graph, node, node->size + space); 238 } 239 240 /** 241 * @internal 242 * 243 * Get the node pointer from current node edge id. 244 * 245 * @param node 246 * Current node pointer. 247 * @param next 248 * Edge id of the required node. 249 * 250 * @return 251 * Pointer to the node denoted by the edge id. 252 */ 253 static __rte_always_inline struct rte_node * 254 __rte_node_next_node_get(struct rte_node *node, rte_edge_t next) 255 { 256 RTE_ASSERT(next < node->nb_edges); 257 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE); 258 node = node->nodes[next]; 259 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE); 260 261 return node; 262 } 263 264 /** 265 * Enqueue the objs to next node for further processing and set 266 * the next node to pending state in the circular buffer. 
267 * 268 * @param graph 269 * Graph pointer returned from rte_graph_lookup(). 270 * @param node 271 * Current node pointer. 272 * @param next 273 * Relative next node index to enqueue objs. 274 * @param objs 275 * Objs to enqueue. 276 * @param nb_objs 277 * Number of objs to enqueue. 278 */ 279 __rte_experimental 280 static inline void 281 rte_node_enqueue(struct rte_graph *graph, struct rte_node *node, 282 rte_edge_t next, void **objs, uint16_t nb_objs) 283 { 284 node = __rte_node_next_node_get(node, next); 285 const uint16_t idx = node->idx; 286 287 __rte_node_enqueue_prologue(graph, node, idx, nb_objs); 288 289 rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *)); 290 node->idx = idx + nb_objs; 291 } 292 293 /** 294 * Enqueue only one obj to next node for further processing and 295 * set the next node to pending state in the circular buffer. 296 * 297 * @param graph 298 * Graph pointer returned from rte_graph_lookup(). 299 * @param node 300 * Current node pointer. 301 * @param next 302 * Relative next node index to enqueue objs. 303 * @param obj 304 * Obj to enqueue. 305 */ 306 __rte_experimental 307 static inline void 308 rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node, 309 rte_edge_t next, void *obj) 310 { 311 node = __rte_node_next_node_get(node, next); 312 uint16_t idx = node->idx; 313 314 __rte_node_enqueue_prologue(graph, node, idx, 1); 315 316 node->objs[idx++] = obj; 317 node->idx = idx; 318 } 319 320 /** 321 * Enqueue only two objs to next node for further processing and 322 * set the next node to pending state in the circular buffer. 323 * Same as rte_node_enqueue_x1 but enqueue two objs. 324 * 325 * @param graph 326 * Graph pointer returned from rte_graph_lookup(). 327 * @param node 328 * Current node pointer. 329 * @param next 330 * Relative next node index to enqueue objs. 331 * @param obj0 332 * Obj to enqueue. 333 * @param obj1 334 * Obj to enqueue. 
335 */ 336 __rte_experimental 337 static inline void 338 rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node, 339 rte_edge_t next, void *obj0, void *obj1) 340 { 341 node = __rte_node_next_node_get(node, next); 342 uint16_t idx = node->idx; 343 344 __rte_node_enqueue_prologue(graph, node, idx, 2); 345 346 node->objs[idx++] = obj0; 347 node->objs[idx++] = obj1; 348 node->idx = idx; 349 } 350 351 /** 352 * Enqueue only four objs to next node for further processing and 353 * set the next node to pending state in the circular buffer. 354 * Same as rte_node_enqueue_x1 but enqueue four objs. 355 * 356 * @param graph 357 * Graph pointer returned from rte_graph_lookup(). 358 * @param node 359 * Current node pointer. 360 * @param next 361 * Relative next node index to enqueue objs. 362 * @param obj0 363 * 1st obj to enqueue. 364 * @param obj1 365 * 2nd obj to enqueue. 366 * @param obj2 367 * 3rd obj to enqueue. 368 * @param obj3 369 * 4th obj to enqueue. 370 */ 371 __rte_experimental 372 static inline void 373 rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node, 374 rte_edge_t next, void *obj0, void *obj1, void *obj2, 375 void *obj3) 376 { 377 node = __rte_node_next_node_get(node, next); 378 uint16_t idx = node->idx; 379 380 __rte_node_enqueue_prologue(graph, node, idx, 4); 381 382 node->objs[idx++] = obj0; 383 node->objs[idx++] = obj1; 384 node->objs[idx++] = obj2; 385 node->objs[idx++] = obj3; 386 node->idx = idx; 387 } 388 389 /** 390 * Enqueue objs to multiple next nodes for further processing and 391 * set the next nodes to pending state in the circular buffer. 392 * objs[i] will be enqueued to nexts[i]. 393 * 394 * @param graph 395 * Graph pointer returned from rte_graph_lookup(). 396 * @param node 397 * Current node pointer. 398 * @param nexts 399 * List of relative next node indices to enqueue objs. 400 * @param objs 401 * List of objs to enqueue. 402 * @param nb_objs 403 * Number of objs to enqueue. 
404 */ 405 __rte_experimental 406 static inline void 407 rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node, 408 rte_edge_t *nexts, void **objs, uint16_t nb_objs) 409 { 410 uint16_t i; 411 412 for (i = 0; i < nb_objs; i++) 413 rte_node_enqueue_x1(graph, node, nexts[i], objs[i]); 414 } 415 416 /** 417 * Get the stream of next node to enqueue the objs. 418 * Once done with the updating the objs, needs to call 419 * rte_node_next_stream_put to put the next node to pending state. 420 * 421 * @param graph 422 * Graph pointer returned from rte_graph_lookup(). 423 * @param node 424 * Current node pointer. 425 * @param next 426 * Relative next node index to get stream. 427 * @param nb_objs 428 * Requested free size of the next stream. 429 * 430 * @return 431 * Valid next stream on success. 432 * 433 * @see rte_node_next_stream_put(). 434 */ 435 __rte_experimental 436 static inline void ** 437 rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node, 438 rte_edge_t next, uint16_t nb_objs) 439 { 440 node = __rte_node_next_node_get(node, next); 441 const uint16_t idx = node->idx; 442 uint16_t free_space = node->size - idx; 443 444 if (unlikely(free_space < nb_objs)) 445 __rte_node_stream_alloc_size(graph, node, node->size + nb_objs); 446 447 return &node->objs[idx]; 448 } 449 450 /** 451 * Put the next stream to pending state in the circular buffer 452 * for further processing. Should be invoked after rte_node_next_stream_get(). 453 * 454 * @param graph 455 * Graph pointer returned from rte_graph_lookup(). 456 * @param node 457 * Current node pointer. 458 * @param next 459 * Relative next node index.. 460 * @param idx 461 * Number of objs updated in the stream after getting the stream using 462 * rte_node_next_stream_get. 463 * 464 * @see rte_node_next_stream_get(). 
465 */ 466 __rte_experimental 467 static inline void 468 rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node, 469 rte_edge_t next, uint16_t idx) 470 { 471 if (unlikely(!idx)) 472 return; 473 474 node = __rte_node_next_node_get(node, next); 475 if (node->idx == 0) 476 __rte_node_enqueue_tail_update(graph, node); 477 478 node->idx += idx; 479 } 480 481 /** 482 * Home run scenario, Enqueue all the objs of current node to next 483 * node in optimized way by swapping the streams of both nodes. 484 * Performs good when next node is already not in pending state. 485 * If next node is already in pending state then normal enqueue 486 * will be used. 487 * 488 * @param graph 489 * Graph pointer returned from rte_graph_lookup(). 490 * @param src 491 * Current node pointer. 492 * @param next 493 * Relative next node index. 494 */ 495 __rte_experimental 496 static inline void 497 rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src, 498 rte_edge_t next) 499 { 500 struct rte_node *dst = __rte_node_next_node_get(src, next); 501 502 /* Let swap the pointers if dst don't have valid objs */ 503 if (likely(dst->idx == 0)) { 504 void **dobjs = dst->objs; 505 uint16_t dsz = dst->size; 506 dst->objs = src->objs; 507 dst->size = src->size; 508 src->objs = dobjs; 509 src->size = dsz; 510 dst->idx = src->idx; 511 __rte_node_enqueue_tail_update(graph, dst); 512 } else { /* Move the objects from src node to dst node */ 513 rte_node_enqueue(graph, src, next, src->objs, src->idx); 514 } 515 } 516 517 #ifdef __cplusplus 518 } 519 #endif 520 521 #endif /* _RTE_GRAPH_WORKER_H_ */ 522