xref: /dpdk/examples/ip_pipeline/thread.c (revision 169a9da64a859e1782e49886e7214982304a580f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25 
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29 
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33 
34 /**
35  * Master thead: data plane thread context
36  */
37 struct thread {
38 	struct rte_ring *msgq_req;
39 	struct rte_ring *msgq_rsp;
40 
41 	uint32_t enabled;
42 };
43 
44 static struct thread thread[RTE_MAX_LCORE];
45 
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50 	struct rte_table_action *a;
51 };
52 
53 struct pipeline_data {
54 	struct rte_pipeline *p;
55 	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56 	uint32_t n_tables;
57 
58 	struct rte_ring *msgq_req;
59 	struct rte_ring *msgq_rsp;
60 	uint64_t timer_period; /* Measured in CPU cycles. */
61 	uint64_t time_next;
62 
63 	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65 
66 struct thread_data {
67 	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68 	uint32_t n_pipelines;
69 
70 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 	struct rte_ring *msgq_req;
72 	struct rte_ring *msgq_rsp;
73 	uint64_t timer_period; /* Measured in CPU cycles. */
74 	uint64_t time_next;
75 	uint64_t time_next_min;
76 } __rte_cache_aligned;
77 
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Master thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		if (t->msgq_req)
96 			rte_ring_free(t->msgq_req);
97 
98 		if (t->msgq_rsp)
99 			rte_ring_free(t->msgq_rsp);
100 	}
101 }
102 
103 int
104 thread_init(void)
105 {
106 	uint32_t i;
107 
108 	RTE_LCORE_FOREACH_SLAVE(i) {
109 		char name[NAME_MAX];
110 		struct rte_ring *msgq_req, *msgq_rsp;
111 		struct thread *t = &thread[i];
112 		struct thread_data *t_data = &thread_data[i];
113 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
114 
115 		/* MSGQs */
116 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
117 
118 		msgq_req = rte_ring_create(name,
119 			THREAD_MSGQ_SIZE,
120 			cpu_id,
121 			RING_F_SP_ENQ | RING_F_SC_DEQ);
122 
123 		if (msgq_req == NULL) {
124 			thread_free();
125 			return -1;
126 		}
127 
128 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
129 
130 		msgq_rsp = rte_ring_create(name,
131 			THREAD_MSGQ_SIZE,
132 			cpu_id,
133 			RING_F_SP_ENQ | RING_F_SC_DEQ);
134 
135 		if (msgq_rsp == NULL) {
136 			thread_free();
137 			return -1;
138 		}
139 
140 		/* Master thread records */
141 		t->msgq_req = msgq_req;
142 		t->msgq_rsp = msgq_rsp;
143 		t->enabled = 1;
144 
145 		/* Data plane thread records */
146 		t_data->n_pipelines = 0;
147 		t_data->msgq_req = msgq_req;
148 		t_data->msgq_rsp = msgq_rsp;
149 		t_data->timer_period =
150 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
151 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
152 		t_data->time_next_min = t_data->time_next;
153 	}
154 
155 	return 0;
156 }
157 
158 /**
159  * Master thread & data plane threads: message passing
160  */
161 enum thread_req_type {
162 	THREAD_REQ_PIPELINE_ENABLE = 0,
163 	THREAD_REQ_PIPELINE_DISABLE,
164 	THREAD_REQ_MAX
165 };
166 
167 struct thread_msg_req {
168 	enum thread_req_type type;
169 
170 	union {
171 		struct {
172 			struct rte_pipeline *p;
173 			struct {
174 				struct rte_table_action *a;
175 			} table[RTE_PIPELINE_TABLE_MAX];
176 			struct rte_ring *msgq_req;
177 			struct rte_ring *msgq_rsp;
178 			uint32_t timer_period_ms;
179 			uint32_t n_tables;
180 		} pipeline_enable;
181 
182 		struct {
183 			struct rte_pipeline *p;
184 		} pipeline_disable;
185 	};
186 };
187 
188 struct thread_msg_rsp {
189 	int status;
190 };
191 
192 /**
193  * Master thread
194  */
195 static struct thread_msg_req *
196 thread_msg_alloc(void)
197 {
198 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
199 		sizeof(struct thread_msg_rsp));
200 
201 	return calloc(1, size);
202 }
203 
204 static void
205 thread_msg_free(struct thread_msg_rsp *rsp)
206 {
207 	free(rsp);
208 }
209 
210 static struct thread_msg_rsp *
211 thread_msg_send_recv(uint32_t thread_id,
212 	struct thread_msg_req *req)
213 {
214 	struct thread *t = &thread[thread_id];
215 	struct rte_ring *msgq_req = t->msgq_req;
216 	struct rte_ring *msgq_rsp = t->msgq_rsp;
217 	struct thread_msg_rsp *rsp;
218 	int status;
219 
220 	/* send */
221 	do {
222 		status = rte_ring_sp_enqueue(msgq_req, req);
223 	} while (status == -ENOBUFS);
224 
225 	/* recv */
226 	do {
227 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
228 	} while (status != 0);
229 
230 	return rsp;
231 }
232 
233 int
234 thread_pipeline_enable(uint32_t thread_id,
235 	const char *pipeline_name)
236 {
237 	struct pipeline *p = pipeline_find(pipeline_name);
238 	struct thread *t;
239 	struct thread_msg_req *req;
240 	struct thread_msg_rsp *rsp;
241 	uint32_t i;
242 	int status;
243 
244 	/* Check input params */
245 	if ((thread_id >= RTE_MAX_LCORE) ||
246 		(p == NULL) ||
247 		(p->n_ports_in == 0) ||
248 		(p->n_ports_out == 0) ||
249 		(p->n_tables == 0))
250 		return -1;
251 
252 	t = &thread[thread_id];
253 	if ((t->enabled == 0) ||
254 		p->enabled)
255 		return -1;
256 
257 	/* Allocate request */
258 	req = thread_msg_alloc();
259 	if (req == NULL)
260 		return -1;
261 
262 	/* Write request */
263 	req->type = THREAD_REQ_PIPELINE_ENABLE;
264 	req->pipeline_enable.p = p->p;
265 	for (i = 0; i < p->n_tables; i++)
266 		req->pipeline_enable.table[i].a =
267 			p->table[i].a;
268 	req->pipeline_enable.msgq_req = p->msgq_req;
269 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
270 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
271 	req->pipeline_enable.n_tables = p->n_tables;
272 
273 	/* Send request and wait for response */
274 	rsp = thread_msg_send_recv(thread_id, req);
275 	if (rsp == NULL)
276 		return -1;
277 
278 	/* Read response */
279 	status = rsp->status;
280 
281 	/* Free response */
282 	thread_msg_free(rsp);
283 
284 	/* Request completion */
285 	if (status)
286 		return status;
287 
288 	p->thread_id = thread_id;
289 	p->enabled = 1;
290 
291 	return 0;
292 }
293 
294 int
295 thread_pipeline_disable(uint32_t thread_id,
296 	const char *pipeline_name)
297 {
298 	struct pipeline *p = pipeline_find(pipeline_name);
299 	struct thread *t;
300 	struct thread_msg_req *req;
301 	struct thread_msg_rsp *rsp;
302 	int status;
303 
304 	/* Check input params */
305 	if ((thread_id >= RTE_MAX_LCORE) ||
306 		(p == NULL))
307 		return -1;
308 
309 	t = &thread[thread_id];
310 	if (t->enabled == 0)
311 		return -1;
312 
313 	if (p->enabled == 0)
314 		return 0;
315 
316 	if (p->thread_id != thread_id)
317 		return -1;
318 
319 	/* Allocate request */
320 	req = thread_msg_alloc();
321 	if (req == NULL)
322 		return -1;
323 
324 	/* Write request */
325 	req->type = THREAD_REQ_PIPELINE_DISABLE;
326 	req->pipeline_disable.p = p->p;
327 
328 	/* Send request and wait for response */
329 	rsp = thread_msg_send_recv(thread_id, req);
330 	if (rsp == NULL)
331 		return -1;
332 
333 	/* Read response */
334 	status = rsp->status;
335 
336 	/* Free response */
337 	thread_msg_free(rsp);
338 
339 	/* Request completion */
340 	if (status)
341 		return status;
342 
343 	p->enabled = 0;
344 
345 	return 0;
346 }
347 
348 /**
349  * Data plane threads: message handling
350  */
351 static inline struct thread_msg_req *
352 thread_msg_recv(struct rte_ring *msgq_req)
353 {
354 	struct thread_msg_req *req;
355 
356 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
357 
358 	if (status != 0)
359 		return NULL;
360 
361 	return req;
362 }
363 
364 static inline void
365 thread_msg_send(struct rte_ring *msgq_rsp,
366 	struct thread_msg_rsp *rsp)
367 {
368 	int status;
369 
370 	do {
371 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
372 	} while (status == -ENOBUFS);
373 }
374 
375 static struct thread_msg_rsp *
376 thread_msg_handle_pipeline_enable(struct thread_data *t,
377 	struct thread_msg_req *req)
378 {
379 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
380 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
381 	uint32_t i;
382 
383 	/* Request */
384 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
385 		rsp->status = -1;
386 		return rsp;
387 	}
388 
389 	t->p[t->n_pipelines] = req->pipeline_enable.p;
390 
391 	p->p = req->pipeline_enable.p;
392 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
393 		p->table_data[i].a =
394 			req->pipeline_enable.table[i].a;
395 
396 	p->n_tables = req->pipeline_enable.n_tables;
397 
398 	p->msgq_req = req->pipeline_enable.msgq_req;
399 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
400 	p->timer_period =
401 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
402 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
403 
404 	t->n_pipelines++;
405 
406 	/* Response */
407 	rsp->status = 0;
408 	return rsp;
409 }
410 
411 static struct thread_msg_rsp *
412 thread_msg_handle_pipeline_disable(struct thread_data *t,
413 	struct thread_msg_req *req)
414 {
415 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
416 	uint32_t n_pipelines = t->n_pipelines;
417 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
418 	uint32_t i;
419 
420 	/* find pipeline */
421 	for (i = 0; i < n_pipelines; i++) {
422 		struct pipeline_data *p = &t->pipeline_data[i];
423 
424 		if (p->p != pipeline)
425 			continue;
426 
427 		if (i < n_pipelines - 1) {
428 			struct rte_pipeline *pipeline_last =
429 				t->p[n_pipelines - 1];
430 			struct pipeline_data *p_last =
431 				&t->pipeline_data[n_pipelines - 1];
432 
433 			t->p[i] = pipeline_last;
434 			memcpy(p, p_last, sizeof(*p));
435 		}
436 
437 		t->n_pipelines--;
438 
439 		rsp->status = 0;
440 		return rsp;
441 	}
442 
443 	/* should not get here */
444 	rsp->status = 0;
445 	return rsp;
446 }
447 
448 static void
449 thread_msg_handle(struct thread_data *t)
450 {
451 	for ( ; ; ) {
452 		struct thread_msg_req *req;
453 		struct thread_msg_rsp *rsp;
454 
455 		req = thread_msg_recv(t->msgq_req);
456 		if (req == NULL)
457 			break;
458 
459 		switch (req->type) {
460 		case THREAD_REQ_PIPELINE_ENABLE:
461 			rsp = thread_msg_handle_pipeline_enable(t, req);
462 			break;
463 
464 		case THREAD_REQ_PIPELINE_DISABLE:
465 			rsp = thread_msg_handle_pipeline_disable(t, req);
466 			break;
467 
468 		default:
469 			rsp = (struct thread_msg_rsp *) req;
470 			rsp->status = -1;
471 		}
472 
473 		thread_msg_send(t->msgq_rsp, rsp);
474 	}
475 }
476 
477 /**
478  * Master thread & data plane threads: message passing
479  */
480 enum pipeline_req_type {
481 	/* Port IN */
482 	PIPELINE_REQ_PORT_IN_STATS_READ,
483 	PIPELINE_REQ_PORT_IN_ENABLE,
484 	PIPELINE_REQ_PORT_IN_DISABLE,
485 
486 	/* Port OUT */
487 	PIPELINE_REQ_PORT_OUT_STATS_READ,
488 
489 	/* Table */
490 	PIPELINE_REQ_TABLE_STATS_READ,
491 	PIPELINE_REQ_TABLE_RULE_ADD,
492 	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
493 	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
494 	PIPELINE_REQ_TABLE_RULE_DELETE,
495 	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
496 	PIPELINE_REQ_TABLE_RULE_STATS_READ,
497 	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
498 	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
499 	PIPELINE_REQ_TABLE_RULE_MTR_READ,
500 	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
501 	PIPELINE_REQ_TABLE_RULE_TTL_READ,
502 	PIPELINE_REQ_MAX
503 };
504 
505 struct pipeline_msg_req_port_in_stats_read {
506 	int clear;
507 };
508 
509 struct pipeline_msg_req_port_out_stats_read {
510 	int clear;
511 };
512 
513 struct pipeline_msg_req_table_stats_read {
514 	int clear;
515 };
516 
517 struct pipeline_msg_req_table_rule_add {
518 	struct table_rule_match match;
519 	struct table_rule_action action;
520 };
521 
522 struct pipeline_msg_req_table_rule_add_default {
523 	struct table_rule_action action;
524 };
525 
526 struct pipeline_msg_req_table_rule_add_bulk {
527 	struct table_rule_match *match;
528 	struct table_rule_action *action;
529 	void **data;
530 	uint32_t n_rules;
531 	int bulk;
532 };
533 
534 struct pipeline_msg_req_table_rule_delete {
535 	struct table_rule_match match;
536 };
537 
538 struct pipeline_msg_req_table_rule_stats_read {
539 	void *data;
540 	int clear;
541 };
542 
543 struct pipeline_msg_req_table_mtr_profile_add {
544 	uint32_t meter_profile_id;
545 	struct rte_table_action_meter_profile profile;
546 };
547 
548 struct pipeline_msg_req_table_mtr_profile_delete {
549 	uint32_t meter_profile_id;
550 };
551 
552 struct pipeline_msg_req_table_rule_mtr_read {
553 	void *data;
554 	uint32_t tc_mask;
555 	int clear;
556 };
557 
558 struct pipeline_msg_req_table_dscp_table_update {
559 	uint64_t dscp_mask;
560 	struct rte_table_action_dscp_table dscp_table;
561 };
562 
563 struct pipeline_msg_req_table_rule_ttl_read {
564 	void *data;
565 	int clear;
566 };
567 
568 struct pipeline_msg_req {
569 	enum pipeline_req_type type;
570 	uint32_t id; /* Port IN, port OUT or table ID */
571 
572 	RTE_STD_C11
573 	union {
574 		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
575 		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
576 		struct pipeline_msg_req_table_stats_read table_stats_read;
577 		struct pipeline_msg_req_table_rule_add table_rule_add;
578 		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
579 		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
580 		struct pipeline_msg_req_table_rule_delete table_rule_delete;
581 		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
582 		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
583 		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
584 		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
585 		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
586 		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
587 	};
588 };
589 
590 struct pipeline_msg_rsp_port_in_stats_read {
591 	struct rte_pipeline_port_in_stats stats;
592 };
593 
594 struct pipeline_msg_rsp_port_out_stats_read {
595 	struct rte_pipeline_port_out_stats stats;
596 };
597 
598 struct pipeline_msg_rsp_table_stats_read {
599 	struct rte_pipeline_table_stats stats;
600 };
601 
602 struct pipeline_msg_rsp_table_rule_add {
603 	void *data;
604 };
605 
606 struct pipeline_msg_rsp_table_rule_add_default {
607 	void *data;
608 };
609 
610 struct pipeline_msg_rsp_table_rule_add_bulk {
611 	uint32_t n_rules;
612 };
613 
614 struct pipeline_msg_rsp_table_rule_stats_read {
615 	struct rte_table_action_stats_counters stats;
616 };
617 
618 struct pipeline_msg_rsp_table_rule_mtr_read {
619 	struct rte_table_action_mtr_counters stats;
620 };
621 
622 struct pipeline_msg_rsp_table_rule_ttl_read {
623 	struct rte_table_action_ttl_counters stats;
624 };
625 
626 struct pipeline_msg_rsp {
627 	int status;
628 
629 	RTE_STD_C11
630 	union {
631 		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
632 		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
633 		struct pipeline_msg_rsp_table_stats_read table_stats_read;
634 		struct pipeline_msg_rsp_table_rule_add table_rule_add;
635 		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
636 		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
637 		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
638 		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
639 		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
640 	};
641 };
642 
643 /**
644  * Master thread
645  */
646 static struct pipeline_msg_req *
647 pipeline_msg_alloc(void)
648 {
649 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
650 		sizeof(struct pipeline_msg_rsp));
651 
652 	return calloc(1, size);
653 }
654 
655 static void
656 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
657 {
658 	free(rsp);
659 }
660 
661 static struct pipeline_msg_rsp *
662 pipeline_msg_send_recv(struct pipeline *p,
663 	struct pipeline_msg_req *req)
664 {
665 	struct rte_ring *msgq_req = p->msgq_req;
666 	struct rte_ring *msgq_rsp = p->msgq_rsp;
667 	struct pipeline_msg_rsp *rsp;
668 	int status;
669 
670 	/* send */
671 	do {
672 		status = rte_ring_sp_enqueue(msgq_req, req);
673 	} while (status == -ENOBUFS);
674 
675 	/* recv */
676 	do {
677 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
678 	} while (status != 0);
679 
680 	return rsp;
681 }
682 
683 int
684 pipeline_port_in_stats_read(const char *pipeline_name,
685 	uint32_t port_id,
686 	struct rte_pipeline_port_in_stats *stats,
687 	int clear)
688 {
689 	struct pipeline *p;
690 	struct pipeline_msg_req *req;
691 	struct pipeline_msg_rsp *rsp;
692 	int status;
693 
694 	/* Check input params */
695 	if ((pipeline_name == NULL) ||
696 		(stats == NULL))
697 		return -1;
698 
699 	p = pipeline_find(pipeline_name);
700 	if ((p == NULL) ||
701 		(p->enabled == 0) ||
702 		(port_id >= p->n_ports_in))
703 		return -1;
704 
705 	/* Allocate request */
706 	req = pipeline_msg_alloc();
707 	if (req == NULL)
708 		return -1;
709 
710 	/* Write request */
711 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
712 	req->id = port_id;
713 	req->port_in_stats_read.clear = clear;
714 
715 	/* Send request and wait for response */
716 	rsp = pipeline_msg_send_recv(p, req);
717 	if (rsp == NULL)
718 		return -1;
719 
720 	/* Read response */
721 	status = rsp->status;
722 	if (status)
723 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
724 
725 	/* Free response */
726 	pipeline_msg_free(rsp);
727 
728 	return status;
729 }
730 
731 int
732 pipeline_port_in_enable(const char *pipeline_name,
733 	uint32_t port_id)
734 {
735 	struct pipeline *p;
736 	struct pipeline_msg_req *req;
737 	struct pipeline_msg_rsp *rsp;
738 	int status;
739 
740 	/* Check input params */
741 	if (pipeline_name == NULL)
742 		return -1;
743 
744 	p = pipeline_find(pipeline_name);
745 	if ((p == NULL) ||
746 		(p->enabled == 0) ||
747 		(port_id >= p->n_ports_in))
748 		return -1;
749 
750 	/* Allocate request */
751 	req = pipeline_msg_alloc();
752 	if (req == NULL)
753 		return -1;
754 
755 	/* Write request */
756 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
757 	req->id = port_id;
758 
759 	/* Send request and wait for response */
760 	rsp = pipeline_msg_send_recv(p, req);
761 	if (rsp == NULL)
762 		return -1;
763 
764 	/* Read response */
765 	status = rsp->status;
766 
767 	/* Free response */
768 	pipeline_msg_free(rsp);
769 
770 	return status;
771 }
772 
773 int
774 pipeline_port_in_disable(const char *pipeline_name,
775 	uint32_t port_id)
776 {
777 	struct pipeline *p;
778 	struct pipeline_msg_req *req;
779 	struct pipeline_msg_rsp *rsp;
780 	int status;
781 
782 	/* Check input params */
783 	if (pipeline_name == NULL)
784 		return -1;
785 
786 	p = pipeline_find(pipeline_name);
787 	if ((p == NULL) ||
788 		(p->enabled == 0) ||
789 		(port_id >= p->n_ports_in))
790 		return -1;
791 
792 	/* Allocate request */
793 	req = pipeline_msg_alloc();
794 	if (req == NULL)
795 		return -1;
796 
797 	/* Write request */
798 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
799 	req->id = port_id;
800 
801 	/* Send request and wait for response */
802 	rsp = pipeline_msg_send_recv(p, req);
803 	if (rsp == NULL)
804 		return -1;
805 
806 	/* Read response */
807 	status = rsp->status;
808 
809 	/* Free response */
810 	pipeline_msg_free(rsp);
811 
812 	return status;
813 }
814 
815 int
816 pipeline_port_out_stats_read(const char *pipeline_name,
817 	uint32_t port_id,
818 	struct rte_pipeline_port_out_stats *stats,
819 	int clear)
820 {
821 	struct pipeline *p;
822 	struct pipeline_msg_req *req;
823 	struct pipeline_msg_rsp *rsp;
824 	int status;
825 
826 	/* Check input params */
827 	if ((pipeline_name == NULL) ||
828 		(stats == NULL))
829 		return -1;
830 
831 	p = pipeline_find(pipeline_name);
832 	if ((p == NULL) ||
833 		(p->enabled == 0) ||
834 		(port_id >= p->n_ports_out))
835 		return -1;
836 
837 	/* Allocate request */
838 	req = pipeline_msg_alloc();
839 	if (req == NULL)
840 		return -1;
841 
842 	/* Write request */
843 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
844 	req->id = port_id;
845 	req->port_out_stats_read.clear = clear;
846 
847 	/* Send request and wait for response */
848 	rsp = pipeline_msg_send_recv(p, req);
849 	if (rsp == NULL)
850 		return -1;
851 
852 	/* Read response */
853 	status = rsp->status;
854 	if (status)
855 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
856 
857 	/* Free response */
858 	pipeline_msg_free(rsp);
859 
860 	return status;
861 }
862 
863 int
864 pipeline_table_stats_read(const char *pipeline_name,
865 	uint32_t table_id,
866 	struct rte_pipeline_table_stats *stats,
867 	int clear)
868 {
869 	struct pipeline *p;
870 	struct pipeline_msg_req *req;
871 	struct pipeline_msg_rsp *rsp;
872 	int status;
873 
874 	/* Check input params */
875 	if ((pipeline_name == NULL) ||
876 		(stats == NULL))
877 		return -1;
878 
879 	p = pipeline_find(pipeline_name);
880 	if ((p == NULL) ||
881 		(p->enabled == 0) ||
882 		(table_id >= p->n_tables))
883 		return -1;
884 
885 	/* Allocate request */
886 	req = pipeline_msg_alloc();
887 	if (req == NULL)
888 		return -1;
889 
890 	/* Write request */
891 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
892 	req->id = table_id;
893 	req->table_stats_read.clear = clear;
894 
895 	/* Send request and wait for response */
896 	rsp = pipeline_msg_send_recv(p, req);
897 	if (rsp == NULL)
898 		return -1;
899 
900 	/* Read response */
901 	status = rsp->status;
902 	if (status)
903 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
904 
905 	/* Free response */
906 	pipeline_msg_free(rsp);
907 
908 	return status;
909 }
910 
911 static int
912 match_check(struct table_rule_match *match,
913 	struct pipeline *p,
914 	uint32_t table_id)
915 {
916 	struct table *table;
917 
918 	if ((match == NULL) ||
919 		(p == NULL) ||
920 		(table_id >= p->n_tables))
921 		return -1;
922 
923 	table = &p->table[table_id];
924 	if (match->match_type != table->params.match_type)
925 		return -1;
926 
927 	switch (match->match_type) {
928 	case TABLE_ACL:
929 	{
930 		struct table_acl_params *t = &table->params.match.acl;
931 		struct table_rule_match_acl *r = &match->match.acl;
932 
933 		if ((r->ip_version && (t->ip_version == 0)) ||
934 			((r->ip_version == 0) && t->ip_version))
935 			return -1;
936 
937 		if (r->ip_version) {
938 			if ((r->sa_depth > 32) ||
939 				(r->da_depth > 32))
940 				return -1;
941 		} else {
942 			if ((r->sa_depth > 128) ||
943 				(r->da_depth > 128))
944 				return -1;
945 		}
946 		return 0;
947 	}
948 
949 	case TABLE_ARRAY:
950 		return 0;
951 
952 	case TABLE_HASH:
953 		return 0;
954 
955 	case TABLE_LPM:
956 	{
957 		struct table_lpm_params *t = &table->params.match.lpm;
958 		struct table_rule_match_lpm *r = &match->match.lpm;
959 
960 		if ((r->ip_version && (t->key_size != 4)) ||
961 			((r->ip_version == 0) && (t->key_size != 16)))
962 			return -1;
963 
964 		if (r->ip_version) {
965 			if (r->depth > 32)
966 				return -1;
967 		} else {
968 			if (r->depth > 128)
969 				return -1;
970 		}
971 		return 0;
972 	}
973 
974 	case TABLE_STUB:
975 		return -1;
976 
977 	default:
978 		return -1;
979 	}
980 }
981 
982 static int
983 action_check(struct table_rule_action *action,
984 	struct pipeline *p,
985 	uint32_t table_id)
986 {
987 	struct table_action_profile *ap;
988 
989 	if ((action == NULL) ||
990 		(p == NULL) ||
991 		(table_id >= p->n_tables))
992 		return -1;
993 
994 	ap = p->table[table_id].ap;
995 	if (action->action_mask != ap->params.action_mask)
996 		return -1;
997 
998 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
999 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1000 			(action->fwd.id >= p->n_ports_out))
1001 			return -1;
1002 
1003 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1004 			(action->fwd.id >= p->n_tables))
1005 			return -1;
1006 	}
1007 
1008 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1009 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1010 		uint32_t tc_mask1 = action->mtr.tc_mask;
1011 
1012 		if (tc_mask1 != tc_mask0)
1013 			return -1;
1014 	}
1015 
1016 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1017 		uint32_t n_subports_per_port =
1018 			ap->params.tm.n_subports_per_port;
1019 		uint32_t n_pipes_per_subport =
1020 			ap->params.tm.n_pipes_per_subport;
1021 		uint32_t subport_id = action->tm.subport_id;
1022 		uint32_t pipe_id = action->tm.pipe_id;
1023 
1024 		if ((subport_id >= n_subports_per_port) ||
1025 			(pipe_id >= n_pipes_per_subport))
1026 			return -1;
1027 	}
1028 
1029 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1030 		uint64_t encap_mask = ap->params.encap.encap_mask;
1031 		enum rte_table_action_encap_type type = action->encap.type;
1032 
1033 		if ((encap_mask & (1LLU << type)) == 0)
1034 			return -1;
1035 	}
1036 
1037 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1038 		int ip_version0 = ap->params.common.ip_version;
1039 		int ip_version1 = action->nat.ip_version;
1040 
1041 		if ((ip_version1 && (ip_version0 == 0)) ||
1042 			((ip_version1 == 0) && ip_version0))
1043 			return -1;
1044 	}
1045 
1046 	return 0;
1047 }
1048 
1049 static int
1050 action_default_check(struct table_rule_action *action,
1051 	struct pipeline *p,
1052 	uint32_t table_id)
1053 {
1054 	if ((action == NULL) ||
1055 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1056 		(p == NULL) ||
1057 		(table_id >= p->n_tables))
1058 		return -1;
1059 
1060 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1061 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1062 			(action->fwd.id >= p->n_ports_out))
1063 			return -1;
1064 
1065 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1066 			(action->fwd.id >= p->n_tables))
1067 			return -1;
1068 	}
1069 
1070 	return 0;
1071 }
1072 
1073 int
1074 pipeline_table_rule_add(const char *pipeline_name,
1075 	uint32_t table_id,
1076 	struct table_rule_match *match,
1077 	struct table_rule_action *action,
1078 	void **data)
1079 {
1080 	struct pipeline *p;
1081 	struct pipeline_msg_req *req;
1082 	struct pipeline_msg_rsp *rsp;
1083 	int status;
1084 
1085 	/* Check input params */
1086 	if ((pipeline_name == NULL) ||
1087 		(match == NULL) ||
1088 		(action == NULL) ||
1089 		(data == NULL))
1090 		return -1;
1091 
1092 	p = pipeline_find(pipeline_name);
1093 	if ((p == NULL) ||
1094 		(p->enabled == 0) ||
1095 		(table_id >= p->n_tables) ||
1096 		match_check(match, p, table_id) ||
1097 		action_check(action, p, table_id))
1098 		return -1;
1099 
1100 	/* Allocate request */
1101 	req = pipeline_msg_alloc();
1102 	if (req == NULL)
1103 		return -1;
1104 
1105 	/* Write request */
1106 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1107 	req->id = table_id;
1108 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1109 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1110 
1111 	/* Send request and wait for response */
1112 	rsp = pipeline_msg_send_recv(p, req);
1113 	if (rsp == NULL)
1114 		return -1;
1115 
1116 	/* Read response */
1117 	status = rsp->status;
1118 	if (status == 0)
1119 		*data = rsp->table_rule_add.data;
1120 
1121 	/* Free response */
1122 	pipeline_msg_free(rsp);
1123 
1124 	return status;
1125 }
1126 
1127 int
1128 pipeline_table_rule_add_default(const char *pipeline_name,
1129 	uint32_t table_id,
1130 	struct table_rule_action *action,
1131 	void **data)
1132 {
1133 	struct pipeline *p;
1134 	struct pipeline_msg_req *req;
1135 	struct pipeline_msg_rsp *rsp;
1136 	int status;
1137 
1138 	/* Check input params */
1139 	if ((pipeline_name == NULL) ||
1140 		(action == NULL) ||
1141 		(data == NULL))
1142 		return -1;
1143 
1144 	p = pipeline_find(pipeline_name);
1145 	if ((p == NULL) ||
1146 		(p->enabled == 0) ||
1147 		(table_id >= p->n_tables) ||
1148 		action_default_check(action, p, table_id))
1149 		return -1;
1150 
1151 	/* Allocate request */
1152 	req = pipeline_msg_alloc();
1153 	if (req == NULL)
1154 		return -1;
1155 
1156 	/* Write request */
1157 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1158 	req->id = table_id;
1159 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1160 
1161 	/* Send request and wait for response */
1162 	rsp = pipeline_msg_send_recv(p, req);
1163 	if (rsp == NULL)
1164 		return -1;
1165 
1166 	/* Read response */
1167 	status = rsp->status;
1168 	if (status == 0)
1169 		*data = rsp->table_rule_add_default.data;
1170 
1171 	/* Free response */
1172 	pipeline_msg_free(rsp);
1173 
1174 	return status;
1175 }
1176 
1177 int
1178 pipeline_table_rule_add_bulk(const char *pipeline_name,
1179 	uint32_t table_id,
1180 	struct table_rule_match *match,
1181 	struct table_rule_action *action,
1182 	void **data,
1183 	uint32_t *n_rules)
1184 {
1185 	struct pipeline *p;
1186 	struct pipeline_msg_req *req;
1187 	struct pipeline_msg_rsp *rsp;
1188 	uint32_t i;
1189 	int status;
1190 
1191 	/* Check input params */
1192 	if ((pipeline_name == NULL) ||
1193 		(match == NULL) ||
1194 		(action == NULL) ||
1195 		(data == NULL) ||
1196 		(n_rules == NULL) ||
1197 		(*n_rules == 0))
1198 		return -1;
1199 
1200 	p = pipeline_find(pipeline_name);
1201 	if ((p == NULL) ||
1202 		(p->enabled == 0) ||
1203 		(table_id >= p->n_tables))
1204 		return -1;
1205 
1206 	for (i = 0; i < *n_rules; i++)
1207 		if (match_check(match, p, table_id) ||
1208 			action_check(action, p, table_id))
1209 			return -1;
1210 
1211 	/* Allocate request */
1212 	req = pipeline_msg_alloc();
1213 	if (req == NULL)
1214 		return -1;
1215 
1216 	/* Write request */
1217 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1218 	req->id = table_id;
1219 	req->table_rule_add_bulk.match = match;
1220 	req->table_rule_add_bulk.action = action;
1221 	req->table_rule_add_bulk.data = data;
1222 	req->table_rule_add_bulk.n_rules = *n_rules;
1223 	req->table_rule_add_bulk.bulk =
1224 		(p->table[table_id].params.match_type == TABLE_ACL) ? 1 : 0;
1225 
1226 	/* Send request and wait for response */
1227 	rsp = pipeline_msg_send_recv(p, req);
1228 	if (rsp == NULL)
1229 		return -1;
1230 
1231 	/* Read response */
1232 	status = rsp->status;
1233 	if (status == 0)
1234 		*n_rules = rsp->table_rule_add_bulk.n_rules;
1235 
1236 	/* Free response */
1237 	pipeline_msg_free(rsp);
1238 
1239 	return status;
1240 }
1241 
1242 int
1243 pipeline_table_rule_delete(const char *pipeline_name,
1244 	uint32_t table_id,
1245 	struct table_rule_match *match)
1246 {
1247 	struct pipeline *p;
1248 	struct pipeline_msg_req *req;
1249 	struct pipeline_msg_rsp *rsp;
1250 	int status;
1251 
1252 	/* Check input params */
1253 	if ((pipeline_name == NULL) ||
1254 		(match == NULL))
1255 		return -1;
1256 
1257 	p = pipeline_find(pipeline_name);
1258 	if ((p == NULL) ||
1259 		(p->enabled == 0) ||
1260 		(table_id >= p->n_tables) ||
1261 		match_check(match, p, table_id))
1262 		return -1;
1263 
1264 	/* Allocate request */
1265 	req = pipeline_msg_alloc();
1266 	if (req == NULL)
1267 		return -1;
1268 
1269 	/* Write request */
1270 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1271 	req->id = table_id;
1272 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1273 
1274 	/* Send request and wait for response */
1275 	rsp = pipeline_msg_send_recv(p, req);
1276 	if (rsp == NULL)
1277 		return -1;
1278 
1279 	/* Read response */
1280 	status = rsp->status;
1281 
1282 	/* Free response */
1283 	pipeline_msg_free(rsp);
1284 
1285 	return status;
1286 }
1287 
1288 int
1289 pipeline_table_rule_delete_default(const char *pipeline_name,
1290 	uint32_t table_id)
1291 {
1292 	struct pipeline *p;
1293 	struct pipeline_msg_req *req;
1294 	struct pipeline_msg_rsp *rsp;
1295 	int status;
1296 
1297 	/* Check input params */
1298 	if (pipeline_name == NULL)
1299 		return -1;
1300 
1301 	p = pipeline_find(pipeline_name);
1302 	if ((p == NULL) ||
1303 		(p->enabled == 0) ||
1304 		(table_id >= p->n_tables))
1305 		return -1;
1306 
1307 	/* Allocate request */
1308 	req = pipeline_msg_alloc();
1309 	if (req == NULL)
1310 		return -1;
1311 
1312 	/* Write request */
1313 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1314 	req->id = table_id;
1315 
1316 	/* Send request and wait for response */
1317 	rsp = pipeline_msg_send_recv(p, req);
1318 	if (rsp == NULL)
1319 		return -1;
1320 
1321 	/* Read response */
1322 	status = rsp->status;
1323 
1324 	/* Free response */
1325 	pipeline_msg_free(rsp);
1326 
1327 	return status;
1328 }
1329 
1330 int
1331 pipeline_table_rule_stats_read(const char *pipeline_name,
1332 	uint32_t table_id,
1333 	void *data,
1334 	struct rte_table_action_stats_counters *stats,
1335 	int clear)
1336 {
1337 	struct pipeline *p;
1338 	struct pipeline_msg_req *req;
1339 	struct pipeline_msg_rsp *rsp;
1340 	int status;
1341 
1342 	/* Check input params */
1343 	if ((pipeline_name == NULL) ||
1344 		(data == NULL) ||
1345 		(stats == NULL))
1346 		return -1;
1347 
1348 	p = pipeline_find(pipeline_name);
1349 	if ((p == NULL) ||
1350 		(p->enabled == 0) ||
1351 		(table_id >= p->n_tables))
1352 		return -1;
1353 
1354 	/* Allocate request */
1355 	req = pipeline_msg_alloc();
1356 	if (req == NULL)
1357 		return -1;
1358 
1359 	/* Write request */
1360 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1361 	req->id = table_id;
1362 	req->table_rule_stats_read.data = data;
1363 	req->table_rule_stats_read.clear = clear;
1364 
1365 	/* Send request and wait for response */
1366 	rsp = pipeline_msg_send_recv(p, req);
1367 	if (rsp == NULL)
1368 		return -1;
1369 
1370 	/* Read response */
1371 	status = rsp->status;
1372 	if (status)
1373 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1374 
1375 	/* Free response */
1376 	pipeline_msg_free(rsp);
1377 
1378 	return status;
1379 }
1380 
1381 int
1382 pipeline_table_mtr_profile_add(const char *pipeline_name,
1383 	uint32_t table_id,
1384 	uint32_t meter_profile_id,
1385 	struct rte_table_action_meter_profile *profile)
1386 {
1387 	struct pipeline *p;
1388 	struct pipeline_msg_req *req;
1389 	struct pipeline_msg_rsp *rsp;
1390 	int status;
1391 
1392 	/* Check input params */
1393 	if ((pipeline_name == NULL) ||
1394 		(profile == NULL))
1395 		return -1;
1396 
1397 	p = pipeline_find(pipeline_name);
1398 	if ((p == NULL) ||
1399 		(p->enabled == 0) ||
1400 		(table_id >= p->n_tables))
1401 		return -1;
1402 
1403 	/* Allocate request */
1404 	req = pipeline_msg_alloc();
1405 	if (req == NULL)
1406 		return -1;
1407 
1408 	/* Write request */
1409 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1410 	req->id = table_id;
1411 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1412 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1413 
1414 	/* Send request and wait for response */
1415 	rsp = pipeline_msg_send_recv(p, req);
1416 	if (rsp == NULL)
1417 		return -1;
1418 
1419 	/* Read response */
1420 	status = rsp->status;
1421 
1422 	/* Free response */
1423 	pipeline_msg_free(rsp);
1424 
1425 	return status;
1426 }
1427 
1428 int
1429 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1430 	uint32_t table_id,
1431 	uint32_t meter_profile_id)
1432 {
1433 	struct pipeline *p;
1434 	struct pipeline_msg_req *req;
1435 	struct pipeline_msg_rsp *rsp;
1436 	int status;
1437 
1438 	/* Check input params */
1439 	if (pipeline_name == NULL)
1440 		return -1;
1441 
1442 	p = pipeline_find(pipeline_name);
1443 	if ((p == NULL) ||
1444 		(p->enabled == 0) ||
1445 		(table_id >= p->n_tables))
1446 		return -1;
1447 
1448 	/* Allocate request */
1449 	req = pipeline_msg_alloc();
1450 	if (req == NULL)
1451 		return -1;
1452 
1453 	/* Write request */
1454 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1455 	req->id = table_id;
1456 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1457 
1458 	/* Send request and wait for response */
1459 	rsp = pipeline_msg_send_recv(p, req);
1460 	if (rsp == NULL)
1461 		return -1;
1462 
1463 	/* Read response */
1464 	status = rsp->status;
1465 
1466 	/* Free response */
1467 	pipeline_msg_free(rsp);
1468 
1469 	return status;
1470 }
1471 
1472 int
1473 pipeline_table_rule_mtr_read(const char *pipeline_name,
1474 	uint32_t table_id,
1475 	void *data,
1476 	uint32_t tc_mask,
1477 	struct rte_table_action_mtr_counters *stats,
1478 	int clear)
1479 {
1480 	struct pipeline *p;
1481 	struct pipeline_msg_req *req;
1482 	struct pipeline_msg_rsp *rsp;
1483 	int status;
1484 
1485 	/* Check input params */
1486 	if ((pipeline_name == NULL) ||
1487 		(data == NULL) ||
1488 		(stats == NULL))
1489 		return -1;
1490 
1491 	p = pipeline_find(pipeline_name);
1492 	if ((p == NULL) ||
1493 		(p->enabled == 0) ||
1494 		(table_id >= p->n_tables))
1495 		return -1;
1496 
1497 	/* Allocate request */
1498 	req = pipeline_msg_alloc();
1499 	if (req == NULL)
1500 		return -1;
1501 
1502 	/* Write request */
1503 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
1504 	req->id = table_id;
1505 	req->table_rule_mtr_read.data = data;
1506 	req->table_rule_mtr_read.tc_mask = tc_mask;
1507 	req->table_rule_mtr_read.clear = clear;
1508 
1509 	/* Send request and wait for response */
1510 	rsp = pipeline_msg_send_recv(p, req);
1511 	if (rsp == NULL)
1512 		return -1;
1513 
1514 	/* Read response */
1515 	status = rsp->status;
1516 	if (status)
1517 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
1518 
1519 	/* Free response */
1520 	pipeline_msg_free(rsp);
1521 
1522 	return status;
1523 }
1524 
1525 int
1526 pipeline_table_dscp_table_update(const char *pipeline_name,
1527 	uint32_t table_id,
1528 	uint64_t dscp_mask,
1529 	struct rte_table_action_dscp_table *dscp_table)
1530 {
1531 	struct pipeline *p;
1532 	struct pipeline_msg_req *req;
1533 	struct pipeline_msg_rsp *rsp;
1534 	int status;
1535 
1536 	/* Check input params */
1537 	if ((pipeline_name == NULL) ||
1538 		(dscp_table == NULL))
1539 		return -1;
1540 
1541 	p = pipeline_find(pipeline_name);
1542 	if ((p == NULL) ||
1543 		(p->enabled == 0) ||
1544 		(table_id >= p->n_tables))
1545 		return -1;
1546 
1547 	/* Allocate request */
1548 	req = pipeline_msg_alloc();
1549 	if (req == NULL)
1550 		return -1;
1551 
1552 	/* Write request */
1553 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
1554 	req->id = table_id;
1555 	req->table_dscp_table_update.dscp_mask = dscp_mask;
1556 	memcpy(&req->table_dscp_table_update.dscp_table,
1557 		dscp_table, sizeof(*dscp_table));
1558 
1559 	/* Send request and wait for response */
1560 	rsp = pipeline_msg_send_recv(p, req);
1561 	if (rsp == NULL)
1562 		return -1;
1563 
1564 	/* Read response */
1565 	status = rsp->status;
1566 
1567 	/* Free response */
1568 	pipeline_msg_free(rsp);
1569 
1570 	return status;
1571 }
1572 
1573 int
1574 pipeline_table_rule_ttl_read(const char *pipeline_name,
1575 	uint32_t table_id,
1576 	void *data,
1577 	struct rte_table_action_ttl_counters *stats,
1578 	int clear)
1579 {
1580 	struct pipeline *p;
1581 	struct pipeline_msg_req *req;
1582 	struct pipeline_msg_rsp *rsp;
1583 	int status;
1584 
1585 	/* Check input params */
1586 	if ((pipeline_name == NULL) ||
1587 		(data == NULL) ||
1588 		(stats == NULL))
1589 		return -1;
1590 
1591 	p = pipeline_find(pipeline_name);
1592 	if ((p == NULL) ||
1593 		(p->enabled == 0) ||
1594 		(table_id >= p->n_tables))
1595 		return -1;
1596 
1597 	/* Allocate request */
1598 	req = pipeline_msg_alloc();
1599 	if (req == NULL)
1600 		return -1;
1601 
1602 	/* Write request */
1603 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
1604 	req->id = table_id;
1605 	req->table_rule_ttl_read.data = data;
1606 	req->table_rule_ttl_read.clear = clear;
1607 
1608 	/* Send request and wait for response */
1609 	rsp = pipeline_msg_send_recv(p, req);
1610 	if (rsp == NULL)
1611 		return -1;
1612 
1613 	/* Read response */
1614 	status = rsp->status;
1615 	if (status)
1616 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
1617 
1618 	/* Free response */
1619 	pipeline_msg_free(rsp);
1620 
1621 	return status;
1622 }
1623 
1624 /**
1625  * Data plane threads: message handling
1626  */
1627 static inline struct pipeline_msg_req *
1628 pipeline_msg_recv(struct rte_ring *msgq_req)
1629 {
1630 	struct pipeline_msg_req *req;
1631 
1632 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
1633 
1634 	if (status != 0)
1635 		return NULL;
1636 
1637 	return req;
1638 }
1639 
1640 static inline void
1641 pipeline_msg_send(struct rte_ring *msgq_rsp,
1642 	struct pipeline_msg_rsp *rsp)
1643 {
1644 	int status;
1645 
1646 	do {
1647 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
1648 	} while (status == -ENOBUFS);
1649 }
1650 
1651 static struct pipeline_msg_rsp *
1652 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
1653 	struct pipeline_msg_req *req)
1654 {
1655 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
1656 	uint32_t port_id = req->id;
1657 	int clear = req->port_in_stats_read.clear;
1658 
1659 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
1660 		port_id,
1661 		&rsp->port_in_stats_read.stats,
1662 		clear);
1663 
1664 	return rsp;
1665 }
1666 
1667 static struct pipeline_msg_rsp *
1668 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
1669 	struct pipeline_msg_req *req)
1670 {
1671 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
1672 	uint32_t port_id = req->id;
1673 
1674 	rsp->status = rte_pipeline_port_in_enable(p->p,
1675 		port_id);
1676 
1677 	return rsp;
1678 }
1679 
1680 static struct pipeline_msg_rsp *
1681 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
1682 	struct pipeline_msg_req *req)
1683 {
1684 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
1685 	uint32_t port_id = req->id;
1686 
1687 	rsp->status = rte_pipeline_port_in_disable(p->p,
1688 		port_id);
1689 
1690 	return rsp;
1691 }
1692 
1693 static struct pipeline_msg_rsp *
1694 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
1695 	struct pipeline_msg_req *req)
1696 {
1697 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
1698 	uint32_t port_id = req->id;
1699 	int clear = req->port_out_stats_read.clear;
1700 
1701 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
1702 		port_id,
1703 		&rsp->port_out_stats_read.stats,
1704 		clear);
1705 
1706 	return rsp;
1707 }
1708 
1709 static struct pipeline_msg_rsp *
1710 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
1711 	struct pipeline_msg_req *req)
1712 {
1713 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
1714 	uint32_t port_id = req->id;
1715 	int clear = req->table_stats_read.clear;
1716 
1717 	rsp->status = rte_pipeline_table_stats_read(p->p,
1718 		port_id,
1719 		&rsp->table_stats_read.stats,
1720 		clear);
1721 
1722 	return rsp;
1723 }
1724 
1725 union table_rule_match_low_level {
1726 	struct rte_table_acl_rule_add_params acl_add;
1727 	struct rte_table_acl_rule_delete_params acl_delete;
1728 	struct rte_table_array_key array;
1729 	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
1730 	struct rte_table_lpm_key lpm_ipv4;
1731 	struct rte_table_lpm_ipv6_key lpm_ipv6;
1732 };
1733 
1734 static int
1735 match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
1736 {
1737 	if (depth > 128)
1738 		return -1;
1739 
1740 	switch (depth / 32) {
1741 	case 0:
1742 		depth32[0] = depth;
1743 		depth32[1] = 0;
1744 		depth32[2] = 0;
1745 		depth32[3] = 0;
1746 		return 0;
1747 
1748 	case 1:
1749 		depth32[0] = 32;
1750 		depth32[1] = depth - 32;
1751 		depth32[2] = 0;
1752 		depth32[3] = 0;
1753 		return 0;
1754 
1755 	case 2:
1756 		depth32[0] = 32;
1757 		depth32[1] = 32;
1758 		depth32[2] = depth - 64;
1759 		depth32[3] = 0;
1760 		return 0;
1761 
1762 	case 3:
1763 		depth32[0] = 32;
1764 		depth32[1] = 32;
1765 		depth32[2] = 32;
1766 		depth32[3] = depth - 96;
1767 		return 0;
1768 
1769 	case 4:
1770 		depth32[0] = 32;
1771 		depth32[1] = 32;
1772 		depth32[2] = 32;
1773 		depth32[3] = 32;
1774 		return 0;
1775 
1776 	default:
1777 		return -1;
1778 	}
1779 }
1780 
1781 static int
1782 match_convert(struct table_rule_match *mh,
1783 	union table_rule_match_low_level *ml,
1784 	int add)
1785 {
1786 	memset(ml, 0, sizeof(*ml));
1787 
1788 	switch (mh->match_type) {
1789 	case TABLE_ACL:
1790 		if (mh->match.acl.ip_version)
1791 			if (add) {
1792 				ml->acl_add.field_value[0].value.u8 =
1793 					mh->match.acl.proto;
1794 				ml->acl_add.field_value[0].mask_range.u8 =
1795 					mh->match.acl.proto_mask;
1796 
1797 				ml->acl_add.field_value[1].value.u32 =
1798 					mh->match.acl.ipv4.sa;
1799 				ml->acl_add.field_value[1].mask_range.u32 =
1800 					mh->match.acl.sa_depth;
1801 
1802 				ml->acl_add.field_value[2].value.u32 =
1803 					mh->match.acl.ipv4.da;
1804 				ml->acl_add.field_value[2].mask_range.u32 =
1805 					mh->match.acl.da_depth;
1806 
1807 				ml->acl_add.field_value[3].value.u16 =
1808 					mh->match.acl.sp0;
1809 				ml->acl_add.field_value[3].mask_range.u16 =
1810 					mh->match.acl.sp1;
1811 
1812 				ml->acl_add.field_value[4].value.u16 =
1813 					mh->match.acl.dp0;
1814 				ml->acl_add.field_value[4].mask_range.u16 =
1815 					mh->match.acl.dp1;
1816 
1817 				ml->acl_add.priority =
1818 					(int32_t) mh->match.acl.priority;
1819 			} else {
1820 				ml->acl_delete.field_value[0].value.u8 =
1821 					mh->match.acl.proto;
1822 				ml->acl_delete.field_value[0].mask_range.u8 =
1823 					mh->match.acl.proto_mask;
1824 
1825 				ml->acl_delete.field_value[1].value.u32 =
1826 					mh->match.acl.ipv4.sa;
1827 				ml->acl_delete.field_value[1].mask_range.u32 =
1828 					mh->match.acl.sa_depth;
1829 
1830 				ml->acl_delete.field_value[2].value.u32 =
1831 					mh->match.acl.ipv4.da;
1832 				ml->acl_delete.field_value[2].mask_range.u32 =
1833 					mh->match.acl.da_depth;
1834 
1835 				ml->acl_delete.field_value[3].value.u16 =
1836 					mh->match.acl.sp0;
1837 				ml->acl_delete.field_value[3].mask_range.u16 =
1838 					mh->match.acl.sp1;
1839 
1840 				ml->acl_delete.field_value[4].value.u16 =
1841 					mh->match.acl.dp0;
1842 				ml->acl_delete.field_value[4].mask_range.u16 =
1843 					mh->match.acl.dp1;
1844 			}
1845 		else
1846 			if (add) {
1847 				uint32_t *sa32 =
1848 					(uint32_t *) mh->match.acl.ipv6.sa;
1849 				uint32_t *da32 =
1850 					(uint32_t *) mh->match.acl.ipv6.da;
1851 				uint32_t sa32_depth[4], da32_depth[4];
1852 				int status;
1853 
1854 				status = match_convert_ipv6_depth(
1855 					mh->match.acl.sa_depth,
1856 					sa32_depth);
1857 				if (status)
1858 					return status;
1859 
1860 				status = match_convert_ipv6_depth(
1861 					mh->match.acl.da_depth,
1862 					da32_depth);
1863 				if (status)
1864 					return status;
1865 
1866 				ml->acl_add.field_value[0].value.u8 =
1867 					mh->match.acl.proto;
1868 				ml->acl_add.field_value[0].mask_range.u8 =
1869 					mh->match.acl.proto_mask;
1870 
1871 				ml->acl_add.field_value[1].value.u32 = sa32[0];
1872 				ml->acl_add.field_value[1].mask_range.u32 =
1873 					sa32_depth[0];
1874 				ml->acl_add.field_value[2].value.u32 = sa32[1];
1875 				ml->acl_add.field_value[2].mask_range.u32 =
1876 					sa32_depth[1];
1877 				ml->acl_add.field_value[3].value.u32 = sa32[2];
1878 				ml->acl_add.field_value[3].mask_range.u32 =
1879 					sa32_depth[2];
1880 				ml->acl_add.field_value[4].value.u32 = sa32[3];
1881 				ml->acl_add.field_value[4].mask_range.u32 =
1882 					sa32_depth[3];
1883 
1884 				ml->acl_add.field_value[5].value.u32 = da32[0];
1885 				ml->acl_add.field_value[5].mask_range.u32 =
1886 					da32_depth[0];
1887 				ml->acl_add.field_value[6].value.u32 = da32[1];
1888 				ml->acl_add.field_value[6].mask_range.u32 =
1889 					da32_depth[1];
1890 				ml->acl_add.field_value[7].value.u32 = da32[2];
1891 				ml->acl_add.field_value[7].mask_range.u32 =
1892 					da32_depth[2];
1893 				ml->acl_add.field_value[8].value.u32 = da32[3];
1894 				ml->acl_add.field_value[8].mask_range.u32 =
1895 					da32_depth[3];
1896 
1897 				ml->acl_add.field_value[9].value.u16 =
1898 					mh->match.acl.sp0;
1899 				ml->acl_add.field_value[9].mask_range.u16 =
1900 					mh->match.acl.sp1;
1901 
1902 				ml->acl_add.field_value[10].value.u16 =
1903 					mh->match.acl.dp0;
1904 				ml->acl_add.field_value[10].mask_range.u16 =
1905 					mh->match.acl.dp1;
1906 
1907 				ml->acl_add.priority =
1908 					(int32_t) mh->match.acl.priority;
1909 			} else {
1910 				uint32_t *sa32 =
1911 					(uint32_t *) mh->match.acl.ipv6.sa;
1912 				uint32_t *da32 =
1913 					(uint32_t *) mh->match.acl.ipv6.da;
1914 				uint32_t sa32_depth[4], da32_depth[4];
1915 				int status;
1916 
1917 				status = match_convert_ipv6_depth(
1918 					mh->match.acl.sa_depth,
1919 					sa32_depth);
1920 				if (status)
1921 					return status;
1922 
1923 				status = match_convert_ipv6_depth(
1924 					mh->match.acl.da_depth,
1925 					da32_depth);
1926 				if (status)
1927 					return status;
1928 
1929 				ml->acl_delete.field_value[0].value.u8 =
1930 					mh->match.acl.proto;
1931 				ml->acl_delete.field_value[0].mask_range.u8 =
1932 					mh->match.acl.proto_mask;
1933 
1934 				ml->acl_delete.field_value[1].value.u32 =
1935 					sa32[0];
1936 				ml->acl_delete.field_value[1].mask_range.u32 =
1937 					sa32_depth[0];
1938 				ml->acl_delete.field_value[2].value.u32 =
1939 					sa32[1];
1940 				ml->acl_delete.field_value[2].mask_range.u32 =
1941 					sa32_depth[1];
1942 				ml->acl_delete.field_value[3].value.u32 =
1943 					sa32[2];
1944 				ml->acl_delete.field_value[3].mask_range.u32 =
1945 					sa32_depth[2];
1946 				ml->acl_delete.field_value[4].value.u32 =
1947 					sa32[3];
1948 				ml->acl_delete.field_value[4].mask_range.u32 =
1949 					sa32_depth[3];
1950 
1951 				ml->acl_delete.field_value[5].value.u32 =
1952 					da32[0];
1953 				ml->acl_delete.field_value[5].mask_range.u32 =
1954 					da32_depth[0];
1955 				ml->acl_delete.field_value[6].value.u32 =
1956 					da32[1];
1957 				ml->acl_delete.field_value[6].mask_range.u32 =
1958 					da32_depth[1];
1959 				ml->acl_delete.field_value[7].value.u32 =
1960 					da32[2];
1961 				ml->acl_delete.field_value[7].mask_range.u32 =
1962 					da32_depth[2];
1963 				ml->acl_delete.field_value[8].value.u32 =
1964 					da32[3];
1965 				ml->acl_delete.field_value[8].mask_range.u32 =
1966 					da32_depth[3];
1967 
1968 				ml->acl_delete.field_value[9].value.u16 =
1969 					mh->match.acl.sp0;
1970 				ml->acl_delete.field_value[9].mask_range.u16 =
1971 					mh->match.acl.sp1;
1972 
1973 				ml->acl_delete.field_value[10].value.u16 =
1974 					mh->match.acl.dp0;
1975 				ml->acl_delete.field_value[10].mask_range.u16 =
1976 					mh->match.acl.dp1;
1977 			}
1978 		return 0;
1979 
1980 	case TABLE_ARRAY:
1981 		ml->array.pos = mh->match.array.pos;
1982 		return 0;
1983 
1984 	case TABLE_HASH:
1985 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
1986 		return 0;
1987 
1988 	case TABLE_LPM:
1989 		if (mh->match.lpm.ip_version) {
1990 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
1991 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
1992 		} else {
1993 			memcpy(ml->lpm_ipv6.ip,
1994 				mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
1995 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
1996 		}
1997 
1998 		return 0;
1999 
2000 	default:
2001 		return -1;
2002 	}
2003 }
2004 
2005 static struct pipeline_msg_rsp *
2006 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2007 	struct pipeline_msg_req *req)
2008 {
2009 	union table_rule_match_low_level match_ll;
2010 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2011 	struct table_rule_match *match = &req->table_rule_add.match;
2012 	struct table_rule_action *action = &req->table_rule_add.action;
2013 	struct rte_pipeline_table_entry *data_in, *data_out;
2014 	uint32_t table_id = req->id;
2015 	int key_found, status;
2016 	struct rte_table_action *a = p->table_data[table_id].a;
2017 
2018 	/* Apply actions */
2019 	memset(p->buffer, 0, sizeof(p->buffer));
2020 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2021 
2022 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2023 		status = rte_table_action_apply(a,
2024 			data_in,
2025 			RTE_TABLE_ACTION_FWD,
2026 			&action->fwd);
2027 
2028 		if (status) {
2029 			rsp->status = -1;
2030 			return rsp;
2031 		}
2032 	}
2033 
2034 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2035 		status = rte_table_action_apply(a,
2036 			data_in,
2037 			RTE_TABLE_ACTION_LB,
2038 			&action->lb);
2039 
2040 		if (status) {
2041 			rsp->status = -1;
2042 			return rsp;
2043 		}
2044 	}
2045 
2046 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2047 		status = rte_table_action_apply(a,
2048 			data_in,
2049 			RTE_TABLE_ACTION_MTR,
2050 			&action->mtr);
2051 
2052 		if (status) {
2053 			rsp->status = -1;
2054 			return rsp;
2055 		}
2056 	}
2057 
2058 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2059 		status = rte_table_action_apply(a,
2060 			data_in,
2061 			RTE_TABLE_ACTION_TM,
2062 			&action->tm);
2063 
2064 		if (status) {
2065 			rsp->status = -1;
2066 			return rsp;
2067 		}
2068 	}
2069 
2070 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2071 		status = rte_table_action_apply(a,
2072 			data_in,
2073 			RTE_TABLE_ACTION_ENCAP,
2074 			&action->encap);
2075 
2076 		if (status) {
2077 			rsp->status = -1;
2078 			return rsp;
2079 		}
2080 	}
2081 
2082 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2083 		status = rte_table_action_apply(a,
2084 			data_in,
2085 			RTE_TABLE_ACTION_NAT,
2086 			&action->nat);
2087 
2088 		if (status) {
2089 			rsp->status = -1;
2090 			return rsp;
2091 		}
2092 	}
2093 
2094 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2095 		status = rte_table_action_apply(a,
2096 			data_in,
2097 			RTE_TABLE_ACTION_TTL,
2098 			&action->ttl);
2099 
2100 		if (status) {
2101 			rsp->status = -1;
2102 			return rsp;
2103 		}
2104 	}
2105 
2106 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2107 		status = rte_table_action_apply(a,
2108 			data_in,
2109 			RTE_TABLE_ACTION_STATS,
2110 			&action->stats);
2111 
2112 		if (status) {
2113 			rsp->status = -1;
2114 			return rsp;
2115 		}
2116 	}
2117 
2118 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2119 		status = rte_table_action_apply(a,
2120 			data_in,
2121 			RTE_TABLE_ACTION_TIME,
2122 			&action->time);
2123 
2124 		if (status) {
2125 			rsp->status = -1;
2126 			return rsp;
2127 		}
2128 	}
2129 
2130 	/* Add rule (match, action) to table */
2131 	status = match_convert(match, &match_ll, 1);
2132 	if (status) {
2133 		rsp->status = -1;
2134 		return rsp;
2135 	}
2136 
2137 	status = rte_pipeline_table_entry_add(p->p,
2138 		table_id,
2139 		&match_ll,
2140 		data_in,
2141 		&key_found,
2142 		&data_out);
2143 	if (status) {
2144 		rsp->status = -1;
2145 		return rsp;
2146 	}
2147 
2148 	/* Write response */
2149 	rsp->status = 0;
2150 	rsp->table_rule_add.data = data_out;
2151 
2152 	return rsp;
2153 }
2154 
2155 static struct pipeline_msg_rsp *
2156 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2157 	struct pipeline_msg_req *req)
2158 {
2159 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2160 	struct table_rule_action *action = &req->table_rule_add_default.action;
2161 	struct rte_pipeline_table_entry *data_in, *data_out;
2162 	uint32_t table_id = req->id;
2163 	int status;
2164 
2165 	/* Apply actions */
2166 	memset(p->buffer, 0, sizeof(p->buffer));
2167 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2168 
2169 	data_in->action = action->fwd.action;
2170 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2171 		data_in->port_id = action->fwd.id;
2172 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2173 		data_in->table_id = action->fwd.id;
2174 
2175 	/* Add default rule to table */
2176 	status = rte_pipeline_table_default_entry_add(p->p,
2177 		table_id,
2178 		data_in,
2179 		&data_out);
2180 	if (status) {
2181 		rsp->status = -1;
2182 		return rsp;
2183 	}
2184 
2185 	/* Write response */
2186 	rsp->status = 0;
2187 	rsp->table_rule_add_default.data = data_out;
2188 
2189 	return rsp;
2190 }
2191 
2192 static struct pipeline_msg_rsp *
2193 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2194 	struct pipeline_msg_req *req)
2195 {
2196 
2197 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2198 
2199 	uint32_t table_id = req->id;
2200 	struct table_rule_match *match = req->table_rule_add_bulk.match;
2201 	struct table_rule_action *action = req->table_rule_add_bulk.action;
2202 	struct rte_pipeline_table_entry **data =
2203 		(struct rte_pipeline_table_entry **)req->table_rule_add_bulk.data;
2204 	uint32_t n_rules = req->table_rule_add_bulk.n_rules;
2205 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2206 
2207 	struct rte_table_action *a = p->table_data[table_id].a;
2208 	union table_rule_match_low_level *match_ll;
2209 	uint8_t *action_ll;
2210 	void **match_ll_ptr;
2211 	struct rte_pipeline_table_entry **action_ll_ptr;
2212 	int *found, status;
2213 	uint32_t i;
2214 
2215 	/* Memory allocation */
2216 	match_ll = calloc(n_rules, sizeof(union table_rule_match_low_level));
2217 	action_ll = calloc(n_rules, TABLE_RULE_ACTION_SIZE_MAX);
2218 	match_ll_ptr = calloc(n_rules, sizeof(void *));
2219 	action_ll_ptr =
2220 		calloc(n_rules, sizeof(struct rte_pipeline_table_entry *));
2221 	found = calloc(n_rules, sizeof(int));
2222 
2223 	if ((match_ll == NULL) ||
2224 		(action_ll == NULL) ||
2225 		(match_ll_ptr == NULL) ||
2226 		(action_ll_ptr == NULL) ||
2227 		(found == NULL))
2228 		goto fail;
2229 
2230 	for (i = 0; i < n_rules; i++) {
2231 		match_ll_ptr[i] = (void *)&match_ll[i];
2232 		action_ll_ptr[i] =
2233 			(struct rte_pipeline_table_entry *)&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
2234 	}
2235 
2236 	/* Rule match conversion */
2237 	for (i = 0; i < n_rules; i++) {
2238 		status = match_convert(&match[i], match_ll_ptr[i], 1);
2239 		if (status)
2240 			goto fail;
2241 	}
2242 
2243 	/* Rule action conversion */
2244 	for (i = 0; i < n_rules; i++) {
2245 		void *data_in = action_ll_ptr[i];
2246 		struct table_rule_action *act = &action[i];
2247 
2248 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2249 			status = rte_table_action_apply(a,
2250 				data_in,
2251 				RTE_TABLE_ACTION_FWD,
2252 				&act->fwd);
2253 
2254 			if (status)
2255 				goto fail;
2256 		}
2257 
2258 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2259 			status = rte_table_action_apply(a,
2260 				data_in,
2261 				RTE_TABLE_ACTION_LB,
2262 				&act->lb);
2263 
2264 			if (status)
2265 				goto fail;
2266 		}
2267 
2268 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2269 			status = rte_table_action_apply(a,
2270 				data_in,
2271 				RTE_TABLE_ACTION_MTR,
2272 				&act->mtr);
2273 
2274 			if (status)
2275 				goto fail;
2276 		}
2277 
2278 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2279 			status = rte_table_action_apply(a,
2280 				data_in,
2281 				RTE_TABLE_ACTION_TM,
2282 				&act->tm);
2283 
2284 			if (status)
2285 				goto fail;
2286 		}
2287 
2288 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2289 			status = rte_table_action_apply(a,
2290 				data_in,
2291 				RTE_TABLE_ACTION_ENCAP,
2292 				&act->encap);
2293 
2294 			if (status)
2295 				goto fail;
2296 		}
2297 
2298 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2299 			status = rte_table_action_apply(a,
2300 				data_in,
2301 				RTE_TABLE_ACTION_NAT,
2302 				&act->nat);
2303 
2304 			if (status)
2305 				goto fail;
2306 		}
2307 
2308 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2309 			status = rte_table_action_apply(a,
2310 				data_in,
2311 				RTE_TABLE_ACTION_TTL,
2312 				&act->ttl);
2313 
2314 			if (status)
2315 				goto fail;
2316 		}
2317 
2318 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2319 			status = rte_table_action_apply(a,
2320 				data_in,
2321 				RTE_TABLE_ACTION_STATS,
2322 				&act->stats);
2323 
2324 			if (status)
2325 				goto fail;
2326 		}
2327 
2328 		if (act->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2329 			status = rte_table_action_apply(a,
2330 				data_in,
2331 				RTE_TABLE_ACTION_TIME,
2332 				&act->time);
2333 
2334 			if (status)
2335 				goto fail;
2336 		}
2337 	}
2338 
2339 	/* Add rule (match, action) to table */
2340 	if (bulk) {
2341 		status = rte_pipeline_table_entry_add_bulk(p->p,
2342 			table_id,
2343 			match_ll_ptr,
2344 			action_ll_ptr,
2345 			n_rules,
2346 			found,
2347 			data);
2348 		if (status)
2349 			n_rules = 0;
2350 	} else
2351 		for (i = 0; i < n_rules; i++) {
2352 			status = rte_pipeline_table_entry_add(p->p,
2353 				table_id,
2354 				match_ll_ptr[i],
2355 				action_ll_ptr[i],
2356 				&found[i],
2357 				&data[i]);
2358 			if (status) {
2359 				n_rules = i;
2360 				break;
2361 			}
2362 		}
2363 
2364 	/* Write response */
2365 	rsp->status = 0;
2366 	rsp->table_rule_add_bulk.n_rules = n_rules;
2367 
2368 	/* Free */
2369 	free(found);
2370 	free(action_ll_ptr);
2371 	free(match_ll_ptr);
2372 	free(action_ll);
2373 	free(match_ll);
2374 
2375 	return rsp;
2376 
2377 fail:
2378 	free(found);
2379 	free(action_ll_ptr);
2380 	free(match_ll_ptr);
2381 	free(action_ll);
2382 	free(match_ll);
2383 
2384 	rsp->status = -1;
2385 	rsp->table_rule_add_bulk.n_rules = 0;
2386 	return rsp;
2387 }
2388 
2389 static struct pipeline_msg_rsp *
2390 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2391 	struct pipeline_msg_req *req)
2392 {
2393 	union table_rule_match_low_level match_ll;
2394 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2395 	struct table_rule_match *match = &req->table_rule_delete.match;
2396 	uint32_t table_id = req->id;
2397 	int key_found, status;
2398 
2399 	status = match_convert(match, &match_ll, 0);
2400 	if (status) {
2401 		rsp->status = -1;
2402 		return rsp;
2403 	}
2404 
2405 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2406 		table_id,
2407 		&match_ll,
2408 		&key_found,
2409 		NULL);
2410 
2411 	return rsp;
2412 }
2413 
2414 static struct pipeline_msg_rsp *
2415 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2416 	struct pipeline_msg_req *req)
2417 {
2418 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2419 	uint32_t table_id = req->id;
2420 
2421 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2422 		table_id,
2423 		NULL);
2424 
2425 	return rsp;
2426 }
2427 
2428 static struct pipeline_msg_rsp *
2429 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2430 	struct pipeline_msg_req *req)
2431 {
2432 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2433 	uint32_t table_id = req->id;
2434 	void *data = req->table_rule_stats_read.data;
2435 	int clear = req->table_rule_stats_read.clear;
2436 	struct rte_table_action *a = p->table_data[table_id].a;
2437 
2438 	rsp->status = rte_table_action_stats_read(a,
2439 		data,
2440 		&rsp->table_rule_stats_read.stats,
2441 		clear);
2442 
2443 	return rsp;
2444 }
2445 
2446 static struct pipeline_msg_rsp *
2447 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2448 	struct pipeline_msg_req *req)
2449 {
2450 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2451 	uint32_t table_id = req->id;
2452 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2453 	struct rte_table_action_meter_profile *profile =
2454 		&req->table_mtr_profile_add.profile;
2455 	struct rte_table_action *a = p->table_data[table_id].a;
2456 
2457 	rsp->status = rte_table_action_meter_profile_add(a,
2458 		meter_profile_id,
2459 		profile);
2460 
2461 	return rsp;
2462 }
2463 
2464 static struct pipeline_msg_rsp *
2465 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2466 	struct pipeline_msg_req *req)
2467 {
2468 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2469 	uint32_t table_id = req->id;
2470 	uint32_t meter_profile_id =
2471 		req->table_mtr_profile_delete.meter_profile_id;
2472 	struct rte_table_action *a = p->table_data[table_id].a;
2473 
2474 	rsp->status = rte_table_action_meter_profile_delete(a,
2475 		meter_profile_id);
2476 
2477 	return rsp;
2478 }
2479 
2480 static struct pipeline_msg_rsp *
2481 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2482 	struct pipeline_msg_req *req)
2483 {
2484 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2485 	uint32_t table_id = req->id;
2486 	void *data = req->table_rule_mtr_read.data;
2487 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2488 	int clear = req->table_rule_mtr_read.clear;
2489 	struct rte_table_action *a = p->table_data[table_id].a;
2490 
2491 	rsp->status = rte_table_action_meter_read(a,
2492 		data,
2493 		tc_mask,
2494 		&rsp->table_rule_mtr_read.stats,
2495 		clear);
2496 
2497 	return rsp;
2498 }
2499 
2500 static struct pipeline_msg_rsp *
2501 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2502 	struct pipeline_msg_req *req)
2503 {
2504 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2505 	uint32_t table_id = req->id;
2506 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2507 	struct rte_table_action_dscp_table *dscp_table =
2508 		&req->table_dscp_table_update.dscp_table;
2509 	struct rte_table_action *a = p->table_data[table_id].a;
2510 
2511 	rsp->status = rte_table_action_dscp_table_update(a,
2512 		dscp_mask,
2513 		dscp_table);
2514 
2515 	return rsp;
2516 }
2517 
2518 static struct pipeline_msg_rsp *
2519 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2520 	struct pipeline_msg_req *req)
2521 {
2522 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2523 	uint32_t table_id = req->id;
2524 	void *data = req->table_rule_ttl_read.data;
2525 	int clear = req->table_rule_ttl_read.clear;
2526 	struct rte_table_action *a = p->table_data[table_id].a;
2527 
2528 	rsp->status = rte_table_action_ttl_read(a,
2529 		data,
2530 		&rsp->table_rule_ttl_read.stats,
2531 		clear);
2532 
2533 	return rsp;
2534 }
2535 
2536 static void
2537 pipeline_msg_handle(struct pipeline_data *p)
2538 {
2539 	for ( ; ; ) {
2540 		struct pipeline_msg_req *req;
2541 		struct pipeline_msg_rsp *rsp;
2542 
2543 		req = pipeline_msg_recv(p->msgq_req);
2544 		if (req == NULL)
2545 			break;
2546 
2547 		switch (req->type) {
2548 		case PIPELINE_REQ_PORT_IN_STATS_READ:
2549 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
2550 			break;
2551 
2552 		case PIPELINE_REQ_PORT_IN_ENABLE:
2553 			rsp = pipeline_msg_handle_port_in_enable(p, req);
2554 			break;
2555 
2556 		case PIPELINE_REQ_PORT_IN_DISABLE:
2557 			rsp = pipeline_msg_handle_port_in_disable(p, req);
2558 			break;
2559 
2560 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
2561 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
2562 			break;
2563 
2564 		case PIPELINE_REQ_TABLE_STATS_READ:
2565 			rsp = pipeline_msg_handle_table_stats_read(p, req);
2566 			break;
2567 
2568 		case PIPELINE_REQ_TABLE_RULE_ADD:
2569 			rsp = pipeline_msg_handle_table_rule_add(p, req);
2570 			break;
2571 
2572 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
2573 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
2574 			break;
2575 
2576 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
2577 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
2578 			break;
2579 
2580 		case PIPELINE_REQ_TABLE_RULE_DELETE:
2581 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
2582 			break;
2583 
2584 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
2585 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
2586 			break;
2587 
2588 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
2589 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
2590 			break;
2591 
2592 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
2593 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
2594 			break;
2595 
2596 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
2597 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
2598 			break;
2599 
2600 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
2601 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
2602 			break;
2603 
2604 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
2605 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
2606 			break;
2607 
2608 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
2609 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
2610 			break;
2611 
2612 		default:
2613 			rsp = (struct pipeline_msg_rsp *) req;
2614 			rsp->status = -1;
2615 		}
2616 
2617 		pipeline_msg_send(p->msgq_rsp, rsp);
2618 	}
2619 }
2620 
2621 /**
2622  * Data plane threads: main
2623  */
2624 int
2625 thread_main(void *arg __rte_unused)
2626 {
2627 	struct thread_data *t;
2628 	uint32_t thread_id, i;
2629 
2630 	thread_id = rte_lcore_id();
2631 	t = &thread_data[thread_id];
2632 
2633 	/* Dispatch loop */
2634 	for (i = 0; ; i++) {
2635 		uint32_t j;
2636 
2637 		/* Data Plane */
2638 		for (j = 0; j < t->n_pipelines; j++)
2639 			rte_pipeline_run(t->p[j]);
2640 
2641 		/* Control Plane */
2642 		if ((i & 0xF) == 0) {
2643 			uint64_t time = rte_get_tsc_cycles();
2644 			uint64_t time_next_min = UINT64_MAX;
2645 
2646 			if (time < t->time_next_min)
2647 				continue;
2648 
2649 			/* Pipeline message queues */
2650 			for (j = 0; j < t->n_pipelines; j++) {
2651 				struct pipeline_data *p =
2652 					&t->pipeline_data[j];
2653 				uint64_t time_next = p->time_next;
2654 
2655 				if (time_next <= time) {
2656 					pipeline_msg_handle(p);
2657 					rte_pipeline_flush(p->p);
2658 					time_next = time + p->timer_period;
2659 					p->time_next = time_next;
2660 				}
2661 
2662 				if (time_next < time_next_min)
2663 					time_next_min = time_next;
2664 			}
2665 
2666 			/* Thread message queues */
2667 			{
2668 				uint64_t time_next = t->time_next;
2669 
2670 				if (time_next <= time) {
2671 					thread_msg_handle(t);
2672 					time_next = time + t->timer_period;
2673 					t->time_next = time_next;
2674 				}
2675 
2676 				if (time_next < time_next_min)
2677 					time_next_min = time_next;
2678 			}
2679 
2680 			t->time_next_min = time_next_min;
2681 		}
2682 	}
2683 
2684 	return 0;
2685 }
2686