/* xref: /dpdk/drivers/event/sw/sw_evdev.c (revision a3a2e2c8f7de433e10b1548df65b20bf10086d9c) */
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <inttypes.h>
#include <string.h>

#include <rte_vdev.h>
#include <rte_memzone.h>
#include <rte_kvargs.h>
#include <rte_ring.h>
#include <rte_errno.h>

#include "sw_evdev.h"
#include "iq_ring.h"
#include "event_ring.h"

#define EVENTDEV_NAME_SW_PMD event_sw
#define NUMA_NODE_ARG "numa_node"
#define SCHED_QUANTA_ARG "sched_quanta"
#define CREDIT_QUANTA_ARG "credit_quanta"

static void
sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);

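/* Link queues to a port. A directed (SINGLE_LINK) queue turns its port into
 * a directed port, which then accepts no other queue; a full CQ map also
 * fails the link. On error the loop stops, rte_errno is set and the number
 * of links already established is returned.
 */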
static int
sw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
		const uint8_t priorities[], uint16_t num)
{
	struct sw_port *p = port;
	struct sw_evdev *sw = sw_pmd_priv(dev);
	int i;

	RTE_SET_USED(priorities);
	for (i = 0; i < num; i++) {
		struct sw_qid *q = &sw->qids[queues[i]];

		/* check for qid map overflow */
		if (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map)) {
			rte_errno = -EDQUOT;
			break;
		}

		if (p->is_directed && p->num_qids_mapped > 0) {
			rte_errno = -EDQUOT;
			break;
		}

		if (q->type == SW_SCHED_TYPE_DIRECT) {
			/* check directed qids only map to one port */
			if (p->num_qids_mapped > 0) {
				rte_errno = -EDQUOT;
				break;
			}
			/* check port only takes a directed flow */
			if (num > 1) {
				rte_errno = -EDQUOT;
				break;
			}

			p->is_directed = 1;
			p->num_qids_mapped = 1;
		} else if (q->type == RTE_SCHED_TYPE_ORDERED) {
			p->num_ordered_qids++;
			p->num_qids_mapped++;
		} else if (q->type == RTE_SCHED_TYPE_ATOMIC ||
				q->type == RTE_SCHED_TYPE_PARALLEL) {
			p->num_qids_mapped++;
		}

		q->cq_map[q->cq_num_mapped_cqs] = p->id;
		rte_smp_wmb();
		q->cq_num_mapped_cqs++;
	}
	return i;
}

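/* Unlink a port from the given queues: the port's entry in each queue's CQ
 * map is replaced by the last mapped CQ so the map stays dense. Returns the
 * number of unlinks performed.
 */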
static int
sw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
		uint16_t nb_unlinks)
{
	struct sw_port *p = port;
	struct sw_evdev *sw = sw_pmd_priv(dev);
	unsigned int i, j;

	int unlinked = 0;
	for (i = 0; i < nb_unlinks; i++) {
		struct sw_qid *q = &sw->qids[queues[i]];
		for (j = 0; j < q->cq_num_mapped_cqs; j++) {
			if (q->cq_map[j] == p->id) {
				q->cq_map[j] =
					q->cq_map[q->cq_num_mapped_cqs - 1];
				rte_smp_wmb();
				q->cq_num_mapped_cqs--;
				unlinked++;

				p->num_qids_mapped--;

				if (q->type == RTE_SCHED_TYPE_ORDERED)
					p->num_ordered_qids--;

				continue;
			}
		}
	}
	return unlinked;
}

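/* Set up a port: allocate its rx (producer) and cq (consumer) rings and
 * reset its state. If the port was already initialized, any credits it holds
 * are first returned to the device-wide inflight count so none are leaked.
 */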
static int
sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
		const struct rte_event_port_conf *conf)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	struct sw_port *p = &sw->ports[port_id];
	char buf[QE_RING_NAMESIZE];
	unsigned int i;

	struct rte_event_dev_info info;
	sw_info_get(dev, &info);

	/* detect re-configuring and return credits to instance if needed */
	if (p->initialized) {
		/* Taking credits from the pool is done one quanta at a time,
		 * and credits may be spent (counted in p->inflights) or still
		 * available in the port (p->inflight_credits). Return the sum
		 * so that no credits are leaked.
		 */
		int possible_inflights = p->inflight_credits + p->inflights;
		rte_atomic32_sub(&sw->inflights, possible_inflights);
	}

	*p = (struct sw_port){0}; /* zero entire structure */
	p->id = port_id;
	p->sw = sw;

	snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
			"rx_worker_ring");
	p->rx_worker_ring = qe_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
			dev->data->socket_id);
	if (p->rx_worker_ring == NULL) {
		SW_LOG_ERR("Error creating RX worker ring for port %d\n",
				port_id);
		return -1;
	}

	p->inflight_max = conf->new_event_threshold;

	snprintf(buf, sizeof(buf), "sw%d_%s", dev->data->dev_id,
			"cq_worker_ring");
	p->cq_worker_ring = qe_ring_create(buf, conf->dequeue_depth,
			dev->data->socket_id);
	if (p->cq_worker_ring == NULL) {
		qe_ring_destroy(p->rx_worker_ring);
		SW_LOG_ERR("Error creating CQ worker ring for port %d\n",
				port_id);
		return -1;
	}
	sw->cq_ring_space[port_id] = conf->dequeue_depth;

	/* set hist list contents to empty */
	for (i = 0; i < SW_PORT_HIST_LIST; i++) {
		p->hist_list[i].fid = -1;
		p->hist_list[i].qid = -1;
	}
	dev->data->ports[port_id] = p;

	rte_smp_wmb();
	p->initialized = 1;
	return 0;
}

static void
sw_port_release(void *port)
{
	struct sw_port *p = (void *)port;
	if (p == NULL)
		return;

	qe_ring_destroy(p->rx_worker_ring);
	qe_ring_destroy(p->cq_worker_ring);
	memset(p, 0, sizeof(*p));
}

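/* Set up a single queue (qid): create its internal IQ rings, reset the
 * flow-id table, and for ordered queues allocate the reorder buffer plus its
 * freelist ring. Returns 0 on success, or -EINVAL after releasing any
 * partially created resources.
 */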
static int32_t
qid_init(struct sw_evdev *sw, unsigned int idx, int type,
		const struct rte_event_queue_conf *queue_conf)
{
	unsigned int i;
	int dev_id = sw->data->dev_id;
	int socket_id = sw->data->socket_id;
	char buf[IQ_RING_NAMESIZE];
	struct sw_qid *qid = &sw->qids[idx];

	for (i = 0; i < SW_IQS_MAX; i++) {
		snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i);
		qid->iq[i] = iq_ring_create(buf, socket_id);
		if (!qid->iq[i]) {
			SW_LOG_DBG("ring create failed");
			goto cleanup;
		}
	}

	/* Initialize the FID structures to no pinning (-1), and zero packets */
	const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
	for (i = 0; i < RTE_DIM(qid->fids); i++)
		qid->fids[i] = fid;

	qid->id = idx;
	qid->type = type;
	qid->priority = queue_conf->priority;

	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
		char ring_name[RTE_RING_NAMESIZE];
		uint32_t window_size;

		/* rte_ring and window_size_mask require window_size to be a
		 * power-of-2.
		 */
		window_size = rte_align32pow2(
				queue_conf->nb_atomic_order_sequences);

		qid->window_size = window_size - 1;

		if (!window_size) {
			SW_LOG_DBG(
				"invalid reorder_window_size for ordered queue\n"
				);
			goto cleanup;
		}

		snprintf(buf, sizeof(buf), "sw%d_iq_%d_rob", dev_id, i);
		qid->reorder_buffer = rte_zmalloc_socket(buf,
				window_size * sizeof(qid->reorder_buffer[0]),
				0, socket_id);
		if (!qid->reorder_buffer) {
			SW_LOG_DBG("reorder_buffer malloc failed\n");
			goto cleanup;
		}

		memset(&qid->reorder_buffer[0],
		       0,
		       window_size * sizeof(qid->reorder_buffer[0]));

		snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist",
				dev_id, idx);

		/* lookup the ring, and if it already exists, free it */
		struct rte_ring *cleanup = rte_ring_lookup(ring_name);
		if (cleanup)
			rte_ring_free(cleanup);

		qid->reorder_buffer_freelist = rte_ring_create(ring_name,
				window_size,
				socket_id,
				RING_F_SP_ENQ | RING_F_SC_DEQ);
		if (!qid->reorder_buffer_freelist) {
			SW_LOG_DBG("freelist ring create failed");
			goto cleanup;
		}

		/* Populate the freelist with reorder buffer entries. Enqueue
		 * 'window_size - 1' entries because the rte_ring holds only
		 * that many.
		 */
		for (i = 0; i < window_size - 1; i++) {
			if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist,
						&qid->reorder_buffer[i]) < 0)
				goto cleanup;
		}

		qid->reorder_buffer_index = 0;
		qid->cq_next_tx = 0;
	}

	qid->initialized = 1;

	return 0;

cleanup:
	for (i = 0; i < SW_IQS_MAX; i++) {
		if (qid->iq[i])
			iq_ring_destroy(qid->iq[i]);
	}

	if (qid->reorder_buffer) {
		rte_free(qid->reorder_buffer);
		qid->reorder_buffer = NULL;
	}

	if (qid->reorder_buffer_freelist) {
		rte_ring_free(qid->reorder_buffer_freelist);
		qid->reorder_buffer_freelist = NULL;
	}

	return -EINVAL;
}

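/* Translate the queue config flags into an internal scheduling type and
 * defer to qid_init(). RTE_EVENT_QUEUE_CFG_ALL_TYPES is not supported.
 */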
static int
sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
		const struct rte_event_queue_conf *conf)
{
	int type;

	/* SINGLE_LINK can be OR-ed with other types, so handle first */
	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) {
		type = SW_SCHED_TYPE_DIRECT;
	} else {
		switch (conf->event_queue_cfg) {
		case RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY:
			type = RTE_SCHED_TYPE_ATOMIC;
			break;
		case RTE_EVENT_QUEUE_CFG_ORDERED_ONLY:
			type = RTE_SCHED_TYPE_ORDERED;
			break;
		case RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY:
			type = RTE_SCHED_TYPE_PARALLEL;
			break;
		case RTE_EVENT_QUEUE_CFG_ALL_TYPES:
			SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n");
			return -ENOTSUP;
		default:
			SW_LOG_ERR("Unknown queue type %d requested\n",
				   conf->event_queue_cfg);
			return -EINVAL;
		}
	}

	struct sw_evdev *sw = sw_pmd_priv(dev);
	return qid_init(sw, queue_id, type, conf);
}

static void
sw_queue_release(struct rte_eventdev *dev, uint8_t id)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	struct sw_qid *qid = &sw->qids[id];
	uint32_t i;

	for (i = 0; i < SW_IQS_MAX; i++)
		iq_ring_destroy(qid->iq[i]);

	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
		rte_free(qid->reorder_buffer);
		rte_ring_free(qid->reorder_buffer_freelist);
	}
	memset(qid, 0, sizeof(*qid));
}

static void
sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
				 struct rte_event_queue_conf *conf)
{
	RTE_SET_USED(dev);
	RTE_SET_USED(queue_id);

	static const struct rte_event_queue_conf default_conf = {
		.nb_atomic_flows = 4096,
		.nb_atomic_order_sequences = 1,
		.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY,
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
	};

	*conf = default_conf;
}

static void
sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
		 struct rte_event_port_conf *port_conf)
{
	RTE_SET_USED(dev);
	RTE_SET_USED(port_id);

	port_conf->new_event_threshold = 1024;
	port_conf->dequeue_depth = 16;
	port_conf->enqueue_depth = 16;
}

static int
sw_dev_configure(const struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	const struct rte_eventdev_data *data = dev->data;
	const struct rte_event_dev_config *conf = &data->dev_conf;

	sw->qid_count = conf->nb_event_queues;
	sw->port_count = conf->nb_event_ports;
	sw->nb_events_limit = conf->nb_events_limit;
	rte_atomic32_set(&sw->inflights, 0);

	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
		return -ENOTSUP;

	return 0;
}

static void
sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
{
	RTE_SET_USED(dev);

	static const struct rte_event_dev_info evdev_sw_info = {
			.driver_name = SW_PMD_NAME,
			.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
			.max_event_queue_flows = SW_QID_NUM_FIDS,
			.max_event_queue_priority_levels = SW_Q_PRIORITY_MAX,
			.max_event_priority_levels = SW_IQS_MAX,
			.max_event_ports = SW_PORTS_MAX,
			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
			.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
					RTE_EVENT_DEV_CAP_BURST_MODE |
					RTE_EVENT_DEV_CAP_EVENT_QOS),
	};

	*info = evdev_sw_info;
}

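/* Dump device, per-port and per-queue state to 'f': traffic counters, credit
 * and inflight usage, receive burst distribution and ring/IQ occupancy.
 */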
static void
sw_dump(struct rte_eventdev *dev, FILE *f)
{
	const struct sw_evdev *sw = sw_pmd_priv(dev);

	static const char * const q_type_strings[] = {
			"Ordered", "Atomic", "Parallel", "Directed"
	};
	uint32_t i;
	fprintf(f, "EventDev %s: ports %d, qids %d\n", "todo-fix-name",
			sw->port_count, sw->qid_count);

	fprintf(f, "\trx   %"PRIu64"\n\tdrop %"PRIu64"\n\ttx   %"PRIu64"\n",
		sw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts);
	fprintf(f, "\tsched calls: %"PRIu64"\n", sw->sched_called);
	fprintf(f, "\tsched cq/qid call: %"PRIu64"\n", sw->sched_cq_qid_called);
	fprintf(f, "\tsched no IQ enq: %"PRIu64"\n", sw->sched_no_iq_enqueues);
	fprintf(f, "\tsched no CQ enq: %"PRIu64"\n", sw->sched_no_cq_enqueues);
	uint32_t inflights = rte_atomic32_read(&sw->inflights);
	uint32_t credits = sw->nb_events_limit - inflights;
	fprintf(f, "\tinflight %d, credits: %d\n", inflights, credits);

#define COL_RED "\x1b[31m"
#define COL_RESET "\x1b[0m"

	for (i = 0; i < sw->port_count; i++) {
		int max, j;
		const struct sw_port *p = &sw->ports[i];
		if (!p->initialized) {
			fprintf(f, "  %sPort %d not initialized.%s\n",
				COL_RED, i, COL_RESET);
			continue;
		}
		fprintf(f, "  Port %d %s\n", i,
			p->is_directed ? " (SingleCons)" : "");
		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64
			"\t%sinflight %d%s\n", sw->ports[i].stats.rx_pkts,
			sw->ports[i].stats.rx_dropped,
			sw->ports[i].stats.tx_pkts,
			(p->inflights == p->inflight_max) ?
				COL_RED : COL_RESET,
			sw->ports[i].inflights, COL_RESET);

		fprintf(f, "\tMax New: %u"
			"\tAvg cycles PP: %"PRIu64"\tCredits: %u\n",
			sw->ports[i].inflight_max,
			sw->ports[i].avg_pkt_ticks,
			sw->ports[i].inflight_credits);
		fprintf(f, "\tReceive burst distribution:\n");
		float zp_percent = p->zero_polls * 100.0 / p->total_polls;
		fprintf(f, zp_percent < 10 ? "\t\t0:%.02f%% " : "\t\t0:%.0f%% ",
				zp_percent);
		for (max = (int)RTE_DIM(p->poll_buckets); max-- > 0;)
			if (p->poll_buckets[max] != 0)
				break;
		for (j = 0; j <= max; j++) {
			if (p->poll_buckets[j] != 0) {
				float poll_pc = p->poll_buckets[j] * 100.0 /
					p->total_polls;
				fprintf(f, "%u-%u:%.02f%% ",
					((j << SW_DEQ_STAT_BUCKET_SHIFT) + 1),
					((j+1) << SW_DEQ_STAT_BUCKET_SHIFT),
					poll_pc);
			}
		}
		fprintf(f, "\n");

		if (p->rx_worker_ring) {
			uint64_t used = qe_ring_count(p->rx_worker_ring);
			uint64_t space = qe_ring_free_count(p->rx_worker_ring);
			const char *col = (space == 0) ? COL_RED : COL_RESET;
			fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4"
					PRIu64 COL_RESET"\n", col, used, space);
		} else
			fprintf(f, "\trx ring not initialized.\n");

		if (p->cq_worker_ring) {
			uint64_t used = qe_ring_count(p->cq_worker_ring);
			uint64_t space = qe_ring_free_count(p->cq_worker_ring);
			const char *col = (space == 0) ? COL_RED : COL_RESET;
			fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4"
					PRIu64 COL_RESET"\n", col, used, space);
		} else
			fprintf(f, "\tcq ring not initialized.\n");
	}

	for (i = 0; i < sw->qid_count; i++) {
		const struct sw_qid *qid = &sw->qids[i];
		if (!qid->initialized) {
			fprintf(f, "  %sQueue %d not initialized.%s\n",
				COL_RED, i, COL_RESET);
			continue;
		}
		int affinities_per_port[SW_PORTS_MAX] = {0};
		uint32_t inflights = 0;

		fprintf(f, "  Queue %d (%s)\n", i, q_type_strings[qid->type]);
		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64"\n",
			qid->stats.rx_pkts, qid->stats.rx_dropped,
			qid->stats.tx_pkts);
		if (qid->type == RTE_SCHED_TYPE_ORDERED) {
			struct rte_ring *rob_buf_free =
				qid->reorder_buffer_freelist;
			if (rob_buf_free)
				fprintf(f, "\tReorder entries in use: %u\n",
					rte_ring_free_count(rob_buf_free));
			else
				fprintf(f,
					"\tReorder buffer not initialized\n");
		}

		uint32_t flow;
		for (flow = 0; flow < RTE_DIM(qid->fids); flow++)
			if (qid->fids[flow].cq != -1) {
				affinities_per_port[qid->fids[flow].cq]++;
				inflights += qid->fids[flow].pcount;
			}

		uint32_t port;
		fprintf(f, "\tPer Port Stats:\n");
		for (port = 0; port < sw->port_count; port++) {
			fprintf(f, "\t  Port %d: Pkts: %"PRIu64, port,
					qid->to_port[port]);
			fprintf(f, "\tFlows: %d\n", affinities_per_port[port]);
		}

		uint32_t iq;
		uint32_t iq_printed = 0;
		for (iq = 0; iq < SW_IQS_MAX; iq++) {
			if (!qid->iq[iq]) {
				fprintf(f, "\tiq %d is not initialized.\n", iq);
				iq_printed = 1;
				continue;
			}
			uint32_t used = iq_ring_count(qid->iq[iq]);
			uint32_t free = iq_ring_free_count(qid->iq[iq]);
			const char *col = (free == 0) ? COL_RED : COL_RESET;
			if (used > 0) {
				fprintf(f, "\t%siq %d: Used %d\tFree %d"
					COL_RESET"\n", col, iq, used, free);
				iq_printed = 1;
			}
		}
		if (iq_printed == 0)
			fprintf(f, "\t-- iqs empty --\n");
	}
}

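/* Start the device: ensure every port and queue has been set up and linked,
 * build the priority-ordered qid array used by the scheduler (a stable pass
 * per priority level, since qsort() ordering of equal keys is undefined),
 * and initialize the xstats before marking the device started.
 */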
static int
sw_start(struct rte_eventdev *dev)
{
	unsigned int i, j;
	struct sw_evdev *sw = sw_pmd_priv(dev);
	/* check all ports are set up */
	for (i = 0; i < sw->port_count; i++)
		if (sw->ports[i].rx_worker_ring == NULL) {
			SW_LOG_ERR("Port %d not configured\n", i);
			return -ESTALE;
		}

	/* check all queues are configured and mapped to ports */
	for (i = 0; i < sw->qid_count; i++)
		if (sw->qids[i].iq[0] == NULL ||
				sw->qids[i].cq_num_mapped_cqs == 0) {
			SW_LOG_ERR("Queue %d not configured\n", i);
			return -ENOLINK;
		}

	/* build up our prioritized array of qids */
	/* We don't use qsort here, as if all/multiple entries have the same
	 * priority, the result is non-deterministic. From "man 3 qsort":
	 * "If two members compare as equal, their order in the sorted
	 * array is undefined."
	 */
	uint32_t qidx = 0;
	for (j = 0; j <= RTE_EVENT_DEV_PRIORITY_LOWEST; j++) {
		for (i = 0; i < sw->qid_count; i++) {
			if (sw->qids[i].priority == j) {
				sw->qids_prioritized[qidx] = &sw->qids[i];
				qidx++;
			}
		}
	}

	if (sw_xstats_init(sw) < 0)
		return -EINVAL;

	rte_smp_wmb();
	sw->started = 1;

	return 0;
}

static void
sw_stop(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	sw_xstats_uninit(sw);
	sw->started = 0;
	rte_smp_wmb();
}

static int
sw_close(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t i;

	for (i = 0; i < sw->qid_count; i++)
		sw_queue_release(dev, i);
	sw->qid_count = 0;

	for (i = 0; i < sw->port_count; i++)
		sw_port_release(&sw->ports[i]);
	sw->port_count = 0;

	memset(&sw->stats, 0, sizeof(sw->stats));
	sw->sched_called = 0;
	sw->sched_no_iq_enqueues = 0;
	sw->sched_no_cq_enqueues = 0;
	sw->sched_cq_qid_called = 0;

	return 0;
}

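/* kvargs handlers for the vdev parameters: each parses and range-checks its
 * value, returning -1 so rte_kvargs_process() reports the failure.
 */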
static int
assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
{
	int *socket_id = opaque;
	*socket_id = atoi(value);
	if (*socket_id >= RTE_MAX_NUMA_NODES)
		return -1;
	return 0;
}

static int
set_sched_quanta(const char *key __rte_unused, const char *value, void *opaque)
{
	int *quanta = opaque;
	*quanta = atoi(value);
	if (*quanta < 0 || *quanta >= 4096)
		return -1;
	return 0;
}

static int
set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque)
{
	int *credit = opaque;
	*credit = atoi(value);
	if (*credit < 0 || *credit >= 128)
		return -1;
	return 0;
}

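/* vdev probe: parse the optional device arguments, allocate the eventdev and
 * hook up the ops table and fast-path function pointers. Secondary processes
 * only attach the function pointers and return before touching the private
 * device state.
 */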
static int
sw_probe(struct rte_vdev_device *vdev)
{
	static const struct rte_eventdev_ops evdev_sw_ops = {
			.dev_configure = sw_dev_configure,
			.dev_infos_get = sw_info_get,
			.dev_close = sw_close,
			.dev_start = sw_start,
			.dev_stop = sw_stop,
			.dump = sw_dump,

			.queue_def_conf = sw_queue_def_conf,
			.queue_setup = sw_queue_setup,
			.queue_release = sw_queue_release,
			.port_def_conf = sw_port_def_conf,
			.port_setup = sw_port_setup,
			.port_release = sw_port_release,
			.port_link = sw_port_link,
			.port_unlink = sw_port_unlink,

			.xstats_get = sw_xstats_get,
			.xstats_get_names = sw_xstats_get_names,
			.xstats_get_by_name = sw_xstats_get_by_name,
			.xstats_reset = sw_xstats_reset,
	};

	static const char *const args[] = {
		NUMA_NODE_ARG,
		SCHED_QUANTA_ARG,
		CREDIT_QUANTA_ARG,
		NULL
	};
	const char *name;
	const char *params;
	struct rte_eventdev *dev;
	struct sw_evdev *sw;
	int socket_id = rte_socket_id();
	int sched_quanta  = SW_DEFAULT_SCHED_QUANTA;
	int credit_quanta = SW_DEFAULT_CREDIT_QUANTA;

	name = rte_vdev_device_name(vdev);
	params = rte_vdev_device_args(vdev);
	if (params != NULL && params[0] != '\0') {
		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);

		if (!kvlist) {
			SW_LOG_INFO(
				"Ignoring unsupported parameters when creating device '%s'\n",
				name);
		} else {
			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
					assign_numa_node, &socket_id);
			if (ret != 0) {
				SW_LOG_ERR(
					"%s: Error parsing numa node parameter",
					name);
				rte_kvargs_free(kvlist);
				return ret;
			}

			ret = rte_kvargs_process(kvlist, SCHED_QUANTA_ARG,
					set_sched_quanta, &sched_quanta);
			if (ret != 0) {
				SW_LOG_ERR(
					"%s: Error parsing sched quanta parameter",
					name);
				rte_kvargs_free(kvlist);
				return ret;
			}

			ret = rte_kvargs_process(kvlist, CREDIT_QUANTA_ARG,
					set_credit_quanta, &credit_quanta);
			if (ret != 0) {
				SW_LOG_ERR(
					"%s: Error parsing credit quanta parameter",
					name);
				rte_kvargs_free(kvlist);
				return ret;
			}

			rte_kvargs_free(kvlist);
		}
	}

	SW_LOG_INFO(
			"Creating eventdev sw device %s, numa_node=%d, sched_quanta=%d, credit_quanta=%d\n",
			name, socket_id, sched_quanta, credit_quanta);

	dev = rte_event_pmd_vdev_init(name,
			sizeof(struct sw_evdev), socket_id);
	if (dev == NULL) {
		SW_LOG_ERR("eventdev vdev init() failed");
		return -EFAULT;
	}
	dev->dev_ops = &evdev_sw_ops;
	dev->enqueue = sw_event_enqueue;
	dev->enqueue_burst = sw_event_enqueue_burst;
	dev->dequeue = sw_event_dequeue;
	dev->dequeue_burst = sw_event_dequeue_burst;
	dev->schedule = sw_event_schedule;

	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

	sw = dev->data->dev_private;
	sw->data = dev->data;

	/* copy values passed from vdev command line to instance */
	sw->credit_update_quanta = credit_quanta;
	sw->sched_quanta = sched_quanta;

	return 0;
}

static int
sw_remove(struct rte_vdev_device *vdev)
{
	const char *name;

	name = rte_vdev_device_name(vdev);
	if (name == NULL)
		return -EINVAL;

	SW_LOG_INFO("Closing eventdev sw device %s\n", name);

	return rte_event_pmd_vdev_uninit(name);
}

static struct rte_vdev_driver evdev_sw_pmd_drv = {
	.probe = sw_probe,
	.remove = sw_remove
};

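/* Register the vdev driver and advertise its recognized parameters. As an
 * illustrative example (values chosen arbitrarily), an instance could be
 * created with an EAL argument such as:
 *
 *   --vdev="event_sw0,sched_quanta=64,credit_quanta=32,numa_node=0"
 */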
RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv);
RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "=<int> "
		SCHED_QUANTA_ARG "=<int>" CREDIT_QUANTA_ARG "=<int>");