xref: /dpdk/drivers/event/opdl/opdl_evdev_init.c (revision 4851ef2b40bc31accfffc3bb476930a73f50afac)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4 
5 #include <inttypes.h>
6 #include <string.h>
7 
8 #include <bus_vdev_driver.h>
9 #include <rte_errno.h>
10 #include <rte_cycles.h>
11 #include <rte_memzone.h>
12 
13 #include "opdl_evdev.h"
14 #include "opdl_ring.h"
15 #include "opdl_log.h"
16 
17 
18 static __rte_always_inline uint32_t
enqueue_check(struct opdl_port * p,const struct rte_event ev[],uint16_t num,uint16_t num_events)19 enqueue_check(struct opdl_port *p,
20 		const struct rte_event ev[],
21 		uint16_t num,
22 		uint16_t num_events)
23 {
24 	uint16_t i;
25 
26 	if (p->opdl->do_validation) {
27 
28 		for (i = 0; i < num; i++) {
29 			if (ev[i].queue_id != p->next_external_qid) {
30 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
31 					     "ERROR - port:[%u] - event wants"
32 					     " to enq to q_id[%u],"
33 					     " but should be [%u]",
34 					     opdl_pmd_dev_id(p->opdl),
35 					     p->id,
36 					     ev[i].queue_id,
37 					     p->next_external_qid);
38 				rte_errno = EINVAL;
39 				return 0;
40 			}
41 		}
42 
43 		/* Stats */
44 		if (p->p_type == OPDL_PURE_RX_PORT ||
45 				p->p_type == OPDL_ASYNC_PORT) {
46 			/* Stats */
47 			if (num_events) {
48 				p->port_stat[claim_pkts_requested] += num;
49 				p->port_stat[claim_pkts_granted] += num_events;
50 				p->port_stat[claim_non_empty]++;
51 				p->start_cycles = rte_rdtsc();
52 			} else {
53 				p->port_stat[claim_empty]++;
54 				p->start_cycles = 0;
55 			}
56 		} else {
57 			if (p->start_cycles) {
58 				uint64_t end_cycles = rte_rdtsc();
59 				p->port_stat[total_cycles] +=
60 					end_cycles - p->start_cycles;
61 			}
62 		}
63 	} else {
64 		if (num > 0 &&
65 				ev[0].queue_id != p->next_external_qid) {
66 			rte_errno = EINVAL;
67 			return 0;
68 		}
69 	}
70 
71 	return num;
72 }
73 
74 static __rte_always_inline void
update_on_dequeue(struct opdl_port * p,struct rte_event ev[],uint16_t num,uint16_t num_events)75 update_on_dequeue(struct opdl_port *p,
76 		struct rte_event ev[],
77 		uint16_t num,
78 		uint16_t num_events)
79 {
80 	if (p->opdl->do_validation) {
81 		int16_t i;
82 		for (i = 0; i < num; i++)
83 			ev[i].queue_id =
84 				p->opdl->queue[p->queue_id].external_qid;
85 
86 		/* Stats */
87 		if (num_events) {
88 			p->port_stat[claim_pkts_requested] += num;
89 			p->port_stat[claim_pkts_granted] += num_events;
90 			p->port_stat[claim_non_empty]++;
91 			p->start_cycles = rte_rdtsc();
92 		} else {
93 			p->port_stat[claim_empty]++;
94 			p->start_cycles = 0;
95 		}
96 	} else {
97 		if (num > 0)
98 			ev[0].queue_id =
99 				p->opdl->queue[p->queue_id].external_qid;
100 	}
101 }
102 
103 
104 /*
105  * Error RX enqueue:
106  *
107  *
108  */
109 
110 static uint16_t
opdl_rx_error_enqueue(struct opdl_port * p,const struct rte_event ev[],uint16_t num)111 opdl_rx_error_enqueue(struct opdl_port *p,
112 		const struct rte_event ev[],
113 		uint16_t num)
114 {
115 	RTE_SET_USED(p);
116 	RTE_SET_USED(ev);
117 	RTE_SET_USED(num);
118 
119 	rte_errno = ENOSPC;
120 
121 	return 0;
122 }
123 
124 /*
125  * RX enqueue:
126  *
127  * This function handles enqueue for a single input stage_inst with
128  *	threadsafe disabled or enabled. eg 1 thread using a stage_inst or
129  *	multiple threads sharing a stage_inst
130  */
131 
132 static uint16_t
opdl_rx_enqueue(struct opdl_port * p,const struct rte_event ev[],uint16_t num)133 opdl_rx_enqueue(struct opdl_port *p,
134 		const struct rte_event ev[],
135 		uint16_t num)
136 {
137 	uint16_t enqueued = 0;
138 
139 	enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
140 				   ev,
141 				   num,
142 				   false);
143 	if (!enqueue_check(p, ev, num, enqueued))
144 		return 0;
145 
146 
147 	if (enqueued < num)
148 		rte_errno = ENOSPC;
149 
150 	return enqueued;
151 }
152 
153 /*
154  * Error TX handler
155  *
156  */
157 
158 static uint16_t
opdl_tx_error_dequeue(struct opdl_port * p,struct rte_event ev[],uint16_t num)159 opdl_tx_error_dequeue(struct opdl_port *p,
160 		struct rte_event ev[],
161 		uint16_t num)
162 {
163 	RTE_SET_USED(p);
164 	RTE_SET_USED(ev);
165 	RTE_SET_USED(num);
166 
167 	rte_errno = ENOSPC;
168 
169 	return 0;
170 }
171 
172 /*
173  * TX single threaded claim
174  *
175  * This function handles dequeue for a single worker stage_inst with
176  *	threadsafe disabled. eg 1 thread using an stage_inst
177  */
178 
179 static uint16_t
opdl_tx_dequeue_single_thread(struct opdl_port * p,struct rte_event ev[],uint16_t num)180 opdl_tx_dequeue_single_thread(struct opdl_port *p,
181 			struct rte_event ev[],
182 			uint16_t num)
183 {
184 	uint16_t returned;
185 
186 	struct opdl_ring  *ring;
187 
188 	ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
189 
190 	returned = opdl_ring_copy_to_burst(ring,
191 					   p->deq_stage_inst,
192 					   ev,
193 					   num,
194 					   false);
195 
196 	update_on_dequeue(p, ev, num, returned);
197 
198 	return returned;
199 }
200 
201 /*
202  * TX multi threaded claim
203  *
204  * This function handles dequeue for multiple worker stage_inst with
205  *	threadsafe disabled. eg multiple stage_inst each with its own instance
206  */
207 
208 static uint16_t
opdl_tx_dequeue_multi_inst(struct opdl_port * p,struct rte_event ev[],uint16_t num)209 opdl_tx_dequeue_multi_inst(struct opdl_port *p,
210 			struct rte_event ev[],
211 			uint16_t num)
212 {
213 	uint32_t num_events = 0;
214 
215 	num_events = opdl_stage_claim(p->deq_stage_inst,
216 				    (void *)ev,
217 				    num,
218 				    NULL,
219 				    false,
220 				    false);
221 
222 	update_on_dequeue(p, ev, num, num_events);
223 
224 	return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
225 }
226 
227 
228 /*
229  * Worker thread claim
230  *
231  */
232 
233 static uint16_t
opdl_claim(struct opdl_port * p,struct rte_event ev[],uint16_t num)234 opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
235 {
236 	uint32_t num_events = 0;
237 
238 	if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
239 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
240 			     "Attempt to dequeue num of events larger than port (%d) max",
241 			     opdl_pmd_dev_id(p->opdl),
242 			     p->id);
243 		rte_errno = EINVAL;
244 		return 0;
245 	}
246 
247 
248 	num_events = opdl_stage_claim(p->deq_stage_inst,
249 			(void *)ev,
250 			num,
251 			NULL,
252 			false,
253 			p->atomic_claim);
254 
255 
256 	update_on_dequeue(p, ev, num, num_events);
257 
258 	return num_events;
259 }
260 
261 /*
262  * Worker thread disclaim
263  */
264 
265 static uint16_t
opdl_disclaim(struct opdl_port * p,const struct rte_event ev[],uint16_t num)266 opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
267 {
268 	uint16_t enqueued = 0;
269 
270 	uint32_t i = 0;
271 
272 	for (i = 0; i < num; i++)
273 		opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
274 				i, p->atomic_claim);
275 
276 	enqueued = opdl_stage_disclaim(p->enq_stage_inst,
277 				       num,
278 				       false);
279 
280 	return enqueue_check(p, ev, num, enqueued);
281 }
282 
283 static __rte_always_inline struct opdl_stage *
stage_for_port(struct opdl_queue * q,unsigned int i)284 stage_for_port(struct opdl_queue *q, unsigned int i)
285 {
286 	if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
287 		return q->ports[i]->enq_stage_inst;
288 	else
289 		return q->ports[i]->deq_stage_inst;
290 }
291 
opdl_add_deps(struct opdl_evdev * device,int q_id,int deps_q_id)292 static int opdl_add_deps(struct opdl_evdev *device,
293 			 int q_id,
294 			 int deps_q_id)
295 {
296 	unsigned int i, j;
297 	int status;
298 	struct opdl_ring  *ring;
299 	struct opdl_queue *queue = &device->queue[q_id];
300 	struct opdl_queue *queue_deps = &device->queue[deps_q_id];
301 	struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
302 
303 	/* sanity check that all stages are for same opdl ring */
304 	for (i = 0; i < queue->nb_ports; i++) {
305 		struct opdl_ring *r =
306 			opdl_stage_get_opdl_ring(stage_for_port(queue, i));
307 		for (j = 0; j < queue_deps->nb_ports; j++) {
308 			struct opdl_ring *rj =
309 				opdl_stage_get_opdl_ring(
310 						stage_for_port(queue_deps, j));
311 			if (r != rj) {
312 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
313 					     "Stages and dependents"
314 					     " are not for same opdl ring",
315 					     opdl_pmd_dev_id(device));
316 				uint32_t k;
317 				for (k = 0; k < device->nb_opdls; k++) {
318 					opdl_ring_dump(device->opdl[k],
319 							stdout);
320 				}
321 				return -EINVAL;
322 			}
323 		}
324 	}
325 
326 	/* Gather all stages instance in deps */
327 	for (i = 0; i < queue_deps->nb_ports; i++)
328 		dep_stages[i] = stage_for_port(queue_deps, i);
329 
330 
331 	/* Add all deps for each port->stage_inst in this queue */
332 	for (i = 0; i < queue->nb_ports; i++) {
333 
334 		ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
335 
336 		status = opdl_stage_deps_add(ring,
337 				stage_for_port(queue, i),
338 				queue->ports[i]->num_instance,
339 				queue->ports[i]->instance_id,
340 				dep_stages,
341 				queue_deps->nb_ports);
342 		if (status < 0)
343 			return -EINVAL;
344 	}
345 
346 	return 0;
347 }
348 
349 int
opdl_add_event_handlers(struct rte_eventdev * dev)350 opdl_add_event_handlers(struct rte_eventdev *dev)
351 {
352 	int err = 0;
353 
354 	struct opdl_evdev *device = opdl_pmd_priv(dev);
355 	unsigned int i;
356 
357 	for (i = 0; i < device->max_port_nb; i++) {
358 
359 		struct opdl_port *port = &device->ports[i];
360 
361 		if (port->configured) {
362 			if (port->p_type == OPDL_PURE_RX_PORT) {
363 				port->enq = opdl_rx_enqueue;
364 				port->deq = opdl_tx_error_dequeue;
365 
366 			} else if (port->p_type == OPDL_PURE_TX_PORT) {
367 
368 				port->enq = opdl_rx_error_enqueue;
369 
370 				if (port->num_instance == 1)
371 					port->deq =
372 						opdl_tx_dequeue_single_thread;
373 				else
374 					port->deq = opdl_tx_dequeue_multi_inst;
375 
376 			} else if (port->p_type == OPDL_REGULAR_PORT) {
377 
378 				port->enq = opdl_disclaim;
379 				port->deq = opdl_claim;
380 
381 			} else if (port->p_type == OPDL_ASYNC_PORT) {
382 
383 				port->enq = opdl_rx_enqueue;
384 
385 				/* Always single instance */
386 				port->deq = opdl_tx_dequeue_single_thread;
387 			} else {
388 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
389 					     "port:[%u] has invalid port type - ",
390 					     opdl_pmd_dev_id(port->opdl),
391 					     port->id);
392 				err = -EINVAL;
393 				break;
394 			}
395 			port->initialized = 1;
396 		}
397 	}
398 
399 	if (!err)
400 		fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
401 	return err;
402 }
403 
404 int
build_all_dependencies(struct rte_eventdev * dev)405 build_all_dependencies(struct rte_eventdev *dev)
406 {
407 
408 	int err = 0;
409 	unsigned int i;
410 	struct opdl_evdev *device = opdl_pmd_priv(dev);
411 
412 	uint8_t start_qid = 0;
413 
414 	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
415 		struct opdl_queue *queue = &device->queue[i];
416 		if (!queue->initialized)
417 			break;
418 
419 		if (queue->q_pos == OPDL_Q_POS_START) {
420 			start_qid = i;
421 			continue;
422 		}
423 
424 		if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
425 			err = opdl_add_deps(device, i, i-1);
426 			if (err < 0) {
427 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
428 					     "dependency addition for queue:[%u] - FAILED",
429 					     dev->data->dev_id,
430 					     queue->external_qid);
431 				break;
432 			}
433 		}
434 
435 		if (queue->q_pos == OPDL_Q_POS_END) {
436 			/* Add this dependency */
437 			err = opdl_add_deps(device, i, i-1);
438 			if (err < 0) {
439 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
440 					     "dependency addition for queue:[%u] - FAILED",
441 					     dev->data->dev_id,
442 					     queue->external_qid);
443 				break;
444 			}
445 			/* Add dependency for rx on tx */
446 			err = opdl_add_deps(device, start_qid, i);
447 			if (err < 0) {
448 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
449 					     "dependency addition for queue:[%u] - FAILED",
450 					     dev->data->dev_id,
451 					     queue->external_qid);
452 				break;
453 			}
454 		}
455 	}
456 
457 	if (!err)
458 		fprintf(stdout, "Success - dependencies built\n");
459 
460 	return err;
461 }
462 int
check_queues_linked(struct rte_eventdev * dev)463 check_queues_linked(struct rte_eventdev *dev)
464 {
465 
466 	int err = 0;
467 	unsigned int i;
468 	struct opdl_evdev *device = opdl_pmd_priv(dev);
469 	uint32_t nb_iq = 0;
470 
471 	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
472 		struct opdl_queue *queue = &device->queue[i];
473 
474 		if (!queue->initialized)
475 			break;
476 
477 		if (queue->external_qid == OPDL_INVALID_QID)
478 			nb_iq++;
479 
480 		if (queue->nb_ports == 0) {
481 			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
482 				     "queue:[%u] has no associated ports",
483 				     dev->data->dev_id,
484 				     i);
485 			err = -EINVAL;
486 			break;
487 		}
488 	}
489 	if (!err) {
490 		if ((i - nb_iq) != device->max_queue_nb) {
491 			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
492 				     "%u queues counted but should be %u",
493 				     dev->data->dev_id,
494 				     i - nb_iq,
495 				     device->max_queue_nb);
496 			err = -1;
497 		}
498 
499 	}
500 	return err;
501 }
502 
503 void
destroy_queues_and_rings(struct rte_eventdev * dev)504 destroy_queues_and_rings(struct rte_eventdev *dev)
505 {
506 	struct opdl_evdev *device = opdl_pmd_priv(dev);
507 	uint32_t i;
508 
509 	for (i = 0; i < device->nb_opdls; i++) {
510 		if (device->opdl[i])
511 			opdl_ring_free(device->opdl[i]);
512 	}
513 
514 	memset(&device->queue,
515 			0,
516 			sizeof(struct opdl_queue)
517 			* RTE_EVENT_MAX_QUEUES_PER_DEV);
518 }
519 
/* Index of the most recently created opdl ring of device d (d is a pointer) */
#define OPDL_ID(d) ((d)->nb_opdls - 1)
521 
522 static __rte_always_inline void
initialise_queue(struct opdl_evdev * device,enum queue_pos pos,int32_t i)523 initialise_queue(struct opdl_evdev *device,
524 		enum queue_pos pos,
525 		int32_t i)
526 {
527 	struct opdl_queue *queue = &device->queue[device->nb_queues];
528 
529 	if (i == -1) {
530 		queue->q_type = OPDL_Q_TYPE_ORDERED;
531 		queue->external_qid = OPDL_INVALID_QID;
532 	} else {
533 		queue->q_type = device->q_md[i].type;
534 		queue->external_qid = device->q_md[i].ext_id;
535 		/* Add ex->in for queues setup */
536 		device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
537 	}
538 	queue->opdl_id = OPDL_ID(device);
539 	queue->q_pos = pos;
540 	queue->nb_ports = 0;
541 	queue->configured = 1;
542 
543 	device->nb_queues++;
544 }
545 
546 
547 static __rte_always_inline int
create_opdl(struct opdl_evdev * device)548 create_opdl(struct opdl_evdev *device)
549 {
550 	int err = 0;
551 
552 	char name[RTE_MEMZONE_NAMESIZE];
553 
554 	snprintf(name, RTE_MEMZONE_NAMESIZE,
555 			"%s_%u", device->service_name, device->nb_opdls);
556 
557 	device->opdl[device->nb_opdls] =
558 		opdl_ring_create(name,
559 				device->nb_events_limit,
560 				sizeof(struct rte_event),
561 				device->max_port_nb * 2,
562 				device->socket);
563 
564 	if (!device->opdl[device->nb_opdls]) {
565 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
566 			     "opdl ring %u creation - FAILED",
567 			     opdl_pmd_dev_id(device),
568 			     device->nb_opdls);
569 		err = -EINVAL;
570 	} else {
571 		device->nb_opdls++;
572 	}
573 	return err;
574 }
575 
576 static __rte_always_inline int
create_link_opdl(struct opdl_evdev * device,uint32_t index)577 create_link_opdl(struct opdl_evdev *device, uint32_t index)
578 {
579 
580 	int err = 0;
581 
582 	if (device->q_md[index + 1].type !=
583 			OPDL_Q_TYPE_SINGLE_LINK) {
584 
585 		/* async queue with regular
586 		 * queue following it
587 		 */
588 
589 		/* create a new opdl ring */
590 		err = create_opdl(device);
591 		if (!err) {
592 			/* create an initial
593 			 * dummy queue for new opdl
594 			 */
595 			initialise_queue(device,
596 					OPDL_Q_POS_START,
597 					-1);
598 		} else {
599 			err = -EINVAL;
600 		}
601 	} else {
602 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
603 			     "queue %u, two consecutive"
604 			     " SINGLE_LINK queues, not allowed",
605 			     opdl_pmd_dev_id(device),
606 			     index);
607 		err = -EINVAL;
608 	}
609 
610 	return err;
611 }
612 
613 int
create_queues_and_rings(struct rte_eventdev * dev)614 create_queues_and_rings(struct rte_eventdev *dev)
615 {
616 	int err = 0;
617 
618 	struct opdl_evdev *device = opdl_pmd_priv(dev);
619 
620 	device->nb_queues = 0;
621 
622 	if (device->nb_ports != device->max_port_nb) {
623 		PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
624 				" number:%u for this device",
625 				device->nb_ports,
626 				device->max_port_nb);
627 		err = -1;
628 	}
629 
630 	if (!err) {
631 		/* We will have at least one opdl so create it now */
632 		err = create_opdl(device);
633 	}
634 
635 	if (!err) {
636 
637 		/* Create 1st "dummy" queue */
638 		initialise_queue(device,
639 				 OPDL_Q_POS_START,
640 				 -1);
641 
642 		uint32_t i;
643 		for (i = 0; i < device->nb_q_md; i++) {
644 
645 			/* Check */
646 			if (!device->q_md[i].setup) {
647 
648 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
649 					     "queue meta data slot %u"
650 					     " not setup - FAILING",
651 					     dev->data->dev_id,
652 					     i);
653 				err = -EINVAL;
654 				break;
655 			} else if (device->q_md[i].type !=
656 					OPDL_Q_TYPE_SINGLE_LINK) {
657 
658 				if (!device->q_md[i + 1].setup) {
659 					/* Create a simple ORDERED/ATOMIC
660 					 * queue at the end
661 					 */
662 					initialise_queue(device,
663 							OPDL_Q_POS_END,
664 							i);
665 
666 				} else {
667 					/* Create a simple ORDERED/ATOMIC
668 					 * queue in the middle
669 					 */
670 					initialise_queue(device,
671 							OPDL_Q_POS_MIDDLE,
672 							i);
673 				}
674 			} else if (device->q_md[i].type ==
675 					OPDL_Q_TYPE_SINGLE_LINK) {
676 
677 				/* create last queue for this opdl */
678 				initialise_queue(device,
679 						OPDL_Q_POS_END,
680 						i);
681 
682 				err = create_link_opdl(device, i);
683 
684 				if (err)
685 					break;
686 
687 
688 			}
689 		}
690 	}
691 	if (err)
692 		destroy_queues_and_rings(dev);
693 
694 	return err;
695 }
696 
697 
698 int
initialise_all_other_ports(struct rte_eventdev * dev)699 initialise_all_other_ports(struct rte_eventdev *dev)
700 {
701 	int err = 0;
702 	struct opdl_stage *stage_inst = NULL;
703 
704 	struct opdl_evdev *device = opdl_pmd_priv(dev);
705 
706 	uint32_t i;
707 	for (i = 0; i < device->nb_ports; i++) {
708 		struct opdl_port *port = &device->ports[i];
709 		struct opdl_queue *queue = &device->queue[port->queue_id];
710 
711 		if (port->queue_id == 0) {
712 			continue;
713 		} else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
714 
715 			if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
716 
717 				/* Regular port with claim/disclaim */
718 				stage_inst = opdl_stage_add(
719 					device->opdl[queue->opdl_id],
720 						false,
721 						false);
722 				port->deq_stage_inst = stage_inst;
723 				port->enq_stage_inst = stage_inst;
724 
725 				if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
726 					port->atomic_claim = true;
727 				else
728 					port->atomic_claim = false;
729 
730 				port->p_type =  OPDL_REGULAR_PORT;
731 
732 				/* Add the port to the queue array of ports */
733 				queue->ports[queue->nb_ports] = port;
734 				port->instance_id = queue->nb_ports;
735 				queue->nb_ports++;
736 				opdl_stage_set_queue_id(stage_inst,
737 						port->queue_id);
738 
739 			} else if (queue->q_pos == OPDL_Q_POS_END) {
740 
741 				/* tx port  */
742 				stage_inst = opdl_stage_add(
743 					device->opdl[queue->opdl_id],
744 						false,
745 						false);
746 				port->deq_stage_inst = stage_inst;
747 				port->enq_stage_inst = NULL;
748 				port->p_type = OPDL_PURE_TX_PORT;
749 
750 				/* Add the port to the queue array of ports */
751 				queue->ports[queue->nb_ports] = port;
752 				port->instance_id = queue->nb_ports;
753 				queue->nb_ports++;
754 			} else {
755 
756 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
757 					     "port %u:, linked incorrectly"
758 					     " to a q_pos START/INVALID %u",
759 					     opdl_pmd_dev_id(port->opdl),
760 					     port->id,
761 					     queue->q_pos);
762 				err = -EINVAL;
763 				break;
764 			}
765 
766 		} else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
767 
768 			port->p_type = OPDL_ASYNC_PORT;
769 
770 			/* -- tx -- */
771 			stage_inst = opdl_stage_add(
772 				device->opdl[queue->opdl_id],
773 					false,
774 					false); /* First stage */
775 			port->deq_stage_inst = stage_inst;
776 
777 			/* Add the port to the queue array of ports */
778 			queue->ports[queue->nb_ports] = port;
779 			port->instance_id = queue->nb_ports;
780 			queue->nb_ports++;
781 
782 			if (queue->nb_ports > 1) {
783 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
784 					     "queue %u:, setup as SINGLE_LINK"
785 					     " but has more than one port linked",
786 					     opdl_pmd_dev_id(port->opdl),
787 					     queue->external_qid);
788 				err = -EINVAL;
789 				break;
790 			}
791 
792 			/* -- single instance rx for next opdl -- */
793 			uint8_t next_qid =
794 				device->q_map_ex_to_in[queue->external_qid] + 1;
795 			if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
796 					device->queue[next_qid].configured) {
797 
798 				/* Remap the queue */
799 				queue = &device->queue[next_qid];
800 
801 				stage_inst = opdl_stage_add(
802 					device->opdl[queue->opdl_id],
803 						false,
804 						true);
805 				port->enq_stage_inst = stage_inst;
806 
807 				/* Add the port to the queue array of ports */
808 				queue->ports[queue->nb_ports] = port;
809 				port->instance_id = queue->nb_ports;
810 				queue->nb_ports++;
811 				if (queue->nb_ports > 1) {
812 					PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
813 						"dummy queue %u: for "
814 						"port %u, "
815 						"SINGLE_LINK but has more "
816 						"than one port linked",
817 						opdl_pmd_dev_id(port->opdl),
818 						next_qid,
819 						port->id);
820 					err = -EINVAL;
821 					break;
822 				}
823 				/* Set this queue to initialized as it is never
824 				 * referenced by any ports
825 				 */
826 				queue->initialized = 1;
827 			}
828 		}
829 	}
830 
831 	/* Now that all ports are initialised we need to
832 	 * setup the last bit of stage md
833 	 */
834 	if (!err) {
835 		for (i = 0; i < device->nb_ports; i++) {
836 			struct opdl_port *port = &device->ports[i];
837 			struct opdl_queue *queue =
838 				&device->queue[port->queue_id];
839 
840 			if (port->configured &&
841 					(port->queue_id != OPDL_INVALID_QID)) {
842 				if (queue->nb_ports == 0) {
843 					PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
844 						"queue:[%u] has no ports"
845 						" linked to it",
846 						opdl_pmd_dev_id(port->opdl),
847 						port->id);
848 					err = -EINVAL;
849 					break;
850 				}
851 
852 				port->num_instance = queue->nb_ports;
853 				port->initialized = 1;
854 				queue->initialized = 1;
855 			} else {
856 				PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
857 					     "Port:[%u] not configured  invalid"
858 					     " queue configuration",
859 					     opdl_pmd_dev_id(port->opdl),
860 					     port->id);
861 				err = -EINVAL;
862 				break;
863 			}
864 		}
865 	}
866 	return err;
867 }
868 
869 int
initialise_queue_zero_ports(struct rte_eventdev * dev)870 initialise_queue_zero_ports(struct rte_eventdev *dev)
871 {
872 	int err = 0;
873 	uint8_t mt_rx = 0;
874 	struct opdl_stage *stage_inst = NULL;
875 	struct opdl_queue *queue = NULL;
876 
877 	struct opdl_evdev *device = opdl_pmd_priv(dev);
878 
879 	/* Assign queue zero and figure out how many Q0 ports we have */
880 	uint32_t i;
881 	for (i = 0; i < device->nb_ports; i++) {
882 		struct opdl_port *port = &device->ports[i];
883 		if (port->queue_id == OPDL_INVALID_QID) {
884 			port->queue_id = 0;
885 			port->external_qid = OPDL_INVALID_QID;
886 			port->p_type = OPDL_PURE_RX_PORT;
887 			mt_rx++;
888 		}
889 	}
890 
891 	/* Create the stage */
892 	stage_inst = opdl_stage_add(device->opdl[0],
893 			(mt_rx > 1 ? true : false),
894 			true);
895 	if (stage_inst) {
896 
897 		/* Assign the new created input stage to all relevant ports */
898 		for (i = 0; i < device->nb_ports; i++) {
899 			struct opdl_port *port = &device->ports[i];
900 			if (port->queue_id == 0) {
901 				queue = &device->queue[port->queue_id];
902 				port->enq_stage_inst = stage_inst;
903 				port->deq_stage_inst = NULL;
904 				port->configured = 1;
905 				port->initialized = 1;
906 
907 				queue->ports[queue->nb_ports] = port;
908 				port->instance_id = queue->nb_ports;
909 				queue->nb_ports++;
910 			}
911 		}
912 	} else {
913 		err = -1;
914 	}
915 	return err;
916 }
917 
918 int
assign_internal_queue_ids(struct rte_eventdev * dev)919 assign_internal_queue_ids(struct rte_eventdev *dev)
920 {
921 	int err = 0;
922 	struct opdl_evdev *device = opdl_pmd_priv(dev);
923 	uint32_t i;
924 
925 	for (i = 0; i < device->nb_ports; i++) {
926 		struct opdl_port *port = &device->ports[i];
927 		if (port->external_qid != OPDL_INVALID_QID) {
928 			port->queue_id =
929 				device->q_map_ex_to_in[port->external_qid];
930 
931 			/* Now do the external_qid of the next queue */
932 			struct opdl_queue *queue =
933 				&device->queue[port->queue_id];
934 			if (queue->q_pos == OPDL_Q_POS_END)
935 				port->next_external_qid =
936 				device->queue[port->queue_id + 2].external_qid;
937 			else
938 				port->next_external_qid =
939 				device->queue[port->queue_id + 1].external_qid;
940 		}
941 	}
942 	return err;
943 }
944