xref: /dpdk/drivers/event/sw/sw_evdev.c (revision 88ca872150d0b61b4e6ffcb96f5cecc9e781adb5)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4 
5 #include <inttypes.h>
6 #include <stdlib.h>
7 #include <string.h>
8 
9 #include <bus_vdev_driver.h>
10 #include <rte_kvargs.h>
11 #include <rte_ring.h>
12 #include <rte_errno.h>
13 #include <rte_event_ring.h>
14 #include <rte_service_component.h>
15 
16 #include "sw_evdev.h"
17 #include "iq_chunk.h"
18 #include "event_ring.h"
19 
20 #define EVENTDEV_NAME_SW_PMD event_sw
21 #define NUMA_NODE_ARG "numa_node"
22 #define SCHED_QUANTA_ARG "sched_quanta"
23 #define CREDIT_QUANTA_ARG "credit_quanta"
24 #define MIN_BURST_SIZE_ARG "min_burst"
25 #define DEQ_BURST_SIZE_ARG "deq_burst"
26 #define REFIL_ONCE_ARG "refill_once"
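/*
 * The kvargs above are optional vdev arguments, parsed in sw_probe(); e.g.
 * an application could create an instance with:
 *     --vdev="event_sw0,sched_quanta=64,credit_quanta=32"
 * (illustrative values; any argument not given keeps its default).
 */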
27 
28 static void
29 sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
30 
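/*
 * Link each queue in queues[] to this port's CQ. The walk stops early and
 * sets rte_errno to EDQUOT if a queue's CQ map is full or a directed
 * queue/port constraint would be violated; the return value is the number
 * of queues handled. Per-queue priorities are not used by this PMD.
 */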
31 static int
32 sw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
33 		const uint8_t priorities[], uint16_t num)
34 {
35 	struct sw_port *p = port;
36 	struct sw_evdev *sw = sw_pmd_priv(dev);
37 	int i;
38 
39 	RTE_SET_USED(priorities);
40 	for (i = 0; i < num; i++) {
41 		struct sw_qid *q = &sw->qids[queues[i]];
42 		unsigned int j;
43 
44 		/* check for qid map overflow */
45 		if (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map)) {
46 			rte_errno = EDQUOT;
47 			break;
48 		}
49 
50 		if (p->is_directed && p->num_qids_mapped > 0) {
51 			rte_errno = EDQUOT;
52 			break;
53 		}
54 
55 		for (j = 0; j < q->cq_num_mapped_cqs; j++) {
56 			if (q->cq_map[j] == p->id)
57 				break;
58 		}
59 
60 		/* check if port is already linked */
61 		if (j < q->cq_num_mapped_cqs)
62 			continue;
63 
64 		if (q->type == SW_SCHED_TYPE_DIRECT) {
65 			/* check directed qids only map to one port */
66 			if (p->num_qids_mapped > 0) {
67 				rte_errno = EDQUOT;
68 				break;
69 			}
70 			/* check port only takes a directed flow */
71 			if (num > 1) {
72 				rte_errno = EDQUOT;
73 				break;
74 			}
75 
76 			p->is_directed = 1;
77 			p->num_qids_mapped = 1;
78 		} else if (q->type == RTE_SCHED_TYPE_ORDERED) {
79 			p->num_ordered_qids++;
80 			p->num_qids_mapped++;
81 		} else if (q->type == RTE_SCHED_TYPE_ATOMIC ||
82 				q->type == RTE_SCHED_TYPE_PARALLEL) {
83 			p->num_qids_mapped++;
84 		}
85 
86 		q->cq_map[q->cq_num_mapped_cqs] = p->id;
87 		rte_smp_wmb();
88 		q->cq_num_mapped_cqs++;
89 	}
90 	return i;
91 }
92 
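/*
 * Remove this port from the CQ map of each queue in queues[]. The number of
 * unlinks performed is returned and added to p->unlinks_in_progress, which
 * sw_port_unlinks_in_progress() reports back to the application.
 */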
93 static int
94 sw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
95 		uint16_t nb_unlinks)
96 {
97 	struct sw_port *p = port;
98 	struct sw_evdev *sw = sw_pmd_priv(dev);
99 	unsigned int i, j;
100 
101 	int unlinked = 0;
102 	for (i = 0; i < nb_unlinks; i++) {
103 		struct sw_qid *q = &sw->qids[queues[i]];
104 		for (j = 0; j < q->cq_num_mapped_cqs; j++) {
105 			if (q->cq_map[j] == p->id) {
106 				q->cq_map[j] =
107 					q->cq_map[q->cq_num_mapped_cqs - 1];
108 				rte_smp_wmb();
109 				q->cq_num_mapped_cqs--;
110 				unlinked++;
111 
112 				p->num_qids_mapped--;
113 
114 				if (q->type == RTE_SCHED_TYPE_ORDERED)
115 					p->num_ordered_qids--;
116 
117 				continue;
118 			}
119 		}
120 	}
121 
122 	p->unlinks_in_progress += unlinked;
123 	rte_smp_mb();
124 
125 	return unlinked;
126 }
127 
128 static int
129 sw_port_unlinks_in_progress(struct rte_eventdev *dev, void *port)
130 {
131 	RTE_SET_USED(dev);
132 	struct sw_port *p = port;
133 	return p->unlinks_in_progress;
134 }
135 
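/*
 * Create the port's rx_worker_ring and cq_worker_ring event rings and reset
 * its state. port_setup() may be called again for an already initialized
 * port while the device is stopped; credits held by the previous
 * configuration are returned to the device first.
 */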
136 static int
137 sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
138 		const struct rte_event_port_conf *conf)
139 {
140 	struct sw_evdev *sw = sw_pmd_priv(dev);
141 	struct sw_port *p = &sw->ports[port_id];
142 	char buf[RTE_RING_NAMESIZE];
143 	unsigned int i;
144 
145 	struct rte_event_dev_info info;
146 	sw_info_get(dev, &info);
147 
148 	/* detect re-configuring and return credits to instance if needed */
149 	if (p->initialized) {
150 	/* taking credits from the pool is done one quanta at a time, and
151 	 * credits may be spent (counted in p->inflights) or still
152 	 * available in the port (p->inflight_credits). We must return
153 	 * the sum so as not to leak credits
154 	 */
155 		int possible_inflights = p->inflight_credits + p->inflights;
156 		rte_atomic32_sub(&sw->inflights, possible_inflights);
157 	}
158 
159 	*p = (struct sw_port){0}; /* zero entire structure */
160 	p->id = port_id;
161 	p->sw = sw;
162 
163 	/* check to see if the ring exists - port_setup() can be called multiple
164 	 * times legally (assuming the device is stopped). If the ring exists,
165 	 * free it so it gets re-created with the correct size
166 	 */
167 	snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
168 			port_id, "rx_worker_ring");
169 	struct rte_event_ring *existing_ring = rte_event_ring_lookup(buf);
170 	rte_event_ring_free(existing_ring);
171 
172 	p->rx_worker_ring = rte_event_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
173 			dev->data->socket_id,
174 			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
175 	if (p->rx_worker_ring == NULL) {
176 		SW_LOG_ERR("Error creating RX worker ring for port %d",
177 				port_id);
178 		return -1;
179 	}
180 
181 	p->inflight_max = conf->new_event_threshold;
182 	p->implicit_release = !(conf->event_port_cfg &
183 				RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
184 
185 	/* check if ring exists, same as rx_worker above */
186 	snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
187 			port_id, "cq_worker_ring");
188 	existing_ring = rte_event_ring_lookup(buf);
189 	rte_event_ring_free(existing_ring);
190 
191 	p->cq_worker_ring = rte_event_ring_create(buf, conf->dequeue_depth,
192 			dev->data->socket_id,
193 			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
194 	if (p->cq_worker_ring == NULL) {
195 		rte_event_ring_free(p->rx_worker_ring);
196 		SW_LOG_ERR("Error creating CQ worker ring for port %d",
197 				port_id);
198 		return -1;
199 	}
200 	sw->cq_ring_space[port_id] = conf->dequeue_depth;
201 
202 	/* set hist list contents to empty */
203 	for (i = 0; i < SW_PORT_HIST_LIST; i++) {
204 		p->hist_list[i].fid = -1;
205 		p->hist_list[i].qid = -1;
206 	}
207 	dev->data->ports[port_id] = p;
208 
209 	rte_smp_wmb();
210 	p->initialized = 1;
211 	return 0;
212 }
213 
214 static void
215 sw_port_release(void *port)
216 {
217 	struct sw_port *p = (void *)port;
218 	if (p == NULL)
219 		return;
220 
221 	rte_event_ring_free(p->rx_worker_ring);
222 	rte_event_ring_free(p->cq_worker_ring);
223 	memset(p, 0, sizeof(*p));
224 }
225 
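/*
 * One-time initialization of queue 'idx': clear the per-flow (FID) to CQ
 * pinning and, for ordered queues, allocate the reorder buffer plus its
 * freelist sized to the power-of-2 aligned number of order sequences.
 * Partial allocations are freed and -EINVAL returned on failure.
 */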
226 static int32_t
227 qid_init(struct sw_evdev *sw, unsigned int idx, int type,
228 		const struct rte_event_queue_conf *queue_conf)
229 {
230 	unsigned int i;
231 	int socket_id = sw->data->socket_id;
232 	struct sw_qid *qid = &sw->qids[idx];
233 
234 	/* Initialize the FID structures to no pinning (-1), and zero packets */
235 	const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
236 	for (i = 0; i < RTE_DIM(qid->fids); i++)
237 		qid->fids[i] = fid;
238 
239 	qid->id = idx;
240 	qid->type = type;
241 	qid->priority = queue_conf->priority;
242 
243 	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
244 		uint32_t window_size;
245 
246 		/* rte_ring and window_size_mask require window_size to
247 		 * be a power-of-2.
248 		 */
249 		window_size = rte_align32pow2(
250 				queue_conf->nb_atomic_order_sequences);
251 
252 		qid->window_size = window_size - 1;
253 
254 		if (!window_size) {
255 			SW_LOG_DBG(
256 				"invalid reorder_window_size for ordered queue"
257 				);
258 			goto cleanup;
259 		}
260 
261 		qid->reorder_buffer = rte_zmalloc_socket(NULL,
262 				window_size * sizeof(qid->reorder_buffer[0]),
263 				0, socket_id);
264 		if (!qid->reorder_buffer) {
265 			SW_LOG_DBG("reorder_buffer malloc failed");
266 			goto cleanup;
267 		}
268 
269 		memset(&qid->reorder_buffer[0],
270 		       0,
271 		       window_size * sizeof(qid->reorder_buffer[0]));
272 
273 		qid->reorder_buffer_freelist = rob_ring_create(window_size,
274 				socket_id);
275 		if (!qid->reorder_buffer_freelist) {
276 			SW_LOG_DBG("freelist ring create failed");
277 			goto cleanup;
278 		}
279 
280 		/* Populate the freelist with reorder buffer entries. Enqueue
281 		 * 'window_size - 1' entries because the rte_ring holds only
282 		 * that many.
283 		 */
284 		for (i = 0; i < window_size - 1; i++) {
285 			if (rob_ring_enqueue(qid->reorder_buffer_freelist,
286 						&qid->reorder_buffer[i]) != 1)
287 				goto cleanup;
288 		}
289 
290 		qid->reorder_buffer_index = 0;
291 		qid->cq_next_tx = 0;
292 	}
293 
294 	qid->initialized = 1;
295 
296 	return 0;
297 
298 cleanup:
299 	if (qid->reorder_buffer) {
300 		rte_free(qid->reorder_buffer);
301 		qid->reorder_buffer = NULL;
302 	}
303 
304 	if (qid->reorder_buffer_freelist) {
305 		rob_ring_free(qid->reorder_buffer_freelist);
306 		qid->reorder_buffer_freelist = NULL;
307 	}
308 
309 	return -EINVAL;
310 }
311 
312 static void
313 sw_queue_release(struct rte_eventdev *dev, uint8_t id)
314 {
315 	struct sw_evdev *sw = sw_pmd_priv(dev);
316 	struct sw_qid *qid = &sw->qids[id];
317 
318 	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
319 		rte_free(qid->reorder_buffer);
320 		rob_ring_free(qid->reorder_buffer_freelist);
321 	}
322 	memset(qid, 0, sizeof(*qid));
323 }
324 
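/*
 * Resolve the internal scheduling type for the queue: single-link queues
 * become SW_SCHED_TYPE_DIRECT and RTE_EVENT_QUEUE_CFG_ALL_TYPES is rejected.
 * Re-configuring an already initialized queue releases it first.
 */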
325 static int
326 sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
327 		const struct rte_event_queue_conf *conf)
328 {
329 	int type;
330 
331 	type = conf->schedule_type;
332 
333 	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) {
334 		type = SW_SCHED_TYPE_DIRECT;
335 	} else if (RTE_EVENT_QUEUE_CFG_ALL_TYPES
336 			& conf->event_queue_cfg) {
337 		SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported");
338 		return -ENOTSUP;
339 	}
340 
341 	struct sw_evdev *sw = sw_pmd_priv(dev);
342 
343 	if (sw->qids[queue_id].initialized)
344 		sw_queue_release(dev, queue_id);
345 
346 	return qid_init(sw, queue_id, type, conf);
347 }
348 
349 static void
350 sw_init_qid_iqs(struct sw_evdev *sw)
351 {
352 	int i, j;
353 
354 	/* Initialize the IQ memory of all configured qids */
355 	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
356 		struct sw_qid *qid = &sw->qids[i];
357 
358 		if (!qid->initialized)
359 			continue;
360 
361 		for (j = 0; j < SW_IQS_MAX; j++)
362 			iq_init(sw, &qid->iq[j]);
363 	}
364 }
365 
366 static int
367 sw_qids_empty(struct sw_evdev *sw)
368 {
369 	unsigned int i, j;
370 
371 	for (i = 0; i < sw->qid_count; i++) {
372 		for (j = 0; j < SW_IQS_MAX; j++) {
373 			if (iq_count(&sw->qids[i].iq[j]))
374 				return 0;
375 		}
376 	}
377 
378 	return 1;
379 }
380 
381 static int
382 sw_ports_empty(struct sw_evdev *sw)
383 {
384 	unsigned int i;
385 
386 	for (i = 0; i < sw->port_count; i++) {
387 		if ((rte_event_ring_count(sw->ports[i].rx_worker_ring)) ||
388 		     rte_event_ring_count(sw->ports[i].cq_worker_ring))
389 			return 0;
390 	}
391 
392 	return 1;
393 }
394 
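/*
 * At stop time, dequeue any events still held in a port's CQ, pass each one
 * to the application's dev_stop_flush callback (if registered), and then
 * release it so that atomic/ordered scheduler state is cleaned up.
 */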
395 static void
396 sw_drain_ports(struct rte_eventdev *dev)
397 {
398 	struct sw_evdev *sw = sw_pmd_priv(dev);
399 	eventdev_stop_flush_t flush;
400 	unsigned int i;
401 	uint8_t dev_id;
402 	void *arg;
403 
404 	flush = dev->dev_ops->dev_stop_flush;
405 	dev_id = dev->data->dev_id;
406 	arg = dev->data->dev_stop_flush_arg;
407 
408 	for (i = 0; i < sw->port_count; i++) {
409 		struct rte_event ev;
410 
411 		while (rte_event_dequeue_burst(dev_id, i, &ev, 1, 0)) {
412 			if (flush)
413 				flush(dev_id, ev, arg);
414 
415 			ev.op = RTE_EVENT_OP_RELEASE;
416 			rte_event_enqueue_burst(dev_id, i, &ev, 1);
417 		}
418 	}
419 }
420 
421 static void
422 sw_drain_queue(struct rte_eventdev *dev, struct sw_iq *iq)
423 {
424 	struct sw_evdev *sw = sw_pmd_priv(dev);
425 	eventdev_stop_flush_t flush;
426 	uint8_t dev_id;
427 	void *arg;
428 
429 	flush = dev->dev_ops->dev_stop_flush;
430 	dev_id = dev->data->dev_id;
431 	arg = dev->data->dev_stop_flush_arg;
432 
433 	while (iq_count(iq) > 0) {
434 		struct rte_event ev;
435 
436 		iq_dequeue_burst(sw, iq, &ev, 1);
437 
438 		if (flush)
439 			flush(dev_id, ev, arg);
440 	}
441 }
442 
443 static void
444 sw_drain_queues(struct rte_eventdev *dev)
445 {
446 	struct sw_evdev *sw = sw_pmd_priv(dev);
447 	unsigned int i, j;
448 
449 	for (i = 0; i < sw->qid_count; i++) {
450 		for (j = 0; j < SW_IQS_MAX; j++)
451 			sw_drain_queue(dev, &sw->qids[i].iq[j]);
452 	}
453 }
454 
455 static void
456 sw_clean_qid_iqs(struct rte_eventdev *dev)
457 {
458 	struct sw_evdev *sw = sw_pmd_priv(dev);
459 	int i, j;
460 
461 	/* Release the IQ memory of all configured qids */
462 	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
463 		struct sw_qid *qid = &sw->qids[i];
464 
465 		for (j = 0; j < SW_IQS_MAX; j++) {
466 			if (!qid->iq[j].head)
467 				continue;
468 			iq_free_chunk_list(sw, qid->iq[j].head);
469 			qid->iq[j].head = NULL;
470 		}
471 	}
472 }
473 
474 static void
475 sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
476 				 struct rte_event_queue_conf *conf)
477 {
478 	RTE_SET_USED(dev);
479 	RTE_SET_USED(queue_id);
480 
481 	static const struct rte_event_queue_conf default_conf = {
482 		.nb_atomic_flows = 4096,
483 		.nb_atomic_order_sequences = 1,
484 		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
485 		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
486 	};
487 
488 	*conf = default_conf;
489 }
490 
491 static void
492 sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
493 		 struct rte_event_port_conf *port_conf)
494 {
495 	RTE_SET_USED(dev);
496 	RTE_SET_USED(port_id);
497 
498 	port_conf->new_event_threshold = 1024;
499 	port_conf->dequeue_depth = 16;
500 	port_conf->enqueue_depth = 16;
501 	port_conf->event_port_cfg = 0;
502 }
503 
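/*
 * Apply the device-level configuration: record the queue/port/event limits
 * and (re)allocate the global pool of IQ chunks sized for the worst case.
 * RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT is not supported by this PMD.
 */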
504 static int
505 sw_dev_configure(const struct rte_eventdev *dev)
506 {
507 	struct sw_evdev *sw = sw_pmd_priv(dev);
508 	const struct rte_eventdev_data *data = dev->data;
509 	const struct rte_event_dev_config *conf = &data->dev_conf;
510 	int num_chunks, i;
511 
512 	sw->qid_count = conf->nb_event_queues;
513 	sw->port_count = conf->nb_event_ports;
514 	sw->nb_events_limit = conf->nb_events_limit;
515 	rte_atomic32_set(&sw->inflights, 0);
516 
517 	/* Number of chunks sized for worst-case spread of events across IQs */
518 	num_chunks = ((SW_INFLIGHT_EVENTS_TOTAL/SW_EVS_PER_Q_CHUNK)+1) +
519 			sw->qid_count*SW_IQS_MAX*2;
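	/* i.e. enough chunks to hold every in-flight event, plus up to two
	 * extra (e.g. partially filled) chunks for each of the
	 * qid_count * SW_IQS_MAX IQs.
	 */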
520 
521 	/* If this is a reconfiguration, free the previous IQ allocation. All
522 	 * IQ chunk references were cleaned out of the QIDs in sw_stop(), and
523 	 * will be reinitialized in sw_start().
524 	 */
525 	rte_free(sw->chunks);
526 
527 	sw->chunks = rte_malloc_socket(NULL,
528 				       sizeof(struct sw_queue_chunk) *
529 				       num_chunks,
530 				       0,
531 				       sw->data->socket_id);
532 	if (!sw->chunks)
533 		return -ENOMEM;
534 
535 	sw->chunk_list_head = NULL;
536 	for (i = 0; i < num_chunks; i++)
537 		iq_free_chunk(sw, &sw->chunks[i]);
538 
539 	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
540 		return -ENOTSUP;
541 
542 	return 0;
543 }
544 
545 struct rte_eth_dev;
546 
547 static int
548 sw_eth_rx_adapter_caps_get(const struct rte_eventdev *dev,
549 			const struct rte_eth_dev *eth_dev,
550 			uint32_t *caps)
551 {
552 	RTE_SET_USED(dev);
553 	RTE_SET_USED(eth_dev);
554 	*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
555 	return 0;
556 }
557 
558 static int
559 sw_timer_adapter_caps_get(const struct rte_eventdev *dev, uint64_t flags,
560 			  uint32_t *caps,
561 			  const struct event_timer_adapter_ops **ops)
562 {
563 	RTE_SET_USED(dev);
564 	RTE_SET_USED(flags);
565 	*caps = RTE_EVENT_TIMER_ADAPTER_SW_CAP;
566 
567 	/* Use default SW ops */
568 	*ops = NULL;
569 
570 	return 0;
571 }
572 
573 static int
574 sw_crypto_adapter_caps_get(const struct rte_eventdev *dev,
575 			   const struct rte_cryptodev *cdev,
576 			   uint32_t *caps)
577 {
578 	RTE_SET_USED(dev);
579 	RTE_SET_USED(cdev);
580 	*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
581 	return 0;
582 }
583 
584 static void
585 sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
586 {
587 	RTE_SET_USED(dev);
588 
589 	static const struct rte_event_dev_info evdev_sw_info = {
590 			.driver_name = SW_PMD_NAME,
591 			.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
592 			.max_event_queue_flows = SW_QID_NUM_FIDS,
593 			.max_event_queue_priority_levels = SW_Q_PRIORITY_MAX,
594 			.max_event_priority_levels = SW_IQS_MAX,
595 			.max_event_ports = SW_PORTS_MAX,
596 			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
597 			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
598 			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
599 			.event_dev_cap = (
600 				RTE_EVENT_DEV_CAP_ATOMIC |
601 				RTE_EVENT_DEV_CAP_ORDERED |
602 				RTE_EVENT_DEV_CAP_PARALLEL |
603 				RTE_EVENT_DEV_CAP_QUEUE_QOS |
604 				RTE_EVENT_DEV_CAP_BURST_MODE |
605 				RTE_EVENT_DEV_CAP_EVENT_QOS |
606 				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
607 				RTE_EVENT_DEV_CAP_RUNTIME_PORT_LINK |
608 				RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT |
609 				RTE_EVENT_DEV_CAP_NONSEQ_MODE |
610 				RTE_EVENT_DEV_CAP_CARRY_FLOW_ID |
611 				RTE_EVENT_DEV_CAP_MAINTENANCE_FREE),
612 			.max_profiles_per_port = 1,
613 	};
614 
615 	*info = evdev_sw_info;
616 }
617 
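/*
 * Human-readable dump of device, port and queue state for the
 * rte_event_dev_dump() API; colour escapes highlight ports at their
 * inflight limit and rings with no free space.
 */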
618 static void
619 sw_dump(struct rte_eventdev *dev, FILE *f)
620 {
621 	const struct sw_evdev *sw = sw_pmd_priv(dev);
622 
623 	static const char * const q_type_strings[] = {
624 			"Ordered", "Atomic", "Parallel", "Directed"
625 	};
626 	uint32_t i;
627 	fprintf(f, "EventDev %s: ports %d, qids %d\n",
628 		dev->data->name, sw->port_count, sw->qid_count);
629 
630 	fprintf(f, "\trx   %"PRIu64"\n\tdrop %"PRIu64"\n\ttx   %"PRIu64"\n",
631 		sw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts);
632 	fprintf(f, "\tsched calls: %"PRIu64"\n", sw->sched_called);
633 	fprintf(f, "\tsched cq/qid call: %"PRIu64"\n", sw->sched_cq_qid_called);
634 	fprintf(f, "\tsched no IQ enq: %"PRIu64"\n", sw->sched_no_iq_enqueues);
635 	fprintf(f, "\tsched no CQ enq: %"PRIu64"\n", sw->sched_no_cq_enqueues);
636 	uint32_t inflights = rte_atomic32_read(&sw->inflights);
637 	uint32_t credits = sw->nb_events_limit - inflights;
638 	fprintf(f, "\tinflight %d, credits: %d\n", inflights, credits);
639 
640 #define COL_RED "\x1b[31m"
641 #define COL_RESET "\x1b[0m"
642 
643 	for (i = 0; i < sw->port_count; i++) {
644 		int max, j;
645 		const struct sw_port *p = &sw->ports[i];
646 		if (!p->initialized) {
647 			fprintf(f, "  %sPort %d not initialized.%s\n",
648 				COL_RED, i, COL_RESET);
649 			continue;
650 		}
651 		fprintf(f, "  Port %d %s\n", i,
652 			p->is_directed ? " (SingleCons)" : "");
653 		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64
654 			"\t%sinflight %d%s\n", sw->ports[i].stats.rx_pkts,
655 			sw->ports[i].stats.rx_dropped,
656 			sw->ports[i].stats.tx_pkts,
657 			(p->inflights == p->inflight_max) ?
658 				COL_RED : COL_RESET,
659 			sw->ports[i].inflights, COL_RESET);
660 
661 		fprintf(f, "\tMax New: %u"
662 			"\tAvg cycles PP: %"PRIu64"\tCredits: %u\n",
663 			sw->ports[i].inflight_max,
664 			sw->ports[i].avg_pkt_ticks,
665 			sw->ports[i].inflight_credits);
666 		fprintf(f, "\tReceive burst distribution:\n");
667 		float zp_percent = p->zero_polls * 100.0 / p->total_polls;
668 		fprintf(f, zp_percent < 10 ? "\t\t0:%.02f%% " : "\t\t0:%.0f%% ",
669 				zp_percent);
670 		for (max = (int)RTE_DIM(p->poll_buckets); max-- > 0;)
671 			if (p->poll_buckets[max] != 0)
672 				break;
673 		for (j = 0; j <= max; j++) {
674 			if (p->poll_buckets[j] != 0) {
675 				float poll_pc = p->poll_buckets[j] * 100.0 /
676 					p->total_polls;
677 				fprintf(f, "%u-%u:%.02f%% ",
678 					((j << SW_DEQ_STAT_BUCKET_SHIFT) + 1),
679 					((j+1) << SW_DEQ_STAT_BUCKET_SHIFT),
680 					poll_pc);
681 			}
682 		}
683 		fprintf(f, "\n");
684 
685 		if (p->rx_worker_ring) {
686 			uint64_t used = rte_event_ring_count(p->rx_worker_ring);
687 			uint64_t space = rte_event_ring_free_count(
688 					p->rx_worker_ring);
689 			const char *col = (space == 0) ? COL_RED : COL_RESET;
690 			fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4"
691 					PRIu64 COL_RESET"\n", col, used, space);
692 		} else
693 			fprintf(f, "\trx ring not initialized.\n");
694 
695 		if (p->cq_worker_ring) {
696 			uint64_t used = rte_event_ring_count(p->cq_worker_ring);
697 			uint64_t space = rte_event_ring_free_count(
698 					p->cq_worker_ring);
699 			const char *col = (space == 0) ? COL_RED : COL_RESET;
700 			fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4"
701 					PRIu64 COL_RESET"\n", col, used, space);
702 		} else
703 			fprintf(f, "\tcq ring not initialized.\n");
704 	}
705 
706 	for (i = 0; i < sw->qid_count; i++) {
707 		const struct sw_qid *qid = &sw->qids[i];
708 		if (!qid->initialized) {
709 			fprintf(f, "  %sQueue %d not initialized.%s\n",
710 				COL_RED, i, COL_RESET);
711 			continue;
712 		}
713 		int affinities_per_port[SW_PORTS_MAX] = {0};
714 
715 		fprintf(f, "  Queue %d (%s)\n", i, q_type_strings[qid->type]);
716 		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64"\n",
717 			qid->stats.rx_pkts, qid->stats.rx_dropped,
718 			qid->stats.tx_pkts);
719 		if (qid->type == RTE_SCHED_TYPE_ORDERED) {
720 			struct rob_ring *rob_buf_free =
721 				qid->reorder_buffer_freelist;
722 			if (rob_buf_free)
723 				fprintf(f, "\tReorder entries in use: %u\n",
724 					rob_ring_free_count(rob_buf_free));
725 			else
726 				fprintf(f,
727 					"\tReorder buffer not initialized\n");
728 		}
729 
730 		uint32_t flow;
731 		for (flow = 0; flow < RTE_DIM(qid->fids); flow++)
732 			if (qid->fids[flow].cq != -1) {
733 				affinities_per_port[qid->fids[flow].cq]++;
734 			}
735 
736 		uint32_t port;
737 		fprintf(f, "\tPer Port Stats:\n");
738 		for (port = 0; port < sw->port_count; port++) {
739 			fprintf(f, "\t  Port %d: Pkts: %"PRIu64, port,
740 					qid->to_port[port]);
741 			fprintf(f, "\tFlows: %d\n", affinities_per_port[port]);
742 		}
743 
744 		uint32_t iq;
745 		uint32_t iq_printed = 0;
746 		for (iq = 0; iq < SW_IQS_MAX; iq++) {
747 			if (!qid->iq[iq].head) {
748 				fprintf(f, "\tiq %d is not initialized.\n", iq);
749 				iq_printed = 1;
750 				continue;
751 			}
752 			uint32_t used = iq_count(&qid->iq[iq]);
753 			const char *col = COL_RESET;
754 			if (used > 0) {
755 				fprintf(f, "\t%siq %d: Used %d"
756 					COL_RESET"\n", col, iq, used);
757 				iq_printed = 1;
758 			}
759 		}
760 		if (iq_printed == 0)
761 			fprintf(f, "\t-- iqs empty --\n");
762 	}
763 }
764 
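/*
 * Final checks before scheduling begins: the scheduler service must be
 * runnable, every port must be set up, and every configured queue must be
 * linked to at least one port. This also builds the priority-ordered QID
 * array, initializes the per-QID IQs and sets up xstats.
 */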
765 static int
766 sw_start(struct rte_eventdev *dev)
767 {
768 	unsigned int i, j;
769 	struct sw_evdev *sw = sw_pmd_priv(dev);
770 
771 	rte_service_component_runstate_set(sw->service_id, 1);
772 
773 	/* check that a service core is mapped to this service */
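	/* (the application enables the service with rte_service_runstate_set()
	 * and maps an lcore to it with, e.g., rte_service_map_lcore_set())
	 */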
774 	if (!rte_service_runstate_get(sw->service_id)) {
775 		SW_LOG_ERR("Warning: No Service core enabled on service %s",
776 				sw->service_name);
777 		return -ENOENT;
778 	}
779 
780 	/* check all ports are set up */
781 	for (i = 0; i < sw->port_count; i++)
782 		if (sw->ports[i].rx_worker_ring == NULL) {
783 			SW_LOG_ERR("Port %d not configured", i);
784 			return -ESTALE;
785 		}
786 
787 	/* check all queues are configured and mapped to ports */
788 	for (i = 0; i < sw->qid_count; i++)
789 		if (!sw->qids[i].initialized ||
790 		    sw->qids[i].cq_num_mapped_cqs == 0) {
791 			SW_LOG_ERR("Queue %d not configured", i);
792 			return -ENOLINK;
793 		}
794 
795 	/* build up our prioritized array of qids */
796 	/* We don't use qsort here, as if all/multiple entries have the same
797 	 * priority, the result is non-deterministic. From "man 3 qsort":
798 	 * "If two members compare as equal, their order in the sorted
799 	 * array is undefined."
800 	 */
801 	uint32_t qidx = 0;
802 	for (j = 0; j <= RTE_EVENT_DEV_PRIORITY_LOWEST; j++) {
803 		for (i = 0; i < sw->qid_count; i++) {
804 			if (sw->qids[i].priority == j) {
805 				sw->qids_prioritized[qidx] = &sw->qids[i];
806 				qidx++;
807 			}
808 		}
809 	}
810 
811 	sw_init_qid_iqs(sw);
812 
813 	if (sw_xstats_init(sw) < 0)
814 		return -EINVAL;
815 
816 	rte_smp_wmb();
817 	sw->started = 1;
818 
819 	return 0;
820 }
821 
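/*
 * Stop the scheduler service and wait for it to quiesce, flush every
 * residual event out of the ports and IQs (invoking the application's
 * dev_stop_flush callback where registered), then drop the IQ chunk
 * references. The service runstate is restored if it was running before.
 */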
822 static void
823 sw_stop(struct rte_eventdev *dev)
824 {
825 	struct sw_evdev *sw = sw_pmd_priv(dev);
826 	int32_t runstate;
827 
828 	/* Stop the scheduler if it's running */
829 	runstate = rte_service_runstate_get(sw->service_id);
830 	if (runstate == 1)
831 		rte_service_runstate_set(sw->service_id, 0);
832 
833 	while (rte_service_may_be_active(sw->service_id))
834 		rte_pause();
835 
836 	/* Flush all events out of the device */
837 	while (!(sw_qids_empty(sw) && sw_ports_empty(sw))) {
838 		sw_event_schedule(dev);
839 		sw_drain_ports(dev);
840 		sw_drain_queues(dev);
841 	}
842 
843 	sw_clean_qid_iqs(dev);
844 	sw_xstats_uninit(sw);
845 	sw->started = 0;
846 	rte_smp_wmb();
847 
848 	if (runstate == 1)
849 		rte_service_runstate_set(sw->service_id, 1);
850 }
851 
852 static int
853 sw_close(struct rte_eventdev *dev)
854 {
855 	struct sw_evdev *sw = sw_pmd_priv(dev);
856 	uint32_t i;
857 
858 	for (i = 0; i < sw->qid_count; i++)
859 		sw_queue_release(dev, i);
860 	sw->qid_count = 0;
861 
862 	for (i = 0; i < sw->port_count; i++)
863 		sw_port_release(&sw->ports[i]);
864 	sw->port_count = 0;
865 
866 	memset(&sw->stats, 0, sizeof(sw->stats));
867 	sw->sched_called = 0;
868 	sw->sched_no_iq_enqueues = 0;
869 	sw->sched_no_cq_enqueues = 0;
870 	sw->sched_cq_qid_called = 0;
871 
872 	return 0;
873 }
874 
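/* Devargs parsers: each one rejects an out-of-range value by returning -1,
 * which causes the corresponding rte_kvargs_process() call in sw_probe()
 * to fail.
 */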
875 static int
876 assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
877 {
878 	int *socket_id = opaque;
879 	*socket_id = atoi(value);
880 	if (*socket_id >= RTE_MAX_NUMA_NODES)
881 		return -1;
882 	return 0;
883 }
884 
885 static int
886 set_sched_quanta(const char *key __rte_unused, const char *value, void *opaque)
887 {
888 	int *quanta = opaque;
889 	*quanta = atoi(value);
890 	if (*quanta < 0 || *quanta >= 4096)
891 		return -1;
892 	return 0;
893 }
894 
895 static int
896 set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque)
897 {
898 	int *credit = opaque;
899 	*credit = atoi(value);
900 	if (*credit < 0 || *credit >= 128)
901 		return -1;
902 	return 0;
903 }
904 
905 static int
906 set_deq_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
907 {
908 	int *deq_burst_sz = opaque;
909 	*deq_burst_sz = atoi(value);
910 	if (*deq_burst_sz < 0 || *deq_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE)
911 		return -1;
912 	return 0;
913 }
914 
915 static int
916 set_min_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
917 {
918 	int *min_burst_sz = opaque;
919 	*min_burst_sz = atoi(value);
920 	if (*min_burst_sz < 0 || *min_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE)
921 		return -1;
922 	return 0;
923 }
924 
925 static int
926 set_refill_once(const char *key __rte_unused, const char *value, void *opaque)
927 {
928 	int *refill_once_per_call = opaque;
929 	*refill_once_per_call = atoi(value);
930 	if (*refill_once_per_call < 0 || *refill_once_per_call > 1)
931 		return -1;
932 	return 0;
933 }
934 
935 static int32_t sw_sched_service_func(void *args)
936 {
937 	struct rte_eventdev *dev = args;
938 	return sw_event_schedule(dev);
939 }
940 
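/*
 * Create an event_sw vdev: parse the optional kvargs listed at the top of
 * this file, allocate the device, install the fast-path enqueue/dequeue
 * functions, and register the scheduling service (sw_sched_service_func)
 * with EAL.
 */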
941 static int
942 sw_probe(struct rte_vdev_device *vdev)
943 {
944 	static struct eventdev_ops evdev_sw_ops = {
945 			.dev_configure = sw_dev_configure,
946 			.dev_infos_get = sw_info_get,
947 			.dev_close = sw_close,
948 			.dev_start = sw_start,
949 			.dev_stop = sw_stop,
950 			.dump = sw_dump,
951 
952 			.queue_def_conf = sw_queue_def_conf,
953 			.queue_setup = sw_queue_setup,
954 			.queue_release = sw_queue_release,
955 			.port_def_conf = sw_port_def_conf,
956 			.port_setup = sw_port_setup,
957 			.port_release = sw_port_release,
958 			.port_link = sw_port_link,
959 			.port_unlink = sw_port_unlink,
960 			.port_unlinks_in_progress = sw_port_unlinks_in_progress,
961 
962 			.eth_rx_adapter_caps_get = sw_eth_rx_adapter_caps_get,
963 
964 			.timer_adapter_caps_get = sw_timer_adapter_caps_get,
965 
966 			.crypto_adapter_caps_get = sw_crypto_adapter_caps_get,
967 
968 			.xstats_get = sw_xstats_get,
969 			.xstats_get_names = sw_xstats_get_names,
970 			.xstats_get_by_name = sw_xstats_get_by_name,
971 			.xstats_reset = sw_xstats_reset,
972 
973 			.dev_selftest = test_sw_eventdev,
974 	};
975 
976 	static const char *const args[] = {
977 		NUMA_NODE_ARG,
978 		SCHED_QUANTA_ARG,
979 		CREDIT_QUANTA_ARG,
980 		MIN_BURST_SIZE_ARG,
981 		DEQ_BURST_SIZE_ARG,
982 		REFIL_ONCE_ARG,
983 		NULL
984 	};
985 	const char *name;
986 	const char *params;
987 	struct rte_eventdev *dev;
988 	struct sw_evdev *sw;
989 	int socket_id = rte_socket_id();
990 	int sched_quanta  = SW_DEFAULT_SCHED_QUANTA;
991 	int credit_quanta = SW_DEFAULT_CREDIT_QUANTA;
992 	int min_burst_size = 1;
993 	int deq_burst_size = SCHED_DEQUEUE_DEFAULT_BURST_SIZE;
994 	int refill_once = 0;
995 
996 	name = rte_vdev_device_name(vdev);
997 	params = rte_vdev_device_args(vdev);
998 	if (params != NULL && params[0] != '\0') {
999 		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
1000 
1001 		if (!kvlist) {
1002 			SW_LOG_INFO(
1003 				"Ignoring unsupported parameters when creating device '%s'",
1004 				name);
1005 		} else {
1006 			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
1007 					assign_numa_node, &socket_id);
1008 			if (ret != 0) {
1009 				SW_LOG_ERR(
1010 					"%s: Error parsing numa node parameter",
1011 					name);
1012 				rte_kvargs_free(kvlist);
1013 				return ret;
1014 			}
1015 
1016 			ret = rte_kvargs_process(kvlist, SCHED_QUANTA_ARG,
1017 					set_sched_quanta, &sched_quanta);
1018 			if (ret != 0) {
1019 				SW_LOG_ERR(
1020 					"%s: Error parsing sched quanta parameter",
1021 					name);
1022 				rte_kvargs_free(kvlist);
1023 				return ret;
1024 			}
1025 
1026 			ret = rte_kvargs_process(kvlist, CREDIT_QUANTA_ARG,
1027 					set_credit_quanta, &credit_quanta);
1028 			if (ret != 0) {
1029 				SW_LOG_ERR(
1030 					"%s: Error parsing credit quanta parameter",
1031 					name);
1032 				rte_kvargs_free(kvlist);
1033 				return ret;
1034 			}
1035 
1036 			ret = rte_kvargs_process(kvlist, MIN_BURST_SIZE_ARG,
1037 					set_min_burst_sz, &min_burst_size);
1038 			if (ret != 0) {
1039 				SW_LOG_ERR(
1040 					"%s: Error parsing minimum burst size parameter",
1041 					name);
1042 				rte_kvargs_free(kvlist);
1043 				return ret;
1044 			}
1045 
1046 			ret = rte_kvargs_process(kvlist, DEQ_BURST_SIZE_ARG,
1047 					set_deq_burst_sz, &deq_burst_size);
1048 			if (ret != 0) {
1049 				SW_LOG_ERR(
1050 					"%s: Error parsing dequeue burst size parameter",
1051 					name);
1052 				rte_kvargs_free(kvlist);
1053 				return ret;
1054 			}
1055 
1056 			ret = rte_kvargs_process(kvlist, REFIL_ONCE_ARG,
1057 					set_refill_once, &refill_once);
1058 			if (ret != 0) {
1059 				SW_LOG_ERR(
1060 					"%s: Error parsing refill once per call switch",
1061 					name);
1062 				rte_kvargs_free(kvlist);
1063 				return ret;
1064 			}
1065 
1066 			rte_kvargs_free(kvlist);
1067 		}
1068 	}
1069 
1070 	SW_LOG_INFO(
1071 			"Creating eventdev sw device %s, numa_node=%d, "
1072 			"sched_quanta=%d, credit_quanta=%d "
1073 			"min_burst=%d, deq_burst=%d, refill_once=%d",
1074 			name, socket_id, sched_quanta, credit_quanta,
1075 			min_burst_size, deq_burst_size, refill_once);
1076 
1077 	dev = rte_event_pmd_vdev_init(name,
1078 			sizeof(struct sw_evdev), socket_id, vdev);
1079 	if (dev == NULL) {
1080 		SW_LOG_ERR("eventdev vdev init() failed");
1081 		return -EFAULT;
1082 	}
1083 	dev->dev_ops = &evdev_sw_ops;
1084 	dev->enqueue_burst = sw_event_enqueue_burst;
1085 	dev->enqueue_new_burst = sw_event_enqueue_burst;
1086 	dev->enqueue_forward_burst = sw_event_enqueue_burst;
1087 	dev->dequeue_burst = sw_event_dequeue_burst;
1088 
1089 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
1090 		return 0;
1091 
1092 	sw = dev->data->dev_private;
1093 	sw->data = dev->data;
1094 
1095 	/* copy values passed from vdev command line to instance */
1096 	sw->credit_update_quanta = credit_quanta;
1097 	sw->sched_quanta = sched_quanta;
1098 	sw->sched_min_burst_size = min_burst_size;
1099 	sw->sched_deq_burst_size = deq_burst_size;
1100 	sw->refill_once_per_iter = refill_once;
1101 
1102 	/* register service with EAL */
1103 	struct rte_service_spec service;
1104 	memset(&service, 0, sizeof(struct rte_service_spec));
1105 	snprintf(service.name, sizeof(service.name), "%s_service", name);
1106 	snprintf(sw->service_name, sizeof(sw->service_name), "%s_service",
1107 			name);
1108 	service.socket_id = socket_id;
1109 	service.callback = sw_sched_service_func;
1110 	service.callback_userdata = (void *)dev;
1111 
1112 	int32_t ret = rte_service_component_register(&service, &sw->service_id);
1113 	if (ret) {
1114 		SW_LOG_ERR("service register() failed");
1115 		return -ENOEXEC;
1116 	}
1117 
1118 	dev->data->service_inited = 1;
1119 	dev->data->service_id = sw->service_id;
1120 
1121 	event_dev_probing_finish(dev);
1122 
1123 	return 0;
1124 }
1125 
1126 static int
1127 sw_remove(struct rte_vdev_device *vdev)
1128 {
1129 	const char *name;
1130 
1131 	name = rte_vdev_device_name(vdev);
1132 	if (name == NULL)
1133 		return -EINVAL;
1134 
1135 	SW_LOG_INFO("Closing eventdev sw device %s", name);
1136 
1137 	return rte_event_pmd_vdev_uninit(name);
1138 }
1139 
1140 static struct rte_vdev_driver evdev_sw_pmd_drv = {
1141 	.probe = sw_probe,
1142 	.remove = sw_remove
1143 };
1144 
1145 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv);
1146 RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "=<int> "
1147 		SCHED_QUANTA_ARG "=<int> " CREDIT_QUANTA_ARG "=<int> "
1148 		MIN_BURST_SIZE_ARG "=<int> " DEQ_BURST_SIZE_ARG "=<int> "
1149 		REFIL_ONCE_ARG "=<int>");
1150 RTE_LOG_REGISTER_DEFAULT(eventdev_sw_log_level, NOTICE);
1151