/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph and nodes to create,
 * process, enqueue and move streams of objects to the next nodes.
 */

#include <rte_compat.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @internal
 *
 * Data structure to hold graph data.
 *
 * The graph reel is a circular buffer of node offsets; source nodes live
 * immediately before cir_start and pending streams occupy the ring itself.
 */
struct rte_graph {
	uint32_t tail;		     /**< Tail of circular buffer. */
	uint32_t head;		     /**< Head of circular buffer. */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	rte_graph_t id;		     /**< Graph identifier. */
	int socket;		     /**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE];	/**< Name of the graph. */
	uint64_t fence;		     /**< Fence. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Data structure to hold node data.
 *
 * Split into a slow-path area (identity, bookkeeping) and a cache-aligned
 * fast-path area touched on every graph walk.
 */
struct rte_node {
	/* Slow path area */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent Node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count; /**< Number of times realloced. */

	char parent[RTE_NODE_NAMESIZE]; /**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/* Fast path area */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
	uint16_t size;		/**< Total number of objects available. */
	uint16_t idx;		/**< Number of objects used. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node. */
	uint64_t total_calls;	/**< Calls done to this node. */
	uint64_t total_objs;	/**< Objects processed by this node. */
	RTE_STD_C11
		union {
			void **objs;	   /**< Array of object pointers. */
			uint64_t objs_u64;
		};
	RTE_STD_C11
		union {
			rte_node_process_t process; /**< Process function. */
			uint64_t process_u64;
		};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with requested number of objects.
 *
 * If stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/**
 * Perform graph walk on the circular buffer and invoke the process function
 * of the nodes and collect the stats.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup function.
 *
 * @see rte_graph_lookup()
 */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		/* head starts negative while draining source nodes, so the
		 * ring index must be read as a signed offset.
		 */
		node = (struct rte_node *)RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			/* Stats path: account cycles, calls and objects. */
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		/* Stream consumed; reset so the node can be re-enqueued. */
		node->idx = 0;
		/* Only wrap with the mask once head has crossed into the
		 * pending-stream ring (i.e. become positive); source-node
		 * offsets below zero must not be masked.
		 */
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}

/* Fast path helper functions */

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer Graph object.
 * @param node
 *   Pointer to node object to be enqueued.
190 */ 191 static __rte_always_inline void 192 __rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node) 193 { 194 uint32_t tail; 195 196 tail = graph->tail; 197 graph->cir_start[tail++] = node->off; 198 graph->tail = tail & graph->cir_mask; 199 } 200 201 /** 202 * @internal 203 * 204 * Enqueue sequence prologue function. 205 * 206 * Updates the node to tail of graph reel and resizes the number of objects 207 * available in the stream as needed. 208 * 209 * @param graph 210 * Pointer to the graph object. 211 * @param node 212 * Pointer to the node object. 213 * @param idx 214 * Index at which the object enqueue starts from. 215 * @param space 216 * Space required for the object enqueue. 217 */ 218 static __rte_always_inline void 219 __rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node, 220 const uint16_t idx, const uint16_t space) 221 { 222 223 /* Add to the pending stream list if the node is new */ 224 if (idx == 0) 225 __rte_node_enqueue_tail_update(graph, node); 226 227 if (unlikely(node->size < (idx + space))) 228 __rte_node_stream_alloc_size(graph, node, node->size + space); 229 } 230 231 /** 232 * @internal 233 * 234 * Get the node pointer from current node edge id. 235 * 236 * @param node 237 * Current node pointer. 238 * @param next 239 * Edge id of the required node. 240 * 241 * @return 242 * Pointer to the node denoted by the edge id. 243 */ 244 static __rte_always_inline struct rte_node * 245 __rte_node_next_node_get(struct rte_node *node, rte_edge_t next) 246 { 247 RTE_ASSERT(next < node->nb_edges); 248 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE); 249 node = node->nodes[next]; 250 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE); 251 252 return node; 253 } 254 255 /** 256 * Enqueue the objs to next node for further processing and set 257 * the next node to pending state in the circular buffer. 258 * 259 * @param graph 260 * Graph pointer returned from rte_graph_lookup(). 261 * @param node 262 * Current node pointer. 
263 * @param next 264 * Relative next node index to enqueue objs. 265 * @param objs 266 * Objs to enqueue. 267 * @param nb_objs 268 * Number of objs to enqueue. 269 */ 270 __rte_experimental 271 static inline void 272 rte_node_enqueue(struct rte_graph *graph, struct rte_node *node, 273 rte_edge_t next, void **objs, uint16_t nb_objs) 274 { 275 node = __rte_node_next_node_get(node, next); 276 const uint16_t idx = node->idx; 277 278 __rte_node_enqueue_prologue(graph, node, idx, nb_objs); 279 280 rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *)); 281 node->idx = idx + nb_objs; 282 } 283 284 /** 285 * Enqueue only one obj to next node for further processing and 286 * set the next node to pending state in the circular buffer. 287 * 288 * @param graph 289 * Graph pointer returned from rte_graph_lookup(). 290 * @param node 291 * Current node pointer. 292 * @param next 293 * Relative next node index to enqueue objs. 294 * @param obj 295 * Obj to enqueue. 296 */ 297 __rte_experimental 298 static inline void 299 rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node, 300 rte_edge_t next, void *obj) 301 { 302 node = __rte_node_next_node_get(node, next); 303 uint16_t idx = node->idx; 304 305 __rte_node_enqueue_prologue(graph, node, idx, 1); 306 307 node->objs[idx++] = obj; 308 node->idx = idx; 309 } 310 311 /** 312 * Enqueue only two objs to next node for further processing and 313 * set the next node to pending state in the circular buffer. 314 * Same as rte_node_enqueue_x1 but enqueue two objs. 315 * 316 * @param graph 317 * Graph pointer returned from rte_graph_lookup(). 318 * @param node 319 * Current node pointer. 320 * @param next 321 * Relative next node index to enqueue objs. 322 * @param obj0 323 * Obj to enqueue. 324 * @param obj1 325 * Obj to enqueue. 
326 */ 327 __rte_experimental 328 static inline void 329 rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node, 330 rte_edge_t next, void *obj0, void *obj1) 331 { 332 node = __rte_node_next_node_get(node, next); 333 uint16_t idx = node->idx; 334 335 __rte_node_enqueue_prologue(graph, node, idx, 2); 336 337 node->objs[idx++] = obj0; 338 node->objs[idx++] = obj1; 339 node->idx = idx; 340 } 341 342 /** 343 * Enqueue only four objs to next node for further processing and 344 * set the next node to pending state in the circular buffer. 345 * Same as rte_node_enqueue_x1 but enqueue four objs. 346 * 347 * @param graph 348 * Graph pointer returned from rte_graph_lookup(). 349 * @param node 350 * Current node pointer. 351 * @param next 352 * Relative next node index to enqueue objs. 353 * @param obj0 354 * 1st obj to enqueue. 355 * @param obj1 356 * 2nd obj to enqueue. 357 * @param obj2 358 * 3rd obj to enqueue. 359 * @param obj3 360 * 4th obj to enqueue. 361 */ 362 __rte_experimental 363 static inline void 364 rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node, 365 rte_edge_t next, void *obj0, void *obj1, void *obj2, 366 void *obj3) 367 { 368 node = __rte_node_next_node_get(node, next); 369 uint16_t idx = node->idx; 370 371 __rte_node_enqueue_prologue(graph, node, idx, 4); 372 373 node->objs[idx++] = obj0; 374 node->objs[idx++] = obj1; 375 node->objs[idx++] = obj2; 376 node->objs[idx++] = obj3; 377 node->idx = idx; 378 } 379 380 /** 381 * Enqueue objs to multiple next nodes for further processing and 382 * set the next nodes to pending state in the circular buffer. 383 * objs[i] will be enqueued to nexts[i]. 384 * 385 * @param graph 386 * Graph pointer returned from rte_graph_lookup(). 387 * @param node 388 * Current node pointer. 389 * @param nexts 390 * List of relative next node indices to enqueue objs. 391 * @param objs 392 * List of objs to enqueue. 393 * @param nb_objs 394 * Number of objs to enqueue. 
395 */ 396 __rte_experimental 397 static inline void 398 rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node, 399 rte_edge_t *nexts, void **objs, uint16_t nb_objs) 400 { 401 uint16_t i; 402 403 for (i = 0; i < nb_objs; i++) 404 rte_node_enqueue_x1(graph, node, nexts[i], objs[i]); 405 } 406 407 /** 408 * Get the stream of next node to enqueue the objs. 409 * Once done with the updating the objs, needs to call 410 * rte_node_next_stream_put to put the next node to pending state. 411 * 412 * @param graph 413 * Graph pointer returned from rte_graph_lookup(). 414 * @param node 415 * Current node pointer. 416 * @param next 417 * Relative next node index to get stream. 418 * @param nb_objs 419 * Requested free size of the next stream. 420 * 421 * @return 422 * Valid next stream on success. 423 * 424 * @see rte_node_next_stream_put(). 425 */ 426 __rte_experimental 427 static inline void ** 428 rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node, 429 rte_edge_t next, uint16_t nb_objs) 430 { 431 node = __rte_node_next_node_get(node, next); 432 const uint16_t idx = node->idx; 433 uint16_t free_space = node->size - idx; 434 435 if (unlikely(free_space < nb_objs)) 436 __rte_node_stream_alloc_size(graph, node, node->size + nb_objs); 437 438 return &node->objs[idx]; 439 } 440 441 /** 442 * Put the next stream to pending state in the circular buffer 443 * for further processing. Should be invoked after rte_node_next_stream_get(). 444 * 445 * @param graph 446 * Graph pointer returned from rte_graph_lookup(). 447 * @param node 448 * Current node pointer. 449 * @param next 450 * Relative next node index.. 451 * @param idx 452 * Number of objs updated in the stream after getting the stream using 453 * rte_node_next_stream_get. 454 * 455 * @see rte_node_next_stream_get(). 
456 */ 457 __rte_experimental 458 static inline void 459 rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node, 460 rte_edge_t next, uint16_t idx) 461 { 462 if (unlikely(!idx)) 463 return; 464 465 node = __rte_node_next_node_get(node, next); 466 if (node->idx == 0) 467 __rte_node_enqueue_tail_update(graph, node); 468 469 node->idx += idx; 470 } 471 472 /** 473 * Home run scenario, Enqueue all the objs of current node to next 474 * node in optimized way by swapping the streams of both nodes. 475 * Performs good when next node is already not in pending state. 476 * If next node is already in pending state then normal enqueue 477 * will be used. 478 * 479 * @param graph 480 * Graph pointer returned from rte_graph_lookup(). 481 * @param src 482 * Current node pointer. 483 * @param next 484 * Relative next node index. 485 */ 486 __rte_experimental 487 static inline void 488 rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src, 489 rte_edge_t next) 490 { 491 struct rte_node *dst = __rte_node_next_node_get(src, next); 492 493 /* Let swap the pointers if dst don't have valid objs */ 494 if (likely(dst->idx == 0)) { 495 void **dobjs = dst->objs; 496 uint16_t dsz = dst->size; 497 dst->objs = src->objs; 498 dst->size = src->size; 499 src->objs = dobjs; 500 src->size = dsz; 501 dst->idx = src->idx; 502 __rte_node_enqueue_tail_update(graph, dst); 503 } else { /* Move the objects from src node to dst node */ 504 rte_node_enqueue(graph, src, next, src->objs, src->idx); 505 } 506 } 507 508 #ifdef __cplusplus 509 } 510 #endif 511 512 #endif /* _RTE_GRAPH_WORKER_H_ */ 513