/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */
#include <unistd.h>

#include <rte_cryptodev.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define MC_SCHED_ENQ_RING_NAME_PREFIX "MCS_ENQR_"
#define MC_SCHED_DEQ_RING_NAME_PREFIX "MCS_DEQR_"

#define MC_SCHED_BUFFER_SIZE 32

#define CRYPTO_OP_STATUS_BIT_COMPLETE 0x80

/** multi-core scheduler context */
struct mc_scheduler_ctx {
	uint32_t num_workers;	/**< Number of workers polling */
	uint32_t stop_signal;

	struct rte_ring *sched_enq_ring[RTE_MAX_LCORE];
	struct rte_ring *sched_deq_ring[RTE_MAX_LCORE];
};

struct mc_scheduler_qp_ctx {
	struct scheduler_slave slaves[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES];
	uint32_t nb_slaves;

	uint32_t last_enq_worker_idx;
	uint32_t last_deq_worker_idx;

	struct mc_scheduler_ctx *mc_private_ctx;
};

static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
	uint16_t i, processed_ops = 0;

	if (unlikely(nb_ops == 0))
		return 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
		uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_queue_ops;
		processed_ops += nb_queue_ops;

		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}
	mc_qp_ctx->last_enq_worker_idx = worker_idx;

	return processed_ops;
}

static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
			nb_ops);
	uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
			nb_ops_to_enq);

	scheduler_order_insert(order_ring, ops, nb_ops_enqd);

	return nb_ops_enqd;
}


static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
	uint16_t i, processed_ops = 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
		uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_deq_ops;
		processed_ops += nb_deq_ops;
		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}

	mc_qp_ctx->last_deq_worker_idx = worker_idx;

	return processed_ops;

}

static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	struct rte_crypto_op *op;
	uint32_t nb_objs = rte_ring_count(order_ring);
	uint32_t nb_ops_to_deq = 0;
	uint32_t nb_ops_deqd = 0;

	if (nb_objs > nb_ops)
		nb_objs = nb_ops;

	while (nb_ops_to_deq < nb_objs) {
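		/*
		 * Look at the op at this offset in the order ring without
		 * dequeuing it; only the leading run of ops already flagged
		 * complete by a worker core may be released to the caller.
		 */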
		SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq, op);

		if (!(op->status & CRYPTO_OP_STATUS_BIT_COMPLETE))
			break;

		op->status &= ~CRYPTO_OP_STATUS_BIT_COMPLETE;
		nb_ops_to_deq++;
	}

	if (nb_ops_to_deq) {
		nb_ops_deqd = rte_ring_sc_dequeue_bulk(order_ring,
				(void **)ops, nb_ops_to_deq, NULL);
	}

	return nb_ops_deqd;
}

static int
slave_attach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t slave_id)
{
	return 0;
}

static int
slave_detach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t slave_id)
{
	return 0;
}

/*
 * Per-lcore worker loop: moves crypto ops between this worker's scheduler
 * rings and its assigned slave queue pair until the stop signal is set.
 */
static int
mc_scheduler_worker(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	struct rte_ring *enq_ring;
	struct rte_ring *deq_ring;
	uint32_t core_id = rte_lcore_id();
	int i, worker_idx = -1;
	struct scheduler_slave *slave;
	struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
	struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
	uint16_t processed_ops;
	uint16_t pending_enq_ops = 0;
	uint16_t pending_enq_ops_idx = 0;
	uint16_t pending_deq_ops = 0;
	uint16_t pending_deq_ops_idx = 0;
	uint16_t inflight_ops = 0;
	const uint8_t reordering_enabled = sched_ctx->reordering_enabled;

	for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
		if (sched_ctx->wc_pool[i] == core_id) {
			worker_idx = i;
			break;
		}
	}
	if (worker_idx == -1) {
		CR_SCHED_LOG(ERR, "worker on core %u: cannot find worker index!",
				core_id);
		return -1;
	}

	slave = &sched_ctx->slaves[worker_idx];
	enq_ring = mc_ctx->sched_enq_ring[worker_idx];
	deq_ring = mc_ctx->sched_deq_ring[worker_idx];

	while (!mc_ctx->stop_signal) {
		if (pending_enq_ops) {
			processed_ops =
				rte_cryptodev_enqueue_burst(slave->dev_id,
					slave->qp_id, &enq_ops[pending_enq_ops_idx],
					pending_enq_ops);
			pending_enq_ops -= processed_ops;
			pending_enq_ops_idx += processed_ops;
			inflight_ops += processed_ops;
		} else {
			processed_ops = rte_ring_dequeue_burst(enq_ring, (void *)enq_ops,
					MC_SCHED_BUFFER_SIZE, NULL);
			if (processed_ops) {
				pending_enq_ops_idx = rte_cryptodev_enqueue_burst(
						slave->dev_id, slave->qp_id,
						enq_ops, processed_ops);
				pending_enq_ops = processed_ops - pending_enq_ops_idx;
				inflight_ops += pending_enq_ops_idx;
			}
		}

		if (pending_deq_ops) {
			processed_ops = rte_ring_enqueue_burst(
					deq_ring, (void *)&deq_ops[pending_deq_ops_idx],
					pending_deq_ops, NULL);
			pending_deq_ops -= processed_ops;
			pending_deq_ops_idx += processed_ops;
		} else if (inflight_ops) {
			processed_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
					slave->qp_id, deq_ops, MC_SCHED_BUFFER_SIZE);
			if (processed_ops) {
				inflight_ops -= processed_ops;
				if (reordering_enabled) {
					uint16_t j;

					for (j = 0; j < processed_ops; j++) {
						deq_ops[j]->status |=
							CRYPTO_OP_STATUS_BIT_COMPLETE;
					}
				} else {
					pending_deq_ops_idx = rte_ring_enqueue_burst(
							deq_ring, (void *)deq_ops, processed_ops,
							NULL);
					pending_deq_ops = processed_ops -
							pending_deq_ops_idx;
				}
			}
		}

		rte_pause();
	}

	return 0;
}

static int
scheduler_start(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

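	/*
	 * Clear the stop flag and launch one worker loop on each configured
	 * worker lcore before publishing the burst functions.
	 */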
	mc_ctx->stop_signal = 0;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_remote_launch(
			(lcore_function_t *)mc_scheduler_worker, dev,
			sched_ctx->wc_pool[i]);

	if (sched_ctx->reordering_enabled) {
		dev->enqueue_burst = &schedule_enqueue_ordering;
		dev->dequeue_burst = &schedule_dequeue_ordering;
	} else {
		dev->enqueue_burst = &schedule_enqueue;
		dev->dequeue_burst = &schedule_dequeue;
	}

	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
		struct mc_scheduler_qp_ctx *mc_qp_ctx =
				qp_ctx->private_qp_ctx;
		uint32_t j;

		memset(mc_qp_ctx->slaves, 0,
				RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES *
				sizeof(struct scheduler_slave));
		for (j = 0; j < sched_ctx->nb_slaves; j++) {
			mc_qp_ctx->slaves[j].dev_id =
					sched_ctx->slaves[j].dev_id;
			mc_qp_ctx->slaves[j].qp_id = i;
		}

		mc_qp_ctx->nb_slaves = sched_ctx->nb_slaves;

		mc_qp_ctx->last_enq_worker_idx = 0;
		mc_qp_ctx->last_deq_worker_idx = 0;
	}

	return 0;
}

static int
scheduler_stop(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 1;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_wait_lcore(sched_ctx->wc_pool[i]);

	return 0;
}

static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
	struct mc_scheduler_qp_ctx *mc_qp_ctx;
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

	mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
			rte_socket_id());
	if (!mc_qp_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory for private queue pair");
		return -ENOMEM;
	}

	mc_qp_ctx->mc_private_ctx = mc_ctx;
	qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;

	return 0;
}

static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = NULL;
	uint16_t i;

	if (sched_ctx->private_ctx) {
		rte_free(sched_ctx->private_ctx);
		sched_ctx->private_ctx = NULL;
	}

	mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
			rte_socket_id());
	if (!mc_ctx) {
		CR_SCHED_LOG(ERR, "failed to allocate memory");
		return -ENOMEM;
	}

	mc_ctx->num_workers = sched_ctx->nb_wc;
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		char r_name[16];

		snprintf(r_name, sizeof(r_name), MC_SCHED_ENQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_enq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_enq_ring[i]) {
			mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name,
					PER_SLAVE_BUFF_SIZE,
					rte_socket_id(),
					RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_enq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
						i);
				goto exit;
			}
		}
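		/* Create or look up the matching dequeue ring for the same worker. */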
		snprintf(r_name, sizeof(r_name), MC_SCHED_DEQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_deq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_deq_ring[i]) {
			mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name,
					PER_SLAVE_BUFF_SIZE,
					rte_socket_id(),
					RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_deq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
						i);
				goto exit;
			}
		}
	}

	sched_ctx->private_ctx = (void *)mc_ctx;

	return 0;

exit:
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		rte_ring_free(mc_ctx->sched_enq_ring[i]);
		rte_ring_free(mc_ctx->sched_deq_ring[i]);
	}
	rte_free(mc_ctx);

	return -1;
}

static struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
	slave_attach,
	slave_detach,
	scheduler_start,
	scheduler_stop,
	scheduler_config_qp,
	scheduler_create_private_ctx,
	NULL,	/* option_set */
	NULL	/* option_get */
};

static struct rte_cryptodev_scheduler mc_scheduler = {
		.name = "multicore-scheduler",
		.description = "scheduler which will run burst across multiple cpu cores",
		.mode = CDEV_SCHED_MODE_MULTICORE,
		.ops = &scheduler_mc_ops
};

struct rte_cryptodev_scheduler *crypto_scheduler_multicore = &mc_scheduler;