1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4 
5 #include <inttypes.h>
6 #include <string.h>
7 
8 #include <rte_bus_vdev.h>
9 #include <rte_kvargs.h>
10 #include <rte_ring.h>
11 #include <rte_errno.h>
12 #include <rte_event_ring.h>
13 #include <rte_service_component.h>
14 
15 #include "sw_evdev.h"
16 #include "iq_chunk.h"
17 #include "event_ring.h"
18 
19 #define EVENTDEV_NAME_SW_PMD event_sw
20 #define NUMA_NODE_ARG "numa_node"
21 #define SCHED_QUANTA_ARG "sched_quanta"
22 #define CREDIT_QUANTA_ARG "credit_quanta"
23 #define MIN_BURST_SIZE_ARG "min_burst"
24 #define DEQ_BURST_SIZE_ARG "deq_burst"
25 #define REFIL_ONCE_ARG "refill_once"
26 
27 static void
28 sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
29 
30 static int
31 sw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
32 		const uint8_t priorities[], uint16_t num)
33 {
34 	struct sw_port *p = port;
35 	struct sw_evdev *sw = sw_pmd_priv(dev);
36 	int i;
37 
38 	RTE_SET_USED(priorities);
39 	for (i = 0; i < num; i++) {
40 		struct sw_qid *q = &sw->qids[queues[i]];
41 		unsigned int j;
42 
43 		/* check for qid map overflow */
44 		if (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map)) {
45 			rte_errno = EDQUOT;
46 			break;
47 		}
48 
49 		if (p->is_directed && p->num_qids_mapped > 0) {
50 			rte_errno = EDQUOT;
51 			break;
52 		}
53 
54 		for (j = 0; j < q->cq_num_mapped_cqs; j++) {
55 			if (q->cq_map[j] == p->id)
56 				break;
57 		}
58 
59 		/* check if port is already linked */
60 		if (j < q->cq_num_mapped_cqs)
61 			continue;
62 
63 		if (q->type == SW_SCHED_TYPE_DIRECT) {
64 			/* check directed qids only map to one port */
65 			if (p->num_qids_mapped > 0) {
66 				rte_errno = EDQUOT;
67 				break;
68 			}
69 			/* check port only takes a directed flow */
70 			if (num > 1) {
71 				rte_errno = EDQUOT;
72 				break;
73 			}
74 
75 			p->is_directed = 1;
76 			p->num_qids_mapped = 1;
77 		} else if (q->type == RTE_SCHED_TYPE_ORDERED) {
78 			p->num_ordered_qids++;
79 			p->num_qids_mapped++;
80 		} else if (q->type == RTE_SCHED_TYPE_ATOMIC ||
81 				q->type == RTE_SCHED_TYPE_PARALLEL) {
82 			p->num_qids_mapped++;
83 		}
84 
85 		q->cq_map[q->cq_num_mapped_cqs] = p->id;
86 		rte_smp_wmb();
87 		q->cq_num_mapped_cqs++;
88 	}
89 	return i;
90 }
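/*
 * Illustrative usage sketch (not part of this driver, needs <rte_eventdev.h>
 * and <rte_errno.h>): an application links queues to an event port through
 * the public API, which reaches sw_port_link() above. The queue ids below
 * are hypothetical examples.
 */
static int
example_link_two_queues(uint8_t dev_id, uint8_t port_id)
{
	const uint8_t queues[] = {0, 1};          /* hypothetical queue ids */
	const uint8_t priorities[] = {            /* link priorities; unused by this PMD */
		RTE_EVENT_DEV_PRIORITY_NORMAL,
		RTE_EVENT_DEV_PRIORITY_NORMAL,
	};

	/* returns the number of links established; rte_errno is set on failure */
	int nb_links = rte_event_port_link(dev_id, port_id, queues, priorities, 2);
	if (nb_links != 2)
		return -rte_errno;
	return 0;
}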
91 
92 static int
93 sw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
94 		uint16_t nb_unlinks)
95 {
96 	struct sw_port *p = port;
97 	struct sw_evdev *sw = sw_pmd_priv(dev);
98 	unsigned int i, j;
99 
100 	int unlinked = 0;
101 	for (i = 0; i < nb_unlinks; i++) {
102 		struct sw_qid *q = &sw->qids[queues[i]];
103 		for (j = 0; j < q->cq_num_mapped_cqs; j++) {
104 			if (q->cq_map[j] == p->id) {
105 				q->cq_map[j] =
106 					q->cq_map[q->cq_num_mapped_cqs - 1];
107 				rte_smp_wmb();
108 				q->cq_num_mapped_cqs--;
109 				unlinked++;
110 
111 				p->num_qids_mapped--;
112 
113 				if (q->type == RTE_SCHED_TYPE_ORDERED)
114 					p->num_ordered_qids--;
115 
116 				continue;
117 			}
118 		}
119 	}
120 
121 	p->unlinks_in_progress += unlinked;
122 	rte_smp_mb();
123 
124 	return unlinked;
125 }
126 
127 static int
128 sw_port_unlinks_in_progress(struct rte_eventdev *dev, void *port)
129 {
130 	RTE_SET_USED(dev);
131 	struct sw_port *p = port;
132 	return p->unlinks_in_progress;
133 }
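/*
 * Illustrative sketch (hypothetical helper, needs <rte_eventdev.h> and
 * <rte_pause.h>): unlinking is asynchronous with respect to the scheduler
 * service, so an application typically polls
 * rte_event_port_unlinks_in_progress() (served by the callback above) until
 * it drops to zero before reusing or tearing down the port.
 */
static void
example_unlink_and_wait(uint8_t dev_id, uint8_t port_id, uint8_t queue_id)
{
	uint8_t queues[] = { queue_id };

	/* request that the queue be unlinked from this port */
	rte_event_port_unlink(dev_id, port_id, queues, 1);

	/* wait until the scheduler has finished flushing the old mapping */
	while (rte_event_port_unlinks_in_progress(dev_id, port_id) > 0)
		rte_pause();
}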
134 
135 static int
136 sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
137 		const struct rte_event_port_conf *conf)
138 {
139 	struct sw_evdev *sw = sw_pmd_priv(dev);
140 	struct sw_port *p = &sw->ports[port_id];
141 	char buf[RTE_RING_NAMESIZE];
142 	unsigned int i;
143 
144 	struct rte_event_dev_info info;
145 	sw_info_get(dev, &info);
146 
147 	/* detect re-configuring and return credits to instance if needed */
148 	if (p->initialized) {
149 	/* taking credits from the pool is done one quanta at a time, and
150 	 * credits may be spent (counted in p->inflights) or still
151 	 * available in the port (p->inflight_credits). We must return
152 	 * the sum so as not to leak credits
153 	 */
154 		int possible_inflights = p->inflight_credits + p->inflights;
155 		rte_atomic32_sub(&sw->inflights, possible_inflights);
156 	}
157 
158 	*p = (struct sw_port){0}; /* zero entire structure */
159 	p->id = port_id;
160 	p->sw = sw;
161 
162 	/* check to see if the ring exists - port_setup() can be called multiple
163 	 * times legally (assuming the device is stopped). If the ring exists,
164 	 * free it so it gets re-created with the correct size
165 	 */
166 	snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
167 			port_id, "rx_worker_ring");
168 	struct rte_event_ring *existing_ring = rte_event_ring_lookup(buf);
169 	if (existing_ring)
170 		rte_event_ring_free(existing_ring);
171 
172 	p->rx_worker_ring = rte_event_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
173 			dev->data->socket_id,
174 			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
175 	if (p->rx_worker_ring == NULL) {
176 		SW_LOG_ERR("Error creating RX worker ring for port %d\n",
177 				port_id);
178 		return -1;
179 	}
180 
181 	p->inflight_max = conf->new_event_threshold;
182 	p->implicit_release = !(conf->event_port_cfg &
183 				RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
184 
185 	/* check if ring exists, same as rx_worker above */
186 	snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
187 			port_id, "cq_worker_ring");
188 	existing_ring = rte_event_ring_lookup(buf);
189 	if (existing_ring)
190 		rte_event_ring_free(existing_ring);
191 
192 	p->cq_worker_ring = rte_event_ring_create(buf, conf->dequeue_depth,
193 			dev->data->socket_id,
194 			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
195 	if (p->cq_worker_ring == NULL) {
196 		rte_event_ring_free(p->rx_worker_ring);
197 		SW_LOG_ERR("Error creating CQ worker ring for port %d\n",
198 				port_id);
199 		return -1;
200 	}
201 	sw->cq_ring_space[port_id] = conf->dequeue_depth;
202 
203 	/* set hist list contents to empty */
204 	for (i = 0; i < SW_PORT_HIST_LIST; i++) {
205 		p->hist_list[i].fid = -1;
206 		p->hist_list[i].qid = -1;
207 	}
208 	dev->data->ports[port_id] = p;
209 
210 	rte_smp_wmb();
211 	p->initialized = 1;
212 	return 0;
213 }
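/*
 * Illustrative sketch of the application-side call that reaches
 * sw_port_setup() (hypothetical values, needs <rte_eventdev.h>): configure
 * one port with explicit depths and a new-event threshold.
 */
static int
example_setup_port(uint8_t dev_id, uint8_t port_id)
{
	struct rte_event_port_conf conf = {
		.new_event_threshold = 1024, /* cap on NEW events this port may inject */
		.dequeue_depth = 16,         /* becomes the CQ worker ring size above */
		.enqueue_depth = 16,
		.event_port_cfg = 0,         /* implicit release left enabled */
	};

	return rte_event_port_setup(dev_id, port_id, &conf);
}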
214 
215 static void
216 sw_port_release(void *port)
217 {
218 	struct sw_port *p = (void *)port;
219 	if (p == NULL)
220 		return;
221 
222 	rte_event_ring_free(p->rx_worker_ring);
223 	rte_event_ring_free(p->cq_worker_ring);
224 	memset(p, 0, sizeof(*p));
225 }
226 
227 static int32_t
228 qid_init(struct sw_evdev *sw, unsigned int idx, int type,
229 		const struct rte_event_queue_conf *queue_conf)
230 {
231 	unsigned int i;
232 	int dev_id = sw->data->dev_id;
233 	int socket_id = sw->data->socket_id;
234 	char buf[IQ_ROB_NAMESIZE];
235 	struct sw_qid *qid = &sw->qids[idx];
236 
237 	/* Initialize the FID structures to no pinning (-1), and zero packets */
238 	const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
239 	for (i = 0; i < RTE_DIM(qid->fids); i++)
240 		qid->fids[i] = fid;
241 
242 	qid->id = idx;
243 	qid->type = type;
244 	qid->priority = queue_conf->priority;
245 
246 	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
247 		uint32_t window_size;
248 
249 		/* rte_ring and window_size_mask require window_size to
250 		 * be a power-of-2.
251 		 */
252 		window_size = rte_align32pow2(
253 				queue_conf->nb_atomic_order_sequences);
254 
255 		qid->window_size = window_size - 1;
256 
257 		if (!window_size) {
258 			SW_LOG_DBG(
259 				"invalid reorder_window_size for ordered queue\n"
260 				);
261 			goto cleanup;
262 		}
263 
264 		snprintf(buf, sizeof(buf), "sw%d_iq_%d_rob", dev_id, i);
265 		qid->reorder_buffer = rte_zmalloc_socket(buf,
266 				window_size * sizeof(qid->reorder_buffer[0]),
267 				0, socket_id);
268 		if (!qid->reorder_buffer) {
269 			SW_LOG_DBG("reorder_buffer malloc failed\n");
270 			goto cleanup;
271 		}
272 
273 		memset(&qid->reorder_buffer[0],
274 		       0,
275 		       window_size * sizeof(qid->reorder_buffer[0]));
276 
277 		qid->reorder_buffer_freelist = rob_ring_create(window_size,
278 				socket_id);
279 		if (!qid->reorder_buffer_freelist) {
280 			SW_LOG_DBG("freelist ring create failed");
281 			goto cleanup;
282 		}
283 
284 		/* Populate the freelist with reorder buffer entries. Enqueue
285 		 * 'window_size - 1' entries because the rte_ring holds only
286 		 * that many.
287 		 */
288 		for (i = 0; i < window_size - 1; i++) {
289 			if (rob_ring_enqueue(qid->reorder_buffer_freelist,
290 						&qid->reorder_buffer[i]) != 1)
291 				goto cleanup;
292 		}
293 
294 		qid->reorder_buffer_index = 0;
295 		qid->cq_next_tx = 0;
296 	}
297 
298 	qid->initialized = 1;
299 
300 	return 0;
301 
302 cleanup:
303 	if (qid->reorder_buffer) {
304 		rte_free(qid->reorder_buffer);
305 		qid->reorder_buffer = NULL;
306 	}
307 
308 	if (qid->reorder_buffer_freelist) {
309 		rob_ring_free(qid->reorder_buffer_freelist);
310 		qid->reorder_buffer_freelist = NULL;
311 	}
312 
313 	return -EINVAL;
314 }
315 
316 static void
317 sw_queue_release(struct rte_eventdev *dev, uint8_t id)
318 {
319 	struct sw_evdev *sw = sw_pmd_priv(dev);
320 	struct sw_qid *qid = &sw->qids[id];
321 
322 	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
323 		rte_free(qid->reorder_buffer);
324 		rob_ring_free(qid->reorder_buffer_freelist);
325 	}
326 	memset(qid, 0, sizeof(*qid));
327 }
328 
329 static int
330 sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
331 		const struct rte_event_queue_conf *conf)
332 {
333 	int type;
334 
335 	type = conf->schedule_type;
336 
337 	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) {
338 		type = SW_SCHED_TYPE_DIRECT;
339 	} else if (RTE_EVENT_QUEUE_CFG_ALL_TYPES
340 			& conf->event_queue_cfg) {
341 		SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n");
342 		return -ENOTSUP;
343 	}
344 
345 	struct sw_evdev *sw = sw_pmd_priv(dev);
346 
347 	if (sw->qids[queue_id].initialized)
348 		sw_queue_release(dev, queue_id);
349 
350 	return qid_init(sw, queue_id, type, conf);
351 }
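/*
 * Illustrative sketch (hypothetical values, needs <rte_eventdev.h>): creating
 * an ordered queue through the public API, which ends up in
 * sw_queue_setup()/qid_init() above. Note that nb_atomic_order_sequences is
 * rounded up to a power of two to size the reorder window.
 */
static int
example_setup_ordered_queue(uint8_t dev_id, uint8_t queue_id)
{
	struct rte_event_queue_conf conf = {
		.schedule_type = RTE_SCHED_TYPE_ORDERED,
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
		.nb_atomic_flows = 1024,
		.nb_atomic_order_sequences = 1024, /* reorder window size */
	};

	return rte_event_queue_setup(dev_id, queue_id, &conf);
}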
352 
353 static void
354 sw_init_qid_iqs(struct sw_evdev *sw)
355 {
356 	int i, j;
357 
358 	/* Initialize the IQ memory of all configured qids */
359 	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
360 		struct sw_qid *qid = &sw->qids[i];
361 
362 		if (!qid->initialized)
363 			continue;
364 
365 		for (j = 0; j < SW_IQS_MAX; j++)
366 			iq_init(sw, &qid->iq[j]);
367 	}
368 }
369 
370 static int
371 sw_qids_empty(struct sw_evdev *sw)
372 {
373 	unsigned int i, j;
374 
375 	for (i = 0; i < sw->qid_count; i++) {
376 		for (j = 0; j < SW_IQS_MAX; j++) {
377 			if (iq_count(&sw->qids[i].iq[j]))
378 				return 0;
379 		}
380 	}
381 
382 	return 1;
383 }
384 
385 static int
386 sw_ports_empty(struct sw_evdev *sw)
387 {
388 	unsigned int i;
389 
390 	for (i = 0; i < sw->port_count; i++) {
391 		if ((rte_event_ring_count(sw->ports[i].rx_worker_ring)) ||
392 		     rte_event_ring_count(sw->ports[i].cq_worker_ring))
393 			return 0;
394 	}
395 
396 	return 1;
397 }
398 
399 static void
400 sw_drain_ports(struct rte_eventdev *dev)
401 {
402 	struct sw_evdev *sw = sw_pmd_priv(dev);
403 	eventdev_stop_flush_t flush;
404 	unsigned int i;
405 	uint8_t dev_id;
406 	void *arg;
407 
408 	flush = dev->dev_ops->dev_stop_flush;
409 	dev_id = dev->data->dev_id;
410 	arg = dev->data->dev_stop_flush_arg;
411 
412 	for (i = 0; i < sw->port_count; i++) {
413 		struct rte_event ev;
414 
415 		while (rte_event_dequeue_burst(dev_id, i, &ev, 1, 0)) {
416 			if (flush)
417 				flush(dev_id, ev, arg);
418 
419 			ev.op = RTE_EVENT_OP_RELEASE;
420 			rte_event_enqueue_burst(dev_id, i, &ev, 1);
421 		}
422 	}
423 }
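/*
 * Illustrative sketch: the flush hook used by sw_drain_ports() and
 * sw_drain_queue() is the application-registered dev_stop_flush callback.
 * A hypothetical registration, assuming the events carry mbufs, could look
 * like this (needs <rte_eventdev.h> and <rte_mbuf.h>).
 */
static void
example_flush_cb(uint8_t dev_id, struct rte_event ev, void *arg)
{
	RTE_SET_USED(dev_id);
	RTE_SET_USED(arg);
	/* reclaim application resources attached to the flushed event */
	rte_pktmbuf_free(ev.mbuf);
}

static int
example_register_flush(uint8_t dev_id, void *user_arg)
{
	return rte_event_dev_stop_flush_callback_register(dev_id,
			example_flush_cb, user_arg);
}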
424 
425 static void
426 sw_drain_queue(struct rte_eventdev *dev, struct sw_iq *iq)
427 {
428 	struct sw_evdev *sw = sw_pmd_priv(dev);
429 	eventdev_stop_flush_t flush;
430 	uint8_t dev_id;
431 	void *arg;
432 
433 	flush = dev->dev_ops->dev_stop_flush;
434 	dev_id = dev->data->dev_id;
435 	arg = dev->data->dev_stop_flush_arg;
436 
437 	while (iq_count(iq) > 0) {
438 		struct rte_event ev;
439 
440 		iq_dequeue_burst(sw, iq, &ev, 1);
441 
442 		if (flush)
443 			flush(dev_id, ev, arg);
444 	}
445 }
446 
447 static void
448 sw_drain_queues(struct rte_eventdev *dev)
449 {
450 	struct sw_evdev *sw = sw_pmd_priv(dev);
451 	unsigned int i, j;
452 
453 	for (i = 0; i < sw->qid_count; i++) {
454 		for (j = 0; j < SW_IQS_MAX; j++)
455 			sw_drain_queue(dev, &sw->qids[i].iq[j]);
456 	}
457 }
458 
459 static void
460 sw_clean_qid_iqs(struct rte_eventdev *dev)
461 {
462 	struct sw_evdev *sw = sw_pmd_priv(dev);
463 	int i, j;
464 
465 	/* Release the IQ memory of all configured qids */
466 	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
467 		struct sw_qid *qid = &sw->qids[i];
468 
469 		for (j = 0; j < SW_IQS_MAX; j++) {
470 			if (!qid->iq[j].head)
471 				continue;
472 			iq_free_chunk_list(sw, qid->iq[j].head);
473 			qid->iq[j].head = NULL;
474 		}
475 	}
476 }
477 
478 static void
479 sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
480 				 struct rte_event_queue_conf *conf)
481 {
482 	RTE_SET_USED(dev);
483 	RTE_SET_USED(queue_id);
484 
485 	static const struct rte_event_queue_conf default_conf = {
486 		.nb_atomic_flows = 4096,
487 		.nb_atomic_order_sequences = 1,
488 		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
489 		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
490 	};
491 
492 	*conf = default_conf;
493 }
494 
495 static void
496 sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
497 		 struct rte_event_port_conf *port_conf)
498 {
499 	RTE_SET_USED(dev);
500 	RTE_SET_USED(port_id);
501 
502 	port_conf->new_event_threshold = 1024;
503 	port_conf->dequeue_depth = 16;
504 	port_conf->enqueue_depth = 16;
505 	port_conf->event_port_cfg = 0;
506 }
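/*
 * Illustrative sketch (hypothetical helper, after rte_event_dev_configure()):
 * applications can retrieve these defaults instead of hard-coding values,
 * then override selected fields before setup.
 */
static void
example_query_defaults(uint8_t dev_id, uint8_t queue_id, uint8_t port_id)
{
	struct rte_event_queue_conf qconf;
	struct rte_event_port_conf pconf;

	rte_event_queue_default_conf_get(dev_id, queue_id, &qconf);
	rte_event_port_default_conf_get(dev_id, port_id, &pconf);

	/* e.g. keep the defaults but allow deeper dequeue bursts on this port */
	pconf.dequeue_depth = 32;
	rte_event_port_setup(dev_id, port_id, &pconf);
}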
507 
508 static int
509 sw_dev_configure(const struct rte_eventdev *dev)
510 {
511 	struct sw_evdev *sw = sw_pmd_priv(dev);
512 	const struct rte_eventdev_data *data = dev->data;
513 	const struct rte_event_dev_config *conf = &data->dev_conf;
514 	int num_chunks, i;
515 
516 	sw->qid_count = conf->nb_event_queues;
517 	sw->port_count = conf->nb_event_ports;
518 	sw->nb_events_limit = conf->nb_events_limit;
519 	rte_atomic32_set(&sw->inflights, 0);
520 
521 	/* Number of chunks sized for worst-case spread of events across IQs */
522 	num_chunks = ((SW_INFLIGHT_EVENTS_TOTAL/SW_EVS_PER_Q_CHUNK)+1) +
523 			sw->qid_count*SW_IQS_MAX*2;
524 
525 	/* If this is a reconfiguration, free the previous IQ allocation. All
526 	 * IQ chunk references were cleaned out of the QIDs in sw_stop(), and
527 	 * will be reinitialized in sw_start().
528 	 */
529 	if (sw->chunks)
530 		rte_free(sw->chunks);
531 
532 	sw->chunks = rte_malloc_socket(NULL,
533 				       sizeof(struct sw_queue_chunk) *
534 				       num_chunks,
535 				       0,
536 				       sw->data->socket_id);
537 	if (!sw->chunks)
538 		return -ENOMEM;
539 
540 	sw->chunk_list_head = NULL;
541 	for (i = 0; i < num_chunks; i++)
542 		iq_free_chunk(sw, &sw->chunks[i]);
543 
544 	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
545 		return -ENOTSUP;
546 
547 	return 0;
548 }
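/*
 * Illustrative sketch (hypothetical queue/port counts): a typical configure
 * call that reaches sw_dev_configure(), sizing the device from its
 * advertised limits.
 */
static int
example_configure(uint8_t dev_id)
{
	struct rte_event_dev_info info;
	struct rte_event_dev_config config = {0};

	rte_event_dev_info_get(dev_id, &info);

	config.nb_event_queues = 2;
	config.nb_event_ports = 2;
	config.nb_events_limit = info.max_num_events;
	config.nb_event_queue_flows = info.max_event_queue_flows;
	config.nb_event_port_dequeue_depth = info.max_event_port_dequeue_depth;
	config.nb_event_port_enqueue_depth = info.max_event_port_enqueue_depth;
	config.dequeue_timeout_ns = 0; /* per-dequeue timeouts are not supported */

	return rte_event_dev_configure(dev_id, &config);
}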
549 
550 struct rte_eth_dev;
551 
552 static int
553 sw_eth_rx_adapter_caps_get(const struct rte_eventdev *dev,
554 			const struct rte_eth_dev *eth_dev,
555 			uint32_t *caps)
556 {
557 	RTE_SET_USED(dev);
558 	RTE_SET_USED(eth_dev);
559 	*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
560 	return 0;
561 }
562 
563 static int
564 sw_timer_adapter_caps_get(const struct rte_eventdev *dev, uint64_t flags,
565 			  uint32_t *caps,
566 			  const struct event_timer_adapter_ops **ops)
567 {
568 	RTE_SET_USED(dev);
569 	RTE_SET_USED(flags);
570 	*caps = 0;
571 
572 	/* Use default SW ops */
573 	*ops = NULL;
574 
575 	return 0;
576 }
577 
578 static int
579 sw_crypto_adapter_caps_get(const struct rte_eventdev *dev,
580 			   const struct rte_cryptodev *cdev,
581 			   uint32_t *caps)
582 {
583 	RTE_SET_USED(dev);
584 	RTE_SET_USED(cdev);
585 	*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
586 	return 0;
587 }
588 
589 static void
590 sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
591 {
592 	RTE_SET_USED(dev);
593 
594 	static const struct rte_event_dev_info evdev_sw_info = {
595 			.driver_name = SW_PMD_NAME,
596 			.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
597 			.max_event_queue_flows = SW_QID_NUM_FIDS,
598 			.max_event_queue_priority_levels = SW_Q_PRIORITY_MAX,
599 			.max_event_priority_levels = SW_IQS_MAX,
600 			.max_event_ports = SW_PORTS_MAX,
601 			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
602 			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
603 			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
604 			.event_dev_cap = (
605 				RTE_EVENT_DEV_CAP_QUEUE_QOS |
606 				RTE_EVENT_DEV_CAP_BURST_MODE |
607 				RTE_EVENT_DEV_CAP_EVENT_QOS |
608 				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE|
609 				RTE_EVENT_DEV_CAP_RUNTIME_PORT_LINK |
610 				RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT |
611 				RTE_EVENT_DEV_CAP_NONSEQ_MODE |
612 				RTE_EVENT_DEV_CAP_CARRY_FLOW_ID),
613 	};
614 
615 	*info = evdev_sw_info;
616 }
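/*
 * Illustrative sketch (hypothetical helper): applications read these
 * capability flags at init time to adapt their fast path, e.g. whether
 * implicit release can be disabled on a port.
 */
static int
example_supports_impl_release_disable(uint8_t dev_id)
{
	struct rte_event_dev_info info;

	rte_event_dev_info_get(dev_id, &info);
	return !!(info.event_dev_cap &
			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
}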
617 
618 static void
619 sw_dump(struct rte_eventdev *dev, FILE *f)
620 {
621 	const struct sw_evdev *sw = sw_pmd_priv(dev);
622 
623 	static const char * const q_type_strings[] = {
624 			"Ordered", "Atomic", "Parallel", "Directed"
625 	};
626 	uint32_t i;
627 	fprintf(f, "EventDev %s: ports %d, qids %d\n", "todo-fix-name",
628 			sw->port_count, sw->qid_count);
629 
630 	fprintf(f, "\trx   %"PRIu64"\n\tdrop %"PRIu64"\n\ttx   %"PRIu64"\n",
631 		sw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts);
632 	fprintf(f, "\tsched calls: %"PRIu64"\n", sw->sched_called);
633 	fprintf(f, "\tsched cq/qid call: %"PRIu64"\n", sw->sched_cq_qid_called);
634 	fprintf(f, "\tsched no IQ enq: %"PRIu64"\n", sw->sched_no_iq_enqueues);
635 	fprintf(f, "\tsched no CQ enq: %"PRIu64"\n", sw->sched_no_cq_enqueues);
636 	uint32_t inflights = rte_atomic32_read(&sw->inflights);
637 	uint32_t credits = sw->nb_events_limit - inflights;
638 	fprintf(f, "\tinflight %d, credits: %d\n", inflights, credits);
639 
640 #define COL_RED "\x1b[31m"
641 #define COL_RESET "\x1b[0m"
642 
643 	for (i = 0; i < sw->port_count; i++) {
644 		int max, j;
645 		const struct sw_port *p = &sw->ports[i];
646 		if (!p->initialized) {
647 			fprintf(f, "  %sPort %d not initialized.%s\n",
648 				COL_RED, i, COL_RESET);
649 			continue;
650 		}
651 		fprintf(f, "  Port %d %s\n", i,
652 			p->is_directed ? " (SingleCons)" : "");
653 		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64
654 			"\t%sinflight %d%s\n", sw->ports[i].stats.rx_pkts,
655 			sw->ports[i].stats.rx_dropped,
656 			sw->ports[i].stats.tx_pkts,
657 			(p->inflights == p->inflight_max) ?
658 				COL_RED : COL_RESET,
659 			sw->ports[i].inflights, COL_RESET);
660 
661 		fprintf(f, "\tMax New: %u"
662 			"\tAvg cycles PP: %"PRIu64"\tCredits: %u\n",
663 			sw->ports[i].inflight_max,
664 			sw->ports[i].avg_pkt_ticks,
665 			sw->ports[i].inflight_credits);
666 		fprintf(f, "\tReceive burst distribution:\n");
667 		float zp_percent = p->zero_polls * 100.0 / p->total_polls;
668 		fprintf(f, zp_percent < 10 ? "\t\t0:%.02f%% " : "\t\t0:%.0f%% ",
669 				zp_percent);
670 		for (max = (int)RTE_DIM(p->poll_buckets); max-- > 0;)
671 			if (p->poll_buckets[max] != 0)
672 				break;
673 		for (j = 0; j <= max; j++) {
674 			if (p->poll_buckets[j] != 0) {
675 				float poll_pc = p->poll_buckets[j] * 100.0 /
676 					p->total_polls;
677 				fprintf(f, "%u-%u:%.02f%% ",
678 					((j << SW_DEQ_STAT_BUCKET_SHIFT) + 1),
679 					((j+1) << SW_DEQ_STAT_BUCKET_SHIFT),
680 					poll_pc);
681 			}
682 		}
683 		fprintf(f, "\n");
684 
685 		if (p->rx_worker_ring) {
686 			uint64_t used = rte_event_ring_count(p->rx_worker_ring);
687 			uint64_t space = rte_event_ring_free_count(
688 					p->rx_worker_ring);
689 			const char *col = (space == 0) ? COL_RED : COL_RESET;
690 			fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4"
691 					PRIu64 COL_RESET"\n", col, used, space);
692 		} else
693 			fprintf(f, "\trx ring not initialized.\n");
694 
695 		if (p->cq_worker_ring) {
696 			uint64_t used = rte_event_ring_count(p->cq_worker_ring);
697 			uint64_t space = rte_event_ring_free_count(
698 					p->cq_worker_ring);
699 			const char *col = (space == 0) ? COL_RED : COL_RESET;
700 			fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4"
701 					PRIu64 COL_RESET"\n", col, used, space);
702 		} else
703 			fprintf(f, "\tcq ring not initialized.\n");
704 	}
705 
706 	for (i = 0; i < sw->qid_count; i++) {
707 		const struct sw_qid *qid = &sw->qids[i];
708 		if (!qid->initialized) {
709 			fprintf(f, "  %sQueue %d not initialized.%s\n",
710 				COL_RED, i, COL_RESET);
711 			continue;
712 		}
713 		int affinities_per_port[SW_PORTS_MAX] = {0};
714 		uint32_t inflights = 0;
715 
716 		fprintf(f, "  Queue %d (%s)\n", i, q_type_strings[qid->type]);
717 		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64"\n",
718 			qid->stats.rx_pkts, qid->stats.rx_dropped,
719 			qid->stats.tx_pkts);
720 		if (qid->type == RTE_SCHED_TYPE_ORDERED) {
721 			struct rob_ring *rob_buf_free =
722 				qid->reorder_buffer_freelist;
723 			if (rob_buf_free)
724 				fprintf(f, "\tReorder entries in use: %u\n",
725 					rob_ring_free_count(rob_buf_free));
726 			else
727 				fprintf(f,
728 					"\tReorder buffer not initialized\n");
729 		}
730 
731 		uint32_t flow;
732 		for (flow = 0; flow < RTE_DIM(qid->fids); flow++)
733 			if (qid->fids[flow].cq != -1) {
734 				affinities_per_port[qid->fids[flow].cq]++;
735 				inflights += qid->fids[flow].pcount;
736 			}
737 
738 		uint32_t port;
739 		fprintf(f, "\tPer Port Stats:\n");
740 		for (port = 0; port < sw->port_count; port++) {
741 			fprintf(f, "\t  Port %d: Pkts: %"PRIu64, port,
742 					qid->to_port[port]);
743 			fprintf(f, "\tFlows: %d\n", affinities_per_port[port]);
744 		}
745 
746 		uint32_t iq;
747 		uint32_t iq_printed = 0;
748 		for (iq = 0; iq < SW_IQS_MAX; iq++) {
749 			if (!qid->iq[iq].head) {
750 				fprintf(f, "\tiq %d is not initialized.\n", iq);
751 				iq_printed = 1;
752 				continue;
753 			}
754 			uint32_t used = iq_count(&qid->iq[iq]);
755 			const char *col = COL_RESET;
756 			if (used > 0) {
757 				fprintf(f, "\t%siq %d: Used %d"
758 					COL_RESET"\n", col, iq, used);
759 				iq_printed = 1;
760 			}
761 		}
762 		if (iq_printed == 0)
763 			fprintf(f, "\t-- iqs empty --\n");
764 	}
765 }
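/*
 * Illustrative sketch: the dump above is reached via the public API, e.g.
 * from a debug CLI handler (needs <stdio.h> and <rte_eventdev.h>).
 */
static void
example_dump(uint8_t dev_id)
{
	/* writes port, queue and ring occupancy statistics to stdout */
	rte_event_dev_dump(dev_id, stdout);
}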
766 
767 static int
768 sw_start(struct rte_eventdev *dev)
769 {
770 	unsigned int i, j;
771 	struct sw_evdev *sw = sw_pmd_priv(dev);
772 
773 	rte_service_component_runstate_set(sw->service_id, 1);
774 
775 	/* check that a service core is mapped to this service */
776 	if (!rte_service_runstate_get(sw->service_id)) {
777 		SW_LOG_ERR("Warning: No Service core enabled on service %s\n",
778 				sw->service_name);
779 		return -ENOENT;
780 	}
781 
782 	/* check all ports are set up */
783 	for (i = 0; i < sw->port_count; i++)
784 		if (sw->ports[i].rx_worker_ring == NULL) {
785 			SW_LOG_ERR("Port %d not configured\n", i);
786 			return -ESTALE;
787 		}
788 
789 	/* check all queues are configured and mapped to ports */
790 	for (i = 0; i < sw->qid_count; i++)
791 		if (!sw->qids[i].initialized ||
792 		    sw->qids[i].cq_num_mapped_cqs == 0) {
793 			SW_LOG_ERR("Queue %d not configured\n", i);
794 			return -ENOLINK;
795 		}
796 
797 	/* build up our prioritized array of qids */
798 	/* We don't use qsort here, as if all/multiple entries have the same
799 	 * priority, the result is non-deterministic. From "man 3 qsort":
800 	 * "If two members compare as equal, their order in the sorted
801 	 * array is undefined."
802 	 */
803 	uint32_t qidx = 0;
804 	for (j = 0; j <= RTE_EVENT_DEV_PRIORITY_LOWEST; j++) {
805 		for (i = 0; i < sw->qid_count; i++) {
806 			if (sw->qids[i].priority == j) {
807 				sw->qids_prioritized[qidx] = &sw->qids[i];
808 				qidx++;
809 			}
810 		}
811 	}
812 
813 	sw_init_qid_iqs(sw);
814 
815 	if (sw_xstats_init(sw) < 0)
816 		return -EINVAL;
817 
818 	rte_smp_wmb();
819 	sw->started = 1;
820 
821 	return 0;
822 }
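/*
 * Illustrative sketch (hypothetical helper): sw_start() refuses to start
 * unless the scheduler service can run, so an application maps a service
 * lcore to the device's service id before calling rte_event_dev_start().
 */
static int
example_start_with_service_core(uint8_t dev_id, uint32_t service_lcore)
{
	uint32_t service_id;

	rte_event_dev_service_id_get(dev_id, &service_id);

	rte_service_lcore_add(service_lcore);
	rte_service_map_lcore_set(service_id, service_lcore, 1);
	rte_service_runstate_set(service_id, 1);
	rte_service_lcore_start(service_lcore);

	return rte_event_dev_start(dev_id);
}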
823 
824 static void
825 sw_stop(struct rte_eventdev *dev)
826 {
827 	struct sw_evdev *sw = sw_pmd_priv(dev);
828 	int32_t runstate;
829 
830 	/* Stop the scheduler if it's running */
831 	runstate = rte_service_runstate_get(sw->service_id);
832 	if (runstate == 1)
833 		rte_service_runstate_set(sw->service_id, 0);
834 
835 	while (rte_service_may_be_active(sw->service_id))
836 		rte_pause();
837 
838 	/* Flush all events out of the device */
839 	while (!(sw_qids_empty(sw) && sw_ports_empty(sw))) {
840 		sw_event_schedule(dev);
841 		sw_drain_ports(dev);
842 		sw_drain_queues(dev);
843 	}
844 
845 	sw_clean_qid_iqs(dev);
846 	sw_xstats_uninit(sw);
847 	sw->started = 0;
848 	rte_smp_wmb();
849 
850 	if (runstate == 1)
851 		rte_service_runstate_set(sw->service_id, 1);
852 }
853 
854 static int
855 sw_close(struct rte_eventdev *dev)
856 {
857 	struct sw_evdev *sw = sw_pmd_priv(dev);
858 	uint32_t i;
859 
860 	for (i = 0; i < sw->qid_count; i++)
861 		sw_queue_release(dev, i);
862 	sw->qid_count = 0;
863 
864 	for (i = 0; i < sw->port_count; i++)
865 		sw_port_release(&sw->ports[i]);
866 	sw->port_count = 0;
867 
868 	memset(&sw->stats, 0, sizeof(sw->stats));
869 	sw->sched_called = 0;
870 	sw->sched_no_iq_enqueues = 0;
871 	sw->sched_no_cq_enqueues = 0;
872 	sw->sched_cq_qid_called = 0;
873 
874 	return 0;
875 }
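/*
 * Illustrative sketch: the stop/close pair that exercises sw_stop() and
 * sw_close() above; stop drains all in-flight events before close releases
 * the ports and queues.
 */
static void
example_teardown(uint8_t dev_id)
{
	rte_event_dev_stop(dev_id);
	rte_event_dev_close(dev_id);
}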
876 
877 static int
878 assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
879 {
880 	int *socket_id = opaque;
881 	*socket_id = atoi(value);
882 	if (*socket_id >= RTE_MAX_NUMA_NODES)
883 		return -1;
884 	return 0;
885 }
886 
887 static int
888 set_sched_quanta(const char *key __rte_unused, const char *value, void *opaque)
889 {
890 	int *quanta = opaque;
891 	*quanta = atoi(value);
892 	if (*quanta < 0 || *quanta >= 4096)
893 		return -1;
894 	return 0;
895 }
896 
897 static int
898 set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque)
899 {
900 	int *credit = opaque;
901 	*credit = atoi(value);
902 	if (*credit < 0 || *credit >= 128)
903 		return -1;
904 	return 0;
905 }
906 
907 static int
908 set_deq_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
909 {
910 	int *deq_burst_sz = opaque;
911 	*deq_burst_sz = atoi(value);
912 	if (*deq_burst_sz < 0 || *deq_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE)
913 		return -1;
914 	return 0;
915 }
916 
917 static int
918 set_min_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
919 {
920 	int *min_burst_sz = opaque;
921 	*min_burst_sz = atoi(value);
922 	if (*min_burst_sz < 0 || *min_burst_sz > SCHED_DEQUEUE_MAX_BURST_SIZE)
923 		return -1;
924 	return 0;
925 }
926 
927 static int
928 set_refill_once(const char *key __rte_unused, const char *value, void *opaque)
929 {
930 	int *refill_once_per_call = opaque;
931 	*refill_once_per_call = atoi(value);
932 	if (*refill_once_per_call < 0 || *refill_once_per_call > 1)
933 		return -1;
934 	return 0;
935 }
936 
937 static int32_t sw_sched_service_func(void *args)
938 {
939 	struct rte_eventdev *dev = args;
940 	sw_event_schedule(dev);
941 	return 0;
942 }
943 
944 static int
945 sw_probe(struct rte_vdev_device *vdev)
946 {
947 	static struct eventdev_ops evdev_sw_ops = {
948 			.dev_configure = sw_dev_configure,
949 			.dev_infos_get = sw_info_get,
950 			.dev_close = sw_close,
951 			.dev_start = sw_start,
952 			.dev_stop = sw_stop,
953 			.dump = sw_dump,
954 
955 			.queue_def_conf = sw_queue_def_conf,
956 			.queue_setup = sw_queue_setup,
957 			.queue_release = sw_queue_release,
958 			.port_def_conf = sw_port_def_conf,
959 			.port_setup = sw_port_setup,
960 			.port_release = sw_port_release,
961 			.port_link = sw_port_link,
962 			.port_unlink = sw_port_unlink,
963 			.port_unlinks_in_progress = sw_port_unlinks_in_progress,
964 
965 			.eth_rx_adapter_caps_get = sw_eth_rx_adapter_caps_get,
966 
967 			.timer_adapter_caps_get = sw_timer_adapter_caps_get,
968 
969 			.crypto_adapter_caps_get = sw_crypto_adapter_caps_get,
970 
971 			.xstats_get = sw_xstats_get,
972 			.xstats_get_names = sw_xstats_get_names,
973 			.xstats_get_by_name = sw_xstats_get_by_name,
974 			.xstats_reset = sw_xstats_reset,
975 
976 			.dev_selftest = test_sw_eventdev,
977 	};
978 
979 	static const char *const args[] = {
980 		NUMA_NODE_ARG,
981 		SCHED_QUANTA_ARG,
982 		CREDIT_QUANTA_ARG,
983 		MIN_BURST_SIZE_ARG,
984 		DEQ_BURST_SIZE_ARG,
985 		REFIL_ONCE_ARG,
986 		NULL
987 	};
988 	const char *name;
989 	const char *params;
990 	struct rte_eventdev *dev;
991 	struct sw_evdev *sw;
992 	int socket_id = rte_socket_id();
993 	int sched_quanta  = SW_DEFAULT_SCHED_QUANTA;
994 	int credit_quanta = SW_DEFAULT_CREDIT_QUANTA;
995 	int min_burst_size = 1;
996 	int deq_burst_size = SCHED_DEQUEUE_DEFAULT_BURST_SIZE;
997 	int refill_once = 0;
998 
999 	name = rte_vdev_device_name(vdev);
1000 	params = rte_vdev_device_args(vdev);
1001 	if (params != NULL && params[0] != '\0') {
1002 		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
1003 
1004 		if (!kvlist) {
1005 			SW_LOG_INFO(
1006 				"Ignoring unsupported parameters when creating device '%s'\n",
1007 				name);
1008 		} else {
1009 			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
1010 					assign_numa_node, &socket_id);
1011 			if (ret != 0) {
1012 				SW_LOG_ERR(
1013 					"%s: Error parsing numa node parameter",
1014 					name);
1015 				rte_kvargs_free(kvlist);
1016 				return ret;
1017 			}
1018 
1019 			ret = rte_kvargs_process(kvlist, SCHED_QUANTA_ARG,
1020 					set_sched_quanta, &sched_quanta);
1021 			if (ret != 0) {
1022 				SW_LOG_ERR(
1023 					"%s: Error parsing sched quanta parameter",
1024 					name);
1025 				rte_kvargs_free(kvlist);
1026 				return ret;
1027 			}
1028 
1029 			ret = rte_kvargs_process(kvlist, CREDIT_QUANTA_ARG,
1030 					set_credit_quanta, &credit_quanta);
1031 			if (ret != 0) {
1032 				SW_LOG_ERR(
1033 					"%s: Error parsing credit quanta parameter",
1034 					name);
1035 				rte_kvargs_free(kvlist);
1036 				return ret;
1037 			}
1038 
1039 			ret = rte_kvargs_process(kvlist, MIN_BURST_SIZE_ARG,
1040 					set_min_burst_sz, &min_burst_size);
1041 			if (ret != 0) {
1042 				SW_LOG_ERR(
1043 					"%s: Error parsing minimum burst size parameter",
1044 					name);
1045 				rte_kvargs_free(kvlist);
1046 				return ret;
1047 			}
1048 
1049 			ret = rte_kvargs_process(kvlist, DEQ_BURST_SIZE_ARG,
1050 					set_deq_burst_sz, &deq_burst_size);
1051 			if (ret != 0) {
1052 				SW_LOG_ERR(
1053 					"%s: Error parsing dequeue burst size parameter",
1054 					name);
1055 				rte_kvargs_free(kvlist);
1056 				return ret;
1057 			}
1058 
1059 			ret = rte_kvargs_process(kvlist, REFIL_ONCE_ARG,
1060 					set_refill_once, &refill_once);
1061 			if (ret != 0) {
1062 				SW_LOG_ERR(
1063 					"%s: Error parsing refill once per call switch",
1064 					name);
1065 				rte_kvargs_free(kvlist);
1066 				return ret;
1067 			}
1068 
1069 			rte_kvargs_free(kvlist);
1070 		}
1071 	}
1072 
1073 	SW_LOG_INFO(
1074 			"Creating eventdev sw device %s, numa_node=%d, "
1075 			"sched_quanta=%d, credit_quanta=%d "
1076 			"min_burst=%d, deq_burst=%d, refill_once=%d\n",
1077 			name, socket_id, sched_quanta, credit_quanta,
1078 			min_burst_size, deq_burst_size, refill_once);
1079 
1080 	dev = rte_event_pmd_vdev_init(name,
1081 			sizeof(struct sw_evdev), socket_id);
1082 	if (dev == NULL) {
1083 		SW_LOG_ERR("eventdev vdev init() failed");
1084 		return -EFAULT;
1085 	}
1086 	dev->dev_ops = &evdev_sw_ops;
1087 	dev->enqueue = sw_event_enqueue;
1088 	dev->enqueue_burst = sw_event_enqueue_burst;
1089 	dev->enqueue_new_burst = sw_event_enqueue_burst;
1090 	dev->enqueue_forward_burst = sw_event_enqueue_burst;
1091 	dev->dequeue = sw_event_dequeue;
1092 	dev->dequeue_burst = sw_event_dequeue_burst;
1093 
1094 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
1095 		return 0;
1096 
1097 	sw = dev->data->dev_private;
1098 	sw->data = dev->data;
1099 
1100 	/* copy values passed from vdev command line to instance */
1101 	sw->credit_update_quanta = credit_quanta;
1102 	sw->sched_quanta = sched_quanta;
1103 	sw->sched_min_burst_size = min_burst_size;
1104 	sw->sched_deq_burst_size = deq_burst_size;
1105 	sw->refill_once_per_iter = refill_once;
1106 
1107 	/* register service with EAL */
1108 	struct rte_service_spec service;
1109 	memset(&service, 0, sizeof(struct rte_service_spec));
1110 	snprintf(service.name, sizeof(service.name), "%s_service", name);
1111 	snprintf(sw->service_name, sizeof(sw->service_name), "%s_service",
1112 			name);
1113 	service.socket_id = socket_id;
1114 	service.callback = sw_sched_service_func;
1115 	service.callback_userdata = (void *)dev;
1116 
1117 	int32_t ret = rte_service_component_register(&service, &sw->service_id);
1118 	if (ret) {
1119 		SW_LOG_ERR("service register() failed");
1120 		return -ENOEXEC;
1121 	}
1122 
1123 	dev->data->service_inited = 1;
1124 	dev->data->service_id = sw->service_id;
1125 
1126 	event_dev_probing_finish(dev);
1127 
1128 	return 0;
1129 }
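/*
 * Illustrative sketch (hypothetical device name and values): besides the
 * --vdev EAL command-line option, the device can be created
 * programmatically; the argument string mirrors the parameters parsed in
 * sw_probe() above (needs <rte_bus_vdev.h>).
 */
static int
example_create_sw_vdev(void)
{
	return rte_vdev_init("event_sw0",
			"sched_quanta=64,credit_quanta=32,min_burst=8");
}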
1130 
1131 static int
1132 sw_remove(struct rte_vdev_device *vdev)
1133 {
1134 	const char *name;
1135 
1136 	name = rte_vdev_device_name(vdev);
1137 	if (name == NULL)
1138 		return -EINVAL;
1139 
1140 	SW_LOG_INFO("Closing eventdev sw device %s\n", name);
1141 
1142 	return rte_event_pmd_vdev_uninit(name);
1143 }
1144 
1145 static struct rte_vdev_driver evdev_sw_pmd_drv = {
1146 	.probe = sw_probe,
1147 	.remove = sw_remove
1148 };
1149 
1150 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv);
1151 RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "=<int> "
1152 		SCHED_QUANTA_ARG "=<int>" CREDIT_QUANTA_ARG "=<int>"
1153 		MIN_BURST_SIZE_ARG "=<int>" DEQ_BURST_SIZE_ARG "=<int>"
1154 		REFIL_ONCE_ARG "=<int>");
1155 RTE_LOG_REGISTER_DEFAULT(eventdev_sw_log_level, NOTICE);
1156