/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */
#include <unistd.h>

#include <cryptodev_pmd.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define MC_SCHED_ENQ_RING_NAME_PREFIX	"MCS_ENQR_"
#define MC_SCHED_DEQ_RING_NAME_PREFIX	"MCS_DEQR_"

#define MC_SCHED_BUFFER_SIZE		32

#define CRYPTO_OP_STATUS_BIT_COMPLETE	0x80

/** multi-core scheduler context */
struct mc_scheduler_ctx {
	uint32_t num_workers;	/**< Number of workers polling */
	uint32_t stop_signal;

	struct rte_ring *sched_enq_ring[RTE_MAX_LCORE];
	struct rte_ring *sched_deq_ring[RTE_MAX_LCORE];
};

struct mc_scheduler_qp_ctx {
	struct scheduler_worker workers[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS];
	uint32_t nb_workers;

	uint32_t last_enq_worker_idx;
	uint32_t last_deq_worker_idx;

	struct mc_scheduler_ctx *mc_private_ctx;
};

static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
	uint16_t i, processed_ops = 0;

	if (unlikely(nb_ops == 0))
		return 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
		uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_queue_ops;
		processed_ops += nb_queue_ops;

		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}
	mc_qp_ctx->last_enq_worker_idx = worker_idx;

	return processed_ops;
}

static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
			nb_ops);
	uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
			nb_ops_to_enq);

	scheduler_order_insert(order_ring, ops, nb_ops_enqd);

	return nb_ops_enqd;
}

static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct mc_scheduler_qp_ctx *mc_qp_ctx =
			((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
	struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
	uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
	uint16_t i, processed_ops = 0;

	for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
		struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
		uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
			(void *)(&ops[processed_ops]), nb_ops, NULL);

		nb_ops -= nb_deq_ops;
		processed_ops += nb_deq_ops;
		if (++worker_idx == mc_ctx->num_workers)
			worker_idx = 0;
	}

	mc_qp_ctx->last_deq_worker_idx = worker_idx;

	return processed_ops;
}

static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	struct rte_crypto_op *op;
	uint32_t nb_objs, nb_ops_to_deq;

	nb_objs = rte_ring_dequeue_burst_start(order_ring, (void **)ops,
		nb_ops, NULL);
	if (nb_objs == 0)
		return 0;

	for (nb_ops_to_deq = 0; nb_ops_to_deq != nb_objs; nb_ops_to_deq++) {
		op = ops[nb_ops_to_deq];
		if (!(op->status & CRYPTO_OP_STATUS_BIT_COMPLETE))
			break;
		op->status &= ~CRYPTO_OP_STATUS_BIT_COMPLETE;
	}

	rte_ring_dequeue_finish(order_ring, nb_ops_to_deq);
	return nb_ops_to_deq;
}

static int
worker_attach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t worker_id)
{
	return 0;
}

static int
worker_detach(__rte_unused struct rte_cryptodev *dev,
		__rte_unused uint8_t worker_id)
{
	return 0;
}
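/**
 * Polling loop run on each worker lcore.
 *
 * Crypto ops handed over by the scheduler enqueue path are pulled from
 * this lcore's enqueue ring and submitted to the attached worker PMD.
 * Dequeued results are either flagged with CRYPTO_OP_STATUS_BIT_COMPLETE
 * (when reordering is enabled) or staged on the dequeue ring for the
 * scheduler dequeue path to collect.
 */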
static int
mc_scheduler_worker(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	struct rte_ring *enq_ring;
	struct rte_ring *deq_ring;
	uint32_t core_id = rte_lcore_id();
	int i, worker_idx = -1;
	struct scheduler_worker *worker;
	struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
	struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
	uint16_t processed_ops;
	uint16_t pending_enq_ops = 0;
	uint16_t pending_enq_ops_idx = 0;
	uint16_t pending_deq_ops = 0;
	uint16_t pending_deq_ops_idx = 0;
	uint16_t inflight_ops = 0;
	const uint8_t reordering_enabled = sched_ctx->reordering_enabled;

	for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
		if (sched_ctx->wc_pool[i] == core_id) {
			worker_idx = i;
			break;
		}
	}
	if (worker_idx == -1) {
		CR_SCHED_LOG(ERR, "worker on core %u: cannot find worker index!",
			core_id);
		return -1;
	}

	worker = &sched_ctx->workers[worker_idx];
	enq_ring = mc_ctx->sched_enq_ring[worker_idx];
	deq_ring = mc_ctx->sched_deq_ring[worker_idx];

	while (!mc_ctx->stop_signal) {
		if (pending_enq_ops) {
			scheduler_set_worker_sessions(
				&enq_ops[pending_enq_ops_idx], pending_enq_ops,
				worker_idx);
			processed_ops =
				rte_cryptodev_enqueue_burst(worker->dev_id,
					worker->qp_id,
					&enq_ops[pending_enq_ops_idx],
					pending_enq_ops);
			/* Restore the original sessions of any ops that
			 * could not be enqueued to the worker PMD.
			 */
			if (processed_ops < pending_enq_ops)
				scheduler_retrieve_sessions(
					&enq_ops[pending_enq_ops_idx +
						processed_ops],
					pending_enq_ops - processed_ops);
			pending_enq_ops -= processed_ops;
			pending_enq_ops_idx += processed_ops;
			inflight_ops += processed_ops;
		} else {
			processed_ops = rte_ring_dequeue_burst(enq_ring,
					(void *)enq_ops,
					MC_SCHED_BUFFER_SIZE, NULL);
			if (processed_ops) {
				scheduler_set_worker_sessions(enq_ops,
					processed_ops, worker_idx);
				pending_enq_ops_idx = rte_cryptodev_enqueue_burst(
						worker->dev_id, worker->qp_id,
						enq_ops, processed_ops);
				if (pending_enq_ops_idx < processed_ops)
					scheduler_retrieve_sessions(
						enq_ops + pending_enq_ops_idx,
						processed_ops -
						pending_enq_ops_idx);
				pending_enq_ops = processed_ops - pending_enq_ops_idx;
				inflight_ops += pending_enq_ops_idx;
			}
		}

		if (pending_deq_ops) {
			processed_ops = rte_ring_enqueue_burst(
					deq_ring, (void *)&deq_ops[pending_deq_ops_idx],
					pending_deq_ops, NULL);
			pending_deq_ops -= processed_ops;
			pending_deq_ops_idx += processed_ops;
		} else if (inflight_ops) {
			processed_ops = rte_cryptodev_dequeue_burst(
					worker->dev_id, worker->qp_id, deq_ops,
					MC_SCHED_BUFFER_SIZE);
			if (processed_ops) {
				scheduler_retrieve_sessions(deq_ops,
					processed_ops);
				inflight_ops -= processed_ops;
				if (reordering_enabled) {
					uint16_t j;

					for (j = 0; j < processed_ops; j++) {
						deq_ops[j]->status |=
							CRYPTO_OP_STATUS_BIT_COMPLETE;
					}
				} else {
					pending_deq_ops_idx = rte_ring_enqueue_burst(
							deq_ring, (void *)deq_ops,
							processed_ops, NULL);
					pending_deq_ops = processed_ops -
							pending_deq_ops_idx;
				}
			}
		}

		rte_pause();
	}

	return 0;
}
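/**
 * Start the multi-core scheduler: launch mc_scheduler_worker() on every
 * configured worker lcore, pick the ordering or non-ordering burst
 * functions and refresh the per-queue-pair worker list.
 */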
static int
scheduler_start(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 0;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_remote_launch(
			(lcore_function_t *)mc_scheduler_worker, dev,
			sched_ctx->wc_pool[i]);

	if (sched_ctx->reordering_enabled) {
		dev->enqueue_burst = &schedule_enqueue_ordering;
		dev->dequeue_burst = &schedule_dequeue_ordering;
	} else {
		dev->enqueue_burst = &schedule_enqueue;
		dev->dequeue_burst = &schedule_dequeue;
	}

	for (i = 0; i < dev->data->nb_queue_pairs; i++) {
		struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
		struct mc_scheduler_qp_ctx *mc_qp_ctx =
				qp_ctx->private_qp_ctx;
		uint32_t j;

		memset(mc_qp_ctx->workers, 0,
				RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS *
				sizeof(struct scheduler_worker));
		for (j = 0; j < sched_ctx->nb_workers; j++) {
			mc_qp_ctx->workers[j].dev_id =
					sched_ctx->workers[j].dev_id;
			mc_qp_ctx->workers[j].qp_id = i;
		}

		mc_qp_ctx->nb_workers = sched_ctx->nb_workers;

		mc_qp_ctx->last_enq_worker_idx = 0;
		mc_qp_ctx->last_deq_worker_idx = 0;
	}

	return 0;
}

static int
scheduler_stop(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
	uint16_t i;

	mc_ctx->stop_signal = 1;

	for (i = 0; i < sched_ctx->nb_wc; i++)
		rte_eal_wait_lcore(sched_ctx->wc_pool[i]);

	return 0;
}

static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
	struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
	struct mc_scheduler_qp_ctx *mc_qp_ctx;
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

	mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
			rte_socket_id());
	if (!mc_qp_ctx) {
		CR_SCHED_LOG(ERR, "failed allocate memory for private queue pair");
		return -ENOMEM;
	}

	mc_qp_ctx->mc_private_ctx = mc_ctx;
	qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;

	return 0;
}
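/**
 * Allocate the scheduler private context and one enqueue/dequeue ring
 * pair per worker lcore, reusing any ring that already exists under the
 * same name.
 */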
static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
	struct scheduler_ctx *sched_ctx = dev->data->dev_private;
	struct mc_scheduler_ctx *mc_ctx = NULL;
	uint16_t i;

	if (sched_ctx->private_ctx) {
		rte_free(sched_ctx->private_ctx);
		sched_ctx->private_ctx = NULL;
	}

	mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
			rte_socket_id());
	if (!mc_ctx) {
		CR_SCHED_LOG(ERR, "failed allocate memory");
		return -ENOMEM;
	}

	mc_ctx->num_workers = sched_ctx->nb_wc;
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		char r_name[16];

		snprintf(r_name, sizeof(r_name), MC_SCHED_ENQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_enq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_enq_ring[i]) {
			mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name,
						PER_WORKER_BUFF_SIZE,
						rte_socket_id(),
						RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_enq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
					i);
				goto exit;
			}
		}
		snprintf(r_name, sizeof(r_name), MC_SCHED_DEQ_RING_NAME_PREFIX
				"%u_%u", dev->data->dev_id, i);
		mc_ctx->sched_deq_ring[i] = rte_ring_lookup(r_name);
		if (!mc_ctx->sched_deq_ring[i]) {
			mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name,
						PER_WORKER_BUFF_SIZE,
						rte_socket_id(),
						RING_F_SC_DEQ | RING_F_SP_ENQ);
			if (!mc_ctx->sched_deq_ring[i]) {
				CR_SCHED_LOG(ERR, "Cannot create ring for worker %u",
					i);
				goto exit;
			}
		}
	}

	sched_ctx->private_ctx = (void *)mc_ctx;

	return 0;

exit:
	for (i = 0; i < sched_ctx->nb_wc; i++) {
		rte_ring_free(mc_ctx->sched_enq_ring[i]);
		rte_ring_free(mc_ctx->sched_deq_ring[i]);
	}
	rte_free(mc_ctx);

	return -1;
}

static struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
	worker_attach,
	worker_detach,
	scheduler_start,
	scheduler_stop,
	scheduler_config_qp,
	scheduler_create_private_ctx,
	NULL,	/* option_set */
	NULL	/* option_get */
};

static struct rte_cryptodev_scheduler mc_scheduler = {
		.name = "multicore-scheduler",
		.description = "scheduler which will run burst across multiple cpu cores",
		.mode = CDEV_SCHED_MODE_MULTICORE,
		.ops = &scheduler_mc_ops
};

struct rte_cryptodev_scheduler *crypto_scheduler_multicore = &mc_scheduler;