xref: /dpdk/examples/ip_pipeline/thread.c (revision 25d11a86c56d50947af33d0b79ede622809bd8b9)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
/*
 * Maximum number of pipelines that can be mapped to a single data plane
 * thread; sizes the per-thread pipeline arrays in struct thread_data.
 * May be overridden at build time.
 */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX                               256
#endif

/*
 * Size passed to rte_ring_create() for each per-thread request and
 * response message ring. May be overridden at build time.
 */
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE                                   64
#endif

/*
 * Per-thread timer period in milliseconds; thread_init() converts it to
 * TSC cycles. May be overridden at build time.
 */
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS                             100
#endif
33 
/**
 * Master thread: data plane thread context
 *
 * One record per lcore, indexed by lcore ID. This is the master thread's
 * view of a data plane thread: the rings used to talk to it and a flag
 * telling whether thread_init() has set the record up.
 */
struct thread {
	/* Request ring: the master enqueues, the data plane thread dequeues. */
	struct rte_ring *msgq_req;
	/* Response ring: the data plane thread enqueues, the master dequeues. */
	struct rte_ring *msgq_rsp;

	/* Set to 1 by thread_init() once both rings have been created. */
	uint32_t enabled;
};

static struct thread thread[RTE_MAX_LCORE];
45 
/**
 * Data plane threads: context
 */

/* Per-table state kept on the data plane side: the table's action handle. */
struct table_data {
	struct rte_table_action *a;
};
52 
/*
 * Per-pipeline state kept by the data plane thread the pipeline is mapped
 * to. Filled in when the pipeline is enabled on the thread, either directly
 * by the master (thread not running) or by the thread's message handler.
 */
struct pipeline_data {
	struct rte_pipeline *p;
	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
	uint32_t n_tables; /* Number of valid entries in table_data[]. */

	/* Pipeline-level message rings, copied from the pipeline being enabled. */
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;
	uint64_t timer_period; /* Measured in CPU cycles. */
	uint64_t time_next; /* Initialised to the TSC at enable time plus timer_period. */

	/* NOTE(review): scratch buffer; its users are outside this part of the file. */
	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
};
65 
/*
 * Data plane thread's own context, one per lcore (indexed by lcore ID).
 * p[] and pipeline_data[] are parallel arrays: entry i of each describes
 * the same pipeline, and only the first n_pipelines entries are valid.
 */
struct thread_data {
	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
	uint32_t n_pipelines;

	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
	/* Thread-level message rings; the same rings as in struct thread. */
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;
	uint64_t timer_period; /* Measured in CPU cycles. */
	uint64_t time_next; /* Initialised by thread_init() to "now" + timer_period. */
	/* Initialised to time_next; NOTE(review): later updates are outside this part of the file. */
	uint64_t time_next_min;
} __rte_cache_aligned;

static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		if (t->msgq_req)
96 			rte_ring_free(t->msgq_req);
97 
98 		if (t->msgq_rsp)
99 			rte_ring_free(t->msgq_rsp);
100 	}
101 }
102 
103 int
104 thread_init(void)
105 {
106 	uint32_t i;
107 
108 	RTE_LCORE_FOREACH_SLAVE(i) {
109 		char name[NAME_MAX];
110 		struct rte_ring *msgq_req, *msgq_rsp;
111 		struct thread *t = &thread[i];
112 		struct thread_data *t_data = &thread_data[i];
113 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
114 
115 		/* MSGQs */
116 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117 
118 		msgq_req = rte_ring_create(name,
119 			THREAD_MSGQ_SIZE,
120 			cpu_id,
121 			RING_F_SP_ENQ | RING_F_SC_DEQ);
122 
123 		if (msgq_req == NULL) {
124 			thread_free();
125 			return -1;
126 		}
127 
128 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129 
130 		msgq_rsp = rte_ring_create(name,
131 			THREAD_MSGQ_SIZE,
132 			cpu_id,
133 			RING_F_SP_ENQ | RING_F_SC_DEQ);
134 
135 		if (msgq_rsp == NULL) {
136 			thread_free();
137 			return -1;
138 		}
139 
140 		/* Master thread records */
141 		t->msgq_req = msgq_req;
142 		t->msgq_rsp = msgq_rsp;
143 		t->enabled = 1;
144 
145 		/* Data plane thread records */
146 		t_data->n_pipelines = 0;
147 		t_data->msgq_req = msgq_req;
148 		t_data->msgq_rsp = msgq_rsp;
149 		t_data->timer_period =
150 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 		t_data->time_next_min = t_data->time_next;
153 	}
154 
155 	return 0;
156 }
157 
158 static inline int
159 thread_is_running(uint32_t thread_id)
160 {
161 	enum rte_lcore_state_t thread_state;
162 
163 	thread_state = rte_eal_get_lcore_state(thread_id);
164 	return (thread_state == RUNNING) ? 1 : 0;
165 }
166 
167 /**
168  * Pipeline is running when:
169  *    (A) Pipeline is mapped to a data plane thread AND
170  *    (B) Its data plane thread is in RUNNING state.
171  */
172 static inline int
173 pipeline_is_running(struct pipeline *p)
174 {
175 	if (p->enabled == 0)
176 		return 0;
177 
178 	return thread_is_running(p->thread_id);
179 }
180 
/**
 * Master thread & data plane threads: message passing
 */

/*
 * Operation requested from a data plane thread. Any value the thread's
 * dispatcher does not recognise is answered with status -1.
 */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0, /* Map a pipeline to the thread. */
	THREAD_REQ_PIPELINE_DISABLE, /* Unmap a pipeline from the thread. */
	THREAD_REQ_MAX /* Number of request types; not a request itself. */
};
189 
/*
 * Request sent from the master thread to a data plane thread. The union
 * member in use is selected by 'type'.
 */
struct thread_msg_req {
	enum thread_req_type type;

	union {
		/*
		 * THREAD_REQ_PIPELINE_ENABLE: everything the data plane thread
		 * needs to populate a struct pipeline_data slot.
		 */
		struct {
			struct rte_pipeline *p;
			struct {
				struct rte_table_action *a;
			} table[RTE_PIPELINE_TABLE_MAX];
			struct rte_ring *msgq_req;
			struct rte_ring *msgq_rsp;
			uint32_t timer_period_ms; /* Converted to TSC cycles by the receiver. */
			uint32_t n_tables; /* Number of valid entries in table[]. */
		} pipeline_enable;

		/* THREAD_REQ_PIPELINE_DISABLE: the pipeline to remove. */
		struct {
			struct rte_pipeline *p;
		} pipeline_disable;
	};
};
210 
/*
 * Response sent from a data plane thread back to the master thread.
 * The handlers build it in place, in the buffer that carried the request.
 */
struct thread_msg_rsp {
	int status; /* 0 on success, non-zero on failure. */
};
214 
215 /**
216  * Master thread
217  */
218 static struct thread_msg_req *
219 thread_msg_alloc(void)
220 {
221 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
222 		sizeof(struct thread_msg_rsp));
223 
224 	return calloc(1, size);
225 }
226 
/**
 * Release a thread-level message.
 *
 * The data plane handlers write the response into the buffer that
 * thread_msg_alloc() returned for the request, so freeing the response
 * releases the whole message.
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
232 
233 static struct thread_msg_rsp *
234 thread_msg_send_recv(uint32_t thread_id,
235 	struct thread_msg_req *req)
236 {
237 	struct thread *t = &thread[thread_id];
238 	struct rte_ring *msgq_req = t->msgq_req;
239 	struct rte_ring *msgq_rsp = t->msgq_rsp;
240 	struct thread_msg_rsp *rsp;
241 	int status;
242 
243 	/* send */
244 	do {
245 		status = rte_ring_sp_enqueue(msgq_req, req);
246 	} while (status == -ENOBUFS);
247 
248 	/* recv */
249 	do {
250 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
251 	} while (status != 0);
252 
253 	return rsp;
254 }
255 
256 int
257 thread_pipeline_enable(uint32_t thread_id,
258 	const char *pipeline_name)
259 {
260 	struct pipeline *p = pipeline_find(pipeline_name);
261 	struct thread *t;
262 	struct thread_msg_req *req;
263 	struct thread_msg_rsp *rsp;
264 	uint32_t i;
265 	int status;
266 
267 	/* Check input params */
268 	if ((thread_id >= RTE_MAX_LCORE) ||
269 		(p == NULL) ||
270 		(p->n_ports_in == 0) ||
271 		(p->n_ports_out == 0) ||
272 		(p->n_tables == 0))
273 		return -1;
274 
275 	t = &thread[thread_id];
276 	if ((t->enabled == 0) ||
277 		p->enabled)
278 		return -1;
279 
280 	if (!thread_is_running(thread_id)) {
281 		struct thread_data *td = &thread_data[thread_id];
282 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
283 
284 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
285 			return -1;
286 
287 		/* Data plane thread */
288 		td->p[td->n_pipelines] = p->p;
289 
290 		tdp->p = p->p;
291 		for (i = 0; i < p->n_tables; i++)
292 			tdp->table_data[i].a = p->table[i].a;
293 
294 		tdp->n_tables = p->n_tables;
295 
296 		tdp->msgq_req = p->msgq_req;
297 		tdp->msgq_rsp = p->msgq_rsp;
298 		tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
299 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
300 
301 		td->n_pipelines++;
302 
303 		/* Pipeline */
304 		p->thread_id = thread_id;
305 		p->enabled = 1;
306 
307 		return 0;
308 	}
309 
310 	/* Allocate request */
311 	req = thread_msg_alloc();
312 	if (req == NULL)
313 		return -1;
314 
315 	/* Write request */
316 	req->type = THREAD_REQ_PIPELINE_ENABLE;
317 	req->pipeline_enable.p = p->p;
318 	for (i = 0; i < p->n_tables; i++)
319 		req->pipeline_enable.table[i].a =
320 			p->table[i].a;
321 	req->pipeline_enable.msgq_req = p->msgq_req;
322 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
323 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
324 	req->pipeline_enable.n_tables = p->n_tables;
325 
326 	/* Send request and wait for response */
327 	rsp = thread_msg_send_recv(thread_id, req);
328 	if (rsp == NULL)
329 		return -1;
330 
331 	/* Read response */
332 	status = rsp->status;
333 
334 	/* Free response */
335 	thread_msg_free(rsp);
336 
337 	/* Request completion */
338 	if (status)
339 		return status;
340 
341 	p->thread_id = thread_id;
342 	p->enabled = 1;
343 
344 	return 0;
345 }
346 
347 int
348 thread_pipeline_disable(uint32_t thread_id,
349 	const char *pipeline_name)
350 {
351 	struct pipeline *p = pipeline_find(pipeline_name);
352 	struct thread *t;
353 	struct thread_msg_req *req;
354 	struct thread_msg_rsp *rsp;
355 	int status;
356 
357 	/* Check input params */
358 	if ((thread_id >= RTE_MAX_LCORE) ||
359 		(p == NULL))
360 		return -1;
361 
362 	t = &thread[thread_id];
363 	if (t->enabled == 0)
364 		return -1;
365 
366 	if (p->enabled == 0)
367 		return 0;
368 
369 	if (p->thread_id != thread_id)
370 		return -1;
371 
372 	if (!thread_is_running(thread_id)) {
373 		struct thread_data *td = &thread_data[thread_id];
374 		uint32_t i;
375 
376 		for (i = 0; i < td->n_pipelines; i++) {
377 			struct pipeline_data *tdp = &td->pipeline_data[i];
378 
379 			if (tdp->p != p->p)
380 				continue;
381 
382 			/* Data plane thread */
383 			if (i < td->n_pipelines - 1) {
384 				struct rte_pipeline *pipeline_last =
385 					td->p[td->n_pipelines - 1];
386 				struct pipeline_data *tdp_last =
387 					&td->pipeline_data[td->n_pipelines - 1];
388 
389 				td->p[i] = pipeline_last;
390 				memcpy(tdp, tdp_last, sizeof(*tdp));
391 			}
392 
393 			td->n_pipelines--;
394 
395 			/* Pipeline */
396 			p->enabled = 0;
397 
398 			break;
399 		}
400 
401 		return 0;
402 	}
403 
404 	/* Allocate request */
405 	req = thread_msg_alloc();
406 	if (req == NULL)
407 		return -1;
408 
409 	/* Write request */
410 	req->type = THREAD_REQ_PIPELINE_DISABLE;
411 	req->pipeline_disable.p = p->p;
412 
413 	/* Send request and wait for response */
414 	rsp = thread_msg_send_recv(thread_id, req);
415 	if (rsp == NULL)
416 		return -1;
417 
418 	/* Read response */
419 	status = rsp->status;
420 
421 	/* Free response */
422 	thread_msg_free(rsp);
423 
424 	/* Request completion */
425 	if (status)
426 		return status;
427 
428 	p->enabled = 0;
429 
430 	return 0;
431 }
432 
433 /**
434  * Data plane threads: message handling
435  */
/**
 * Try to take one request off a thread's request ring without blocking.
 *
 * @return
 *   The dequeued request, or NULL when the dequeue does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *req;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &req) != 0)
		return NULL;

	return req;
}
448 
/**
 * Post a response on a thread's response ring, retrying for as long as
 * the enqueue reports -ENOBUFS.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
459 
460 static struct thread_msg_rsp *
461 thread_msg_handle_pipeline_enable(struct thread_data *t,
462 	struct thread_msg_req *req)
463 {
464 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
465 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
466 	uint32_t i;
467 
468 	/* Request */
469 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
470 		rsp->status = -1;
471 		return rsp;
472 	}
473 
474 	t->p[t->n_pipelines] = req->pipeline_enable.p;
475 
476 	p->p = req->pipeline_enable.p;
477 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
478 		p->table_data[i].a =
479 			req->pipeline_enable.table[i].a;
480 
481 	p->n_tables = req->pipeline_enable.n_tables;
482 
483 	p->msgq_req = req->pipeline_enable.msgq_req;
484 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
485 	p->timer_period =
486 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
487 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
488 
489 	t->n_pipelines++;
490 
491 	/* Response */
492 	rsp->status = 0;
493 	return rsp;
494 }
495 
496 static struct thread_msg_rsp *
497 thread_msg_handle_pipeline_disable(struct thread_data *t,
498 	struct thread_msg_req *req)
499 {
500 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
501 	uint32_t n_pipelines = t->n_pipelines;
502 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
503 	uint32_t i;
504 
505 	/* find pipeline */
506 	for (i = 0; i < n_pipelines; i++) {
507 		struct pipeline_data *p = &t->pipeline_data[i];
508 
509 		if (p->p != pipeline)
510 			continue;
511 
512 		if (i < n_pipelines - 1) {
513 			struct rte_pipeline *pipeline_last =
514 				t->p[n_pipelines - 1];
515 			struct pipeline_data *p_last =
516 				&t->pipeline_data[n_pipelines - 1];
517 
518 			t->p[i] = pipeline_last;
519 			memcpy(p, p_last, sizeof(*p));
520 		}
521 
522 		t->n_pipelines--;
523 
524 		rsp->status = 0;
525 		return rsp;
526 	}
527 
528 	/* should not get here */
529 	rsp->status = 0;
530 	return rsp;
531 }
532 
533 static void
534 thread_msg_handle(struct thread_data *t)
535 {
536 	for ( ; ; ) {
537 		struct thread_msg_req *req;
538 		struct thread_msg_rsp *rsp;
539 
540 		req = thread_msg_recv(t->msgq_req);
541 		if (req == NULL)
542 			break;
543 
544 		switch (req->type) {
545 		case THREAD_REQ_PIPELINE_ENABLE:
546 			rsp = thread_msg_handle_pipeline_enable(t, req);
547 			break;
548 
549 		case THREAD_REQ_PIPELINE_DISABLE:
550 			rsp = thread_msg_handle_pipeline_disable(t, req);
551 			break;
552 
553 		default:
554 			rsp = (struct thread_msg_rsp *) req;
555 			rsp->status = -1;
556 		}
557 
558 		thread_msg_send(t->msgq_rsp, rsp);
559 	}
560 }
561 
/**
 * Master thread & data plane threads: message passing
 */

/*
 * Operation carried by a struct pipeline_msg_req. The comments group the
 * requests by the kind of object that the request's 'id' field refers to.
 */
enum pipeline_req_type {
	/* Port IN */
	PIPELINE_REQ_PORT_IN_STATS_READ,
	PIPELINE_REQ_PORT_IN_ENABLE,
	PIPELINE_REQ_PORT_IN_DISABLE,

	/* Port OUT */
	PIPELINE_REQ_PORT_OUT_STATS_READ,

	/* Table */
	PIPELINE_REQ_TABLE_STATS_READ,
	PIPELINE_REQ_TABLE_RULE_ADD,
	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
	PIPELINE_REQ_TABLE_RULE_DELETE,
	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
	PIPELINE_REQ_TABLE_RULE_STATS_READ,
	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
	PIPELINE_REQ_TABLE_RULE_MTR_READ,
	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
	PIPELINE_REQ_TABLE_RULE_TTL_READ,
	PIPELINE_REQ_TABLE_RULE_TIME_READ,
	PIPELINE_REQ_MAX /* Number of request types; not a request itself. */
};
590 
/*
 * Per-request payloads. Each struct below is one member of the union in
 * struct pipeline_msg_req; the member name matches the request type.
 * The object the request applies to (port or table) travels separately in
 * the request's 'id' field.
 */

/* PIPELINE_REQ_PORT_IN_STATS_READ: 'clear' is the caller's clear flag. */
struct pipeline_msg_req_port_in_stats_read {
	int clear;
};

/* PIPELINE_REQ_PORT_OUT_STATS_READ: 'clear' is the caller's clear flag. */
struct pipeline_msg_req_port_out_stats_read {
	int clear;
};

/* PIPELINE_REQ_TABLE_STATS_READ: 'clear' is the caller's clear flag. */
struct pipeline_msg_req_table_stats_read {
	int clear;
};

/* Rule to add: what to match and what to do on a match. */
struct pipeline_msg_req_table_rule_add {
	struct table_rule_match match;
	struct table_rule_action action;
};

/* Default rule to add: only an action, there is no match part. */
struct pipeline_msg_req_table_rule_add_default {
	struct table_rule_action action;
};

/* Bulk add: the list of rules, passed by pointer. */
struct pipeline_msg_req_table_rule_add_bulk {
	struct table_rule_list *list;
	/* NOTE(review): presumably selects the table's bulk-add path — confirm against the handler. */
	int bulk;
};

/* Rule to delete, identified by its match. */
struct pipeline_msg_req_table_rule_delete {
	struct table_rule_match match;
};

/*
 * NOTE(review): in the *_read requests below, 'data' is presumably the
 * table entry handle of the rule to read — confirm against the handlers.
 */
struct pipeline_msg_req_table_rule_stats_read {
	void *data;
	int clear;
};

struct pipeline_msg_req_table_mtr_profile_add {
	uint32_t meter_profile_id;
	struct rte_table_action_meter_profile profile;
};

struct pipeline_msg_req_table_mtr_profile_delete {
	uint32_t meter_profile_id;
};

struct pipeline_msg_req_table_rule_mtr_read {
	void *data;
	uint32_t tc_mask;
	int clear;
};

struct pipeline_msg_req_table_dscp_table_update {
	uint64_t dscp_mask;
	struct rte_table_action_dscp_table dscp_table;
};

struct pipeline_msg_req_table_rule_ttl_read {
	void *data;
	int clear;
};

struct pipeline_msg_req_table_rule_time_read {
	void *data;
};
654 
/*
 * Request sent from the master thread to the data plane thread that runs
 * a pipeline. 'type' selects which union member carries the payload.
 */
struct pipeline_msg_req {
	enum pipeline_req_type type;
	uint32_t id; /* Port IN, port OUT or table ID */

	RTE_STD_C11
	union {
		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
		struct pipeline_msg_req_table_stats_read table_stats_read;
		struct pipeline_msg_req_table_rule_add table_rule_add;
		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
		struct pipeline_msg_req_table_rule_delete table_rule_delete;
		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
		struct pipeline_msg_req_table_rule_time_read table_rule_time_read;
	};
};
677 
/*
 * Per-response payloads. Each struct below is one member of the union in
 * struct pipeline_msg_rsp; the member name matches the request type that
 * the response answers.
 */

/* Counters copied back to the caller when the status is 0. */
struct pipeline_msg_rsp_port_in_stats_read {
	struct rte_pipeline_port_in_stats stats;
};

/* Counters copied back to the caller when the status is 0. */
struct pipeline_msg_rsp_port_out_stats_read {
	struct rte_pipeline_port_out_stats stats;
};

/* Counters copied back to the caller when the status is 0. */
struct pipeline_msg_rsp_table_stats_read {
	struct rte_pipeline_table_stats stats;
};

/* NOTE(review): 'data' is presumably the handle of the added table entry — confirm. */
struct pipeline_msg_rsp_table_rule_add {
	void *data;
};

/* NOTE(review): 'data' is presumably the handle of the added default entry — confirm. */
struct pipeline_msg_rsp_table_rule_add_default {
	void *data;
};

/* NOTE(review): presumably the number of rules actually added — confirm. */
struct pipeline_msg_rsp_table_rule_add_bulk {
	uint32_t n_rules;
};

struct pipeline_msg_rsp_table_rule_stats_read {
	struct rte_table_action_stats_counters stats;
};

struct pipeline_msg_rsp_table_rule_mtr_read {
	struct rte_table_action_mtr_counters stats;
};

struct pipeline_msg_rsp_table_rule_ttl_read {
	struct rte_table_action_ttl_counters stats;
};

struct pipeline_msg_rsp_table_rule_time_read {
	uint64_t timestamp;
};
717 
/*
 * Response to a struct pipeline_msg_req. Callers in this file read the
 * union payload only when 'status' is 0.
 */
struct pipeline_msg_rsp {
	int status; /* 0 on success, non-zero on failure. */

	RTE_STD_C11
	union {
		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
		struct pipeline_msg_rsp_table_stats_read table_stats_read;
		struct pipeline_msg_rsp_table_rule_add table_rule_add;
		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
		struct pipeline_msg_rsp_table_rule_time_read table_rule_time_read;
	};
};
735 
736 /**
737  * Master thread
738  */
739 static struct pipeline_msg_req *
740 pipeline_msg_alloc(void)
741 {
742 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
743 		sizeof(struct pipeline_msg_rsp));
744 
745 	return calloc(1, size);
746 }
747 
/**
 * Release a pipeline-level message once its response has been read.
 *
 * NOTE(review): presumably the response lives in the buffer allocated by
 * pipeline_msg_alloc() for the request, as on the thread-level path —
 * confirm against the pipeline message handlers.
 */
static void
pipeline_msg_free(struct pipeline_msg_rsp *rsp)
{
	free(rsp);
}
753 
754 static struct pipeline_msg_rsp *
755 pipeline_msg_send_recv(struct pipeline *p,
756 	struct pipeline_msg_req *req)
757 {
758 	struct rte_ring *msgq_req = p->msgq_req;
759 	struct rte_ring *msgq_rsp = p->msgq_rsp;
760 	struct pipeline_msg_rsp *rsp;
761 	int status;
762 
763 	/* send */
764 	do {
765 		status = rte_ring_sp_enqueue(msgq_req, req);
766 	} while (status == -ENOBUFS);
767 
768 	/* recv */
769 	do {
770 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
771 	} while (status != 0);
772 
773 	return rsp;
774 }
775 
776 int
777 pipeline_port_in_stats_read(const char *pipeline_name,
778 	uint32_t port_id,
779 	struct rte_pipeline_port_in_stats *stats,
780 	int clear)
781 {
782 	struct pipeline *p;
783 	struct pipeline_msg_req *req;
784 	struct pipeline_msg_rsp *rsp;
785 	int status;
786 
787 	/* Check input params */
788 	if ((pipeline_name == NULL) ||
789 		(stats == NULL))
790 		return -1;
791 
792 	p = pipeline_find(pipeline_name);
793 	if ((p == NULL) ||
794 		(port_id >= p->n_ports_in))
795 		return -1;
796 
797 	if (!pipeline_is_running(p)) {
798 		status = rte_pipeline_port_in_stats_read(p->p,
799 			port_id,
800 			stats,
801 			clear);
802 
803 		return status;
804 	}
805 
806 	/* Allocate request */
807 	req = pipeline_msg_alloc();
808 	if (req == NULL)
809 		return -1;
810 
811 	/* Write request */
812 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
813 	req->id = port_id;
814 	req->port_in_stats_read.clear = clear;
815 
816 	/* Send request and wait for response */
817 	rsp = pipeline_msg_send_recv(p, req);
818 	if (rsp == NULL)
819 		return -1;
820 
821 	/* Read response */
822 	status = rsp->status;
823 	if (status == 0)
824 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
825 
826 	/* Free response */
827 	pipeline_msg_free(rsp);
828 
829 	return status;
830 }
831 
832 int
833 pipeline_port_in_enable(const char *pipeline_name,
834 	uint32_t port_id)
835 {
836 	struct pipeline *p;
837 	struct pipeline_msg_req *req;
838 	struct pipeline_msg_rsp *rsp;
839 	int status;
840 
841 	/* Check input params */
842 	if (pipeline_name == NULL)
843 		return -1;
844 
845 	p = pipeline_find(pipeline_name);
846 	if ((p == NULL) ||
847 		(port_id >= p->n_ports_in))
848 		return -1;
849 
850 	if (!pipeline_is_running(p)) {
851 		status = rte_pipeline_port_in_enable(p->p, port_id);
852 		return status;
853 	}
854 
855 	/* Allocate request */
856 	req = pipeline_msg_alloc();
857 	if (req == NULL)
858 		return -1;
859 
860 	/* Write request */
861 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
862 	req->id = port_id;
863 
864 	/* Send request and wait for response */
865 	rsp = pipeline_msg_send_recv(p, req);
866 	if (rsp == NULL)
867 		return -1;
868 
869 	/* Read response */
870 	status = rsp->status;
871 
872 	/* Free response */
873 	pipeline_msg_free(rsp);
874 
875 	return status;
876 }
877 
878 int
879 pipeline_port_in_disable(const char *pipeline_name,
880 	uint32_t port_id)
881 {
882 	struct pipeline *p;
883 	struct pipeline_msg_req *req;
884 	struct pipeline_msg_rsp *rsp;
885 	int status;
886 
887 	/* Check input params */
888 	if (pipeline_name == NULL)
889 		return -1;
890 
891 	p = pipeline_find(pipeline_name);
892 	if ((p == NULL) ||
893 		(port_id >= p->n_ports_in))
894 		return -1;
895 
896 	if (!pipeline_is_running(p)) {
897 		status = rte_pipeline_port_in_disable(p->p, port_id);
898 		return status;
899 	}
900 
901 	/* Allocate request */
902 	req = pipeline_msg_alloc();
903 	if (req == NULL)
904 		return -1;
905 
906 	/* Write request */
907 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
908 	req->id = port_id;
909 
910 	/* Send request and wait for response */
911 	rsp = pipeline_msg_send_recv(p, req);
912 	if (rsp == NULL)
913 		return -1;
914 
915 	/* Read response */
916 	status = rsp->status;
917 
918 	/* Free response */
919 	pipeline_msg_free(rsp);
920 
921 	return status;
922 }
923 
924 int
925 pipeline_port_out_stats_read(const char *pipeline_name,
926 	uint32_t port_id,
927 	struct rte_pipeline_port_out_stats *stats,
928 	int clear)
929 {
930 	struct pipeline *p;
931 	struct pipeline_msg_req *req;
932 	struct pipeline_msg_rsp *rsp;
933 	int status;
934 
935 	/* Check input params */
936 	if ((pipeline_name == NULL) ||
937 		(stats == NULL))
938 		return -1;
939 
940 	p = pipeline_find(pipeline_name);
941 	if ((p == NULL) ||
942 		(port_id >= p->n_ports_out))
943 		return -1;
944 
945 	if (!pipeline_is_running(p)) {
946 		status = rte_pipeline_port_out_stats_read(p->p,
947 			port_id,
948 			stats,
949 			clear);
950 
951 		return status;
952 	}
953 
954 	/* Allocate request */
955 	req = pipeline_msg_alloc();
956 	if (req == NULL)
957 		return -1;
958 
959 	/* Write request */
960 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
961 	req->id = port_id;
962 	req->port_out_stats_read.clear = clear;
963 
964 	/* Send request and wait for response */
965 	rsp = pipeline_msg_send_recv(p, req);
966 	if (rsp == NULL)
967 		return -1;
968 
969 	/* Read response */
970 	status = rsp->status;
971 	if (status == 0)
972 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
973 
974 	/* Free response */
975 	pipeline_msg_free(rsp);
976 
977 	return status;
978 }
979 
980 int
981 pipeline_table_stats_read(const char *pipeline_name,
982 	uint32_t table_id,
983 	struct rte_pipeline_table_stats *stats,
984 	int clear)
985 {
986 	struct pipeline *p;
987 	struct pipeline_msg_req *req;
988 	struct pipeline_msg_rsp *rsp;
989 	int status;
990 
991 	/* Check input params */
992 	if ((pipeline_name == NULL) ||
993 		(stats == NULL))
994 		return -1;
995 
996 	p = pipeline_find(pipeline_name);
997 	if ((p == NULL) ||
998 		(table_id >= p->n_tables))
999 		return -1;
1000 
1001 	if (!pipeline_is_running(p)) {
1002 		status = rte_pipeline_table_stats_read(p->p,
1003 			table_id,
1004 			stats,
1005 			clear);
1006 
1007 		return status;
1008 	}
1009 
1010 	/* Allocate request */
1011 	req = pipeline_msg_alloc();
1012 	if (req == NULL)
1013 		return -1;
1014 
1015 	/* Write request */
1016 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
1017 	req->id = table_id;
1018 	req->table_stats_read.clear = clear;
1019 
1020 	/* Send request and wait for response */
1021 	rsp = pipeline_msg_send_recv(p, req);
1022 	if (rsp == NULL)
1023 		return -1;
1024 
1025 	/* Read response */
1026 	status = rsp->status;
1027 	if (status == 0)
1028 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1029 
1030 	/* Free response */
1031 	pipeline_msg_free(rsp);
1032 
1033 	return status;
1034 }
1035 
1036 static int
1037 match_check(struct table_rule_match *match,
1038 	struct pipeline *p,
1039 	uint32_t table_id)
1040 {
1041 	struct table *table;
1042 
1043 	if ((match == NULL) ||
1044 		(p == NULL) ||
1045 		(table_id >= p->n_tables))
1046 		return -1;
1047 
1048 	table = &p->table[table_id];
1049 	if (match->match_type != table->params.match_type)
1050 		return -1;
1051 
1052 	switch (match->match_type) {
1053 	case TABLE_ACL:
1054 	{
1055 		struct table_acl_params *t = &table->params.match.acl;
1056 		struct table_rule_match_acl *r = &match->match.acl;
1057 
1058 		if ((r->ip_version && (t->ip_version == 0)) ||
1059 			((r->ip_version == 0) && t->ip_version))
1060 			return -1;
1061 
1062 		if (r->ip_version) {
1063 			if ((r->sa_depth > 32) ||
1064 				(r->da_depth > 32))
1065 				return -1;
1066 		} else {
1067 			if ((r->sa_depth > 128) ||
1068 				(r->da_depth > 128))
1069 				return -1;
1070 		}
1071 		return 0;
1072 	}
1073 
1074 	case TABLE_ARRAY:
1075 		return 0;
1076 
1077 	case TABLE_HASH:
1078 		return 0;
1079 
1080 	case TABLE_LPM:
1081 	{
1082 		struct table_lpm_params *t = &table->params.match.lpm;
1083 		struct table_rule_match_lpm *r = &match->match.lpm;
1084 
1085 		if ((r->ip_version && (t->key_size != 4)) ||
1086 			((r->ip_version == 0) && (t->key_size != 16)))
1087 			return -1;
1088 
1089 		if (r->ip_version) {
1090 			if (r->depth > 32)
1091 				return -1;
1092 		} else {
1093 			if (r->depth > 128)
1094 				return -1;
1095 		}
1096 		return 0;
1097 	}
1098 
1099 	case TABLE_STUB:
1100 		return -1;
1101 
1102 	default:
1103 		return -1;
1104 	}
1105 }
1106 
1107 static int
1108 action_check(struct table_rule_action *action,
1109 	struct pipeline *p,
1110 	uint32_t table_id)
1111 {
1112 	struct table_action_profile *ap;
1113 
1114 	if ((action == NULL) ||
1115 		(p == NULL) ||
1116 		(table_id >= p->n_tables))
1117 		return -1;
1118 
1119 	ap = p->table[table_id].ap;
1120 	if (action->action_mask != ap->params.action_mask)
1121 		return -1;
1122 
1123 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1124 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1125 			(action->fwd.id >= p->n_ports_out))
1126 			return -1;
1127 
1128 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1129 			(action->fwd.id >= p->n_tables))
1130 			return -1;
1131 	}
1132 
1133 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1134 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1135 		uint32_t tc_mask1 = action->mtr.tc_mask;
1136 
1137 		if (tc_mask1 != tc_mask0)
1138 			return -1;
1139 	}
1140 
1141 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1142 		uint32_t n_subports_per_port =
1143 			ap->params.tm.n_subports_per_port;
1144 		uint32_t n_pipes_per_subport =
1145 			ap->params.tm.n_pipes_per_subport;
1146 		uint32_t subport_id = action->tm.subport_id;
1147 		uint32_t pipe_id = action->tm.pipe_id;
1148 
1149 		if ((subport_id >= n_subports_per_port) ||
1150 			(pipe_id >= n_pipes_per_subport))
1151 			return -1;
1152 	}
1153 
1154 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1155 		uint64_t encap_mask = ap->params.encap.encap_mask;
1156 		enum rte_table_action_encap_type type = action->encap.type;
1157 
1158 		if ((encap_mask & (1LLU << type)) == 0)
1159 			return -1;
1160 	}
1161 
1162 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1163 		int ip_version0 = ap->params.common.ip_version;
1164 		int ip_version1 = action->nat.ip_version;
1165 
1166 		if ((ip_version1 && (ip_version0 == 0)) ||
1167 			((ip_version1 == 0) && ip_version0))
1168 			return -1;
1169 	}
1170 
1171 	return 0;
1172 }
1173 
1174 static int
1175 action_default_check(struct table_rule_action *action,
1176 	struct pipeline *p,
1177 	uint32_t table_id)
1178 {
1179 	if ((action == NULL) ||
1180 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1181 		(p == NULL) ||
1182 		(table_id >= p->n_tables))
1183 		return -1;
1184 
1185 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1186 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1187 			(action->fwd.id >= p->n_ports_out))
1188 			return -1;
1189 
1190 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1191 			(action->fwd.id >= p->n_tables))
1192 			return -1;
1193 	}
1194 
1195 	return 0;
1196 }
1197 
/*
 * Storage large enough for the low-level (librte_table) form of a rule
 * match for any supported table type. match_convert() fills one of these
 * from a high-level struct table_rule_match.
 */
union table_rule_match_low_level {
	struct rte_table_acl_rule_add_params acl_add;
	struct rte_table_acl_rule_delete_params acl_delete;
	struct rte_table_array_key array;
	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
	struct rte_table_lpm_key lpm_ipv4;
	struct rte_table_lpm_ipv6_key lpm_ipv6;
};
1206 
/*
 * Forward declarations; both functions are defined later in the file.
 * Both report failure through a non-zero return value.
 */

/*
 * Convert a high-level match (mh) into its low-level form (ml).
 * NOTE(review): 'add' is passed as 1 on the rule-add path; it presumably
 * selects the add vs delete layout — confirm against the definition.
 */
static int
match_convert(struct table_rule_match *mh,
	union table_rule_match_low_level *ml,
	int add);

/*
 * Convert a high-level action into a low-level table entry (data), using
 * the table's action handle (a).
 */
static int
action_convert(struct rte_table_action *a,
	struct table_rule_action *action,
	struct rte_pipeline_table_entry *data);
1216 
/* Low-level description of the table that a batch of rules is added to. */
struct table_ll {
	struct rte_pipeline *p; /* Pipeline that owns the table. */
	int table_id; /* Table index within the pipeline. */
	struct rte_table_action *a; /* Action handle used to convert rule actions. */
	/* Non-zero: add the rules with one bulk call; zero: add them one by one. */
	int bulk_supported;
};
1223 
1224 static int
1225 table_rule_add_bulk_ll(struct table_ll *table,
1226 	struct table_rule_list *list,
1227 	uint32_t *n_rules)
1228 {
1229 	union table_rule_match_low_level *match_ll = NULL;
1230 	uint8_t *action_ll = NULL;
1231 	void **match_ll_ptr = NULL;
1232 	struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1233 	struct rte_pipeline_table_entry **entries_ptr = NULL;
1234 	int *found = NULL;
1235 	struct table_rule *rule;
1236 	uint32_t n, i;
1237 	int status = 0;
1238 
1239 	n = 0;
1240 	TAILQ_FOREACH(rule, list, node)
1241 		n++;
1242 
1243 	/* Memory allocation */
1244 	match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1245 	action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1246 
1247 	match_ll_ptr = calloc(n, sizeof(void *));
1248 	action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1249 
1250 	entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1251 	found = calloc(n, sizeof(int));
1252 
1253 	if (match_ll == NULL ||
1254 		action_ll == NULL ||
1255 		match_ll_ptr == NULL ||
1256 		action_ll_ptr == NULL ||
1257 		entries_ptr == NULL ||
1258 		found == NULL) {
1259 			status = -ENOMEM;
1260 			goto table_rule_add_bulk_ll_free;
1261 	}
1262 
1263 	/* Init */
1264 	for (i = 0; i < n; i++) {
1265 		match_ll_ptr[i] = (void *)&match_ll[i];
1266 		action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1267 			&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1268 	}
1269 
1270 	/* Rule (match, action) conversion */
1271 	i = 0;
1272 	TAILQ_FOREACH(rule, list, node) {
1273 		status = match_convert(&rule->match, match_ll_ptr[i], 1);
1274 		if (status)
1275 			goto table_rule_add_bulk_ll_free;
1276 
1277 		status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1278 		if (status)
1279 			goto table_rule_add_bulk_ll_free;
1280 
1281 		i++;
1282 	}
1283 
1284 	/* Add rule (match, action) to table */
1285 	if (table->bulk_supported) {
1286 		status = rte_pipeline_table_entry_add_bulk(table->p,
1287 			table->table_id,
1288 			match_ll_ptr,
1289 			action_ll_ptr,
1290 			n,
1291 			found,
1292 			entries_ptr);
1293 		if (status)
1294 			goto table_rule_add_bulk_ll_free;
1295 	} else
1296 		for (i = 0; i < n; i++) {
1297 			status = rte_pipeline_table_entry_add(table->p,
1298 				table->table_id,
1299 				match_ll_ptr[i],
1300 				action_ll_ptr[i],
1301 				&found[i],
1302 				&entries_ptr[i]);
1303 			if (status) {
1304 				if (i == 0)
1305 					goto table_rule_add_bulk_ll_free;
1306 
1307 				/* No roll-back. */
1308 				status = 0;
1309 				n = i;
1310 				break;
1311 			}
1312 		}
1313 
1314 	/* Write back to the rule list. */
1315 	i = 0;
1316 	TAILQ_FOREACH(rule, list, node) {
1317 		if (i >= n)
1318 			break;
1319 
1320 		rule->data = entries_ptr[i];
1321 
1322 		i++;
1323 	}
1324 
1325 	*n_rules = n;
1326 
1327 	/* Free */
1328 table_rule_add_bulk_ll_free:
1329 	free(found);
1330 	free(entries_ptr);
1331 	free(action_ll_ptr);
1332 	free(match_ll_ptr);
1333 	free(action_ll);
1334 	free(match_ll);
1335 
1336 	return status;
1337 }
1338 
1339 int
1340 pipeline_table_rule_add(const char *pipeline_name,
1341 	uint32_t table_id,
1342 	struct table_rule_match *match,
1343 	struct table_rule_action *action)
1344 {
1345 	struct pipeline *p;
1346 	struct table *table;
1347 	struct pipeline_msg_req *req;
1348 	struct pipeline_msg_rsp *rsp;
1349 	struct table_rule *rule;
1350 	int status;
1351 
1352 	/* Check input params */
1353 	if ((pipeline_name == NULL) ||
1354 		(match == NULL) ||
1355 		(action == NULL))
1356 		return -1;
1357 
1358 	p = pipeline_find(pipeline_name);
1359 	if ((p == NULL) ||
1360 		(table_id >= p->n_tables) ||
1361 		match_check(match, p, table_id) ||
1362 		action_check(action, p, table_id))
1363 		return -1;
1364 
1365 	table = &p->table[table_id];
1366 
1367 	rule = calloc(1, sizeof(struct table_rule));
1368 	if (rule == NULL)
1369 		return -1;
1370 
1371 	memcpy(&rule->match, match, sizeof(*match));
1372 	memcpy(&rule->action, action, sizeof(*action));
1373 
1374 	if (!pipeline_is_running(p)) {
1375 		union table_rule_match_low_level match_ll;
1376 		struct rte_pipeline_table_entry *data_in, *data_out;
1377 		int key_found;
1378 		uint8_t *buffer;
1379 
1380 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1381 		if (buffer == NULL) {
1382 			free(rule);
1383 			return -1;
1384 		}
1385 
1386 		/* Table match-action rule conversion */
1387 		data_in = (struct rte_pipeline_table_entry *)buffer;
1388 
1389 		status = match_convert(match, &match_ll, 1);
1390 		if (status) {
1391 			free(buffer);
1392 			free(rule);
1393 			return -1;
1394 		}
1395 
1396 		status = action_convert(table->a, action, data_in);
1397 		if (status) {
1398 			free(buffer);
1399 			free(rule);
1400 			return -1;
1401 		}
1402 
1403 		/* Add rule (match, action) to table */
1404 		status = rte_pipeline_table_entry_add(p->p,
1405 				table_id,
1406 				&match_ll,
1407 				data_in,
1408 				&key_found,
1409 				&data_out);
1410 		if (status) {
1411 			free(buffer);
1412 			free(rule);
1413 			return -1;
1414 		}
1415 
1416 		/* Write Response */
1417 		rule->data = data_out;
1418 		table_rule_add(table, rule);
1419 
1420 		free(buffer);
1421 		return 0;
1422 	}
1423 
1424 	/* Allocate request */
1425 	req = pipeline_msg_alloc();
1426 	if (req == NULL) {
1427 		free(rule);
1428 		return -1;
1429 	}
1430 
1431 	/* Write request */
1432 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1433 	req->id = table_id;
1434 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1435 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1436 
1437 	/* Send request and wait for response */
1438 	rsp = pipeline_msg_send_recv(p, req);
1439 	if (rsp == NULL) {
1440 		free(rule);
1441 		return -1;
1442 	}
1443 
1444 	/* Read response */
1445 	status = rsp->status;
1446 	if (status == 0) {
1447 		rule->data = rsp->table_rule_add.data;
1448 		table_rule_add(table, rule);
1449 	} else
1450 		free(rule);
1451 
1452 	/* Free response */
1453 	pipeline_msg_free(rsp);
1454 
1455 	return status;
1456 }
1457 
1458 int
1459 pipeline_table_rule_add_default(const char *pipeline_name,
1460 	uint32_t table_id,
1461 	struct table_rule_action *action)
1462 {
1463 	struct pipeline *p;
1464 	struct table *table;
1465 	struct pipeline_msg_req *req;
1466 	struct pipeline_msg_rsp *rsp;
1467 	struct table_rule *rule;
1468 	int status;
1469 
1470 	/* Check input params */
1471 	if ((pipeline_name == NULL) ||
1472 		(action == NULL))
1473 		return -1;
1474 
1475 	p = pipeline_find(pipeline_name);
1476 	if ((p == NULL) ||
1477 		(table_id >= p->n_tables) ||
1478 		action_default_check(action, p, table_id))
1479 		return -1;
1480 
1481 	table = &p->table[table_id];
1482 
1483 	rule = calloc(1, sizeof(struct table_rule));
1484 	if (rule == NULL)
1485 		return -1;
1486 
1487 	memcpy(&rule->action, action, sizeof(*action));
1488 
1489 	if (!pipeline_is_running(p)) {
1490 		struct rte_pipeline_table_entry *data_in, *data_out;
1491 		uint8_t *buffer;
1492 
1493 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1494 		if (buffer == NULL) {
1495 			free(rule);
1496 			return -1;
1497 		}
1498 
1499 		/* Apply actions */
1500 		data_in = (struct rte_pipeline_table_entry *)buffer;
1501 
1502 		data_in->action = action->fwd.action;
1503 		if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1504 			data_in->port_id = action->fwd.id;
1505 		if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1506 			data_in->table_id = action->fwd.id;
1507 
1508 		/* Add default rule to table */
1509 		status = rte_pipeline_table_default_entry_add(p->p,
1510 				table_id,
1511 				data_in,
1512 				&data_out);
1513 		if (status) {
1514 			free(buffer);
1515 			free(rule);
1516 			return -1;
1517 		}
1518 
1519 		/* Write Response */
1520 		rule->data = data_out;
1521 		table_rule_default_add(table, rule);
1522 
1523 		free(buffer);
1524 		return 0;
1525 	}
1526 
1527 	/* Allocate request */
1528 	req = pipeline_msg_alloc();
1529 	if (req == NULL) {
1530 		free(rule);
1531 		return -1;
1532 	}
1533 
1534 	/* Write request */
1535 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1536 	req->id = table_id;
1537 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1538 
1539 	/* Send request and wait for response */
1540 	rsp = pipeline_msg_send_recv(p, req);
1541 	if (rsp == NULL) {
1542 		free(rule);
1543 		return -1;
1544 	}
1545 
1546 	/* Read response */
1547 	status = rsp->status;
1548 	if (status == 0) {
1549 		rule->data = rsp->table_rule_add_default.data;
1550 		table_rule_default_add(table, rule);
1551 	} else
1552 		free(rule);
1553 
1554 	/* Free response */
1555 	pipeline_msg_free(rsp);
1556 
1557 	return status;
1558 }
1559 
1560 static uint32_t
1561 table_rule_list_free(struct table_rule_list *list)
1562 {
1563 	uint32_t n = 0;
1564 
1565 	if (!list)
1566 		return 0;
1567 
1568 	for ( ; ; ) {
1569 		struct table_rule *rule;
1570 
1571 		rule = TAILQ_FIRST(list);
1572 		if (rule == NULL)
1573 			break;
1574 
1575 		TAILQ_REMOVE(list, rule, node);
1576 		free(rule);
1577 		n++;
1578 	}
1579 
1580 	free(list);
1581 	return n;
1582 }
1583 
1584 int
1585 pipeline_table_rule_add_bulk(const char *pipeline_name,
1586 	uint32_t table_id,
1587 	struct table_rule_list *list,
1588 	uint32_t *n_rules_added,
1589 	uint32_t *n_rules_not_added)
1590 {
1591 	struct pipeline *p;
1592 	struct table *table;
1593 	struct pipeline_msg_req *req;
1594 	struct pipeline_msg_rsp *rsp;
1595 	struct table_rule *rule;
1596 	int status = 0;
1597 
1598 	/* Check input params */
1599 	if ((pipeline_name == NULL) ||
1600 		(list == NULL) ||
1601 		TAILQ_EMPTY(list) ||
1602 		(n_rules_added == NULL) ||
1603 		(n_rules_not_added == NULL)) {
1604 		table_rule_list_free(list);
1605 		return -EINVAL;
1606 	}
1607 
1608 	p = pipeline_find(pipeline_name);
1609 	if ((p == NULL) ||
1610 		(table_id >= p->n_tables)) {
1611 		table_rule_list_free(list);
1612 		return -EINVAL;
1613 	}
1614 
1615 	table = &p->table[table_id];
1616 
1617 	TAILQ_FOREACH(rule, list, node)
1618 		if (match_check(&rule->match, p, table_id) ||
1619 			action_check(&rule->action, p, table_id)) {
1620 			table_rule_list_free(list);
1621 			return -EINVAL;
1622 		}
1623 
1624 	if (!pipeline_is_running(p)) {
1625 		struct table_ll table_ll = {
1626 			.p = p->p,
1627 			.table_id = table_id,
1628 			.a = table->a,
1629 			.bulk_supported = table->params.match_type == TABLE_ACL,
1630 		};
1631 
1632 		status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1633 		if (status) {
1634 			table_rule_list_free(list);
1635 			return status;
1636 		}
1637 
1638 		table_rule_add_bulk(table, list, *n_rules_added);
1639 		*n_rules_not_added = table_rule_list_free(list);
1640 		return 0;
1641 	}
1642 
1643 	/* Allocate request */
1644 	req = pipeline_msg_alloc();
1645 	if (req == NULL) {
1646 		table_rule_list_free(list);
1647 		return -ENOMEM;
1648 	}
1649 
1650 	/* Write request */
1651 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1652 	req->id = table_id;
1653 	req->table_rule_add_bulk.list = list;
1654 	req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1655 
1656 	/* Send request and wait for response */
1657 	rsp = pipeline_msg_send_recv(p, req);
1658 	if (rsp == NULL) {
1659 		table_rule_list_free(list);
1660 		return -ENOMEM;
1661 	}
1662 
1663 	/* Read response */
1664 	status = rsp->status;
1665 	if (status == 0) {
1666 		*n_rules_added = rsp->table_rule_add_bulk.n_rules;
1667 
1668 		table_rule_add_bulk(table, list, *n_rules_added);
1669 		*n_rules_not_added = table_rule_list_free(list);
1670 	} else
1671 		table_rule_list_free(list);
1672 
1673 
1674 	/* Free response */
1675 	pipeline_msg_free(rsp);
1676 
1677 	return status;
1678 }
1679 
1680 int
1681 pipeline_table_rule_delete(const char *pipeline_name,
1682 	uint32_t table_id,
1683 	struct table_rule_match *match)
1684 {
1685 	struct pipeline *p;
1686 	struct table *table;
1687 	struct pipeline_msg_req *req;
1688 	struct pipeline_msg_rsp *rsp;
1689 	int status;
1690 
1691 	/* Check input params */
1692 	if ((pipeline_name == NULL) ||
1693 		(match == NULL))
1694 		return -1;
1695 
1696 	p = pipeline_find(pipeline_name);
1697 	if ((p == NULL) ||
1698 		(table_id >= p->n_tables) ||
1699 		match_check(match, p, table_id))
1700 		return -1;
1701 
1702 	table = &p->table[table_id];
1703 
1704 	if (!pipeline_is_running(p)) {
1705 		union table_rule_match_low_level match_ll;
1706 		int key_found;
1707 
1708 		status = match_convert(match, &match_ll, 0);
1709 		if (status)
1710 			return -1;
1711 
1712 		status = rte_pipeline_table_entry_delete(p->p,
1713 				table_id,
1714 				&match_ll,
1715 				&key_found,
1716 				NULL);
1717 
1718 		if (status == 0)
1719 			table_rule_delete(table, match);
1720 
1721 		return status;
1722 	}
1723 
1724 	/* Allocate request */
1725 	req = pipeline_msg_alloc();
1726 	if (req == NULL)
1727 		return -1;
1728 
1729 	/* Write request */
1730 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1731 	req->id = table_id;
1732 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1733 
1734 	/* Send request and wait for response */
1735 	rsp = pipeline_msg_send_recv(p, req);
1736 	if (rsp == NULL)
1737 		return -1;
1738 
1739 	/* Read response */
1740 	status = rsp->status;
1741 	if (status == 0)
1742 		table_rule_delete(table, match);
1743 
1744 	/* Free response */
1745 	pipeline_msg_free(rsp);
1746 
1747 	return status;
1748 }
1749 
1750 int
1751 pipeline_table_rule_delete_default(const char *pipeline_name,
1752 	uint32_t table_id)
1753 {
1754 	struct pipeline *p;
1755 	struct table *table;
1756 	struct pipeline_msg_req *req;
1757 	struct pipeline_msg_rsp *rsp;
1758 	int status;
1759 
1760 	/* Check input params */
1761 	if (pipeline_name == NULL)
1762 		return -1;
1763 
1764 	p = pipeline_find(pipeline_name);
1765 	if ((p == NULL) ||
1766 		(table_id >= p->n_tables))
1767 		return -1;
1768 
1769 	table = &p->table[table_id];
1770 
1771 	if (!pipeline_is_running(p)) {
1772 		status = rte_pipeline_table_default_entry_delete(p->p,
1773 			table_id,
1774 			NULL);
1775 
1776 		if (status == 0)
1777 			table_rule_default_delete(table);
1778 
1779 		return status;
1780 	}
1781 
1782 	/* Allocate request */
1783 	req = pipeline_msg_alloc();
1784 	if (req == NULL)
1785 		return -1;
1786 
1787 	/* Write request */
1788 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1789 	req->id = table_id;
1790 
1791 	/* Send request and wait for response */
1792 	rsp = pipeline_msg_send_recv(p, req);
1793 	if (rsp == NULL)
1794 		return -1;
1795 
1796 	/* Read response */
1797 	status = rsp->status;
1798 	if (status == 0)
1799 		table_rule_default_delete(table);
1800 
1801 	/* Free response */
1802 	pipeline_msg_free(rsp);
1803 
1804 	return status;
1805 }
1806 
1807 int
1808 pipeline_table_rule_stats_read(const char *pipeline_name,
1809 	uint32_t table_id,
1810 	struct table_rule_match *match,
1811 	struct rte_table_action_stats_counters *stats,
1812 	int clear)
1813 {
1814 	struct pipeline *p;
1815 	struct table *table;
1816 	struct pipeline_msg_req *req;
1817 	struct pipeline_msg_rsp *rsp;
1818 	struct table_rule *rule;
1819 	int status;
1820 
1821 	/* Check input params */
1822 	if ((pipeline_name == NULL) ||
1823 		(match == NULL) ||
1824 		(stats == NULL))
1825 		return -1;
1826 
1827 	p = pipeline_find(pipeline_name);
1828 	if ((p == NULL) ||
1829 		(table_id >= p->n_tables) ||
1830 		match_check(match, p, table_id))
1831 		return -1;
1832 
1833 	table = &p->table[table_id];
1834 	rule = table_rule_find(table, match);
1835 	if (rule == NULL)
1836 		return -1;
1837 
1838 	if (!pipeline_is_running(p)) {
1839 		status = rte_table_action_stats_read(table->a,
1840 			rule->data,
1841 			stats,
1842 			clear);
1843 
1844 		return status;
1845 	}
1846 
1847 	/* Allocate request */
1848 	req = pipeline_msg_alloc();
1849 	if (req == NULL)
1850 		return -1;
1851 
1852 	/* Write request */
1853 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1854 	req->id = table_id;
1855 	req->table_rule_stats_read.data = rule->data;
1856 	req->table_rule_stats_read.clear = clear;
1857 
1858 	/* Send request and wait for response */
1859 	rsp = pipeline_msg_send_recv(p, req);
1860 	if (rsp == NULL)
1861 		return -1;
1862 
1863 	/* Read response */
1864 	status = rsp->status;
1865 	if (status == 0)
1866 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1867 
1868 	/* Free response */
1869 	pipeline_msg_free(rsp);
1870 
1871 	return status;
1872 }
1873 
1874 int
1875 pipeline_table_mtr_profile_add(const char *pipeline_name,
1876 	uint32_t table_id,
1877 	uint32_t meter_profile_id,
1878 	struct rte_table_action_meter_profile *profile)
1879 {
1880 	struct pipeline *p;
1881 	struct pipeline_msg_req *req;
1882 	struct pipeline_msg_rsp *rsp;
1883 	int status;
1884 
1885 	/* Check input params */
1886 	if ((pipeline_name == NULL) ||
1887 		(profile == NULL))
1888 		return -1;
1889 
1890 	p = pipeline_find(pipeline_name);
1891 	if ((p == NULL) ||
1892 		(table_id >= p->n_tables))
1893 		return -1;
1894 
1895 	if (!pipeline_is_running(p)) {
1896 		struct rte_table_action *a = p->table[table_id].a;
1897 
1898 		status = rte_table_action_meter_profile_add(a,
1899 			meter_profile_id,
1900 			profile);
1901 
1902 		return status;
1903 	}
1904 
1905 	/* Allocate request */
1906 	req = pipeline_msg_alloc();
1907 	if (req == NULL)
1908 		return -1;
1909 
1910 	/* Write request */
1911 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1912 	req->id = table_id;
1913 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1914 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1915 
1916 	/* Send request and wait for response */
1917 	rsp = pipeline_msg_send_recv(p, req);
1918 	if (rsp == NULL)
1919 		return -1;
1920 
1921 	/* Read response */
1922 	status = rsp->status;
1923 
1924 	/* Free response */
1925 	pipeline_msg_free(rsp);
1926 
1927 	return status;
1928 }
1929 
1930 int
1931 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1932 	uint32_t table_id,
1933 	uint32_t meter_profile_id)
1934 {
1935 	struct pipeline *p;
1936 	struct pipeline_msg_req *req;
1937 	struct pipeline_msg_rsp *rsp;
1938 	int status;
1939 
1940 	/* Check input params */
1941 	if (pipeline_name == NULL)
1942 		return -1;
1943 
1944 	p = pipeline_find(pipeline_name);
1945 	if ((p == NULL) ||
1946 		(table_id >= p->n_tables))
1947 		return -1;
1948 
1949 	if (!pipeline_is_running(p)) {
1950 		struct rte_table_action *a = p->table[table_id].a;
1951 
1952 		status = rte_table_action_meter_profile_delete(a,
1953 				meter_profile_id);
1954 
1955 		return status;
1956 	}
1957 
1958 	/* Allocate request */
1959 	req = pipeline_msg_alloc();
1960 	if (req == NULL)
1961 		return -1;
1962 
1963 	/* Write request */
1964 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1965 	req->id = table_id;
1966 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1967 
1968 	/* Send request and wait for response */
1969 	rsp = pipeline_msg_send_recv(p, req);
1970 	if (rsp == NULL)
1971 		return -1;
1972 
1973 	/* Read response */
1974 	status = rsp->status;
1975 
1976 	/* Free response */
1977 	pipeline_msg_free(rsp);
1978 
1979 	return status;
1980 }
1981 
1982 int
1983 pipeline_table_rule_mtr_read(const char *pipeline_name,
1984 	uint32_t table_id,
1985 	struct table_rule_match *match,
1986 	struct rte_table_action_mtr_counters *stats,
1987 	int clear)
1988 {
1989 	struct pipeline *p;
1990 	struct table *table;
1991 	struct pipeline_msg_req *req;
1992 	struct pipeline_msg_rsp *rsp;
1993 	struct table_rule *rule;
1994 	uint32_t tc_mask;
1995 	int status;
1996 
1997 	/* Check input params */
1998 	if ((pipeline_name == NULL) ||
1999 		(match == NULL) ||
2000 		(stats == NULL))
2001 		return -1;
2002 
2003 	p = pipeline_find(pipeline_name);
2004 	if ((p == NULL) ||
2005 		(table_id >= p->n_tables) ||
2006 		match_check(match, p, table_id))
2007 		return -1;
2008 
2009 	table = &p->table[table_id];
2010 	tc_mask = (1 << table->ap->params.mtr.n_tc) - 1;
2011 
2012 	rule = table_rule_find(table, match);
2013 	if (rule == NULL)
2014 		return -1;
2015 
2016 	if (!pipeline_is_running(p)) {
2017 		status = rte_table_action_meter_read(table->a,
2018 				rule->data,
2019 				tc_mask,
2020 				stats,
2021 				clear);
2022 
2023 		return status;
2024 	}
2025 
2026 	/* Allocate request */
2027 	req = pipeline_msg_alloc();
2028 	if (req == NULL)
2029 		return -1;
2030 
2031 	/* Write request */
2032 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
2033 	req->id = table_id;
2034 	req->table_rule_mtr_read.data = rule->data;
2035 	req->table_rule_mtr_read.tc_mask = tc_mask;
2036 	req->table_rule_mtr_read.clear = clear;
2037 
2038 	/* Send request and wait for response */
2039 	rsp = pipeline_msg_send_recv(p, req);
2040 	if (rsp == NULL)
2041 		return -1;
2042 
2043 	/* Read response */
2044 	status = rsp->status;
2045 	if (status == 0)
2046 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2047 
2048 	/* Free response */
2049 	pipeline_msg_free(rsp);
2050 
2051 	return status;
2052 }
2053 
2054 int
2055 pipeline_table_dscp_table_update(const char *pipeline_name,
2056 	uint32_t table_id,
2057 	uint64_t dscp_mask,
2058 	struct rte_table_action_dscp_table *dscp_table)
2059 {
2060 	struct pipeline *p;
2061 	struct pipeline_msg_req *req;
2062 	struct pipeline_msg_rsp *rsp;
2063 	int status;
2064 
2065 	/* Check input params */
2066 	if ((pipeline_name == NULL) ||
2067 		(dscp_table == NULL))
2068 		return -1;
2069 
2070 	p = pipeline_find(pipeline_name);
2071 	if ((p == NULL) ||
2072 		(table_id >= p->n_tables))
2073 		return -1;
2074 
2075 	if (!pipeline_is_running(p)) {
2076 		struct rte_table_action *a = p->table[table_id].a;
2077 
2078 		status = rte_table_action_dscp_table_update(a,
2079 				dscp_mask,
2080 				dscp_table);
2081 
2082 		return status;
2083 	}
2084 
2085 	/* Allocate request */
2086 	req = pipeline_msg_alloc();
2087 	if (req == NULL)
2088 		return -1;
2089 
2090 	/* Write request */
2091 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2092 	req->id = table_id;
2093 	req->table_dscp_table_update.dscp_mask = dscp_mask;
2094 	memcpy(&req->table_dscp_table_update.dscp_table,
2095 		dscp_table, sizeof(*dscp_table));
2096 
2097 	/* Send request and wait for response */
2098 	rsp = pipeline_msg_send_recv(p, req);
2099 	if (rsp == NULL)
2100 		return -1;
2101 
2102 	/* Read response */
2103 	status = rsp->status;
2104 
2105 	/* Free response */
2106 	pipeline_msg_free(rsp);
2107 
2108 	return status;
2109 }
2110 
2111 int
2112 pipeline_table_rule_ttl_read(const char *pipeline_name,
2113 	uint32_t table_id,
2114 	struct table_rule_match *match,
2115 	struct rte_table_action_ttl_counters *stats,
2116 	int clear)
2117 {
2118 	struct pipeline *p;
2119 	struct table *table;
2120 	struct pipeline_msg_req *req;
2121 	struct pipeline_msg_rsp *rsp;
2122 	struct table_rule *rule;
2123 	int status;
2124 
2125 	/* Check input params */
2126 	if ((pipeline_name == NULL) ||
2127 		(match == NULL) ||
2128 		(stats == NULL))
2129 		return -1;
2130 
2131 	p = pipeline_find(pipeline_name);
2132 	if ((p == NULL) ||
2133 		(table_id >= p->n_tables) ||
2134 		match_check(match, p, table_id))
2135 		return -1;
2136 
2137 	table = &p->table[table_id];
2138 	if (!table->ap->params.ttl.n_packets_enabled)
2139 		return -1;
2140 
2141 	rule = table_rule_find(table, match);
2142 	if (rule == NULL)
2143 		return -1;
2144 
2145 	if (!pipeline_is_running(p)) {
2146 		status = rte_table_action_ttl_read(table->a,
2147 				rule->data,
2148 				stats,
2149 				clear);
2150 
2151 		return status;
2152 	}
2153 
2154 	/* Allocate request */
2155 	req = pipeline_msg_alloc();
2156 	if (req == NULL)
2157 		return -1;
2158 
2159 	/* Write request */
2160 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2161 	req->id = table_id;
2162 	req->table_rule_ttl_read.data = rule->data;
2163 	req->table_rule_ttl_read.clear = clear;
2164 
2165 	/* Send request and wait for response */
2166 	rsp = pipeline_msg_send_recv(p, req);
2167 	if (rsp == NULL)
2168 		return -1;
2169 
2170 	/* Read response */
2171 	status = rsp->status;
2172 	if (status == 0)
2173 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2174 
2175 	/* Free response */
2176 	pipeline_msg_free(rsp);
2177 
2178 	return status;
2179 }
2180 
2181 int
2182 pipeline_table_rule_time_read(const char *pipeline_name,
2183 	uint32_t table_id,
2184 	struct table_rule_match *match,
2185 	uint64_t *timestamp)
2186 {
2187 	struct pipeline *p;
2188 	struct table *table;
2189 	struct pipeline_msg_req *req;
2190 	struct pipeline_msg_rsp *rsp;
2191 	struct table_rule *rule;
2192 	int status;
2193 
2194 	/* Check input params */
2195 	if ((pipeline_name == NULL) ||
2196 		(match == NULL) ||
2197 		(timestamp == NULL))
2198 		return -1;
2199 
2200 	p = pipeline_find(pipeline_name);
2201 	if ((p == NULL) ||
2202 		(table_id >= p->n_tables) ||
2203 		match_check(match, p, table_id))
2204 		return -1;
2205 
2206 	table = &p->table[table_id];
2207 
2208 	rule = table_rule_find(table, match);
2209 	if (rule == NULL)
2210 		return -1;
2211 
2212 	if (!pipeline_is_running(p)) {
2213 		status = rte_table_action_time_read(table->a,
2214 				rule->data,
2215 				timestamp);
2216 
2217 		return status;
2218 	}
2219 
2220 	/* Allocate request */
2221 	req = pipeline_msg_alloc();
2222 	if (req == NULL)
2223 		return -1;
2224 
2225 	/* Write request */
2226 	req->type = PIPELINE_REQ_TABLE_RULE_TIME_READ;
2227 	req->id = table_id;
2228 	req->table_rule_time_read.data = rule->data;
2229 
2230 	/* Send request and wait for response */
2231 	rsp = pipeline_msg_send_recv(p, req);
2232 	if (rsp == NULL)
2233 		return -1;
2234 
2235 	/* Read response */
2236 	status = rsp->status;
2237 	if (status == 0)
2238 		*timestamp = rsp->table_rule_time_read.timestamp;
2239 
2240 	/* Free response */
2241 	pipeline_msg_free(rsp);
2242 
2243 	return status;
2244 }
2245 
2246 /**
2247  * Data plane threads: message handling
2248  */
/*
 * Try to fetch one request from @msgq_req using the single-consumer
 * dequeue. Does not wait: returns NULL when the dequeue reports a non-zero
 * status, otherwise the dequeued request.
 */
static inline struct pipeline_msg_req *
pipeline_msg_recv(struct rte_ring *msgq_req)
{
	struct pipeline_msg_req *msg;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &msg) != 0)
		return NULL;

	return msg;
}
2261 
/*
 * Post response @rsp on @msgq_rsp using the single-producer enqueue,
 * retrying for as long as the ring reports -ENOBUFS. Any other enqueue
 * status ends the loop.
 */
static inline void
pipeline_msg_send(struct rte_ring *msgq_rsp,
	struct pipeline_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
2272 
2273 static struct pipeline_msg_rsp *
2274 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2275 	struct pipeline_msg_req *req)
2276 {
2277 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2278 	uint32_t port_id = req->id;
2279 	int clear = req->port_in_stats_read.clear;
2280 
2281 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
2282 		port_id,
2283 		&rsp->port_in_stats_read.stats,
2284 		clear);
2285 
2286 	return rsp;
2287 }
2288 
2289 static struct pipeline_msg_rsp *
2290 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2291 	struct pipeline_msg_req *req)
2292 {
2293 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2294 	uint32_t port_id = req->id;
2295 
2296 	rsp->status = rte_pipeline_port_in_enable(p->p,
2297 		port_id);
2298 
2299 	return rsp;
2300 }
2301 
2302 static struct pipeline_msg_rsp *
2303 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2304 	struct pipeline_msg_req *req)
2305 {
2306 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2307 	uint32_t port_id = req->id;
2308 
2309 	rsp->status = rte_pipeline_port_in_disable(p->p,
2310 		port_id);
2311 
2312 	return rsp;
2313 }
2314 
2315 static struct pipeline_msg_rsp *
2316 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2317 	struct pipeline_msg_req *req)
2318 {
2319 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2320 	uint32_t port_id = req->id;
2321 	int clear = req->port_out_stats_read.clear;
2322 
2323 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
2324 		port_id,
2325 		&rsp->port_out_stats_read.stats,
2326 		clear);
2327 
2328 	return rsp;
2329 }
2330 
2331 static struct pipeline_msg_rsp *
2332 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2333 	struct pipeline_msg_req *req)
2334 {
2335 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2336 	uint32_t port_id = req->id;
2337 	int clear = req->table_stats_read.clear;
2338 
2339 	rsp->status = rte_pipeline_table_stats_read(p->p,
2340 		port_id,
2341 		&rsp->table_stats_read.stats,
2342 		clear);
2343 
2344 	return rsp;
2345 }
2346 
/*
 * Split an IPv6 prefix length into four per-32-bit-word depths.
 *
 * The prefix bits are assigned to the words in order: each word takes up to
 * 32 bits of whatever is left of @depth, and words past the end of the
 * prefix get 0. For example, depth 40 yields {32, 8, 0, 0}.
 *
 * @param depth
 *   Prefix length, 0..128.
 * @param depth32
 *   Output array of 4 elements; left untouched on error.
 * @return
 *   0 on success, -1 when @depth is greater than 128.
 */
static int
match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
{
	uint32_t remaining = depth;
	int word;

	if (depth > 128)
		return -1;

	for (word = 0; word < 4; word++) {
		uint32_t bits = (remaining < 32) ? remaining : 32;

		depth32[word] = bits;
		remaining -= bits;
	}

	return 0;
}
2393 
2394 static int
2395 match_convert(struct table_rule_match *mh,
2396 	union table_rule_match_low_level *ml,
2397 	int add)
2398 {
2399 	memset(ml, 0, sizeof(*ml));
2400 
2401 	switch (mh->match_type) {
2402 	case TABLE_ACL:
2403 		if (mh->match.acl.ip_version)
2404 			if (add) {
2405 				ml->acl_add.field_value[0].value.u8 =
2406 					mh->match.acl.proto;
2407 				ml->acl_add.field_value[0].mask_range.u8 =
2408 					mh->match.acl.proto_mask;
2409 
2410 				ml->acl_add.field_value[1].value.u32 =
2411 					mh->match.acl.ipv4.sa;
2412 				ml->acl_add.field_value[1].mask_range.u32 =
2413 					mh->match.acl.sa_depth;
2414 
2415 				ml->acl_add.field_value[2].value.u32 =
2416 					mh->match.acl.ipv4.da;
2417 				ml->acl_add.field_value[2].mask_range.u32 =
2418 					mh->match.acl.da_depth;
2419 
2420 				ml->acl_add.field_value[3].value.u16 =
2421 					mh->match.acl.sp0;
2422 				ml->acl_add.field_value[3].mask_range.u16 =
2423 					mh->match.acl.sp1;
2424 
2425 				ml->acl_add.field_value[4].value.u16 =
2426 					mh->match.acl.dp0;
2427 				ml->acl_add.field_value[4].mask_range.u16 =
2428 					mh->match.acl.dp1;
2429 
2430 				ml->acl_add.priority =
2431 					(int32_t) mh->match.acl.priority;
2432 			} else {
2433 				ml->acl_delete.field_value[0].value.u8 =
2434 					mh->match.acl.proto;
2435 				ml->acl_delete.field_value[0].mask_range.u8 =
2436 					mh->match.acl.proto_mask;
2437 
2438 				ml->acl_delete.field_value[1].value.u32 =
2439 					mh->match.acl.ipv4.sa;
2440 				ml->acl_delete.field_value[1].mask_range.u32 =
2441 					mh->match.acl.sa_depth;
2442 
2443 				ml->acl_delete.field_value[2].value.u32 =
2444 					mh->match.acl.ipv4.da;
2445 				ml->acl_delete.field_value[2].mask_range.u32 =
2446 					mh->match.acl.da_depth;
2447 
2448 				ml->acl_delete.field_value[3].value.u16 =
2449 					mh->match.acl.sp0;
2450 				ml->acl_delete.field_value[3].mask_range.u16 =
2451 					mh->match.acl.sp1;
2452 
2453 				ml->acl_delete.field_value[4].value.u16 =
2454 					mh->match.acl.dp0;
2455 				ml->acl_delete.field_value[4].mask_range.u16 =
2456 					mh->match.acl.dp1;
2457 			}
2458 		else
2459 			if (add) {
2460 				uint32_t *sa32 =
2461 					(uint32_t *) mh->match.acl.ipv6.sa;
2462 				uint32_t *da32 =
2463 					(uint32_t *) mh->match.acl.ipv6.da;
2464 				uint32_t sa32_depth[4], da32_depth[4];
2465 				int status;
2466 
2467 				status = match_convert_ipv6_depth(
2468 					mh->match.acl.sa_depth,
2469 					sa32_depth);
2470 				if (status)
2471 					return status;
2472 
2473 				status = match_convert_ipv6_depth(
2474 					mh->match.acl.da_depth,
2475 					da32_depth);
2476 				if (status)
2477 					return status;
2478 
2479 				ml->acl_add.field_value[0].value.u8 =
2480 					mh->match.acl.proto;
2481 				ml->acl_add.field_value[0].mask_range.u8 =
2482 					mh->match.acl.proto_mask;
2483 
2484 				ml->acl_add.field_value[1].value.u32 =
2485 					rte_be_to_cpu_32(sa32[0]);
2486 				ml->acl_add.field_value[1].mask_range.u32 =
2487 					sa32_depth[0];
2488 				ml->acl_add.field_value[2].value.u32 =
2489 					rte_be_to_cpu_32(sa32[1]);
2490 				ml->acl_add.field_value[2].mask_range.u32 =
2491 					sa32_depth[1];
2492 				ml->acl_add.field_value[3].value.u32 =
2493 					rte_be_to_cpu_32(sa32[2]);
2494 				ml->acl_add.field_value[3].mask_range.u32 =
2495 					sa32_depth[2];
2496 				ml->acl_add.field_value[4].value.u32 =
2497 					rte_be_to_cpu_32(sa32[3]);
2498 				ml->acl_add.field_value[4].mask_range.u32 =
2499 					sa32_depth[3];
2500 
2501 				ml->acl_add.field_value[5].value.u32 =
2502 					rte_be_to_cpu_32(da32[0]);
2503 				ml->acl_add.field_value[5].mask_range.u32 =
2504 					da32_depth[0];
2505 				ml->acl_add.field_value[6].value.u32 =
2506 					rte_be_to_cpu_32(da32[1]);
2507 				ml->acl_add.field_value[6].mask_range.u32 =
2508 					da32_depth[1];
2509 				ml->acl_add.field_value[7].value.u32 =
2510 					rte_be_to_cpu_32(da32[2]);
2511 				ml->acl_add.field_value[7].mask_range.u32 =
2512 					da32_depth[2];
2513 				ml->acl_add.field_value[8].value.u32 =
2514 					rte_be_to_cpu_32(da32[3]);
2515 				ml->acl_add.field_value[8].mask_range.u32 =
2516 					da32_depth[3];
2517 
2518 				ml->acl_add.field_value[9].value.u16 =
2519 					mh->match.acl.sp0;
2520 				ml->acl_add.field_value[9].mask_range.u16 =
2521 					mh->match.acl.sp1;
2522 
2523 				ml->acl_add.field_value[10].value.u16 =
2524 					mh->match.acl.dp0;
2525 				ml->acl_add.field_value[10].mask_range.u16 =
2526 					mh->match.acl.dp1;
2527 
2528 				ml->acl_add.priority =
2529 					(int32_t) mh->match.acl.priority;
2530 			} else {
2531 				uint32_t *sa32 =
2532 					(uint32_t *) mh->match.acl.ipv6.sa;
2533 				uint32_t *da32 =
2534 					(uint32_t *) mh->match.acl.ipv6.da;
2535 				uint32_t sa32_depth[4], da32_depth[4];
2536 				int status;
2537 
2538 				status = match_convert_ipv6_depth(
2539 					mh->match.acl.sa_depth,
2540 					sa32_depth);
2541 				if (status)
2542 					return status;
2543 
2544 				status = match_convert_ipv6_depth(
2545 					mh->match.acl.da_depth,
2546 					da32_depth);
2547 				if (status)
2548 					return status;
2549 
2550 				ml->acl_delete.field_value[0].value.u8 =
2551 					mh->match.acl.proto;
2552 				ml->acl_delete.field_value[0].mask_range.u8 =
2553 					mh->match.acl.proto_mask;
2554 
2555 				ml->acl_delete.field_value[1].value.u32 =
2556 					rte_be_to_cpu_32(sa32[0]);
2557 				ml->acl_delete.field_value[1].mask_range.u32 =
2558 					sa32_depth[0];
2559 				ml->acl_delete.field_value[2].value.u32 =
2560 					rte_be_to_cpu_32(sa32[1]);
2561 				ml->acl_delete.field_value[2].mask_range.u32 =
2562 					sa32_depth[1];
2563 				ml->acl_delete.field_value[3].value.u32 =
2564 					rte_be_to_cpu_32(sa32[2]);
2565 				ml->acl_delete.field_value[3].mask_range.u32 =
2566 					sa32_depth[2];
2567 				ml->acl_delete.field_value[4].value.u32 =
2568 					rte_be_to_cpu_32(sa32[3]);
2569 				ml->acl_delete.field_value[4].mask_range.u32 =
2570 					sa32_depth[3];
2571 
2572 				ml->acl_delete.field_value[5].value.u32 =
2573 					rte_be_to_cpu_32(da32[0]);
2574 				ml->acl_delete.field_value[5].mask_range.u32 =
2575 					da32_depth[0];
2576 				ml->acl_delete.field_value[6].value.u32 =
2577 					rte_be_to_cpu_32(da32[1]);
2578 				ml->acl_delete.field_value[6].mask_range.u32 =
2579 					da32_depth[1];
2580 				ml->acl_delete.field_value[7].value.u32 =
2581 					rte_be_to_cpu_32(da32[2]);
2582 				ml->acl_delete.field_value[7].mask_range.u32 =
2583 					da32_depth[2];
2584 				ml->acl_delete.field_value[8].value.u32 =
2585 					rte_be_to_cpu_32(da32[3]);
2586 				ml->acl_delete.field_value[8].mask_range.u32 =
2587 					da32_depth[3];
2588 
2589 				ml->acl_delete.field_value[9].value.u16 =
2590 					mh->match.acl.sp0;
2591 				ml->acl_delete.field_value[9].mask_range.u16 =
2592 					mh->match.acl.sp1;
2593 
2594 				ml->acl_delete.field_value[10].value.u16 =
2595 					mh->match.acl.dp0;
2596 				ml->acl_delete.field_value[10].mask_range.u16 =
2597 					mh->match.acl.dp1;
2598 			}
2599 		return 0;
2600 
2601 	case TABLE_ARRAY:
2602 		ml->array.pos = mh->match.array.pos;
2603 		return 0;
2604 
2605 	case TABLE_HASH:
2606 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2607 		return 0;
2608 
2609 	case TABLE_LPM:
2610 		if (mh->match.lpm.ip_version) {
2611 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2612 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
2613 		} else {
2614 			memcpy(ml->lpm_ipv6.ip,
2615 				mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2616 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
2617 		}
2618 
2619 		return 0;
2620 
2621 	default:
2622 		return -1;
2623 	}
2624 }
2625 
2626 static int
2627 action_convert(struct rte_table_action *a,
2628 	struct table_rule_action *action,
2629 	struct rte_pipeline_table_entry *data)
2630 {
2631 	int status;
2632 
2633 	/* Apply actions */
2634 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2635 		status = rte_table_action_apply(a,
2636 			data,
2637 			RTE_TABLE_ACTION_FWD,
2638 			&action->fwd);
2639 
2640 		if (status)
2641 			return status;
2642 	}
2643 
2644 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2645 		status = rte_table_action_apply(a,
2646 			data,
2647 			RTE_TABLE_ACTION_LB,
2648 			&action->lb);
2649 
2650 		if (status)
2651 			return status;
2652 	}
2653 
2654 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2655 		status = rte_table_action_apply(a,
2656 			data,
2657 			RTE_TABLE_ACTION_MTR,
2658 			&action->mtr);
2659 
2660 		if (status)
2661 			return status;
2662 	}
2663 
2664 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2665 		status = rte_table_action_apply(a,
2666 			data,
2667 			RTE_TABLE_ACTION_TM,
2668 			&action->tm);
2669 
2670 		if (status)
2671 			return status;
2672 	}
2673 
2674 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2675 		status = rte_table_action_apply(a,
2676 			data,
2677 			RTE_TABLE_ACTION_ENCAP,
2678 			&action->encap);
2679 
2680 		if (status)
2681 			return status;
2682 	}
2683 
2684 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2685 		status = rte_table_action_apply(a,
2686 			data,
2687 			RTE_TABLE_ACTION_NAT,
2688 			&action->nat);
2689 
2690 		if (status)
2691 			return status;
2692 	}
2693 
2694 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2695 		status = rte_table_action_apply(a,
2696 			data,
2697 			RTE_TABLE_ACTION_TTL,
2698 			&action->ttl);
2699 
2700 		if (status)
2701 			return status;
2702 	}
2703 
2704 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2705 		status = rte_table_action_apply(a,
2706 			data,
2707 			RTE_TABLE_ACTION_STATS,
2708 			&action->stats);
2709 
2710 		if (status)
2711 			return status;
2712 	}
2713 
2714 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2715 		status = rte_table_action_apply(a,
2716 			data,
2717 			RTE_TABLE_ACTION_TIME,
2718 			&action->time);
2719 
2720 		if (status)
2721 			return status;
2722 	}
2723 
2724 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2725 		status = rte_table_action_apply(a,
2726 			data,
2727 			RTE_TABLE_ACTION_SYM_CRYPTO,
2728 			&action->sym_crypto);
2729 
2730 		if (status)
2731 			return status;
2732 	}
2733 
2734 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2735 		status = rte_table_action_apply(a,
2736 			data,
2737 			RTE_TABLE_ACTION_TAG,
2738 			&action->tag);
2739 
2740 		if (status)
2741 			return status;
2742 	}
2743 
2744 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2745 		status = rte_table_action_apply(a,
2746 			data,
2747 			RTE_TABLE_ACTION_DECAP,
2748 			&action->decap);
2749 
2750 		if (status)
2751 			return status;
2752 	}
2753 
2754 	return 0;
2755 }
2756 
2757 static struct pipeline_msg_rsp *
2758 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2759 	struct pipeline_msg_req *req)
2760 {
2761 	union table_rule_match_low_level match_ll;
2762 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2763 	struct table_rule_match *match = &req->table_rule_add.match;
2764 	struct table_rule_action *action = &req->table_rule_add.action;
2765 	struct rte_pipeline_table_entry *data_in, *data_out;
2766 	uint32_t table_id = req->id;
2767 	int key_found, status;
2768 	struct rte_table_action *a = p->table_data[table_id].a;
2769 
2770 	/* Apply actions */
2771 	memset(p->buffer, 0, sizeof(p->buffer));
2772 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2773 
2774 	status = match_convert(match, &match_ll, 1);
2775 	if (status) {
2776 		rsp->status = -1;
2777 		return rsp;
2778 	}
2779 
2780 	status = action_convert(a, action, data_in);
2781 	if (status) {
2782 		rsp->status = -1;
2783 		return rsp;
2784 	}
2785 
2786 	status = rte_pipeline_table_entry_add(p->p,
2787 		table_id,
2788 		&match_ll,
2789 		data_in,
2790 		&key_found,
2791 		&data_out);
2792 	if (status) {
2793 		rsp->status = -1;
2794 		return rsp;
2795 	}
2796 
2797 	/* Write response */
2798 	rsp->status = 0;
2799 	rsp->table_rule_add.data = data_out;
2800 
2801 	return rsp;
2802 }
2803 
2804 static struct pipeline_msg_rsp *
2805 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2806 	struct pipeline_msg_req *req)
2807 {
2808 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2809 	struct table_rule_action *action = &req->table_rule_add_default.action;
2810 	struct rte_pipeline_table_entry *data_in, *data_out;
2811 	uint32_t table_id = req->id;
2812 	int status;
2813 
2814 	/* Apply actions */
2815 	memset(p->buffer, 0, sizeof(p->buffer));
2816 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2817 
2818 	data_in->action = action->fwd.action;
2819 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2820 		data_in->port_id = action->fwd.id;
2821 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2822 		data_in->table_id = action->fwd.id;
2823 
2824 	/* Add default rule to table */
2825 	status = rte_pipeline_table_default_entry_add(p->p,
2826 		table_id,
2827 		data_in,
2828 		&data_out);
2829 	if (status) {
2830 		rsp->status = -1;
2831 		return rsp;
2832 	}
2833 
2834 	/* Write response */
2835 	rsp->status = 0;
2836 	rsp->table_rule_add_default.data = data_out;
2837 
2838 	return rsp;
2839 }
2840 
2841 static struct pipeline_msg_rsp *
2842 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2843 	struct pipeline_msg_req *req)
2844 {
2845 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2846 
2847 	uint32_t table_id = req->id;
2848 	struct table_rule_list *list = req->table_rule_add_bulk.list;
2849 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2850 
2851 	uint32_t n_rules_added;
2852 	int status;
2853 
2854 	struct table_ll table_ll = {
2855 		.p = p->p,
2856 		.table_id = table_id,
2857 		.a = p->table_data[table_id].a,
2858 		.bulk_supported = bulk,
2859 	};
2860 
2861 	status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2862 	if (status) {
2863 		rsp->status = -1;
2864 		rsp->table_rule_add_bulk.n_rules = 0;
2865 		return rsp;
2866 	}
2867 
2868 	/* Write response */
2869 	rsp->status = 0;
2870 	rsp->table_rule_add_bulk.n_rules = n_rules_added;
2871 	return rsp;
2872 }
2873 
2874 static struct pipeline_msg_rsp *
2875 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2876 	struct pipeline_msg_req *req)
2877 {
2878 	union table_rule_match_low_level match_ll;
2879 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2880 	struct table_rule_match *match = &req->table_rule_delete.match;
2881 	uint32_t table_id = req->id;
2882 	int key_found, status;
2883 
2884 	status = match_convert(match, &match_ll, 0);
2885 	if (status) {
2886 		rsp->status = -1;
2887 		return rsp;
2888 	}
2889 
2890 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2891 		table_id,
2892 		&match_ll,
2893 		&key_found,
2894 		NULL);
2895 
2896 	return rsp;
2897 }
2898 
2899 static struct pipeline_msg_rsp *
2900 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2901 	struct pipeline_msg_req *req)
2902 {
2903 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2904 	uint32_t table_id = req->id;
2905 
2906 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2907 		table_id,
2908 		NULL);
2909 
2910 	return rsp;
2911 }
2912 
2913 static struct pipeline_msg_rsp *
2914 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2915 	struct pipeline_msg_req *req)
2916 {
2917 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2918 	uint32_t table_id = req->id;
2919 	void *data = req->table_rule_stats_read.data;
2920 	int clear = req->table_rule_stats_read.clear;
2921 	struct rte_table_action *a = p->table_data[table_id].a;
2922 
2923 	rsp->status = rte_table_action_stats_read(a,
2924 		data,
2925 		&rsp->table_rule_stats_read.stats,
2926 		clear);
2927 
2928 	return rsp;
2929 }
2930 
2931 static struct pipeline_msg_rsp *
2932 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2933 	struct pipeline_msg_req *req)
2934 {
2935 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2936 	uint32_t table_id = req->id;
2937 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2938 	struct rte_table_action_meter_profile *profile =
2939 		&req->table_mtr_profile_add.profile;
2940 	struct rte_table_action *a = p->table_data[table_id].a;
2941 
2942 	rsp->status = rte_table_action_meter_profile_add(a,
2943 		meter_profile_id,
2944 		profile);
2945 
2946 	return rsp;
2947 }
2948 
2949 static struct pipeline_msg_rsp *
2950 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2951 	struct pipeline_msg_req *req)
2952 {
2953 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2954 	uint32_t table_id = req->id;
2955 	uint32_t meter_profile_id =
2956 		req->table_mtr_profile_delete.meter_profile_id;
2957 	struct rte_table_action *a = p->table_data[table_id].a;
2958 
2959 	rsp->status = rte_table_action_meter_profile_delete(a,
2960 		meter_profile_id);
2961 
2962 	return rsp;
2963 }
2964 
2965 static struct pipeline_msg_rsp *
2966 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2967 	struct pipeline_msg_req *req)
2968 {
2969 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2970 	uint32_t table_id = req->id;
2971 	void *data = req->table_rule_mtr_read.data;
2972 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2973 	int clear = req->table_rule_mtr_read.clear;
2974 	struct rte_table_action *a = p->table_data[table_id].a;
2975 
2976 	rsp->status = rte_table_action_meter_read(a,
2977 		data,
2978 		tc_mask,
2979 		&rsp->table_rule_mtr_read.stats,
2980 		clear);
2981 
2982 	return rsp;
2983 }
2984 
2985 static struct pipeline_msg_rsp *
2986 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2987 	struct pipeline_msg_req *req)
2988 {
2989 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2990 	uint32_t table_id = req->id;
2991 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2992 	struct rte_table_action_dscp_table *dscp_table =
2993 		&req->table_dscp_table_update.dscp_table;
2994 	struct rte_table_action *a = p->table_data[table_id].a;
2995 
2996 	rsp->status = rte_table_action_dscp_table_update(a,
2997 		dscp_mask,
2998 		dscp_table);
2999 
3000 	return rsp;
3001 }
3002 
3003 static struct pipeline_msg_rsp *
3004 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
3005 	struct pipeline_msg_req *req)
3006 {
3007 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
3008 	uint32_t table_id = req->id;
3009 	void *data = req->table_rule_ttl_read.data;
3010 	int clear = req->table_rule_ttl_read.clear;
3011 	struct rte_table_action *a = p->table_data[table_id].a;
3012 
3013 	rsp->status = rte_table_action_ttl_read(a,
3014 		data,
3015 		&rsp->table_rule_ttl_read.stats,
3016 		clear);
3017 
3018 	return rsp;
3019 }
3020 
3021 static struct pipeline_msg_rsp *
3022 pipeline_msg_handle_table_rule_time_read(struct pipeline_data *p,
3023 	struct pipeline_msg_req *req)
3024 {
3025 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
3026 	uint32_t table_id = req->id;
3027 	void *data = req->table_rule_time_read.data;
3028 	struct rte_table_action *a = p->table_data[table_id].a;
3029 
3030 	rsp->status = rte_table_action_time_read(a,
3031 		data,
3032 		&rsp->table_rule_time_read.timestamp);
3033 
3034 	return rsp;
3035 }
3036 
3037 static void
3038 pipeline_msg_handle(struct pipeline_data *p)
3039 {
3040 	for ( ; ; ) {
3041 		struct pipeline_msg_req *req;
3042 		struct pipeline_msg_rsp *rsp;
3043 
3044 		req = pipeline_msg_recv(p->msgq_req);
3045 		if (req == NULL)
3046 			break;
3047 
3048 		switch (req->type) {
3049 		case PIPELINE_REQ_PORT_IN_STATS_READ:
3050 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
3051 			break;
3052 
3053 		case PIPELINE_REQ_PORT_IN_ENABLE:
3054 			rsp = pipeline_msg_handle_port_in_enable(p, req);
3055 			break;
3056 
3057 		case PIPELINE_REQ_PORT_IN_DISABLE:
3058 			rsp = pipeline_msg_handle_port_in_disable(p, req);
3059 			break;
3060 
3061 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
3062 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
3063 			break;
3064 
3065 		case PIPELINE_REQ_TABLE_STATS_READ:
3066 			rsp = pipeline_msg_handle_table_stats_read(p, req);
3067 			break;
3068 
3069 		case PIPELINE_REQ_TABLE_RULE_ADD:
3070 			rsp = pipeline_msg_handle_table_rule_add(p, req);
3071 			break;
3072 
3073 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
3074 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
3075 			break;
3076 
3077 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
3078 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
3079 			break;
3080 
3081 		case PIPELINE_REQ_TABLE_RULE_DELETE:
3082 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
3083 			break;
3084 
3085 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
3086 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
3087 			break;
3088 
3089 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
3090 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
3091 			break;
3092 
3093 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
3094 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
3095 			break;
3096 
3097 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
3098 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
3099 			break;
3100 
3101 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
3102 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
3103 			break;
3104 
3105 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
3106 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
3107 			break;
3108 
3109 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
3110 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
3111 			break;
3112 
3113 		case PIPELINE_REQ_TABLE_RULE_TIME_READ:
3114 			rsp = pipeline_msg_handle_table_rule_time_read(p, req);
3115 			break;
3116 
3117 		default:
3118 			rsp = (struct pipeline_msg_rsp *) req;
3119 			rsp->status = -1;
3120 		}
3121 
3122 		pipeline_msg_send(p->msgq_rsp, rsp);
3123 	}
3124 }
3125 
3126 /**
3127  * Data plane threads: main
3128  */
3129 int
3130 thread_main(void *arg __rte_unused)
3131 {
3132 	struct thread_data *t;
3133 	uint32_t thread_id, i;
3134 
3135 	thread_id = rte_lcore_id();
3136 	t = &thread_data[thread_id];
3137 
3138 	/* Dispatch loop */
3139 	for (i = 0; ; i++) {
3140 		uint32_t j;
3141 
3142 		/* Data Plane */
3143 		for (j = 0; j < t->n_pipelines; j++)
3144 			rte_pipeline_run(t->p[j]);
3145 
3146 		/* Control Plane */
3147 		if ((i & 0xF) == 0) {
3148 			uint64_t time = rte_get_tsc_cycles();
3149 			uint64_t time_next_min = UINT64_MAX;
3150 
3151 			if (time < t->time_next_min)
3152 				continue;
3153 
3154 			/* Pipeline message queues */
3155 			for (j = 0; j < t->n_pipelines; j++) {
3156 				struct pipeline_data *p =
3157 					&t->pipeline_data[j];
3158 				uint64_t time_next = p->time_next;
3159 
3160 				if (time_next <= time) {
3161 					pipeline_msg_handle(p);
3162 					rte_pipeline_flush(p->p);
3163 					time_next = time + p->timer_period;
3164 					p->time_next = time_next;
3165 				}
3166 
3167 				if (time_next < time_next_min)
3168 					time_next_min = time_next;
3169 			}
3170 
3171 			/* Thread message queues */
3172 			{
3173 				uint64_t time_next = t->time_next;
3174 
3175 				if (time_next <= time) {
3176 					thread_msg_handle(t);
3177 					time_next = time + t->timer_period;
3178 					t->time_next = time_next;
3179 				}
3180 
3181 				if (time_next < time_next_min)
3182 					time_next_min = time_next;
3183 			}
3184 
3185 			t->time_next_min = time_next_min;
3186 		}
3187 	}
3188 
3189 	return 0;
3190 }
3191