xref: /dpdk/examples/ip_pipeline/thread.c (revision 5ac1abdd37aa43692603cd8670111c354014766f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
/*
 * Build-time tunables. Each one is only defined here when the build has not
 * already provided a value.
 */

/* Capacity of the per-thread pipeline arrays in struct thread_data. */
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25 
/* Element count passed to rte_ring_create() for each thread message queue. */
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29 
/* Thread timer period in milliseconds; thread_init() converts it to TSC cycles. */
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33 
34 /**
35  * Main thread: data plane thread context
36  */
/*
 * One record per lcore, indexed by lcore id. thread_init() fills in the
 * records of worker lcores; all other records keep their zero initial value.
 */
37 struct thread {
38 	struct rte_ring *msgq_req; /* Requests travel main thread -> data plane thread. */
39 	struct rte_ring *msgq_rsp; /* Responses travel data plane thread -> main thread. */
40 
41 	uint32_t enabled; /* Set to 1 by thread_init() once both rings exist. */
42 };
43 
/* Main-thread bookkeeping for every possible lcore. */
44 static struct thread thread[RTE_MAX_LCORE];
45 
46 /**
47  * Data plane threads: context
48  */
/* Per-table state kept for a pipeline that is mapped to a thread. */
49 struct table_data {
50 	struct rte_table_action *a; /* Copied from the pipeline's table[i].a when enabled. */
51 };
52 
/*
 * Per-pipeline state held by the thread the pipeline is mapped to. Populated
 * either directly by thread_pipeline_enable() (thread not running) or by the
 * data plane thread when it handles a PIPELINE_ENABLE request.
 */
53 struct pipeline_data {
54 	struct rte_pipeline *p;
55 	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56 	uint32_t n_tables; /* Number of valid entries in table_data[]. */
57 
58 	struct rte_ring *msgq_req; /* Pipeline-level request ring. */
59 	struct rte_ring *msgq_rsp; /* Pipeline-level response ring. */
60 	uint64_t timer_period; /* Measured in CPU cycles. */
61 	uint64_t time_next; /* Initialised to the TSC at enable time plus timer_period. */
62 
	/* NOTE(review): scratch space sized for one table action; its users are outside this chunk. */
63 	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65 
/*
 * Per-lcore state of a data plane thread. p[] and pipeline_data[] are
 * parallel arrays: entry i of both describes the same pipeline, and only the
 * first n_pipelines entries are valid.
 */
66 struct __rte_cache_aligned thread_data {
67 	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68 	uint32_t n_pipelines;
69 
70 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 	struct rte_ring *msgq_req; /* Same ring as thread[i].msgq_req. */
72 	struct rte_ring *msgq_rsp; /* Same ring as thread[i].msgq_rsp. */
73 	uint64_t timer_period; /* Measured in CPU cycles. */
74 	uint64_t time_next;
75 	uint64_t time_next_min; /* Starts equal to time_next; later updates are outside this chunk. */
76 };
77 
/* Data plane state for every possible lcore, indexed by lcore id. */
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Main thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		rte_ring_free(t->msgq_req);
96 
97 		rte_ring_free(t->msgq_rsp);
98 	}
99 }
100 
101 int
102 thread_init(void)
103 {
104 	uint32_t i;
105 
106 	RTE_LCORE_FOREACH_WORKER(i) {
107 		char name[NAME_MAX];
108 		struct rte_ring *msgq_req, *msgq_rsp;
109 		struct thread *t = &thread[i];
110 		struct thread_data *t_data = &thread_data[i];
111 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
112 
113 		/* MSGQs */
114 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
115 
116 		msgq_req = rte_ring_create(name,
117 			THREAD_MSGQ_SIZE,
118 			cpu_id,
119 			RING_F_SP_ENQ | RING_F_SC_DEQ);
120 
121 		if (msgq_req == NULL) {
122 			thread_free();
123 			return -1;
124 		}
125 
126 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
127 
128 		msgq_rsp = rte_ring_create(name,
129 			THREAD_MSGQ_SIZE,
130 			cpu_id,
131 			RING_F_SP_ENQ | RING_F_SC_DEQ);
132 
133 		if (msgq_rsp == NULL) {
134 			thread_free();
135 			return -1;
136 		}
137 
138 		/* Main thread records */
139 		t->msgq_req = msgq_req;
140 		t->msgq_rsp = msgq_rsp;
141 		t->enabled = 1;
142 
143 		/* Data plane thread records */
144 		t_data->n_pipelines = 0;
145 		t_data->msgq_req = msgq_req;
146 		t_data->msgq_rsp = msgq_rsp;
147 		t_data->timer_period =
148 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
149 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
150 		t_data->time_next_min = t_data->time_next;
151 	}
152 
153 	return 0;
154 }
155 
156 static inline int
157 thread_is_running(uint32_t thread_id)
158 {
159 	enum rte_lcore_state_t thread_state;
160 
161 	thread_state = rte_eal_get_lcore_state(thread_id);
162 	return (thread_state == RUNNING) ? 1 : 0;
163 }
164 
165 /**
166  * Pipeline is running when:
167  *    (A) Pipeline is mapped to a data plane thread AND
168  *    (B) Its data plane thread is in RUNNING state.
169  */
170 static inline int
171 pipeline_is_running(struct pipeline *p)
172 {
173 	if (p->enabled == 0)
174 		return 0;
175 
176 	return thread_is_running(p->thread_id);
177 }
178 
179 /**
180  * Main thread & data plane threads: message passing
181  */
/* Kinds of request the main thread can send to a data plane thread. */
182 enum thread_req_type {
183 	THREAD_REQ_PIPELINE_ENABLE = 0,
184 	THREAD_REQ_PIPELINE_DISABLE,
185 	THREAD_REQ_MAX
186 };
187 
/*
 * Request message. "type" selects which union member is meaningful. The data
 * plane handlers reuse this same buffer for the response, so "type" is
 * overwritten by the response status once the request has been served.
 */
188 struct thread_msg_req {
189 	enum thread_req_type type;
190 
191 	union {
		/* Everything the data plane thread copies into its pipeline_data slot. */
192 		struct {
193 			struct rte_pipeline *p;
194 			struct {
195 				struct rte_table_action *a;
196 			} table[RTE_PIPELINE_TABLE_MAX];
197 			struct rte_ring *msgq_req;
198 			struct rte_ring *msgq_rsp;
199 			uint32_t timer_period_ms; /* Converted to TSC cycles by the handler. */
200 			uint32_t n_tables; /* Number of valid entries in table[]. */
201 		} pipeline_enable;
202 
		/* Identifies the pipeline to unmap by its rte_pipeline handle. */
203 		struct {
204 			struct rte_pipeline *p;
205 		} pipeline_disable;
206 	};
207 };
208 
/* Response message: 0 on success, -1 on failure or unknown request type. */
209 struct thread_msg_rsp {
210 	int status;
211 };
212 
213 /**
214  * Main thread
215  */
216 static struct thread_msg_req *
217 thread_msg_alloc(void)
218 {
219 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
220 		sizeof(struct thread_msg_rsp));
221 
222 	return calloc(1, size);
223 }
224 
/**
 * Release a thread message buffer through its response view.
 *
 * @param rsp
 *   Buffer to release with free().
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
230 
231 static struct thread_msg_rsp *
232 thread_msg_send_recv(uint32_t thread_id,
233 	struct thread_msg_req *req)
234 {
235 	struct thread *t = &thread[thread_id];
236 	struct rte_ring *msgq_req = t->msgq_req;
237 	struct rte_ring *msgq_rsp = t->msgq_rsp;
238 	struct thread_msg_rsp *rsp;
239 	int status;
240 
241 	/* send */
242 	do {
243 		status = rte_ring_sp_enqueue(msgq_req, req);
244 	} while (status == -ENOBUFS);
245 
246 	/* recv */
247 	do {
248 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
249 	} while (status != 0);
250 
251 	return rsp;
252 }
253 
254 int
255 thread_pipeline_enable(uint32_t thread_id,
256 	const char *pipeline_name)
257 {
258 	struct pipeline *p = pipeline_find(pipeline_name);
259 	struct thread *t;
260 	struct thread_msg_req *req;
261 	struct thread_msg_rsp *rsp;
262 	uint32_t i;
263 	int status;
264 
265 	/* Check input params */
266 	if ((thread_id >= RTE_MAX_LCORE) ||
267 		(p == NULL) ||
268 		(p->n_ports_in == 0) ||
269 		(p->n_ports_out == 0) ||
270 		(p->n_tables == 0))
271 		return -1;
272 
273 	t = &thread[thread_id];
274 	if ((t->enabled == 0) ||
275 		p->enabled)
276 		return -1;
277 
278 	if (!thread_is_running(thread_id)) {
279 		struct thread_data *td = &thread_data[thread_id];
280 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
281 
282 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
283 			return -1;
284 
285 		/* Data plane thread */
286 		td->p[td->n_pipelines] = p->p;
287 
288 		tdp->p = p->p;
289 		for (i = 0; i < p->n_tables; i++)
290 			tdp->table_data[i].a = p->table[i].a;
291 
292 		tdp->n_tables = p->n_tables;
293 
294 		tdp->msgq_req = p->msgq_req;
295 		tdp->msgq_rsp = p->msgq_rsp;
296 		tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
297 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
298 
299 		td->n_pipelines++;
300 
301 		/* Pipeline */
302 		p->thread_id = thread_id;
303 		p->enabled = 1;
304 
305 		return 0;
306 	}
307 
308 	/* Allocate request */
309 	req = thread_msg_alloc();
310 	if (req == NULL)
311 		return -1;
312 
313 	/* Write request */
314 	req->type = THREAD_REQ_PIPELINE_ENABLE;
315 	req->pipeline_enable.p = p->p;
316 	for (i = 0; i < p->n_tables; i++)
317 		req->pipeline_enable.table[i].a =
318 			p->table[i].a;
319 	req->pipeline_enable.msgq_req = p->msgq_req;
320 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
321 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
322 	req->pipeline_enable.n_tables = p->n_tables;
323 
324 	/* Send request and wait for response */
325 	rsp = thread_msg_send_recv(thread_id, req);
326 
327 	/* Read response */
328 	status = rsp->status;
329 
330 	/* Free response */
331 	thread_msg_free(rsp);
332 
333 	/* Request completion */
334 	if (status)
335 		return status;
336 
337 	p->thread_id = thread_id;
338 	p->enabled = 1;
339 
340 	return 0;
341 }
342 
343 int
344 thread_pipeline_disable(uint32_t thread_id,
345 	const char *pipeline_name)
346 {
347 	struct pipeline *p = pipeline_find(pipeline_name);
348 	struct thread *t;
349 	struct thread_msg_req *req;
350 	struct thread_msg_rsp *rsp;
351 	int status;
352 
353 	/* Check input params */
354 	if ((thread_id >= RTE_MAX_LCORE) ||
355 		(p == NULL))
356 		return -1;
357 
358 	t = &thread[thread_id];
359 	if (t->enabled == 0)
360 		return -1;
361 
362 	if (p->enabled == 0)
363 		return 0;
364 
365 	if (p->thread_id != thread_id)
366 		return -1;
367 
368 	if (!thread_is_running(thread_id)) {
369 		struct thread_data *td = &thread_data[thread_id];
370 		uint32_t i;
371 
372 		for (i = 0; i < td->n_pipelines; i++) {
373 			struct pipeline_data *tdp = &td->pipeline_data[i];
374 
375 			if (tdp->p != p->p)
376 				continue;
377 
378 			/* Data plane thread */
379 			if (i < td->n_pipelines - 1) {
380 				struct rte_pipeline *pipeline_last =
381 					td->p[td->n_pipelines - 1];
382 				struct pipeline_data *tdp_last =
383 					&td->pipeline_data[td->n_pipelines - 1];
384 
385 				td->p[i] = pipeline_last;
386 				memcpy(tdp, tdp_last, sizeof(*tdp));
387 			}
388 
389 			td->n_pipelines--;
390 
391 			/* Pipeline */
392 			p->enabled = 0;
393 
394 			break;
395 		}
396 
397 		return 0;
398 	}
399 
400 	/* Allocate request */
401 	req = thread_msg_alloc();
402 	if (req == NULL)
403 		return -1;
404 
405 	/* Write request */
406 	req->type = THREAD_REQ_PIPELINE_DISABLE;
407 	req->pipeline_disable.p = p->p;
408 
409 	/* Send request and wait for response */
410 	rsp = thread_msg_send_recv(thread_id, req);
411 
412 	/* Read response */
413 	status = rsp->status;
414 
415 	/* Free response */
416 	thread_msg_free(rsp);
417 
418 	/* Request completion */
419 	if (status)
420 		return status;
421 
422 	p->enabled = 0;
423 
424 	return 0;
425 }
426 
427 /**
428  * Data plane threads: message handling
429  */
/**
 * Non-blocking fetch of the next request from a thread request ring.
 *
 * @return
 *   The dequeued request, or NULL when the dequeue does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *msg = NULL;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
442 
/**
 * Enqueue a response on the response ring, retrying for as long as the ring
 * reports -ENOBUFS.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
453 
454 static struct thread_msg_rsp *
455 thread_msg_handle_pipeline_enable(struct thread_data *t,
456 	struct thread_msg_req *req)
457 {
458 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
459 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
460 	uint32_t i;
461 
462 	/* Request */
463 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
464 		rsp->status = -1;
465 		return rsp;
466 	}
467 
468 	t->p[t->n_pipelines] = req->pipeline_enable.p;
469 
470 	p->p = req->pipeline_enable.p;
471 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
472 		p->table_data[i].a =
473 			req->pipeline_enable.table[i].a;
474 
475 	p->n_tables = req->pipeline_enable.n_tables;
476 
477 	p->msgq_req = req->pipeline_enable.msgq_req;
478 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
479 	p->timer_period =
480 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
481 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
482 
483 	t->n_pipelines++;
484 
485 	/* Response */
486 	rsp->status = 0;
487 	return rsp;
488 }
489 
490 static struct thread_msg_rsp *
491 thread_msg_handle_pipeline_disable(struct thread_data *t,
492 	struct thread_msg_req *req)
493 {
494 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
495 	uint32_t n_pipelines = t->n_pipelines;
496 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
497 	uint32_t i;
498 
499 	/* find pipeline */
500 	for (i = 0; i < n_pipelines; i++) {
501 		struct pipeline_data *p = &t->pipeline_data[i];
502 
503 		if (p->p != pipeline)
504 			continue;
505 
506 		if (i < n_pipelines - 1) {
507 			struct rte_pipeline *pipeline_last =
508 				t->p[n_pipelines - 1];
509 			struct pipeline_data *p_last =
510 				&t->pipeline_data[n_pipelines - 1];
511 
512 			t->p[i] = pipeline_last;
513 			memcpy(p, p_last, sizeof(*p));
514 		}
515 
516 		t->n_pipelines--;
517 
518 		rsp->status = 0;
519 		return rsp;
520 	}
521 
522 	/* should not get here */
523 	rsp->status = 0;
524 	return rsp;
525 }
526 
527 static void
528 thread_msg_handle(struct thread_data *t)
529 {
530 	for ( ; ; ) {
531 		struct thread_msg_req *req;
532 		struct thread_msg_rsp *rsp;
533 
534 		req = thread_msg_recv(t->msgq_req);
535 		if (req == NULL)
536 			break;
537 
538 		switch (req->type) {
539 		case THREAD_REQ_PIPELINE_ENABLE:
540 			rsp = thread_msg_handle_pipeline_enable(t, req);
541 			break;
542 
543 		case THREAD_REQ_PIPELINE_DISABLE:
544 			rsp = thread_msg_handle_pipeline_disable(t, req);
545 			break;
546 
547 		default:
548 			rsp = (struct thread_msg_rsp *) req;
549 			rsp->status = -1;
550 		}
551 
552 		thread_msg_send(t->msgq_rsp, rsp);
553 	}
554 }
555 
556 /**
557  * Main thread & data plane threads: message passing
558  */
/*
 * Kinds of request the main thread can send on a pipeline's own message
 * queue. The value is stored in pipeline_msg_req.type and selects which
 * member of the request union carries the arguments.
 */
559 enum pipeline_req_type {
560 	/* Port IN */
561 	PIPELINE_REQ_PORT_IN_STATS_READ,
562 	PIPELINE_REQ_PORT_IN_ENABLE,
563 	PIPELINE_REQ_PORT_IN_DISABLE,
564 
565 	/* Port OUT */
566 	PIPELINE_REQ_PORT_OUT_STATS_READ,
567 
568 	/* Table */
569 	PIPELINE_REQ_TABLE_STATS_READ,
570 	PIPELINE_REQ_TABLE_RULE_ADD,
571 	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
572 	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
573 	PIPELINE_REQ_TABLE_RULE_DELETE,
574 	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
575 	PIPELINE_REQ_TABLE_RULE_STATS_READ,
576 	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
577 	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
578 	PIPELINE_REQ_TABLE_RULE_MTR_READ,
579 	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
580 	PIPELINE_REQ_TABLE_RULE_TTL_READ,
581 	PIPELINE_REQ_TABLE_RULE_TIME_READ,
	/* Not a request: one past the last valid request type. */
582 	PIPELINE_REQ_MAX
583 };
584 
/*
 * Per-request argument blocks. Each struct below is one member of the union
 * inside struct pipeline_msg_req.
 */

/* "clear" is forwarded from the caller of pipeline_port_in_stats_read(). */
585 struct pipeline_msg_req_port_in_stats_read {
586 	int clear;
587 };
588 
/* "clear" is forwarded from the caller of pipeline_port_out_stats_read(). */
589 struct pipeline_msg_req_port_out_stats_read {
590 	int clear;
591 };
592 
/* "clear" is forwarded from the caller of pipeline_table_stats_read(). */
593 struct pipeline_msg_req_table_stats_read {
594 	int clear;
595 };
596 
/* High-level (match, action) pair of the rule to add. */
597 struct pipeline_msg_req_table_rule_add {
598 	struct table_rule_match match;
599 	struct table_rule_action action;
600 };
601 
/* Action of the default rule to install. */
602 struct pipeline_msg_req_table_rule_add_default {
603 	struct table_rule_action action;
604 };
605 
/* NOTE(review): "bulk" presumably selects the bulk-add path; its consumer is outside this chunk. */
606 struct pipeline_msg_req_table_rule_add_bulk {
607 	struct table_rule_list *list;
608 	int bulk;
609 };
610 
/* Match of the rule to delete. */
611 struct pipeline_msg_req_table_rule_delete {
612 	struct table_rule_match match;
613 };
614 
/* NOTE(review): "data" looks like the rule's table entry handle; confirm against the sender. */
615 struct pipeline_msg_req_table_rule_stats_read {
616 	void *data;
617 	int clear;
618 };
619 
620 struct pipeline_msg_req_table_mtr_profile_add {
621 	uint32_t meter_profile_id;
622 	struct rte_table_action_meter_profile profile;
623 };
624 
625 struct pipeline_msg_req_table_mtr_profile_delete {
626 	uint32_t meter_profile_id;
627 };
628 
629 struct pipeline_msg_req_table_rule_mtr_read {
630 	void *data;
631 	uint32_t tc_mask;
632 	int clear;
633 };
634 
635 struct pipeline_msg_req_table_dscp_table_update {
636 	uint64_t dscp_mask;
637 	struct rte_table_action_dscp_table dscp_table;
638 };
639 
640 struct pipeline_msg_req_table_rule_ttl_read {
641 	void *data;
642 	int clear;
643 };
644 
645 struct pipeline_msg_req_table_rule_time_read {
646 	void *data;
647 };
648 
/*
 * Request sent on a pipeline's message queue: the request type, the id of
 * the object it targets, and the type-specific arguments.
 */
649 struct pipeline_msg_req {
650 	enum pipeline_req_type type;
651 	uint32_t id; /* Port IN, port OUT or table ID */
652 
653 	union {
654 		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
655 		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
656 		struct pipeline_msg_req_table_stats_read table_stats_read;
657 		struct pipeline_msg_req_table_rule_add table_rule_add;
658 		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
659 		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
660 		struct pipeline_msg_req_table_rule_delete table_rule_delete;
661 		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
662 		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
663 		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
664 		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
665 		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
666 		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
667 		struct pipeline_msg_req_table_rule_time_read table_rule_time_read;
668 	};
669 };
670 
/*
 * Per-request result blocks. Each struct below is one member of the union
 * inside struct pipeline_msg_rsp.
 */

/* Counters copied back to the caller of pipeline_port_in_stats_read(). */
671 struct pipeline_msg_rsp_port_in_stats_read {
672 	struct rte_pipeline_port_in_stats stats;
673 };
674 
/* Counters copied back to the caller of pipeline_port_out_stats_read(). */
675 struct pipeline_msg_rsp_port_out_stats_read {
676 	struct rte_pipeline_port_out_stats stats;
677 };
678 
/* Counters copied back to the caller of pipeline_table_stats_read(). */
679 struct pipeline_msg_rsp_table_stats_read {
680 	struct rte_pipeline_table_stats stats;
681 };
682 
/* NOTE(review): "data" looks like the handle of the added table entry; confirm against the handler. */
683 struct pipeline_msg_rsp_table_rule_add {
684 	void *data;
685 };
686 
687 struct pipeline_msg_rsp_table_rule_add_default {
688 	void *data;
689 };
690 
691 struct pipeline_msg_rsp_table_rule_add_bulk {
692 	uint32_t n_rules;
693 };
694 
695 struct pipeline_msg_rsp_table_rule_stats_read {
696 	struct rte_table_action_stats_counters stats;
697 };
698 
699 struct pipeline_msg_rsp_table_rule_mtr_read {
700 	struct rte_table_action_mtr_counters stats;
701 };
702 
703 struct pipeline_msg_rsp_table_rule_ttl_read {
704 	struct rte_table_action_ttl_counters stats;
705 };
706 
707 struct pipeline_msg_rsp_table_rule_time_read {
708 	uint64_t timestamp;
709 };
710 
/*
 * Response received on a pipeline's message queue. The callers in this file
 * read the union member only when status is 0.
 */
711 struct pipeline_msg_rsp {
712 	int status;
713 
714 	union {
715 		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
716 		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
717 		struct pipeline_msg_rsp_table_stats_read table_stats_read;
718 		struct pipeline_msg_rsp_table_rule_add table_rule_add;
719 		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
720 		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
721 		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
722 		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
723 		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
724 		struct pipeline_msg_rsp_table_rule_time_read table_rule_time_read;
725 	};
726 };
727 
728 /**
729  * Main thread
730  */
731 static struct pipeline_msg_req *
732 pipeline_msg_alloc(void)
733 {
734 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
735 		sizeof(struct pipeline_msg_rsp));
736 
737 	return calloc(1, size);
738 }
739 
/**
 * Release a pipeline message buffer through its response view.
 *
 * @param rsp
 *   Buffer to release with free().
 */
static void
pipeline_msg_free(struct pipeline_msg_rsp *rsp)
{
	free(rsp);
}
745 
746 static struct pipeline_msg_rsp *
747 pipeline_msg_send_recv(struct pipeline *p,
748 	struct pipeline_msg_req *req)
749 {
750 	struct rte_ring *msgq_req = p->msgq_req;
751 	struct rte_ring *msgq_rsp = p->msgq_rsp;
752 	struct pipeline_msg_rsp *rsp;
753 	int status;
754 
755 	/* send */
756 	do {
757 		status = rte_ring_sp_enqueue(msgq_req, req);
758 	} while (status == -ENOBUFS);
759 
760 	/* recv */
761 	do {
762 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
763 	} while (status != 0);
764 
765 	return rsp;
766 }
767 
768 int
769 pipeline_port_in_stats_read(const char *pipeline_name,
770 	uint32_t port_id,
771 	struct rte_pipeline_port_in_stats *stats,
772 	int clear)
773 {
774 	struct pipeline *p;
775 	struct pipeline_msg_req *req;
776 	struct pipeline_msg_rsp *rsp;
777 	int status;
778 
779 	/* Check input params */
780 	if ((pipeline_name == NULL) ||
781 		(stats == NULL))
782 		return -1;
783 
784 	p = pipeline_find(pipeline_name);
785 	if ((p == NULL) ||
786 		(port_id >= p->n_ports_in))
787 		return -1;
788 
789 	if (!pipeline_is_running(p)) {
790 		status = rte_pipeline_port_in_stats_read(p->p,
791 			port_id,
792 			stats,
793 			clear);
794 
795 		return status;
796 	}
797 
798 	/* Allocate request */
799 	req = pipeline_msg_alloc();
800 	if (req == NULL)
801 		return -1;
802 
803 	/* Write request */
804 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
805 	req->id = port_id;
806 	req->port_in_stats_read.clear = clear;
807 
808 	/* Send request and wait for response */
809 	rsp = pipeline_msg_send_recv(p, req);
810 
811 	/* Read response */
812 	status = rsp->status;
813 	if (status == 0)
814 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
815 
816 	/* Free response */
817 	pipeline_msg_free(rsp);
818 
819 	return status;
820 }
821 
822 int
823 pipeline_port_in_enable(const char *pipeline_name,
824 	uint32_t port_id)
825 {
826 	struct pipeline *p;
827 	struct pipeline_msg_req *req;
828 	struct pipeline_msg_rsp *rsp;
829 	int status;
830 
831 	/* Check input params */
832 	if (pipeline_name == NULL)
833 		return -1;
834 
835 	p = pipeline_find(pipeline_name);
836 	if ((p == NULL) ||
837 		(port_id >= p->n_ports_in))
838 		return -1;
839 
840 	if (!pipeline_is_running(p)) {
841 		status = rte_pipeline_port_in_enable(p->p, port_id);
842 		return status;
843 	}
844 
845 	/* Allocate request */
846 	req = pipeline_msg_alloc();
847 	if (req == NULL)
848 		return -1;
849 
850 	/* Write request */
851 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
852 	req->id = port_id;
853 
854 	/* Send request and wait for response */
855 	rsp = pipeline_msg_send_recv(p, req);
856 
857 	/* Read response */
858 	status = rsp->status;
859 
860 	/* Free response */
861 	pipeline_msg_free(rsp);
862 
863 	return status;
864 }
865 
866 int
867 pipeline_port_in_disable(const char *pipeline_name,
868 	uint32_t port_id)
869 {
870 	struct pipeline *p;
871 	struct pipeline_msg_req *req;
872 	struct pipeline_msg_rsp *rsp;
873 	int status;
874 
875 	/* Check input params */
876 	if (pipeline_name == NULL)
877 		return -1;
878 
879 	p = pipeline_find(pipeline_name);
880 	if ((p == NULL) ||
881 		(port_id >= p->n_ports_in))
882 		return -1;
883 
884 	if (!pipeline_is_running(p)) {
885 		status = rte_pipeline_port_in_disable(p->p, port_id);
886 		return status;
887 	}
888 
889 	/* Allocate request */
890 	req = pipeline_msg_alloc();
891 	if (req == NULL)
892 		return -1;
893 
894 	/* Write request */
895 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
896 	req->id = port_id;
897 
898 	/* Send request and wait for response */
899 	rsp = pipeline_msg_send_recv(p, req);
900 
901 	/* Read response */
902 	status = rsp->status;
903 
904 	/* Free response */
905 	pipeline_msg_free(rsp);
906 
907 	return status;
908 }
909 
910 int
911 pipeline_port_out_stats_read(const char *pipeline_name,
912 	uint32_t port_id,
913 	struct rte_pipeline_port_out_stats *stats,
914 	int clear)
915 {
916 	struct pipeline *p;
917 	struct pipeline_msg_req *req;
918 	struct pipeline_msg_rsp *rsp;
919 	int status;
920 
921 	/* Check input params */
922 	if ((pipeline_name == NULL) ||
923 		(stats == NULL))
924 		return -1;
925 
926 	p = pipeline_find(pipeline_name);
927 	if ((p == NULL) ||
928 		(port_id >= p->n_ports_out))
929 		return -1;
930 
931 	if (!pipeline_is_running(p)) {
932 		status = rte_pipeline_port_out_stats_read(p->p,
933 			port_id,
934 			stats,
935 			clear);
936 
937 		return status;
938 	}
939 
940 	/* Allocate request */
941 	req = pipeline_msg_alloc();
942 	if (req == NULL)
943 		return -1;
944 
945 	/* Write request */
946 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
947 	req->id = port_id;
948 	req->port_out_stats_read.clear = clear;
949 
950 	/* Send request and wait for response */
951 	rsp = pipeline_msg_send_recv(p, req);
952 
953 	/* Read response */
954 	status = rsp->status;
955 	if (status == 0)
956 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
957 
958 	/* Free response */
959 	pipeline_msg_free(rsp);
960 
961 	return status;
962 }
963 
964 int
965 pipeline_table_stats_read(const char *pipeline_name,
966 	uint32_t table_id,
967 	struct rte_pipeline_table_stats *stats,
968 	int clear)
969 {
970 	struct pipeline *p;
971 	struct pipeline_msg_req *req;
972 	struct pipeline_msg_rsp *rsp;
973 	int status;
974 
975 	/* Check input params */
976 	if ((pipeline_name == NULL) ||
977 		(stats == NULL))
978 		return -1;
979 
980 	p = pipeline_find(pipeline_name);
981 	if ((p == NULL) ||
982 		(table_id >= p->n_tables))
983 		return -1;
984 
985 	if (!pipeline_is_running(p)) {
986 		status = rte_pipeline_table_stats_read(p->p,
987 			table_id,
988 			stats,
989 			clear);
990 
991 		return status;
992 	}
993 
994 	/* Allocate request */
995 	req = pipeline_msg_alloc();
996 	if (req == NULL)
997 		return -1;
998 
999 	/* Write request */
1000 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
1001 	req->id = table_id;
1002 	req->table_stats_read.clear = clear;
1003 
1004 	/* Send request and wait for response */
1005 	rsp = pipeline_msg_send_recv(p, req);
1006 
1007 	/* Read response */
1008 	status = rsp->status;
1009 	if (status == 0)
1010 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1011 
1012 	/* Free response */
1013 	pipeline_msg_free(rsp);
1014 
1015 	return status;
1016 }
1017 
1018 static int
1019 match_check(struct table_rule_match *match,
1020 	struct pipeline *p,
1021 	uint32_t table_id)
1022 {
1023 	struct table *table;
1024 
1025 	if ((match == NULL) ||
1026 		(p == NULL) ||
1027 		(table_id >= p->n_tables))
1028 		return -1;
1029 
1030 	table = &p->table[table_id];
1031 	if (match->match_type != table->params.match_type)
1032 		return -1;
1033 
1034 	switch (match->match_type) {
1035 	case TABLE_ACL:
1036 	{
1037 		struct table_acl_params *t = &table->params.match.acl;
1038 		struct table_rule_match_acl *r = &match->match.acl;
1039 
1040 		if ((r->ip_version && (t->ip_version == 0)) ||
1041 			((r->ip_version == 0) && t->ip_version))
1042 			return -1;
1043 
1044 		if (r->ip_version) {
1045 			if ((r->sa_depth > 32) ||
1046 				(r->da_depth > 32))
1047 				return -1;
1048 		} else {
1049 			if ((r->sa_depth > 128) ||
1050 				(r->da_depth > 128))
1051 				return -1;
1052 		}
1053 		return 0;
1054 	}
1055 
1056 	case TABLE_ARRAY:
1057 		return 0;
1058 
1059 	case TABLE_HASH:
1060 		return 0;
1061 
1062 	case TABLE_LPM:
1063 	{
1064 		struct table_lpm_params *t = &table->params.match.lpm;
1065 		struct table_rule_match_lpm *r = &match->match.lpm;
1066 
1067 		if ((r->ip_version && (t->key_size != 4)) ||
1068 			((r->ip_version == 0) && (t->key_size != 16)))
1069 			return -1;
1070 
1071 		if (r->ip_version) {
1072 			if (r->depth > 32)
1073 				return -1;
1074 		} else {
1075 			if (r->depth > 128)
1076 				return -1;
1077 		}
1078 		return 0;
1079 	}
1080 
1081 	case TABLE_STUB:
1082 		return -1;
1083 
1084 	default:
1085 		return -1;
1086 	}
1087 }
1088 
1089 static int
1090 action_check(struct table_rule_action *action,
1091 	struct pipeline *p,
1092 	uint32_t table_id)
1093 {
1094 	struct table_action_profile *ap;
1095 
1096 	if ((action == NULL) ||
1097 		(p == NULL) ||
1098 		(table_id >= p->n_tables))
1099 		return -1;
1100 
1101 	ap = p->table[table_id].ap;
1102 	if (action->action_mask != ap->params.action_mask)
1103 		return -1;
1104 
1105 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1106 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1107 			(action->fwd.id >= p->n_ports_out))
1108 			return -1;
1109 
1110 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1111 			(action->fwd.id >= p->n_tables))
1112 			return -1;
1113 	}
1114 
1115 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1116 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1117 		uint32_t tc_mask1 = action->mtr.tc_mask;
1118 
1119 		if (tc_mask1 != tc_mask0)
1120 			return -1;
1121 	}
1122 
1123 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1124 		uint32_t n_subports_per_port =
1125 			ap->params.tm.n_subports_per_port;
1126 		uint32_t n_pipes_per_subport =
1127 			ap->params.tm.n_pipes_per_subport;
1128 		uint32_t subport_id = action->tm.subport_id;
1129 		uint32_t pipe_id = action->tm.pipe_id;
1130 
1131 		if ((subport_id >= n_subports_per_port) ||
1132 			(pipe_id >= n_pipes_per_subport))
1133 			return -1;
1134 	}
1135 
1136 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1137 		uint64_t encap_mask = ap->params.encap.encap_mask;
1138 		enum rte_table_action_encap_type type = action->encap.type;
1139 
1140 		if ((encap_mask & (1LLU << type)) == 0)
1141 			return -1;
1142 	}
1143 
1144 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1145 		int ip_version0 = ap->params.common.ip_version;
1146 		int ip_version1 = action->nat.ip_version;
1147 
1148 		if ((ip_version1 && (ip_version0 == 0)) ||
1149 			((ip_version1 == 0) && ip_version0))
1150 			return -1;
1151 	}
1152 
1153 	return 0;
1154 }
1155 
1156 static int
1157 action_default_check(struct table_rule_action *action,
1158 	struct pipeline *p,
1159 	uint32_t table_id)
1160 {
1161 	if ((action == NULL) ||
1162 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1163 		(p == NULL) ||
1164 		(table_id >= p->n_tables))
1165 		return -1;
1166 
1167 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1168 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1169 			(action->fwd.id >= p->n_ports_out))
1170 			return -1;
1171 
1172 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1173 			(action->fwd.id >= p->n_tables))
1174 			return -1;
1175 	}
1176 
1177 	return 0;
1178 }
1179 
/*
 * Storage for one low-level table key. It is large enough for any of the
 * listed key layouts; match_convert() takes a pointer to it as its output
 * argument.
 */
1180 union table_rule_match_low_level {
1181 	struct rte_table_acl_rule_add_params acl_add;
1182 	struct rte_table_acl_rule_delete_params acl_delete;
1183 	struct rte_table_array_key array;
1184 	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX]; /* Raw key bytes. */
1185 	struct rte_table_lpm_key lpm_ipv4;
1186 	struct rte_table_lpm_ipv6_key lpm_ipv6;
1187 };
1188 
/*
 * Forward declarations; the definitions are outside this chunk. Callers in
 * this file treat a non-zero return value as failure.
 */

/* NOTE(review): presumably fills *ml from *mh; "add" is passed as 1 on the rule-add path. */
1189 static int
1190 match_convert(struct table_rule_match *mh,
1191 	union table_rule_match_low_level *ml,
1192 	int add);
1193 
/* NOTE(review): presumably fills *data from *action using action handle a; confirm at the definition. */
1194 static int
1195 action_convert(struct rte_table_action *a,
1196 	struct table_rule_action *action,
1197 	struct rte_pipeline_table_entry *data);
1198 
/* Low-level description of one table, as consumed by table_rule_add_bulk_ll(). */
1199 struct table_ll {
1200 	struct rte_pipeline *p; /* Pipeline passed to the rte_pipeline entry-add calls. */
1201 	int table_id; /* Table index passed to the rte_pipeline entry-add calls. */
1202 	struct rte_table_action *a; /* Action handle passed to action_convert(). */
1203 	int bulk_supported; /* Non-zero: use the bulk add call; zero: add rules one at a time. */
1204 };
1205 
1206 static int
1207 table_rule_add_bulk_ll(struct table_ll *table,
1208 	struct table_rule_list *list,
1209 	uint32_t *n_rules)
1210 {
1211 	union table_rule_match_low_level *match_ll = NULL;
1212 	uint8_t *action_ll = NULL;
1213 	void **match_ll_ptr = NULL;
1214 	struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1215 	struct rte_pipeline_table_entry **entries_ptr = NULL;
1216 	int *found = NULL;
1217 	struct table_rule *rule;
1218 	uint32_t n, i;
1219 	int status = 0;
1220 
1221 	n = 0;
1222 	TAILQ_FOREACH(rule, list, node)
1223 		n++;
1224 
1225 	/* Memory allocation */
1226 	match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1227 	action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1228 
1229 	match_ll_ptr = calloc(n, sizeof(void *));
1230 	action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1231 
1232 	entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1233 	found = calloc(n, sizeof(int));
1234 
1235 	if (match_ll == NULL ||
1236 		action_ll == NULL ||
1237 		match_ll_ptr == NULL ||
1238 		action_ll_ptr == NULL ||
1239 		entries_ptr == NULL ||
1240 		found == NULL) {
1241 			status = -ENOMEM;
1242 			goto table_rule_add_bulk_ll_free;
1243 	}
1244 
1245 	/* Init */
1246 	for (i = 0; i < n; i++) {
1247 		match_ll_ptr[i] = (void *)&match_ll[i];
1248 		action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1249 			&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1250 	}
1251 
1252 	/* Rule (match, action) conversion */
1253 	i = 0;
1254 	TAILQ_FOREACH(rule, list, node) {
1255 		status = match_convert(&rule->match, match_ll_ptr[i], 1);
1256 		if (status)
1257 			goto table_rule_add_bulk_ll_free;
1258 
1259 		status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1260 		if (status)
1261 			goto table_rule_add_bulk_ll_free;
1262 
1263 		i++;
1264 	}
1265 
1266 	/* Add rule (match, action) to table */
1267 	if (table->bulk_supported) {
1268 		status = rte_pipeline_table_entry_add_bulk(table->p,
1269 			table->table_id,
1270 			match_ll_ptr,
1271 			action_ll_ptr,
1272 			n,
1273 			found,
1274 			entries_ptr);
1275 		if (status)
1276 			goto table_rule_add_bulk_ll_free;
1277 	} else
1278 		for (i = 0; i < n; i++) {
1279 			status = rte_pipeline_table_entry_add(table->p,
1280 				table->table_id,
1281 				match_ll_ptr[i],
1282 				action_ll_ptr[i],
1283 				&found[i],
1284 				&entries_ptr[i]);
1285 			if (status) {
1286 				if (i == 0)
1287 					goto table_rule_add_bulk_ll_free;
1288 
1289 				/* No roll-back. */
1290 				status = 0;
1291 				n = i;
1292 				break;
1293 			}
1294 		}
1295 
1296 	/* Write back to the rule list. */
1297 	i = 0;
1298 	TAILQ_FOREACH(rule, list, node) {
1299 		if (i >= n)
1300 			break;
1301 
1302 		rule->data = entries_ptr[i];
1303 
1304 		i++;
1305 	}
1306 
1307 	*n_rules = n;
1308 
1309 	/* Free */
1310 table_rule_add_bulk_ll_free:
1311 	free(found);
1312 	free(entries_ptr);
1313 	free(action_ll_ptr);
1314 	free(match_ll_ptr);
1315 	free(action_ll);
1316 	free(match_ll);
1317 
1318 	return status;
1319 }
1320 
1321 int
1322 pipeline_table_rule_add(const char *pipeline_name,
1323 	uint32_t table_id,
1324 	struct table_rule_match *match,
1325 	struct table_rule_action *action)
1326 {
1327 	struct pipeline *p;
1328 	struct table *table;
1329 	struct pipeline_msg_req *req;
1330 	struct pipeline_msg_rsp *rsp;
1331 	struct table_rule *rule;
1332 	int status;
1333 
1334 	/* Check input params */
1335 	if ((pipeline_name == NULL) ||
1336 		(match == NULL) ||
1337 		(action == NULL))
1338 		return -1;
1339 
1340 	p = pipeline_find(pipeline_name);
1341 	if ((p == NULL) ||
1342 		(table_id >= p->n_tables) ||
1343 		match_check(match, p, table_id) ||
1344 		action_check(action, p, table_id))
1345 		return -1;
1346 
1347 	table = &p->table[table_id];
1348 
1349 	rule = calloc(1, sizeof(struct table_rule));
1350 	if (rule == NULL)
1351 		return -1;
1352 
1353 	memcpy(&rule->match, match, sizeof(*match));
1354 	memcpy(&rule->action, action, sizeof(*action));
1355 
1356 	if (!pipeline_is_running(p)) {
1357 		union table_rule_match_low_level match_ll;
1358 		struct rte_pipeline_table_entry *data_in, *data_out;
1359 		int key_found;
1360 		uint8_t *buffer;
1361 
1362 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1363 		if (buffer == NULL) {
1364 			free(rule);
1365 			return -1;
1366 		}
1367 
1368 		/* Table match-action rule conversion */
1369 		data_in = (struct rte_pipeline_table_entry *)buffer;
1370 
1371 		status = match_convert(match, &match_ll, 1);
1372 		if (status) {
1373 			free(buffer);
1374 			free(rule);
1375 			return -1;
1376 		}
1377 
1378 		status = action_convert(table->a, action, data_in);
1379 		if (status) {
1380 			free(buffer);
1381 			free(rule);
1382 			return -1;
1383 		}
1384 
1385 		/* Add rule (match, action) to table */
1386 		status = rte_pipeline_table_entry_add(p->p,
1387 				table_id,
1388 				&match_ll,
1389 				data_in,
1390 				&key_found,
1391 				&data_out);
1392 		if (status) {
1393 			free(buffer);
1394 			free(rule);
1395 			return -1;
1396 		}
1397 
1398 		/* Write Response */
1399 		rule->data = data_out;
1400 		table_rule_add(table, rule);
1401 
1402 		free(buffer);
1403 		return 0;
1404 	}
1405 
1406 	/* Allocate request */
1407 	req = pipeline_msg_alloc();
1408 	if (req == NULL) {
1409 		free(rule);
1410 		return -1;
1411 	}
1412 
1413 	/* Write request */
1414 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1415 	req->id = table_id;
1416 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1417 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1418 
1419 	/* Send request and wait for response */
1420 	rsp = pipeline_msg_send_recv(p, req);
1421 
1422 	/* Read response */
1423 	status = rsp->status;
1424 	if (status == 0) {
1425 		rule->data = rsp->table_rule_add.data;
1426 		table_rule_add(table, rule);
1427 	} else
1428 		free(rule);
1429 
1430 	/* Free response */
1431 	pipeline_msg_free(rsp);
1432 
1433 	return status;
1434 }
1435 
1436 int
1437 pipeline_table_rule_add_default(const char *pipeline_name,
1438 	uint32_t table_id,
1439 	struct table_rule_action *action)
1440 {
1441 	struct pipeline *p;
1442 	struct table *table;
1443 	struct pipeline_msg_req *req;
1444 	struct pipeline_msg_rsp *rsp;
1445 	struct table_rule *rule;
1446 	int status;
1447 
1448 	/* Check input params */
1449 	if ((pipeline_name == NULL) ||
1450 		(action == NULL))
1451 		return -1;
1452 
1453 	p = pipeline_find(pipeline_name);
1454 	if ((p == NULL) ||
1455 		(table_id >= p->n_tables) ||
1456 		action_default_check(action, p, table_id))
1457 		return -1;
1458 
1459 	table = &p->table[table_id];
1460 
1461 	rule = calloc(1, sizeof(struct table_rule));
1462 	if (rule == NULL)
1463 		return -1;
1464 
1465 	memcpy(&rule->action, action, sizeof(*action));
1466 
1467 	if (!pipeline_is_running(p)) {
1468 		struct rte_pipeline_table_entry *data_in, *data_out;
1469 		uint8_t *buffer;
1470 
1471 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1472 		if (buffer == NULL) {
1473 			free(rule);
1474 			return -1;
1475 		}
1476 
1477 		/* Apply actions */
1478 		data_in = (struct rte_pipeline_table_entry *)buffer;
1479 
1480 		data_in->action = action->fwd.action;
1481 		if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1482 			data_in->port_id = action->fwd.id;
1483 		if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1484 			data_in->table_id = action->fwd.id;
1485 
1486 		/* Add default rule to table */
1487 		status = rte_pipeline_table_default_entry_add(p->p,
1488 				table_id,
1489 				data_in,
1490 				&data_out);
1491 		if (status) {
1492 			free(buffer);
1493 			free(rule);
1494 			return -1;
1495 		}
1496 
1497 		/* Write Response */
1498 		rule->data = data_out;
1499 		table_rule_default_add(table, rule);
1500 
1501 		free(buffer);
1502 		return 0;
1503 	}
1504 
1505 	/* Allocate request */
1506 	req = pipeline_msg_alloc();
1507 	if (req == NULL) {
1508 		free(rule);
1509 		return -1;
1510 	}
1511 
1512 	/* Write request */
1513 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1514 	req->id = table_id;
1515 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1516 
1517 	/* Send request and wait for response */
1518 	rsp = pipeline_msg_send_recv(p, req);
1519 
1520 	/* Read response */
1521 	status = rsp->status;
1522 	if (status == 0) {
1523 		rule->data = rsp->table_rule_add_default.data;
1524 		table_rule_default_add(table, rule);
1525 	} else
1526 		free(rule);
1527 
1528 	/* Free response */
1529 	pipeline_msg_free(rsp);
1530 
1531 	return status;
1532 }
1533 
1534 static uint32_t
1535 table_rule_list_free(struct table_rule_list *list)
1536 {
1537 	uint32_t n = 0;
1538 
1539 	if (!list)
1540 		return 0;
1541 
1542 	for ( ; ; ) {
1543 		struct table_rule *rule;
1544 
1545 		rule = TAILQ_FIRST(list);
1546 		if (rule == NULL)
1547 			break;
1548 
1549 		TAILQ_REMOVE(list, rule, node);
1550 		free(rule);
1551 		n++;
1552 	}
1553 
1554 	free(list);
1555 	return n;
1556 }
1557 
1558 int
1559 pipeline_table_rule_add_bulk(const char *pipeline_name,
1560 	uint32_t table_id,
1561 	struct table_rule_list *list,
1562 	uint32_t *n_rules_added,
1563 	uint32_t *n_rules_not_added)
1564 {
1565 	struct pipeline *p;
1566 	struct table *table;
1567 	struct pipeline_msg_req *req;
1568 	struct pipeline_msg_rsp *rsp;
1569 	struct table_rule *rule;
1570 	int status = 0;
1571 
1572 	/* Check input params */
1573 	if ((pipeline_name == NULL) ||
1574 		(list == NULL) ||
1575 		TAILQ_EMPTY(list) ||
1576 		(n_rules_added == NULL) ||
1577 		(n_rules_not_added == NULL)) {
1578 		table_rule_list_free(list);
1579 		return -EINVAL;
1580 	}
1581 
1582 	p = pipeline_find(pipeline_name);
1583 	if ((p == NULL) ||
1584 		(table_id >= p->n_tables)) {
1585 		table_rule_list_free(list);
1586 		return -EINVAL;
1587 	}
1588 
1589 	table = &p->table[table_id];
1590 
1591 	TAILQ_FOREACH(rule, list, node)
1592 		if (match_check(&rule->match, p, table_id) ||
1593 			action_check(&rule->action, p, table_id)) {
1594 			table_rule_list_free(list);
1595 			return -EINVAL;
1596 		}
1597 
1598 	if (!pipeline_is_running(p)) {
1599 		struct table_ll table_ll = {
1600 			.p = p->p,
1601 			.table_id = table_id,
1602 			.a = table->a,
1603 			.bulk_supported = table->params.match_type == TABLE_ACL,
1604 		};
1605 
1606 		status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1607 		if (status) {
1608 			table_rule_list_free(list);
1609 			return status;
1610 		}
1611 
1612 		table_rule_add_bulk(table, list, *n_rules_added);
1613 		*n_rules_not_added = table_rule_list_free(list);
1614 		return 0;
1615 	}
1616 
1617 	/* Allocate request */
1618 	req = pipeline_msg_alloc();
1619 	if (req == NULL) {
1620 		table_rule_list_free(list);
1621 		return -ENOMEM;
1622 	}
1623 
1624 	/* Write request */
1625 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1626 	req->id = table_id;
1627 	req->table_rule_add_bulk.list = list;
1628 	req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1629 
1630 	/* Send request and wait for response */
1631 	rsp = pipeline_msg_send_recv(p, req);
1632 
1633 	/* Read response */
1634 	status = rsp->status;
1635 	if (status == 0) {
1636 		*n_rules_added = rsp->table_rule_add_bulk.n_rules;
1637 
1638 		table_rule_add_bulk(table, list, *n_rules_added);
1639 		*n_rules_not_added = table_rule_list_free(list);
1640 	} else
1641 		table_rule_list_free(list);
1642 
1643 
1644 	/* Free response */
1645 	pipeline_msg_free(rsp);
1646 
1647 	return status;
1648 }
1649 
1650 int
1651 pipeline_table_rule_delete(const char *pipeline_name,
1652 	uint32_t table_id,
1653 	struct table_rule_match *match)
1654 {
1655 	struct pipeline *p;
1656 	struct table *table;
1657 	struct pipeline_msg_req *req;
1658 	struct pipeline_msg_rsp *rsp;
1659 	int status;
1660 
1661 	/* Check input params */
1662 	if ((pipeline_name == NULL) ||
1663 		(match == NULL))
1664 		return -1;
1665 
1666 	p = pipeline_find(pipeline_name);
1667 	if ((p == NULL) ||
1668 		(table_id >= p->n_tables) ||
1669 		match_check(match, p, table_id))
1670 		return -1;
1671 
1672 	table = &p->table[table_id];
1673 
1674 	if (!pipeline_is_running(p)) {
1675 		union table_rule_match_low_level match_ll;
1676 		int key_found;
1677 
1678 		status = match_convert(match, &match_ll, 0);
1679 		if (status)
1680 			return -1;
1681 
1682 		status = rte_pipeline_table_entry_delete(p->p,
1683 				table_id,
1684 				&match_ll,
1685 				&key_found,
1686 				NULL);
1687 
1688 		if (status == 0)
1689 			table_rule_delete(table, match);
1690 
1691 		return status;
1692 	}
1693 
1694 	/* Allocate request */
1695 	req = pipeline_msg_alloc();
1696 	if (req == NULL)
1697 		return -1;
1698 
1699 	/* Write request */
1700 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1701 	req->id = table_id;
1702 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1703 
1704 	/* Send request and wait for response */
1705 	rsp = pipeline_msg_send_recv(p, req);
1706 
1707 	/* Read response */
1708 	status = rsp->status;
1709 	if (status == 0)
1710 		table_rule_delete(table, match);
1711 
1712 	/* Free response */
1713 	pipeline_msg_free(rsp);
1714 
1715 	return status;
1716 }
1717 
1718 int
1719 pipeline_table_rule_delete_default(const char *pipeline_name,
1720 	uint32_t table_id)
1721 {
1722 	struct pipeline *p;
1723 	struct table *table;
1724 	struct pipeline_msg_req *req;
1725 	struct pipeline_msg_rsp *rsp;
1726 	int status;
1727 
1728 	/* Check input params */
1729 	if (pipeline_name == NULL)
1730 		return -1;
1731 
1732 	p = pipeline_find(pipeline_name);
1733 	if ((p == NULL) ||
1734 		(table_id >= p->n_tables))
1735 		return -1;
1736 
1737 	table = &p->table[table_id];
1738 
1739 	if (!pipeline_is_running(p)) {
1740 		status = rte_pipeline_table_default_entry_delete(p->p,
1741 			table_id,
1742 			NULL);
1743 
1744 		if (status == 0)
1745 			table_rule_default_delete(table);
1746 
1747 		return status;
1748 	}
1749 
1750 	/* Allocate request */
1751 	req = pipeline_msg_alloc();
1752 	if (req == NULL)
1753 		return -1;
1754 
1755 	/* Write request */
1756 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1757 	req->id = table_id;
1758 
1759 	/* Send request and wait for response */
1760 	rsp = pipeline_msg_send_recv(p, req);
1761 
1762 	/* Read response */
1763 	status = rsp->status;
1764 	if (status == 0)
1765 		table_rule_default_delete(table);
1766 
1767 	/* Free response */
1768 	pipeline_msg_free(rsp);
1769 
1770 	return status;
1771 }
1772 
1773 int
1774 pipeline_table_rule_stats_read(const char *pipeline_name,
1775 	uint32_t table_id,
1776 	struct table_rule_match *match,
1777 	struct rte_table_action_stats_counters *stats,
1778 	int clear)
1779 {
1780 	struct pipeline *p;
1781 	struct table *table;
1782 	struct pipeline_msg_req *req;
1783 	struct pipeline_msg_rsp *rsp;
1784 	struct table_rule *rule;
1785 	int status;
1786 
1787 	/* Check input params */
1788 	if ((pipeline_name == NULL) ||
1789 		(match == NULL) ||
1790 		(stats == NULL))
1791 		return -1;
1792 
1793 	p = pipeline_find(pipeline_name);
1794 	if ((p == NULL) ||
1795 		(table_id >= p->n_tables) ||
1796 		match_check(match, p, table_id))
1797 		return -1;
1798 
1799 	table = &p->table[table_id];
1800 	rule = table_rule_find(table, match);
1801 	if (rule == NULL)
1802 		return -1;
1803 
1804 	if (!pipeline_is_running(p)) {
1805 		status = rte_table_action_stats_read(table->a,
1806 			rule->data,
1807 			stats,
1808 			clear);
1809 
1810 		return status;
1811 	}
1812 
1813 	/* Allocate request */
1814 	req = pipeline_msg_alloc();
1815 	if (req == NULL)
1816 		return -1;
1817 
1818 	/* Write request */
1819 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1820 	req->id = table_id;
1821 	req->table_rule_stats_read.data = rule->data;
1822 	req->table_rule_stats_read.clear = clear;
1823 
1824 	/* Send request and wait for response */
1825 	rsp = pipeline_msg_send_recv(p, req);
1826 
1827 	/* Read response */
1828 	status = rsp->status;
1829 	if (status == 0)
1830 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1831 
1832 	/* Free response */
1833 	pipeline_msg_free(rsp);
1834 
1835 	return status;
1836 }
1837 
1838 int
1839 pipeline_table_mtr_profile_add(const char *pipeline_name,
1840 	uint32_t table_id,
1841 	uint32_t meter_profile_id,
1842 	struct rte_table_action_meter_profile *profile)
1843 {
1844 	struct pipeline *p;
1845 	struct pipeline_msg_req *req;
1846 	struct pipeline_msg_rsp *rsp;
1847 	int status;
1848 
1849 	/* Check input params */
1850 	if ((pipeline_name == NULL) ||
1851 		(profile == NULL))
1852 		return -1;
1853 
1854 	p = pipeline_find(pipeline_name);
1855 	if ((p == NULL) ||
1856 		(table_id >= p->n_tables))
1857 		return -1;
1858 
1859 	if (!pipeline_is_running(p)) {
1860 		struct rte_table_action *a = p->table[table_id].a;
1861 
1862 		status = rte_table_action_meter_profile_add(a,
1863 			meter_profile_id,
1864 			profile);
1865 
1866 		return status;
1867 	}
1868 
1869 	/* Allocate request */
1870 	req = pipeline_msg_alloc();
1871 	if (req == NULL)
1872 		return -1;
1873 
1874 	/* Write request */
1875 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1876 	req->id = table_id;
1877 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1878 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1879 
1880 	/* Send request and wait for response */
1881 	rsp = pipeline_msg_send_recv(p, req);
1882 
1883 	/* Read response */
1884 	status = rsp->status;
1885 
1886 	/* Free response */
1887 	pipeline_msg_free(rsp);
1888 
1889 	return status;
1890 }
1891 
1892 int
1893 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1894 	uint32_t table_id,
1895 	uint32_t meter_profile_id)
1896 {
1897 	struct pipeline *p;
1898 	struct pipeline_msg_req *req;
1899 	struct pipeline_msg_rsp *rsp;
1900 	int status;
1901 
1902 	/* Check input params */
1903 	if (pipeline_name == NULL)
1904 		return -1;
1905 
1906 	p = pipeline_find(pipeline_name);
1907 	if ((p == NULL) ||
1908 		(table_id >= p->n_tables))
1909 		return -1;
1910 
1911 	if (!pipeline_is_running(p)) {
1912 		struct rte_table_action *a = p->table[table_id].a;
1913 
1914 		status = rte_table_action_meter_profile_delete(a,
1915 				meter_profile_id);
1916 
1917 		return status;
1918 	}
1919 
1920 	/* Allocate request */
1921 	req = pipeline_msg_alloc();
1922 	if (req == NULL)
1923 		return -1;
1924 
1925 	/* Write request */
1926 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1927 	req->id = table_id;
1928 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1929 
1930 	/* Send request and wait for response */
1931 	rsp = pipeline_msg_send_recv(p, req);
1932 
1933 	/* Read response */
1934 	status = rsp->status;
1935 
1936 	/* Free response */
1937 	pipeline_msg_free(rsp);
1938 
1939 	return status;
1940 }
1941 
1942 int
1943 pipeline_table_rule_mtr_read(const char *pipeline_name,
1944 	uint32_t table_id,
1945 	struct table_rule_match *match,
1946 	struct rte_table_action_mtr_counters *stats,
1947 	int clear)
1948 {
1949 	struct pipeline *p;
1950 	struct table *table;
1951 	struct pipeline_msg_req *req;
1952 	struct pipeline_msg_rsp *rsp;
1953 	struct table_rule *rule;
1954 	uint32_t tc_mask;
1955 	int status;
1956 
1957 	/* Check input params */
1958 	if ((pipeline_name == NULL) ||
1959 		(match == NULL) ||
1960 		(stats == NULL))
1961 		return -1;
1962 
1963 	p = pipeline_find(pipeline_name);
1964 	if ((p == NULL) ||
1965 		(table_id >= p->n_tables) ||
1966 		match_check(match, p, table_id))
1967 		return -1;
1968 
1969 	table = &p->table[table_id];
1970 	tc_mask = (1 << table->ap->params.mtr.n_tc) - 1;
1971 
1972 	rule = table_rule_find(table, match);
1973 	if (rule == NULL)
1974 		return -1;
1975 
1976 	if (!pipeline_is_running(p)) {
1977 		status = rte_table_action_meter_read(table->a,
1978 				rule->data,
1979 				tc_mask,
1980 				stats,
1981 				clear);
1982 
1983 		return status;
1984 	}
1985 
1986 	/* Allocate request */
1987 	req = pipeline_msg_alloc();
1988 	if (req == NULL)
1989 		return -1;
1990 
1991 	/* Write request */
1992 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
1993 	req->id = table_id;
1994 	req->table_rule_mtr_read.data = rule->data;
1995 	req->table_rule_mtr_read.tc_mask = tc_mask;
1996 	req->table_rule_mtr_read.clear = clear;
1997 
1998 	/* Send request and wait for response */
1999 	rsp = pipeline_msg_send_recv(p, req);
2000 
2001 	/* Read response */
2002 	status = rsp->status;
2003 	if (status == 0)
2004 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2005 
2006 	/* Free response */
2007 	pipeline_msg_free(rsp);
2008 
2009 	return status;
2010 }
2011 
2012 int
2013 pipeline_table_dscp_table_update(const char *pipeline_name,
2014 	uint32_t table_id,
2015 	uint64_t dscp_mask,
2016 	struct rte_table_action_dscp_table *dscp_table)
2017 {
2018 	struct pipeline *p;
2019 	struct pipeline_msg_req *req;
2020 	struct pipeline_msg_rsp *rsp;
2021 	int status;
2022 
2023 	/* Check input params */
2024 	if ((pipeline_name == NULL) ||
2025 		(dscp_table == NULL))
2026 		return -1;
2027 
2028 	p = pipeline_find(pipeline_name);
2029 	if ((p == NULL) ||
2030 		(table_id >= p->n_tables))
2031 		return -1;
2032 
2033 	if (!pipeline_is_running(p)) {
2034 		struct rte_table_action *a = p->table[table_id].a;
2035 
2036 		status = rte_table_action_dscp_table_update(a,
2037 				dscp_mask,
2038 				dscp_table);
2039 
2040 		return status;
2041 	}
2042 
2043 	/* Allocate request */
2044 	req = pipeline_msg_alloc();
2045 	if (req == NULL)
2046 		return -1;
2047 
2048 	/* Write request */
2049 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2050 	req->id = table_id;
2051 	req->table_dscp_table_update.dscp_mask = dscp_mask;
2052 	memcpy(&req->table_dscp_table_update.dscp_table,
2053 		dscp_table, sizeof(*dscp_table));
2054 
2055 	/* Send request and wait for response */
2056 	rsp = pipeline_msg_send_recv(p, req);
2057 
2058 	/* Read response */
2059 	status = rsp->status;
2060 
2061 	/* Free response */
2062 	pipeline_msg_free(rsp);
2063 
2064 	return status;
2065 }
2066 
2067 int
2068 pipeline_table_rule_ttl_read(const char *pipeline_name,
2069 	uint32_t table_id,
2070 	struct table_rule_match *match,
2071 	struct rte_table_action_ttl_counters *stats,
2072 	int clear)
2073 {
2074 	struct pipeline *p;
2075 	struct table *table;
2076 	struct pipeline_msg_req *req;
2077 	struct pipeline_msg_rsp *rsp;
2078 	struct table_rule *rule;
2079 	int status;
2080 
2081 	/* Check input params */
2082 	if ((pipeline_name == NULL) ||
2083 		(match == NULL) ||
2084 		(stats == NULL))
2085 		return -1;
2086 
2087 	p = pipeline_find(pipeline_name);
2088 	if ((p == NULL) ||
2089 		(table_id >= p->n_tables) ||
2090 		match_check(match, p, table_id))
2091 		return -1;
2092 
2093 	table = &p->table[table_id];
2094 	if (!table->ap->params.ttl.n_packets_enabled)
2095 		return -1;
2096 
2097 	rule = table_rule_find(table, match);
2098 	if (rule == NULL)
2099 		return -1;
2100 
2101 	if (!pipeline_is_running(p)) {
2102 		status = rte_table_action_ttl_read(table->a,
2103 				rule->data,
2104 				stats,
2105 				clear);
2106 
2107 		return status;
2108 	}
2109 
2110 	/* Allocate request */
2111 	req = pipeline_msg_alloc();
2112 	if (req == NULL)
2113 		return -1;
2114 
2115 	/* Write request */
2116 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2117 	req->id = table_id;
2118 	req->table_rule_ttl_read.data = rule->data;
2119 	req->table_rule_ttl_read.clear = clear;
2120 
2121 	/* Send request and wait for response */
2122 	rsp = pipeline_msg_send_recv(p, req);
2123 
2124 	/* Read response */
2125 	status = rsp->status;
2126 	if (status == 0)
2127 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2128 
2129 	/* Free response */
2130 	pipeline_msg_free(rsp);
2131 
2132 	return status;
2133 }
2134 
2135 int
2136 pipeline_table_rule_time_read(const char *pipeline_name,
2137 	uint32_t table_id,
2138 	struct table_rule_match *match,
2139 	uint64_t *timestamp)
2140 {
2141 	struct pipeline *p;
2142 	struct table *table;
2143 	struct pipeline_msg_req *req;
2144 	struct pipeline_msg_rsp *rsp;
2145 	struct table_rule *rule;
2146 	int status;
2147 
2148 	/* Check input params */
2149 	if ((pipeline_name == NULL) ||
2150 		(match == NULL) ||
2151 		(timestamp == NULL))
2152 		return -1;
2153 
2154 	p = pipeline_find(pipeline_name);
2155 	if ((p == NULL) ||
2156 		(table_id >= p->n_tables) ||
2157 		match_check(match, p, table_id))
2158 		return -1;
2159 
2160 	table = &p->table[table_id];
2161 
2162 	rule = table_rule_find(table, match);
2163 	if (rule == NULL)
2164 		return -1;
2165 
2166 	if (!pipeline_is_running(p)) {
2167 		status = rte_table_action_time_read(table->a,
2168 				rule->data,
2169 				timestamp);
2170 
2171 		return status;
2172 	}
2173 
2174 	/* Allocate request */
2175 	req = pipeline_msg_alloc();
2176 	if (req == NULL)
2177 		return -1;
2178 
2179 	/* Write request */
2180 	req->type = PIPELINE_REQ_TABLE_RULE_TIME_READ;
2181 	req->id = table_id;
2182 	req->table_rule_time_read.data = rule->data;
2183 
2184 	/* Send request and wait for response */
2185 	rsp = pipeline_msg_send_recv(p, req);
2186 
2187 	/* Read response */
2188 	status = rsp->status;
2189 	if (status == 0)
2190 		*timestamp = rsp->table_rule_time_read.timestamp;
2191 
2192 	/* Free response */
2193 	pipeline_msg_free(rsp);
2194 
2195 	return status;
2196 }
2197 
2198 /**
2199  * Data plane threads: message handling
2200  */
/*
 * Try to dequeue one request from the request ring.
 * Returns the request, or NULL when the dequeue reports a non-zero status.
 */
static inline struct pipeline_msg_req *
pipeline_msg_recv(struct rte_ring *msgq_req)
{
	struct pipeline_msg_req *msg;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
2213 
/*
 * Enqueue a response on the response ring, retrying for as long as the
 * enqueue reports -ENOBUFS.
 */
static inline void
pipeline_msg_send(struct rte_ring *msgq_rsp,
	struct pipeline_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		; /* retry */
}
2224 
2225 static struct pipeline_msg_rsp *
2226 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2227 	struct pipeline_msg_req *req)
2228 {
2229 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2230 	uint32_t port_id = req->id;
2231 	int clear = req->port_in_stats_read.clear;
2232 
2233 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
2234 		port_id,
2235 		&rsp->port_in_stats_read.stats,
2236 		clear);
2237 
2238 	return rsp;
2239 }
2240 
2241 static struct pipeline_msg_rsp *
2242 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2243 	struct pipeline_msg_req *req)
2244 {
2245 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2246 	uint32_t port_id = req->id;
2247 
2248 	rsp->status = rte_pipeline_port_in_enable(p->p,
2249 		port_id);
2250 
2251 	return rsp;
2252 }
2253 
2254 static struct pipeline_msg_rsp *
2255 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2256 	struct pipeline_msg_req *req)
2257 {
2258 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2259 	uint32_t port_id = req->id;
2260 
2261 	rsp->status = rte_pipeline_port_in_disable(p->p,
2262 		port_id);
2263 
2264 	return rsp;
2265 }
2266 
2267 static struct pipeline_msg_rsp *
2268 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2269 	struct pipeline_msg_req *req)
2270 {
2271 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2272 	uint32_t port_id = req->id;
2273 	int clear = req->port_out_stats_read.clear;
2274 
2275 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
2276 		port_id,
2277 		&rsp->port_out_stats_read.stats,
2278 		clear);
2279 
2280 	return rsp;
2281 }
2282 
2283 static struct pipeline_msg_rsp *
2284 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2285 	struct pipeline_msg_req *req)
2286 {
2287 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2288 	uint32_t port_id = req->id;
2289 	int clear = req->table_stats_read.clear;
2290 
2291 	rsp->status = rte_pipeline_table_stats_read(p->p,
2292 		port_id,
2293 		&rsp->table_stats_read.stats,
2294 		clear);
2295 
2296 	return rsp;
2297 }
2298 
/*
 * Split an IPv6 prefix depth (0 .. 128) into four per-32-bit-word depths.
 *
 * depth32[0 .. 3] receive, in order, the number of prefix bits falling into
 * each successive 32-bit word: fully covered words get 32, the word holding
 * the end of the prefix gets the remainder, later words get 0.
 *
 * Returns 0 on success, or -1 when depth exceeds 128 (depth32 is then left
 * untouched).
 */
static int
match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
{
	uint32_t remaining = depth;
	uint32_t word;

	if (depth > 128)
		return -1;

	for (word = 0; word < 4; word++) {
		/* Each word absorbs at most 32 of the remaining bits. */
		uint32_t bits = (remaining < 32) ? remaining : 32;

		depth32[word] = bits;
		remaining -= bits;
	}

	return 0;
}
2345 
2346 static int
2347 match_convert(struct table_rule_match *mh,
2348 	union table_rule_match_low_level *ml,
2349 	int add)
2350 {
2351 	memset(ml, 0, sizeof(*ml));
2352 
2353 	switch (mh->match_type) {
2354 	case TABLE_ACL:
2355 		if (mh->match.acl.ip_version)
2356 			if (add) {
2357 				ml->acl_add.field_value[0].value.u8 =
2358 					mh->match.acl.proto;
2359 				ml->acl_add.field_value[0].mask_range.u8 =
2360 					mh->match.acl.proto_mask;
2361 
2362 				ml->acl_add.field_value[1].value.u32 =
2363 					mh->match.acl.ipv4.sa;
2364 				ml->acl_add.field_value[1].mask_range.u32 =
2365 					mh->match.acl.sa_depth;
2366 
2367 				ml->acl_add.field_value[2].value.u32 =
2368 					mh->match.acl.ipv4.da;
2369 				ml->acl_add.field_value[2].mask_range.u32 =
2370 					mh->match.acl.da_depth;
2371 
2372 				ml->acl_add.field_value[3].value.u16 =
2373 					mh->match.acl.sp0;
2374 				ml->acl_add.field_value[3].mask_range.u16 =
2375 					mh->match.acl.sp1;
2376 
2377 				ml->acl_add.field_value[4].value.u16 =
2378 					mh->match.acl.dp0;
2379 				ml->acl_add.field_value[4].mask_range.u16 =
2380 					mh->match.acl.dp1;
2381 
2382 				ml->acl_add.priority =
2383 					(int32_t) mh->match.acl.priority;
2384 			} else {
2385 				ml->acl_delete.field_value[0].value.u8 =
2386 					mh->match.acl.proto;
2387 				ml->acl_delete.field_value[0].mask_range.u8 =
2388 					mh->match.acl.proto_mask;
2389 
2390 				ml->acl_delete.field_value[1].value.u32 =
2391 					mh->match.acl.ipv4.sa;
2392 				ml->acl_delete.field_value[1].mask_range.u32 =
2393 					mh->match.acl.sa_depth;
2394 
2395 				ml->acl_delete.field_value[2].value.u32 =
2396 					mh->match.acl.ipv4.da;
2397 				ml->acl_delete.field_value[2].mask_range.u32 =
2398 					mh->match.acl.da_depth;
2399 
2400 				ml->acl_delete.field_value[3].value.u16 =
2401 					mh->match.acl.sp0;
2402 				ml->acl_delete.field_value[3].mask_range.u16 =
2403 					mh->match.acl.sp1;
2404 
2405 				ml->acl_delete.field_value[4].value.u16 =
2406 					mh->match.acl.dp0;
2407 				ml->acl_delete.field_value[4].mask_range.u16 =
2408 					mh->match.acl.dp1;
2409 			}
2410 		else
2411 			if (add) {
2412 				uint32_t *sa32 = (uint32_t *)&mh->match.acl.ipv6.sa;
2413 				uint32_t *da32 = (uint32_t *)&mh->match.acl.ipv6.da;
2414 				uint32_t sa32_depth[4], da32_depth[4];
2415 				int status;
2416 
2417 				status = match_convert_ipv6_depth(
2418 					mh->match.acl.sa_depth,
2419 					sa32_depth);
2420 				if (status)
2421 					return status;
2422 
2423 				status = match_convert_ipv6_depth(
2424 					mh->match.acl.da_depth,
2425 					da32_depth);
2426 				if (status)
2427 					return status;
2428 
2429 				ml->acl_add.field_value[0].value.u8 =
2430 					mh->match.acl.proto;
2431 				ml->acl_add.field_value[0].mask_range.u8 =
2432 					mh->match.acl.proto_mask;
2433 
2434 				ml->acl_add.field_value[1].value.u32 =
2435 					rte_be_to_cpu_32(sa32[0]);
2436 				ml->acl_add.field_value[1].mask_range.u32 =
2437 					sa32_depth[0];
2438 				ml->acl_add.field_value[2].value.u32 =
2439 					rte_be_to_cpu_32(sa32[1]);
2440 				ml->acl_add.field_value[2].mask_range.u32 =
2441 					sa32_depth[1];
2442 				ml->acl_add.field_value[3].value.u32 =
2443 					rte_be_to_cpu_32(sa32[2]);
2444 				ml->acl_add.field_value[3].mask_range.u32 =
2445 					sa32_depth[2];
2446 				ml->acl_add.field_value[4].value.u32 =
2447 					rte_be_to_cpu_32(sa32[3]);
2448 				ml->acl_add.field_value[4].mask_range.u32 =
2449 					sa32_depth[3];
2450 
2451 				ml->acl_add.field_value[5].value.u32 =
2452 					rte_be_to_cpu_32(da32[0]);
2453 				ml->acl_add.field_value[5].mask_range.u32 =
2454 					da32_depth[0];
2455 				ml->acl_add.field_value[6].value.u32 =
2456 					rte_be_to_cpu_32(da32[1]);
2457 				ml->acl_add.field_value[6].mask_range.u32 =
2458 					da32_depth[1];
2459 				ml->acl_add.field_value[7].value.u32 =
2460 					rte_be_to_cpu_32(da32[2]);
2461 				ml->acl_add.field_value[7].mask_range.u32 =
2462 					da32_depth[2];
2463 				ml->acl_add.field_value[8].value.u32 =
2464 					rte_be_to_cpu_32(da32[3]);
2465 				ml->acl_add.field_value[8].mask_range.u32 =
2466 					da32_depth[3];
2467 
2468 				ml->acl_add.field_value[9].value.u16 =
2469 					mh->match.acl.sp0;
2470 				ml->acl_add.field_value[9].mask_range.u16 =
2471 					mh->match.acl.sp1;
2472 
2473 				ml->acl_add.field_value[10].value.u16 =
2474 					mh->match.acl.dp0;
2475 				ml->acl_add.field_value[10].mask_range.u16 =
2476 					mh->match.acl.dp1;
2477 
2478 				ml->acl_add.priority =
2479 					(int32_t) mh->match.acl.priority;
2480 			} else {
2481 				uint32_t *sa32 = (uint32_t *)&mh->match.acl.ipv6.sa;
2482 				uint32_t *da32 = (uint32_t *)&mh->match.acl.ipv6.da;
2483 				uint32_t sa32_depth[4], da32_depth[4];
2484 				int status;
2485 
2486 				status = match_convert_ipv6_depth(
2487 					mh->match.acl.sa_depth,
2488 					sa32_depth);
2489 				if (status)
2490 					return status;
2491 
2492 				status = match_convert_ipv6_depth(
2493 					mh->match.acl.da_depth,
2494 					da32_depth);
2495 				if (status)
2496 					return status;
2497 
2498 				ml->acl_delete.field_value[0].value.u8 =
2499 					mh->match.acl.proto;
2500 				ml->acl_delete.field_value[0].mask_range.u8 =
2501 					mh->match.acl.proto_mask;
2502 
2503 				ml->acl_delete.field_value[1].value.u32 =
2504 					rte_be_to_cpu_32(sa32[0]);
2505 				ml->acl_delete.field_value[1].mask_range.u32 =
2506 					sa32_depth[0];
2507 				ml->acl_delete.field_value[2].value.u32 =
2508 					rte_be_to_cpu_32(sa32[1]);
2509 				ml->acl_delete.field_value[2].mask_range.u32 =
2510 					sa32_depth[1];
2511 				ml->acl_delete.field_value[3].value.u32 =
2512 					rte_be_to_cpu_32(sa32[2]);
2513 				ml->acl_delete.field_value[3].mask_range.u32 =
2514 					sa32_depth[2];
2515 				ml->acl_delete.field_value[4].value.u32 =
2516 					rte_be_to_cpu_32(sa32[3]);
2517 				ml->acl_delete.field_value[4].mask_range.u32 =
2518 					sa32_depth[3];
2519 
2520 				ml->acl_delete.field_value[5].value.u32 =
2521 					rte_be_to_cpu_32(da32[0]);
2522 				ml->acl_delete.field_value[5].mask_range.u32 =
2523 					da32_depth[0];
2524 				ml->acl_delete.field_value[6].value.u32 =
2525 					rte_be_to_cpu_32(da32[1]);
2526 				ml->acl_delete.field_value[6].mask_range.u32 =
2527 					da32_depth[1];
2528 				ml->acl_delete.field_value[7].value.u32 =
2529 					rte_be_to_cpu_32(da32[2]);
2530 				ml->acl_delete.field_value[7].mask_range.u32 =
2531 					da32_depth[2];
2532 				ml->acl_delete.field_value[8].value.u32 =
2533 					rte_be_to_cpu_32(da32[3]);
2534 				ml->acl_delete.field_value[8].mask_range.u32 =
2535 					da32_depth[3];
2536 
2537 				ml->acl_delete.field_value[9].value.u16 =
2538 					mh->match.acl.sp0;
2539 				ml->acl_delete.field_value[9].mask_range.u16 =
2540 					mh->match.acl.sp1;
2541 
2542 				ml->acl_delete.field_value[10].value.u16 =
2543 					mh->match.acl.dp0;
2544 				ml->acl_delete.field_value[10].mask_range.u16 =
2545 					mh->match.acl.dp1;
2546 			}
2547 		return 0;
2548 
2549 	case TABLE_ARRAY:
2550 		ml->array.pos = mh->match.array.pos;
2551 		return 0;
2552 
2553 	case TABLE_HASH:
2554 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2555 		return 0;
2556 
2557 	case TABLE_LPM:
2558 		if (mh->match.lpm.ip_version) {
2559 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2560 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
2561 		} else {
2562 			ml->lpm_ipv6.ip = mh->match.lpm.ipv6;
2563 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
2564 		}
2565 
2566 		return 0;
2567 
2568 	default:
2569 		return -1;
2570 	}
2571 }
2572 
2573 static int
2574 action_convert(struct rte_table_action *a,
2575 	struct table_rule_action *action,
2576 	struct rte_pipeline_table_entry *data)
2577 {
2578 	int status;
2579 
2580 	/* Apply actions */
2581 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2582 		status = rte_table_action_apply(a,
2583 			data,
2584 			RTE_TABLE_ACTION_FWD,
2585 			&action->fwd);
2586 
2587 		if (status)
2588 			return status;
2589 	}
2590 
2591 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2592 		status = rte_table_action_apply(a,
2593 			data,
2594 			RTE_TABLE_ACTION_LB,
2595 			&action->lb);
2596 
2597 		if (status)
2598 			return status;
2599 	}
2600 
2601 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2602 		status = rte_table_action_apply(a,
2603 			data,
2604 			RTE_TABLE_ACTION_MTR,
2605 			&action->mtr);
2606 
2607 		if (status)
2608 			return status;
2609 	}
2610 
2611 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2612 		status = rte_table_action_apply(a,
2613 			data,
2614 			RTE_TABLE_ACTION_TM,
2615 			&action->tm);
2616 
2617 		if (status)
2618 			return status;
2619 	}
2620 
2621 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2622 		status = rte_table_action_apply(a,
2623 			data,
2624 			RTE_TABLE_ACTION_ENCAP,
2625 			&action->encap);
2626 
2627 		if (status)
2628 			return status;
2629 	}
2630 
2631 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2632 		status = rte_table_action_apply(a,
2633 			data,
2634 			RTE_TABLE_ACTION_NAT,
2635 			&action->nat);
2636 
2637 		if (status)
2638 			return status;
2639 	}
2640 
2641 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2642 		status = rte_table_action_apply(a,
2643 			data,
2644 			RTE_TABLE_ACTION_TTL,
2645 			&action->ttl);
2646 
2647 		if (status)
2648 			return status;
2649 	}
2650 
2651 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2652 		status = rte_table_action_apply(a,
2653 			data,
2654 			RTE_TABLE_ACTION_STATS,
2655 			&action->stats);
2656 
2657 		if (status)
2658 			return status;
2659 	}
2660 
2661 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2662 		status = rte_table_action_apply(a,
2663 			data,
2664 			RTE_TABLE_ACTION_TIME,
2665 			&action->time);
2666 
2667 		if (status)
2668 			return status;
2669 	}
2670 
2671 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2672 		status = rte_table_action_apply(a,
2673 			data,
2674 			RTE_TABLE_ACTION_SYM_CRYPTO,
2675 			&action->sym_crypto);
2676 
2677 		if (status)
2678 			return status;
2679 	}
2680 
2681 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2682 		status = rte_table_action_apply(a,
2683 			data,
2684 			RTE_TABLE_ACTION_TAG,
2685 			&action->tag);
2686 
2687 		if (status)
2688 			return status;
2689 	}
2690 
2691 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2692 		status = rte_table_action_apply(a,
2693 			data,
2694 			RTE_TABLE_ACTION_DECAP,
2695 			&action->decap);
2696 
2697 		if (status)
2698 			return status;
2699 	}
2700 
2701 	return 0;
2702 }
2703 
2704 static struct pipeline_msg_rsp *
2705 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2706 	struct pipeline_msg_req *req)
2707 {
2708 	union table_rule_match_low_level match_ll;
2709 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2710 	struct table_rule_match *match = &req->table_rule_add.match;
2711 	struct table_rule_action *action = &req->table_rule_add.action;
2712 	struct rte_pipeline_table_entry *data_in, *data_out;
2713 	uint32_t table_id = req->id;
2714 	int key_found, status;
2715 	struct rte_table_action *a = p->table_data[table_id].a;
2716 
2717 	/* Apply actions */
2718 	memset(p->buffer, 0, sizeof(p->buffer));
2719 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2720 
2721 	status = match_convert(match, &match_ll, 1);
2722 	if (status) {
2723 		rsp->status = -1;
2724 		return rsp;
2725 	}
2726 
2727 	status = action_convert(a, action, data_in);
2728 	if (status) {
2729 		rsp->status = -1;
2730 		return rsp;
2731 	}
2732 
2733 	status = rte_pipeline_table_entry_add(p->p,
2734 		table_id,
2735 		&match_ll,
2736 		data_in,
2737 		&key_found,
2738 		&data_out);
2739 	if (status) {
2740 		rsp->status = -1;
2741 		return rsp;
2742 	}
2743 
2744 	/* Write response */
2745 	rsp->status = 0;
2746 	rsp->table_rule_add.data = data_out;
2747 
2748 	return rsp;
2749 }
2750 
2751 static struct pipeline_msg_rsp *
2752 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2753 	struct pipeline_msg_req *req)
2754 {
2755 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2756 	struct table_rule_action *action = &req->table_rule_add_default.action;
2757 	struct rte_pipeline_table_entry *data_in, *data_out;
2758 	uint32_t table_id = req->id;
2759 	int status;
2760 
2761 	/* Apply actions */
2762 	memset(p->buffer, 0, sizeof(p->buffer));
2763 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2764 
2765 	data_in->action = action->fwd.action;
2766 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2767 		data_in->port_id = action->fwd.id;
2768 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2769 		data_in->table_id = action->fwd.id;
2770 
2771 	/* Add default rule to table */
2772 	status = rte_pipeline_table_default_entry_add(p->p,
2773 		table_id,
2774 		data_in,
2775 		&data_out);
2776 	if (status) {
2777 		rsp->status = -1;
2778 		return rsp;
2779 	}
2780 
2781 	/* Write response */
2782 	rsp->status = 0;
2783 	rsp->table_rule_add_default.data = data_out;
2784 
2785 	return rsp;
2786 }
2787 
2788 static struct pipeline_msg_rsp *
2789 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2790 	struct pipeline_msg_req *req)
2791 {
2792 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2793 
2794 	uint32_t table_id = req->id;
2795 	struct table_rule_list *list = req->table_rule_add_bulk.list;
2796 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2797 
2798 	uint32_t n_rules_added;
2799 	int status;
2800 
2801 	struct table_ll table_ll = {
2802 		.p = p->p,
2803 		.table_id = table_id,
2804 		.a = p->table_data[table_id].a,
2805 		.bulk_supported = bulk,
2806 	};
2807 
2808 	status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2809 	if (status) {
2810 		rsp->status = -1;
2811 		rsp->table_rule_add_bulk.n_rules = 0;
2812 		return rsp;
2813 	}
2814 
2815 	/* Write response */
2816 	rsp->status = 0;
2817 	rsp->table_rule_add_bulk.n_rules = n_rules_added;
2818 	return rsp;
2819 }
2820 
2821 static struct pipeline_msg_rsp *
2822 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2823 	struct pipeline_msg_req *req)
2824 {
2825 	union table_rule_match_low_level match_ll;
2826 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2827 	struct table_rule_match *match = &req->table_rule_delete.match;
2828 	uint32_t table_id = req->id;
2829 	int key_found, status;
2830 
2831 	status = match_convert(match, &match_ll, 0);
2832 	if (status) {
2833 		rsp->status = -1;
2834 		return rsp;
2835 	}
2836 
2837 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2838 		table_id,
2839 		&match_ll,
2840 		&key_found,
2841 		NULL);
2842 
2843 	return rsp;
2844 }
2845 
2846 static struct pipeline_msg_rsp *
2847 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2848 	struct pipeline_msg_req *req)
2849 {
2850 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2851 	uint32_t table_id = req->id;
2852 
2853 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2854 		table_id,
2855 		NULL);
2856 
2857 	return rsp;
2858 }
2859 
2860 static struct pipeline_msg_rsp *
2861 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2862 	struct pipeline_msg_req *req)
2863 {
2864 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2865 	uint32_t table_id = req->id;
2866 	void *data = req->table_rule_stats_read.data;
2867 	int clear = req->table_rule_stats_read.clear;
2868 	struct rte_table_action *a = p->table_data[table_id].a;
2869 
2870 	rsp->status = rte_table_action_stats_read(a,
2871 		data,
2872 		&rsp->table_rule_stats_read.stats,
2873 		clear);
2874 
2875 	return rsp;
2876 }
2877 
2878 static struct pipeline_msg_rsp *
2879 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2880 	struct pipeline_msg_req *req)
2881 {
2882 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2883 	uint32_t table_id = req->id;
2884 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2885 	struct rte_table_action_meter_profile *profile =
2886 		&req->table_mtr_profile_add.profile;
2887 	struct rte_table_action *a = p->table_data[table_id].a;
2888 
2889 	rsp->status = rte_table_action_meter_profile_add(a,
2890 		meter_profile_id,
2891 		profile);
2892 
2893 	return rsp;
2894 }
2895 
2896 static struct pipeline_msg_rsp *
2897 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2898 	struct pipeline_msg_req *req)
2899 {
2900 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2901 	uint32_t table_id = req->id;
2902 	uint32_t meter_profile_id =
2903 		req->table_mtr_profile_delete.meter_profile_id;
2904 	struct rte_table_action *a = p->table_data[table_id].a;
2905 
2906 	rsp->status = rte_table_action_meter_profile_delete(a,
2907 		meter_profile_id);
2908 
2909 	return rsp;
2910 }
2911 
2912 static struct pipeline_msg_rsp *
2913 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2914 	struct pipeline_msg_req *req)
2915 {
2916 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2917 	uint32_t table_id = req->id;
2918 	void *data = req->table_rule_mtr_read.data;
2919 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2920 	int clear = req->table_rule_mtr_read.clear;
2921 	struct rte_table_action *a = p->table_data[table_id].a;
2922 
2923 	rsp->status = rte_table_action_meter_read(a,
2924 		data,
2925 		tc_mask,
2926 		&rsp->table_rule_mtr_read.stats,
2927 		clear);
2928 
2929 	return rsp;
2930 }
2931 
2932 static struct pipeline_msg_rsp *
2933 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2934 	struct pipeline_msg_req *req)
2935 {
2936 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2937 	uint32_t table_id = req->id;
2938 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2939 	struct rte_table_action_dscp_table *dscp_table =
2940 		&req->table_dscp_table_update.dscp_table;
2941 	struct rte_table_action *a = p->table_data[table_id].a;
2942 
2943 	rsp->status = rte_table_action_dscp_table_update(a,
2944 		dscp_mask,
2945 		dscp_table);
2946 
2947 	return rsp;
2948 }
2949 
2950 static struct pipeline_msg_rsp *
2951 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2952 	struct pipeline_msg_req *req)
2953 {
2954 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2955 	uint32_t table_id = req->id;
2956 	void *data = req->table_rule_ttl_read.data;
2957 	int clear = req->table_rule_ttl_read.clear;
2958 	struct rte_table_action *a = p->table_data[table_id].a;
2959 
2960 	rsp->status = rte_table_action_ttl_read(a,
2961 		data,
2962 		&rsp->table_rule_ttl_read.stats,
2963 		clear);
2964 
2965 	return rsp;
2966 }
2967 
2968 static struct pipeline_msg_rsp *
2969 pipeline_msg_handle_table_rule_time_read(struct pipeline_data *p,
2970 	struct pipeline_msg_req *req)
2971 {
2972 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2973 	uint32_t table_id = req->id;
2974 	void *data = req->table_rule_time_read.data;
2975 	struct rte_table_action *a = p->table_data[table_id].a;
2976 
2977 	rsp->status = rte_table_action_time_read(a,
2978 		data,
2979 		&rsp->table_rule_time_read.timestamp);
2980 
2981 	return rsp;
2982 }
2983 
2984 static void
2985 pipeline_msg_handle(struct pipeline_data *p)
2986 {
2987 	for ( ; ; ) {
2988 		struct pipeline_msg_req *req;
2989 		struct pipeline_msg_rsp *rsp;
2990 
2991 		req = pipeline_msg_recv(p->msgq_req);
2992 		if (req == NULL)
2993 			break;
2994 
2995 		switch (req->type) {
2996 		case PIPELINE_REQ_PORT_IN_STATS_READ:
2997 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2998 			break;
2999 
3000 		case PIPELINE_REQ_PORT_IN_ENABLE:
3001 			rsp = pipeline_msg_handle_port_in_enable(p, req);
3002 			break;
3003 
3004 		case PIPELINE_REQ_PORT_IN_DISABLE:
3005 			rsp = pipeline_msg_handle_port_in_disable(p, req);
3006 			break;
3007 
3008 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
3009 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
3010 			break;
3011 
3012 		case PIPELINE_REQ_TABLE_STATS_READ:
3013 			rsp = pipeline_msg_handle_table_stats_read(p, req);
3014 			break;
3015 
3016 		case PIPELINE_REQ_TABLE_RULE_ADD:
3017 			rsp = pipeline_msg_handle_table_rule_add(p, req);
3018 			break;
3019 
3020 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
3021 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
3022 			break;
3023 
3024 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
3025 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
3026 			break;
3027 
3028 		case PIPELINE_REQ_TABLE_RULE_DELETE:
3029 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
3030 			break;
3031 
3032 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
3033 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
3034 			break;
3035 
3036 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
3037 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
3038 			break;
3039 
3040 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
3041 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
3042 			break;
3043 
3044 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
3045 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
3046 			break;
3047 
3048 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
3049 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
3050 			break;
3051 
3052 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
3053 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
3054 			break;
3055 
3056 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
3057 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
3058 			break;
3059 
3060 		case PIPELINE_REQ_TABLE_RULE_TIME_READ:
3061 			rsp = pipeline_msg_handle_table_rule_time_read(p, req);
3062 			break;
3063 
3064 		default:
3065 			rsp = (struct pipeline_msg_rsp *) req;
3066 			rsp->status = -1;
3067 		}
3068 
3069 		pipeline_msg_send(p->msgq_rsp, rsp);
3070 	}
3071 }
3072 
3073 /**
3074  * Data plane threads: main
3075  */
3076 int
3077 thread_main(void *arg __rte_unused)
3078 {
3079 	struct thread_data *t;
3080 	uint32_t thread_id, i;
3081 
3082 	thread_id = rte_lcore_id();
3083 	t = &thread_data[thread_id];
3084 
3085 	/* Dispatch loop */
3086 	for (i = 0; ; i++) {
3087 		uint32_t j;
3088 
3089 		/* Data Plane */
3090 		for (j = 0; j < t->n_pipelines; j++)
3091 			rte_pipeline_run(t->p[j]);
3092 
3093 		/* Control Plane */
3094 		if ((i & 0xF) == 0) {
3095 			uint64_t time = rte_get_tsc_cycles();
3096 			uint64_t time_next_min = UINT64_MAX;
3097 
3098 			if (time < t->time_next_min)
3099 				continue;
3100 
3101 			/* Pipeline message queues */
3102 			for (j = 0; j < t->n_pipelines; j++) {
3103 				struct pipeline_data *p =
3104 					&t->pipeline_data[j];
3105 				uint64_t time_next = p->time_next;
3106 
3107 				if (time_next <= time) {
3108 					pipeline_msg_handle(p);
3109 					rte_pipeline_flush(p->p);
3110 					time_next = time + p->timer_period;
3111 					p->time_next = time_next;
3112 				}
3113 
3114 				if (time_next < time_next_min)
3115 					time_next_min = time_next;
3116 			}
3117 
3118 			/* Thread message queues */
3119 			{
3120 				uint64_t time_next = t->time_next;
3121 
3122 				if (time_next <= time) {
3123 					thread_msg_handle(t);
3124 					time_next = time + t->timer_period;
3125 					t->time_next = time_next;
3126 				}
3127 
3128 				if (time_next < time_next_min)
3129 					time_next_min = time_next;
3130 			}
3131 
3132 			t->time_next_min = time_next_min;
3133 		}
3134 	}
3135 
3136 	return 0;
3137 }
3138