xref: /dpdk/lib/graph/rte_graph_worker.h (revision daa02b5cddbb8e11b31d41e2bf7bb1ae64dcae2f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(C) 2020 Marvell International Ltd.
3  */
4 
5 #ifndef _RTE_GRAPH_WORKER_H_
6 #define _RTE_GRAPH_WORKER_H_
7 
8 /**
9  * @file rte_graph_worker.h
10  *
11  * @warning
12  * @b EXPERIMENTAL:
13  * All functions in this file may be changed or removed without prior notice.
14  *
15  * This API allows a worker thread to walk over a graph and nodes to create,
16  * process, enqueue and move streams of objects to the next nodes.
17  */
18 
19 #include <rte_common.h>
20 #include <rte_cycles.h>
21 #include <rte_prefetch.h>
22 #include <rte_memcpy.h>
23 #include <rte_memory.h>
24 
25 #include "rte_graph.h"
26 
27 #ifdef __cplusplus
28 extern "C" {
29 #endif
30 
31 /**
32  * @internal
33  *
34  * Data structure to hold graph data.
35  */
struct rte_graph {
	uint32_t tail;		     /**< Tail of circular buffer; next free slot for pending-stream enqueue. */
	uint32_t head;		     /**< Head of circular buffer; next slot walked (negative as int32_t for source nodes). */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask (size is a power of two). */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer of node offsets. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	rte_graph_t id;	/**< Graph identifier. */
	int socket;	/**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE];	/**< Name of the graph. */
	uint64_t fence;			/**< Fence. Presumably a memory-corruption sentinel — TODO confirm against the allocator. */
} __rte_cache_aligned;
48 
49 /**
50  * @internal
51  *
52  * Data structure to hold node data.
53  */
struct rte_node {
	/* Slow path area  */
	uint64_t fence;		/**< Fence; checked against RTE_GRAPH_FENCE on every fast-path access. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent Node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times realloced. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/* Fast path area  */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context; opaque per-node scratch space. */
	uint16_t size;		/**< Total number of objects available in the stream. */
	uint16_t idx;		/**< Number of objects used; reset to 0 after process() runs. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node (stats feature only). */
	uint64_t total_calls;	/**< Calls done to this node (stats feature only). */
	uint64_t total_objs;	/**< Objects processed by this node (stats feature only). */
	RTE_STD_C11
		union {
			void **objs;	   /**< Array of object pointers (the stream). */
			uint64_t objs_u64; /**< Integer view of the same pointer. */
		};
	RTE_STD_C11
		union {
			rte_node_process_t process; /**< Process function. */
			uint64_t process_u64;	    /**< Integer view of the same pointer. */
		};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes, indexed by edge id. */
} __rte_cache_aligned;
87 
88 /**
89  * @internal
90  *
91  * Allocate a stream of objects.
92  *
93  * If stream already exists then re-allocate it to a larger size.
94  *
95  * @param graph
96  *   Pointer to the graph object.
97  * @param node
98  *   Pointer to the node object.
99  */
100 __rte_experimental
101 void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);
102 
103 /**
104  * @internal
105  *
106  * Allocate a stream with requested number of objects.
107  *
108  * If stream already exists then re-allocate it to a larger size.
109  *
110  * @param graph
111  *   Pointer to the graph object.
112  * @param node
113  *   Pointer to the node object.
114  * @param req_size
115  *   Number of objects to be allocated.
116  */
117 __rte_experimental
118 void __rte_node_stream_alloc_size(struct rte_graph *graph,
119 				  struct rte_node *node, uint16_t req_size);
120 
121 /**
122  * Perform graph walk on the circular buffer and invoke the process function
123  * of the nodes and collect the stats.
124  *
125  * @param graph
126  *   Graph pointer returned from rte_graph_lookup function.
127  *
128  * @see rte_graph_lookup()
129  */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		/* head starts negative when viewed as int32_t, so the signed
		 * cast indexes backwards into the source-node region before
		 * cir_start; node offsets are relative to the graph base.
		 */
		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		/* Warm the object array before process() touches it. */
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			/* Stats path: time the call and accumulate totals. */
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		/* The stream was fully handed to process(); mark it empty so
		 * the node can be re-enqueued with fresh objects.
		 */
		node->idx = 0;
		/* Apply the wrap-around mask only once head is non-negative,
		 * i.e. after the source-node region has been drained.
		 */
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}
177 
178 /* Fast path helper functions */
179 
180 /**
181  * @internal
182  *
183  * Enqueue a given node to the tail of the graph reel.
184  *
185  * @param graph
186  *   Pointer Graph object.
187  * @param node
188  *   Pointer to node object to be enqueued.
189  */
190 static __rte_always_inline void
191 __rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
192 {
193 	uint32_t tail;
194 
195 	tail = graph->tail;
196 	graph->cir_start[tail++] = node->off;
197 	graph->tail = tail & graph->cir_mask;
198 }
199 
200 /**
201  * @internal
202  *
203  * Enqueue sequence prologue function.
204  *
205  * Updates the node to tail of graph reel and resizes the number of objects
206  * available in the stream as needed.
207  *
208  * @param graph
209  *   Pointer to the graph object.
210  * @param node
211  *   Pointer to the node object.
212  * @param idx
213  *   Index at which the object enqueue starts from.
214  * @param space
215  *   Space required for the object enqueue.
216  */
217 static __rte_always_inline void
218 __rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
219 			    const uint16_t idx, const uint16_t space)
220 {
221 
222 	/* Add to the pending stream list if the node is new */
223 	if (idx == 0)
224 		__rte_node_enqueue_tail_update(graph, node);
225 
226 	if (unlikely(node->size < (idx + space)))
227 		__rte_node_stream_alloc(graph, node);
228 }
229 
230 /**
231  * @internal
232  *
233  * Get the node pointer from current node edge id.
234  *
235  * @param node
236  *   Current node pointer.
237  * @param next
238  *   Edge id of the required node.
239  *
240  * @return
241  *   Pointer to the node denoted by the edge id.
242  */
243 static __rte_always_inline struct rte_node *
244 __rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
245 {
246 	RTE_ASSERT(next < node->nb_edges);
247 	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
248 	node = node->nodes[next];
249 	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
250 
251 	return node;
252 }
253 
254 /**
255  * Enqueue the objs to next node for further processing and set
256  * the next node to pending state in the circular buffer.
257  *
258  * @param graph
259  *   Graph pointer returned from rte_graph_lookup().
260  * @param node
261  *   Current node pointer.
262  * @param next
263  *   Relative next node index to enqueue objs.
264  * @param objs
265  *   Objs to enqueue.
266  * @param nb_objs
267  *   Number of objs to enqueue.
268  */
269 __rte_experimental
270 static inline void
271 rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
272 		 rte_edge_t next, void **objs, uint16_t nb_objs)
273 {
274 	node = __rte_node_next_node_get(node, next);
275 	const uint16_t idx = node->idx;
276 
277 	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);
278 
279 	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
280 	node->idx = idx + nb_objs;
281 }
282 
283 /**
284  * Enqueue only one obj to next node for further processing and
285  * set the next node to pending state in the circular buffer.
286  *
287  * @param graph
288  *   Graph pointer returned from rte_graph_lookup().
289  * @param node
290  *   Current node pointer.
291  * @param next
292  *   Relative next node index to enqueue objs.
293  * @param obj
294  *   Obj to enqueue.
295  */
296 __rte_experimental
297 static inline void
298 rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
299 		    rte_edge_t next, void *obj)
300 {
301 	node = __rte_node_next_node_get(node, next);
302 	uint16_t idx = node->idx;
303 
304 	__rte_node_enqueue_prologue(graph, node, idx, 1);
305 
306 	node->objs[idx++] = obj;
307 	node->idx = idx;
308 }
309 
310 /**
311  * Enqueue only two objs to next node for further processing and
312  * set the next node to pending state in the circular buffer.
313  * Same as rte_node_enqueue_x1 but enqueue two objs.
314  *
315  * @param graph
316  *   Graph pointer returned from rte_graph_lookup().
317  * @param node
318  *   Current node pointer.
319  * @param next
320  *   Relative next node index to enqueue objs.
321  * @param obj0
322  *   Obj to enqueue.
323  * @param obj1
324  *   Obj to enqueue.
325  */
326 __rte_experimental
327 static inline void
328 rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
329 		    rte_edge_t next, void *obj0, void *obj1)
330 {
331 	node = __rte_node_next_node_get(node, next);
332 	uint16_t idx = node->idx;
333 
334 	__rte_node_enqueue_prologue(graph, node, idx, 2);
335 
336 	node->objs[idx++] = obj0;
337 	node->objs[idx++] = obj1;
338 	node->idx = idx;
339 }
340 
341 /**
342  * Enqueue only four objs to next node for further processing and
343  * set the next node to pending state in the circular buffer.
344  * Same as rte_node_enqueue_x1 but enqueue four objs.
345  *
346  * @param graph
347  *   Graph pointer returned from rte_graph_lookup().
348  * @param node
349  *   Current node pointer.
350  * @param next
351  *   Relative next node index to enqueue objs.
352  * @param obj0
353  *   1st obj to enqueue.
354  * @param obj1
355  *   2nd obj to enqueue.
356  * @param obj2
357  *   3rd obj to enqueue.
358  * @param obj3
359  *   4th obj to enqueue.
360  */
361 __rte_experimental
362 static inline void
363 rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
364 		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
365 		    void *obj3)
366 {
367 	node = __rte_node_next_node_get(node, next);
368 	uint16_t idx = node->idx;
369 
370 	__rte_node_enqueue_prologue(graph, node, idx, 4);
371 
372 	node->objs[idx++] = obj0;
373 	node->objs[idx++] = obj1;
374 	node->objs[idx++] = obj2;
375 	node->objs[idx++] = obj3;
376 	node->idx = idx;
377 }
378 
379 /**
380  * Enqueue objs to multiple next nodes for further processing and
381  * set the next nodes to pending state in the circular buffer.
382  * objs[i] will be enqueued to nexts[i].
383  *
384  * @param graph
385  *   Graph pointer returned from rte_graph_lookup().
386  * @param node
387  *   Current node pointer.
388  * @param nexts
389  *   List of relative next node indices to enqueue objs.
390  * @param objs
391  *   List of objs to enqueue.
392  * @param nb_objs
393  *   Number of objs to enqueue.
394  */
395 __rte_experimental
396 static inline void
397 rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
398 		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
399 {
400 	uint16_t i;
401 
402 	for (i = 0; i < nb_objs; i++)
403 		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
404 }
405 
406 /**
407  * Get the stream of next node to enqueue the objs.
408  * Once done with the updating the objs, needs to call
409  * rte_node_next_stream_put to put the next node to pending state.
410  *
411  * @param graph
412  *   Graph pointer returned from rte_graph_lookup().
413  * @param node
414  *   Current node pointer.
415  * @param next
416  *   Relative next node index to get stream.
417  * @param nb_objs
418  *   Requested free size of the next stream.
419  *
420  * @return
421  *   Valid next stream on success.
422  *
423  * @see rte_node_next_stream_put().
424  */
425 __rte_experimental
426 static inline void **
427 rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
428 			 rte_edge_t next, uint16_t nb_objs)
429 {
430 	node = __rte_node_next_node_get(node, next);
431 	const uint16_t idx = node->idx;
432 	uint16_t free_space = node->size - idx;
433 
434 	if (unlikely(free_space < nb_objs))
435 		__rte_node_stream_alloc_size(graph, node, nb_objs);
436 
437 	return &node->objs[idx];
438 }
439 
440 /**
441  * Put the next stream to pending state in the circular buffer
442  * for further processing. Should be invoked after rte_node_next_stream_get().
443  *
444  * @param graph
445  *   Graph pointer returned from rte_graph_lookup().
446  * @param node
447  *   Current node pointer.
448  * @param next
 *   Relative next node index.
450  * @param idx
451  *   Number of objs updated in the stream after getting the stream using
452  *   rte_node_next_stream_get.
453  *
454  * @see rte_node_next_stream_get().
455  */
456 __rte_experimental
457 static inline void
458 rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
459 			 rte_edge_t next, uint16_t idx)
460 {
461 	if (unlikely(!idx))
462 		return;
463 
464 	node = __rte_node_next_node_get(node, next);
465 	if (node->idx == 0)
466 		__rte_node_enqueue_tail_update(graph, node);
467 
468 	node->idx += idx;
469 }
470 
471 /**
 * Home run scenario: enqueue all the objs of the current node to the next
473  * node in optimized way by swapping the streams of both nodes.
474  * Performs good when next node is already not in pending state.
475  * If next node is already in pending state then normal enqueue
476  * will be used.
477  *
478  * @param graph
479  *   Graph pointer returned from rte_graph_lookup().
480  * @param src
481  *   Current node pointer.
482  * @param next
483  *   Relative next node index.
484  */
485 __rte_experimental
486 static inline void
487 rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
488 			  rte_edge_t next)
489 {
490 	struct rte_node *dst = __rte_node_next_node_get(src, next);
491 
492 	/* Let swap the pointers if dst don't have valid objs */
493 	if (likely(dst->idx == 0)) {
494 		void **dobjs = dst->objs;
495 		uint16_t dsz = dst->size;
496 		dst->objs = src->objs;
497 		dst->size = src->size;
498 		src->objs = dobjs;
499 		src->size = dsz;
500 		dst->idx = src->idx;
501 		__rte_node_enqueue_tail_update(graph, dst);
502 	} else { /* Move the objects from src node to dst node */
503 		rte_node_enqueue(graph, src, next, src->objs, src->idx);
504 	}
505 }
506 
507 #ifdef __cplusplus
508 }
509 #endif
510 
511 #endif /* _RTE_GRAPH_WORKER_H_ */
512