/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph and its nodes to
 * create, process, enqueue and move streams of objects between the nodes.
 */

#include <rte_compat.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @internal
 *
 * Data structure to hold graph data.
 */
struct rte_graph {
	uint32_t tail;		     /**< Tail of circular buffer. */
	uint32_t head;		     /**< Head of circular buffer. */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	rte_graph_t id;	/**< Graph identifier. */
	int socket;	/**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE];	/**< Name of the graph. */
	bool pcap_enable;	        /**< Pcap trace enabled. */
	/** Number of packets captured per core. */
	uint64_t nb_pkt_captured;
	/** Number of packets to capture per core. */
	uint64_t nb_pkt_to_capture;
	char pcap_filename[RTE_GRAPH_PCAP_FILE_SZ];  /**< Pcap filename. */
	uint64_t fence;			/**< Fence. */
} __rte_cache_aligned;

/**
 * @internal
 *
 * Data structure to hold node data.
 */
struct rte_node {
	/* Slow path area */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times reallocated. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/** Original process function when pcap is enabled. */
	rte_node_process_t original_process;

	/* Fast path area */
#define RTE_NODE_CTX_SZ 16
	uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node context. */
	uint16_t size;		/**< Total number of objects available. */
	uint16_t idx;		/**< Number of objects used. */
	rte_graph_off_t off;	/**< Offset of node in the graph reel. */
	uint64_t total_cycles;	/**< Cycles spent in this node. */
	uint64_t total_calls;	/**< Calls done to this node. */
	uint64_t total_objs;	/**< Objects processed by this node. */
	RTE_STD_C11
		union {
			void **objs;	   /**< Array of object pointers. */
			uint64_t objs_u64;
		};
	RTE_STD_C11
		union {
			rte_node_process_t process; /**< Process function. */
			uint64_t process_u64;
		};
	struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;
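
/*
 * Usage note: the ctx[] array above is per-node scratch memory, private to
 * the node's process function. A minimal sketch of the usual pattern is
 * below; the my_node_ctx type and its fields are hypothetical, not part of
 * this API, and any such struct must fit within RTE_NODE_CTX_SZ bytes:
 *
 *	struct my_node_ctx {
 *		uint16_t last_next; // cached edge id
 *	};
 *
 *	static uint16_t
 *	my_node_process(struct rte_graph *graph, struct rte_node *node,
 *			void **objs, uint16_t nb_objs)
 *	{
 *		struct my_node_ctx *ctx = (struct my_node_ctx *)node->ctx;
 *
 *		// forward the whole burst on the cached edge
 *		rte_node_enqueue(graph, node, ctx->last_next, objs, nb_objs);
 *		return nb_objs;
 *	}
 */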

/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If the stream already exists, re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
__rte_experimental
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with the requested number of objects.
 *
 * If the stream already exists, re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
__rte_experimental
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/**
 * Perform a walk on the circular buffer, invoke the process function of the
 * pending nodes and collect the stats.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 *
 * @see rte_graph_lookup()
 */
__rte_experimental
static inline void
rte_graph_walk(struct rte_graph *graph)
{
	const rte_graph_off_t *cir_start = graph->cir_start;
	const rte_node_t mask = graph->cir_mask;
	uint32_t head = graph->head;
	struct rte_node *node;
	uint64_t start;
	uint16_t rc;
	void **objs;

	/*
	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
	 * in a circular buffer fashion.
	 *
	 *	+-----+ <= cir_start - head [number of source nodes]
	 *	|     |
	 *	| ... | <= source nodes
	 *	|     |
	 *	+-----+ <= cir_start [head = 0] [tail = 0]
	 *	|     |
	 *	| ... | <= pending streams
	 *	|     |
	 *	+-----+ <= cir_start + mask
	 */
	while (likely(head != graph->tail)) {
		node = (struct rte_node *)RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
		objs = node->objs;
		rte_prefetch0(objs);

		if (rte_graph_has_stats_feature()) {
			start = rte_rdtsc();
			rc = node->process(graph, node, objs, node->idx);
			node->total_cycles += rte_rdtsc() - start;
			node->total_calls++;
			node->total_objs += rc;
		} else {
			node->process(graph, node, objs, node->idx);
		}
		node->idx = 0;
		head = likely((int32_t)head > 0) ? head & mask : head;
	}
	graph->tail = 0;
}
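
/*
 * Typical usage of rte_graph_walk(): invoke it in a tight loop on the worker
 * lcore that owns the graph. A minimal sketch; the "worker_graph" name and
 * the force_quit flag are illustrative:
 *
 *	struct rte_graph *graph = rte_graph_lookup("worker_graph");
 *
 *	while (!force_quit)
 *		rte_graph_walk(graph);
 */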

/* Fast path helper functions */

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object to be enqueued.
 */
static __rte_always_inline void
__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
{
	uint32_t tail;

	tail = graph->tail;
	graph->cir_start[tail++] = node->off;
	graph->tail = tail & graph->cir_mask;
}

/**
 * @internal
 *
 * Enqueue sequence prologue function.
 *
 * Adds the node to the tail of the graph reel and resizes the number of
 * objects available in the stream as needed.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param idx
 *   Index at which the object enqueue starts from.
 * @param space
 *   Space required for the object enqueue.
 */
static __rte_always_inline void
__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
			    const uint16_t idx, const uint16_t space)
{
	/* Add to the pending stream list if the node is new */
	if (idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	if (unlikely(node->size < (idx + space)))
		__rte_node_stream_alloc_size(graph, node, node->size + space);
}

/**
 * @internal
 *
 * Get the node pointer from the current node's edge id.
 *
 * @param node
 *   Current node pointer.
 * @param next
 *   Edge id of the required node.
 *
 * @return
 *   Pointer to the node denoted by the edge id.
 */
static __rte_always_inline struct rte_node *
__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
{
	RTE_ASSERT(next < node->nb_edges);
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
	node = node->nodes[next];
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);

	return node;
}

/**
 * Enqueue the objs to the next node for further processing and set
 * the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param objs
 *   Objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
		 rte_edge_t next, void **objs, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);

	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
	node->idx = idx + nb_objs;
}
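
/*
 * Example: a node process function that forwards its whole burst on edge 0
 * using rte_node_enqueue(). A sketch only; the node name and edge layout are
 * hypothetical:
 *
 *	static uint16_t
 *	fwd_node_process(struct rte_graph *graph, struct rte_node *node,
 *			 void **objs, uint16_t nb_objs)
 *	{
 *		rte_node_enqueue(graph, node, 0, objs, nb_objs);
 *		return nb_objs;
 *	}
 */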

/**
 * Enqueue only one obj to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue the obj.
 * @param obj
 *   Obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 1);

	node->objs[idx++] = obj;
	node->idx = idx;
}

/**
 * Enqueue only two objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues two objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   1st obj to enqueue.
 * @param obj1
 *   2nd obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 2);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->idx = idx;
}

/**
 * Enqueue only four objs to the next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues four objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   1st obj to enqueue.
 * @param obj1
 *   2nd obj to enqueue.
 * @param obj2
 *   3rd obj to enqueue.
 * @param obj3
 *   4th obj to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
		    void *obj3)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 4);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->objs[idx++] = obj2;
	node->objs[idx++] = obj3;
	node->idx = idx;
}

/**
 * Enqueue objs to multiple next nodes for further processing and
 * set the next nodes to pending state in the circular buffer.
 * objs[i] will be enqueued to nexts[i].
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param nexts
 *   List of relative next node indices to enqueue objs.
 * @param objs
 *   List of objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
__rte_experimental
static inline void
rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
{
	uint16_t i;

	for (i = 0; i < nb_objs; i++)
		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
}

/**
 * Get the stream of the next node to enqueue the objs.
 * Once done updating the objs, the caller needs to call
 * rte_node_next_stream_put() to put the next node into pending state.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to get the stream.
 * @param nb_objs
 *   Requested free size of the next stream.
 *
 * @return
 *   Valid next stream on success.
 *
 * @see rte_node_next_stream_put().
 */
__rte_experimental
static inline void **
rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;
	uint16_t free_space = node->size - idx;

	if (unlikely(free_space < nb_objs))
		__rte_node_stream_alloc_size(graph, node, node->size + nb_objs);

	return &node->objs[idx];
}

/**
 * Put the next stream into pending state in the circular buffer
 * for further processing. Should be invoked after rte_node_next_stream_get().
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 * @param idx
 *   Number of objs updated in the stream after getting the stream using
 *   rte_node_next_stream_get().
 *
 * @see rte_node_next_stream_get().
 */
__rte_experimental
static inline void
rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t idx)
{
	if (unlikely(!idx))
		return;

	node = __rte_node_next_node_get(node, next);
	if (node->idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	node->idx += idx;
}
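
/*
 * The get/put pair lets a node write objs straight into the next node's
 * stream, avoiding the copy done by rte_node_enqueue(). A minimal sketch,
 * assuming edge 0 and an objs[] array of nb_objs pointers to hand over:
 *
 *	void **to_next = rte_node_next_stream_get(graph, node, 0, nb_objs);
 *	uint16_t i;
 *
 *	for (i = 0; i < nb_objs; i++)
 *		to_next[i] = objs[i];
 *	rte_node_next_stream_put(graph, node, 0, nb_objs);
 */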

/**
 * Home run scenario: enqueue all the objs of the current node to the next
 * node in an optimized way by swapping the streams of both nodes.
 * Performs well when the next node is not already in pending state.
 * If the next node is already in pending state then the normal enqueue
 * will be used.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param src
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 */
__rte_experimental
static inline void
rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
			  rte_edge_t next)
{
	struct rte_node *dst = __rte_node_next_node_get(src, next);

	/* Swap the stream pointers if dst does not have valid objs */
	if (likely(dst->idx == 0)) {
		void **dobjs = dst->objs;
		uint16_t dsz = dst->size;
		dst->objs = src->objs;
		dst->size = src->size;
		src->objs = dobjs;
		src->size = dsz;
		dst->idx = src->idx;
		__rte_node_enqueue_tail_update(graph, dst);
	} else { /* Move the objects from the src node to the dst node */
		rte_node_enqueue(graph, src, next, src->objs, src->idx);
	}
}
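
/*
 * Example: a pass-through node handing its entire stream to edge 0, with no
 * per-object copy in the common case. A sketch; the node name is
 * hypothetical:
 *
 *	static uint16_t
 *	passthru_node_process(struct rte_graph *graph, struct rte_node *node,
 *			      void **objs, uint16_t nb_objs)
 *	{
 *		RTE_SET_USED(objs);
 *		rte_node_next_stream_move(graph, node, 0);
 *		return nb_objs;
 *	}
 */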

#ifdef __cplusplus
}
#endif

#endif /* _RTE_GRAPH_WORKER_H_ */