1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2018 Ericsson AB 3 */ 4 5 #include <stdbool.h> 6 7 #include <rte_cycles.h> 8 #include <eventdev_pmd.h> 9 #include <eventdev_pmd_vdev.h> 10 #include <rte_random.h> 11 #include <rte_ring_elem.h> 12 13 #include "dsw_evdev.h" 14 15 #define EVENTDEV_NAME_DSW_PMD event_dsw 16 17 static int 18 dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id, 19 const struct rte_event_port_conf *conf) 20 { 21 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 22 struct dsw_port *port; 23 struct rte_event_ring *in_ring; 24 struct rte_ring *ctl_in_ring; 25 char ring_name[RTE_RING_NAMESIZE]; 26 27 port = &dsw->ports[port_id]; 28 29 *port = (struct dsw_port) { 30 .id = port_id, 31 .dsw = dsw, 32 .dequeue_depth = conf->dequeue_depth, 33 .enqueue_depth = conf->enqueue_depth, 34 .new_event_threshold = conf->new_event_threshold 35 }; 36 37 snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id, 38 port_id); 39 40 in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE, 41 dev->data->socket_id, 42 RING_F_SC_DEQ|RING_F_EXACT_SZ); 43 44 if (in_ring == NULL) 45 return -ENOMEM; 46 47 snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u", 48 dev->data->dev_id, port_id); 49 50 ctl_in_ring = rte_ring_create_elem(ring_name, 51 sizeof(struct dsw_ctl_msg), 52 DSW_CTL_IN_RING_SIZE, 53 dev->data->socket_id, 54 RING_F_SC_DEQ|RING_F_EXACT_SZ); 55 56 if (ctl_in_ring == NULL) { 57 rte_event_ring_free(in_ring); 58 return -ENOMEM; 59 } 60 61 port->in_ring = in_ring; 62 port->ctl_in_ring = ctl_in_ring; 63 64 rte_atomic16_init(&port->load); 65 rte_atomic32_init(&port->immigration_load); 66 67 port->load_update_interval = 68 (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S; 69 70 port->migration_interval = 71 (DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S; 72 73 dev->data->ports[port_id] = port; 74 75 return 0; 76 } 77 78 static void 79 dsw_port_def_conf(struct rte_eventdev *dev __rte_unused, 80 uint8_t port_id __rte_unused, 81 struct rte_event_port_conf *port_conf) 82 { 83 *port_conf = (struct rte_event_port_conf) { 84 .new_event_threshold = 1024, 85 .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4, 86 .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4 87 }; 88 } 89 90 static void 91 dsw_port_release(void *p) 92 { 93 struct dsw_port *port = p; 94 95 rte_event_ring_free(port->in_ring); 96 rte_ring_free(port->ctl_in_ring); 97 } 98 99 static int 100 dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, 101 const struct rte_event_queue_conf *conf) 102 { 103 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 104 struct dsw_queue *queue = &dsw->queues[queue_id]; 105 106 if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg) 107 return -ENOTSUP; 108 109 /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it 110 * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since 111 * the queue will only have a single serving port, no 112 * migration will ever happen, so the extra TYPE_ATOMIC 113 * migration overhead is avoided. 114 */ 115 if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) 116 queue->schedule_type = RTE_SCHED_TYPE_ATOMIC; 117 else { 118 if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED) 119 return -ENOTSUP; 120 /* atomic or parallel */ 121 queue->schedule_type = conf->schedule_type; 122 } 123 124 queue->num_serving_ports = 0; 125 126 return 0; 127 } 128 129 static void 130 dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused, 131 uint8_t queue_id __rte_unused, 132 struct rte_event_queue_conf *queue_conf) 133 { 134 *queue_conf = (struct rte_event_queue_conf) { 135 .nb_atomic_flows = 4096, 136 .schedule_type = RTE_SCHED_TYPE_ATOMIC, 137 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL 138 }; 139 } 140 141 static void 142 dsw_queue_release(struct rte_eventdev *dev __rte_unused, 143 uint8_t queue_id __rte_unused) 144 { 145 } 146 147 static void 148 queue_add_port(struct dsw_queue *queue, uint16_t port_id) 149 { 150 queue->serving_ports[queue->num_serving_ports] = port_id; 151 queue->num_serving_ports++; 152 } 153 154 static bool 155 queue_remove_port(struct dsw_queue *queue, uint16_t port_id) 156 { 157 uint16_t i; 158 159 for (i = 0; i < queue->num_serving_ports; i++) 160 if (queue->serving_ports[i] == port_id) { 161 uint16_t last_idx = queue->num_serving_ports - 1; 162 if (i != last_idx) 163 queue->serving_ports[i] = 164 queue->serving_ports[last_idx]; 165 queue->num_serving_ports--; 166 return true; 167 } 168 return false; 169 } 170 171 static int 172 dsw_port_link_unlink(struct rte_eventdev *dev, void *port, 173 const uint8_t queues[], uint16_t num, bool link) 174 { 175 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 176 struct dsw_port *p = port; 177 uint16_t i; 178 uint16_t count = 0; 179 180 for (i = 0; i < num; i++) { 181 uint8_t qid = queues[i]; 182 struct dsw_queue *q = &dsw->queues[qid]; 183 if (link) { 184 queue_add_port(q, p->id); 185 count++; 186 } else { 187 bool removed = queue_remove_port(q, p->id); 188 if (removed) 189 count++; 190 } 191 } 192 193 return count; 194 } 195 196 static int 197 dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[], 198 const uint8_t priorities[] __rte_unused, uint16_t num) 199 { 200 return dsw_port_link_unlink(dev, port, queues, num, true); 201 } 202 203 static int 204 dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[], 205 uint16_t num) 206 { 207 return dsw_port_link_unlink(dev, port, queues, num, false); 208 } 209 210 static void 211 dsw_info_get(struct rte_eventdev *dev __rte_unused, 212 struct rte_event_dev_info *info) 213 { 214 *info = (struct rte_event_dev_info) { 215 .driver_name = DSW_PMD_NAME, 216 .max_event_queues = DSW_MAX_QUEUES, 217 .max_event_queue_flows = DSW_MAX_FLOWS, 218 .max_event_queue_priority_levels = 1, 219 .max_event_priority_levels = 1, 220 .max_event_ports = DSW_MAX_PORTS, 221 .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH, 222 .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH, 223 .max_num_events = DSW_MAX_EVENTS, 224 .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE| 225 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED| 226 RTE_EVENT_DEV_CAP_NONSEQ_MODE| 227 RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT| 228 RTE_EVENT_DEV_CAP_CARRY_FLOW_ID 229 }; 230 } 231 232 static int 233 dsw_configure(const struct rte_eventdev *dev) 234 { 235 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 236 const struct rte_event_dev_config *conf = &dev->data->dev_conf; 237 int32_t min_max_in_flight; 238 239 dsw->num_ports = conf->nb_event_ports; 240 dsw->num_queues = conf->nb_event_queues; 241 242 /* Avoid a situation where consumer ports are holding all the 243 * credits, without making use of them. 244 */ 245 min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS; 246 247 dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight); 248 249 return 0; 250 } 251 252 253 static void 254 initial_flow_to_port_assignment(struct dsw_evdev *dsw) 255 { 256 uint8_t queue_id; 257 for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) { 258 struct dsw_queue *queue = &dsw->queues[queue_id]; 259 uint16_t flow_hash; 260 for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) { 261 uint8_t port_idx = 262 rte_rand() % queue->num_serving_ports; 263 uint8_t port_id = 264 queue->serving_ports[port_idx]; 265 dsw->queues[queue_id].flow_to_port_map[flow_hash] = 266 port_id; 267 } 268 } 269 } 270 271 static int 272 dsw_start(struct rte_eventdev *dev) 273 { 274 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 275 uint16_t i; 276 uint64_t now; 277 278 rte_atomic32_init(&dsw->credits_on_loan); 279 280 initial_flow_to_port_assignment(dsw); 281 282 now = rte_get_timer_cycles(); 283 for (i = 0; i < dsw->num_ports; i++) { 284 dsw->ports[i].measurement_start = now; 285 dsw->ports[i].busy_start = now; 286 } 287 288 return 0; 289 } 290 291 static void 292 dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len, 293 eventdev_stop_flush_t flush, void *flush_arg) 294 { 295 uint16_t i; 296 297 for (i = 0; i < buf_len; i++) 298 flush(dev_id, buf[i], flush_arg); 299 } 300 301 static void 302 dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port, 303 eventdev_stop_flush_t flush, void *flush_arg) 304 { 305 dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len, 306 flush, flush_arg); 307 } 308 309 static void 310 dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port, 311 eventdev_stop_flush_t flush, void *flush_arg) 312 { 313 uint16_t dport_id; 314 315 for (dport_id = 0; dport_id < dsw->num_ports; dport_id++) 316 if (dport_id != port->id) 317 dsw_port_drain_buf(dev_id, port->out_buffer[dport_id], 318 port->out_buffer_len[dport_id], 319 flush, flush_arg); 320 } 321 322 static void 323 dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port, 324 eventdev_stop_flush_t flush, void *flush_arg) 325 { 326 struct rte_event ev; 327 328 while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL)) 329 flush(dev_id, ev, flush_arg); 330 } 331 332 static void 333 dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw, 334 eventdev_stop_flush_t flush, void *flush_arg) 335 { 336 uint16_t port_id; 337 338 if (flush == NULL) 339 return; 340 341 for (port_id = 0; port_id < dsw->num_ports; port_id++) { 342 struct dsw_port *port = &dsw->ports[port_id]; 343 344 dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg); 345 dsw_port_drain_paused(dev_id, port, flush, flush_arg); 346 dsw_port_drain_in_ring(dev_id, port, flush, flush_arg); 347 } 348 } 349 350 static void 351 dsw_stop(struct rte_eventdev *dev) 352 { 353 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 354 uint8_t dev_id; 355 eventdev_stop_flush_t flush; 356 void *flush_arg; 357 358 dev_id = dev->data->dev_id; 359 flush = dev->dev_ops->dev_stop_flush; 360 flush_arg = dev->data->dev_stop_flush_arg; 361 362 dsw_drain(dev_id, dsw, flush, flush_arg); 363 } 364 365 static int 366 dsw_close(struct rte_eventdev *dev) 367 { 368 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 369 370 dsw->num_ports = 0; 371 dsw->num_queues = 0; 372 373 return 0; 374 } 375 376 static struct rte_eventdev_ops dsw_evdev_ops = { 377 .port_setup = dsw_port_setup, 378 .port_def_conf = dsw_port_def_conf, 379 .port_release = dsw_port_release, 380 .queue_setup = dsw_queue_setup, 381 .queue_def_conf = dsw_queue_def_conf, 382 .queue_release = dsw_queue_release, 383 .port_link = dsw_port_link, 384 .port_unlink = dsw_port_unlink, 385 .dev_infos_get = dsw_info_get, 386 .dev_configure = dsw_configure, 387 .dev_start = dsw_start, 388 .dev_stop = dsw_stop, 389 .dev_close = dsw_close, 390 .xstats_get = dsw_xstats_get, 391 .xstats_get_names = dsw_xstats_get_names, 392 .xstats_get_by_name = dsw_xstats_get_by_name 393 }; 394 395 static int 396 dsw_probe(struct rte_vdev_device *vdev) 397 { 398 const char *name; 399 struct rte_eventdev *dev; 400 struct dsw_evdev *dsw; 401 402 name = rte_vdev_device_name(vdev); 403 404 dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev), 405 rte_socket_id()); 406 if (dev == NULL) 407 return -EFAULT; 408 409 dev->dev_ops = &dsw_evdev_ops; 410 dev->enqueue = dsw_event_enqueue; 411 dev->enqueue_burst = dsw_event_enqueue_burst; 412 dev->enqueue_new_burst = dsw_event_enqueue_new_burst; 413 dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst; 414 dev->dequeue = dsw_event_dequeue; 415 dev->dequeue_burst = dsw_event_dequeue_burst; 416 417 if (rte_eal_process_type() != RTE_PROC_PRIMARY) 418 return 0; 419 420 dsw = dev->data->dev_private; 421 dsw->data = dev->data; 422 423 return 0; 424 } 425 426 static int 427 dsw_remove(struct rte_vdev_device *vdev) 428 { 429 const char *name; 430 431 name = rte_vdev_device_name(vdev); 432 if (name == NULL) 433 return -EINVAL; 434 435 return rte_event_pmd_vdev_uninit(name); 436 } 437 438 static struct rte_vdev_driver evdev_dsw_pmd_drv = { 439 .probe = dsw_probe, 440 .remove = dsw_remove 441 }; 442 443 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv); 444