xref: /dpdk/examples/ip_pipeline/thread.c (revision daa02b5cddbb8e11b31d41e2bf7bb1ae64dcae2f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25 
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29 
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33 
34 /**
35  * Main thread: data plane thread context
36  */
37 struct thread {
38 	struct rte_ring *msgq_req;
39 	struct rte_ring *msgq_rsp;
40 
41 	uint32_t enabled;
42 };
43 
44 static struct thread thread[RTE_MAX_LCORE];
45 
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50 	struct rte_table_action *a;
51 };
52 
53 struct pipeline_data {
54 	struct rte_pipeline *p;
55 	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56 	uint32_t n_tables;
57 
58 	struct rte_ring *msgq_req;
59 	struct rte_ring *msgq_rsp;
60 	uint64_t timer_period; /* Measured in CPU cycles. */
61 	uint64_t time_next;
62 
63 	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65 
66 struct thread_data {
67 	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68 	uint32_t n_pipelines;
69 
70 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 	struct rte_ring *msgq_req;
72 	struct rte_ring *msgq_rsp;
73 	uint64_t timer_period; /* Measured in CPU cycles. */
74 	uint64_t time_next;
75 	uint64_t time_next_min;
76 } __rte_cache_aligned;
77 
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Main thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		if (t->msgq_req)
96 			rte_ring_free(t->msgq_req);
97 
98 		if (t->msgq_rsp)
99 			rte_ring_free(t->msgq_rsp);
100 	}
101 }
102 
103 int
104 thread_init(void)
105 {
106 	uint32_t i;
107 
108 	RTE_LCORE_FOREACH_WORKER(i) {
109 		char name[NAME_MAX];
110 		struct rte_ring *msgq_req, *msgq_rsp;
111 		struct thread *t = &thread[i];
112 		struct thread_data *t_data = &thread_data[i];
113 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
114 
115 		/* MSGQs */
116 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117 
118 		msgq_req = rte_ring_create(name,
119 			THREAD_MSGQ_SIZE,
120 			cpu_id,
121 			RING_F_SP_ENQ | RING_F_SC_DEQ);
122 
123 		if (msgq_req == NULL) {
124 			thread_free();
125 			return -1;
126 		}
127 
128 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129 
130 		msgq_rsp = rte_ring_create(name,
131 			THREAD_MSGQ_SIZE,
132 			cpu_id,
133 			RING_F_SP_ENQ | RING_F_SC_DEQ);
134 
135 		if (msgq_rsp == NULL) {
136 			thread_free();
137 			return -1;
138 		}
139 
140 		/* Main thread records */
141 		t->msgq_req = msgq_req;
142 		t->msgq_rsp = msgq_rsp;
143 		t->enabled = 1;
144 
145 		/* Data plane thread records */
146 		t_data->n_pipelines = 0;
147 		t_data->msgq_req = msgq_req;
148 		t_data->msgq_rsp = msgq_rsp;
149 		t_data->timer_period =
150 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 		t_data->time_next_min = t_data->time_next;
153 	}
154 
155 	return 0;
156 }
157 
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161 	enum rte_lcore_state_t thread_state;
162 
163 	thread_state = rte_eal_get_lcore_state(thread_id);
164 	return (thread_state == RUNNING) ? 1 : 0;
165 }
166 
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175 	if (p->enabled == 0)
176 		return 0;
177 
178 	return thread_is_running(p->thread_id);
179 }
180 
181 /**
182  * Main thread & data plane threads: message passing
183  */
184 enum thread_req_type {
185 	THREAD_REQ_PIPELINE_ENABLE = 0,
186 	THREAD_REQ_PIPELINE_DISABLE,
187 	THREAD_REQ_MAX
188 };
189 
190 struct thread_msg_req {
191 	enum thread_req_type type;
192 
193 	union {
194 		struct {
195 			struct rte_pipeline *p;
196 			struct {
197 				struct rte_table_action *a;
198 			} table[RTE_PIPELINE_TABLE_MAX];
199 			struct rte_ring *msgq_req;
200 			struct rte_ring *msgq_rsp;
201 			uint32_t timer_period_ms;
202 			uint32_t n_tables;
203 		} pipeline_enable;
204 
205 		struct {
206 			struct rte_pipeline *p;
207 		} pipeline_disable;
208 	};
209 };
210 
211 struct thread_msg_rsp {
212 	int status;
213 };
214 
215 /**
216  * Main thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222 		sizeof(struct thread_msg_rsp));
223 
224 	return calloc(1, size);
225 }
226 
227 static void
228 thread_msg_free(struct thread_msg_rsp *rsp)
229 {
230 	free(rsp);
231 }
232 
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235 	struct thread_msg_req *req)
236 {
237 	struct thread *t = &thread[thread_id];
238 	struct rte_ring *msgq_req = t->msgq_req;
239 	struct rte_ring *msgq_rsp = t->msgq_rsp;
240 	struct thread_msg_rsp *rsp;
241 	int status;
242 
243 	/* send */
244 	do {
245 		status = rte_ring_sp_enqueue(msgq_req, req);
246 	} while (status == -ENOBUFS);
247 
248 	/* recv */
249 	do {
250 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251 	} while (status != 0);
252 
253 	return rsp;
254 }
255 
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258 	const char *pipeline_name)
259 {
260 	struct pipeline *p = pipeline_find(pipeline_name);
261 	struct thread *t;
262 	struct thread_msg_req *req;
263 	struct thread_msg_rsp *rsp;
264 	uint32_t i;
265 	int status;
266 
267 	/* Check input params */
268 	if ((thread_id >= RTE_MAX_LCORE) ||
269 		(p == NULL) ||
270 		(p->n_ports_in == 0) ||
271 		(p->n_ports_out == 0) ||
272 		(p->n_tables == 0))
273 		return -1;
274 
275 	t = &thread[thread_id];
276 	if ((t->enabled == 0) ||
277 		p->enabled)
278 		return -1;
279 
280 	if (!thread_is_running(thread_id)) {
281 		struct thread_data *td = &thread_data[thread_id];
282 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283 
284 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285 			return -1;
286 
287 		/* Data plane thread */
288 		td->p[td->n_pipelines] = p->p;
289 
290 		tdp->p = p->p;
291 		for (i = 0; i < p->n_tables; i++)
292 			tdp->table_data[i].a = p->table[i].a;
293 
294 		tdp->n_tables = p->n_tables;
295 
296 		tdp->msgq_req = p->msgq_req;
297 		tdp->msgq_rsp = p->msgq_rsp;
298 		tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300 
301 		td->n_pipelines++;
302 
303 		/* Pipeline */
304 		p->thread_id = thread_id;
305 		p->enabled = 1;
306 
307 		return 0;
308 	}
309 
310 	/* Allocate request */
311 	req = thread_msg_alloc();
312 	if (req == NULL)
313 		return -1;
314 
315 	/* Write request */
316 	req->type = THREAD_REQ_PIPELINE_ENABLE;
317 	req->pipeline_enable.p = p->p;
318 	for (i = 0; i < p->n_tables; i++)
319 		req->pipeline_enable.table[i].a =
320 			p->table[i].a;
321 	req->pipeline_enable.msgq_req = p->msgq_req;
322 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324 	req->pipeline_enable.n_tables = p->n_tables;
325 
326 	/* Send request and wait for response */
327 	rsp = thread_msg_send_recv(thread_id, req);
328 
329 	/* Read response */
330 	status = rsp->status;
331 
332 	/* Free response */
333 	thread_msg_free(rsp);
334 
335 	/* Request completion */
336 	if (status)
337 		return status;
338 
339 	p->thread_id = thread_id;
340 	p->enabled = 1;
341 
342 	return 0;
343 }
344 
345 int
346 thread_pipeline_disable(uint32_t thread_id,
347 	const char *pipeline_name)
348 {
349 	struct pipeline *p = pipeline_find(pipeline_name);
350 	struct thread *t;
351 	struct thread_msg_req *req;
352 	struct thread_msg_rsp *rsp;
353 	int status;
354 
355 	/* Check input params */
356 	if ((thread_id >= RTE_MAX_LCORE) ||
357 		(p == NULL))
358 		return -1;
359 
360 	t = &thread[thread_id];
361 	if (t->enabled == 0)
362 		return -1;
363 
364 	if (p->enabled == 0)
365 		return 0;
366 
367 	if (p->thread_id != thread_id)
368 		return -1;
369 
370 	if (!thread_is_running(thread_id)) {
371 		struct thread_data *td = &thread_data[thread_id];
372 		uint32_t i;
373 
374 		for (i = 0; i < td->n_pipelines; i++) {
375 			struct pipeline_data *tdp = &td->pipeline_data[i];
376 
377 			if (tdp->p != p->p)
378 				continue;
379 
380 			/* Data plane thread */
381 			if (i < td->n_pipelines - 1) {
382 				struct rte_pipeline *pipeline_last =
383 					td->p[td->n_pipelines - 1];
384 				struct pipeline_data *tdp_last =
385 					&td->pipeline_data[td->n_pipelines - 1];
386 
387 				td->p[i] = pipeline_last;
388 				memcpy(tdp, tdp_last, sizeof(*tdp));
389 			}
390 
391 			td->n_pipelines--;
392 
393 			/* Pipeline */
394 			p->enabled = 0;
395 
396 			break;
397 		}
398 
399 		return 0;
400 	}
401 
402 	/* Allocate request */
403 	req = thread_msg_alloc();
404 	if (req == NULL)
405 		return -1;
406 
407 	/* Write request */
408 	req->type = THREAD_REQ_PIPELINE_DISABLE;
409 	req->pipeline_disable.p = p->p;
410 
411 	/* Send request and wait for response */
412 	rsp = thread_msg_send_recv(thread_id, req);
413 
414 	/* Read response */
415 	status = rsp->status;
416 
417 	/* Free response */
418 	thread_msg_free(rsp);
419 
420 	/* Request completion */
421 	if (status)
422 		return status;
423 
424 	p->enabled = 0;
425 
426 	return 0;
427 }
428 
429 /**
430  * Data plane threads: message handling
431  */
432 static inline struct thread_msg_req *
433 thread_msg_recv(struct rte_ring *msgq_req)
434 {
435 	struct thread_msg_req *req;
436 
437 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
438 
439 	if (status != 0)
440 		return NULL;
441 
442 	return req;
443 }
444 
445 static inline void
446 thread_msg_send(struct rte_ring *msgq_rsp,
447 	struct thread_msg_rsp *rsp)
448 {
449 	int status;
450 
451 	do {
452 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
453 	} while (status == -ENOBUFS);
454 }
455 
456 static struct thread_msg_rsp *
457 thread_msg_handle_pipeline_enable(struct thread_data *t,
458 	struct thread_msg_req *req)
459 {
460 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
461 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
462 	uint32_t i;
463 
464 	/* Request */
465 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
466 		rsp->status = -1;
467 		return rsp;
468 	}
469 
470 	t->p[t->n_pipelines] = req->pipeline_enable.p;
471 
472 	p->p = req->pipeline_enable.p;
473 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
474 		p->table_data[i].a =
475 			req->pipeline_enable.table[i].a;
476 
477 	p->n_tables = req->pipeline_enable.n_tables;
478 
479 	p->msgq_req = req->pipeline_enable.msgq_req;
480 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
481 	p->timer_period =
482 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
483 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
484 
485 	t->n_pipelines++;
486 
487 	/* Response */
488 	rsp->status = 0;
489 	return rsp;
490 }
491 
492 static struct thread_msg_rsp *
493 thread_msg_handle_pipeline_disable(struct thread_data *t,
494 	struct thread_msg_req *req)
495 {
496 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
497 	uint32_t n_pipelines = t->n_pipelines;
498 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
499 	uint32_t i;
500 
501 	/* find pipeline */
502 	for (i = 0; i < n_pipelines; i++) {
503 		struct pipeline_data *p = &t->pipeline_data[i];
504 
505 		if (p->p != pipeline)
506 			continue;
507 
508 		if (i < n_pipelines - 1) {
509 			struct rte_pipeline *pipeline_last =
510 				t->p[n_pipelines - 1];
511 			struct pipeline_data *p_last =
512 				&t->pipeline_data[n_pipelines - 1];
513 
514 			t->p[i] = pipeline_last;
515 			memcpy(p, p_last, sizeof(*p));
516 		}
517 
518 		t->n_pipelines--;
519 
520 		rsp->status = 0;
521 		return rsp;
522 	}
523 
524 	/* should not get here */
525 	rsp->status = 0;
526 	return rsp;
527 }
528 
529 static void
530 thread_msg_handle(struct thread_data *t)
531 {
532 	for ( ; ; ) {
533 		struct thread_msg_req *req;
534 		struct thread_msg_rsp *rsp;
535 
536 		req = thread_msg_recv(t->msgq_req);
537 		if (req == NULL)
538 			break;
539 
540 		switch (req->type) {
541 		case THREAD_REQ_PIPELINE_ENABLE:
542 			rsp = thread_msg_handle_pipeline_enable(t, req);
543 			break;
544 
545 		case THREAD_REQ_PIPELINE_DISABLE:
546 			rsp = thread_msg_handle_pipeline_disable(t, req);
547 			break;
548 
549 		default:
550 			rsp = (struct thread_msg_rsp *) req;
551 			rsp->status = -1;
552 		}
553 
554 		thread_msg_send(t->msgq_rsp, rsp);
555 	}
556 }
557 
558 /**
559  * Main thread & data plane threads: message passing
560  */
561 enum pipeline_req_type {
562 	/* Port IN */
563 	PIPELINE_REQ_PORT_IN_STATS_READ,
564 	PIPELINE_REQ_PORT_IN_ENABLE,
565 	PIPELINE_REQ_PORT_IN_DISABLE,
566 
567 	/* Port OUT */
568 	PIPELINE_REQ_PORT_OUT_STATS_READ,
569 
570 	/* Table */
571 	PIPELINE_REQ_TABLE_STATS_READ,
572 	PIPELINE_REQ_TABLE_RULE_ADD,
573 	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
574 	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
575 	PIPELINE_REQ_TABLE_RULE_DELETE,
576 	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
577 	PIPELINE_REQ_TABLE_RULE_STATS_READ,
578 	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
579 	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
580 	PIPELINE_REQ_TABLE_RULE_MTR_READ,
581 	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
582 	PIPELINE_REQ_TABLE_RULE_TTL_READ,
583 	PIPELINE_REQ_TABLE_RULE_TIME_READ,
584 	PIPELINE_REQ_MAX
585 };
586 
587 struct pipeline_msg_req_port_in_stats_read {
588 	int clear;
589 };
590 
591 struct pipeline_msg_req_port_out_stats_read {
592 	int clear;
593 };
594 
595 struct pipeline_msg_req_table_stats_read {
596 	int clear;
597 };
598 
599 struct pipeline_msg_req_table_rule_add {
600 	struct table_rule_match match;
601 	struct table_rule_action action;
602 };
603 
604 struct pipeline_msg_req_table_rule_add_default {
605 	struct table_rule_action action;
606 };
607 
608 struct pipeline_msg_req_table_rule_add_bulk {
609 	struct table_rule_list *list;
610 	int bulk;
611 };
612 
613 struct pipeline_msg_req_table_rule_delete {
614 	struct table_rule_match match;
615 };
616 
617 struct pipeline_msg_req_table_rule_stats_read {
618 	void *data;
619 	int clear;
620 };
621 
622 struct pipeline_msg_req_table_mtr_profile_add {
623 	uint32_t meter_profile_id;
624 	struct rte_table_action_meter_profile profile;
625 };
626 
627 struct pipeline_msg_req_table_mtr_profile_delete {
628 	uint32_t meter_profile_id;
629 };
630 
631 struct pipeline_msg_req_table_rule_mtr_read {
632 	void *data;
633 	uint32_t tc_mask;
634 	int clear;
635 };
636 
637 struct pipeline_msg_req_table_dscp_table_update {
638 	uint64_t dscp_mask;
639 	struct rte_table_action_dscp_table dscp_table;
640 };
641 
642 struct pipeline_msg_req_table_rule_ttl_read {
643 	void *data;
644 	int clear;
645 };
646 
647 struct pipeline_msg_req_table_rule_time_read {
648 	void *data;
649 };
650 
651 struct pipeline_msg_req {
652 	enum pipeline_req_type type;
653 	uint32_t id; /* Port IN, port OUT or table ID */
654 
655 	RTE_STD_C11
656 	union {
657 		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
658 		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
659 		struct pipeline_msg_req_table_stats_read table_stats_read;
660 		struct pipeline_msg_req_table_rule_add table_rule_add;
661 		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
662 		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
663 		struct pipeline_msg_req_table_rule_delete table_rule_delete;
664 		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
665 		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
666 		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
667 		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
668 		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
669 		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
670 		struct pipeline_msg_req_table_rule_time_read table_rule_time_read;
671 	};
672 };
673 
674 struct pipeline_msg_rsp_port_in_stats_read {
675 	struct rte_pipeline_port_in_stats stats;
676 };
677 
678 struct pipeline_msg_rsp_port_out_stats_read {
679 	struct rte_pipeline_port_out_stats stats;
680 };
681 
682 struct pipeline_msg_rsp_table_stats_read {
683 	struct rte_pipeline_table_stats stats;
684 };
685 
686 struct pipeline_msg_rsp_table_rule_add {
687 	void *data;
688 };
689 
690 struct pipeline_msg_rsp_table_rule_add_default {
691 	void *data;
692 };
693 
694 struct pipeline_msg_rsp_table_rule_add_bulk {
695 	uint32_t n_rules;
696 };
697 
698 struct pipeline_msg_rsp_table_rule_stats_read {
699 	struct rte_table_action_stats_counters stats;
700 };
701 
702 struct pipeline_msg_rsp_table_rule_mtr_read {
703 	struct rte_table_action_mtr_counters stats;
704 };
705 
706 struct pipeline_msg_rsp_table_rule_ttl_read {
707 	struct rte_table_action_ttl_counters stats;
708 };
709 
710 struct pipeline_msg_rsp_table_rule_time_read {
711 	uint64_t timestamp;
712 };
713 
714 struct pipeline_msg_rsp {
715 	int status;
716 
717 	RTE_STD_C11
718 	union {
719 		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
720 		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
721 		struct pipeline_msg_rsp_table_stats_read table_stats_read;
722 		struct pipeline_msg_rsp_table_rule_add table_rule_add;
723 		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
724 		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
725 		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
726 		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
727 		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
728 		struct pipeline_msg_rsp_table_rule_time_read table_rule_time_read;
729 	};
730 };
731 
732 /**
733  * Main thread
734  */
735 static struct pipeline_msg_req *
736 pipeline_msg_alloc(void)
737 {
738 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
739 		sizeof(struct pipeline_msg_rsp));
740 
741 	return calloc(1, size);
742 }
743 
744 static void
745 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
746 {
747 	free(rsp);
748 }
749 
750 static struct pipeline_msg_rsp *
751 pipeline_msg_send_recv(struct pipeline *p,
752 	struct pipeline_msg_req *req)
753 {
754 	struct rte_ring *msgq_req = p->msgq_req;
755 	struct rte_ring *msgq_rsp = p->msgq_rsp;
756 	struct pipeline_msg_rsp *rsp;
757 	int status;
758 
759 	/* send */
760 	do {
761 		status = rte_ring_sp_enqueue(msgq_req, req);
762 	} while (status == -ENOBUFS);
763 
764 	/* recv */
765 	do {
766 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
767 	} while (status != 0);
768 
769 	return rsp;
770 }
771 
772 int
773 pipeline_port_in_stats_read(const char *pipeline_name,
774 	uint32_t port_id,
775 	struct rte_pipeline_port_in_stats *stats,
776 	int clear)
777 {
778 	struct pipeline *p;
779 	struct pipeline_msg_req *req;
780 	struct pipeline_msg_rsp *rsp;
781 	int status;
782 
783 	/* Check input params */
784 	if ((pipeline_name == NULL) ||
785 		(stats == NULL))
786 		return -1;
787 
788 	p = pipeline_find(pipeline_name);
789 	if ((p == NULL) ||
790 		(port_id >= p->n_ports_in))
791 		return -1;
792 
793 	if (!pipeline_is_running(p)) {
794 		status = rte_pipeline_port_in_stats_read(p->p,
795 			port_id,
796 			stats,
797 			clear);
798 
799 		return status;
800 	}
801 
802 	/* Allocate request */
803 	req = pipeline_msg_alloc();
804 	if (req == NULL)
805 		return -1;
806 
807 	/* Write request */
808 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
809 	req->id = port_id;
810 	req->port_in_stats_read.clear = clear;
811 
812 	/* Send request and wait for response */
813 	rsp = pipeline_msg_send_recv(p, req);
814 
815 	/* Read response */
816 	status = rsp->status;
817 	if (status == 0)
818 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
819 
820 	/* Free response */
821 	pipeline_msg_free(rsp);
822 
823 	return status;
824 }
825 
826 int
827 pipeline_port_in_enable(const char *pipeline_name,
828 	uint32_t port_id)
829 {
830 	struct pipeline *p;
831 	struct pipeline_msg_req *req;
832 	struct pipeline_msg_rsp *rsp;
833 	int status;
834 
835 	/* Check input params */
836 	if (pipeline_name == NULL)
837 		return -1;
838 
839 	p = pipeline_find(pipeline_name);
840 	if ((p == NULL) ||
841 		(port_id >= p->n_ports_in))
842 		return -1;
843 
844 	if (!pipeline_is_running(p)) {
845 		status = rte_pipeline_port_in_enable(p->p, port_id);
846 		return status;
847 	}
848 
849 	/* Allocate request */
850 	req = pipeline_msg_alloc();
851 	if (req == NULL)
852 		return -1;
853 
854 	/* Write request */
855 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
856 	req->id = port_id;
857 
858 	/* Send request and wait for response */
859 	rsp = pipeline_msg_send_recv(p, req);
860 
861 	/* Read response */
862 	status = rsp->status;
863 
864 	/* Free response */
865 	pipeline_msg_free(rsp);
866 
867 	return status;
868 }
869 
870 int
871 pipeline_port_in_disable(const char *pipeline_name,
872 	uint32_t port_id)
873 {
874 	struct pipeline *p;
875 	struct pipeline_msg_req *req;
876 	struct pipeline_msg_rsp *rsp;
877 	int status;
878 
879 	/* Check input params */
880 	if (pipeline_name == NULL)
881 		return -1;
882 
883 	p = pipeline_find(pipeline_name);
884 	if ((p == NULL) ||
885 		(port_id >= p->n_ports_in))
886 		return -1;
887 
888 	if (!pipeline_is_running(p)) {
889 		status = rte_pipeline_port_in_disable(p->p, port_id);
890 		return status;
891 	}
892 
893 	/* Allocate request */
894 	req = pipeline_msg_alloc();
895 	if (req == NULL)
896 		return -1;
897 
898 	/* Write request */
899 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
900 	req->id = port_id;
901 
902 	/* Send request and wait for response */
903 	rsp = pipeline_msg_send_recv(p, req);
904 
905 	/* Read response */
906 	status = rsp->status;
907 
908 	/* Free response */
909 	pipeline_msg_free(rsp);
910 
911 	return status;
912 }
913 
914 int
915 pipeline_port_out_stats_read(const char *pipeline_name,
916 	uint32_t port_id,
917 	struct rte_pipeline_port_out_stats *stats,
918 	int clear)
919 {
920 	struct pipeline *p;
921 	struct pipeline_msg_req *req;
922 	struct pipeline_msg_rsp *rsp;
923 	int status;
924 
925 	/* Check input params */
926 	if ((pipeline_name == NULL) ||
927 		(stats == NULL))
928 		return -1;
929 
930 	p = pipeline_find(pipeline_name);
931 	if ((p == NULL) ||
932 		(port_id >= p->n_ports_out))
933 		return -1;
934 
935 	if (!pipeline_is_running(p)) {
936 		status = rte_pipeline_port_out_stats_read(p->p,
937 			port_id,
938 			stats,
939 			clear);
940 
941 		return status;
942 	}
943 
944 	/* Allocate request */
945 	req = pipeline_msg_alloc();
946 	if (req == NULL)
947 		return -1;
948 
949 	/* Write request */
950 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
951 	req->id = port_id;
952 	req->port_out_stats_read.clear = clear;
953 
954 	/* Send request and wait for response */
955 	rsp = pipeline_msg_send_recv(p, req);
956 
957 	/* Read response */
958 	status = rsp->status;
959 	if (status == 0)
960 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
961 
962 	/* Free response */
963 	pipeline_msg_free(rsp);
964 
965 	return status;
966 }
967 
968 int
969 pipeline_table_stats_read(const char *pipeline_name,
970 	uint32_t table_id,
971 	struct rte_pipeline_table_stats *stats,
972 	int clear)
973 {
974 	struct pipeline *p;
975 	struct pipeline_msg_req *req;
976 	struct pipeline_msg_rsp *rsp;
977 	int status;
978 
979 	/* Check input params */
980 	if ((pipeline_name == NULL) ||
981 		(stats == NULL))
982 		return -1;
983 
984 	p = pipeline_find(pipeline_name);
985 	if ((p == NULL) ||
986 		(table_id >= p->n_tables))
987 		return -1;
988 
989 	if (!pipeline_is_running(p)) {
990 		status = rte_pipeline_table_stats_read(p->p,
991 			table_id,
992 			stats,
993 			clear);
994 
995 		return status;
996 	}
997 
998 	/* Allocate request */
999 	req = pipeline_msg_alloc();
1000 	if (req == NULL)
1001 		return -1;
1002 
1003 	/* Write request */
1004 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
1005 	req->id = table_id;
1006 	req->table_stats_read.clear = clear;
1007 
1008 	/* Send request and wait for response */
1009 	rsp = pipeline_msg_send_recv(p, req);
1010 
1011 	/* Read response */
1012 	status = rsp->status;
1013 	if (status == 0)
1014 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1015 
1016 	/* Free response */
1017 	pipeline_msg_free(rsp);
1018 
1019 	return status;
1020 }
1021 
1022 static int
1023 match_check(struct table_rule_match *match,
1024 	struct pipeline *p,
1025 	uint32_t table_id)
1026 {
1027 	struct table *table;
1028 
1029 	if ((match == NULL) ||
1030 		(p == NULL) ||
1031 		(table_id >= p->n_tables))
1032 		return -1;
1033 
1034 	table = &p->table[table_id];
1035 	if (match->match_type != table->params.match_type)
1036 		return -1;
1037 
1038 	switch (match->match_type) {
1039 	case TABLE_ACL:
1040 	{
1041 		struct table_acl_params *t = &table->params.match.acl;
1042 		struct table_rule_match_acl *r = &match->match.acl;
1043 
1044 		if ((r->ip_version && (t->ip_version == 0)) ||
1045 			((r->ip_version == 0) && t->ip_version))
1046 			return -1;
1047 
1048 		if (r->ip_version) {
1049 			if ((r->sa_depth > 32) ||
1050 				(r->da_depth > 32))
1051 				return -1;
1052 		} else {
1053 			if ((r->sa_depth > 128) ||
1054 				(r->da_depth > 128))
1055 				return -1;
1056 		}
1057 		return 0;
1058 	}
1059 
1060 	case TABLE_ARRAY:
1061 		return 0;
1062 
1063 	case TABLE_HASH:
1064 		return 0;
1065 
1066 	case TABLE_LPM:
1067 	{
1068 		struct table_lpm_params *t = &table->params.match.lpm;
1069 		struct table_rule_match_lpm *r = &match->match.lpm;
1070 
1071 		if ((r->ip_version && (t->key_size != 4)) ||
1072 			((r->ip_version == 0) && (t->key_size != 16)))
1073 			return -1;
1074 
1075 		if (r->ip_version) {
1076 			if (r->depth > 32)
1077 				return -1;
1078 		} else {
1079 			if (r->depth > 128)
1080 				return -1;
1081 		}
1082 		return 0;
1083 	}
1084 
1085 	case TABLE_STUB:
1086 		return -1;
1087 
1088 	default:
1089 		return -1;
1090 	}
1091 }
1092 
1093 static int
1094 action_check(struct table_rule_action *action,
1095 	struct pipeline *p,
1096 	uint32_t table_id)
1097 {
1098 	struct table_action_profile *ap;
1099 
1100 	if ((action == NULL) ||
1101 		(p == NULL) ||
1102 		(table_id >= p->n_tables))
1103 		return -1;
1104 
1105 	ap = p->table[table_id].ap;
1106 	if (action->action_mask != ap->params.action_mask)
1107 		return -1;
1108 
1109 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1110 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1111 			(action->fwd.id >= p->n_ports_out))
1112 			return -1;
1113 
1114 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1115 			(action->fwd.id >= p->n_tables))
1116 			return -1;
1117 	}
1118 
1119 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1120 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1121 		uint32_t tc_mask1 = action->mtr.tc_mask;
1122 
1123 		if (tc_mask1 != tc_mask0)
1124 			return -1;
1125 	}
1126 
1127 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1128 		uint32_t n_subports_per_port =
1129 			ap->params.tm.n_subports_per_port;
1130 		uint32_t n_pipes_per_subport =
1131 			ap->params.tm.n_pipes_per_subport;
1132 		uint32_t subport_id = action->tm.subport_id;
1133 		uint32_t pipe_id = action->tm.pipe_id;
1134 
1135 		if ((subport_id >= n_subports_per_port) ||
1136 			(pipe_id >= n_pipes_per_subport))
1137 			return -1;
1138 	}
1139 
1140 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1141 		uint64_t encap_mask = ap->params.encap.encap_mask;
1142 		enum rte_table_action_encap_type type = action->encap.type;
1143 
1144 		if ((encap_mask & (1LLU << type)) == 0)
1145 			return -1;
1146 	}
1147 
1148 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1149 		int ip_version0 = ap->params.common.ip_version;
1150 		int ip_version1 = action->nat.ip_version;
1151 
1152 		if ((ip_version1 && (ip_version0 == 0)) ||
1153 			((ip_version1 == 0) && ip_version0))
1154 			return -1;
1155 	}
1156 
1157 	return 0;
1158 }
1159 
1160 static int
1161 action_default_check(struct table_rule_action *action,
1162 	struct pipeline *p,
1163 	uint32_t table_id)
1164 {
1165 	if ((action == NULL) ||
1166 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1167 		(p == NULL) ||
1168 		(table_id >= p->n_tables))
1169 		return -1;
1170 
1171 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1172 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1173 			(action->fwd.id >= p->n_ports_out))
1174 			return -1;
1175 
1176 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1177 			(action->fwd.id >= p->n_tables))
1178 			return -1;
1179 	}
1180 
1181 	return 0;
1182 }
1183 
1184 union table_rule_match_low_level {
1185 	struct rte_table_acl_rule_add_params acl_add;
1186 	struct rte_table_acl_rule_delete_params acl_delete;
1187 	struct rte_table_array_key array;
1188 	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
1189 	struct rte_table_lpm_key lpm_ipv4;
1190 	struct rte_table_lpm_ipv6_key lpm_ipv6;
1191 };
1192 
1193 static int
1194 match_convert(struct table_rule_match *mh,
1195 	union table_rule_match_low_level *ml,
1196 	int add);
1197 
1198 static int
1199 action_convert(struct rte_table_action *a,
1200 	struct table_rule_action *action,
1201 	struct rte_pipeline_table_entry *data);
1202 
1203 struct table_ll {
1204 	struct rte_pipeline *p;
1205 	int table_id;
1206 	struct rte_table_action *a;
1207 	int bulk_supported;
1208 };
1209 
1210 static int
1211 table_rule_add_bulk_ll(struct table_ll *table,
1212 	struct table_rule_list *list,
1213 	uint32_t *n_rules)
1214 {
1215 	union table_rule_match_low_level *match_ll = NULL;
1216 	uint8_t *action_ll = NULL;
1217 	void **match_ll_ptr = NULL;
1218 	struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1219 	struct rte_pipeline_table_entry **entries_ptr = NULL;
1220 	int *found = NULL;
1221 	struct table_rule *rule;
1222 	uint32_t n, i;
1223 	int status = 0;
1224 
1225 	n = 0;
1226 	TAILQ_FOREACH(rule, list, node)
1227 		n++;
1228 
1229 	/* Memory allocation */
1230 	match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1231 	action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1232 
1233 	match_ll_ptr = calloc(n, sizeof(void *));
1234 	action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1235 
1236 	entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1237 	found = calloc(n, sizeof(int));
1238 
1239 	if (match_ll == NULL ||
1240 		action_ll == NULL ||
1241 		match_ll_ptr == NULL ||
1242 		action_ll_ptr == NULL ||
1243 		entries_ptr == NULL ||
1244 		found == NULL) {
1245 			status = -ENOMEM;
1246 			goto table_rule_add_bulk_ll_free;
1247 	}
1248 
1249 	/* Init */
1250 	for (i = 0; i < n; i++) {
1251 		match_ll_ptr[i] = (void *)&match_ll[i];
1252 		action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1253 			&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1254 	}
1255 
1256 	/* Rule (match, action) conversion */
1257 	i = 0;
1258 	TAILQ_FOREACH(rule, list, node) {
1259 		status = match_convert(&rule->match, match_ll_ptr[i], 1);
1260 		if (status)
1261 			goto table_rule_add_bulk_ll_free;
1262 
1263 		status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1264 		if (status)
1265 			goto table_rule_add_bulk_ll_free;
1266 
1267 		i++;
1268 	}
1269 
1270 	/* Add rule (match, action) to table */
1271 	if (table->bulk_supported) {
1272 		status = rte_pipeline_table_entry_add_bulk(table->p,
1273 			table->table_id,
1274 			match_ll_ptr,
1275 			action_ll_ptr,
1276 			n,
1277 			found,
1278 			entries_ptr);
1279 		if (status)
1280 			goto table_rule_add_bulk_ll_free;
1281 	} else
1282 		for (i = 0; i < n; i++) {
1283 			status = rte_pipeline_table_entry_add(table->p,
1284 				table->table_id,
1285 				match_ll_ptr[i],
1286 				action_ll_ptr[i],
1287 				&found[i],
1288 				&entries_ptr[i]);
1289 			if (status) {
1290 				if (i == 0)
1291 					goto table_rule_add_bulk_ll_free;
1292 
1293 				/* No roll-back. */
1294 				status = 0;
1295 				n = i;
1296 				break;
1297 			}
1298 		}
1299 
1300 	/* Write back to the rule list. */
1301 	i = 0;
1302 	TAILQ_FOREACH(rule, list, node) {
1303 		if (i >= n)
1304 			break;
1305 
1306 		rule->data = entries_ptr[i];
1307 
1308 		i++;
1309 	}
1310 
1311 	*n_rules = n;
1312 
1313 	/* Free */
1314 table_rule_add_bulk_ll_free:
1315 	free(found);
1316 	free(entries_ptr);
1317 	free(action_ll_ptr);
1318 	free(match_ll_ptr);
1319 	free(action_ll);
1320 	free(match_ll);
1321 
1322 	return status;
1323 }
1324 
1325 int
1326 pipeline_table_rule_add(const char *pipeline_name,
1327 	uint32_t table_id,
1328 	struct table_rule_match *match,
1329 	struct table_rule_action *action)
1330 {
1331 	struct pipeline *p;
1332 	struct table *table;
1333 	struct pipeline_msg_req *req;
1334 	struct pipeline_msg_rsp *rsp;
1335 	struct table_rule *rule;
1336 	int status;
1337 
1338 	/* Check input params */
1339 	if ((pipeline_name == NULL) ||
1340 		(match == NULL) ||
1341 		(action == NULL))
1342 		return -1;
1343 
1344 	p = pipeline_find(pipeline_name);
1345 	if ((p == NULL) ||
1346 		(table_id >= p->n_tables) ||
1347 		match_check(match, p, table_id) ||
1348 		action_check(action, p, table_id))
1349 		return -1;
1350 
1351 	table = &p->table[table_id];
1352 
1353 	rule = calloc(1, sizeof(struct table_rule));
1354 	if (rule == NULL)
1355 		return -1;
1356 
1357 	memcpy(&rule->match, match, sizeof(*match));
1358 	memcpy(&rule->action, action, sizeof(*action));
1359 
1360 	if (!pipeline_is_running(p)) {
1361 		union table_rule_match_low_level match_ll;
1362 		struct rte_pipeline_table_entry *data_in, *data_out;
1363 		int key_found;
1364 		uint8_t *buffer;
1365 
1366 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1367 		if (buffer == NULL) {
1368 			free(rule);
1369 			return -1;
1370 		}
1371 
1372 		/* Table match-action rule conversion */
1373 		data_in = (struct rte_pipeline_table_entry *)buffer;
1374 
1375 		status = match_convert(match, &match_ll, 1);
1376 		if (status) {
1377 			free(buffer);
1378 			free(rule);
1379 			return -1;
1380 		}
1381 
1382 		status = action_convert(table->a, action, data_in);
1383 		if (status) {
1384 			free(buffer);
1385 			free(rule);
1386 			return -1;
1387 		}
1388 
1389 		/* Add rule (match, action) to table */
1390 		status = rte_pipeline_table_entry_add(p->p,
1391 				table_id,
1392 				&match_ll,
1393 				data_in,
1394 				&key_found,
1395 				&data_out);
1396 		if (status) {
1397 			free(buffer);
1398 			free(rule);
1399 			return -1;
1400 		}
1401 
1402 		/* Write Response */
1403 		rule->data = data_out;
1404 		table_rule_add(table, rule);
1405 
1406 		free(buffer);
1407 		return 0;
1408 	}
1409 
1410 	/* Allocate request */
1411 	req = pipeline_msg_alloc();
1412 	if (req == NULL) {
1413 		free(rule);
1414 		return -1;
1415 	}
1416 
1417 	/* Write request */
1418 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1419 	req->id = table_id;
1420 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1421 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1422 
1423 	/* Send request and wait for response */
1424 	rsp = pipeline_msg_send_recv(p, req);
1425 
1426 	/* Read response */
1427 	status = rsp->status;
1428 	if (status == 0) {
1429 		rule->data = rsp->table_rule_add.data;
1430 		table_rule_add(table, rule);
1431 	} else
1432 		free(rule);
1433 
1434 	/* Free response */
1435 	pipeline_msg_free(rsp);
1436 
1437 	return status;
1438 }
1439 
1440 int
1441 pipeline_table_rule_add_default(const char *pipeline_name,
1442 	uint32_t table_id,
1443 	struct table_rule_action *action)
1444 {
1445 	struct pipeline *p;
1446 	struct table *table;
1447 	struct pipeline_msg_req *req;
1448 	struct pipeline_msg_rsp *rsp;
1449 	struct table_rule *rule;
1450 	int status;
1451 
1452 	/* Check input params */
1453 	if ((pipeline_name == NULL) ||
1454 		(action == NULL))
1455 		return -1;
1456 
1457 	p = pipeline_find(pipeline_name);
1458 	if ((p == NULL) ||
1459 		(table_id >= p->n_tables) ||
1460 		action_default_check(action, p, table_id))
1461 		return -1;
1462 
1463 	table = &p->table[table_id];
1464 
1465 	rule = calloc(1, sizeof(struct table_rule));
1466 	if (rule == NULL)
1467 		return -1;
1468 
1469 	memcpy(&rule->action, action, sizeof(*action));
1470 
1471 	if (!pipeline_is_running(p)) {
1472 		struct rte_pipeline_table_entry *data_in, *data_out;
1473 		uint8_t *buffer;
1474 
1475 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1476 		if (buffer == NULL) {
1477 			free(rule);
1478 			return -1;
1479 		}
1480 
1481 		/* Apply actions */
1482 		data_in = (struct rte_pipeline_table_entry *)buffer;
1483 
1484 		data_in->action = action->fwd.action;
1485 		if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1486 			data_in->port_id = action->fwd.id;
1487 		if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1488 			data_in->table_id = action->fwd.id;
1489 
1490 		/* Add default rule to table */
1491 		status = rte_pipeline_table_default_entry_add(p->p,
1492 				table_id,
1493 				data_in,
1494 				&data_out);
1495 		if (status) {
1496 			free(buffer);
1497 			free(rule);
1498 			return -1;
1499 		}
1500 
1501 		/* Write Response */
1502 		rule->data = data_out;
1503 		table_rule_default_add(table, rule);
1504 
1505 		free(buffer);
1506 		return 0;
1507 	}
1508 
1509 	/* Allocate request */
1510 	req = pipeline_msg_alloc();
1511 	if (req == NULL) {
1512 		free(rule);
1513 		return -1;
1514 	}
1515 
1516 	/* Write request */
1517 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1518 	req->id = table_id;
1519 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1520 
1521 	/* Send request and wait for response */
1522 	rsp = pipeline_msg_send_recv(p, req);
1523 
1524 	/* Read response */
1525 	status = rsp->status;
1526 	if (status == 0) {
1527 		rule->data = rsp->table_rule_add_default.data;
1528 		table_rule_default_add(table, rule);
1529 	} else
1530 		free(rule);
1531 
1532 	/* Free response */
1533 	pipeline_msg_free(rsp);
1534 
1535 	return status;
1536 }
1537 
1538 static uint32_t
1539 table_rule_list_free(struct table_rule_list *list)
1540 {
1541 	uint32_t n = 0;
1542 
1543 	if (!list)
1544 		return 0;
1545 
1546 	for ( ; ; ) {
1547 		struct table_rule *rule;
1548 
1549 		rule = TAILQ_FIRST(list);
1550 		if (rule == NULL)
1551 			break;
1552 
1553 		TAILQ_REMOVE(list, rule, node);
1554 		free(rule);
1555 		n++;
1556 	}
1557 
1558 	free(list);
1559 	return n;
1560 }
1561 
1562 int
1563 pipeline_table_rule_add_bulk(const char *pipeline_name,
1564 	uint32_t table_id,
1565 	struct table_rule_list *list,
1566 	uint32_t *n_rules_added,
1567 	uint32_t *n_rules_not_added)
1568 {
1569 	struct pipeline *p;
1570 	struct table *table;
1571 	struct pipeline_msg_req *req;
1572 	struct pipeline_msg_rsp *rsp;
1573 	struct table_rule *rule;
1574 	int status = 0;
1575 
1576 	/* Check input params */
1577 	if ((pipeline_name == NULL) ||
1578 		(list == NULL) ||
1579 		TAILQ_EMPTY(list) ||
1580 		(n_rules_added == NULL) ||
1581 		(n_rules_not_added == NULL)) {
1582 		table_rule_list_free(list);
1583 		return -EINVAL;
1584 	}
1585 
1586 	p = pipeline_find(pipeline_name);
1587 	if ((p == NULL) ||
1588 		(table_id >= p->n_tables)) {
1589 		table_rule_list_free(list);
1590 		return -EINVAL;
1591 	}
1592 
1593 	table = &p->table[table_id];
1594 
1595 	TAILQ_FOREACH(rule, list, node)
1596 		if (match_check(&rule->match, p, table_id) ||
1597 			action_check(&rule->action, p, table_id)) {
1598 			table_rule_list_free(list);
1599 			return -EINVAL;
1600 		}
1601 
1602 	if (!pipeline_is_running(p)) {
1603 		struct table_ll table_ll = {
1604 			.p = p->p,
1605 			.table_id = table_id,
1606 			.a = table->a,
1607 			.bulk_supported = table->params.match_type == TABLE_ACL,
1608 		};
1609 
1610 		status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1611 		if (status) {
1612 			table_rule_list_free(list);
1613 			return status;
1614 		}
1615 
1616 		table_rule_add_bulk(table, list, *n_rules_added);
1617 		*n_rules_not_added = table_rule_list_free(list);
1618 		return 0;
1619 	}
1620 
1621 	/* Allocate request */
1622 	req = pipeline_msg_alloc();
1623 	if (req == NULL) {
1624 		table_rule_list_free(list);
1625 		return -ENOMEM;
1626 	}
1627 
1628 	/* Write request */
1629 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1630 	req->id = table_id;
1631 	req->table_rule_add_bulk.list = list;
1632 	req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1633 
1634 	/* Send request and wait for response */
1635 	rsp = pipeline_msg_send_recv(p, req);
1636 
1637 	/* Read response */
1638 	status = rsp->status;
1639 	if (status == 0) {
1640 		*n_rules_added = rsp->table_rule_add_bulk.n_rules;
1641 
1642 		table_rule_add_bulk(table, list, *n_rules_added);
1643 		*n_rules_not_added = table_rule_list_free(list);
1644 	} else
1645 		table_rule_list_free(list);
1646 
1647 
1648 	/* Free response */
1649 	pipeline_msg_free(rsp);
1650 
1651 	return status;
1652 }
1653 
1654 int
1655 pipeline_table_rule_delete(const char *pipeline_name,
1656 	uint32_t table_id,
1657 	struct table_rule_match *match)
1658 {
1659 	struct pipeline *p;
1660 	struct table *table;
1661 	struct pipeline_msg_req *req;
1662 	struct pipeline_msg_rsp *rsp;
1663 	int status;
1664 
1665 	/* Check input params */
1666 	if ((pipeline_name == NULL) ||
1667 		(match == NULL))
1668 		return -1;
1669 
1670 	p = pipeline_find(pipeline_name);
1671 	if ((p == NULL) ||
1672 		(table_id >= p->n_tables) ||
1673 		match_check(match, p, table_id))
1674 		return -1;
1675 
1676 	table = &p->table[table_id];
1677 
1678 	if (!pipeline_is_running(p)) {
1679 		union table_rule_match_low_level match_ll;
1680 		int key_found;
1681 
1682 		status = match_convert(match, &match_ll, 0);
1683 		if (status)
1684 			return -1;
1685 
1686 		status = rte_pipeline_table_entry_delete(p->p,
1687 				table_id,
1688 				&match_ll,
1689 				&key_found,
1690 				NULL);
1691 
1692 		if (status == 0)
1693 			table_rule_delete(table, match);
1694 
1695 		return status;
1696 	}
1697 
1698 	/* Allocate request */
1699 	req = pipeline_msg_alloc();
1700 	if (req == NULL)
1701 		return -1;
1702 
1703 	/* Write request */
1704 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1705 	req->id = table_id;
1706 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1707 
1708 	/* Send request and wait for response */
1709 	rsp = pipeline_msg_send_recv(p, req);
1710 
1711 	/* Read response */
1712 	status = rsp->status;
1713 	if (status == 0)
1714 		table_rule_delete(table, match);
1715 
1716 	/* Free response */
1717 	pipeline_msg_free(rsp);
1718 
1719 	return status;
1720 }
1721 
1722 int
1723 pipeline_table_rule_delete_default(const char *pipeline_name,
1724 	uint32_t table_id)
1725 {
1726 	struct pipeline *p;
1727 	struct table *table;
1728 	struct pipeline_msg_req *req;
1729 	struct pipeline_msg_rsp *rsp;
1730 	int status;
1731 
1732 	/* Check input params */
1733 	if (pipeline_name == NULL)
1734 		return -1;
1735 
1736 	p = pipeline_find(pipeline_name);
1737 	if ((p == NULL) ||
1738 		(table_id >= p->n_tables))
1739 		return -1;
1740 
1741 	table = &p->table[table_id];
1742 
1743 	if (!pipeline_is_running(p)) {
1744 		status = rte_pipeline_table_default_entry_delete(p->p,
1745 			table_id,
1746 			NULL);
1747 
1748 		if (status == 0)
1749 			table_rule_default_delete(table);
1750 
1751 		return status;
1752 	}
1753 
1754 	/* Allocate request */
1755 	req = pipeline_msg_alloc();
1756 	if (req == NULL)
1757 		return -1;
1758 
1759 	/* Write request */
1760 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1761 	req->id = table_id;
1762 
1763 	/* Send request and wait for response */
1764 	rsp = pipeline_msg_send_recv(p, req);
1765 
1766 	/* Read response */
1767 	status = rsp->status;
1768 	if (status == 0)
1769 		table_rule_default_delete(table);
1770 
1771 	/* Free response */
1772 	pipeline_msg_free(rsp);
1773 
1774 	return status;
1775 }
1776 
1777 int
1778 pipeline_table_rule_stats_read(const char *pipeline_name,
1779 	uint32_t table_id,
1780 	struct table_rule_match *match,
1781 	struct rte_table_action_stats_counters *stats,
1782 	int clear)
1783 {
1784 	struct pipeline *p;
1785 	struct table *table;
1786 	struct pipeline_msg_req *req;
1787 	struct pipeline_msg_rsp *rsp;
1788 	struct table_rule *rule;
1789 	int status;
1790 
1791 	/* Check input params */
1792 	if ((pipeline_name == NULL) ||
1793 		(match == NULL) ||
1794 		(stats == NULL))
1795 		return -1;
1796 
1797 	p = pipeline_find(pipeline_name);
1798 	if ((p == NULL) ||
1799 		(table_id >= p->n_tables) ||
1800 		match_check(match, p, table_id))
1801 		return -1;
1802 
1803 	table = &p->table[table_id];
1804 	rule = table_rule_find(table, match);
1805 	if (rule == NULL)
1806 		return -1;
1807 
1808 	if (!pipeline_is_running(p)) {
1809 		status = rte_table_action_stats_read(table->a,
1810 			rule->data,
1811 			stats,
1812 			clear);
1813 
1814 		return status;
1815 	}
1816 
1817 	/* Allocate request */
1818 	req = pipeline_msg_alloc();
1819 	if (req == NULL)
1820 		return -1;
1821 
1822 	/* Write request */
1823 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1824 	req->id = table_id;
1825 	req->table_rule_stats_read.data = rule->data;
1826 	req->table_rule_stats_read.clear = clear;
1827 
1828 	/* Send request and wait for response */
1829 	rsp = pipeline_msg_send_recv(p, req);
1830 
1831 	/* Read response */
1832 	status = rsp->status;
1833 	if (status == 0)
1834 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1835 
1836 	/* Free response */
1837 	pipeline_msg_free(rsp);
1838 
1839 	return status;
1840 }
1841 
1842 int
1843 pipeline_table_mtr_profile_add(const char *pipeline_name,
1844 	uint32_t table_id,
1845 	uint32_t meter_profile_id,
1846 	struct rte_table_action_meter_profile *profile)
1847 {
1848 	struct pipeline *p;
1849 	struct pipeline_msg_req *req;
1850 	struct pipeline_msg_rsp *rsp;
1851 	int status;
1852 
1853 	/* Check input params */
1854 	if ((pipeline_name == NULL) ||
1855 		(profile == NULL))
1856 		return -1;
1857 
1858 	p = pipeline_find(pipeline_name);
1859 	if ((p == NULL) ||
1860 		(table_id >= p->n_tables))
1861 		return -1;
1862 
1863 	if (!pipeline_is_running(p)) {
1864 		struct rte_table_action *a = p->table[table_id].a;
1865 
1866 		status = rte_table_action_meter_profile_add(a,
1867 			meter_profile_id,
1868 			profile);
1869 
1870 		return status;
1871 	}
1872 
1873 	/* Allocate request */
1874 	req = pipeline_msg_alloc();
1875 	if (req == NULL)
1876 		return -1;
1877 
1878 	/* Write request */
1879 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1880 	req->id = table_id;
1881 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1882 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1883 
1884 	/* Send request and wait for response */
1885 	rsp = pipeline_msg_send_recv(p, req);
1886 
1887 	/* Read response */
1888 	status = rsp->status;
1889 
1890 	/* Free response */
1891 	pipeline_msg_free(rsp);
1892 
1893 	return status;
1894 }
1895 
1896 int
1897 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1898 	uint32_t table_id,
1899 	uint32_t meter_profile_id)
1900 {
1901 	struct pipeline *p;
1902 	struct pipeline_msg_req *req;
1903 	struct pipeline_msg_rsp *rsp;
1904 	int status;
1905 
1906 	/* Check input params */
1907 	if (pipeline_name == NULL)
1908 		return -1;
1909 
1910 	p = pipeline_find(pipeline_name);
1911 	if ((p == NULL) ||
1912 		(table_id >= p->n_tables))
1913 		return -1;
1914 
1915 	if (!pipeline_is_running(p)) {
1916 		struct rte_table_action *a = p->table[table_id].a;
1917 
1918 		status = rte_table_action_meter_profile_delete(a,
1919 				meter_profile_id);
1920 
1921 		return status;
1922 	}
1923 
1924 	/* Allocate request */
1925 	req = pipeline_msg_alloc();
1926 	if (req == NULL)
1927 		return -1;
1928 
1929 	/* Write request */
1930 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1931 	req->id = table_id;
1932 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1933 
1934 	/* Send request and wait for response */
1935 	rsp = pipeline_msg_send_recv(p, req);
1936 
1937 	/* Read response */
1938 	status = rsp->status;
1939 
1940 	/* Free response */
1941 	pipeline_msg_free(rsp);
1942 
1943 	return status;
1944 }
1945 
1946 int
1947 pipeline_table_rule_mtr_read(const char *pipeline_name,
1948 	uint32_t table_id,
1949 	struct table_rule_match *match,
1950 	struct rte_table_action_mtr_counters *stats,
1951 	int clear)
1952 {
1953 	struct pipeline *p;
1954 	struct table *table;
1955 	struct pipeline_msg_req *req;
1956 	struct pipeline_msg_rsp *rsp;
1957 	struct table_rule *rule;
1958 	uint32_t tc_mask;
1959 	int status;
1960 
1961 	/* Check input params */
1962 	if ((pipeline_name == NULL) ||
1963 		(match == NULL) ||
1964 		(stats == NULL))
1965 		return -1;
1966 
1967 	p = pipeline_find(pipeline_name);
1968 	if ((p == NULL) ||
1969 		(table_id >= p->n_tables) ||
1970 		match_check(match, p, table_id))
1971 		return -1;
1972 
1973 	table = &p->table[table_id];
1974 	tc_mask = (1 << table->ap->params.mtr.n_tc) - 1;
1975 
1976 	rule = table_rule_find(table, match);
1977 	if (rule == NULL)
1978 		return -1;
1979 
1980 	if (!pipeline_is_running(p)) {
1981 		status = rte_table_action_meter_read(table->a,
1982 				rule->data,
1983 				tc_mask,
1984 				stats,
1985 				clear);
1986 
1987 		return status;
1988 	}
1989 
1990 	/* Allocate request */
1991 	req = pipeline_msg_alloc();
1992 	if (req == NULL)
1993 		return -1;
1994 
1995 	/* Write request */
1996 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
1997 	req->id = table_id;
1998 	req->table_rule_mtr_read.data = rule->data;
1999 	req->table_rule_mtr_read.tc_mask = tc_mask;
2000 	req->table_rule_mtr_read.clear = clear;
2001 
2002 	/* Send request and wait for response */
2003 	rsp = pipeline_msg_send_recv(p, req);
2004 
2005 	/* Read response */
2006 	status = rsp->status;
2007 	if (status == 0)
2008 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2009 
2010 	/* Free response */
2011 	pipeline_msg_free(rsp);
2012 
2013 	return status;
2014 }
2015 
2016 int
2017 pipeline_table_dscp_table_update(const char *pipeline_name,
2018 	uint32_t table_id,
2019 	uint64_t dscp_mask,
2020 	struct rte_table_action_dscp_table *dscp_table)
2021 {
2022 	struct pipeline *p;
2023 	struct pipeline_msg_req *req;
2024 	struct pipeline_msg_rsp *rsp;
2025 	int status;
2026 
2027 	/* Check input params */
2028 	if ((pipeline_name == NULL) ||
2029 		(dscp_table == NULL))
2030 		return -1;
2031 
2032 	p = pipeline_find(pipeline_name);
2033 	if ((p == NULL) ||
2034 		(table_id >= p->n_tables))
2035 		return -1;
2036 
2037 	if (!pipeline_is_running(p)) {
2038 		struct rte_table_action *a = p->table[table_id].a;
2039 
2040 		status = rte_table_action_dscp_table_update(a,
2041 				dscp_mask,
2042 				dscp_table);
2043 
2044 		return status;
2045 	}
2046 
2047 	/* Allocate request */
2048 	req = pipeline_msg_alloc();
2049 	if (req == NULL)
2050 		return -1;
2051 
2052 	/* Write request */
2053 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2054 	req->id = table_id;
2055 	req->table_dscp_table_update.dscp_mask = dscp_mask;
2056 	memcpy(&req->table_dscp_table_update.dscp_table,
2057 		dscp_table, sizeof(*dscp_table));
2058 
2059 	/* Send request and wait for response */
2060 	rsp = pipeline_msg_send_recv(p, req);
2061 
2062 	/* Read response */
2063 	status = rsp->status;
2064 
2065 	/* Free response */
2066 	pipeline_msg_free(rsp);
2067 
2068 	return status;
2069 }
2070 
2071 int
2072 pipeline_table_rule_ttl_read(const char *pipeline_name,
2073 	uint32_t table_id,
2074 	struct table_rule_match *match,
2075 	struct rte_table_action_ttl_counters *stats,
2076 	int clear)
2077 {
2078 	struct pipeline *p;
2079 	struct table *table;
2080 	struct pipeline_msg_req *req;
2081 	struct pipeline_msg_rsp *rsp;
2082 	struct table_rule *rule;
2083 	int status;
2084 
2085 	/* Check input params */
2086 	if ((pipeline_name == NULL) ||
2087 		(match == NULL) ||
2088 		(stats == NULL))
2089 		return -1;
2090 
2091 	p = pipeline_find(pipeline_name);
2092 	if ((p == NULL) ||
2093 		(table_id >= p->n_tables) ||
2094 		match_check(match, p, table_id))
2095 		return -1;
2096 
2097 	table = &p->table[table_id];
2098 	if (!table->ap->params.ttl.n_packets_enabled)
2099 		return -1;
2100 
2101 	rule = table_rule_find(table, match);
2102 	if (rule == NULL)
2103 		return -1;
2104 
2105 	if (!pipeline_is_running(p)) {
2106 		status = rte_table_action_ttl_read(table->a,
2107 				rule->data,
2108 				stats,
2109 				clear);
2110 
2111 		return status;
2112 	}
2113 
2114 	/* Allocate request */
2115 	req = pipeline_msg_alloc();
2116 	if (req == NULL)
2117 		return -1;
2118 
2119 	/* Write request */
2120 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2121 	req->id = table_id;
2122 	req->table_rule_ttl_read.data = rule->data;
2123 	req->table_rule_ttl_read.clear = clear;
2124 
2125 	/* Send request and wait for response */
2126 	rsp = pipeline_msg_send_recv(p, req);
2127 
2128 	/* Read response */
2129 	status = rsp->status;
2130 	if (status == 0)
2131 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2132 
2133 	/* Free response */
2134 	pipeline_msg_free(rsp);
2135 
2136 	return status;
2137 }
2138 
2139 int
2140 pipeline_table_rule_time_read(const char *pipeline_name,
2141 	uint32_t table_id,
2142 	struct table_rule_match *match,
2143 	uint64_t *timestamp)
2144 {
2145 	struct pipeline *p;
2146 	struct table *table;
2147 	struct pipeline_msg_req *req;
2148 	struct pipeline_msg_rsp *rsp;
2149 	struct table_rule *rule;
2150 	int status;
2151 
2152 	/* Check input params */
2153 	if ((pipeline_name == NULL) ||
2154 		(match == NULL) ||
2155 		(timestamp == NULL))
2156 		return -1;
2157 
2158 	p = pipeline_find(pipeline_name);
2159 	if ((p == NULL) ||
2160 		(table_id >= p->n_tables) ||
2161 		match_check(match, p, table_id))
2162 		return -1;
2163 
2164 	table = &p->table[table_id];
2165 
2166 	rule = table_rule_find(table, match);
2167 	if (rule == NULL)
2168 		return -1;
2169 
2170 	if (!pipeline_is_running(p)) {
2171 		status = rte_table_action_time_read(table->a,
2172 				rule->data,
2173 				timestamp);
2174 
2175 		return status;
2176 	}
2177 
2178 	/* Allocate request */
2179 	req = pipeline_msg_alloc();
2180 	if (req == NULL)
2181 		return -1;
2182 
2183 	/* Write request */
2184 	req->type = PIPELINE_REQ_TABLE_RULE_TIME_READ;
2185 	req->id = table_id;
2186 	req->table_rule_time_read.data = rule->data;
2187 
2188 	/* Send request and wait for response */
2189 	rsp = pipeline_msg_send_recv(p, req);
2190 
2191 	/* Read response */
2192 	status = rsp->status;
2193 	if (status == 0)
2194 		*timestamp = rsp->table_rule_time_read.timestamp;
2195 
2196 	/* Free response */
2197 	pipeline_msg_free(rsp);
2198 
2199 	return status;
2200 }
2201 
2202 /**
2203  * Data plane threads: message handling
2204  */
2205 static inline struct pipeline_msg_req *
2206 pipeline_msg_recv(struct rte_ring *msgq_req)
2207 {
2208 	struct pipeline_msg_req *req;
2209 
2210 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
2211 
2212 	if (status != 0)
2213 		return NULL;
2214 
2215 	return req;
2216 }
2217 
2218 static inline void
2219 pipeline_msg_send(struct rte_ring *msgq_rsp,
2220 	struct pipeline_msg_rsp *rsp)
2221 {
2222 	int status;
2223 
2224 	do {
2225 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
2226 	} while (status == -ENOBUFS);
2227 }
2228 
2229 static struct pipeline_msg_rsp *
2230 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2231 	struct pipeline_msg_req *req)
2232 {
2233 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2234 	uint32_t port_id = req->id;
2235 	int clear = req->port_in_stats_read.clear;
2236 
2237 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
2238 		port_id,
2239 		&rsp->port_in_stats_read.stats,
2240 		clear);
2241 
2242 	return rsp;
2243 }
2244 
2245 static struct pipeline_msg_rsp *
2246 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2247 	struct pipeline_msg_req *req)
2248 {
2249 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2250 	uint32_t port_id = req->id;
2251 
2252 	rsp->status = rte_pipeline_port_in_enable(p->p,
2253 		port_id);
2254 
2255 	return rsp;
2256 }
2257 
2258 static struct pipeline_msg_rsp *
2259 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2260 	struct pipeline_msg_req *req)
2261 {
2262 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2263 	uint32_t port_id = req->id;
2264 
2265 	rsp->status = rte_pipeline_port_in_disable(p->p,
2266 		port_id);
2267 
2268 	return rsp;
2269 }
2270 
2271 static struct pipeline_msg_rsp *
2272 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2273 	struct pipeline_msg_req *req)
2274 {
2275 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2276 	uint32_t port_id = req->id;
2277 	int clear = req->port_out_stats_read.clear;
2278 
2279 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
2280 		port_id,
2281 		&rsp->port_out_stats_read.stats,
2282 		clear);
2283 
2284 	return rsp;
2285 }
2286 
2287 static struct pipeline_msg_rsp *
2288 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2289 	struct pipeline_msg_req *req)
2290 {
2291 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2292 	uint32_t port_id = req->id;
2293 	int clear = req->table_stats_read.clear;
2294 
2295 	rsp->status = rte_pipeline_table_stats_read(p->p,
2296 		port_id,
2297 		&rsp->table_stats_read.stats,
2298 		clear);
2299 
2300 	return rsp;
2301 }
2302 
2303 static int
2304 match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
2305 {
2306 	if (depth > 128)
2307 		return -1;
2308 
2309 	switch (depth / 32) {
2310 	case 0:
2311 		depth32[0] = depth;
2312 		depth32[1] = 0;
2313 		depth32[2] = 0;
2314 		depth32[3] = 0;
2315 		return 0;
2316 
2317 	case 1:
2318 		depth32[0] = 32;
2319 		depth32[1] = depth - 32;
2320 		depth32[2] = 0;
2321 		depth32[3] = 0;
2322 		return 0;
2323 
2324 	case 2:
2325 		depth32[0] = 32;
2326 		depth32[1] = 32;
2327 		depth32[2] = depth - 64;
2328 		depth32[3] = 0;
2329 		return 0;
2330 
2331 	case 3:
2332 		depth32[0] = 32;
2333 		depth32[1] = 32;
2334 		depth32[2] = 32;
2335 		depth32[3] = depth - 96;
2336 		return 0;
2337 
2338 	case 4:
2339 		depth32[0] = 32;
2340 		depth32[1] = 32;
2341 		depth32[2] = 32;
2342 		depth32[3] = 32;
2343 		return 0;
2344 
2345 	default:
2346 		return -1;
2347 	}
2348 }
2349 
2350 static int
2351 match_convert(struct table_rule_match *mh,
2352 	union table_rule_match_low_level *ml,
2353 	int add)
2354 {
2355 	memset(ml, 0, sizeof(*ml));
2356 
2357 	switch (mh->match_type) {
2358 	case TABLE_ACL:
2359 		if (mh->match.acl.ip_version)
2360 			if (add) {
2361 				ml->acl_add.field_value[0].value.u8 =
2362 					mh->match.acl.proto;
2363 				ml->acl_add.field_value[0].mask_range.u8 =
2364 					mh->match.acl.proto_mask;
2365 
2366 				ml->acl_add.field_value[1].value.u32 =
2367 					mh->match.acl.ipv4.sa;
2368 				ml->acl_add.field_value[1].mask_range.u32 =
2369 					mh->match.acl.sa_depth;
2370 
2371 				ml->acl_add.field_value[2].value.u32 =
2372 					mh->match.acl.ipv4.da;
2373 				ml->acl_add.field_value[2].mask_range.u32 =
2374 					mh->match.acl.da_depth;
2375 
2376 				ml->acl_add.field_value[3].value.u16 =
2377 					mh->match.acl.sp0;
2378 				ml->acl_add.field_value[3].mask_range.u16 =
2379 					mh->match.acl.sp1;
2380 
2381 				ml->acl_add.field_value[4].value.u16 =
2382 					mh->match.acl.dp0;
2383 				ml->acl_add.field_value[4].mask_range.u16 =
2384 					mh->match.acl.dp1;
2385 
2386 				ml->acl_add.priority =
2387 					(int32_t) mh->match.acl.priority;
2388 			} else {
2389 				ml->acl_delete.field_value[0].value.u8 =
2390 					mh->match.acl.proto;
2391 				ml->acl_delete.field_value[0].mask_range.u8 =
2392 					mh->match.acl.proto_mask;
2393 
2394 				ml->acl_delete.field_value[1].value.u32 =
2395 					mh->match.acl.ipv4.sa;
2396 				ml->acl_delete.field_value[1].mask_range.u32 =
2397 					mh->match.acl.sa_depth;
2398 
2399 				ml->acl_delete.field_value[2].value.u32 =
2400 					mh->match.acl.ipv4.da;
2401 				ml->acl_delete.field_value[2].mask_range.u32 =
2402 					mh->match.acl.da_depth;
2403 
2404 				ml->acl_delete.field_value[3].value.u16 =
2405 					mh->match.acl.sp0;
2406 				ml->acl_delete.field_value[3].mask_range.u16 =
2407 					mh->match.acl.sp1;
2408 
2409 				ml->acl_delete.field_value[4].value.u16 =
2410 					mh->match.acl.dp0;
2411 				ml->acl_delete.field_value[4].mask_range.u16 =
2412 					mh->match.acl.dp1;
2413 			}
2414 		else
2415 			if (add) {
2416 				uint32_t *sa32 =
2417 					(uint32_t *) mh->match.acl.ipv6.sa;
2418 				uint32_t *da32 =
2419 					(uint32_t *) mh->match.acl.ipv6.da;
2420 				uint32_t sa32_depth[4], da32_depth[4];
2421 				int status;
2422 
2423 				status = match_convert_ipv6_depth(
2424 					mh->match.acl.sa_depth,
2425 					sa32_depth);
2426 				if (status)
2427 					return status;
2428 
2429 				status = match_convert_ipv6_depth(
2430 					mh->match.acl.da_depth,
2431 					da32_depth);
2432 				if (status)
2433 					return status;
2434 
2435 				ml->acl_add.field_value[0].value.u8 =
2436 					mh->match.acl.proto;
2437 				ml->acl_add.field_value[0].mask_range.u8 =
2438 					mh->match.acl.proto_mask;
2439 
2440 				ml->acl_add.field_value[1].value.u32 =
2441 					rte_be_to_cpu_32(sa32[0]);
2442 				ml->acl_add.field_value[1].mask_range.u32 =
2443 					sa32_depth[0];
2444 				ml->acl_add.field_value[2].value.u32 =
2445 					rte_be_to_cpu_32(sa32[1]);
2446 				ml->acl_add.field_value[2].mask_range.u32 =
2447 					sa32_depth[1];
2448 				ml->acl_add.field_value[3].value.u32 =
2449 					rte_be_to_cpu_32(sa32[2]);
2450 				ml->acl_add.field_value[3].mask_range.u32 =
2451 					sa32_depth[2];
2452 				ml->acl_add.field_value[4].value.u32 =
2453 					rte_be_to_cpu_32(sa32[3]);
2454 				ml->acl_add.field_value[4].mask_range.u32 =
2455 					sa32_depth[3];
2456 
2457 				ml->acl_add.field_value[5].value.u32 =
2458 					rte_be_to_cpu_32(da32[0]);
2459 				ml->acl_add.field_value[5].mask_range.u32 =
2460 					da32_depth[0];
2461 				ml->acl_add.field_value[6].value.u32 =
2462 					rte_be_to_cpu_32(da32[1]);
2463 				ml->acl_add.field_value[6].mask_range.u32 =
2464 					da32_depth[1];
2465 				ml->acl_add.field_value[7].value.u32 =
2466 					rte_be_to_cpu_32(da32[2]);
2467 				ml->acl_add.field_value[7].mask_range.u32 =
2468 					da32_depth[2];
2469 				ml->acl_add.field_value[8].value.u32 =
2470 					rte_be_to_cpu_32(da32[3]);
2471 				ml->acl_add.field_value[8].mask_range.u32 =
2472 					da32_depth[3];
2473 
2474 				ml->acl_add.field_value[9].value.u16 =
2475 					mh->match.acl.sp0;
2476 				ml->acl_add.field_value[9].mask_range.u16 =
2477 					mh->match.acl.sp1;
2478 
2479 				ml->acl_add.field_value[10].value.u16 =
2480 					mh->match.acl.dp0;
2481 				ml->acl_add.field_value[10].mask_range.u16 =
2482 					mh->match.acl.dp1;
2483 
2484 				ml->acl_add.priority =
2485 					(int32_t) mh->match.acl.priority;
2486 			} else {
2487 				uint32_t *sa32 =
2488 					(uint32_t *) mh->match.acl.ipv6.sa;
2489 				uint32_t *da32 =
2490 					(uint32_t *) mh->match.acl.ipv6.da;
2491 				uint32_t sa32_depth[4], da32_depth[4];
2492 				int status;
2493 
2494 				status = match_convert_ipv6_depth(
2495 					mh->match.acl.sa_depth,
2496 					sa32_depth);
2497 				if (status)
2498 					return status;
2499 
2500 				status = match_convert_ipv6_depth(
2501 					mh->match.acl.da_depth,
2502 					da32_depth);
2503 				if (status)
2504 					return status;
2505 
2506 				ml->acl_delete.field_value[0].value.u8 =
2507 					mh->match.acl.proto;
2508 				ml->acl_delete.field_value[0].mask_range.u8 =
2509 					mh->match.acl.proto_mask;
2510 
2511 				ml->acl_delete.field_value[1].value.u32 =
2512 					rte_be_to_cpu_32(sa32[0]);
2513 				ml->acl_delete.field_value[1].mask_range.u32 =
2514 					sa32_depth[0];
2515 				ml->acl_delete.field_value[2].value.u32 =
2516 					rte_be_to_cpu_32(sa32[1]);
2517 				ml->acl_delete.field_value[2].mask_range.u32 =
2518 					sa32_depth[1];
2519 				ml->acl_delete.field_value[3].value.u32 =
2520 					rte_be_to_cpu_32(sa32[2]);
2521 				ml->acl_delete.field_value[3].mask_range.u32 =
2522 					sa32_depth[2];
2523 				ml->acl_delete.field_value[4].value.u32 =
2524 					rte_be_to_cpu_32(sa32[3]);
2525 				ml->acl_delete.field_value[4].mask_range.u32 =
2526 					sa32_depth[3];
2527 
2528 				ml->acl_delete.field_value[5].value.u32 =
2529 					rte_be_to_cpu_32(da32[0]);
2530 				ml->acl_delete.field_value[5].mask_range.u32 =
2531 					da32_depth[0];
2532 				ml->acl_delete.field_value[6].value.u32 =
2533 					rte_be_to_cpu_32(da32[1]);
2534 				ml->acl_delete.field_value[6].mask_range.u32 =
2535 					da32_depth[1];
2536 				ml->acl_delete.field_value[7].value.u32 =
2537 					rte_be_to_cpu_32(da32[2]);
2538 				ml->acl_delete.field_value[7].mask_range.u32 =
2539 					da32_depth[2];
2540 				ml->acl_delete.field_value[8].value.u32 =
2541 					rte_be_to_cpu_32(da32[3]);
2542 				ml->acl_delete.field_value[8].mask_range.u32 =
2543 					da32_depth[3];
2544 
2545 				ml->acl_delete.field_value[9].value.u16 =
2546 					mh->match.acl.sp0;
2547 				ml->acl_delete.field_value[9].mask_range.u16 =
2548 					mh->match.acl.sp1;
2549 
2550 				ml->acl_delete.field_value[10].value.u16 =
2551 					mh->match.acl.dp0;
2552 				ml->acl_delete.field_value[10].mask_range.u16 =
2553 					mh->match.acl.dp1;
2554 			}
2555 		return 0;
2556 
2557 	case TABLE_ARRAY:
2558 		ml->array.pos = mh->match.array.pos;
2559 		return 0;
2560 
2561 	case TABLE_HASH:
2562 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2563 		return 0;
2564 
2565 	case TABLE_LPM:
2566 		if (mh->match.lpm.ip_version) {
2567 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2568 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
2569 		} else {
2570 			memcpy(ml->lpm_ipv6.ip,
2571 				mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2572 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
2573 		}
2574 
2575 		return 0;
2576 
2577 	default:
2578 		return -1;
2579 	}
2580 }
2581 
2582 static int
2583 action_convert(struct rte_table_action *a,
2584 	struct table_rule_action *action,
2585 	struct rte_pipeline_table_entry *data)
2586 {
2587 	int status;
2588 
2589 	/* Apply actions */
2590 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2591 		status = rte_table_action_apply(a,
2592 			data,
2593 			RTE_TABLE_ACTION_FWD,
2594 			&action->fwd);
2595 
2596 		if (status)
2597 			return status;
2598 	}
2599 
2600 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2601 		status = rte_table_action_apply(a,
2602 			data,
2603 			RTE_TABLE_ACTION_LB,
2604 			&action->lb);
2605 
2606 		if (status)
2607 			return status;
2608 	}
2609 
2610 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2611 		status = rte_table_action_apply(a,
2612 			data,
2613 			RTE_TABLE_ACTION_MTR,
2614 			&action->mtr);
2615 
2616 		if (status)
2617 			return status;
2618 	}
2619 
2620 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2621 		status = rte_table_action_apply(a,
2622 			data,
2623 			RTE_TABLE_ACTION_TM,
2624 			&action->tm);
2625 
2626 		if (status)
2627 			return status;
2628 	}
2629 
2630 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2631 		status = rte_table_action_apply(a,
2632 			data,
2633 			RTE_TABLE_ACTION_ENCAP,
2634 			&action->encap);
2635 
2636 		if (status)
2637 			return status;
2638 	}
2639 
2640 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2641 		status = rte_table_action_apply(a,
2642 			data,
2643 			RTE_TABLE_ACTION_NAT,
2644 			&action->nat);
2645 
2646 		if (status)
2647 			return status;
2648 	}
2649 
2650 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2651 		status = rte_table_action_apply(a,
2652 			data,
2653 			RTE_TABLE_ACTION_TTL,
2654 			&action->ttl);
2655 
2656 		if (status)
2657 			return status;
2658 	}
2659 
2660 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2661 		status = rte_table_action_apply(a,
2662 			data,
2663 			RTE_TABLE_ACTION_STATS,
2664 			&action->stats);
2665 
2666 		if (status)
2667 			return status;
2668 	}
2669 
2670 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2671 		status = rte_table_action_apply(a,
2672 			data,
2673 			RTE_TABLE_ACTION_TIME,
2674 			&action->time);
2675 
2676 		if (status)
2677 			return status;
2678 	}
2679 
2680 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2681 		status = rte_table_action_apply(a,
2682 			data,
2683 			RTE_TABLE_ACTION_SYM_CRYPTO,
2684 			&action->sym_crypto);
2685 
2686 		if (status)
2687 			return status;
2688 	}
2689 
2690 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2691 		status = rte_table_action_apply(a,
2692 			data,
2693 			RTE_TABLE_ACTION_TAG,
2694 			&action->tag);
2695 
2696 		if (status)
2697 			return status;
2698 	}
2699 
2700 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2701 		status = rte_table_action_apply(a,
2702 			data,
2703 			RTE_TABLE_ACTION_DECAP,
2704 			&action->decap);
2705 
2706 		if (status)
2707 			return status;
2708 	}
2709 
2710 	return 0;
2711 }
2712 
2713 static struct pipeline_msg_rsp *
2714 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2715 	struct pipeline_msg_req *req)
2716 {
2717 	union table_rule_match_low_level match_ll;
2718 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2719 	struct table_rule_match *match = &req->table_rule_add.match;
2720 	struct table_rule_action *action = &req->table_rule_add.action;
2721 	struct rte_pipeline_table_entry *data_in, *data_out;
2722 	uint32_t table_id = req->id;
2723 	int key_found, status;
2724 	struct rte_table_action *a = p->table_data[table_id].a;
2725 
2726 	/* Apply actions */
2727 	memset(p->buffer, 0, sizeof(p->buffer));
2728 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2729 
2730 	status = match_convert(match, &match_ll, 1);
2731 	if (status) {
2732 		rsp->status = -1;
2733 		return rsp;
2734 	}
2735 
2736 	status = action_convert(a, action, data_in);
2737 	if (status) {
2738 		rsp->status = -1;
2739 		return rsp;
2740 	}
2741 
2742 	status = rte_pipeline_table_entry_add(p->p,
2743 		table_id,
2744 		&match_ll,
2745 		data_in,
2746 		&key_found,
2747 		&data_out);
2748 	if (status) {
2749 		rsp->status = -1;
2750 		return rsp;
2751 	}
2752 
2753 	/* Write response */
2754 	rsp->status = 0;
2755 	rsp->table_rule_add.data = data_out;
2756 
2757 	return rsp;
2758 }
2759 
2760 static struct pipeline_msg_rsp *
2761 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2762 	struct pipeline_msg_req *req)
2763 {
2764 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2765 	struct table_rule_action *action = &req->table_rule_add_default.action;
2766 	struct rte_pipeline_table_entry *data_in, *data_out;
2767 	uint32_t table_id = req->id;
2768 	int status;
2769 
2770 	/* Apply actions */
2771 	memset(p->buffer, 0, sizeof(p->buffer));
2772 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2773 
2774 	data_in->action = action->fwd.action;
2775 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2776 		data_in->port_id = action->fwd.id;
2777 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2778 		data_in->table_id = action->fwd.id;
2779 
2780 	/* Add default rule to table */
2781 	status = rte_pipeline_table_default_entry_add(p->p,
2782 		table_id,
2783 		data_in,
2784 		&data_out);
2785 	if (status) {
2786 		rsp->status = -1;
2787 		return rsp;
2788 	}
2789 
2790 	/* Write response */
2791 	rsp->status = 0;
2792 	rsp->table_rule_add_default.data = data_out;
2793 
2794 	return rsp;
2795 }
2796 
2797 static struct pipeline_msg_rsp *
2798 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2799 	struct pipeline_msg_req *req)
2800 {
2801 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2802 
2803 	uint32_t table_id = req->id;
2804 	struct table_rule_list *list = req->table_rule_add_bulk.list;
2805 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2806 
2807 	uint32_t n_rules_added;
2808 	int status;
2809 
2810 	struct table_ll table_ll = {
2811 		.p = p->p,
2812 		.table_id = table_id,
2813 		.a = p->table_data[table_id].a,
2814 		.bulk_supported = bulk,
2815 	};
2816 
2817 	status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2818 	if (status) {
2819 		rsp->status = -1;
2820 		rsp->table_rule_add_bulk.n_rules = 0;
2821 		return rsp;
2822 	}
2823 
2824 	/* Write response */
2825 	rsp->status = 0;
2826 	rsp->table_rule_add_bulk.n_rules = n_rules_added;
2827 	return rsp;
2828 }
2829 
2830 static struct pipeline_msg_rsp *
2831 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2832 	struct pipeline_msg_req *req)
2833 {
2834 	union table_rule_match_low_level match_ll;
2835 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2836 	struct table_rule_match *match = &req->table_rule_delete.match;
2837 	uint32_t table_id = req->id;
2838 	int key_found, status;
2839 
2840 	status = match_convert(match, &match_ll, 0);
2841 	if (status) {
2842 		rsp->status = -1;
2843 		return rsp;
2844 	}
2845 
2846 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2847 		table_id,
2848 		&match_ll,
2849 		&key_found,
2850 		NULL);
2851 
2852 	return rsp;
2853 }
2854 
2855 static struct pipeline_msg_rsp *
2856 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2857 	struct pipeline_msg_req *req)
2858 {
2859 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2860 	uint32_t table_id = req->id;
2861 
2862 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2863 		table_id,
2864 		NULL);
2865 
2866 	return rsp;
2867 }
2868 
2869 static struct pipeline_msg_rsp *
2870 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2871 	struct pipeline_msg_req *req)
2872 {
2873 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2874 	uint32_t table_id = req->id;
2875 	void *data = req->table_rule_stats_read.data;
2876 	int clear = req->table_rule_stats_read.clear;
2877 	struct rte_table_action *a = p->table_data[table_id].a;
2878 
2879 	rsp->status = rte_table_action_stats_read(a,
2880 		data,
2881 		&rsp->table_rule_stats_read.stats,
2882 		clear);
2883 
2884 	return rsp;
2885 }
2886 
2887 static struct pipeline_msg_rsp *
2888 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2889 	struct pipeline_msg_req *req)
2890 {
2891 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2892 	uint32_t table_id = req->id;
2893 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2894 	struct rte_table_action_meter_profile *profile =
2895 		&req->table_mtr_profile_add.profile;
2896 	struct rte_table_action *a = p->table_data[table_id].a;
2897 
2898 	rsp->status = rte_table_action_meter_profile_add(a,
2899 		meter_profile_id,
2900 		profile);
2901 
2902 	return rsp;
2903 }
2904 
2905 static struct pipeline_msg_rsp *
2906 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2907 	struct pipeline_msg_req *req)
2908 {
2909 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2910 	uint32_t table_id = req->id;
2911 	uint32_t meter_profile_id =
2912 		req->table_mtr_profile_delete.meter_profile_id;
2913 	struct rte_table_action *a = p->table_data[table_id].a;
2914 
2915 	rsp->status = rte_table_action_meter_profile_delete(a,
2916 		meter_profile_id);
2917 
2918 	return rsp;
2919 }
2920 
2921 static struct pipeline_msg_rsp *
2922 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2923 	struct pipeline_msg_req *req)
2924 {
2925 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2926 	uint32_t table_id = req->id;
2927 	void *data = req->table_rule_mtr_read.data;
2928 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2929 	int clear = req->table_rule_mtr_read.clear;
2930 	struct rte_table_action *a = p->table_data[table_id].a;
2931 
2932 	rsp->status = rte_table_action_meter_read(a,
2933 		data,
2934 		tc_mask,
2935 		&rsp->table_rule_mtr_read.stats,
2936 		clear);
2937 
2938 	return rsp;
2939 }
2940 
2941 static struct pipeline_msg_rsp *
2942 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2943 	struct pipeline_msg_req *req)
2944 {
2945 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2946 	uint32_t table_id = req->id;
2947 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2948 	struct rte_table_action_dscp_table *dscp_table =
2949 		&req->table_dscp_table_update.dscp_table;
2950 	struct rte_table_action *a = p->table_data[table_id].a;
2951 
2952 	rsp->status = rte_table_action_dscp_table_update(a,
2953 		dscp_mask,
2954 		dscp_table);
2955 
2956 	return rsp;
2957 }
2958 
2959 static struct pipeline_msg_rsp *
2960 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2961 	struct pipeline_msg_req *req)
2962 {
2963 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2964 	uint32_t table_id = req->id;
2965 	void *data = req->table_rule_ttl_read.data;
2966 	int clear = req->table_rule_ttl_read.clear;
2967 	struct rte_table_action *a = p->table_data[table_id].a;
2968 
2969 	rsp->status = rte_table_action_ttl_read(a,
2970 		data,
2971 		&rsp->table_rule_ttl_read.stats,
2972 		clear);
2973 
2974 	return rsp;
2975 }
2976 
2977 static struct pipeline_msg_rsp *
2978 pipeline_msg_handle_table_rule_time_read(struct pipeline_data *p,
2979 	struct pipeline_msg_req *req)
2980 {
2981 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2982 	uint32_t table_id = req->id;
2983 	void *data = req->table_rule_time_read.data;
2984 	struct rte_table_action *a = p->table_data[table_id].a;
2985 
2986 	rsp->status = rte_table_action_time_read(a,
2987 		data,
2988 		&rsp->table_rule_time_read.timestamp);
2989 
2990 	return rsp;
2991 }
2992 
2993 static void
2994 pipeline_msg_handle(struct pipeline_data *p)
2995 {
2996 	for ( ; ; ) {
2997 		struct pipeline_msg_req *req;
2998 		struct pipeline_msg_rsp *rsp;
2999 
3000 		req = pipeline_msg_recv(p->msgq_req);
3001 		if (req == NULL)
3002 			break;
3003 
3004 		switch (req->type) {
3005 		case PIPELINE_REQ_PORT_IN_STATS_READ:
3006 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
3007 			break;
3008 
3009 		case PIPELINE_REQ_PORT_IN_ENABLE:
3010 			rsp = pipeline_msg_handle_port_in_enable(p, req);
3011 			break;
3012 
3013 		case PIPELINE_REQ_PORT_IN_DISABLE:
3014 			rsp = pipeline_msg_handle_port_in_disable(p, req);
3015 			break;
3016 
3017 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
3018 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
3019 			break;
3020 
3021 		case PIPELINE_REQ_TABLE_STATS_READ:
3022 			rsp = pipeline_msg_handle_table_stats_read(p, req);
3023 			break;
3024 
3025 		case PIPELINE_REQ_TABLE_RULE_ADD:
3026 			rsp = pipeline_msg_handle_table_rule_add(p, req);
3027 			break;
3028 
3029 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
3030 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
3031 			break;
3032 
3033 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
3034 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
3035 			break;
3036 
3037 		case PIPELINE_REQ_TABLE_RULE_DELETE:
3038 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
3039 			break;
3040 
3041 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
3042 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
3043 			break;
3044 
3045 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
3046 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
3047 			break;
3048 
3049 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
3050 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
3051 			break;
3052 
3053 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
3054 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
3055 			break;
3056 
3057 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
3058 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
3059 			break;
3060 
3061 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
3062 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
3063 			break;
3064 
3065 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
3066 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
3067 			break;
3068 
3069 		case PIPELINE_REQ_TABLE_RULE_TIME_READ:
3070 			rsp = pipeline_msg_handle_table_rule_time_read(p, req);
3071 			break;
3072 
3073 		default:
3074 			rsp = (struct pipeline_msg_rsp *) req;
3075 			rsp->status = -1;
3076 		}
3077 
3078 		pipeline_msg_send(p->msgq_rsp, rsp);
3079 	}
3080 }
3081 
3082 /**
3083  * Data plane threads: main
3084  */
3085 int
3086 thread_main(void *arg __rte_unused)
3087 {
3088 	struct thread_data *t;
3089 	uint32_t thread_id, i;
3090 
3091 	thread_id = rte_lcore_id();
3092 	t = &thread_data[thread_id];
3093 
3094 	/* Dispatch loop */
3095 	for (i = 0; ; i++) {
3096 		uint32_t j;
3097 
3098 		/* Data Plane */
3099 		for (j = 0; j < t->n_pipelines; j++)
3100 			rte_pipeline_run(t->p[j]);
3101 
3102 		/* Control Plane */
3103 		if ((i & 0xF) == 0) {
3104 			uint64_t time = rte_get_tsc_cycles();
3105 			uint64_t time_next_min = UINT64_MAX;
3106 
3107 			if (time < t->time_next_min)
3108 				continue;
3109 
3110 			/* Pipeline message queues */
3111 			for (j = 0; j < t->n_pipelines; j++) {
3112 				struct pipeline_data *p =
3113 					&t->pipeline_data[j];
3114 				uint64_t time_next = p->time_next;
3115 
3116 				if (time_next <= time) {
3117 					pipeline_msg_handle(p);
3118 					rte_pipeline_flush(p->p);
3119 					time_next = time + p->timer_period;
3120 					p->time_next = time_next;
3121 				}
3122 
3123 				if (time_next < time_next_min)
3124 					time_next_min = time_next;
3125 			}
3126 
3127 			/* Thread message queues */
3128 			{
3129 				uint64_t time_next = t->time_next;
3130 
3131 				if (time_next <= time) {
3132 					thread_msg_handle(t);
3133 					time_next = time + t->timer_period;
3134 					t->time_next = time_next;
3135 				}
3136 
3137 				if (time_next < time_next_min)
3138 					time_next_min = time_next;
3139 			}
3140 
3141 			t->time_next_min = time_next_min;
3142 		}
3143 	}
3144 
3145 	return 0;
3146 }
3147