xref: /dpdk/lib/graph/rte_graph_worker_common.h (revision 070db97e017b7ed9a5320b2f624f05562a632bd3)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_COMMON_H_
#define _RTE_GRAPH_WORKER_COMMON_H_

/**
 * @file rte_graph_worker_common.h
 *
 * This API allows a worker thread to walk over a graph and its nodes to
 * create, process, enqueue and move streams of objects to the next nodes.
 */

#include <assert.h>
#include <stdalign.h>
#include <stddef.h>

#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"

#ifdef __cplusplus
extern "C" {
#endif

/** Graph worker models */
/* When adding a new graph model entry, update rte_graph_model_is_valid() implementation. */
#define RTE_GRAPH_MODEL_RTC 0 /**< Run-To-Completion model. It is the default model. */
#define RTE_GRAPH_MODEL_MCORE_DISPATCH 1
/**< Dispatch model to support cross-core dispatching within core affinity. */
#define RTE_GRAPH_MODEL_DEFAULT RTE_GRAPH_MODEL_RTC /**< Default graph model. */

/**
 * @internal
 *
 * Singly-linked list head for graph schedule run-queue.
 */
SLIST_HEAD(rte_graph_rq_head, rte_graph);

/**
 * @internal
 *
 * Data structure to hold graph data.
 */
struct __rte_cache_aligned rte_graph {
	/* Fast path area. */
	uint32_t tail;		     /**< Tail of circular buffer. */
	uint32_t head;		     /**< Head of circular buffer. */
	uint32_t cir_mask;	     /**< Circular buffer wrap around mask. */
	rte_node_t nb_nodes;	     /**< Number of nodes in the graph. */
	rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
	rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
	uint8_t model;		     /**< Graph model. */
	uint8_t reserved1;	     /**< Reserved for future use. */
	uint16_t reserved2;	     /**< Reserved for future use. */
	union {
		/* Fast schedule area for mcore dispatch model */
		struct {
			alignas(RTE_CACHE_LINE_SIZE) struct rte_graph_rq_head *rq;
				/* The run-queue */
			struct rte_graph_rq_head rq_head; /* The head for run-queue list */

			unsigned int lcore_id;  /**< The lcore the graph runs on. */
			struct rte_ring *wq;    /**< The work-queue for pending streams. */
			struct rte_mempool *mp; /**< The mempool for scheduling streams. */
		} dispatch; /**< Only used by the dispatch model. */
	};
	SLIST_ENTRY(rte_graph) next;   /* The next for rte_graph list */
	/* End of fast path area. */
	rte_graph_t id;	/**< Graph identifier. */
	int socket;	/**< Socket ID where memory is allocated. */
	char name[RTE_GRAPH_NAMESIZE];	/**< Name of the graph. */
	bool pcap_enable;	        /**< Pcap trace enabled. */
	/** Number of packets captured per core. */
	uint64_t nb_pkt_captured;
	/** Number of packets to capture per core. */
	uint64_t nb_pkt_to_capture;
	char pcap_filename[RTE_GRAPH_PCAP_FILE_SZ];  /**< Pcap filename. */
	uint64_t fence;			/**< Fence. */
};

/**
 * @internal
 *
 * Data structure to hold node data.
 */
struct __rte_cache_aligned rte_node {
	/* Slow path area */
	uint64_t fence;		/**< Fence. */
	rte_graph_off_t next;	/**< Index to next node. */
	rte_node_t id;		/**< Node identifier. */
	rte_node_t parent_id;	/**< Parent node identifier. */
	rte_edge_t nb_edges;	/**< Number of edges from this node. */
	uint32_t realloc_count;	/**< Number of times reallocated. */

	char parent[RTE_NODE_NAMESIZE];	/**< Parent node name. */
	char name[RTE_NODE_NAMESIZE];	/**< Name of the node. */

	/** Original process function when pcap is enabled. */
	rte_node_process_t original_process;

	union {
		/* Fast schedule area for mcore dispatch model */
		struct {
			unsigned int lcore_id;  /**< Node running lcore. */
			uint64_t total_sched_objs; /**< Number of objects scheduled. */
			uint64_t total_sched_fail; /**< Number of scheduling failures. */
		} dispatch;
	};
	rte_graph_off_t xstat_off; /**< Offset to xstat counters. */
	/* Fast path area */
	__extension__ struct __rte_cache_aligned {
#define RTE_NODE_CTX_SZ 16
		union {
			uint8_t ctx[RTE_NODE_CTX_SZ];
			__extension__ struct {
				void *ctx_ptr;
				void *ctx_ptr2;
			};
		}; /**< Node context. */
		uint16_t size;		/**< Total number of objects available. */
		uint16_t idx;		/**< Number of objects used. */
		rte_graph_off_t off;	/**< Offset of node in the graph reel. */
		uint64_t total_cycles;	/**< Cycles spent in this node. */
		uint64_t total_calls;	/**< Calls done to this node. */
		uint64_t total_objs;	/**< Objects processed by this node. */
		union {
			void **objs;	   /**< Array of object pointers. */
			uint64_t objs_u64;
		};
		union {
			rte_node_process_t process; /**< Process function. */
			uint64_t process_u64;
		};
		alignas(RTE_CACHE_LINE_MIN_SIZE) struct rte_node *nodes[]; /**< Next nodes. */
	};
};

static_assert(offsetof(struct rte_node, nodes) - offsetof(struct rte_node, ctx)
	== RTE_CACHE_LINE_MIN_SIZE, "rte_node fast path area must fit in 64 bytes");
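
/*
 * Usage sketch (illustrative, not part of the upstream header): a node can
 * keep per-node state in the 16-byte ctx area, for example a pointer stored
 * in ctx_ptr at node init time. The callback name, struct my_node_state and
 * handle_objs() below are assumptions, not DPDK APIs.
 *
 * static uint16_t
 * my_node_process(struct rte_graph *graph, struct rte_node *node,
 *                 void **objs, uint16_t nb_objs)
 * {
 *     struct my_node_state *state = node->ctx_ptr;
 *
 *     RTE_SET_USED(graph);
 *     handle_objs(state, objs, nb_objs);
 *     return nb_objs;
 * }
 */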

/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If the stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);

/**
 * @internal
 *
 * Allocate a stream with the requested number of objects.
 *
 * If the stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param req_size
 *   Number of objects to be allocated.
 */
void __rte_node_stream_alloc_size(struct rte_graph *graph,
				  struct rte_node *node, uint16_t req_size);

/* Fast path helper functions */

/**
 * @internal
 *
 * Process the given node by invoking its process callback, updating the
 * stats counters when the stats feature is enabled.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object to be processed.
 */
static __rte_always_inline void
__rte_node_process(struct rte_graph *graph, struct rte_node *node)
{
	uint64_t start;
	uint16_t rc;
	void **objs;

	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
	objs = node->objs;
	rte_prefetch0(objs);

	if (rte_graph_has_stats_feature()) {
		start = rte_rdtsc();
		rc = node->process(graph, node, objs, node->idx);
		node->total_cycles += rte_rdtsc() - start;
		node->total_calls++;
		node->total_objs += rc;
	} else {
		node->process(graph, node, objs, node->idx);
	}
	node->idx = 0;
}

/**
 * @internal
 *
 * Enqueue a given node to the tail of the graph reel.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object to be enqueued.
 */
static __rte_always_inline void
__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
{
	uint32_t tail;

	tail = graph->tail;
	graph->cir_start[tail++] = node->off;
	graph->tail = tail & graph->cir_mask;
}
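
/*
 * For example, with an 8-entry circular buffer cir_mask is 7, so a tail of 7
 * advances to (7 + 1) & 7 == 0, i.e. wraps back to the start. This masking
 * scheme assumes the circular buffer size is a power of two.
 */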

/**
 * @internal
 *
 * Enqueue sequence prologue function.
 *
 * Enqueues the node to the tail of the graph reel and resizes the number of
 * objects available in the stream as needed.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 * @param idx
 *   Index at which the object enqueue starts from.
 * @param space
 *   Space required for the object enqueue.
 */
static __rte_always_inline void
__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
			    const uint16_t idx, const uint16_t space)
{
	/* Add to the pending stream list if the node is new */
	if (idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	if (unlikely(node->size < (idx + space)))
		__rte_node_stream_alloc_size(graph, node, node->size + space);
}

/**
 * @internal
 *
 * Get the node pointer from the current node's edge id.
 *
 * @param node
 *   Current node pointer.
 * @param next
 *   Edge id of the required node.
 *
 * @return
 *   Pointer to the node denoted by the edge id.
 */
static __rte_always_inline struct rte_node *
__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
{
	RTE_ASSERT(next < node->nb_edges);
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
	node = node->nodes[next];
	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);

	return node;
}

/**
 * Enqueue the objs to next node for further processing and set
 * the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param objs
 *   Objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
static inline void
rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
		 rte_edge_t next, void **objs, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);

	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
	node->idx = idx + nb_objs;
}
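
/*
 * Usage sketch (illustrative): forward a whole burst to a single next edge
 * from a node's process callback. The edge value 0 and the callback name are
 * assumptions; real nodes use the edge ids set up at node registration.
 *
 * static uint16_t
 * my_fwd_process(struct rte_graph *graph, struct rte_node *node,
 *                void **objs, uint16_t nb_objs)
 * {
 *     rte_node_enqueue(graph, node, 0, objs, nb_objs);
 *     return nb_objs;
 * }
 */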

/**
 * Enqueue only one obj to next node for further processing and
 * set the next node to pending state in the circular buffer.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj
 *   Obj to enqueue.
 */
static inline void
rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 1);

	node->objs[idx++] = obj;
	node->idx = idx;
}

/**
 * Enqueue only two objs to next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues two objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   Obj to enqueue.
 * @param obj1
 *   Obj to enqueue.
 */
static inline void
rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 2);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->idx = idx;
}

/**
 * Enqueue only four objs to next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueues four objs.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to enqueue objs.
 * @param obj0
 *   1st obj to enqueue.
 * @param obj1
 *   2nd obj to enqueue.
 * @param obj2
 *   3rd obj to enqueue.
 * @param obj3
 *   4th obj to enqueue.
 */
static inline void
rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
		    rte_edge_t next, void *obj0, void *obj1, void *obj2,
		    void *obj3)
{
	node = __rte_node_next_node_get(node, next);
	uint16_t idx = node->idx;

	__rte_node_enqueue_prologue(graph, node, idx, 4);

	node->objs[idx++] = obj0;
	node->objs[idx++] = obj1;
	node->objs[idx++] = obj2;
	node->objs[idx++] = obj3;
	node->idx = idx;
}

/**
 * Enqueue objs to multiple next nodes for further processing and
 * set the next nodes to pending state in the circular buffer.
 * objs[i] will be enqueued to nexts[i].
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param nexts
 *   List of relative next node indices to enqueue objs.
 * @param objs
 *   List of objs to enqueue.
 * @param nb_objs
 *   Number of objs to enqueue.
 */
static inline void
rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
{
	uint16_t i;

	for (i = 0; i < nb_objs; i++)
		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
}
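
/*
 * Usage sketch (illustrative): pick a next edge per object and enqueue them
 * in one call. classify() is an assumed application helper returning a valid
 * edge id for each object.
 *
 * static uint16_t
 * my_classify_process(struct rte_graph *graph, struct rte_node *node,
 *                     void **objs, uint16_t nb_objs)
 * {
 *     rte_edge_t nexts[RTE_GRAPH_BURST_SIZE];
 *     uint16_t i;
 *
 *     for (i = 0; i < nb_objs; i++)
 *         nexts[i] = classify(objs[i]);
 *     rte_node_enqueue_next(graph, node, nexts, objs, nb_objs);
 *     return nb_objs;
 * }
 */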

/**
 * Get the stream of the next node to enqueue the objs.
 * Once done with updating the objs, rte_node_next_stream_put() must be
 * called to move the next node to pending state.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index to get stream.
 * @param nb_objs
 *   Requested free size of the next stream.
 *
 * @return
 *   Valid next stream on success.
 *
 * @see rte_node_next_stream_put().
 */
static inline void **
rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t nb_objs)
{
	node = __rte_node_next_node_get(node, next);
	const uint16_t idx = node->idx;
	uint16_t free_space = node->size - idx;

	if (unlikely(free_space < nb_objs))
		__rte_node_stream_alloc_size(graph, node, node->size + nb_objs);

	return &node->objs[idx];
}

/**
 * Put the next stream to pending state in the circular buffer
 * for further processing. Should be invoked after rte_node_next_stream_get().
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param node
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 * @param idx
 *   Number of objs updated in the stream after getting the stream using
 *   rte_node_next_stream_get().
 *
 * @see rte_node_next_stream_get().
 */
static inline void
rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
			 rte_edge_t next, uint16_t idx)
{
	if (unlikely(!idx))
		return;

	node = __rte_node_next_node_get(node, next);
	if (node->idx == 0)
		__rte_node_enqueue_tail_update(graph, node);

	node->idx += idx;
}
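
/*
 * Usage sketch (illustrative): a source node filling the next node's stream
 * directly, then publishing how many objects it wrote. my_rx_burst() and
 * MY_BURST_SIZE are assumptions standing in for an application receive path.
 *
 * static uint16_t
 * my_source_process(struct rte_graph *graph, struct rte_node *node,
 *                   void **objs, uint16_t nb_objs)
 * {
 *     void **to_next = rte_node_next_stream_get(graph, node, 0, MY_BURST_SIZE);
 *     uint16_t n = my_rx_burst(to_next, MY_BURST_SIZE);
 *
 *     RTE_SET_USED(objs);
 *     RTE_SET_USED(nb_objs);
 *     rte_node_next_stream_put(graph, node, 0, n);
 *     return n;
 * }
 */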

/**
 * Home run scenario: enqueue all the objs of the current node to the next
 * node in an optimized way by swapping the streams of both nodes.
 * Performs well when the next node is not already in pending state.
 * If the next node is already in pending state then the normal enqueue
 * will be used.
 *
 * @param graph
 *   Graph pointer returned from rte_graph_lookup().
 * @param src
 *   Current node pointer.
 * @param next
 *   Relative next node index.
 */
static inline void
rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
			  rte_edge_t next)
{
	struct rte_node *dst = __rte_node_next_node_get(src, next);

	/* Swap the stream pointers if dst doesn't have valid objs */
	if (likely(dst->idx == 0)) {
		void **dobjs = dst->objs;
		uint16_t dsz = dst->size;
		dst->objs = src->objs;
		dst->size = src->size;
		src->objs = dobjs;
		src->size = dsz;
		dst->idx = src->idx;
		__rte_node_enqueue_tail_update(graph, dst);
	} else { /* Move the objects from src node to dst node */
		rte_node_enqueue(graph, src, next, src->objs, src->idx);
	}
}
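
/*
 * Usage sketch (illustrative): a pass-through node that hands its entire
 * stream to edge 0; when the destination stream is empty this is a pointer
 * swap rather than a copy. The callback name is an assumption.
 *
 * static uint16_t
 * my_passthrough_process(struct rte_graph *graph, struct rte_node *node,
 *                        void **objs, uint16_t nb_objs)
 * {
 *     RTE_SET_USED(objs);
 *     rte_node_next_stream_move(graph, node, 0);
 *     return nb_objs;
 * }
 */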

/**
 * Test the validity of a model.
 *
 * @param model
 *   Model to check.
 *
 * @return
 *   True if the graph model is valid, false otherwise.
 */
bool
rte_graph_model_is_valid(uint8_t model);

/**
 * @note This function does not perform any locking, and is only safe to call
 *    before the graphs start running. It sets the same model for all graphs.
 *
 * @param model
 *   The graph worker model to set.
 *
 * @return
 *   0 on success, -1 otherwise.
 */
int rte_graph_worker_model_set(uint8_t model);
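
/*
 * Usage sketch (illustrative): select the dispatch model once, before any
 * lcore starts walking a graph.
 *
 * if (rte_graph_worker_model_set(RTE_GRAPH_MODEL_MCORE_DISPATCH) != 0)
 *     rte_panic("failed to set graph worker model\n");
 */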

/**
 * Get the graph worker model.
 *
 * @note All graphs will use the same model; this function gets the model from
 *    the first one. Used for the slow path.
 *
 * @param graph
 *   Graph pointer.
 *
 * @return
 *   Graph worker model on success.
 */
uint8_t rte_graph_worker_model_get(struct rte_graph *graph);

/**
 * Get the graph worker model without any check.
 *
 * @note All graphs will use the same model; this function gets the model from
 *    the first one. Used for the fast path.
 *
 * @param graph
 *   Graph pointer.
 *
 * @return
 *   Graph worker model on success.
 */
static __rte_always_inline
uint8_t rte_graph_worker_model_no_check_get(struct rte_graph *graph)
{
	return graph->model;
}

/**
 * Increment a node xstat count.
 *
 * Increment the count of an xstat for a given node.
 *
 * @param node
 *   Pointer to the node.
 * @param xstat_id
 *   xstat ID.
 * @param value
 *   Value to increment by.
 */
__rte_experimental
static inline void
rte_node_xstat_increment(struct rte_node *node, uint16_t xstat_id, uint64_t value)
{
	if (rte_graph_has_stats_feature()) {
		uint64_t *xstat = (uint64_t *)RTE_PTR_ADD(node, node->xstat_off);
		xstat[xstat_id] += value;
	}
}
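
/*
 * Usage sketch (illustrative): bump a per-node xstat from a process callback.
 * MY_XSTAT_DROPS and dropped are assumptions; the index must match an xstat
 * the node declared at registration time.
 *
 * rte_node_xstat_increment(node, MY_XSTAT_DROPS, dropped);
 */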

#ifdef __cplusplus
}
#endif

#endif /* _RTE_GRAPH_WORKER_COMMON_H_ */