xref: /dpdk/examples/ip_pipeline/thread.c (revision a137d012a0dd7be982367687ea6cd10de6f12c69)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25 
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29 
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33 
34 /**
35  * Master thead: data plane thread context
36  */
37 struct thread {
38 	struct rte_ring *msgq_req;
39 	struct rte_ring *msgq_rsp;
40 
41 	uint32_t enabled;
42 };
43 
44 static struct thread thread[RTE_MAX_LCORE];
45 
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50 	struct rte_table_action *a;
51 };
52 
53 struct pipeline_data {
54 	struct rte_pipeline *p;
55 	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56 	uint32_t n_tables;
57 
58 	struct rte_ring *msgq_req;
59 	struct rte_ring *msgq_rsp;
60 	uint64_t timer_period; /* Measured in CPU cycles. */
61 	uint64_t time_next;
62 
63 	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65 
66 struct thread_data {
67 	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68 	uint32_t n_pipelines;
69 
70 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 	struct rte_ring *msgq_req;
72 	struct rte_ring *msgq_rsp;
73 	uint64_t timer_period; /* Measured in CPU cycles. */
74 	uint64_t time_next;
75 	uint64_t time_next_min;
76 } __rte_cache_aligned;
77 
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		if (t->msgq_req)
96 			rte_ring_free(t->msgq_req);
97 
98 		if (t->msgq_rsp)
99 			rte_ring_free(t->msgq_rsp);
100 	}
101 }
102 
103 int
104 thread_init(void)
105 {
106 	uint32_t i;
107 
108 	RTE_LCORE_FOREACH_SLAVE(i) {
109 		char name[NAME_MAX];
110 		struct rte_ring *msgq_req, *msgq_rsp;
111 		struct thread *t = &thread[i];
112 		struct thread_data *t_data = &thread_data[i];
113 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
114 
115 		/* MSGQs */
116 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117 
118 		msgq_req = rte_ring_create(name,
119 			THREAD_MSGQ_SIZE,
120 			cpu_id,
121 			RING_F_SP_ENQ | RING_F_SC_DEQ);
122 
123 		if (msgq_req == NULL) {
124 			thread_free();
125 			return -1;
126 		}
127 
128 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129 
130 		msgq_rsp = rte_ring_create(name,
131 			THREAD_MSGQ_SIZE,
132 			cpu_id,
133 			RING_F_SP_ENQ | RING_F_SC_DEQ);
134 
135 		if (msgq_rsp == NULL) {
136 			thread_free();
137 			return -1;
138 		}
139 
140 		/* Master thread records */
141 		t->msgq_req = msgq_req;
142 		t->msgq_rsp = msgq_rsp;
143 		t->enabled = 1;
144 
145 		/* Data plane thread records */
146 		t_data->n_pipelines = 0;
147 		t_data->msgq_req = msgq_req;
148 		t_data->msgq_rsp = msgq_rsp;
149 		t_data->timer_period =
150 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 		t_data->time_next_min = t_data->time_next;
153 	}
154 
155 	return 0;
156 }
157 
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161 	enum rte_lcore_state_t thread_state;
162 
163 	thread_state = rte_eal_get_lcore_state(thread_id);
164 	return (thread_state == RUNNING) ? 1 : 0;
165 }
166 
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175 	if (p->enabled == 0)
176 		return 0;
177 
178 	return thread_is_running(p->thread_id);
179 }
180 
181 /**
182  * Master thread & data plane threads: message passing
183  */
184 enum thread_req_type {
185 	THREAD_REQ_PIPELINE_ENABLE = 0,
186 	THREAD_REQ_PIPELINE_DISABLE,
187 	THREAD_REQ_MAX
188 };
189 
190 struct thread_msg_req {
191 	enum thread_req_type type;
192 
193 	union {
194 		struct {
195 			struct rte_pipeline *p;
196 			struct {
197 				struct rte_table_action *a;
198 			} table[RTE_PIPELINE_TABLE_MAX];
199 			struct rte_ring *msgq_req;
200 			struct rte_ring *msgq_rsp;
201 			uint32_t timer_period_ms;
202 			uint32_t n_tables;
203 		} pipeline_enable;
204 
205 		struct {
206 			struct rte_pipeline *p;
207 		} pipeline_disable;
208 	};
209 };
210 
211 struct thread_msg_rsp {
212 	int status;
213 };
214 
215 /**
216  * Master thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222 		sizeof(struct thread_msg_rsp));
223 
224 	return calloc(1, size);
225 }
226 
227 static void
228 thread_msg_free(struct thread_msg_rsp *rsp)
229 {
230 	free(rsp);
231 }
232 
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235 	struct thread_msg_req *req)
236 {
237 	struct thread *t = &thread[thread_id];
238 	struct rte_ring *msgq_req = t->msgq_req;
239 	struct rte_ring *msgq_rsp = t->msgq_rsp;
240 	struct thread_msg_rsp *rsp;
241 	int status;
242 
243 	/* send */
244 	do {
245 		status = rte_ring_sp_enqueue(msgq_req, req);
246 	} while (status == -ENOBUFS);
247 
248 	/* recv */
249 	do {
250 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251 	} while (status != 0);
252 
253 	return rsp;
254 }
255 
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258 	const char *pipeline_name)
259 {
260 	struct pipeline *p = pipeline_find(pipeline_name);
261 	struct thread *t;
262 	struct thread_msg_req *req;
263 	struct thread_msg_rsp *rsp;
264 	uint32_t i;
265 	int status;
266 
267 	/* Check input params */
268 	if ((thread_id >= RTE_MAX_LCORE) ||
269 		(p == NULL) ||
270 		(p->n_ports_in == 0) ||
271 		(p->n_ports_out == 0) ||
272 		(p->n_tables == 0))
273 		return -1;
274 
275 	t = &thread[thread_id];
276 	if ((t->enabled == 0) ||
277 		p->enabled)
278 		return -1;
279 
280 	if (!thread_is_running(thread_id)) {
281 		struct thread_data *td = &thread_data[thread_id];
282 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283 
284 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285 			return -1;
286 
287 		/* Data plane thread */
288 		td->p[td->n_pipelines] = p->p;
289 
290 		tdp->p = p->p;
291 		for (i = 0; i < p->n_tables; i++)
292 			tdp->table_data[i].a = p->table[i].a;
293 
294 		tdp->n_tables = p->n_tables;
295 
296 		tdp->msgq_req = p->msgq_req;
297 		tdp->msgq_rsp = p->msgq_rsp;
298 		tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300 
301 		td->n_pipelines++;
302 
303 		/* Pipeline */
304 		p->thread_id = thread_id;
305 		p->enabled = 1;
306 
307 		return 0;
308 	}
309 
310 	/* Allocate request */
311 	req = thread_msg_alloc();
312 	if (req == NULL)
313 		return -1;
314 
315 	/* Write request */
316 	req->type = THREAD_REQ_PIPELINE_ENABLE;
317 	req->pipeline_enable.p = p->p;
318 	for (i = 0; i < p->n_tables; i++)
319 		req->pipeline_enable.table[i].a =
320 			p->table[i].a;
321 	req->pipeline_enable.msgq_req = p->msgq_req;
322 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324 	req->pipeline_enable.n_tables = p->n_tables;
325 
326 	/* Send request and wait for response */
327 	rsp = thread_msg_send_recv(thread_id, req);
328 	if (rsp == NULL)
329 		return -1;
330 
331 	/* Read response */
332 	status = rsp->status;
333 
334 	/* Free response */
335 	thread_msg_free(rsp);
336 
337 	/* Request completion */
338 	if (status)
339 		return status;
340 
341 	p->thread_id = thread_id;
342 	p->enabled = 1;
343 
344 	return 0;
345 }
346 
347 int
348 thread_pipeline_disable(uint32_t thread_id,
349 	const char *pipeline_name)
350 {
351 	struct pipeline *p = pipeline_find(pipeline_name);
352 	struct thread *t;
353 	struct thread_msg_req *req;
354 	struct thread_msg_rsp *rsp;
355 	int status;
356 
357 	/* Check input params */
358 	if ((thread_id >= RTE_MAX_LCORE) ||
359 		(p == NULL))
360 		return -1;
361 
362 	t = &thread[thread_id];
363 	if (t->enabled == 0)
364 		return -1;
365 
366 	if (p->enabled == 0)
367 		return 0;
368 
369 	if (p->thread_id != thread_id)
370 		return -1;
371 
372 	if (!thread_is_running(thread_id)) {
373 		struct thread_data *td = &thread_data[thread_id];
374 		uint32_t i;
375 
376 		for (i = 0; i < td->n_pipelines; i++) {
377 			struct pipeline_data *tdp = &td->pipeline_data[i];
378 
379 			if (tdp->p != p->p)
380 				continue;
381 
382 			/* Data plane thread */
383 			if (i < td->n_pipelines - 1) {
384 				struct rte_pipeline *pipeline_last =
385 					td->p[td->n_pipelines - 1];
386 				struct pipeline_data *tdp_last =
387 					&td->pipeline_data[td->n_pipelines - 1];
388 
389 				td->p[i] = pipeline_last;
390 				memcpy(tdp, tdp_last, sizeof(*tdp));
391 			}
392 
393 			td->n_pipelines--;
394 
395 			/* Pipeline */
396 			p->enabled = 0;
397 
398 			break;
399 		}
400 
401 		return 0;
402 	}
403 
404 	/* Allocate request */
405 	req = thread_msg_alloc();
406 	if (req == NULL)
407 		return -1;
408 
409 	/* Write request */
410 	req->type = THREAD_REQ_PIPELINE_DISABLE;
411 	req->pipeline_disable.p = p->p;
412 
413 	/* Send request and wait for response */
414 	rsp = thread_msg_send_recv(thread_id, req);
415 	if (rsp == NULL)
416 		return -1;
417 
418 	/* Read response */
419 	status = rsp->status;
420 
421 	/* Free response */
422 	thread_msg_free(rsp);
423 
424 	/* Request completion */
425 	if (status)
426 		return status;
427 
428 	p->enabled = 0;
429 
430 	return 0;
431 }
432 
433 /**
434  * Data plane threads: message handling
435  */
436 static inline struct thread_msg_req *
437 thread_msg_recv(struct rte_ring *msgq_req)
438 {
439 	struct thread_msg_req *req;
440 
441 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
442 
443 	if (status != 0)
444 		return NULL;
445 
446 	return req;
447 }
448 
449 static inline void
450 thread_msg_send(struct rte_ring *msgq_rsp,
451 	struct thread_msg_rsp *rsp)
452 {
453 	int status;
454 
455 	do {
456 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
457 	} while (status == -ENOBUFS);
458 }
459 
460 static struct thread_msg_rsp *
461 thread_msg_handle_pipeline_enable(struct thread_data *t,
462 	struct thread_msg_req *req)
463 {
464 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
465 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
466 	uint32_t i;
467 
468 	/* Request */
469 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
470 		rsp->status = -1;
471 		return rsp;
472 	}
473 
474 	t->p[t->n_pipelines] = req->pipeline_enable.p;
475 
476 	p->p = req->pipeline_enable.p;
477 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
478 		p->table_data[i].a =
479 			req->pipeline_enable.table[i].a;
480 
481 	p->n_tables = req->pipeline_enable.n_tables;
482 
483 	p->msgq_req = req->pipeline_enable.msgq_req;
484 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
485 	p->timer_period =
486 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
487 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
488 
489 	t->n_pipelines++;
490 
491 	/* Response */
492 	rsp->status = 0;
493 	return rsp;
494 }
495 
496 static struct thread_msg_rsp *
497 thread_msg_handle_pipeline_disable(struct thread_data *t,
498 	struct thread_msg_req *req)
499 {
500 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
501 	uint32_t n_pipelines = t->n_pipelines;
502 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
503 	uint32_t i;
504 
505 	/* find pipeline */
506 	for (i = 0; i < n_pipelines; i++) {
507 		struct pipeline_data *p = &t->pipeline_data[i];
508 
509 		if (p->p != pipeline)
510 			continue;
511 
512 		if (i < n_pipelines - 1) {
513 			struct rte_pipeline *pipeline_last =
514 				t->p[n_pipelines - 1];
515 			struct pipeline_data *p_last =
516 				&t->pipeline_data[n_pipelines - 1];
517 
518 			t->p[i] = pipeline_last;
519 			memcpy(p, p_last, sizeof(*p));
520 		}
521 
522 		t->n_pipelines--;
523 
524 		rsp->status = 0;
525 		return rsp;
526 	}
527 
528 	/* should not get here */
529 	rsp->status = 0;
530 	return rsp;
531 }
532 
533 static void
534 thread_msg_handle(struct thread_data *t)
535 {
536 	for ( ; ; ) {
537 		struct thread_msg_req *req;
538 		struct thread_msg_rsp *rsp;
539 
540 		req = thread_msg_recv(t->msgq_req);
541 		if (req == NULL)
542 			break;
543 
544 		switch (req->type) {
545 		case THREAD_REQ_PIPELINE_ENABLE:
546 			rsp = thread_msg_handle_pipeline_enable(t, req);
547 			break;
548 
549 		case THREAD_REQ_PIPELINE_DISABLE:
550 			rsp = thread_msg_handle_pipeline_disable(t, req);
551 			break;
552 
553 		default:
554 			rsp = (struct thread_msg_rsp *) req;
555 			rsp->status = -1;
556 		}
557 
558 		thread_msg_send(t->msgq_rsp, rsp);
559 	}
560 }
561 
562 /**
563  * Master thread & data plane threads: message passing
564  */
565 enum pipeline_req_type {
566 	/* Port IN */
567 	PIPELINE_REQ_PORT_IN_STATS_READ,
568 	PIPELINE_REQ_PORT_IN_ENABLE,
569 	PIPELINE_REQ_PORT_IN_DISABLE,
570 
571 	/* Port OUT */
572 	PIPELINE_REQ_PORT_OUT_STATS_READ,
573 
574 	/* Table */
575 	PIPELINE_REQ_TABLE_STATS_READ,
576 	PIPELINE_REQ_TABLE_RULE_ADD,
577 	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
578 	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
579 	PIPELINE_REQ_TABLE_RULE_DELETE,
580 	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
581 	PIPELINE_REQ_TABLE_RULE_STATS_READ,
582 	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
583 	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
584 	PIPELINE_REQ_TABLE_RULE_MTR_READ,
585 	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
586 	PIPELINE_REQ_TABLE_RULE_TTL_READ,
587 	PIPELINE_REQ_MAX
588 };
589 
590 struct pipeline_msg_req_port_in_stats_read {
591 	int clear;
592 };
593 
594 struct pipeline_msg_req_port_out_stats_read {
595 	int clear;
596 };
597 
598 struct pipeline_msg_req_table_stats_read {
599 	int clear;
600 };
601 
602 struct pipeline_msg_req_table_rule_add {
603 	struct table_rule_match match;
604 	struct table_rule_action action;
605 };
606 
607 struct pipeline_msg_req_table_rule_add_default {
608 	struct table_rule_action action;
609 };
610 
611 struct pipeline_msg_req_table_rule_add_bulk {
612 	struct table_rule_match *match;
613 	struct table_rule_action *action;
614 	void **data;
615 	uint32_t n_rules;
616 	int bulk;
617 };
618 
619 struct pipeline_msg_req_table_rule_delete {
620 	struct table_rule_match match;
621 };
622 
623 struct pipeline_msg_req_table_rule_stats_read {
624 	void *data;
625 	int clear;
626 };
627 
628 struct pipeline_msg_req_table_mtr_profile_add {
629 	uint32_t meter_profile_id;
630 	struct rte_table_action_meter_profile profile;
631 };
632 
633 struct pipeline_msg_req_table_mtr_profile_delete {
634 	uint32_t meter_profile_id;
635 };
636 
637 struct pipeline_msg_req_table_rule_mtr_read {
638 	void *data;
639 	uint32_t tc_mask;
640 	int clear;
641 };
642 
643 struct pipeline_msg_req_table_dscp_table_update {
644 	uint64_t dscp_mask;
645 	struct rte_table_action_dscp_table dscp_table;
646 };
647 
648 struct pipeline_msg_req_table_rule_ttl_read {
649 	void *data;
650 	int clear;
651 };
652 
653 struct pipeline_msg_req {
654 	enum pipeline_req_type type;
655 	uint32_t id; /* Port IN, port OUT or table ID */
656 
657 	RTE_STD_C11
658 	union {
659 		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
660 		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
661 		struct pipeline_msg_req_table_stats_read table_stats_read;
662 		struct pipeline_msg_req_table_rule_add table_rule_add;
663 		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
664 		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
665 		struct pipeline_msg_req_table_rule_delete table_rule_delete;
666 		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
667 		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
668 		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
669 		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
670 		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
671 		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
672 	};
673 };
674 
675 struct pipeline_msg_rsp_port_in_stats_read {
676 	struct rte_pipeline_port_in_stats stats;
677 };
678 
679 struct pipeline_msg_rsp_port_out_stats_read {
680 	struct rte_pipeline_port_out_stats stats;
681 };
682 
683 struct pipeline_msg_rsp_table_stats_read {
684 	struct rte_pipeline_table_stats stats;
685 };
686 
687 struct pipeline_msg_rsp_table_rule_add {
688 	void *data;
689 };
690 
691 struct pipeline_msg_rsp_table_rule_add_default {
692 	void *data;
693 };
694 
695 struct pipeline_msg_rsp_table_rule_add_bulk {
696 	uint32_t n_rules;
697 };
698 
699 struct pipeline_msg_rsp_table_rule_stats_read {
700 	struct rte_table_action_stats_counters stats;
701 };
702 
703 struct pipeline_msg_rsp_table_rule_mtr_read {
704 	struct rte_table_action_mtr_counters stats;
705 };
706 
707 struct pipeline_msg_rsp_table_rule_ttl_read {
708 	struct rte_table_action_ttl_counters stats;
709 };
710 
711 struct pipeline_msg_rsp {
712 	int status;
713 
714 	RTE_STD_C11
715 	union {
716 		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
717 		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
718 		struct pipeline_msg_rsp_table_stats_read table_stats_read;
719 		struct pipeline_msg_rsp_table_rule_add table_rule_add;
720 		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
721 		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
722 		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
723 		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
724 		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
725 	};
726 };
727 
728 /**
729  * Master thread
730  */
731 static struct pipeline_msg_req *
732 pipeline_msg_alloc(void)
733 {
734 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
735 		sizeof(struct pipeline_msg_rsp));
736 
737 	return calloc(1, size);
738 }
739 
740 static void
741 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
742 {
743 	free(rsp);
744 }
745 
746 static struct pipeline_msg_rsp *
747 pipeline_msg_send_recv(struct pipeline *p,
748 	struct pipeline_msg_req *req)
749 {
750 	struct rte_ring *msgq_req = p->msgq_req;
751 	struct rte_ring *msgq_rsp = p->msgq_rsp;
752 	struct pipeline_msg_rsp *rsp;
753 	int status;
754 
755 	/* send */
756 	do {
757 		status = rte_ring_sp_enqueue(msgq_req, req);
758 	} while (status == -ENOBUFS);
759 
760 	/* recv */
761 	do {
762 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
763 	} while (status != 0);
764 
765 	return rsp;
766 }
767 
768 int
769 pipeline_port_in_stats_read(const char *pipeline_name,
770 	uint32_t port_id,
771 	struct rte_pipeline_port_in_stats *stats,
772 	int clear)
773 {
774 	struct pipeline *p;
775 	struct pipeline_msg_req *req;
776 	struct pipeline_msg_rsp *rsp;
777 	int status;
778 
779 	/* Check input params */
780 	if ((pipeline_name == NULL) ||
781 		(stats == NULL))
782 		return -1;
783 
784 	p = pipeline_find(pipeline_name);
785 	if ((p == NULL) ||
786 		(port_id >= p->n_ports_in))
787 		return -1;
788 
789 	if (!pipeline_is_running(p)) {
790 		status = rte_pipeline_port_in_stats_read(p->p,
791 			port_id,
792 			stats,
793 			clear);
794 
795 		return status;
796 	}
797 
798 	/* Allocate request */
799 	req = pipeline_msg_alloc();
800 	if (req == NULL)
801 		return -1;
802 
803 	/* Write request */
804 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
805 	req->id = port_id;
806 	req->port_in_stats_read.clear = clear;
807 
808 	/* Send request and wait for response */
809 	rsp = pipeline_msg_send_recv(p, req);
810 	if (rsp == NULL)
811 		return -1;
812 
813 	/* Read response */
814 	status = rsp->status;
815 	if (status)
816 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
817 
818 	/* Free response */
819 	pipeline_msg_free(rsp);
820 
821 	return status;
822 }
823 
824 int
825 pipeline_port_in_enable(const char *pipeline_name,
826 	uint32_t port_id)
827 {
828 	struct pipeline *p;
829 	struct pipeline_msg_req *req;
830 	struct pipeline_msg_rsp *rsp;
831 	int status;
832 
833 	/* Check input params */
834 	if (pipeline_name == NULL)
835 		return -1;
836 
837 	p = pipeline_find(pipeline_name);
838 	if ((p == NULL) ||
839 		(port_id >= p->n_ports_in))
840 		return -1;
841 
842 	if (!pipeline_is_running(p)) {
843 		status = rte_pipeline_port_in_enable(p->p, port_id);
844 		return status;
845 	}
846 
847 	/* Allocate request */
848 	req = pipeline_msg_alloc();
849 	if (req == NULL)
850 		return -1;
851 
852 	/* Write request */
853 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
854 	req->id = port_id;
855 
856 	/* Send request and wait for response */
857 	rsp = pipeline_msg_send_recv(p, req);
858 	if (rsp == NULL)
859 		return -1;
860 
861 	/* Read response */
862 	status = rsp->status;
863 
864 	/* Free response */
865 	pipeline_msg_free(rsp);
866 
867 	return status;
868 }
869 
870 int
871 pipeline_port_in_disable(const char *pipeline_name,
872 	uint32_t port_id)
873 {
874 	struct pipeline *p;
875 	struct pipeline_msg_req *req;
876 	struct pipeline_msg_rsp *rsp;
877 	int status;
878 
879 	/* Check input params */
880 	if (pipeline_name == NULL)
881 		return -1;
882 
883 	p = pipeline_find(pipeline_name);
884 	if ((p == NULL) ||
885 		(port_id >= p->n_ports_in))
886 		return -1;
887 
888 	if (!pipeline_is_running(p)) {
889 		status = rte_pipeline_port_in_disable(p->p, port_id);
890 		return status;
891 	}
892 
893 	/* Allocate request */
894 	req = pipeline_msg_alloc();
895 	if (req == NULL)
896 		return -1;
897 
898 	/* Write request */
899 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
900 	req->id = port_id;
901 
902 	/* Send request and wait for response */
903 	rsp = pipeline_msg_send_recv(p, req);
904 	if (rsp == NULL)
905 		return -1;
906 
907 	/* Read response */
908 	status = rsp->status;
909 
910 	/* Free response */
911 	pipeline_msg_free(rsp);
912 
913 	return status;
914 }
915 
916 int
917 pipeline_port_out_stats_read(const char *pipeline_name,
918 	uint32_t port_id,
919 	struct rte_pipeline_port_out_stats *stats,
920 	int clear)
921 {
922 	struct pipeline *p;
923 	struct pipeline_msg_req *req;
924 	struct pipeline_msg_rsp *rsp;
925 	int status;
926 
927 	/* Check input params */
928 	if ((pipeline_name == NULL) ||
929 		(stats == NULL))
930 		return -1;
931 
932 	p = pipeline_find(pipeline_name);
933 	if ((p == NULL) ||
934 		(port_id >= p->n_ports_out))
935 		return -1;
936 
937 	if (!pipeline_is_running(p)) {
938 		status = rte_pipeline_port_out_stats_read(p->p,
939 			port_id,
940 			stats,
941 			clear);
942 
943 		return status;
944 	}
945 
946 	/* Allocate request */
947 	req = pipeline_msg_alloc();
948 	if (req == NULL)
949 		return -1;
950 
951 	/* Write request */
952 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
953 	req->id = port_id;
954 	req->port_out_stats_read.clear = clear;
955 
956 	/* Send request and wait for response */
957 	rsp = pipeline_msg_send_recv(p, req);
958 	if (rsp == NULL)
959 		return -1;
960 
961 	/* Read response */
962 	status = rsp->status;
963 	if (status)
964 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
965 
966 	/* Free response */
967 	pipeline_msg_free(rsp);
968 
969 	return status;
970 }
971 
972 int
973 pipeline_table_stats_read(const char *pipeline_name,
974 	uint32_t table_id,
975 	struct rte_pipeline_table_stats *stats,
976 	int clear)
977 {
978 	struct pipeline *p;
979 	struct pipeline_msg_req *req;
980 	struct pipeline_msg_rsp *rsp;
981 	int status;
982 
983 	/* Check input params */
984 	if ((pipeline_name == NULL) ||
985 		(stats == NULL))
986 		return -1;
987 
988 	p = pipeline_find(pipeline_name);
989 	if ((p == NULL) ||
990 		(table_id >= p->n_tables))
991 		return -1;
992 
993 	if (!pipeline_is_running(p)) {
994 		status = rte_pipeline_table_stats_read(p->p,
995 			table_id,
996 			stats,
997 			clear);
998 
999 		return status;
1000 	}
1001 
1002 	/* Allocate request */
1003 	req = pipeline_msg_alloc();
1004 	if (req == NULL)
1005 		return -1;
1006 
1007 	/* Write request */
1008 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
1009 	req->id = table_id;
1010 	req->table_stats_read.clear = clear;
1011 
1012 	/* Send request and wait for response */
1013 	rsp = pipeline_msg_send_recv(p, req);
1014 	if (rsp == NULL)
1015 		return -1;
1016 
1017 	/* Read response */
1018 	status = rsp->status;
1019 	if (status)
1020 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1021 
1022 	/* Free response */
1023 	pipeline_msg_free(rsp);
1024 
1025 	return status;
1026 }
1027 
1028 static int
1029 match_check(struct table_rule_match *match,
1030 	struct pipeline *p,
1031 	uint32_t table_id)
1032 {
1033 	struct table *table;
1034 
1035 	if ((match == NULL) ||
1036 		(p == NULL) ||
1037 		(table_id >= p->n_tables))
1038 		return -1;
1039 
1040 	table = &p->table[table_id];
1041 	if (match->match_type != table->params.match_type)
1042 		return -1;
1043 
1044 	switch (match->match_type) {
1045 	case TABLE_ACL:
1046 	{
1047 		struct table_acl_params *t = &table->params.match.acl;
1048 		struct table_rule_match_acl *r = &match->match.acl;
1049 
1050 		if ((r->ip_version && (t->ip_version == 0)) ||
1051 			((r->ip_version == 0) && t->ip_version))
1052 			return -1;
1053 
1054 		if (r->ip_version) {
1055 			if ((r->sa_depth > 32) ||
1056 				(r->da_depth > 32))
1057 				return -1;
1058 		} else {
1059 			if ((r->sa_depth > 128) ||
1060 				(r->da_depth > 128))
1061 				return -1;
1062 		}
1063 		return 0;
1064 	}
1065 
1066 	case TABLE_ARRAY:
1067 		return 0;
1068 
1069 	case TABLE_HASH:
1070 		return 0;
1071 
1072 	case TABLE_LPM:
1073 	{
1074 		struct table_lpm_params *t = &table->params.match.lpm;
1075 		struct table_rule_match_lpm *r = &match->match.lpm;
1076 
1077 		if ((r->ip_version && (t->key_size != 4)) ||
1078 			((r->ip_version == 0) && (t->key_size != 16)))
1079 			return -1;
1080 
1081 		if (r->ip_version) {
1082 			if (r->depth > 32)
1083 				return -1;
1084 		} else {
1085 			if (r->depth > 128)
1086 				return -1;
1087 		}
1088 		return 0;
1089 	}
1090 
1091 	case TABLE_STUB:
1092 		return -1;
1093 
1094 	default:
1095 		return -1;
1096 	}
1097 }
1098 
1099 static int
1100 action_check(struct table_rule_action *action,
1101 	struct pipeline *p,
1102 	uint32_t table_id)
1103 {
1104 	struct table_action_profile *ap;
1105 
1106 	if ((action == NULL) ||
1107 		(p == NULL) ||
1108 		(table_id >= p->n_tables))
1109 		return -1;
1110 
1111 	ap = p->table[table_id].ap;
1112 	if (action->action_mask != ap->params.action_mask)
1113 		return -1;
1114 
1115 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1116 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1117 			(action->fwd.id >= p->n_ports_out))
1118 			return -1;
1119 
1120 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1121 			(action->fwd.id >= p->n_tables))
1122 			return -1;
1123 	}
1124 
1125 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1126 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1127 		uint32_t tc_mask1 = action->mtr.tc_mask;
1128 
1129 		if (tc_mask1 != tc_mask0)
1130 			return -1;
1131 	}
1132 
1133 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1134 		uint32_t n_subports_per_port =
1135 			ap->params.tm.n_subports_per_port;
1136 		uint32_t n_pipes_per_subport =
1137 			ap->params.tm.n_pipes_per_subport;
1138 		uint32_t subport_id = action->tm.subport_id;
1139 		uint32_t pipe_id = action->tm.pipe_id;
1140 
1141 		if ((subport_id >= n_subports_per_port) ||
1142 			(pipe_id >= n_pipes_per_subport))
1143 			return -1;
1144 	}
1145 
1146 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1147 		uint64_t encap_mask = ap->params.encap.encap_mask;
1148 		enum rte_table_action_encap_type type = action->encap.type;
1149 
1150 		if ((encap_mask & (1LLU << type)) == 0)
1151 			return -1;
1152 	}
1153 
1154 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1155 		int ip_version0 = ap->params.common.ip_version;
1156 		int ip_version1 = action->nat.ip_version;
1157 
1158 		if ((ip_version1 && (ip_version0 == 0)) ||
1159 			((ip_version1 == 0) && ip_version0))
1160 			return -1;
1161 	}
1162 
1163 	return 0;
1164 }
1165 
1166 static int
1167 action_default_check(struct table_rule_action *action,
1168 	struct pipeline *p,
1169 	uint32_t table_id)
1170 {
1171 	if ((action == NULL) ||
1172 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1173 		(p == NULL) ||
1174 		(table_id >= p->n_tables))
1175 		return -1;
1176 
1177 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1178 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1179 			(action->fwd.id >= p->n_ports_out))
1180 			return -1;
1181 
1182 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1183 			(action->fwd.id >= p->n_tables))
1184 			return -1;
1185 	}
1186 
1187 	return 0;
1188 }
1189 
1190 union table_rule_match_low_level {
1191 	struct rte_table_acl_rule_add_params acl_add;
1192 	struct rte_table_acl_rule_delete_params acl_delete;
1193 	struct rte_table_array_key array;
1194 	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
1195 	struct rte_table_lpm_key lpm_ipv4;
1196 	struct rte_table_lpm_ipv6_key lpm_ipv6;
1197 };
1198 
1199 static int
1200 match_convert(struct table_rule_match *mh,
1201 	union table_rule_match_low_level *ml,
1202 	int add);
1203 
1204 static int
1205 action_convert(struct rte_table_action *a,
1206 	struct table_rule_action *action,
1207 	struct rte_pipeline_table_entry *data);
1208 
1209 int
1210 pipeline_table_rule_add(const char *pipeline_name,
1211 	uint32_t table_id,
1212 	struct table_rule_match *match,
1213 	struct table_rule_action *action,
1214 	void **data)
1215 {
1216 	struct pipeline *p;
1217 	struct pipeline_msg_req *req;
1218 	struct pipeline_msg_rsp *rsp;
1219 	int status;
1220 
1221 	/* Check input params */
1222 	if ((pipeline_name == NULL) ||
1223 		(match == NULL) ||
1224 		(action == NULL) ||
1225 		(data == NULL))
1226 		return -1;
1227 
1228 	p = pipeline_find(pipeline_name);
1229 	if ((p == NULL) ||
1230 		(table_id >= p->n_tables) ||
1231 		match_check(match, p, table_id) ||
1232 		action_check(action, p, table_id))
1233 		return -1;
1234 
1235 	if (!pipeline_is_running(p)) {
1236 		struct rte_table_action *a = p->table[table_id].a;
1237 		union table_rule_match_low_level match_ll;
1238 		struct rte_pipeline_table_entry *data_in, *data_out;
1239 		int key_found;
1240 		uint8_t *buffer;
1241 
1242 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1243 		if (buffer == NULL)
1244 			return -1;
1245 
1246 		/* Table match-action rule conversion */
1247 		data_in = (struct rte_pipeline_table_entry *)buffer;
1248 
1249 		status = match_convert(match, &match_ll, 1);
1250 		if (status) {
1251 			free(buffer);
1252 			return -1;
1253 		}
1254 
1255 		status = action_convert(a, action, data_in);
1256 		if (status) {
1257 			free(buffer);
1258 			return -1;
1259 		}
1260 
1261 		/* Add rule (match, action) to table */
1262 		status = rte_pipeline_table_entry_add(p->p,
1263 				table_id,
1264 				&match_ll,
1265 				data_in,
1266 				&key_found,
1267 				&data_out);
1268 		if (status) {
1269 			free(buffer);
1270 			return -1;
1271 		}
1272 
1273 		/* Write Response */
1274 		*data = data_out;
1275 
1276 		free(buffer);
1277 		return 0;
1278 	}
1279 
1280 	/* Allocate request */
1281 	req = pipeline_msg_alloc();
1282 	if (req == NULL)
1283 		return -1;
1284 
1285 	/* Write request */
1286 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1287 	req->id = table_id;
1288 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1289 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1290 
1291 	/* Send request and wait for response */
1292 	rsp = pipeline_msg_send_recv(p, req);
1293 	if (rsp == NULL)
1294 		return -1;
1295 
1296 	/* Read response */
1297 	status = rsp->status;
1298 	if (status == 0)
1299 		*data = rsp->table_rule_add.data;
1300 
1301 	/* Free response */
1302 	pipeline_msg_free(rsp);
1303 
1304 	return status;
1305 }
1306 
1307 int
1308 pipeline_table_rule_add_default(const char *pipeline_name,
1309 	uint32_t table_id,
1310 	struct table_rule_action *action,
1311 	void **data)
1312 {
1313 	struct pipeline *p;
1314 	struct pipeline_msg_req *req;
1315 	struct pipeline_msg_rsp *rsp;
1316 	int status;
1317 
1318 	/* Check input params */
1319 	if ((pipeline_name == NULL) ||
1320 		(action == NULL) ||
1321 		(data == NULL))
1322 		return -1;
1323 
1324 	p = pipeline_find(pipeline_name);
1325 	if ((p == NULL) ||
1326 		(table_id >= p->n_tables) ||
1327 		action_default_check(action, p, table_id))
1328 		return -1;
1329 
1330 	if (!pipeline_is_running(p)) {
1331 		struct rte_pipeline_table_entry *data_in, *data_out;
1332 		uint8_t *buffer;
1333 
1334 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1335 		if (buffer == NULL)
1336 			return -1;
1337 
1338 		/* Apply actions */
1339 		data_in = (struct rte_pipeline_table_entry *)buffer;
1340 
1341 		data_in->action = action->fwd.action;
1342 		if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1343 			data_in->port_id = action->fwd.id;
1344 		if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1345 			data_in->table_id = action->fwd.id;
1346 
1347 		/* Add default rule to table */
1348 		status = rte_pipeline_table_default_entry_add(p->p,
1349 				table_id,
1350 				data_in,
1351 				&data_out);
1352 		if (status) {
1353 			free(buffer);
1354 			return -1;
1355 		}
1356 
1357 		/* Write Response */
1358 		*data = data_out;
1359 
1360 		free(buffer);
1361 		return 0;
1362 	}
1363 
1364 	/* Allocate request */
1365 	req = pipeline_msg_alloc();
1366 	if (req == NULL)
1367 		return -1;
1368 
1369 	/* Write request */
1370 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1371 	req->id = table_id;
1372 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1373 
1374 	/* Send request and wait for response */
1375 	rsp = pipeline_msg_send_recv(p, req);
1376 	if (rsp == NULL)
1377 		return -1;
1378 
1379 	/* Read response */
1380 	status = rsp->status;
1381 	if (status == 0)
1382 		*data = rsp->table_rule_add_default.data;
1383 
1384 	/* Free response */
1385 	pipeline_msg_free(rsp);
1386 
1387 	return status;
1388 }
1389 
1390 int
1391 pipeline_table_rule_add_bulk(const char *pipeline_name,
1392 	uint32_t table_id,
1393 	struct table_rule_match *match,
1394 	struct table_rule_action *action,
1395 	void **data,
1396 	uint32_t *n_rules)
1397 {
1398 	struct pipeline *p;
1399 	struct pipeline_msg_req *req;
1400 	struct pipeline_msg_rsp *rsp;
1401 	uint32_t i;
1402 	int status;
1403 
1404 	/* Check input params */
1405 	if ((pipeline_name == NULL) ||
1406 		(match == NULL) ||
1407 		(action == NULL) ||
1408 		(data == NULL) ||
1409 		(n_rules == NULL) ||
1410 		(*n_rules == 0))
1411 		return -1;
1412 
1413 	p = pipeline_find(pipeline_name);
1414 	if ((p == NULL) ||
1415 		(table_id >= p->n_tables))
1416 		return -1;
1417 
1418 	for (i = 0; i < *n_rules; i++)
1419 		if (match_check(match, p, table_id) ||
1420 			action_check(action, p, table_id))
1421 			return -1;
1422 
1423 	if (!pipeline_is_running(p)) {
1424 		struct rte_table_action *a = p->table[table_id].a;
1425 		union table_rule_match_low_level *match_ll;
1426 		uint8_t *action_ll;
1427 		void **match_ll_ptr;
1428 		struct rte_pipeline_table_entry **action_ll_ptr;
1429 		struct rte_pipeline_table_entry **entries_ptr =
1430 			(struct rte_pipeline_table_entry **)data;
1431 		uint32_t bulk =
1432 			(p->table[table_id].params.match_type == TABLE_ACL) ? 1 : 0;
1433 		int *found;
1434 
1435 		/* Memory allocation */
1436 		match_ll = calloc(*n_rules, sizeof(union table_rule_match_low_level));
1437 		action_ll = calloc(*n_rules, TABLE_RULE_ACTION_SIZE_MAX);
1438 		match_ll_ptr = calloc(*n_rules, sizeof(void *));
1439 		action_ll_ptr =
1440 			calloc(*n_rules, sizeof(struct rte_pipeline_table_entry *));
1441 		found = calloc(*n_rules, sizeof(int));
1442 
1443 		if (match_ll == NULL ||
1444 			action_ll == NULL ||
1445 			match_ll_ptr == NULL ||
1446 			action_ll_ptr == NULL ||
1447 			found == NULL)
1448 			goto fail;
1449 
1450 		for (i = 0; i < *n_rules; i++) {
1451 			match_ll_ptr[i] = (void *)&match_ll[i];
1452 			action_ll_ptr[i] =
1453 				(struct rte_pipeline_table_entry *)&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1454 		}
1455 
1456 		/* Rule match conversion */
1457 		for (i = 0; i < *n_rules; i++) {
1458 			status = match_convert(&match[i], match_ll_ptr[i], 1);
1459 			if (status)
1460 				goto fail;
1461 		}
1462 
1463 		/* Rule action conversion */
1464 		for (i = 0; i < *n_rules; i++) {
1465 			status = action_convert(a, &action[i], action_ll_ptr[i]);
1466 			if (status)
1467 				goto fail;
1468 		}
1469 
1470 		/* Add rule (match, action) to table */
1471 		if (bulk) {
1472 			status = rte_pipeline_table_entry_add_bulk(p->p,
1473 				table_id,
1474 				match_ll_ptr,
1475 				action_ll_ptr,
1476 				*n_rules,
1477 				found,
1478 				entries_ptr);
1479 			if (status)
1480 				*n_rules = 0;
1481 		} else {
1482 			for (i = 0; i < *n_rules; i++) {
1483 				status = rte_pipeline_table_entry_add(p->p,
1484 					table_id,
1485 					match_ll_ptr[i],
1486 					action_ll_ptr[i],
1487 					&found[i],
1488 					&entries_ptr[i]);
1489 				if (status) {
1490 					*n_rules = i;
1491 					break;
1492 				}
1493 			}
1494 		}
1495 
1496 		/* Free */
1497 		free(found);
1498 		free(action_ll_ptr);
1499 		free(match_ll_ptr);
1500 		free(action_ll);
1501 		free(match_ll);
1502 
1503 		return status;
1504 
1505 fail:
1506 		free(found);
1507 		free(action_ll_ptr);
1508 		free(match_ll_ptr);
1509 		free(action_ll);
1510 		free(match_ll);
1511 
1512 		*n_rules = 0;
1513 		return -1;
1514 	}
1515 
1516 	/* Allocate request */
1517 	req = pipeline_msg_alloc();
1518 	if (req == NULL)
1519 		return -1;
1520 
1521 	/* Write request */
1522 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1523 	req->id = table_id;
1524 	req->table_rule_add_bulk.match = match;
1525 	req->table_rule_add_bulk.action = action;
1526 	req->table_rule_add_bulk.data = data;
1527 	req->table_rule_add_bulk.n_rules = *n_rules;
1528 	req->table_rule_add_bulk.bulk =
1529 		(p->table[table_id].params.match_type == TABLE_ACL) ? 1 : 0;
1530 
1531 	/* Send request and wait for response */
1532 	rsp = pipeline_msg_send_recv(p, req);
1533 	if (rsp == NULL)
1534 		return -1;
1535 
1536 	/* Read response */
1537 	status = rsp->status;
1538 	if (status == 0)
1539 		*n_rules = rsp->table_rule_add_bulk.n_rules;
1540 
1541 	/* Free response */
1542 	pipeline_msg_free(rsp);
1543 
1544 	return status;
1545 }
1546 
1547 int
1548 pipeline_table_rule_delete(const char *pipeline_name,
1549 	uint32_t table_id,
1550 	struct table_rule_match *match)
1551 {
1552 	struct pipeline *p;
1553 	struct pipeline_msg_req *req;
1554 	struct pipeline_msg_rsp *rsp;
1555 	int status;
1556 
1557 	/* Check input params */
1558 	if ((pipeline_name == NULL) ||
1559 		(match == NULL))
1560 		return -1;
1561 
1562 	p = pipeline_find(pipeline_name);
1563 	if ((p == NULL) ||
1564 		(table_id >= p->n_tables) ||
1565 		match_check(match, p, table_id))
1566 		return -1;
1567 
1568 	if (!pipeline_is_running(p)) {
1569 		union table_rule_match_low_level match_ll;
1570 		int key_found;
1571 
1572 		status = match_convert(match, &match_ll, 0);
1573 		if (status)
1574 			return -1;
1575 
1576 		status = rte_pipeline_table_entry_delete(p->p,
1577 				table_id,
1578 				&match_ll,
1579 				&key_found,
1580 				NULL);
1581 
1582 		return status;
1583 	}
1584 
1585 	/* Allocate request */
1586 	req = pipeline_msg_alloc();
1587 	if (req == NULL)
1588 		return -1;
1589 
1590 	/* Write request */
1591 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1592 	req->id = table_id;
1593 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1594 
1595 	/* Send request and wait for response */
1596 	rsp = pipeline_msg_send_recv(p, req);
1597 	if (rsp == NULL)
1598 		return -1;
1599 
1600 	/* Read response */
1601 	status = rsp->status;
1602 
1603 	/* Free response */
1604 	pipeline_msg_free(rsp);
1605 
1606 	return status;
1607 }
1608 
1609 int
1610 pipeline_table_rule_delete_default(const char *pipeline_name,
1611 	uint32_t table_id)
1612 {
1613 	struct pipeline *p;
1614 	struct pipeline_msg_req *req;
1615 	struct pipeline_msg_rsp *rsp;
1616 	int status;
1617 
1618 	/* Check input params */
1619 	if (pipeline_name == NULL)
1620 		return -1;
1621 
1622 	p = pipeline_find(pipeline_name);
1623 	if ((p == NULL) ||
1624 		(table_id >= p->n_tables))
1625 		return -1;
1626 
1627 	if (!pipeline_is_running(p)) {
1628 		status = rte_pipeline_table_default_entry_delete(p->p,
1629 			table_id,
1630 			NULL);
1631 
1632 		return status;
1633 	}
1634 
1635 	/* Allocate request */
1636 	req = pipeline_msg_alloc();
1637 	if (req == NULL)
1638 		return -1;
1639 
1640 	/* Write request */
1641 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1642 	req->id = table_id;
1643 
1644 	/* Send request and wait for response */
1645 	rsp = pipeline_msg_send_recv(p, req);
1646 	if (rsp == NULL)
1647 		return -1;
1648 
1649 	/* Read response */
1650 	status = rsp->status;
1651 
1652 	/* Free response */
1653 	pipeline_msg_free(rsp);
1654 
1655 	return status;
1656 }
1657 
1658 int
1659 pipeline_table_rule_stats_read(const char *pipeline_name,
1660 	uint32_t table_id,
1661 	void *data,
1662 	struct rte_table_action_stats_counters *stats,
1663 	int clear)
1664 {
1665 	struct pipeline *p;
1666 	struct pipeline_msg_req *req;
1667 	struct pipeline_msg_rsp *rsp;
1668 	int status;
1669 
1670 	/* Check input params */
1671 	if ((pipeline_name == NULL) ||
1672 		(data == NULL) ||
1673 		(stats == NULL))
1674 		return -1;
1675 
1676 	p = pipeline_find(pipeline_name);
1677 	if ((p == NULL) ||
1678 		(table_id >= p->n_tables))
1679 		return -1;
1680 
1681 	if (!pipeline_is_running(p)) {
1682 		struct rte_table_action *a = p->table[table_id].a;
1683 
1684 		status = rte_table_action_stats_read(a,
1685 			data,
1686 			stats,
1687 			clear);
1688 
1689 		return status;
1690 	}
1691 
1692 	/* Allocate request */
1693 	req = pipeline_msg_alloc();
1694 	if (req == NULL)
1695 		return -1;
1696 
1697 	/* Write request */
1698 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1699 	req->id = table_id;
1700 	req->table_rule_stats_read.data = data;
1701 	req->table_rule_stats_read.clear = clear;
1702 
1703 	/* Send request and wait for response */
1704 	rsp = pipeline_msg_send_recv(p, req);
1705 	if (rsp == NULL)
1706 		return -1;
1707 
1708 	/* Read response */
1709 	status = rsp->status;
1710 	if (status)
1711 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1712 
1713 	/* Free response */
1714 	pipeline_msg_free(rsp);
1715 
1716 	return status;
1717 }
1718 
1719 int
1720 pipeline_table_mtr_profile_add(const char *pipeline_name,
1721 	uint32_t table_id,
1722 	uint32_t meter_profile_id,
1723 	struct rte_table_action_meter_profile *profile)
1724 {
1725 	struct pipeline *p;
1726 	struct pipeline_msg_req *req;
1727 	struct pipeline_msg_rsp *rsp;
1728 	int status;
1729 
1730 	/* Check input params */
1731 	if ((pipeline_name == NULL) ||
1732 		(profile == NULL))
1733 		return -1;
1734 
1735 	p = pipeline_find(pipeline_name);
1736 	if ((p == NULL) ||
1737 		(table_id >= p->n_tables))
1738 		return -1;
1739 
1740 	if (!pipeline_is_running(p)) {
1741 		struct rte_table_action *a = p->table[table_id].a;
1742 
1743 		status = rte_table_action_meter_profile_add(a,
1744 			meter_profile_id,
1745 			profile);
1746 
1747 		return status;
1748 	}
1749 
1750 	/* Allocate request */
1751 	req = pipeline_msg_alloc();
1752 	if (req == NULL)
1753 		return -1;
1754 
1755 	/* Write request */
1756 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1757 	req->id = table_id;
1758 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1759 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1760 
1761 	/* Send request and wait for response */
1762 	rsp = pipeline_msg_send_recv(p, req);
1763 	if (rsp == NULL)
1764 		return -1;
1765 
1766 	/* Read response */
1767 	status = rsp->status;
1768 
1769 	/* Free response */
1770 	pipeline_msg_free(rsp);
1771 
1772 	return status;
1773 }
1774 
1775 int
1776 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1777 	uint32_t table_id,
1778 	uint32_t meter_profile_id)
1779 {
1780 	struct pipeline *p;
1781 	struct pipeline_msg_req *req;
1782 	struct pipeline_msg_rsp *rsp;
1783 	int status;
1784 
1785 	/* Check input params */
1786 	if (pipeline_name == NULL)
1787 		return -1;
1788 
1789 	p = pipeline_find(pipeline_name);
1790 	if ((p == NULL) ||
1791 		(table_id >= p->n_tables))
1792 		return -1;
1793 
1794 	if (!pipeline_is_running(p)) {
1795 		struct rte_table_action *a = p->table[table_id].a;
1796 
1797 		status = rte_table_action_meter_profile_delete(a,
1798 				meter_profile_id);
1799 
1800 		return status;
1801 	}
1802 
1803 	/* Allocate request */
1804 	req = pipeline_msg_alloc();
1805 	if (req == NULL)
1806 		return -1;
1807 
1808 	/* Write request */
1809 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1810 	req->id = table_id;
1811 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1812 
1813 	/* Send request and wait for response */
1814 	rsp = pipeline_msg_send_recv(p, req);
1815 	if (rsp == NULL)
1816 		return -1;
1817 
1818 	/* Read response */
1819 	status = rsp->status;
1820 
1821 	/* Free response */
1822 	pipeline_msg_free(rsp);
1823 
1824 	return status;
1825 }
1826 
1827 int
1828 pipeline_table_rule_mtr_read(const char *pipeline_name,
1829 	uint32_t table_id,
1830 	void *data,
1831 	uint32_t tc_mask,
1832 	struct rte_table_action_mtr_counters *stats,
1833 	int clear)
1834 {
1835 	struct pipeline *p;
1836 	struct pipeline_msg_req *req;
1837 	struct pipeline_msg_rsp *rsp;
1838 	int status;
1839 
1840 	/* Check input params */
1841 	if ((pipeline_name == NULL) ||
1842 		(data == NULL) ||
1843 		(stats == NULL))
1844 		return -1;
1845 
1846 	p = pipeline_find(pipeline_name);
1847 	if ((p == NULL) ||
1848 		(table_id >= p->n_tables))
1849 		return -1;
1850 
1851 	if (!pipeline_is_running(p)) {
1852 		struct rte_table_action *a = p->table[table_id].a;
1853 
1854 		status = rte_table_action_meter_read(a,
1855 				data,
1856 				tc_mask,
1857 				stats,
1858 				clear);
1859 
1860 		return status;
1861 	}
1862 
1863 	/* Allocate request */
1864 	req = pipeline_msg_alloc();
1865 	if (req == NULL)
1866 		return -1;
1867 
1868 	/* Write request */
1869 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
1870 	req->id = table_id;
1871 	req->table_rule_mtr_read.data = data;
1872 	req->table_rule_mtr_read.tc_mask = tc_mask;
1873 	req->table_rule_mtr_read.clear = clear;
1874 
1875 	/* Send request and wait for response */
1876 	rsp = pipeline_msg_send_recv(p, req);
1877 	if (rsp == NULL)
1878 		return -1;
1879 
1880 	/* Read response */
1881 	status = rsp->status;
1882 	if (status)
1883 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
1884 
1885 	/* Free response */
1886 	pipeline_msg_free(rsp);
1887 
1888 	return status;
1889 }
1890 
1891 int
1892 pipeline_table_dscp_table_update(const char *pipeline_name,
1893 	uint32_t table_id,
1894 	uint64_t dscp_mask,
1895 	struct rte_table_action_dscp_table *dscp_table)
1896 {
1897 	struct pipeline *p;
1898 	struct pipeline_msg_req *req;
1899 	struct pipeline_msg_rsp *rsp;
1900 	int status;
1901 
1902 	/* Check input params */
1903 	if ((pipeline_name == NULL) ||
1904 		(dscp_table == NULL))
1905 		return -1;
1906 
1907 	p = pipeline_find(pipeline_name);
1908 	if ((p == NULL) ||
1909 		(table_id >= p->n_tables))
1910 		return -1;
1911 
1912 	if (!pipeline_is_running(p)) {
1913 		struct rte_table_action *a = p->table[table_id].a;
1914 
1915 		status = rte_table_action_dscp_table_update(a,
1916 				dscp_mask,
1917 				dscp_table);
1918 
1919 		return status;
1920 	}
1921 
1922 	/* Allocate request */
1923 	req = pipeline_msg_alloc();
1924 	if (req == NULL)
1925 		return -1;
1926 
1927 	/* Write request */
1928 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
1929 	req->id = table_id;
1930 	req->table_dscp_table_update.dscp_mask = dscp_mask;
1931 	memcpy(&req->table_dscp_table_update.dscp_table,
1932 		dscp_table, sizeof(*dscp_table));
1933 
1934 	/* Send request and wait for response */
1935 	rsp = pipeline_msg_send_recv(p, req);
1936 	if (rsp == NULL)
1937 		return -1;
1938 
1939 	/* Read response */
1940 	status = rsp->status;
1941 
1942 	/* Free response */
1943 	pipeline_msg_free(rsp);
1944 
1945 	return status;
1946 }
1947 
1948 int
1949 pipeline_table_rule_ttl_read(const char *pipeline_name,
1950 	uint32_t table_id,
1951 	void *data,
1952 	struct rte_table_action_ttl_counters *stats,
1953 	int clear)
1954 {
1955 	struct pipeline *p;
1956 	struct pipeline_msg_req *req;
1957 	struct pipeline_msg_rsp *rsp;
1958 	int status;
1959 
1960 	/* Check input params */
1961 	if ((pipeline_name == NULL) ||
1962 		(data == NULL) ||
1963 		(stats == NULL))
1964 		return -1;
1965 
1966 	p = pipeline_find(pipeline_name);
1967 	if ((p == NULL) ||
1968 		(table_id >= p->n_tables))
1969 		return -1;
1970 
1971 	if (!pipeline_is_running(p)) {
1972 		struct rte_table_action *a = p->table[table_id].a;
1973 
1974 		status = rte_table_action_ttl_read(a,
1975 				data,
1976 				stats,
1977 				clear);
1978 
1979 		return status;
1980 	}
1981 
1982 	/* Allocate request */
1983 	req = pipeline_msg_alloc();
1984 	if (req == NULL)
1985 		return -1;
1986 
1987 	/* Write request */
1988 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
1989 	req->id = table_id;
1990 	req->table_rule_ttl_read.data = data;
1991 	req->table_rule_ttl_read.clear = clear;
1992 
1993 	/* Send request and wait for response */
1994 	rsp = pipeline_msg_send_recv(p, req);
1995 	if (rsp == NULL)
1996 		return -1;
1997 
1998 	/* Read response */
1999 	status = rsp->status;
2000 	if (status)
2001 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2002 
2003 	/* Free response */
2004 	pipeline_msg_free(rsp);
2005 
2006 	return status;
2007 }
2008 
2009 /**
2010  * Data plane threads: message handling
2011  */
2012 static inline struct pipeline_msg_req *
2013 pipeline_msg_recv(struct rte_ring *msgq_req)
2014 {
2015 	struct pipeline_msg_req *req;
2016 
2017 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
2018 
2019 	if (status != 0)
2020 		return NULL;
2021 
2022 	return req;
2023 }
2024 
2025 static inline void
2026 pipeline_msg_send(struct rte_ring *msgq_rsp,
2027 	struct pipeline_msg_rsp *rsp)
2028 {
2029 	int status;
2030 
2031 	do {
2032 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
2033 	} while (status == -ENOBUFS);
2034 }
2035 
2036 static struct pipeline_msg_rsp *
2037 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2038 	struct pipeline_msg_req *req)
2039 {
2040 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2041 	uint32_t port_id = req->id;
2042 	int clear = req->port_in_stats_read.clear;
2043 
2044 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
2045 		port_id,
2046 		&rsp->port_in_stats_read.stats,
2047 		clear);
2048 
2049 	return rsp;
2050 }
2051 
2052 static struct pipeline_msg_rsp *
2053 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2054 	struct pipeline_msg_req *req)
2055 {
2056 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2057 	uint32_t port_id = req->id;
2058 
2059 	rsp->status = rte_pipeline_port_in_enable(p->p,
2060 		port_id);
2061 
2062 	return rsp;
2063 }
2064 
2065 static struct pipeline_msg_rsp *
2066 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2067 	struct pipeline_msg_req *req)
2068 {
2069 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2070 	uint32_t port_id = req->id;
2071 
2072 	rsp->status = rte_pipeline_port_in_disable(p->p,
2073 		port_id);
2074 
2075 	return rsp;
2076 }
2077 
2078 static struct pipeline_msg_rsp *
2079 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2080 	struct pipeline_msg_req *req)
2081 {
2082 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2083 	uint32_t port_id = req->id;
2084 	int clear = req->port_out_stats_read.clear;
2085 
2086 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
2087 		port_id,
2088 		&rsp->port_out_stats_read.stats,
2089 		clear);
2090 
2091 	return rsp;
2092 }
2093 
2094 static struct pipeline_msg_rsp *
2095 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2096 	struct pipeline_msg_req *req)
2097 {
2098 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2099 	uint32_t port_id = req->id;
2100 	int clear = req->table_stats_read.clear;
2101 
2102 	rsp->status = rte_pipeline_table_stats_read(p->p,
2103 		port_id,
2104 		&rsp->table_stats_read.stats,
2105 		clear);
2106 
2107 	return rsp;
2108 }
2109 
2110 static int
2111 match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
2112 {
2113 	if (depth > 128)
2114 		return -1;
2115 
2116 	switch (depth / 32) {
2117 	case 0:
2118 		depth32[0] = depth;
2119 		depth32[1] = 0;
2120 		depth32[2] = 0;
2121 		depth32[3] = 0;
2122 		return 0;
2123 
2124 	case 1:
2125 		depth32[0] = 32;
2126 		depth32[1] = depth - 32;
2127 		depth32[2] = 0;
2128 		depth32[3] = 0;
2129 		return 0;
2130 
2131 	case 2:
2132 		depth32[0] = 32;
2133 		depth32[1] = 32;
2134 		depth32[2] = depth - 64;
2135 		depth32[3] = 0;
2136 		return 0;
2137 
2138 	case 3:
2139 		depth32[0] = 32;
2140 		depth32[1] = 32;
2141 		depth32[2] = 32;
2142 		depth32[3] = depth - 96;
2143 		return 0;
2144 
2145 	case 4:
2146 		depth32[0] = 32;
2147 		depth32[1] = 32;
2148 		depth32[2] = 32;
2149 		depth32[3] = 32;
2150 		return 0;
2151 
2152 	default:
2153 		return -1;
2154 	}
2155 }
2156 
2157 static int
2158 match_convert(struct table_rule_match *mh,
2159 	union table_rule_match_low_level *ml,
2160 	int add)
2161 {
2162 	memset(ml, 0, sizeof(*ml));
2163 
2164 	switch (mh->match_type) {
2165 	case TABLE_ACL:
2166 		if (mh->match.acl.ip_version)
2167 			if (add) {
2168 				ml->acl_add.field_value[0].value.u8 =
2169 					mh->match.acl.proto;
2170 				ml->acl_add.field_value[0].mask_range.u8 =
2171 					mh->match.acl.proto_mask;
2172 
2173 				ml->acl_add.field_value[1].value.u32 =
2174 					mh->match.acl.ipv4.sa;
2175 				ml->acl_add.field_value[1].mask_range.u32 =
2176 					mh->match.acl.sa_depth;
2177 
2178 				ml->acl_add.field_value[2].value.u32 =
2179 					mh->match.acl.ipv4.da;
2180 				ml->acl_add.field_value[2].mask_range.u32 =
2181 					mh->match.acl.da_depth;
2182 
2183 				ml->acl_add.field_value[3].value.u16 =
2184 					mh->match.acl.sp0;
2185 				ml->acl_add.field_value[3].mask_range.u16 =
2186 					mh->match.acl.sp1;
2187 
2188 				ml->acl_add.field_value[4].value.u16 =
2189 					mh->match.acl.dp0;
2190 				ml->acl_add.field_value[4].mask_range.u16 =
2191 					mh->match.acl.dp1;
2192 
2193 				ml->acl_add.priority =
2194 					(int32_t) mh->match.acl.priority;
2195 			} else {
2196 				ml->acl_delete.field_value[0].value.u8 =
2197 					mh->match.acl.proto;
2198 				ml->acl_delete.field_value[0].mask_range.u8 =
2199 					mh->match.acl.proto_mask;
2200 
2201 				ml->acl_delete.field_value[1].value.u32 =
2202 					mh->match.acl.ipv4.sa;
2203 				ml->acl_delete.field_value[1].mask_range.u32 =
2204 					mh->match.acl.sa_depth;
2205 
2206 				ml->acl_delete.field_value[2].value.u32 =
2207 					mh->match.acl.ipv4.da;
2208 				ml->acl_delete.field_value[2].mask_range.u32 =
2209 					mh->match.acl.da_depth;
2210 
2211 				ml->acl_delete.field_value[3].value.u16 =
2212 					mh->match.acl.sp0;
2213 				ml->acl_delete.field_value[3].mask_range.u16 =
2214 					mh->match.acl.sp1;
2215 
2216 				ml->acl_delete.field_value[4].value.u16 =
2217 					mh->match.acl.dp0;
2218 				ml->acl_delete.field_value[4].mask_range.u16 =
2219 					mh->match.acl.dp1;
2220 			}
2221 		else
2222 			if (add) {
2223 				uint32_t *sa32 =
2224 					(uint32_t *) mh->match.acl.ipv6.sa;
2225 				uint32_t *da32 =
2226 					(uint32_t *) mh->match.acl.ipv6.da;
2227 				uint32_t sa32_depth[4], da32_depth[4];
2228 				int status;
2229 
2230 				status = match_convert_ipv6_depth(
2231 					mh->match.acl.sa_depth,
2232 					sa32_depth);
2233 				if (status)
2234 					return status;
2235 
2236 				status = match_convert_ipv6_depth(
2237 					mh->match.acl.da_depth,
2238 					da32_depth);
2239 				if (status)
2240 					return status;
2241 
2242 				ml->acl_add.field_value[0].value.u8 =
2243 					mh->match.acl.proto;
2244 				ml->acl_add.field_value[0].mask_range.u8 =
2245 					mh->match.acl.proto_mask;
2246 
2247 				ml->acl_add.field_value[1].value.u32 =
2248 					rte_be_to_cpu_32(sa32[0]);
2249 				ml->acl_add.field_value[1].mask_range.u32 =
2250 					sa32_depth[0];
2251 				ml->acl_add.field_value[2].value.u32 =
2252 					rte_be_to_cpu_32(sa32[1]);
2253 				ml->acl_add.field_value[2].mask_range.u32 =
2254 					sa32_depth[1];
2255 				ml->acl_add.field_value[3].value.u32 =
2256 					rte_be_to_cpu_32(sa32[2]);
2257 				ml->acl_add.field_value[3].mask_range.u32 =
2258 					sa32_depth[2];
2259 				ml->acl_add.field_value[4].value.u32 =
2260 					rte_be_to_cpu_32(sa32[3]);
2261 				ml->acl_add.field_value[4].mask_range.u32 =
2262 					sa32_depth[3];
2263 
2264 				ml->acl_add.field_value[5].value.u32 =
2265 					rte_be_to_cpu_32(da32[0]);
2266 				ml->acl_add.field_value[5].mask_range.u32 =
2267 					da32_depth[0];
2268 				ml->acl_add.field_value[6].value.u32 =
2269 					rte_be_to_cpu_32(da32[1]);
2270 				ml->acl_add.field_value[6].mask_range.u32 =
2271 					da32_depth[1];
2272 				ml->acl_add.field_value[7].value.u32 =
2273 					rte_be_to_cpu_32(da32[2]);
2274 				ml->acl_add.field_value[7].mask_range.u32 =
2275 					da32_depth[2];
2276 				ml->acl_add.field_value[8].value.u32 =
2277 					rte_be_to_cpu_32(da32[3]);
2278 				ml->acl_add.field_value[8].mask_range.u32 =
2279 					da32_depth[3];
2280 
2281 				ml->acl_add.field_value[9].value.u16 =
2282 					mh->match.acl.sp0;
2283 				ml->acl_add.field_value[9].mask_range.u16 =
2284 					mh->match.acl.sp1;
2285 
2286 				ml->acl_add.field_value[10].value.u16 =
2287 					mh->match.acl.dp0;
2288 				ml->acl_add.field_value[10].mask_range.u16 =
2289 					mh->match.acl.dp1;
2290 
2291 				ml->acl_add.priority =
2292 					(int32_t) mh->match.acl.priority;
2293 			} else {
2294 				uint32_t *sa32 =
2295 					(uint32_t *) mh->match.acl.ipv6.sa;
2296 				uint32_t *da32 =
2297 					(uint32_t *) mh->match.acl.ipv6.da;
2298 				uint32_t sa32_depth[4], da32_depth[4];
2299 				int status;
2300 
2301 				status = match_convert_ipv6_depth(
2302 					mh->match.acl.sa_depth,
2303 					sa32_depth);
2304 				if (status)
2305 					return status;
2306 
2307 				status = match_convert_ipv6_depth(
2308 					mh->match.acl.da_depth,
2309 					da32_depth);
2310 				if (status)
2311 					return status;
2312 
2313 				ml->acl_delete.field_value[0].value.u8 =
2314 					mh->match.acl.proto;
2315 				ml->acl_delete.field_value[0].mask_range.u8 =
2316 					mh->match.acl.proto_mask;
2317 
2318 				ml->acl_delete.field_value[1].value.u32 =
2319 					rte_be_to_cpu_32(sa32[0]);
2320 				ml->acl_delete.field_value[1].mask_range.u32 =
2321 					sa32_depth[0];
2322 				ml->acl_delete.field_value[2].value.u32 =
2323 					rte_be_to_cpu_32(sa32[1]);
2324 				ml->acl_delete.field_value[2].mask_range.u32 =
2325 					sa32_depth[1];
2326 				ml->acl_delete.field_value[3].value.u32 =
2327 					rte_be_to_cpu_32(sa32[2]);
2328 				ml->acl_delete.field_value[3].mask_range.u32 =
2329 					sa32_depth[2];
2330 				ml->acl_delete.field_value[4].value.u32 =
2331 					rte_be_to_cpu_32(sa32[3]);
2332 				ml->acl_delete.field_value[4].mask_range.u32 =
2333 					sa32_depth[3];
2334 
2335 				ml->acl_delete.field_value[5].value.u32 =
2336 					rte_be_to_cpu_32(da32[0]);
2337 				ml->acl_delete.field_value[5].mask_range.u32 =
2338 					da32_depth[0];
2339 				ml->acl_delete.field_value[6].value.u32 =
2340 					rte_be_to_cpu_32(da32[1]);
2341 				ml->acl_delete.field_value[6].mask_range.u32 =
2342 					da32_depth[1];
2343 				ml->acl_delete.field_value[7].value.u32 =
2344 					rte_be_to_cpu_32(da32[2]);
2345 				ml->acl_delete.field_value[7].mask_range.u32 =
2346 					da32_depth[2];
2347 				ml->acl_delete.field_value[8].value.u32 =
2348 					rte_be_to_cpu_32(da32[3]);
2349 				ml->acl_delete.field_value[8].mask_range.u32 =
2350 					da32_depth[3];
2351 
2352 				ml->acl_delete.field_value[9].value.u16 =
2353 					mh->match.acl.sp0;
2354 				ml->acl_delete.field_value[9].mask_range.u16 =
2355 					mh->match.acl.sp1;
2356 
2357 				ml->acl_delete.field_value[10].value.u16 =
2358 					mh->match.acl.dp0;
2359 				ml->acl_delete.field_value[10].mask_range.u16 =
2360 					mh->match.acl.dp1;
2361 			}
2362 		return 0;
2363 
2364 	case TABLE_ARRAY:
2365 		ml->array.pos = mh->match.array.pos;
2366 		return 0;
2367 
2368 	case TABLE_HASH:
2369 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2370 		return 0;
2371 
2372 	case TABLE_LPM:
2373 		if (mh->match.lpm.ip_version) {
2374 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2375 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
2376 		} else {
2377 			memcpy(ml->lpm_ipv6.ip,
2378 				mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2379 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
2380 		}
2381 
2382 		return 0;
2383 
2384 	default:
2385 		return -1;
2386 	}
2387 }
2388 
2389 static int
2390 action_convert(struct rte_table_action *a,
2391 	struct table_rule_action *action,
2392 	struct rte_pipeline_table_entry *data)
2393 {
2394 	int status;
2395 
2396 	/* Apply actions */
2397 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2398 		status = rte_table_action_apply(a,
2399 			data,
2400 			RTE_TABLE_ACTION_FWD,
2401 			&action->fwd);
2402 
2403 		if (status)
2404 			return status;
2405 	}
2406 
2407 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2408 		status = rte_table_action_apply(a,
2409 			data,
2410 			RTE_TABLE_ACTION_LB,
2411 			&action->lb);
2412 
2413 		if (status)
2414 			return status;
2415 	}
2416 
2417 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2418 		status = rte_table_action_apply(a,
2419 			data,
2420 			RTE_TABLE_ACTION_MTR,
2421 			&action->mtr);
2422 
2423 		if (status)
2424 			return status;
2425 	}
2426 
2427 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2428 		status = rte_table_action_apply(a,
2429 			data,
2430 			RTE_TABLE_ACTION_TM,
2431 			&action->tm);
2432 
2433 		if (status)
2434 			return status;
2435 	}
2436 
2437 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2438 		status = rte_table_action_apply(a,
2439 			data,
2440 			RTE_TABLE_ACTION_ENCAP,
2441 			&action->encap);
2442 
2443 		if (status)
2444 			return status;
2445 	}
2446 
2447 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2448 		status = rte_table_action_apply(a,
2449 			data,
2450 			RTE_TABLE_ACTION_NAT,
2451 			&action->nat);
2452 
2453 		if (status)
2454 			return status;
2455 	}
2456 
2457 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2458 		status = rte_table_action_apply(a,
2459 			data,
2460 			RTE_TABLE_ACTION_TTL,
2461 			&action->ttl);
2462 
2463 		if (status)
2464 			return status;
2465 	}
2466 
2467 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2468 		status = rte_table_action_apply(a,
2469 			data,
2470 			RTE_TABLE_ACTION_STATS,
2471 			&action->stats);
2472 
2473 		if (status)
2474 			return status;
2475 	}
2476 
2477 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2478 		status = rte_table_action_apply(a,
2479 			data,
2480 			RTE_TABLE_ACTION_TIME,
2481 			&action->time);
2482 
2483 		if (status)
2484 			return status;
2485 	}
2486 
2487 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2488 		status = rte_table_action_apply(a,
2489 			data,
2490 			RTE_TABLE_ACTION_SYM_CRYPTO,
2491 			&action->sym_crypto);
2492 
2493 		if (status)
2494 			return status;
2495 	}
2496 
2497 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2498 		status = rte_table_action_apply(a,
2499 			data,
2500 			RTE_TABLE_ACTION_TAG,
2501 			&action->tag);
2502 
2503 		if (status)
2504 			return status;
2505 	}
2506 
2507 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2508 		status = rte_table_action_apply(a,
2509 			data,
2510 			RTE_TABLE_ACTION_DECAP,
2511 			&action->decap);
2512 
2513 		if (status)
2514 			return status;
2515 	}
2516 
2517 	return 0;
2518 }
2519 
2520 static struct pipeline_msg_rsp *
2521 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2522 	struct pipeline_msg_req *req)
2523 {
2524 	union table_rule_match_low_level match_ll;
2525 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2526 	struct table_rule_match *match = &req->table_rule_add.match;
2527 	struct table_rule_action *action = &req->table_rule_add.action;
2528 	struct rte_pipeline_table_entry *data_in, *data_out;
2529 	uint32_t table_id = req->id;
2530 	int key_found, status;
2531 	struct rte_table_action *a = p->table_data[table_id].a;
2532 
2533 	/* Apply actions */
2534 	memset(p->buffer, 0, sizeof(p->buffer));
2535 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2536 
2537 	status = match_convert(match, &match_ll, 1);
2538 	if (status) {
2539 		rsp->status = -1;
2540 		return rsp;
2541 	}
2542 
2543 	status = action_convert(a, action, data_in);
2544 	if (status) {
2545 		rsp->status = -1;
2546 		return rsp;
2547 	}
2548 
2549 	status = rte_pipeline_table_entry_add(p->p,
2550 		table_id,
2551 		&match_ll,
2552 		data_in,
2553 		&key_found,
2554 		&data_out);
2555 	if (status) {
2556 		rsp->status = -1;
2557 		return rsp;
2558 	}
2559 
2560 	/* Write response */
2561 	rsp->status = 0;
2562 	rsp->table_rule_add.data = data_out;
2563 
2564 	return rsp;
2565 }
2566 
2567 static struct pipeline_msg_rsp *
2568 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2569 	struct pipeline_msg_req *req)
2570 {
2571 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2572 	struct table_rule_action *action = &req->table_rule_add_default.action;
2573 	struct rte_pipeline_table_entry *data_in, *data_out;
2574 	uint32_t table_id = req->id;
2575 	int status;
2576 
2577 	/* Apply actions */
2578 	memset(p->buffer, 0, sizeof(p->buffer));
2579 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2580 
2581 	data_in->action = action->fwd.action;
2582 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2583 		data_in->port_id = action->fwd.id;
2584 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2585 		data_in->table_id = action->fwd.id;
2586 
2587 	/* Add default rule to table */
2588 	status = rte_pipeline_table_default_entry_add(p->p,
2589 		table_id,
2590 		data_in,
2591 		&data_out);
2592 	if (status) {
2593 		rsp->status = -1;
2594 		return rsp;
2595 	}
2596 
2597 	/* Write response */
2598 	rsp->status = 0;
2599 	rsp->table_rule_add_default.data = data_out;
2600 
2601 	return rsp;
2602 }
2603 
2604 static struct pipeline_msg_rsp *
2605 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2606 	struct pipeline_msg_req *req)
2607 {
2608 
2609 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2610 
2611 	uint32_t table_id = req->id;
2612 	struct table_rule_match *match = req->table_rule_add_bulk.match;
2613 	struct table_rule_action *action = req->table_rule_add_bulk.action;
2614 	struct rte_pipeline_table_entry **data =
2615 		(struct rte_pipeline_table_entry **)req->table_rule_add_bulk.data;
2616 	uint32_t n_rules = req->table_rule_add_bulk.n_rules;
2617 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2618 
2619 	struct rte_table_action *a = p->table_data[table_id].a;
2620 	union table_rule_match_low_level *match_ll;
2621 	uint8_t *action_ll;
2622 	void **match_ll_ptr;
2623 	struct rte_pipeline_table_entry **action_ll_ptr;
2624 	int *found, status;
2625 	uint32_t i;
2626 
2627 	/* Memory allocation */
2628 	match_ll = calloc(n_rules, sizeof(union table_rule_match_low_level));
2629 	action_ll = calloc(n_rules, TABLE_RULE_ACTION_SIZE_MAX);
2630 	match_ll_ptr = calloc(n_rules, sizeof(void *));
2631 	action_ll_ptr =
2632 		calloc(n_rules, sizeof(struct rte_pipeline_table_entry *));
2633 	found = calloc(n_rules, sizeof(int));
2634 
2635 	if ((match_ll == NULL) ||
2636 		(action_ll == NULL) ||
2637 		(match_ll_ptr == NULL) ||
2638 		(action_ll_ptr == NULL) ||
2639 		(found == NULL))
2640 		goto fail;
2641 
2642 	for (i = 0; i < n_rules; i++) {
2643 		match_ll_ptr[i] = (void *)&match_ll[i];
2644 		action_ll_ptr[i] =
2645 			(struct rte_pipeline_table_entry *)&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
2646 	}
2647 
2648 	/* Rule match conversion */
2649 	for (i = 0; i < n_rules; i++) {
2650 		status = match_convert(&match[i], match_ll_ptr[i], 1);
2651 		if (status)
2652 			goto fail;
2653 	}
2654 
2655 	/* Rule action conversion */
2656 	for (i = 0; i < n_rules; i++) {
2657 		status = action_convert(a, &action[i], action_ll_ptr[i]);
2658 		if (status)
2659 			goto fail;
2660 	}
2661 
2662 	/* Add rule (match, action) to table */
2663 	if (bulk) {
2664 		status = rte_pipeline_table_entry_add_bulk(p->p,
2665 			table_id,
2666 			match_ll_ptr,
2667 			action_ll_ptr,
2668 			n_rules,
2669 			found,
2670 			data);
2671 		if (status)
2672 			n_rules = 0;
2673 	} else
2674 		for (i = 0; i < n_rules; i++) {
2675 			status = rte_pipeline_table_entry_add(p->p,
2676 				table_id,
2677 				match_ll_ptr[i],
2678 				action_ll_ptr[i],
2679 				&found[i],
2680 				&data[i]);
2681 			if (status) {
2682 				n_rules = i;
2683 				break;
2684 			}
2685 		}
2686 
2687 	/* Write response */
2688 	rsp->status = 0;
2689 	rsp->table_rule_add_bulk.n_rules = n_rules;
2690 
2691 	/* Free */
2692 	free(found);
2693 	free(action_ll_ptr);
2694 	free(match_ll_ptr);
2695 	free(action_ll);
2696 	free(match_ll);
2697 
2698 	return rsp;
2699 
2700 fail:
2701 	free(found);
2702 	free(action_ll_ptr);
2703 	free(match_ll_ptr);
2704 	free(action_ll);
2705 	free(match_ll);
2706 
2707 	rsp->status = -1;
2708 	rsp->table_rule_add_bulk.n_rules = 0;
2709 	return rsp;
2710 }
2711 
2712 static struct pipeline_msg_rsp *
2713 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2714 	struct pipeline_msg_req *req)
2715 {
2716 	union table_rule_match_low_level match_ll;
2717 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2718 	struct table_rule_match *match = &req->table_rule_delete.match;
2719 	uint32_t table_id = req->id;
2720 	int key_found, status;
2721 
2722 	status = match_convert(match, &match_ll, 0);
2723 	if (status) {
2724 		rsp->status = -1;
2725 		return rsp;
2726 	}
2727 
2728 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2729 		table_id,
2730 		&match_ll,
2731 		&key_found,
2732 		NULL);
2733 
2734 	return rsp;
2735 }
2736 
2737 static struct pipeline_msg_rsp *
2738 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2739 	struct pipeline_msg_req *req)
2740 {
2741 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2742 	uint32_t table_id = req->id;
2743 
2744 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2745 		table_id,
2746 		NULL);
2747 
2748 	return rsp;
2749 }
2750 
2751 static struct pipeline_msg_rsp *
2752 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2753 	struct pipeline_msg_req *req)
2754 {
2755 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2756 	uint32_t table_id = req->id;
2757 	void *data = req->table_rule_stats_read.data;
2758 	int clear = req->table_rule_stats_read.clear;
2759 	struct rte_table_action *a = p->table_data[table_id].a;
2760 
2761 	rsp->status = rte_table_action_stats_read(a,
2762 		data,
2763 		&rsp->table_rule_stats_read.stats,
2764 		clear);
2765 
2766 	return rsp;
2767 }
2768 
2769 static struct pipeline_msg_rsp *
2770 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2771 	struct pipeline_msg_req *req)
2772 {
2773 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2774 	uint32_t table_id = req->id;
2775 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2776 	struct rte_table_action_meter_profile *profile =
2777 		&req->table_mtr_profile_add.profile;
2778 	struct rte_table_action *a = p->table_data[table_id].a;
2779 
2780 	rsp->status = rte_table_action_meter_profile_add(a,
2781 		meter_profile_id,
2782 		profile);
2783 
2784 	return rsp;
2785 }
2786 
2787 static struct pipeline_msg_rsp *
2788 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2789 	struct pipeline_msg_req *req)
2790 {
2791 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2792 	uint32_t table_id = req->id;
2793 	uint32_t meter_profile_id =
2794 		req->table_mtr_profile_delete.meter_profile_id;
2795 	struct rte_table_action *a = p->table_data[table_id].a;
2796 
2797 	rsp->status = rte_table_action_meter_profile_delete(a,
2798 		meter_profile_id);
2799 
2800 	return rsp;
2801 }
2802 
2803 static struct pipeline_msg_rsp *
2804 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2805 	struct pipeline_msg_req *req)
2806 {
2807 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2808 	uint32_t table_id = req->id;
2809 	void *data = req->table_rule_mtr_read.data;
2810 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2811 	int clear = req->table_rule_mtr_read.clear;
2812 	struct rte_table_action *a = p->table_data[table_id].a;
2813 
2814 	rsp->status = rte_table_action_meter_read(a,
2815 		data,
2816 		tc_mask,
2817 		&rsp->table_rule_mtr_read.stats,
2818 		clear);
2819 
2820 	return rsp;
2821 }
2822 
2823 static struct pipeline_msg_rsp *
2824 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2825 	struct pipeline_msg_req *req)
2826 {
2827 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2828 	uint32_t table_id = req->id;
2829 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2830 	struct rte_table_action_dscp_table *dscp_table =
2831 		&req->table_dscp_table_update.dscp_table;
2832 	struct rte_table_action *a = p->table_data[table_id].a;
2833 
2834 	rsp->status = rte_table_action_dscp_table_update(a,
2835 		dscp_mask,
2836 		dscp_table);
2837 
2838 	return rsp;
2839 }
2840 
2841 static struct pipeline_msg_rsp *
2842 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2843 	struct pipeline_msg_req *req)
2844 {
2845 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2846 	uint32_t table_id = req->id;
2847 	void *data = req->table_rule_ttl_read.data;
2848 	int clear = req->table_rule_ttl_read.clear;
2849 	struct rte_table_action *a = p->table_data[table_id].a;
2850 
2851 	rsp->status = rte_table_action_ttl_read(a,
2852 		data,
2853 		&rsp->table_rule_ttl_read.stats,
2854 		clear);
2855 
2856 	return rsp;
2857 }
2858 
2859 static void
2860 pipeline_msg_handle(struct pipeline_data *p)
2861 {
2862 	for ( ; ; ) {
2863 		struct pipeline_msg_req *req;
2864 		struct pipeline_msg_rsp *rsp;
2865 
2866 		req = pipeline_msg_recv(p->msgq_req);
2867 		if (req == NULL)
2868 			break;
2869 
2870 		switch (req->type) {
2871 		case PIPELINE_REQ_PORT_IN_STATS_READ:
2872 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2873 			break;
2874 
2875 		case PIPELINE_REQ_PORT_IN_ENABLE:
2876 			rsp = pipeline_msg_handle_port_in_enable(p, req);
2877 			break;
2878 
2879 		case PIPELINE_REQ_PORT_IN_DISABLE:
2880 			rsp = pipeline_msg_handle_port_in_disable(p, req);
2881 			break;
2882 
2883 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
2884 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
2885 			break;
2886 
2887 		case PIPELINE_REQ_TABLE_STATS_READ:
2888 			rsp = pipeline_msg_handle_table_stats_read(p, req);
2889 			break;
2890 
2891 		case PIPELINE_REQ_TABLE_RULE_ADD:
2892 			rsp = pipeline_msg_handle_table_rule_add(p, req);
2893 			break;
2894 
2895 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
2896 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
2897 			break;
2898 
2899 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
2900 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
2901 			break;
2902 
2903 		case PIPELINE_REQ_TABLE_RULE_DELETE:
2904 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
2905 			break;
2906 
2907 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
2908 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
2909 			break;
2910 
2911 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
2912 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
2913 			break;
2914 
2915 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
2916 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
2917 			break;
2918 
2919 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
2920 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
2921 			break;
2922 
2923 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
2924 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
2925 			break;
2926 
2927 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
2928 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
2929 			break;
2930 
2931 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
2932 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
2933 			break;
2934 
2935 		default:
2936 			rsp = (struct pipeline_msg_rsp *) req;
2937 			rsp->status = -1;
2938 		}
2939 
2940 		pipeline_msg_send(p->msgq_rsp, rsp);
2941 	}
2942 }
2943 
2944 /**
2945  * Data plane threads: main
2946  */
2947 int
2948 thread_main(void *arg __rte_unused)
2949 {
2950 	struct thread_data *t;
2951 	uint32_t thread_id, i;
2952 
2953 	thread_id = rte_lcore_id();
2954 	t = &thread_data[thread_id];
2955 
2956 	/* Dispatch loop */
2957 	for (i = 0; ; i++) {
2958 		uint32_t j;
2959 
2960 		/* Data Plane */
2961 		for (j = 0; j < t->n_pipelines; j++)
2962 			rte_pipeline_run(t->p[j]);
2963 
2964 		/* Control Plane */
2965 		if ((i & 0xF) == 0) {
2966 			uint64_t time = rte_get_tsc_cycles();
2967 			uint64_t time_next_min = UINT64_MAX;
2968 
2969 			if (time < t->time_next_min)
2970 				continue;
2971 
2972 			/* Pipeline message queues */
2973 			for (j = 0; j < t->n_pipelines; j++) {
2974 				struct pipeline_data *p =
2975 					&t->pipeline_data[j];
2976 				uint64_t time_next = p->time_next;
2977 
2978 				if (time_next <= time) {
2979 					pipeline_msg_handle(p);
2980 					rte_pipeline_flush(p->p);
2981 					time_next = time + p->timer_period;
2982 					p->time_next = time_next;
2983 				}
2984 
2985 				if (time_next < time_next_min)
2986 					time_next_min = time_next;
2987 			}
2988 
2989 			/* Thread message queues */
2990 			{
2991 				uint64_t time_next = t->time_next;
2992 
2993 				if (time_next <= time) {
2994 					thread_msg_handle(t);
2995 					time_next = time + t->timer_period;
2996 					t->time_next = time_next;
2997 				}
2998 
2999 				if (time_next < time_next_min)
3000 					time_next_min = time_next;
3001 			}
3002 
3003 			t->time_next_min = time_next_min;
3004 		}
3005 	}
3006 
3007 	return 0;
3008 }
3009