/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph and nodes to create,
 * process, enqueue and move streams of objects to the next nodes.
 */

#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @internal
 *
 * Data structure to hold graph data.
 *
 * The circular buffer pointed to by @e cir_start holds byte offsets
 * (relative to this struct) of the nodes with pending streams; entries for
 * source nodes are indexed with negative values of head (see the diagram in
 * rte_graph_walk()).
 */
struct rte_graph {
	uint32_t tail;		     /**< Tail of circular buffer. */
	uint32_t head;		     /**< Head of circular buffer. */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	rte_graph_t id;		     /**< Graph identifier. */
	int socket;		     /**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE]; /**< Name of the graph. */
	uint64_t fence;		     /**< Fence. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Data structure to hold node data.
 *
 * Layout is split in two: a slow-path area holding setup/debug metadata,
 * and a cache-aligned fast-path area (starting at @e ctx) that the walk
 * loop touches on every invocation of the node.
 */
struct rte_node {
	/* Slow path area */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent Node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times realloced. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/* Fast path area */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
	uint16_t size;		/**< Total number of objects available. */
	uint16_t idx;		/**< Number of objects used. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node. */
	uint64_t total_calls;	/**< Calls done to this node. */
	uint64_t total_objs;	/**< Objects processed by this node. */
	RTE_STD_C11
	union {
		void **objs;	/**< Array of object pointers. */
		uint64_t objs_u64;	/**< objs as u64, for generic access. */
	};
	RTE_STD_C11
	union {
		rte_node_process_t process;	/**< Process function. */
		uint64_t process_u64;	/**< process as u64, for generic access. */
	};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with requested number of objects.
 *
 * If stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/**
 * Perform graph walk on the circular buffer and invoke the process function
 * of the nodes and collect the stats.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup function.
 *
 * @see rte_graph_lookup()
 */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		/* head is deliberately reinterpreted as signed: negative
		 * values index the source-node entries stored just before
		 * cir_start, non-negative values index pending streams. */
		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			/* Account cycles, calls and processed objects for
			 * this node around the process() invocation. */
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		node->idx = 0; /* Stream fully consumed by process(). */
		/* Apply the wrap-around mask only once head has advanced
		 * past the source nodes into the circular area; masking a
		 * still-negative head would skip source nodes. */
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}

/* Fast path helper functions */

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to node object to be enqueued.
189 */ 190 static __rte_always_inline void 191 __rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node) 192 { 193 uint32_t tail; 194 195 tail = graph->tail; 196 graph->cir_start[tail++] = node->off; 197 graph->tail = tail & graph->cir_mask; 198 } 199 200 /** 201 * @internal 202 * 203 * Enqueue sequence prologue function. 204 * 205 * Updates the node to tail of graph reel and resizes the number of objects 206 * available in the stream as needed. 207 * 208 * @param graph 209 * Pointer to the graph object. 210 * @param node 211 * Pointer to the node object. 212 * @param idx 213 * Index at which the object enqueue starts from. 214 * @param space 215 * Space required for the object enqueue. 216 */ 217 static __rte_always_inline void 218 __rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node, 219 const uint16_t idx, const uint16_t space) 220 { 221 222 /* Add to the pending stream list if the node is new */ 223 if (idx == 0) 224 __rte_node_enqueue_tail_update(graph, node); 225 226 if (unlikely(node->size < (idx + space))) 227 __rte_node_stream_alloc(graph, node); 228 } 229 230 /** 231 * @internal 232 * 233 * Get the node pointer from current node edge id. 234 * 235 * @param node 236 * Current node pointer. 237 * @param next 238 * Edge id of the required node. 239 * 240 * @return 241 * Pointer to the node denoted by the edge id. 242 */ 243 static __rte_always_inline struct rte_node * 244 __rte_node_next_node_get(struct rte_node *node, rte_edge_t next) 245 { 246 RTE_ASSERT(next < node->nb_edges); 247 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE); 248 node = node->nodes[next]; 249 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE); 250 251 return node; 252 } 253 254 /** 255 * Enqueue the objs to next node for further processing and set 256 * the next node to pending state in the circular buffer. 257 * 258 * @param graph 259 * Graph pointer returned from rte_graph_lookup(). 260 * @param node 261 * Current node pointer. 
262 * @param next 263 * Relative next node index to enqueue objs. 264 * @param objs 265 * Objs to enqueue. 266 * @param nb_objs 267 * Number of objs to enqueue. 268 */ 269 __rte_experimental 270 static inline void 271 rte_node_enqueue(struct rte_graph *graph, struct rte_node *node, 272 rte_edge_t next, void **objs, uint16_t nb_objs) 273 { 274 node = __rte_node_next_node_get(node, next); 275 const uint16_t idx = node->idx; 276 277 __rte_node_enqueue_prologue(graph, node, idx, nb_objs); 278 279 rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *)); 280 node->idx = idx + nb_objs; 281 } 282 283 /** 284 * Enqueue only one obj to next node for further processing and 285 * set the next node to pending state in the circular buffer. 286 * 287 * @param graph 288 * Graph pointer returned from rte_graph_lookup(). 289 * @param node 290 * Current node pointer. 291 * @param next 292 * Relative next node index to enqueue objs. 293 * @param obj 294 * Obj to enqueue. 295 */ 296 __rte_experimental 297 static inline void 298 rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node, 299 rte_edge_t next, void *obj) 300 { 301 node = __rte_node_next_node_get(node, next); 302 uint16_t idx = node->idx; 303 304 __rte_node_enqueue_prologue(graph, node, idx, 1); 305 306 node->objs[idx++] = obj; 307 node->idx = idx; 308 } 309 310 /** 311 * Enqueue only two objs to next node for further processing and 312 * set the next node to pending state in the circular buffer. 313 * Same as rte_node_enqueue_x1 but enqueue two objs. 314 * 315 * @param graph 316 * Graph pointer returned from rte_graph_lookup(). 317 * @param node 318 * Current node pointer. 319 * @param next 320 * Relative next node index to enqueue objs. 321 * @param obj0 322 * Obj to enqueue. 323 * @param obj1 324 * Obj to enqueue. 
325 */ 326 __rte_experimental 327 static inline void 328 rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node, 329 rte_edge_t next, void *obj0, void *obj1) 330 { 331 node = __rte_node_next_node_get(node, next); 332 uint16_t idx = node->idx; 333 334 __rte_node_enqueue_prologue(graph, node, idx, 2); 335 336 node->objs[idx++] = obj0; 337 node->objs[idx++] = obj1; 338 node->idx = idx; 339 } 340 341 /** 342 * Enqueue only four objs to next node for further processing and 343 * set the next node to pending state in the circular buffer. 344 * Same as rte_node_enqueue_x1 but enqueue four objs. 345 * 346 * @param graph 347 * Graph pointer returned from rte_graph_lookup(). 348 * @param node 349 * Current node pointer. 350 * @param next 351 * Relative next node index to enqueue objs. 352 * @param obj0 353 * 1st obj to enqueue. 354 * @param obj1 355 * 2nd obj to enqueue. 356 * @param obj2 357 * 3rd obj to enqueue. 358 * @param obj3 359 * 4th obj to enqueue. 360 */ 361 __rte_experimental 362 static inline void 363 rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node, 364 rte_edge_t next, void *obj0, void *obj1, void *obj2, 365 void *obj3) 366 { 367 node = __rte_node_next_node_get(node, next); 368 uint16_t idx = node->idx; 369 370 __rte_node_enqueue_prologue(graph, node, idx, 4); 371 372 node->objs[idx++] = obj0; 373 node->objs[idx++] = obj1; 374 node->objs[idx++] = obj2; 375 node->objs[idx++] = obj3; 376 node->idx = idx; 377 } 378 379 /** 380 * Enqueue objs to multiple next nodes for further processing and 381 * set the next nodes to pending state in the circular buffer. 382 * objs[i] will be enqueued to nexts[i]. 383 * 384 * @param graph 385 * Graph pointer returned from rte_graph_lookup(). 386 * @param node 387 * Current node pointer. 388 * @param nexts 389 * List of relative next node indices to enqueue objs. 390 * @param objs 391 * List of objs to enqueue. 392 * @param nb_objs 393 * Number of objs to enqueue. 
394 */ 395 __rte_experimental 396 static inline void 397 rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node, 398 rte_edge_t *nexts, void **objs, uint16_t nb_objs) 399 { 400 uint16_t i; 401 402 for (i = 0; i < nb_objs; i++) 403 rte_node_enqueue_x1(graph, node, nexts[i], objs[i]); 404 } 405 406 /** 407 * Get the stream of next node to enqueue the objs. 408 * Once done with the updating the objs, needs to call 409 * rte_node_next_stream_put to put the next node to pending state. 410 * 411 * @param graph 412 * Graph pointer returned from rte_graph_lookup(). 413 * @param node 414 * Current node pointer. 415 * @param next 416 * Relative next node index to get stream. 417 * @param nb_objs 418 * Requested free size of the next stream. 419 * 420 * @return 421 * Valid next stream on success. 422 * 423 * @see rte_node_next_stream_put(). 424 */ 425 __rte_experimental 426 static inline void ** 427 rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node, 428 rte_edge_t next, uint16_t nb_objs) 429 { 430 node = __rte_node_next_node_get(node, next); 431 const uint16_t idx = node->idx; 432 uint16_t free_space = node->size - idx; 433 434 if (unlikely(free_space < nb_objs)) 435 __rte_node_stream_alloc_size(graph, node, nb_objs); 436 437 return &node->objs[idx]; 438 } 439 440 /** 441 * Put the next stream to pending state in the circular buffer 442 * for further processing. Should be invoked after rte_node_next_stream_get(). 443 * 444 * @param graph 445 * Graph pointer returned from rte_graph_lookup(). 446 * @param node 447 * Current node pointer. 448 * @param next 449 * Relative next node index.. 450 * @param idx 451 * Number of objs updated in the stream after getting the stream using 452 * rte_node_next_stream_get. 453 * 454 * @see rte_node_next_stream_get(). 
455 */ 456 __rte_experimental 457 static inline void 458 rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node, 459 rte_edge_t next, uint16_t idx) 460 { 461 if (unlikely(!idx)) 462 return; 463 464 node = __rte_node_next_node_get(node, next); 465 if (node->idx == 0) 466 __rte_node_enqueue_tail_update(graph, node); 467 468 node->idx += idx; 469 } 470 471 /** 472 * Home run scenario, Enqueue all the objs of current node to next 473 * node in optimized way by swapping the streams of both nodes. 474 * Performs good when next node is already not in pending state. 475 * If next node is already in pending state then normal enqueue 476 * will be used. 477 * 478 * @param graph 479 * Graph pointer returned from rte_graph_lookup(). 480 * @param src 481 * Current node pointer. 482 * @param next 483 * Relative next node index. 484 */ 485 __rte_experimental 486 static inline void 487 rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src, 488 rte_edge_t next) 489 { 490 struct rte_node *dst = __rte_node_next_node_get(src, next); 491 492 /* Let swap the pointers if dst don't have valid objs */ 493 if (likely(dst->idx == 0)) { 494 void **dobjs = dst->objs; 495 uint16_t dsz = dst->size; 496 dst->objs = src->objs; 497 dst->size = src->size; 498 src->objs = dobjs; 499 src->size = dsz; 500 dst->idx = src->idx; 501 __rte_node_enqueue_tail_update(graph, dst); 502 } else { /* Move the objects from src node to dst node */ 503 rte_node_enqueue(graph, src, next, src->objs, src->idx); 504 } 505 } 506 507 #ifdef __cplusplus 508 } 509 #endif 510 511 #endif /* _RTE_GRAPH_WORKER_H_ */ 512