/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <inttypes.h>
#include <string.h>

#include <rte_bus_vdev.h>
#include <rte_kvargs.h>
#include <rte_ring.h>
#include <rte_errno.h>
#include <rte_event_ring.h>
#include <rte_service_component.h>

#include "sw_evdev.h"
#include "iq_chunk.h"

#define EVENTDEV_NAME_SW_PMD event_sw
#define NUMA_NODE_ARG "numa_node"
#define SCHED_QUANTA_ARG "sched_quanta"
#define CREDIT_QUANTA_ARG "credit_quanta"
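
/* These kvargs are accepted on the vdev command line (see
 * RTE_PMD_REGISTER_PARAM_STRING at the bottom of this file). An illustrative
 * invocation (the "0" instance suffix is just the usual vdev naming
 * convention) might look like:
 *
 *   --vdev="event_sw0,numa_node=1,sched_quanta=64,credit_quanta=32"
 */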

static void
sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);

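/* Link worker queues to a port. Returns the number of queues successfully
 * linked; on the first failure rte_errno is set to EDQUOT (qid->cq_map full,
 * or a directed queue/port constraint violated) and linking stops early.
 */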
static int
sw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
		const uint8_t priorities[], uint16_t num)
{
	struct sw_port *p = port;
	struct sw_evdev *sw = sw_pmd_priv(dev);
	int i;

	RTE_SET_USED(priorities);
	for (i = 0; i < num; i++) {
		struct sw_qid *q = &sw->qids[queues[i]];
		unsigned int j;

		/* check for qid map overflow */
		if (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map)) {
			rte_errno = EDQUOT;
			break;
		}

		if (p->is_directed && p->num_qids_mapped > 0) {
			rte_errno = EDQUOT;
			break;
		}

		for (j = 0; j < q->cq_num_mapped_cqs; j++) {
			if (q->cq_map[j] == p->id)
				break;
		}

		/* check if port is already linked */
		if (j < q->cq_num_mapped_cqs)
			continue;

		if (q->type == SW_SCHED_TYPE_DIRECT) {
			/* check directed qids only map to one port */
			if (p->num_qids_mapped > 0) {
				rte_errno = EDQUOT;
				break;
			}
			/* check port only takes a directed flow */
			if (num > 1) {
				rte_errno = EDQUOT;
				break;
			}

			p->is_directed = 1;
			p->num_qids_mapped = 1;
		} else if (q->type == RTE_SCHED_TYPE_ORDERED) {
			p->num_ordered_qids++;
			p->num_qids_mapped++;
		} else if (q->type == RTE_SCHED_TYPE_ATOMIC ||
				q->type == RTE_SCHED_TYPE_PARALLEL) {
			p->num_qids_mapped++;
		}

		q->cq_map[q->cq_num_mapped_cqs] = p->id;
		rte_smp_wmb();
		q->cq_num_mapped_cqs++;
	}
	return i;
}

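/* Unlink queues from a port: each matching cq_map entry is removed by
 * swapping in the last entry, and p->unlinks_in_progress is incremented so
 * the unlinks-in-progress query below can report that the scheduler still
 * has cleanup to do for this port.
 */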
static int
sw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
		uint16_t nb_unlinks)
{
	struct sw_port *p = port;
	struct sw_evdev *sw = sw_pmd_priv(dev);
	unsigned int i, j;

	int unlinked = 0;
	for (i = 0; i < nb_unlinks; i++) {
		struct sw_qid *q = &sw->qids[queues[i]];
		for (j = 0; j < q->cq_num_mapped_cqs; j++) {
			if (q->cq_map[j] == p->id) {
				q->cq_map[j] =
					q->cq_map[q->cq_num_mapped_cqs - 1];
				rte_smp_wmb();
				q->cq_num_mapped_cqs--;
				unlinked++;

				p->num_qids_mapped--;

				if (q->type == RTE_SCHED_TYPE_ORDERED)
					p->num_ordered_qids--;

				continue;
			}
		}
	}

	p->unlinks_in_progress += unlinked;
	rte_smp_mb();

	return unlinked;
}

static int
sw_port_unlinks_in_progress(struct rte_eventdev *dev, void *port)
{
	RTE_SET_USED(dev);
	struct sw_port *p = port;
	return p->unlinks_in_progress;
}

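/* (Re)configure a port: any credits held by a previously initialized port
 * are returned to the device, then the rx and cq event rings are (re)created
 * with names derived from the device and port ids. Safe to call multiple
 * times while the device is stopped.
 */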
static int
sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
		const struct rte_event_port_conf *conf)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	struct sw_port *p = &sw->ports[port_id];
	char buf[RTE_RING_NAMESIZE];
	unsigned int i;

	struct rte_event_dev_info info;
	sw_info_get(dev, &info);

	/* detect re-configuring and return credits to instance if needed */
	if (p->initialized) {
		/* taking credits from the pool is done one quanta at a time,
		 * and credits may be spent (counted in p->inflights) or still
		 * available in the port (p->inflight_credits). We must return
		 * the sum so as not to leak credits.
		 */
		int possible_inflights = p->inflight_credits + p->inflights;
		rte_atomic32_sub(&sw->inflights, possible_inflights);
	}

	*p = (struct sw_port){0}; /* zero entire structure */
	p->id = port_id;
	p->sw = sw;

	/* check to see if the ring exists - port_setup() can be called
	 * multiple times legally (assuming the device is stopped). If the
	 * ring exists, free it so it gets re-created with the correct size.
	 */
	snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
			port_id, "rx_worker_ring");
	struct rte_event_ring *existing_ring = rte_event_ring_lookup(buf);
	if (existing_ring)
		rte_event_ring_free(existing_ring);

	p->rx_worker_ring = rte_event_ring_create(buf, MAX_SW_PROD_Q_DEPTH,
			dev->data->socket_id,
			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
	if (p->rx_worker_ring == NULL) {
		SW_LOG_ERR("Error creating RX worker ring for port %d\n",
				port_id);
		return -1;
	}

	p->inflight_max = conf->new_event_threshold;
	p->implicit_release = !conf->disable_implicit_release;

	/* check if ring exists, same as rx_worker above */
	snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id,
			port_id, "cq_worker_ring");
	existing_ring = rte_event_ring_lookup(buf);
	if (existing_ring)
		rte_event_ring_free(existing_ring);

	p->cq_worker_ring = rte_event_ring_create(buf, conf->dequeue_depth,
			dev->data->socket_id,
			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
	if (p->cq_worker_ring == NULL) {
		rte_event_ring_free(p->rx_worker_ring);
		SW_LOG_ERR("Error creating CQ worker ring for port %d\n",
				port_id);
		return -1;
	}
	sw->cq_ring_space[port_id] = conf->dequeue_depth;

	/* set hist list contents to empty */
	for (i = 0; i < SW_PORT_HIST_LIST; i++) {
		p->hist_list[i].fid = -1;
		p->hist_list[i].qid = -1;
	}
	dev->data->ports[port_id] = p;

	rte_smp_wmb();
	p->initialized = 1;
	return 0;
}

static void
sw_port_release(void *port)
{
	struct sw_port *p = (void *)port;
	if (p == NULL)
		return;

	rte_event_ring_free(p->rx_worker_ring);
	rte_event_ring_free(p->cq_worker_ring);
	memset(p, 0, sizeof(*p));
}

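/* Initialize one queue (qid). All flow-id (FID) slots start unpinned (cq of
 * -1) and, for RTE_SCHED_TYPE_ORDERED queues, a reorder buffer plus an
 * rte_ring freelist of its entries are allocated, sized to the next
 * power-of-2 of nb_atomic_order_sequences.
 */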
static int32_t
qid_init(struct sw_evdev *sw, unsigned int idx, int type,
		const struct rte_event_queue_conf *queue_conf)
{
	unsigned int i;
	int dev_id = sw->data->dev_id;
	int socket_id = sw->data->socket_id;
	char buf[IQ_ROB_NAMESIZE];
	struct sw_qid *qid = &sw->qids[idx];

	/* Initialize the FID structures to no pinning (-1), and zero packets */
	const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
	for (i = 0; i < RTE_DIM(qid->fids); i++)
		qid->fids[i] = fid;

	qid->id = idx;
	qid->type = type;
	qid->priority = queue_conf->priority;

	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
		char ring_name[RTE_RING_NAMESIZE];
		uint32_t window_size;

		/* rte_ring and window_size_mask require window_size to
		 * be a power-of-2.
		 */
		window_size = rte_align32pow2(
				queue_conf->nb_atomic_order_sequences);

		qid->window_size = window_size - 1;

		if (!window_size) {
			SW_LOG_DBG(
				"invalid reorder_window_size for ordered queue\n"
				);
			goto cleanup;
		}

		snprintf(buf, sizeof(buf), "sw%d_iq_%d_rob", dev_id, idx);
		qid->reorder_buffer = rte_zmalloc_socket(buf,
				window_size * sizeof(qid->reorder_buffer[0]),
				0, socket_id);
		if (!qid->reorder_buffer) {
			SW_LOG_DBG("reorder_buffer malloc failed\n");
			goto cleanup;
		}

		memset(&qid->reorder_buffer[0],
		       0,
		       window_size * sizeof(qid->reorder_buffer[0]));

		snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist",
				dev_id, idx);

		/* lookup the ring, and if it already exists, free it */
		struct rte_ring *cleanup = rte_ring_lookup(ring_name);
		if (cleanup)
			rte_ring_free(cleanup);

		qid->reorder_buffer_freelist = rte_ring_create(ring_name,
				window_size,
				socket_id,
				RING_F_SP_ENQ | RING_F_SC_DEQ);
		if (!qid->reorder_buffer_freelist) {
			SW_LOG_DBG("freelist ring create failed");
			goto cleanup;
		}

		/* Populate the freelist with reorder buffer entries. Enqueue
		 * 'window_size - 1' entries because the rte_ring holds only
		 * that many.
		 */
		for (i = 0; i < window_size - 1; i++) {
			if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist,
						&qid->reorder_buffer[i]) < 0)
				goto cleanup;
		}

		qid->reorder_buffer_index = 0;
		qid->cq_next_tx = 0;
	}

	qid->initialized = 1;

	return 0;

cleanup:
	if (qid->reorder_buffer) {
		rte_free(qid->reorder_buffer);
		qid->reorder_buffer = NULL;
	}

	if (qid->reorder_buffer_freelist) {
		rte_ring_free(qid->reorder_buffer_freelist);
		qid->reorder_buffer_freelist = NULL;
	}

	return -EINVAL;
}

static void
sw_queue_release(struct rte_eventdev *dev, uint8_t id)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	struct sw_qid *qid = &sw->qids[id];

	if (qid->type == RTE_SCHED_TYPE_ORDERED) {
		rte_free(qid->reorder_buffer);
		rte_ring_free(qid->reorder_buffer_freelist);
	}
	memset(qid, 0, sizeof(*qid));
}

static int
sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
		const struct rte_event_queue_conf *conf)
{
	int type;

	type = conf->schedule_type;

	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) {
		type = SW_SCHED_TYPE_DIRECT;
	} else if (RTE_EVENT_QUEUE_CFG_ALL_TYPES
			& conf->event_queue_cfg) {
		SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n");
		return -ENOTSUP;
	}

	struct sw_evdev *sw = sw_pmd_priv(dev);

	if (sw->qids[queue_id].initialized)
		sw_queue_release(dev, queue_id);

	return qid_init(sw, queue_id, type, conf);
}

static void
sw_init_qid_iqs(struct sw_evdev *sw)
{
	int i, j;

	/* Initialize the IQ memory of all configured qids */
	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
		struct sw_qid *qid = &sw->qids[i];

		if (!qid->initialized)
			continue;

		for (j = 0; j < SW_IQS_MAX; j++)
			iq_init(sw, &qid->iq[j]);
	}
}

static int
sw_qids_empty(struct sw_evdev *sw)
{
	unsigned int i, j;

	for (i = 0; i < sw->qid_count; i++) {
		for (j = 0; j < SW_IQS_MAX; j++) {
			if (iq_count(&sw->qids[i].iq[j]))
				return 0;
		}
	}

	return 1;
}

static int
sw_ports_empty(struct sw_evdev *sw)
{
	unsigned int i;

	for (i = 0; i < sw->port_count; i++) {
		if ((rte_event_ring_count(sw->ports[i].rx_worker_ring)) ||
		     rte_event_ring_count(sw->ports[i].cq_worker_ring))
			return 0;
	}

	return 1;
}

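/* Drain helpers used by sw_stop(): any events still sitting in port CQs or
 * in the queue IQs are dequeued and handed to the application's
 * dev_stop_flush callback (if one was registered); events drained from ports
 * are then released back to the device.
 */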
static void
sw_drain_ports(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	eventdev_stop_flush_t flush;
	unsigned int i;
	uint8_t dev_id;
	void *arg;

	flush = dev->dev_ops->dev_stop_flush;
	dev_id = dev->data->dev_id;
	arg = dev->data->dev_stop_flush_arg;

	for (i = 0; i < sw->port_count; i++) {
		struct rte_event ev;

		while (rte_event_dequeue_burst(dev_id, i, &ev, 1, 0)) {
			if (flush)
				flush(dev_id, ev, arg);

			ev.op = RTE_EVENT_OP_RELEASE;
			rte_event_enqueue_burst(dev_id, i, &ev, 1);
		}
	}
}

static void
sw_drain_queue(struct rte_eventdev *dev, struct sw_iq *iq)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	eventdev_stop_flush_t flush;
	uint8_t dev_id;
	void *arg;

	flush = dev->dev_ops->dev_stop_flush;
	dev_id = dev->data->dev_id;
	arg = dev->data->dev_stop_flush_arg;

	while (iq_count(iq) > 0) {
		struct rte_event ev;

		iq_dequeue_burst(sw, iq, &ev, 1);

		if (flush)
			flush(dev_id, ev, arg);
	}
}

static void
sw_drain_queues(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	unsigned int i, j;

	for (i = 0; i < sw->qid_count; i++) {
		for (j = 0; j < SW_IQS_MAX; j++)
			sw_drain_queue(dev, &sw->qids[i].iq[j]);
	}
}

static void
sw_clean_qid_iqs(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	int i, j;

	/* Release the IQ memory of all configured qids */
	for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
		struct sw_qid *qid = &sw->qids[i];

		for (j = 0; j < SW_IQS_MAX; j++) {
			if (!qid->iq[j].head)
				continue;
			iq_free_chunk_list(sw, qid->iq[j].head);
			qid->iq[j].head = NULL;
		}
	}
}

static void
sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
				 struct rte_event_queue_conf *conf)
{
	RTE_SET_USED(dev);
	RTE_SET_USED(queue_id);

	static const struct rte_event_queue_conf default_conf = {
		.nb_atomic_flows = 4096,
		.nb_atomic_order_sequences = 1,
		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
	};

	*conf = default_conf;
}

static void
sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
		 struct rte_event_port_conf *port_conf)
{
	RTE_SET_USED(dev);
	RTE_SET_USED(port_id);

	port_conf->new_event_threshold = 1024;
	port_conf->dequeue_depth = 16;
	port_conf->enqueue_depth = 16;
	port_conf->disable_implicit_release = 0;
}

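/* Configure the device from dev_conf. Note the chunk pool below is sized as
 * (total in-flight events / events per chunk, plus one) plus two extra
 * chunks per IQ (qid_count * SW_IQS_MAX * 2), so that even a worst-case
 * spread of events across the IQs cannot exhaust the pool.
 */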
static int
sw_dev_configure(const struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	const struct rte_eventdev_data *data = dev->data;
	const struct rte_event_dev_config *conf = &data->dev_conf;
	int num_chunks, i;

	sw->qid_count = conf->nb_event_queues;
	sw->port_count = conf->nb_event_ports;
	sw->nb_events_limit = conf->nb_events_limit;
	rte_atomic32_set(&sw->inflights, 0);

	/* Number of chunks sized for worst-case spread of events across IQs */
	num_chunks = ((SW_INFLIGHT_EVENTS_TOTAL/SW_EVS_PER_Q_CHUNK)+1) +
			sw->qid_count*SW_IQS_MAX*2;

	/* If this is a reconfiguration, free the previous IQ allocation. All
	 * IQ chunk references were cleaned out of the QIDs in sw_stop(), and
	 * will be reinitialized in sw_start().
	 */
	if (sw->chunks)
		rte_free(sw->chunks);

	sw->chunks = rte_malloc_socket(NULL,
				       sizeof(struct sw_queue_chunk) *
				       num_chunks,
				       0,
				       sw->data->socket_id);
	if (!sw->chunks)
		return -ENOMEM;

	sw->chunk_list_head = NULL;
	for (i = 0; i < num_chunks; i++)
		iq_free_chunk(sw, &sw->chunks[i]);

	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
		return -ENOTSUP;

	return 0;
}

struct rte_eth_dev;

static int
sw_eth_rx_adapter_caps_get(const struct rte_eventdev *dev,
			const struct rte_eth_dev *eth_dev,
			uint32_t *caps)
{
	RTE_SET_USED(dev);
	RTE_SET_USED(eth_dev);
	*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
	return 0;
}

static int
sw_timer_adapter_caps_get(const struct rte_eventdev *dev,
			  uint64_t flags,
			  uint32_t *caps,
			  const struct rte_event_timer_adapter_ops **ops)
{
	RTE_SET_USED(dev);
	RTE_SET_USED(flags);
	*caps = 0;

	/* Use default SW ops */
	*ops = NULL;

	return 0;
}

static int
sw_crypto_adapter_caps_get(const struct rte_eventdev *dev,
			   const struct rte_cryptodev *cdev,
			   uint32_t *caps)
{
	RTE_SET_USED(dev);
	RTE_SET_USED(cdev);
	*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
	return 0;
}

static void
sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
{
	RTE_SET_USED(dev);

	static const struct rte_event_dev_info evdev_sw_info = {
			.driver_name = SW_PMD_NAME,
			.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
			.max_event_queue_flows = SW_QID_NUM_FIDS,
			.max_event_queue_priority_levels = SW_Q_PRIORITY_MAX,
			.max_event_priority_levels = SW_IQS_MAX,
			.max_event_ports = SW_PORTS_MAX,
			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
			.event_dev_cap = (
				RTE_EVENT_DEV_CAP_QUEUE_QOS |
				RTE_EVENT_DEV_CAP_BURST_MODE |
				RTE_EVENT_DEV_CAP_EVENT_QOS |
				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
				RTE_EVENT_DEV_CAP_RUNTIME_PORT_LINK |
				RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT |
				RTE_EVENT_DEV_CAP_NONSEQ_MODE),
	};

	*info = evdev_sw_info;
}

static void
sw_dump(struct rte_eventdev *dev, FILE *f)
{
	const struct sw_evdev *sw = sw_pmd_priv(dev);

	static const char * const q_type_strings[] = {
			"Ordered", "Atomic", "Parallel", "Directed"
	};
	uint32_t i;
	fprintf(f, "EventDev %s: ports %d, qids %d\n", "todo-fix-name",
			sw->port_count, sw->qid_count);

	fprintf(f, "\trx   %"PRIu64"\n\tdrop %"PRIu64"\n\ttx   %"PRIu64"\n",
		sw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts);
	fprintf(f, "\tsched calls: %"PRIu64"\n", sw->sched_called);
	fprintf(f, "\tsched cq/qid call: %"PRIu64"\n", sw->sched_cq_qid_called);
	fprintf(f, "\tsched no IQ enq: %"PRIu64"\n", sw->sched_no_iq_enqueues);
	fprintf(f, "\tsched no CQ enq: %"PRIu64"\n", sw->sched_no_cq_enqueues);
	uint32_t inflights = rte_atomic32_read(&sw->inflights);
	uint32_t credits = sw->nb_events_limit - inflights;
	fprintf(f, "\tinflight %d, credits: %d\n", inflights, credits);

#define COL_RED "\x1b[31m"
#define COL_RESET "\x1b[0m"

	for (i = 0; i < sw->port_count; i++) {
		int max, j;
		const struct sw_port *p = &sw->ports[i];
		if (!p->initialized) {
			fprintf(f, "  %sPort %d not initialized.%s\n",
				COL_RED, i, COL_RESET);
			continue;
		}
		fprintf(f, "  Port %d %s\n", i,
			p->is_directed ? " (SingleCons)" : "");
		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64
			"\t%sinflight %d%s\n", sw->ports[i].stats.rx_pkts,
			sw->ports[i].stats.rx_dropped,
			sw->ports[i].stats.tx_pkts,
			(p->inflights == p->inflight_max) ?
				COL_RED : COL_RESET,
			sw->ports[i].inflights, COL_RESET);

		fprintf(f, "\tMax New: %u"
			"\tAvg cycles PP: %"PRIu64"\tCredits: %u\n",
			sw->ports[i].inflight_max,
			sw->ports[i].avg_pkt_ticks,
			sw->ports[i].inflight_credits);
		fprintf(f, "\tReceive burst distribution:\n");
		float zp_percent = p->zero_polls * 100.0 / p->total_polls;
		fprintf(f, zp_percent < 10 ? "\t\t0:%.02f%% " : "\t\t0:%.0f%% ",
				zp_percent);
		for (max = (int)RTE_DIM(p->poll_buckets); max-- > 0;)
			if (p->poll_buckets[max] != 0)
				break;
		for (j = 0; j <= max; j++) {
			if (p->poll_buckets[j] != 0) {
				float poll_pc = p->poll_buckets[j] * 100.0 /
					p->total_polls;
				fprintf(f, "%u-%u:%.02f%% ",
					((j << SW_DEQ_STAT_BUCKET_SHIFT) + 1),
					((j+1) << SW_DEQ_STAT_BUCKET_SHIFT),
					poll_pc);
			}
		}
		fprintf(f, "\n");

		if (p->rx_worker_ring) {
			uint64_t used = rte_event_ring_count(p->rx_worker_ring);
			uint64_t space = rte_event_ring_free_count(
					p->rx_worker_ring);
			const char *col = (space == 0) ? COL_RED : COL_RESET;
			fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4"
					PRIu64 COL_RESET"\n", col, used, space);
		} else
			fprintf(f, "\trx ring not initialized.\n");

		if (p->cq_worker_ring) {
			uint64_t used = rte_event_ring_count(p->cq_worker_ring);
			uint64_t space = rte_event_ring_free_count(
					p->cq_worker_ring);
			const char *col = (space == 0) ? COL_RED : COL_RESET;
			fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4"
					PRIu64 COL_RESET"\n", col, used, space);
		} else
			fprintf(f, "\tcq ring not initialized.\n");
	}

	for (i = 0; i < sw->qid_count; i++) {
		const struct sw_qid *qid = &sw->qids[i];
		if (!qid->initialized) {
			fprintf(f, "  %sQueue %d not initialized.%s\n",
				COL_RED, i, COL_RESET);
			continue;
		}
		int affinities_per_port[SW_PORTS_MAX] = {0};
		uint32_t inflights = 0;

		fprintf(f, "  Queue %d (%s)\n", i, q_type_strings[qid->type]);
		fprintf(f, "\trx   %"PRIu64"\tdrop %"PRIu64"\ttx   %"PRIu64"\n",
			qid->stats.rx_pkts, qid->stats.rx_dropped,
			qid->stats.tx_pkts);
		if (qid->type == RTE_SCHED_TYPE_ORDERED) {
			struct rte_ring *rob_buf_free =
				qid->reorder_buffer_freelist;
			if (rob_buf_free)
				fprintf(f, "\tReorder entries in use: %u\n",
					rte_ring_free_count(rob_buf_free));
			else
				fprintf(f,
					"\tReorder buffer not initialized\n");
		}

		uint32_t flow;
		for (flow = 0; flow < RTE_DIM(qid->fids); flow++)
			if (qid->fids[flow].cq != -1) {
				affinities_per_port[qid->fids[flow].cq]++;
				inflights += qid->fids[flow].pcount;
			}

		uint32_t port;
		fprintf(f, "\tPer Port Stats:\n");
		for (port = 0; port < sw->port_count; port++) {
			fprintf(f, "\t  Port %d: Pkts: %"PRIu64, port,
					qid->to_port[port]);
			fprintf(f, "\tFlows: %d\n", affinities_per_port[port]);
		}

		uint32_t iq;
		uint32_t iq_printed = 0;
		for (iq = 0; iq < SW_IQS_MAX; iq++) {
			if (!qid->iq[iq].head) {
				fprintf(f, "\tiq %d is not initialized.\n", iq);
				iq_printed = 1;
				continue;
			}
			uint32_t used = iq_count(&qid->iq[iq]);
			const char *col = COL_RESET;
			if (used > 0) {
				fprintf(f, "\t%siq %d: Used %d"
					COL_RESET"\n", col, iq, used);
				iq_printed = 1;
			}
		}
		if (iq_printed == 0)
			fprintf(f, "\t-- iqs empty --\n");
	}
}

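/* Start the device: verify the scheduling service has a core mapped, that
 * all ports have been set up and every configured queue has at least one
 * port linked, then build the priority-ordered qid array and initialize the
 * IQs and xstats before marking the device started.
 */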
static int
sw_start(struct rte_eventdev *dev)
{
	unsigned int i, j;
	struct sw_evdev *sw = sw_pmd_priv(dev);

	rte_service_component_runstate_set(sw->service_id, 1);

	/* check a service core is mapped to this service */
	if (!rte_service_runstate_get(sw->service_id)) {
		SW_LOG_ERR("Warning: No Service core enabled on service %s\n",
				sw->service_name);
		return -ENOENT;
	}

	/* check all ports are set up */
	for (i = 0; i < sw->port_count; i++)
		if (sw->ports[i].rx_worker_ring == NULL) {
			SW_LOG_ERR("Port %d not configured\n", i);
			return -ESTALE;
		}

	/* check all queues are configured and mapped to ports */
	for (i = 0; i < sw->qid_count; i++)
		if (!sw->qids[i].initialized ||
		    sw->qids[i].cq_num_mapped_cqs == 0) {
			SW_LOG_ERR("Queue %d not configured\n", i);
			return -ENOLINK;
		}

	/* build up our prioritized array of qids */
	/* We don't use qsort here, as if all/multiple entries have the same
	 * priority, the result is non-deterministic. From "man 3 qsort":
	 * "If two members compare as equal, their order in the sorted
	 * array is undefined."
	 */
	uint32_t qidx = 0;
	for (j = 0; j <= RTE_EVENT_DEV_PRIORITY_LOWEST; j++) {
		for (i = 0; i < sw->qid_count; i++) {
			if (sw->qids[i].priority == j) {
				sw->qids_prioritized[qidx] = &sw->qids[i];
				qidx++;
			}
		}
	}

	sw_init_qid_iqs(sw);

	if (sw_xstats_init(sw) < 0)
		return -EINVAL;

	rte_smp_wmb();
	sw->started = 1;

	return 0;
}

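/* Stop the device: pause the scheduling service, then alternate scheduling
 * and draining until no events remain in any IQ or port ring, free the IQ
 * chunk lists and xstats, and finally restore the service run state.
 */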
static void
sw_stop(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	int32_t runstate;

	/* Stop the scheduler if it's running */
	runstate = rte_service_runstate_get(sw->service_id);
	if (runstate == 1)
		rte_service_runstate_set(sw->service_id, 0);

	while (rte_service_may_be_active(sw->service_id))
		rte_pause();

	/* Flush all events out of the device */
	while (!(sw_qids_empty(sw) && sw_ports_empty(sw))) {
		sw_event_schedule(dev);
		sw_drain_ports(dev);
		sw_drain_queues(dev);
	}

	sw_clean_qid_iqs(dev);
	sw_xstats_uninit(sw);
	sw->started = 0;
	rte_smp_wmb();

	if (runstate == 1)
		rte_service_runstate_set(sw->service_id, 1);
}

static int
sw_close(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t i;

	for (i = 0; i < sw->qid_count; i++)
		sw_queue_release(dev, i);
	sw->qid_count = 0;

	for (i = 0; i < sw->port_count; i++)
		sw_port_release(&sw->ports[i]);
	sw->port_count = 0;

	memset(&sw->stats, 0, sizeof(sw->stats));
	sw->sched_called = 0;
	sw->sched_no_iq_enqueues = 0;
	sw->sched_no_cq_enqueues = 0;
	sw->sched_cq_qid_called = 0;

	return 0;
}

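/* rte_kvargs_process() callbacks for the vdev arguments declared at the top
 * of this file. Each returns 0 on success or -1 to reject an out-of-range
 * value, which aborts probing of the device.
 */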
static int
assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
{
	int *socket_id = opaque;
	*socket_id = atoi(value);
	if (*socket_id >= RTE_MAX_NUMA_NODES)
		return -1;
	return 0;
}

static int
set_sched_quanta(const char *key __rte_unused, const char *value, void *opaque)
{
	int *quanta = opaque;
	*quanta = atoi(value);
	if (*quanta < 0 || *quanta >= 4096)
		return -1;
	return 0;
}

static int
set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque)
{
	int *credit = opaque;
	*credit = atoi(value);
	if (*credit < 0 || *credit >= 128)
		return -1;
	return 0;
}

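/* The scheduler runs as an EAL service rather than in a dedicated hardware
 * thread: this callback is registered in sw_probe() and invoked by whichever
 * service core the application maps to it, calling sw_event_schedule() once
 * per iteration.
 */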
static int32_t sw_sched_service_func(void *args)
{
	struct rte_eventdev *dev = args;
	sw_event_schedule(dev);
	return 0;
}

static int
sw_probe(struct rte_vdev_device *vdev)
{
	static struct rte_eventdev_ops evdev_sw_ops = {
			.dev_configure = sw_dev_configure,
			.dev_infos_get = sw_info_get,
			.dev_close = sw_close,
			.dev_start = sw_start,
			.dev_stop = sw_stop,
			.dump = sw_dump,

			.queue_def_conf = sw_queue_def_conf,
			.queue_setup = sw_queue_setup,
			.queue_release = sw_queue_release,
			.port_def_conf = sw_port_def_conf,
			.port_setup = sw_port_setup,
			.port_release = sw_port_release,
			.port_link = sw_port_link,
			.port_unlink = sw_port_unlink,
			.port_unlinks_in_progress = sw_port_unlinks_in_progress,

			.eth_rx_adapter_caps_get = sw_eth_rx_adapter_caps_get,

			.timer_adapter_caps_get = sw_timer_adapter_caps_get,

			.crypto_adapter_caps_get = sw_crypto_adapter_caps_get,

			.xstats_get = sw_xstats_get,
			.xstats_get_names = sw_xstats_get_names,
			.xstats_get_by_name = sw_xstats_get_by_name,
			.xstats_reset = sw_xstats_reset,

			.dev_selftest = test_sw_eventdev,
	};

	static const char *const args[] = {
		NUMA_NODE_ARG,
		SCHED_QUANTA_ARG,
		CREDIT_QUANTA_ARG,
		NULL
	};
	const char *name;
	const char *params;
	struct rte_eventdev *dev;
	struct sw_evdev *sw;
	int socket_id = rte_socket_id();
	int sched_quanta  = SW_DEFAULT_SCHED_QUANTA;
	int credit_quanta = SW_DEFAULT_CREDIT_QUANTA;

	name = rte_vdev_device_name(vdev);
	params = rte_vdev_device_args(vdev);
	if (params != NULL && params[0] != '\0') {
		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);

		if (!kvlist) {
			SW_LOG_INFO(
				"Ignoring unsupported parameters when creating device '%s'\n",
				name);
		} else {
			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
					assign_numa_node, &socket_id);
			if (ret != 0) {
				SW_LOG_ERR(
					"%s: Error parsing numa node parameter",
					name);
				rte_kvargs_free(kvlist);
				return ret;
			}

			ret = rte_kvargs_process(kvlist, SCHED_QUANTA_ARG,
					set_sched_quanta, &sched_quanta);
			if (ret != 0) {
				SW_LOG_ERR(
					"%s: Error parsing sched quanta parameter",
					name);
				rte_kvargs_free(kvlist);
				return ret;
			}

			ret = rte_kvargs_process(kvlist, CREDIT_QUANTA_ARG,
					set_credit_quanta, &credit_quanta);
			if (ret != 0) {
				SW_LOG_ERR(
					"%s: Error parsing credit quanta parameter",
					name);
				rte_kvargs_free(kvlist);
				return ret;
			}

			rte_kvargs_free(kvlist);
		}
	}

	SW_LOG_INFO(
			"Creating eventdev sw device %s, numa_node=%d, sched_quanta=%d, credit_quanta=%d\n",
			name, socket_id, sched_quanta, credit_quanta);

	dev = rte_event_pmd_vdev_init(name,
			sizeof(struct sw_evdev), socket_id);
	if (dev == NULL) {
		SW_LOG_ERR("eventdev vdev init() failed");
		return -EFAULT;
	}
	dev->dev_ops = &evdev_sw_ops;
	dev->enqueue = sw_event_enqueue;
	dev->enqueue_burst = sw_event_enqueue_burst;
	dev->enqueue_new_burst = sw_event_enqueue_burst;
	dev->enqueue_forward_burst = sw_event_enqueue_burst;
	dev->dequeue = sw_event_dequeue;
	dev->dequeue_burst = sw_event_dequeue_burst;

	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

	sw = dev->data->dev_private;
	sw->data = dev->data;

	/* copy values passed from vdev command line to instance */
	sw->credit_update_quanta = credit_quanta;
	sw->sched_quanta = sched_quanta;

	/* register service with EAL */
	struct rte_service_spec service;
	memset(&service, 0, sizeof(struct rte_service_spec));
	snprintf(service.name, sizeof(service.name), "%s_service", name);
	snprintf(sw->service_name, sizeof(sw->service_name), "%s_service",
			name);
	service.socket_id = socket_id;
	service.callback = sw_sched_service_func;
	service.callback_userdata = (void *)dev;

	int32_t ret = rte_service_component_register(&service, &sw->service_id);
	if (ret) {
		SW_LOG_ERR("service register() failed");
		return -ENOEXEC;
	}

	dev->data->service_inited = 1;
	dev->data->service_id = sw->service_id;

	return 0;
}

static int
sw_remove(struct rte_vdev_device *vdev)
{
	const char *name;

	name = rte_vdev_device_name(vdev);
	if (name == NULL)
		return -EINVAL;

	SW_LOG_INFO("Closing eventdev sw device %s\n", name);

	return rte_event_pmd_vdev_uninit(name);
}

static struct rte_vdev_driver evdev_sw_pmd_drv = {
	.probe = sw_probe,
	.remove = sw_remove
};

RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv);
RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "=<int> "
		SCHED_QUANTA_ARG "=<int> " CREDIT_QUANTA_ARG "=<int>");

/* declared extern in header, for access from other .c files */
int eventdev_sw_log_level;

RTE_INIT(evdev_sw_init_log)
{
	eventdev_sw_log_level = rte_log_register("pmd.event.sw");
	if (eventdev_sw_log_level >= 0)
		rte_log_set_level(eventdev_sw_log_level, RTE_LOG_NOTICE);
}