xref: /dpdk/drivers/event/dlb2/dlb2.c (revision e20e2148cf9268fa16ad6d0baff943a3eaae5bf0)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2022 Intel Corporation
3  */
4 
5 #include <assert.h>
6 #include <errno.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <stdbool.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/mman.h>
14 #include <fcntl.h>
15 
16 #include <rte_common.h>
17 #include <rte_config.h>
18 #include <rte_cycles.h>
19 #include <rte_debug.h>
20 #include <dev_driver.h>
21 #include <rte_errno.h>
22 #include <rte_eventdev.h>
23 #include <eventdev_pmd.h>
24 #include <rte_io.h>
25 #include <rte_kvargs.h>
26 #include <rte_log.h>
27 #include <rte_malloc.h>
28 #include <rte_mbuf.h>
29 #include <rte_power_intrinsics.h>
30 #include <rte_prefetch.h>
31 #include <rte_ring.h>
32 #include <rte_string_fns.h>
33 
34 #include "dlb2_priv.h"
35 #include "dlb2_iface.h"
36 #include "dlb2_inline_fns.h"
37 
38 /*
39  * Bypass memory fencing instructions when the port is a producer-type port.
40  * Enable this only with the understanding that the producer does no writes
41  * that require fencing. The movdir64 instruction used to enqueue events to
42  * the DLB is weakly ordered, so a movdir64 write to the DLB can pass earlier
43  * application writes, such as updates to the buffers being sent with the
44  * event.
45  */
46 #define DLB2_BYPASS_FENCE_ON_PP 0  /* 1 == Bypass fence, 0 == do not bypass */
47 
48 /*
49  * Resources exposed to eventdev. Some values overridden at runtime using
50  * values returned by the DLB kernel driver.
51  */
52 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
53 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
54 #endif
55 static struct rte_event_dev_info evdev_dlb2_default_info = {
56 	.driver_name = "", /* probe will set */
57 	.min_dequeue_timeout_ns = DLB2_MIN_DEQUEUE_TIMEOUT_NS,
58 	.max_dequeue_timeout_ns = DLB2_MAX_DEQUEUE_TIMEOUT_NS,
59 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB2_MAX_NUM_LDB_QUEUES)
60 	.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
61 #else
62 	.max_event_queues = DLB2_MAX_NUM_LDB_QUEUES,
63 #endif
64 	.max_event_queue_flows = DLB2_MAX_NUM_FLOWS,
65 	.max_event_queue_priority_levels = DLB2_QID_PRIORITIES,
66 	.max_event_priority_levels = DLB2_QID_PRIORITIES,
67 	.max_event_ports = DLB2_MAX_NUM_LDB_PORTS,
68 	.max_event_port_dequeue_depth = DLB2_DEFAULT_CQ_DEPTH,
69 	.max_event_port_enqueue_depth = DLB2_MAX_ENQUEUE_DEPTH,
70 	.max_event_port_links = DLB2_MAX_NUM_QIDS_PER_LDB_CQ,
71 	.max_num_events = DLB2_MAX_NUM_LDB_CREDITS,
72 	.max_single_link_event_port_queue_pairs =
73 		DLB2_MAX_NUM_DIR_PORTS(DLB2_HW_V2),
74 	.event_dev_cap = (RTE_EVENT_DEV_CAP_ATOMIC |
75 			  RTE_EVENT_DEV_CAP_ORDERED |
76 			  RTE_EVENT_DEV_CAP_PARALLEL |
77 			  RTE_EVENT_DEV_CAP_EVENT_QOS |
78 			  RTE_EVENT_DEV_CAP_NONSEQ_MODE |
79 			  RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
80 			  RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES |
81 			  RTE_EVENT_DEV_CAP_BURST_MODE |
82 			  RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
83 			  RTE_EVENT_DEV_CAP_RUNTIME_PORT_LINK |
84 			  RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT |
85 			  RTE_EVENT_DEV_CAP_INDEPENDENT_ENQ |
86 			  RTE_EVENT_DEV_CAP_MAINTENANCE_FREE),
87 	.max_profiles_per_port = 1,
88 };
89 
90 struct process_local_port_data
91 dlb2_port[DLB2_MAX_NUM_PORTS_ALL][DLB2_NUM_PORT_TYPES];
92 
93 static void
94 dlb2_free_qe_mem(struct dlb2_port *qm_port)
95 {
96 	if (qm_port == NULL)
97 		return;
98 
99 	rte_free(qm_port->qe4);
100 	qm_port->qe4 = NULL;
101 
102 	if (qm_port->order) {
103 		rte_free(qm_port->order);
104 		qm_port->order = NULL;
105 	}
106 
107 	rte_free(qm_port->int_arm_qe);
108 	qm_port->int_arm_qe = NULL;
109 
110 	rte_free(qm_port->consume_qe);
111 	qm_port->consume_qe = NULL;
112 
113 	rte_memzone_free(dlb2_port[qm_port->id][PORT_TYPE(qm_port)].mz);
114 	dlb2_port[qm_port->id][PORT_TYPE(qm_port)].mz = NULL;
115 }
116 
117 /* override defaults with value(s) provided on command line */
118 static void
119 dlb2_init_queue_depth_thresholds(struct dlb2_eventdev *dlb2,
120 				 int *qid_depth_thresholds)
121 {
122 	int q;
123 
124 	for (q = 0; q < DLB2_MAX_NUM_QUEUES(dlb2->version); q++) {
125 		if (qid_depth_thresholds[q] != 0)
126 			dlb2->ev_queues[q].depth_threshold =
127 				qid_depth_thresholds[q];
128 	}
129 }
130 
131 /* override defaults with value(s) provided on command line */
132 static void
133 dlb2_init_port_cos(struct dlb2_eventdev *dlb2, int *port_cos)
134 {
135 	int q;
136 
137 	for (q = 0; q < DLB2_MAX_NUM_PORTS_ALL; q++) {
138 		dlb2->ev_ports[q].cos_id = port_cos[q];
139 		if (port_cos[q] != DLB2_COS_DEFAULT &&
140 		    dlb2->cos_ports[port_cos[q]] < DLB2_MAX_NUM_LDB_PORTS_PER_COS) {
141 			dlb2->cos_ports[port_cos[q]]++;
142 			dlb2->max_cos_port = q;
143 		}
144 	}
145 }
146 
147 static void
148 dlb2_init_cos_bw(struct dlb2_eventdev *dlb2,
149 		 struct dlb2_cos_bw *cos_bw)
150 {
151 	int q;
152 
153 
154 	/* If cos_bw not set, then split evenly */
155 	if (cos_bw->val[0] == 0 && cos_bw->val[1] == 0 &&
156 		cos_bw->val[2] == 0 && cos_bw->val[3] == 0) {
157 		cos_bw->val[0] = 25;
158 		cos_bw->val[1] = 25;
159 		cos_bw->val[2] = 25;
160 		cos_bw->val[3] = 25;
161 	}
162 
163 	for (q = 0; q < DLB2_COS_NUM_VALS; q++)
164 		dlb2->cos_bw[q] = cos_bw->val[q];
165 
166 }
167 
168 static int
169 dlb2_hw_query_resources(struct dlb2_eventdev *dlb2)
170 {
171 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
172 	int num_ldb_ports;
173 	int ret;
174 
175 	/* Query driver resources provisioned for this device */
176 
177 	ret = dlb2_iface_get_num_resources(handle,
178 					   &dlb2->hw_rsrc_query_results);
179 	if (ret) {
180 		DLB2_LOG_ERR("ioctl get dlb2 num resources, err=%d", ret);
181 		return ret;
182 	}
183 
184 	/* Complete filling in device resource info returned to evdev app,
185 	 * overriding any default values.
186 	 * The capabilities (CAPs) were set at compile time.
187 	 */
188 
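	/* A larger max_cq_depth override leaves history-list space for fewer
	 * LDB ports, hence the division below.
	 */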
189 	if (dlb2->max_cq_depth != DLB2_DEFAULT_CQ_DEPTH)
190 		num_ldb_ports = DLB2_MAX_HL_ENTRIES / dlb2->max_cq_depth;
191 	else
192 		num_ldb_ports = dlb2->hw_rsrc_query_results.num_ldb_ports;
193 
194 	evdev_dlb2_default_info.max_event_queues =
195 		dlb2->hw_rsrc_query_results.num_ldb_queues;
196 
197 	evdev_dlb2_default_info.max_event_ports = num_ldb_ports;
198 
199 	if (dlb2->version == DLB2_HW_V2_5) {
200 		evdev_dlb2_default_info.max_num_events =
201 			dlb2->hw_rsrc_query_results.num_credits;
202 	} else {
203 		evdev_dlb2_default_info.max_num_events =
204 			dlb2->hw_rsrc_query_results.num_ldb_credits;
205 	}
206 	/* Save off values used when creating the scheduling domain. */
207 
208 	handle->info.num_sched_domains =
209 		dlb2->hw_rsrc_query_results.num_sched_domains;
210 
211 	if (dlb2->version == DLB2_HW_V2_5) {
212 		handle->info.hw_rsrc_max.nb_events_limit =
213 			dlb2->hw_rsrc_query_results.num_credits;
214 	} else {
215 		handle->info.hw_rsrc_max.nb_events_limit =
216 			dlb2->hw_rsrc_query_results.num_ldb_credits;
217 	}
218 	handle->info.hw_rsrc_max.num_queues =
219 		dlb2->hw_rsrc_query_results.num_ldb_queues +
220 		dlb2->hw_rsrc_query_results.num_dir_ports;
221 
222 	handle->info.hw_rsrc_max.num_ldb_queues =
223 		dlb2->hw_rsrc_query_results.num_ldb_queues;
224 
225 	handle->info.hw_rsrc_max.num_ldb_ports = num_ldb_ports;
226 
227 	handle->info.hw_rsrc_max.num_dir_ports =
228 		dlb2->hw_rsrc_query_results.num_dir_ports;
229 
230 	handle->info.hw_rsrc_max.reorder_window_size =
231 		dlb2->hw_rsrc_query_results.num_hist_list_entries;
232 
233 	return 0;
234 }
235 
236 #define DLB2_BASE_10 10
237 
238 static int
239 dlb2_string_to_int(int *result, const char *str)
240 {
241 	long ret;
242 	char *endptr;
243 
244 	if (str == NULL || result == NULL)
245 		return -EINVAL;
246 
247 	errno = 0;
248 	ret = strtol(str, &endptr, DLB2_BASE_10);
249 	if (errno)
250 		return -errno;
251 
252 	/* long int and int may have different widths on some architectures */
253 	if (ret < INT_MIN || ret > INT_MAX || endptr == str)
254 		return -EINVAL;
255 
256 	*result = ret;
257 	return 0;
258 }
259 
260 static int
261 set_producer_coremask(const char *key __rte_unused,
262 		      const char *value,
263 		      void *opaque)
264 {
265 	const char **mask_str = opaque;
266 
267 	if (value == NULL || opaque == NULL) {
268 		DLB2_LOG_ERR("NULL pointer");
269 		return -EINVAL;
270 	}
271 
272 	*mask_str = value;
273 
274 	return 0;
275 }
276 
277 static int
278 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
279 {
280 	int *socket_id = opaque;
281 	int ret;
282 
283 	ret = dlb2_string_to_int(socket_id, value);
284 	if (ret < 0)
285 		return ret;
286 
287 	if (*socket_id > RTE_MAX_NUMA_NODES)
288 		return -EINVAL;
289 	return 0;
290 }
291 
292 
293 static int
294 set_max_cq_depth(const char *key __rte_unused,
295 		 const char *value,
296 		 void *opaque)
297 {
298 	int *max_cq_depth = opaque;
299 	int ret;
300 
301 	if (value == NULL || opaque == NULL) {
302 		DLB2_LOG_ERR("NULL pointer");
303 		return -EINVAL;
304 	}
305 
306 	ret = dlb2_string_to_int(max_cq_depth, value);
307 	if (ret < 0)
308 		return ret;
309 
310 	if (*max_cq_depth < DLB2_MIN_CQ_DEPTH_OVERRIDE ||
311 	    *max_cq_depth > DLB2_MAX_CQ_DEPTH_OVERRIDE ||
312 	    !rte_is_power_of_2(*max_cq_depth)) {
313 		DLB2_LOG_ERR("dlb2: max_cq_depth must be between %d and %d and a power of 2",
314 			     DLB2_MIN_CQ_DEPTH_OVERRIDE,
315 			     DLB2_MAX_CQ_DEPTH_OVERRIDE);
316 		return -EINVAL;
317 	}
318 
319 	return 0;
320 }
321 
322 static int
323 set_max_enq_depth(const char *key __rte_unused,
324 		  const char *value,
325 		  void *opaque)
326 {
327 	int *max_enq_depth = opaque;
328 	int ret;
329 
330 	if (value == NULL || opaque == NULL) {
331 		DLB2_LOG_ERR("NULL pointer");
332 		return -EINVAL;
333 	}
334 
335 	ret = dlb2_string_to_int(max_enq_depth, value);
336 	if (ret < 0)
337 		return ret;
338 
339 	if (*max_enq_depth < DLB2_MIN_ENQ_DEPTH_OVERRIDE ||
340 	    *max_enq_depth > DLB2_MAX_ENQ_DEPTH_OVERRIDE ||
341 	    !rte_is_power_of_2(*max_enq_depth)) {
342 		DLB2_LOG_ERR("dlb2: max_enq_depth must be between %d and %d and a power of 2",
343 			     DLB2_MIN_ENQ_DEPTH_OVERRIDE,
344 			     DLB2_MAX_ENQ_DEPTH_OVERRIDE);
345 		return -EINVAL;
346 	}
347 
348 	return 0;
349 }
350 
351 static int
352 set_max_num_events(const char *key __rte_unused,
353 		   const char *value,
354 		   void *opaque)
355 {
356 	int *max_num_events = opaque;
357 	int ret;
358 
359 	if (value == NULL || opaque == NULL) {
360 		DLB2_LOG_ERR("NULL pointer");
361 		return -EINVAL;
362 	}
363 
364 	ret = dlb2_string_to_int(max_num_events, value);
365 	if (ret < 0)
366 		return ret;
367 
368 	if (*max_num_events < 0 || *max_num_events >
369 			DLB2_MAX_NUM_LDB_CREDITS) {
370 		DLB2_LOG_ERR("dlb2: max_num_events must be between 0 and %d",
371 			     DLB2_MAX_NUM_LDB_CREDITS);
372 		return -EINVAL;
373 	}
374 
375 	return 0;
376 }
377 
378 static int
379 set_num_dir_credits(const char *key __rte_unused,
380 		    const char *value,
381 		    void *opaque)
382 {
383 	int *num_dir_credits = opaque;
384 	int ret;
385 
386 	if (value == NULL || opaque == NULL) {
387 		DLB2_LOG_ERR("NULL pointer");
388 		return -EINVAL;
389 	}
390 
391 	ret = dlb2_string_to_int(num_dir_credits, value);
392 	if (ret < 0)
393 		return ret;
394 
395 	if (*num_dir_credits < 0 ||
396 	    *num_dir_credits > DLB2_MAX_NUM_DIR_CREDITS(DLB2_HW_V2)) {
397 		DLB2_LOG_ERR("dlb2: num_dir_credits must be between 0 and %d",
398 			     DLB2_MAX_NUM_DIR_CREDITS(DLB2_HW_V2));
399 		return -EINVAL;
400 	}
401 
402 	return 0;
403 }
404 
405 static int
406 set_dev_id(const char *key __rte_unused,
407 	   const char *value,
408 	   void *opaque)
409 {
410 	int *dev_id = opaque;
411 	int ret;
412 
413 	if (value == NULL || opaque == NULL) {
414 		DLB2_LOG_ERR("NULL pointer");
415 		return -EINVAL;
416 	}
417 
418 	ret = dlb2_string_to_int(dev_id, value);
419 	if (ret < 0)
420 		return ret;
421 
422 	return 0;
423 }
424 
425 static int
426 set_poll_interval(const char *key __rte_unused,
427 	const char *value,
428 	void *opaque)
429 {
430 	int *poll_interval = opaque;
431 	int ret;
432 
433 	if (value == NULL || opaque == NULL) {
434 		DLB2_LOG_ERR("NULL pointer");
435 		return -EINVAL;
436 	}
437 
438 	ret = dlb2_string_to_int(poll_interval, value);
439 	if (ret < 0)
440 		return ret;
441 
442 	return 0;
443 }
444 
445 static int
446 set_port_cos(const char *key __rte_unused,
447 	     const char *value,
448 	     void *opaque)
449 {
450 	struct dlb2_port_cos *port_cos = opaque;
451 	int first, last, cos_id, i;
452 
453 	if (value == NULL || opaque == NULL) {
454 		DLB2_LOG_ERR("NULL pointer");
455 		return -EINVAL;
456 	}
457 
458 	/* command line override may take one of the following 2 forms:
459 	 * port_cos=port-port:<cos_id> ... a range of ports
460 	 * port_cos=port:<cos_id> ... just one port
461 	 */
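	/* Illustrative examples of the forms above:
	 *   port_cos=0-3:1 ... assign ports 0 through 3 to CoS 1
	 *   port_cos=5:2   ... assign port 5 to CoS 2
	 */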
462 	if (sscanf(value, "%d-%d:%d", &first, &last, &cos_id) == 3) {
463 		/* we have everything we need */
464 	} else if (sscanf(value, "%d:%d", &first, &cos_id) == 2) {
465 		last = first;
466 	} else {
467 		DLB2_LOG_ERR("Error parsing ldb port port_cos devarg. Should be port-port:val, or port:val");
468 		return -EINVAL;
469 	}
470 
471 	if (first > last || first < 0 ||
472 		last >= DLB2_MAX_NUM_LDB_PORTS) {
473 		DLB2_LOG_ERR("Error parsing ldb port cos_id arg, invalid port value");
474 		return -EINVAL;
475 	}
476 
477 	if (cos_id < DLB2_COS_0 || cos_id > DLB2_COS_3) {
478 		DLB2_LOG_ERR("Error parsing ldb port cos_id devarg, must be between 0 and 3");
479 		return -EINVAL;
480 	}
481 
482 	for (i = first; i <= last; i++)
483 		port_cos->cos_id[i] = cos_id; /* indexed by port */
484 
485 	return 0;
486 }
487 
488 static int
489 set_cos_bw(const char *key __rte_unused,
490 	     const char *value,
491 	     void *opaque)
492 {
493 	struct dlb2_cos_bw *cos_bw = opaque;
494 
495 	if (opaque == NULL) {
496 		DLB2_LOG_ERR("NULL pointer");
497 		return -EINVAL;
498 	}
499 
500 	/* format must be %d:%d:%d:%d */
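	/* Illustrative example: cos_bw=40:20:20:20 gives CoS 0 40% of the
	 * bandwidth and 20% to each of CoS 1-3 (values must sum to <= 100).
	 */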
501 
502 	if (sscanf(value, "%d:%d:%d:%d", &cos_bw->val[0], &cos_bw->val[1],
503 		   &cos_bw->val[2], &cos_bw->val[3]) != 4) {
504 		DLB2_LOG_ERR("Error parsing cos bandwidth devarg. Should be bw0:bw1:bw2:bw3 where all values combined are <= 100");
505 		return -EINVAL;
506 	}
507 	if (cos_bw->val[0] + cos_bw->val[1] + cos_bw->val[2] + cos_bw->val[3] > 100) {
508 		DLB2_LOG_ERR("Error parsing cos bandwidth devarg. Values bw0:bw1:bw2:bw3 must sum to <= 100");
509 		return -EINVAL;
510 	}
511 
512 	return 0;
513 }
514 
515 static int
516 set_sw_credit_quanta(const char *key __rte_unused,
517 	const char *value,
518 	void *opaque)
519 {
520 	int *sw_credit_quanta = opaque;
521 	int ret;
522 
523 	if (value == NULL || opaque == NULL) {
524 		DLB2_LOG_ERR("NULL pointer");
525 		return -EINVAL;
526 	}
527 
528 	ret = dlb2_string_to_int(sw_credit_quanta, value);
529 	if (ret < 0)
530 		return ret;
531 
532 	if (*sw_credit_quanta <= 0) {
533 		DLB2_LOG_ERR("sw_credit_quanta must be > 0");
534 		return -EINVAL;
535 	}
536 
537 	return 0;
538 }
539 
540 static int
541 set_hw_credit_quanta(const char *key __rte_unused,
542 	const char *value,
543 	void *opaque)
544 {
545 	int *hw_credit_quanta = opaque;
546 	int ret;
547 
548 	if (value == NULL || opaque == NULL) {
549 		DLB2_LOG_ERR("NULL pointer");
550 		return -EINVAL;
551 	}
552 
553 	ret = dlb2_string_to_int(hw_credit_quanta, value);
554 	if (ret < 0)
555 		return ret;
556 
557 	return 0;
558 }
559 
560 static int
561 set_default_depth_thresh(const char *key __rte_unused,
562 	const char *value,
563 	void *opaque)
564 {
565 	int *default_depth_thresh = opaque;
566 	int ret;
567 
568 	if (value == NULL || opaque == NULL) {
569 		DLB2_LOG_ERR("NULL pointer");
570 		return -EINVAL;
571 	}
572 
573 	ret = dlb2_string_to_int(default_depth_thresh, value);
574 	if (ret < 0)
575 		return ret;
576 
577 	return 0;
578 }
579 
580 static int
581 set_vector_opts_enab(const char *key __rte_unused,
582 	const char *value,
583 	void *opaque)
584 {
585 	bool *dlb2_vector_opts_enabled = opaque;
586 
587 	if (value == NULL || opaque == NULL) {
588 		DLB2_LOG_ERR("NULL pointer");
589 		return -EINVAL;
590 	}
591 
592 	if ((*value == 'y') || (*value == 'Y'))
593 		*dlb2_vector_opts_enabled = true;
594 	else
595 		*dlb2_vector_opts_enabled = false;
596 
597 	return 0;
598 }
599 
600 static int
601 set_default_ldb_port_allocation(const char *key __rte_unused,
602 		      const char *value,
603 		      void *opaque)
604 {
605 	bool *default_ldb_port_allocation = opaque;
606 
607 	if (value == NULL || opaque == NULL) {
608 		DLB2_LOG_ERR("NULL pointer");
609 		return -EINVAL;
610 	}
611 
612 	if ((*value == 'y') || (*value == 'Y'))
613 		*default_ldb_port_allocation = true;
614 	else
615 		*default_ldb_port_allocation = false;
616 
617 	return 0;
618 }
619 
620 static int
621 set_enable_cq_weight(const char *key __rte_unused,
622 		      const char *value,
623 		      void *opaque)
624 {
625 	bool *enable_cq_weight = opaque;
626 
627 	if (value == NULL || opaque == NULL) {
628 		DLB2_LOG_ERR("NULL pointer");
629 		return -EINVAL;
630 	}
631 
632 	if ((*value == 'y') || (*value == 'Y'))
633 		*enable_cq_weight = true;
634 	else
635 		*enable_cq_weight = false;
636 
637 	return 0;
638 }
639 
640 static int
641 set_qid_depth_thresh(const char *key __rte_unused,
642 		     const char *value,
643 		     void *opaque)
644 {
645 	struct dlb2_qid_depth_thresholds *qid_thresh = opaque;
646 	int first, last, thresh, i;
647 
648 	if (value == NULL || opaque == NULL) {
649 		DLB2_LOG_ERR("NULL pointer");
650 		return -EINVAL;
651 	}
652 
653 	/* command line override may take one of the following 3 forms:
654 	 * qid_depth_thresh=all:<threshold_value> ... all queues
655 	 * qid_depth_thresh=qidA-qidB:<threshold_value> ... a range of queues
656 	 * qid_depth_thresh=qid:<threshold_value> ... just one queue
657 	 */
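	/* Illustrative examples of the forms above: qid_depth_thresh=all:32,
	 * qid_depth_thresh=2-5:64, or qid_depth_thresh=7:128.
	 */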
658 	if (sscanf(value, "all:%d", &thresh) == 1) {
659 		first = 0;
660 		last = DLB2_MAX_NUM_QUEUES(DLB2_HW_V2) - 1;
661 	} else if (sscanf(value, "%d-%d:%d", &first, &last, &thresh) == 3) {
662 		/* we have everything we need */
663 	} else if (sscanf(value, "%d:%d", &first, &thresh) == 2) {
664 		last = first;
665 	} else {
666 		DLB2_LOG_ERR("Error parsing qid depth devarg. Should be all:val, qid-qid:val, or qid:val");
667 		return -EINVAL;
668 	}
669 
670 	if (first > last || first < 0 ||
671 		last >= DLB2_MAX_NUM_QUEUES(DLB2_HW_V2)) {
672 		DLB2_LOG_ERR("Error parsing qid depth devarg, invalid qid value");
673 		return -EINVAL;
674 	}
675 
676 	if (thresh < 0 || thresh > DLB2_MAX_QUEUE_DEPTH_THRESHOLD) {
677 		DLB2_LOG_ERR("Error parsing qid depth devarg, threshold > %d",
678 			     DLB2_MAX_QUEUE_DEPTH_THRESHOLD);
679 		return -EINVAL;
680 	}
681 
682 	for (i = first; i <= last; i++)
683 		qid_thresh->val[i] = thresh; /* indexed by qid */
684 
685 	return 0;
686 }
687 
688 static int
689 set_qid_depth_thresh_v2_5(const char *key __rte_unused,
690 			  const char *value,
691 			  void *opaque)
692 {
693 	struct dlb2_qid_depth_thresholds *qid_thresh = opaque;
694 	int first, last, thresh, i;
695 
696 	if (value == NULL || opaque == NULL) {
697 		DLB2_LOG_ERR("NULL pointer");
698 		return -EINVAL;
699 	}
700 
701 	/* command line override may take one of the following 3 forms:
702 	 * qid_depth_thresh=all:<threshold_value> ... all queues
703 	 * qid_depth_thresh=qidA-qidB:<threshold_value> ... a range of queues
704 	 * qid_depth_thresh=qid:<threshold_value> ... just one queue
705 	 */
706 	if (sscanf(value, "all:%d", &thresh) == 1) {
707 		first = 0;
708 		last = DLB2_MAX_NUM_QUEUES(DLB2_HW_V2_5) - 1;
709 	} else if (sscanf(value, "%d-%d:%d", &first, &last, &thresh) == 3) {
710 		/* we have everything we need */
711 	} else if (sscanf(value, "%d:%d", &first, &thresh) == 2) {
712 		last = first;
713 	} else {
714 		DLB2_LOG_ERR("Error parsing qid depth devarg. Should be all:val, qid-qid:val, or qid:val");
715 		return -EINVAL;
716 	}
717 
718 	if (first > last || first < 0 ||
719 		last >= DLB2_MAX_NUM_QUEUES(DLB2_HW_V2_5)) {
720 		DLB2_LOG_ERR("Error parsing qid depth devarg, invalid qid value");
721 		return -EINVAL;
722 	}
723 
724 	if (thresh < 0 || thresh > DLB2_MAX_QUEUE_DEPTH_THRESHOLD) {
725 		DLB2_LOG_ERR("Error parsing qid depth devarg, threshold > %d",
726 			     DLB2_MAX_QUEUE_DEPTH_THRESHOLD);
727 		return -EINVAL;
728 	}
729 
730 	for (i = first; i <= last; i++)
731 		qid_thresh->val[i] = thresh; /* indexed by qid */
732 
733 	return 0;
734 }
735 
736 static void
737 dlb2_eventdev_info_get(struct rte_eventdev *dev,
738 		       struct rte_event_dev_info *dev_info)
739 {
740 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
741 	int ret;
742 
743 	ret = dlb2_hw_query_resources(dlb2);
744 	if (ret) {
745 		const struct rte_eventdev_data *data = dev->data;
746 
747 		DLB2_LOG_ERR("get resources err=%d, devid=%d",
748 			     ret, data->dev_id);
749 		/* fn is void, so fall through and return values set up in
750 		 * probe
751 		 */
752 	}
753 
754 	/* Add num resources currently owned by this domain.
755 	 * These would become available if the scheduling domain were reset due
756 	 * to the application recalling eventdev_configure to *reconfigure* the
757 	 * domain.
758 	 */
759 	evdev_dlb2_default_info.max_event_ports += dlb2->num_ldb_ports;
760 	evdev_dlb2_default_info.max_event_queues += dlb2->num_ldb_queues;
761 	if (dlb2->version == DLB2_HW_V2_5) {
762 		evdev_dlb2_default_info.max_num_events +=
763 			dlb2->max_credits;
764 	} else {
765 		evdev_dlb2_default_info.max_num_events +=
766 			dlb2->max_ldb_credits;
767 	}
768 	evdev_dlb2_default_info.max_event_queues =
769 		RTE_MIN(evdev_dlb2_default_info.max_event_queues,
770 			RTE_EVENT_MAX_QUEUES_PER_DEV);
771 
772 	evdev_dlb2_default_info.max_num_events =
773 		RTE_MIN(evdev_dlb2_default_info.max_num_events,
774 			dlb2->max_num_events_override);
775 
776 	*dev_info = evdev_dlb2_default_info;
777 }
778 
779 static int
780 dlb2_hw_create_sched_domain(struct dlb2_eventdev *dlb2,
781 			    struct dlb2_hw_dev *handle,
782 			    const struct dlb2_hw_rsrcs *resources_asked,
783 			    uint8_t device_version)
784 {
785 	int ret = 0;
786 	uint32_t cos_ports = 0;
787 	struct dlb2_create_sched_domain_args *cfg;
788 
789 	if (resources_asked == NULL) {
790 		DLB2_LOG_ERR("dlb2: dlb2_create NULL parameter");
791 		ret = EINVAL;
792 		goto error_exit;
793 	}
794 
795 	/* Map generic qm resources to dlb2 resources */
796 	cfg = &handle->cfg.resources;
797 
798 	/* DIR ports and queues */
799 
800 	cfg->num_dir_ports = resources_asked->num_dir_ports;
801 	if (device_version == DLB2_HW_V2_5)
802 		cfg->num_credits = resources_asked->num_credits;
803 	else
804 		cfg->num_dir_credits = resources_asked->num_dir_credits;
805 
806 	/* LDB queues */
807 
808 	cfg->num_ldb_queues = resources_asked->num_ldb_queues;
809 
810 	/* LDB ports */
811 
812 	/* tally of COS ports from cmd line */
813 	cos_ports = dlb2->cos_ports[0] + dlb2->cos_ports[1] +
814 		    dlb2->cos_ports[2] + dlb2->cos_ports[3];
815 
816 	if (cos_ports > resources_asked->num_ldb_ports ||
817 	    (cos_ports && dlb2->max_cos_port >= resources_asked->num_ldb_ports)) {
818 		DLB2_LOG_ERR("dlb2: num_ldb_ports < cos_ports");
819 		ret = EINVAL;
820 		goto error_exit;
821 	}
822 
823 	cfg->cos_strict = 0; /* Best effort */
824 	cfg->num_cos_ldb_ports[0] = dlb2->cos_ports[0];
825 	cfg->num_cos_ldb_ports[1] = dlb2->cos_ports[1];
826 	cfg->num_cos_ldb_ports[2] = dlb2->cos_ports[2];
827 	cfg->num_cos_ldb_ports[3] = dlb2->cos_ports[3];
828 	cfg->num_ldb_ports = resources_asked->num_ldb_ports - cos_ports;
829 
830 	if (device_version == DLB2_HW_V2)
831 		cfg->num_ldb_credits = resources_asked->num_ldb_credits;
832 
833 	cfg->num_atomic_inflights =
834 		DLB2_NUM_ATOMIC_INFLIGHTS_PER_QUEUE *
835 		cfg->num_ldb_queues;
836 
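	/* Size the history list in proportion to the maximum per-port CQ
	 * (dequeue) depth for every requested LDB port.
	 */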
837 	cfg->num_hist_list_entries = resources_asked->num_ldb_ports *
838 		evdev_dlb2_default_info.max_event_port_dequeue_depth;
839 
840 	if (device_version == DLB2_HW_V2_5) {
841 		DLB2_LOG_LINE_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, credits=%d",
842 			     cfg->num_ldb_queues,
843 			     resources_asked->num_ldb_ports,
844 			     cfg->num_dir_ports,
845 			     cfg->num_atomic_inflights,
846 			     cfg->num_hist_list_entries,
847 			     cfg->num_credits);
848 	} else {
849 		DLB2_LOG_LINE_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d",
850 			     cfg->num_ldb_queues,
851 			     resources_asked->num_ldb_ports,
852 			     cfg->num_dir_ports,
853 			     cfg->num_atomic_inflights,
854 			     cfg->num_hist_list_entries,
855 			     cfg->num_ldb_credits,
856 			     cfg->num_dir_credits);
857 	}
858 
859 	/* Configure the QM */
860 
861 	ret = dlb2_iface_sched_domain_create(handle, cfg);
862 	if (ret < 0) {
863 		DLB2_LOG_ERR("dlb2: domain create failed, ret = %d, extra status: %s",
864 			     ret,
865 			     dlb2_error_strings[cfg->response.status]);
866 
867 		goto error_exit;
868 	}
869 
870 	handle->domain_id = cfg->response.id;
871 	handle->cfg.configured = true;
872 
873 error_exit:
874 
875 	return ret;
876 }
877 
878 static void
879 dlb2_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
880 {
881 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
882 	enum dlb2_configuration_state config_state;
883 	int i, j;
884 
885 	dlb2_iface_domain_reset(dlb2);
886 
887 	/* Free all dynamically allocated port memory */
888 	for (i = 0; i < dlb2->num_ports; i++)
889 		dlb2_free_qe_mem(&dlb2->ev_ports[i].qm_port);
890 
891 	/* If reconfiguring, mark the device's queues and ports as "previously
892 	 * configured." If the user doesn't reconfigure them, the PMD will
893 	 * reapply their previous configuration when the device is started.
894 	 */
895 	config_state = (reconfig) ? DLB2_PREV_CONFIGURED :
896 		DLB2_NOT_CONFIGURED;
897 
898 	for (i = 0; i < dlb2->num_ports; i++) {
899 		dlb2->ev_ports[i].qm_port.config_state = config_state;
900 		/* Reset setup_done so ports can be reconfigured */
901 		dlb2->ev_ports[i].setup_done = false;
902 		for (j = 0; j < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; j++)
903 			dlb2->ev_ports[i].link[j].mapped = false;
904 	}
905 
906 	for (i = 0; i < dlb2->num_queues; i++)
907 		dlb2->ev_queues[i].qm_queue.config_state = config_state;
908 
909 	for (i = 0; i < DLB2_MAX_NUM_QUEUES(DLB2_HW_V2_5); i++)
910 		dlb2->ev_queues[i].setup_done = false;
911 
912 	dlb2->num_ports = 0;
913 	dlb2->num_ldb_ports = 0;
914 	dlb2->num_dir_ports = 0;
915 	dlb2->num_queues = 0;
916 	dlb2->num_ldb_queues = 0;
917 	dlb2->num_dir_queues = 0;
918 	dlb2->configured = false;
919 }
920 
921 /* Note: 1 QM instance per QM device, QM instance/device == event device */
922 static int
923 dlb2_eventdev_configure(const struct rte_eventdev *dev)
924 {
925 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
926 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
927 	struct dlb2_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
928 	const struct rte_eventdev_data *data = dev->data;
929 	const struct rte_event_dev_config *config = &data->dev_conf;
930 	int ret;
931 
932 	/* If this eventdev is already configured, we must release the current
933 	 * scheduling domain before attempting to configure a new one.
934 	 */
935 	if (dlb2->configured) {
936 		dlb2_hw_reset_sched_domain(dev, true);
937 		ret = dlb2_hw_query_resources(dlb2);
938 		if (ret) {
939 			DLB2_LOG_ERR("get resources err=%d, devid=%d",
940 				     ret, data->dev_id);
941 			return ret;
942 		}
943 	}
944 
945 	if (config->nb_event_queues > rsrcs->num_queues) {
946 		DLB2_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).",
947 			     config->nb_event_queues,
948 			     rsrcs->num_queues);
949 		return -EINVAL;
950 	}
951 	if (config->nb_event_ports > (rsrcs->num_ldb_ports
952 			+ rsrcs->num_dir_ports)) {
953 		DLB2_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).",
954 			     config->nb_event_ports,
955 			     (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
956 		return -EINVAL;
957 	}
958 	if (config->nb_events_limit > rsrcs->nb_events_limit) {
959 		DLB2_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).",
960 			     config->nb_events_limit,
961 			     rsrcs->nb_events_limit);
962 		return -EINVAL;
963 	}
964 
965 	if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
966 		dlb2->global_dequeue_wait = false;
967 	else {
968 		uint32_t timeout32;
969 
970 		dlb2->global_dequeue_wait = true;
971 
972 		/* note size mismatch of timeout vals in eventdev lib. */
973 		timeout32 = config->dequeue_timeout_ns;
974 
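		/* Illustrative example: a 1000 ns timeout with a 2 GHz timer
		 * yields 2000 wait ticks.
		 */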
975 		dlb2->global_dequeue_wait_ticks =
976 			timeout32 * (rte_get_timer_hz() / 1E9);
977 	}
978 
979 	/* Does this platform support umonitor/umwait? */
980 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG))
981 		dlb2->umwait_allowed = true;
982 
983 	rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
984 	rsrcs->num_ldb_ports  = config->nb_event_ports - rsrcs->num_dir_ports;
985 	/* 1 dir queue per dir port */
986 	rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
987 
988 	if (dlb2->version == DLB2_HW_V2_5) {
989 		rsrcs->num_credits = 0;
990 		if (rsrcs->num_ldb_queues || rsrcs->num_dir_ports)
991 			rsrcs->num_credits = config->nb_events_limit;
992 	} else {
993 		/* Scale down nb_events_limit for directed credits, since
994 		 * load-balanced credits outnumber directed credits.
995 		 */
996 		rsrcs->num_ldb_credits = 0;
997 		rsrcs->num_dir_credits = 0;
998 
999 		if (rsrcs->num_ldb_queues)
1000 			rsrcs->num_ldb_credits = config->nb_events_limit;
1001 		if (rsrcs->num_dir_ports)
1002 			rsrcs->num_dir_credits = config->nb_events_limit / 2;
1003 		if (dlb2->num_dir_credits_override != -1)
1004 			rsrcs->num_dir_credits = dlb2->num_dir_credits_override;
1005 	}
1006 
1007 	if (dlb2_hw_create_sched_domain(dlb2, handle, rsrcs,
1008 					dlb2->version) < 0) {
1009 		DLB2_LOG_ERR("dlb2_hw_create_sched_domain failed");
1010 		return -ENODEV;
1011 	}
1012 
1013 	dlb2->new_event_limit = config->nb_events_limit;
1014 	rte_atomic_store_explicit(&dlb2->inflights, 0, rte_memory_order_seq_cst);
1015 
1016 	/* Save number of ports/queues for this event dev */
1017 	dlb2->num_ports = config->nb_event_ports;
1018 	dlb2->num_queues = config->nb_event_queues;
1019 	dlb2->num_dir_ports = rsrcs->num_dir_ports;
1020 	dlb2->num_ldb_ports = dlb2->num_ports - dlb2->num_dir_ports;
1021 	dlb2->num_ldb_queues = dlb2->num_queues - dlb2->num_dir_ports;
1022 	dlb2->num_dir_queues = dlb2->num_dir_ports;
1023 	if (dlb2->version == DLB2_HW_V2_5) {
1024 		dlb2->credit_pool = rsrcs->num_credits;
1025 		dlb2->max_credits = rsrcs->num_credits;
1026 	} else {
1027 		dlb2->ldb_credit_pool = rsrcs->num_ldb_credits;
1028 		dlb2->max_ldb_credits = rsrcs->num_ldb_credits;
1029 		dlb2->dir_credit_pool = rsrcs->num_dir_credits;
1030 		dlb2->max_dir_credits = rsrcs->num_dir_credits;
1031 	}
1032 
1033 	dlb2->configured = true;
1034 
1035 	return 0;
1036 }
1037 
1038 static void
1039 dlb2_eventdev_port_default_conf_get(struct rte_eventdev *dev,
1040 				    uint8_t port_id,
1041 				    struct rte_event_port_conf *port_conf)
1042 {
1043 	RTE_SET_USED(port_id);
1044 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
1045 
1046 	port_conf->new_event_threshold = dlb2->new_event_limit;
1047 	port_conf->dequeue_depth = 32;
1048 	port_conf->enqueue_depth = DLB2_MAX_ENQUEUE_DEPTH;
1049 	port_conf->event_port_cfg = 0;
1050 }
1051 
1052 static void
1053 dlb2_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
1054 				     uint8_t queue_id,
1055 				     struct rte_event_queue_conf *queue_conf)
1056 {
1057 	RTE_SET_USED(dev);
1058 	RTE_SET_USED(queue_id);
1059 
1060 	queue_conf->nb_atomic_flows = 1024;
1061 	queue_conf->nb_atomic_order_sequences = 64;
1062 	queue_conf->event_queue_cfg = 0;
1063 	queue_conf->priority = 0;
1064 }
1065 
1066 static int32_t
1067 dlb2_get_sn_allocation(struct dlb2_eventdev *dlb2, int group)
1068 {
1069 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
1070 	struct dlb2_get_sn_allocation_args cfg;
1071 	int ret;
1072 
1073 	cfg.group = group;
1074 
1075 	ret = dlb2_iface_get_sn_allocation(handle, &cfg);
1076 	if (ret < 0) {
1077 		DLB2_LOG_ERR("dlb2: get_sn_allocation ret=%d (driver status: %s)",
1078 			     ret, dlb2_error_strings[cfg.response.status]);
1079 		return ret;
1080 	}
1081 
1082 	return cfg.response.id;
1083 }
1084 
1085 static int
1086 dlb2_set_sn_allocation(struct dlb2_eventdev *dlb2, int group, int num)
1087 {
1088 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
1089 	struct dlb2_set_sn_allocation_args cfg;
1090 	int ret;
1091 
1092 	cfg.num = num;
1093 	cfg.group = group;
1094 
1095 	ret = dlb2_iface_set_sn_allocation(handle, &cfg);
1096 	if (ret < 0) {
1097 		DLB2_LOG_ERR("dlb2: set_sn_allocation ret=%d (driver status: %s)",
1098 			     ret, dlb2_error_strings[cfg.response.status]);
1099 		return ret;
1100 	}
1101 
1102 	return ret;
1103 }
1104 
1105 static int32_t
1106 dlb2_get_sn_occupancy(struct dlb2_eventdev *dlb2, int group)
1107 {
1108 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
1109 	struct dlb2_get_sn_occupancy_args cfg;
1110 	int ret;
1111 
1112 	cfg.group = group;
1113 
1114 	ret = dlb2_iface_get_sn_occupancy(handle, &cfg);
1115 	if (ret < 0) {
1116 		DLB2_LOG_ERR("dlb2: get_sn_occupancy ret=%d (driver status: %s)",
1117 			     ret, dlb2_error_strings[cfg.response.status]);
1118 		return ret;
1119 	}
1120 
1121 	return cfg.response.id;
1122 }
1123 
1124 /* Query the current sequence number allocations and, if they conflict with the
1125  * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1126  * This is best-effort; if it fails, the subsequent load-balanced queue
1127  * configuration will fail and that error is returned to the caller.
1128  */
1129 static void
1130 dlb2_program_sn_allocation(struct dlb2_eventdev *dlb2,
1131 			   const struct rte_event_queue_conf *queue_conf)
1132 {
1133 	int grp_occupancy[DLB2_NUM_SN_GROUPS];
1134 	int grp_alloc[DLB2_NUM_SN_GROUPS];
1135 	int i, sequence_numbers;
1136 
1137 	sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1138 
1139 	for (i = 0; i < DLB2_NUM_SN_GROUPS; i++) {
1140 		int total_slots;
1141 
1142 		grp_alloc[i] = dlb2_get_sn_allocation(dlb2, i);
1143 		if (grp_alloc[i] < 0)
1144 			return;
1145 
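		/* Illustrative: assuming DLB2_MAX_LDB_SN_ALLOC is 1024, a group
		 * allocated 64 SNs per queue provides 16 queue slots.
		 */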
1146 		total_slots = DLB2_MAX_LDB_SN_ALLOC / grp_alloc[i];
1147 
1148 		grp_occupancy[i] = dlb2_get_sn_occupancy(dlb2, i);
1149 		if (grp_occupancy[i] < 0)
1150 			return;
1151 
1152 		/* DLB has at least one available slot for the requested
1153 		 * sequence numbers, so no further configuration required.
1154 		 */
1155 		if (grp_alloc[i] == sequence_numbers &&
1156 		    grp_occupancy[i] < total_slots)
1157 			return;
1158 	}
1159 
1160 	/* None of the sequence number groups are configured for the requested
1161 	 * sequence numbers, so we have to reconfigure one of them. This is
1162 	 * only possible if a group is not in use.
1163 	 */
1164 	for (i = 0; i < DLB2_NUM_SN_GROUPS; i++) {
1165 		if (grp_occupancy[i] == 0)
1166 			break;
1167 	}
1168 
1169 	if (i == DLB2_NUM_SN_GROUPS) {
1170 		DLB2_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots",
1171 		       __func__, sequence_numbers);
1172 		return;
1173 	}
1174 
1175 	/* Attempt to configure slot i with the requested number of sequence
1176 	 * numbers. Ignore the return value -- if this fails, the error will be
1177 	 * caught during subsequent queue configuration.
1178 	 */
1179 	dlb2_set_sn_allocation(dlb2, i, sequence_numbers);
1180 }
1181 
1182 static int32_t
1183 dlb2_hw_create_ldb_queue(struct dlb2_eventdev *dlb2,
1184 			 struct dlb2_eventdev_queue *ev_queue,
1185 			 const struct rte_event_queue_conf *evq_conf)
1186 {
1187 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
1188 	struct dlb2_queue *queue = &ev_queue->qm_queue;
1189 	struct dlb2_create_ldb_queue_args cfg;
1190 	int32_t ret;
1191 	uint32_t qm_qid;
1192 	int sched_type = -1;
1193 
1194 	if (evq_conf == NULL)
1195 		return -EINVAL;
1196 
1197 	if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1198 		if (evq_conf->nb_atomic_order_sequences != 0)
1199 			sched_type = RTE_SCHED_TYPE_ORDERED;
1200 		else
1201 			sched_type = RTE_SCHED_TYPE_PARALLEL;
1202 	} else
1203 		sched_type = evq_conf->schedule_type;
1204 
1205 	cfg.num_atomic_inflights = DLB2_NUM_ATOMIC_INFLIGHTS_PER_QUEUE;
1206 	cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1207 	cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1208 
1209 	if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1210 		cfg.num_sequence_numbers = 0;
1211 		cfg.num_qid_inflights = 2048;
1212 	}
1213 
1214 	/* The application should set this to the number of hardware flows it
1215 	 * wants, not the total number of flows it will use. E.g., if the app
1216 	 * uses 64 flows and sets the compression level to 64, in the best case
1217 	 * it gets 64 unique hashed flows in hardware.
1218 	 */
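	/* e.g., nb_atomic_flows = 1024 selects the 1K compression level below;
	 * values not listed fall through to no compression.
	 */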
1219 	switch (evq_conf->nb_atomic_flows) {
1220 	/* Valid DLB2 compression levels */
1221 	case 64:
1222 	case 128:
1223 	case 256:
1224 	case 512:
1225 	case (1 * 1024): /* 1K */
1226 	case (2 * 1024): /* 2K */
1227 	case (4 * 1024): /* 4K */
1228 	case (64 * 1024): /* 64K */
1229 		cfg.lock_id_comp_level = evq_conf->nb_atomic_flows;
1230 		break;
1231 	default:
1232 		/* Invalid compression level */
1233 		cfg.lock_id_comp_level = 0; /* no compression */
1234 	}
1235 
1236 	if (ev_queue->depth_threshold == 0) {
1237 		cfg.depth_threshold = dlb2->default_depth_thresh;
1238 		ev_queue->depth_threshold =
1239 			dlb2->default_depth_thresh;
1240 	} else
1241 		cfg.depth_threshold = ev_queue->depth_threshold;
1242 
1243 	ret = dlb2_iface_ldb_queue_create(handle, &cfg);
1244 	if (ret < 0) {
1245 		DLB2_LOG_ERR("dlb2: create LB event queue error, ret=%d (driver status: %s)",
1246 			     ret, dlb2_error_strings[cfg.response.status]);
1247 		return -EINVAL;
1248 	}
1249 
1250 	qm_qid = cfg.response.id;
1251 
1252 	/* Save off queue config for debug, resource lookups, and reconfig */
1253 	queue->num_qid_inflights = cfg.num_qid_inflights;
1254 	queue->num_atm_inflights = cfg.num_atomic_inflights;
1255 
1256 	queue->sched_type = sched_type;
1257 	queue->config_state = DLB2_CONFIGURED;
1258 
1259 	DLB2_LOG_LINE_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d",
1260 		     qm_qid,
1261 		     cfg.num_atomic_inflights,
1262 		     cfg.num_sequence_numbers,
1263 		     cfg.num_qid_inflights);
1264 
1265 	return qm_qid;
1266 }
1267 
1268 static int
1269 dlb2_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1270 			      struct dlb2_eventdev_queue *ev_queue,
1271 			      const struct rte_event_queue_conf *queue_conf)
1272 {
1273 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
1274 	int32_t qm_qid;
1275 
1276 	if (queue_conf->nb_atomic_order_sequences)
1277 		dlb2_program_sn_allocation(dlb2, queue_conf);
1278 
1279 	qm_qid = dlb2_hw_create_ldb_queue(dlb2, ev_queue, queue_conf);
1280 	if (qm_qid < 0) {
1281 		DLB2_LOG_ERR("Failed to create the load-balanced queue");
1282 
1283 		return qm_qid;
1284 	}
1285 
1286 	dlb2->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1287 
1288 	ev_queue->qm_queue.id = qm_qid;
1289 
1290 	return 0;
1291 }
1292 
1293 static int dlb2_num_dir_queues_setup(struct dlb2_eventdev *dlb2)
1294 {
1295 	int i, num = 0;
1296 
1297 	for (i = 0; i < dlb2->num_queues; i++) {
1298 		if (dlb2->ev_queues[i].setup_done &&
1299 		    dlb2->ev_queues[i].qm_queue.is_directed)
1300 			num++;
1301 	}
1302 
1303 	return num;
1304 }
1305 
1306 static void
1307 dlb2_queue_link_teardown(struct dlb2_eventdev *dlb2,
1308 			 struct dlb2_eventdev_queue *ev_queue)
1309 {
1310 	struct dlb2_eventdev_port *ev_port;
1311 	int i, j;
1312 
1313 	for (i = 0; i < dlb2->num_ports; i++) {
1314 		ev_port = &dlb2->ev_ports[i];
1315 
1316 		for (j = 0; j < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1317 			if (!ev_port->link[j].valid ||
1318 			    ev_port->link[j].queue_id != ev_queue->id)
1319 				continue;
1320 
1321 			ev_port->link[j].valid = false;
1322 			ev_port->num_links--;
1323 		}
1324 	}
1325 
1326 	ev_queue->num_links = 0;
1327 }
1328 
1329 static int
1330 dlb2_eventdev_queue_setup(struct rte_eventdev *dev,
1331 			  uint8_t ev_qid,
1332 			  const struct rte_event_queue_conf *queue_conf)
1333 {
1334 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
1335 	struct dlb2_eventdev_queue *ev_queue;
1336 	int ret;
1337 
1338 	if (queue_conf == NULL)
1339 		return -EINVAL;
1340 
1341 	if (ev_qid >= dlb2->num_queues)
1342 		return -EINVAL;
1343 
1344 	ev_queue = &dlb2->ev_queues[ev_qid];
1345 
1346 	ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1347 		RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1348 	ev_queue->id = ev_qid;
1349 	ev_queue->conf = *queue_conf;
1350 
1351 	if (!ev_queue->qm_queue.is_directed) {
1352 		ret = dlb2_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1353 	} else {
1354 		/* The directed queue isn't setup until link time, at which
1355 		 * point we know its directed port ID. Directed queue setup
1356 		 * will only fail if this queue is already setup or there are
1357 		 * no directed queues left to configure.
1358 		 */
1359 		ret = 0;
1360 
1361 		ev_queue->qm_queue.config_state = DLB2_NOT_CONFIGURED;
1362 
1363 		if (ev_queue->setup_done ||
1364 		    dlb2_num_dir_queues_setup(dlb2) == dlb2->num_dir_queues)
1365 			ret = -EINVAL;
1366 	}
1367 
1368 	/* Tear down pre-existing port->queue links */
1369 	if (!ret && dlb2->run_state == DLB2_RUN_STATE_STOPPED)
1370 		dlb2_queue_link_teardown(dlb2, ev_queue);
1371 
1372 	if (!ret)
1373 		ev_queue->setup_done = true;
1374 
1375 	return ret;
1376 }
1377 
1378 static int
1379 dlb2_init_consume_qe(struct dlb2_port *qm_port, char *mz_name)
1380 {
1381 	struct dlb2_cq_pop_qe *qe;
1382 
1383 	qe = rte_zmalloc(mz_name,
1384 			DLB2_NUM_QES_PER_CACHE_LINE *
1385 				sizeof(struct dlb2_cq_pop_qe),
1386 			RTE_CACHE_LINE_SIZE);
1387 
1388 	if (qe == NULL)	{
1389 		DLB2_LOG_ERR("dlb2: no memory for consume_qe");
1390 		return -ENOMEM;
1391 	}
1392 	qm_port->consume_qe = qe;
1393 
1394 	qe->qe_valid = 0;
1395 	qe->qe_frag = 0;
1396 	qe->qe_comp = 0;
1397 	qe->cq_token = 1;
1398 	/* Tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
1399 	 * and so on.
1400 	 */
1401 	qe->tokens = 0;	/* set at run time */
1402 	qe->meas_lat = 0;
1403 	qe->no_dec = 0;
1404 	/* Completion IDs are disabled */
1405 	qe->cmp_id = 0;
1406 
1407 	return 0;
1408 }
1409 
1410 static int
1411 dlb2_init_int_arm_qe(struct dlb2_port *qm_port, char *mz_name)
1412 {
1413 	struct dlb2_enqueue_qe *qe;
1414 
1415 	qe = rte_zmalloc(mz_name,
1416 			DLB2_NUM_QES_PER_CACHE_LINE *
1417 				sizeof(struct dlb2_enqueue_qe),
1418 			RTE_CACHE_LINE_SIZE);
1419 
1420 	if (qe == NULL) {
1421 		DLB2_LOG_ERR("dlb2: no memory for int_arm_qe");
1422 		return -ENOMEM;
1423 	}
1424 	qm_port->int_arm_qe = qe;
1425 
1426 	/* V2 - INT ARM is CQ_TOKEN + FRAG */
1427 	qe->qe_valid = 0;
1428 	qe->qe_frag = 1;
1429 	qe->qe_comp = 0;
1430 	qe->cq_token = 1;
1431 	qe->meas_lat = 0;
1432 	qe->no_dec = 0;
1433 	/* Completion IDs are disabled */
1434 	qe->cmp_id = 0;
1435 
1436 	return 0;
1437 }
1438 
1439 static int
1440 dlb2_init_qe_mem(struct dlb2_port *qm_port, char *mz_name)
1441 {
1442 	int ret, sz;
1443 
1444 	sz = DLB2_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb2_enqueue_qe);
1445 
1446 	qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
1447 
1448 	if (qm_port->qe4 == NULL) {
1449 		DLB2_LOG_ERR("dlb2: no qe4 memory");
1450 		ret = -ENOMEM;
1451 		goto error_exit;
1452 	}
1453 
1454 	if (qm_port->reorder_en) {
1455 		sz = sizeof(struct dlb2_reorder);
1456 		qm_port->order = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
1457 
1458 		if (qm_port->order == NULL) {
1459 			DLB2_LOG_ERR("dlb2: no reorder memory");
1460 			ret = -ENOMEM;
1461 			goto error_exit;
1462 		}
1463 	}
1464 
1465 	ret = dlb2_init_int_arm_qe(qm_port, mz_name);
1466 	if (ret < 0) {
1467 		DLB2_LOG_ERR("dlb2: dlb2_init_int_arm_qe ret=%d", ret);
1468 		goto error_exit;
1469 	}
1470 
1471 	ret = dlb2_init_consume_qe(qm_port, mz_name);
1472 	if (ret < 0) {
1473 		DLB2_LOG_ERR("dlb2: dlb2_init_consume_qe ret=%d", ret);
1474 		goto error_exit;
1475 	}
1476 
1477 	return 0;
1478 
1479 error_exit:
1480 
1481 	dlb2_free_qe_mem(qm_port);
1482 
1483 	return ret;
1484 }
1485 
1486 static inline uint16_t
1487 dlb2_event_enqueue_burst_delayed(void *event_port,
1488 				 const struct rte_event events[],
1489 				 uint16_t num);
1490 
1491 static inline uint16_t
1492 dlb2_event_enqueue_new_burst_delayed(void *event_port,
1493 				     const struct rte_event events[],
1494 				     uint16_t num);
1495 
1496 static inline uint16_t
1497 dlb2_event_enqueue_forward_burst_delayed(void *event_port,
1498 					 const struct rte_event events[],
1499 					 uint16_t num);
1500 
1501 /* Generate the required bitmask for rotate-style expected QE gen bits.
1502  * This requires a pattern of 1's and 0's, with the expected positions set
1503  * to 1 so that when hardware writes 0's they read as "new". The ring size
1504  * must be a power of 2 for the mask to wrap correctly.
1505  */
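/* e.g., for cq_depth = 16 the loop below produces 0xffff0000ffff0000 for both
 * rolling masks: alternating runs of 16 zero bits and 16 one bits, starting
 * with zeros at bit 0.
 */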
1506 static void
1507 dlb2_hw_cq_bitmask_init(struct dlb2_port *qm_port, uint32_t cq_depth)
1508 {
1509 	uint64_t cq_build_mask = 0;
1510 	uint32_t i;
1511 
1512 	if (cq_depth > 64)
1513 		return; /* need to fall back to scalar code */
1514 
1515 	/*
1516 	 * All 0's in the first u64 and all 1's in the second is the correct
1517 	 * starting bit pattern (the loop below, extended past 64 bits). Special
1518 	 * casing == 64 is easier than adapting the loop logic.
1518 	 */
1519 	if (cq_depth == 64) {
1520 		qm_port->cq_rolling_mask = 0;
1521 		qm_port->cq_rolling_mask_2 = -1;
1522 		return;
1523 	}
1524 
1525 	for (i = 0; i < 64; i += (cq_depth * 2))
1526 		cq_build_mask |= ((1ULL << cq_depth) - 1) << (i + cq_depth);
1527 
1528 	qm_port->cq_rolling_mask = cq_build_mask;
1529 	qm_port->cq_rolling_mask_2 = cq_build_mask;
1530 }
1531 
1532 static int
1533 dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
1534 			struct dlb2_eventdev_port *ev_port,
1535 			uint32_t dequeue_depth,
1536 			uint32_t enqueue_depth)
1537 {
1538 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
1539 	struct dlb2_create_ldb_port_args cfg = { {0} };
1540 	int ret;
1541 	struct dlb2_port *qm_port = NULL;
1542 	char mz_name[RTE_MEMZONE_NAMESIZE];
1543 	uint32_t qm_port_id;
1544 	uint16_t ldb_credit_high_watermark = 0;
1545 	uint16_t dir_credit_high_watermark = 0;
1546 	uint16_t credit_high_watermark = 0;
1547 
1548 	if (handle == NULL)
1549 		return -EINVAL;
1550 
1551 	if (dequeue_depth < DLB2_MIN_CQ_DEPTH) {
1552 		DLB2_LOG_ERR("dlb2: invalid cq depth, must be at least %d",
1553 			     DLB2_MIN_CQ_DEPTH);
1554 		return -EINVAL;
1555 	}
1556 
1557 	rte_spinlock_lock(&handle->resource_lock);
1558 
1559 	/* We round up to the next power of 2 if necessary */
1560 	cfg.cq_depth = rte_align32pow2(dequeue_depth);
1561 	cfg.cq_depth_threshold = 1;
1562 
1563 	cfg.cq_history_list_size = cfg.cq_depth;
1564 
1565 	cfg.cos_id = ev_port->cos_id;
1566 	cfg.cos_strict = 0; /* best effort */
1567 
1568 	/* User controls the LDB high watermark via enqueue depth. The DIR high
1569 	 * watermark is equal, unless the directed credit pool is too small.
1570 	 */
1571 	if (dlb2->version == DLB2_HW_V2) {
1572 		ldb_credit_high_watermark = enqueue_depth;
1573 		/* If there are no directed ports, the kernel driver will
1574 		 * ignore this port's directed credit settings. Don't use
1575 		 * enqueue_depth if it would require more directed credits
1576 		 * than are available.
1577 		 */
1578 		dir_credit_high_watermark =
1579 			RTE_MIN(enqueue_depth,
1580 				handle->cfg.num_dir_credits / dlb2->num_ports);
1581 	} else
1582 		credit_high_watermark = enqueue_depth;
1583 
1584 	/* Per QM values */
1585 
1586 	ret = dlb2_iface_ldb_port_create(handle, &cfg,  dlb2->poll_mode);
1587 	if (ret < 0) {
1588 		DLB2_LOG_ERR("dlb2: dlb2_ldb_port_create error, ret=%d (driver status: %s)",
1589 			     ret, dlb2_error_strings[cfg.response.status]);
1590 		goto error_exit;
1591 	}
1592 
1593 	qm_port_id = cfg.response.id;
1594 
1595 	DLB2_LOG_LINE_DBG("dlb2: ev_port %d uses qm LB port %d <<<<<",
1596 		     ev_port->id, qm_port_id);
1597 
1598 	qm_port = &ev_port->qm_port;
1599 	qm_port->ev_port = ev_port; /* back ptr */
1600 	qm_port->dlb2 = dlb2; /* back ptr */
1601 	/*
1602 	 * Allocate and init local qe struct(s).
1603 	 * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
1604 	 */
1605 
1606 	snprintf(mz_name, sizeof(mz_name), "dlb2_ldb_port%d",
1607 		 ev_port->id);
1608 
1609 	ret = dlb2_init_qe_mem(qm_port, mz_name);
1610 	if (ret < 0) {
1611 		DLB2_LOG_ERR("dlb2: init_qe_mem failed, ret=%d", ret);
1612 		goto error_exit;
1613 	}
1614 
1615 	qm_port->id = qm_port_id;
1616 
1617 	if (dlb2->version == DLB2_HW_V2_5 && (dlb2->enable_cq_weight == true)) {
1618 		struct dlb2_enable_cq_weight_args cq_weight_args = { {0} };
1619 		cq_weight_args.port_id = qm_port->id;
1620 		cq_weight_args.limit = dequeue_depth;
1621 		ret = dlb2_iface_enable_cq_weight(handle, &cq_weight_args);
1622 
1623 		if (ret < 0) {
1624 			DLB2_LOG_ERR("dlb2: enable_cq_weight error, ret=%d (driver status: %s)",
1625 					ret,
1626 					dlb2_error_strings[cq_weight_args.response.status]);
1627 			goto error_exit;
1628 		}
1629 	}
1630 
1631 	/* CQs with depth < 8 use an 8-entry queue, but withhold credits so
1632 	 * the effective depth is smaller.
1633 	 */
1634 	qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
1635 	qm_port->cq_idx = 0;
1636 	qm_port->cq_idx_unmasked = 0;
1637 
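	/* Assumption (not stated in this file): in sparse CQ mode each QE
	 * occupies its own cache line, so the CQ index space is 4x the nominal
	 * depth, hence the *4 in the mask below.
	 */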
1638 	if (dlb2->poll_mode == DLB2_CQ_POLL_MODE_SPARSE)
1639 		qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
1640 	else
1641 		qm_port->cq_depth_mask = qm_port->cq_depth - 1;
1642 
1643 	qm_port->gen_bit_shift = rte_popcount32(qm_port->cq_depth_mask);
1644 	/* starting value of gen bit - it toggles at wrap time */
1645 	qm_port->gen_bit = 1;
1646 
1647 	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
1648 
1649 	qm_port->int_armed = false;
1650 
1651 	/* Save off for later use in info and lookup APIs. */
1652 	qm_port->qid_mappings = &dlb2->qm_ldb_to_ev_queue_id[0];
1653 
1654 	qm_port->dequeue_depth = dequeue_depth;
1655 	qm_port->token_pop_thresh = dequeue_depth;
1656 
1657 	/* The default enqueue functions do not include delayed-pop support for
1658 	 * performance reasons.
1659 	 */
1660 	if (qm_port->token_pop_mode == DELAYED_POP) {
1661 		dlb2->event_dev->enqueue_burst =
1662 			dlb2_event_enqueue_burst_delayed;
1663 		dlb2->event_dev->enqueue_new_burst =
1664 			dlb2_event_enqueue_new_burst_delayed;
1665 		dlb2->event_dev->enqueue_forward_burst =
1666 			dlb2_event_enqueue_forward_burst_delayed;
1667 	}
1668 
1669 	qm_port->owed_tokens = 0;
1670 	qm_port->issued_releases = 0;
1671 
1672 	/* Save config message too. */
1673 	rte_memcpy(&qm_port->cfg.ldb, &cfg, sizeof(qm_port->cfg.ldb));
1674 
1675 	/* update state */
1676 	qm_port->state = PORT_STARTED; /* enabled at create time */
1677 	qm_port->config_state = DLB2_CONFIGURED;
1678 
1679 	if (dlb2->version == DLB2_HW_V2) {
1680 		qm_port->dir_credits = dir_credit_high_watermark;
1681 		qm_port->ldb_credits = ldb_credit_high_watermark;
1682 		qm_port->credit_pool[DLB2_DIR_QUEUE] = &dlb2->dir_credit_pool;
1683 		qm_port->credit_pool[DLB2_LDB_QUEUE] = &dlb2->ldb_credit_pool;
1684 
1685 		DLB2_LOG_LINE_DBG("dlb2: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d",
1686 			     qm_port_id,
1687 			     dequeue_depth,
1688 			     qm_port->ldb_credits,
1689 			     qm_port->dir_credits);
1690 	} else {
1691 		qm_port->credits = credit_high_watermark;
1692 		qm_port->credit_pool[DLB2_COMBINED_POOL] = &dlb2->credit_pool;
1693 
1694 		DLB2_LOG_LINE_DBG("dlb2: created ldb port %d, depth = %d, credits=%d",
1695 			     qm_port_id,
1696 			     dequeue_depth,
1697 			     qm_port->credits);
1698 	}
1699 
1700 	qm_port->use_scalar = false;
1701 
1702 #if (!defined RTE_ARCH_X86_64)
1703 	qm_port->use_scalar = true;
1704 #else
1705 	if ((qm_port->cq_depth > 64) ||
1706 	    (!rte_is_power_of_2(qm_port->cq_depth)) ||
1707 	    (dlb2->vector_opts_enabled == false))
1708 		qm_port->use_scalar = true;
1709 #endif
1710 
1711 	rte_spinlock_unlock(&handle->resource_lock);
1712 
1713 	return 0;
1714 
1715 error_exit:
1716 
1717 	if (qm_port)
1718 		dlb2_free_qe_mem(qm_port);
1719 
1720 	rte_spinlock_unlock(&handle->resource_lock);
1721 
1722 	DLB2_LOG_ERR("dlb2: create ldb port failed!");
1723 
1724 	return ret;
1725 }
1726 
1727 static void
1728 dlb2_port_link_teardown(struct dlb2_eventdev *dlb2,
1729 			struct dlb2_eventdev_port *ev_port)
1730 {
1731 	struct dlb2_eventdev_queue *ev_queue;
1732 	int i;
1733 
1734 	for (i = 0; i < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1735 		if (!ev_port->link[i].valid)
1736 			continue;
1737 
1738 		ev_queue = &dlb2->ev_queues[ev_port->link[i].queue_id];
1739 
1740 		ev_port->link[i].valid = false;
1741 		ev_port->num_links--;
1742 		ev_queue->num_links--;
1743 	}
1744 }
1745 
1746 static int
1747 dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
1748 			struct dlb2_eventdev_port *ev_port,
1749 			uint32_t dequeue_depth,
1750 			uint32_t enqueue_depth)
1751 {
1752 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
1753 	struct dlb2_create_dir_port_args cfg = { {0} };
1754 	int ret;
1755 	struct dlb2_port *qm_port = NULL;
1756 	char mz_name[RTE_MEMZONE_NAMESIZE];
1757 	uint32_t qm_port_id;
1758 	uint16_t ldb_credit_high_watermark = 0;
1759 	uint16_t dir_credit_high_watermark = 0;
1760 	uint16_t credit_high_watermark = 0;
1761 
1762 	if (dlb2 == NULL || handle == NULL)
1763 		return -EINVAL;
1764 
1765 	if (dequeue_depth < DLB2_MIN_CQ_DEPTH) {
1766 		DLB2_LOG_ERR("dlb2: invalid dequeue_depth, must be %d-%d",
1767 			     DLB2_MIN_CQ_DEPTH, DLB2_MAX_INPUT_QUEUE_DEPTH);
1768 		return -EINVAL;
1769 	}
1770 
1771 	if (enqueue_depth < DLB2_MIN_ENQUEUE_DEPTH) {
1772 		DLB2_LOG_ERR("dlb2: invalid enqueue_depth, must be at least %d",
1773 			     DLB2_MIN_ENQUEUE_DEPTH);
1774 		return -EINVAL;
1775 	}
1776 
1777 	rte_spinlock_lock(&handle->resource_lock);
1778 
1779 	/* Directed queues are configured at link time. */
1780 	cfg.queue_id = -1;
1781 
1782 	/* We round up to the next power of 2 if necessary */
1783 	cfg.cq_depth = rte_align32pow2(dequeue_depth);
1784 	cfg.cq_depth_threshold = 1;
1785 
1786 	/* User controls the LDB high watermark via enqueue depth. The DIR high
1787 	 * watermark is equal, unless the directed credit pool is too small.
1788 	 */
1789 	if (dlb2->version == DLB2_HW_V2) {
1790 		ldb_credit_high_watermark = enqueue_depth;
1791 		/* Don't use enqueue_depth if it would require more directed
1792 		 * credits than are available.
1793 		 */
1794 		dir_credit_high_watermark =
1795 			RTE_MIN(enqueue_depth,
1796 				handle->cfg.num_dir_credits / dlb2->num_ports);
1797 	} else
1798 		credit_high_watermark = enqueue_depth;
1799 
1800 	if (ev_port->conf.event_port_cfg & RTE_EVENT_PORT_CFG_HINT_PRODUCER)
1801 		cfg.is_producer = 1;
1802 
1803 	/* Per QM values */
1804 
1805 	ret = dlb2_iface_dir_port_create(handle, &cfg,  dlb2->poll_mode);
1806 	if (ret < 0) {
1807 		DLB2_LOG_ERR("dlb2: dlb2_dir_port_create error, ret=%d (driver status: %s)",
1808 			     ret, dlb2_error_strings[cfg.response.status]);
1809 		goto error_exit;
1810 	}
1811 
1812 	qm_port_id = cfg.response.id;
1813 
1814 	DLB2_LOG_LINE_DBG("dlb2: ev_port %d uses qm DIR port %d <<<<<",
1815 		     ev_port->id, qm_port_id);
1816 
1817 	qm_port = &ev_port->qm_port;
1818 	qm_port->ev_port = ev_port; /* back ptr */
1819 	qm_port->dlb2 = dlb2;  /* back ptr */
1820 
1821 	/*
1822 	 * Init local qe struct(s).
1823 	 * Note: MOVDIR64 requires the enqueue QE to be aligned
1824 	 */
1825 
1826 	snprintf(mz_name, sizeof(mz_name), "dlb2_dir_port%d",
1827 		 ev_port->id);
1828 
1829 	ret = dlb2_init_qe_mem(qm_port, mz_name);
1830 
1831 	if (ret < 0) {
1832 		DLB2_LOG_ERR("dlb2: init_qe_mem failed, ret=%d", ret);
1833 		goto error_exit;
1834 	}
1835 
1836 	qm_port->id = qm_port_id;
1837 
1838 	if (dlb2->version == DLB2_HW_V2) {
1839 		qm_port->cached_ldb_credits = 0;
1840 		qm_port->cached_dir_credits = 0;
1841 	} else
1842 		qm_port->cached_credits = 0;
1843 
1844 	/* CQs with depth < 8 use an 8-entry queue, but withhold credits so
1845 	 * the effective depth is smaller.
1846 	 */
1847 	qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
1848 	qm_port->cq_idx = 0;
1849 	qm_port->cq_idx_unmasked = 0;
1850 
1851 	if (dlb2->poll_mode == DLB2_CQ_POLL_MODE_SPARSE)
1852 		qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
1853 	else
1854 		qm_port->cq_depth_mask = cfg.cq_depth - 1;
1855 
1856 	qm_port->gen_bit_shift = rte_popcount32(qm_port->cq_depth_mask);
1857 	/* starting value of gen bit - it toggles at wrap time */
1858 	qm_port->gen_bit = 1;
1859 	dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
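	/*
	 * Worked example (illustrative values): a dequeue_depth of 8 gives
	 * cfg.cq_depth = 8. In sparse poll mode the index mask spans 4x the CQ
	 * depth, so cq_depth_mask = (8 * 4) - 1 = 31 and gen_bit_shift =
	 * popcount(31) = 5; in the default mode cq_depth_mask = 8 - 1 = 7 and
	 * gen_bit_shift = 3.
	 */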
1860 
1861 	qm_port->int_armed = false;
1862 
1863 	/* Save off for later use in info and lookup APIs. */
1864 	qm_port->qid_mappings = &dlb2->qm_dir_to_ev_queue_id[0];
1865 
1866 	qm_port->dequeue_depth = dequeue_depth;
1867 
1868 	/* Directed ports are auto-pop, by default. */
1869 	qm_port->token_pop_mode = AUTO_POP;
1870 	qm_port->owed_tokens = 0;
1871 	qm_port->issued_releases = 0;
1872 
1873 	/* Save config message too. */
1874 	rte_memcpy(&qm_port->cfg.dir, &cfg, sizeof(qm_port->cfg.dir));
1875 
1876 	/* update state */
1877 	qm_port->state = PORT_STARTED; /* enabled at create time */
1878 	qm_port->config_state = DLB2_CONFIGURED;
1879 
1880 	if (dlb2->version == DLB2_HW_V2) {
1881 		qm_port->dir_credits = dir_credit_high_watermark;
1882 		qm_port->ldb_credits = ldb_credit_high_watermark;
1883 		qm_port->credit_pool[DLB2_DIR_QUEUE] = &dlb2->dir_credit_pool;
1884 		qm_port->credit_pool[DLB2_LDB_QUEUE] = &dlb2->ldb_credit_pool;
1885 
1886 		DLB2_LOG_LINE_DBG("dlb2: created dir port %d, depth = %d cr=%d,%d",
1887 			     qm_port_id,
1888 			     dequeue_depth,
1889 			     dir_credit_high_watermark,
1890 			     ldb_credit_high_watermark);
1891 	} else {
1892 		qm_port->credits = credit_high_watermark;
1893 		qm_port->credit_pool[DLB2_COMBINED_POOL] = &dlb2->credit_pool;
1894 
1895 		DLB2_LOG_LINE_DBG("dlb2: created dir port %d, depth = %d cr=%d",
1896 			     qm_port_id,
1897 			     dequeue_depth,
1898 			     credit_high_watermark);
1899 	}
1900 
1901 #if (!defined RTE_ARCH_X86_64)
1902 	qm_port->use_scalar = true;
1903 #else
1904 	if ((qm_port->cq_depth > 64) ||
1905 	    (!rte_is_power_of_2(qm_port->cq_depth)) ||
1906 	    (dlb2->vector_opts_enabled == false))
1907 		qm_port->use_scalar = true;
1908 #endif
1909 
1910 	rte_spinlock_unlock(&handle->resource_lock);
1911 
1912 	return 0;
1913 
1914 error_exit:
1915 
1916 	if (qm_port)
1917 		dlb2_free_qe_mem(qm_port);
1918 
1919 	rte_spinlock_unlock(&handle->resource_lock);
1920 
1921 	DLB2_LOG_ERR("dlb2: create dir port failed!");
1922 
1923 	return ret;
1924 }
1925 
1926 static int
1927 dlb2_eventdev_port_setup(struct rte_eventdev *dev,
1928 			 uint8_t ev_port_id,
1929 			 const struct rte_event_port_conf *port_conf)
1930 {
1931 	struct dlb2_eventdev *dlb2;
1932 	struct dlb2_eventdev_port *ev_port;
1933 	uint32_t hw_credit_quanta, sw_credit_quanta;
1934 	int ret;
1935 
1936 	if (dev == NULL || port_conf == NULL) {
1937 		DLB2_LOG_ERR("Null parameter");
1938 		return -EINVAL;
1939 	}
1940 
1941 	dlb2 = dlb2_pmd_priv(dev);
1942 
1943 	if (ev_port_id >= DLB2_MAX_NUM_PORTS(dlb2->version))
1944 		return -EINVAL;
1945 
1946 	if (port_conf->dequeue_depth >
1947 		evdev_dlb2_default_info.max_event_port_dequeue_depth ||
1948 	    port_conf->enqueue_depth >
1949 		evdev_dlb2_default_info.max_event_port_enqueue_depth)
1950 		return -EINVAL;
1951 
1952 	if ((port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_INDEPENDENT_ENQ) &&
1953 	    port_conf->dequeue_depth > DLB2_MAX_CQ_DEPTH_REORDER) {
1954 		DLB2_LOG_ERR("evport %d: Max dequeue depth supported with reorder is %d",
1955 			     ev_port_id, DLB2_MAX_CQ_DEPTH_REORDER);
1956 		return -EINVAL;
1957 	}
1958 
1959 	ev_port = &dlb2->ev_ports[ev_port_id];
1960 	/* configured? */
1961 	if (ev_port->setup_done) {
1962 		DLB2_LOG_ERR("evport %d is already configured", ev_port_id);
1963 		return -EINVAL;
1964 	}
1965 
1966 	/* Default for worker ports */
1967 	sw_credit_quanta = dlb2->sw_credit_quanta;
1968 	hw_credit_quanta = dlb2->hw_credit_quanta;
1969 
1970 	ev_port->qm_port.is_producer = false;
1971 	ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1972 		RTE_EVENT_PORT_CFG_SINGLE_LINK;
1973 
1974 	if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_PRODUCER) {
1975 		/* Producer type ports. Mostly enqueue */
1976 		sw_credit_quanta = DLB2_SW_CREDIT_P_QUANTA_DEFAULT;
1977 		hw_credit_quanta = DLB2_SW_CREDIT_P_BATCH_SZ;
1978 		ev_port->qm_port.is_producer = true;
1979 	}
1980 	if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_CONSUMER) {
1981 		/* Consumer type ports. Mostly dequeue */
1982 		sw_credit_quanta = DLB2_SW_CREDIT_C_QUANTA_DEFAULT;
1983 		hw_credit_quanta = DLB2_SW_CREDIT_C_BATCH_SZ;
1984 	}
1985 	ev_port->credit_update_quanta = sw_credit_quanta;
1986 	ev_port->qm_port.hw_credit_quanta = hw_credit_quanta;
1987 
1988 	/*
1989 	 * Validate credit config before creating port
1990 	 */
1991 
1992 	if (port_conf->enqueue_depth > sw_credit_quanta ||
1993 	    port_conf->enqueue_depth > hw_credit_quanta) {
1994 		DLB2_LOG_ERR("Invalid port config. Enqueue depth %d must be <= credit quanta %d and batch size %d",
1995 			     port_conf->enqueue_depth,
1996 			     sw_credit_quanta,
1997 			     hw_credit_quanta);
1998 		return -EINVAL;
1999 	}
2000 	ev_port->enq_retries = port_conf->enqueue_depth;
2001 
2002 	ev_port->qm_port.reorder_id = 0;
2003 	ev_port->qm_port.reorder_en = port_conf->event_port_cfg &
2004 				      RTE_EVENT_PORT_CFG_INDEPENDENT_ENQ;
2005 
2006 	/* Save off port config for reconfig */
2007 	ev_port->conf = *port_conf;
2008 
2009 
2010 	/*
2011 	 * Create port
2012 	 */
2013 
2014 	if (!ev_port->qm_port.is_directed) {
2015 		ret = dlb2_hw_create_ldb_port(dlb2,
2016 					      ev_port,
2017 					      port_conf->dequeue_depth,
2018 					      port_conf->enqueue_depth);
2019 		if (ret < 0) {
2020 			DLB2_LOG_ERR("Failed to create the LDB port, ev_port_id=%d",
2021 				     ev_port_id);
2022 
2023 			return ret;
2024 		}
2025 	} else {
2026 		ret = dlb2_hw_create_dir_port(dlb2,
2027 					      ev_port,
2028 					      port_conf->dequeue_depth,
2029 					      port_conf->enqueue_depth);
2030 		if (ret < 0) {
2031 			DLB2_LOG_ERR("Failed to create the DIR port");
2032 			return ret;
2033 		}
2034 	}
2035 
2036 	ev_port->id = ev_port_id;
2037 	ev_port->enq_configured = true;
2038 	ev_port->setup_done = true;
2039 	ev_port->inflight_max = port_conf->new_event_threshold;
2040 	ev_port->implicit_release = !(port_conf->event_port_cfg &
2041 		  RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
2042 	ev_port->outstanding_releases = 0;
2043 	ev_port->inflight_credits = 0;
2044 	ev_port->dlb2 = dlb2; /* reverse link */
2045 
2046 	/* Default for worker ports */
2047 	sw_credit_quanta = dlb2->sw_credit_quanta;
2048 	hw_credit_quanta = dlb2->hw_credit_quanta;
2049 
2050 	if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_PRODUCER) {
2051 		/* Producer type ports. Mostly enqueue */
2052 		sw_credit_quanta = DLB2_SW_CREDIT_P_QUANTA_DEFAULT;
2053 		hw_credit_quanta = DLB2_SW_CREDIT_P_BATCH_SZ;
2054 	}
2055 	if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_CONSUMER) {
2056 		/* Consumer type ports. Mostly dequeue */
2057 		sw_credit_quanta = DLB2_SW_CREDIT_C_QUANTA_DEFAULT;
2058 		hw_credit_quanta = DLB2_SW_CREDIT_C_BATCH_SZ;
2059 	}
2060 	ev_port->credit_update_quanta = sw_credit_quanta;
2061 	ev_port->qm_port.hw_credit_quanta = hw_credit_quanta;
2062 
2063 
2064 	/* Tear down pre-existing port->queue links */
2065 	if (dlb2->run_state == DLB2_RUN_STATE_STOPPED)
2066 		dlb2_port_link_teardown(dlb2, &dlb2->ev_ports[ev_port_id]);
2067 
2068 	dev->data->ports[ev_port_id] = &dlb2->ev_ports[ev_port_id];
2069 
2070 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512VL) &&
2071 	    rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_512)
2072 		ev_port->qm_port.use_avx512 = true;
2073 	else
2074 		ev_port->qm_port.use_avx512 = false;
2075 
2076 	return 0;
2077 }
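
/*
 * Application-side sketch of a port configuration that exercises the checks
 * above (dev_id, port_id and the numeric values are placeholders, and the
 * enqueue depth is assumed to fit within the configured credit quanta and
 * batch size):
 *
 *	struct rte_event_port_conf conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *		.event_port_cfg = RTE_EVENT_PORT_CFG_HINT_PRODUCER,
 *	};
 *	int ret = rte_event_port_setup(dev_id, port_id, &conf);
 */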
2078 
2079 static int16_t
2080 dlb2_hw_map_ldb_qid_to_port(struct dlb2_hw_dev *handle,
2081 			    uint32_t qm_port_id,
2082 			    uint16_t qm_qid,
2083 			    uint8_t priority)
2084 {
2085 	struct dlb2_map_qid_args cfg;
2086 	int32_t ret;
2087 
2088 	if (handle == NULL)
2089 		return -EINVAL;
2090 
2091 	/* Build message */
2092 	cfg.port_id = qm_port_id;
2093 	cfg.qid = qm_qid;
2094 	cfg.priority = EV_TO_DLB2_PRIO(priority);
2095 
2096 	ret = dlb2_iface_map_qid(handle, &cfg);
2097 	if (ret < 0) {
2098 		DLB2_LOG_ERR("dlb2: map qid error, ret=%d (driver status: %s)",
2099 			     ret, dlb2_error_strings[cfg.response.status]);
2100 		DLB2_LOG_ERR("dlb2: grp=%d, qm_port=%d, qm_qid=%d prio=%d",
2101 			     handle->domain_id, cfg.port_id,
2102 			     cfg.qid,
2103 			     cfg.priority);
2104 	} else {
2105 		DLB2_LOG_LINE_DBG("dlb2: mapped queue %d to qm_port %d",
2106 			     qm_qid, qm_port_id);
2107 	}
2108 
2109 	return ret;
2110 }
2111 
2112 static int
2113 dlb2_event_queue_join_ldb(struct dlb2_eventdev *dlb2,
2114 			  struct dlb2_eventdev_port *ev_port,
2115 			  struct dlb2_eventdev_queue *ev_queue,
2116 			  uint8_t priority)
2117 {
2118 	int first_avail = -1;
2119 	int ret, i;
2120 
2121 	for (i = 0; i < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
2122 		if (ev_port->link[i].valid) {
2123 			if (ev_port->link[i].queue_id == ev_queue->id &&
2124 			    ev_port->link[i].priority == priority) {
2125 				if (ev_port->link[i].mapped)
2126 					return 0; /* already mapped */
2127 				first_avail = i;
2128 			}
2129 		} else if (first_avail == -1)
2130 			first_avail = i;
2131 	}
2132 	if (first_avail == -1) {
2133 		DLB2_LOG_ERR("dlb2: qm_port %d has no available QID slots.",
2134 			     ev_port->qm_port.id);
2135 		return -EINVAL;
2136 	}
2137 
2138 	ret = dlb2_hw_map_ldb_qid_to_port(&dlb2->qm_instance,
2139 					  ev_port->qm_port.id,
2140 					  ev_queue->qm_queue.id,
2141 					  priority);
2142 
2143 	if (!ret)
2144 		ev_port->link[first_avail].mapped = true;
2145 
2146 	return ret;
2147 }
2148 
2149 static int32_t
2150 dlb2_hw_create_dir_queue(struct dlb2_eventdev *dlb2,
2151 			 struct dlb2_eventdev_queue *ev_queue,
2152 			 int32_t qm_port_id)
2153 {
2154 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
2155 	struct dlb2_create_dir_queue_args cfg;
2156 	int32_t ret;
2157 
2158 	/* The directed port is always configured before its queue */
2159 	cfg.port_id = qm_port_id;
2160 
2161 	if (ev_queue->depth_threshold == 0) {
2162 		cfg.depth_threshold = dlb2->default_depth_thresh;
2163 		ev_queue->depth_threshold =
2164 			dlb2->default_depth_thresh;
2165 	} else
2166 		cfg.depth_threshold = ev_queue->depth_threshold;
2167 
2168 	ret = dlb2_iface_dir_queue_create(handle, &cfg);
2169 	if (ret < 0) {
2170 		DLB2_LOG_ERR("dlb2: create DIR event queue error, ret=%d (driver status: %s)",
2171 			     ret, dlb2_error_strings[cfg.response.status]);
2172 		return -EINVAL;
2173 	}
2174 
2175 	return cfg.response.id;
2176 }
2177 
2178 static int
2179 dlb2_eventdev_dir_queue_setup(struct dlb2_eventdev *dlb2,
2180 			      struct dlb2_eventdev_queue *ev_queue,
2181 			      struct dlb2_eventdev_port *ev_port)
2182 {
2183 	int32_t qm_qid;
2184 
2185 	qm_qid = dlb2_hw_create_dir_queue(dlb2, ev_queue, ev_port->qm_port.id);
2186 
2187 	if (qm_qid < 0) {
2188 		DLB2_LOG_ERR("Failed to create the DIR queue");
2189 		return qm_qid;
2190 	}
2191 
2192 	dlb2->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
2193 
2194 	ev_queue->qm_queue.id = qm_qid;
2195 
2196 	return 0;
2197 }
2198 
2199 static int
2200 dlb2_do_port_link(struct rte_eventdev *dev,
2201 		  struct dlb2_eventdev_queue *ev_queue,
2202 		  struct dlb2_eventdev_port *ev_port,
2203 		  uint8_t prio)
2204 {
2205 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
2206 	int err;
2207 
2208 	/* Don't link until start time. */
2209 	if (dlb2->run_state == DLB2_RUN_STATE_STOPPED)
2210 		return 0;
2211 
2212 	if (ev_queue->qm_queue.is_directed)
2213 		err = dlb2_eventdev_dir_queue_setup(dlb2, ev_queue, ev_port);
2214 	else
2215 		err = dlb2_event_queue_join_ldb(dlb2, ev_port, ev_queue, prio);
2216 
2217 	if (err) {
2218 		DLB2_LOG_ERR("port link failure for %s ev_q %d, ev_port %d",
2219 			     ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
2220 			     ev_queue->id, ev_port->id);
2221 
2222 		rte_errno = err;
2223 		return -1;
2224 	}
2225 
2226 	return 0;
2227 }
2228 
2229 static int
2230 dlb2_validate_port_link(struct dlb2_eventdev_port *ev_port,
2231 			uint8_t queue_id,
2232 			bool link_exists,
2233 			int index)
2234 {
2235 	struct dlb2_eventdev *dlb2 = ev_port->dlb2;
2236 	struct dlb2_eventdev_queue *ev_queue;
2237 	bool port_is_dir, queue_is_dir;
2238 
2239 	if (queue_id > dlb2->num_queues) {
2240 		rte_errno = -EINVAL;
2241 		return -1;
2242 	}
2243 
2244 	ev_queue = &dlb2->ev_queues[queue_id];
2245 
2246 	if (!ev_queue->setup_done &&
2247 	    ev_queue->qm_queue.config_state != DLB2_PREV_CONFIGURED) {
2248 		rte_errno = -EINVAL;
2249 		return -1;
2250 	}
2251 
2252 	port_is_dir = ev_port->qm_port.is_directed;
2253 	queue_is_dir = ev_queue->qm_queue.is_directed;
2254 
2255 	if (port_is_dir != queue_is_dir) {
2256 		DLB2_LOG_ERR("%s queue %u can't link to %s port %u",
2257 			     queue_is_dir ? "DIR" : "LDB", ev_queue->id,
2258 			     port_is_dir ? "DIR" : "LDB", ev_port->id);
2259 
2260 		rte_errno = -EINVAL;
2261 		return -1;
2262 	}
2263 
2264 	/* Check if there is space for the requested link */
2265 	if (!link_exists && index == -1) {
2266 		DLB2_LOG_ERR("no space for new link");
2267 		rte_errno = -ENOSPC;
2268 		return -1;
2269 	}
2270 
2271 	/* Check if the directed port is already linked */
2272 	if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
2273 	    !link_exists) {
2274 		DLB2_LOG_ERR("Can't link DIR port %d to >1 queues",
2275 			     ev_port->id);
2276 		rte_errno = -EINVAL;
2277 		return -1;
2278 	}
2279 
2280 	/* Check if the directed queue is already linked */
2281 	if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
2282 	    !link_exists) {
2283 		DLB2_LOG_ERR("Can't link DIR queue %d to >1 ports",
2284 			     ev_queue->id);
2285 		rte_errno = -EINVAL;
2286 		return -1;
2287 	}
2288 
2289 	return 0;
2290 }
2291 
2292 static int
2293 dlb2_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
2294 			const uint8_t queues[], const uint8_t priorities[],
2295 			uint16_t nb_links)
2296 
2297 {
2298 	struct dlb2_eventdev_port *ev_port = event_port;
2299 	struct dlb2_eventdev *dlb2;
2300 	int i, j;
2301 
2302 	RTE_SET_USED(dev);
2303 
2304 	if (ev_port == NULL) {
2305 		DLB2_LOG_ERR("dlb2: evport not setup");
2306 		rte_errno = -EINVAL;
2307 		return 0;
2308 	}
2309 
2310 	if (!ev_port->setup_done &&
2311 	    ev_port->qm_port.config_state != DLB2_PREV_CONFIGURED) {
2312 		DLB2_LOG_ERR("dlb2: evport not setup");
2313 		rte_errno = -EINVAL;
2314 		return 0;
2315 	}
2316 
2317 	/* Note: rte_event_port_link() ensures the PMD won't receive a NULL
2318 	 * queues pointer.
2319 	 */
2320 	if (nb_links == 0) {
2321 		DLB2_LOG_LINE_DBG("dlb2: nb_links is 0");
2322 		return 0; /* Ignore and return success */
2323 	}
2324 
2325 	dlb2 = ev_port->dlb2;
2326 
2327 	DLB2_LOG_LINE_DBG("Linking %u queues to %s port %d",
2328 		     nb_links,
2329 		     ev_port->qm_port.is_directed ? "DIR" : "LDB",
2330 		     ev_port->id);
2331 
2332 	for (i = 0; i < nb_links; i++) {
2333 		struct dlb2_eventdev_queue *ev_queue;
2334 		uint8_t queue_id, prio;
2335 		bool found = false;
2336 		int index = -1;
2337 
2338 		queue_id = queues[i];
2339 		prio = priorities[i];
2340 
2341 		/* Check if the link already exists. */
2342 		for (j = 0; j < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2343 			if (ev_port->link[j].valid) {
2344 				if (ev_port->link[j].queue_id == queue_id) {
2345 					found = true;
2346 					index = j;
2347 					break;
2348 				}
2349 			} else if (index == -1) {
2350 				index = j;
2351 			}
2352 
2353 		/* could not link */
2354 		if (index == -1)
2355 			break;
2356 
2357 		/* Check if already linked at the requested priority */
2358 		if (found && ev_port->link[j].priority == prio)
2359 			continue;
2360 
2361 		if (dlb2_validate_port_link(ev_port, queue_id, found, index))
2362 			break; /* return index of offending queue */
2363 
2364 		ev_queue = &dlb2->ev_queues[queue_id];
2365 
2366 		if (dlb2_do_port_link(dev, ev_queue, ev_port, prio))
2367 			break; /* return index of offending queue */
2368 
2369 		ev_queue->num_links++;
2370 
2371 		ev_port->link[index].queue_id = queue_id;
2372 		ev_port->link[index].priority = prio;
2373 		ev_port->link[index].valid = true;
2374 		/* If the entry already existed, this was just a priority change */
2375 		if (!found)
2376 			ev_port->num_links++;
2377 	}
2378 	return i;
2379 }
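
/*
 * Application-side sketch (placeholder dev_id/port_id): linking two queues at
 * different priorities through the public API, which ends up in the handler
 * above.
 *
 *	uint8_t queues[] = {0, 1};
 *	uint8_t prios[] = {RTE_EVENT_DEV_PRIORITY_HIGHEST,
 *			   RTE_EVENT_DEV_PRIORITY_NORMAL};
 *	int n = rte_event_port_link(dev_id, port_id, queues, prios, 2);
 *
 * A return value of less than 2 means the link at that index was rejected
 * (e.g. a directed port linked to more than one queue) and rte_errno is set.
 */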
2380 
2381 static int16_t
2382 dlb2_hw_unmap_ldb_qid_from_port(struct dlb2_hw_dev *handle,
2383 				uint32_t qm_port_id,
2384 				uint16_t qm_qid)
2385 {
2386 	struct dlb2_unmap_qid_args cfg;
2387 	int32_t ret;
2388 
2389 	if (handle == NULL)
2390 		return -EINVAL;
2391 
2392 	cfg.port_id = qm_port_id;
2393 	cfg.qid = qm_qid;
2394 
2395 	ret = dlb2_iface_unmap_qid(handle, &cfg);
2396 	if (ret < 0)
2397 		DLB2_LOG_ERR("dlb2: unmap qid error, ret=%d (driver status: %s)",
2398 			     ret, dlb2_error_strings[cfg.response.status]);
2399 
2400 	return ret;
2401 }
2402 
2403 static int
2404 dlb2_event_queue_detach_ldb(struct dlb2_eventdev *dlb2,
2405 			    struct dlb2_eventdev_port *ev_port,
2406 			    struct dlb2_eventdev_queue *ev_queue)
2407 {
2408 	int ret, i;
2409 
2410 	/* Don't unlink until start time. */
2411 	if (dlb2->run_state == DLB2_RUN_STATE_STOPPED)
2412 		return 0;
2413 
2414 	for (i = 0; i < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
2415 		if (ev_port->link[i].valid &&
2416 		    ev_port->link[i].queue_id == ev_queue->id)
2417 			break; /* found */
2418 	}
2419 
2420 	/* This is expected with the eventdev API, which blindly attempts
2421 	 * to unmap all queues.
2422 	 */
2423 	if (i == DLB2_MAX_NUM_QIDS_PER_LDB_CQ) {
2424 		DLB2_LOG_LINE_DBG("dlb2: ignoring LB QID %d not mapped for qm_port %d.",
2425 			     ev_queue->qm_queue.id,
2426 			     ev_port->qm_port.id);
2427 		return 0;
2428 	}
2429 
2430 	ret = dlb2_hw_unmap_ldb_qid_from_port(&dlb2->qm_instance,
2431 					      ev_port->qm_port.id,
2432 					      ev_queue->qm_queue.id);
2433 	if (!ret)
2434 		ev_port->link[i].mapped = false;
2435 
2436 	return ret;
2437 }
2438 
2439 static int
2440 dlb2_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
2441 			  uint8_t queues[], uint16_t nb_unlinks)
2442 {
2443 	struct dlb2_eventdev_port *ev_port = event_port;
2444 	struct dlb2_eventdev *dlb2;
2445 	int i;
2446 
2447 	RTE_SET_USED(dev);
2448 
2449 	if (!ev_port->setup_done) {
2450 		DLB2_LOG_ERR("dlb2: evport %d is not configured",
2451 			     ev_port->id);
2452 		rte_errno = -EINVAL;
2453 		return 0;
2454 	}
2455 
2456 	if (queues == NULL || nb_unlinks == 0) {
2457 		DLB2_LOG_LINE_DBG("dlb2: queues is NULL or nb_unlinks is 0");
2458 		return 0; /* Ignore and return success */
2459 	}
2460 
2461 	if (ev_port->qm_port.is_directed) {
2462 		DLB2_LOG_LINE_DBG("dlb2: ignore unlink from dir port %d",
2463 			     ev_port->id);
2464 		rte_errno = 0;
2465 		return nb_unlinks; /* as if success */
2466 	}
2467 
2468 	dlb2 = ev_port->dlb2;
2469 
2470 	for (i = 0; i < nb_unlinks; i++) {
2471 		struct dlb2_eventdev_queue *ev_queue;
2472 		int ret, j;
2473 
2474 		if (queues[i] >= dlb2->num_queues) {
2475 			DLB2_LOG_ERR("dlb2: invalid queue id %d", queues[i]);
2476 			rte_errno = -EINVAL;
2477 			return i; /* return index of offending queue */
2478 		}
2479 
2480 		ev_queue = &dlb2->ev_queues[queues[i]];
2481 
2482 		/* Does a link exist? */
2483 		for (j = 0; j < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2484 			if (ev_port->link[j].queue_id == queues[i] &&
2485 			    ev_port->link[j].valid)
2486 				break;
2487 
2488 		if (j == DLB2_MAX_NUM_QIDS_PER_LDB_CQ)
2489 			continue;
2490 
2491 		ret = dlb2_event_queue_detach_ldb(dlb2, ev_port, ev_queue);
2492 		if (ret) {
2493 			DLB2_LOG_ERR("unlink err=%d for port %d queue %d",
2494 				     ret, ev_port->id, queues[i]);
2495 			rte_errno = -ENOENT;
2496 			return i; /* return index of offending queue */
2497 		}
2498 
2499 		ev_port->link[j].valid = false;
2500 		ev_port->num_links--;
2501 		ev_queue->num_links--;
2502 	}
2503 
2504 	return nb_unlinks;
2505 }
2506 
2507 static int
2508 dlb2_eventdev_port_unlinks_in_progress(struct rte_eventdev *dev,
2509 				       void *event_port)
2510 {
2511 	struct dlb2_eventdev_port *ev_port = event_port;
2512 	struct dlb2_eventdev *dlb2;
2513 	struct dlb2_hw_dev *handle;
2514 	struct dlb2_pending_port_unmaps_args cfg;
2515 	int ret;
2516 
2517 	RTE_SET_USED(dev);
2518 
2519 	if (!ev_port->setup_done) {
2520 		DLB2_LOG_ERR("dlb2: evport %d is not configured",
2521 			     ev_port->id);
2522 		rte_errno = -EINVAL;
2523 		return 0;
2524 	}
2525 
2526 	cfg.port_id = ev_port->qm_port.id;
2527 	dlb2 = ev_port->dlb2;
2528 	handle = &dlb2->qm_instance;
2529 	ret = dlb2_iface_pending_port_unmaps(handle, &cfg);
2530 
2531 	if (ret < 0) {
2532 		DLB2_LOG_ERR("dlb2: num_unlinks_in_progress ret=%d (driver status: %s)",
2533 			     ret, dlb2_error_strings[cfg.response.status]);
2534 		return ret;
2535 	}
2536 
2537 	return cfg.response.id;
2538 }
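
/*
 * A typical caller polls the count returned above until the hardware has
 * finished draining the unmapped queue, e.g. (sketch; dev_id, port_id and
 * queue_id are placeholders):
 *
 *	uint8_t q = queue_id;
 *	rte_event_port_unlink(dev_id, port_id, &q, 1);
 *	while (rte_event_port_unlinks_in_progress(dev_id, port_id) > 0)
 *		rte_pause();
 */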
2539 
2540 static int
2541 dlb2_eventdev_reapply_configuration(struct rte_eventdev *dev)
2542 {
2543 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
2544 	int ret, i;
2545 
2546 	/* If an event queue or port was previously configured, but hasn't been
2547 	 * reconfigured, reapply its original configuration.
2548 	 */
2549 	for (i = 0; i < dlb2->num_queues; i++) {
2550 		struct dlb2_eventdev_queue *ev_queue;
2551 
2552 		ev_queue = &dlb2->ev_queues[i];
2553 
2554 		if (ev_queue->qm_queue.config_state != DLB2_PREV_CONFIGURED)
2555 			continue;
2556 
2557 		ret = dlb2_eventdev_queue_setup(dev, i, &ev_queue->conf);
2558 		if (ret < 0) {
2559 			DLB2_LOG_ERR("dlb2: failed to reconfigure queue %d", i);
2560 			return ret;
2561 		}
2562 	}
2563 
2564 	for (i = 0; i < dlb2->num_ports; i++) {
2565 		struct dlb2_eventdev_port *ev_port = &dlb2->ev_ports[i];
2566 
2567 		if (ev_port->qm_port.config_state != DLB2_PREV_CONFIGURED)
2568 			continue;
2569 
2570 		ret = dlb2_eventdev_port_setup(dev, i, &ev_port->conf);
2571 		if (ret < 0) {
2572 			DLB2_LOG_ERR("dlb2: failed to reconfigure ev_port %d",
2573 				     i);
2574 			return ret;
2575 		}
2576 	}
2577 
2578 	return 0;
2579 }
2580 
2581 static int
2582 dlb2_eventdev_apply_port_links(struct rte_eventdev *dev)
2583 {
2584 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
2585 	int i;
2586 
2587 	/* Perform requested port->queue links */
2588 	for (i = 0; i < dlb2->num_ports; i++) {
2589 		struct dlb2_eventdev_port *ev_port = &dlb2->ev_ports[i];
2590 		int j;
2591 
2592 		for (j = 0; j < DLB2_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
2593 			struct dlb2_eventdev_queue *ev_queue;
2594 			uint8_t prio, queue_id;
2595 
2596 			if (!ev_port->link[j].valid)
2597 				continue;
2598 
2599 			prio = ev_port->link[j].priority;
2600 			queue_id = ev_port->link[j].queue_id;
2601 
2602 			if (dlb2_validate_port_link(ev_port, queue_id, true, j))
2603 				return -EINVAL;
2604 
2605 			ev_queue = &dlb2->ev_queues[queue_id];
2606 
2607 			if (dlb2_do_port_link(dev, ev_queue, ev_port, prio))
2608 				return -EINVAL;
2609 		}
2610 	}
2611 
2612 	return 0;
2613 }
2614 
2615 static int
2616 dlb2_eventdev_start(struct rte_eventdev *dev)
2617 {
2618 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
2619 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
2620 	struct dlb2_start_domain_args cfg;
2621 	int ret, i;
2622 
2623 	rte_spinlock_lock(&dlb2->qm_instance.resource_lock);
2624 	if (dlb2->run_state != DLB2_RUN_STATE_STOPPED) {
2625 		DLB2_LOG_ERR("bad state %d for dev_start",
2626 			     (int)dlb2->run_state);
2627 		rte_spinlock_unlock(&dlb2->qm_instance.resource_lock);
2628 		return -EINVAL;
2629 	}
2630 	dlb2->run_state = DLB2_RUN_STATE_STARTING;
2631 	rte_spinlock_unlock(&dlb2->qm_instance.resource_lock);
2632 
2633 	/* If the device was configured more than once, some event ports and/or
2634 	 * queues may need to be reconfigured.
2635 	 */
2636 	ret = dlb2_eventdev_reapply_configuration(dev);
2637 	if (ret)
2638 		return ret;
2639 
2640 	/* The DLB PMD delays port links until the device is started. */
2641 	ret = dlb2_eventdev_apply_port_links(dev);
2642 	if (ret)
2643 		return ret;
2644 
2645 	for (i = 0; i < dlb2->num_ports; i++) {
2646 		if (!dlb2->ev_ports[i].setup_done) {
2647 			DLB2_LOG_ERR("dlb2: port %d not setup", i);
2648 			return -ESTALE;
2649 		}
2650 	}
2651 
2652 	for (i = 0; i < dlb2->num_queues; i++) {
2653 		if (dlb2->ev_queues[i].num_links == 0) {
2654 			DLB2_LOG_ERR("dlb2: queue %d is not linked", i);
2655 			return -ENOLINK;
2656 		}
2657 	}
2658 
2659 	ret = dlb2_iface_sched_domain_start(handle, &cfg);
2660 	if (ret < 0) {
2661 		DLB2_LOG_ERR("dlb2: sched_domain_start ret=%d (driver status: %s)",
2662 			     ret, dlb2_error_strings[cfg.response.status]);
2663 		return ret;
2664 	}
2665 
2666 	dlb2->run_state = DLB2_RUN_STATE_STARTED;
2667 	DLB2_LOG_LINE_DBG("dlb2: sched_domain_start completed OK");
2668 
2669 	return 0;
2670 }
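
/*
 * Note on ordering: because links are deferred until start time (see
 * dlb2_do_port_link()), every port must be set up and every queue must have at
 * least one link before rte_event_dev_start() is called; otherwise the checks
 * above fail with -ESTALE or -ENOLINK respectively.
 */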
2671 
2672 static inline uint32_t
2673 dlb2_port_credits_get(struct dlb2_port *qm_port,
2674 		      enum dlb2_hw_queue_types type)
2675 {
2676 	uint32_t credits = *qm_port->credit_pool[type];
2677 	/* By default hw_credit_quanta is DLB2_SW_CREDIT_BATCH_SZ */
2678 	uint32_t batch_size = qm_port->hw_credit_quanta;
2679 
2680 	if (unlikely(credits < batch_size))
2681 		batch_size = credits;
2682 
2683 	if (likely(credits &&
2684 		   rte_atomic_compare_exchange_strong_explicit(
2685 			qm_port->credit_pool[type],
2686 			&credits, credits - batch_size,
2687 			rte_memory_order_seq_cst, rte_memory_order_seq_cst)))
2688 		return batch_size;
2689 	else
2690 		return 0;
2691 }
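
/*
 * Worked example (illustrative numbers): with hw_credit_quanta = 32 and 100
 * credits left in the pool, the compare-exchange moves the pool from 100 to 68
 * and the port caches 32 credits. With only 10 credits left, batch_size is
 * clamped to 10 and the pool drops to 0. With an empty pool the function
 * returns 0 and the caller treats it as credits exhausted.
 */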
2692 
2693 static inline void
2694 dlb2_replenish_sw_credits(struct dlb2_eventdev *dlb2,
2695 			  struct dlb2_eventdev_port *ev_port)
2696 {
2697 	uint16_t quanta = ev_port->credit_update_quanta;
2698 
2699 	if (ev_port->inflight_credits >= quanta * 2) {
2700 		/* Replenish credits, saving one quantum for enqueues */
2701 		uint16_t val = ev_port->inflight_credits - quanta;
2702 
2703 		rte_atomic_fetch_sub_explicit(&dlb2->inflights, val, rte_memory_order_seq_cst);
2704 		ev_port->inflight_credits -= val;
2705 	}
2706 }
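
/*
 * Worked example (illustrative numbers): with credit_update_quanta = 32 and
 * inflight_credits = 80 (>= 2 * 32), val = 80 - 32 = 48 credits are returned
 * to dlb2->inflights and the port keeps one quantum (32) for future enqueues.
 */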
2707 
2708 static inline int
2709 dlb2_check_enqueue_sw_credits(struct dlb2_eventdev *dlb2,
2710 			      struct dlb2_eventdev_port *ev_port)
2711 {
2712 	uint32_t sw_inflights = rte_atomic_load_explicit(&dlb2->inflights,
2713 						rte_memory_order_seq_cst);
2714 	const int num = 1;
2715 
2716 	if (unlikely(ev_port->inflight_max < sw_inflights)) {
2717 		DLB2_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1);
2718 		rte_errno = -ENOSPC;
2719 		return 1;
2720 	}
2721 
2722 	if (ev_port->inflight_credits < num) {
2723 		/* check if event enqueue brings ev_port over max threshold */
2724 		uint32_t credit_update_quanta = ev_port->credit_update_quanta;
2725 
2726 		if (sw_inflights + credit_update_quanta >
2727 				dlb2->new_event_limit) {
2728 			DLB2_INC_STAT(
2729 			ev_port->stats.traffic.tx_nospc_new_event_limit,
2730 			1);
2731 			rte_errno = -ENOSPC;
2732 			return 1;
2733 		}
2734 
2735 		rte_atomic_fetch_add_explicit(&dlb2->inflights, credit_update_quanta,
2736 				   rte_memory_order_seq_cst);
2737 		ev_port->inflight_credits += (credit_update_quanta);
2738 
2739 		if (ev_port->inflight_credits < num) {
2740 			DLB2_INC_STAT(
2741 			ev_port->stats.traffic.tx_nospc_inflight_credits,
2742 			1);
2743 			rte_errno = -ENOSPC;
2744 			return 1;
2745 		}
2746 	}
2747 
2748 	return 0;
2749 }
2750 
2751 static inline int
2752 dlb2_check_enqueue_hw_ldb_credits(struct dlb2_port *qm_port)
2753 {
2754 	if (unlikely(qm_port->cached_ldb_credits == 0)) {
2755 		qm_port->cached_ldb_credits =
2756 			dlb2_port_credits_get(qm_port,
2757 					      DLB2_LDB_QUEUE);
2758 		if (unlikely(qm_port->cached_ldb_credits == 0)) {
2759 			DLB2_INC_STAT(
2760 			qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits,
2761 			1);
2762 			DLB2_LOG_LINE_DBG("ldb credits exhausted");
2763 			return 1; /* credits exhausted */
2764 		}
2765 	}
2766 
2767 	return 0;
2768 }
2769 
2770 static inline int
2771 dlb2_check_enqueue_hw_dir_credits(struct dlb2_port *qm_port)
2772 {
2773 	if (unlikely(qm_port->cached_dir_credits == 0)) {
2774 		qm_port->cached_dir_credits =
2775 			dlb2_port_credits_get(qm_port,
2776 					      DLB2_DIR_QUEUE);
2777 		if (unlikely(qm_port->cached_dir_credits == 0)) {
2778 			DLB2_INC_STAT(
2779 			qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits,
2780 			1);
2781 			DLB2_LOG_LINE_DBG("dir credits exhausted");
2782 			return 1; /* credits exhausted */
2783 		}
2784 	}
2785 
2786 	return 0;
2787 }
2788 
2789 static inline int
2790 dlb2_check_enqueue_hw_credits(struct dlb2_port *qm_port)
2791 {
2792 	if (unlikely(qm_port->cached_credits == 0)) {
2793 		qm_port->cached_credits =
2794 			dlb2_port_credits_get(qm_port,
2795 					      DLB2_COMBINED_POOL);
2796 		if (unlikely(qm_port->cached_credits == 0)) {
2797 			DLB2_INC_STAT(
2798 			qm_port->ev_port->stats.traffic.tx_nospc_hw_credits, 1);
2799 			DLB2_LOG_LINE_DBG("credits exhausted");
2800 			return 1; /* credits exhausted */
2801 		}
2802 	}
2803 
2804 	return 0;
2805 }
2806 
2807 static __rte_always_inline void
2808 dlb2_pp_write(struct process_local_port_data *port_data, struct dlb2_enqueue_qe *qe4)
2809 {
2810 	dlb2_movdir64b(port_data->pp_addr, qe4);
2811 }
2812 
2813 static __rte_always_inline void
2814 dlb2_pp_write_reorder(struct process_local_port_data *port_data,
2815 	      struct dlb2_enqueue_qe *qe4)
2816 {
2817 	for (uint8_t i = 0; i < 4; i++) {
2818 		if (qe4[i].cmd_byte != DLB2_NOOP_CMD_BYTE) {
2819 			dlb2_movdir64b(port_data->pp_addr, qe4);
2820 			return;
2821 		}
2822 	}
2823 }
2824 
2825 static __rte_always_inline int
2826 dlb2_pp_check4_write(struct process_local_port_data *port_data,
2827 	      struct dlb2_enqueue_qe *qe4)
2828 {
2829 	for (uint8_t i = 0; i < DLB2_NUM_QES_PER_CACHE_LINE; i++)
2830 		if (((uint64_t *)&qe4[i])[1] == 0)
2831 			return 0;
2832 
2833 	dlb2_movdir64b(port_data->pp_addr, qe4);
2834 	memset(qe4, 0, DLB2_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb2_enqueue_qe));
2835 	return DLB2_NUM_QES_PER_CACHE_LINE;
2836 }
2837 
2838 static inline int
2839 dlb2_consume_qe_immediate(struct dlb2_port *qm_port, int num)
2840 {
2841 	struct process_local_port_data *port_data;
2842 	struct dlb2_cq_pop_qe *qe;
2843 
2844 	RTE_ASSERT(qm_port->config_state == DLB2_CONFIGURED);
2845 
2846 	qe = qm_port->consume_qe;
2847 
2848 	qe->tokens = num - 1;
2849 
2850 	/* No store fence needed since no pointer is being sent, and CQ token
2851 	 * pops can be safely reordered with other HCWs.
2852 	 */
2853 	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
2854 
2855 	dlb2_movdir64b_single(port_data->pp_addr, qe);
2856 
2857 	DLB2_LOG_LINE_DBG("dlb2: consume immediate - %d QEs", num);
2858 
2859 	qm_port->owed_tokens = 0;
2860 
2861 	return 0;
2862 }
2863 
2864 static inline void
2865 dlb2_hw_do_enqueue(struct dlb2_port *qm_port,
2866 		   bool do_sfence,
2867 		   struct process_local_port_data *port_data)
2868 {
2869 	/* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that
2870 	 * application writes complete before enqueueing the QE.
2871 	 */
2872 	if (do_sfence)
2873 		rte_wmb();
2874 
2875 	dlb2_pp_write(port_data, qm_port->qe4);
2876 }
2877 
2878 static inline void
2879 dlb2_construct_token_pop_qe(struct dlb2_port *qm_port, int idx)
2880 {
2881 	struct dlb2_cq_pop_qe *qe = (void *)qm_port->qe4;
2882 	int num = qm_port->owed_tokens;
2883 
2884 	qe[idx].cmd_byte = DLB2_POP_CMD_BYTE;
2885 	qe[idx].tokens = num - 1;
2886 
2887 	qm_port->owed_tokens = 0;
2888 }
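
/*
 * Note: as in dlb2_consume_qe_immediate(), the tokens field is written as
 * num - 1, i.e. the hardware field appears to encode "tokens to pop minus
 * one" (a QE with tokens = 3 would return 4 CQ tokens).
 */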
2889 
2890 static inline int
2891 dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
2892 			struct dlb2_port *qm_port,
2893 			const struct rte_event ev[],
2894 			uint8_t *sched_type,
2895 			uint8_t *queue_id)
2896 {
2897 	struct dlb2_eventdev *dlb2 = ev_port->dlb2;
2898 	struct dlb2_eventdev_queue *ev_queue;
2899 	uint16_t *cached_credits = NULL;
2900 	struct dlb2_queue *qm_queue;
2901 
2902 	ev_queue = &dlb2->ev_queues[ev->queue_id];
2903 	qm_queue = &ev_queue->qm_queue;
2904 	*queue_id = qm_queue->id;
2905 
2906 	/* Ignore sched_type and hardware credits on release events */
2907 	if (ev->op == RTE_EVENT_OP_RELEASE)
2908 		goto op_check;
2909 
2910 	if (!qm_queue->is_directed) {
2911 		/* Load balanced destination queue */
2912 
2913 		if (dlb2->version == DLB2_HW_V2) {
2914 			if (dlb2_check_enqueue_hw_ldb_credits(qm_port)) {
2915 				rte_errno = -ENOSPC;
2916 				return 1;
2917 			}
2918 			cached_credits = &qm_port->cached_ldb_credits;
2919 		} else {
2920 			if (dlb2_check_enqueue_hw_credits(qm_port)) {
2921 				rte_errno = -ENOSPC;
2922 				return 1;
2923 			}
2924 			cached_credits = &qm_port->cached_credits;
2925 		}
2926 		switch (ev->sched_type) {
2927 		case RTE_SCHED_TYPE_ORDERED:
2928 			DLB2_LOG_LINE_DBG("dlb2: put_qe: RTE_SCHED_TYPE_ORDERED");
2929 			if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) {
2930 				DLB2_LOG_ERR("dlb2: tried to send ordered event to unordered queue %d",
2931 					     *queue_id);
2932 				rte_errno = -EINVAL;
2933 				return 1;
2934 			}
2935 			*sched_type = DLB2_SCHED_ORDERED;
2936 			break;
2937 		case RTE_SCHED_TYPE_ATOMIC:
2938 			DLB2_LOG_LINE_DBG("dlb2: put_qe: RTE_SCHED_TYPE_ATOMIC");
2939 			*sched_type = DLB2_SCHED_ATOMIC;
2940 			break;
2941 		case RTE_SCHED_TYPE_PARALLEL:
2942 			DLB2_LOG_LINE_DBG("dlb2: put_qe: RTE_SCHED_TYPE_PARALLEL");
2943 			if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED)
2944 				*sched_type = DLB2_SCHED_ORDERED;
2945 			else
2946 				*sched_type = DLB2_SCHED_UNORDERED;
2947 			break;
2948 		default:
2949 			DLB2_LOG_ERR("Unsupported LDB sched type in put_qe");
2950 			DLB2_INC_STAT(ev_port->stats.tx_invalid, 1);
2951 			rte_errno = -EINVAL;
2952 			return 1;
2953 		}
2954 	} else {
2955 		/* Directed destination queue */
2956 
2957 		if (dlb2->version == DLB2_HW_V2) {
2958 			if (dlb2_check_enqueue_hw_dir_credits(qm_port)) {
2959 				rte_errno = -ENOSPC;
2960 				return 1;
2961 			}
2962 			cached_credits = &qm_port->cached_dir_credits;
2963 		} else {
2964 			if (dlb2_check_enqueue_hw_credits(qm_port)) {
2965 				rte_errno = -ENOSPC;
2966 				return 1;
2967 			}
2968 			cached_credits = &qm_port->cached_credits;
2969 		}
2970 		DLB2_LOG_LINE_DBG("dlb2: put_qe: RTE_SCHED_TYPE_DIRECTED");
2971 
2972 		*sched_type = DLB2_SCHED_DIRECTED;
2973 	}
2974 
2975 op_check:
2976 	switch (ev->op) {
2977 	case RTE_EVENT_OP_NEW:
2978 		/* Check that a sw credit is available */
2979 		if (dlb2_check_enqueue_sw_credits(dlb2, ev_port)) {
2980 			rte_errno = -ENOSPC;
2981 			return 1;
2982 		}
2983 		ev_port->inflight_credits--;
2984 		(*cached_credits)--;
2985 		break;
2986 	case RTE_EVENT_OP_FORWARD:
2987 		/* Check for outstanding_releases underflow. If this occurs,
2988 		 * the application is not using the EVENT_OPs correctly; for
2989 		 * example, forwarding or releasing events that were not
2990 		 * dequeued.
2991 		 */
2992 		RTE_ASSERT(ev_port->outstanding_releases > 0);
2993 		ev_port->outstanding_releases--;
2994 		qm_port->issued_releases++;
2995 		(*cached_credits)--;
2996 		break;
2997 	case RTE_EVENT_OP_RELEASE:
2998 		ev_port->inflight_credits++;
2999 		/* Check for outstanding_releases underflow. If this occurs,
3000 		 * the application is not using the EVENT_OPs correctly; for
3001 		 * example, forwarding or releasing events that were not
3002 		 * dequeued.
3003 		 */
3004 		RTE_ASSERT(ev_port->outstanding_releases > 0);
3005 		ev_port->outstanding_releases--;
3006 		qm_port->issued_releases++;
3007 
3008 		/* Replenish s/w credits if enough are cached */
3009 		dlb2_replenish_sw_credits(dlb2, ev_port);
3010 		break;
3011 	}
3012 
3013 	DLB2_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1);
3014 	DLB2_INC_STAT(ev_port->stats.traffic.tx_ok, 1);
3015 
3016 #ifndef RTE_LIBRTE_PMD_DLB_QUELL_STATS
3017 	if (ev->op != RTE_EVENT_OP_RELEASE) {
3018 		DLB2_INC_STAT(ev_port->stats.queue[ev->queue_id].enq_ok, 1);
3019 		DLB2_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1);
3020 	}
3021 #endif
3022 
3023 	return 0;
3024 }
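
/*
 * Summary of the credit accounting performed above, per op type:
 *   RTE_EVENT_OP_NEW:     consumes one s/w inflight credit and one cached h/w
 *                         credit.
 *   RTE_EVENT_OP_FORWARD: consumes one cached h/w credit and accounts for the
 *                         release of the previously dequeued event.
 *   RTE_EVENT_OP_RELEASE: returns one s/w inflight credit (possibly
 *                         replenishing the shared pool); no h/w credit is
 *                         consumed.
 */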
3025 
3026 static inline __m128i
3027 dlb2_event_to_qe(const struct rte_event *ev, uint8_t cmd, uint8_t sched_type, uint8_t qid)
3028 {
3029 	__m128i dlb2_to_qe_shuffle = _mm_set_epi8(
3030 	    0xFF, 0xFF,			 /* zero out cmd word */
3031 	    1, 0,			 /* low 16-bits of flow id */
3032 	    0xFF, 0xFF, /* zero QID, sched_type etc fields to be filled later */
3033 	    3, 2,			 /* top of flow id, event type and subtype */
3034 	    15, 14, 13, 12, 11, 10, 9, 8 /* data from end of event goes at start */
3035 	);
3036 
3037 	/* event may not be 16 byte aligned. Use 16 byte unaligned load */
3038 	__m128i tmp = _mm_lddqu_si128((const __m128i *)ev);
3039 	__m128i qe = _mm_shuffle_epi8(tmp, dlb2_to_qe_shuffle);
3040 	struct dlb2_enqueue_qe *dq = (struct dlb2_enqueue_qe *)&qe;
3041 	/* set the cmd field */
3042 	qe = _mm_insert_epi8(qe, cmd, 15);
3043 	/* insert missing 16-bits with qid, sched_type and priority */
3044 	uint16_t qid_stype_prio =
3045 	    qid | (uint16_t)sched_type << 8 | ((uint16_t)ev->priority & 0xE0) << 5;
3046 	qe = _mm_insert_epi16(qe, qid_stype_prio, 5);
3047 	dq->weight = RTE_PMD_DLB2_GET_QE_WEIGHT(ev);
3048 	return qe;
3049 }
3050 
3051 static inline uint16_t
3052 __dlb2_event_enqueue_burst_reorder(void *event_port,
3053 		const struct rte_event events[],
3054 		uint16_t num,
3055 		bool use_delayed)
3056 {
3057 	struct dlb2_eventdev_port *ev_port = event_port;
3058 	struct dlb2_port *qm_port = &ev_port->qm_port;
3059 	struct dlb2_reorder *order = qm_port->order;
3060 	struct process_local_port_data *port_data;
3061 	bool is_directed = qm_port->is_directed;
3062 	uint8_t n = order->next_to_enqueue;
3063 	uint8_t p_cnt = 0;
3064 	int retries = ev_port->enq_retries;
3065 	__m128i new_qes[4], *from = NULL;
3066 	int num_new = 0;
3067 	int num_tx;
3068 	int i;
3069 
3070 	RTE_ASSERT(ev_port->enq_configured);
3071 	RTE_ASSERT(events != NULL);
3072 
3073 	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
3074 
3075 	num_tx = RTE_MIN(num, ev_port->conf.enqueue_depth);
3076 #if DLB2_BYPASS_FENCE_ON_PP == 1
3077 	if (!qm_port->is_producer) /* Call memory fence once at the start */
3078 		rte_wmb();	   /*  calls _mm_sfence() */
3079 #else
3080 	rte_wmb(); /*  calls _mm_sfence() */
3081 #endif
3082 	for (i = 0; i < num_tx; i++) {
3083 		uint8_t sched_type = 0;
3084 		uint8_t reorder_idx = events[i].impl_opaque;
3085 		int16_t thresh = qm_port->token_pop_thresh;
3086 		uint8_t qid = 0;
3087 		int ret;
3088 
3089 		while ((ret = dlb2_event_enqueue_prep(ev_port, qm_port, &events[i],
3090 						      &sched_type, &qid)) != 0 &&
3091 		       rte_errno == -ENOSPC && --retries > 0)
3092 			rte_pause();
3093 
3094 		if (ret != 0) /* Either there is an error or retries were exceeded */
3095 			break;
3096 
3097 		switch (events[i].op) {
3098 		case RTE_EVENT_OP_NEW:
3099 			new_qes[num_new++] = dlb2_event_to_qe(
3100 			    &events[i], DLB2_NEW_CMD_BYTE, sched_type, qid);
3101 			if (num_new == RTE_DIM(new_qes)) {
3102 				dlb2_pp_write(port_data, (struct dlb2_enqueue_qe *)&new_qes);
3103 				num_new = 0;
3104 			}
3105 			break;
3106 		case RTE_EVENT_OP_FORWARD: {
3107 			order->enq_reorder[reorder_idx].m128 = dlb2_event_to_qe(
3108 			    &events[i], is_directed ? DLB2_NEW_CMD_BYTE : DLB2_FWD_CMD_BYTE,
3109 			    sched_type, qid);
3110 			n += dlb2_pp_check4_write(port_data, &order->enq_reorder[n].qe);
3111 			break;
3112 		}
3113 		case RTE_EVENT_OP_RELEASE: {
3114 			order->enq_reorder[reorder_idx].m128 = dlb2_event_to_qe(
3115 			    &events[i], is_directed ? DLB2_NOOP_CMD_BYTE : DLB2_COMP_CMD_BYTE,
3116 			    sched_type, 0xFF);
3117 			break;
3118 		}
3119 		}
3120 
3121 		if (use_delayed && qm_port->token_pop_mode == DELAYED_POP &&
3122 		    (events[i].op == RTE_EVENT_OP_FORWARD ||
3123 		     events[i].op == RTE_EVENT_OP_RELEASE) &&
3124 		    qm_port->issued_releases >= thresh - 1) {
3125 
3126 			dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3127 
3128 			/* Reset the releases for the next QE batch */
3129 			qm_port->issued_releases -= thresh;
3130 
3131 			/* When using delayed token pop mode, the
3132 			 * initial token threshold is the full CQ
3133 			 * depth. After the first token pop, we need to
3134 			 * reset it to the dequeue_depth.
3135 			 */
3136 			qm_port->token_pop_thresh =
3137 			    qm_port->dequeue_depth;
3138 		}
3139 	}
3140 	while (order->enq_reorder[n].u64[1] != 0) {
3141 		__m128i tmp[4] = {0}, *send = NULL;
3142 		bool enq;
3143 
3144 		if (!p_cnt)
3145 			from = &order->enq_reorder[n].m128;
3146 
3147 		p_cnt++;
3148 		n++;
3149 
3150 		enq = !n || p_cnt == 4 || !order->enq_reorder[n].u64[1];
3151 		if (!enq)
3152 			continue;
3153 
3154 		if (p_cnt < 4) {
3155 			memcpy(tmp, from, p_cnt * sizeof(struct dlb2_enqueue_qe));
3156 			send = tmp;
3157 		} else {
3158 			send  = from;
3159 		}
3160 
3161 		if (is_directed)
3162 			dlb2_pp_write_reorder(port_data, (struct dlb2_enqueue_qe *)send);
3163 		else
3164 			dlb2_pp_write(port_data, (struct dlb2_enqueue_qe *)send);
3165 		memset(from, 0, p_cnt * sizeof(struct dlb2_enqueue_qe));
3166 		p_cnt = 0;
3167 	}
3168 	order->next_to_enqueue = n;
3169 
3170 	if (num_new > 0) {
3171 		switch (num_new) {
3172 		case 1:
3173 			new_qes[1] = _mm_setzero_si128(); /* fall-through */
3174 		case 2:
3175 			new_qes[2] = _mm_setzero_si128(); /* fall-through */
3176 		case 3:
3177 			new_qes[3] = _mm_setzero_si128();
3178 		}
3179 		dlb2_pp_write(port_data, (struct dlb2_enqueue_qe *)&new_qes);
3180 		num_new = 0;
3181 	}
3182 
3183 	return i;
3184 }
3185 
3186 static inline uint16_t
3187 __dlb2_event_enqueue_burst(void *event_port,
3188 			   const struct rte_event events[],
3189 			   uint16_t num,
3190 			   bool use_delayed)
3191 {
3192 	struct dlb2_eventdev_port *ev_port = event_port;
3193 	struct dlb2_port *qm_port = &ev_port->qm_port;
3194 	struct process_local_port_data *port_data;
3195 	int retries = ev_port->enq_retries;
3196 	int num_tx;
3197 	int i;
3198 
3199 	RTE_ASSERT(ev_port->enq_configured);
3200 	RTE_ASSERT(events != NULL);
3201 
3202 	if (qm_port->reorder_en)
3203 		return __dlb2_event_enqueue_burst_reorder(event_port, events, num, use_delayed);
3204 
3205 	i = 0;
3206 
3207 	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
3208 	num_tx = RTE_MIN(num, ev_port->conf.enqueue_depth);
3209 	while (i < num_tx) {
3210 		uint8_t sched_types[DLB2_NUM_QES_PER_CACHE_LINE];
3211 		uint8_t queue_ids[DLB2_NUM_QES_PER_CACHE_LINE];
3212 		int pop_offs = 0;
3213 		int j = 0;
3214 
3215 		memset(qm_port->qe4,
3216 		       0,
3217 		       DLB2_NUM_QES_PER_CACHE_LINE *
3218 		       sizeof(struct dlb2_enqueue_qe));
3219 
3220 		for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
3221 			const struct rte_event *ev = &events[i + j];
3222 			int16_t thresh = qm_port->token_pop_thresh;
3223 			int ret;
3224 
3225 			if (use_delayed &&
3226 			    qm_port->token_pop_mode == DELAYED_POP &&
3227 			    (ev->op == RTE_EVENT_OP_FORWARD ||
3228 			     ev->op == RTE_EVENT_OP_RELEASE) &&
3229 			    qm_port->issued_releases >= thresh - 1) {
3230 				/* Insert the token pop QE and break out. This
3231 				 * may result in a partial HCW, but that is
3232 				 * simpler than supporting arbitrary QE
3233 				 * insertion.
3234 				 */
3235 				dlb2_construct_token_pop_qe(qm_port, j);
3236 
3237 				/* Reset the releases for the next QE batch */
3238 				qm_port->issued_releases -= thresh;
3239 
3240 				pop_offs = 1;
3241 				j++;
3242 				break;
3243 			}
3244 
3245 			/*
3246 			 * Retry if insufficient credits
3247 			 */
3248 			do {
3249 				ret = dlb2_event_enqueue_prep(ev_port,
3250 							      qm_port,
3251 							      ev,
3252 							      &sched_types[j],
3253 							      &queue_ids[j]);
3254 			} while ((ret == -ENOSPC) && (retries-- > 0));
3255 
3256 			if (ret != 0)
3257 				break;
3258 		}
3259 
3260 		if (j == 0)
3261 			break;
3262 
3263 		dlb2_event_build_hcws(qm_port, &events[i], j - pop_offs,
3264 				      sched_types, queue_ids);
3265 
3266 #if DLB2_BYPASS_FENCE_ON_PP == 1
3267 		/* Bypass fence instruction for producer ports */
3268 		dlb2_hw_do_enqueue(qm_port, i == 0 && !qm_port->is_producer, port_data);
3269 #else
3270 		dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
3271 #endif
3272 
3273 		/* Don't include the token pop QE in the enqueue count */
3274 		i += j - pop_offs;
3275 
3276 		/* Don't interpret j < DLB2_NUM_... as out-of-credits if
3277 		 * pop_offs != 0
3278 		 */
3279 		if (j < DLB2_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
3280 			break;
3281 	}
3282 
3283 	return i;
3284 }
3285 
3286 static uint16_t
3287 dlb2_event_enqueue_burst(void *event_port,
3288 			     const struct rte_event events[],
3289 			     uint16_t num)
3290 {
3291 	return __dlb2_event_enqueue_burst(event_port, events, num, false);
3292 }
3293 
3294 static uint16_t
3295 dlb2_event_enqueue_burst_delayed(void *event_port,
3296 				     const struct rte_event events[],
3297 				     uint16_t num)
3298 {
3299 	return __dlb2_event_enqueue_burst(event_port, events, num, true);
3300 }
3301 
3302 static uint16_t
3303 dlb2_event_enqueue_new_burst(void *event_port,
3304 			     const struct rte_event events[],
3305 			     uint16_t num)
3306 {
3307 	return __dlb2_event_enqueue_burst(event_port, events, num, false);
3308 }
3309 
3310 static uint16_t
3311 dlb2_event_enqueue_new_burst_delayed(void *event_port,
3312 				     const struct rte_event events[],
3313 				     uint16_t num)
3314 {
3315 	return __dlb2_event_enqueue_burst(event_port, events, num, true);
3316 }
3317 
3318 static uint16_t
3319 dlb2_event_enqueue_forward_burst(void *event_port,
3320 				 const struct rte_event events[],
3321 				 uint16_t num)
3322 {
3323 	return __dlb2_event_enqueue_burst(event_port, events, num, false);
3324 }
3325 
3326 static uint16_t
3327 dlb2_event_enqueue_forward_burst_delayed(void *event_port,
3328 					 const struct rte_event events[],
3329 					 uint16_t num)
3330 {
3331 	return __dlb2_event_enqueue_burst(event_port, events, num, true);
3332 }
3333 
3334 static void
3335 dlb2_event_release(struct dlb2_eventdev *dlb2,
3336 		   uint8_t port_id,
3337 		   int n)
3338 {
3339 	struct process_local_port_data *port_data;
3340 	struct dlb2_eventdev_port *ev_port;
3341 	struct dlb2_port *qm_port;
3342 	int i;
3343 
3344 	if (port_id > dlb2->num_ports) {
3345 		DLB2_LOG_ERR("Invalid port id %d in dlb2_event_release",
3346 			     port_id);
3347 		rte_errno = -EINVAL;
3348 		return;
3349 	}
3350 
3351 	ev_port = &dlb2->ev_ports[port_id];
3352 	qm_port = &ev_port->qm_port;
3353 	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
3354 
3355 	i = 0;
3356 
3357 	if (qm_port->is_directed) {
3358 		i = n;
3359 		goto sw_credit_update;
3360 	}
3361 
3362 	while (i < n) {
3363 		int pop_offs = 0;
3364 		int j = 0;
3365 
3366 		/* Zero-out QEs */
3367 		_mm_storeu_si128((void *)&qm_port->qe4[0], _mm_setzero_si128());
3368 		_mm_storeu_si128((void *)&qm_port->qe4[1], _mm_setzero_si128());
3369 		_mm_storeu_si128((void *)&qm_port->qe4[2], _mm_setzero_si128());
3370 		_mm_storeu_si128((void *)&qm_port->qe4[3], _mm_setzero_si128());
3371 
3372 
3373 		for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
3374 			int16_t thresh = qm_port->token_pop_thresh;
3375 
3376 			if (qm_port->token_pop_mode == DELAYED_POP &&
3377 			    qm_port->issued_releases >= thresh - 1) {
3378 				/* Insert the token pop QE */
3379 				dlb2_construct_token_pop_qe(qm_port, j);
3380 
3381 				/* Reset the releases for the next QE batch */
3382 				qm_port->issued_releases -= thresh;
3383 
3384 				pop_offs = 1;
3385 				j++;
3386 				break;
3387 			}
3388 
3389 			qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
3390 			qm_port->issued_releases++;
3391 		}
3392 
3393 		dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
3394 
3395 		/* Don't include the token pop QE in the release count */
3396 		i += j - pop_offs;
3397 	}
3398 
3399 sw_credit_update:
3400 	/* each release returns one credit */
3401 	if (unlikely(!ev_port->outstanding_releases)) {
3402 		DLB2_LOG_ERR("%s: Outstanding releases underflowed.",
3403 			     __func__);
3404 		return;
3405 	}
3406 	ev_port->outstanding_releases -= i;
3407 	ev_port->inflight_credits += i;
3408 
3409 	/* Replenish s/w credits if enough releases are performed */
3410 	dlb2_replenish_sw_credits(dlb2, ev_port);
3411 }
3412 
3413 static inline void
3414 dlb2_port_credits_inc(struct dlb2_port *qm_port, int num)
3415 {
3416 	uint32_t batch_size = qm_port->hw_credit_quanta;
3417 
3418 	/* increment port credits, and return to pool if exceeds threshold */
3419 	if (!qm_port->is_directed) {
3420 		if (qm_port->dlb2->version == DLB2_HW_V2) {
3421 			qm_port->cached_ldb_credits += num;
3422 			if (qm_port->cached_ldb_credits >= 2 * batch_size) {
3423 				rte_atomic_fetch_add_explicit(
3424 					qm_port->credit_pool[DLB2_LDB_QUEUE],
3425 					batch_size, rte_memory_order_seq_cst);
3426 				qm_port->cached_ldb_credits -= batch_size;
3427 			}
3428 		} else {
3429 			qm_port->cached_credits += num;
3430 			if (qm_port->cached_credits >= 2 * batch_size) {
3431 				rte_atomic_fetch_add_explicit(
3432 				      qm_port->credit_pool[DLB2_COMBINED_POOL],
3433 				      batch_size, rte_memory_order_seq_cst);
3434 				qm_port->cached_credits -= batch_size;
3435 			}
3436 		}
3437 	} else {
3438 		if (qm_port->dlb2->version == DLB2_HW_V2) {
3439 			qm_port->cached_dir_credits += num;
3440 			if (qm_port->cached_dir_credits >= 2 * batch_size) {
3441 				rte_atomic_fetch_add_explicit(
3442 					qm_port->credit_pool[DLB2_DIR_QUEUE],
3443 					batch_size, rte_memory_order_seq_cst);
3444 				qm_port->cached_dir_credits -= batch_size;
3445 			}
3446 		} else {
3447 			qm_port->cached_credits += num;
3448 			if (qm_port->cached_credits >= 2 * batch_size) {
3449 				rte_atomic_fetch_add_explicit(
3450 				      qm_port->credit_pool[DLB2_COMBINED_POOL],
3451 				      batch_size, rte_memory_order_seq_cst);
3452 				qm_port->cached_credits -= batch_size;
3453 			}
3454 		}
3455 	}
3456 }
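
/*
 * Worked example (illustrative numbers): with hw_credit_quanta = 32, once a
 * port's cached credit count reaches 2 * 32 = 64, one batch of 32 is returned
 * to the shared pool and the rest stays cached locally, which bounds how many
 * credits a single port can hold back from other ports.
 */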
3457 
3458 #define CLB_MASK_IDX 0
3459 #define CLB_VAL_IDX 1
3460 static int
3461 dlb2_monitor_callback(const uint64_t val,
3462 		const uint64_t opaque[RTE_POWER_MONITOR_OPAQUE_SZ])
3463 {
3464 	/* abort if the value matches */
3465 	return (val & opaque[CLB_MASK_IDX]) == opaque[CLB_VAL_IDX] ? -1 : 0;
3466 }
3467 
3468 static inline int
3469 dlb2_dequeue_wait(struct dlb2_eventdev *dlb2,
3470 		  struct dlb2_eventdev_port *ev_port,
3471 		  struct dlb2_port *qm_port,
3472 		  uint64_t timeout,
3473 		  uint64_t start_ticks)
3474 {
3475 	struct process_local_port_data *port_data;
3476 	uint64_t elapsed_ticks;
3477 
3478 	port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
3479 
3480 	elapsed_ticks = rte_get_timer_cycles() - start_ticks;
3481 
3482 	/* Wait/poll time expired */
3483 	if (elapsed_ticks >= timeout) {
3484 		return 1;
3485 	} else if (dlb2->umwait_allowed) {
3486 		struct rte_power_monitor_cond pmc;
3487 		volatile struct dlb2_dequeue_qe *cq_base;
3488 		union {
3489 			uint64_t raw_qe[2];
3490 			struct dlb2_dequeue_qe qe;
3491 		} qe_mask;
3492 		uint64_t expected_value;
3493 		volatile uint64_t *monitor_addr;
3494 
3495 		qe_mask.qe.cq_gen = 1; /* set mask */
3496 
3497 		cq_base = port_data->cq_base;
3498 		monitor_addr = (volatile uint64_t *)(volatile void *)
3499 			&cq_base[qm_port->cq_idx];
3500 		monitor_addr++; /* cq_gen bit is in second 64bit location */
3501 
3502 		if (qm_port->gen_bit)
3503 			expected_value = qe_mask.raw_qe[1];
3504 		else
3505 			expected_value = 0;
3506 
3507 		pmc.addr = monitor_addr;
3508 		/* store expected value and comparison mask in opaque data */
3509 		pmc.opaque[CLB_VAL_IDX] = expected_value;
3510 		pmc.opaque[CLB_MASK_IDX] = qe_mask.raw_qe[1];
3511 		/* set up callback */
3512 		pmc.fn = dlb2_monitor_callback;
3513 		pmc.size = sizeof(uint64_t);
3514 
3515 		rte_power_monitor(&pmc, timeout + start_ticks);
3516 
3517 		DLB2_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
3518 	} else {
3519 		uint64_t poll_interval = dlb2->poll_interval;
3520 		uint64_t curr_ticks = rte_get_timer_cycles();
3521 		uint64_t init_ticks = curr_ticks;
3522 
3523 		while ((curr_ticks - start_ticks < timeout) &&
3524 		       (curr_ticks - init_ticks < poll_interval))
3525 			curr_ticks = rte_get_timer_cycles();
3526 	}
3527 
3528 	return 0;
3529 }
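
/*
 * The monitored word is the second 8 bytes of the next CQ entry, which holds
 * the cq_gen bit. dlb2_monitor_callback() aborts the umwait when
 * (val & mask) equals the expected value, i.e. when the hardware writes a QE
 * whose gen bit matches the port's current gen_bit and the CQ therefore has
 * something to dequeue.
 */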
3530 
3531 static __rte_noinline int
3532 dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
3533 			 struct dlb2_port *qm_port,
3534 			 struct rte_event *events,
3535 			 struct dlb2_dequeue_qe *qes,
3536 			 int cnt)
3537 {
3538 	uint8_t *qid_mappings = qm_port->qid_mappings;
3539 	int i, num, evq_id;
3540 
3541 	for (i = 0, num = 0; i < cnt; i++) {
3542 		struct dlb2_dequeue_qe *qe = &qes[i];
3543 		int sched_type_map[DLB2_NUM_HW_SCHED_TYPES] = {
3544 			[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
3545 			[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
3546 			[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
3547 			[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
3548 		};
3549 
3550 		/* Fill in event information.
3551 		 * Note that flow_id must be embedded in the data by
3552 		 * the app, e.g. in the mbuf RSS hash field if the data
3553 		 * buffer is an mbuf.
3554 		 */
3555 		if (unlikely(qe->error)) {
3556 			DLB2_LOG_ERR("QE error bit ON");
3557 			DLB2_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
3558 			dlb2_consume_qe_immediate(qm_port, 1);
3559 			continue; /* Ignore */
3560 		}
3561 
3562 		events[num].u64 = qe->data;
3563 		events[num].flow_id = qe->flow_id;
3564 		events[num].priority = DLB2_TO_EV_PRIO((uint8_t)qe->priority);
3565 		events[num].event_type = qe->u.event_type.major;
3566 		events[num].sub_event_type = qe->u.event_type.sub;
3567 		events[num].sched_type = sched_type_map[qe->sched_type];
3568 		events[num].impl_opaque = qm_port->reorder_id++;
3569 		RTE_PMD_DLB2_SET_QID_DEPTH(&events[num], qe->qid_depth);
3570 
3571 		/* qid not preserved for directed queues */
3572 		if (qm_port->is_directed)
3573 			evq_id = ev_port->link[0].queue_id;
3574 		else
3575 			evq_id = qid_mappings[qe->qid];
3576 
3577 		events[num].queue_id = evq_id;
3578 		DLB2_INC_STAT(
3579 			ev_port->stats.queue[evq_id].qid_depth[qe->qid_depth],
3580 			1);
3581 		DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
3582 		num++;
3583 	}
3584 
3585 	DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num);
3586 
3587 	return num;
3588 }
3589 
3590 static inline int
3591 dlb2_process_dequeue_four_qes(struct dlb2_eventdev_port *ev_port,
3592 			      struct dlb2_port *qm_port,
3593 			      struct rte_event *events,
3594 			      struct dlb2_dequeue_qe *qes)
3595 {
3596 	int sched_type_map[] = {
3597 		[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
3598 		[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
3599 		[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
3600 		[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
3601 	};
3602 	const int num_events = DLB2_NUM_QES_PER_CACHE_LINE;
3603 	uint8_t *qid_mappings = qm_port->qid_mappings;
3604 
3605 	/* In the unlikely case that any of the QE error bits are set, process
3606 	 * them one at a time.
3607 	 */
3608 	if (unlikely(qes[0].error || qes[1].error ||
3609 		     qes[2].error || qes[3].error))
3610 		return dlb2_process_dequeue_qes(ev_port, qm_port, events,
3611 						 qes, num_events);
3612 	const __m128i qe_to_ev_shuffle =
3613 	    _mm_set_epi8(7, 6, 5, 4, 3, 2, 1, 0, /* last 8-bytes = data from first 8 */
3614 			 0xFF, 0xFF, 0xFF, 0xFF, /* fill in later as 32-bit value*/
3615 			 9, 8,			 /* event type and sub-event, + 4 zero bits */
3616 			 13, 12 /* flow id, 16 bits */);
3617 	for (int i = 0; i < 4; i++) {
3618 		const __m128i hw_qe = _mm_load_si128((void *)&qes[i]);
3619 		const __m128i event = _mm_shuffle_epi8(hw_qe, qe_to_ev_shuffle);
3620 		/* prepare missing 32-bits for op, sched_type, QID, Priority and
3621 		 * sequence number in impl_opaque
3622 		 */
3623 		const uint16_t qid_sched_prio = _mm_extract_epi16(hw_qe, 5);
3624 		/* Extract qid_depth and format it as per event header */
3625 		const uint8_t qid_depth = (_mm_extract_epi8(hw_qe, 15) & 0x6) << 1;
3626 		const uint32_t qid =  (qm_port->is_directed) ? ev_port->link[0].queue_id :
3627 					qid_mappings[(uint8_t)qid_sched_prio];
3628 		const uint32_t sched_type = sched_type_map[(qid_sched_prio >> 8) & 0x3];
3629 		const uint32_t priority = (qid_sched_prio >> 5) & 0xE0;
3630 
3631 		const uint32_t dword1 = qid_depth |
3632 		    sched_type << 6 | qid << 8 | priority << 16 | (qm_port->reorder_id + i) << 24;
3633 
3634 		/* events[] may not be 16 byte aligned. So use separate load and store */
3635 		const __m128i tmpEv = _mm_insert_epi32(event, dword1, 1);
3636 		_mm_storeu_si128((__m128i *) &events[i], tmpEv);
3637 	}
3638 	qm_port->reorder_id += 4;
3639 
3640 	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
3641 	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
3642 	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
3643 	DLB2_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
3644 
3645 	DLB2_INC_STAT(
3646 		ev_port->stats.queue[events[0].queue_id].
3647 			qid_depth[qes[0].qid_depth],
3648 		1);
3649 	DLB2_INC_STAT(
3650 		ev_port->stats.queue[events[1].queue_id].
3651 			qid_depth[qes[1].qid_depth],
3652 		1);
3653 	DLB2_INC_STAT(
3654 		ev_port->stats.queue[events[2].queue_id].
3655 			qid_depth[qes[2].qid_depth],
3656 		1);
3657 	DLB2_INC_STAT(
3658 		ev_port->stats.queue[events[3].queue_id].
3659 			qid_depth[qes[3].qid_depth],
3660 		1);
3661 
3662 	DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
3663 
3664 	return num_events;
3665 }
3666 
3667 static __rte_always_inline int
3668 dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
3669 {
3670 	volatile struct dlb2_dequeue_qe *cq_addr;
3671 	uint8_t xor_mask[2] = {0x0F, 0x00};
3672 	const uint8_t and_mask = 0x0F;
3673 	__m128i *qes = (__m128i *)qe;
3674 	uint8_t gen_bits, gen_bit;
3675 	uintptr_t addr[4];
3676 	uint16_t idx;
3677 
3678 	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3679 
3680 	idx = qm_port->cq_idx_unmasked & qm_port->cq_depth_mask;
3681 	/* Load the next 4 QEs */
3682 	addr[0] = (uintptr_t)&cq_addr[idx];
3683 	addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
3684 	addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
3685 	addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
3686 
3687 	/* Prefetch next batch of QEs (all CQs occupy minimum 8 cache lines) */
3688 	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
3689 	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
3690 	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
3691 	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
3692 
3693 	/* Correct the xor_mask for wrap-around QEs */
3694 	gen_bit = qm_port->gen_bit;
3695 	xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
3696 	xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
3697 	xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
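	/* i.e. any of the upper three QE slots that wraps past the end of the
	 * CQ is expected to carry the opposite gen bit, so its bit in the
	 * xor_mask is flipped to keep the validity test below correct.
	 */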
3698 
3699 	/* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
3700 	 * valid, then QEs[0:N-1] are too.
3701 	 */
3702 	qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
3703 	rte_compiler_barrier();
3704 	qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
3705 	rte_compiler_barrier();
3706 	qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
3707 	rte_compiler_barrier();
3708 	qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
3709 
3710 	/* Extract and combine the gen bits */
3711 	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
3712 		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
3713 		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
3714 		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
3715 
3716 	/* XOR the combined bits such that a 1 represents a valid QE */
3717 	gen_bits ^= xor_mask[gen_bit];
3718 
3719 	/* Mask off gen bits we don't care about */
3720 	gen_bits &= and_mask;
3721 
3722 	return rte_popcount32(gen_bits);
3723 }
3724 
3725 static inline void
3726 _process_deq_qes_vec_impl(struct dlb2_port *qm_port,
3727 			  struct rte_event *events,
3728 			  __m128i v_qe_3,
3729 			  __m128i v_qe_2,
3730 			  __m128i v_qe_1,
3731 			  __m128i v_qe_0,
3732 			  __m128i v_qe_meta,
3733 			  __m128i v_qe_status,
3734 			  uint32_t valid_events)
3735 {
3736 	/* Look up the event QIDs, using the hardware QIDs to index the
3737 	 * port's QID mapping.
3738 	 *
3739 	 * Each v_qe_[0-3] is just a 16-byte load of the whole QE. It is
3740 	 * passed along in registers as the QE data is required later.
3741 	 *
3742 	 * v_qe_meta is a u32 unpack of all 4x QEs, i.e. it contains one
3743 	 * 32-bit slice of each QE and so makes up a full SSE register. This
3744 	 * allows parallel processing of 4x QEs in a single register.
3744 	 * allows parallel processing of 4x QEs in a single register.
3745 	 */
3746 
3747 	__m128i v_qid_done = {0};
3748 	int hw_qid0 = _mm_extract_epi8(v_qe_meta, 2);
3749 	int hw_qid1 = _mm_extract_epi8(v_qe_meta, 6);
3750 	int hw_qid2 = _mm_extract_epi8(v_qe_meta, 10);
3751 	int hw_qid3 = _mm_extract_epi8(v_qe_meta, 14);
3752 
3753 	int ev_qid0 = qm_port->qid_mappings[hw_qid0];
3754 	int ev_qid1 = qm_port->qid_mappings[hw_qid1];
3755 	int ev_qid2 = qm_port->qid_mappings[hw_qid2];
3756 	int ev_qid3 = qm_port->qid_mappings[hw_qid3];
3757 
3758 	int hw_sched0 = _mm_extract_epi8(v_qe_meta, 3) & 3ul;
3759 	int hw_sched1 = _mm_extract_epi8(v_qe_meta, 7) & 3ul;
3760 	int hw_sched2 = _mm_extract_epi8(v_qe_meta, 11) & 3ul;
3761 	int hw_sched3 = _mm_extract_epi8(v_qe_meta, 15) & 3ul;
3762 
3763 	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid0, 2);
3764 	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid1, 6);
3765 	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid2, 10);
3766 	v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid3, 14);
3767 
3768 	/* Schedule field remapping using byte shuffle
3769 	 * - Full byte containing sched field handled here (op, rsvd are zero)
3770 	 * - Note sanitizing the register requires two masking ANDs:
3771 	 *   1) to strip prio/msg_type from byte for correct shuffle lookup
3772 	 *   2) to strip any non-sched-field lanes from any results to OR later
3773 	 * - Final byte result is >> 10 to another byte-lane inside the u32.
3774 	 *   This places the field where the final combining OR expects it in the rte_event.
3775 	 */
3776 	__m128i v_sched_done;
3777 	__m128i v_sched_bits;
3778 	{
3779 		static const uint8_t sched_type_map[16] = {
3780 			[DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
3781 			[DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
3782 			[DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
3783 			[DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
3784 		};
3785 		static const uint8_t sched_and_mask[16] = {
3786 			0x00, 0x00, 0x00, 0x03,
3787 			0x00, 0x00, 0x00, 0x03,
3788 			0x00, 0x00, 0x00, 0x03,
3789 			0x00, 0x00, 0x00, 0x03,
3790 		};
3791 
3792 		static const uint8_t qid_depth_mask[16] = {
3793 			0x00, 0x00, 0x00, 0x06,
3794 			0x00, 0x00, 0x00, 0x06,
3795 			0x00, 0x00, 0x00, 0x06,
3796 			0x00, 0x00, 0x00, 0x06,
3797 		};
3798 		const __m128i v_qid_depth_mask  = _mm_loadu_si128(
3799 						  (const __m128i *)qid_depth_mask);
3800 		const __m128i v_sched_map = _mm_loadu_si128(
3801 					     (const __m128i *)sched_type_map);
3802 		__m128i v_sched_mask = _mm_loadu_si128(
3803 					     (const __m128i *)&sched_and_mask);
3804 		v_sched_bits = _mm_and_si128(v_qe_meta, v_sched_mask);
3805 		__m128i v_sched_remapped = _mm_shuffle_epi8(v_sched_map,
3806 							    v_sched_bits);
3807 		__m128i v_preshift = _mm_and_si128(v_sched_remapped,
3808 						   v_sched_mask);
3809 		v_sched_done = _mm_srli_epi32(v_preshift, 10);
3810 		__m128i v_qid_depth =  _mm_and_si128(v_qe_status, v_qid_depth_mask);
3811 		v_qid_depth = _mm_srli_epi32(v_qid_depth, 15);
3812 		v_sched_done = _mm_or_si128(v_sched_done, v_qid_depth);
3813 	}
3814 
3815 	/* Priority handling
3816 	 * - QE provides 3 bits of priority
3817 	 * - Shift << 3 to move to MSBs for byte-prio in rte_event
3818 	 * - Mask bits to avoid pollution, leaving only 3 prio MSBs in reg
3819 	 */
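	/* Illustration: a 3-bit QE priority of 5 (0b101) ends up as 0xA0 in
	 * the event's priority byte after the shift and mask below.
	 */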
3820 	__m128i v_prio_done;
3821 	{
3822 		static const uint8_t prio_mask[16] = {
3823 			0x00, 0x00, 0x00, 0x07 << 5,
3824 			0x00, 0x00, 0x00, 0x07 << 5,
3825 			0x00, 0x00, 0x00, 0x07 << 5,
3826 			0x00, 0x00, 0x00, 0x07 << 5,
3827 		};
3828 		__m128i v_prio_mask  = _mm_loadu_si128(
3829 						(const __m128i *)prio_mask);
3830 		__m128i v_prio_shifted = _mm_slli_epi32(v_qe_meta, 3);
3831 		v_prio_done = _mm_and_si128(v_prio_shifted, v_prio_mask);
3832 	}
3833 
3834 	/* Event Sub/Type handling:
3835 	 * extract the 12 bits of event type and sub-event type from the QE
3836 	 * metadata, then shift them up by 20 bits into their rte_event
3837 	 * location, clearing the lower 20 bits in the process.
3838 	 */
3839 	__m128i v_types_done;
3840 	{
3841 		static const uint8_t event_mask[16] = {
3842 			0x0f, 0x00, 0x00, 0x00,
3843 			0x0f, 0x00, 0x00, 0x00,
3844 			0x0f, 0x00, 0x00, 0x00,
3845 			0x0f, 0x00, 0x00, 0x00,
3846 		};
3847 		static const uint8_t sub_event_mask[16] = {
3848 			0xff, 0x00, 0x00, 0x00,
3849 			0xff, 0x00, 0x00, 0x00,
3850 			0xff, 0x00, 0x00, 0x00,
3851 			0xff, 0x00, 0x00, 0x00,
3852 		};
3853 		static const uint8_t flow_mask[16] = {
3854 			0xff, 0xff, 0x00, 0x00,
3855 			0xff, 0xff, 0x00, 0x00,
3856 			0xff, 0xff, 0x00, 0x00,
3857 			0xff, 0xff, 0x00, 0x00,
3858 		};
3859 		__m128i v_event_mask  = _mm_loadu_si128(
3860 					(const __m128i *)event_mask);
3861 		__m128i v_sub_event_mask  = _mm_loadu_si128(
3862 					(const __m128i *)sub_event_mask);
3863 		__m128i v_flow_mask  = _mm_loadu_si128(
3864 				       (const __m128i *)flow_mask);
3865 		__m128i v_sub = _mm_srli_epi32(v_qe_meta, 4);
3866 		v_sub = _mm_and_si128(v_sub, v_sub_event_mask);
3867 		__m128i v_type = _mm_srli_epi32(v_qe_meta, 12);
3868 		v_type = _mm_and_si128(v_type, v_event_mask);
3869 		v_type = _mm_slli_epi32(v_type, 8);
3870 		v_types_done = _mm_or_si128(v_type, v_sub);
3871 		v_types_done = _mm_slli_epi32(v_types_done, 20);
3872 		__m128i v_flow = _mm_and_si128(v_qe_status, v_flow_mask);
3873 		v_types_done = _mm_or_si128(v_types_done, v_flow);
3874 	}
3875 
3876 	/* Combine QID, Sched and Prio fields, then Shift >> 8 bits to align
3877 	 * with the rte_event, allowing unpacks to move/blend with payload.
3878 	 */
3879 	__m128i v_q_s_p_done;
3880 	{
3881 		__m128i v_qid_sched = _mm_or_si128(v_qid_done, v_sched_done);
3882 		__m128i v_q_s_prio = _mm_or_si128(v_qid_sched, v_prio_done);
3883 		v_q_s_p_done = _mm_srli_epi32(v_q_s_prio, 8);
3884 	}
3885 
3886 	__m128i v_unpk_ev_23, v_unpk_ev_01, v_ev_2, v_ev_3, v_ev_0, v_ev_1;
3887 
3888 	/* Unpack evs into u64 metadata, then indiv events */
3889 	v_unpk_ev_23 = _mm_unpackhi_epi32(v_types_done, v_q_s_p_done);
3890 	v_unpk_ev_01 = _mm_unpacklo_epi32(v_types_done, v_q_s_p_done);
3891 
3892 	switch (valid_events) {
3893 	case 4:
3894 		v_ev_3 = _mm_blend_epi16(v_unpk_ev_23, v_qe_3, 0x0F);
3895 		v_ev_3 = _mm_alignr_epi8(v_ev_3, v_ev_3, 8);
3896 		v_ev_3 = _mm_insert_epi8(v_ev_3, qm_port->reorder_id + 3, 7);
3897 		_mm_storeu_si128((__m128i *)&events[3], v_ev_3);
3898 		DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched3],
3899 			      1);
3900 		/* fallthrough */
3901 	case 3:
3902 		v_ev_2 = _mm_unpacklo_epi64(v_unpk_ev_23, v_qe_2);
3903 		v_ev_2 = _mm_insert_epi8(v_ev_2, qm_port->reorder_id + 2, 7);
3904 		_mm_storeu_si128((__m128i *)&events[2], v_ev_2);
3905 		DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched2],
3906 			      1);
3907 		/* fallthrough */
3908 	case 2:
3909 		v_ev_1 = _mm_blend_epi16(v_unpk_ev_01, v_qe_1, 0x0F);
3910 		v_ev_1 = _mm_alignr_epi8(v_ev_1, v_ev_1, 8);
3911 		v_ev_1 = _mm_insert_epi8(v_ev_1, qm_port->reorder_id + 1, 7);
3912 		_mm_storeu_si128((__m128i *)&events[1], v_ev_1);
3913 		DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched1],
3914 			      1);
3915 		/* fallthrough */
3916 	case 1:
3917 		v_ev_0 = _mm_unpacklo_epi64(v_unpk_ev_01, v_qe_0);
3918 		v_ev_0 = _mm_insert_epi8(v_ev_0, qm_port->reorder_id, 7);
3919 		_mm_storeu_si128((__m128i *)&events[0], v_ev_0);
3920 		DLB2_INC_STAT(qm_port->ev_port->stats.rx_sched_cnt[hw_sched0],
3921 			      1);
3922 	}
3923 	qm_port->reorder_id += valid_events;
3924 }
3925 
3926 static __rte_always_inline int
3927 dlb2_recv_qe_sparse_vec(struct dlb2_port *qm_port, void *events,
3928 			uint32_t max_events)
3929 {
3930 	/* Using unmasked idx for perf, and masking manually */
3931 	uint16_t idx = qm_port->cq_idx_unmasked;
3932 	volatile struct dlb2_dequeue_qe *cq_addr;
3933 
3934 	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3935 
3936 	uintptr_t qe_ptr_3 = (uintptr_t)&cq_addr[(idx + 12) &
3937 						 qm_port->cq_depth_mask];
3938 	uintptr_t qe_ptr_2 = (uintptr_t)&cq_addr[(idx +  8) &
3939 						 qm_port->cq_depth_mask];
3940 	uintptr_t qe_ptr_1 = (uintptr_t)&cq_addr[(idx +  4) &
3941 						 qm_port->cq_depth_mask];
3942 	uintptr_t qe_ptr_0 = (uintptr_t)&cq_addr[(idx +  0) &
3943 						 qm_port->cq_depth_mask];
3944 
3945 	/* Load QEs from CQ: use compiler barriers to avoid load reordering */
3946 	__m128i v_qe_3 = _mm_loadu_si128((const __m128i *)qe_ptr_3);
3947 	rte_compiler_barrier();
3948 	__m128i v_qe_2 = _mm_loadu_si128((const __m128i *)qe_ptr_2);
3949 	rte_compiler_barrier();
3950 	__m128i v_qe_1 = _mm_loadu_si128((const __m128i *)qe_ptr_1);
3951 	rte_compiler_barrier();
3952 	__m128i v_qe_0 = _mm_loadu_si128((const __m128i *)qe_ptr_0);
3953 
3954 	/* Generate the pkt_shuffle mask;
3955 	 * - Avoids load in otherwise load-heavy section of code
3956 	 * - Moves bytes 3,7,11,15 (gen bit bytes) to LSB bytes in XMM
3957 	 */
3958 	const uint32_t stat_shuf_bytes = (15 << 24) | (11 << 16) | (7 << 8) | 3;
3959 	__m128i v_zeros = _mm_setzero_si128();
3960 	__m128i v_ffff = _mm_cmpeq_epi8(v_zeros, v_zeros);
3961 	__m128i v_stat_shuf_mask = _mm_insert_epi32(v_ffff, stat_shuf_bytes, 0);
3962 
3963 	/* Extract u32 components required from the QE
3964 	 * - QE[64 to 95 ] for metadata (qid, sched, prio, event type, ...)
3965 	 * - QE[96 to 127] for status (cq gen bit, error)
3966 	 *
3967 	 * Note that stage 1 of the unpacking is re-used for both u32 extracts
3968 	 */
3969 	__m128i v_qe_02 = _mm_unpackhi_epi32(v_qe_0, v_qe_2);
3970 	__m128i v_qe_13 = _mm_unpackhi_epi32(v_qe_1, v_qe_3);
3971 	__m128i v_qe_status = _mm_unpackhi_epi32(v_qe_02, v_qe_13);
3972 	__m128i v_qe_meta   = _mm_unpacklo_epi32(v_qe_02, v_qe_13);
3973 
3974 	/* Status byte (gen bit) handling:
3975 	 * - Shuffle the status bytes to lanes 0,1,2,3, clear all others
3976 	 * - Shift left by 7 so each gen bit lands in its byte's MSB
3977 	 * - movemask to collect the four gen bits into a scalar bitmask
3978 	 */
3979 	__m128i v_qe_shuffled = _mm_shuffle_epi8(v_qe_status, v_stat_shuf_mask);
3980 	__m128i v_qes_shift_gen_bit = _mm_slli_epi32(v_qe_shuffled, 7);
3981 	int32_t qe_gen_bits = _mm_movemask_epi8(v_qes_shift_gen_bit) & 0xf;
3982 
3983 	/* Expected vs Reality of QE Gen bits
3984 	 * - cq_rolling_mask provides expected bits
3985 	 * - QE loads, unpacks/shuffle and movemask provides reality
3986 	 * - XOR of the two gives bitmask of new packets
3987 	 * - POPCNT to get the number of new events
3988 	 */
3989 	uint64_t rolling = qm_port->cq_rolling_mask & 0xF;
3990 	uint64_t qe_xor_bits = (qe_gen_bits ^ rolling);
3991 	uint32_t count_new = rte_popcount32(qe_xor_bits);
3992 	count_new = RTE_MIN(count_new, max_events);
3993 	if (!count_new)
3994 		return 0;
3995 
3996 	/* emulate a 128 bit rotate using 2x 64-bit numbers and bit-shifts */
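	/* e.g. with count_new == 2, the low two bits of cq_rolling_mask_2
	 * become the top two bits of cq_rolling_mask, while the low two bits
	 * of cq_rolling_mask "loop back" into the top of cq_rolling_mask_2.
	 */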
3997 
3998 	uint64_t m_rshift = qm_port->cq_rolling_mask >> count_new;
3999 	uint64_t m_lshift = qm_port->cq_rolling_mask << (64 - count_new);
4000 	uint64_t m2_rshift = qm_port->cq_rolling_mask_2 >> count_new;
4001 	uint64_t m2_lshift = qm_port->cq_rolling_mask_2 << (64 - count_new);
4002 
4003 	/* shifted out of m2 into MSB of m */
4004 	qm_port->cq_rolling_mask = (m_rshift | m2_lshift);
4005 
4006 	/* shifted out of m "looped back" into MSB of m2 */
4007 	qm_port->cq_rolling_mask_2 = (m2_rshift | m_lshift);
4008 
4009 	/* Prefetch the next QEs - these should overlap with processing, not add cycles */
4010 	rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
4011 	rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
4012 	rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
4013 	rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
4014 
4015 	/* Convert QEs from XMM regs to events and store events directly */
4016 	_process_deq_qes_vec_impl(qm_port, events, v_qe_3, v_qe_2, v_qe_1,
4017 				  v_qe_0, v_qe_meta, v_qe_status, count_new);
4018 
4019 	return count_new;
4020 }
4021 
4022 static inline void
4023 dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
4024 {
4025 	uint16_t idx = qm_port->cq_idx_unmasked + cnt;
4026 
4027 	qm_port->cq_idx_unmasked = idx;
4028 	qm_port->cq_idx = idx & qm_port->cq_depth_mask;
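	/* The expected gen bit is derived from the unmasked index: it toggles
	 * whenever idx >> gen_bit_shift changes parity.
	 */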
4029 	qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
4030 }
4031 
4032 static inline int16_t
4033 dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
4034 		       struct dlb2_eventdev_port *ev_port,
4035 		       struct rte_event *events,
4036 		       uint16_t max_num,
4037 		       uint64_t dequeue_timeout_ticks)
4038 {
4039 	uint64_t start_ticks = 0ULL;
4040 	struct dlb2_port *qm_port;
4041 	int num = 0;
4042 	bool use_scalar;
4043 	uint64_t timeout;
4044 
4045 	qm_port = &ev_port->qm_port;
4046 	use_scalar = qm_port->use_scalar;
4047 
4048 	if (!dlb2->global_dequeue_wait)
4049 		timeout = dequeue_timeout_ticks;
4050 	else
4051 		timeout = dlb2->global_dequeue_wait_ticks;
4052 
4053 	if (timeout != 0)
4054 		start_ticks = rte_get_timer_cycles();
4055 
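	/* The vector path converts exactly four QEs per iteration, so fall
	 * back to the scalar path when the requested count is not a multiple
	 * of four.
	 */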
4056 	use_scalar = use_scalar || (max_num & 0x3);
4057 
4058 	while (num < max_num) {
4059 		struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
4060 		int num_avail;
4061 
4062 		if (use_scalar) {
4063 			int n_iter = 0;
4064 			uint64_t m_rshift, m_lshift, m2_rshift, m2_lshift;
4065 
4066 			num_avail = dlb2_recv_qe_sparse(qm_port, qes);
4067 			num_avail = RTE_MIN(num_avail, max_num - num);
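			/* In sparse CQ mode each QE is written to its own
			 * cache line (a stride of four 16B CQ slots), so
			 * advance the index by 4 per QE.
			 */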
4068 			dlb2_inc_cq_idx(qm_port, num_avail << 2);
4069 			if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
4070 				n_iter = dlb2_process_dequeue_four_qes(ev_port,
4071 								qm_port,
4072 								&events[num],
4073 								&qes[0]);
4074 			else if (num_avail)
4075 				n_iter = dlb2_process_dequeue_qes(ev_port,
4076 								qm_port,
4077 								&events[num],
4078 								&qes[0],
4079 								num_avail);
4080 			if (n_iter != 0) {
4081 				num += n_iter;
4082 				/* update rolling_mask for vector code support */
4083 				m_rshift = qm_port->cq_rolling_mask >> n_iter;
4084 				m_lshift = qm_port->cq_rolling_mask << (64 - n_iter);
4085 				m2_rshift = qm_port->cq_rolling_mask_2 >> n_iter;
4086 				m2_lshift = qm_port->cq_rolling_mask_2 <<
4087 					(64 - n_iter);
4088 				qm_port->cq_rolling_mask = (m_rshift | m2_lshift);
4089 				qm_port->cq_rolling_mask_2 = (m2_rshift | m_lshift);
4090 			}
4091 		} else { /* !use_scalar */
4092 			num_avail = dlb2_recv_qe_sparse_vec(qm_port,
4093 							    &events[num],
4094 							    max_num - num);
4095 			dlb2_inc_cq_idx(qm_port, num_avail << 2);
4096 			num += num_avail;
4097 			DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_avail);
4098 		}
4099 		if (!num_avail) {
4100 			if ((timeout == 0) || (num > 0))
4101 				/* Not waiting in any form, or 1+ events received */
4102 				break;
4103 			else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
4104 						   timeout, start_ticks))
4105 				break;
4106 		}
4107 	}
4108 
4109 	qm_port->owed_tokens += num;
4110 
4111 	if (num) {
4112 		if (qm_port->token_pop_mode == AUTO_POP)
4113 			dlb2_consume_qe_immediate(qm_port, num);
4114 
4115 		ev_port->outstanding_releases += num;
4116 
4117 		dlb2_port_credits_inc(qm_port, num);
4118 	}
4119 
4120 	return num;
4121 }
4122 
4123 static __rte_always_inline int
4124 dlb2_recv_qe(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe,
4125 	     uint8_t *offset)
4126 {
4127 	uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
4128 				   {0x00, 0x01, 0x03, 0x07} };
4129 	uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
4130 	volatile struct dlb2_dequeue_qe *cq_addr;
4131 	__m128i *qes = (__m128i *)qe;
4132 	uint64_t *cache_line_base;
4133 	uint8_t gen_bits;
4134 
4135 	cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
4136 	cq_addr = &cq_addr[qm_port->cq_idx];
4137 
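	/* offset is the 16B QE slot (0-3) within the 64B cache line that
	 * cq_idx currently points at; only &qe[offset] onward hold new QEs.
	 */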
4138 	cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
4139 	*offset = ((uintptr_t)cq_addr & 0x30) >> 4;
4140 
4141 	/* Load the next CQ cache line from memory. Pack these reads as tight
4142 	 * as possible to reduce the chance that DLB invalidates the line while
4143 	 * the CPU is reading it. Read the cache line backwards to ensure that
4144 	 * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
4145 	 *
4146 	 * (Valid QEs start at &qe[offset])
4147 	 */
4148 	qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
4149 	qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
4150 	qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
4151 	qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
4152 
4153 	/* Evict the cache line ASAP */
4154 	rte_cldemote(cache_line_base);
4155 
4156 	/* Extract and combine the gen bits */
4157 	gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
4158 		   ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
4159 		   ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
4160 		   ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
4161 
4162 	/* XOR the combined bits such that a 1 represents a valid QE */
4163 	gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
4164 
4165 	/* Mask off gen bits we don't care about */
4166 	gen_bits &= and_mask[*offset];
4167 
4168 	return rte_popcount32(gen_bits);
4169 }
4170 
4171 static inline int16_t
4172 dlb2_hw_dequeue(struct dlb2_eventdev *dlb2,
4173 		struct dlb2_eventdev_port *ev_port,
4174 		struct rte_event *events,
4175 		uint16_t max_num,
4176 		uint64_t dequeue_timeout_ticks)
4177 {
4178 	uint64_t timeout;
4179 	uint64_t start_ticks = 0ULL;
4180 	struct dlb2_port *qm_port;
4181 	int num = 0;
4182 
4183 	qm_port = &ev_port->qm_port;
4184 
4185 	/* We have a special implementation for waiting. Wait can be:
4186 	 * 1) no waiting at all
4187 	 * 2) busy poll only
4188 	 * 3) wait for interrupt; if woken and the poll time
4189 	 *    has expired, return to the caller
4190 	 * 4) umonitor/umwait repeatedly up to poll time
4191 	 */
4192 
4193 	/* If configured for per dequeue wait, then use wait value provided
4194 	 * to this API. Otherwise we must use the global
4195 	 * value from eventdev config time.
4196 	 */
4197 	if (!dlb2->global_dequeue_wait)
4198 		timeout = dequeue_timeout_ticks;
4199 	else
4200 		timeout = dlb2->global_dequeue_wait_ticks;
4201 
4202 	if (timeout != 0)
4203 		start_ticks = rte_get_timer_cycles();
4204 
4205 	while (num < max_num) {
4206 		struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
4207 		uint8_t offset;
4208 		int num_avail;
4209 
4210 		/* Copy up to 4 QEs from the current cache line into qes */
4211 		num_avail = dlb2_recv_qe(qm_port, qes, &offset);
4212 
4213 		/* But don't process more than the user requested */
4214 		num_avail = RTE_MIN(num_avail, max_num - num);
4215 
4216 		dlb2_inc_cq_idx(qm_port, num_avail);
4217 
4218 		if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
4219 			num += dlb2_process_dequeue_four_qes(ev_port,
4220 							     qm_port,
4221 							     &events[num],
4222 							     &qes[offset]);
4223 		else if (num_avail)
4224 			num += dlb2_process_dequeue_qes(ev_port,
4225 							qm_port,
4226 							&events[num],
4227 							&qes[offset],
4228 							num_avail);
4229 		else if ((timeout == 0) || (num > 0))
4230 			/* Not waiting in any form, or 1+ events received */
4231 			break;
4232 		else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
4233 					   timeout, start_ticks))
4234 			break;
4235 	}
4236 
4237 	qm_port->owed_tokens += num;
4238 
4239 	if (num) {
4240 		if (qm_port->token_pop_mode == AUTO_POP)
4241 			dlb2_consume_qe_immediate(qm_port, num);
4242 
4243 		ev_port->outstanding_releases += num;
4244 
4245 		dlb2_port_credits_inc(qm_port, num);
4246 	}
4247 
4248 	return num;
4249 }
4250 
4251 static uint16_t
4252 dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
4253 			 uint64_t wait)
4254 {
4255 	struct dlb2_eventdev_port *ev_port = event_port;
4256 	struct dlb2_port *qm_port = &ev_port->qm_port;
4257 	struct dlb2_eventdev *dlb2 = ev_port->dlb2;
4258 	struct dlb2_reorder *order = qm_port->order;
4259 	uint16_t cnt;
4260 
4261 	RTE_ASSERT(ev_port->setup_done);
4262 	RTE_ASSERT(ev != NULL);
4263 
4264 	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
4265 		uint16_t out_rels = ev_port->outstanding_releases;
4266 		if (qm_port->reorder_en) {
4267 			/* for directed, no-op command-byte = 0, but set dsi field */
4268 			/* for load-balanced, set COMP */
4269 			uint64_t release_u64 =
4270 			    qm_port->is_directed ? 0xFF : (uint64_t)DLB2_COMP_CMD_BYTE << 56;
4271 
4272 			for (uint8_t i = order->next_to_enqueue; i != qm_port->reorder_id; i++)
4273 				if (order->enq_reorder[i].u64[1] == 0)
4274 					order->enq_reorder[i].u64[1] = release_u64;
4275 
4276 			__dlb2_event_enqueue_burst_reorder(event_port, NULL, 0,
4277 						   qm_port->token_pop_mode == DELAYED_POP);
4278 		} else {
4279 			dlb2_event_release(dlb2, ev_port->id, out_rels);
4280 		}
4281 
4282 		DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
4283 	}
4284 
4285 	if (qm_port->token_pop_mode == DEFERRED_POP && qm_port->owed_tokens)
4286 		dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
4287 
4288 	cnt = dlb2_hw_dequeue(dlb2, ev_port, ev, num, wait);
4289 
4290 	DLB2_INC_STAT(ev_port->stats.traffic.total_polls, 1);
4291 	DLB2_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
4292 
4293 	return cnt;
4294 }
4295 
4296 static uint16_t
4297 dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
4298 				uint16_t num, uint64_t wait)
4299 {
4300 	struct dlb2_eventdev_port *ev_port = event_port;
4301 	struct dlb2_port *qm_port = &ev_port->qm_port;
4302 	struct dlb2_eventdev *dlb2 = ev_port->dlb2;
4303 	struct dlb2_reorder *order = qm_port->order;
4304 	uint16_t cnt;
4305 
4306 	RTE_ASSERT(ev_port->setup_done);
4307 	RTE_ASSERT(ev != NULL);
4308 
4309 	if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
4310 		uint16_t out_rels = ev_port->outstanding_releases;
4311 		if (qm_port->reorder_en) {
4312 			struct rte_event release_burst[8];
4313 			int num_releases = 0;
4314 
4315 			/* go through reorder buffer looking for missing releases. */
4316 			for (uint8_t i = order->next_to_enqueue; i != qm_port->reorder_id; i++) {
4317 				if (order->enq_reorder[i].u64[1] == 0) {
4318 					release_burst[num_releases++] = (struct rte_event){
4319 						.op = RTE_EVENT_OP_RELEASE,
4320 						.impl_opaque = i,
4321 					};
4322 
4323 					if (num_releases == RTE_DIM(release_burst)) {
4324 						__dlb2_event_enqueue_burst_reorder(event_port,
4325 							release_burst, RTE_DIM(release_burst),
4326 							qm_port->token_pop_mode == DELAYED_POP);
4327 						num_releases = 0;
4328 					}
4329 				}
4330 			}
4331 
4332 			if (num_releases)
4333 				__dlb2_event_enqueue_burst_reorder(event_port, release_burst,
4334 					num_releases, qm_port->token_pop_mode == DELAYED_POP);
4335 		} else {
4336 			dlb2_event_release(dlb2, ev_port->id, out_rels);
4337 		}
4338 
4339 		RTE_ASSERT(ev_port->outstanding_releases == 0);
4340 		DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
4341 	}
4342 
4343 	if (qm_port->token_pop_mode == DEFERRED_POP && qm_port->owed_tokens)
4344 		dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
4345 
4346 	cnt = dlb2_hw_dequeue_sparse(dlb2, ev_port, ev, num, wait);
4347 
4348 	DLB2_INC_STAT(ev_port->stats.traffic.total_polls, 1);
4349 	DLB2_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
4350 	return cnt;
4351 }
4352 
4353 static void
4354 dlb2_flush_port(struct rte_eventdev *dev, int port_id)
4355 {
4356 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
4357 	struct dlb2_eventdev_port *ev_port = &dlb2->ev_ports[port_id];
4358 	struct dlb2_reorder *order = ev_port->qm_port.order;
4359 	eventdev_stop_flush_t flush;
4360 	struct rte_event ev;
4361 	uint8_t dev_id;
4362 	void *arg;
4363 	int i;
4364 
4365 	flush = dev->dev_ops->dev_stop_flush;
4366 	dev_id = dev->data->dev_id;
4367 	arg = dev->data->dev_stop_flush_arg;
4368 
4369 	while (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0)) {
4370 		if (flush)
4371 			flush(dev_id, ev, arg);
4372 
4373 		if (dlb2->ev_ports[port_id].qm_port.is_directed)
4374 			continue;
4375 
4376 		ev.op = RTE_EVENT_OP_RELEASE;
4377 
4378 		rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
4379 	}
4380 
4381 	/* Enqueue any additional outstanding releases */
4382 	ev.op = RTE_EVENT_OP_RELEASE;
4383 
4384 	for (i = dlb2->ev_ports[port_id].outstanding_releases; i > 0; i--) {
4385 		ev.impl_opaque = order ? order->next_to_enqueue : 0;
4386 		rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
4387 	}
4388 }
4389 
4390 static uint32_t
4391 dlb2_get_ldb_queue_depth(struct dlb2_eventdev *dlb2,
4392 			 struct dlb2_eventdev_queue *queue)
4393 {
4394 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
4395 	struct dlb2_get_ldb_queue_depth_args cfg;
4396 	int ret;
4397 
4398 	cfg.queue_id = queue->qm_queue.id;
4399 
4400 	ret = dlb2_iface_get_ldb_queue_depth(handle, &cfg);
4401 	if (ret < 0) {
4402 		DLB2_LOG_ERR("dlb2: get_ldb_queue_depth ret=%d (driver status: %s)",
4403 			     ret, dlb2_error_strings[cfg.response.status]);
4404 		return ret;
4405 	}
4406 
4407 	return cfg.response.id;
4408 }
4409 
4410 static uint32_t
4411 dlb2_get_dir_queue_depth(struct dlb2_eventdev *dlb2,
4412 			 struct dlb2_eventdev_queue *queue)
4413 {
4414 	struct dlb2_hw_dev *handle = &dlb2->qm_instance;
4415 	struct dlb2_get_dir_queue_depth_args cfg;
4416 	int ret;
4417 
4418 	cfg.queue_id = queue->qm_queue.id;
4419 
4420 	ret = dlb2_iface_get_dir_queue_depth(handle, &cfg);
4421 	if (ret < 0) {
4422 		DLB2_LOG_ERR("dlb2: get_dir_queue_depth ret=%d (driver status: %s)",
4423 			     ret, dlb2_error_strings[cfg.response.status]);
4424 		return ret;
4425 	}
4426 
4427 	return cfg.response.id;
4428 }
4429 
4430 uint32_t
4431 dlb2_get_queue_depth(struct dlb2_eventdev *dlb2,
4432 		     struct dlb2_eventdev_queue *queue)
4433 {
4434 	if (queue->qm_queue.is_directed)
4435 		return dlb2_get_dir_queue_depth(dlb2, queue);
4436 	else
4437 		return dlb2_get_ldb_queue_depth(dlb2, queue);
4438 }
4439 
4440 static bool
4441 dlb2_queue_is_empty(struct dlb2_eventdev *dlb2,
4442 		    struct dlb2_eventdev_queue *queue)
4443 {
4444 	return dlb2_get_queue_depth(dlb2, queue) == 0;
4445 }
4446 
4447 static bool
4448 dlb2_linked_queues_empty(struct dlb2_eventdev *dlb2)
4449 {
4450 	int i;
4451 
4452 	for (i = 0; i < dlb2->num_queues; i++) {
4453 		if (dlb2->ev_queues[i].num_links == 0)
4454 			continue;
4455 		if (!dlb2_queue_is_empty(dlb2, &dlb2->ev_queues[i]))
4456 			return false;
4457 	}
4458 
4459 	return true;
4460 }
4461 
4462 static bool
4463 dlb2_queues_empty(struct dlb2_eventdev *dlb2)
4464 {
4465 	int i;
4466 
4467 	for (i = 0; i < dlb2->num_queues; i++) {
4468 		if (!dlb2_queue_is_empty(dlb2, &dlb2->ev_queues[i]))
4469 			return false;
4470 	}
4471 
4472 	return true;
4473 }
4474 
4475 static void
4476 dlb2_drain(struct rte_eventdev *dev)
4477 {
4478 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
4479 	struct dlb2_eventdev_port *ev_port = NULL;
4480 	uint8_t dev_id;
4481 	int i;
4482 
4483 	dev_id = dev->data->dev_id;
4484 
4485 	while (!dlb2_linked_queues_empty(dlb2)) {
4486 		/* Flush all the ev_ports, which will drain all their connected
4487 		 * queues.
4488 		 */
4489 		for (i = 0; i < dlb2->num_ports; i++)
4490 			dlb2_flush_port(dev, i);
4491 	}
4492 
4493 	/* The queues are empty, but there may be events left in the ports. */
4494 	for (i = 0; i < dlb2->num_ports; i++)
4495 		dlb2_flush_port(dev, i);
4496 
4497 	/* If the domain's queues are empty, we're done. */
4498 	if (dlb2_queues_empty(dlb2))
4499 		return;
4500 
4501 	/* Else, there must be at least one unlinked load-balanced queue.
4502 	 * Select a load-balanced port with which to drain the unlinked
4503 	 * queue(s).
4504 	 */
4505 	for (i = 0; i < dlb2->num_ports; i++) {
4506 		ev_port = &dlb2->ev_ports[i];
4507 
4508 		if (!ev_port->qm_port.is_directed)
4509 			break;
4510 	}
4511 
4512 	if (i == dlb2->num_ports) {
4513 		DLB2_LOG_ERR("internal error: no LDB ev_ports");
4514 		return;
4515 	}
4516 
4517 	rte_errno = 0;
4518 	rte_event_port_unlink(dev_id, ev_port->id, NULL, 0);
4519 
4520 	if (rte_errno) {
4521 		DLB2_LOG_ERR("internal error: failed to unlink ev_port %d",
4522 			     ev_port->id);
4523 		return;
4524 	}
4525 
4526 	for (i = 0; i < dlb2->num_queues; i++) {
4527 		uint8_t qid, prio;
4528 		int ret;
4529 
4530 		if (dlb2_queue_is_empty(dlb2, &dlb2->ev_queues[i]))
4531 			continue;
4532 
4533 		qid = i;
4534 		prio = 0;
4535 
4536 		/* Link the ev_port to the queue */
4537 		ret = rte_event_port_link(dev_id, ev_port->id, &qid, &prio, 1);
4538 		if (ret != 1) {
4539 			DLB2_LOG_ERR("internal error: failed to link ev_port %d to queue %d",
4540 				     ev_port->id, qid);
4541 			return;
4542 		}
4543 
4544 		/* Flush the queue */
4545 		while (!dlb2_queue_is_empty(dlb2, &dlb2->ev_queues[i]))
4546 			dlb2_flush_port(dev, ev_port->id);
4547 
4548 		/* Drain any extant events in the ev_port. */
4549 		dlb2_flush_port(dev, ev_port->id);
4550 
4551 		/* Unlink the ev_port from the queue */
4552 		ret = rte_event_port_unlink(dev_id, ev_port->id, &qid, 1);
4553 		if (ret != 1) {
4554 			DLB2_LOG_ERR("internal error: failed to unlink ev_port %d from queue %d",
4555 				     ev_port->id, qid);
4556 			return;
4557 		}
4558 	}
4559 }
4560 
4561 static void
4562 dlb2_eventdev_stop(struct rte_eventdev *dev)
4563 {
4564 	struct dlb2_eventdev *dlb2 = dlb2_pmd_priv(dev);
4565 
4566 	rte_spinlock_lock(&dlb2->qm_instance.resource_lock);
4567 
4568 	if (dlb2->run_state == DLB2_RUN_STATE_STOPPED) {
4569 		DLB2_LOG_LINE_DBG("Internal error: already stopped");
4570 		rte_spinlock_unlock(&dlb2->qm_instance.resource_lock);
4571 		return;
4572 	} else if (dlb2->run_state != DLB2_RUN_STATE_STARTED) {
4573 		DLB2_LOG_ERR("Internal error: bad state %d for dev_stop",
4574 			     (int)dlb2->run_state);
4575 		rte_spinlock_unlock(&dlb2->qm_instance.resource_lock);
4576 		return;
4577 	}
4578 
4579 	dlb2->run_state = DLB2_RUN_STATE_STOPPING;
4580 
4581 	rte_spinlock_unlock(&dlb2->qm_instance.resource_lock);
4582 
4583 	dlb2_drain(dev);
4584 
4585 	dlb2->run_state = DLB2_RUN_STATE_STOPPED;
4586 }
4587 
4588 static int
4589 dlb2_eventdev_close(struct rte_eventdev *dev)
4590 {
4591 	dlb2_hw_reset_sched_domain(dev, false);
4592 
4593 	return 0;
4594 }
4595 
4596 static void
4597 dlb2_eventdev_queue_release(struct rte_eventdev *dev, uint8_t id)
4598 {
4599 	RTE_SET_USED(dev);
4600 	RTE_SET_USED(id);
4601 
4602 	/* This function intentionally left blank. */
4603 }
4604 
4605 static void
4606 dlb2_eventdev_port_release(void *port)
4607 {
4608 	struct dlb2_eventdev_port *ev_port = port;
4609 	struct dlb2_port *qm_port;
4610 
4611 	if (ev_port) {
4612 		qm_port = &ev_port->qm_port;
4613 		if (qm_port->config_state == DLB2_CONFIGURED)
4614 			dlb2_free_qe_mem(qm_port);
4615 	}
4616 }
4617 
4618 static int
4619 dlb2_eventdev_timeout_ticks(struct rte_eventdev *dev, uint64_t ns,
4620 			    uint64_t *timeout_ticks)
4621 {
4622 	RTE_SET_USED(dev);
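	/* Note: cycles_per_ns is a whole number, so fractional-GHz timer
	 * rates are truncated (e.g. a 2.7 GHz timer yields 2 cycles per ns).
	 */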
4623 	uint64_t cycles_per_ns = rte_get_timer_hz() / 1E9;
4624 
4625 	*timeout_ticks = ns * cycles_per_ns;
4626 
4627 	return 0;
4628 }
4629 
4630 static void
4631 dlb2_entry_points_init(struct rte_eventdev *dev)
4632 {
4633 	struct dlb2_eventdev *dlb2;
4634 
4635 	/* Expose PMD's eventdev interface */
4636 	static struct eventdev_ops dlb2_eventdev_entry_ops = {
4637 		.dev_infos_get    = dlb2_eventdev_info_get,
4638 		.dev_configure    = dlb2_eventdev_configure,
4639 		.dev_start        = dlb2_eventdev_start,
4640 		.dev_stop         = dlb2_eventdev_stop,
4641 		.dev_close        = dlb2_eventdev_close,
4642 		.queue_def_conf   = dlb2_eventdev_queue_default_conf_get,
4643 		.queue_setup      = dlb2_eventdev_queue_setup,
4644 		.queue_release    = dlb2_eventdev_queue_release,
4645 		.port_def_conf    = dlb2_eventdev_port_default_conf_get,
4646 		.port_setup       = dlb2_eventdev_port_setup,
4647 		.port_release     = dlb2_eventdev_port_release,
4648 		.port_link        = dlb2_eventdev_port_link,
4649 		.port_unlink      = dlb2_eventdev_port_unlink,
4650 		.port_unlinks_in_progress =
4651 				    dlb2_eventdev_port_unlinks_in_progress,
4652 		.timeout_ticks    = dlb2_eventdev_timeout_ticks,
4653 		.dump             = dlb2_eventdev_dump,
4654 		.xstats_get       = dlb2_eventdev_xstats_get,
4655 		.xstats_get_names = dlb2_eventdev_xstats_get_names,
4656 		.xstats_get_by_name = dlb2_eventdev_xstats_get_by_name,
4657 		.xstats_reset	    = dlb2_eventdev_xstats_reset,
4658 		.dev_selftest     = test_dlb2_eventdev,
4659 	};
4660 
4661 	/* Expose PMD's eventdev interface */
4662 
4663 	dev->dev_ops = &dlb2_eventdev_entry_ops;
4664 	dev->enqueue_burst = dlb2_event_enqueue_burst;
4665 	dev->enqueue_new_burst = dlb2_event_enqueue_new_burst;
4666 	dev->enqueue_forward_burst = dlb2_event_enqueue_forward_burst;
4667 
4668 	dlb2 = dev->data->dev_private;
4669 	if (dlb2->poll_mode == DLB2_CQ_POLL_MODE_SPARSE)
4670 		dev->dequeue_burst = dlb2_event_dequeue_burst_sparse;
4671 	else
4672 		dev->dequeue_burst = dlb2_event_dequeue_burst;
4673 }
4674 
4675 int
4676 dlb2_primary_eventdev_probe(struct rte_eventdev *dev,
4677 			    const char *name,
4678 			    struct dlb2_devargs *dlb2_args)
4679 {
4680 	struct dlb2_eventdev *dlb2;
4681 	int err, i;
4682 
4683 	dlb2 = dev->data->dev_private;
4684 
4685 	dlb2->event_dev = dev; /* backlink */
4686 
4687 	evdev_dlb2_default_info.driver_name = name;
4688 
4689 	dlb2->max_num_events_override = dlb2_args->max_num_events;
4690 	dlb2->num_dir_credits_override = dlb2_args->num_dir_credits_override;
4691 	dlb2->poll_interval = dlb2_args->poll_interval;
4692 	dlb2->sw_credit_quanta = dlb2_args->sw_credit_quanta;
4693 	dlb2->hw_credit_quanta = dlb2_args->hw_credit_quanta;
4694 	dlb2->default_depth_thresh = dlb2_args->default_depth_thresh;
4695 	dlb2->vector_opts_enabled = dlb2_args->vector_opts_enabled;
4696 	dlb2->enable_cq_weight = dlb2_args->enable_cq_weight;
4697 
4698 
4699 	if (dlb2_args->max_cq_depth != 0)
4700 		dlb2->max_cq_depth = dlb2_args->max_cq_depth;
4701 	else
4702 		dlb2->max_cq_depth = DLB2_DEFAULT_CQ_DEPTH;
4703 
4704 	evdev_dlb2_default_info.max_event_port_dequeue_depth = dlb2->max_cq_depth;
4705 
4706 	if (dlb2_args->max_enq_depth != 0)
4707 		dlb2->max_enq_depth = dlb2_args->max_enq_depth;
4708 	else
4709 		dlb2->max_enq_depth = DLB2_DEFAULT_CQ_DEPTH;
4710 
4711 	evdev_dlb2_default_info.max_event_port_enqueue_depth =
4712 		dlb2->max_enq_depth;
4713 
4714 	dlb2_init_queue_depth_thresholds(dlb2,
4715 					 dlb2_args->qid_depth_thresholds.val);
4716 
4717 	dlb2_init_port_cos(dlb2,
4718 			   dlb2_args->port_cos.cos_id);
4719 
4720 	dlb2_init_cos_bw(dlb2,
4721 			 &dlb2_args->cos_bw);
4722 
4723 	err = dlb2_iface_open(&dlb2->qm_instance, name);
4724 	if (err < 0) {
4725 		DLB2_LOG_ERR("could not open event hardware device, err=%d",
4726 			     err);
4727 		return err;
4728 	}
4729 
4730 	err = dlb2_iface_get_device_version(&dlb2->qm_instance,
4731 					    &dlb2->revision);
4732 	if (err < 0) {
4733 		DLB2_LOG_ERR("dlb2: failed to get the device version, err=%d",
4734 			     err);
4735 		return err;
4736 	}
4737 
4738 	err = dlb2_hw_query_resources(dlb2);
4739 	if (err) {
4740 		DLB2_LOG_ERR("get resources err=%d for %s",
4741 			     err, name);
4742 		return err;
4743 	}
4744 
4745 	dlb2_iface_hardware_init(&dlb2->qm_instance);
4746 
4747 	/* configure class of service */
4748 	{
4749 		struct dlb2_set_cos_bw_args
4750 			set_cos_bw_args = { {0} };
4751 		int id;
4752 		int ret = 0;
4753 
4754 		for (id = 0; id < DLB2_COS_NUM_VALS; id++) {
4755 			set_cos_bw_args.cos_id = id;
4756 			set_cos_bw_args.bandwidth = dlb2->cos_bw[id];
4757 			ret = dlb2_iface_set_cos_bw(&dlb2->qm_instance,
4758 						    &set_cos_bw_args);
4759 			if (ret != 0)
4760 				break;
4761 		}
4762 		if (ret) {
4763 			DLB2_LOG_ERR("dlb2: failed to configure class of service, err=%d",
4764 				     ret);
4765 			return ret;
4766 		}
4767 	}
4768 
4769 	err = dlb2_iface_get_cq_poll_mode(&dlb2->qm_instance, &dlb2->poll_mode);
4770 	if (err < 0) {
4771 		DLB2_LOG_ERR("dlb2: failed to get the poll mode, err=%d",
4772 			     err);
4773 		return err;
4774 	}
4775 
4776 	/* Complete xstats runtime initialization */
4777 	err = dlb2_xstats_init(dlb2);
4778 	if (err) {
4779 		DLB2_LOG_ERR("dlb2: failed to init xstats, err=%d", err);
4780 		return err;
4781 	}
4782 
4783 	/* Initialize each port's token pop mode */
4784 	for (i = 0; i < DLB2_MAX_NUM_PORTS(dlb2->version); i++)
4785 		dlb2->ev_ports[i].qm_port.token_pop_mode = AUTO_POP;
4786 
4787 	rte_spinlock_init(&dlb2->qm_instance.resource_lock);
4788 
4789 	dlb2_iface_low_level_io_init();
4790 
4791 	dlb2_entry_points_init(dev);
4792 
4793 	return 0;
4794 }
4795 
4796 int
4797 dlb2_secondary_eventdev_probe(struct rte_eventdev *dev,
4798 			      const char *name)
4799 {
4800 	struct dlb2_eventdev *dlb2;
4801 	int err;
4802 
4803 	dlb2 = dev->data->dev_private;
4804 
4805 	evdev_dlb2_default_info.driver_name = name;
4806 
4807 	err = dlb2_iface_open(&dlb2->qm_instance, name);
4808 	if (err < 0) {
4809 		DLB2_LOG_ERR("could not open event hardware device, err=%d",
4810 			     err);
4811 		return err;
4812 	}
4813 
4814 	err = dlb2_hw_query_resources(dlb2);
4815 	if (err) {
4816 		DLB2_LOG_ERR("get resources err=%d for %s",
4817 			     err, name);
4818 		return err;
4819 	}
4820 
4821 	dlb2_iface_low_level_io_init();
4822 
4823 	dlb2_entry_points_init(dev);
4824 
4825 	return 0;
4826 }
4827 
4828 int
4829 dlb2_parse_params(const char *params,
4830 		  const char *name,
4831 		  struct dlb2_devargs *dlb2_args,
4832 		  uint8_t version)
4833 {
4834 	int ret = 0;
4835 	static const char * const args[] = { NUMA_NODE_ARG,
4836 					     DLB2_MAX_NUM_EVENTS,
4837 					     DLB2_NUM_DIR_CREDITS,
4838 					     DEV_ID_ARG,
4839 					     DLB2_QID_DEPTH_THRESH_ARG,
4840 					     DLB2_POLL_INTERVAL_ARG,
4841 					     DLB2_SW_CREDIT_QUANTA_ARG,
4842 					     DLB2_HW_CREDIT_QUANTA_ARG,
4843 					     DLB2_DEPTH_THRESH_ARG,
4844 					     DLB2_VECTOR_OPTS_ENAB_ARG,
4845 					     DLB2_MAX_CQ_DEPTH,
4846 					     DLB2_MAX_ENQ_DEPTH,
4847 					     DLB2_PORT_COS,
4848 					     DLB2_COS_BW,
4849 					     DLB2_PRODUCER_COREMASK,
4850 					     DLB2_DEFAULT_LDB_PORT_ALLOCATION_ARG,
4851 					     DLB2_ENABLE_CQ_WEIGHT_ARG,
4852 					     NULL };
4853 
4854 	if (params != NULL && params[0] != '\0') {
4855 		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
4856 
4857 		if (kvlist == NULL) {
4858 			DLB2_LOG_INFO("Ignoring unsupported parameters when creating device '%s'",
4859 				      name);
4860 		} else {
4861 			ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
4862 						     set_numa_node,
4863 						     &dlb2_args->socket_id);
4864 			if (ret != 0) {
4865 				DLB2_LOG_ERR("%s: Error parsing numa node parameter",
4866 					     name);
4867 				rte_kvargs_free(kvlist);
4868 				return ret;
4869 			}
4870 
4871 			ret = rte_kvargs_process(kvlist, DLB2_MAX_NUM_EVENTS,
4872 						 set_max_num_events,
4873 						 &dlb2_args->max_num_events);
4874 			if (ret != 0) {
4875 				DLB2_LOG_ERR("%s: Error parsing max_num_events parameter",
4876 					     name);
4877 				rte_kvargs_free(kvlist);
4878 				return ret;
4879 			}
4880 
4881 			if (version == DLB2_HW_V2) {
4882 				ret = rte_kvargs_process(kvlist,
4883 					DLB2_NUM_DIR_CREDITS,
4884 					set_num_dir_credits,
4885 					&dlb2_args->num_dir_credits_override);
4886 				if (ret != 0) {
4887 					DLB2_LOG_ERR("%s: Error parsing num_dir_credits parameter",
4888 						     name);
4889 					rte_kvargs_free(kvlist);
4890 					return ret;
4891 				}
4892 			}
4893 			ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
4894 						 set_dev_id,
4895 						 &dlb2_args->dev_id);
4896 			if (ret != 0) {
4897 				DLB2_LOG_ERR("%s: Error parsing dev_id parameter",
4898 					     name);
4899 				rte_kvargs_free(kvlist);
4900 				return ret;
4901 			}
4902 
4903 			if (version == DLB2_HW_V2) {
4904 				ret = rte_kvargs_process(
4905 					kvlist,
4906 					DLB2_QID_DEPTH_THRESH_ARG,
4907 					set_qid_depth_thresh,
4908 					&dlb2_args->qid_depth_thresholds);
4909 			} else {
4910 				ret = rte_kvargs_process(
4911 					kvlist,
4912 					DLB2_QID_DEPTH_THRESH_ARG,
4913 					set_qid_depth_thresh_v2_5,
4914 					&dlb2_args->qid_depth_thresholds);
4915 			}
4916 			if (ret != 0) {
4917 				DLB2_LOG_ERR("%s: Error parsing qid_depth_thresh parameter",
4918 					     name);
4919 				rte_kvargs_free(kvlist);
4920 				return ret;
4921 			}
4922 
4923 			ret = rte_kvargs_process(kvlist, DLB2_POLL_INTERVAL_ARG,
4924 						 set_poll_interval,
4925 						 &dlb2_args->poll_interval);
4926 			if (ret != 0) {
4927 				DLB2_LOG_ERR("%s: Error parsing poll interval parameter",
4928 					     name);
4929 				rte_kvargs_free(kvlist);
4930 				return ret;
4931 			}
4932 
4933 			ret = rte_kvargs_process(kvlist,
4934 						 DLB2_SW_CREDIT_QUANTA_ARG,
4935 						 set_sw_credit_quanta,
4936 						 &dlb2_args->sw_credit_quanta);
4937 			if (ret != 0) {
4938 				DLB2_LOG_ERR("%s: Error parsing sw credit quanta parameter",
4939 					     name);
4940 				rte_kvargs_free(kvlist);
4941 				return ret;
4942 			}
4943 
4944 			ret = rte_kvargs_process(kvlist,
4945 						 DLB2_HW_CREDIT_QUANTA_ARG,
4946 						 set_hw_credit_quanta,
4947 						 &dlb2_args->hw_credit_quanta);
4948 			if (ret != 0) {
4949 				DLB2_LOG_ERR("%s: Error parsing hw credit quanta parameter",
4950 					     name);
4951 				rte_kvargs_free(kvlist);
4952 				return ret;
4953 			}
4954 
4955 			ret = rte_kvargs_process(kvlist, DLB2_DEPTH_THRESH_ARG,
4956 					set_default_depth_thresh,
4957 					&dlb2_args->default_depth_thresh);
4958 			if (ret != 0) {
4959 				DLB2_LOG_ERR("%s: Error parsing set depth thresh parameter",
4960 					     name);
4961 				rte_kvargs_free(kvlist);
4962 				return ret;
4963 			}
4964 
4965 			ret = rte_kvargs_process(kvlist,
4966 					DLB2_VECTOR_OPTS_ENAB_ARG,
4967 					set_vector_opts_enab,
4968 					&dlb2_args->vector_opts_enabled);
4969 			if (ret != 0) {
4970 				DLB2_LOG_ERR("%s: Error parsing vector opts enabled",
4971 					     name);
4972 				rte_kvargs_free(kvlist);
4973 				return ret;
4974 			}
4975 
4976 			ret = rte_kvargs_process(kvlist,
4977 					DLB2_MAX_CQ_DEPTH,
4978 					set_max_cq_depth,
4979 					&dlb2_args->max_cq_depth);
4980 			if (ret != 0) {
4981 				DLB2_LOG_ERR("%s: Error parsing max cq depth",
4982 					     name);
4983 				rte_kvargs_free(kvlist);
4984 				return ret;
4985 			}
4986 
4987 			ret = rte_kvargs_process(kvlist,
4988 						 DLB2_MAX_ENQ_DEPTH,
4989 						 set_max_enq_depth,
4990 						 &dlb2_args->max_enq_depth);
4991 			if (ret != 0) {
4992 				DLB2_LOG_ERR("%s: Error parsing max enq depth",
4993 					     name);
4994 				rte_kvargs_free(kvlist);
4995 				return ret;
4996 			}
4997 
4998 			ret = rte_kvargs_process(kvlist,
4999 					DLB2_PORT_COS,
5000 					set_port_cos,
5001 					&dlb2_args->port_cos);
5002 			if (ret != 0) {
5003 				DLB2_LOG_ERR("%s: Error parsing port cos",
5004 					     name);
5005 				rte_kvargs_free(kvlist);
5006 				return ret;
5007 			}
5008 
5009 			ret = rte_kvargs_process(kvlist,
5010 					DLB2_COS_BW,
5011 					set_cos_bw,
5012 					&dlb2_args->cos_bw);
5013 			if (ret != 0) {
5014 				DLB2_LOG_ERR("%s: Error parsing cos_bw",
5015 					     name);
5016 				rte_kvargs_free(kvlist);
5017 				return ret;
5018 			}
5019 
5020 
5021 			ret = rte_kvargs_process(kvlist,
5022 						 DLB2_PRODUCER_COREMASK,
5023 						 set_producer_coremask,
5024 						 &dlb2_args->producer_coremask);
5025 			if (ret != 0) {
5026 				DLB2_LOG_ERR(
5027 					"%s: Error parsing producer coremask",
5028 					name);
5029 				rte_kvargs_free(kvlist);
5030 				return ret;
5031 			}
5032 
5033 			ret = rte_kvargs_process(kvlist,
5034 						 DLB2_DEFAULT_LDB_PORT_ALLOCATION_ARG,
5035 						 set_default_ldb_port_allocation,
5036 						 &dlb2_args->default_ldb_port_allocation);
5037 			if (ret != 0) {
5038 				DLB2_LOG_ERR("%s: Error parsing ldb default port allocation arg",
5039 					     name);
5040 				rte_kvargs_free(kvlist);
5041 				return ret;
5042 			}
5043 
5044 			ret = rte_kvargs_process(kvlist,
5045 						 DLB2_ENABLE_CQ_WEIGHT_ARG,
5046 						 set_enable_cq_weight,
5047 						 &dlb2_args->enable_cq_weight);
5048 			if (ret != 0) {
5049 				DLB2_LOG_ERR("%s: Error parsing enable_cq_weight arg",
5050 					     name);
5051 				rte_kvargs_free(kvlist);
5052 				return ret;
5053 			}
5054 			if (version == DLB2_HW_V2 && dlb2_args->enable_cq_weight)
5055 				DLB2_LOG_INFO("Ignoring 'enable_cq_weight=y'. Only supported for 2.5 HW onwards");
5056 
5057 			rte_kvargs_free(kvlist);
5058 		}
5059 	}
5060 	return ret;
5061 }
5062 RTE_LOG_REGISTER_DEFAULT(eventdev_dlb2_log_level, NOTICE);
5063