xref: /dpdk/examples/ip_pipeline/thread.c (revision 09442498ef736d0a96632cf8b8c15d8ca78a6468)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2018 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "common.h"
19 #include "thread.h"
20 #include "pipeline.h"
21 
22 #ifndef THREAD_PIPELINES_MAX
23 #define THREAD_PIPELINES_MAX                               256
24 #endif
25 
26 #ifndef THREAD_MSGQ_SIZE
27 #define THREAD_MSGQ_SIZE                                   64
28 #endif
29 
30 #ifndef THREAD_TIMER_PERIOD_MS
31 #define THREAD_TIMER_PERIOD_MS                             100
32 #endif
33 
34 /**
35  * Main thread: data plane thread context
36  */
37 struct thread {
38 	struct rte_ring *msgq_req;
39 	struct rte_ring *msgq_rsp;
40 
41 	uint32_t enabled;
42 };
43 
44 static struct thread thread[RTE_MAX_LCORE];
45 
46 /**
47  * Data plane threads: context
48  */
49 struct table_data {
50 	struct rte_table_action *a;
51 };
52 
53 struct pipeline_data {
54 	struct rte_pipeline *p;
55 	struct table_data table_data[RTE_PIPELINE_TABLE_MAX];
56 	uint32_t n_tables;
57 
58 	struct rte_ring *msgq_req;
59 	struct rte_ring *msgq_rsp;
60 	uint64_t timer_period; /* Measured in CPU cycles. */
61 	uint64_t time_next;
62 
63 	uint8_t buffer[TABLE_RULE_ACTION_SIZE_MAX];
64 };
65 
66 struct thread_data {
67 	struct rte_pipeline *p[THREAD_PIPELINES_MAX];
68 	uint32_t n_pipelines;
69 
70 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
71 	struct rte_ring *msgq_req;
72 	struct rte_ring *msgq_rsp;
73 	uint64_t timer_period; /* Measured in CPU cycles. */
74 	uint64_t time_next;
75 	uint64_t time_next_min;
76 } __rte_cache_aligned;
77 
78 static struct thread_data thread_data[RTE_MAX_LCORE];
79 
80 /**
81  * Main thread: data plane thread init
82  */
83 static void
84 thread_free(void)
85 {
86 	uint32_t i;
87 
88 	for (i = 0; i < RTE_MAX_LCORE; i++) {
89 		struct thread *t = &thread[i];
90 
91 		if (!rte_lcore_is_enabled(i))
92 			continue;
93 
94 		/* MSGQs */
95 		rte_ring_free(t->msgq_req);
96 
97 		rte_ring_free(t->msgq_rsp);
98 	}
99 }
100 
101 int
102 thread_init(void)
103 {
104 	uint32_t i;
105 
106 	RTE_LCORE_FOREACH_WORKER(i) {
107 		char name[NAME_MAX];
108 		struct rte_ring *msgq_req, *msgq_rsp;
109 		struct thread *t = &thread[i];
110 		struct thread_data *t_data = &thread_data[i];
111 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
112 
113 		/* MSGQs */
114 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
115 
116 		msgq_req = rte_ring_create(name,
117 			THREAD_MSGQ_SIZE,
118 			cpu_id,
119 			RING_F_SP_ENQ | RING_F_SC_DEQ);
120 
121 		if (msgq_req == NULL) {
122 			thread_free();
123 			return -1;
124 		}
125 
126 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
127 
128 		msgq_rsp = rte_ring_create(name,
129 			THREAD_MSGQ_SIZE,
130 			cpu_id,
131 			RING_F_SP_ENQ | RING_F_SC_DEQ);
132 
133 		if (msgq_rsp == NULL) {
134 			thread_free();
135 			return -1;
136 		}
137 
138 		/* Main thread records */
139 		t->msgq_req = msgq_req;
140 		t->msgq_rsp = msgq_rsp;
141 		t->enabled = 1;
142 
143 		/* Data plane thread records */
144 		t_data->n_pipelines = 0;
145 		t_data->msgq_req = msgq_req;
146 		t_data->msgq_rsp = msgq_rsp;
147 		t_data->timer_period =
148 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
149 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
150 		t_data->time_next_min = t_data->time_next;
151 	}
152 
153 	return 0;
154 }
155 
156 static inline int
157 thread_is_running(uint32_t thread_id)
158 {
159 	enum rte_lcore_state_t thread_state;
160 
161 	thread_state = rte_eal_get_lcore_state(thread_id);
162 	return (thread_state == RUNNING) ? 1 : 0;
163 }
164 
165 /**
166  * Pipeline is running when:
167  *    (A) Pipeline is mapped to a data plane thread AND
168  *    (B) Its data plane thread is in RUNNING state.
169  */
170 static inline int
171 pipeline_is_running(struct pipeline *p)
172 {
173 	if (p->enabled == 0)
174 		return 0;
175 
176 	return thread_is_running(p->thread_id);
177 }
178 
179 /**
180  * Main thread & data plane threads: message passing
181  */
182 enum thread_req_type {
183 	THREAD_REQ_PIPELINE_ENABLE = 0,
184 	THREAD_REQ_PIPELINE_DISABLE,
185 	THREAD_REQ_MAX
186 };
187 
188 struct thread_msg_req {
189 	enum thread_req_type type;
190 
191 	union {
192 		struct {
193 			struct rte_pipeline *p;
194 			struct {
195 				struct rte_table_action *a;
196 			} table[RTE_PIPELINE_TABLE_MAX];
197 			struct rte_ring *msgq_req;
198 			struct rte_ring *msgq_rsp;
199 			uint32_t timer_period_ms;
200 			uint32_t n_tables;
201 		} pipeline_enable;
202 
203 		struct {
204 			struct rte_pipeline *p;
205 		} pipeline_disable;
206 	};
207 };
208 
209 struct thread_msg_rsp {
210 	int status;
211 };
212 
213 /**
214  * Main thread
215  */
216 static struct thread_msg_req *
217 thread_msg_alloc(void)
218 {
219 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
220 		sizeof(struct thread_msg_rsp));
221 
222 	return calloc(1, size);
223 }
224 
225 static void
226 thread_msg_free(struct thread_msg_rsp *rsp)
227 {
228 	free(rsp);
229 }
230 
231 static struct thread_msg_rsp *
232 thread_msg_send_recv(uint32_t thread_id,
233 	struct thread_msg_req *req)
234 {
235 	struct thread *t = &thread[thread_id];
236 	struct rte_ring *msgq_req = t->msgq_req;
237 	struct rte_ring *msgq_rsp = t->msgq_rsp;
238 	struct thread_msg_rsp *rsp;
239 	int status;
240 
241 	/* send */
242 	do {
243 		status = rte_ring_sp_enqueue(msgq_req, req);
244 	} while (status == -ENOBUFS);
245 
246 	/* recv */
247 	do {
248 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
249 	} while (status != 0);
250 
251 	return rsp;
252 }
253 
254 int
255 thread_pipeline_enable(uint32_t thread_id,
256 	const char *pipeline_name)
257 {
258 	struct pipeline *p = pipeline_find(pipeline_name);
259 	struct thread *t;
260 	struct thread_msg_req *req;
261 	struct thread_msg_rsp *rsp;
262 	uint32_t i;
263 	int status;
264 
265 	/* Check input params */
266 	if ((thread_id >= RTE_MAX_LCORE) ||
267 		(p == NULL) ||
268 		(p->n_ports_in == 0) ||
269 		(p->n_ports_out == 0) ||
270 		(p->n_tables == 0))
271 		return -1;
272 
273 	t = &thread[thread_id];
274 	if ((t->enabled == 0) ||
275 		p->enabled)
276 		return -1;
277 
278 	if (!thread_is_running(thread_id)) {
279 		struct thread_data *td = &thread_data[thread_id];
280 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
281 
282 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
283 			return -1;
284 
285 		/* Data plane thread */
286 		td->p[td->n_pipelines] = p->p;
287 
288 		tdp->p = p->p;
289 		for (i = 0; i < p->n_tables; i++)
290 			tdp->table_data[i].a = p->table[i].a;
291 
292 		tdp->n_tables = p->n_tables;
293 
294 		tdp->msgq_req = p->msgq_req;
295 		tdp->msgq_rsp = p->msgq_rsp;
296 		tdp->timer_period = (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
297 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
298 
299 		td->n_pipelines++;
300 
301 		/* Pipeline */
302 		p->thread_id = thread_id;
303 		p->enabled = 1;
304 
305 		return 0;
306 	}
307 
308 	/* Allocate request */
309 	req = thread_msg_alloc();
310 	if (req == NULL)
311 		return -1;
312 
313 	/* Write request */
314 	req->type = THREAD_REQ_PIPELINE_ENABLE;
315 	req->pipeline_enable.p = p->p;
316 	for (i = 0; i < p->n_tables; i++)
317 		req->pipeline_enable.table[i].a =
318 			p->table[i].a;
319 	req->pipeline_enable.msgq_req = p->msgq_req;
320 	req->pipeline_enable.msgq_rsp = p->msgq_rsp;
321 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
322 	req->pipeline_enable.n_tables = p->n_tables;
323 
324 	/* Send request and wait for response */
325 	rsp = thread_msg_send_recv(thread_id, req);
326 
327 	/* Read response */
328 	status = rsp->status;
329 
330 	/* Free response */
331 	thread_msg_free(rsp);
332 
333 	/* Request completion */
334 	if (status)
335 		return status;
336 
337 	p->thread_id = thread_id;
338 	p->enabled = 1;
339 
340 	return 0;
341 }
342 
343 int
344 thread_pipeline_disable(uint32_t thread_id,
345 	const char *pipeline_name)
346 {
347 	struct pipeline *p = pipeline_find(pipeline_name);
348 	struct thread *t;
349 	struct thread_msg_req *req;
350 	struct thread_msg_rsp *rsp;
351 	int status;
352 
353 	/* Check input params */
354 	if ((thread_id >= RTE_MAX_LCORE) ||
355 		(p == NULL))
356 		return -1;
357 
358 	t = &thread[thread_id];
359 	if (t->enabled == 0)
360 		return -1;
361 
362 	if (p->enabled == 0)
363 		return 0;
364 
365 	if (p->thread_id != thread_id)
366 		return -1;
367 
368 	if (!thread_is_running(thread_id)) {
369 		struct thread_data *td = &thread_data[thread_id];
370 		uint32_t i;
371 
372 		for (i = 0; i < td->n_pipelines; i++) {
373 			struct pipeline_data *tdp = &td->pipeline_data[i];
374 
375 			if (tdp->p != p->p)
376 				continue;
377 
378 			/* Data plane thread */
379 			if (i < td->n_pipelines - 1) {
380 				struct rte_pipeline *pipeline_last =
381 					td->p[td->n_pipelines - 1];
382 				struct pipeline_data *tdp_last =
383 					&td->pipeline_data[td->n_pipelines - 1];
384 
385 				td->p[i] = pipeline_last;
386 				memcpy(tdp, tdp_last, sizeof(*tdp));
387 			}
388 
389 			td->n_pipelines--;
390 
391 			/* Pipeline */
392 			p->enabled = 0;
393 
394 			break;
395 		}
396 
397 		return 0;
398 	}
399 
400 	/* Allocate request */
401 	req = thread_msg_alloc();
402 	if (req == NULL)
403 		return -1;
404 
405 	/* Write request */
406 	req->type = THREAD_REQ_PIPELINE_DISABLE;
407 	req->pipeline_disable.p = p->p;
408 
409 	/* Send request and wait for response */
410 	rsp = thread_msg_send_recv(thread_id, req);
411 
412 	/* Read response */
413 	status = rsp->status;
414 
415 	/* Free response */
416 	thread_msg_free(rsp);
417 
418 	/* Request completion */
419 	if (status)
420 		return status;
421 
422 	p->enabled = 0;
423 
424 	return 0;
425 }
426 
427 /**
428  * Data plane threads: message handling
429  */
430 static inline struct thread_msg_req *
431 thread_msg_recv(struct rte_ring *msgq_req)
432 {
433 	struct thread_msg_req *req;
434 
435 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
436 
437 	if (status != 0)
438 		return NULL;
439 
440 	return req;
441 }
442 
443 static inline void
444 thread_msg_send(struct rte_ring *msgq_rsp,
445 	struct thread_msg_rsp *rsp)
446 {
447 	int status;
448 
449 	do {
450 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
451 	} while (status == -ENOBUFS);
452 }
453 
454 static struct thread_msg_rsp *
455 thread_msg_handle_pipeline_enable(struct thread_data *t,
456 	struct thread_msg_req *req)
457 {
458 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
459 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
460 	uint32_t i;
461 
462 	/* Request */
463 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
464 		rsp->status = -1;
465 		return rsp;
466 	}
467 
468 	t->p[t->n_pipelines] = req->pipeline_enable.p;
469 
470 	p->p = req->pipeline_enable.p;
471 	for (i = 0; i < req->pipeline_enable.n_tables; i++)
472 		p->table_data[i].a =
473 			req->pipeline_enable.table[i].a;
474 
475 	p->n_tables = req->pipeline_enable.n_tables;
476 
477 	p->msgq_req = req->pipeline_enable.msgq_req;
478 	p->msgq_rsp = req->pipeline_enable.msgq_rsp;
479 	p->timer_period =
480 		(rte_get_tsc_hz() * req->pipeline_enable.timer_period_ms) / 1000;
481 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
482 
483 	t->n_pipelines++;
484 
485 	/* Response */
486 	rsp->status = 0;
487 	return rsp;
488 }
489 
490 static struct thread_msg_rsp *
491 thread_msg_handle_pipeline_disable(struct thread_data *t,
492 	struct thread_msg_req *req)
493 {
494 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
495 	uint32_t n_pipelines = t->n_pipelines;
496 	struct rte_pipeline *pipeline = req->pipeline_disable.p;
497 	uint32_t i;
498 
499 	/* find pipeline */
500 	for (i = 0; i < n_pipelines; i++) {
501 		struct pipeline_data *p = &t->pipeline_data[i];
502 
503 		if (p->p != pipeline)
504 			continue;
505 
506 		if (i < n_pipelines - 1) {
507 			struct rte_pipeline *pipeline_last =
508 				t->p[n_pipelines - 1];
509 			struct pipeline_data *p_last =
510 				&t->pipeline_data[n_pipelines - 1];
511 
512 			t->p[i] = pipeline_last;
513 			memcpy(p, p_last, sizeof(*p));
514 		}
515 
516 		t->n_pipelines--;
517 
518 		rsp->status = 0;
519 		return rsp;
520 	}
521 
522 	/* should not get here */
523 	rsp->status = 0;
524 	return rsp;
525 }
526 
527 static void
528 thread_msg_handle(struct thread_data *t)
529 {
530 	for ( ; ; ) {
531 		struct thread_msg_req *req;
532 		struct thread_msg_rsp *rsp;
533 
534 		req = thread_msg_recv(t->msgq_req);
535 		if (req == NULL)
536 			break;
537 
538 		switch (req->type) {
539 		case THREAD_REQ_PIPELINE_ENABLE:
540 			rsp = thread_msg_handle_pipeline_enable(t, req);
541 			break;
542 
543 		case THREAD_REQ_PIPELINE_DISABLE:
544 			rsp = thread_msg_handle_pipeline_disable(t, req);
545 			break;
546 
547 		default:
548 			rsp = (struct thread_msg_rsp *) req;
549 			rsp->status = -1;
550 		}
551 
552 		thread_msg_send(t->msgq_rsp, rsp);
553 	}
554 }
555 
556 /**
557  * Main thread & data plane threads: message passing
558  */
559 enum pipeline_req_type {
560 	/* Port IN */
561 	PIPELINE_REQ_PORT_IN_STATS_READ,
562 	PIPELINE_REQ_PORT_IN_ENABLE,
563 	PIPELINE_REQ_PORT_IN_DISABLE,
564 
565 	/* Port OUT */
566 	PIPELINE_REQ_PORT_OUT_STATS_READ,
567 
568 	/* Table */
569 	PIPELINE_REQ_TABLE_STATS_READ,
570 	PIPELINE_REQ_TABLE_RULE_ADD,
571 	PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT,
572 	PIPELINE_REQ_TABLE_RULE_ADD_BULK,
573 	PIPELINE_REQ_TABLE_RULE_DELETE,
574 	PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT,
575 	PIPELINE_REQ_TABLE_RULE_STATS_READ,
576 	PIPELINE_REQ_TABLE_MTR_PROFILE_ADD,
577 	PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE,
578 	PIPELINE_REQ_TABLE_RULE_MTR_READ,
579 	PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE,
580 	PIPELINE_REQ_TABLE_RULE_TTL_READ,
581 	PIPELINE_REQ_TABLE_RULE_TIME_READ,
582 	PIPELINE_REQ_MAX
583 };
584 
585 struct pipeline_msg_req_port_in_stats_read {
586 	int clear;
587 };
588 
589 struct pipeline_msg_req_port_out_stats_read {
590 	int clear;
591 };
592 
593 struct pipeline_msg_req_table_stats_read {
594 	int clear;
595 };
596 
597 struct pipeline_msg_req_table_rule_add {
598 	struct table_rule_match match;
599 	struct table_rule_action action;
600 };
601 
602 struct pipeline_msg_req_table_rule_add_default {
603 	struct table_rule_action action;
604 };
605 
606 struct pipeline_msg_req_table_rule_add_bulk {
607 	struct table_rule_list *list;
608 	int bulk;
609 };
610 
611 struct pipeline_msg_req_table_rule_delete {
612 	struct table_rule_match match;
613 };
614 
615 struct pipeline_msg_req_table_rule_stats_read {
616 	void *data;
617 	int clear;
618 };
619 
620 struct pipeline_msg_req_table_mtr_profile_add {
621 	uint32_t meter_profile_id;
622 	struct rte_table_action_meter_profile profile;
623 };
624 
625 struct pipeline_msg_req_table_mtr_profile_delete {
626 	uint32_t meter_profile_id;
627 };
628 
629 struct pipeline_msg_req_table_rule_mtr_read {
630 	void *data;
631 	uint32_t tc_mask;
632 	int clear;
633 };
634 
635 struct pipeline_msg_req_table_dscp_table_update {
636 	uint64_t dscp_mask;
637 	struct rte_table_action_dscp_table dscp_table;
638 };
639 
640 struct pipeline_msg_req_table_rule_ttl_read {
641 	void *data;
642 	int clear;
643 };
644 
645 struct pipeline_msg_req_table_rule_time_read {
646 	void *data;
647 };
648 
649 struct pipeline_msg_req {
650 	enum pipeline_req_type type;
651 	uint32_t id; /* Port IN, port OUT or table ID */
652 
653 	RTE_STD_C11
654 	union {
655 		struct pipeline_msg_req_port_in_stats_read port_in_stats_read;
656 		struct pipeline_msg_req_port_out_stats_read port_out_stats_read;
657 		struct pipeline_msg_req_table_stats_read table_stats_read;
658 		struct pipeline_msg_req_table_rule_add table_rule_add;
659 		struct pipeline_msg_req_table_rule_add_default table_rule_add_default;
660 		struct pipeline_msg_req_table_rule_add_bulk table_rule_add_bulk;
661 		struct pipeline_msg_req_table_rule_delete table_rule_delete;
662 		struct pipeline_msg_req_table_rule_stats_read table_rule_stats_read;
663 		struct pipeline_msg_req_table_mtr_profile_add table_mtr_profile_add;
664 		struct pipeline_msg_req_table_mtr_profile_delete table_mtr_profile_delete;
665 		struct pipeline_msg_req_table_rule_mtr_read table_rule_mtr_read;
666 		struct pipeline_msg_req_table_dscp_table_update table_dscp_table_update;
667 		struct pipeline_msg_req_table_rule_ttl_read table_rule_ttl_read;
668 		struct pipeline_msg_req_table_rule_time_read table_rule_time_read;
669 	};
670 };
671 
672 struct pipeline_msg_rsp_port_in_stats_read {
673 	struct rte_pipeline_port_in_stats stats;
674 };
675 
676 struct pipeline_msg_rsp_port_out_stats_read {
677 	struct rte_pipeline_port_out_stats stats;
678 };
679 
680 struct pipeline_msg_rsp_table_stats_read {
681 	struct rte_pipeline_table_stats stats;
682 };
683 
684 struct pipeline_msg_rsp_table_rule_add {
685 	void *data;
686 };
687 
688 struct pipeline_msg_rsp_table_rule_add_default {
689 	void *data;
690 };
691 
692 struct pipeline_msg_rsp_table_rule_add_bulk {
693 	uint32_t n_rules;
694 };
695 
696 struct pipeline_msg_rsp_table_rule_stats_read {
697 	struct rte_table_action_stats_counters stats;
698 };
699 
700 struct pipeline_msg_rsp_table_rule_mtr_read {
701 	struct rte_table_action_mtr_counters stats;
702 };
703 
704 struct pipeline_msg_rsp_table_rule_ttl_read {
705 	struct rte_table_action_ttl_counters stats;
706 };
707 
708 struct pipeline_msg_rsp_table_rule_time_read {
709 	uint64_t timestamp;
710 };
711 
712 struct pipeline_msg_rsp {
713 	int status;
714 
715 	RTE_STD_C11
716 	union {
717 		struct pipeline_msg_rsp_port_in_stats_read port_in_stats_read;
718 		struct pipeline_msg_rsp_port_out_stats_read port_out_stats_read;
719 		struct pipeline_msg_rsp_table_stats_read table_stats_read;
720 		struct pipeline_msg_rsp_table_rule_add table_rule_add;
721 		struct pipeline_msg_rsp_table_rule_add_default table_rule_add_default;
722 		struct pipeline_msg_rsp_table_rule_add_bulk table_rule_add_bulk;
723 		struct pipeline_msg_rsp_table_rule_stats_read table_rule_stats_read;
724 		struct pipeline_msg_rsp_table_rule_mtr_read table_rule_mtr_read;
725 		struct pipeline_msg_rsp_table_rule_ttl_read table_rule_ttl_read;
726 		struct pipeline_msg_rsp_table_rule_time_read table_rule_time_read;
727 	};
728 };
729 
730 /**
731  * Main thread
732  */
733 static struct pipeline_msg_req *
734 pipeline_msg_alloc(void)
735 {
736 	size_t size = RTE_MAX(sizeof(struct pipeline_msg_req),
737 		sizeof(struct pipeline_msg_rsp));
738 
739 	return calloc(1, size);
740 }
741 
742 static void
743 pipeline_msg_free(struct pipeline_msg_rsp *rsp)
744 {
745 	free(rsp);
746 }
747 
748 static struct pipeline_msg_rsp *
749 pipeline_msg_send_recv(struct pipeline *p,
750 	struct pipeline_msg_req *req)
751 {
752 	struct rte_ring *msgq_req = p->msgq_req;
753 	struct rte_ring *msgq_rsp = p->msgq_rsp;
754 	struct pipeline_msg_rsp *rsp;
755 	int status;
756 
757 	/* send */
758 	do {
759 		status = rte_ring_sp_enqueue(msgq_req, req);
760 	} while (status == -ENOBUFS);
761 
762 	/* recv */
763 	do {
764 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
765 	} while (status != 0);
766 
767 	return rsp;
768 }
769 
770 int
771 pipeline_port_in_stats_read(const char *pipeline_name,
772 	uint32_t port_id,
773 	struct rte_pipeline_port_in_stats *stats,
774 	int clear)
775 {
776 	struct pipeline *p;
777 	struct pipeline_msg_req *req;
778 	struct pipeline_msg_rsp *rsp;
779 	int status;
780 
781 	/* Check input params */
782 	if ((pipeline_name == NULL) ||
783 		(stats == NULL))
784 		return -1;
785 
786 	p = pipeline_find(pipeline_name);
787 	if ((p == NULL) ||
788 		(port_id >= p->n_ports_in))
789 		return -1;
790 
791 	if (!pipeline_is_running(p)) {
792 		status = rte_pipeline_port_in_stats_read(p->p,
793 			port_id,
794 			stats,
795 			clear);
796 
797 		return status;
798 	}
799 
800 	/* Allocate request */
801 	req = pipeline_msg_alloc();
802 	if (req == NULL)
803 		return -1;
804 
805 	/* Write request */
806 	req->type = PIPELINE_REQ_PORT_IN_STATS_READ;
807 	req->id = port_id;
808 	req->port_in_stats_read.clear = clear;
809 
810 	/* Send request and wait for response */
811 	rsp = pipeline_msg_send_recv(p, req);
812 
813 	/* Read response */
814 	status = rsp->status;
815 	if (status == 0)
816 		memcpy(stats, &rsp->port_in_stats_read.stats, sizeof(*stats));
817 
818 	/* Free response */
819 	pipeline_msg_free(rsp);
820 
821 	return status;
822 }
823 
824 int
825 pipeline_port_in_enable(const char *pipeline_name,
826 	uint32_t port_id)
827 {
828 	struct pipeline *p;
829 	struct pipeline_msg_req *req;
830 	struct pipeline_msg_rsp *rsp;
831 	int status;
832 
833 	/* Check input params */
834 	if (pipeline_name == NULL)
835 		return -1;
836 
837 	p = pipeline_find(pipeline_name);
838 	if ((p == NULL) ||
839 		(port_id >= p->n_ports_in))
840 		return -1;
841 
842 	if (!pipeline_is_running(p)) {
843 		status = rte_pipeline_port_in_enable(p->p, port_id);
844 		return status;
845 	}
846 
847 	/* Allocate request */
848 	req = pipeline_msg_alloc();
849 	if (req == NULL)
850 		return -1;
851 
852 	/* Write request */
853 	req->type = PIPELINE_REQ_PORT_IN_ENABLE;
854 	req->id = port_id;
855 
856 	/* Send request and wait for response */
857 	rsp = pipeline_msg_send_recv(p, req);
858 
859 	/* Read response */
860 	status = rsp->status;
861 
862 	/* Free response */
863 	pipeline_msg_free(rsp);
864 
865 	return status;
866 }
867 
868 int
869 pipeline_port_in_disable(const char *pipeline_name,
870 	uint32_t port_id)
871 {
872 	struct pipeline *p;
873 	struct pipeline_msg_req *req;
874 	struct pipeline_msg_rsp *rsp;
875 	int status;
876 
877 	/* Check input params */
878 	if (pipeline_name == NULL)
879 		return -1;
880 
881 	p = pipeline_find(pipeline_name);
882 	if ((p == NULL) ||
883 		(port_id >= p->n_ports_in))
884 		return -1;
885 
886 	if (!pipeline_is_running(p)) {
887 		status = rte_pipeline_port_in_disable(p->p, port_id);
888 		return status;
889 	}
890 
891 	/* Allocate request */
892 	req = pipeline_msg_alloc();
893 	if (req == NULL)
894 		return -1;
895 
896 	/* Write request */
897 	req->type = PIPELINE_REQ_PORT_IN_DISABLE;
898 	req->id = port_id;
899 
900 	/* Send request and wait for response */
901 	rsp = pipeline_msg_send_recv(p, req);
902 
903 	/* Read response */
904 	status = rsp->status;
905 
906 	/* Free response */
907 	pipeline_msg_free(rsp);
908 
909 	return status;
910 }
911 
912 int
913 pipeline_port_out_stats_read(const char *pipeline_name,
914 	uint32_t port_id,
915 	struct rte_pipeline_port_out_stats *stats,
916 	int clear)
917 {
918 	struct pipeline *p;
919 	struct pipeline_msg_req *req;
920 	struct pipeline_msg_rsp *rsp;
921 	int status;
922 
923 	/* Check input params */
924 	if ((pipeline_name == NULL) ||
925 		(stats == NULL))
926 		return -1;
927 
928 	p = pipeline_find(pipeline_name);
929 	if ((p == NULL) ||
930 		(port_id >= p->n_ports_out))
931 		return -1;
932 
933 	if (!pipeline_is_running(p)) {
934 		status = rte_pipeline_port_out_stats_read(p->p,
935 			port_id,
936 			stats,
937 			clear);
938 
939 		return status;
940 	}
941 
942 	/* Allocate request */
943 	req = pipeline_msg_alloc();
944 	if (req == NULL)
945 		return -1;
946 
947 	/* Write request */
948 	req->type = PIPELINE_REQ_PORT_OUT_STATS_READ;
949 	req->id = port_id;
950 	req->port_out_stats_read.clear = clear;
951 
952 	/* Send request and wait for response */
953 	rsp = pipeline_msg_send_recv(p, req);
954 
955 	/* Read response */
956 	status = rsp->status;
957 	if (status == 0)
958 		memcpy(stats, &rsp->port_out_stats_read.stats, sizeof(*stats));
959 
960 	/* Free response */
961 	pipeline_msg_free(rsp);
962 
963 	return status;
964 }
965 
966 int
967 pipeline_table_stats_read(const char *pipeline_name,
968 	uint32_t table_id,
969 	struct rte_pipeline_table_stats *stats,
970 	int clear)
971 {
972 	struct pipeline *p;
973 	struct pipeline_msg_req *req;
974 	struct pipeline_msg_rsp *rsp;
975 	int status;
976 
977 	/* Check input params */
978 	if ((pipeline_name == NULL) ||
979 		(stats == NULL))
980 		return -1;
981 
982 	p = pipeline_find(pipeline_name);
983 	if ((p == NULL) ||
984 		(table_id >= p->n_tables))
985 		return -1;
986 
987 	if (!pipeline_is_running(p)) {
988 		status = rte_pipeline_table_stats_read(p->p,
989 			table_id,
990 			stats,
991 			clear);
992 
993 		return status;
994 	}
995 
996 	/* Allocate request */
997 	req = pipeline_msg_alloc();
998 	if (req == NULL)
999 		return -1;
1000 
1001 	/* Write request */
1002 	req->type = PIPELINE_REQ_TABLE_STATS_READ;
1003 	req->id = table_id;
1004 	req->table_stats_read.clear = clear;
1005 
1006 	/* Send request and wait for response */
1007 	rsp = pipeline_msg_send_recv(p, req);
1008 
1009 	/* Read response */
1010 	status = rsp->status;
1011 	if (status == 0)
1012 		memcpy(stats, &rsp->table_stats_read.stats, sizeof(*stats));
1013 
1014 	/* Free response */
1015 	pipeline_msg_free(rsp);
1016 
1017 	return status;
1018 }
1019 
1020 static int
1021 match_check(struct table_rule_match *match,
1022 	struct pipeline *p,
1023 	uint32_t table_id)
1024 {
1025 	struct table *table;
1026 
1027 	if ((match == NULL) ||
1028 		(p == NULL) ||
1029 		(table_id >= p->n_tables))
1030 		return -1;
1031 
1032 	table = &p->table[table_id];
1033 	if (match->match_type != table->params.match_type)
1034 		return -1;
1035 
1036 	switch (match->match_type) {
1037 	case TABLE_ACL:
1038 	{
1039 		struct table_acl_params *t = &table->params.match.acl;
1040 		struct table_rule_match_acl *r = &match->match.acl;
1041 
1042 		if ((r->ip_version && (t->ip_version == 0)) ||
1043 			((r->ip_version == 0) && t->ip_version))
1044 			return -1;
1045 
1046 		if (r->ip_version) {
1047 			if ((r->sa_depth > 32) ||
1048 				(r->da_depth > 32))
1049 				return -1;
1050 		} else {
1051 			if ((r->sa_depth > 128) ||
1052 				(r->da_depth > 128))
1053 				return -1;
1054 		}
1055 		return 0;
1056 	}
1057 
1058 	case TABLE_ARRAY:
1059 		return 0;
1060 
1061 	case TABLE_HASH:
1062 		return 0;
1063 
1064 	case TABLE_LPM:
1065 	{
1066 		struct table_lpm_params *t = &table->params.match.lpm;
1067 		struct table_rule_match_lpm *r = &match->match.lpm;
1068 
1069 		if ((r->ip_version && (t->key_size != 4)) ||
1070 			((r->ip_version == 0) && (t->key_size != 16)))
1071 			return -1;
1072 
1073 		if (r->ip_version) {
1074 			if (r->depth > 32)
1075 				return -1;
1076 		} else {
1077 			if (r->depth > 128)
1078 				return -1;
1079 		}
1080 		return 0;
1081 	}
1082 
1083 	case TABLE_STUB:
1084 		return -1;
1085 
1086 	default:
1087 		return -1;
1088 	}
1089 }
1090 
1091 static int
1092 action_check(struct table_rule_action *action,
1093 	struct pipeline *p,
1094 	uint32_t table_id)
1095 {
1096 	struct table_action_profile *ap;
1097 
1098 	if ((action == NULL) ||
1099 		(p == NULL) ||
1100 		(table_id >= p->n_tables))
1101 		return -1;
1102 
1103 	ap = p->table[table_id].ap;
1104 	if (action->action_mask != ap->params.action_mask)
1105 		return -1;
1106 
1107 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1108 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1109 			(action->fwd.id >= p->n_ports_out))
1110 			return -1;
1111 
1112 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1113 			(action->fwd.id >= p->n_tables))
1114 			return -1;
1115 	}
1116 
1117 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
1118 		uint32_t tc_mask0 = (1 << ap->params.mtr.n_tc) - 1;
1119 		uint32_t tc_mask1 = action->mtr.tc_mask;
1120 
1121 		if (tc_mask1 != tc_mask0)
1122 			return -1;
1123 	}
1124 
1125 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
1126 		uint32_t n_subports_per_port =
1127 			ap->params.tm.n_subports_per_port;
1128 		uint32_t n_pipes_per_subport =
1129 			ap->params.tm.n_pipes_per_subport;
1130 		uint32_t subport_id = action->tm.subport_id;
1131 		uint32_t pipe_id = action->tm.pipe_id;
1132 
1133 		if ((subport_id >= n_subports_per_port) ||
1134 			(pipe_id >= n_pipes_per_subport))
1135 			return -1;
1136 	}
1137 
1138 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
1139 		uint64_t encap_mask = ap->params.encap.encap_mask;
1140 		enum rte_table_action_encap_type type = action->encap.type;
1141 
1142 		if ((encap_mask & (1LLU << type)) == 0)
1143 			return -1;
1144 	}
1145 
1146 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
1147 		int ip_version0 = ap->params.common.ip_version;
1148 		int ip_version1 = action->nat.ip_version;
1149 
1150 		if ((ip_version1 && (ip_version0 == 0)) ||
1151 			((ip_version1 == 0) && ip_version0))
1152 			return -1;
1153 	}
1154 
1155 	return 0;
1156 }
1157 
1158 static int
1159 action_default_check(struct table_rule_action *action,
1160 	struct pipeline *p,
1161 	uint32_t table_id)
1162 {
1163 	if ((action == NULL) ||
1164 		(action->action_mask != (1LLU << RTE_TABLE_ACTION_FWD)) ||
1165 		(p == NULL) ||
1166 		(table_id >= p->n_tables))
1167 		return -1;
1168 
1169 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
1170 		if ((action->fwd.action == RTE_PIPELINE_ACTION_PORT) &&
1171 			(action->fwd.id >= p->n_ports_out))
1172 			return -1;
1173 
1174 		if ((action->fwd.action == RTE_PIPELINE_ACTION_TABLE) &&
1175 			(action->fwd.id >= p->n_tables))
1176 			return -1;
1177 	}
1178 
1179 	return 0;
1180 }
1181 
1182 union table_rule_match_low_level {
1183 	struct rte_table_acl_rule_add_params acl_add;
1184 	struct rte_table_acl_rule_delete_params acl_delete;
1185 	struct rte_table_array_key array;
1186 	uint8_t hash[TABLE_RULE_MATCH_SIZE_MAX];
1187 	struct rte_table_lpm_key lpm_ipv4;
1188 	struct rte_table_lpm_ipv6_key lpm_ipv6;
1189 };
1190 
1191 static int
1192 match_convert(struct table_rule_match *mh,
1193 	union table_rule_match_low_level *ml,
1194 	int add);
1195 
1196 static int
1197 action_convert(struct rte_table_action *a,
1198 	struct table_rule_action *action,
1199 	struct rte_pipeline_table_entry *data);
1200 
1201 struct table_ll {
1202 	struct rte_pipeline *p;
1203 	int table_id;
1204 	struct rte_table_action *a;
1205 	int bulk_supported;
1206 };
1207 
1208 static int
1209 table_rule_add_bulk_ll(struct table_ll *table,
1210 	struct table_rule_list *list,
1211 	uint32_t *n_rules)
1212 {
1213 	union table_rule_match_low_level *match_ll = NULL;
1214 	uint8_t *action_ll = NULL;
1215 	void **match_ll_ptr = NULL;
1216 	struct rte_pipeline_table_entry **action_ll_ptr = NULL;
1217 	struct rte_pipeline_table_entry **entries_ptr = NULL;
1218 	int *found = NULL;
1219 	struct table_rule *rule;
1220 	uint32_t n, i;
1221 	int status = 0;
1222 
1223 	n = 0;
1224 	TAILQ_FOREACH(rule, list, node)
1225 		n++;
1226 
1227 	/* Memory allocation */
1228 	match_ll = calloc(n, sizeof(union table_rule_match_low_level));
1229 	action_ll = calloc(n, TABLE_RULE_ACTION_SIZE_MAX);
1230 
1231 	match_ll_ptr = calloc(n, sizeof(void *));
1232 	action_ll_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1233 
1234 	entries_ptr = calloc(n, sizeof(struct rte_pipeline_table_entry *));
1235 	found = calloc(n, sizeof(int));
1236 
1237 	if (match_ll == NULL ||
1238 		action_ll == NULL ||
1239 		match_ll_ptr == NULL ||
1240 		action_ll_ptr == NULL ||
1241 		entries_ptr == NULL ||
1242 		found == NULL) {
1243 			status = -ENOMEM;
1244 			goto table_rule_add_bulk_ll_free;
1245 	}
1246 
1247 	/* Init */
1248 	for (i = 0; i < n; i++) {
1249 		match_ll_ptr[i] = (void *)&match_ll[i];
1250 		action_ll_ptr[i] = (struct rte_pipeline_table_entry *)
1251 			&action_ll[i * TABLE_RULE_ACTION_SIZE_MAX];
1252 	}
1253 
1254 	/* Rule (match, action) conversion */
1255 	i = 0;
1256 	TAILQ_FOREACH(rule, list, node) {
1257 		status = match_convert(&rule->match, match_ll_ptr[i], 1);
1258 		if (status)
1259 			goto table_rule_add_bulk_ll_free;
1260 
1261 		status = action_convert(table->a, &rule->action, action_ll_ptr[i]);
1262 		if (status)
1263 			goto table_rule_add_bulk_ll_free;
1264 
1265 		i++;
1266 	}
1267 
1268 	/* Add rule (match, action) to table */
1269 	if (table->bulk_supported) {
1270 		status = rte_pipeline_table_entry_add_bulk(table->p,
1271 			table->table_id,
1272 			match_ll_ptr,
1273 			action_ll_ptr,
1274 			n,
1275 			found,
1276 			entries_ptr);
1277 		if (status)
1278 			goto table_rule_add_bulk_ll_free;
1279 	} else
1280 		for (i = 0; i < n; i++) {
1281 			status = rte_pipeline_table_entry_add(table->p,
1282 				table->table_id,
1283 				match_ll_ptr[i],
1284 				action_ll_ptr[i],
1285 				&found[i],
1286 				&entries_ptr[i]);
1287 			if (status) {
1288 				if (i == 0)
1289 					goto table_rule_add_bulk_ll_free;
1290 
1291 				/* No roll-back. */
1292 				status = 0;
1293 				n = i;
1294 				break;
1295 			}
1296 		}
1297 
1298 	/* Write back to the rule list. */
1299 	i = 0;
1300 	TAILQ_FOREACH(rule, list, node) {
1301 		if (i >= n)
1302 			break;
1303 
1304 		rule->data = entries_ptr[i];
1305 
1306 		i++;
1307 	}
1308 
1309 	*n_rules = n;
1310 
1311 	/* Free */
1312 table_rule_add_bulk_ll_free:
1313 	free(found);
1314 	free(entries_ptr);
1315 	free(action_ll_ptr);
1316 	free(match_ll_ptr);
1317 	free(action_ll);
1318 	free(match_ll);
1319 
1320 	return status;
1321 }
1322 
1323 int
1324 pipeline_table_rule_add(const char *pipeline_name,
1325 	uint32_t table_id,
1326 	struct table_rule_match *match,
1327 	struct table_rule_action *action)
1328 {
1329 	struct pipeline *p;
1330 	struct table *table;
1331 	struct pipeline_msg_req *req;
1332 	struct pipeline_msg_rsp *rsp;
1333 	struct table_rule *rule;
1334 	int status;
1335 
1336 	/* Check input params */
1337 	if ((pipeline_name == NULL) ||
1338 		(match == NULL) ||
1339 		(action == NULL))
1340 		return -1;
1341 
1342 	p = pipeline_find(pipeline_name);
1343 	if ((p == NULL) ||
1344 		(table_id >= p->n_tables) ||
1345 		match_check(match, p, table_id) ||
1346 		action_check(action, p, table_id))
1347 		return -1;
1348 
1349 	table = &p->table[table_id];
1350 
1351 	rule = calloc(1, sizeof(struct table_rule));
1352 	if (rule == NULL)
1353 		return -1;
1354 
1355 	memcpy(&rule->match, match, sizeof(*match));
1356 	memcpy(&rule->action, action, sizeof(*action));
1357 
1358 	if (!pipeline_is_running(p)) {
1359 		union table_rule_match_low_level match_ll;
1360 		struct rte_pipeline_table_entry *data_in, *data_out;
1361 		int key_found;
1362 		uint8_t *buffer;
1363 
1364 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1365 		if (buffer == NULL) {
1366 			free(rule);
1367 			return -1;
1368 		}
1369 
1370 		/* Table match-action rule conversion */
1371 		data_in = (struct rte_pipeline_table_entry *)buffer;
1372 
1373 		status = match_convert(match, &match_ll, 1);
1374 		if (status) {
1375 			free(buffer);
1376 			free(rule);
1377 			return -1;
1378 		}
1379 
1380 		status = action_convert(table->a, action, data_in);
1381 		if (status) {
1382 			free(buffer);
1383 			free(rule);
1384 			return -1;
1385 		}
1386 
1387 		/* Add rule (match, action) to table */
1388 		status = rte_pipeline_table_entry_add(p->p,
1389 				table_id,
1390 				&match_ll,
1391 				data_in,
1392 				&key_found,
1393 				&data_out);
1394 		if (status) {
1395 			free(buffer);
1396 			free(rule);
1397 			return -1;
1398 		}
1399 
1400 		/* Write Response */
1401 		rule->data = data_out;
1402 		table_rule_add(table, rule);
1403 
1404 		free(buffer);
1405 		return 0;
1406 	}
1407 
1408 	/* Allocate request */
1409 	req = pipeline_msg_alloc();
1410 	if (req == NULL) {
1411 		free(rule);
1412 		return -1;
1413 	}
1414 
1415 	/* Write request */
1416 	req->type = PIPELINE_REQ_TABLE_RULE_ADD;
1417 	req->id = table_id;
1418 	memcpy(&req->table_rule_add.match, match, sizeof(*match));
1419 	memcpy(&req->table_rule_add.action, action, sizeof(*action));
1420 
1421 	/* Send request and wait for response */
1422 	rsp = pipeline_msg_send_recv(p, req);
1423 
1424 	/* Read response */
1425 	status = rsp->status;
1426 	if (status == 0) {
1427 		rule->data = rsp->table_rule_add.data;
1428 		table_rule_add(table, rule);
1429 	} else
1430 		free(rule);
1431 
1432 	/* Free response */
1433 	pipeline_msg_free(rsp);
1434 
1435 	return status;
1436 }
1437 
1438 int
1439 pipeline_table_rule_add_default(const char *pipeline_name,
1440 	uint32_t table_id,
1441 	struct table_rule_action *action)
1442 {
1443 	struct pipeline *p;
1444 	struct table *table;
1445 	struct pipeline_msg_req *req;
1446 	struct pipeline_msg_rsp *rsp;
1447 	struct table_rule *rule;
1448 	int status;
1449 
1450 	/* Check input params */
1451 	if ((pipeline_name == NULL) ||
1452 		(action == NULL))
1453 		return -1;
1454 
1455 	p = pipeline_find(pipeline_name);
1456 	if ((p == NULL) ||
1457 		(table_id >= p->n_tables) ||
1458 		action_default_check(action, p, table_id))
1459 		return -1;
1460 
1461 	table = &p->table[table_id];
1462 
1463 	rule = calloc(1, sizeof(struct table_rule));
1464 	if (rule == NULL)
1465 		return -1;
1466 
1467 	memcpy(&rule->action, action, sizeof(*action));
1468 
1469 	if (!pipeline_is_running(p)) {
1470 		struct rte_pipeline_table_entry *data_in, *data_out;
1471 		uint8_t *buffer;
1472 
1473 		buffer = calloc(TABLE_RULE_ACTION_SIZE_MAX, sizeof(uint8_t));
1474 		if (buffer == NULL) {
1475 			free(rule);
1476 			return -1;
1477 		}
1478 
1479 		/* Apply actions */
1480 		data_in = (struct rte_pipeline_table_entry *)buffer;
1481 
1482 		data_in->action = action->fwd.action;
1483 		if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
1484 			data_in->port_id = action->fwd.id;
1485 		if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
1486 			data_in->table_id = action->fwd.id;
1487 
1488 		/* Add default rule to table */
1489 		status = rte_pipeline_table_default_entry_add(p->p,
1490 				table_id,
1491 				data_in,
1492 				&data_out);
1493 		if (status) {
1494 			free(buffer);
1495 			free(rule);
1496 			return -1;
1497 		}
1498 
1499 		/* Write Response */
1500 		rule->data = data_out;
1501 		table_rule_default_add(table, rule);
1502 
1503 		free(buffer);
1504 		return 0;
1505 	}
1506 
1507 	/* Allocate request */
1508 	req = pipeline_msg_alloc();
1509 	if (req == NULL) {
1510 		free(rule);
1511 		return -1;
1512 	}
1513 
1514 	/* Write request */
1515 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT;
1516 	req->id = table_id;
1517 	memcpy(&req->table_rule_add_default.action, action, sizeof(*action));
1518 
1519 	/* Send request and wait for response */
1520 	rsp = pipeline_msg_send_recv(p, req);
1521 
1522 	/* Read response */
1523 	status = rsp->status;
1524 	if (status == 0) {
1525 		rule->data = rsp->table_rule_add_default.data;
1526 		table_rule_default_add(table, rule);
1527 	} else
1528 		free(rule);
1529 
1530 	/* Free response */
1531 	pipeline_msg_free(rsp);
1532 
1533 	return status;
1534 }
1535 
1536 static uint32_t
1537 table_rule_list_free(struct table_rule_list *list)
1538 {
1539 	uint32_t n = 0;
1540 
1541 	if (!list)
1542 		return 0;
1543 
1544 	for ( ; ; ) {
1545 		struct table_rule *rule;
1546 
1547 		rule = TAILQ_FIRST(list);
1548 		if (rule == NULL)
1549 			break;
1550 
1551 		TAILQ_REMOVE(list, rule, node);
1552 		free(rule);
1553 		n++;
1554 	}
1555 
1556 	free(list);
1557 	return n;
1558 }
1559 
1560 int
1561 pipeline_table_rule_add_bulk(const char *pipeline_name,
1562 	uint32_t table_id,
1563 	struct table_rule_list *list,
1564 	uint32_t *n_rules_added,
1565 	uint32_t *n_rules_not_added)
1566 {
1567 	struct pipeline *p;
1568 	struct table *table;
1569 	struct pipeline_msg_req *req;
1570 	struct pipeline_msg_rsp *rsp;
1571 	struct table_rule *rule;
1572 	int status = 0;
1573 
1574 	/* Check input params */
1575 	if ((pipeline_name == NULL) ||
1576 		(list == NULL) ||
1577 		TAILQ_EMPTY(list) ||
1578 		(n_rules_added == NULL) ||
1579 		(n_rules_not_added == NULL)) {
1580 		table_rule_list_free(list);
1581 		return -EINVAL;
1582 	}
1583 
1584 	p = pipeline_find(pipeline_name);
1585 	if ((p == NULL) ||
1586 		(table_id >= p->n_tables)) {
1587 		table_rule_list_free(list);
1588 		return -EINVAL;
1589 	}
1590 
1591 	table = &p->table[table_id];
1592 
1593 	TAILQ_FOREACH(rule, list, node)
1594 		if (match_check(&rule->match, p, table_id) ||
1595 			action_check(&rule->action, p, table_id)) {
1596 			table_rule_list_free(list);
1597 			return -EINVAL;
1598 		}
1599 
1600 	if (!pipeline_is_running(p)) {
1601 		struct table_ll table_ll = {
1602 			.p = p->p,
1603 			.table_id = table_id,
1604 			.a = table->a,
1605 			.bulk_supported = table->params.match_type == TABLE_ACL,
1606 		};
1607 
1608 		status = table_rule_add_bulk_ll(&table_ll, list, n_rules_added);
1609 		if (status) {
1610 			table_rule_list_free(list);
1611 			return status;
1612 		}
1613 
1614 		table_rule_add_bulk(table, list, *n_rules_added);
1615 		*n_rules_not_added = table_rule_list_free(list);
1616 		return 0;
1617 	}
1618 
1619 	/* Allocate request */
1620 	req = pipeline_msg_alloc();
1621 	if (req == NULL) {
1622 		table_rule_list_free(list);
1623 		return -ENOMEM;
1624 	}
1625 
1626 	/* Write request */
1627 	req->type = PIPELINE_REQ_TABLE_RULE_ADD_BULK;
1628 	req->id = table_id;
1629 	req->table_rule_add_bulk.list = list;
1630 	req->table_rule_add_bulk.bulk = table->params.match_type == TABLE_ACL;
1631 
1632 	/* Send request and wait for response */
1633 	rsp = pipeline_msg_send_recv(p, req);
1634 
1635 	/* Read response */
1636 	status = rsp->status;
1637 	if (status == 0) {
1638 		*n_rules_added = rsp->table_rule_add_bulk.n_rules;
1639 
1640 		table_rule_add_bulk(table, list, *n_rules_added);
1641 		*n_rules_not_added = table_rule_list_free(list);
1642 	} else
1643 		table_rule_list_free(list);
1644 
1645 
1646 	/* Free response */
1647 	pipeline_msg_free(rsp);
1648 
1649 	return status;
1650 }
1651 
1652 int
1653 pipeline_table_rule_delete(const char *pipeline_name,
1654 	uint32_t table_id,
1655 	struct table_rule_match *match)
1656 {
1657 	struct pipeline *p;
1658 	struct table *table;
1659 	struct pipeline_msg_req *req;
1660 	struct pipeline_msg_rsp *rsp;
1661 	int status;
1662 
1663 	/* Check input params */
1664 	if ((pipeline_name == NULL) ||
1665 		(match == NULL))
1666 		return -1;
1667 
1668 	p = pipeline_find(pipeline_name);
1669 	if ((p == NULL) ||
1670 		(table_id >= p->n_tables) ||
1671 		match_check(match, p, table_id))
1672 		return -1;
1673 
1674 	table = &p->table[table_id];
1675 
1676 	if (!pipeline_is_running(p)) {
1677 		union table_rule_match_low_level match_ll;
1678 		int key_found;
1679 
1680 		status = match_convert(match, &match_ll, 0);
1681 		if (status)
1682 			return -1;
1683 
1684 		status = rte_pipeline_table_entry_delete(p->p,
1685 				table_id,
1686 				&match_ll,
1687 				&key_found,
1688 				NULL);
1689 
1690 		if (status == 0)
1691 			table_rule_delete(table, match);
1692 
1693 		return status;
1694 	}
1695 
1696 	/* Allocate request */
1697 	req = pipeline_msg_alloc();
1698 	if (req == NULL)
1699 		return -1;
1700 
1701 	/* Write request */
1702 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE;
1703 	req->id = table_id;
1704 	memcpy(&req->table_rule_delete.match, match, sizeof(*match));
1705 
1706 	/* Send request and wait for response */
1707 	rsp = pipeline_msg_send_recv(p, req);
1708 
1709 	/* Read response */
1710 	status = rsp->status;
1711 	if (status == 0)
1712 		table_rule_delete(table, match);
1713 
1714 	/* Free response */
1715 	pipeline_msg_free(rsp);
1716 
1717 	return status;
1718 }
1719 
1720 int
1721 pipeline_table_rule_delete_default(const char *pipeline_name,
1722 	uint32_t table_id)
1723 {
1724 	struct pipeline *p;
1725 	struct table *table;
1726 	struct pipeline_msg_req *req;
1727 	struct pipeline_msg_rsp *rsp;
1728 	int status;
1729 
1730 	/* Check input params */
1731 	if (pipeline_name == NULL)
1732 		return -1;
1733 
1734 	p = pipeline_find(pipeline_name);
1735 	if ((p == NULL) ||
1736 		(table_id >= p->n_tables))
1737 		return -1;
1738 
1739 	table = &p->table[table_id];
1740 
1741 	if (!pipeline_is_running(p)) {
1742 		status = rte_pipeline_table_default_entry_delete(p->p,
1743 			table_id,
1744 			NULL);
1745 
1746 		if (status == 0)
1747 			table_rule_default_delete(table);
1748 
1749 		return status;
1750 	}
1751 
1752 	/* Allocate request */
1753 	req = pipeline_msg_alloc();
1754 	if (req == NULL)
1755 		return -1;
1756 
1757 	/* Write request */
1758 	req->type = PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT;
1759 	req->id = table_id;
1760 
1761 	/* Send request and wait for response */
1762 	rsp = pipeline_msg_send_recv(p, req);
1763 
1764 	/* Read response */
1765 	status = rsp->status;
1766 	if (status == 0)
1767 		table_rule_default_delete(table);
1768 
1769 	/* Free response */
1770 	pipeline_msg_free(rsp);
1771 
1772 	return status;
1773 }
1774 
1775 int
1776 pipeline_table_rule_stats_read(const char *pipeline_name,
1777 	uint32_t table_id,
1778 	struct table_rule_match *match,
1779 	struct rte_table_action_stats_counters *stats,
1780 	int clear)
1781 {
1782 	struct pipeline *p;
1783 	struct table *table;
1784 	struct pipeline_msg_req *req;
1785 	struct pipeline_msg_rsp *rsp;
1786 	struct table_rule *rule;
1787 	int status;
1788 
1789 	/* Check input params */
1790 	if ((pipeline_name == NULL) ||
1791 		(match == NULL) ||
1792 		(stats == NULL))
1793 		return -1;
1794 
1795 	p = pipeline_find(pipeline_name);
1796 	if ((p == NULL) ||
1797 		(table_id >= p->n_tables) ||
1798 		match_check(match, p, table_id))
1799 		return -1;
1800 
1801 	table = &p->table[table_id];
1802 	rule = table_rule_find(table, match);
1803 	if (rule == NULL)
1804 		return -1;
1805 
1806 	if (!pipeline_is_running(p)) {
1807 		status = rte_table_action_stats_read(table->a,
1808 			rule->data,
1809 			stats,
1810 			clear);
1811 
1812 		return status;
1813 	}
1814 
1815 	/* Allocate request */
1816 	req = pipeline_msg_alloc();
1817 	if (req == NULL)
1818 		return -1;
1819 
1820 	/* Write request */
1821 	req->type = PIPELINE_REQ_TABLE_RULE_STATS_READ;
1822 	req->id = table_id;
1823 	req->table_rule_stats_read.data = rule->data;
1824 	req->table_rule_stats_read.clear = clear;
1825 
1826 	/* Send request and wait for response */
1827 	rsp = pipeline_msg_send_recv(p, req);
1828 
1829 	/* Read response */
1830 	status = rsp->status;
1831 	if (status == 0)
1832 		memcpy(stats, &rsp->table_rule_stats_read.stats, sizeof(*stats));
1833 
1834 	/* Free response */
1835 	pipeline_msg_free(rsp);
1836 
1837 	return status;
1838 }
1839 
1840 int
1841 pipeline_table_mtr_profile_add(const char *pipeline_name,
1842 	uint32_t table_id,
1843 	uint32_t meter_profile_id,
1844 	struct rte_table_action_meter_profile *profile)
1845 {
1846 	struct pipeline *p;
1847 	struct pipeline_msg_req *req;
1848 	struct pipeline_msg_rsp *rsp;
1849 	int status;
1850 
1851 	/* Check input params */
1852 	if ((pipeline_name == NULL) ||
1853 		(profile == NULL))
1854 		return -1;
1855 
1856 	p = pipeline_find(pipeline_name);
1857 	if ((p == NULL) ||
1858 		(table_id >= p->n_tables))
1859 		return -1;
1860 
1861 	if (!pipeline_is_running(p)) {
1862 		struct rte_table_action *a = p->table[table_id].a;
1863 
1864 		status = rte_table_action_meter_profile_add(a,
1865 			meter_profile_id,
1866 			profile);
1867 
1868 		return status;
1869 	}
1870 
1871 	/* Allocate request */
1872 	req = pipeline_msg_alloc();
1873 	if (req == NULL)
1874 		return -1;
1875 
1876 	/* Write request */
1877 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_ADD;
1878 	req->id = table_id;
1879 	req->table_mtr_profile_add.meter_profile_id = meter_profile_id;
1880 	memcpy(&req->table_mtr_profile_add.profile, profile, sizeof(*profile));
1881 
1882 	/* Send request and wait for response */
1883 	rsp = pipeline_msg_send_recv(p, req);
1884 
1885 	/* Read response */
1886 	status = rsp->status;
1887 
1888 	/* Free response */
1889 	pipeline_msg_free(rsp);
1890 
1891 	return status;
1892 }
1893 
1894 int
1895 pipeline_table_mtr_profile_delete(const char *pipeline_name,
1896 	uint32_t table_id,
1897 	uint32_t meter_profile_id)
1898 {
1899 	struct pipeline *p;
1900 	struct pipeline_msg_req *req;
1901 	struct pipeline_msg_rsp *rsp;
1902 	int status;
1903 
1904 	/* Check input params */
1905 	if (pipeline_name == NULL)
1906 		return -1;
1907 
1908 	p = pipeline_find(pipeline_name);
1909 	if ((p == NULL) ||
1910 		(table_id >= p->n_tables))
1911 		return -1;
1912 
1913 	if (!pipeline_is_running(p)) {
1914 		struct rte_table_action *a = p->table[table_id].a;
1915 
1916 		status = rte_table_action_meter_profile_delete(a,
1917 				meter_profile_id);
1918 
1919 		return status;
1920 	}
1921 
1922 	/* Allocate request */
1923 	req = pipeline_msg_alloc();
1924 	if (req == NULL)
1925 		return -1;
1926 
1927 	/* Write request */
1928 	req->type = PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE;
1929 	req->id = table_id;
1930 	req->table_mtr_profile_delete.meter_profile_id = meter_profile_id;
1931 
1932 	/* Send request and wait for response */
1933 	rsp = pipeline_msg_send_recv(p, req);
1934 
1935 	/* Read response */
1936 	status = rsp->status;
1937 
1938 	/* Free response */
1939 	pipeline_msg_free(rsp);
1940 
1941 	return status;
1942 }
1943 
1944 int
1945 pipeline_table_rule_mtr_read(const char *pipeline_name,
1946 	uint32_t table_id,
1947 	struct table_rule_match *match,
1948 	struct rte_table_action_mtr_counters *stats,
1949 	int clear)
1950 {
1951 	struct pipeline *p;
1952 	struct table *table;
1953 	struct pipeline_msg_req *req;
1954 	struct pipeline_msg_rsp *rsp;
1955 	struct table_rule *rule;
1956 	uint32_t tc_mask;
1957 	int status;
1958 
1959 	/* Check input params */
1960 	if ((pipeline_name == NULL) ||
1961 		(match == NULL) ||
1962 		(stats == NULL))
1963 		return -1;
1964 
1965 	p = pipeline_find(pipeline_name);
1966 	if ((p == NULL) ||
1967 		(table_id >= p->n_tables) ||
1968 		match_check(match, p, table_id))
1969 		return -1;
1970 
1971 	table = &p->table[table_id];
1972 	tc_mask = (1 << table->ap->params.mtr.n_tc) - 1;
1973 
1974 	rule = table_rule_find(table, match);
1975 	if (rule == NULL)
1976 		return -1;
1977 
1978 	if (!pipeline_is_running(p)) {
1979 		status = rte_table_action_meter_read(table->a,
1980 				rule->data,
1981 				tc_mask,
1982 				stats,
1983 				clear);
1984 
1985 		return status;
1986 	}
1987 
1988 	/* Allocate request */
1989 	req = pipeline_msg_alloc();
1990 	if (req == NULL)
1991 		return -1;
1992 
1993 	/* Write request */
1994 	req->type = PIPELINE_REQ_TABLE_RULE_MTR_READ;
1995 	req->id = table_id;
1996 	req->table_rule_mtr_read.data = rule->data;
1997 	req->table_rule_mtr_read.tc_mask = tc_mask;
1998 	req->table_rule_mtr_read.clear = clear;
1999 
2000 	/* Send request and wait for response */
2001 	rsp = pipeline_msg_send_recv(p, req);
2002 
2003 	/* Read response */
2004 	status = rsp->status;
2005 	if (status == 0)
2006 		memcpy(stats, &rsp->table_rule_mtr_read.stats, sizeof(*stats));
2007 
2008 	/* Free response */
2009 	pipeline_msg_free(rsp);
2010 
2011 	return status;
2012 }
2013 
2014 int
2015 pipeline_table_dscp_table_update(const char *pipeline_name,
2016 	uint32_t table_id,
2017 	uint64_t dscp_mask,
2018 	struct rte_table_action_dscp_table *dscp_table)
2019 {
2020 	struct pipeline *p;
2021 	struct pipeline_msg_req *req;
2022 	struct pipeline_msg_rsp *rsp;
2023 	int status;
2024 
2025 	/* Check input params */
2026 	if ((pipeline_name == NULL) ||
2027 		(dscp_table == NULL))
2028 		return -1;
2029 
2030 	p = pipeline_find(pipeline_name);
2031 	if ((p == NULL) ||
2032 		(table_id >= p->n_tables))
2033 		return -1;
2034 
2035 	if (!pipeline_is_running(p)) {
2036 		struct rte_table_action *a = p->table[table_id].a;
2037 
2038 		status = rte_table_action_dscp_table_update(a,
2039 				dscp_mask,
2040 				dscp_table);
2041 
2042 		return status;
2043 	}
2044 
2045 	/* Allocate request */
2046 	req = pipeline_msg_alloc();
2047 	if (req == NULL)
2048 		return -1;
2049 
2050 	/* Write request */
2051 	req->type = PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE;
2052 	req->id = table_id;
2053 	req->table_dscp_table_update.dscp_mask = dscp_mask;
2054 	memcpy(&req->table_dscp_table_update.dscp_table,
2055 		dscp_table, sizeof(*dscp_table));
2056 
2057 	/* Send request and wait for response */
2058 	rsp = pipeline_msg_send_recv(p, req);
2059 
2060 	/* Read response */
2061 	status = rsp->status;
2062 
2063 	/* Free response */
2064 	pipeline_msg_free(rsp);
2065 
2066 	return status;
2067 }
2068 
2069 int
2070 pipeline_table_rule_ttl_read(const char *pipeline_name,
2071 	uint32_t table_id,
2072 	struct table_rule_match *match,
2073 	struct rte_table_action_ttl_counters *stats,
2074 	int clear)
2075 {
2076 	struct pipeline *p;
2077 	struct table *table;
2078 	struct pipeline_msg_req *req;
2079 	struct pipeline_msg_rsp *rsp;
2080 	struct table_rule *rule;
2081 	int status;
2082 
2083 	/* Check input params */
2084 	if ((pipeline_name == NULL) ||
2085 		(match == NULL) ||
2086 		(stats == NULL))
2087 		return -1;
2088 
2089 	p = pipeline_find(pipeline_name);
2090 	if ((p == NULL) ||
2091 		(table_id >= p->n_tables) ||
2092 		match_check(match, p, table_id))
2093 		return -1;
2094 
2095 	table = &p->table[table_id];
2096 	if (!table->ap->params.ttl.n_packets_enabled)
2097 		return -1;
2098 
2099 	rule = table_rule_find(table, match);
2100 	if (rule == NULL)
2101 		return -1;
2102 
2103 	if (!pipeline_is_running(p)) {
2104 		status = rte_table_action_ttl_read(table->a,
2105 				rule->data,
2106 				stats,
2107 				clear);
2108 
2109 		return status;
2110 	}
2111 
2112 	/* Allocate request */
2113 	req = pipeline_msg_alloc();
2114 	if (req == NULL)
2115 		return -1;
2116 
2117 	/* Write request */
2118 	req->type = PIPELINE_REQ_TABLE_RULE_TTL_READ;
2119 	req->id = table_id;
2120 	req->table_rule_ttl_read.data = rule->data;
2121 	req->table_rule_ttl_read.clear = clear;
2122 
2123 	/* Send request and wait for response */
2124 	rsp = pipeline_msg_send_recv(p, req);
2125 
2126 	/* Read response */
2127 	status = rsp->status;
2128 	if (status == 0)
2129 		memcpy(stats, &rsp->table_rule_ttl_read.stats, sizeof(*stats));
2130 
2131 	/* Free response */
2132 	pipeline_msg_free(rsp);
2133 
2134 	return status;
2135 }
2136 
2137 int
2138 pipeline_table_rule_time_read(const char *pipeline_name,
2139 	uint32_t table_id,
2140 	struct table_rule_match *match,
2141 	uint64_t *timestamp)
2142 {
2143 	struct pipeline *p;
2144 	struct table *table;
2145 	struct pipeline_msg_req *req;
2146 	struct pipeline_msg_rsp *rsp;
2147 	struct table_rule *rule;
2148 	int status;
2149 
2150 	/* Check input params */
2151 	if ((pipeline_name == NULL) ||
2152 		(match == NULL) ||
2153 		(timestamp == NULL))
2154 		return -1;
2155 
2156 	p = pipeline_find(pipeline_name);
2157 	if ((p == NULL) ||
2158 		(table_id >= p->n_tables) ||
2159 		match_check(match, p, table_id))
2160 		return -1;
2161 
2162 	table = &p->table[table_id];
2163 
2164 	rule = table_rule_find(table, match);
2165 	if (rule == NULL)
2166 		return -1;
2167 
2168 	if (!pipeline_is_running(p)) {
2169 		status = rte_table_action_time_read(table->a,
2170 				rule->data,
2171 				timestamp);
2172 
2173 		return status;
2174 	}
2175 
2176 	/* Allocate request */
2177 	req = pipeline_msg_alloc();
2178 	if (req == NULL)
2179 		return -1;
2180 
2181 	/* Write request */
2182 	req->type = PIPELINE_REQ_TABLE_RULE_TIME_READ;
2183 	req->id = table_id;
2184 	req->table_rule_time_read.data = rule->data;
2185 
2186 	/* Send request and wait for response */
2187 	rsp = pipeline_msg_send_recv(p, req);
2188 
2189 	/* Read response */
2190 	status = rsp->status;
2191 	if (status == 0)
2192 		*timestamp = rsp->table_rule_time_read.timestamp;
2193 
2194 	/* Free response */
2195 	pipeline_msg_free(rsp);
2196 
2197 	return status;
2198 }
2199 
2200 /**
2201  * Data plane threads: message handling
2202  */
2203 static inline struct pipeline_msg_req *
2204 pipeline_msg_recv(struct rte_ring *msgq_req)
2205 {
2206 	struct pipeline_msg_req *req;
2207 
2208 	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
2209 
2210 	if (status != 0)
2211 		return NULL;
2212 
2213 	return req;
2214 }
2215 
2216 static inline void
2217 pipeline_msg_send(struct rte_ring *msgq_rsp,
2218 	struct pipeline_msg_rsp *rsp)
2219 {
2220 	int status;
2221 
2222 	do {
2223 		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
2224 	} while (status == -ENOBUFS);
2225 }
2226 
2227 static struct pipeline_msg_rsp *
2228 pipeline_msg_handle_port_in_stats_read(struct pipeline_data *p,
2229 	struct pipeline_msg_req *req)
2230 {
2231 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2232 	uint32_t port_id = req->id;
2233 	int clear = req->port_in_stats_read.clear;
2234 
2235 	rsp->status = rte_pipeline_port_in_stats_read(p->p,
2236 		port_id,
2237 		&rsp->port_in_stats_read.stats,
2238 		clear);
2239 
2240 	return rsp;
2241 }
2242 
2243 static struct pipeline_msg_rsp *
2244 pipeline_msg_handle_port_in_enable(struct pipeline_data *p,
2245 	struct pipeline_msg_req *req)
2246 {
2247 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2248 	uint32_t port_id = req->id;
2249 
2250 	rsp->status = rte_pipeline_port_in_enable(p->p,
2251 		port_id);
2252 
2253 	return rsp;
2254 }
2255 
2256 static struct pipeline_msg_rsp *
2257 pipeline_msg_handle_port_in_disable(struct pipeline_data *p,
2258 	struct pipeline_msg_req *req)
2259 {
2260 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2261 	uint32_t port_id = req->id;
2262 
2263 	rsp->status = rte_pipeline_port_in_disable(p->p,
2264 		port_id);
2265 
2266 	return rsp;
2267 }
2268 
2269 static struct pipeline_msg_rsp *
2270 pipeline_msg_handle_port_out_stats_read(struct pipeline_data *p,
2271 	struct pipeline_msg_req *req)
2272 {
2273 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2274 	uint32_t port_id = req->id;
2275 	int clear = req->port_out_stats_read.clear;
2276 
2277 	rsp->status = rte_pipeline_port_out_stats_read(p->p,
2278 		port_id,
2279 		&rsp->port_out_stats_read.stats,
2280 		clear);
2281 
2282 	return rsp;
2283 }
2284 
2285 static struct pipeline_msg_rsp *
2286 pipeline_msg_handle_table_stats_read(struct pipeline_data *p,
2287 	struct pipeline_msg_req *req)
2288 {
2289 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2290 	uint32_t port_id = req->id;
2291 	int clear = req->table_stats_read.clear;
2292 
2293 	rsp->status = rte_pipeline_table_stats_read(p->p,
2294 		port_id,
2295 		&rsp->table_stats_read.stats,
2296 		clear);
2297 
2298 	return rsp;
2299 }
2300 
2301 static int
2302 match_convert_ipv6_depth(uint32_t depth, uint32_t *depth32)
2303 {
2304 	if (depth > 128)
2305 		return -1;
2306 
2307 	switch (depth / 32) {
2308 	case 0:
2309 		depth32[0] = depth;
2310 		depth32[1] = 0;
2311 		depth32[2] = 0;
2312 		depth32[3] = 0;
2313 		return 0;
2314 
2315 	case 1:
2316 		depth32[0] = 32;
2317 		depth32[1] = depth - 32;
2318 		depth32[2] = 0;
2319 		depth32[3] = 0;
2320 		return 0;
2321 
2322 	case 2:
2323 		depth32[0] = 32;
2324 		depth32[1] = 32;
2325 		depth32[2] = depth - 64;
2326 		depth32[3] = 0;
2327 		return 0;
2328 
2329 	case 3:
2330 		depth32[0] = 32;
2331 		depth32[1] = 32;
2332 		depth32[2] = 32;
2333 		depth32[3] = depth - 96;
2334 		return 0;
2335 
2336 	case 4:
2337 		depth32[0] = 32;
2338 		depth32[1] = 32;
2339 		depth32[2] = 32;
2340 		depth32[3] = 32;
2341 		return 0;
2342 
2343 	default:
2344 		return -1;
2345 	}
2346 }
2347 
2348 static int
2349 match_convert(struct table_rule_match *mh,
2350 	union table_rule_match_low_level *ml,
2351 	int add)
2352 {
2353 	memset(ml, 0, sizeof(*ml));
2354 
2355 	switch (mh->match_type) {
2356 	case TABLE_ACL:
2357 		if (mh->match.acl.ip_version)
2358 			if (add) {
2359 				ml->acl_add.field_value[0].value.u8 =
2360 					mh->match.acl.proto;
2361 				ml->acl_add.field_value[0].mask_range.u8 =
2362 					mh->match.acl.proto_mask;
2363 
2364 				ml->acl_add.field_value[1].value.u32 =
2365 					mh->match.acl.ipv4.sa;
2366 				ml->acl_add.field_value[1].mask_range.u32 =
2367 					mh->match.acl.sa_depth;
2368 
2369 				ml->acl_add.field_value[2].value.u32 =
2370 					mh->match.acl.ipv4.da;
2371 				ml->acl_add.field_value[2].mask_range.u32 =
2372 					mh->match.acl.da_depth;
2373 
2374 				ml->acl_add.field_value[3].value.u16 =
2375 					mh->match.acl.sp0;
2376 				ml->acl_add.field_value[3].mask_range.u16 =
2377 					mh->match.acl.sp1;
2378 
2379 				ml->acl_add.field_value[4].value.u16 =
2380 					mh->match.acl.dp0;
2381 				ml->acl_add.field_value[4].mask_range.u16 =
2382 					mh->match.acl.dp1;
2383 
2384 				ml->acl_add.priority =
2385 					(int32_t) mh->match.acl.priority;
2386 			} else {
2387 				ml->acl_delete.field_value[0].value.u8 =
2388 					mh->match.acl.proto;
2389 				ml->acl_delete.field_value[0].mask_range.u8 =
2390 					mh->match.acl.proto_mask;
2391 
2392 				ml->acl_delete.field_value[1].value.u32 =
2393 					mh->match.acl.ipv4.sa;
2394 				ml->acl_delete.field_value[1].mask_range.u32 =
2395 					mh->match.acl.sa_depth;
2396 
2397 				ml->acl_delete.field_value[2].value.u32 =
2398 					mh->match.acl.ipv4.da;
2399 				ml->acl_delete.field_value[2].mask_range.u32 =
2400 					mh->match.acl.da_depth;
2401 
2402 				ml->acl_delete.field_value[3].value.u16 =
2403 					mh->match.acl.sp0;
2404 				ml->acl_delete.field_value[3].mask_range.u16 =
2405 					mh->match.acl.sp1;
2406 
2407 				ml->acl_delete.field_value[4].value.u16 =
2408 					mh->match.acl.dp0;
2409 				ml->acl_delete.field_value[4].mask_range.u16 =
2410 					mh->match.acl.dp1;
2411 			}
2412 		else
2413 			if (add) {
2414 				uint32_t *sa32 =
2415 					(uint32_t *) mh->match.acl.ipv6.sa;
2416 				uint32_t *da32 =
2417 					(uint32_t *) mh->match.acl.ipv6.da;
2418 				uint32_t sa32_depth[4], da32_depth[4];
2419 				int status;
2420 
2421 				status = match_convert_ipv6_depth(
2422 					mh->match.acl.sa_depth,
2423 					sa32_depth);
2424 				if (status)
2425 					return status;
2426 
2427 				status = match_convert_ipv6_depth(
2428 					mh->match.acl.da_depth,
2429 					da32_depth);
2430 				if (status)
2431 					return status;
2432 
2433 				ml->acl_add.field_value[0].value.u8 =
2434 					mh->match.acl.proto;
2435 				ml->acl_add.field_value[0].mask_range.u8 =
2436 					mh->match.acl.proto_mask;
2437 
2438 				ml->acl_add.field_value[1].value.u32 =
2439 					rte_be_to_cpu_32(sa32[0]);
2440 				ml->acl_add.field_value[1].mask_range.u32 =
2441 					sa32_depth[0];
2442 				ml->acl_add.field_value[2].value.u32 =
2443 					rte_be_to_cpu_32(sa32[1]);
2444 				ml->acl_add.field_value[2].mask_range.u32 =
2445 					sa32_depth[1];
2446 				ml->acl_add.field_value[3].value.u32 =
2447 					rte_be_to_cpu_32(sa32[2]);
2448 				ml->acl_add.field_value[3].mask_range.u32 =
2449 					sa32_depth[2];
2450 				ml->acl_add.field_value[4].value.u32 =
2451 					rte_be_to_cpu_32(sa32[3]);
2452 				ml->acl_add.field_value[4].mask_range.u32 =
2453 					sa32_depth[3];
2454 
2455 				ml->acl_add.field_value[5].value.u32 =
2456 					rte_be_to_cpu_32(da32[0]);
2457 				ml->acl_add.field_value[5].mask_range.u32 =
2458 					da32_depth[0];
2459 				ml->acl_add.field_value[6].value.u32 =
2460 					rte_be_to_cpu_32(da32[1]);
2461 				ml->acl_add.field_value[6].mask_range.u32 =
2462 					da32_depth[1];
2463 				ml->acl_add.field_value[7].value.u32 =
2464 					rte_be_to_cpu_32(da32[2]);
2465 				ml->acl_add.field_value[7].mask_range.u32 =
2466 					da32_depth[2];
2467 				ml->acl_add.field_value[8].value.u32 =
2468 					rte_be_to_cpu_32(da32[3]);
2469 				ml->acl_add.field_value[8].mask_range.u32 =
2470 					da32_depth[3];
2471 
2472 				ml->acl_add.field_value[9].value.u16 =
2473 					mh->match.acl.sp0;
2474 				ml->acl_add.field_value[9].mask_range.u16 =
2475 					mh->match.acl.sp1;
2476 
2477 				ml->acl_add.field_value[10].value.u16 =
2478 					mh->match.acl.dp0;
2479 				ml->acl_add.field_value[10].mask_range.u16 =
2480 					mh->match.acl.dp1;
2481 
2482 				ml->acl_add.priority =
2483 					(int32_t) mh->match.acl.priority;
2484 			} else {
2485 				uint32_t *sa32 =
2486 					(uint32_t *) mh->match.acl.ipv6.sa;
2487 				uint32_t *da32 =
2488 					(uint32_t *) mh->match.acl.ipv6.da;
2489 				uint32_t sa32_depth[4], da32_depth[4];
2490 				int status;
2491 
2492 				status = match_convert_ipv6_depth(
2493 					mh->match.acl.sa_depth,
2494 					sa32_depth);
2495 				if (status)
2496 					return status;
2497 
2498 				status = match_convert_ipv6_depth(
2499 					mh->match.acl.da_depth,
2500 					da32_depth);
2501 				if (status)
2502 					return status;
2503 
2504 				ml->acl_delete.field_value[0].value.u8 =
2505 					mh->match.acl.proto;
2506 				ml->acl_delete.field_value[0].mask_range.u8 =
2507 					mh->match.acl.proto_mask;
2508 
2509 				ml->acl_delete.field_value[1].value.u32 =
2510 					rte_be_to_cpu_32(sa32[0]);
2511 				ml->acl_delete.field_value[1].mask_range.u32 =
2512 					sa32_depth[0];
2513 				ml->acl_delete.field_value[2].value.u32 =
2514 					rte_be_to_cpu_32(sa32[1]);
2515 				ml->acl_delete.field_value[2].mask_range.u32 =
2516 					sa32_depth[1];
2517 				ml->acl_delete.field_value[3].value.u32 =
2518 					rte_be_to_cpu_32(sa32[2]);
2519 				ml->acl_delete.field_value[3].mask_range.u32 =
2520 					sa32_depth[2];
2521 				ml->acl_delete.field_value[4].value.u32 =
2522 					rte_be_to_cpu_32(sa32[3]);
2523 				ml->acl_delete.field_value[4].mask_range.u32 =
2524 					sa32_depth[3];
2525 
2526 				ml->acl_delete.field_value[5].value.u32 =
2527 					rte_be_to_cpu_32(da32[0]);
2528 				ml->acl_delete.field_value[5].mask_range.u32 =
2529 					da32_depth[0];
2530 				ml->acl_delete.field_value[6].value.u32 =
2531 					rte_be_to_cpu_32(da32[1]);
2532 				ml->acl_delete.field_value[6].mask_range.u32 =
2533 					da32_depth[1];
2534 				ml->acl_delete.field_value[7].value.u32 =
2535 					rte_be_to_cpu_32(da32[2]);
2536 				ml->acl_delete.field_value[7].mask_range.u32 =
2537 					da32_depth[2];
2538 				ml->acl_delete.field_value[8].value.u32 =
2539 					rte_be_to_cpu_32(da32[3]);
2540 				ml->acl_delete.field_value[8].mask_range.u32 =
2541 					da32_depth[3];
2542 
2543 				ml->acl_delete.field_value[9].value.u16 =
2544 					mh->match.acl.sp0;
2545 				ml->acl_delete.field_value[9].mask_range.u16 =
2546 					mh->match.acl.sp1;
2547 
2548 				ml->acl_delete.field_value[10].value.u16 =
2549 					mh->match.acl.dp0;
2550 				ml->acl_delete.field_value[10].mask_range.u16 =
2551 					mh->match.acl.dp1;
2552 			}
2553 		return 0;
2554 
2555 	case TABLE_ARRAY:
2556 		ml->array.pos = mh->match.array.pos;
2557 		return 0;
2558 
2559 	case TABLE_HASH:
2560 		memcpy(ml->hash, mh->match.hash.key, sizeof(ml->hash));
2561 		return 0;
2562 
2563 	case TABLE_LPM:
2564 		if (mh->match.lpm.ip_version) {
2565 			ml->lpm_ipv4.ip = mh->match.lpm.ipv4;
2566 			ml->lpm_ipv4.depth = mh->match.lpm.depth;
2567 		} else {
2568 			memcpy(ml->lpm_ipv6.ip,
2569 				mh->match.lpm.ipv6, sizeof(ml->lpm_ipv6.ip));
2570 			ml->lpm_ipv6.depth = mh->match.lpm.depth;
2571 		}
2572 
2573 		return 0;
2574 
2575 	default:
2576 		return -1;
2577 	}
2578 }
2579 
2580 static int
2581 action_convert(struct rte_table_action *a,
2582 	struct table_rule_action *action,
2583 	struct rte_pipeline_table_entry *data)
2584 {
2585 	int status;
2586 
2587 	/* Apply actions */
2588 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_FWD)) {
2589 		status = rte_table_action_apply(a,
2590 			data,
2591 			RTE_TABLE_ACTION_FWD,
2592 			&action->fwd);
2593 
2594 		if (status)
2595 			return status;
2596 	}
2597 
2598 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_LB)) {
2599 		status = rte_table_action_apply(a,
2600 			data,
2601 			RTE_TABLE_ACTION_LB,
2602 			&action->lb);
2603 
2604 		if (status)
2605 			return status;
2606 	}
2607 
2608 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_MTR)) {
2609 		status = rte_table_action_apply(a,
2610 			data,
2611 			RTE_TABLE_ACTION_MTR,
2612 			&action->mtr);
2613 
2614 		if (status)
2615 			return status;
2616 	}
2617 
2618 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TM)) {
2619 		status = rte_table_action_apply(a,
2620 			data,
2621 			RTE_TABLE_ACTION_TM,
2622 			&action->tm);
2623 
2624 		if (status)
2625 			return status;
2626 	}
2627 
2628 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_ENCAP)) {
2629 		status = rte_table_action_apply(a,
2630 			data,
2631 			RTE_TABLE_ACTION_ENCAP,
2632 			&action->encap);
2633 
2634 		if (status)
2635 			return status;
2636 	}
2637 
2638 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_NAT)) {
2639 		status = rte_table_action_apply(a,
2640 			data,
2641 			RTE_TABLE_ACTION_NAT,
2642 			&action->nat);
2643 
2644 		if (status)
2645 			return status;
2646 	}
2647 
2648 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TTL)) {
2649 		status = rte_table_action_apply(a,
2650 			data,
2651 			RTE_TABLE_ACTION_TTL,
2652 			&action->ttl);
2653 
2654 		if (status)
2655 			return status;
2656 	}
2657 
2658 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_STATS)) {
2659 		status = rte_table_action_apply(a,
2660 			data,
2661 			RTE_TABLE_ACTION_STATS,
2662 			&action->stats);
2663 
2664 		if (status)
2665 			return status;
2666 	}
2667 
2668 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TIME)) {
2669 		status = rte_table_action_apply(a,
2670 			data,
2671 			RTE_TABLE_ACTION_TIME,
2672 			&action->time);
2673 
2674 		if (status)
2675 			return status;
2676 	}
2677 
2678 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_SYM_CRYPTO)) {
2679 		status = rte_table_action_apply(a,
2680 			data,
2681 			RTE_TABLE_ACTION_SYM_CRYPTO,
2682 			&action->sym_crypto);
2683 
2684 		if (status)
2685 			return status;
2686 	}
2687 
2688 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_TAG)) {
2689 		status = rte_table_action_apply(a,
2690 			data,
2691 			RTE_TABLE_ACTION_TAG,
2692 			&action->tag);
2693 
2694 		if (status)
2695 			return status;
2696 	}
2697 
2698 	if (action->action_mask & (1LLU << RTE_TABLE_ACTION_DECAP)) {
2699 		status = rte_table_action_apply(a,
2700 			data,
2701 			RTE_TABLE_ACTION_DECAP,
2702 			&action->decap);
2703 
2704 		if (status)
2705 			return status;
2706 	}
2707 
2708 	return 0;
2709 }
2710 
2711 static struct pipeline_msg_rsp *
2712 pipeline_msg_handle_table_rule_add(struct pipeline_data *p,
2713 	struct pipeline_msg_req *req)
2714 {
2715 	union table_rule_match_low_level match_ll;
2716 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2717 	struct table_rule_match *match = &req->table_rule_add.match;
2718 	struct table_rule_action *action = &req->table_rule_add.action;
2719 	struct rte_pipeline_table_entry *data_in, *data_out;
2720 	uint32_t table_id = req->id;
2721 	int key_found, status;
2722 	struct rte_table_action *a = p->table_data[table_id].a;
2723 
2724 	/* Apply actions */
2725 	memset(p->buffer, 0, sizeof(p->buffer));
2726 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2727 
2728 	status = match_convert(match, &match_ll, 1);
2729 	if (status) {
2730 		rsp->status = -1;
2731 		return rsp;
2732 	}
2733 
2734 	status = action_convert(a, action, data_in);
2735 	if (status) {
2736 		rsp->status = -1;
2737 		return rsp;
2738 	}
2739 
2740 	status = rte_pipeline_table_entry_add(p->p,
2741 		table_id,
2742 		&match_ll,
2743 		data_in,
2744 		&key_found,
2745 		&data_out);
2746 	if (status) {
2747 		rsp->status = -1;
2748 		return rsp;
2749 	}
2750 
2751 	/* Write response */
2752 	rsp->status = 0;
2753 	rsp->table_rule_add.data = data_out;
2754 
2755 	return rsp;
2756 }
2757 
2758 static struct pipeline_msg_rsp *
2759 pipeline_msg_handle_table_rule_add_default(struct pipeline_data *p,
2760 	struct pipeline_msg_req *req)
2761 {
2762 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2763 	struct table_rule_action *action = &req->table_rule_add_default.action;
2764 	struct rte_pipeline_table_entry *data_in, *data_out;
2765 	uint32_t table_id = req->id;
2766 	int status;
2767 
2768 	/* Apply actions */
2769 	memset(p->buffer, 0, sizeof(p->buffer));
2770 	data_in = (struct rte_pipeline_table_entry *) p->buffer;
2771 
2772 	data_in->action = action->fwd.action;
2773 	if (action->fwd.action == RTE_PIPELINE_ACTION_PORT)
2774 		data_in->port_id = action->fwd.id;
2775 	if (action->fwd.action == RTE_PIPELINE_ACTION_TABLE)
2776 		data_in->table_id = action->fwd.id;
2777 
2778 	/* Add default rule to table */
2779 	status = rte_pipeline_table_default_entry_add(p->p,
2780 		table_id,
2781 		data_in,
2782 		&data_out);
2783 	if (status) {
2784 		rsp->status = -1;
2785 		return rsp;
2786 	}
2787 
2788 	/* Write response */
2789 	rsp->status = 0;
2790 	rsp->table_rule_add_default.data = data_out;
2791 
2792 	return rsp;
2793 }
2794 
2795 static struct pipeline_msg_rsp *
2796 pipeline_msg_handle_table_rule_add_bulk(struct pipeline_data *p,
2797 	struct pipeline_msg_req *req)
2798 {
2799 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2800 
2801 	uint32_t table_id = req->id;
2802 	struct table_rule_list *list = req->table_rule_add_bulk.list;
2803 	uint32_t bulk = req->table_rule_add_bulk.bulk;
2804 
2805 	uint32_t n_rules_added;
2806 	int status;
2807 
2808 	struct table_ll table_ll = {
2809 		.p = p->p,
2810 		.table_id = table_id,
2811 		.a = p->table_data[table_id].a,
2812 		.bulk_supported = bulk,
2813 	};
2814 
2815 	status = table_rule_add_bulk_ll(&table_ll, list, &n_rules_added);
2816 	if (status) {
2817 		rsp->status = -1;
2818 		rsp->table_rule_add_bulk.n_rules = 0;
2819 		return rsp;
2820 	}
2821 
2822 	/* Write response */
2823 	rsp->status = 0;
2824 	rsp->table_rule_add_bulk.n_rules = n_rules_added;
2825 	return rsp;
2826 }
2827 
2828 static struct pipeline_msg_rsp *
2829 pipeline_msg_handle_table_rule_delete(struct pipeline_data *p,
2830 	struct pipeline_msg_req *req)
2831 {
2832 	union table_rule_match_low_level match_ll;
2833 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2834 	struct table_rule_match *match = &req->table_rule_delete.match;
2835 	uint32_t table_id = req->id;
2836 	int key_found, status;
2837 
2838 	status = match_convert(match, &match_ll, 0);
2839 	if (status) {
2840 		rsp->status = -1;
2841 		return rsp;
2842 	}
2843 
2844 	rsp->status = rte_pipeline_table_entry_delete(p->p,
2845 		table_id,
2846 		&match_ll,
2847 		&key_found,
2848 		NULL);
2849 
2850 	return rsp;
2851 }
2852 
2853 static struct pipeline_msg_rsp *
2854 pipeline_msg_handle_table_rule_delete_default(struct pipeline_data *p,
2855 	struct pipeline_msg_req *req)
2856 {
2857 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2858 	uint32_t table_id = req->id;
2859 
2860 	rsp->status = rte_pipeline_table_default_entry_delete(p->p,
2861 		table_id,
2862 		NULL);
2863 
2864 	return rsp;
2865 }
2866 
2867 static struct pipeline_msg_rsp *
2868 pipeline_msg_handle_table_rule_stats_read(struct pipeline_data *p,
2869 	struct pipeline_msg_req *req)
2870 {
2871 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2872 	uint32_t table_id = req->id;
2873 	void *data = req->table_rule_stats_read.data;
2874 	int clear = req->table_rule_stats_read.clear;
2875 	struct rte_table_action *a = p->table_data[table_id].a;
2876 
2877 	rsp->status = rte_table_action_stats_read(a,
2878 		data,
2879 		&rsp->table_rule_stats_read.stats,
2880 		clear);
2881 
2882 	return rsp;
2883 }
2884 
2885 static struct pipeline_msg_rsp *
2886 pipeline_msg_handle_table_mtr_profile_add(struct pipeline_data *p,
2887 	struct pipeline_msg_req *req)
2888 {
2889 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2890 	uint32_t table_id = req->id;
2891 	uint32_t meter_profile_id = req->table_mtr_profile_add.meter_profile_id;
2892 	struct rte_table_action_meter_profile *profile =
2893 		&req->table_mtr_profile_add.profile;
2894 	struct rte_table_action *a = p->table_data[table_id].a;
2895 
2896 	rsp->status = rte_table_action_meter_profile_add(a,
2897 		meter_profile_id,
2898 		profile);
2899 
2900 	return rsp;
2901 }
2902 
2903 static struct pipeline_msg_rsp *
2904 pipeline_msg_handle_table_mtr_profile_delete(struct pipeline_data *p,
2905 	struct pipeline_msg_req *req)
2906 {
2907 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2908 	uint32_t table_id = req->id;
2909 	uint32_t meter_profile_id =
2910 		req->table_mtr_profile_delete.meter_profile_id;
2911 	struct rte_table_action *a = p->table_data[table_id].a;
2912 
2913 	rsp->status = rte_table_action_meter_profile_delete(a,
2914 		meter_profile_id);
2915 
2916 	return rsp;
2917 }
2918 
2919 static struct pipeline_msg_rsp *
2920 pipeline_msg_handle_table_rule_mtr_read(struct pipeline_data *p,
2921 	struct pipeline_msg_req *req)
2922 {
2923 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2924 	uint32_t table_id = req->id;
2925 	void *data = req->table_rule_mtr_read.data;
2926 	uint32_t tc_mask = req->table_rule_mtr_read.tc_mask;
2927 	int clear = req->table_rule_mtr_read.clear;
2928 	struct rte_table_action *a = p->table_data[table_id].a;
2929 
2930 	rsp->status = rte_table_action_meter_read(a,
2931 		data,
2932 		tc_mask,
2933 		&rsp->table_rule_mtr_read.stats,
2934 		clear);
2935 
2936 	return rsp;
2937 }
2938 
2939 static struct pipeline_msg_rsp *
2940 pipeline_msg_handle_table_dscp_table_update(struct pipeline_data *p,
2941 	struct pipeline_msg_req *req)
2942 {
2943 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2944 	uint32_t table_id = req->id;
2945 	uint64_t dscp_mask = req->table_dscp_table_update.dscp_mask;
2946 	struct rte_table_action_dscp_table *dscp_table =
2947 		&req->table_dscp_table_update.dscp_table;
2948 	struct rte_table_action *a = p->table_data[table_id].a;
2949 
2950 	rsp->status = rte_table_action_dscp_table_update(a,
2951 		dscp_mask,
2952 		dscp_table);
2953 
2954 	return rsp;
2955 }
2956 
2957 static struct pipeline_msg_rsp *
2958 pipeline_msg_handle_table_rule_ttl_read(struct pipeline_data *p,
2959 	struct pipeline_msg_req *req)
2960 {
2961 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2962 	uint32_t table_id = req->id;
2963 	void *data = req->table_rule_ttl_read.data;
2964 	int clear = req->table_rule_ttl_read.clear;
2965 	struct rte_table_action *a = p->table_data[table_id].a;
2966 
2967 	rsp->status = rte_table_action_ttl_read(a,
2968 		data,
2969 		&rsp->table_rule_ttl_read.stats,
2970 		clear);
2971 
2972 	return rsp;
2973 }
2974 
2975 static struct pipeline_msg_rsp *
2976 pipeline_msg_handle_table_rule_time_read(struct pipeline_data *p,
2977 	struct pipeline_msg_req *req)
2978 {
2979 	struct pipeline_msg_rsp *rsp = (struct pipeline_msg_rsp *) req;
2980 	uint32_t table_id = req->id;
2981 	void *data = req->table_rule_time_read.data;
2982 	struct rte_table_action *a = p->table_data[table_id].a;
2983 
2984 	rsp->status = rte_table_action_time_read(a,
2985 		data,
2986 		&rsp->table_rule_time_read.timestamp);
2987 
2988 	return rsp;
2989 }
2990 
2991 static void
2992 pipeline_msg_handle(struct pipeline_data *p)
2993 {
2994 	for ( ; ; ) {
2995 		struct pipeline_msg_req *req;
2996 		struct pipeline_msg_rsp *rsp;
2997 
2998 		req = pipeline_msg_recv(p->msgq_req);
2999 		if (req == NULL)
3000 			break;
3001 
3002 		switch (req->type) {
3003 		case PIPELINE_REQ_PORT_IN_STATS_READ:
3004 			rsp = pipeline_msg_handle_port_in_stats_read(p, req);
3005 			break;
3006 
3007 		case PIPELINE_REQ_PORT_IN_ENABLE:
3008 			rsp = pipeline_msg_handle_port_in_enable(p, req);
3009 			break;
3010 
3011 		case PIPELINE_REQ_PORT_IN_DISABLE:
3012 			rsp = pipeline_msg_handle_port_in_disable(p, req);
3013 			break;
3014 
3015 		case PIPELINE_REQ_PORT_OUT_STATS_READ:
3016 			rsp = pipeline_msg_handle_port_out_stats_read(p, req);
3017 			break;
3018 
3019 		case PIPELINE_REQ_TABLE_STATS_READ:
3020 			rsp = pipeline_msg_handle_table_stats_read(p, req);
3021 			break;
3022 
3023 		case PIPELINE_REQ_TABLE_RULE_ADD:
3024 			rsp = pipeline_msg_handle_table_rule_add(p, req);
3025 			break;
3026 
3027 		case PIPELINE_REQ_TABLE_RULE_ADD_DEFAULT:
3028 			rsp = pipeline_msg_handle_table_rule_add_default(p,	req);
3029 			break;
3030 
3031 		case PIPELINE_REQ_TABLE_RULE_ADD_BULK:
3032 			rsp = pipeline_msg_handle_table_rule_add_bulk(p, req);
3033 			break;
3034 
3035 		case PIPELINE_REQ_TABLE_RULE_DELETE:
3036 			rsp = pipeline_msg_handle_table_rule_delete(p, req);
3037 			break;
3038 
3039 		case PIPELINE_REQ_TABLE_RULE_DELETE_DEFAULT:
3040 			rsp = pipeline_msg_handle_table_rule_delete_default(p, req);
3041 			break;
3042 
3043 		case PIPELINE_REQ_TABLE_RULE_STATS_READ:
3044 			rsp = pipeline_msg_handle_table_rule_stats_read(p, req);
3045 			break;
3046 
3047 		case PIPELINE_REQ_TABLE_MTR_PROFILE_ADD:
3048 			rsp = pipeline_msg_handle_table_mtr_profile_add(p, req);
3049 			break;
3050 
3051 		case PIPELINE_REQ_TABLE_MTR_PROFILE_DELETE:
3052 			rsp = pipeline_msg_handle_table_mtr_profile_delete(p, req);
3053 			break;
3054 
3055 		case PIPELINE_REQ_TABLE_RULE_MTR_READ:
3056 			rsp = pipeline_msg_handle_table_rule_mtr_read(p, req);
3057 			break;
3058 
3059 		case PIPELINE_REQ_TABLE_DSCP_TABLE_UPDATE:
3060 			rsp = pipeline_msg_handle_table_dscp_table_update(p, req);
3061 			break;
3062 
3063 		case PIPELINE_REQ_TABLE_RULE_TTL_READ:
3064 			rsp = pipeline_msg_handle_table_rule_ttl_read(p, req);
3065 			break;
3066 
3067 		case PIPELINE_REQ_TABLE_RULE_TIME_READ:
3068 			rsp = pipeline_msg_handle_table_rule_time_read(p, req);
3069 			break;
3070 
3071 		default:
3072 			rsp = (struct pipeline_msg_rsp *) req;
3073 			rsp->status = -1;
3074 		}
3075 
3076 		pipeline_msg_send(p->msgq_rsp, rsp);
3077 	}
3078 }
3079 
3080 /**
3081  * Data plane threads: main
3082  */
3083 int
3084 thread_main(void *arg __rte_unused)
3085 {
3086 	struct thread_data *t;
3087 	uint32_t thread_id, i;
3088 
3089 	thread_id = rte_lcore_id();
3090 	t = &thread_data[thread_id];
3091 
3092 	/* Dispatch loop */
3093 	for (i = 0; ; i++) {
3094 		uint32_t j;
3095 
3096 		/* Data Plane */
3097 		for (j = 0; j < t->n_pipelines; j++)
3098 			rte_pipeline_run(t->p[j]);
3099 
3100 		/* Control Plane */
3101 		if ((i & 0xF) == 0) {
3102 			uint64_t time = rte_get_tsc_cycles();
3103 			uint64_t time_next_min = UINT64_MAX;
3104 
3105 			if (time < t->time_next_min)
3106 				continue;
3107 
3108 			/* Pipeline message queues */
3109 			for (j = 0; j < t->n_pipelines; j++) {
3110 				struct pipeline_data *p =
3111 					&t->pipeline_data[j];
3112 				uint64_t time_next = p->time_next;
3113 
3114 				if (time_next <= time) {
3115 					pipeline_msg_handle(p);
3116 					rte_pipeline_flush(p->p);
3117 					time_next = time + p->timer_period;
3118 					p->time_next = time_next;
3119 				}
3120 
3121 				if (time_next < time_next_min)
3122 					time_next_min = time_next;
3123 			}
3124 
3125 			/* Thread message queues */
3126 			{
3127 				uint64_t time_next = t->time_next;
3128 
3129 				if (time_next <= time) {
3130 					thread_msg_handle(t);
3131 					time_next = time + t->timer_period;
3132 					t->time_next = time_next;
3133 				}
3134 
3135 				if (time_next < time_next_min)
3136 					time_next_min = time_next;
3137 			}
3138 
3139 			t->time_next_min = time_next_min;
3140 		}
3141 	}
3142 
3143 	return 0;
3144 }
3145