/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph's nodes and to create,
 * process, enqueue and move streams of objects between nodes.
 */

#include <rte_compat.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @internal
 *
 * Data structure to hold graph data.
 */
struct rte_graph {
	uint32_t tail;		     /**< Tail of circular buffer. */
	uint32_t head;		     /**< Head of circular buffer. */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	rte_graph_t id;	/**< Graph identifier. */
	int socket;	/**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE];	/**< Name of the graph. */
	uint64_t fence;			/**< Fence. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Data structure to hold node data.
 */
struct rte_node {
	/* Slow path area  */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times realloced. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/* Fast path area  */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
	uint16_t size;		/**< Total number of objects available. */
	uint16_t idx;		/**< Number of objects used. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node. */
	uint64_t total_calls;	/**< Calls done to this node. */
	uint64_t total_objs;	/**< Objects processed by this node. */
	RTE_STD_C11
		union {
			void **objs;	   /**< Array of object pointers. */
			uint64_t objs_u64;
		};
	RTE_STD_C11
		union {
			rte_node_process_t process; /**< Process function. */
			uint64_t process_u64;
		};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;
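
/*
 * Illustrative sketch (not part of the API): a node's process callback can
 * overlay a private state structure on the 16-byte ctx[] fast-path area.
 * The my_node_ctx type and my_node_process() below are hypothetical.
 *
 * @code
 * struct my_node_ctx {
 *	uint16_t next_index; // cached next edge, must fit in RTE_NODE_CTX_SZ
 * };
 *
 * static uint16_t
 * my_node_process(struct rte_graph *graph, struct rte_node *node,
 *		   void **objs, uint16_t nb_objs)
 * {
 *	struct my_node_ctx *ctx = (struct my_node_ctx *)node->ctx;
 *
 *	// forward the whole burst on the cached edge
 *	rte_node_enqueue(graph, node, ctx->next_index, objs, nb_objs);
 *	return nb_objs;
 * }
 * @endcode
 */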

/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If the stream already exists then it is re-allocated to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with the requested number of objects.
 *
 * If the stream already exists then it is re-allocated to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/**
 * Perform a graph walk on the circular buffer, invoking the process function
 * of each pending node and collecting the stats.
 *
 * @param graph
 *   Graph pointer returned from the rte_graph_lookup() function.
 *
 * @see rte_graph_lookup()
 */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		node = (struct rte_node *)RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		node->idx = 0;
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}
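
/*
 * Illustrative sketch (assumption, not part of this header): a worker
 * lcore's main loop typically looks up its graph by name once and then
 * calls rte_graph_walk() until termination. The graph name "worker0" and
 * the done flag are hypothetical.
 *
 * @code
 * struct rte_graph *graph = rte_graph_lookup("worker0");
 * volatile bool done = false;
 *
 * while (!done)
 *	rte_graph_walk(graph);
 * @endcode
 */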

/* Fast path helper functions */

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object to be enqueued.
 */
static __rte_always_inline void
__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
{
	uint32_t tail;

	tail = graph->tail;
	graph->cir_start[tail++] = node->off;
	graph->tail = tail & graph->cir_mask;
}

/**
 * @internal
 *
 * Enqueue sequence prologue function.
 *
 * Appends the node to the tail of the graph reel and resizes the number of
 * objects available in the stream as needed.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param idx
 *   Index at which the object enqueue starts from.
 * @param space
 *   Space required for the object enqueue.
 */
static __rte_always_inline void
__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
			    const uint16_t idx, const uint16_t space)
{
	/* Add to the pending stream list if the node is new */
	if (idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	if (unlikely(node->size < (idx + space)))
		__rte_node_stream_alloc_size(graph, node, node->size + space);
}

/**
 * @internal
 *
 * Get the node pointer from the current node's edge id.
 *
 * @param node
 *   Current node pointer.
 * @param next
 *   Edge id of the required node.
 *
 * @return
 *   Pointer to the node denoted by the edge id.
 */
static __rte_always_inline struct rte_node *
__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
{
	RTE_ASSERT(next < node->nb_edges);
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
	node = node->nodes[next];
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);

	return node;
}

/**
 * Enqueue the objs to the next node for further processing and set
 * the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param objs
 *   Objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
		 rte_edge_t next, void **objs, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);

	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
	node->idx = idx + nb_objs;
}
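
/*
 * Illustrative sketch (assumption): a process callback that forwards its
 * whole burst on a single edge. NEXT_PKT_DROP is a hypothetical edge index
 * fixed at node registration time.
 *
 * @code
 * static uint16_t
 * drop_all_process(struct rte_graph *graph, struct rte_node *node,
 *		    void **objs, uint16_t nb_objs)
 * {
 *	// all objs go to the same next node in one call
 *	rte_node_enqueue(graph, node, NEXT_PKT_DROP, objs, nb_objs);
 *	return nb_objs;
 * }
 * @endcode
 */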

/**
 * Enqueue a single obj to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue the obj.
 * @param obj
 *   Obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 1);

	node->objs[idx++] = obj;
	node->idx = idx;
}
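
/*
 * Illustrative sketch (assumption): classify each obj and enqueue it to a
 * per-object edge. classify() and the NEXT_* edge indices are hypothetical.
 *
 * @code
 * static uint16_t
 * classify_process(struct rte_graph *graph, struct rte_node *node,
 *		    void **objs, uint16_t nb_objs)
 * {
 *	uint16_t i;
 *
 *	for (i = 0; i < nb_objs; i++) {
 *		// pick the edge based on per-object metadata
 *		rte_edge_t next = classify(objs[i]) ? NEXT_IP4 : NEXT_PKT_DROP;
 *
 *		rte_node_enqueue_x1(graph, node, next, objs[i]);
 *	}
 *	return nb_objs;
 * }
 * @endcode
 */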

/**
 * Enqueue only two objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues two objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   Obj to enqueue.
 * @param obj1
 *   Obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 2);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->idx = idx;
}

/**
 * Enqueue only four objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues four objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   1st obj to enqueue.
 * @param obj1
 *   2nd obj to enqueue.
 * @param obj2
 *   3rd obj to enqueue.
 * @param obj3
 *   4th obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
		    void *obj3)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 4);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->objs[idx++] = obj2;
	node->objs[idx++] = obj3;
	node->idx = idx;
}

/**
 * Enqueue objs to multiple next nodes for further processing and
 * set the next nodes to pending state in the circular buffer.
 * objs[i] will be enqueued to nexts[i].
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param nexts
 *   List of relative next node indices to enqueue objs.
 * @param objs
 *   List of objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
{
	uint16_t i;

	for (i = 0; i < nb_objs; i++)
		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
}
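
/*
 * Illustrative sketch (assumption): build the per-object edge array first,
 * then enqueue both arrays in one shot. lookup_next_edge() is hypothetical;
 * RTE_GRAPH_BURST_SIZE comes from the DPDK build configuration.
 *
 * @code
 * static uint16_t
 * fanout_process(struct rte_graph *graph, struct rte_node *node,
 *		  void **objs, uint16_t nb_objs)
 * {
 *	rte_edge_t nexts[RTE_GRAPH_BURST_SIZE];
 *	uint16_t i;
 *
 *	for (i = 0; i < nb_objs; i++)
 *		nexts[i] = lookup_next_edge(objs[i]);
 *
 *	rte_node_enqueue_next(graph, node, nexts, objs, nb_objs);
 *	return nb_objs;
 * }
 * @endcode
 */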

/**
 * Get the stream of the next node to enqueue the objs.
 * Once done updating the objs, rte_node_next_stream_put() must be called to
 * move the next node to pending state.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to get the stream of.
 * @param nb_objs
 *   Requested free size of the next stream.
 *
 * @return
 *   Valid next stream on success.
 *
 * @see rte_node_next_stream_put().
 */
__rte_experimental
static inline void **
rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;
	uint16_t free_space = node->size - idx;

	if (unlikely(free_space < nb_objs))
		__rte_node_stream_alloc_size(graph, node, node->size + nb_objs);

	return &node->objs[idx];
}

/**
 * Put the next stream to pending state in the circular buffer
 * for further processing. Should be invoked after rte_node_next_stream_get().
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 * @param idx
 *   Number of objs updated in the stream obtained from
 *   rte_node_next_stream_get().
 *
 * @see rte_node_next_stream_get().
 */
__rte_experimental
static inline void
rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t idx)
{
	if (unlikely(!idx))
		return;

	node = __rte_node_next_node_get(node, next);
	if (node->idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	node->idx += idx;
}
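
/*
 * Illustrative sketch (assumption): a source-style node that fills the next
 * node's stream in place and then commits the count. poll_objs() and
 * NEXT_CLASSIFY are hypothetical.
 *
 * @code
 * static uint16_t
 * source_process(struct rte_graph *graph, struct rte_node *node,
 *		  void **objs, uint16_t nb_objs)
 * {
 *	void **to_next;
 *	uint16_t n;
 *
 *	// reserve space in the next node's stream and write into it
 *	to_next = rte_node_next_stream_get(graph, node, NEXT_CLASSIFY,
 *					   RTE_GRAPH_BURST_SIZE);
 *	n = poll_objs(to_next, RTE_GRAPH_BURST_SIZE);
 *
 *	// mark the next node pending with the number of objs written
 *	rte_node_next_stream_put(graph, node, NEXT_CLASSIFY, n);
 *	return n;
 * }
 * @endcode
 */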

/**
 * Home run scenario: enqueue all the objs of the current node to the next
 * node in an optimized way by swapping the streams of both nodes.
 * Performs well when the next node is not already in pending state.
 * If the next node is already in pending state, the normal enqueue
 * is used.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param src
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 */
__rte_experimental
static inline void
rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
			  rte_edge_t next)
{
	struct rte_node *dst = __rte_node_next_node_get(src, next);

	/* Swap the stream pointers if dst doesn't hold valid objs */
	if (likely(dst->idx == 0)) {
		void **dobjs = dst->objs;
		uint16_t dsz = dst->size;
		dst->objs = src->objs;
		dst->size = src->size;
		src->objs = dobjs;
		src->size = dsz;
		dst->idx = src->idx;
		__rte_node_enqueue_tail_update(graph, dst);
	} else { /* Move the objects from src node to dst node */
		rte_node_enqueue(graph, src, next, src->objs, src->idx);
	}
}
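
/*
 * Illustrative sketch (assumption): a pass-through node that hands its
 * entire stream to edge 0 without copying any object pointers.
 *
 * @code
 * static uint16_t
 * passthrough_process(struct rte_graph *graph, struct rte_node *node,
 *		       void **objs, uint16_t nb_objs)
 * {
 *	// swaps streams with the next node when it is not already pending
 *	rte_node_next_stream_move(graph, node, 0);
 *	return nb_objs;
 * }
 * @endcode
 */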

#ifdef __cplusplus
}
#endif

#endif /* _RTE_GRAPH_WORKER_H_ */